|
|
@ -6,7 +6,7 @@ import logging |
|
|
|
import sys |
|
|
|
import typing |
|
|
|
import multiprocessing |
|
|
|
import enum |
|
|
|
import time |
|
|
|
|
|
|
|
Record = typing.Tuple[typing.Callable, typing.Callable, int, str, str] |
|
|
|
|
|
|
@ -30,14 +30,19 @@ FUNCTION_MAP: typing.Any = { |
|
|
|
class Writer(multiprocessing.Process): |
|
|
|
def __init__(self, |
|
|
|
recs_queue: multiprocessing.Queue, |
|
|
|
index: int = 0): |
|
|
|
autosave_interval: int = 0): |
|
|
|
super(Writer, self).__init__() |
|
|
|
self.log = logging.getLogger(f'wr') |
|
|
|
self.recs_queue = recs_queue |
|
|
|
self.autosave_interval = autosave_interval |
|
|
|
|
|
|
|
def run(self) -> None: |
|
|
|
self.db = database.Database() |
|
|
|
self.db.log = logging.getLogger(f'wr') |
|
|
|
if self.autosave_interval > 0: |
|
|
|
next_save = time.time() + self.autosave_interval |
|
|
|
else: |
|
|
|
next_save = 0 |
|
|
|
|
|
|
|
self.db.enter_step('block_wait') |
|
|
|
block: typing.List[Record] |
|
|
@ -55,6 +60,12 @@ class Writer(multiprocessing.Process): |
|
|
|
except ValueError: |
|
|
|
self.log.exception("Cannot execute: %s", record) |
|
|
|
|
|
|
|
if next_save > 0 and time.time() > next_save: |
|
|
|
self.log.info("Saving database...") |
|
|
|
self.db.save() |
|
|
|
self.log.info("Done!") |
|
|
|
next_save = time.time() + self.autosave_interval |
|
|
|
|
|
|
|
self.db.enter_step('block_wait') |
|
|
|
|
|
|
|
self.db.enter_step('end') |
|
|
@ -186,12 +197,15 @@ if __name__ == '__main__': |
|
|
|
args_parser.add_argument( |
|
|
|
'-q', '--queue-size', type=int, default=128, |
|
|
|
help="TODO") |
|
|
|
args_parser.add_argument( |
|
|
|
'-a', '--autosave-interval', type=int, default=900, |
|
|
|
help="TODO seconds") |
|
|
|
args = args_parser.parse_args() |
|
|
|
|
|
|
|
recs_queue: multiprocessing.Queue = multiprocessing.Queue( |
|
|
|
maxsize=args.queue_size) |
|
|
|
|
|
|
|
writer = Writer(recs_queue) |
|
|
|
writer = Writer(recs_queue, autosave_interval=args.autosave_interval) |
|
|
|
writer.start() |
|
|
|
|
|
|
|
parser = PARSERS[args.parser](args.input, recs_queue, args.block_size) |
|
|
|