Separated DNS resolution from filtering

This effectively removes the parallelism of filtering,
which doubles the processing time (5->8 hours),
but this allows me to toy around with the performances of this step,
which I aim to improve drastically.
newworkflow_parseropti
Geoffrey Frogeye 2019-12-02 19:03:08 +01:00
parent 7d01d016a5
commit c23004fbff
Signed by: geoffrey
GPG Key ID: D8A7ECA00A8CD3DD
5 changed files with 316 additions and 288 deletions

View File

@ -1,68 +0,0 @@
#!/usr/bin/env python3
# pylint: disable=C0103
"""
From a list of subdomains to block,
filter out the ones explicitely matching a regex.
It should be already handled by the ad blocker.
"""
import argparse
import sys
import progressbar
import adblockparser
OPTIONS = {"third-party": True}
def explicitely_match(subdomain: str) -> bool:
url = f"https://{subdomain}/"
return rules.should_block(url, OPTIONS)
if __name__ == '__main__':
# Parsing arguments
parser = argparse.ArgumentParser(
description="Filter first-party trackers from a list of subdomains")
parser.add_argument(
'-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
help="Input file with one subdomain per line")
parser.add_argument(
'-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
help="Outptut file with one tracking subdomain per line")
parser.add_argument(
'-r', '--rules', type=argparse.FileType('r'), default='rules',
help="Rules file")
args = parser.parse_args()
# Reading rules
rules: adblockparser.AdblockRules = adblockparser.AdblockRules(args.rules)
# Progress bar
widgets = [
progressbar.Percentage(),
' ', progressbar.SimpleProgress(),
' ', progressbar.Bar(),
' ', progressbar.Timer(),
' ', progressbar.AdaptiveTransferSpeed(unit='req'),
' ', progressbar.AdaptiveETA(),
]
progress = progressbar.ProgressBar(widgets=widgets)
if args.input.seekable():
progress.max_value = len(args.input.readlines())
args.input.seek(0)
# Cleaning input
iterator = iter(args.input)
iterator = map(str.strip, iterator)
iterator = filter(None, iterator)
# Filtering
progress.start()
for subdomain in iterator:
progress.update(progress.value + 1)
if not explicitely_match(subdomain):
print(subdomain, file=args.output)
progress.finish()

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3
# pylint: disable=C0103
"""
From a list of subdomains, output only
@ -6,230 +7,56 @@ the ones resolving to a first-party tracker.
"""
import argparse
import logging
import os
import queue
import re
import sys
import threading
import progressbar
import csv
import typing
import adblockparser
import coloredlogs
import dns.exception
import dns.resolver
import progressbar
import regexes
DNS_TIMEOUT = 10.0
NUMBER_THREADS = 64
NUMBER_TRIES = 5
OPTIONS = {"third-party": True}
class Worker(threading.Thread):
"""
Worker process for a DNS resolver.
Will resolve DNS to match first-party subdomains.
"""
OPTIONS = {"third-party": True}
def change_nameserver(self) -> None:
"""
Assign a this worker another nameserver from the queue.
"""
server = None
while server is None:
try:
server = self.orchestrator.nameservers_queue.get(block=False)
except queue.Empty:
self.orchestrator.refill_nameservers_queue()
self.log.debug("Using nameserver: %s", server)
self.resolver.nameservers = [server]
def __init__(self,
orchestrator: 'Orchestrator',
index: int = 0):
super(Worker, self).__init__()
self.log = logging.getLogger(f'worker{index:03d}')
self.orchestrator = orchestrator
self.resolver = dns.resolver.Resolver()
self.change_nameserver()
def is_subdomain_matching(self, subdomain: str) -> typing.Optional[bool]:
"""
Indicates if the subdomain redirects to a first-party tracker.
Returns None if the nameserver was unable to satisfy the request.
"""
# TODO Look at the whole chain rather than the last one
# TODO Also match the ASN of the IP (caching the ASN subnetworks will do)
try:
query = self.resolver.query(subdomain, 'A', lifetime=DNS_TIMEOUT)
except dns.resolver.NXDOMAIN:
return False
except dns.resolver.NoAnswer:
return False
except dns.resolver.YXDOMAIN:
self.log.warning("Query name too long for %s", subdomain)
return None
except dns.resolver.NoNameservers:
# NOTE Most of the time this error message means that the domain
# does not exists, but sometimes it means the that the server
# itself is broken. So we count on the retry logic.
self.log.warning("All nameservers broken for %s", subdomain)
return None
except dns.exception.Timeout:
# NOTE Same as above
self.log.warning("Timeout for %s", subdomain)
return None
except dns.name.EmptyLabel:
self.log.warning("Empty label for %s", subdomain)
return None
canonical = query.canonical_name.to_text()
# for regex in regexes.REGEXES:
# if re.match(regex, canonical):
# return True
# return False
url = f"https://{canonical[:-1]}/"
return self.orchestrator.rules.should_block(url, Worker.OPTIONS)
def run(self) -> None:
self.log.info("Started")
for subdomain in iter(self.orchestrator.subdomains_queue.get, None):
for _ in range(NUMBER_TRIES):
matching = self.is_subdomain_matching(subdomain)
if matching is not None:
break
# If it wasn't found after multiple tries
if matching is None:
self.log.error("Gave up on %s", subdomain)
matching = False
result = (subdomain, matching)
self.orchestrator.results_queue.put(result)
self.orchestrator.results_queue.put(None)
self.log.info("Stopped")
def subdomain_matching(subdomain: str) -> bool:
url = f"https://{subdomain}/"
return rules.should_block(url, OPTIONS)
class Orchestrator():
"""
Orchestrator of the different Worker threads.
"""
def refill_nameservers_queue(self) -> None:
"""
Re-fill the given nameservers into the nameservers queue.
Done every-time the queue is empty, making it
basically looping and infinite.
"""
# Might be in a race condition but that's probably fine
for nameserver in self.nameservers:
self.nameservers_queue.put(nameserver)
self.log.info("Refilled nameserver queue")
def __init__(self, subdomains: typing.Iterable[str],
rules: typing.Iterable[str],
nameservers: typing.List[str] = None,
):
self.log = logging.getLogger('orchestrator')
self.subdomains = subdomains
# Use interal resolver by default
self.nameservers = nameservers or dns.resolver.Resolver().nameservers
self.subdomains_queue: queue.Queue = queue.Queue(
maxsize=NUMBER_THREADS)
self.results_queue: queue.Queue = queue.Queue()
self.nameservers_queue: queue.Queue = queue.Queue()
# Rules
self.rules = adblockparser.AdblockRules(rules)
self.refill_nameservers_queue()
def fill_subdomain_queue(self) -> None:
"""
Read the subdomains in input and put them into the queue.
Done in a thread so we can both:
- yield the results as they come
- not store all the subdomains at once
"""
self.log.info("Started reading subdomains")
# Send data to workers
for subdomain in self.subdomains:
self.subdomains_queue.put(subdomain)
self.log.info("Finished reading subdomains")
# Send sentinel to each worker
# sentinel = None ~= EOF
for _ in range(NUMBER_THREADS):
self.subdomains_queue.put(None)
def run(self) -> typing.Iterable[typing.Tuple[str, bool]]:
"""
Yield the results.
"""
# Create workers
self.log.info("Creating workers")
for i in range(NUMBER_THREADS):
Worker(self, i).start()
fill_thread = threading.Thread(target=self.fill_subdomain_queue)
fill_thread.start()
# Wait for one sentinel per worker
# In the meantime output results
for _ in range(NUMBER_THREADS):
for result in iter(self.results_queue.get, None):
yield result
self.log.info("Waiting for reader thread")
fill_thread.join()
self.log.info("Done!")
def get_matching(chain: typing.List[str], no_explicit: bool = False
) -> typing.Iterable[str]:
initial = chain[0]
cname_destinations = chain[1:-1]
# a_destination = chain[-1]
initial_matching = subdomain_matching(initial)
if no_explicit and initial_matching:
return
cname_matching = any(map(subdomain_matching, cname_destinations))
if cname_matching or initial_matching:
yield initial
def main() -> None:
"""
Main function when used directly.
Takes as an input a filename (or nothing, for stdin)
that will be read and the ones that are a tracker
will be outputed on stdout.
Use the file `nameservers` as the list of nameservers
to use, or else it will use the system defaults.
Also shows a nice progressbar.
"""
# Initialization
coloredlogs.install(
level='DEBUG',
fmt='%(asctime)s %(name)s %(levelname)s %(message)s'
)
if __name__ == '__main__':
# Parsing arguments
parser = argparse.ArgumentParser(
description="Filter first-party trackers from a list of subdomains")
parser.add_argument(
'-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
help="Input file with one subdomain per line")
help="Input file with DNS chains")
parser.add_argument(
'-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
help="Outptut file with one tracking subdomain per line")
parser.add_argument(
'-n', '--no-explicit', action='store_true',
help="Don't output domains already blocked with rules without CNAME")
parser.add_argument(
'-r', '--rules', type=argparse.FileType('r'), default='rules',
help="Rules file")
# parser.add_argument(
# '-n', '--nameserver', type=argparse.FileType('r'),
# default='nameservers', help="File with one nameserver per line")
# parser.add_argument(
# '-j', '--workers', type=int, default=512,
# help="Number of threads to use")
args = parser.parse_args()
# Reading rules
rules: adblockparser.AdblockRules = adblockparser.AdblockRules(args.rules)
# Progress bar
widgets = [
progressbar.Percentage(),
@ -245,23 +72,12 @@ def main() -> None:
args.input.seek(0)
# Cleaning input
iterator = iter(args.input)
iterator = map(str.strip, iterator)
iterator = filter(None, iterator)
# Reading nameservers
servers: typing.List[str] = list()
if os.path.isfile('nameservers'):
servers = open('nameservers').readlines()
servers = list(filter(None, map(str.strip, servers)))
reader = csv.reader(args.input)
# Filtering
progress.start()
for subdomain, matching in Orchestrator(iterator, args.rules, servers).run():
for chain in reader:
for match in get_matching(chain, no_explicit=args.no_explicit):
print(match, file=args.output)
progress.update(progress.value + 1)
if matching:
print(subdomain, file=args.output)
progress.finish()
if __name__ == '__main__':
main()

View File

@ -1,14 +1,16 @@
#!/usr/bin/env bash
# Filter out the subdomains not pointing to a first-party tracker
# Resolve the CNAME chain of all the known subdomains for later analysis
cat subdomains/*.list | sort -u > temp/all_subdomains.list
./resolve_subdomains.py --input temp/all_subdomains.list --output temp/all_resolved.csv
sort -u temp/all_resolved.csv > temp/all_resolved_sorted.csv
# Filter out the subdomains not pointing to a first-party tracker
cat rules/*.txt | grep -v '^!' | grep -v '^\[Adblock' | sort -u > temp/all_rules.txt
./filter_subdomains.py --rules temp/all_rules.txt --input temp/all_subdomains.list --output temp/all_toblock.list
sort -u temp/all_toblock.list > dist/firstparty-trackers.txt
./filter_out_explicit.py --rules temp/all_rules.txt --input dist/firstparty-trackers.txt --output dist/firstparty-only-trackers.txt
./filter_subdomains.py --rules temp/all_rules.txt --input temp/all_resolved_sorted.csv --output dist/firstparty-trackers.txt
./filter_subdomains.py --rules temp/all_rules.txt --input temp/all_resolved_sorted.csv --no-explicit --output dist/firstparty-only-trackers.txt
# Format the blocklist so it can be used as a hostlist
function generate_hosts {
basename="$1"
description="$2"

277
resolve_subdomains.py Executable file
View File

@ -0,0 +1,277 @@
#!/usr/bin/env python3
"""
From a list of subdomains, output only
the ones resolving to a first-party tracker.
"""
import argparse
import logging
import os
import queue
import sys
import threading
import typing
import csv
import adblockparser
import coloredlogs
import dns.exception
import dns.resolver
import progressbar
DNS_TIMEOUT = 10.0
NUMBER_THREADS = 64
NUMBER_TRIES = 5
# TODO All the domains don't get treated,
# so it leaves with 4-5 subdomains not resolved
class Worker(threading.Thread):
"""
Worker process for a DNS resolver.
Will resolve DNS to match first-party subdomains.
"""
OPTIONS = {"third-party": True}
def change_nameserver(self) -> None:
"""
Assign a this worker another nameserver from the queue.
"""
server = None
while server is None:
try:
server = self.orchestrator.nameservers_queue.get(block=False)
except queue.Empty:
self.orchestrator.refill_nameservers_queue()
self.log.debug("Using nameserver: %s", server)
self.resolver.nameservers = [server]
def __init__(self,
orchestrator: 'Orchestrator',
index: int = 0):
super(Worker, self).__init__()
self.log = logging.getLogger(f'worker{index:03d}')
self.orchestrator = orchestrator
self.resolver = dns.resolver.Resolver()
self.change_nameserver()
def resolve_subdomain(self, subdomain: str) -> typing.Optional[
typing.List[
str
]
]:
"""
Returns the resolution chain of the subdomain to an A record,
including any intermediary CNAME.
The last element is an IP address.
Returns None if the nameserver was unable to satisfy the request.
Returns [] if the requests points to nothing.
"""
try:
query = self.resolver.query(subdomain, 'A', lifetime=DNS_TIMEOUT)
except dns.resolver.NXDOMAIN:
return []
except dns.resolver.NoAnswer:
return []
except dns.resolver.YXDOMAIN:
self.log.warning("Query name too long for %s", subdomain)
return None
except dns.resolver.NoNameservers:
# NOTE Most of the time this error message means that the domain
# does not exists, but sometimes it means the that the server
# itself is broken. So we count on the retry logic.
self.log.warning("All nameservers broken for %s", subdomain)
return None
except dns.exception.Timeout:
# NOTE Same as above
self.log.warning("Timeout for %s", subdomain)
return None
except dns.name.EmptyLabel:
self.log.warning("Empty label for %s", subdomain)
return None
resolved = list()
last = len(query.response.answer) - 1
for a, answer in enumerate(query.response.answer):
if answer.rdtype == dns.rdatatype.CNAME:
assert a < last
resolved.append(answer.items[0].to_text()[:-1])
elif answer.rdtype == dns.rdatatype.A:
assert a == last
resolved.append(answer.items[0].address)
else:
assert False
return resolved
def run(self) -> None:
self.log.info("Started")
subdomain: str
for subdomain in iter(self.orchestrator.subdomains_queue.get, None):
for _ in range(NUMBER_TRIES):
resolved = self.resolve_subdomain(subdomain)
if resolved is not None:
break
# If it wasn't found after multiple tries
if resolved is None:
self.log.error("Gave up on %s", subdomain)
resolved = []
resolved.insert(0, subdomain)
self.orchestrator.results_queue.put(resolved)
self.orchestrator.results_queue.put(None)
self.log.info("Stopped")
class Orchestrator():
"""
Orchestrator of the different Worker threads.
"""
def refill_nameservers_queue(self) -> None:
"""
Re-fill the given nameservers into the nameservers queue.
Done every-time the queue is empty, making it
basically looping and infinite.
"""
# Might be in a race condition but that's probably fine
for nameserver in self.nameservers:
self.nameservers_queue.put(nameserver)
self.log.info("Refilled nameserver queue")
def __init__(self, subdomains: typing.Iterable[str],
nameservers: typing.List[str] = None,
):
self.log = logging.getLogger('orchestrator')
self.subdomains = subdomains
# Use interal resolver by default
self.nameservers = nameservers or dns.resolver.Resolver().nameservers
self.subdomains_queue: queue.Queue = queue.Queue(
maxsize=NUMBER_THREADS)
self.results_queue: queue.Queue = queue.Queue()
self.nameservers_queue: queue.Queue = queue.Queue()
self.refill_nameservers_queue()
def fill_subdomain_queue(self) -> None:
"""
Read the subdomains in input and put them into the queue.
Done in a thread so we can both:
- yield the results as they come
- not store all the subdomains at once
"""
self.log.info("Started reading subdomains")
# Send data to workers
for subdomain in self.subdomains:
self.subdomains_queue.put(subdomain)
self.log.info("Finished reading subdomains")
# Send sentinel to each worker
# sentinel = None ~= EOF
for _ in range(NUMBER_THREADS):
self.subdomains_queue.put(None)
def run(self) -> typing.Iterable[typing.List[str]]:
"""
Yield the results.
"""
# Create workers
self.log.info("Creating workers")
for i in range(NUMBER_THREADS):
Worker(self, i).start()
fill_thread = threading.Thread(target=self.fill_subdomain_queue)
fill_thread.start()
# Wait for one sentinel per worker
# In the meantime output results
for _ in range(NUMBER_THREADS):
result: typing.List[str]
for result in iter(self.results_queue.get, None):
yield result
self.log.info("Waiting for reader thread")
fill_thread.join()
self.log.info("Done!")
def main() -> None:
"""
Main function when used directly.
Read the subdomains provided and output it,
the last CNAME resolved and the IP adress it resolves to.
Takes as an input a filename (or nothing, for stdin),
and as an output a filename (or nothing, for stdout).
The input must be a subdomain per line, the output is a comma-sep
file with the columns source CNAME and A.
Use the file `nameservers` as the list of nameservers
to use, or else it will use the system defaults.
Also shows a nice progressbar.
"""
# Initialization
coloredlogs.install(
level='DEBUG',
fmt='%(asctime)s %(name)s %(levelname)s %(message)s'
)
# Parsing arguments
parser = argparse.ArgumentParser(
description="Massively resolves subdomains and store them in a file.")
parser.add_argument(
'-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
help="Input file with one subdomain per line")
parser.add_argument(
'-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
help="Outptut file with DNS chains")
# parser.add_argument(
# '-n', '--nameserver', type=argparse.FileType('r'),
# default='nameservers', help="File with one nameserver per line")
# parser.add_argument(
# '-j', '--workers', type=int, default=512,
# help="Number of threads to use")
args = parser.parse_args()
# Progress bar
widgets = [
progressbar.Percentage(),
' ', progressbar.SimpleProgress(),
' ', progressbar.Bar(),
' ', progressbar.Timer(),
' ', progressbar.AdaptiveTransferSpeed(unit='req'),
' ', progressbar.AdaptiveETA(),
]
progress = progressbar.ProgressBar(widgets=widgets)
if args.input.seekable():
progress.max_value = len(args.input.readlines())
args.input.seek(0)
# Cleaning input
iterator = iter(args.input)
iterator = map(str.strip, iterator)
iterator = filter(None, iterator)
# Reading nameservers
servers: typing.List[str] = list()
if os.path.isfile('nameservers'):
servers = open('nameservers').readlines()
servers = list(filter(None, map(str.strip, servers)))
writer = csv.writer(args.output)
progress.start()
for resolved in Orchestrator(iterator, servers).run():
progress.update(progress.value + 1)
writer.writerow(resolved)
progress.finish()
if __name__ == '__main__':
main()

1
temp/.gitignore vendored
View File

@ -1,2 +1,3 @@
*.list
*.txt
*.csv