Multi-processed parser

This commit is contained in:
Geoffrey Frogeye 2019-12-15 17:05:41 +01:00
parent ce52897d30
commit 45325782d2
Signed by: geoffrey
GPG key ID: D8A7ECA00A8CD3DD


@@ -5,6 +5,7 @@ import database
 import logging
 import sys
 import typing
+import multiprocessing
 import enum
 
 RecordType = enum.Enum('RecordType', 'A AAAA CNAME PTR')
@@ -27,28 +28,67 @@ FUNCTION_MAP: typing.Any = {
 }
 
-class Parser():
-    def __init__(self, buf: typing.Any) -> None:
-        self.buf = buf
-        self.log = logging.getLogger('parser')
-        self.db = database.Database()
-
-    def end(self) -> None:
-        self.db.save()
-
-    def register(self,
-                 rtype: RecordType,
-                 updated: int,
-                 name: str,
-                 value: str
-                 ) -> None:
-        self.db.enter_step('register')
-        select, write = FUNCTION_MAP[rtype]
-        for source in select(self.db, value):
-            # write(self.db, name, updated, source=source)
-            write(self.db, name, updated)
+class Writer(multiprocessing.Process):
+    def __init__(self,
+                 recs_queue: multiprocessing.Queue,
+                 index: int = 0):
+        super(Writer, self).__init__()
+        self.log = logging.getLogger(f'wr')
+        self.recs_queue = recs_queue
+
+    def run(self) -> None:
+        self.db = database.Database()
+        self.db.log = logging.getLogger(f'wr')
+
+        self.db.enter_step('block_wait')
+        block: typing.List[Record]
+        for block in iter(self.recs_queue.get, None):
+            record: Record
+            for record in block:
+                rtype, updated, name, value = record
+                self.db.enter_step('feed_switch')
+                select, write = FUNCTION_MAP[rtype]
+                for source in select(self.db, value):
+                    # write(self.db, name, updated, source=source)
+                    write(self.db, name, updated)
+            self.db.enter_step('block_wait')
+        self.db.enter_step('end')
+        self.db.save()
+
+
+class Parser():
+    def __init__(self,
+                 buf: typing.Any,
+                 recs_queue: multiprocessing.Queue,
+                 block_size: int,
+                 ):
+        super(Parser, self).__init__()
+        self.buf = buf
+        self.log = logging.getLogger('pr')
+        self.recs_queue = recs_queue
+        self.block: typing.List[Record] = list()
+        self.block_size = block_size
+        self.prof = database.Profiler()
+        self.prof.log = logging.getLogger('pr')
+
+    def register(self, record: Record) -> None:
+        self.prof.enter_step('register')
+        self.block.append(record)
+        if len(self.block) >= self.block_size:
+            self.prof.enter_step('put_block')
+            self.recs_queue.put(self.block)
+            self.block = list()
+
+    def run(self) -> None:
+        self.consume()
+        self.recs_queue.put(self.block)
+        self.prof.profile()
 
     def consume(self) -> None:
         raise NotImplementedError
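For context, the Writer/Parser split above is the standard multiprocessing producer/consumer pattern: the parser batches records into blocks, pushes them onto a bounded queue, and a separate Writer process drains the queue until it receives a None sentinel. A minimal self-contained sketch of that same pattern, assuming nothing beyond the standard library (the name worker and the example record are illustrative, not from this commit):

import multiprocessing
import typing

# Illustrative record shape, mirroring the (rtype, updated, name, value)
# tuples built by the parsers in this file.
Record = typing.Tuple[str, int, str, str]


def worker(queue: multiprocessing.Queue) -> None:
    # iter(queue.get, None) keeps calling queue.get() until it returns
    # the None sentinel -- the same idiom Writer.run() uses above.
    for block in iter(queue.get, None):
        for rtype, updated, name, value in block:
            print(rtype, updated, name, value)


if __name__ == '__main__':
    queue: multiprocessing.Queue = multiprocessing.Queue(maxsize=10)
    proc = multiprocessing.Process(target=worker, args=(queue,))
    proc.start()
    queue.put([('A', 0, 'example.com', '93.184.216.34')])  # one block
    queue.put(None)  # sentinel: lets the worker exit its loop
    proc.join()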
@@ -64,7 +104,7 @@ class Rapid7Parser(Parser):
     def consume(self) -> None:
         data = dict()
         for line in self.buf:
-            self.db.enter_step('parse_rapid7')
+            self.prof.enter_step('parse_rapid7')
             split = line.split('"')
 
             for k in range(1, 14, 4):
@@ -72,12 +112,13 @@ class Rapid7Parser(Parser):
                 val = split[k+2]
                 data[key] = val
 
-            self.register(
+            record = (
                 Rapid7Parser.TYPES[data['type']],
                 int(data['timestamp']),
                 data['name'],
                 data['value']
             )
+            self.register(record)
 
 
 class DnsMassParser(Parser):
@@ -90,7 +131,7 @@ class DnsMassParser(Parser):
     }
 
     def consume(self) -> None:
-        self.db.enter_step('parse_dnsmass')
+        self.prof.enter_step('parse_dnsmass')
         timestamp = 0
         header = True
         for line in self.buf:
@@ -107,13 +148,14 @@ class DnsMassParser(Parser):
                 else:
                     dtype, name_offset, value_offset = \
                         DnsMassParser.TYPES[split[1]]
-                    self.register(
+                    record = (
                         dtype,
                         timestamp,
                         split[0][:name_offset],
                         split[2][:value_offset],
                     )
-                    self.db.enter_step('parse_dnsmass')
+                    self.register(record)
+                    self.prof.enter_step('parse_dnsmass')
             except KeyError:
                 continue
@@ -136,12 +178,25 @@ if __name__ == '__main__':
     args_parser.add_argument(
         '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
         help="TODO")
+    args_parser.add_argument(
+        '-j', '--workers', type=int, default=4,
+        help="TODO")
+    args_parser.add_argument(
+        '-b', '--block-size', type=int, default=100,
+        help="TODO")
+    args_parser.add_argument(
+        '-q', '--queue-size', type=int, default=10,
+        help="TODO")
     args = args_parser.parse_args()
 
-    parser = PARSERS[args.parser](args.input)
-    try:
-        parser.consume()
-    except KeyboardInterrupt:
-        pass
-    parser.end()
+    recs_queue: multiprocessing.Queue = multiprocessing.Queue(
+        maxsize=args.queue_size)
+
+    writer = Writer(recs_queue)
+    writer.start()
+
+    parser = PARSERS[args.parser](args.input, recs_queue, args.block_size)
+    parser.run()
+
+    recs_queue.put(None)
+    writer.join()
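One detail worth noting in the wiring above: the queue is bounded (maxsize=args.queue_size), so a parser that outruns the writer blocks on put(), which caps memory use; and because a multiprocessing.Queue is FIFO for a single producer, the None sentinel put after parser.run() is guaranteed to arrive after the final block. A small sketch of that backpressure in isolation (slow_consumer and the sleep are illustrative, not from this commit):

import multiprocessing
import time


def slow_consumer(queue: multiprocessing.Queue) -> None:
    for item in iter(queue.get, None):
        time.sleep(0.01)  # simulate a slow database write


if __name__ == '__main__':
    queue: multiprocessing.Queue = multiprocessing.Queue(maxsize=10)
    proc = multiprocessing.Process(target=slow_consumer, args=(queue,))
    proc.start()
    for i in range(100):
        queue.put(i)  # blocks whenever 10 items are already queued
    queue.put(None)   # FIFO order: the sentinel arrives last
    proc.join()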