eulaurarien/feed_dns.py

257 lines
7.5 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import database
import logging
import sys
import typing
import multiprocessing
import time
# A record is (select function, write function, timestamp, name, value).
Record = typing.Tuple[typing.Callable, typing.Callable, int, str, str]

# Record kind -> (select, write): the Database method used to look up the
# sources matching a record's value, and the Database method used to store
# the record's name for each of those sources.
FUNCTION_MAP: typing.Any = {
    "a": (database.Database.get_ip4, database.Database.set_hostname),
    "cname": (database.Database.get_domain, database.Database.set_hostname),
    "ptr": (database.Database.get_domain, database.Database.set_ip4address),
}
class Writer(multiprocessing.Process):
    """Apply DNS records to the database, inline or as a separate process.

    With a ``recs_queue`` the writer is meant to be started as a process:
    ``run`` opens the database, consumes blocks of records from the queue
    until it receives ``None``, then saves.  Without a queue the database is
    opened immediately and the caller drives the writer through
    ``exec_record`` and ``end``.
    """

    def __init__(
        self,
        recs_queue: "typing.Optional[multiprocessing.Queue]" = None,
        autosave_interval: int = 0,
        ip4_cache: int = 0,
    ):
        """Set up the writer.

        recs_queue: queue of record blocks to consume (multi-process mode),
            or None to use the writer in-process.
        autosave_interval: seconds between automatic saves in ``run``;
            0 (or less) disables autosave.
        ip4_cache: ``max_size`` passed to the database's IPv4 cache.
        """
        # Always initialise the Process base so the object is well-formed
        # even when it is never started.
        super().__init__()
        self.recs_queue = recs_queue
        self.log = logging.getLogger("wr")
        self.autosave_interval = autosave_interval
        self.ip4_cache = ip4_cache
        if recs_queue is None:  # No MP: the database is used right here
            self.open_db()

    def open_db(self) -> None:
        """Open the database and fill its IPv4 cache."""
        self.db = database.Database()
        self.db.log = logging.getLogger("wr")
        self.db.fill_ip4cache(max_size=self.ip4_cache)

    def exec_record(self, record: "Record") -> None:
        """Write the record's name for every source its value selects."""
        self.db.enter_step("exec_record")
        select, write, updated, name, value = record
        try:
            for source in select(self.db, value):
                write(self.db, name, updated, source=source)
        except (ValueError, IndexError):
            # ValueError: non-number in IP
            # IndexError: IP too big
            self.log.exception("Cannot execute: %s", record)

    def end(self) -> None:
        """Save the database."""
        self.db.enter_step("end")
        self.db.save()

    def run(self) -> None:
        """Consume record blocks from the queue until ``None``, then save.

        Raises RuntimeError when the writer was created without a queue.
        """
        if self.recs_queue is None:
            raise RuntimeError("Writer.run() requires a records queue")
        self.open_db()
        autosave = self.autosave_interval > 0
        # Monotonic clock: the interval must not be skewed by wall-clock jumps.
        next_save = time.monotonic() + self.autosave_interval if autosave else 0.0

        self.db.enter_step("block_wait")
        block: typing.List["Record"]
        for block in iter(self.recs_queue.get, None):
            # A block may legitimately be empty (e.g. the parser's trailing
            # flush); it must not abort the writer before the final save.
            for record in block:
                self.exec_record(record)

            if autosave and time.monotonic() > next_save:
                self.log.info("Saving database...")
                self.db.save()
                self.log.info("Done!")
                next_save = time.monotonic() + self.autosave_interval

            self.db.enter_step("block_wait")
        self.end()
class Parser:
    """Base class turning an input buffer into records.

    Records are delivered through ``self.register``: either straight to a
    ``Writer`` (single-process mode) or batched into blocks of ``block_size``
    records that are put on ``recs_queue`` (multi-process mode).
    Subclasses implement ``consume``.
    """

    def __init__(
        self,
        buf: typing.Any,
        recs_queue: "typing.Optional[multiprocessing.Queue]" = None,
        block_size: int = 0,
        writer: "typing.Optional[Writer]" = None,
    ):
        """Set up the parser.

        buf: iterable input that ``consume`` reads.
        recs_queue, block_size: queue and batch size for multi-process mode.
        writer: in-process writer for single-process mode.

        Exactly one of ``writer`` or (``block_size`` and ``recs_queue``)
        must be given.
        """
        assert bool(writer) ^ bool(block_size and recs_queue)
        self.buf = buf
        self.log = logging.getLogger("pr")
        self.recs_queue = recs_queue
        if writer:  # No MP: records are executed directly
            self.prof: database.Profiler = writer.db
            self.register = writer.exec_record
        else:  # MP: records are batched and queued
            self.block: typing.List["Record"] = list()
            self.block_size = block_size
            self.prof = database.Profiler()
            self.prof.log = logging.getLogger("pr")
            self.register = self.add_to_queue

    def add_to_queue(self, record: "Record") -> None:
        """Append the record to the current block; queue the block when full."""
        self.prof.enter_step("register")
        self.block.append(record)
        if len(self.block) >= self.block_size:
            self.prof.enter_step("put_block")
            assert self.recs_queue is not None
            self.recs_queue.put(self.block)
            self.block = list()

    def run(self) -> None:
        """Consume the whole input, flush pending records, print the profile."""
        self.consume()
        # Flush only a non-empty remainder: an empty block carries no work
        # and the consumer is entitled to expect real records.
        if self.recs_queue is not None and self.block:
            self.recs_queue.put(self.block)
            self.block = list()
        self.prof.profile()

    def consume(self) -> None:
        """Read ``self.buf`` and register its records (subclass hook)."""
        raise NotImplementedError
