#!/usr/bin/env python3
import json
import os
import re
import subprocess
import sys
import time
from collections import defaultdict
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Iterator, NoReturn, Optional

import dns.resolver
import requests
import spf

# Python 3.11+ ships tomllib in the standard library; 3.10 and older need tomli.
try:
    import tomllib  # type: ignore
except ModuleNotFoundError:  # pragma: no cover
    import tomli as tomllib  # type: ignore

# NOTE: This script needs read access to the systemd journal.
# Instead of using sudo (which is a privilege-escalation risk), the executing
# user should be a member of the 'systemd-journal' group:
#     usermod -aG systemd-journal <user>
# With that in place the sudo call in front of journalctl is not needed.
# NOTE(review): _journalctl_cmd() still prefixes the command with "sudo";
# confirm the deployment and align code and this note.

# ---------------------------------------------------------------------------
# Load configuration
# ---------------------------------------------------------------------------
# The path of the config file can be overridden via ENV. The default is
# deliberately below /etc: security-relevant lists do not belong in the
# working directory of a daemon.
CONFIG_FILE = Path(os.getenv("CONFIG_FILE", "/etc/policyguard.toml"))


def _fatal(msg: str) -> NoReturn:
    """Write a fatal error to stderr and exit the process with status 2.

    Only used during start-up. Afterwards the script runs as a daemon and is
    supposed to log errors instead of crashing.
    """
    sys.stderr.write(f"spf-policy: FATAL: {msg}\n")
    sys.stderr.flush()
    sys.exit(2)


def _load_config(path: Path) -> dict:
    """Parse the TOML file at *path* and return it as a dict.

    Terminates the process via _fatal() when the file is missing, unreadable
    or not valid TOML.
    """
    if not path.is_file():
        _fatal(
            f"Config-Datei nicht gefunden: {path}\n"
            f" Pfad via Env-Var CONFIG_FILE überschreibbar.\n"
            f" Eine Beispiel-Config liegt im Repo neben dem Skript."
        )
    try:
        with open(path, "rb") as f:
            return tomllib.load(f)
    except tomllib.TOMLDecodeError as e:
        _fatal(f"Config-Datei {path} ist kein gültiges TOML: {e}")
    except OSError as e:
        _fatal(f"Config-Datei {path} nicht lesbar: {e}")


def _cfg_section(cfg: dict, name: str) -> dict:
    """Return table [name] from *cfg*; fatal if it is missing or not a table."""
    section = cfg.get(name)
    if not isinstance(section, dict):
        _fatal(f"Config: Sektion [{name}] fehlt oder ist kein Tabellen-Block.")
    return section


def _cfg_list(section: dict, key: str, section_name: str) -> list:
    """Return section[key] (default: empty list); fatal unless it is a list of str."""
    val = section.get(key, [])
    if not isinstance(val, list):
        _fatal(f"Config: [{section_name}].{key} muss eine Liste sein.")
    for item in val:
        if not isinstance(item, str):
            _fatal(f"Config: [{section_name}].{key} darf nur Strings enthalten.")
    return val


def _cfg_int(section: dict, key: str, default: int, section_name: str) -> int:
    """Return section[key] converted with int(), or *default* when the key is absent.

    A value that int() cannot convert ends the program through _fatal(),
    like every other config error, instead of surfacing as a traceback.
    """
    val = section.get(key, default)
    try:
        return int(val)
    except (TypeError, ValueError):
        _fatal(f"Config: [{section_name}].{key} muss eine Ganzzahl sein (ist: {val!r}).")


def _env_or_int(env_name: str, fallback: int) -> int:
    """Return ENV *env_name* as int; *fallback* if unset or not a number (with a warning)."""
    raw = os.getenv(env_name)
    if raw is None:
        return fallback
    try:
        return int(raw)
    except ValueError:
        sys.stderr.write(
            f"spf-policy: WARN: ENV {env_name}={raw!r} ist keine Zahl, "
            f"fallback auf Config-Wert {fallback}\n"
        )
        return fallback


def _env_or_str(env_name: str, fallback: str) -> str:
    """Return ENV *env_name*, or *fallback* if it is unset."""
    return os.getenv(env_name, fallback)


_CFG = _load_config(CONFIG_FILE)

# --- [tunables] (ENV overrides config) -------------------------------------
_t = _cfg_section(_CFG, "tunables")
DAYS = _env_or_int("DAYS", _cfg_int(_t, "days", 50, "tunables"))
MIN_COUNT = _env_or_int("MIN_COUNT", _cfg_int(_t, "min_count", 4, "tunables"))
BLOCK_QUOTA = _env_or_int("BLOCK_QUOTA", _cfg_int(_t, "block_quota", 76, "tunables"))
MAX_SPF_DEPTH = _cfg_int(_t, "max_spf_depth", 10, "tunables")
SPF_CACHE_SIZE = _cfg_int(_t, "spf_cache_size", 4096, "tunables")
GENERIC_CACHE_SIZE = _cfg_int(_t, "generic_cache_size", 4096, "tunables")
INCREMENTAL_UPDATE_INTERVAL = _env_or_int(
    "INCREMENTAL_UPDATE_INTERVAL",
    _cfg_int(_t, "incremental_update_interval", 30, "tunables"),
)
LOG_RATE_LIMIT_PER_SEC = _env_or_int(
    "LOG_RATE_LIMIT_PER_SEC",
    _cfg_int(_t, "log_rate_limit_per_sec", 200, "tunables"),
)
INITIAL_IMPORT_FLUSH_EVERY_DAYS = _env_or_int(
    "INITIAL_IMPORT_FLUSH_EVERY_DAYS",
    _cfg_int(_t, "initial_import_flush_every_days", 5, "tunables"),
)

