eulaurarien/feed_dns.py
Geoffrey Frogeye 095e51fad9
Ensure massdns output is lower case
For some reason some server output part of their response as upper case.
This fails the reading process as it's designed to only work on lower
case for performance reasons.
2019-12-26 15:32:24 +01:00

264 lines
8.1 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import database
import logging
import sys
import typing
import multiprocessing
import time
Record = typing.Tuple[typing.Callable, typing.Callable, int, str, str]
# select, write
FUNCTION_MAP: typing.Any = {
'a': (
database.Database.get_ip4,
database.Database.set_hostname,
),
'cname': (
database.Database.get_domain,
database.Database.set_hostname,
),
'ptr': (
database.Database.get_domain,
database.Database.set_ip4address,
),
}
class Writer(multiprocessing.Process):
def __init__(self,
recs_queue: multiprocessing.Queue = None,
autosave_interval: int = 0,
ip4_cache: int = 0,
):
if recs_queue: # MP
super(Writer, self).__init__()
self.recs_queue = recs_queue
self.log = logging.getLogger(f'wr')
self.autosave_interval = autosave_interval
self.ip4_cache = ip4_cache
if not recs_queue: # No MP
self.open_db()
def open_db(self) -> None:
self.db = database.Database()
self.db.log = logging.getLogger(f'wr')
self.db.fill_ip4cache(max_size=self.ip4_cache)
def exec_record(self, record: Record) -> None:
self.db.enter_step('exec_record')
select, write, updated, name, value = record
try:
for source in select(self.db, value):
write(self.db, name, updated, source=source)
except (ValueError, IndexError):
# ValueError: non-number in IP
# IndexError: IP too big
self.log.exception("Cannot execute: %s", record)
def end(self) -> None:
self.db.enter_step('end')
self.db.save()
def run(self) -> None:
self.open_db()
if self.autosave_interval > 0:
next_save = time.time() + self.autosave_interval
else:
next_save = 0
self.db.enter_step('block_wait')
block: typing.List[Record]
for block in iter(self.recs_queue.get, None):
record: Record
for record in block:
self.exec_record(record)
if next_save > 0 and time.time() > next_save:
self.log.info("Saving database...")
self.db.save()
self.log.info("Done!")
next_save = time.time() + self.autosave_interval
self.db.enter_step('block_wait')
self.end()
class Parser():
def __init__(self,
buf: typing.Any,
recs_queue: multiprocessing.Queue = None,
block_size: int = 0,
writer: Writer = None,
):
assert bool(writer) ^ bool(block_size and recs_queue)
self.buf = buf
self.log = logging.getLogger('pr')
self.recs_queue = recs_queue
if writer: # No MP
self.prof: database.Profiler = writer.db
self.register = writer.exec_record
else: # MP
self.block: typing.List[Record] = list()
self.block_size = block_size
self.prof = database.Profiler()
self.prof.log = logging.getLogger('pr')
self.register = self.add_to_queue
def add_to_queue(self, record: Record) -> None:
self.prof.enter_step('register')
self.block.append(record)
if len(self.block) >= self.block_size:
self.prof.enter_step('put_block')
assert self.recs_queue
self.recs_queue.put(self.block)
self.block = list()
def run(self) -> None:
self.consume()
if self.recs_queue:
self.recs_queue.put(self.block)
self.prof.profile()
def consume(self) -> None:
raise NotImplementedError
class Rapid7Parser(Parser):
def consume(self) -> None:
data = dict()
for line in self.buf:
self.prof.enter_step('parse_rapid7')
split = line.split('"')
try:
for k in range(1, 14, 4):
key = split[k]
val = split[k+2]
data[key] = val
select, writer = FUNCTION_MAP[data['type']]
record = (
select,
writer,
int(data['timestamp']),
data['name'],
data['value']
)
except IndexError:
self.log.exception("Cannot parse: %s", line)
self.register(record)
class MassDnsParser(Parser):
# massdns --output Snrql
# --retry REFUSED,SERVFAIL --resolvers nameservers-ipv4
TYPES = {
'A': (FUNCTION_MAP['a'][0], FUNCTION_MAP['a'][1], -1, None),
# 'AAAA': (FUNCTION_MAP['aaaa'][0], FUNCTION_MAP['aaaa'][1], -1, None),
'CNAME': (FUNCTION_MAP['cname'][0], FUNCTION_MAP['cname'][1], -1, -1),
}
def consume(self) -> None:
self.prof.enter_step('parse_massdns')
timestamp = 0
header = True
for line in self.buf:
line = line[:-1]
if not line:
header = True
continue
split = line.split(' ')
try:
if header:
timestamp = int(split[1])
header = False
else:
select, write, name_offset, value_offset = \
MassDnsParser.TYPES[split[1]]
record = (
select,
write,
timestamp,
split[0][:name_offset].lower(),
split[2][:value_offset].lower(),
)
self.register(record)
self.prof.enter_step('parse_massdns')
except KeyError:
continue
PARSERS = {
'rapid7': Rapid7Parser,
'massdns': MassDnsParser,
}
if __name__ == '__main__':
# Parsing arguments
log = logging.getLogger('feed_dns')
args_parser = argparse.ArgumentParser(
description="Read DNS records and import "
"tracking-relevant data into the database")
args_parser.add_argument(
'parser',
choices=PARSERS.keys(),
help="Input format")
args_parser.add_argument(
'-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
help="Input file")
args_parser.add_argument(
'-b', '--block-size', type=int, default=1024,
help="Performance tuning value")
args_parser.add_argument(
'-q', '--queue-size', type=int, default=128,
help="Performance tuning value")
args_parser.add_argument(
'-a', '--autosave-interval', type=int, default=900,
help="Interval to which the database will save in seconds. "
"0 to disable.")
args_parser.add_argument(
'-s', '--single-process', action='store_true',
help="Only use one process. "
"Might be useful for single core computers.")
args_parser.add_argument(
'-4', '--ip4-cache', type=int, default=0,
help="RAM cache for faster IPv4 lookup. "
"Maximum useful value: 512 MiB (536870912). "
"Warning: Depending on the rules, this might already "
"be a memory-heavy process, even without the cache.")
args = args_parser.parse_args()
parser_cls = PARSERS[args.parser]
if args.single_process:
writer = Writer(
autosave_interval=args.autosave_interval,
ip4_cache=args.ip4_cache
)
parser = parser_cls(args.input, writer=writer)
parser.run()
writer.end()
else:
recs_queue: multiprocessing.Queue = multiprocessing.Queue(
maxsize=args.queue_size)
writer = Writer(recs_queue,
autosave_interval=args.autosave_interval,
ip4_cache=args.ip4_cache
)
writer.start()
parser = parser_cls(args.input,
recs_queue=recs_queue,
block_size=args.block_size
)
parser.run()
recs_queue.put(None)
writer.join()