#!/usr/bin/env python3

"""
From a list of subdomains, resolve each one and output its DNS resolution
chain (intermediary CNAME records and final A records) as JSON lines.
"""

import argparse
import json
import logging
import os
import queue
import sys
import threading
import time
import typing

import coloredlogs
import dns.exception
import dns.name
import dns.rdatatype
import dns.resolver
import dns.rrset

# Maximum total time (in seconds) allowed for a single query
DNS_TIMEOUT = 5.0
# How many times a subdomain is queried (each time with another nameserver)
# before giving up on it
NUMBER_TRIES = 5


class Worker(threading.Thread):
    """
    Worker thread for a DNS resolver.
    Takes subdomains from the orchestrator's queue, resolves them and
    pushes the answers to the orchestrator's results queue.
    """

    def change_nameserver(self) -> None:
        """
        Assign this worker another nameserver from the queue.
        """
        server = None
        while server is None:
            try:
                server = self.orchestrator.nameservers_queue.get(block=False)
            except queue.Empty:
                # The queue is meant to loop: refill it and try again
                self.orchestrator.refill_nameservers_queue()
        self.log.info("Using nameserver: %s", server)
        self.resolver.nameservers = [server]

    def __init__(self,
                 orchestrator: 'Orchestrator',
                 index: int = 0):
        """
        :param orchestrator: Owner of the queues this worker reads from
            and writes to.
        :param index: Number of the worker, only used to name its logger.
        """
        super().__init__()
        self.log = logging.getLogger(f'worker{index:03d}')
        self.orchestrator = orchestrator

        self.resolver = dns.resolver.Resolver()
        self.change_nameserver()

    def resolve_subdomain(self, subdomain: str) -> typing.Optional[
            typing.List[
                dns.rrset.RRset
            ]
    ]:
        """
        Returns the resolution chain of the subdomain to an A record,
        including any intermediary CNAME.
        Returns None if the nameserver was unable to satisfy the request
        (the caller is expected to retry with another nameserver).
        Returns [] if the request points to nothing.
        """
        self.log.debug("Querying %s", subdomain)
        try:
            query = self.resolver.query(subdomain, 'A', lifetime=DNS_TIMEOUT)
        except dns.resolver.NXDOMAIN:
            return []
        except dns.resolver.NoAnswer:
            return []
        except dns.resolver.YXDOMAIN:
            self.log.warning("Query name too long for %s", subdomain)
            return None
        except dns.resolver.NoNameservers:
            # NOTE Most of the time this error message means that the domain
            # does not exist, but sometimes it means that the server
            # itself is broken. So we count on the retry logic.
            self.log.warning("All nameservers broken for %s", subdomain)
            return None
        except dns.exception.Timeout:
            # NOTE Same as above
            self.log.warning("Timeout for %s", subdomain)
            return None
        except dns.name.EmptyLabel:
            self.log.warning("Empty label for %s", subdomain)
            return None
        except dns.exception.DNSException as err:
            # Any other DNS-level failure (e.g. a malformed name in the
            # input) must not kill the worker thread: handle it like the
            # other failures above.
            self.log.warning("Unexpected DNS error for %s: %r",
                             subdomain, err)
            return None
        return query.response.answer

    def run(self) -> None:
        """
        Resolve subdomains from the queue until the None sentinel is read,
        then signal the orchestrator that this worker is finished.
        """
        self.log.info("Started")
        next_subdomain = self.orchestrator.subdomains_queue.get
        try:
            subdomain: str
            for subdomain in iter(next_subdomain, None):
                resolved = None
                for _ in range(NUMBER_TRIES):
                    resolved = self.resolve_subdomain(subdomain)
                    if resolved is not None:
                        break
                    # Retry with another nameserver if error
                    self.change_nameserver()

                # If it wasn't found after multiple tries
                if resolved is None:
                    self.log.error("Gave up on %s", subdomain)
                    resolved = []

                self.orchestrator.results_queue.put(resolved)
        finally:
            # Always send the sentinel, even if the worker crashed:
            # the orchestrator waits for exactly one per worker and would
            # otherwise block forever.
            self.orchestrator.results_queue.put(None)
        self.log.info("Stopped")


