eulaurarien/filter_subdomains.py

171 lines
5 KiB
Python
Raw Normal View History

2019-11-10 17:14:25 +00:00
#!/usr/bin/env python3
2019-11-10 20:59:06 +00:00
# pylint: disable=C0103
2019-11-10 17:14:25 +00:00
"""
From a list of subdomains, output only
the ones resolving to a first-party tracker.
"""
2019-11-14 09:45:06 +00:00
import logging
import multiprocessing
import os
import re
import sys
import typing

import coloredlogs
import dns.exception
import dns.name
import dns.resolver
import progressbar

import regexes
2019-11-10 21:18:27 +00:00
DNS_TIMEOUT = 5.0  # seconds to wait for a DNS answer before treating it as a timeout
2019-11-14 09:45:06 +00:00
class DnsResolver(multiprocessing.Process):
    """
    Worker process for a DNS resolver.
    Will resolve DNS to match first-party subdomains.
    """

    def __init__(self,
                 in_queue: multiprocessing.Queue,
                 out_queue: multiprocessing.Queue,
                 server: str):
        """
        in_queue: subdomain strings to resolve; None is the end-of-work sentinel.
        out_queue: receives (subdomain, matching) tuples, then one None sentinel.
        server: address of the nameserver this worker queries.
        """
        super().__init__()
        self.log = logging.getLogger(server)
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.resolver = dns.resolver.Resolver()
        self.resolver.nameservers = [server]
        # Compile the first-party patterns once per worker instead of
        # re-parsing every pattern string on every single query.
        self.patterns = [re.compile(regex) for regex in regexes.REGEXES]

    def is_subdomain_matching(self, subdomain: str) -> typing.Optional[bool]:
        """
        Indicates if the subdomain redirects to a first-party tracker.

        Returns True/False on a definitive answer, or None on a transient
        failure (timeout, no working nameserver) so the caller can retry.
        """
        # TODO Look at the whole chain rather than the last one
        # TODO Also match the ASN of the IP (caching the ASN subnetworks will do)
        try:
            query = self.resolver.query(subdomain, 'A', lifetime=DNS_TIMEOUT)
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
            # Definitive: the name does not resolve to anything useful.
            return False
        except dns.resolver.YXDOMAIN:
            self.log.warning("Query name too long for %s", subdomain)
            return False
        except dns.resolver.NoNameservers:
            # Transient: every configured nameserver failed; worth retrying.
            self.log.warning("All nameservers broken for %s", subdomain)
            return None
        except dns.exception.Timeout:
            # Transient: worth retrying.
            self.log.warning("Timeout for %s", subdomain)
            return None
        except dns.name.EmptyLabel:
            # Malformed input such as "a..b"; definitive failure.
            self.log.warning("Empty label for %s", subdomain)
            return False
        canonical = query.canonical_name.to_text()
        # Match the end of the CNAME chain against the tracker patterns.
        return any(pattern.match(canonical) for pattern in self.patterns)

    def run(self) -> None:
        self.log.info("Started")
        for subdomain in iter(self.in_queue.get, None):
            matching = self.is_subdomain_matching(subdomain)
            # Transient failure: requeue the subdomain so some worker
            # (possibly this one) retries it later.
            if matching is None:
                self.in_queue.put(subdomain)
                continue
            self.out_queue.put((subdomain, matching))
        # Forward the sentinel so the orchestrator knows this worker is done.
        self.out_queue.put(None)
        self.log.info("Stopped")
2019-11-10 17:14:25 +00:00
2019-11-14 09:45:06 +00:00
def get_matching_subdomains(subdomains: typing.Iterable[str],
                            nameservers: typing.Optional[typing.List[str]] = None,
                            ) -> typing.Iterable[typing.Tuple[str, bool]]:
    """
    Orchestrator of the different DnsResolver workers.

    Spawns one worker per nameserver, feeds them the subdomains and
    yields (subdomain, matching) tuples as results come back.
    """
    subdomains_queue: multiprocessing.Queue = multiprocessing.Queue()
    results_queue: multiprocessing.Queue = multiprocessing.Queue()
    # Use internal resolver by default
    servers = nameservers or dns.resolver.Resolver().nameservers
    # Create workers
    for server in servers:
        DnsResolver(subdomains_queue, results_queue, server).start()
    # Send data to workers
    for subdomain in subdomains:
        subdomains_queue.put(subdomain)
    # Send sentinel to each worker
    # sentinel = None ~= EOF
    for _ in servers:
        subdomains_queue.put(None)
    subdomains_queue.close()
    # Wait for one sentinel per worker
    # In the meantime output results
    for _ in servers:
        for result in iter(results_queue.get, None):
            yield result
    results_queue.close()
2019-11-10 20:59:06 +00:00
2019-11-10 17:14:25 +00:00
if __name__ == '__main__':
    coloredlogs.install(
        level='DEBUG',
        fmt='%(asctime)s %(name)s[%(process)d] %(levelname)s %(message)s'
    )
    # Progress bar
    widgets = [
        progressbar.Percentage(),
        ' ', progressbar.SimpleProgress(),
        ' ', progressbar.Bar(),
        ' ', progressbar.Timer(),
        ' ', progressbar.AdaptiveTransferSpeed(unit='req'),
        ' ', progressbar.AdaptiveETA(),
    ]
    progress = progressbar.ProgressBar(widgets=widgets)

    # Parsing arguments: optional single positional argument naming the
    # input file of subdomains; '-' or no argument means stdin.
    assert len(sys.argv) <= 2
    filename = None
    if len(sys.argv) == 2 and sys.argv[1] != '-':
        filename = sys.argv[1]
        # Pre-count lines (in a closed-afterwards handle) so the progress
        # bar has a known total instead of an open-ended count.
        with open(filename) as line_counter:
            progress.max_value = sum(1 for _ in line_counter)
        textio = open(filename)
    else:
        textio = sys.stdin

    # Cleaning input: strip whitespace and drop empty lines
    iterator = iter(textio)
    iterator = map(str.strip, iterator)
    iterator = filter(None, iterator)

    # Reading nameservers from an optional 'nameservers' file (one per line)
    servers: typing.List[str] = list()
    if os.path.isfile('nameservers'):
        with open('nameservers') as nameservers_file:
            servers = list(filter(None, map(str.strip, nameservers_file)))

    progress.start()
    for subdomain, matching in get_matching_subdomains(iterator, servers):
        progress.update(progress.value + 1)
        if matching:
            print(subdomain)
    progress.finish()