eulaurarien/filter_subdomains.py
Geoffrey Frogeye 7d01d016a5 Can now use AdBlock lists for tracking matching
It's not very performant by itself, especially since pyre2 isn't
maintained nor really compilableinstallable anymore.

The performance seems to have decreased from 200 req/s to 0.2 req/s when
using 512 threads, and to 80 req/s using 64 req/s.
This might or might not be related,as the CPU doesn't seem to be the
bottleneck.

I will probably add support for host-based rules, matching the
subdomains of such hosts (as for now there doesn't seem to be any other
pattern for first-party trackers than subdomains, and this would be a
very broad performace / compatibility with existing lists improvement),
and convert the AdBlock lists to this format, only keeping domains-only
rules.
2019-11-15 08:57:31 +01:00

268 lines
8.6 KiB
Python
Executable file

#!/usr/bin/env python3
"""
From a list of subdomains, output only
the ones resolving to a first-party tracker.
"""
import argparse
import logging
import os
import queue
import re
import sys
import threading
import typing
import adblockparser
import coloredlogs
import dns.exception
import dns.resolver
import progressbar
import regexes
DNS_TIMEOUT = 10.0
NUMBER_THREADS = 64
NUMBER_TRIES = 5
class Worker(threading.Thread):
"""
Worker process for a DNS resolver.
Will resolve DNS to match first-party subdomains.
"""
OPTIONS = {"third-party": True}
def change_nameserver(self) -> None:
"""
Assign a this worker another nameserver from the queue.
"""
server = None
while server is None:
try:
server = self.orchestrator.nameservers_queue.get(block=False)
except queue.Empty:
self.orchestrator.refill_nameservers_queue()
self.log.debug("Using nameserver: %s", server)
self.resolver.nameservers = [server]
def __init__(self,
orchestrator: 'Orchestrator',
index: int = 0):
super(Worker, self).__init__()
self.log = logging.getLogger(f'worker{index:03d}')
self.orchestrator = orchestrator
self.resolver = dns.resolver.Resolver()
self.change_nameserver()
def is_subdomain_matching(self, subdomain: str) -> typing.Optional[bool]:
"""
Indicates if the subdomain redirects to a first-party tracker.
Returns None if the nameserver was unable to satisfy the request.
"""
# TODO Look at the whole chain rather than the last one
# TODO Also match the ASN of the IP (caching the ASN subnetworks will do)
try:
query = self.resolver.query(subdomain, 'A', lifetime=DNS_TIMEOUT)
except dns.resolver.NXDOMAIN:
return False
except dns.resolver.NoAnswer:
return False
except dns.resolver.YXDOMAIN:
self.log.warning("Query name too long for %s", subdomain)
return None
except dns.resolver.NoNameservers:
# NOTE Most of the time this error message means that the domain
# does not exists, but sometimes it means the that the server
# itself is broken. So we count on the retry logic.
self.log.warning("All nameservers broken for %s", subdomain)
return None
except dns.exception.Timeout:
# NOTE Same as above
self.log.warning("Timeout for %s", subdomain)
return None
except dns.name.EmptyLabel:
self.log.warning("Empty label for %s", subdomain)
return None
canonical = query.canonical_name.to_text()
# for regex in regexes.REGEXES:
# if re.match(regex, canonical):
# return True
# return False
url = f"https://{canonical[:-1]}/"
return self.orchestrator.rules.should_block(url, Worker.OPTIONS)
def run(self) -> None:
self.log.info("Started")
for subdomain in iter(self.orchestrator.subdomains_queue.get, None):
for _ in range(NUMBER_TRIES):
matching = self.is_subdomain_matching(subdomain)
if matching is not None:
break
# If it wasn't found after multiple tries
if matching is None:
self.log.error("Gave up on %s", subdomain)
matching = False
result = (subdomain, matching)
self.orchestrator.results_queue.put(result)
self.orchestrator.results_queue.put(None)
self.log.info("Stopped")
class Orchestrator():
"""
Orchestrator of the different Worker threads.
"""
def refill_nameservers_queue(self) -> None:
"""
Re-fill the given nameservers into the nameservers queue.
Done every-time the queue is empty, making it
basically looping and infinite.
"""
# Might be in a race condition but that's probably fine
for nameserver in self.nameservers:
self.nameservers_queue.put(nameserver)
self.log.info("Refilled nameserver queue")
def __init__(self, subdomains: typing.Iterable[str],
rules: typing.Iterable[str],
nameservers: typing.List[str] = None,
):
self.log = logging.getLogger('orchestrator')
self.subdomains = subdomains
# Use interal resolver by default
self.nameservers = nameservers or dns.resolver.Resolver().nameservers
self.subdomains_queue: queue.Queue = queue.Queue(
maxsize=NUMBER_THREADS)
self.results_queue: queue.Queue = queue.Queue()
self.nameservers_queue: queue.Queue = queue.Queue()
# Rules
self.rules = adblockparser.AdblockRules(rules)
self.refill_nameservers_queue()
def fill_subdomain_queue(self) -> None:
"""
Read the subdomains in input and put them into the queue.
Done in a thread so we can both:
- yield the results as they come
- not store all the subdomains at once
"""
self.log.info("Started reading subdomains")
# Send data to workers
for subdomain in self.subdomains:
self.subdomains_queue.put(subdomain)
self.log.info("Finished reading subdomains")
# Send sentinel to each worker
# sentinel = None ~= EOF
for _ in range(NUMBER_THREADS):
self.subdomains_queue.put(None)
def run(self) -> typing.Iterable[typing.Tuple[str, bool]]:
"""
Yield the results.
"""
# Create workers
self.log.info("Creating workers")
for i in range(NUMBER_THREADS):
Worker(self, i).start()
fill_thread = threading.Thread(target=self.fill_subdomain_queue)
fill_thread.start()
# Wait for one sentinel per worker
# In the meantime output results
for _ in range(NUMBER_THREADS):
for result in iter(self.results_queue.get, None):
yield result
self.log.info("Waiting for reader thread")
fill_thread.join()
self.log.info("Done!")
def main() -> None:
"""
Main function when used directly.
Takes as an input a filename (or nothing, for stdin)
that will be read and the ones that are a tracker
will be outputed on stdout.
Use the file `nameservers` as the list of nameservers
to use, or else it will use the system defaults.
Also shows a nice progressbar.
"""
# Initialization
coloredlogs.install(
level='DEBUG',
fmt='%(asctime)s %(name)s %(levelname)s %(message)s'
)
# Parsing arguments
parser = argparse.ArgumentParser(
description="Filter first-party trackers from a list of subdomains")
parser.add_argument(
'-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
help="Input file with one subdomain per line")
parser.add_argument(
'-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
help="Outptut file with one tracking subdomain per line")
parser.add_argument(
'-r', '--rules', type=argparse.FileType('r'), default='rules',
help="Rules file")
# parser.add_argument(
# '-n', '--nameserver', type=argparse.FileType('r'),
# default='nameservers', help="File with one nameserver per line")
# parser.add_argument(
# '-j', '--workers', type=int, default=512,
# help="Number of threads to use")
args = parser.parse_args()
# Progress bar
widgets = [
progressbar.Percentage(),
' ', progressbar.SimpleProgress(),
' ', progressbar.Bar(),
' ', progressbar.Timer(),
' ', progressbar.AdaptiveTransferSpeed(unit='req'),
' ', progressbar.AdaptiveETA(),
]
progress = progressbar.ProgressBar(widgets=widgets)
if args.input.seekable():
progress.max_value = len(args.input.readlines())
args.input.seek(0)
# Cleaning input
iterator = iter(args.input)
iterator = map(str.strip, iterator)
iterator = filter(None, iterator)
# Reading nameservers
servers: typing.List[str] = list()
if os.path.isfile('nameservers'):
servers = open('nameservers').readlines()
servers = list(filter(None, map(str.strip, servers)))
progress.start()
for subdomain, matching in Orchestrator(iterator, args.rules, servers).run():
progress.update(progress.value + 1)
if matching:
print(subdomain, file=args.output)
progress.finish()
if __name__ == '__main__':
main()