Geoffrey Frogeye
dcf39c9582
Why did I think this would be a good idea? - values don't need to be packed most of the time, but we don't know that early - a packed domain (there's one most of the time) is way larger than its unpacked counterpart
231 lines
6.6 KiB
Python
Executable file
231 lines
6.6 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import database
|
|
import logging
|
|
import sys
|
|
import typing
|
|
import multiprocessing
|
|
import enum
|
|
|
|
# A parsed record, ready for the writer process:
# (select, write, timestamp, packed name, packed value)
Record = typing.Tuple[typing.Callable,
                      typing.Callable, int, database.Path, database.Path]

# select, write, name_packer, value_packer
# Maps a DNS record type to the database callables used to handle it:
# - select: yields existing sources matching the record's value
# - write: stores the record's name with the timestamp under a source
# - name_packer / value_packer: parse raw strings into packed paths
#   (both raise ValueError on malformed input)
FUNCTION_MAP: typing.Any = {
    'a': (
        database.Database.get_ip4,
        database.Database.set_hostname,
        database.Database.pack_domain,
        database.Database.pack_ip4address,
    ),
    'cname': (
        database.Database.get_domain,
        database.Database.set_hostname,
        database.Database.pack_domain,
        database.Database.pack_domain,
    ),
    'ptr': (
        database.Database.get_domain,
        database.Database.set_ip4address,
        database.Database.pack_ip4address,
        database.Database.pack_domain,
    ),
}
|
|
|
|
|
|
class Writer(multiprocessing.Process):
    """Worker process that pops blocks of records off a queue and
    feeds them into the database.

    A ``None`` sentinel on the queue terminates the process.
    """

    def __init__(self,
                 recs_queue: multiprocessing.Queue,
                 index: int = 0):
        super(Writer, self).__init__()
        # Bug fix: `index` was accepted but never used, and the logger
        # name was the placeholder f-string f'wr' (no field), so every
        # writer shared one logger.  Suffix with the index so concurrent
        # writers are distinguishable in logs.
        self.index = index
        self.log = logging.getLogger(f'wr{index}')
        self.recs_queue = recs_queue

    def run(self) -> None:
        # The database handle is created here (not in __init__) so it
        # lives entirely in the child process.
        self.db = database.Database()
        self.db.log = logging.getLogger(f'wr{self.index}')

        self.db.enter_step('block_wait')
        block: typing.List[Record]
        # iter(..., None): keep fetching blocks until the sentinel.
        for block in iter(self.recs_queue.get, None):

            record: Record
            for record in block:

                select, write, updated, name, value = record
                self.db.enter_step('feed_switch')

                # Write the record's name for every source that
                # matches its value.
                for source in select(self.db, value):
                    write(self.db, name, updated, source=source)

            # About to block on the queue again: attribute the wait
            # time to its own profiling step.
            self.db.enter_step('block_wait')

        self.db.enter_step('end')
        self.db.save()
|
|
|
|
|
|
class Parser():
    """Common machinery for feeding DNS records into the writer queue.

    Subclasses implement consume() for their input format and call
    register() once per record found.
    """

    def __init__(self,
                 buf: typing.Any,
                 recs_queue: multiprocessing.Queue,
                 block_size: int,
                 ):
        super(Parser, self).__init__()
        self.buf = buf
        self.recs_queue = recs_queue
        self.block_size = block_size
        # Records are buffered here and shipped in blocks of
        # block_size to cut down on queue overhead.
        self.block: typing.List[Record] = []
        self.log = logging.getLogger('pr')
        self.prof = database.Profiler()
        self.prof.log = logging.getLogger('pr')

    def register(self,
                 rtype: str,
                 timestamp: int,
                 name_str: str,
                 value_str: str,
                 ) -> None:
        """Pack one record and append it to the current block,
        flushing the block to the queue once it reaches block_size.

        Unknown record types and unparseable names/values are logged
        and skipped.
        """
        self.prof.enter_step('pack')
        try:
            selector, writer, pack_name, pack_value = FUNCTION_MAP[rtype]
        except KeyError:
            self.log.exception("Unknown record type")
            return
        try:
            name = pack_name(name_str)
        except ValueError:
            self.log.exception("Cannot parse name ('%s' with %s)",
                               name_str, pack_name)
            return
        try:
            value = pack_value(value_str)
        except ValueError:
            self.log.exception("Cannot parse value ('%s' with %s)",
                               value_str, pack_value)
            return
        record = (selector, writer, timestamp, name, value)

        self.prof.enter_step('grow_block')
        self.block.append(record)
        if len(self.block) >= self.block_size:
            self.prof.enter_step('put_block')
            self.recs_queue.put(self.block)
            self.block = []

    def run(self) -> None:
        """Consume the whole input, flush the trailing partial block,
        then emit profiling data."""
        self.consume()
        self.recs_queue.put(self.block)
        self.prof.profile()

    def consume(self) -> None:
        # Subclass responsibility.
        raise NotImplementedError
|
|
|
|
|
|
class Rapid7Parser(Parser):
    """Parses Rapid7 Open Data DNS JSON lines.

    Extracts the type/timestamp/name/value fields by splitting on
    double quotes instead of full JSON decoding, for speed; this
    assumes the fixed key order of well-formed records — TODO confirm
    against the dataset.
    """

    def consume(self) -> None:
        # NOTE(review): `data` is reused across lines, so a malformed
        # line may see values left over from the previous record.
        # Kept as-is to preserve behaviour.
        data = dict()
        self.prof.enter_step('iowait')
        for line in self.buf:
            self.prof.enter_step('parse_rapid7')
            split = line.split('"')

            try:
                # Keys sit at quote positions 1, 5, 9, 13; each value
                # is two positions after its key.
                for k in range(1, 14, 4):
                    key = split[k]
                    val = split[k+2]
                    data[key] = val

                self.register(
                    data['type'],
                    int(data['timestamp']),
                    data['name'],
                    data['value'],
                )
                self.prof.enter_step('iowait')
            except (KeyError, IndexError, ValueError):
                # Sometimes JSON records are off the place.
                # Bug fix: truncated lines raise IndexError from the
                # split indexing and a non-numeric timestamp raises
                # ValueError; previously only KeyError was caught, so
                # such lines crashed the parser instead of being logged.
                self.log.exception("Cannot parse: %s", line)
|
|
|
|
|
|
class DnsMassParser(Parser):
    """Parses dnsmass output.

    Expected invocation of the producing tool:
    # dnsmass --output Snrql
    # --retry REFUSED,SERVFAIL --resolvers nameservers-ipv4
    """

    # record type -> (rtype key for register, name slice end, value
    # slice end); the -1 offsets strip the trailing dot from FQDNs,
    # None keeps the field whole (A values are IPs, no dot to strip).
    TYPES = {
        'A': ('a', -1, None),
        # 'AAAA': ('aaaa', -1, None),
        'CNAME': ('cname', -1, -1),
    }

    def consume(self) -> None:
        self.prof.enter_step('parse_dnsmass')
        timestamp = 0
        header = True
        for line in self.buf:
            line = line[:-1]  # strip trailing newline
            if not line:
                # Blank line ends an answer section; the next
                # non-empty line is a header carrying the timestamp.
                header = True
                continue

            split = line.split(' ')
            try:
                if header:
                    timestamp = int(split[1])
                    header = False
                else:
                    rtype, name_offset, value_offset = \
                        DnsMassParser.TYPES[split[1]]
                    self.register(
                        rtype,
                        timestamp,
                        split[0][:name_offset],
                        split[2][:value_offset],
                    )
                    self.prof.enter_step('parse_dnsmass')
            except (KeyError, IndexError, ValueError):
                # Malformed records are less likely to happen,
                # but we may never be sure.
                # Bug fix: truncated lines raise IndexError and a
                # non-numeric header timestamp raises ValueError;
                # previously only KeyError (unknown record type) was
                # caught, so such lines crashed the parser.
                self.log.exception("Cannot parse: %s", line)
|
|
|
|
|
|
# Maps the `parser` command-line argument to its Parser subclass.
PARSERS = {
    'rapid7': Rapid7Parser,
    'dnsmass': DnsMassParser,
}
|
|
|
|
if __name__ == '__main__':

    # Parsing arguments
    # Bug fix: every help/description string was the placeholder
    # "TODO"; filled in with text grounded in what the code does.
    log = logging.getLogger('feed_dns')
    args_parser = argparse.ArgumentParser(
        description="Feed DNS records into the database.")
    args_parser.add_argument(
        'parser',
        choices=PARSERS.keys(),
        help="Format of the input data")
    args_parser.add_argument(
        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
        help="File to read records from (default: stdin)")
    # NOTE(review): --workers is parsed but never used — only a single
    # Writer process is started below.  TODO confirm intent before
    # spawning args.workers writers.
    args_parser.add_argument(
        '-j', '--workers', type=int, default=4,
        help="Number of writer processes (currently unused)")
    args_parser.add_argument(
        '-b', '--block-size', type=int, default=100,
        help="Number of records per block sent to the writer")
    args_parser.add_argument(
        '-q', '--queue-size', type=int, default=10,
        help="Maximum number of blocks queued at once")
    args = args_parser.parse_args()

    # Bounded queue so the parser cannot run arbitrarily far ahead of
    # the writer.
    recs_queue: multiprocessing.Queue = multiprocessing.Queue(
        maxsize=args.queue_size)

    writer = Writer(recs_queue)
    writer.start()

    parser = PARSERS[args.parser](args.input, recs_queue, args.block_size)
    parser.run()

    # The None sentinel tells the writer to stop once the queue drains.
    recs_queue.put(None)
    writer.join()
|