113 lines
3.5 KiB
Python
113 lines
3.5 KiB
Python
# Copyright © 2024 Nicole O'Connor
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import atexit
|
|
import fnmatch
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import time
|
|
import zipfile
|
|
|
|
import requests
|
|
|
|
import config
|
|
import screen
|
|
|
|
log = logging.getLogger("blocklists")
|
|
running = True
|
|
|
|
blocklist_data_src = "https://github.com/hagezi/dns-blocklists/archive/refs/tags/2024.310.16385.zip"
|
|
blocklist_data_src_path = "dnsmasq"
|
|
blocklist_lib_path = "/var/lib/starcontrol"
|
|
|
|
log.debug("preparing cache directories")
|
|
pth_cache_root = pathlib.Path(config.cparse["internal"]["cache_root"])
|
|
pth_cache_root.mkdir(exist_ok=True, parents=True)
|
|
pth_blocklist_lib = pathlib.Path(blocklist_lib_path)
|
|
pth_blocklist_lib.mkdir(exist_ok=True, parents=True)
|
|
|
|
deploy_pending = False
|
|
|
|
def blocklist_check():
|
|
log.debug("checking blocklist mtimes and updating if needed")
|
|
try:
|
|
ts_payload = os.path.getmtime(pth_blocklist_lib / "dns-blocklists-main.zip")
|
|
ts_now = time.time()
|
|
if (ts_now - ts_payload) >= 604800:
|
|
update_blocklists()
|
|
except OSError:
|
|
update_blocklists()
|
|
|
|
def update_blocklists():
|
|
global deploy_pending
|
|
|
|
log.info("blocklist update in progress")
|
|
screen.download_state = 1
|
|
|
|
fd_listpayload = open(pth_blocklist_lib / "dns-blocklists-main.zip", "wb")
|
|
hnd_download = requests.get(blocklist_data_src, stream=True)
|
|
|
|
for chunk in hnd_download.iter_content(chunk_size=256):
|
|
fd_listpayload.write(chunk)
|
|
|
|
hnd_download.close()
|
|
fd_listpayload.close()
|
|
screen.download_state = 0
|
|
deploy_pending = True
|
|
|
|
def build_dnsmasq_file():
|
|
with open(pth_cache_root / "dnsmasq.conf", "w") as cfg_dnsmasq:
|
|
for server in list(config.cparse["upstreams"].keys()):
|
|
cfg_dnsmasq.write(f"server={server}\n")
|
|
|
|
for blocklist in list(config.cparse["enabled_lists"].keys()):
|
|
listpath = pth_cache_root / f"lists/{blocklist}.txt"
|
|
if listpath.exists():
|
|
cfg_dnsmasq.write(f"conf-file={listpath.as_posix()}\n")
|
|
|
|
def deploy_blocklists():
|
|
global deploy_pending
|
|
|
|
with zipfile.ZipFile(pth_blocklist_lib / "dns-blocklists-main.zip") as zip_blocklists:
|
|
for item in fnmatch.filter(zip_blocklists.namelist(), "dns-blocklists-main/dnsmasq/*"):
|
|
basename = os.path.basename(item)
|
|
if not basename:
|
|
continue # skips raw directories
|
|
|
|
fd_source = zip_blocklists.open(item)
|
|
fd_destination = open(pth_cache_root / f"lists/{basename}", "wb")
|
|
with fd_source, fd_destination:
|
|
shutil.copyfileobj(fd_source, fd_destination)
|
|
|
|
deploy_pending = False
|
|
|
|
def blocklist_thread():
|
|
sz_blocklist_check_timer = 0
|
|
while running:
|
|
if sz_blocklist_check_timer >= 3600: # 1 hour
|
|
blocklist_check()
|
|
sz_blocklist_check_timer = 0
|
|
else:
|
|
sz_blocklist_check_timer += 1
|
|
|
|
if deploy_pending:
|
|
deploy_blocklists()
|
|
time.sleep(1)
|
|
|
|
@atexit.register
|
|
def stop_blocklist():
|
|
global running
|
|
running = False |