# Block quota as a fraction, pre-computed for the ratio comparison.
BLOCK_QUOTA_FLOAT = BLOCK_QUOTA / 100.0

# --- [paths] (ENV overrides config) ----------------------------------------
_p = _cfg_section(_CFG, "paths")
TRANSPORTMAP = _env_or_str("TRANSPORTMAP", str(_p.get("transport_map", "/etc/pmg/transport")))
JOURNAL_CURSOR_FILE = Path(
    _env_or_str("JOURNAL_CURSOR_FILE", str(_p.get("journal_cursor_file", "/t/postfix.cursor")))
)
CACHE_FILE = Path(
    _env_or_str("CACHE_FILE", str(_p.get("cache_file", "/t/postfix_cache.json")))
)

# --- [domains] -------------------------------------------------------------
_d = _cfg_section(_CFG, "domains")
BLOCKLIST_BAN = set(_cfg_list(_d, "blocklist_ban", "domains"))
BLOCKLIST_SENDER = set(_cfg_list(_d, "blocklist_sender", "domains"))
BLOCKLIST_CLIENTNAME = set(_cfg_list(_d, "blocklist_clientname", "domains"))
BLOCKLIST_RECIPIENT = set(_cfg_list(_d, "blocklist_recipient", "domains"))
UNTRUSTED_HELO = set(_cfg_list(_d, "untrusted_helo", "domains"))
WHITELIST_HELO = set(_cfg_list(_d, "whitelist_helo", "domains"))
WHITELIST_CLIENT_SPF = set(_cfg_list(_d, "whitelist_client_spf", "domains"))

# --- [spf] -----------------------------------------------------------------
_s = _cfg_section(_CFG, "spf")
BLOCKLIST_SPF = set(_cfg_list(_s, "blocklist_includes", "spf"))

# --- [txt] -----------------------------------------------------------------
_x = _cfg_section(_CFG, "txt")
BLOCKLIST_TXT_PREFIX = set(_cfg_list(_x, "blocklist_prefixes", "txt"))

# --- [regex] ---------------------------------------------------------------
_r = _cfg_section(_CFG, "regex")
BLOCKLIST_REGEX = _cfg_list(_r, "blocklist_sender_patterns", "regex")

# Sanity: empty critical lists are suspicious. The daemon is built
# defensively and only warns instead of aborting (an empty whitelist_helo,
# for example, is perfectly legitimate).
if not UNTRUSTED_HELO:
    sys.stderr.write(
        "spf-policy: WARN: [domains].untrusted_helo ist leer — "
        "untrusted-HELO-spezifische Regeln greifen nicht.\n"
    )

# Compile the regex patterns once. This validates them implicitly: an invalid
# pattern stops the program at start-up, not on the first mail.
try:
    BLOCKLIST_REGEX_COMPILED = [re.compile(p) for p in BLOCKLIST_REGEX]
except re.error as e:
    _fatal(f"Config: ungültiger Regex in [regex].blocklist_sender_patterns: {e}")

# ---------------------------------------------------------------------------
# Resolver, global constants
# ---------------------------------------------------------------------------
resolver = dns.resolver.Resolver()
resolver.timeout = 2
resolver.lifetime = 3

# --- Single source of truth for the journal filter tokens ---
# The tokens are needed in three places:
#   1. BLOCK_RE / ACCEPT_RE  -> final regex classification of the message
#   2. _PREFILTER_TOKENS     -> cheap substring pre-filter before json.loads()
#   3. _JOURNAL_GREP         -> server-side filter via journalctl --grep
#
# Every entry defines:
#   - prefilter_token: the substring that is a sufficient indicator in the log text
#   - regex_pattern:   the stricter form for BLOCK_RE/ACCEPT_RE (e.g. with ":"
#                      or the prefix "rejected: ")
# The split is required because the pre-filter is intentionally laxer than
# the final regex (e.g. "Domain not found" as pre-filter but
# "rejected: Domain not found" as regex). That asymmetry is kept as is.
_BLOCK_TOKENS: tuple[tuple[str, str], ...] = (
    ("proxy-reject", r"proxy-reject:"),
    ("policyguard-500", r"policyguard-500"),
    ("Domain not found", r"rejected: Domain not found"),
)
_ACCEPT_TOKENS: tuple[tuple[str, str], ...] = (
    ("proxy-accept", r"proxy-accept:"),
)

BLOCK_RE = re.compile("|".join(p for _, p in _BLOCK_TOKENS), re.IGNORECASE)
ACCEPT_RE = re.compile("|".join(p for _, p in _ACCEPT_TOKENS), re.IGNORECASE)

# --- State ---
CACHE: dict = {}   # past days: ISO day -> {domain -> {"total", "blocked", "accepted"}}
STATS: dict = {}   # current day: domain -> {"total", "blocked", "accepted"}
SPF_CACHE: dict = {}
SPF_BLOCK_CACHE: dict[str, bool] = {}
HUNTER_CACHE: dict[str, bool] = {}
TXT_PREFIX_CACHE: dict[str, bool] = {}
_blocklist_cache: set[str] = set()
_blocklist_dirty: bool = True
_last_incremental_update: float = 0.0

# Log rate-limit state
_log_window_start: float = 0.0
_log_window_count: int = 0
_log_dropped_count: int = 0


# --- Helpers for pre-computation ---
def base_domain(dom: str) -> str:
    """Reduce *dom* to its last two labels.

    Three labels are kept for the hard-coded second-level suffixes
    (co/com/net/org/gov/ac/edu below uk/au/nz/za/br/jp). A name with fewer
    than two labels is returned unchanged.
    """
    parts = dom.split(".")
    if len(parts) < 2:
        return dom
    tld = parts[-1]
    sld = parts[-2]
    if (tld in {"uk", "au", "nz", "za", "br", "jp"}
            and sld in {"co", "com", "net", "org", "gov", "ac", "edu"}
            and len(parts) >= 3):
        return ".".join(parts[-3:])
    return ".".join(parts[-2:])


