#!/usr/bin/env python3

"""Feed DNS resolution records into the database.

Reads DNS data (Rapid7 FDNS JSON lines or massdns output) from a file or
stdin, packs each record, and ships blocks of records over a queue to a
writer process that stores them through the project `database` module.
"""

import argparse
import database
import logging
import sys
import typing
import multiprocessing
import enum

# A packed record ready for the writer:
# (select getter, write setter, timestamp, packed name, packed value)
Record = typing.Tuple[typing.Callable, typing.Callable, int,
                      database.Path, database.Path]

# Per record type: (select, write, name_packer, value_packer)
# - select: database getter yielding the sources matching the value
# - write:  database setter recording the name under each source
# - name_packer / value_packer: text -> packed database representation
FUNCTION_MAP: typing.Any = {
    'a': (
        database.Database.get_ip4,
        database.Database.set_hostname,
        database.Database.pack_domain,
        database.Database.pack_ip4address,
    ),
    'cname': (
        database.Database.get_domain,
        database.Database.set_hostname,
        database.Database.pack_domain,
        database.Database.pack_domain,
    ),
    'ptr': (
        database.Database.get_domain,
        database.Database.set_ip4address,
        database.Database.pack_ip4address,
        database.Database.pack_domain,
    ),
}


class Writer(multiprocessing.Process):
    """Worker process consuming blocks of records from a queue and
    writing them to the database.

    A ``None`` sentinel on the queue terminates the worker.
    """

    def __init__(self,
                 recs_queue: multiprocessing.Queue,
                 index: int = 0):
        super().__init__()
        self.log = logging.getLogger('wr')
        self.recs_queue = recs_queue
        # NOTE(review): `index` is accepted for multi-writer setups but is
        # currently unused — kept for interface compatibility.

    def run(self) -> None:
        # The database handle is created inside the child process: it must
        # not be pickled across the process boundary.
        self.db = database.Database()
        self.db.log = logging.getLogger('wr')
        self.db.enter_step('block_wait')
        block: typing.List[Record]
        # iter(get, None): loop until the None shutdown sentinel arrives.
        for block in iter(self.recs_queue.get, None):
            record: Record
            for record in block:
                select, write, updated, name, value = record
                self.db.enter_step('feed_switch')
                # For every source currently associated with the value,
                # record the name (with its timestamp) under that source.
                for source in select(self.db, value):
                    write(self.db, name, updated, source=source)
                self.db.enter_step('block_wait')
        self.db.enter_step('end')
        self.db.save()


class Parser():
    """Base class turning an input stream into blocks of packed records.

    Subclasses implement :meth:`consume` to read ``self.buf`` and call
    :meth:`register` once per record; completed blocks are pushed onto
    ``recs_queue`` for the :class:`Writer`.
    """

    def __init__(self,
                 buf: typing.Any,
                 recs_queue: multiprocessing.Queue,
                 block_size: int,
                 ):
        super().__init__()
        self.buf = buf
        self.log = logging.getLogger('pr')
        self.recs_queue = recs_queue
        self.block: typing.List[Record] = list()
        self.block_size = block_size
        self.prof = database.Profiler()
        self.prof.log = logging.getLogger('pr')

    def register(self,
                 rtype: str,
                 timestamp: int,
                 name_str: str,
                 value_str: str,
                 ) -> None:
        """Pack one record and append it to the current block.

        Unknown record types and unparseable names/values are logged and
        skipped; a full block is flushed to the queue.
        """
        self.prof.enter_step('pack')
        try:
            select, write, name_packer, value_packer = FUNCTION_MAP[rtype]
        except KeyError:
            self.log.exception("Unknown record type")
            return
        try:
            name = name_packer(name_str)
        except ValueError:
            self.log.exception("Cannot parse name ('%s' with %s)",
                               name_str, name_packer)
            return
        try:
            value = value_packer(value_str)
        except ValueError:
            self.log.exception("Cannot parse value ('%s' with %s)",
                               value_str, value_packer)
            return
        record = (select, write, timestamp, name, value)
        self.prof.enter_step('grow_block')
        self.block.append(record)
        if len(self.block) >= self.block_size:
            self.prof.enter_step('put_block')
            self.recs_queue.put(self.block)
            self.block = list()

    def run(self) -> None:
        """Consume the whole input, then flush the final partial block."""
        self.consume()
        self.recs_queue.put(self.block)
        self.prof.profile()

    def consume(self) -> None:
        raise NotImplementedError