class MassDnsParser(Parser):
    """Parser for massdns output.

    Expected invocation:
        massdns --output Snrql
            --retry REFUSED,SERVFAIL --resolvers nameservers-ipv4
    """

    # Record type -> (select, write, name_offset, value_offset).
    # The offsets are slice ends applied to the name and value fields:
    # -1 drops the last character (presumably the trailing dot of a fully
    # qualified name -- confirm against massdns output), None keeps the
    # field whole.
    TYPES = {
        "A": (FUNCTION_MAP["a"][0], FUNCTION_MAP["a"][1], -1, None),
        # 'AAAA': (FUNCTION_MAP['aaaa'][0], FUNCTION_MAP['aaaa'][1], -1, None),
        "CNAME": (FUNCTION_MAP["cname"][0], FUNCTION_MAP["cname"][1], -1, -1),
    }

    def consume(self) -> None:
        """Parse ``self.buf`` line by line and register every handled record.

        Lines are space-separated.  The first non-empty line, and the first
        one after each blank line, is a header whose second field is an
        integer timestamp applied to the records that follow.  Any other
        line is read as ``name type value``; record types missing from
        ``TYPES`` are skipped.  A line with too few fields is treated as the
        end of a truncated file and stops the parsing.

        Exceptions raised while registering a record are not intercepted
        here, so failures of the writer are never mistaken for parse errors.
        """
        self.prof.enter_step("parse_massdns")
        timestamp = 0
        header = True
        for line in self.buf:
            line = line.rstrip()
            if not line:
                # Blank separator: the next non-empty line is a header.
                header = True
                continue

            split = line.split(" ")
            if len(split) < 2:
                # Both headers and records need at least two fields.
                self.log.warning("Aborted file: incomplete line %r", line)
                break

            if header:
                timestamp = int(split[1])
                header = False
                continue

            handlers = MassDnsParser.TYPES.get(split[1])
            if handlers is None:
                # Unhandled record type
                continue
            if len(split) < 3:
                self.log.warning("Aborted file: incomplete line %r", line)
                break

            select, write, name_offset, value_offset = handlers
            record = (
                select,
                write,
                timestamp,
                split[0][:name_offset].lower(),
                split[2][:value_offset].lower(),
            )
            self.register(record)
            # Registering switched the profiler step; go back to parsing.
            self.prof.enter_step("parse_massdns")
# Input format name (as chosen on the command line) -> parser class.
PARSERS = {"massdns": MassDnsParser}
if __name__ == "__main__":
    # Command-line entry point: parse the options, then feed the chosen input
    # through a parser into a Writer, either in this process or in a
    # dedicated writer process fed through a queue.

    # Parsing arguments
    log = logging.getLogger("feed_dns")
    args_parser = argparse.ArgumentParser(
        description="Read DNS records and import "
        "tracking-relevant data into the database"
    )
    args_parser.add_argument("parser", choices=PARSERS.keys(), help="Input format")
    args_parser.add_argument(
        "-i",
        "--input",
        type=argparse.FileType("r"),
        default=sys.stdin,
        help="Input file",
    )
    args_parser.add_argument(
        "-b", "--block-size", type=int, default=1024, help="Performance tuning value"
    )
    args_parser.add_argument(
        "-q", "--queue-size", type=int, default=128, help="Performance tuning value"
    )
    args_parser.add_argument(
        "-a",
        "--autosave-interval",
        type=int,
        default=900,
        help="Interval to which the database will save in seconds. 0 to disable.",
    )
    args_parser.add_argument(
        "-s",
        "--single-process",
        action="store_true",
        help="Only use one process. Might be useful for single core computers.",
    )
    args_parser.add_argument(
        "-4",
        "--ip4-cache",
        type=int,
        default=0,
        help="RAM cache for faster IPv4 lookup. "
        "Maximum useful value: 512 MiB (536870912). "
        "Warning: Depending on the rules, this might already "
        "be a memory-heavy process, even without the cache.",
    )
    args = args_parser.parse_args()

    parser_cls = PARSERS[args.parser]
    if args.single_process:
        # Everything happens in this process: the parser hands each record
        # directly to the writer.
        writer = Writer(
            autosave_interval=args.autosave_interval, ip4_cache=args.ip4_cache
        )
        parser = parser_cls(args.input, writer=writer)
        parser.run()
        writer.end()
    else:
        import queue  # only for queue.Full in the shutdown path below

        recs_queue: multiprocessing.Queue = multiprocessing.Queue(
            maxsize=args.queue_size
        )
        writer = Writer(
            recs_queue,
            autosave_interval=args.autosave_interval,
            ip4_cache=args.ip4_cache,
        )
        writer.start()
        try:
            parser = parser_cls(
                args.input, recs_queue=recs_queue, block_size=args.block_size
            )
            parser.run()
        finally:
            # The writer only stops on the None sentinel.  Send it even when
            # parsing failed, otherwise the writer waits on the queue forever
            # and the interpreter hangs at exit while joining it.  Retry with
            # a timeout so a full queue cannot block us if the writer died.
            while writer.is_alive():
                try:
                    recs_queue.put(None, timeout=1)
                except queue.Full:
                    continue
                break
            writer.join()