69 lines
1.9 KiB
Executable File

#!/usr/bin/env python3
import database
import argparse
import requests
import typing
import ipaddress
import logging
import time
IPNetwork = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
def get_ranges(asn: str) -> typing.Iterable[str]:
    """Yield the IP prefixes reported for an autonomous system.

    Args:
        asn: The AS identifier, passed verbatim as the ``resource`` query
            parameter.

    Yields:
        The ``prefix`` string of every entry in ``data.prefixes`` of the
        JSON response.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status.
        KeyError: If the response lacks the expected ``data.prefixes``
            structure.
    """
    # NOTE(review): the endpoint URL was absent from the reviewed source.
    # This RIPEstat data call returns the ``data.prefixes[].prefix`` shape
    # read below -- confirm it is the intended endpoint.
    req = requests.get(
        "https://stat.ripe.net/data/as-routing-consistency/data.json",
        params={"resource": asn},
        timeout=30,  # requests never times out by default
    )
    # Fail on HTTP errors here rather than with a KeyError further down.
    req.raise_for_status()
    data = req.json()
    for pref in data["data"]["prefixes"]:
        yield pref["prefix"]
def get_name(asn: str) -> str:
    """Return the holder name registered for an autonomous system.

    Args:
        asn: The AS identifier, passed verbatim as the ``resource`` query
            parameter.

    Returns:
        The ``data.holder`` field of the JSON response.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status.
        KeyError: If the response lacks the expected ``data.holder`` field.
    """
    # NOTE(review): the reviewed source had an empty URL here, which makes
    # requests raise MissingSchema. This RIPEstat data call exposes the
    # ``data.holder`` field read below -- confirm it is the intended endpoint.
    req = requests.get(
        "https://stat.ripe.net/data/as-overview/data.json",
        params={"resource": asn},
        timeout=30,  # requests never times out by default
    )
    # Fail on HTTP errors here rather than with a KeyError further down.
    req.raise_for_status()
    data = req.json()
    return data["data"]["holder"]
if __name__ == "__main__":
log = logging.getLogger("feed_asn")
# Parsing arguments
parser = argparse.ArgumentParser(
description="Add the IP ranges associated to the AS in the database"
args = parser.parse_args()
DB = database.Database()
def add_ranges(
    path: database.Path,
    match: database.Match,
) -> None:
    """Store the IPv4 prefixes of one AS in the database.

    Looks up the AS name and its prefixes, then records every IPv4 prefix
    through ``DB.set_ip4network`` with ``path`` as its source. IPv6 prefixes
    are only reported with a warning; they are not stored.

    Args:
        path: Database path of the AS; must be a ``database.AsnPath``.
        match: Database node of the AS; must be a ``database.AsnNode``.
    """
    # These asserts narrow the declared general types for the code below.
    assert isinstance(path, database.AsnPath)
    assert isinstance(match, database.AsnNode)
    asn_str = database.Database.unpack_asn(path)
    name = get_name(asn_str)
    # NOTE(review): the reviewed source read `name = get_name(asn_str) = name`,
    # i.e. two statements fused with the second target lost. `match.name` is
    # the presumed target -- confirm against database.AsnNode.
    match.name = name
    for prefix in get_ranges(asn_str):
        parsed_prefix: IPNetwork = ipaddress.ip_network(prefix)
        if parsed_prefix.version == 4:
            DB.set_ip4network(prefix, source=path, updated=int(time.time()))
            # NOTE(review): the logging call was truncated in the reviewed
            # source; the info level is presumed -- confirm.
            log.info("Added %s from %s (%s)", prefix, path, name)
        elif parsed_prefix.version == 6:
            log.warning("Unimplemented prefix version: %s", prefix)
        else:
            # Without this branch the error was logged after every IPv6
            # warning instead of only for unrecognised versions.
            log.error("Unknown prefix version: %s", prefix)
for _ in DB.exec_each_asn(add_ranges):