Generates a host list of first-party trackers for ad-blocking.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

824 lines
26KB

  1. #!/usr/bin/env python3
  2. """
  3. Utility functions to interact with the database.
  4. """
  5. import typing
  6. import time
  7. import logging
  8. import coloredlogs
  9. import pickle
  10. import numpy
  11. import math
  12. import os
  13. TLD_LIST: typing.Set[str] = set()
  14. coloredlogs.install(
  15. level='DEBUG',
  16. fmt='%(asctime)s %(name)s %(levelname)s %(message)s'
  17. )
  18. Asn = int
  19. Timestamp = int
  20. Level = int
  21. class Path():
  22. pass
  23. class RulePath(Path):
  24. def __str__(self) -> str:
  25. return '(rule)'
  26. class RuleFirstPath(RulePath):
  27. def __str__(self) -> str:
  28. return '(first-party rule)'
  29. class RuleMultiPath(RulePath):
  30. def __str__(self) -> str:
  31. return '(multi-party rule)'
  32. class DomainPath(Path):
  33. def __init__(self, parts: typing.List[str]):
  34. self.parts = parts
  35. def __str__(self) -> str:
  36. return '?.' + Database.unpack_domain(self)
  37. class HostnamePath(DomainPath):
  38. def __str__(self) -> str:
  39. return Database.unpack_domain(self)
  40. class ZonePath(DomainPath):
  41. def __str__(self) -> str:
  42. return '*.' + Database.unpack_domain(self)
  43. class AsnPath(Path):
  44. def __init__(self, asn: Asn):
  45. self.asn = asn
  46. def __str__(self) -> str:
  47. return Database.unpack_asn(self)
  48. class Ip4Path(Path):
  49. def __init__(self, value: int, prefixlen: int):
  50. self.value = value
  51. self.prefixlen = prefixlen
  52. def __str__(self) -> str:
  53. return Database.unpack_ip4network(self)
  54. class Match():
  55. def __init__(self) -> None:
  56. self.source: typing.Optional[Path] = None
  57. self.updated: int = 0
  58. self.dupplicate: bool = False
  59. # Cache
  60. self.level: int = 0
  61. self.first_party: bool = False
  62. self.references: int = 0
  63. def active(self, first_party: bool = None) -> bool:
  64. if self.updated == 0 or (first_party and not self.first_party):
  65. return False
  66. return True
  67. def disable(self) -> None:
  68. self.updated = 0
  69. class AsnNode(Match):
  70. def __init__(self) -> None:
  71. Match.__init__(self)
  72. self.name = ''
  73. class DomainTreeNode():
  74. def __init__(self) -> None:
  75. self.children: typing.Dict[str, DomainTreeNode] = dict()
  76. self.match_zone = Match()
  77. self.match_hostname = Match()
  78. class IpTreeNode(Match):
  79. def __init__(self) -> None:
  80. Match.__init__(self)
  81. self.zero: typing.Optional[IpTreeNode] = None
  82. self.one: typing.Optional[IpTreeNode] = None
  83. Node = typing.Union[DomainTreeNode, IpTreeNode, AsnNode]
  84. MatchCallable = typing.Callable[[Path,
  85. Match],
  86. typing.Any]
  87. class Profiler():
  88. def __init__(self) -> None:
  89. do_profile = int(os.environ.get('PROFILE', '0'))
  90. if do_profile:
  91. self.log = logging.getLogger('profiler')
  92. self.time_last = time.perf_counter()
  93. self.time_step = 'init'
  94. self.time_dict: typing.Dict[str, float] = dict()
  95. self.step_dict: typing.Dict[str, int] = dict()
  96. self.enter_step = self.enter_step_real
  97. self.profile = self.profile_real
  98. else:
  99. self.enter_step = self.enter_step_dummy
  100. self.profile = self.profile_dummy
  101. def enter_step_dummy(self, name: str) -> None:
  102. return
  103. def enter_step_real(self, name: str) -> None:
  104. now = time.perf_counter()
  105. try:
  106. self.time_dict[self.time_step] += now - self.time_last
  107. self.step_dict[self.time_step] += int(name != self.time_step)
  108. except KeyError:
  109. self.time_dict[self.time_step] = now - self.time_last
  110. self.step_dict[self.time_step] = 1
  111. self.time_step = name
  112. self.time_last = time.perf_counter()
  113. def profile_dummy(self) -> None:
  114. return
  115. def profile_real(self) -> None:
  116. self.enter_step('profile')
  117. total = sum(self.time_dict.values())
  118. for key, secs in sorted(self.time_dict.items(), key=lambda t: t[1]):
  119. times = self.step_dict[key]
  120. self.log.debug(f"{key:<20}: {times:9d} × {secs/times:5.3e} "
  121. f"= {secs:9.2f} s ({secs/total:7.2%}) ")
  122. self.log.debug(f"{'total':<20}: "
  123. f"{total:9.2f} s ({1:7.2%})")
  124. class Database(Profiler):
  125. VERSION = 18
  126. PATH = "blocking.p"
  127. def initialize(self) -> None:
  128. self.log.warning(
  129. "Creating database version: %d ",
  130. Database.VERSION)
  131. # Dummy match objects that everything refer to
  132. self.rules: typing.List[Match] = list()
  133. for first_party in (False, True):
  134. m = Match()
  135. m.updated = 1
  136. m.level = 0
  137. m.first_party = first_party
  138. self.rules.append(m)
  139. self.domtree = DomainTreeNode()
  140. self.asns: typing.Dict[Asn, AsnNode] = dict()
  141. self.ip4tree = IpTreeNode()
  142. def load(self) -> None:
  143. self.enter_step('load')
  144. try:
  145. with open(self.PATH, 'rb') as db_fdsec:
  146. version, data = pickle.load(db_fdsec)
  147. if version == Database.VERSION:
  148. self.rules, self.domtree, self.asns, self.ip4tree = data
  149. return
  150. self.log.warning(
  151. "Outdated database version found: %d, "
  152. "it will be rebuilt.",
  153. version)
  154. except (TypeError, AttributeError, EOFError):
  155. self.log.error(
  156. "Corrupt (or heavily outdated) database found, "
  157. "it will be rebuilt.")
  158. except FileNotFoundError:
  159. pass
  160. self.initialize()
  161. def save(self) -> None:
  162. self.enter_step('save')
  163. with open(self.PATH, 'wb') as db_fdsec:
  164. data = self.rules, self.domtree, self.asns, self.ip4tree
  165. pickle.dump((self.VERSION, data), db_fdsec)
  166. self.profile()
  167. def __init__(self) -> None:
  168. Profiler.__init__(self)
  169. self.log = logging.getLogger('db')
  170. self.load()
  171. self.ip4cache_shift: int = 32
  172. self.ip4cache = numpy.ones(1)
  173. def _set_ip4cache(self, path: Path, _: Match) -> None:
  174. assert isinstance(path, Ip4Path)
  175. self.enter_step('set_ip4cache')
  176. mini = path.value >> self.ip4cache_shift
  177. maxi = (path.value + 2**(32-path.prefixlen)) >> self.ip4cache_shift
  178. if mini == maxi:
  179. self.ip4cache[mini] = True
  180. else:
  181. self.ip4cache[mini:maxi] = True
  182. def fill_ip4cache(self, max_size: int = 512*1024**2) -> None:
  183. """
  184. Size in bytes
  185. """
  186. if max_size > 2**32/8:
  187. self.log.warning("Allocating more than 512 MiB of RAM for "
  188. "the Ip4 cache is not necessary.")
  189. max_cache_width = int(math.log2(max(1, max_size*8)))
  190. allocated = False
  191. cache_width = min(2**32, max_cache_width)
  192. while not allocated:
  193. cache_size = 2**cache_width
  194. try:
  195. self.ip4cache = numpy.zeros(cache_size, dtype=numpy.bool)
  196. except MemoryError:
  197. self.log.exception(
  198. "Could not allocate cache. Retrying a smaller one.")
  199. cache_width -= 1
  200. continue
  201. allocated = True
  202. self.ip4cache_shift = 32-cache_width
  203. for _ in self.exec_each_ip4(self._set_ip4cache):
  204. pass
  205. @staticmethod
  206. def populate_tld_list() -> None:
  207. with open('temp/all_tld.list', 'r') as tld_fdesc:
  208. for tld in tld_fdesc:
  209. tld = tld.strip()
  210. TLD_LIST.add(tld)
  211. @staticmethod
  212. def validate_domain(path: str) -> bool:
  213. if len(path) > 255:
  214. return False
  215. splits = path.split('.')
  216. if not TLD_LIST:
  217. Database.populate_tld_list()
  218. if splits[-1] not in TLD_LIST:
  219. return False
  220. for split in splits:
  221. if not 1 <= len(split) <= 63:
  222. return False
  223. return True
  224. @staticmethod
  225. def pack_domain(domain: str) -> DomainPath:
  226. return DomainPath(domain.split('.')[::-1])
  227. @staticmethod
  228. def unpack_domain(domain: DomainPath) -> str:
  229. return '.'.join(domain.parts[::-1])
  230. @staticmethod
  231. def pack_asn(asn: str) -> AsnPath:
  232. asn = asn.upper()
  233. if asn.startswith('AS'):
  234. asn = asn[2:]
  235. return AsnPath(int(asn))
  236. @staticmethod
  237. def unpack_asn(asn: AsnPath) -> str:
  238. return f'AS{asn.asn}'
  239. @staticmethod
  240. def validate_ip4address(path: str) -> bool:
  241. splits = path.split('.')
  242. if len(splits) != 4:
  243. return False
  244. for split in splits:
  245. try:
  246. if not 0 <= int(split) <= 255:
  247. return False
  248. except ValueError:
  249. return False
  250. return True
  251. @staticmethod
  252. def pack_ip4address_low(address: str) -> int:
  253. addr = 0
  254. for split in address.split('.'):
  255. octet = int(split)
  256. addr = (addr << 8) + octet
  257. return addr
  258. @staticmethod
  259. def pack_ip4address(address: str) -> Ip4Path:
  260. return Ip4Path(Database.pack_ip4address_low(address), 32)
  261. @staticmethod
  262. def unpack_ip4address(address: Ip4Path) -> str:
  263. addr = address.value
  264. assert address.prefixlen == 32
  265. octets: typing.List[int] = list()
  266. octets = [0] * 4
  267. for o in reversed(range(4)):
  268. octets[o] = addr & 0xFF
  269. addr >>= 8
  270. return '.'.join(map(str, octets))
  271. @staticmethod
  272. def validate_ip4network(path: str) -> bool:
  273. # A bit generous but ok for our usage
  274. splits = path.split('/')
  275. if len(splits) != 2:
  276. return False
  277. if not Database.validate_ip4address(splits[0]):
  278. return False
  279. try:
  280. if not 0 <= int(splits[1]) <= 32:
  281. return False
  282. except ValueError:
  283. return False
  284. return True
  285. @staticmethod
  286. def pack_ip4network(network: str) -> Ip4Path:
  287. address, prefixlen_str = network.split('/')
  288. prefixlen = int(prefixlen_str)
  289. addr = Database.pack_ip4address(address)
  290. addr.prefixlen = prefixlen
  291. return addr
  292. @staticmethod
  293. def unpack_ip4network(network: Ip4Path) -> str:
  294. addr = network.value
  295. octets: typing.List[int] = list()
  296. octets = [0] * 4
  297. for o in reversed(range(4)):
  298. octets[o] = addr & 0xFF
  299. addr >>= 8
  300. return '.'.join(map(str, octets)) + '/' + str(network.prefixlen)
  301. def get_match(self, path: Path) -> Match:
  302. if isinstance(path, RuleMultiPath):
  303. return self.rules[0]
  304. elif isinstance(path, RuleFirstPath):
  305. return self.rules[1]
  306. elif isinstance(path, AsnPath):
  307. return self.asns[path.asn]
  308. elif isinstance(path, DomainPath):
  309. dicd = self.domtree
  310. for part in path.parts:
  311. dicd = dicd.children[part]
  312. if isinstance(path, HostnamePath):
  313. return dicd.match_hostname
  314. elif isinstance(path, ZonePath):
  315. return dicd.match_zone
  316. else:
  317. raise ValueError
  318. elif isinstance(path, Ip4Path):
  319. dici = self.ip4tree
  320. for i in range(31, 31-path.prefixlen, -1):
  321. bit = (path.value >> i) & 0b1
  322. dici_next = dici.one if bit else dici.zero
  323. if not dici_next:
  324. raise IndexError
  325. dici = dici_next
  326. return dici
  327. else:
  328. raise ValueError
  329. def exec_each_asn(self,
  330. callback: MatchCallable,
  331. ) -> typing.Any:
  332. for asn in self.asns:
  333. match = self.asns[asn]
  334. if match.active():
  335. c = callback(
  336. AsnPath(asn),
  337. match,
  338. )
  339. try:
  340. yield from c
  341. except TypeError: # not iterable
  342. pass
  343. def exec_each_domain(self,
  344. callback: MatchCallable,
  345. _dic: DomainTreeNode = None,
  346. _par: DomainPath = None,
  347. ) -> typing.Any:
  348. _dic = _dic or self.domtree
  349. _par = _par or DomainPath([])
  350. if _dic.match_hostname.active():
  351. c = callback(
  352. HostnamePath(_par.parts),
  353. _dic.match_hostname,
  354. )
  355. try:
  356. yield from c
  357. except TypeError: # not iterable
  358. pass
  359. if _dic.match_zone.active():
  360. c = callback(
  361. ZonePath(_par.parts),
  362. _dic.match_zone,
  363. )
  364. try:
  365. yield from c
  366. except TypeError: # not iterable
  367. pass
  368. for part in _dic.children:
  369. dic = _dic.children[part]
  370. yield from self.exec_each_domain(
  371. callback,
  372. _dic=dic,
  373. _par=DomainPath(_par.parts + [part])
  374. )
  375. def exec_each_ip4(self,
  376. callback: MatchCallable,
  377. _dic: IpTreeNode = None,
  378. _par: Ip4Path = None,
  379. ) -> typing.Any:
  380. _dic = _dic or self.ip4tree
  381. _par = _par or Ip4Path(0, 0)
  382. if _dic.active():
  383. c = callback(
  384. _par,
  385. _dic,
  386. )
  387. try:
  388. yield from c
  389. except TypeError: # not iterable
  390. pass
  391. # 0
  392. pref = _par.prefixlen + 1
  393. dic = _dic.zero
  394. if dic:
  395. # addr0 = _par.value & (0xFFFFFFFF ^ (1 << (32-pref)))
  396. # assert addr0 == _par.value
  397. addr0 = _par.value
  398. yield from self.exec_each_ip4(
  399. callback,
  400. _dic=dic,
  401. _par=Ip4Path(addr0, pref)
  402. )
  403. # 1
  404. dic = _dic.one
  405. if dic:
  406. addr1 = _par.value | (1 << (32-pref))
  407. # assert addr1 != _par.value
  408. yield from self.exec_each_ip4(
  409. callback,
  410. _dic=dic,
  411. _par=Ip4Path(addr1, pref)
  412. )
  413. def exec_each(self,
  414. callback: MatchCallable,
  415. ) -> typing.Any:
  416. yield from self.exec_each_domain(callback)
  417. yield from self.exec_each_ip4(callback)
  418. yield from self.exec_each_asn(callback)
  419. def update_references(self) -> None:
  420. # Should be correctly calculated normally,
  421. # keeping this just in case
  422. def reset_references_cb(path: Path,
  423. match: Match
  424. ) -> None:
  425. match.references = 0
  426. for _ in self.exec_each(reset_references_cb):
  427. pass
  428. def increment_references_cb(path: Path,
  429. match: Match
  430. ) -> None:
  431. if match.source:
  432. source = self.get_match(match.source)
  433. source.references += 1
  434. for _ in self.exec_each(increment_references_cb):
  435. pass
  436. def _clean_deps(self) -> None:
  437. # Disable the matches that depends on the targeted
  438. # matches until all disabled matches reference count = 0
  439. did_something = True
  440. def clean_deps_cb(path: Path,
  441. match: Match
  442. ) -> None:
  443. nonlocal did_something
  444. if not match.source:
  445. return
  446. source = self.get_match(match.source)
  447. if not source.active():
  448. self._unset_match(match)
  449. elif match.first_party > source.first_party:
  450. match.first_party = source.first_party
  451. else:
  452. return
  453. did_something = True
  454. while did_something:
  455. did_something = False
  456. self.enter_step('pass_clean_deps')
  457. for _ in self.exec_each(clean_deps_cb):
  458. pass
  459. def prune(self, before: int, base_only: bool = False) -> None:
  460. # Disable the matches targeted
  461. def prune_cb(path: Path,
  462. match: Match
  463. ) -> None:
  464. if base_only and match.level > 1:
  465. return
  466. if match.updated > before:
  467. return
  468. self._unset_match(match)
  469. self.log.debug("Print: disabled %s", path)
  470. self.enter_step('pass_prune')
  471. for _ in self.exec_each(prune_cb):
  472. pass
  473. self._clean_deps()
  474. # Remove branches with no match
  475. # TODO
  476. def explain(self, path: Path) -> str:
  477. match = self.get_match(path)
  478. string = str(path)
  479. if isinstance(match, AsnNode):
  480. string += f' ({match.name})'
  481. party_char = 'F' if match.first_party else 'M'
  482. dup_char = 'D' if match.dupplicate else '_'
  483. string += f' {match.level}{party_char}{dup_char}{match.references}'
  484. if match.source:
  485. string += f' ← {self.explain(match.source)}'
  486. return string
  487. def list_records(self,
  488. first_party_only: bool = False,
  489. end_chain_only: bool = False,
  490. no_dupplicates: bool = False,
  491. rules_only: bool = False,
  492. hostnames_only: bool = False,
  493. explain: bool = False,
  494. ) -> typing.Iterable[str]:
  495. def export_cb(path: Path, match: Match
  496. ) -> typing.Iterable[str]:
  497. if first_party_only and not match.first_party:
  498. return
  499. if end_chain_only and match.references > 0:
  500. return
  501. if no_dupplicates and match.dupplicate:
  502. return
  503. if rules_only and match.level > 1:
  504. return
  505. if hostnames_only and not isinstance(path, HostnamePath):
  506. return
  507. if explain:
  508. yield self.explain(path)
  509. else:
  510. yield str(path)
  511. yield from self.exec_each(export_cb)
  512. def count_records(self,
  513. first_party_only: bool = False,
  514. end_chain_only: bool = False,
  515. no_dupplicates: bool = False,
  516. rules_only: bool = False,
  517. hostnames_only: bool = False,
  518. ) -> str:
  519. memo: typing.Dict[str, int] = dict()
  520. def count_records_cb(path: Path, match: Match) -> None:
  521. if first_party_only and not match.first_party:
  522. return
  523. if end_chain_only and match.references > 0:
  524. return
  525. if no_dupplicates and match.dupplicate:
  526. return
  527. if rules_only and match.level > 1:
  528. return
  529. if hostnames_only and not isinstance(path, HostnamePath):
  530. return
  531. try:
  532. memo[path.__class__.__name__] += 1
  533. except KeyError:
  534. memo[path.__class__.__name__] = 1
  535. for _ in self.exec_each(count_records_cb):
  536. pass
  537. split: typing.List[str] = list()
  538. for key, value in sorted(memo.items(), key=lambda s: s[0]):
  539. split.append(f'{key[:-4].lower()}s: {value}')
  540. return ', '.join(split)
  541. def get_domain(self, domain_str: str) -> typing.Iterable[DomainPath]:
  542. self.enter_step('get_domain_pack')
  543. domain = self.pack_domain(domain_str)
  544. self.enter_step('get_domain_brws')
  545. dic = self.domtree
  546. depth = 0
  547. for part in domain.parts:
  548. if dic.match_zone.active():
  549. self.enter_step('get_domain_yield')
  550. yield ZonePath(domain.parts[:depth])
  551. self.enter_step('get_domain_brws')
  552. if part not in dic.children:
  553. return
  554. dic = dic.children[part]
  555. depth += 1
  556. if dic.match_zone.active():
  557. self.enter_step('get_domain_yield')
  558. yield ZonePath(domain.parts)
  559. if dic.match_hostname.active():
  560. self.enter_step('get_domain_yield')
  561. yield HostnamePath(domain.parts)
  562. def get_ip4(self, ip4_str: str) -> typing.Iterable[Path]:
  563. self.enter_step('get_ip4_pack')
  564. ip4val = self.pack_ip4address_low(ip4_str)
  565. self.enter_step('get_ip4_cache')
  566. if not self.ip4cache[ip4val >> self.ip4cache_shift]:
  567. return
  568. self.enter_step('get_ip4_brws')
  569. dic = self.ip4tree
  570. for i in range(31, -1, -1):
  571. bit = (ip4val >> i) & 0b1
  572. if dic.active():
  573. self.enter_step('get_ip4_yield')
  574. yield Ip4Path(ip4val >> (i+1) << (i+1), 31-i)
  575. self.enter_step('get_ip4_brws')
  576. next_dic = dic.one if bit else dic.zero
  577. if next_dic is None:
  578. return
  579. dic = next_dic
  580. if dic.active():
  581. self.enter_step('get_ip4_yield')
  582. yield Ip4Path(ip4val, 32)
  583. def _unset_match(self,
  584. match: Match,
  585. ) -> None:
  586. match.disable()
  587. if match.source:
  588. source_match = self.get_match(match.source)
  589. source_match.references -= 1
  590. def _set_match(self,
  591. match: Match,
  592. updated: int,
  593. source: Path,
  594. source_match: Match = None,
  595. dupplicate: bool = False,
  596. ) -> None:
  597. # source_match is in parameters because most of the time
  598. # its parent function needs it too,
  599. # so it can pass it to save a traversal
  600. source_match = source_match or self.get_match(source)
  601. new_level = source_match.level + 1
  602. if updated > match.updated or new_level < match.level \
  603. or source_match.first_party > match.first_party:
  604. # NOTE FP and level of matches referencing this one
  605. # won't be updated until run or prune
  606. if match.source:
  607. old_source = self.get_match(match.source)
  608. old_source.references -= 1
  609. match.updated = updated
  610. match.level = new_level
  611. match.first_party = source_match.first_party
  612. match.source = source
  613. source_match.references += 1
  614. match.dupplicate = dupplicate
  615. def _set_domain(self,
  616. hostname: bool,
  617. domain_str: str,
  618. updated: int,
  619. source: Path) -> None:
  620. self.enter_step('set_domain_val')
  621. if not Database.validate_domain(domain_str):
  622. raise ValueError(f"Invalid domain: {domain_str}")
  623. self.enter_step('set_domain_pack')
  624. domain = self.pack_domain(domain_str)
  625. self.enter_step('set_domain_fp')
  626. source_match = self.get_match(source)
  627. is_first_party = source_match.first_party
  628. self.enter_step('set_domain_brws')
  629. dic = self.domtree
  630. dupplicate = False
  631. for part in domain.parts:
  632. if part not in dic.children:
  633. dic.children[part] = DomainTreeNode()
  634. dic = dic.children[part]
  635. if dic.match_zone.active(is_first_party):
  636. dupplicate = True
  637. if hostname:
  638. match = dic.match_hostname
  639. else:
  640. match = dic.match_zone
  641. self._set_match(
  642. match,
  643. updated,
  644. source,
  645. source_match=source_match,
  646. dupplicate=dupplicate,
  647. )
  648. def set_hostname(self,
  649. *args: typing.Any, **kwargs: typing.Any
  650. ) -> None:
  651. self._set_domain(True, *args, **kwargs)
  652. def set_zone(self,
  653. *args: typing.Any, **kwargs: typing.Any
  654. ) -> None:
  655. self._set_domain(False, *args, **kwargs)
  656. def set_asn(self,
  657. asn_str: str,
  658. updated: int,
  659. source: Path) -> None:
  660. self.enter_step('set_asn')
  661. path = self.pack_asn(asn_str)
  662. if path.asn in self.asns:
  663. match = self.asns[path.asn]
  664. else:
  665. match = AsnNode()
  666. self.asns[path.asn] = match
  667. self._set_match(
  668. match,
  669. updated,
  670. source,
  671. )
  672. def _set_ip4(self,
  673. ip4: Ip4Path,
  674. updated: int,
  675. source: Path) -> None:
  676. self.enter_step('set_ip4_fp')
  677. source_match = self.get_match(source)
  678. is_first_party = source_match.first_party
  679. self.enter_step('set_ip4_brws')
  680. dic = self.ip4tree
  681. dupplicate = False
  682. for i in range(31, 31-ip4.prefixlen, -1):
  683. bit = (ip4.value >> i) & 0b1
  684. next_dic = dic.one if bit else dic.zero
  685. if next_dic is None:
  686. next_dic = IpTreeNode()
  687. if bit:
  688. dic.one = next_dic
  689. else:
  690. dic.zero = next_dic
  691. dic = next_dic
  692. if dic.active(is_first_party):
  693. dupplicate = True
  694. self._set_match(
  695. dic,
  696. updated,
  697. source,
  698. source_match=source_match,
  699. dupplicate=dupplicate,
  700. )
  701. self._set_ip4cache(ip4, dic)
  702. def set_ip4address(self,
  703. ip4address_str: str,
  704. *args: typing.Any, **kwargs: typing.Any
  705. ) -> None:
  706. self.enter_step('set_ip4add_val')
  707. if not Database.validate_ip4address(ip4address_str):
  708. raise ValueError(f"Invalid ip4address: {ip4address_str}")
  709. self.enter_step('set_ip4add_pack')
  710. ip4 = self.pack_ip4address(ip4address_str)
  711. self._set_ip4(ip4, *args, **kwargs)
  712. def set_ip4network(self,
  713. ip4network_str: str,
  714. *args: typing.Any, **kwargs: typing.Any
  715. ) -> None:
  716. self.enter_step('set_ip4net_val')
  717. if not Database.validate_ip4network(ip4network_str):
  718. raise ValueError(f"Invalid ip4network: {ip4network_str}")
  719. self.enter_step('set_ip4net_pack')
  720. ip4 = self.pack_ip4network(ip4network_str)
  721. self._set_ip4(ip4, *args, **kwargs)