def normalize_domain(dom: str) -> str:
    """Lower-case *dom*, strip trailing dots and reduce it with base_domain()."""
    return base_domain(dom.lower().rstrip("."))


# Pre-normalized set for hash lookups
BLOCKLIST_BAN_NORMALIZED = {normalize_domain(d) for d in BLOCKLIST_BAN}


def domain_matches_list(domain: str, target_set: set[str]) -> bool:
    """Return True if *domain* or one of its parent domains is in *target_set*.

    Performs one set lookup per label of *domain* instead of an endswith()
    test against every entry of the list.
    """
    if domain in target_set:
        return True
    parts = domain.split('.')
    for i in range(1, len(parts)):
        if '.'.join(parts[i:]) in target_set:
            return True
    return False


def ensure_cursor_dir():
    """Create the parent directory of JOURNAL_CURSOR_FILE if it does not exist."""
    JOURNAL_CURSOR_FILE.parent.mkdir(parents=True, exist_ok=True)


def load_transport_domains() -> set[str]:
    """Return the normalized first column of every entry in TRANSPORTMAP.

    Blank lines and '#' comments are skipped; square brackets around the key
    are stripped. A missing file yields an empty set.
    """
    domains = set()
    transport_path = Path(TRANSPORTMAP)
    if not transport_path.is_file():
        return domains
    with open(transport_path, "r", errors="ignore") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key = line.split()[0].strip("[]").lower().rstrip(".")
            domains.add(normalize_domain(key))
    return domains


# ---------------------------------------------------------------------------
# Print stats
# ---------------------------------------------------------------------------
def stats(search_term: str = "", min_pct: int = 0, max_pct: int = 100, only_blocked: bool = False):
    """Print per-domain totals over all cached days plus today to stdout.

    Args:
        search_term: only domains containing this substring are listed.
        min_pct, max_pct: inclusive range for the (truncated) blocked percentage.
        only_blocked: list only domains that meet the block criteria.

    Domains that were never blocked, end in ".de", or are in
    BLOCKLIST_BAN_NORMALIZED / the transport map are not listed.
    """
    transport_excludes = load_transport_domains()
    all_excludes = BLOCKLIST_BAN_NORMALIZED | transport_excludes

    agg = defaultdict(lambda: {"total": 0, "blocked": 0, "accepted": 0})
    for day_data in [*CACHE.values(), STATS]:
        for dom, s in day_data.items():
            if dom in all_excludes or dom.endswith(".de"):
                continue
            e = agg[dom]
            e["total"] += s["total"]
            e["blocked"] += s["blocked"]
            e["accepted"] += s["accepted"]

    filtered = []
    for dom, s in agg.items():
        if s["total"] == 0 or s["blocked"] == 0:
            continue
        if search_term and search_term not in dom:
            continue
        pct = int(s["blocked"] / s["total"] * 100)
        if min_pct <= pct <= max_pct:
            filtered.append((dom, s, pct))

    if not filtered:
        print("No stats available.")
        sys.stdout.flush()
        return

    # Sort on the pre-computed percentage (x[2]), then on the total.
    filtered.sort(key=lambda x: (x[2], x[1]["total"]), reverse=True)

    print("==== TOTAL DOMAIN STATS (all days + today) ====")
    for dom, s, pct in filtered:
        # Same rule as find_candidates(): exact ratio strictly above the
        # quota. Comparing the truncated percentage with ">=" would mark
        # domains sitting exactly on the quota although they are not blocked.
        meets_block_criteria = (
            s["total"] >= MIN_COUNT
            and s["blocked"] / s["total"] > BLOCK_QUOTA_FLOAT
        )
        if only_blocked and not meets_block_criteria:
            continue
        marker = f">>{pct}%<<" if meets_block_criteria else f" {pct}% "
        print(f"{dom:<40} accept: {s['accepted']:<5} blocked: {s['blocked']:<5} total: {s['total']:<5} {marker:<10}")
    print("==== END TOTAL DOMAIN STATS ====")
    sys.stdout.flush()


# ---------------------------------------------------------------------------
# Logging & extraction
# ---------------------------------------------------------------------------
FROM_REGEX = re.compile(r"from=<[^>]*@([^> ,]+)>")


def extract_domain(message: str) -> str:
    """Return the normalized domain of the first from=<...@domain> in *message*, or ""."""
    m = FROM_REGEX.search(message)
    if m:
        return normalize_domain(m.group(1))
    return ""


def log(msg: str):
    """Write *msg* to stderr, rate-limited to avoid blocking on the pipe.

    At most LOG_RATE_LIMIT_PER_SEC lines are written per one-second window;
    the number of dropped lines is reported when the next window starts.
    Write errors are ignored on purpose: logging must never fail a request.
    """
    global _log_window_start, _log_window_count, _log_dropped_count
    now = time.monotonic()
    if now - _log_window_start >= 1.0:
        # New window: report the dropped counter if needed, then reset.
        if _log_dropped_count > 0:
            try:
                sys.stderr.write(f"spf-policy: [rate-limit] {_log_dropped_count} log lines dropped\n")
                sys.stderr.flush()
            except Exception:
                pass
            _log_dropped_count = 0
        _log_window_start = now
        _log_window_count = 0

    if _log_window_count >= LOG_RATE_LIMIT_PER_SEC:
        _log_dropped_count += 1
        return

    _log_window_count += 1
    try:
        sys.stderr.write(f"spf-policy: {msg}\n")
        sys.stderr.flush()
    except Exception:
        pass


