#!/usr/bin/env python3
"""Import base rules into the database.

Reads one rule per line from a file (or standard input) and stores each
rule through the ``database.Database`` setter that matches the rule type
given on the command line.  The database is saved once, after all input
lines have been processed.
"""

import database
import argparse
import sys
import time
import typing

# Maps the rule type accepted on the command line to the (unbound)
# Database method used to store a rule of that type.
FUNCTION_MAP = {
    "zone": database.Database.set_zone,
    "hostname": database.Database.set_hostname,
    "asn": database.Database.set_asn,
    "ip4network": database.Database.set_ip4network,
    "ip4address": database.Database.set_ip4address,
}

if __name__ == "__main__":

    # Parsing arguments
    parser = argparse.ArgumentParser(description="Import base rules to the database")
    parser.add_argument(
        "type", choices=FUNCTION_MAP.keys(), help="Type of rule inputed"
    )
    parser.add_argument(
        "-i",
        "--input",
        type=argparse.FileType("r"),
        default=sys.stdin,
        help="File with one rule per line",
    )
    parser.add_argument(
        "-f",
        "--first-party",
        action="store_true",
        help="The input only comes from verified first-party sources",
    )
    args = parser.parse_args()

    DB = database.Database()

    # Setter matching the requested rule type; argparse's ``choices``
    # guarantees the key exists in FUNCTION_MAP.
    fun = FUNCTION_MAP[args.type]

    # Every rule imported in this run is attributed to the same source path,
    # selected by the --first-party flag.
    source: database.RulePath
    if args.first_party:
        source = database.RuleFirstPath()
    else:
        source = database.RuleMultiPath()

    for line in args.input:
        rule = line.strip()
        if not rule:
            # Blank lines (e.g. a trailing newline at the end of the file)
            # carry no rule.  Skip them instead of handing an empty string
            # to the setter.
            # NOTE(review): how the setters treat "" is not visible from
            # this file -- confirm skipping is the desired behaviour.
            continue
        try:
            fun(
                DB,
                rule,
                source=source,
                updated=int(time.time()),
            )
        except ValueError:
            # A rule that raises ValueError is reported and skipped so that
            # one bad line does not abort the whole import.
            DB.log.error(f"Could not add rule: {rule}")

    DB.save()