Added intermediate representation for DNS datasets

It's just CSV.
The DNS from the datasets are not ordered consistently,
so we need to parse it completly.
It seems that converting to an IR before sending data to ./feed_dns.py
through a pipe is faster than decoding the JSON in ./feed_dns.py.
This will also reduce the storage of the resolved subdomains by
about 15% (compressed).
This commit is contained in:
Geoffrey Frogeye 2019-12-13 21:59:35 +01:00
parent 269b8278b5
commit 5023b85d7c
Signed by: geoffrey
GPG key ID: D8A7ECA00A8CD3DD
5 changed files with 68 additions and 31 deletions

View file

@ -4,6 +4,8 @@ import database
import argparse import argparse
import sys import sys
import logging import logging
import csv
import json
if __name__ == '__main__': if __name__ == '__main__':
@ -21,39 +23,39 @@ if __name__ == '__main__':
try: try:
DB.enter_step('iowait') DB.enter_step('iowait')
# line: bytes for row in csv.reader(args.input):
line: str # for line in args.input:
for line in args.input: DB.enter_step('feed_csv_parse')
DB.enter_step('feed_json_parse') dtype, timestamp, name, value = row
# split = line.split(b'"') # DB.enter_step('feed_json_parse')
split = line.split('"')
try:
updated = int(split[3])
name = split[7]
dtype = split[11]
value = split[15]
except IndexError:
log.error("Invalid JSON: %s", line)
continue
# DB.enter_step('feed_json_assert')
# data = json.loads(line) # data = json.loads(line)
# assert dtype == data['type'] # dtype = data['type'][0]
# assert name == data['name'] # # timestamp = data['timestamp']
# assert value == data['value'] # name = data['name']
# value = data['value']
DB.enter_step('feed_switch') DB.enter_step('feed_switch')
if dtype == 'a': if dtype == 'a':
for rule in DB.get_ip4(value): for rule in DB.get_ip4(value):
if not list(DB.get_domain_in_zone(name)): if not list(DB.get_domain_in_zone(name)):
DB.set_hostname(name, source=rule, updated=updated)
elif dtype == 'cname': DB.set_hostname(name, source=rule,
updated=int(timestamp))
# updated=int(data['timestamp']))
elif dtype == 'c':
for rule in DB.get_domain(value): for rule in DB.get_domain(value):
if not list(DB.get_domain_in_zone(name)): if not list(DB.get_domain_in_zone(name)):
DB.set_hostname(name, source=rule, updated=updated) DB.set_hostname(name, source=rule,
elif dtype == 'ptr': updated=int(timestamp))
# updated=int(data['timestamp']))
elif dtype == 'p':
for rule in DB.get_domain(value): for rule in DB.get_domain(value):
if not list(DB.get_ip4_in_network(name)): if not list(DB.get_ip4_in_network(name)):
DB.set_ip4address(name, source=rule, updated=updated) DB.set_ip4address(name, source=rule,
updated=int(timestamp))
# updated=int(data['timestamp']))
else:
raise NotImplementedError(f'Type: {dtype}')
DB.enter_step('iowait') DB.enter_step('iowait')
except KeyboardInterrupt: except KeyboardInterrupt:
log.warning("Interupted.") log.warning("Interupted.")

36
json_to_csv.py Executable file
View file

@ -0,0 +1,36 @@
#!/usr/bin/env python3
import argparse
import sys
import logging
import json
import csv
if __name__ == '__main__':
# Parsing arguments
log = logging.getLogger('json_to_csv')
parser = argparse.ArgumentParser(
description="TODO")
parser.add_argument(
# '-i', '--input', type=argparse.FileType('rb'), default=sys.stdin.buffer,
'-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
help="TODO")
parser.add_argument(
# '-i', '--output', type=argparse.FileType('wb'), default=sys.stdout.buffer,
'-o', '--output', type=argparse.FileType('w'), default=sys.stdout,
help="TODO")
args = parser.parse_args()
writer = csv.writer(args.output)
for line in args.input:
data = json.loads(line)
try:
writer.writerow([
data['type'][0],
data['timestamp'],
data['name'],
data['value']])
except IndexError:
log.error('Could not parse line: %s', line)
pass

View file

@ -9,11 +9,11 @@ function log() {
# TODO Fetch 'em # TODO Fetch 'em
log "Reading PTR records…" log "Reading PTR records…"
pv ptr.json.gz | gunzip | ./feed_dns.py pv ptr.json.gz | gunzip | ./json_to_csv.py | ./feed_dns.py
log "Reading A records…" log "Reading A records…"
pv a.json.gz | gunzip | ./feed_dns.py pv a.json.gz | gunzip | ./json_to_csv.py | ./feed_dns.py
log "Reading CNAME records…" log "Reading CNAME records…"
pv cname.json.gz | gunzip | ./feed_dns.py pv cname.json.gz | gunzip | ./json_to_csv.py | ./feed_dns.py
log "Pruning old data…" log "Pruning old data…"
./database.py --prune ./database.py --prune

View file

@ -168,7 +168,7 @@ class Orchestrator():
@staticmethod @staticmethod
def format_rrset(rrset: dns.rrset.RRset) -> typing.Iterable[str]: def format_rrset(rrset: dns.rrset.RRset) -> typing.Iterable[str]:
if rrset.rdtype == dns.rdatatype.CNAME: if rrset.rdtype == dns.rdatatype.CNAME:
dtype = 'cname' dtype = 'c'
elif rrset.rdtype == dns.rdatatype.A: elif rrset.rdtype == dns.rdatatype.A:
dtype = 'a' dtype = 'a'
else: else:
@ -178,8 +178,7 @@ class Orchestrator():
value = item.to_text() value = item.to_text()
if rrset.rdtype == dns.rdatatype.CNAME: if rrset.rdtype == dns.rdatatype.CNAME:
value = value[:-1] value = value[:-1]
yield '{"timestamp":"' + str(int(time.time())) + '","name":"' + \ yield f'{dtype},{int(time.time())},{name},{value}\n'
name + '","type":"' + dtype + '","value":"' + value + '"}\n'
def run(self) -> typing.Iterable[str]: def run(self) -> typing.Iterable[str]:
""" """

View file

@ -6,7 +6,7 @@ function log() {
log "Compiling locally known subdomain…" log "Compiling locally known subdomain…"
# Sort by last character to utilize the DNS server caching mechanism # Sort by last character to utilize the DNS server caching mechanism
pv subdomains/*.list | rev | sort -u | rev > temp/all_subdomains.list pv subdomains/*.list | sed 's/\r$//' | rev | sort -u | rev > temp/all_subdomains.list
log "Resolving locally known subdomain…" log "Resolving locally known subdomain…"
pv temp/all_subdomains.list | ./resolve_subdomains.py --output temp/all_resolved.json pv temp/all_subdomains.list | ./resolve_subdomains.py --output temp/all_resolved.csv