# ---------------------------------------------------------------------------
# Cache persistence
# ---------------------------------------------------------------------------
def save_cache():
    """Write all cached past days inside the DAYS window to CACHE_FILE.

    The file is written to a temporary sibling first and then renamed over
    CACHE_FILE. Errors are logged, not raised.
    """
    CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Day keys are derived from UTC timestamps, so "today" and the cutoff
    # have to be UTC dates as well.
    today = datetime.now(timezone.utc).date()
    today_str = today.isoformat()
    cutoff = (today - timedelta(days=DAYS)).isoformat()
    pruned = {
        day: data
        for day, data in CACHE.items()
        if day != today_str and day >= cutoff
    }
    tmp = CACHE_FILE.with_suffix(".tmp")
    try:
        with open(tmp, "w") as f:
            json.dump(pruned, f)
        tmp.replace(CACHE_FILE)
    except Exception as e:
        log(f"save_cache error: {e}")


def load_cache():
    """Replace CACHE with the days from CACHE_FILE that are inside the DAYS window.

    Today's entry is never loaded. A missing file is a no-op; read or parse
    errors are logged and leave CACHE untouched.
    """
    global CACHE
    if not CACHE_FILE.exists():
        return
    try:
        with open(CACHE_FILE) as f:
            data = json.load(f)
        # UTC, for the same reason as in save_cache().
        today = datetime.now(timezone.utc).date()
        today_str = today.isoformat()
        cutoff = (today - timedelta(days=DAYS)).isoformat()
        CACHE = {
            day: day_stats
            for day, day_stats in data.items()
            if day != today_str and day >= cutoff
        }
        log(f"load_cache: {len(CACHE)} days loaded from {CACHE_FILE}")
    except Exception as e:
        log(f"load_cache error: {e}")


# ---------------------------------------------------------------------------
# Log reading
#
# Strategy for minimal RAM/CPU usage:
# - A single journalctl call for all required days in one go instead of
#   N calls (saves journalctl index walks).
# - Server-side filtering with --grep, so irrelevant lines are never
#   serialized -> far less I/O and fewer json.loads() calls.
# - Reduced output fields (--output-fields=MESSAGE,__REALTIME_TIMESTAMP).
# - Pre-filter via 'in' substring test BEFORE json.loads() is called:
#   if neither "proxy-reject" nor "proxy-accept" nor "Domain not found"
#   occurs in the raw line, json.loads() is unnecessary. Substring tests are
#   ~50x faster than json.loads().
# - Streaming generator + per-day flushing: the RAM footprint stays in the
#   order of *one* day instead of all days at once.
# - Local bindings for hot-path functions.
# ---------------------------------------------------------------------------

# Pre-filter tokens and the --grep pattern are derived centrally from
# _BLOCK_TOKENS / _ACCEPT_TOKENS -> no three places to keep in sync.
_PREFILTER_TOKENS: tuple[str, ...] = tuple(
    t for t, _ in (*_BLOCK_TOKENS, *_ACCEPT_TOKENS)
)

# --grep pattern, evaluated by journalctl itself (PCRE2). Because the pattern
# contains an upper-case letter, journalctl matches it case-sensitively.
# Massively reduces the transferred data for noisy logs.
_JOURNAL_GREP: str = "|".join(_PREFILTER_TOKENS)

# UTC day (ISO format) the in-memory STATS dict belongs to; "" until
# initialize_cache() or update_today_incremental() has run.
_stats_day: str = ""

# Characters that would turn a "domain" into something else once it is
# placed in a URL (path, query, userinfo, port, IPv6 literal, whitespace).
_URL_UNSAFE_RE = re.compile(r"[\s/?#@:\\\[\]]")


def _utc_today_str() -> str:
    """Return the current UTC date in ISO format, matching the journal day keys."""
    return datetime.now(timezone.utc).date().isoformat()


def _since_utc(day: str) -> str:
    """Return a journalctl --since value for 00:00 UTC of the ISO date *day*."""
    return f"{day} 00:00:00 UTC"


def _journalctl_cmd(extra_args: list[str]) -> list[str]:
    """Build the journalctl command line for the postfix unit.

    The output is JSON restricted to MESSAGE and __REALTIME_TIMESTAMP and is
    filtered by journalctl with --grep. *extra_args* is appended unchanged.

    NOTE(review): the command is run through sudo, although the note at the
    top of the file recommends membership in the systemd-journal group
    instead. sudo is kept so existing deployments keep working; drop the
    first element once group membership is confirmed.
    """
    return [
        "sudo", "journalctl",
        "--no-pager",
        "-u", "postfix",
        "-o", "json",
        "--output-fields=MESSAGE,__REALTIME_TIMESTAMP",
        "--grep", _JOURNAL_GREP,
    ] + extra_args


