For some reason, some servers output part of their response in upper case. This breaks the reading process, which is designed to work only on lower case for performance reasons.
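As a minimal sketch of the failure mode (the dict below is hypothetical, standing in for the lower-case-keyed lookups in database.Database):

    store = {'example.com': '203.0.113.7'}  # hypothetical lower-case-keyed store
    name = 'EXAMPLE.COM'                    # a server answered in upper case
    assert store.get(name) is None          # the lookup silently misses
    assert store.get(name.lower()) == '203.0.113.7'  # the workaround below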
#!/usr/bin/env python3

import argparse
import logging
import multiprocessing
import sys
import time
import typing

import database

# A record ready for the database: (select, write, updated, name, value)
Record = typing.Tuple[typing.Callable, typing.Callable, int, str, str]


# Maps a DNS record type to the (select, write) database methods used
# to resolve and store it
FUNCTION_MAP: typing.Any = {
    'a': (
        database.Database.get_ip4,
        database.Database.set_hostname,
    ),
    'cname': (
        database.Database.get_domain,
        database.Database.set_hostname,
    ),
    'ptr': (
        database.Database.get_domain,
        database.Database.set_ip4address,
    ),
}
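
# Illustrative example (hypothetical values): a parsed 'a' record becomes
#   (database.Database.get_ip4, database.Database.set_hostname,
#    1578061234, 'tracker.example.com', '203.0.113.7')
# Writer.exec_record() then iterates select(db, value) and calls
# write(db, name, updated, source=...) for each matching source.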


class Writer(multiprocessing.Process):
    """Consumes records and writes them to the database."""

    def __init__(self,
                 recs_queue: typing.Optional[multiprocessing.Queue] = None,
                 autosave_interval: int = 0,
                 ip4_cache: int = 0,
                 ):
        if recs_queue:  # MP
            super(Writer, self).__init__()
            self.recs_queue = recs_queue
        self.log = logging.getLogger('wr')
        self.autosave_interval = autosave_interval
        self.ip4_cache = ip4_cache
        if not recs_queue:  # No MP
            self.open_db()

    def open_db(self) -> None:
        self.db = database.Database()
        self.db.log = logging.getLogger('wr')
        self.db.fill_ip4cache(max_size=self.ip4_cache)

    def exec_record(self, record: Record) -> None:
        self.db.enter_step('exec_record')
        select, write, updated, name, value = record
        try:
            for source in select(self.db, value):
                write(self.db, name, updated, source=source)
        except (ValueError, IndexError):
            # ValueError: non-number in IP
            # IndexError: IP too big
            self.log.exception("Cannot execute: %s", record)

    def end(self) -> None:
        self.db.enter_step('end')
        self.db.save()

    def run(self) -> None:
        self.open_db()
        if self.autosave_interval > 0:
            next_save = time.time() + self.autosave_interval
        else:
            next_save = 0

        self.db.enter_step('block_wait')
        block: typing.List[Record]
        for block in iter(self.recs_queue.get, None):
            record: Record
            for record in block:
                self.exec_record(record)

            if next_save > 0 and time.time() > next_save:
                self.log.info("Saving database...")
                self.db.save()
                self.log.info("Done!")
                next_save = time.time() + self.autosave_interval

            self.db.enter_step('block_wait')
        self.end()
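
# In multiprocessing mode the pipeline is: a Parser batches records into
# blocks of block_size and puts them on recs_queue; the Writer process
# consumes blocks until it receives the None sentinel, then saves and exits.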


class Parser():
    def __init__(self,
                 buf: typing.Any,
                 recs_queue: typing.Optional[multiprocessing.Queue] = None,
                 block_size: int = 0,
                 writer: typing.Optional[Writer] = None,
                 ):
        # Either a direct writer (single-process) or a queue and a block
        # size (multiprocessing), but not both
        assert bool(writer) ^ bool(block_size and recs_queue)
        self.buf = buf
        self.log = logging.getLogger('pr')
        self.recs_queue = recs_queue
        if writer:  # No MP
            self.prof: database.Profiler = writer.db
            self.register = writer.exec_record
        else:  # MP
            self.block: typing.List[Record] = list()
            self.block_size = block_size
            self.prof = database.Profiler()
            self.prof.log = logging.getLogger('pr')
            self.register = self.add_to_queue

    def add_to_queue(self, record: Record) -> None:
        self.prof.enter_step('register')
        self.block.append(record)
        if len(self.block) >= self.block_size:
            self.prof.enter_step('put_block')
            assert self.recs_queue
            self.recs_queue.put(self.block)
            self.block = list()

    def run(self) -> None:
        self.consume()
        if self.recs_queue:
            # Flush the last, possibly partial, block
            self.recs_queue.put(self.block)
        self.prof.profile()

    def consume(self) -> None:
        # Subclasses read self.buf and call self.register for each record
        raise NotImplementedError


class Rapid7Parser(Parser):
    def consume(self) -> None:
        data = dict()
        for line in self.buf:
            self.prof.enter_step('parse_rapid7')
            # Cheap alternative to json.loads: each line is expected to
            # hold exactly four quoted "key":"value" pairs, so after
            # splitting on '"' the keys sit at indices 1, 5, 9 and 13,
            # each value two positions after its key
            split = line.split('"')

            try:
                for k in range(1, 14, 4):
                    key = split[k]
                    val = split[k+2]
                    data[key] = val

                select, writer = FUNCTION_MAP[data['type']]
                record = (
                    select,
                    writer,
                    int(data['timestamp']),
                    data['name'],
                    data['value']
                )
            except IndexError:
                self.log.exception("Cannot parse: %s", line)
                # Without this, a line that fails to parse would register
                # the previous record again (or crash on the first line)
                continue
            self.register(record)


class MassDnsParser(Parser):
    # massdns --output Snrql
    # --retry REFUSED,SERVFAIL --resolvers nameservers-ipv4
    TYPES = {
        'A': (FUNCTION_MAP['a'][0], FUNCTION_MAP['a'][1], -1, None),
        # 'AAAA': (FUNCTION_MAP['aaaa'][0], FUNCTION_MAP['aaaa'][1], -1, None),
        'CNAME': (FUNCTION_MAP['cname'][0], FUNCTION_MAP['cname'][1], -1, -1),
    }
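
    # Each answer block in the massdns output is expected to be one
    # header line whose second field is the epoch timestamp, then one
    # "<name> <type> <value>" line per record, ended by a blank line.
    # The name/value offsets in TYPES trim the trailing dot off domain
    # names (None leaves the value as-is, e.g. an A record's IP address).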

    def consume(self) -> None:
        self.prof.enter_step('parse_massdns')
        timestamp = 0
        header = True
        for line in self.buf:
            line = line[:-1]  # strip the trailing newline
            if not line:
                header = True
                continue

            split = line.split(' ')
            try:
                if header:
                    timestamp = int(split[1])
                    header = False
                else:
                    select, write, name_offset, value_offset = \
                        MassDnsParser.TYPES[split[1]]
                    record = (
                        select,
                        write,
                        timestamp,
                        # Lowercased because some servers answer part of
                        # their response in upper case
                        split[0][:name_offset].lower(),
                        split[2][:value_offset].lower(),
                    )
                    self.register(record)
                    self.prof.enter_step('parse_massdns')
            except KeyError:
                continue


PARSERS = {
    'rapid7': Rapid7Parser,
    'massdns': MassDnsParser,
}


if __name__ == '__main__':

    # Parsing arguments
    log = logging.getLogger('feed_dns')
    args_parser = argparse.ArgumentParser(
        description="Read DNS records and import "
        "tracking-relevant data into the database")
    args_parser.add_argument(
        'parser',
        choices=PARSERS.keys(),
        help="Input format")
    args_parser.add_argument(
        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
        help="Input file")
    args_parser.add_argument(
        '-b', '--block-size', type=int, default=1024,
        help="Performance tuning value")
    args_parser.add_argument(
        '-q', '--queue-size', type=int, default=128,
        help="Performance tuning value")
    args_parser.add_argument(
        '-a', '--autosave-interval', type=int, default=900,
        help="Interval at which the database will save, in seconds. "
        "0 to disable.")
    args_parser.add_argument(
        '-s', '--single-process', action='store_true',
        help="Only use one process. "
        "Might be useful for single-core computers.")
    args_parser.add_argument(
        '-4', '--ip4-cache', type=int, default=0,
        help="RAM cache for faster IPv4 lookup. "
        "Maximum useful value: 512 MiB (536870912). "
        "Warning: Depending on the rules, this might already "
        "be a memory-heavy process, even without the cache.")
    args = args_parser.parse_args()

    parser_cls = PARSERS[args.parser]
    if args.single_process:
        writer = Writer(
            autosave_interval=args.autosave_interval,
            ip4_cache=args.ip4_cache
        )
        parser = parser_cls(args.input, writer=writer)
        parser.run()
        writer.end()
    else:
        recs_queue: multiprocessing.Queue = multiprocessing.Queue(
            maxsize=args.queue_size)

        writer = Writer(recs_queue,
                        autosave_interval=args.autosave_interval,
                        ip4_cache=args.ip4_cache
                        )
        writer.start()

        parser = parser_cls(args.input,
                            recs_queue=recs_queue,
                            block_size=args.block_size
                            )
        parser.run()

        # Signal the writer that no more blocks are coming, then wait
        recs_queue.put(None)
        writer.join()