eulaurarien/filter_subdomains.py

143 lines
4 KiB
Python
Executable file

#!/usr/bin/env python3
# pylint: disable=C0103
"""
From a list of subdomains, output only
the ones resolving to a first-party tracker.
"""
import argparse
import sys
import progressbar
import csv
import typing
import ipaddress
# DomainRule = typing.Union[bool, typing.Dict[str, 'DomainRule']]
DomainRule = typing.Union[bool, typing.Dict]
RULES_DICT: DomainRule = dict()
RULES_IP: typing.Set[ipaddress.IPv4Network] = set()
def subdomain_matching(subdomain: str) -> bool:
if not RULES_DICT:
return False
parts = subdomain.split('.')
parts.reverse()
dic = RULES_DICT
for part in parts:
if isinstance(dic, bool) or part not in dic:
break
dic = dic[part]
if isinstance(dic, bool):
return dic
return False
def ip_matching(ip_str: str) -> bool:
if not RULES_IP:
return False
ip = ipaddress.ip_address(ip_str)
for net in RULES_IP:
if ip in net:
return True
return False
def get_matching(chain: typing.List[str], no_explicit: bool = False
) -> typing.Iterable[str]:
if len(chain) <= 1:
return
initial = chain[0]
cname_destinations = chain[1:-1]
a_destination = chain[-1]
initial_matching = subdomain_matching(initial)
if no_explicit and initial_matching:
return
cname_matching = any(map(subdomain_matching, cname_destinations))
if cname_matching or initial_matching or ip_matching(a_destination):
yield initial
def register_rule(subdomain: str) -> None:
# Make a tree with domain parts
parts = subdomain.split('.')
parts.reverse()
dic = RULES_DICT
last_part = len(parts) - 1
for p, part in enumerate(parts):
if isinstance(dic, bool):
return
if p == last_part:
dic[part] = True
else:
dic.setdefault(part, dict())
dic = dic[part]
def register_rule_ip(network: str) -> None:
net = ipaddress.ip_network(network)
RULES_IP.add(net)
# If RULES_IP start becoming bigger,
# we might implement a binary tree for performance
if __name__ == '__main__':
# Parsing arguments
parser = argparse.ArgumentParser(
description="Filter first-party trackers from a list of subdomains")
parser.add_argument(
'-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
help="Input file with DNS chains")
parser.add_argument(
'-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
help="Outptut file with one tracking subdomain per line")
parser.add_argument(
'-n', '--no-explicit', action='store_true',
help="Don't output domains already blocked with rules without CNAME")
parser.add_argument(
'-r', '--rules', type=argparse.FileType('r'),
help="List of domains domains to block (with their subdomains)")
parser.add_argument(
'-p', '--rules-ip', type=argparse.FileType('r'),
help="List of IPs ranges to block")
args = parser.parse_args()
# Progress bar
widgets = [
progressbar.Percentage(),
' ', progressbar.SimpleProgress(),
' ', progressbar.Bar(),
' ', progressbar.Timer(),
' ', progressbar.AdaptiveTransferSpeed(unit='req'),
' ', progressbar.AdaptiveETA(),
]
progress = progressbar.ProgressBar(widgets=widgets)
# Reading rules
if args.rules:
for rule in args.rules:
register_rule(rule.strip())
if args.rules_ip:
for rule in args.rules_ip:
register_rule_ip(rule.strip())
# Approximating line count
if args.input.seekable():
lines = 0
for line in args.input:
lines += 1
progress.max_value = lines
args.input.seek(0)
# Reading domains to filter
reader = csv.reader(args.input)
progress.start()
for chain in reader:
for match in get_matching(chain, no_explicit=args.no_explicit):
print(match, file=args.output)
progress.update(progress.value + 1)
progress.finish()