def _iter_journal_events(extra_args: list[str]) -> Iterator[tuple[str, str, bool, bool]]:
    """Yield (line_date, domain, is_blocked, is_accepted) for matching journal lines.

    journalctl is streamed line by line, so no large buffer is held in RAM.
    line_date is the UTC date of the entry in ISO format. Errors are
    swallowed and logged once; the generator then simply ends.
    """
    # Local bindings for performance in the hot path
    block_search = BLOCK_RE.search
    accept_search = ACCEPT_RE.search
    json_loads = json.loads
    fromtimestamp = datetime.fromtimestamp
    _extract_domain = extract_domain
    prefilter_tokens = _PREFILTER_TOKENS

    proc = None
    try:
        proc = subprocess.Popen(
            _journalctl_cmd(extra_args),
            # The daemon's own stdin carries the policy requests; the child
            # must never be able to read from it.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            # Do not depend on the locale of the spawning service; a stray
            # byte must not abort the whole stream.
            encoding="utf-8",
            errors="replace",
            bufsize=1 << 16,  # 64 KiB pipe buffer
        )
        for raw in proc.stdout:
            # Fast pre-filter BEFORE json.loads. journalctl --grep already
            # filters, but this stays as a safety net in case --grep is
            # ever removed.
            if not any(tok in raw for tok in prefilter_tokens):
                continue
            line = raw.strip()
            if not line:
                continue
            try:
                entry = json_loads(line)
            except json.JSONDecodeError:
                continue

            message = entry.get("MESSAGE", "")
            if not isinstance(message, str):
                continue

            is_blocked = bool(block_search(message))
            is_accepted = bool(accept_search(message)) and not is_blocked
            if not is_blocked and not is_accepted:
                continue

            dom = _extract_domain(message)
            if not dom:
                continue

            ts = entry.get("__REALTIME_TIMESTAMP")
            if not ts:
                continue
            try:
                # UTC-aware: day boundaries would otherwise depend on the
                # time zone. The value is divided by 1_000_000 to get seconds.
                dt = fromtimestamp(int(ts) / 1_000_000, tz=timezone.utc)
                line_date = dt.date().isoformat()
            except (ValueError, TypeError, OSError):
                continue

            yield line_date, dom, is_blocked, is_accepted

        proc.stdout.close()
        proc.wait(timeout=120)
    except subprocess.TimeoutExpired:
        if proc is not None:
            proc.kill()
        log("_iter_journal_events: journalctl timed out")
    except Exception as e:
        log(f"_iter_journal_events error: {e}")
    finally:
        if proc is not None and proc.poll() is None:
            try:
                proc.kill()
                proc.wait(timeout=5)  # reap the child, avoid a zombie
            except Exception:
                pass


def _iter_journal_events_after_cursor() -> Iterator[tuple[str, str, bool, bool]]:
    """Like _iter_journal_events(), but driven by the journalctl cursor file.

    Used for the incremental updates while the daemon is running, where the
    output is typically very small.
    """
    return _iter_journal_events([f"--cursor-file={JOURNAL_CURSOR_FILE}"])


def _bump(day_dict: dict, dom: str, is_blocked: bool, is_accepted: bool):
    """Increment total/blocked/accepted of *dom* in the per-day dict *day_dict*."""
    e = day_dict.get(dom)
    if e is None:
        e = {"total": 0, "blocked": 0, "accepted": 0}
        day_dict[dom] = e
    e["total"] += 1
    if is_blocked:
        e["blocked"] += 1
    if is_accepted:
        e["accepted"] += 1


def _merge_day(target: dict, source: dict) -> None:
    """Add the per-domain counters of *source* to *target* (both per-day dicts)."""
    for dom, s in source.items():
        e = target.get(dom)
        if e is None:
            target[dom] = {
                "total": s["total"],
                "blocked": s["blocked"],
                "accepted": s["accepted"],
            }
        else:
            e["total"] += s["total"]
            e["blocked"] += s["blocked"]
            e["accepted"] += s["accepted"]


# ---------------------------------------------------------------------------
# Cache initialization
# ---------------------------------------------------------------------------
def initialize_cache():
    """Fill CACHE (past days) and STATS (today) from cache file and journal.

    RAM-friendly initial import:
    - One journalctl stream for all missing days together.
    - Per-day flushing into CACHE: as soon as a date change is seen, the
      previous day is complete; only one active day is held besides CACHE.
    - Periodic save_cache() after INITIAL_IMPORT_FLUSH_EVERY_DAYS finished
      days, so a crash during the import does not lose all progress.

    All day keys and "today" are UTC dates, like the dates produced by
    _iter_journal_events().
    """
    global STATS, _stats_day, _blocklist_dirty, _last_incremental_update

    load_cache()

    today = datetime.now(timezone.utc).date()
    today_str = today.isoformat()
    needed_days = {
        (today - timedelta(days=i)).isoformat()
        for i in range(1, DAYS + 1)
    } - set(CACHE.keys())

    if needed_days:
        oldest = min(needed_days)
        log(f"initialize_cache: need {len(needed_days)} days, streaming from {oldest}")

        # Streaming strategy: keep the current day in RAM, flush on change.
        current_day: Optional[str] = None
        current_day_data: dict = {}
        days_done_since_flush = 0

        # The stream may also contain days that are already in CACHE; those
        # are discarded via this set.
        needed_days_set = needed_days

        # One big stream "since oldest". The day-change detector below
        # relies on journalctl delivering the entries in chronological order.
        for line_date, dom, is_blocked, is_accepted in _iter_journal_events(
                ["--since", _since_utc(oldest)]):
            if line_date == today_str:
                # Today's data is loaded further below into STATS.
                continue
            if line_date not in needed_days_set:
                continue

            if current_day is None:
                current_day = line_date
                current_day_data = {}
            elif line_date != current_day:
                # Day finished -> flush
                CACHE[current_day] = current_day_data
                days_done_since_flush += 1
                current_day_data = {}
                current_day = line_date
                if days_done_since_flush >= INITIAL_IMPORT_FLUSH_EVERY_DAYS:
                    save_cache()
                    days_done_since_flush = 0

            _bump(current_day_data, dom, is_blocked, is_accepted)

        # Flush the last day as well
        if current_day is not None:
            CACHE[current_day] = current_day_data
            current_day_data = {}  # release the reference

        # Mark days without events as empty, otherwise they would be
        # imported again on the next start.
        for d in needed_days_set:
            if d not in CACHE:
                CACHE[d] = {}

        save_cache()
    else:
        log("initialize_cache: all historical days present in cache file")

    # Read today's stats separately (small data set)
    log("initialize_cache: loading today's stats from journal")
    today_data: dict = {}
    for line_date, dom, is_blocked, is_accepted in _iter_journal_events(
            ["--since", _since_utc(today_str)]):
        if line_date != today_str:
            continue
        _bump(today_data, dom, is_blocked, is_accepted)
    STATS = today_data
    _stats_day = today_str

    ensure_cursor_dir()
    # Initialize the cursor file by consuming (and discarding) everything
    # journalctl returns for it; these events were already read above.
    for _ in _iter_journal_events_after_cursor():
        pass

    _blocklist_dirty = True
    _last_incremental_update = time.monotonic()


