Make multi-processing optional for feed_dns
This commit is contained in:
parent
195f41bd9f
commit
c543e0eab6
69
feed_dns.py
69
feed_dns.py
|
@ -29,20 +29,37 @@ FUNCTION_MAP: typing.Any = {
|
|||
|
||||
class Writer(multiprocessing.Process):
|
||||
def __init__(self,
|
||||
recs_queue: multiprocessing.Queue,
|
||||
recs_queue: multiprocessing.Queue = None,
|
||||
autosave_interval: int = 0,
|
||||
ip4_cache: int = 0,
|
||||
):
|
||||
if recs_queue: # MP
|
||||
super(Writer, self).__init__()
|
||||
self.log = logging.getLogger(f'wr')
|
||||
self.recs_queue = recs_queue
|
||||
self.log = logging.getLogger(f'wr')
|
||||
self.autosave_interval = autosave_interval
|
||||
self.ip4_cache = ip4_cache
|
||||
if not recs_queue: # No MP
|
||||
self.open_db()
|
||||
|
||||
def run(self) -> None:
|
||||
def open_db(self) -> None:
|
||||
self.db = database.Database()
|
||||
self.db.log = logging.getLogger(f'wr')
|
||||
self.db.fill_ip4cache(max_size=self.ip4_cache)
|
||||
|
||||
def exec_record(self, record: Record) -> None:
|
||||
self.db.enter_step('exec_record')
|
||||
select, write, updated, name, value = record
|
||||
try:
|
||||
for source in select(self.db, value):
|
||||
write(self.db, name, updated, source=source)
|
||||
except (ValueError, IndexError):
|
||||
# ValueError: non-number in IP
|
||||
# IndexError: IP too big
|
||||
self.log.exception("Cannot execute: %s", record)
|
||||
|
||||
def run(self) -> None:
|
||||
self.open_db()
|
||||
if self.autosave_interval > 0:
|
||||
next_save = time.time() + self.autosave_interval
|
||||
else:
|
||||
|
@ -54,17 +71,7 @@ class Writer(multiprocessing.Process):
|
|||
|
||||
record: Record
|
||||
for record in block:
|
||||
|
||||
select, write, updated, name, value = record
|
||||
self.db.enter_step('feed_switch')
|
||||
|
||||
try:
|
||||
for source in select(self.db, value):
|
||||
write(self.db, name, updated, source=source)
|
||||
except (ValueError, IndexError):
|
||||
# ValueError: non-number in IP
|
||||
# IndexError: IP too big
|
||||
self.log.exception("Cannot execute: %s", record)
|
||||
self.exec_record(record)
|
||||
|
||||
if next_save > 0 and time.time() > next_save:
|
||||
self.log.info("Saving database...")
|
||||
|
@ -81,28 +88,36 @@ class Writer(multiprocessing.Process):
|
|||
class Parser():
|
||||
def __init__(self,
|
||||
buf: typing.Any,
|
||||
recs_queue: multiprocessing.Queue,
|
||||
block_size: int,
|
||||
recs_queue: multiprocessing.Queue = None,
|
||||
block_size: int = 0,
|
||||
writer: Writer = None,
|
||||
):
|
||||
super(Parser, self).__init__()
|
||||
assert bool(writer) ^ bool(block_size and recs_queue)
|
||||
self.buf = buf
|
||||
self.log = logging.getLogger('pr')
|
||||
self.recs_queue = recs_queue
|
||||
if writer: # No MP
|
||||
self.prof: database.Profiler = writer.db
|
||||
self.register = writer.exec_record
|
||||
else: # MP
|
||||
self.block: typing.List[Record] = list()
|
||||
self.block_size = block_size
|
||||
self.prof = database.Profiler()
|
||||
self.prof.log = logging.getLogger('pr')
|
||||
self.register = self.add_to_queue
|
||||
|
||||
def register(self, record: Record) -> None:
|
||||
def add_to_queue(self, record: Record) -> None:
|
||||
self.prof.enter_step('register')
|
||||
self.block.append(record)
|
||||
if len(self.block) >= self.block_size:
|
||||
self.prof.enter_step('put_block')
|
||||
assert self.recs_queue
|
||||
self.recs_queue.put(self.block)
|
||||
self.block = list()
|
||||
|
||||
def run(self) -> None:
|
||||
self.consume()
|
||||
if self.recs_queue:
|
||||
self.recs_queue.put(self.block)
|
||||
self.prof.profile()
|
||||
|
||||
|
@ -205,6 +220,10 @@ if __name__ == '__main__':
|
|||
'-a', '--autosave-interval', type=int, default=900,
|
||||
help="Interval to which the database will save in seconds. "
|
||||
"0 to disable.")
|
||||
args_parser.add_argument(
|
||||
'-s', '--single-process', action='store_true',
|
||||
help="Only use one process. "
|
||||
"Might be useful for single core computers.")
|
||||
args_parser.add_argument(
|
||||
'-4', '--ip4-cache', type=int, default=0,
|
||||
help="RAM cache for faster IPv4 lookup. "
|
||||
|
@ -213,6 +232,15 @@ if __name__ == '__main__':
|
|||
"be a memory-heavy process, even without the cache.")
|
||||
args = args_parser.parse_args()
|
||||
|
||||
parser_cls = PARSERS[args.parser]
|
||||
if args.single_process:
|
||||
writer = Writer(
|
||||
autosave_interval=args.autosave_interval,
|
||||
ip4_cache=args.ip4_cache
|
||||
)
|
||||
parser = parser_cls(args.input, writer=writer)
|
||||
parser.run()
|
||||
else:
|
||||
recs_queue: multiprocessing.Queue = multiprocessing.Queue(
|
||||
maxsize=args.queue_size)
|
||||
|
||||
|
@ -222,7 +250,10 @@ if __name__ == '__main__':
|
|||
)
|
||||
writer.start()
|
||||
|
||||
parser = PARSERS[args.parser](args.input, recs_queue, args.block_size)
|
||||
parser = parser_cls(args.input,
|
||||
recs_queue=recs_queue,
|
||||
block_size=args.block_size
|
||||
)
|
||||
parser.run()
|
||||
|
||||
recs_queue.put(None)
|
||||
|
|
Loading…
Reference in a new issue