#!/usr/bin/env python3
# pylint: disable=C0103

"""
From a list of subdomains, output only the ones resolving to a
first-party tracker.
"""

import logging
import os
import queue
import re
import sys
import threading
import typing

import coloredlogs
import dns.exception
import dns.name
import dns.resolver
import progressbar

import regexes

# Per-query lifetime, in seconds.
DNS_TIMEOUT = 60.0
# How many times a query is retried in-place on a transient failure
# (timeout, no working nameserver) before the subdomain is given up on.
# Retrying in-place fixes the old bug where a re-queued subdomain could
# land behind the None sentinel and never be processed.
MAX_RETRIES = 3


class DnsResolver(threading.Thread):
    """
    Worker thread for a DNS resolver.

    Reads subdomains from in_queue, resolves each one against a single
    nameserver, and pushes (subdomain, matching) tuples to out_queue.
    A None sentinel on in_queue stops the worker; it echoes a None
    sentinel to out_queue before exiting.
    """

    def __init__(self,
                 in_queue: queue.Queue,
                 out_queue: queue.Queue,
                 server: str):
        super().__init__()
        self.log = logging.getLogger(server)
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.resolver = dns.resolver.Resolver()
        # Pin this worker to exactly one upstream nameserver.
        self.resolver.nameservers = [server]

    def is_subdomain_matching(self, subdomain: str) -> typing.Optional[bool]:
        """
        Indicate if the subdomain redirects to a first-party tracker.

        Returns True/False on a definitive answer, or None on a
        transient failure (timeout, all nameservers broken) so the
        caller can retry.
        """
        # TODO Look at the whole CNAME chain rather than the last one
        # TODO Also match the ASN of the IP (caching the ASN subnetworks will do)
        try:
            query = self.resolver.query(subdomain, 'A', lifetime=DNS_TIMEOUT)
        except dns.resolver.NXDOMAIN:
            return False
        except dns.resolver.NoAnswer:
            return False
        except dns.resolver.YXDOMAIN:
            self.log.warning("Query name too long for %s", subdomain)
            return False
        except dns.resolver.NoNameservers:
            # Transient: the nameserver may recover, signal a retry.
            self.log.warning("All nameservers broken for %s", subdomain)
            return None
        except dns.exception.Timeout:
            self.log.warning("Timeout for %s", subdomain)
            return None
        except dns.name.EmptyLabel:
            # Malformed input (e.g. "a..b"), never resolvable.
            self.log.warning("Empty label for %s", subdomain)
            return False
        canonical = query.canonical_name.to_text()
        for regex in regexes.REGEXES:
            if re.match(regex, canonical):
                return True
        return False

    def run(self) -> None:
        self.log.info("Started")
        for subdomain in iter(self.in_queue.get, None):
            # Retry transient failures in-place rather than re-queueing:
            # a re-queued subdomain could end up behind the None sentinel
            # and be lost (the TODO the old code carried).
            matching: typing.Optional[bool] = None
            for _ in range(MAX_RETRIES):
                matching = self.is_subdomain_matching(subdomain)
                if matching is not None:
                    break
            if matching is None:
                self.log.error("Giving up on %s after %d attempts",
                               subdomain, MAX_RETRIES)
                matching = False
            self.out_queue.put((subdomain, matching))
        # Echo the sentinel so the orchestrator knows this worker is done.
        self.out_queue.put(None)
        self.log.info("Stopped")


def get_matching_subdomains(subdomains: typing.Iterable[str],
                            nameservers: typing.Optional[typing.List[str]] = None,
                            ) -> typing.Iterable[typing.Tuple[str, bool]]:
    """
    Orchestrate the DnsResolver threads.

    Spawns one worker per nameserver, feeds them the subdomains, and
    yields (subdomain, matching) tuples as the workers produce them.
    Falls back to the system resolver's nameservers when none are given.
    """
    subdomains_queue: queue.Queue = queue.Queue()
    results_queue: queue.Queue = queue.Queue()
    # Use internal resolver by default.
    servers = nameservers or dns.resolver.Resolver().nameservers
    # Create workers.
    for server in servers:
        DnsResolver(subdomains_queue, results_queue, server).start()
    # Send data to workers.
    for subdomain in subdomains:
        subdomains_queue.put(subdomain)
    # Send one sentinel per worker (sentinel = None ~= EOF).
    for _ in servers:
        subdomains_queue.put(None)
    # Wait for one sentinel per worker; in the meantime yield results.
    for _ in servers:
        yield from iter(results_queue.get, None)


if __name__ == '__main__':
    coloredlogs.install(
        level='DEBUG',
        fmt='%(asctime)s %(name)s[%(process)d] %(levelname)s %(message)s'
    )

    # Progress bar
    widgets = [
        progressbar.Percentage(),
        ' ', progressbar.SimpleProgress(),
        ' ', progressbar.Bar(),
        ' ', progressbar.Timer(),
        ' ', progressbar.AdaptiveTransferSpeed(unit='req'),
        ' ', progressbar.AdaptiveETA(),
    ]
    progress = progressbar.ProgressBar(widgets=widgets)

    # Parsing arguments: at most one argument, a filename or '-' for stdin.
    assert len(sys.argv) <= 2
    filename = None
    if len(sys.argv) == 2 and sys.argv[1] != '-':
        filename = sys.argv[1]
        # Count lines first so the progress bar has a total;
        # close the counting handle instead of leaking it.
        with open(filename) as counter:
            progress.max_value = sum(1 for _ in counter)
        textio = open(filename)
    else:
        textio = sys.stdin

    # Cleaning input: strip whitespace, drop empty lines.
    iterator = iter(textio)
    iterator = map(str.strip, iterator)
    iterator = filter(None, iterator)

    # Reading nameservers from an optional local file, one per line.
    servers: typing.List[str] = []
    if os.path.isfile('nameservers'):
        with open('nameservers') as nameserver_file:
            servers = list(filter(None, map(str.strip, nameserver_file)))

    progress.start()
    for subdomain, matching in get_matching_subdomains(iterator, servers):
        progress.update(progress.value + 1)
        if matching:
            print(subdomain)
    progress.finish()
    if filename:
        textio.close()