def update_today_incremental():
    """Pull new journal events via the cursor file into STATS / CACHE.

    Called on every request via current_blocklist(), but throttled to one
    run per INCREMENTAL_UPDATE_INTERVAL seconds so that not every mail
    spawns a subprocess.

    When the UTC day has changed since the last run, the finished day is
    moved from STATS into CACHE before new events are processed.
    """
    global STATS, _stats_day, _blocklist_dirty, _last_incremental_update

    now = time.monotonic()
    if now - _last_incremental_update < INCREMENTAL_UPDATE_INTERVAL:
        return
    _last_incremental_update = now

    ensure_cursor_dir()
    today_str = _utc_today_str()
    cache_updated = False
    new_stats_seen = False

    # Past days that may receive events during this run.
    open_days: set[str] = set()

    if _stats_day and _stats_day != today_str:
        # Day rollover: STATS holds the finished day. Without this step it
        # would keep growing across days and the finished day would be
        # persisted with only its last few events.
        _merge_day(CACHE.setdefault(_stats_day, {}), STATS)
        open_days.add(_stats_day)
        STATS = {}
        cache_updated = True
    _stats_day = today_str

    for line_date, dom, is_blocked, is_accepted in _iter_journal_events_after_cursor():
        if line_date == today_str:
            _bump(STATS, dom, is_blocked, is_accepted)
            new_stats_seen = True
            continue

        # Other day: only count it if the day had no data before this run
        # (guards against double counting), but then count EVERY event of
        # that day, not just the first one.
        if line_date not in open_days:
            if CACHE.get(line_date):
                continue
            open_days.add(line_date)
        _bump(CACHE.setdefault(line_date, {}), dom, is_blocked, is_accepted)
        cache_updated = True

    if cache_updated:
        save_cache()
    if new_stats_seen or cache_updated:
        _blocklist_dirty = True


# ---------------------------------------------------------------------------
# Candidate detection
# ---------------------------------------------------------------------------
def find_candidates(cache: dict, stats: dict) -> list[tuple[str, int]]:
    """Return (domain, total) for every domain that qualifies for blocking.

    A domain qualifies when its total over all days in *cache* plus *stats*
    is at least MIN_COUNT and its blocked ratio is strictly greater than
    BLOCK_QUOTA_FLOAT. The result is sorted by total, descending.
    """
    total = defaultdict(int)
    blocked = defaultdict(int)

    for day_data in [*cache.values(), stats]:
        for dom, s in day_data.items():
            total[dom] += s["total"]
            blocked[dom] += s["blocked"]

    candidates = [
        (dom, tot)
        for dom, tot in total.items()
        if tot >= MIN_COUNT and (blocked[dom] / tot) > BLOCK_QUOTA_FLOAT
    ]
    return sorted(candidates, key=lambda x: x[1], reverse=True)


# ---------------------------------------------------------------------------
# Auto blocklist
# ---------------------------------------------------------------------------
def current_blocklist() -> set[str]:
    """Return the set of automatically blocked sender domains.

    Triggers the (throttled) incremental journal update and rebuilds the set
    only when the statistics changed. ".de" domains and domains from
    BLOCKLIST_BAN_NORMALIZED / the transport map are never included.
    """
    global _blocklist_cache, _blocklist_dirty

    update_today_incremental()
    if not _blocklist_dirty:
        return _blocklist_cache

    transport_excludes = load_transport_domains()
    all_excludes = BLOCKLIST_BAN_NORMALIZED | transport_excludes
    candidates = find_candidates(CACHE, STATS)

    _blocklist_cache = {
        dom for dom, _ in candidates
        if not dom.endswith(".de") and dom not in all_excludes
    }
    _blocklist_dirty = False
    return _blocklist_cache


# ---------------------------------------------------------------------------
# Hunter IO (with RAM cache)
# ---------------------------------------------------------------------------
def check_for_hunter_io(url: str) -> bool:
    """Return True if https://<url> answers with a page containing "abuse@hunter.io".

    *url* is a bare host name taken from the sender address. Results
    (including failures) are cached until GENERIC_CACHE_SIZE entries exist.

    SECURITY: *url* is attacker-controlled. Values containing URL
    metacharacters are rejected without a request so they cannot redirect
    the request to another host, port or path.
    NOTE(review): the daemon still connects to arbitrary sender-chosen host
    names; consider restricting egress for this process.
    """
    if url in HUNTER_CACHE:
        return HUNTER_CACHE[url]

    headers = {"User-Agent": "Mozilla/5.0 (compatible; Python-Checker/1.0)"}
    abusestring = "abuse@hunter.io"
    result = False

    if url and not _URL_UNSAFE_RE.search(url):
        try:
            response = requests.get(f'https://{url}', headers=headers, timeout=3, allow_redirects=False)
            response.raise_for_status()
            result = abusestring in response.text
        except Exception:
            # Best effort: any network/HTTP error counts as "no match".
            pass

    if len(HUNTER_CACHE) < GENERIC_CACHE_SIZE:
        HUNTER_CACHE[url] = result
    return result


