Compare commits
No commits in common. "c543e0eab664971f6db158ecad338c775c581b8e" and "9f343ed2968d13f63f6068b77a473fe9ec0032ec" have entirely different histories.
c543e0eab6
...
9f343ed296
36
database.py
36
database.py
|
@ -227,19 +227,10 @@ class Database(Profiler):
|
||||||
self.log.warning("Allocating more than 512 MiB of RAM for "
|
self.log.warning("Allocating more than 512 MiB of RAM for "
|
||||||
"the Ip4 cache is not necessary.")
|
"the Ip4 cache is not necessary.")
|
||||||
max_cache_width = int(math.log2(max(1, max_size*8)))
|
max_cache_width = int(math.log2(max(1, max_size*8)))
|
||||||
allocated = False
|
|
||||||
cache_width = min(2**32, max_cache_width)
|
cache_width = min(2**32, max_cache_width)
|
||||||
while not allocated:
|
|
||||||
cache_size = 2**cache_width
|
|
||||||
try:
|
|
||||||
self.ip4cache = numpy.zeros(cache_size, dtype=numpy.bool)
|
|
||||||
except MemoryError:
|
|
||||||
self.log.exception(
|
|
||||||
"Could not allocate cache. Retrying a smaller one.")
|
|
||||||
cache_width -= 1
|
|
||||||
continue
|
|
||||||
allocated = True
|
|
||||||
self.ip4cache_shift = 32-cache_width
|
self.ip4cache_shift = 32-cache_width
|
||||||
|
cache_size = 2**cache_width
|
||||||
|
self.ip4cache = numpy.zeros(cache_size, dtype=numpy.bool)
|
||||||
for _ in self.exec_each_ip4(self._set_ip4cache):
|
for _ in self.exec_each_ip4(self._set_ip4cache):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -297,16 +288,11 @@ class Database(Profiler):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def pack_ip4address_low(address: str) -> int:
|
def pack_ip4address(address: str) -> Ip4Path:
|
||||||
addr = 0
|
addr = 0
|
||||||
for split in address.split('.'):
|
for split in address.split('.'):
|
||||||
octet = int(split)
|
addr = (addr << 8) + int(split)
|
||||||
addr = (addr << 8) + octet
|
return Ip4Path(addr, 32)
|
||||||
return addr
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def pack_ip4address(address: str) -> Ip4Path:
|
|
||||||
return Ip4Path(Database.pack_ip4address_low(address), 32)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def unpack_ip4address(address: Ip4Path) -> str:
|
def unpack_ip4address(address: Ip4Path) -> str:
|
||||||
|
@ -641,17 +627,17 @@ class Database(Profiler):
|
||||||
|
|
||||||
def get_ip4(self, ip4_str: str) -> typing.Iterable[Path]:
|
def get_ip4(self, ip4_str: str) -> typing.Iterable[Path]:
|
||||||
self.enter_step('get_ip4_pack')
|
self.enter_step('get_ip4_pack')
|
||||||
ip4val = self.pack_ip4address_low(ip4_str)
|
ip4 = self.pack_ip4address(ip4_str)
|
||||||
self.enter_step('get_ip4_cache')
|
self.enter_step('get_ip4_cache')
|
||||||
if not self.ip4cache[ip4val >> self.ip4cache_shift]:
|
if not self.ip4cache[ip4.value >> self.ip4cache_shift]:
|
||||||
return
|
return
|
||||||
self.enter_step('get_ip4_brws')
|
self.enter_step('get_ip4_brws')
|
||||||
dic = self.ip4tree
|
dic = self.ip4tree
|
||||||
for i in range(31, -1, -1):
|
for i in range(31, 31-ip4.prefixlen, -1):
|
||||||
bit = (ip4val >> i) & 0b1
|
bit = (ip4.value >> i) & 0b1
|
||||||
if dic.active():
|
if dic.active():
|
||||||
self.enter_step('get_ip4_yield')
|
self.enter_step('get_ip4_yield')
|
||||||
yield Ip4Path(ip4val >> (i+1) << (i+1), 31-i)
|
yield Ip4Path(ip4.value >> (i+1) << (i+1), 31-i)
|
||||||
self.enter_step('get_ip4_brws')
|
self.enter_step('get_ip4_brws')
|
||||||
next_dic = dic.one if bit else dic.zero
|
next_dic = dic.one if bit else dic.zero
|
||||||
if next_dic is None:
|
if next_dic is None:
|
||||||
|
@ -659,7 +645,7 @@ class Database(Profiler):
|
||||||
dic = next_dic
|
dic = next_dic
|
||||||
if dic.active():
|
if dic.active():
|
||||||
self.enter_step('get_ip4_yield')
|
self.enter_step('get_ip4_yield')
|
||||||
yield Ip4Path(ip4val, 32)
|
yield ip4
|
||||||
|
|
||||||
def _unset_match(self,
|
def _unset_match(self,
|
||||||
match: Match,
|
match: Match,
|
||||||
|
|
67
feed_dns.py
67
feed_dns.py
|
@ -29,37 +29,20 @@ FUNCTION_MAP: typing.Any = {
|
||||||
|
|
||||||
class Writer(multiprocessing.Process):
|
class Writer(multiprocessing.Process):
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
recs_queue: multiprocessing.Queue = None,
|
recs_queue: multiprocessing.Queue,
|
||||||
autosave_interval: int = 0,
|
autosave_interval: int = 0,
|
||||||
ip4_cache: int = 0,
|
ip4_cache: int = 0,
|
||||||
):
|
):
|
||||||
if recs_queue: # MP
|
|
||||||
super(Writer, self).__init__()
|
super(Writer, self).__init__()
|
||||||
self.recs_queue = recs_queue
|
|
||||||
self.log = logging.getLogger(f'wr')
|
self.log = logging.getLogger(f'wr')
|
||||||
|
self.recs_queue = recs_queue
|
||||||
self.autosave_interval = autosave_interval
|
self.autosave_interval = autosave_interval
|
||||||
self.ip4_cache = ip4_cache
|
self.ip4_cache = ip4_cache
|
||||||
if not recs_queue: # No MP
|
|
||||||
self.open_db()
|
|
||||||
|
|
||||||
def open_db(self) -> None:
|
def run(self) -> None:
|
||||||
self.db = database.Database()
|
self.db = database.Database()
|
||||||
self.db.log = logging.getLogger(f'wr')
|
self.db.log = logging.getLogger(f'wr')
|
||||||
self.db.fill_ip4cache(max_size=self.ip4_cache)
|
self.db.fill_ip4cache(max_size=self.ip4_cache)
|
||||||
|
|
||||||
def exec_record(self, record: Record) -> None:
|
|
||||||
self.db.enter_step('exec_record')
|
|
||||||
select, write, updated, name, value = record
|
|
||||||
try:
|
|
||||||
for source in select(self.db, value):
|
|
||||||
write(self.db, name, updated, source=source)
|
|
||||||
except (ValueError, IndexError):
|
|
||||||
# ValueError: non-number in IP
|
|
||||||
# IndexError: IP too big
|
|
||||||
self.log.exception("Cannot execute: %s", record)
|
|
||||||
|
|
||||||
def run(self) -> None:
|
|
||||||
self.open_db()
|
|
||||||
if self.autosave_interval > 0:
|
if self.autosave_interval > 0:
|
||||||
next_save = time.time() + self.autosave_interval
|
next_save = time.time() + self.autosave_interval
|
||||||
else:
|
else:
|
||||||
|
@ -71,7 +54,15 @@ class Writer(multiprocessing.Process):
|
||||||
|
|
||||||
record: Record
|
record: Record
|
||||||
for record in block:
|
for record in block:
|
||||||
self.exec_record(record)
|
|
||||||
|
select, write, updated, name, value = record
|
||||||
|
self.db.enter_step('feed_switch')
|
||||||
|
|
||||||
|
try:
|
||||||
|
for source in select(self.db, value):
|
||||||
|
write(self.db, name, updated, source=source)
|
||||||
|
except ValueError:
|
||||||
|
self.log.exception("Cannot execute: %s", record)
|
||||||
|
|
||||||
if next_save > 0 and time.time() > next_save:
|
if next_save > 0 and time.time() > next_save:
|
||||||
self.log.info("Saving database...")
|
self.log.info("Saving database...")
|
||||||
|
@ -88,36 +79,28 @@ class Writer(multiprocessing.Process):
|
||||||
class Parser():
|
class Parser():
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
buf: typing.Any,
|
buf: typing.Any,
|
||||||
recs_queue: multiprocessing.Queue = None,
|
recs_queue: multiprocessing.Queue,
|
||||||
block_size: int = 0,
|
block_size: int,
|
||||||
writer: Writer = None,
|
|
||||||
):
|
):
|
||||||
assert bool(writer) ^ bool(block_size and recs_queue)
|
super(Parser, self).__init__()
|
||||||
self.buf = buf
|
self.buf = buf
|
||||||
self.log = logging.getLogger('pr')
|
self.log = logging.getLogger('pr')
|
||||||
self.recs_queue = recs_queue
|
self.recs_queue = recs_queue
|
||||||
if writer: # No MP
|
|
||||||
self.prof: database.Profiler = writer.db
|
|
||||||
self.register = writer.exec_record
|
|
||||||
else: # MP
|
|
||||||
self.block: typing.List[Record] = list()
|
self.block: typing.List[Record] = list()
|
||||||
self.block_size = block_size
|
self.block_size = block_size
|
||||||
self.prof = database.Profiler()
|
self.prof = database.Profiler()
|
||||||
self.prof.log = logging.getLogger('pr')
|
self.prof.log = logging.getLogger('pr')
|
||||||
self.register = self.add_to_queue
|
|
||||||
|
|
||||||
def add_to_queue(self, record: Record) -> None:
|
def register(self, record: Record) -> None:
|
||||||
self.prof.enter_step('register')
|
self.prof.enter_step('register')
|
||||||
self.block.append(record)
|
self.block.append(record)
|
||||||
if len(self.block) >= self.block_size:
|
if len(self.block) >= self.block_size:
|
||||||
self.prof.enter_step('put_block')
|
self.prof.enter_step('put_block')
|
||||||
assert self.recs_queue
|
|
||||||
self.recs_queue.put(self.block)
|
self.recs_queue.put(self.block)
|
||||||
self.block = list()
|
self.block = list()
|
||||||
|
|
||||||
def run(self) -> None:
|
def run(self) -> None:
|
||||||
self.consume()
|
self.consume()
|
||||||
if self.recs_queue:
|
|
||||||
self.recs_queue.put(self.block)
|
self.recs_queue.put(self.block)
|
||||||
self.prof.profile()
|
self.prof.profile()
|
||||||
|
|
||||||
|
@ -220,10 +203,6 @@ if __name__ == '__main__':
|
||||||
'-a', '--autosave-interval', type=int, default=900,
|
'-a', '--autosave-interval', type=int, default=900,
|
||||||
help="Interval to which the database will save in seconds. "
|
help="Interval to which the database will save in seconds. "
|
||||||
"0 to disable.")
|
"0 to disable.")
|
||||||
args_parser.add_argument(
|
|
||||||
'-s', '--single-process', action='store_true',
|
|
||||||
help="Only use one process. "
|
|
||||||
"Might be useful for single core computers.")
|
|
||||||
args_parser.add_argument(
|
args_parser.add_argument(
|
||||||
'-4', '--ip4-cache', type=int, default=0,
|
'-4', '--ip4-cache', type=int, default=0,
|
||||||
help="RAM cache for faster IPv4 lookup. "
|
help="RAM cache for faster IPv4 lookup. "
|
||||||
|
@ -232,15 +211,6 @@ if __name__ == '__main__':
|
||||||
"be a memory-heavy process, even without the cache.")
|
"be a memory-heavy process, even without the cache.")
|
||||||
args = args_parser.parse_args()
|
args = args_parser.parse_args()
|
||||||
|
|
||||||
parser_cls = PARSERS[args.parser]
|
|
||||||
if args.single_process:
|
|
||||||
writer = Writer(
|
|
||||||
autosave_interval=args.autosave_interval,
|
|
||||||
ip4_cache=args.ip4_cache
|
|
||||||
)
|
|
||||||
parser = parser_cls(args.input, writer=writer)
|
|
||||||
parser.run()
|
|
||||||
else:
|
|
||||||
recs_queue: multiprocessing.Queue = multiprocessing.Queue(
|
recs_queue: multiprocessing.Queue = multiprocessing.Queue(
|
||||||
maxsize=args.queue_size)
|
maxsize=args.queue_size)
|
||||||
|
|
||||||
|
@ -250,10 +220,7 @@ if __name__ == '__main__':
|
||||||
)
|
)
|
||||||
writer.start()
|
writer.start()
|
||||||
|
|
||||||
parser = parser_cls(args.input,
|
parser = PARSERS[args.parser](args.input, recs_queue, args.block_size)
|
||||||
recs_queue=recs_queue,
|
|
||||||
block_size=args.block_size
|
|
||||||
)
|
|
||||||
parser.run()
|
parser.run()
|
||||||
|
|
||||||
recs_queue.put(None)
|
recs_queue.put(None)
|
||||||
|
|
Loading…
Reference in a new issue