eulaurarien/feed_dns.py
Geoffrey Frogeye 53b14c6ffa
Removed TODO placeholders in commands description
It's better than nothing but not by that much
2019-12-19 08:07:01 +01:00

228 lines
6.8 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import database
import logging
import sys
import typing
import multiprocessing
import time
# One parsed DNS entry, in the order the writer unpacks it:
# (select function, write function, timestamp, name, value)
Record = typing.Tuple[typing.Callable, typing.Callable, int, str, str]

# Lowercase record type -> (select, write) pair of Database methods.
# `select` is called with the record value and yields the matching sources,
# `write` is then called with the record name for each of those sources.
FUNCTION_MAP: typing.Any = {
    'a': (database.Database.get_ip4, database.Database.set_hostname),
    'cname': (database.Database.get_domain, database.Database.set_hostname),
    'ptr': (database.Database.get_domain, database.Database.set_ip4address),
}
class Writer(multiprocessing.Process):
    """Process that applies queued DNS records to the database.

    Blocks (lists) of records are read from ``recs_queue`` until a ``None``
    sentinel is received; the database is then saved one last time.
    """

    def __init__(self,
                 recs_queue: multiprocessing.Queue,
                 autosave_interval: int = 0,
                 ip4_cache: int = 0,
                 ):
        """Store the settings; the database is only opened in ``run()``.

        recs_queue: queue delivering lists of records, then ``None`` to stop.
        autosave_interval: seconds between periodic saves; a value of 0 or
            less disables them.
        ip4_cache: passed as ``max_size`` to ``Database.fill_ip4cache()``.
        """
        super().__init__()
        self.log = logging.getLogger('wr')
        self.recs_queue = recs_queue
        self.autosave_interval = autosave_interval
        self.ip4_cache = ip4_cache

    def run(self) -> None:
        """Consume record blocks until the sentinel, then save the database.

        For every record, ``select(db, value)`` is iterated and
        ``write(db, name, updated, source=source)`` is called for each
        result. A ``ValueError`` raised while doing so is logged and the
        record is skipped. The autosave deadline is checked once per block.
        """
        self.db = database.Database()
        self.db.log = logging.getLogger('wr')
        self.db.fill_ip4cache(max_size=self.ip4_cache)

        # A deadline of 0 means periodic saving is disabled
        deadline = (
            time.time() + self.autosave_interval
            if self.autosave_interval > 0 else 0
        )

        self.db.enter_step('block_wait')
        batch: typing.List[Record]
        for batch in iter(self.recs_queue.get, None):

            entry: Record
            for entry in batch:
                select, write, updated, name, value = entry
                self.db.enter_step('feed_switch')
                try:
                    for source in select(self.db, value):
                        write(self.db, name, updated, source=source)
                except ValueError:
                    self.log.exception("Cannot execute: %s", entry)

            autosave_due = deadline > 0 and time.time() > deadline
            if autosave_due:
                self.log.info("Saving database...")
                self.db.save()
                self.log.info("Done!")
                deadline = time.time() + self.autosave_interval

            self.db.enter_step('block_wait')

        # Sentinel received: final save before the process exits
        self.db.enter_step('end')
        self.db.save()
class Parser:
    """Base class for input parsers feeding records to the writer queue.

    Subclasses implement ``consume()`` and call ``register()`` for every
    record they extract; records are sent to the queue in blocks of
    ``block_size``.
    """

    def __init__(self,
                 buf: typing.Any,
                 recs_queue: multiprocessing.Queue,
                 block_size: int,
                 ):
        """Keep the input and queue, and start with an empty block.

        buf: iterable of input lines read by ``consume()``.
        recs_queue: queue on which complete blocks are put.
        block_size: number of records at which a block is sent.
        """
        super().__init__()
        self.buf = buf
        self.log = logging.getLogger('pr')
        self.recs_queue = recs_queue
        self.block: typing.List[Record] = []
        self.block_size = block_size
        self.prof = database.Profiler()
        self.prof.log = logging.getLogger('pr')

    def register(self, record: Record) -> None:
        """Add a record to the current block, sending the block once full."""
        self.prof.enter_step('register')
        pending = self.block
        pending.append(record)
        if len(pending) < self.block_size:
            return
        self.prof.enter_step('put_block')
        self.recs_queue.put(pending)
        # Start a new list: the one just queued must not be mutated anymore
        self.block = []

    def run(self) -> None:
        """Parse the whole input, send the last block and print the profile."""
        self.consume()
        # Whatever is left over (possibly nothing) is sent as a final block
        self.recs_queue.put(self.block)
        self.prof.profile()

    def consume(self) -> None:
        """Read ``self.buf`` and register its records; format-specific."""
        raise NotImplementedError
