Greatly optimized subdomain filtering

Geoffrey Frogeye 2019-11-14 10:45:06 +01:00
parent 00a0020914
commit 1bbc17a8ec
2 changed files with 133 additions and 40 deletions

.gitignore (vendored)

@@ -1 +1,2 @@
 *.log
+nameservers
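The newly ignored nameservers file is the optional input the rewritten script looks for at startup: one resolver IP address per line, blank lines skipped (see the "Reading nameservers" block in the diff below). A hypothetical example:

    1.1.1.1
    8.8.8.8
    9.9.9.9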

@@ -6,70 +6,162 @@ From a list of subdomains, output only
 the ones resolving to a first-party tracker.
 """
 
+import logging
+import multiprocessing
+import os
 import re
 import sys
+import typing
 
-import dns.resolver
+import coloredlogs
 import dns.exception
+import dns.resolver
 import progressbar
 
 import regexes
 
 DNS_TIMEOUT = 5.0
+MAX_NAMESERVERS = 512
 
 
-def is_subdomain_matching(subdomain: str) -> bool:
+# TODO Retry failed requests
+class DnsResolver(multiprocessing.Process):
     """
-    Indicates if the subdomain redirects to a first-party tracker.
+    Worker process for a DNS resolver.
+    Will resolve DNS to match first-party subdomains.
     """
-    # TODO Look at the whole chain rather than the last one
-    try:
-        query = dns.resolver.query(subdomain, 'A', lifetime=DNS_TIMEOUT)
-    except dns.resolver.NXDOMAIN:
-        return False
-    except dns.resolver.NoAnswer:
-        return False
-    except dns.resolver.YXDOMAIN:
-        print(f"Query name too long for {subdomain}", file=sys.stderr)
-        return False
-    except dns.resolver.NoNameservers:
-        print(f"All nameservers broken for {subdomain}", file=sys.stderr)
-        return False
-    except dns.exception.Timeout:
-        print(f"Timeout for {subdomain}", file=sys.stderr)
-        return False
-    except dns.name.EmptyLabel:
-        print(f"Empty label for {subdomain}", file=sys.stderr)
-        return False
-    canonical = query.canonical_name.to_text()
-    for regex in regexes.REGEXES:
-        if re.match(regex, canonical):
-            return True
-    return False
-
-
-def is_subdomain_matching_standalone(subdomain: str) -> None:
-    """
-    Print the subdomain if it redirects to a first-party tracker.
-    """
-    subdomain = subdomain.strip()
-    if not subdomain:
-        return
-    if is_subdomain_matching(subdomain):
-        print(subdomain)
+
+    def __init__(self,
+                 in_queue: multiprocessing.Queue,
+                 out_queue: multiprocessing.Queue,
+                 server: str):
+        super(DnsResolver, self).__init__()
+        self.log = logging.getLogger(server)
+
+        self.in_queue = in_queue
+        self.out_queue = out_queue
+
+        self.resolver = dns.resolver.Resolver()
+        self.resolver.nameservers = [server]
+
+    def is_subdomain_matching(self, subdomain: str) -> bool:
+        """
+        Indicates if the subdomain redirects to a first-party tracker.
+        """
+        # TODO Look at the whole chain rather than the last one
+        # TODO Also match the ASN of the IP (caching the ASN subnetworks will do)
+        try:
+            query = self.resolver.query(subdomain, 'A', lifetime=DNS_TIMEOUT)
+        except dns.resolver.NXDOMAIN:
+            return False
+        except dns.resolver.NoAnswer:
+            return False
+        except dns.resolver.YXDOMAIN:
+            self.log.warning("Query name too long for %s", subdomain)
+            return False
+        except dns.resolver.NoNameservers:
+            self.log.warning("All nameservers broken for %s", subdomain)
+            return False
+        except dns.exception.Timeout:
+            self.log.warning("Timeout for %s", subdomain)
+            return False
+        except dns.name.EmptyLabel:
+            self.log.warning("Empty label for %s", subdomain)
+            return False
+        canonical = query.canonical_name.to_text()
+        for regex in regexes.REGEXES:
+            if re.match(regex, canonical):
+                return True
+        return False
+
+    def run(self) -> None:
+        self.log.info("Started")
+        for subdomain in iter(self.in_queue.get, None):
+            matching = self.is_subdomain_matching(subdomain)
+            result = (subdomain, matching)
+            # self.log.debug("%s", result)
+            self.out_queue.put(result)
+        self.out_queue.put(None)
+        self.log.info("Stopped")
+
+
+def get_matching_subdomains(subdomains: typing.Iterable[str],
+                            nameservers: typing.Optional[typing.List[str]] = None,
+                            ) -> typing.Iterable[typing.Tuple[str, bool]]:
+    """
+    Orchestrator of the different DnsResolver processes.
+    """
+    subdomains_queue: multiprocessing.Queue = multiprocessing.Queue()
+    results_queue: multiprocessing.Queue = multiprocessing.Queue()
+
+    # Use internal resolver by default
+    servers = nameservers or dns.resolver.Resolver().nameservers
+    servers = servers[:MAX_NAMESERVERS]
+
+    # Create workers
+    for server in servers:
+        DnsResolver(subdomains_queue, results_queue, server).start()
+
+    # Send data to workers
+    for subdomain in subdomains:
+        subdomains_queue.put(subdomain)
+
+    # Send one sentinel to each worker
+    # sentinel = None ~= EOF
+    for _ in servers:
+        subdomains_queue.put(None)
+    subdomains_queue.close()
+
+    # Wait for one sentinel per worker
+    # In the meantime, output results
+    for _ in servers:
+        for result in iter(results_queue.get, None):
+            yield result
+    results_queue.close()
 
 
 if __name__ == '__main__':
+    coloredlogs.install(
+        level='DEBUG',
+        fmt='%(asctime)s %(name)s[%(process)d] %(levelname)s %(message)s'
+    )
+
+    # Progress bar
+    widgets = [
+        progressbar.Percentage(),
+        ' ', progressbar.SimpleProgress(),
+        ' ', progressbar.Bar(),
+        ' ', progressbar.Timer(),
+        ' ', progressbar.AdaptiveTransferSpeed(unit='req'),
+        ' ', progressbar.AdaptiveETA(),
+    ]
+    progress = progressbar.ProgressBar(widgets=widgets)
+
+    # Parsing arguments
     assert len(sys.argv) <= 2
     filename = None
     if len(sys.argv) == 2 and sys.argv[1] != '-':
         filename = sys.argv[1]
-        num_lines = sum(1 for line in open(filename))
-        iterator = progressbar.progressbar(open(filename), max_value=num_lines)
+        progress.max_value = sum(1 for line in open(filename))
+        textio = open(filename)
     else:
-        iterator = sys.stdin
+        textio = sys.stdin
 
-    for line in iterator:
-        is_subdomain_matching_standalone(line)
+    # Cleaning input
+    iterator = iter(textio)
+    iterator = map(str.strip, iterator)
+    iterator = filter(None, iterator)
 
-    if filename:
-        iterator.close()
+    # Reading nameservers
+    servers: typing.List[str] = list()
+    if os.path.isfile('nameservers'):
+        servers = open('nameservers').readlines()
+        servers = list(filter(None, map(str.strip, servers)))
+
+    progress.start()
+    for subdomain, matching in get_matching_subdomains(iterator, servers):
+        progress.update(progress.value + 1)
+        if matching:
+            print(subdomain)
+    progress.finish()
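The heart of the optimization is a classic multiprocessing fan-out/fan-in with sentinel values: every worker reads from a shared input queue until it sees None, then echoes one None back on the results queue, so the consumer knows all output has arrived once it has counted one sentinel per worker. A minimal, self-contained sketch of that pattern (names and the squaring "work" are illustrative, not from the repository):

    import multiprocessing


    def worker(in_queue: multiprocessing.Queue,
               out_queue: multiprocessing.Queue) -> None:
        # Consume until the None sentinel, then echo a sentinel back
        for item in iter(in_queue.get, None):
            out_queue.put(item * item)  # stand-in for the real DNS lookup
        out_queue.put(None)


    if __name__ == '__main__':
        in_queue: multiprocessing.Queue = multiprocessing.Queue()
        out_queue: multiprocessing.Queue = multiprocessing.Queue()
        nb_workers = 4

        # Fan out: one process per worker, all sharing the same queues
        for _ in range(nb_workers):
            multiprocessing.Process(target=worker,
                                    args=(in_queue, out_queue)).start()

        # Feed work, then one sentinel (None ~= EOF) per worker
        for item in range(20):
            in_queue.put(item)
        for _ in range(nb_workers):
            in_queue.put(None)

        # Fan in: done after one sentinel per worker
        for _ in range(nb_workers):
            for result in iter(out_queue.get, None):
                print(result)

Counting sentinels rather than join()ing the processes lets results stream to the consumer while slower workers (here, slower nameservers) are still finishing.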