# Copyright © 2024 Nicole O'Connor
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Download, cache and deploy DNS blocklists for dnsmasq.

A worker loop (``blocklist_thread``) periodically checks the age of the
downloaded blocklist archive, refreshes it when it is stale or missing, and
extracts the dnsmasq-format lists into the cache directory.
``build_dnsmasq_file`` writes a dnsmasq configuration that references the
extracted lists.
"""

import atexit
import fnmatch
import logging
import os
import pathlib
import shutil
import time
import zipfile

import requests

import config
import screen

log = logging.getLogger("blocklists")

# Cleared by stop_blocklist() at interpreter exit to end blocklist_thread().
running = True

blocklist_data_src = "https://github.com/hagezi/dns-blocklists/archive/refs/tags/2024.310.16385.zip"
# Directory inside the archive (below its root directory) holding the lists.
blocklist_data_src_path = "dnsmasq"
blocklist_lib_path = "/var/lib/starcontrol"

# Re-download the archive once it is at least this old (7 days, in seconds).
blocklist_max_age_s = 604800
# Seconds between archive freshness checks in blocklist_thread() (1 hour).
blocklist_check_interval_s = 3600
# Connect/read timeout for the download so a stalled server cannot hang the worker.
blocklist_download_timeout_s = 30

log.debug("preparing cache directories")
pth_cache_root = pathlib.Path(config.cparse["internal"]["cache_root"])
pth_cache_root.mkdir(exist_ok=True, parents=True)
pth_blocklist_lib = pathlib.Path(blocklist_lib_path)
pth_blocklist_lib.mkdir(exist_ok=True, parents=True)

# Local copy of the downloaded archive; its mtime drives the freshness check.
pth_blocklist_archive = pth_blocklist_lib / "dns-blocklists-main.zip"

# Set by update_blocklists() after a successful download; cleared once the
# lists have been extracted (or extraction has failed and been logged).
deploy_pending = False


def blocklist_check():
    """Refresh the blocklist archive if it is missing or too old.

    Raises:
        Whatever ``update_blocklists`` raises when a download is attempted
        and fails.
    """
    log.debug("checking blocklist mtimes and updating if needed")
    # Only the mtime lookup is guarded: requests' exceptions derive from
    # OSError, so guarding the download as well would make a failed download
    # immediately trigger a second one.
    try:
        ts_payload = os.path.getmtime(pth_blocklist_archive)
    except OSError:
        # No usable archive on disk yet.
        update_blocklists()
        return
    if (time.time() - ts_payload) >= blocklist_max_age_s:
        update_blocklists()


def update_blocklists():
    """Download the blocklist archive and mark it for deployment.

    The payload is streamed into a ``.part`` file next to the archive and
    moved into place only after the transfer completed successfully, so a
    failed or interrupted download never replaces a good archive and never
    refreshes its mtime. ``screen.download_state`` is 1 while the download
    runs and is always reset to 0 afterwards.

    Raises:
        requests.RequestException: on connection errors, timeouts or a
            non-success HTTP status.
        OSError: if the archive cannot be written or moved into place.
    """
    global deploy_pending
    log.info("blocklist update in progress")
    pth_partial = pth_blocklist_archive.with_name(pth_blocklist_archive.name + ".part")
    screen.download_state = 1
    try:
        with requests.get(
            blocklist_data_src,
            stream=True,
            timeout=blocklist_download_timeout_s,
        ) as hnd_download:
            # Do not store an HTTP error page as if it were the archive.
            hnd_download.raise_for_status()
            with open(pth_partial, "wb") as fd_listpayload:
                for chunk in hnd_download.iter_content(chunk_size=65536):
                    fd_listpayload.write(chunk)
        # A leftover .part file from a failed attempt is simply truncated
        # and reused by the next attempt.
        os.replace(pth_partial, pth_blocklist_archive)
    finally:
        screen.download_state = 0
    deploy_pending = True


def build_dnsmasq_file():
    """Write ``dnsmasq.conf`` into the cache root.

    Emits one ``server=`` line per key of the ``upstreams`` config section
    and one ``conf-file=`` line per key of the ``enabled_lists`` section
    whose extracted list file exists under ``lists/`` in the cache root.
    """
    with open(pth_cache_root / "dnsmasq.conf", "w") as cfg_dnsmasq:
        for server in config.cparse["upstreams"].keys():
            cfg_dnsmasq.write(f"server={server}\n")
        for blocklist in config.cparse["enabled_lists"].keys():
            listpath = pth_cache_root / f"lists/{blocklist}.txt"
            # Enabled lists that have not been extracted are skipped.
            if listpath.exists():
                cfg_dnsmasq.write(f"conf-file={listpath.as_posix()}\n")


def deploy_blocklists():
    """Extract the dnsmasq lists from the archive into ``<cache_root>/lists``.

    Every file below ``<archive root>/<blocklist_data_src_path>/`` is written
    to the lists directory under its base name. ``deploy_pending`` is cleared
    only after all files were extracted.

    Raises:
        zipfile.BadZipFile: if the archive is not a valid zip file.
        OSError: if the archive cannot be read or a list cannot be written.
    """
    global deploy_pending
    pth_lists = pth_cache_root / "lists"
    pth_lists.mkdir(exist_ok=True, parents=True)
    # The archive's root directory is not hard-coded: GitHub source archives
    # name it after the repository and the downloaded ref, so it changes
    # whenever blocklist_data_src points at a different tag or branch.
    pattern = f"*/{blocklist_data_src_path}/*"
    with zipfile.ZipFile(pth_blocklist_archive) as zip_blocklists:
        for item in fnmatch.filter(zip_blocklists.namelist(), pattern):
            # fnmatch's "*" also matches "/", so pin the list directory to
            # the second path component (directly below the archive root).
            if item.split("/")[1] != blocklist_data_src_path:
                continue
            basename = os.path.basename(item)
            if not basename:
                continue  # skips raw directories
            with zip_blocklists.open(item) as fd_source, open(
                pth_lists / basename, "wb"
            ) as fd_destination:
                shutil.copyfileobj(fd_source, fd_destination)
    deploy_pending = False


def blocklist_thread():
    """Worker loop: check the archive periodically and deploy new downloads.

    Runs until ``running`` becomes false, ticking once per second. Failures
    of a check or a deployment are logged instead of propagating, so a
    transient network or filesystem error does not end the worker.
    """
    global deploy_pending
    sz_blocklist_check_timer = 0
    while running:
        if sz_blocklist_check_timer >= blocklist_check_interval_s:
            sz_blocklist_check_timer = 0
            try:
                blocklist_check()
            except Exception:
                log.exception("blocklist check failed; retrying at the next check")
        else:
            sz_blocklist_check_timer += 1
        if deploy_pending:
            try:
                deploy_blocklists()
            except Exception:
                log.exception("blocklist deployment failed; waiting for the next download")
                # Without this the failing deployment would be retried (and
                # logged) on every one-second tick.
                deploy_pending = False
        time.sleep(1)


@atexit.register
def stop_blocklist():
    """Ask ``blocklist_thread`` to exit; registered to run at interpreter exit."""
    global running
    running = False