# ---------------------------------------------------------------------------
# SPF & TXT check (with RAM cache)
# ---------------------------------------------------------------------------
def txt_contains_prefix(domain: str) -> bool:
    """Return True if a TXT record of *domain* starts with a BLOCKLIST_TXT_PREFIX entry.

    Lookup errors count as "no match". Results are cached until
    GENERIC_CACHE_SIZE entries exist.
    """
    if domain in TXT_PREFIX_CACHE:
        return TXT_PREFIX_CACHE[domain]

    result = False
    try:
        answers = resolver.resolve(domain, "TXT")
        for r in answers:
            # errors="replace": one record with non-UTF-8 bytes must not
            # abort the evaluation of the remaining records.
            txt = "".join([s.decode("utf-8", "replace") for s in r.strings])
            if any(txt.startswith(prefix) for prefix in BLOCKLIST_TXT_PREFIX):
                result = True
                break
    except Exception:
        pass

    if len(TXT_PREFIX_CACHE) < GENERIC_CACHE_SIZE:
        TXT_PREFIX_CACHE[domain] = result
    return result


def get_spf(domain: str) -> str | None:
    """Return the first TXT record of *domain* starting with "v=spf1", or None.

    Lookup errors yield None. Results (including None) are cached until
    SPF_CACHE_SIZE entries exist.
    """
    if domain in SPF_CACHE:
        return SPF_CACHE[domain]

    result = None
    try:
        answers = resolver.resolve(domain, "TXT")
        for r in answers:
            # See txt_contains_prefix(): tolerate undecodable records.
            txt = "".join([s.decode("utf-8", "replace") for s in r.strings])
            if txt.startswith("v=spf1"):
                result = txt
                break
    except Exception:
        pass

    if len(SPF_CACHE) < SPF_CACHE_SIZE:
        SPF_CACHE[domain] = result
    return result


def get_all_spf_includes(domain: str, depth: int = 0, visited: set | None = None) -> set:
    """Return all "include:" targets of *domain*'s SPF record, followed recursively.

    Recursion stops beyond MAX_SPF_DEPTH and at domains already in *visited*
    (loop protection). Targets are lower-cased.
    """
    if visited is None:
        visited = set()
    includes = set()
    if depth > MAX_SPF_DEPTH or domain in visited:
        return includes
    visited.add(domain)

    spf_record = get_spf(domain)
    if not spf_record:
        return includes

    for part in spf_record.split():
        if part.startswith("include:"):
            include = part.split(":", 1)[1].lower()
            includes.add(include)
            includes.update(get_all_spf_includes(include, depth + 1, visited))
    return includes


def spf_contains_block(domain: str) -> bool:
    """Return True if *domain*'s SPF include tree contains a BLOCKLIST_SPF entry.

    Cached (until GENERIC_CACHE_SIZE entries exist) because building the
    include set is expensive.
    """
    cached = SPF_BLOCK_CACHE.get(domain)
    if cached is not None:
        return cached

    includes = get_all_spf_includes(domain)
    blocklist_lower = {b.lower() for b in BLOCKLIST_SPF}
    result = bool(includes & blocklist_lower)

    if len(SPF_BLOCK_CACHE) < GENERIC_CACHE_SIZE:
        SPF_BLOCK_CACHE[domain] = result
    return result


def sender_matches_regex(sender: str) -> tuple[bool, str | None]:
    """Return (True, pattern) for the first blocklist regex matching at the start of *sender*, else (False, None)."""
    for regex in BLOCKLIST_REGEX_COMPILED:
        if regex.match(sender):
            return True, regex.pattern
    return False, None


# ---------------------------------------------------------------------------
# Request handling
# ---------------------------------------------------------------------------
def parse_search_args(arg: str) -> tuple[str, int, int, bool]:
    """Parse the arguments of the `stats` command.

    Convention:
    - 0 numbers        -> min=0, max=100
    - 1 number N       -> min=N, max=100 ("at least N%")
    - 2+ numbers A B   -> min=min(A,B), max=max(A,B) (only the first two count)
    - '-b' / 'blocked' -> only_blocked = True
    - any other token  -> search term (lower-cased, the last one wins)

    Returns:
        (search_term, min_pct, max_pct, only_blocked)
    """
    parts = arg.strip().split()
    search_term = ""
    numbers: list[int] = []
    only_blocked = False

    for p in parts:
        p_lower = p.lower()
        if p_lower in ("-b", "blocked"):
            only_blocked = True
        elif p.isdigit() and 0 <= int(p) <= 100:
            numbers.append(int(p))
        else:
            search_term = p_lower

    if len(numbers) == 0:
        min_pct, max_pct = 0, 100
    elif len(numbers) == 1:
        min_pct, max_pct = numbers[0], 100
    else:
        min_pct, max_pct = min(numbers[0], numbers[1]), max(numbers[0], numbers[1])

    return search_term, min_pct, max_pct, only_blocked


