#!/usr/bin/env python3
"""Add the IP ranges associated with each AS to the database.

For every AS handed over by ``Database.exec_each_asn``, the holder name and
the prefixes are fetched from the RIPEstat data API. The name is stored on
the matched node and every IPv4 prefix is registered in the database.
"""

import argparse
import ipaddress
import logging
import time
import typing

import requests

import database

IPNetwork = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]

# Seconds to wait for RIPEstat before giving up. Without a timeout, requests
# waits forever on a stalled connection and the whole feed would hang.
RIPESTAT_TIMEOUT = 60


def get_ranges(asn: str) -> typing.Iterable[str]:
    """Yield the prefixes RIPEstat lists for *asn*.

    Queries the ``as-routing-consistency`` endpoint and yields the
    ``prefix`` field of every entry in ``data.prefixes``.

    This is a generator: the HTTP request is only sent once iteration starts.

    Raises:
        requests.HTTPError: if RIPEstat answers with an error status.
        requests.Timeout: if no answer arrives within RIPESTAT_TIMEOUT.
    """
    req = requests.get(
        "https://stat.ripe.net/data/as-routing-consistency/data.json",
        params={"resource": asn},
        timeout=RIPESTAT_TIMEOUT,
    )
    # Fail with an explicit HTTP error rather than a confusing KeyError or
    # JSON decoding error further down.
    req.raise_for_status()
    data = req.json()
    for pref in data["data"]["prefixes"]:
        yield pref["prefix"]


def get_name(asn: str) -> str:
    """Return the holder name RIPEstat reports for *asn*.

    Queries the ``as-overview`` endpoint and returns ``data.holder``.

    Raises:
        requests.HTTPError: if RIPEstat answers with an error status.
        requests.Timeout: if no answer arrives within RIPESTAT_TIMEOUT.
    """
    req = requests.get(
        "https://stat.ripe.net/data/as-overview/data.json",
        params={"resource": asn},
        timeout=RIPESTAT_TIMEOUT,
    )
    req.raise_for_status()
    data = req.json()
    return data["data"]["holder"]


def main() -> None:
    """Parse the command line, then feed the database for every AS."""
    log = logging.getLogger("feed_asn")

    # Parsing arguments
    # No options are defined; parsing still provides --help and rejects
    # unexpected arguments.
    parser = argparse.ArgumentParser(
        description="Add the IP ranges associated to the AS in the database"
    )
    parser.parse_args()

    DB = database.Database()

    def add_ranges(
        path: database.Path,
        match: database.Match,
    ) -> None:
        """Record the holder name and the IPv4 prefixes of one AS.

        Used as the callback given to ``DB.exec_each_asn``.
        """
        # Narrow the generic callback types to the ASN-specific ones.
        assert isinstance(path, database.AsnPath)
        assert isinstance(match, database.AsnNode)
        asn_str = database.Database.unpack_asn(path)

        DB.enter_step("asn_get_name")
        name = get_name(asn_str)
        match.name = name

        DB.enter_step("asn_get_ranges")
        for prefix in get_ranges(asn_str):
            parsed_prefix: IPNetwork = ipaddress.ip_network(prefix)
            if parsed_prefix.version == 4:
                DB.set_ip4network(prefix, source=path, updated=int(time.time()))
                log.info("Added %s from %s (%s)", prefix, path, name)
            elif parsed_prefix.version == 6:
                # IPv6 prefixes are not stored, only reported.
                log.warning("Unimplemented prefix version: %s", prefix)
            else:
                log.error("Unknown prefix version: %s", prefix)

    # The result of exec_each_asn is iterated only to drive it; the yielded
    # values are not needed. NOTE(review): presumably a lazy iterator that
    # invokes the callback as it is consumed - confirm in database.py.
    for _ in DB.exec_each_asn(add_ranges):
        pass

    DB.save()


if __name__ == "__main__":
    main()