class Orchestrator():
    """
    Orchestrator of the different Worker threads.
    """

    def refill_nameservers_queue(self) -> None:
        """
        Re-fill the given nameservers into the nameservers queue.
        Done every time the queue is empty, making it
        basically looping and infinite.
        """
        # Might be in a race condition but that's probably fine
        for nameserver in self.nameservers:
            self.nameservers_queue.put(nameserver)
        self.log.info("Refilled nameserver queue")

    def __init__(self, subdomains: typing.Iterable[str],
                 nameservers: typing.Optional[typing.List[str]] = None,
                 nb_workers: int = 1,
                 ):
        """
        :param subdomains: Subdomains to resolve (consumed lazily).
        :param nameservers: Nameservers to use. If None or empty,
            the ones of the default resolver are used.
        :param nb_workers: Number of Worker threads to start.
        """
        self.log = logging.getLogger('orchestrator')
        self.subdomains = subdomains
        self.nb_workers = nb_workers

        # Use internal resolver by default
        self.nameservers = nameservers or dns.resolver.Resolver().nameservers

        # Bounded so the input is only read as fast as it is consumed
        self.subdomains_queue: queue.Queue = queue.Queue(
            maxsize=self.nb_workers)
        self.results_queue: queue.Queue = queue.Queue()
        self.nameservers_queue: queue.Queue = queue.Queue()

        self.refill_nameservers_queue()

    def fill_subdomain_queue(self) -> None:
        """
        Read the subdomains in input and put them into the queue.
        Done in a thread so we can both:
        - yield the results as they come
        - not store all the subdomains at once
        """
        self.log.info("Started reading subdomains")
        # Send data to workers
        for subdomain in self.subdomains:
            self.subdomains_queue.put(subdomain)

        self.log.info("Finished reading subdomains")
        # Send sentinel to each worker
        # sentinel = None ~= EOF
        for _ in range(self.nb_workers):
            self.subdomains_queue.put(None)

    @staticmethod
    def format_rrset(rrset: dns.rrset.RRset) -> typing.Iterable[str]:
        """
        Yield one JSON line (newline-terminated) per record of the RRset,
        with the keys timestamp, name, type and value.
        Only CNAME and A RRsets are supported.

        :raises NotImplementedError: for any other record type.
        """
        is_cname = rrset.rdtype == dns.rdatatype.CNAME
        if is_cname:
            dtype = 'cname'
        elif rrset.rdtype == dns.rdatatype.A:
            dtype = 'a'
        else:
            raise NotImplementedError(
                f"Unsupported record type: {rrset.rdtype}")
        # Drop the last character of the textual name
        # (the trailing dot of an absolute name)
        name = rrset.name.to_text()[:-1]
        for item in rrset.items:
            value = item.to_text()
            if is_cname:
                # Same as above: the target is a name too
                value = value[:-1]
            # json.dumps takes care of escaping; compact separators keep
            # the same layout as a hand-written line.
            record = {
                "timestamp": str(int(time.time())),
                "name": name,
                "type": dtype,
                "value": value,
            }
            yield json.dumps(record, separators=(',', ':')) + '\n'

    def run(self) -> typing.Iterable[str]:
        """
        Start the workers and yield the formatted results as they come.
        """
        # Create workers
        self.log.info("Creating workers")
        for i in range(self.nb_workers):
            Worker(self, i).start()

        fill_thread = threading.Thread(target=self.fill_subdomain_queue)
        fill_thread.start()

        # Wait for one sentinel per worker
        # In the meantime output results
        for _ in range(self.nb_workers):
            resolved: typing.List[dns.rrset.RRset]
            for resolved in iter(self.results_queue.get, None):
                for rrset in resolved:
                    yield from self.format_rrset(rrset)

        self.log.info("Waiting for reader thread")
        fill_thread.join()

        self.log.info("Done!")


def main() -> None:
    """
    Main function when used directly.
    Read the subdomains provided and output their resolution chain.
    Takes as an input a filename (or nothing, for stdin),
    and as an output a filename (or nothing, for stdout).
    The input must be a subdomain per line, the output is one JSON object
    per line for each CNAME or A record found (keys: timestamp, name,
    type, value).
    Use the file `nameservers` (or the one given with -n) as the list of
    nameservers to use, or else it will use the system defaults.
    """

    # Initialization
    coloredlogs.install(
        level='DEBUG',
        fmt='%(asctime)s %(name)s %(levelname)s %(message)s'
    )

    # Parsing arguments
    parser = argparse.ArgumentParser(
        description="Massively resolves subdomains and store them in a file.")
    parser.add_argument(
        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
        help="Input file with one subdomain per line")
    parser.add_argument(
        '-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
        help="Outptut file with DNS chains")
    parser.add_argument(
        '-n', '--nameservers', default='nameservers',
        help="File with one nameserver per line")
    parser.add_argument(
        '-j', '--workers', type=int, default=512,
        help="Number of threads to use")
    args = parser.parse_args()

    # Cleaning input: strip whitespace and skip empty lines
    iterator = iter(args.input)
    iterator = map(str.strip, iterator)
    iterator = filter(None, iterator)

    # Reading nameservers
    servers: typing.List[str] = []
    if os.path.isfile(args.nameservers):
        # Context manager so the file is closed once read
        with open(args.nameservers) as nameservers_file:
            stripped = map(str.strip, nameservers_file)
            servers = [server for server in stripped if server]

    for resolved in Orchestrator(
            iterator,
            servers,
            nb_workers=args.workers
    ).run():
        args.output.write(resolved)


if __name__ == '__main__':
    main()