def handle_request(attrs: dict) -> str:
    """Evaluate one policy request and return the action string.

    *attrs* holds the request attributes; sender, recipient, client_name,
    helo_name and client_address are read. The checks run in a fixed order
    and the first hit decides; "dunno" is returned when nothing matches or
    when a check raises (the error is logged).
    """
    sender = attrs.get("sender", "")
    recipient = attrs.get("recipient", "").lower()
    client_name = attrs.get("client_name", "").lower()
    helo_name = attrs.get("helo_name", "").lower()
    client_address = attrs.get("client_address", "")

    # HELO whitelist
    if domain_matches_list(helo_name, WHITELIST_HELO):
        log(f"whitelist sender by whitelist")
        return "dunno"

    # Evaluate untrusted-HELO once; several checks below need it.
    helo_is_untrusted = domain_matches_list(helo_name, UNTRUSTED_HELO)

    if recipient:
        # BLOCKLIST_RECIPIENT may contain full addresses ("user@example.com")
        # or domains ("@example.com" or "example.com"). We match:
        #   - the exact address
        #   - the "@domain" form (legacy)
        #   - a subdomain match on the bare domain part (consistent with
        #     the sender logic)
        if recipient in BLOCKLIST_RECIPIENT:
            log(f"blocked recipient: {recipient}")
            return "reject reject for policy reasons [policyguard-400] [non-existent-recipient]"
        if "@" in recipient:
            rcpt_domain_only = recipient.split("@", 1)[1]
            if (("@" + rcpt_domain_only) in BLOCKLIST_RECIPIENT
                    or domain_matches_list(rcpt_domain_only, BLOCKLIST_RECIPIENT)):
                log(f"blocked recipient by domain: {recipient}")
                return "reject reject for policy reasons [policyguard-400] [non-existent-recipient]"

    if "@" not in sender:
        return "dunno"

    domain = sender.split("@", 1)[1].lower()

    try:
        # Block by client name
        if domain_matches_list(client_name, BLOCKLIST_CLIENTNAME):
            log(f"blocked client hostname: {client_name}")
            return "reject reject for policy reasons [policyguard-400] [blacklist-client]"

        # Block Google Groups (and other untrusted HELOs) by sender regex
        if helo_is_untrusted:
            matched, pattern = sender_matches_regex(sender)
            if matched:
                log(f"blocked sender by regex ({pattern}): {sender}")
                return "reject reject for policy reasons [policyguard-400] [gg]"

        # Block domains with a bad history
        bl_cache = current_blocklist()
        if domain_matches_list(domain, bl_cache):
            log(f"blocked sender domain via autoblocklist {domain}")
            return "reject reject for policy reasons [policyguard-500] [abl]"

        # Block Firebase by domain ending
        if domain_matches_list(domain, BLOCKLIST_SENDER):
            log(f"blocked sender domain via blocklist {domain}")
            return "reject reject for policy reasons [policyguard-500] [blacklist-sender]"

        # Real SPF evaluation via pyspf
        if client_address:
            spf_result, spf_explanation = spf.check2(i=client_address, s=sender, h=helo_name)
            # NOTE(review): the original indentation was lost. The
            # untrusted-HELO checks are placed inside the whitelist guard
            # here (whitelisted clients skip every SPF verdict); confirm
            # against the deployed version.
            if not domain_matches_list(client_name, WHITELIST_CLIENT_SPF):
                if spf_result == 'fail':
                    log(f"blocked sender {sender} due to SPF Fail ({spf_explanation})")
                    return "reject reject for policy reasons [policyguard-400] [spf-fail]"
                if helo_is_untrusted:
                    if spf_result == 'permerror':
                        log(f"blocked sender {sender} due to SPF PermError ({spf_explanation})")
                        return "reject reject for policy reasons [policyguard-400] [spf-permerror]"
                    if spf_result == 'softfail':
                        log(f"blocked Google HELO {helo_name} for sender {sender} due to SPF SoftFail ({spf_explanation})")
                        return f"reject reject for policy reasons [policyguard-400] [g-spf-sfail] SPF-Softfail from Google"
                    #if spf_result == 'none':
                    #    log(f"blocked HELO {helo_name} without SPF for sender {domain}")
                    #    return f"reject reject for policy reasons [policyguard-500] [g-spf-none]"

        # Block Firebase by SPF include
        if spf_contains_block(domain):
            log(f"blocked sender domain via spf {domain}")
            return "reject reject for policy reasons [policyguard-500] [fb-txt-spf-badentry]"

        # Block by TXT prefix (cached)
        if txt_contains_prefix(domain):
            log(f"blocked sender domain via TXT prefix {domain}")
            return "reject reject for policy reasons [policyguard-500] [fb-txt-prefix]"

        # Block hunter.io (cached)
        if "check" in sender and check_for_hunter_io(domain):
            log(f"blocked sender domain via hunter.io {domain}")
            return "reject reject for policy reasons [policyguard-500] [hunter]"

        if client_name == "unknown":
            log(f"deferred unknown client: {client_address}")
            return "defer 450 4.7.1 Client host rejected: temporary defer for unknown hostname [policyguard-500]"

    except Exception as e:
        # Fail open: a broken check must not reject mail.
        log(f"error checking {domain}: {e}")

    return "dunno"


def main():
    """Serve requests from stdin until EOF.

    A request is a block of "key=value" lines terminated by an empty line;
    the answer is "action=<result>" followed by an empty line. The lines
    "blocklist [term]", "spfcache" and "stats [args]" are interactive
    commands and are answered immediately while reading.
    """
    initialize_cache()

    while True:
        attrs = {}
        while True:
            line = sys.stdin.readline()
            if line == "":
                # EOF
                return
            line = line.strip()
            if not line:
                break
            if "=" in line:
                k, v = line.split("=", 1)
                attrs[k] = v

            # Handle interactive commands directly while reading.
            if line.startswith("blocklist"):
                parts = line.split(None, 1)
                search_term = parts[1].strip().lower() if len(parts) > 1 else ""
                bl = sorted(current_blocklist())
                if search_term:
                    bl = [d for d in bl if search_term in d]
                print("==== BEGIN CURRENT BLOCKLIST ====")
                print("\n".join(bl))
                print("==== END CURRENT BLOCKLIST ====")
                sys.stdout.flush()

            if line == "spfcache":
                print("==== SPF CACHE ENTRY ====")
                print("\n".join(sorted(SPF_CACHE)))
                print("==== END SPF CACHE ENTRY ====")
                sys.stdout.flush()

            if line.startswith("stats"):
                parts = line.split(None, 1)
                search_term, min_pct, max_pct, only_blocked = (
                    parse_search_args(parts[1]) if len(parts) > 1 else ("", 0, 100, False)
                )
                stats(search_term, min_pct, max_pct, only_blocked)

        if not attrs:
            continue

        action = handle_request(attrs)
        # print() adds the second newline, i.e. the empty line ending the reply.
        print(f"action={action}\n")
        sys.stdout.flush()


if __name__ == "__main__":
    main()