Generates a host list of first-party trackers for ad-blocking.

#!/usr/bin/env python3

import argparse
import logging
import multiprocessing
import sys
import time
import typing

import database

Record = typing.Tuple[typing.Callable, typing.Callable, int, str, str]
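# Record fields: (select function, write function, updated timestamp,
# name, value); see Writer.exec_record for how these are used.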

# Maps each supported DNS record type to its (select, write)
# database functions
FUNCTION_MAP: typing.Any = {
    'a': (
        database.Database.get_ip4,
        database.Database.set_hostname,
    ),
    'cname': (
        database.Database.get_domain,
        database.Database.set_hostname,
    ),
    'ptr': (
        database.Database.get_domain,
        database.Database.set_ip4address,
    ),
}

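
# Writes records into the database, either directly (single-process
# mode) or by consuming blocks of records from a queue as a worker
# process (multiprocessing mode).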
class Writer(multiprocessing.Process):
    def __init__(self,
                 recs_queue: multiprocessing.Queue = None,
                 autosave_interval: int = 0,
                 ip4_cache: int = 0,
                 ):
        if recs_queue:  # Multiprocessing mode
            super(Writer, self).__init__()
            self.recs_queue = recs_queue
        self.log = logging.getLogger('wr')
        self.autosave_interval = autosave_interval
        self.ip4_cache = ip4_cache
        if not recs_queue:  # Single-process mode
            self.open_db()

    def open_db(self) -> None:
        self.db = database.Database()
        self.db.log = logging.getLogger('wr')
        self.db.fill_ip4cache(max_size=self.ip4_cache)
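
    # Applies one record: for every source the select function finds
    # for `value`, store `name` (updated at `updated`) with the write
    # function.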
    def exec_record(self, record: Record) -> None:
        self.db.enter_step('exec_record')
        select, write, updated, name, value = record
        try:
            for source in select(self.db, value):
                write(self.db, name, updated, source=source)
        except (ValueError, IndexError):
            # ValueError: non-number in IP
            # IndexError: IP too big
            self.log.exception("Cannot execute: %s", record)

    def end(self) -> None:
        self.db.enter_step('end')
        self.db.save()

    def run(self) -> None:
        self.open_db()
        if self.autosave_interval > 0:
            next_save = time.time() + self.autosave_interval
        else:
            next_save = 0

        self.db.enter_step('block_wait')
        block: typing.List[Record]
        for block in iter(self.recs_queue.get, None):
            record: Record
            for record in block:
                self.exec_record(record)

            if next_save > 0 and time.time() > next_save:
                self.log.info("Saving database...")
                self.db.save()
                self.log.info("Done!")
                next_save = time.time() + self.autosave_interval

            self.db.enter_step('block_wait')
        self.end()

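
# Base class for input parsers: reads lines from `buf`, builds Record
# tuples and hands them either to a Writer directly (single-process
# mode) or to a shared queue in fixed-size blocks (multiprocessing).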
class Parser():
    def __init__(self,
                 buf: typing.Any,
                 recs_queue: multiprocessing.Queue = None,
                 block_size: int = 0,
                 writer: Writer = None,
                 ):
        assert bool(writer) ^ bool(block_size and recs_queue)
        self.buf = buf
        self.log = logging.getLogger('pr')
        self.recs_queue = recs_queue
        if writer:  # Single-process mode
            self.prof: database.Profiler = writer.db
            self.register = writer.exec_record
        else:  # Multiprocessing mode
            self.block: typing.List[Record] = list()
            self.block_size = block_size
            self.prof = database.Profiler()
            self.prof.log = logging.getLogger('pr')
            self.register = self.add_to_queue

    def add_to_queue(self, record: Record) -> None:
        self.prof.enter_step('register')
        self.block.append(record)
        if len(self.block) >= self.block_size:
            self.prof.enter_step('put_block')
            assert self.recs_queue
            self.recs_queue.put(self.block)
            self.block = list()

    def run(self) -> None:
        self.consume()
        if self.recs_queue:
            self.recs_queue.put(self.block)
        self.prof.profile()

    def consume(self) -> None:
        raise NotImplementedError

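
# Parses Rapid7 Open Data DNS datasets (one JSON object per line).
# Splitting on '"' extracts the key/value pairs (timestamp, name,
# type, value) without paying for a full JSON parse.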
class Rapid7Parser(Parser):
    def consume(self) -> None:
        data = dict()
        for line in self.buf:
            self.prof.enter_step('parse_rapid7')
            split = line.split('"')
            try:
                for k in range(1, 14, 4):
                    key = split[k]
                    val = split[k+2]
                    data[key] = val
                select, writer = FUNCTION_MAP[data['type']]
                record = (
                    select,
                    writer,
                    int(data['timestamp']),
                    data['name'],
                    data['value']
                )
            except (IndexError, KeyError):
                # IndexError: missing field
                # KeyError: unknown type field
                self.log.exception("Cannot parse: %s", line)
                continue  # Skip the line: `record` is stale or unset here
            self.register(record)


class MassDnsParser(Parser):
    # Parses the output of:
    # massdns --output Snrql
    #         --retry REFUSED,SERVFAIL --resolvers nameservers-ipv4
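    # Each entry is (select, write, name_offset, value_offset); the -1
    # offsets trim the trailing dot from domain names, None keeps the
    # value as-is.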
    TYPES = {
        'A': (FUNCTION_MAP['a'][0], FUNCTION_MAP['a'][1], -1, None),
        # 'AAAA': (FUNCTION_MAP['aaaa'][0], FUNCTION_MAP['aaaa'][1], -1, None),
        'CNAME': (FUNCTION_MAP['cname'][0], FUNCTION_MAP['cname'][1], -1, -1),
    }

    def consume(self) -> None:
        self.prof.enter_step('parse_massdns')
        timestamp = 0
        header = True
        for line in self.buf:
            line = line[:-1]
            if not line:
                header = True
                continue

            split = line.split(' ')
            try:
                if header:
                    timestamp = int(split[1])
                    header = False
                else:
                    select, write, name_offset, value_offset = \
                        MassDnsParser.TYPES[split[1]]
                    record = (
                        select,
                        write,
                        timestamp,
                        split[0][:name_offset].lower(),
                        split[2][:value_offset].lower(),
                    )
                    self.register(record)
                    self.prof.enter_step('parse_massdns')
            except KeyError:
                continue


PARSERS = {
    'rapid7': Rapid7Parser,
    'massdns': MassDnsParser,
}
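
# Example invocations (hypothetical file names), feeding records on
# standard input:
#   gunzip -c fdns_a.json.gz | ./feed_dns.py rapid7
#   massdns [...] --output Snrql [...] | ./feed_dns.py massdns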

if __name__ == '__main__':

    # Parsing arguments
    log = logging.getLogger('feed_dns')
    args_parser = argparse.ArgumentParser(
        description="Read DNS records and import "
        "tracking-relevant data into the database")
    args_parser.add_argument(
        'parser',
        choices=PARSERS.keys(),
        help="Input format")
    args_parser.add_argument(
        '-i', '--input', type=argparse.FileType('r'), default=sys.stdin,
        help="Input file")
    args_parser.add_argument(
        '-b', '--block-size', type=int, default=1024,
        help="Number of records per block sent to the writer queue "
        "(performance tuning value)")
    args_parser.add_argument(
        '-q', '--queue-size', type=int, default=128,
        help="Maximum number of blocks in the writer queue "
        "(performance tuning value)")
    args_parser.add_argument(
        '-a', '--autosave-interval', type=int, default=900,
        help="Interval in seconds at which the database will be saved. "
        "0 to disable.")
    args_parser.add_argument(
        '-s', '--single-process', action='store_true',
        help="Only use one process. "
        "Might be useful for single-core computers.")
    args_parser.add_argument(
        '-4', '--ip4-cache', type=int, default=0,
        help="Size in bytes of a RAM cache for faster IPv4 lookups. "
        "Maximum useful value: 512 MiB (536870912). "
        "Warning: depending on the rules, this might already "
        "be a memory-heavy process, even without the cache.")
    args = args_parser.parse_args()

    parser_cls = PARSERS[args.parser]
    if args.single_process:
        writer = Writer(
            autosave_interval=args.autosave_interval,
            ip4_cache=args.ip4_cache
        )
        parser = parser_cls(args.input, writer=writer)
        parser.run()
        writer.end()
    else:
        recs_queue: multiprocessing.Queue = multiprocessing.Queue(
            maxsize=args.queue_size)

        writer = Writer(recs_queue,
                        autosave_interval=args.autosave_interval,
                        ip4_cache=args.ip4_cache
                        )
        writer.start()

        parser = parser_cls(args.input,
                            recs_queue=recs_queue,
                            block_size=args.block_size
                            )
        parser.run()

        recs_queue.put(None)
        writer.join()