From c23004fbff58aa674ee74d26d60dda118cd151c7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Geoffrey=20=E2=80=9CFrogeye=E2=80=9D=20Preud=27homme?=
Date: Mon, 2 Dec 2019 19:03:08 +0100
Subject: [PATCH] Separated DNS resolution from filtering

This effectively removes the parallelism of filtering, which increases
the processing time (5->8 hours), but it lets me experiment with the
performance of this step, which I aim to improve drastically.
---
 filter_out_explicit.py |  68 ----------
 filter_subdomains.py   | 246 +++++-------------------------------
 filter_subdomains.sh   |  12 +-
 resolve_subdomains.py  | 277 +++++++++++++++++++++++++++++++++++++++++
 temp/.gitignore        |   1 +
 5 files changed, 316 insertions(+), 288 deletions(-)
 delete mode 100755 filter_out_explicit.py
 create mode 100755 resolve_subdomains.py

diff --git a/filter_out_explicit.py b/filter_out_explicit.py
deleted file mode 100755
index 90eed0b..0000000
--- a/filter_out_explicit.py
+++ /dev/null
@@ -1,68 +0,0 @@
-#!/usr/bin/env python3
-# pylint: disable=C0103
-
-"""
-From a list of subdomains to block,
-filter out the ones explicitely matching a regex.
-It should be already handled by the ad blocker.
-"""
-
-import argparse
-import sys
-import progressbar
-
-import adblockparser
-
-OPTIONS = {"third-party": True}
-
-
-def explicitely_match(subdomain: str) -> bool:
-    url = f"https://{subdomain}/"
-    return rules.should_block(url, OPTIONS)
-
-
-if __name__ == '__main__':
-
-    # Parsing arguments
-    parser = argparse.ArgumentParser(
-        description="Filter first-party trackers from a list of subdomains")
-    parser.add_argument(
-        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
-        help="Input file with one subdomain per line")
-    parser.add_argument(
-        '-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
-        help="Outptut file with one tracking subdomain per line")
-    parser.add_argument(
-        '-r', '--rules', type=argparse.FileType('r'), default='rules',
-        help="Rules file")
-    args = parser.parse_args()
-
-    # Reading rules
-    rules: adblockparser.AdblockRules = adblockparser.AdblockRules(args.rules)
-
-    # Progress bar
-    widgets = [
-        progressbar.Percentage(),
-        ' ', progressbar.SimpleProgress(),
-        ' ', progressbar.Bar(),
-        ' ', progressbar.Timer(),
-        ' ', progressbar.AdaptiveTransferSpeed(unit='req'),
-        ' ', progressbar.AdaptiveETA(),
-    ]
-    progress = progressbar.ProgressBar(widgets=widgets)
-    if args.input.seekable():
-        progress.max_value = len(args.input.readlines())
-        args.input.seek(0)
-
-    # Cleaning input
-    iterator = iter(args.input)
-    iterator = map(str.strip, iterator)
-    iterator = filter(None, iterator)
-
-    # Filtering
-    progress.start()
-    for subdomain in iterator:
-        progress.update(progress.value + 1)
-        if not explicitely_match(subdomain):
-            print(subdomain, file=args.output)
-    progress.finish()
diff --git a/filter_subdomains.py b/filter_subdomains.py
index 03344bf..8c2e9c4 100755
--- a/filter_subdomains.py
+++ b/filter_subdomains.py
@@ -1,4 +1,5 @@
 #!/usr/bin/env python3
+# pylint: disable=C0103
 
 """
 From a list of subdomains, output only
@@ -6,230 +7,56 @@ the ones resolving to a first-party tracker.
""" import argparse -import logging -import os -import queue -import re import sys -import threading +import progressbar +import csv import typing import adblockparser -import coloredlogs -import dns.exception -import dns.resolver -import progressbar -import regexes - -DNS_TIMEOUT = 10.0 -NUMBER_THREADS = 64 -NUMBER_TRIES = 5 +OPTIONS = {"third-party": True} -class Worker(threading.Thread): - """ - Worker process for a DNS resolver. - Will resolve DNS to match first-party subdomains. - """ - OPTIONS = {"third-party": True} - - def change_nameserver(self) -> None: - """ - Assign a this worker another nameserver from the queue. - """ - server = None - while server is None: - try: - server = self.orchestrator.nameservers_queue.get(block=False) - except queue.Empty: - self.orchestrator.refill_nameservers_queue() - self.log.debug("Using nameserver: %s", server) - self.resolver.nameservers = [server] - - def __init__(self, - orchestrator: 'Orchestrator', - index: int = 0): - super(Worker, self).__init__() - self.log = logging.getLogger(f'worker{index:03d}') - self.orchestrator = orchestrator - - self.resolver = dns.resolver.Resolver() - self.change_nameserver() - - def is_subdomain_matching(self, subdomain: str) -> typing.Optional[bool]: - """ - Indicates if the subdomain redirects to a first-party tracker. - Returns None if the nameserver was unable to satisfy the request. - """ - # TODO Look at the whole chain rather than the last one - # TODO Also match the ASN of the IP (caching the ASN subnetworks will do) - try: - query = self.resolver.query(subdomain, 'A', lifetime=DNS_TIMEOUT) - except dns.resolver.NXDOMAIN: - return False - except dns.resolver.NoAnswer: - return False - except dns.resolver.YXDOMAIN: - self.log.warning("Query name too long for %s", subdomain) - return None - except dns.resolver.NoNameservers: - # NOTE Most of the time this error message means that the domain - # does not exists, but sometimes it means the that the server - # itself is broken. So we count on the retry logic. - self.log.warning("All nameservers broken for %s", subdomain) - return None - except dns.exception.Timeout: - # NOTE Same as above - self.log.warning("Timeout for %s", subdomain) - return None - except dns.name.EmptyLabel: - self.log.warning("Empty label for %s", subdomain) - return None - canonical = query.canonical_name.to_text() - # for regex in regexes.REGEXES: - # if re.match(regex, canonical): - # return True - # return False - url = f"https://{canonical[:-1]}/" - return self.orchestrator.rules.should_block(url, Worker.OPTIONS) - - def run(self) -> None: - self.log.info("Started") - for subdomain in iter(self.orchestrator.subdomains_queue.get, None): - - for _ in range(NUMBER_TRIES): - matching = self.is_subdomain_matching(subdomain) - if matching is not None: - break - - # If it wasn't found after multiple tries - if matching is None: - self.log.error("Gave up on %s", subdomain) - matching = False - - result = (subdomain, matching) - self.orchestrator.results_queue.put(result) - - self.orchestrator.results_queue.put(None) - self.log.info("Stopped") +def subdomain_matching(subdomain: str) -> bool: + url = f"https://{subdomain}/" + return rules.should_block(url, OPTIONS) -class Orchestrator(): - """ - Orchestrator of the different Worker threads. - """ - - def refill_nameservers_queue(self) -> None: - """ - Re-fill the given nameservers into the nameservers queue. - Done every-time the queue is empty, making it - basically looping and infinite. 
- """ - # Might be in a race condition but that's probably fine - for nameserver in self.nameservers: - self.nameservers_queue.put(nameserver) - self.log.info("Refilled nameserver queue") - - def __init__(self, subdomains: typing.Iterable[str], - rules: typing.Iterable[str], - nameservers: typing.List[str] = None, - ): - self.log = logging.getLogger('orchestrator') - self.subdomains = subdomains - - # Use interal resolver by default - self.nameservers = nameservers or dns.resolver.Resolver().nameservers - - self.subdomains_queue: queue.Queue = queue.Queue( - maxsize=NUMBER_THREADS) - self.results_queue: queue.Queue = queue.Queue() - self.nameservers_queue: queue.Queue = queue.Queue() - - # Rules - self.rules = adblockparser.AdblockRules(rules) - - self.refill_nameservers_queue() - - def fill_subdomain_queue(self) -> None: - """ - Read the subdomains in input and put them into the queue. - Done in a thread so we can both: - - yield the results as they come - - not store all the subdomains at once - """ - self.log.info("Started reading subdomains") - # Send data to workers - for subdomain in self.subdomains: - self.subdomains_queue.put(subdomain) - - self.log.info("Finished reading subdomains") - # Send sentinel to each worker - # sentinel = None ~= EOF - for _ in range(NUMBER_THREADS): - self.subdomains_queue.put(None) - - def run(self) -> typing.Iterable[typing.Tuple[str, bool]]: - """ - Yield the results. - """ - # Create workers - self.log.info("Creating workers") - for i in range(NUMBER_THREADS): - Worker(self, i).start() - - fill_thread = threading.Thread(target=self.fill_subdomain_queue) - fill_thread.start() - - # Wait for one sentinel per worker - # In the meantime output results - for _ in range(NUMBER_THREADS): - for result in iter(self.results_queue.get, None): - yield result - - self.log.info("Waiting for reader thread") - fill_thread.join() - - self.log.info("Done!") +def get_matching(chain: typing.List[str], no_explicit: bool = False + ) -> typing.Iterable[str]: + initial = chain[0] + cname_destinations = chain[1:-1] + # a_destination = chain[-1] + initial_matching = subdomain_matching(initial) + if no_explicit and initial_matching: + return + cname_matching = any(map(subdomain_matching, cname_destinations)) + if cname_matching or initial_matching: + yield initial -def main() -> None: - """ - Main function when used directly. - Takes as an input a filename (or nothing, for stdin) - that will be read and the ones that are a tracker - will be outputed on stdout. - Use the file `nameservers` as the list of nameservers - to use, or else it will use the system defaults. - Also shows a nice progressbar. 
- """ - - # Initialization - coloredlogs.install( - level='DEBUG', - fmt='%(asctime)s %(name)s %(levelname)s %(message)s' - ) +if __name__ == '__main__': # Parsing arguments parser = argparse.ArgumentParser( description="Filter first-party trackers from a list of subdomains") parser.add_argument( '-i', '--input', type=argparse.FileType('r'), default=sys.stdin, - help="Input file with one subdomain per line") + help="Input file with DNS chains") parser.add_argument( '-o', '--output', type=argparse.FileType('w'), default=sys.stdout, help="Outptut file with one tracking subdomain per line") + parser.add_argument( + '-n', '--no-explicit', action='store_true', + help="Don't output domains already blocked with rules without CNAME") parser.add_argument( '-r', '--rules', type=argparse.FileType('r'), default='rules', help="Rules file") - # parser.add_argument( - # '-n', '--nameserver', type=argparse.FileType('r'), - # default='nameservers', help="File with one nameserver per line") - # parser.add_argument( - # '-j', '--workers', type=int, default=512, - # help="Number of threads to use") args = parser.parse_args() + # Reading rules + rules: adblockparser.AdblockRules = adblockparser.AdblockRules(args.rules) + # Progress bar widgets = [ progressbar.Percentage(), @@ -245,23 +72,12 @@ def main() -> None: args.input.seek(0) # Cleaning input - iterator = iter(args.input) - iterator = map(str.strip, iterator) - iterator = filter(None, iterator) - - # Reading nameservers - servers: typing.List[str] = list() - if os.path.isfile('nameservers'): - servers = open('nameservers').readlines() - servers = list(filter(None, map(str.strip, servers))) + reader = csv.reader(args.input) + # Filtering progress.start() - for subdomain, matching in Orchestrator(iterator, args.rules, servers).run(): + for chain in reader: + for match in get_matching(chain, no_explicit=args.no_explicit): + print(match, file=args.output) progress.update(progress.value + 1) - if matching: - print(subdomain, file=args.output) progress.finish() - - -if __name__ == '__main__': - main() diff --git a/filter_subdomains.sh b/filter_subdomains.sh index 220dae2..1d06a3c 100755 --- a/filter_subdomains.sh +++ b/filter_subdomains.sh @@ -1,14 +1,16 @@ #!/usr/bin/env bash -# Filter out the subdomains not pointing to a first-party tracker +# Resolve the CNAME chain of all the known subdomains for later analysis cat subdomains/*.list | sort -u > temp/all_subdomains.list +./resolve_subdomains.py --input temp/all_subdomains.list --output temp/all_resolved.csv +sort -u temp/all_resolved.csv > temp/all_resolved_sorted.csv + +# Filter out the subdomains not pointing to a first-party tracker cat rules/*.txt | grep -v '^!' 
-./filter_subdomains.py --rules temp/all_rules.txt --input temp/all_subdomains.list --output temp/all_toblock.list
-sort -u temp/all_toblock.list > dist/firstparty-trackers.txt
-./filter_out_explicit.py --rules temp/all_rules.txt --input dist/firstparty-trackers.txt --output dist/firstparty-only-trackers.txt
+./filter_subdomains.py --rules temp/all_rules.txt --input temp/all_resolved_sorted.csv --output dist/firstparty-trackers.txt
+./filter_subdomains.py --rules temp/all_rules.txt --input temp/all_resolved_sorted.csv --no-explicit --output dist/firstparty-only-trackers.txt
 
 # Format the blocklist so it can be used as a hostlist
-
 function generate_hosts {
     basename="$1"
     description="$2"
diff --git a/resolve_subdomains.py b/resolve_subdomains.py
new file mode 100755
index 0000000..a047f0d
--- /dev/null
+++ b/resolve_subdomains.py
@@ -0,0 +1,277 @@
+#!/usr/bin/env python3
+
+"""
+From a list of subdomains, resolve them and output
+the chain of CNAMEs and the A record they point to.
+"""
+
+import argparse
+import logging
+import os
+import queue
+import sys
+import threading
+import typing
+import csv
+
+import adblockparser
+import coloredlogs
+import dns.exception
+import dns.resolver
+import progressbar
+
+DNS_TIMEOUT = 10.0
+NUMBER_THREADS = 64
+NUMBER_TRIES = 5
+
+# TODO Not all the domains get processed:
+# each run leaves 4-5 subdomains unresolved
+
+
+class Worker(threading.Thread):
+    """
+    Worker process for a DNS resolver.
+    Will resolve subdomains to their CNAME chain and IP address.
+    """
+    OPTIONS = {"third-party": True}
+
+    def change_nameserver(self) -> None:
+        """
+        Assign this worker another nameserver from the queue.
+        """
+        server = None
+        while server is None:
+            try:
+                server = self.orchestrator.nameservers_queue.get(block=False)
+            except queue.Empty:
+                self.orchestrator.refill_nameservers_queue()
+        self.log.debug("Using nameserver: %s", server)
+        self.resolver.nameservers = [server]
+
+    def __init__(self,
+                 orchestrator: 'Orchestrator',
+                 index: int = 0):
+        super(Worker, self).__init__()
+        self.log = logging.getLogger(f'worker{index:03d}')
+        self.orchestrator = orchestrator
+
+        self.resolver = dns.resolver.Resolver()
+        self.change_nameserver()
+
+    def resolve_subdomain(self, subdomain: str) -> typing.Optional[
+            typing.List[
+                str
+            ]
+    ]:
+        """
+        Returns the resolution chain of the subdomain to an A record,
+        including any intermediary CNAMEs.
+        The last element is an IP address.
+        Returns None if the nameserver was unable to satisfy the request.
+        Returns [] if the request points to nothing.
+        """
+        try:
+            query = self.resolver.query(subdomain, 'A', lifetime=DNS_TIMEOUT)
+        except dns.resolver.NXDOMAIN:
+            return []
+        except dns.resolver.NoAnswer:
+            return []
+        except dns.resolver.YXDOMAIN:
+            self.log.warning("Query name too long for %s", subdomain)
+            return None
+        except dns.resolver.NoNameservers:
+            # NOTE Most of the time this error message means that the domain
+            # does not exist, but sometimes it means that the server
+            # itself is broken. So we count on the retry logic.
+ self.log.warning("All nameservers broken for %s", subdomain) + return None + except dns.exception.Timeout: + # NOTE Same as above + self.log.warning("Timeout for %s", subdomain) + return None + except dns.name.EmptyLabel: + self.log.warning("Empty label for %s", subdomain) + return None + resolved = list() + last = len(query.response.answer) - 1 + for a, answer in enumerate(query.response.answer): + if answer.rdtype == dns.rdatatype.CNAME: + assert a < last + resolved.append(answer.items[0].to_text()[:-1]) + elif answer.rdtype == dns.rdatatype.A: + assert a == last + resolved.append(answer.items[0].address) + else: + assert False + return resolved + + def run(self) -> None: + self.log.info("Started") + subdomain: str + for subdomain in iter(self.orchestrator.subdomains_queue.get, None): + + for _ in range(NUMBER_TRIES): + resolved = self.resolve_subdomain(subdomain) + if resolved is not None: + break + + # If it wasn't found after multiple tries + if resolved is None: + self.log.error("Gave up on %s", subdomain) + resolved = [] + + resolved.insert(0, subdomain) + self.orchestrator.results_queue.put(resolved) + + self.orchestrator.results_queue.put(None) + self.log.info("Stopped") + + +class Orchestrator(): + """ + Orchestrator of the different Worker threads. + """ + + def refill_nameservers_queue(self) -> None: + """ + Re-fill the given nameservers into the nameservers queue. + Done every-time the queue is empty, making it + basically looping and infinite. + """ + # Might be in a race condition but that's probably fine + for nameserver in self.nameservers: + self.nameservers_queue.put(nameserver) + self.log.info("Refilled nameserver queue") + + def __init__(self, subdomains: typing.Iterable[str], + nameservers: typing.List[str] = None, + ): + self.log = logging.getLogger('orchestrator') + self.subdomains = subdomains + + # Use interal resolver by default + self.nameservers = nameservers or dns.resolver.Resolver().nameservers + + self.subdomains_queue: queue.Queue = queue.Queue( + maxsize=NUMBER_THREADS) + self.results_queue: queue.Queue = queue.Queue() + self.nameservers_queue: queue.Queue = queue.Queue() + + self.refill_nameservers_queue() + + def fill_subdomain_queue(self) -> None: + """ + Read the subdomains in input and put them into the queue. + Done in a thread so we can both: + - yield the results as they come + - not store all the subdomains at once + """ + self.log.info("Started reading subdomains") + # Send data to workers + for subdomain in self.subdomains: + self.subdomains_queue.put(subdomain) + + self.log.info("Finished reading subdomains") + # Send sentinel to each worker + # sentinel = None ~= EOF + for _ in range(NUMBER_THREADS): + self.subdomains_queue.put(None) + + def run(self) -> typing.Iterable[typing.List[str]]: + """ + Yield the results. + """ + # Create workers + self.log.info("Creating workers") + for i in range(NUMBER_THREADS): + Worker(self, i).start() + + fill_thread = threading.Thread(target=self.fill_subdomain_queue) + fill_thread.start() + + # Wait for one sentinel per worker + # In the meantime output results + for _ in range(NUMBER_THREADS): + result: typing.List[str] + for result in iter(self.results_queue.get, None): + yield result + + self.log.info("Waiting for reader thread") + fill_thread.join() + + self.log.info("Done!") + + +def main() -> None: + """ + Main function when used directly. + Read the subdomains provided and output it, + the last CNAME resolved and the IP adress it resolves to. 
+    Takes as an input a filename (or nothing, for stdin),
+    and as an output a filename (or nothing, for stdout).
+    The input must have one subdomain per line; the output is a
+    comma-separated file with the source, CNAME and A columns.
+    Uses the file `nameservers` as the list of nameservers
+    to query, or else it will use the system defaults.
+    Also shows a nice progressbar.
+    """
+
+    # Initialization
+    coloredlogs.install(
+        level='DEBUG',
+        fmt='%(asctime)s %(name)s %(levelname)s %(message)s'
+    )
+
+    # Parsing arguments
+    parser = argparse.ArgumentParser(
+        description="Massively resolves subdomains and stores them in a file.")
+    parser.add_argument(
+        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
+        help="Input file with one subdomain per line")
+    parser.add_argument(
+        '-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
+        help="Output file with DNS chains")
+    # parser.add_argument(
+    #     '-n', '--nameserver', type=argparse.FileType('r'),
+    #     default='nameservers', help="File with one nameserver per line")
+    # parser.add_argument(
+    #     '-j', '--workers', type=int, default=512,
+    #     help="Number of threads to use")
+    args = parser.parse_args()
+
+    # Progress bar
+    widgets = [
+        progressbar.Percentage(),
+        ' ', progressbar.SimpleProgress(),
+        ' ', progressbar.Bar(),
+        ' ', progressbar.Timer(),
+        ' ', progressbar.AdaptiveTransferSpeed(unit='req'),
+        ' ', progressbar.AdaptiveETA(),
+    ]
+    progress = progressbar.ProgressBar(widgets=widgets)
+    if args.input.seekable():
+        progress.max_value = len(args.input.readlines())
+        args.input.seek(0)
+
+    # Cleaning input
+    iterator = iter(args.input)
+    iterator = map(str.strip, iterator)
+    iterator = filter(None, iterator)
+
+    # Reading nameservers
+    servers: typing.List[str] = list()
+    if os.path.isfile('nameservers'):
+        servers = open('nameservers').readlines()
+        servers = list(filter(None, map(str.strip, servers)))
+
+    writer = csv.writer(args.output)
+
+    progress.start()
+    for resolved in Orchestrator(iterator, servers).run():
+        progress.update(progress.value + 1)
+        writer.writerow(resolved)
+    progress.finish()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/temp/.gitignore b/temp/.gitignore
index bc44c53..397cc16 100644
--- a/temp/.gitignore
+++ b/temp/.gitignore
@@ -1,2 +1,3 @@
 *.list
 *.txt
+*.csv
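-- 
A quick illustration of the new data flow (a hedged sketch, not part of the
commit): resolve_subdomains.py writes each DNS chain as a CSV row of the form
`source, CNAMEs..., A record`, and filter_subdomains.py then applies the
adblock rules to the source and to every CNAME. The chain and the rule below
are invented for the example; only adblockparser's documented
AdblockRules/should_block API is assumed:

    #!/usr/bin/env python3
    import adblockparser

    # One CSV row as written by resolve_subdomains.py:
    # source subdomain, intermediate CNAME(s), final A record
    chain = ["metrics.example.com", "track.tracker.invalid", "198.51.100.7"]

    rules = adblockparser.AdblockRules(["||tracker.invalid^"])
    OPTIONS = {"third-party": True}

    def subdomain_matching(subdomain: str) -> bool:
        # Same trick as in filter_subdomains.py: wrap the domain in a URL
        # so the adblock rules can be matched against it
        return rules.should_block(f"https://{subdomain}/", OPTIONS)

    # Mirrors get_matching(): output the source if it, or any CNAME it
    # goes through, matches the rules
    initial, cnames = chain[0], chain[1:-1]
    if subdomain_matching(initial) or any(map(subdomain_matching, cnames)):
        print(initial)  # prints: metrics.example.com

Run against real data, the same logic scans temp/all_resolved_sorted.csv row
by row, which is why the resolution step can now be optimized independently
of the filtering step.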