#!/usr/bin/env python3 """ From a list of subdomains, output only the ones resolving to a first-party tracker. """ import argparse import logging import os import queue import sys import threading import typing import csv import coloredlogs import dns.exception import dns.resolver import progressbar DNS_TIMEOUT = 5.0 NUMBER_THREADS = 512 NUMBER_TRIES = 5 # TODO All the domains don't get treated, # so it leaves with 4-5 subdomains not resolved glob = None class Worker(threading.Thread): """ Worker process for a DNS resolver. Will resolve DNS to match first-party subdomains. """ def change_nameserver(self) -> None: """ Assign a this worker another nameserver from the queue. """ server = None while server is None: try: server = self.orchestrator.nameservers_queue.get(block=False) except queue.Empty: self.orchestrator.refill_nameservers_queue() self.log.info("Using nameserver: %s", server) self.resolver.nameservers = [server] def __init__(self, orchestrator: 'Orchestrator', index: int = 0): super(Worker, self).__init__() self.log = logging.getLogger(f'worker{index:03d}') self.orchestrator = orchestrator self.resolver = dns.resolver.Resolver() self.change_nameserver() def resolve_subdomain(self, subdomain: str) -> typing.Optional[ typing.List[ str ] ]: """ Returns the resolution chain of the subdomain to an A record, including any intermediary CNAME. The last element is an IP address. Returns None if the nameserver was unable to satisfy the request. Returns [] if the requests points to nothing. """ self.log.debug("Querying %s", subdomain) try: query = self.resolver.query(subdomain, 'A', lifetime=DNS_TIMEOUT) except dns.resolver.NXDOMAIN: return [] except dns.resolver.NoAnswer: return [] except dns.resolver.YXDOMAIN: self.log.warning("Query name too long for %s", subdomain) return None except dns.resolver.NoNameservers: # NOTE Most of the time this error message means that the domain # does not exists, but sometimes it means the that the server # itself is broken. So we count on the retry logic. self.log.warning("All nameservers broken for %s", subdomain) return None except dns.exception.Timeout: # NOTE Same as above self.log.warning("Timeout for %s", subdomain) return None except dns.name.EmptyLabel: self.log.warning("Empty label for %s", subdomain) return None resolved = list() last = len(query.response.answer) - 1 for a, answer in enumerate(query.response.answer): if answer.rdtype == dns.rdatatype.CNAME: assert a < last resolved.append(answer.items[0].to_text()[:-1]) elif answer.rdtype == dns.rdatatype.A: assert a == last resolved.append(answer.items[0].address) else: assert False return resolved def run(self) -> None: self.log.info("Started") subdomain: str for subdomain in iter(self.orchestrator.subdomains_queue.get, None): for _ in range(NUMBER_TRIES): resolved = self.resolve_subdomain(subdomain) # Retry with another nameserver if error if resolved is None: self.change_nameserver() else: break # If it wasn't found after multiple tries if resolved is None: self.log.error("Gave up on %s", subdomain) resolved = [] resolved.insert(0, subdomain) assert isinstance(resolved, list) self.orchestrator.results_queue.put(resolved) self.orchestrator.results_queue.put(None) self.log.info("Stopped") class Orchestrator(): """ Orchestrator of the different Worker threads. """ def refill_nameservers_queue(self) -> None: """ Re-fill the given nameservers into the nameservers queue. Done every-time the queue is empty, making it basically looping and infinite. """ # Might be in a race condition but that's probably fine for nameserver in self.nameservers: self.nameservers_queue.put(nameserver) self.log.info("Refilled nameserver queue") def __init__(self, subdomains: typing.Iterable[str], nameservers: typing.List[str] = None, ): self.log = logging.getLogger('orchestrator') self.subdomains = subdomains # Use interal resolver by default self.nameservers = nameservers or dns.resolver.Resolver().nameservers self.subdomains_queue: queue.Queue = queue.Queue( maxsize=NUMBER_THREADS) self.results_queue: queue.Queue = queue.Queue() self.nameservers_queue: queue.Queue = queue.Queue() self.refill_nameservers_queue() def fill_subdomain_queue(self) -> None: """ Read the subdomains in input and put them into the queue. Done in a thread so we can both: - yield the results as they come - not store all the subdomains at once """ self.log.info("Started reading subdomains") # Send data to workers for subdomain in self.subdomains: self.subdomains_queue.put(subdomain) self.log.info("Finished reading subdomains") # Send sentinel to each worker # sentinel = None ~= EOF for _ in range(NUMBER_THREADS): self.subdomains_queue.put(None) def run(self) -> typing.Iterable[typing.List[str]]: """ Yield the results. """ # Create workers self.log.info("Creating workers") for i in range(NUMBER_THREADS): Worker(self, i).start() fill_thread = threading.Thread(target=self.fill_subdomain_queue) fill_thread.start() # Wait for one sentinel per worker # In the meantime output results for _ in range(NUMBER_THREADS): result: typing.List[str] for result in iter(self.results_queue.get, None): yield result self.log.info("Waiting for reader thread") fill_thread.join() self.log.info("Done!") def main() -> None: """ Main function when used directly. Read the subdomains provided and output it, the last CNAME resolved and the IP adress it resolves to. Takes as an input a filename (or nothing, for stdin), and as an output a filename (or nothing, for stdout). The input must be a subdomain per line, the output is a comma-sep file with the columns source CNAME and A. Use the file `nameservers` as the list of nameservers to use, or else it will use the system defaults. Also shows a nice progressbar. """ # Initialization coloredlogs.install( level='DEBUG', fmt='%(asctime)s %(name)s %(levelname)s %(message)s' ) # Parsing arguments parser = argparse.ArgumentParser( description="Massively resolves subdomains and store them in a file.") parser.add_argument( '-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Input file with one subdomain per line") parser.add_argument( '-o', '--output', type=argparse.FileType('w'), default=sys.stdout, help="Outptut file with DNS chains") # parser.add_argument( # '-n', '--nameserver', type=argparse.FileType('r'), # default='nameservers', help="File with one nameserver per line") # parser.add_argument( # '-j', '--workers', type=int, default=512, # help="Number of threads to use") args = parser.parse_args() # Progress bar widgets = [ progressbar.Percentage(), ' ', progressbar.SimpleProgress(), ' ', progressbar.Bar(), ' ', progressbar.Timer(), ' ', progressbar.AdaptiveTransferSpeed(unit='req'), ' ', progressbar.AdaptiveETA(), ] progress = progressbar.ProgressBar(widgets=widgets) if args.input.seekable(): progress.max_value = len(args.input.readlines()) args.input.seek(0) # Cleaning input iterator = iter(args.input) iterator = map(str.strip, iterator) iterator = filter(None, iterator) # Reading nameservers servers: typing.List[str] = list() if os.path.isfile('nameservers'): servers = open('nameservers').readlines() servers = list(filter(None, map(str.strip, servers))) writer = csv.writer(args.output) progress.start() global glob glob = Orchestrator(iterator, servers) for resolved in glob.run(): progress.update(progress.value + 1) writer.writerow(resolved) progress.finish() if __name__ == '__main__': main()