From 5023b85d7ca802f2908526b0e48ac5245aa457df Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Geoffrey=20=E2=80=9CFrogeye=E2=80=9D=20Preud=27homme?=
Date: Fri, 13 Dec 2019 21:59:35 +0100
Subject: [PATCH] Added intermediate representation for DNS datasets

It's just CSV.
The DNS records from the datasets are not ordered consistently,
so we need to parse them completely.
It seems that converting to an IR before sending data to ./feed_dns.py
through a pipe is faster than decoding the JSON in ./feed_dns.py.
This will also reduce the storage size of the resolved subdomains
by about 15% (compressed).
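
For reference, a hypothetical record of the shape these scripts expect
(name, value and timestamp below are made up for illustration) turns into
one "type-initial,timestamp,name,value" row ('a', 'c' or 'p' for A, CNAME
and PTR), which ./feed_dns.py then reads back with csv.reader:

  $ echo '{"timestamp":"1576270775","name":"www.example.com","type":"a","value":"93.184.216.34"}' | ./json_to_csv.py
  a,1576270775,www.example.com,93.184.216.34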
---
 feed_dns.py           | 48 +++++++++++++++++++++++++-----------------------
 json_to_csv.py        | 36 ++++++++++++++++++++++++++++++++++++
 new_workflow.sh       |  6 +++---
 resolve_subdomains.py |  5 ++---
 resolve_subdomains.sh |  4 ++--
 5 files changed, 68 insertions(+), 31 deletions(-)
 create mode 100755 json_to_csv.py

diff --git a/feed_dns.py b/feed_dns.py
index f3fde5a..cb996e9 100755
--- a/feed_dns.py
+++ b/feed_dns.py
@@ -4,6 +4,8 @@ import database
 import argparse
 import sys
 import logging
+import csv
+import json
 
 if __name__ == '__main__':
 
@@ -21,39 +23,39 @@ if __name__ == '__main__':
 
     try:
         DB.enter_step('iowait')
-        # line: bytes
-        line: str
-        for line in args.input:
-            DB.enter_step('feed_json_parse')
-            # split = line.split(b'"')
-            split = line.split('"')
-            try:
-                updated = int(split[3])
-                name = split[7]
-                dtype = split[11]
-                value = split[15]
-            except IndexError:
-                log.error("Invalid JSON: %s", line)
-                continue
-            # DB.enter_step('feed_json_assert')
+        for row in csv.reader(args.input):
+            # for line in args.input:
+            DB.enter_step('feed_csv_parse')
+            dtype, timestamp, name, value = row
+            # DB.enter_step('feed_json_parse')
             # data = json.loads(line)
-            # assert dtype == data['type']
-            # assert name == data['name']
-            # assert value == data['value']
+            # dtype = data['type'][0]
+            # # timestamp = data['timestamp']
+            # name = data['name']
+            # value = data['value']
             DB.enter_step('feed_switch')
             if dtype == 'a':
                 for rule in DB.get_ip4(value):
                     if not list(DB.get_domain_in_zone(name)):
-                        DB.set_hostname(name, source=rule, updated=updated)
-            elif dtype == 'cname':
+
+                        DB.set_hostname(name, source=rule,
+                                        updated=int(timestamp))
+                        # updated=int(data['timestamp']))
+            elif dtype == 'c':
                 for rule in DB.get_domain(value):
                     if not list(DB.get_domain_in_zone(name)):
-                        DB.set_hostname(name, source=rule, updated=updated)
-            elif dtype == 'ptr':
+                        DB.set_hostname(name, source=rule,
+                                        updated=int(timestamp))
+                        # updated=int(data['timestamp']))
+            elif dtype == 'p':
                 for rule in DB.get_domain(value):
                     if not list(DB.get_ip4_in_network(name)):
-                        DB.set_ip4address(name, source=rule, updated=updated)
+                        DB.set_ip4address(name, source=rule,
+                                          updated=int(timestamp))
+                        # updated=int(data['timestamp']))
+            else:
+                raise NotImplementedError(f'Type: {dtype}')
             DB.enter_step('iowait')
     except KeyboardInterrupt:
         log.warning("Interupted.")
diff --git a/json_to_csv.py b/json_to_csv.py
new file mode 100755
index 0000000..11a3600
--- /dev/null
+++ b/json_to_csv.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python3
+
+import argparse
+import sys
+import logging
+import json
+import csv
+
+if __name__ == '__main__':
+
+    # Parsing arguments
+    log = logging.getLogger('json_to_csv')
+    parser = argparse.ArgumentParser(
+        description="TODO")
+    parser.add_argument(
+        # '-i', '--input', type=argparse.FileType('rb'), default=sys.stdin.buffer,
+        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
+        help="TODO")
+    parser.add_argument(
+        # '-o', '--output', type=argparse.FileType('wb'), default=sys.stdout.buffer,
+        '-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
+        help="TODO")
+    args = parser.parse_args()
+
+    writer = csv.writer(args.output)
+    for line in args.input:
+        data = json.loads(line)
+        try:
+            writer.writerow([
+                data['type'][0],
+                data['timestamp'],
+                data['name'],
+                data['value']])
+        except IndexError:
+            log.error('Could not parse line: %s', line)
+            pass
diff --git a/new_workflow.sh b/new_workflow.sh
index c98cd46..e21b426 100755
--- a/new_workflow.sh
+++ b/new_workflow.sh
@@ -9,11 +9,11 @@ function log() {
 
 # TODO Fetch 'em
 log "Reading PTR records…"
-pv ptr.json.gz | gunzip | ./feed_dns.py
+pv ptr.json.gz | gunzip | ./json_to_csv.py | ./feed_dns.py
 log "Reading A records…"
-pv a.json.gz | gunzip | ./feed_dns.py
+pv a.json.gz | gunzip | ./json_to_csv.py | ./feed_dns.py
 log "Reading CNAME records…"
-pv cname.json.gz | gunzip | ./feed_dns.py
+pv cname.json.gz | gunzip | ./json_to_csv.py | ./feed_dns.py
 
 log "Pruning old data…"
 ./database.py --prune
diff --git a/resolve_subdomains.py b/resolve_subdomains.py
index fa2ea59..bc26e34 100755
--- a/resolve_subdomains.py
+++ b/resolve_subdomains.py
@@ -168,7 +168,7 @@ class Orchestrator():
     @staticmethod
     def format_rrset(rrset: dns.rrset.RRset) -> typing.Iterable[str]:
         if rrset.rdtype == dns.rdatatype.CNAME:
-            dtype = 'cname'
+            dtype = 'c'
         elif rrset.rdtype == dns.rdatatype.A:
             dtype = 'a'
         else:
@@ -178,8 +178,7 @@ class Orchestrator():
             value = item.to_text()
             if rrset.rdtype == dns.rdatatype.CNAME:
                 value = value[:-1]
-            yield '{"timestamp":"' + str(int(time.time())) + '","name":"' + \
-                name + '","type":"' + dtype + '","value":"' + value + '"}\n'
+            yield f'{dtype},{int(time.time())},{name},{value}\n'
 
     def run(self) -> typing.Iterable[str]:
         """
diff --git a/resolve_subdomains.sh b/resolve_subdomains.sh
index ee5f83c..e37ddeb 100755
--- a/resolve_subdomains.sh
+++ b/resolve_subdomains.sh
@@ -6,7 +6,7 @@ function log() {
 
 log "Compiling locally known subdomain…"
 # Sort by last character to utilize the DNS server caching mechanism
-pv subdomains/*.list | rev | sort -u | rev > temp/all_subdomains.list
+pv subdomains/*.list | sed 's/\r$//' | rev | sort -u | rev > temp/all_subdomains.list
 log "Resolving locally known subdomain…"
-pv temp/all_subdomains.list | ./resolve_subdomains.py --output temp/all_resolved.json
+pv temp/all_subdomains.list | ./resolve_subdomains.py --output temp/all_resolved.csv