12 changed files with 208 additions and 282 deletions
-
2.gitignore
-
59database.py
-
23export.py
-
147export_lists.sh
-
12feed_dns.py
-
7fetch_resources.sh
-
160filter_subdomains.py
-
4import_rules.sh
-
2nameservers/.gitignore
-
24nameservers/popular.list
-
15resolve_subdomains.sh
-
35validate_list.py
@ -1,4 +1,2 @@ |
|||
*.log |
|||
*.p |
|||
nameservers |
|||
nameservers.head |
@ -1,160 +0,0 @@ |
|||
#!/usr/bin/env python3 |
|||
# pylint: disable=C0103 |
|||
|
|||
""" |
|||
From a list of subdomains, output only |
|||
the ones resolving to a first-party tracker. |
|||
""" |
|||
|
|||
import argparse |
|||
import sys |
|||
import progressbar |
|||
import csv |
|||
import typing |
|||
import ipaddress |
|||
|
|||
# Both rule structures are nested dictionaries used as tries. A node is
# either another dict (keep descending) or a bool leaf that ends the walk.
# The aliases are intentionally non-recursive; the precise shapes would be
#   DomainRule = typing.Union[bool, typing.Dict[str, 'DomainRule']]
#   IpRule = typing.Union[bool, typing.Dict[int, 'IpRule']]
DomainRule = typing.Union[bool, typing.Dict]
IpRule = typing.Union[bool, typing.Dict]

# Trie keyed by domain labels, filled by register_rule().
RULES_DICT: DomainRule = {}
# Trie keyed by address bits (0 / 1), filled by register_rule_ip().
RULES_IP_DICT: IpRule = {}
|||
|
|||
|
|||
def get_bits(address: ipaddress.IPv4Address) -> typing.Iterator[int]:
    """
    Yield the bits of an address one at a time.

    The bytes of ``address.packed`` are visited in order and, within each
    byte, bits are produced from the most significant to the least
    significant one. Every yielded value is the integer 0 or 1.
    """
    for byte in address.packed:
        mask = 0b10000000
        while mask:
            yield 1 if byte & mask else 0
            mask >>= 1
|||
|
|||
|
|||
def subdomain_matching(subdomain: str) -> bool:
    """
    Tell whether a domain name is covered by a registered domain rule.

    The name is split on dots and its labels are looked up in RULES_DICT
    starting from the right-most label. Reaching a boolean leaf means a
    rule covers the name (and everything below it), so that boolean is
    returned. Falling off the trie, or running out of labels while still
    on an inner node, yields False.
    """
    node = RULES_DICT
    for label in reversed(subdomain.split('.')):
        if isinstance(node, bool):
            # A rule ended here; the remaining labels are sub-labels of it.
            return node
        if label not in node:
            return False
        node = node[label]
    return node if isinstance(node, bool) else False
|||
|
|||
|
|||
def ip_matching(ip_str: str) -> bool:
    """
    Tell whether an IP address falls inside a registered network rule.

    The address is parsed with ``ipaddress.ip_address`` and its bits, as
    produced by ``get_bits``, are followed through RULES_IP_DICT. Reaching
    a boolean leaf returns that boolean; leaving the trie or exhausting the
    bits on an inner node returns False.

    Raises:
        ValueError: propagated from ``ipaddress.ip_address`` when ``ip_str``
            is not a valid address.
    """
    ip = ipaddress.ip_address(ip_str)
    # NOTE(review): the trie is keyed on raw bits only, so an IPv6 address
    # walks the same nodes as an IPv4 one -- confirm inputs and rules are
    # of a single address family.
    dic = RULES_IP_DICT
    for bit in get_bits(ip):
        if isinstance(dic, bool) or bit not in dic:
            break
        dic = dic[bit]
    if isinstance(dic, bool):
        return dic
    return False
|||
|
|||
|
|||
def get_matching(chain: typing.List[str], no_explicit: bool = False
                 ) -> typing.Iterable[str]:
    """
    Yield the first element of a DNS chain if the chain hits any rule.

    ``chain`` is read as: the initial name, then zero or more intermediate
    names, then a final element that is checked as an IP address. The
    initial name is yielded when it, any intermediate name, or the final
    address matches a rule. Chains of fewer than two elements yield
    nothing. With ``no_explicit`` set, chains whose initial name matches a
    domain rule by itself are suppressed.
    """
    if len(chain) < 2:
        return
    initial, *cname_destinations, a_destination = chain
    explicit = subdomain_matching(initial)
    if explicit and no_explicit:
        return
    via_cname = any(
        subdomain_matching(destination) for destination in cname_destinations)
    # The IP lookup comes last so it only runs when no name matched.
    if via_cname or explicit or ip_matching(a_destination):
        yield initial
|||
|
|||
|
|||
def register_rule(subdomain: str) -> None:
    """
    Insert a domain rule into the RULES_DICT trie.

    Labels are stored from the right-most one down. The node for the
    left-most label is set to True, which replaces any narrower rules that
    were stored beneath it. If a True leaf is met on the way down, a
    broader rule already covers this name and nothing is changed.
    """
    *parents, leaf = reversed(subdomain.split('.'))
    node = RULES_DICT
    for label in parents:
        node = node.setdefault(label, dict())
        if isinstance(node, bool):
            # Already covered by a shorter (broader) rule.
            return
    node[leaf] = True
