Compare commits

...

3 commits

2 changed files with 91 additions and 44 deletions

database.py

@@ -227,10 +227,19 @@ class Database(Profiler):
             self.log.warning("Allocating more than 512 MiB of RAM for "
                              "the Ip4 cache is not necessary.")
         max_cache_width = int(math.log2(max(1, max_size*8)))
+        allocated = False
         cache_width = min(32, max_cache_width)
+        while not allocated:
+            cache_size = 2**cache_width
+            try:
+                self.ip4cache = numpy.zeros(cache_size, dtype=numpy.bool)
+            except MemoryError:
+                self.log.exception(
+                    "Could not allocate cache. Retrying a smaller one.")
+                cache_width -= 1
+                continue
+            allocated = True
         self.ip4cache_shift = 32-cache_width
-        cache_size = 2**cache_width
-        self.ip4cache = numpy.zeros(cache_size, dtype=numpy.bool)
         for _ in self.exec_each_ip4(self._set_ip4cache):
             pass

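The loop added here keeps halving the cache (dropping one bit of index width per retry) until numpy can actually reserve the memory, instead of dying on the first MemoryError. A minimal standalone sketch of the same back-off pattern, with illustrative names that are not from this repository:

    import logging
    import numpy

    log = logging.getLogger('demo')

    def allocate_bitmap(max_width: int) -> numpy.ndarray:
        # Try the widest bitmap first, then drop one bit of index
        # width (halving the cell count) after each failed attempt.
        width = max_width
        while width >= 0:
            try:
                return numpy.zeros(2**width, dtype=bool)
            except MemoryError:
                log.exception("2**%d cells failed, retrying smaller", width)
                width -= 1
        raise MemoryError("could not allocate even a 1-cell bitmap")

    print(len(allocate_bitmap(20)))  # 1048576, assuming the allocation fits
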
@@ -288,11 +297,16 @@ class Database(Profiler):
         return True

     @staticmethod
-    def pack_ip4address(address: str) -> Ip4Path:
+    def pack_ip4address_low(address: str) -> int:
         addr = 0
         for split in address.split('.'):
-            addr = (addr << 8) + int(split)
-        return Ip4Path(addr, 32)
+            octet = int(split)
+            addr = (addr << 8) + octet
+        return addr
+
+    @staticmethod
+    def pack_ip4address(address: str) -> Ip4Path:
+        return Ip4Path(Database.pack_ip4address_low(address), 32)

     @staticmethod
     def unpack_ip4address(address: Ip4Path) -> str:
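
Splitting the packer in two lets callers that only need the raw integer (like get_ip4 below) avoid building an Ip4Path object per lookup. The arithmetic is unchanged: each octet is shifted into a 32-bit value. A quick sanity check of the expected behaviour, assuming the class is imported as in the diff:

    # '1.2.3.4' -> (1 << 24) | (2 << 16) | (3 << 8) | 4 = 0x01020304
    assert Database.pack_ip4address_low('1.2.3.4') == 16909060
    assert Database.pack_ip4address('1.2.3.4').prefixlen == 32
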
@@ -627,17 +641,17 @@ class Database(Profiler):

     def get_ip4(self, ip4_str: str) -> typing.Iterable[Path]:
         self.enter_step('get_ip4_pack')
-        ip4 = self.pack_ip4address(ip4_str)
+        ip4val = self.pack_ip4address_low(ip4_str)
         self.enter_step('get_ip4_cache')
-        if not self.ip4cache[ip4.value >> self.ip4cache_shift]:
+        if not self.ip4cache[ip4val >> self.ip4cache_shift]:
             return
         self.enter_step('get_ip4_brws')
         dic = self.ip4tree
-        for i in range(31, 31-ip4.prefixlen, -1):
-            bit = (ip4.value >> i) & 0b1
+        for i in range(31, -1, -1):
+            bit = (ip4val >> i) & 0b1
             if dic.active():
                 self.enter_step('get_ip4_yield')
-                yield Ip4Path(ip4.value >> (i+1) << (i+1), 31-i)
+                yield Ip4Path(ip4val >> (i+1) << (i+1), 31-i)
             self.enter_step('get_ip4_brws')
             next_dic = dic.one if bit else dic.zero
             if next_dic is None:
@@ -645,7 +659,7 @@ class Database(Profiler):
             dic = next_dic
         if dic.active():
             self.enter_step('get_ip4_yield')
-            yield ip4
+            yield Ip4Path(ip4val, 32)

     def _unset_match(self,
                      match: Match,
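
Since pack_ip4address_low always returns a full 32-bit value, get_ip4 now walks all 32 levels of the tree instead of stopping at a stored prefix length, yielding every active ancestor prefix on the way down. A simplified sketch of that walk, using a hypothetical Node type in place of the repository's tree classes:

    import typing

    class Node:
        def __init__(self) -> None:
            self.zero: typing.Optional['Node'] = None
            self.one: typing.Optional['Node'] = None
            self.active = False

    def matching_prefixes(root: Node, addr: int) \
            -> typing.Iterator[typing.Tuple[int, int]]:
        # Yield (network, prefixlen) for every active node on the
        # path from the root (0.0.0.0/0) down to the /32 leaf.
        node: typing.Optional[Node] = root
        for i in range(31, -1, -1):
            if node.active:
                yield (addr >> (i+1) << (i+1), 31-i)
            node = node.one if (addr >> i) & 1 else node.zero
            if node is None:
                return
        if node.active:
            yield (addr, 32)

    root = Node()
    root.active = True  # 0.0.0.0/0 matches everything
    print(list(matching_prefixes(root, 0x01020304)))  # [(0, 0)]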

feed_dns.py

@@ -29,20 +29,37 @@ FUNCTION_MAP: typing.Any = {

 class Writer(multiprocessing.Process):
     def __init__(self,
-                 recs_queue: multiprocessing.Queue,
+                 recs_queue: multiprocessing.Queue = None,
                  autosave_interval: int = 0,
                  ip4_cache: int = 0,
                  ):
-        super(Writer, self).__init__()
+        if recs_queue:  # MP
+            super(Writer, self).__init__()
+            self.recs_queue = recs_queue
         self.log = logging.getLogger(f'wr')
-        self.recs_queue = recs_queue
         self.autosave_interval = autosave_interval
         self.ip4_cache = ip4_cache
+        if not recs_queue:  # No MP
+            self.open_db()

-    def run(self) -> None:
+    def open_db(self) -> None:
         self.db = database.Database()
         self.db.log = logging.getLogger(f'wr')
         self.db.fill_ip4cache(max_size=self.ip4_cache)
+
+    def exec_record(self, record: Record) -> None:
+        self.db.enter_step('exec_record')
+        select, write, updated, name, value = record
+        try:
+            for source in select(self.db, value):
+                write(self.db, name, updated, source=source)
+        except (ValueError, IndexError):
+            # ValueError: non-number in IP
+            # IndexError: IP too big
+            self.log.exception("Cannot execute: %s", record)
+
+    def run(self) -> None:
+        self.open_db()
         if self.autosave_interval > 0:
             next_save = time.time() + self.autosave_interval
         else:
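
Note that in the single-process branch Writer never calls super().__init__() and is never start()ed; its methods are simply invoked inline by the parser. The pattern in isolation, with a hypothetical Worker class:

    import multiprocessing

    class Worker(multiprocessing.Process):
        def __init__(self, inline: bool = False) -> None:
            self.inline = inline
            if not inline:
                # Process.__init__ is only required when the object
                # will be start()ed as a real child process.
                super().__init__()

        def run(self) -> None:
            print('working inline' if self.inline else 'working in a child')

    if __name__ == '__main__':
        Worker(inline=True).run()  # same process, no fork
        w = Worker()
        w.start()                  # child process calls run()
        w.join()
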
@@ -54,15 +71,7 @@ class Writer(multiprocessing.Process):
             record: Record
             for record in block:
-
-                select, write, updated, name, value = record
-                self.db.enter_step('feed_switch')
-                try:
-                    for source in select(self.db, value):
-                        write(self.db, name, updated, source=source)
-                except ValueError:
-                    self.log.exception("Cannot execute: %s", record)
-
+                self.exec_record(record)

             if next_save > 0 and time.time() > next_save:
                 self.log.info("Saving database...")
@@ -79,29 +88,37 @@ class Writer(multiprocessing.Process):
 class Parser():
     def __init__(self,
                  buf: typing.Any,
-                 recs_queue: multiprocessing.Queue,
-                 block_size: int,
+                 recs_queue: multiprocessing.Queue = None,
+                 block_size: int = 0,
+                 writer: Writer = None,
                  ):
-        super(Parser, self).__init__()
+        assert bool(writer) ^ bool(block_size and recs_queue)
         self.buf = buf
         self.log = logging.getLogger('pr')
         self.recs_queue = recs_queue
-        self.block: typing.List[Record] = list()
-        self.block_size = block_size
-        self.prof = database.Profiler()
-        self.prof.log = logging.getLogger('pr')
+        if writer:  # No MP
+            self.prof: database.Profiler = writer.db
+            self.register = writer.exec_record
+        else:  # MP
+            self.block: typing.List[Record] = list()
+            self.block_size = block_size
+            self.prof = database.Profiler()
+            self.prof.log = logging.getLogger('pr')
+            self.register = self.add_to_queue

-    def register(self, record: Record) -> None:
+    def add_to_queue(self, record: Record) -> None:
         self.prof.enter_step('register')
         self.block.append(record)
         if len(self.block) >= self.block_size:
             self.prof.enter_step('put_block')
+            assert self.recs_queue
             self.recs_queue.put(self.block)
             self.block = list()

     def run(self) -> None:
         self.consume()
-        self.recs_queue.put(self.block)
+        if self.recs_queue:
+            self.recs_queue.put(self.block)
         self.prof.profile()

     def consume(self) -> None:
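
Rather than testing the mode for every record, the parser now binds self.register once in the constructor: straight to writer.exec_record without multiprocessing, or to add_to_queue (which batches records into blocks) when a queue is in use. The dispatch trick on its own, with hypothetical names:

    import queue
    import typing

    class Sink:
        def __init__(self, out: typing.Optional[queue.Queue] = None) -> None:
            # Pick the per-record handler once instead of
            # branching inside the hot loop.
            self.out = out
            self.register = self.enqueue if out else self.handle

        def handle(self, record: str) -> None:
            print('direct:', record)

        def enqueue(self, record: str) -> None:
            assert self.out
            self.out.put(record)

    Sink().register('a')      # handled immediately
    q: queue.Queue = queue.Queue()
    Sink(q).register('b')     # buffered for a consumer
    print(q.get())
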
@@ -203,6 +220,10 @@ if __name__ == '__main__':
         '-a', '--autosave-interval', type=int, default=900,
         help="Interval to which the database will save in seconds. "
         "0 to disable.")
+    args_parser.add_argument(
+        '-s', '--single-process', action='store_true',
+        help="Only use one process. "
+        "Might be useful for single core computers.")
     args_parser.add_argument(
         '-4', '--ip4-cache', type=int, default=0,
         help="RAM cache for faster IPv4 lookup. "
@@ -211,17 +232,29 @@ if __name__ == '__main__':
         "be a memory-heavy process, even without the cache.")
     args = args_parser.parse_args()

-    recs_queue: multiprocessing.Queue = multiprocessing.Queue(
-        maxsize=args.queue_size)
-
-    writer = Writer(recs_queue,
-                    autosave_interval=args.autosave_interval,
-                    ip4_cache=args.ip4_cache
-                    )
-    writer.start()
-
-    parser = PARSERS[args.parser](args.input, recs_queue, args.block_size)
-    parser.run()
-
-    recs_queue.put(None)
-    writer.join()
+    parser_cls = PARSERS[args.parser]
+    if args.single_process:
+        writer = Writer(
+            autosave_interval=args.autosave_interval,
+            ip4_cache=args.ip4_cache
+        )
+        parser = parser_cls(args.input, writer=writer)
+        parser.run()
+    else:
+        recs_queue: multiprocessing.Queue = multiprocessing.Queue(
+            maxsize=args.queue_size)
+
+        writer = Writer(recs_queue,
+                        autosave_interval=args.autosave_interval,
+                        ip4_cache=args.ip4_cache
+                        )
+        writer.start()
+
+        parser = parser_cls(args.input,
+                            recs_queue=recs_queue,
+                            block_size=args.block_size
+                            )
+        parser.run()
+
+        recs_queue.put(None)
+        writer.join()
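
Both modes now share one entry point. Assuming the script is invoked by name as its imports suggest (the file name and the PARSER placeholder below are inferred, not confirmed by this diff):

    # default: a parser process feeding the writer process through a queue
    python feed_dns.py PARSER --ip4-cache 536870912

    # new: everything in a single process, for single-core machines
    python feed_dns.py PARSER --single-process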