#!/usr/bin/env python3
# pylint: disable=C0103

"""
From a list of subdomains, output only
the ones resolving to a first-party tracker.

Rules are kept in two prefix trees made of nested dictionaries:

* ``RULES_DICT`` is keyed by domain labels, from the top-level label down.
* ``RULES_IP_DICT`` is keyed first by IP version (4 or 6), then by the
  successive bits of the network address, most significant bit first.

In both trees a ``True`` value marks the end of a rule: everything below
that point matches.
"""

import argparse
import csv
import ipaddress
import sys
import typing

# DomainRule = typing.Union[bool, typing.Dict[str, 'DomainRule']]
DomainRule = typing.Union[bool, typing.Dict]
# IpRule = typing.Union[bool, typing.Dict[int, 'IpRule']]
IpRule = typing.Union[bool, typing.Dict]

RULES_DICT: DomainRule = dict()
RULES_IP_DICT: IpRule = dict()


def _tree_insert(tree: typing.Dict, path: typing.Sequence) -> None:
    """Mark `path` as a rule in `tree`, creating inner nodes as needed."""
    if not path:
        return
    *inner, leaf = path
    node = tree
    for key in inner:
        node = node.setdefault(key, dict())
        if isinstance(node, bool):
            # A shorter (broader) rule already covers this path.
            return
    # Overwrites any narrower rules previously stored below this point.
    node[leaf] = True


def _tree_lookup(tree: typing.Union[bool, typing.Dict],
                 path: typing.Iterable) -> bool:
    """Tell whether walking `path` through `tree` reaches a rule leaf."""
    node = tree
    for key in path:
        if isinstance(node, bool) or key not in node:
            break
        node = node[key]
    return node if isinstance(node, bool) else False


def get_bits(address: typing.Union[ipaddress.IPv4Address,
                                   ipaddress.IPv6Address]
             ) -> typing.Iterator[int]:
    """Yield the bits (0 or 1) of `address`, most significant first."""
    for char in address.packed:
        for i in range(7, -1, -1):
            yield (char >> i) & 0b1


def subdomain_matching(subdomain: str) -> bool:
    """
    Tell whether `subdomain` is covered by a registered domain rule,
    i.e. it is equal to a registered domain or is one of its subdomains.
    """
    # Rules are stored from the top-level label down, hence the reversal.
    return _tree_lookup(RULES_DICT, reversed(subdomain.split('.')))


def ip_matching(ip_str: str) -> bool:
    """
    Tell whether the address `ip_str` belongs to a registered network.

    An address is only compared against networks of its own IP version.

    Raises ValueError (from `ipaddress.ip_address`) if `ip_str` is not
    a valid IPv4 or IPv6 address.
    """
    ip = ipaddress.ip_address(ip_str)
    return _tree_lookup(RULES_IP_DICT, [ip.version, *get_bits(ip)])


def get_matching(chain: typing.List[str], no_explicit: bool = False
                 ) -> typing.Iterable[str]:
    """
    Yield the first element of `chain` if the chain matches a rule.

    The first element is the name being checked, the elements in between
    are matched as domain names and the last one as an IP address.
    Chains with fewer than two elements never match.

    When `no_explicit` is set, nothing is yielded for names that match
    a domain rule by themselves.
    """
    if len(chain) <= 1:
        return
    initial = chain[0]
    cname_destinations = chain[1:-1]
    a_destination = chain[-1]
    initial_matching = subdomain_matching(initial)
    if no_explicit and initial_matching:
        return
    cname_matching = any(map(subdomain_matching, cname_destinations))
    # The address is only parsed when no domain in the chain matched.
    if cname_matching or initial_matching or ip_matching(a_destination):
        yield initial


def register_rule(subdomain: str) -> None:
    """Register `subdomain` and all of its subdomains as matching."""
    # Make a tree with domain parts
    _tree_insert(RULES_DICT, list(reversed(subdomain.split('.'))))


def register_rule_ip(network: str) -> None:
    """
    Register every address of `network` (CIDR notation) as matching.

    Raises ValueError (from `ipaddress.ip_network`) if `network` is not
    a valid network, which includes having host bits set.
    """
    net = ipaddress.ip_network(network)
    # Only the prefix bits are significant. A zero-length prefix leaves
    # just the version key, so the rule covers that whole address family.
    prefix = list(get_bits(net.network_address))[:net.prefixlen]
    _tree_insert(RULES_IP_DICT, [net.version, *prefix])


def main() -> None:
    """Command-line entry point: load the rules, then filter the input."""
    # Imported here so that the matching functions above stay importable
    # when the progress bar package is not installed.
    import progressbar

    # Parsing arguments
    parser = argparse.ArgumentParser(
        description="Filter first-party trackers from a list of subdomains")
    parser.add_argument(
        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
        help="Input file with DNS chains")
    parser.add_argument(
        '-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
        help="Output file with one tracking subdomain per line")
    parser.add_argument(
        '-n', '--no-explicit', action='store_true',
        help="Don't output domains already blocked with rules without CNAME")
    parser.add_argument(
        '-r', '--rules', type=argparse.FileType('r'),
        help="List of domains to block (with their subdomains)")
    parser.add_argument(
        '-p', '--rules-ip', type=argparse.FileType('r'),
        help="List of IP ranges to block")
    args = parser.parse_args()

    # Progress bar
    widgets = [
        progressbar.Percentage(),
        ' ', progressbar.SimpleProgress(),
        ' ', progressbar.Bar(),
        ' ', progressbar.Timer(),
        ' ', progressbar.AdaptiveTransferSpeed(unit='req'),
        ' ', progressbar.AdaptiveETA(),
    ]
    progress = progressbar.ProgressBar(widgets=widgets)

    # Reading rules
    # Blank lines are skipped: an empty domain would register a bogus
    # rule and an empty network makes ipaddress raise ValueError.
    if args.rules:
        for rule in args.rules:
            rule = rule.strip()
            if rule:
                register_rule(rule)
    if args.rules_ip:
        for rule in args.rules_ip:
            rule = rule.strip()
            if rule:
                register_rule_ip(rule)

    # Approximating line count
    if args.input.seekable():
        progress.max_value = sum(1 for _ in args.input)
        args.input.seek(0)

    # Reading domains to filter
    reader = csv.reader(args.input)
    progress.start()
    for chain in reader:
        for match in get_matching(chain, no_explicit=args.no_explicit):
            print(match, file=args.output)
        progress.update(progress.value + 1)
    progress.finish()


if __name__ == '__main__':
    main()