eulaurarien/feed_dns.py

281 lines
8.2 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import argparse
import database
import logging
import sys
2019-12-13 11:36:11 +00:00
import typing
2019-12-15 16:05:41 +00:00
import multiprocessing
import time
2019-12-15 16:12:44 +00:00
# A parsed DNS record, ready for execution against the database:
# (select function, write function, updated timestamp, name, value)
Record = typing.Tuple[typing.Callable, typing.Callable, int, str, str]

# select, write
# Maps a DNS record type to the pair of Database methods used to process it:
# the select method looks up sources matching the record's value, and the
# write method stores the record's name under each matched source.
FUNCTION_MAP: typing.Any = {
    "a": (
        database.Database.get_ip4,
        database.Database.set_hostname,
    ),
    "cname": (
        database.Database.get_domain,
        database.Database.set_hostname,
    ),
    "ptr": (
        database.Database.get_domain,
        database.Database.set_ip4address,
    ),
}
2019-12-15 16:05:41 +00:00
class Writer(multiprocessing.Process):
    """Consumes parsed DNS records and writes them into the database.

    Two modes of operation:

    - Multiprocessing (``recs_queue`` given): acts as a separate process
      that drains blocks of records from the queue until a ``None``
      sentinel arrives (see ``run``).
    - Single-process (no ``recs_queue``): opens the database immediately
      and the caller feeds records through ``exec_record`` directly.
    """

    def __init__(
        self,
        recs_queue: typing.Optional[multiprocessing.Queue] = None,
        autosave_interval: int = 0,
        ip4_cache: int = 0,
    ):
        """
        :param recs_queue: Queue of record blocks (MP mode); None for
            single-process mode.
        :param autosave_interval: Seconds between periodic saves; 0 disables.
        :param ip4_cache: Size passed to the database's IPv4 cache; 0 disables.
        """
        if recs_queue:  # MP: initialize the Process machinery
            super().__init__()
            self.recs_queue = recs_queue
        self.log = logging.getLogger("wr")
        self.autosave_interval = autosave_interval
        self.ip4_cache = ip4_cache
        if not recs_queue:  # No MP: open the database in this process now
            self.open_db()

    def open_db(self) -> None:
        """Open the database and prime its IPv4 lookup cache."""
        self.db = database.Database()
        self.db.log = logging.getLogger("wr")
        self.db.fill_ip4cache(max_size=self.ip4_cache)

    def exec_record(self, record: "Record") -> None:
        """Apply one record: select matching sources for its value, then
        write its name under each source. Bad records are logged, not raised.
        """
        self.db.enter_step("exec_record")
        select, write, updated, name, value = record
        try:
            for source in select(self.db, value):
                write(self.db, name, updated, source=source)
        except (ValueError, IndexError):
            # ValueError: non-number in IP
            # IndexError: IP too big
            self.log.exception("Cannot execute: %s", record)

    def end(self) -> None:
        """Flush the database to disk."""
        self.db.enter_step("end")
        self.db.save()

    def run(self) -> None:
        """Process entry point (MP mode): drain record blocks from the
        queue until the ``None`` sentinel, autosaving periodically."""
        self.open_db()
        if self.autosave_interval > 0:
            next_save = time.time() + self.autosave_interval
        else:
            next_save = 0  # 0 means autosave disabled

        self.db.enter_step("block_wait")
        block: typing.List["Record"]
        for block in iter(self.recs_queue.get, None):
            assert block
            record: "Record"
            for record in block:
                self.exec_record(record)
            if next_save > 0 and time.time() > next_save:
                self.log.info("Saving database...")
                self.db.save()
                self.log.info("Done!")
                next_save = time.time() + self.autosave_interval
            self.db.enter_step("block_wait")
        self.end()
2021-08-14 21:27:28 +00:00
class Parser:
    """Base class for input-format parsers.

    Subclasses implement ``consume`` and call ``self.register(record)``
    for every record found in ``self.buf``. Depending on construction,
    ``register`` either executes records directly through a ``Writer``
    (single-process mode) or batches them into blocks pushed onto a
    multiprocessing queue (MP mode).
    """

    def __init__(
        self,
        buf: typing.Any,
        recs_queue: typing.Optional[multiprocessing.Queue] = None,
        block_size: int = 0,
        writer: typing.Optional["Writer"] = None,
    ):
        """
        :param buf: Iterable of input lines.
        :param recs_queue: Destination queue for record blocks (MP mode).
        :param block_size: Number of records per queued block (MP mode).
        :param writer: Writer to feed directly (single-process mode).
        """
        # Exactly one mode must be chosen: a direct writer, or a queue
        # together with a block size.
        assert bool(writer) ^ bool(block_size and recs_queue)
        self.buf = buf
        self.log = logging.getLogger("pr")
        self.recs_queue = recs_queue
        if writer:  # No MP: execute records straight into the writer's db
            self.prof: database.Profiler = writer.db
            self.register = writer.exec_record
        else:  # MP: accumulate records into blocks for the queue
            self.block: typing.List["Record"] = list()
            self.block_size = block_size
            self.prof = database.Profiler()
            self.prof.log = logging.getLogger("pr")
            self.register = self.add_to_queue

    def add_to_queue(self, record: "Record") -> None:
        """Buffer a record; flush the whole block to the queue once full."""
        self.prof.enter_step("register")
        self.block.append(record)
        if len(self.block) >= self.block_size:
            self.prof.enter_step("put_block")
            assert self.recs_queue
            self.recs_queue.put(self.block)
            self.block = list()

    def run(self) -> None:
        """Consume the whole input, then flush any partial final block."""
        self.consume()
        if self.recs_queue:
            self.recs_queue.put(self.block)
        self.prof.profile()

    def consume(self) -> None:
        """Parse ``self.buf``, calling ``self.register`` per record
        (subclass hook)."""
        raise NotImplementedError
