eulaurarien/database.py

437 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Utility functions to interact with the database.
"""
import typing
import time
import logging
import coloredlogs
import pickle
import enum
# Configure coloredlogs once, at import time: DEBUG level, and a record
# format made of timestamp, logger name, level name and message.
coloredlogs.install(
level='DEBUG',
fmt='%(asctime)s %(name)s %(levelname)s %(message)s'
)
# Plain-int aliases; they only exist to make signatures self-describing.
Asn = int
Timestamp = int
Level = int


class Path:
    """Common base of every path type handled by the database."""


class RulePath(Path):
    """Path without payload; the setters use it when no source is given."""


class DomainPath(Path):
    """Domain name held as a list of labels."""

    def __init__(self, path: typing.List[str]):
        self.path = path


class HostnamePath(DomainPath):
    """Domain path designating a single hostname."""


class ZonePath(DomainPath):
    """Domain path designating a whole zone."""


class AsnPath(Path):
    """Path wrapping an autonomous system number."""

    def __init__(self, asn: Asn):
        self.asn = asn


class Ip4Path(Path):
    """IPv4 address or network: integer value plus prefix length."""

    def __init__(self, value: int, prefixlen: int):
        self.value = value
        self.prefixlen = prefixlen


# What gets stored on a tree node when a rule matches it.
Match = typing.Tuple[Timestamp, Path, Level]
# class AsnNode():
# def __init__(self, asn: int) -> None:
# self.asn = asn
class DomainTreeNode:
    """Node of the domain tree.

    ``children`` maps a label to the node below it; ``match_zone`` and
    ``match_hostname`` hold the match recorded on this node, if any.
    """

    def __init__(self) -> None:
        # Every node starts childless and unmatched.
        self.children: typing.Dict[str, DomainTreeNode] = {}
        self.match_zone: typing.Optional[Match] = None
        self.match_hostname: typing.Optional[Match] = None
class IpTreeNode:
    """Node of the binary IPv4 tree.

    ``children`` is a two-slot list indexed by the next address bit (0 or 1);
    ``match`` holds the match recorded on this node, if any.
    """

    def __init__(self) -> None:
        # Both branches are absent until a rule creates them.
        self.children: typing.List[typing.Optional[IpTreeNode]] = [None] * 2
        self.match: typing.Optional[Match] = None
# Anything a traversal callback can receive as its node argument: a
# domain-tree node, an IPv4-tree node, or an AS number.
Node = typing.Union[DomainTreeNode, IpTreeNode, Asn]
# Signature of the callbacks accepted by the Database.exec_each* methods,
# which call them as callback(path, node, arg); the return type is left open.
NodeCallable = typing.Callable[[Path,
Node,
typing.Optional[typing.Any]],
typing.Any]
class Profiler:
    """Accumulate the wall-clock time spent in named steps and log a summary.

    Timing is switched off by default so that ``enter_step`` is a near no-op;
    set ``PROFILING_ENABLED = True`` on the class or on an instance to
    actually collect measurements.
    """

    # Replaces the unconditional early ``return`` that used to make the body
    # of ``enter_step`` unreachable; the default keeps that behavior.
    PROFILING_ENABLED = False

    def __init__(self) -> None:
        self.log = logging.getLogger('profiler')
        self.time_last = time.perf_counter()
        self.time_step = 'init'
        # Seconds accumulated per step name.
        self.time_dict: typing.Dict[str, float] = dict()
        # Number of times each step was counted.
        self.step_dict: typing.Dict[str, int] = dict()

    def enter_step(self, name: str) -> None:
        """Close the current step and start timing the step called *name*.

        Does nothing while ``PROFILING_ENABLED`` is false.
        """
        if not self.PROFILING_ENABLED:
            return
        now = time.perf_counter()
        step = self.time_step
        elapsed = now - self.time_last
        if step in self.time_dict:
            self.time_dict[step] += elapsed
            # Re-entering the same step does not count as a new occurrence.
            self.step_dict[step] += int(name != step)
        else:
            self.time_dict[step] = elapsed
            self.step_dict[step] = 1
        self.time_step = name
        # Read the clock again so the bookkeeping above is not billed to
        # the step that is starting.
        self.time_last = time.perf_counter()

    def profile(self) -> None:
        """Log, at DEBUG level, the time spent in each step (ascending)."""
        self.enter_step('profile')
        total = sum(self.time_dict.values())
        for key, secs in sorted(self.time_dict.items(), key=lambda t: t[1]):
            times = self.step_dict[key]
            # Guard against a zero total so the summary never raises.
            share = secs / total if total else 0.0
            self.log.debug(f"{key:<20}: {times:9d} × {secs/times:5.3e} "
                           f"= {secs:9.2f} s ({share:7.2%}) ")
        self.log.debug(f"{'total':<20}: "
                       f"{total:9.2f} s ({1:7.2%})")
class Database(Profiler):
    """Pickle-backed database of blocking rules.

    Three structures are kept in memory and saved together:

    * ``domtree``: tree of domain labels, top-level label first;
    * ``asns``: set of AS numbers;
    * ``ip4tree``: binary tree of IPv4 prefixes, one level per address bit,
      most significant bit first.
    """

    VERSION = 9
    PATH = "blocking.p"

    def initialize(self) -> None:
        """Start over with empty structures."""
        self.log.warning(
            "Creating database version: %d ",
            Database.VERSION)
        self.domtree = DomainTreeNode()
        self.asns: typing.Set[Asn] = set()
        self.ip4tree = IpTreeNode()

    def load(self) -> None:
        """Load the database from ``PATH``.

        Falls back to an empty database when the file is missing, was
        written by another ``VERSION``, or cannot be decoded.
        """
        self.enter_step('load')
        try:
            # NOTE(review): unpickling can run arbitrary code; PATH must only
            # ever designate a file produced by save().
            with open(self.PATH, 'rb') as db_fdsec:
                version, data = pickle.load(db_fdsec)
            if version == Database.VERSION:
                self.domtree, self.asns, self.ip4tree = data
                return
            self.log.warning(
                "Outdated database version found: %d, "
                "will be rebuilt.",
                version)
        except (TypeError, AttributeError, EOFError, ValueError,
                pickle.UnpicklingError):
            # ValueError covers a payload of the wrong shape,
            # UnpicklingError a file that is not a valid pickle.
            self.log.error(
                "Corrupt database found, "
                "will be rebuilt.")
        except FileNotFoundError:
            pass
        self.initialize()

    def save(self) -> None:
        """Write the database to ``PATH`` and log the profiling summary."""
        self.enter_step('save')
        with open(self.PATH, 'wb') as db_fdsec:
            data = self.domtree, self.asns, self.ip4tree
            pickle.dump((self.VERSION, data), db_fdsec)
        self.profile()

    def __init__(self) -> None:
        super().__init__()
        self.log = logging.getLogger('db')
        self.load()

    # ------------------------------------------------------------------
    # Conversions between strings and paths
    # ------------------------------------------------------------------

    @staticmethod
    def pack_domain(domain: str) -> DomainPath:
        """Split a dotted domain into labels, top-level label first."""
        return DomainPath(domain.split('.')[::-1])

    @staticmethod
    def unpack_domain(domain: DomainPath) -> str:
        """Inverse of :meth:`pack_domain`."""
        return '.'.join(domain.path[::-1])

    @staticmethod
    def pack_asn(asn: str) -> AsnPath:
        """Parse ``'AS1234'``, ``'as1234'`` or ``'1234'``.

        Raises ValueError when the remainder is not an integer.
        """
        asn = asn.upper()
        if asn.startswith('AS'):
            asn = asn[2:]
        return AsnPath(int(asn))

    @staticmethod
    def unpack_asn(asn: AsnPath) -> str:
        """Format an AS number as ``'AS<number>'``."""
        return f'AS{asn.asn}'

    @staticmethod
    def _dotted_quad(value: int) -> str:
        """Render the low 32 bits of *value* as four dot-separated octets."""
        return '.'.join(str((value >> shift) & 0xFF)
                        for shift in (24, 16, 8, 0))

    @staticmethod
    def pack_ip4address(address: str) -> Ip4Path:
        """Parse a dotted IPv4 address into a /32 path.

        Each dot-separated part is folded in as one byte; the number and
        range of the parts are not checked.
        """
        addr = 0
        for split in address.split('.'):
            addr = (addr << 8) + int(split)
        return Ip4Path(addr, 32)

    @staticmethod
    def unpack_ip4address(address: Ip4Path) -> str:
        """Format a /32 path as a dotted IPv4 address."""
        assert address.prefixlen == 32
        return Database._dotted_quad(address.value)

    @staticmethod
    def pack_ip4network(network: str) -> Ip4Path:
        """Parse ``'a.b.c.d/len'`` into a path with that prefix length."""
        address, prefixlen_str = network.split('/')
        prefixlen = int(prefixlen_str)
        addr = Database.pack_ip4address(address)
        addr.prefixlen = prefixlen
        return addr

    @staticmethod
    def unpack_ip4network(network: Ip4Path) -> str:
        """Format a path as ``'a.b.c.d/len'``."""
        return Database._dotted_quad(network.value) \
            + '/' + str(network.prefixlen)

    # ------------------------------------------------------------------
    # Traversal
    # ------------------------------------------------------------------

    def exec_each_domain(self,
                         callback: NodeCallable,
                         arg: typing.Any = None,
                         _dic: typing.Optional[DomainTreeNode] = None,
                         _par: typing.Optional[DomainPath] = None,
                         ) -> typing.Any:
        """Call *callback* on every domain node, parents first.

        Yields whatever each ``callback(path, node, arg)`` call yields.
        ``_dic`` and ``_par`` are the recursion state and are not meant to
        be passed by callers.
        """
        if _dic is None:
            _dic = self.domtree
        if _par is None:
            _par = DomainPath([])
        yield from callback(_par, _dic, arg)
        for part, child in _dic.children.items():
            yield from self.exec_each_domain(
                callback,
                arg,
                _dic=child,
                _par=DomainPath(_par.path + [part])
            )

    def exec_each_ip4(self,
                      callback: NodeCallable,
                      arg: typing.Any = None,
                      _dic: typing.Optional[IpTreeNode] = None,
                      _par: typing.Optional[Ip4Path] = None,
                      ) -> typing.Any:
        """Call *callback* on every IPv4 node, parents first.

        Yields whatever each ``callback(path, node, arg)`` call yields.
        ``_dic`` and ``_par`` are the recursion state and are not meant to
        be passed by callers.
        """
        if _dic is None:
            _dic = self.ip4tree
        if _par is None:
            _par = Ip4Path(0, 0)
        # The result used to be discarded, so generator callbacks never ran.
        # A callback returning None (plain function) is still accepted.
        produced = callback(_par, _dic, arg)
        if produced is not None:
            yield from produced
        child_len = _par.prefixlen + 1
        # The bit that distinguishes the two children is the first bit after
        # the parent's prefix, i.e. bit (32 - child_len) counted from the
        # least significant one.
        child_bit = 1 << (32 - child_len)
        zero, one = _dic.children
        if zero:
            yield from self.exec_each_ip4(
                callback,
                arg,
                _dic=zero,
                _par=Ip4Path(_par.value, child_len)
            )
        if one:
            yield from self.exec_each_ip4(
                callback,
                arg,
                _dic=one,
                _par=Ip4Path(_par.value | child_bit, child_len)
            )

    def exec_each(self,
                  callback: NodeCallable,
                  arg: typing.Any = None,
                  ) -> typing.Any:
        """Run *callback* over the domain tree, then over the IPv4 tree."""
        yield from self.exec_each_domain(callback, arg)
        yield from self.exec_each_ip4(callback, arg)

    # ------------------------------------------------------------------
    # Maintenance and export
    # ------------------------------------------------------------------

    def update_references(self) -> None:
        raise NotImplementedError

    def prune(self, before: int, base_only: bool = False) -> None:
        raise NotImplementedError

    def explain(self, entry: int) -> str:
        raise NotImplementedError

    def export(self,
               first_party_only: bool = False,
               end_chain_only: bool = False,
               explain: bool = False,
               ) -> typing.Iterable[str]:
        """Yield every domain that carries a hostname match.

        None of the three options is implemented; since this is a generator,
        NotImplementedError is raised when iteration starts.
        """
        if first_party_only or end_chain_only or explain:
            raise NotImplementedError

        def export_cb(path: Path, node: Node, _: typing.Any
                      ) -> typing.Iterable[str]:
            assert isinstance(path, DomainPath)
            assert isinstance(node, DomainTreeNode)
            if node.match_hostname:
                yield self.unpack_domain(path)

        yield from self.exec_each_domain(export_cb, None)

    def count_rules(self,
                    first_party_only: bool = False,
                    ) -> str:
        raise NotImplementedError

    # ------------------------------------------------------------------
    # Lookups
    # ------------------------------------------------------------------

    def get_domain(self, domain_str: str) -> typing.Iterable[DomainPath]:
        """Yield every stored zone or hostname that matches *domain_str*.

        Zones are yielded from the shortest to the longest, then the exact
        hostname match if there is one.
        """
        self.enter_step('get_domain_pack')
        domain = self.pack_domain(domain_str)
        self.enter_step('get_domain_brws')
        dic = self.domtree
        for depth, part in enumerate(domain.path):
            if dic.match_zone:
                self.enter_step('get_domain_yield')
                yield ZonePath(domain.path[:depth])
                self.enter_step('get_domain_brws')
            child = dic.children.get(part)
            if child is None:
                return
            dic = child
        if dic.match_zone:
            self.enter_step('get_domain_yield')
            yield ZonePath(domain.path)
        if dic.match_hostname:
            self.enter_step('get_domain_yield')
            yield HostnamePath(domain.path)

    def get_ip4(self, ip4_str: str) -> typing.Iterable[Path]:
        """Yield every stored network or address that contains *ip4_str*.

        Networks are yielded from the shortest prefix to the longest, each
        as its network address and prefix length.
        """
        self.enter_step('get_ip4_pack')
        ip4 = self.pack_ip4address(ip4_str)
        self.enter_step('get_ip4_brws')
        dic = self.ip4tree
        for i in range(31, 31 - ip4.prefixlen, -1):
            if dic.match:
                self.enter_step('get_ip4_yield')
                # Bits 31..i+1 have been consumed to reach this node, so it
                # stands for a prefix of 31-i bits; clear the host bits.
                yield Ip4Path(ip4.value >> (i + 1) << (i + 1), 31 - i)
                self.enter_step('get_ip4_brws')
            next_dic = dic.children[(ip4.value >> i) & 0b1]
            if next_dic is None:
                return
            dic = next_dic
        if dic.match:
            self.enter_step('get_ip4_yield')
            yield ip4

    def list_asn(self) -> typing.Iterable[AsnPath]:
        """Yield every stored AS number as a path."""
        for asn in self.asns:
            yield AsnPath(asn)

    # ------------------------------------------------------------------
    # Insertion
    # ------------------------------------------------------------------

    def set_hostname(self,
                     hostname_str: str,
                     updated: int,
                     is_first_party: typing.Optional[bool] = None,
                     source: typing.Optional[Path] = None) -> None:
        """Record a hostname match, unless a parent zone already matches.

        Raises NotImplementedError when *is_first_party* is truthy.
        """
        self.enter_step('set_hostname_pack')
        if is_first_party:
            raise NotImplementedError
        hostname = self.pack_domain(hostname_str)
        self.enter_step('set_hostname_brws')
        dic = self.domtree
        for part in hostname.path:
            if dic.match_zone:
                # Refuse to add hostname whose zone is already matching
                return
            if part not in dic.children:
                dic.children[part] = DomainTreeNode()
            dic = dic.children[part]
        dic.match_hostname = (updated, source or RulePath(), 0)

    def set_zone(self,
                 zone_str: str,
                 updated: int,
                 is_first_party: typing.Optional[bool] = None,
                 source: typing.Optional[Path] = None) -> None:
        """Record a zone match, unless a parent zone already matches.

        Raises NotImplementedError when *is_first_party* is truthy.
        """
        self.enter_step('set_zone_pack')
        if is_first_party:
            raise NotImplementedError
        zone = self.pack_domain(zone_str)
        self.enter_step('set_zone_brws')
        dic = self.domtree
        for part in zone.path:
            if dic.match_zone:
                # Refuse to add zone whose parent zone is already matching
                return
            if part not in dic.children:
                dic.children[part] = DomainTreeNode()
            dic = dic.children[part]
        dic.match_zone = (updated, source or RulePath(), 0)

    def set_asn(self,
                asn_str: str,
                updated: int,
                is_first_party: typing.Optional[bool] = None,
                source: typing.Optional[Path] = None) -> None:
        """Add an AS number to the set.

        *updated* is not stored yet. Raises NotImplementedError when
        *is_first_party* or *source* is truthy.
        """
        self.enter_step('set_asn_pack')
        if is_first_party or source:
            # TODO updated
            raise NotImplementedError
        asn = self.pack_asn(asn_str)
        self.enter_step('set_asn_brws')
        self.asns.add(asn.asn)

    def _set_ip4(self,
                 ip4: Ip4Path,
                 updated: int,
                 is_first_party: typing.Optional[bool] = None,
                 source: typing.Optional[Path] = None) -> None:
        """Record an IPv4 match, unless a containing network already matches.

        Raises NotImplementedError when *is_first_party* is truthy.
        """
        if is_first_party:
            raise NotImplementedError
        dic = self.ip4tree
        # Walk the prefix from the most significant bit down. Iterating over
        # range(prefixlen) instead would read the low bits of the address and
        # file any network shorter than /32 under the wrong key.
        # NOTE(review): databases saved before this fix may hold networks
        # under the old keys; consider bumping VERSION to force a rebuild.
        for i in range(31, 31 - ip4.prefixlen, -1):
            if dic.match:
                # Refuse to add ip4* whose network is already matching
                return
            part = (ip4.value >> i) & 0b1
            next_dic = dic.children[part]
            if next_dic is None:
                next_dic = IpTreeNode()
                dic.children[part] = next_dic
            dic = next_dic
        dic.match = (updated, source or RulePath(), 0)

    def set_ip4address(self,
                       ip4address_str: str,
                       *args: typing.Any, **kwargs: typing.Any
                       ) -> None:
        """Record a single IPv4 address; extra arguments go to _set_ip4."""
        self.enter_step('set_ip4add_pack')
        ip4 = self.pack_ip4address(ip4address_str)
        self.enter_step('set_ip4add_brws')
        self._set_ip4(ip4, *args, **kwargs)

    def set_ip4network(self,
                       ip4network_str: str,
                       *args: typing.Any, **kwargs: typing.Any
                       ) -> None:
        """Record an IPv4 network; extra arguments go to _set_ip4."""
        self.enter_step('set_ip4net_pack')
        ip4 = self.pack_ip4network(ip4network_str)
        self.enter_step('set_ip4net_brws')
        self._set_ip4(ip4, *args, **kwargs)