class Rapid7Parser(Parser):
    """Parser for Rapid7 FDNS JSON lines.

    Instead of a full JSON parse, the relevant key/value pairs are pulled
    out by splitting on double quotes — much faster for this fixed layout.
    """

    def consume(self) -> None:
        self.prof.enter_step('iowait')
        for line in self.buf:
            self.prof.enter_step('parse_rapid7')
            split = line.split('"')
            # BUGFIX: use a fresh dict per line. Reusing one dict across
            # lines let values from the previous record leak into a
            # malformed one instead of triggering the KeyError below.
            data = dict()
            try:
                # Keys sit at quote positions 1, 5, 9, 13 and their values
                # two quote positions later.
                for k in range(1, 14, 4):
                    key = split[k]
                    val = split[k+2]
                    data[key] = val
                self.register(
                    data['type'],
                    int(data['timestamp']),
                    data['name'],
                    data['value'],
                )
                self.prof.enter_step('iowait')
            except (KeyError, IndexError, ValueError):
                # Sometimes JSON records are off the place
                # (IndexError: too few quotes; ValueError: bad timestamp)
                self.log.exception("Cannot parse: %s", line)


class DnsMassParser(Parser):
    """Parser for massdns output.

    Expected invocation:
    dnsmass --output Snrql
            --retry REFUSED,SERVFAIL --resolvers nameservers-ipv4
    """

    # record type -> (internal rtype, name slice end, value slice end)
    # The slice offsets strip the trailing dot massdns appends.
    TYPES = {
        'A': ('a', -1, None),
        # 'AAAA': ('aaaa', -1, None),
        'CNAME': ('cname', -1, -1),
    }

    def consume(self) -> None:
        self.prof.enter_step('parse_dnsmass')
        timestamp = 0
        header = True
        for line in self.buf:
            line = line[:-1]  # strip trailing newline
            if not line:
                # Blank line separates answer sections; the next line is
                # a header carrying the timestamp.
                header = True
                continue
            split = line.split(' ')
            try:
                if header:
                    timestamp = int(split[1])
                    header = False
                else:
                    rtype, name_offset, value_offset = \
                        DnsMassParser.TYPES[split[1]]
                    self.register(
                        rtype,
                        timestamp,
                        split[0][:name_offset],
                        split[2][:value_offset],
                    )
                    self.prof.enter_step('parse_dnsmass')
            except (KeyError, IndexError, ValueError):
                # Malformed records are less likely to happen,
                # but we may never be sure
                # (IndexError: short line; ValueError: bad timestamp)
                self.log.exception("Cannot parse: %s", line)


PARSERS = {
    'rapid7': Rapid7Parser,
    'dnsmass': DnsMassParser,
}

if __name__ == '__main__':

    # Parsing arguments
    log = logging.getLogger('feed_dns')
    args_parser = argparse.ArgumentParser(
        description="TODO")
    args_parser.add_argument(
        'parser',
        choices=PARSERS.keys(),
        help="TODO")
    args_parser.add_argument(
        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
        help="TODO")
    args_parser.add_argument(
        '-j', '--workers', type=int, default=4,
        help="TODO")
    args_parser.add_argument(
        '-b', '--block-size', type=int, default=100,
        help="TODO")
    args_parser.add_argument(
        '-q', '--queue-size', type=int, default=10,
        help="TODO")
    args = args_parser.parse_args()

    # Bounded queue so the parser back-pressures when the writer lags.
    recs_queue: multiprocessing.Queue = multiprocessing.Queue(
        maxsize=args.queue_size)

    writer = Writer(recs_queue)
    writer.start()

    parser = PARSERS[args.parser](args.input, recs_queue, args.block_size)
    parser.run()

    # None sentinel tells the writer to finish and save.
    recs_queue.put(None)
    writer.join()