Added optional cache for faster IP matching

Geoffrey Frogeye 2019-12-18 21:23:49 +01:00
parent 06b745890c
commit 4a22054796
Signed by: geoffrey
GPG key ID: D8A7ECA00A8CD3DD
6 changed files with 63 additions and 17 deletions

View file

@@ -9,6 +9,8 @@ import time
 import logging
 import coloredlogs
 import pickle
+import numpy
+import math
 TLD_LIST: typing.Set[str] = set()
@@ -201,6 +203,33 @@ class Database(Profiler):
         Profiler.__init__(self)
         self.log = logging.getLogger('db')
         self.load()
+        self.ip4cache_shift: int = 32
+        self.ip4cache = numpy.ones(1)
+
+    def _set_ip4cache(self, path: Path, _: Match) -> None:
+        assert isinstance(path, Ip4Path)
+        self.enter_step('set_ip4cache')
+        mini = path.value >> self.ip4cache_shift
+        maxi = (path.value + 2**(32-path.prefixlen)) >> self.ip4cache_shift
+        if mini == maxi:
+            self.ip4cache[mini] = True
+        else:
+            self.ip4cache[mini:maxi] = True
+
+    def fill_ip4cache(self, max_size: int = 512*1024**2) -> None:
+        """
+        Size in bytes
+        """
+        if max_size > 2**32/8:
+            self.log.warning("Allocating more than 512 MiB of RAM for "
+                             "the Ip4 cache is not necessary.")
+        max_cache_width = int(math.log2(max(1, max_size*8)))
+        cache_width = min(2**32, max_cache_width)
+        self.ip4cache_shift = 32-cache_width
+        cache_size = 2**cache_width
+        self.ip4cache = numpy.zeros(cache_size, dtype=numpy.bool)
+        for _ in self.exec_each_ip4(self._set_ip4cache):
+            pass
+
     @staticmethod
     def populate_tld_list() -> None:
@@ -404,8 +433,9 @@ class Database(Profiler):
         pref = _par.prefixlen + 1
         dic = _dic.zero
         if dic:
-            addr0 = _par.value & (0xFFFFFFFF ^ (1 << (32-pref)))
-            assert addr0 == _par.value
+            # addr0 = _par.value & (0xFFFFFFFF ^ (1 << (32-pref)))
+            # assert addr0 == _par.value
+            addr0 = _par.value
             yield from self.exec_each_ip4(
                 callback,
                 _dic=dic,
@@ -415,6 +445,7 @@ class Database(Profiler):
         dic = _dic.one
         if dic:
            addr1 = _par.value | (1 << (32-pref))
+           # assert addr1 != _par.value
            yield from self.exec_each_ip4(
                callback,
                _dic=dic,
@@ -548,6 +579,9 @@ class Database(Profiler):
     def get_ip4(self, ip4_str: str) -> typing.Iterable[Path]:
         self.enter_step('get_ip4_pack')
         ip4 = self.pack_ip4address(ip4_str)
+        self.enter_step('get_ip4_cache')
+        if not self.ip4cache[ip4.value >> self.ip4cache_shift]:
+            return
         self.enter_step('get_ip4_brws')
         dic = self.ip4tree
         for i in range(31, 31-ip4.prefixlen, -1):
@@ -680,6 +714,7 @@ class Database(Profiler):
             source_match=source_match,
             dupplicate=dupplicate,
         )
+        self._set_ip4cache(ip4, dic)
 
     def set_ip4address(self,
                        ip4address_str: str,
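
The cache added above is a coarse presence table over the IPv4 space: fill_ip4cache allocates 2**cache_width boolean entries, each standing for a block of 2**ip4cache_shift consecutive addresses, _set_ip4cache marks every block touched by a stored network, and get_ip4 now returns early when the queried address falls in an unmarked block. The budget is counted as max_size*8 entries, so 2**32/8 bytes = 512 MiB already gives one entry per possible address, hence the warning. A minimal self-contained sketch of the same idea follows (illustration only, not the project's Database API; build_cache, maybe_matches and the sample network are made-up names, and capping the width at 32 bits is an assumption):

import math
import numpy

def build_cache(networks, max_size=512 * 1024 ** 2):
    # networks: iterable of (value, prefixlen) pairs, value being the network
    # address as an int; max_size is the cache budget in bytes.
    cache_width = min(32, int(math.log2(max(1, max_size * 8))))
    shift = 32 - cache_width
    cache = numpy.zeros(2 ** cache_width, dtype=bool)
    for value, prefixlen in networks:
        mini = value >> shift
        maxi = (value + 2 ** (32 - prefixlen)) >> shift
        if mini == maxi:
            cache[mini] = True
        else:
            cache[mini:maxi] = True
    return cache, shift

def maybe_matches(cache, shift, addr_value):
    # False: no stored network can match this address, skip the tree walk.
    # True: a match is possible, the full lookup is still needed.
    return bool(cache[addr_value >> shift])

# 192.168.1.0/24 cached with a 1 KiB budget: 8192 blocks of 2**19 addresses.
cache, shift = build_cache([(0xC0A80100, 24)], max_size=1024)
print(maybe_matches(cache, shift, 0xC0A80142))  # 192.168.1.66 -> True
print(maybe_matches(cache, shift, 0x08080808))  # 8.8.8.8 -> False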

View file

@@ -30,15 +30,19 @@ FUNCTION_MAP: typing.Any = {
 class Writer(multiprocessing.Process):
     def __init__(self,
                  recs_queue: multiprocessing.Queue,
-                 autosave_interval: int = 0):
+                 autosave_interval: int = 0,
+                 ip4_cache: int = 0,
+                 ):
         super(Writer, self).__init__()
         self.log = logging.getLogger(f'wr')
         self.recs_queue = recs_queue
         self.autosave_interval = autosave_interval
+        self.ip4_cache = ip4_cache
 
     def run(self) -> None:
         self.db = database.Database()
         self.db.log = logging.getLogger(f'wr')
+        self.db.fill_ip4cache(max_size=self.ip4_cache)
         if self.autosave_interval > 0:
             next_save = time.time() + self.autosave_interval
         else:
@@ -200,12 +204,15 @@ if __name__ == '__main__':
     args_parser.add_argument(
         '-a', '--autosave-interval', type=int, default=900,
         help="TODO seconds")
+    args_parser.add_argument(
+        '-4', '--ip4-cache', type=int, default=0,
+        help="TODO bytes max 512 MiB")
     args = args_parser.parse_args()
 
     recs_queue: multiprocessing.Queue = multiprocessing.Queue(
         maxsize=args.queue_size)
 
-    writer = Writer(recs_queue, autosave_interval=args.autosave_interval)
+    writer = Writer(recs_queue, autosave_interval=args.autosave_interval, ip4_cache=args.ip4_cache)
     writer.start()
 
     parser = PARSERS[args.parser](args.input, recs_queue, args.block_size)
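
The new option reaches the database through the writer process: Writer stores ip4_cache and run() calls fill_ip4cache before consuming records, which is what --ip4-cache drives on the command line. A rough usage sketch, assuming the script above is importable as a feed_dns module and a database has already been initialised (illustration only):

import multiprocessing
import feed_dns  # assumption: the script above lives in feed_dns.py

recs_queue: multiprocessing.Queue = multiprocessing.Queue(maxsize=100)
writer = feed_dns.Writer(recs_queue,
                         autosave_interval=900,
                         ip4_cache=512 * 1024 ** 2)  # bytes, forwarded to fill_ip4cache
writer.start()
# ...then feed parsed records into recs_queue, the equivalent of running
# ./feed_dns.py <parser> --ip4-cache 536870912 on stdin.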

View file

@@ -39,10 +39,14 @@ if __name__ == '__main__':
     source = database.RuleMultiPath()
 
     for rule in args.input:
-        fun(DB,
-            rule.strip(),
-            source=source,
-            updated=int(time.time()),
-            )
+        rule = rule.strip()
+        try:
+            fun(DB,
+                rule,
+                source=source,
+                updated=int(time.time()),
+                )
+        except ValueError:
+            DB.log.error(f"Could not add rule: {rule}")
 
     DB.save()

View file

@@ -35,7 +35,7 @@ dl http://data.iana.org/TLD/tlds-alpha-by-domain.txt temp/all_tld.temp.list
 grep -v '^#' temp/all_tld.temp.list | awk '{print tolower($0)}' > temp/all_tld.list
 
 log "Retrieving nameservers…"
-dl https://public-dns.info/nameservers.txt nameservers/public-dns.list
+dl https://public-dns.info/nameservers.txt nameservers/public-dns.cache.list
 
 log "Retrieving top subdomains…"
 dl http://s3-us-west-1.amazonaws.com/umbrella-static/top-1m.csv.zip top-1m.csv.zip

View file

@@ -9,7 +9,7 @@ function feed_rapid7_fdns { # dataset
     line=$(curl -s https://opendata.rapid7.com/sonar.fdns_v2/ | grep "href=\".\+-fdns_$dataset.json.gz\"")
     link="https://opendata.rapid7.com$(echo "$line" | cut -d'"' -f2)"
     log "Reading $(echo "$dataset" | awk '{print toupper($0)}') records from $link"
-    curl -L "$link" | gunzip | ./feed_dns.py rapid7
+    curl -L "$link" | gunzip
 }
 
 function feed_rapid7_rdns { # dataset
@@ -17,10 +17,10 @@ function feed_rapid7_rdns { # dataset
     line=$(curl -s https://opendata.rapid7.com/sonar.rdns_v2/ | grep "href=\".\+-rdns.json.gz\"")
     link="https://opendata.rapid7.com$(echo "$line" | cut -d'"' -f2)"
     log "Reading PTR records from $link"
-    curl -L "$link" | gunzip | ./feed_dns.py rapid7
+    curl -L "$link" | gunzip
 }
 
-feed_rapid7_rdns
-feed_rapid7_fdns a
-# feed_rapid7_fdns aaaa
-feed_rapid7_fdns cname
+feed_rapid7_rdns | ./feed_dns.py rapid7
+feed_rapid7_fdns a | ./feed_dns.py rapid7 --ip4-cache 536870912
+# feed_rapid7_fdns aaaa | ./feed_dns.py rapid7 --ip6-cache 536870912
+feed_rapid7_fdns cname | ./feed_dns.py rapid7
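
The 536870912 passed to --ip4-cache above is the 512 MiB ceiling from feed_dns.py spelled out in bytes; with the max_size*8 accounting in fill_ip4cache it amounts to one cache entry per possible IPv4 address (quick check in Python):

assert 536870912 == 512 * 1024 ** 2   # 512 MiB in bytes
assert 536870912 * 8 == 2 ** 32       # one entry per IPv4 address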

View file

@@ -27,5 +27,5 @@ partner.intentmedia.net
 wizaly.com
 # Commanders Act
 tagcommander.com
-# Affex Marketing
+# Ingenious Technologies
 affex.org