From c543e0eab664971f6db158ecad338c775c581b8e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Geoffrey=20=E2=80=9CFrogeye=E2=80=9D=20Preud=27homme?=
Date: Wed, 25 Dec 2019 13:04:15 +0100
Subject: [PATCH] Make multi-processing optional for feed_dns

---
 feed_dns.py | 101 ++++++++++++++++++++++++++++++++++------------------
 1 file changed, 66 insertions(+), 35 deletions(-)

diff --git a/feed_dns.py b/feed_dns.py
index ebb8fcb..20eeb69 100755
--- a/feed_dns.py
+++ b/feed_dns.py
@@ -29,20 +29,37 @@ FUNCTION_MAP: typing.Any = {
 
 class Writer(multiprocessing.Process):
     def __init__(self,
-                 recs_queue: multiprocessing.Queue,
+                 recs_queue: multiprocessing.Queue = None,
                  autosave_interval: int = 0,
                  ip4_cache: int = 0,
                  ):
-        super(Writer, self).__init__()
+        if recs_queue:  # MP
+            super(Writer, self).__init__()
+            self.recs_queue = recs_queue
         self.log = logging.getLogger(f'wr')
-        self.recs_queue = recs_queue
         self.autosave_interval = autosave_interval
         self.ip4_cache = ip4_cache
+        if not recs_queue:  # No MP
+            self.open_db()
 
-    def run(self) -> None:
+    def open_db(self) -> None:
         self.db = database.Database()
         self.db.log = logging.getLogger(f'wr')
         self.db.fill_ip4cache(max_size=self.ip4_cache)
+
+    def exec_record(self, record: Record) -> None:
+        self.db.enter_step('exec_record')
+        select, write, updated, name, value = record
+        try:
+            for source in select(self.db, value):
+                write(self.db, name, updated, source=source)
+        except (ValueError, IndexError):
+            # ValueError: non-number in IP
+            # IndexError: IP too big
+            self.log.exception("Cannot execute: %s", record)
+
+    def run(self) -> None:
+        self.open_db()
         if self.autosave_interval > 0:
             next_save = time.time() + self.autosave_interval
         else:
@@ -54,17 +71,7 @@ class Writer(multiprocessing.Process):
 
             record: Record
             for record in block:
-
-                select, write, updated, name, value = record
-                self.db.enter_step('feed_switch')
-
-                try:
-                    for source in select(self.db, value):
-                        write(self.db, name, updated, source=source)
-                except (ValueError, IndexError):
-                    # ValueError: non-number in IP
-                    # IndexError: IP too big
-                    self.log.exception("Cannot execute: %s", record)
+                self.exec_record(record)
 
             if next_save > 0 and time.time() > next_save:
                 self.log.info("Saving database...")
@@ -81,29 +88,37 @@ class Writer(multiprocessing.Process):
 class Parser():
     def __init__(self,
                  buf: typing.Any,
-                 recs_queue: multiprocessing.Queue,
-                 block_size: int,
+                 recs_queue: multiprocessing.Queue = None,
+                 block_size: int = 0,
+                 writer: Writer = None,
                  ):
-        super(Parser, self).__init__()
+        assert bool(writer) ^ bool(block_size and recs_queue)
         self.buf = buf
         self.log = logging.getLogger('pr')
         self.recs_queue = recs_queue
-        self.block: typing.List[Record] = list()
-        self.block_size = block_size
-        self.prof = database.Profiler()
-        self.prof.log = logging.getLogger('pr')
+        if writer:  # No MP
+            self.prof: database.Profiler = writer.db
+            self.register = writer.exec_record
+        else:  # MP
+            self.block: typing.List[Record] = list()
+            self.block_size = block_size
+            self.prof = database.Profiler()
+            self.prof.log = logging.getLogger('pr')
+            self.register = self.add_to_queue
 
-    def register(self, record: Record) -> None:
+    def add_to_queue(self, record: Record) -> None:
         self.prof.enter_step('register')
         self.block.append(record)
         if len(self.block) >= self.block_size:
             self.prof.enter_step('put_block')
+            assert self.recs_queue
             self.recs_queue.put(self.block)
             self.block = list()
 
     def run(self) -> None:
         self.consume()
-        self.recs_queue.put(self.block)
+        if self.recs_queue:
+            self.recs_queue.put(self.block)
         self.prof.profile()
 
     def consume(self) -> None:
@@ -205,6 +220,10 @@ if __name__ == '__main__':
         '-a', '--autosave-interval', type=int, default=900,
         help="Interval to which the database will save in seconds. "
         "0 to disable.")
+    args_parser.add_argument(
+        '-s', '--single-process', action='store_true',
+        help="Only use one process. "
+        "Might be useful for single core computers.")
     args_parser.add_argument(
         '-4', '--ip4-cache', type=int, default=0,
         help="RAM cache for faster IPv4 lookup. "
@@ -213,17 +232,29 @@ if __name__ == '__main__':
         "be a memory-heavy process, even without the cache.")
     args = args_parser.parse_args()
 
-    recs_queue: multiprocessing.Queue = multiprocessing.Queue(
-        maxsize=args.queue_size)
+    parser_cls = PARSERS[args.parser]
+    if args.single_process:
+        writer = Writer(
+            autosave_interval=args.autosave_interval,
+            ip4_cache=args.ip4_cache
+        )
+        parser = parser_cls(args.input, writer=writer)
+        parser.run()
+    else:
+        recs_queue: multiprocessing.Queue = multiprocessing.Queue(
+            maxsize=args.queue_size)
 
-    writer = Writer(recs_queue,
-                    autosave_interval=args.autosave_interval,
-                    ip4_cache=args.ip4_cache
-                    )
-    writer.start()
+        writer = Writer(recs_queue,
+                        autosave_interval=args.autosave_interval,
+                        ip4_cache=args.ip4_cache
+                        )
+        writer.start()
 
-    parser = PARSERS[args.parser](args.input, recs_queue, args.block_size)
-    parser.run()
+        parser = parser_cls(args.input,
+                            recs_queue=recs_queue,
+                            block_size=args.block_size
+                            )
+        parser.run()
 
-    recs_queue.put(None)
-    writer.join()
+        recs_queue.put(None)
+        writer.join()
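
For illustration, here is a minimal sketch of how the two code paths introduced by this patch can be driven by hand. The Writer and Parser signatures, the None sentinel, and the 900-second autosave default are taken from the diff above; everything else (importing feed_dns as a module, picking a parser with next(iter(...)), reading from sys.stdin, the queue and block sizes of 100) is a placeholder, since the patch shows neither the PARSERS keys nor how args.input is built.

    # sketch.py -- hypothetical driver, not part of the patch
    import multiprocessing
    import sys

    from feed_dns import PARSERS, Writer

    SINGLE_PROCESS = True  # toggle to exercise either path
    # Assumes PARSERS maps names to Parser subclasses, per PARSERS[args.parser] above.
    parser_cls = next(iter(PARSERS.values()))

    if SINGLE_PROCESS:
        # Writer opens its database in __init__() and the parser's register
        # hook is bound to writer.exec_record, so each record is written to
        # the database as soon as it is parsed.
        writer = Writer(autosave_interval=900, ip4_cache=0)
        parser_cls(sys.stdin, writer=writer).run()
    else:
        # Records are batched into blocks of block_size and shipped over a
        # queue to a separate Writer process; the None sentinel ends its
        # run() loop once the parser is done.
        recs_queue: multiprocessing.Queue = multiprocessing.Queue(maxsize=100)
        writer = Writer(recs_queue, autosave_interval=900, ip4_cache=0)
        writer.start()
        parser_cls(sys.stdin, recs_queue=recs_queue, block_size=100).run()
        recs_queue.put(None)
        writer.join()

The assertion in Parser.__init__, bool(writer) ^ bool(block_size and recs_queue), enforces that exactly one of these two wirings is requested.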