class Rapid7Parser(Parser):
    """Parses Rapid7 Open Data JSON lines.

    Does not use a JSON parser: the expected fields sit at fixed
    quote-delimited positions, so the line is split on '"' and keys and
    values are read positionally — much cheaper on huge dumps.
    """

    def consume(self) -> None:
        data = dict()
        for line in self.buf:
            self.prof.enter_step("parse_rapid7")
            # Keys sit at quote-split offsets 1, 5, 9, 13; each value
            # follows its key two quote-fields later.
            split = line.split('"')
            try:
                for k in range(1, 14, 4):
                    key = split[k]
                    val = split[k + 2]
                    data[key] = val
                select, writer = FUNCTION_MAP[data["type"]]
                record = (
                    select,
                    writer,
                    int(data["timestamp"]),
                    data["name"],
                    data["value"],
                )
            except (IndexError, KeyError, ValueError):
                # IndexError: missing field
                # KeyError: Unknown type field
                # ValueError: non-numeric timestamp
                self.log.exception("Cannot parse: %s", line)
                # BUG FIX: skip this line. Previously execution fell
                # through to register(), re-registering the previous
                # record (or raising NameError on the first bad line).
                continue
            self.register(record)
2019-12-18 00:03:08 +00:00
class MassDnsParser(Parser):
    # massdns --output Snrql
    # --retry REFUSED,SERVFAIL --resolvers nameservers-ipv4
    TYPES = {
        # type -> (select fn, write fn, name slice end, value slice end)
        "A": (FUNCTION_MAP["a"][0], FUNCTION_MAP["a"][1], -1, None),
        # 'AAAA': (FUNCTION_MAP['aaaa'][0], FUNCTION_MAP['aaaa'][1], -1, None),
        "CNAME": (FUNCTION_MAP["cname"][0], FUNCTION_MAP["cname"][1], -1, -1),
    }

    def consume(self) -> None:
        """Parse massdns output: blank-line-separated blocks whose first
        line carries the timestamp and whose following lines carry records."""
        self.prof.enter_step("parse_massdns")
        current_ts = 0
        at_header = True
        for raw_line in self.buf:
            raw_line = raw_line[:-1]  # strip the trailing newline
            if not raw_line:
                # A blank line ends the block; the next line is a header.
                at_header = True
                continue
            fields = raw_line.split(" ")
            try:
                if at_header:
                    current_ts = int(fields[1])
                    at_header = False
                    continue
                select_fn, write_fn, name_end, value_end = MassDnsParser.TYPES[
                    fields[1]
                ]
                self.register(
                    (
                        select_fn,
                        write_fn,
                        current_ts,
                        fields[0][:name_end].lower(),
                        fields[2][:value_end].lower(),
                    )
                )
                self.prof.enter_step("parse_massdns")
            except KeyError:
                # Unsupported record type: skip this line.
                continue
# Maps the "parser" CLI argument to the Parser implementation to use.
PARSERS = {
    "rapid7": Rapid7Parser,
    "massdns": MassDnsParser,
}
2021-08-14 21:27:28 +00:00
def _main() -> None:
    """Parse DNS records from the input and feed them into the database,
    either in-process or through a dedicated writer process."""
    args_parser = argparse.ArgumentParser(
        description="Read DNS records and import "
        "tracking-relevant data into the database"
    )
    args_parser.add_argument("parser", choices=PARSERS.keys(), help="Input format")
    args_parser.add_argument(
        "-i",
        "--input",
        type=argparse.FileType("r"),
        default=sys.stdin,
        help="Input file",
    )
    args_parser.add_argument(
        "-b", "--block-size", type=int, default=1024, help="Performance tuning value"
    )
    args_parser.add_argument(
        "-q", "--queue-size", type=int, default=128, help="Performance tuning value"
    )
    args_parser.add_argument(
        "-a",
        "--autosave-interval",
        type=int,
        default=900,
        help="Interval to which the database will save in seconds. " "0 to disable.",
    )
    args_parser.add_argument(
        "-s",
        "--single-process",
        action="store_true",
        help="Only use one process. " "Might be useful for single core computers.",
    )
    args_parser.add_argument(
        "-4",
        "--ip4-cache",
        type=int,
        default=0,
        help="RAM cache for faster IPv4 lookup. "
        "Maximum useful value: 512 MiB (536870912). "
        "Warning: Depending on the rules, this might already "
        "be a memory-heavy process, even without the cache.",
    )
    args = args_parser.parse_args()

    parser_cls = PARSERS[args.parser]
    if args.single_process:
        # Single-process mode: the parser writes straight into the database.
        writer = Writer(
            autosave_interval=args.autosave_interval, ip4_cache=args.ip4_cache
        )
        parser = parser_cls(args.input, writer=writer)
        parser.run()
        writer.end()
    else:
        # Multiprocessing mode: the writer runs in its own process, fed
        # blocks of records through a bounded queue; None is the sentinel
        # that tells it to finish up and save.
        recs_queue: multiprocessing.Queue = multiprocessing.Queue(
            maxsize=args.queue_size
        )
        writer = Writer(
            recs_queue,
            autosave_interval=args.autosave_interval,
            ip4_cache=args.ip4_cache,
        )
        writer.start()
        parser = parser_cls(
            args.input, recs_queue=recs_queue, block_size=args.block_size
        )
        parser.run()
        recs_queue.put(None)  # end-of-input sentinel for the writer
        writer.join()


if __name__ == "__main__":
    _main()