class Rapid7Parser(Parser):
    """Parser for lines holding four double-quoted key/value pairs.

    Each line is split on ``"``; keys are read from fields 1, 5, 9 and 13
    and each value from two fields further. The keys ``timestamp``,
    ``name``, ``type`` and ``value`` are required.
    NOTE(review): presumably one JSON object per line from the Rapid7
    DNS datasets -- confirm against the data actually fed in.
    """

    def consume(self) -> None:
        """Register one record per well-formed line of ``self.buf``.

        A line is logged and skipped, without registering anything, when:
        - it has too few fields (IndexError),
        - a required key is missing or its ``type`` is not in
          ``FUNCTION_MAP`` (KeyError),
        - its timestamp is not an integer (ValueError).
        """
        for line in self.buf:
            self.prof.enter_step('parse_rapid7')
            split = line.split('"')
            try:
                # Built from scratch for every line, so that a line lacking
                # a field cannot inherit the value of the previous line.
                data = {split[k]: split[k + 2] for k in range(1, 14, 4)}
                select, writer = FUNCTION_MAP[data['type']]
                record = (
                    select,
                    writer,
                    int(data['timestamp']),
                    data['name'],
                    data['value'],
                )
            except (IndexError, KeyError, ValueError):
                self.log.exception("Cannot parse: %s", line)
            else:
                # Only reached when the line parsed: a failed line must not
                # register the previous record again (or an undefined one).
                self.register(record)
class MassDnsParser(Parser):
    """Parser for the text output of massdns.

    The input is read as packets separated by empty lines. The first line
    of a packet is a header whose second space-separated field is the
    timestamp; the following lines are ``name type value`` records.
    """

    # massdns --output Snrql
    # --retry REFUSED,SERVFAIL --resolvers nameservers-ipv4

    # Record type -> (select, write, name_offset, value_offset).
    # The offsets are slice ends applied to the name and value fields:
    # -1 drops the last character (presumably the trailing dot of a fully
    # qualified domain name -- confirm), None keeps the field whole.
    TYPES = {
        'A': (FUNCTION_MAP['a'][0], FUNCTION_MAP['a'][1], -1, None),
        # 'AAAA': (FUNCTION_MAP['aaaa'][0], FUNCTION_MAP['aaaa'][1], -1, None),
        'CNAME': (FUNCTION_MAP['cname'][0], FUNCTION_MAP['cname'][1], -1, -1),
    }

    def consume(self) -> None:
        """Register every supported record found in ``self.buf``.

        Lines whose type is not in ``TYPES`` are skipped silently. Lines
        with too few fields, and headers whose timestamp is not an
        integer, are logged and skipped.
        """
        self.prof.enter_step('parse_massdns')
        timestamp = 0
        header = True
        for line in self.buf:
            # Strip the line terminator only: slicing off the last character
            # would eat real data on a final line without a newline.
            line = line.rstrip('\n')
            if not line:
                # An empty line ends the packet: a header comes next
                header = True
                continue

            split = line.split(' ')
            try:
                if header:
                    timestamp = int(split[1])
                    header = False
                else:
                    select, write, name_offset, value_offset = \
                        MassDnsParser.TYPES[split[1]]
                    record = (
                        select,
                        write,
                        timestamp,
                        split[0][:name_offset],
                        split[2][:value_offset],
                    )
                    self.register(record)
                    # register() switched the profiler step, switch it back
                    self.prof.enter_step('parse_massdns')
            except KeyError:
                # Record type that is not imported: nothing to report
                continue
            except (IndexError, ValueError):
                # Malformed line: report it instead of aborting the import
                self.log.warning("Cannot parse: %s", line)
# Name of each input format on the command line -> parser class reading it
PARSERS = {
    'rapid7': Rapid7Parser,
    'massdns': MassDnsParser,
}

if __name__ == '__main__':

    # Parsing arguments
    log = logging.getLogger('feed_dns')
    args_parser = argparse.ArgumentParser(
        description="Read DNS records and import "
        "tracking-relevant data into the database")
    args_parser.add_argument(
        'parser',
        choices=PARSERS.keys(),
        help="Input format")
    args_parser.add_argument(
        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
        help="Input file")
    args_parser.add_argument(
        '-b', '--block-size', type=int, default=1024,
        help="Performance tuning value")
    args_parser.add_argument(
        '-q', '--queue-size', type=int, default=128,
        help="Performance tuning value")
    args_parser.add_argument(
        '-a', '--autosave-interval', type=int, default=900,
        help="Interval to which the database will save in seconds. "
        "0 to disable.")
    args_parser.add_argument(
        '-4', '--ip4-cache', type=int, default=0,
        help="RAM cache for faster IPv4 lookup. "
        "Maximum useful value: 512 MiB (536870912). "
        "Warning: Depending on the rules, this might already "
        "be a memory-heavy process, even without the cache.")
    args = args_parser.parse_args()

    # Blocks of parsed records travel from this process to the writer
    recs_queue: multiprocessing.Queue = multiprocessing.Queue(
        maxsize=args.queue_size)

    writer = Writer(recs_queue,
                    autosave_interval=args.autosave_interval,
                    ip4_cache=args.ip4_cache
                    )
    writer.start()

    try:
        parser = PARSERS[args.parser](args.input, recs_queue, args.block_size)
        parser.run()
    finally:
        # Always send the end-of-stream sentinel, even when parsing failed:
        # without it the writer waits on the queue forever, and as it is
        # not a daemon process the interpreter would hang at exit.
        recs_queue.put(None)
        writer.join()