#!/usr/bin/env python3
# pylint: disable=C0103

"""
From a list of subdomains, output only
the ones resolving to a first-party tracker.
"""

import argparse
import csv
import sys
import typing

# DomainRule = typing.Union[bool, typing.Dict[str, 'DomainRule']]
DomainRule = typing.Union[bool, typing.Dict]

# Tree of registered rules, keyed by domain labels from the rightmost label
# inwards. A value of True marks the end of a rule: that domain and every
# name below it match.
RULES_DICT: DomainRule = {}


def subdomain_matching(subdomain: str) -> bool:
    """Tell whether *subdomain* is covered by a registered rule.

    The labels of *subdomain* are walked right to left through RULES_DICT.
    The name matches when the walk reaches a True leaf, i.e. when the name
    is equal to a registered rule or is a subdomain of one.

    Args:
        subdomain: Dot-separated domain name.

    Returns:
        True if a registered rule covers the name, False otherwise.
    """
    node = RULES_DICT
    for part in reversed(subdomain.split('.')):
        # Stop at a leaf (rule found) or when no rule shares this label.
        if isinstance(node, bool) or part not in node:
            break
        node = node[part]
    # Ending on a dict means the name is only a prefix of some rule
    # (or shares nothing with any rule): not a match.
    return node if isinstance(node, bool) else False


def get_matching(chain: typing.List[str], no_explicit: bool = False
                 ) -> typing.Iterable[str]:
    """Yield the first name of *chain* if the chain hits a registered rule.

    The first element of *chain* is the name being tested, the last element
    is left out of the check, and everything in between is checked against
    the rules as intermediate destinations.

    Args:
        chain: Row of names as read from the input CSV. An empty row
            (which csv.reader produces for a blank line) yields nothing.
        no_explicit: When True, yield nothing if the first name itself
            matches a rule, so that only names matching through an
            intermediate destination are reported.

    Yields:
        The first name of the chain, at most once.
    """
    if not chain:
        # Blank input line: nothing to test, and chain[0] would raise.
        return
    initial = chain[0]
    cname_destinations = chain[1:-1]
    initial_matching = subdomain_matching(initial)
    if no_explicit and initial_matching:
        return
    if initial_matching or any(map(subdomain_matching, cname_destinations)):
        yield initial


def register_rule(subdomain: str) -> None:
    """Add *subdomain* to the RULES_DICT tree.

    Labels are inserted right to left; the leftmost label of the rule is
    stored as a True leaf. If a shorter rule already covers the new one,
    the tree is left untouched. A new rule replaces any more specific
    rules stored below it.

    Args:
        subdomain: Dot-separated domain name. An empty string is ignored:
            it would otherwise register an empty label as a rule and make
            every name ending with a dot match.
    """
    if not subdomain:
        return
    parts = subdomain.split('.')
    parts.reverse()
    node = RULES_DICT
    for part in parts[:-1]:
        node = node.setdefault(part, {})
        if isinstance(node, bool):
            # A broader rule already covers this one.
            return
    node[parts[-1]] = True


def main() -> None:
    """Command-line entry point: filter the input chains with the rules."""
    # Imported here so that the matching helpers above can be imported
    # without the third-party progress bar dependency being installed.
    import progressbar  # pylint: disable=import-outside-toplevel

    # Parsing arguments
    parser = argparse.ArgumentParser(
        description="Filter first-party trackers from a list of subdomains")
    parser.add_argument(
        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
        help="Input file with DNS chains")
    parser.add_argument(
        '-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
        help="Outptut file with one tracking subdomain per line")
    parser.add_argument(
        '-n', '--no-explicit', action='store_true',
        help="Don't output domains already blocked with rules without CNAME")
    parser.add_argument(
        '-r', '--rules', type=argparse.FileType('r'), default='rules',
        help="Rules file")
    args = parser.parse_args()

    # Progress bar
    widgets = [
        progressbar.Percentage(),
        ' ', progressbar.SimpleProgress(),
        ' ', progressbar.Bar(),
        ' ', progressbar.Timer(),
        ' ', progressbar.AdaptiveTransferSpeed(unit='req'),
        ' ', progressbar.AdaptiveETA(),
    ]
    progress = progressbar.ProgressBar(widgets=widgets)

    # Reading rules
    for rule in args.rules:
        register_rule(rule.strip())

    # Approximating line count (only possible when the input can be rewound)
    if args.input.seekable():
        progress.max_value = sum(1 for _ in args.input)
        args.input.seek(0)

    # Reading domains to filter
    reader = csv.reader(args.input)
    progress.start()
    for chain in reader:
        for match in get_matching(chain, no_explicit=args.no_explicit):
            print(match, file=args.output)
        progress.update(progress.value + 1)
    progress.finish()


if __name__ == '__main__':
    main()