281 lines
8.2 KiB
Python
Executable file
281 lines
8.2 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import database
|
|
import logging
|
|
import sys
|
|
import typing
|
|
import multiprocessing
|
|
import time
|
|
|
|
Record = typing.Tuple[typing.Callable, typing.Callable, int, str, str]
|
|
|
|
# select, write
|
|
FUNCTION_MAP: typing.Any = {
|
|
"a": (
|
|
database.Database.get_ip4,
|
|
database.Database.set_hostname,
|
|
),
|
|
"cname": (
|
|
database.Database.get_domain,
|
|
database.Database.set_hostname,
|
|
),
|
|
"ptr": (
|
|
database.Database.get_domain,
|
|
database.Database.set_ip4address,
|
|
),
|
|
}
|
|
|
|
|
|
class Writer(multiprocessing.Process):
|
|
def __init__(
|
|
self,
|
|
recs_queue: multiprocessing.Queue = None,
|
|
autosave_interval: int = 0,
|
|
ip4_cache: int = 0,
|
|
):
|
|
if recs_queue: # MP
|
|
super(Writer, self).__init__()
|
|
self.recs_queue = recs_queue
|
|
self.log = logging.getLogger("wr")
|
|
self.autosave_interval = autosave_interval
|
|
self.ip4_cache = ip4_cache
|
|
if not recs_queue: # No MP
|
|
self.open_db()
|
|
|
|
def open_db(self) -> None:
|
|
self.db = database.Database()
|
|
self.db.log = logging.getLogger("wr")
|
|
self.db.fill_ip4cache(max_size=self.ip4_cache)
|
|
|
|
def exec_record(self, record: Record) -> None:
|
|
self.db.enter_step("exec_record")
|
|
select, write, updated, name, value = record
|
|
try:
|
|
for source in select(self.db, value):
|
|
write(self.db, name, updated, source=source)
|
|
except (ValueError, IndexError):
|
|
# ValueError: non-number in IP
|
|
# IndexError: IP too big
|
|
self.log.exception("Cannot execute: %s", record)
|
|
|
|
def end(self) -> None:
|
|
self.db.enter_step("end")
|
|
self.db.save()
|
|
|
|
def run(self) -> None:
|
|
self.open_db()
|
|
if self.autosave_interval > 0:
|
|
next_save = time.time() + self.autosave_interval
|
|
else:
|
|
next_save = 0
|
|
|
|
self.db.enter_step("block_wait")
|
|
block: typing.List[Record]
|
|
for block in iter(self.recs_queue.get, None):
|
|
|
|
assert block
|
|
record: Record
|
|
for record in block:
|
|
self.exec_record(record)
|
|
|
|
if next_save > 0 and time.time() > next_save:
|
|
self.log.info("Saving database...")
|
|
self.db.save()
|
|
self.log.info("Done!")
|
|
next_save = time.time() + self.autosave_interval
|
|
|
|
self.db.enter_step("block_wait")
|
|
self.end()
|
|
|
|
|
|
class Parser:
|
|
def __init__(
|
|
self,
|
|
buf: typing.Any,
|
|
recs_queue: multiprocessing.Queue = None,
|
|
block_size: int = 0,
|
|
writer: Writer = None,
|
|
):
|
|
assert bool(writer) ^ bool(block_size and recs_queue)
|
|
self.buf = buf
|
|
self.log = logging.getLogger("pr")
|
|
self.recs_queue = recs_queue
|
|
if writer: # No MP
|
|
self.prof: database.Profiler = writer.db
|
|
self.register = writer.exec_record
|
|
else: # MP
|
|
self.block: typing.List[Record] = list()
|
|
self.block_size = block_size
|
|
self.prof = database.Profiler()
|
|
self.prof.log = logging.getLogger("pr")
|
|
self.register = self.add_to_queue
|
|
|
|
def add_to_queue(self, record: Record) -> None:
|
|
self.prof.enter_step("register")
|
|
self.block.append(record)
|
|
if len(self.block) >= self.block_size:
|
|
self.prof.enter_step("put_block")
|
|
assert self.recs_queue
|
|
self.recs_queue.put(self.block)
|
|
self.block = list()
|
|
|
|
def run(self) -> None:
|
|
self.consume()
|
|
if self.recs_queue:
|
|
self.recs_queue.put(self.block)
|
|
self.prof.profile()
|
|
|
|
def consume(self) -> None:
|
|
raise NotImplementedError
|
|
|
|
|
|
class Rapid7Parser(Parser):
|
|
def consume(self) -> None:
|
|
data = dict()
|
|
for line in self.buf:
|
|
self.prof.enter_step("parse_rapid7")
|
|
split = line.split('"')
|
|
|
|
try:
|
|
for k in range(1, 14, 4):
|
|
key = split[k]
|
|
val = split[k + 2]
|
|
data[key] = val
|
|
|
|
select, writer = FUNCTION_MAP[data["type"]]
|
|
record = (
|
|
select,
|
|
writer,
|
|
int(data["timestamp"]),
|
|
data["name"],
|
|
data["value"],
|
|
)
|
|
except (IndexError, KeyError):
|
|
# IndexError: missing field
|
|
# KeyError: Unknown type field
|
|
self.log.exception("Cannot parse: %s", line)
|
|
self.register(record)
|
|
|
|
|
|
class MassDnsParser(Parser):
|
|
# massdns --output Snrql
|
|
# --retry REFUSED,SERVFAIL --resolvers nameservers-ipv4
|
|
TYPES = {
|
|
"A": (FUNCTION_MAP["a"][0], FUNCTION_MAP["a"][1], -1, None),
|
|
# 'AAAA': (FUNCTION_MAP['aaaa'][0], FUNCTION_MAP['aaaa'][1], -1, None),
|
|
"CNAME": (FUNCTION_MAP["cname"][0], FUNCTION_MAP["cname"][1], -1, -1),
|
|
}
|
|
|
|
def consume(self) -> None:
|
|
self.prof.enter_step("parse_massdns")
|
|
timestamp = 0
|
|
header = True
|
|
for line in self.buf:
|
|
line = line[:-1]
|
|
if not line:
|
|
header = True
|
|
continue
|
|
|
|
split = line.split(" ")
|
|
try:
|
|
if header:
|
|
timestamp = int(split[1])
|
|
header = False
|
|
else:
|
|
select, write, name_offset, value_offset = MassDnsParser.TYPES[
|
|
split[1]
|
|
]
|
|
record = (
|
|
select,
|
|
write,
|
|
timestamp,
|
|
split[0][:name_offset].lower(),
|
|
split[2][:value_offset].lower(),
|
|
)
|
|
self.register(record)
|
|
self.prof.enter_step("parse_massdns")
|
|
except KeyError:
|
|
continue
|
|
|
|
|
|
PARSERS = {
|
|
"rapid7": Rapid7Parser,
|
|
"massdns": MassDnsParser,
|
|
}
|
|
|
|
if __name__ == "__main__":
|
|
|
|
# Parsing arguments
|
|
log = logging.getLogger("feed_dns")
|
|
args_parser = argparse.ArgumentParser(
|
|
description="Read DNS records and import "
|
|
"tracking-relevant data into the database"
|
|
)
|
|
args_parser.add_argument("parser", choices=PARSERS.keys(), help="Input format")
|
|
args_parser.add_argument(
|
|
"-i",
|
|
"--input",
|
|
type=argparse.FileType("r"),
|
|
default=sys.stdin,
|
|
help="Input file",
|
|
)
|
|
args_parser.add_argument(
|
|
"-b", "--block-size", type=int, default=1024, help="Performance tuning value"
|
|
)
|
|
args_parser.add_argument(
|
|
"-q", "--queue-size", type=int, default=128, help="Performance tuning value"
|
|
)
|
|
args_parser.add_argument(
|
|
"-a",
|
|
"--autosave-interval",
|
|
type=int,
|
|
default=900,
|
|
help="Interval to which the database will save in seconds. " "0 to disable.",
|
|
)
|
|
args_parser.add_argument(
|
|
"-s",
|
|
"--single-process",
|
|
action="store_true",
|
|
help="Only use one process. " "Might be useful for single core computers.",
|
|
)
|
|
args_parser.add_argument(
|
|
"-4",
|
|
"--ip4-cache",
|
|
type=int,
|
|
default=0,
|
|
help="RAM cache for faster IPv4 lookup. "
|
|
"Maximum useful value: 512 MiB (536870912). "
|
|
"Warning: Depending on the rules, this might already "
|
|
"be a memory-heavy process, even without the cache.",
|
|
)
|
|
args = args_parser.parse_args()
|
|
|
|
parser_cls = PARSERS[args.parser]
|
|
if args.single_process:
|
|
writer = Writer(
|
|
autosave_interval=args.autosave_interval, ip4_cache=args.ip4_cache
|
|
)
|
|
parser = parser_cls(args.input, writer=writer)
|
|
parser.run()
|
|
writer.end()
|
|
else:
|
|
recs_queue: multiprocessing.Queue = multiprocessing.Queue(
|
|
maxsize=args.queue_size
|
|
)
|
|
|
|
writer = Writer(
|
|
recs_queue,
|
|
autosave_interval=args.autosave_interval,
|
|
ip4_cache=args.ip4_cache,
|
|
)
|
|
writer.start()
|
|
|
|
parser = parser_cls(
|
|
args.input, recs_queue=recs_queue, block_size=args.block_size
|
|
)
|
|
parser.run()
|
|
|
|
recs_queue.put(None)
|
|
writer.join()
|