#!/usr/bin/env python3 import argparse import database import logging import sys import typing import multiprocessing import time Record = typing.Tuple[typing.Callable, typing.Callable, int, str, str] # select, write FUNCTION_MAP: typing.Any = { "a": ( database.Database.get_ip4, database.Database.set_hostname, ), "cname": ( database.Database.get_domain, database.Database.set_hostname, ), "ptr": ( database.Database.get_domain, database.Database.set_ip4address, ), } class Writer(multiprocessing.Process): def __init__( self, recs_queue: multiprocessing.Queue = None, autosave_interval: int = 0, ip4_cache: int = 0, ): if recs_queue: # MP super(Writer, self).__init__() self.recs_queue = recs_queue self.log = logging.getLogger(f"wr") self.autosave_interval = autosave_interval self.ip4_cache = ip4_cache if not recs_queue: # No MP self.open_db() def open_db(self) -> None: self.db = database.Database() self.db.log = logging.getLogger(f"wr") self.db.fill_ip4cache(max_size=self.ip4_cache) def exec_record(self, record: Record) -> None: self.db.enter_step("exec_record") select, write, updated, name, value = record try: for source in select(self.db, value): write(self.db, name, updated, source=source) except (ValueError, IndexError): # ValueError: non-number in IP # IndexError: IP too big self.log.exception("Cannot execute: %s", record) def end(self) -> None: self.db.enter_step("end") self.db.save() def run(self) -> None: self.open_db() if self.autosave_interval > 0: next_save = time.time() + self.autosave_interval else: next_save = 0 self.db.enter_step("block_wait") block: typing.List[Record] for block in iter(self.recs_queue.get, None): record: Record for record in block: self.exec_record(record) if next_save > 0 and time.time() > next_save: self.log.info("Saving database...") self.db.save() self.log.info("Done!") next_save = time.time() + self.autosave_interval self.db.enter_step("block_wait") self.end() class Parser: def __init__( self, buf: typing.Any, recs_queue: multiprocessing.Queue = None, block_size: int = 0, writer: Writer = None, ): assert bool(writer) ^ bool(block_size and recs_queue) self.buf = buf self.log = logging.getLogger("pr") self.recs_queue = recs_queue if writer: # No MP self.prof: database.Profiler = writer.db self.register = writer.exec_record else: # MP self.block: typing.List[Record] = list() self.block_size = block_size self.prof = database.Profiler() self.prof.log = logging.getLogger("pr") self.register = self.add_to_queue def add_to_queue(self, record: Record) -> None: self.prof.enter_step("register") self.block.append(record) if len(self.block) >= self.block_size: self.prof.enter_step("put_block") assert self.recs_queue self.recs_queue.put(self.block) self.block = list() def run(self) -> None: self.consume() if self.recs_queue: self.recs_queue.put(self.block) self.prof.profile() def consume(self) -> None: raise NotImplementedError class Rapid7Parser(Parser): def consume(self) -> None: data = dict() for line in self.buf: self.prof.enter_step("parse_rapid7") split = line.split('"') try: for k in range(1, 14, 4): key = split[k] val = split[k + 2] data[key] = val select, writer = FUNCTION_MAP[data["type"]] record = ( select, writer, int(data["timestamp"]), data["name"], data["value"], ) except (IndexError, KeyError): # IndexError: missing field # KeyError: Unknown type field self.log.exception("Cannot parse: %s", line) self.register(record) class MassDnsParser(Parser): # massdns --output Snrql # --retry REFUSED,SERVFAIL --resolvers nameservers-ipv4 TYPES = { "A": (FUNCTION_MAP["a"][0], FUNCTION_MAP["a"][1], -1, None), # 'AAAA': (FUNCTION_MAP['aaaa'][0], FUNCTION_MAP['aaaa'][1], -1, None), "CNAME": (FUNCTION_MAP["cname"][0], FUNCTION_MAP["cname"][1], -1, -1), } def consume(self) -> None: self.prof.enter_step("parse_massdns") timestamp = 0 header = True for line in self.buf: line = line[:-1] if not line: header = True continue split = line.split(" ") try: if header: timestamp = int(split[1]) header = False else: select, write, name_offset, value_offset = MassDnsParser.TYPES[ split[1] ] record = ( select, write, timestamp, split[0][:name_offset].lower(), split[2][:value_offset].lower(), ) self.register(record) self.prof.enter_step("parse_massdns") except KeyError: continue PARSERS = { "rapid7": Rapid7Parser, "massdns": MassDnsParser, } if __name__ == "__main__": # Parsing arguments log = logging.getLogger("feed_dns") args_parser = argparse.ArgumentParser( description="Read DNS records and import " "tracking-relevant data into the database" ) args_parser.add_argument("parser", choices=PARSERS.keys(), help="Input format") args_parser.add_argument( "-i", "--input", type=argparse.FileType("r"), default=sys.stdin, help="Input file", ) args_parser.add_argument( "-b", "--block-size", type=int, default=1024, help="Performance tuning value" ) args_parser.add_argument( "-q", "--queue-size", type=int, default=128, help="Performance tuning value" ) args_parser.add_argument( "-a", "--autosave-interval", type=int, default=900, help="Interval to which the database will save in seconds. " "0 to disable.", ) args_parser.add_argument( "-s", "--single-process", action="store_true", help="Only use one process. " "Might be useful for single core computers.", ) args_parser.add_argument( "-4", "--ip4-cache", type=int, default=0, help="RAM cache for faster IPv4 lookup. " "Maximum useful value: 512 MiB (536870912). " "Warning: Depending on the rules, this might already " "be a memory-heavy process, even without the cache.", ) args = args_parser.parse_args() parser_cls = PARSERS[args.parser] if args.single_process: writer = Writer( autosave_interval=args.autosave_interval, ip4_cache=args.ip4_cache ) parser = parser_cls(args.input, writer=writer) parser.run() writer.end() else: recs_queue: multiprocessing.Queue = multiprocessing.Queue( maxsize=args.queue_size ) writer = Writer( recs_queue, autosave_interval=args.autosave_interval, ip4_cache=args.ip4_cache, ) writer.start() parser = parser_cls( args.input, recs_queue=recs_queue, block_size=args.block_size ) parser.run() recs_queue.put(None) writer.join()