161 lines
4.5 KiB
Python
Executable file
161 lines
4.5 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# pylint: disable=C0103
|
|
|
|
"""
|
|
From a list of subdomains, output only
|
|
the ones resolving to a first-party tracker.
|
|
"""
|
|
|
|
import argparse
|
|
import sys
|
|
import progressbar
|
|
import csv
|
|
import typing
|
|
import ipaddress
|
|
|
|
# Rule trees are nested dicts whose leaves are ``True``. The aliases below are
# the flattened form of the recursive types shown in the comments.
# DomainRule = typing.Union[bool, typing.Dict[str, 'DomainRule']]
DomainRule = typing.Union[bool, typing.Dict]
# IpRule = typing.Union[bool, typing.Dict[int, 'IpRule']]
IpRule = typing.Union[bool, typing.Dict]

# Domain rules, keyed by domain label starting from the rightmost one
# (filled by register_rule, read by subdomain_matching).
RULES_DICT: DomainRule = {}
# IP rules, keyed by address bit starting from the most significant one
# (filled by register_rule_ip, read by ip_matching).
RULES_IP_DICT: IpRule = {}
|
|
|
|
|
|
def get_bits(
        address: typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
) -> typing.Iterator[int]:
    """
    Yield the bits of an IP address, most significant bit first.

    The bits are read from ``address.packed``, so an IPv4 address yields
    32 values and an IPv6 address yields 128. Each value is 0 or 1.
    """
    for byte in address.packed:
        # Walk each byte from its highest bit (7) down to its lowest (0)
        for shift in range(7, -1, -1):
            yield (byte >> shift) & 0b1
|
|
|
|
|
|
def subdomain_matching(subdomain: str) -> bool:
    """
    Tell whether a domain name is covered by a registered domain rule.

    The labels of ``subdomain`` are followed through RULES_DICT from the
    rightmost label to the leftmost one. The walk stops at the first
    boolean leaf or at the first label missing from the tree.

    Returns the boolean leaf that was reached, or False when the walk
    ended on an inner node of the tree.
    """
    node = RULES_DICT
    for label in reversed(subdomain.split('.')):
        # A leaf means a rule already covers every deeper label
        if isinstance(node, bool):
            break
        if label not in node:
            break
        node = node[label]
    return node if isinstance(node, bool) else False
|
|
|
|
|
|
def ip_matching(ip_str: str) -> bool:
    """
    Tell whether an IP address is covered by a registered IP rule.

    The bits of the address are followed through RULES_IP_DICT, most
    significant bit first, until a boolean leaf or a missing branch
    is met.

    Returns the boolean leaf that was reached. Returns False when the
    walk ended on an inner node, or when ``ip_str`` cannot be parsed
    as an IP address (e.g. an empty field), so that one malformed
    entry does not abort the whole run.
    """
    try:
        ip = ipaddress.ip_address(ip_str)
    except ValueError:
        return False
    # NOTE(review): ip_address() may return an IPv4 or an IPv6 address and
    # both are looked up in the same bit tree, so a rule could match an
    # address of the other family sharing its leading bits - confirm
    # whether this is intended.
    dic = RULES_IP_DICT
    for bit in get_bits(ip):
        if isinstance(dic, bool) or bit not in dic:
            break
        dic = dic[bit]
    return dic if isinstance(dic, bool) else False
|
|
|
|
|
|
def get_matching(chain: typing.List[str], no_explicit: bool = False
                 ) -> typing.Iterable[str]:
    """
    Yield the first name of a DNS chain if the chain hits a rule.

    ``chain`` is read as: the initial name, then any number of
    intermediate names, then the final destination. The intermediate
    names are checked with subdomain_matching and the final destination
    with ip_matching. Chains with fewer than two elements yield nothing.

    When ``no_explicit`` is true, a chain whose initial name matches a
    domain rule by itself yields nothing.
    """
    if len(chain) <= 1:
        return
    initial, *cname_destinations, a_destination = chain
    if subdomain_matching(initial):
        # The initial name settles the outcome on its own, no need to
        # look at the rest of the chain
        if not no_explicit:
            yield initial
        return
    if (any(map(subdomain_matching, cname_destinations))
            or ip_matching(a_destination)):
        yield initial