|||
|
|||
|
|||
def register_rule_ip(network: str) -> None:
    """
    Insert a network rule into the RULES_IP_DICT trie.

    ``network`` is parsed with ``ipaddress.ip_network``. The bits of its
    network address are followed from the first one, and the node for the
    bit at position ``prefixlen - 1`` is set to True. If a True leaf is
    met earlier, a wider network is already registered and nothing is
    changed.

    Raises:
        ValueError: propagated from ``ipaddress.ip_network`` when
            ``network`` cannot be parsed.
    """
    parsed = ipaddress.ip_network(network)
    node = RULES_IP_DICT
    final_index = parsed.prefixlen - 1
    for index, bit in enumerate(get_bits(parsed.network_address)):
        if isinstance(node, bool):
            return
        if index != final_index:
            node.setdefault(bit, dict())
        else:
            node[bit] = True
        node = node[bit]
|||
|
|||
|
|||
if __name__ == '__main__':

    # Parsing arguments
    parser = argparse.ArgumentParser(
        description="Filter first-party trackers from a list of subdomains")
    parser.add_argument(
        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
        help="Input file with DNS chains")
    parser.add_argument(
        '-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
        help="Output file with one tracking subdomain per line")
    parser.add_argument(
        '-n', '--no-explicit', action='store_true',
        help="Don't output domains already blocked with rules without CNAME")
    parser.add_argument(
        '-r', '--rules', type=argparse.FileType('r'),
        help="List of domains to block (with their subdomains)")
    parser.add_argument(
        '-p', '--rules-ip', type=argparse.FileType('r'),
        help="List of IP ranges to block")
    args = parser.parse_args()

    # Progress bar
    widgets = [
        progressbar.Percentage(),
        ' ', progressbar.SimpleProgress(),
        ' ', progressbar.Bar(),
        ' ', progressbar.Timer(),
        ' ', progressbar.AdaptiveTransferSpeed(unit='req'),
        ' ', progressbar.AdaptiveETA(),
    ]
    progress = progressbar.ProgressBar(widgets=widgets)

    # Reading rules
    # Blank lines are skipped: an empty string would otherwise be stored as
    # an empty-label domain rule, or make ipaddress.ip_network() raise.
    if args.rules:
        for rule in args.rules:
            rule = rule.strip()
            if rule:
                register_rule(rule)
    if args.rules_ip:
        for rule in args.rules_ip:
            rule = rule.strip()
            if rule:
                register_rule_ip(rule)

    # Counting lines to size the progress bar; only possible when the
    # input can be rewound afterwards.
    if args.input.seekable():
        lines = sum(1 for _ in args.input)
        progress.max_value = lines
        args.input.seek(0)

    # Reading domains to filter: each CSV row is one DNS chain
    reader = csv.reader(args.input)
    progress.start()
    for chain in reader:
        for match in get_matching(chain, no_explicit=args.no_explicit):
            print(match, file=args.output)
        progress.update(progress.value + 1)
    progress.finish()
@ -0,0 +1,2 @@ |
|||
*.custom.list |
|||
*.cache.list |
@ -0,0 +1,24 @@ |
|||
8.8.8.8 |
|||
8.8.4.4 |
|||
2001:4860:4860:0:0:0:0:8888 |
|||
2001:4860:4860:0:0:0:0:8844 |
|||
208.67.222.222 |
|||
208.67.220.220 |
|||
2620:119:35::35 |
|||
2620:119:53::53 |
|||
4.2.2.1 |
|||
4.2.2.2 |
|||
8.26.56.26 |
|||
8.20.247.20 |
|||
84.200.69.80 |
|||
84.200.70.40 |
|||
2001:1608:10:25:0:0:1c04:b12f |
|||
2001:1608:10:25:0:0:9249:d69b |
|||
9.9.9.10 |
|||
149.112.112.10 |
|||
2620:fe::10 |
|||
2620:fe::fe:10 |
|||
1.1.1.1 |
|||
1.0.0.1 |
|||
2606:4700:4700::1111 |
|||
2606:4700:4700::1001 |
@ -0,0 +1,35 @@ |
|||
#!/usr/bin/env python3 |
|||
# pylint: disable=C0103 |
|||
|
|||
""" |
|||
Filter out invalid domain names |
|||
""" |
|||
|
|||
import database |
|||
import argparse |
|||
import sys |
|||
|
|||
if __name__ == '__main__':

    # Parsing arguments
    parser = argparse.ArgumentParser(
        description="Filter out invalid domain names.")
    parser.add_argument(
        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
        help="Input file with one entry to validate per line")
    parser.add_argument(
        '-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
        help="Output file receiving the entries that passed validation")
    parser.add_argument(
        '-d', '--domain', action='store_true',
        help="Can be domain")
    parser.add_argument(
        '-4', '--ip4', action='store_true',
        help="Can be IP4")
    args = parser.parse_args()

    # An entry is kept when it passes at least one of the enabled checks;
    # with neither -d nor -4 given, nothing is written.
    for line in args.input:
        line = line.strip()
        if (args.domain and database.Database.validate_domain(line)) or \
                (args.ip4 and database.Database.validate_ip4address(line)):
            print(line, file=args.output)
Write
Preview
Loading…
Cancel
Save
Reference in new issue