# starcontrol/blocklists.py
# 2024-11-04 20:52:37 -08:00

# 113 lines
# 3.5 KiB
# Python

# Copyright © 2024 Nicole O'Connor
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
import fnmatch
import logging
import os
import pathlib
import shutil
import time
import zipfile
import requests
import config
import screen
# Module-wide logger, registered under the fixed name "blocklists".
log = logging.getLogger("blocklists")
# Loop flag polled by blocklist_thread(); stop_blocklist() clears it at
# interpreter exit.
running = True
# Archive of the hagezi/dns-blocklists repository, pinned to a single tag.
blocklist_data_src = "https://github.com/hagezi/dns-blocklists/archive/refs/tags/2024.310.16385.zip"
# Name of the archive sub-directory for dnsmasq-format lists
# (deploy_blocklists() extracts files from a "dnsmasq" directory).
blocklist_data_src_path = "dnsmasq"
# Directory in which the downloaded archive is stored.
blocklist_lib_path = "/var/lib/starcontrol"
# NOTE: everything below runs at import time, including directory creation.
log.debug("preparing cache directories")
# The cache root is read from config.cparse["internal"]["cache_root"]; the
# generated dnsmasq.conf and the extracted lists are written beneath it.
pth_cache_root = pathlib.Path(config.cparse["internal"]["cache_root"])
pth_cache_root.mkdir(exist_ok=True, parents=True)
pth_blocklist_lib = pathlib.Path(blocklist_lib_path)
pth_blocklist_lib.mkdir(exist_ok=True, parents=True)
# Set by update_blocklists() after a download; cleared by deploy_blocklists()
# once the lists have been extracted.
deploy_pending = False
def blocklist_check(max_age_seconds=604800):
    """Refresh the blocklist archive when it is missing or too old.

    Args:
        max_age_seconds: Age (based on the archive's mtime) at or beyond
            which a new download is triggered. Defaults to 604800 seconds
            (seven days).

    Raises:
        Whatever ``update_blocklists()`` raises; download errors are not
        swallowed here.
    """
    log.debug("checking blocklist mtimes and updating if needed")
    # Only the mtime lookup is guarded.  The download must stay outside the
    # ``try``: requests' exceptions derive from OSError, so guarding it too
    # would make a failed download silently trigger a second attempt.
    try:
        ts_payload = os.path.getmtime(pth_blocklist_lib / "dns-blocklists-main.zip")
    except OSError:
        # Archive missing or unreadable: treat it as needing a download.
        ts_payload = None
    if ts_payload is None or (time.time() - ts_payload) >= max_age_seconds:
        update_blocklists()
def update_blocklists(timeout=30):
    """Download the blocklist archive and flag it for deployment.

    The archive is streamed into a ``.part`` file next to the final location
    and moved into place only after the download completed successfully, so
    a failed transfer never replaces a good archive and never refreshes its
    mtime (which ``blocklist_check()`` uses to decide when to retry).

    Args:
        timeout: Connect/read timeout in seconds handed to ``requests.get``.
            It bounds each network wait, not the total transfer time.

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            non-success HTTP status.
        OSError: If the archive cannot be written or moved into place.
    """
    global deploy_pending
    log.info("blocklist update in progress")
    pth_payload = pth_blocklist_lib / "dns-blocklists-main.zip"
    # A leftover partial file from a failed run is simply overwritten.
    pth_partial = pth_payload.with_name(pth_payload.name + ".part")
    screen.download_state = 1
    try:
        with requests.get(blocklist_data_src, stream=True, timeout=timeout) as hnd_download:
            # Refuse to store an HTTP error page as if it were the archive.
            hnd_download.raise_for_status()
            with open(pth_partial, "wb") as fd_listpayload:
                for chunk in hnd_download.iter_content(chunk_size=65536):
                    fd_listpayload.write(chunk)
        # Replace the previous archive in a single step.
        os.replace(pth_partial, pth_payload)
    finally:
        # Reset the download indicator even when the transfer failed.
        screen.download_state = 0
    deploy_pending = True
def build_dnsmasq_file():
    """Write ``dnsmasq.conf`` into the cache root.

    Emits one ``server=`` line per key of the ``upstreams`` config section,
    followed by one ``conf-file=`` line per key of ``enabled_lists`` whose
    ``lists/<name>.txt`` file exists under the cache root.
    """
    conf_path = pth_cache_root / "dnsmasq.conf"
    with open(conf_path, "w") as out:
        upstream_names = list(config.cparse["upstreams"].keys())
        out.writelines(f"server={upstream}\n" for upstream in upstream_names)

        list_names = list(config.cparse["enabled_lists"].keys())
        for list_name in list_names:
            candidate = pth_cache_root / f"lists/{list_name}.txt"
            if not candidate.exists():
                # Enabled in the config but no such list file: leave it out.
                continue
            out.write(f"conf-file={candidate.as_posix()}\n")
def deploy_blocklists():
    """Extract the dnsmasq lists from the downloaded archive into the cache.

    Every file located under ``<archive root>/<blocklist_data_src_path>/`` is
    copied to ``<cache root>/lists/<basename>``.  The archive's root
    directory is matched by position rather than by name, because the root
    of a GitHub tag archive is named after the tag and not ``-main``.
    Clears ``deploy_pending`` once all files are written.

    Raises:
        zipfile.BadZipFile: If the stored archive is not a valid zip file.
        OSError: If the archive cannot be read or a list cannot be written.
    """
    global deploy_pending
    pth_lists = pth_cache_root / "lists"
    # The target directory is not created anywhere else in this module.
    pth_lists.mkdir(exist_ok=True, parents=True)
    with zipfile.ZipFile(pth_blocklist_lib / "dns-blocklists-main.zip") as zip_blocklists:
        for item in zip_blocklists.namelist():
            parts = item.split("/")
            # Expect "<root>/<src path>/<...>"; anything else is not a list.
            if len(parts) < 3 or parts[1] != blocklist_data_src_path:
                continue
            # Using only the basename keeps every write inside pth_lists.
            basename = os.path.basename(item)
            if not basename:
                continue  # skips raw directories
            with zip_blocklists.open(item) as fd_source, open(pth_lists / basename, "wb") as fd_destination:
                shutil.copyfileobj(fd_source, fd_destination)
    deploy_pending = False
def blocklist_thread():
    """Run the periodic blocklist maintenance loop until ``running`` is false.

    Each pass sleeps for one second.  A counter is advanced once per pass;
    when it has reached 3600, ``blocklist_check()`` is called and the counter
    starts over.  Independently of that, a pending deployment is carried out
    as soon as ``deploy_pending`` is set.
    """
    ticks_since_check = 0
    while running:
        if ticks_since_check < 3600:  # not yet 1 hour worth of passes
            ticks_since_check += 1
        else:
            blocklist_check()
            ticks_since_check = 0

        if deploy_pending:
            deploy_blocklists()

        time.sleep(1)
def stop_blocklist():
    """Clear the module-level ``running`` flag so ``blocklist_thread`` exits.

    Registered as an ``atexit`` handler directly below.
    """
    # NOTE(review): atexit handlers run only after non-daemon threads have
    # finished; if blocklist_thread() runs on a non-daemon thread, confirm
    # that something else clears this flag during shutdown.
    global running
    running = False


atexit.register(stop_blocklist)