|
|
|
|
|
|
def register_rule(subdomain: str) -> None:
    """
    Add a domain (and thereby its subdomains) to RULES_DICT.

    The domain is stored as a path of labels, rightmost label first,
    ending with a ``True`` leaf. If a shorter registered rule already
    covers the domain, nothing is changed. An empty string is ignored:
    it would otherwise register the empty label, which matches every
    name written with a trailing dot.
    """
    if not subdomain:
        return
    # Make a tree with domain parts
    *parents, leaf = reversed(subdomain.split('.'))
    node = RULES_DICT
    for label in parents:
        node = node.setdefault(label, {})
        if isinstance(node, bool):
            # Already covered by a broader rule
            return
    node[leaf] = True
|
|
|
|
|
|
def register_rule_ip(network: str) -> None:
    """
    Add an IP network to RULES_IP_DICT.

    The network is stored as a path made of the first ``prefixlen`` bits
    of its network address, most significant bit first, ending with a
    ``True`` leaf. If a shorter registered prefix already covers the
    network, nothing is changed. An empty string is ignored.

    Raises ValueError (from ipaddress.ip_network) when ``network`` is
    not a valid network.
    """
    if not network:
        return
    net = ipaddress.ip_network(network)
    prefix_len = net.prefixlen
    if prefix_len == 0:
        # NOTE(review): a /0 network has no prefix bit to attach a leaf
        # to, so it registers nothing - confirm whether it should
        # rather match every address.
        return
    node = RULES_IP_DICT
    for depth, bit in enumerate(get_bits(net.network_address), start=1):
        if isinstance(node, bool):
            # Already covered by a broader rule
            return
        if depth == prefix_len:
            node[bit] = True
            # The remaining bits are host bits, they are not part of the rule
            return
        node = node.setdefault(bit, {})
|
|
|
|
|
|
if __name__ == '__main__':

    # Parsing arguments
    parser = argparse.ArgumentParser(
        description="Filter first-party trackers from a list of subdomains")
    parser.add_argument(
        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
        help="Input file with DNS chains")
    parser.add_argument(
        '-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
        help="Output file with one tracking subdomain per line")
    parser.add_argument(
        '-n', '--no-explicit', action='store_true',
        help="Don't output domains already blocked with rules without CNAME")
    parser.add_argument(
        '-r', '--rules', type=argparse.FileType('r'),
        help="List of domains to block (with their subdomains)")
    parser.add_argument(
        '-p', '--rules-ip', type=argparse.FileType('r'),
        help="List of IP ranges to block")
    args = parser.parse_args()

    # Progress bar
    widgets = [
        progressbar.Percentage(),
        ' ', progressbar.SimpleProgress(),
        ' ', progressbar.Bar(),
        ' ', progressbar.Timer(),
        ' ', progressbar.AdaptiveTransferSpeed(unit='req'),
        ' ', progressbar.AdaptiveETA(),
    ]
    progress = progressbar.ProgressBar(widgets=widgets)

    # Reading rules (one per line, blank lines are skipped)
    if args.rules:
        for rule in args.rules:
            rule = rule.strip()
            if rule:
                register_rule(rule)
    if args.rules_ip:
        for rule in args.rules_ip:
            rule = rule.strip()
            if rule:
                register_rule_ip(rule)

    # Approximating line count
    # Only possible when the input can be rewound afterwards; otherwise
    # the progress bar is left without a maximum value
    if args.input.seekable():
        progress.max_value = sum(1 for _ in args.input)
        args.input.seek(0)

    # Reading domains to filter
    # Each CSV row is one DNS chain, handed as-is to get_matching
    reader = csv.reader(args.input)
    progress.start()
    for chain in reader:
        for match in get_matching(chain, no_explicit=args.no_explicit):
            print(match, file=args.output)
        progress.update(progress.value + 1)
    progress.finish()
|