eulaurarien/feed_asn.py
2021-08-14 23:27:28 +02:00

69 lines
1.9 KiB
Python
Executable file

#!/usr/bin/env python3
import database
import argparse
import requests
import typing
import ipaddress
import logging
import time
# Type alias covering both IPv4 and IPv6 network objects from `ipaddress`.
IPNetwork = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
def get_ranges(asn: str, timeout: float = 30.0) -> typing.Iterable[str]:
    """Yield the prefixes RIPEstat lists for an autonomous system.

    Calls the RIPEstat ``as-routing-consistency`` data endpoint with *asn*
    as the ``resource`` query parameter and yields the ``prefix`` field of
    every entry under ``data.prefixes`` in the JSON response.

    This is a generator: the HTTP request is only sent once iteration
    starts.

    Args:
        asn: Value sent as the ``resource`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` may block forever on a stalled
            connection.

    Yields:
        The ``prefix`` value of each entry, as found in the response.

    Raises:
        requests.RequestException: On timeout, connection failure, or an
            HTTP error status.
        KeyError: If the JSON payload lacks the expected keys.
    """
    req = requests.get(
        "https://stat.ripe.net/data/as-routing-consistency/data.json",
        params={"resource": asn},
        timeout=timeout,
    )
    # Fail loudly on 4xx/5xx instead of trying to parse an error body.
    req.raise_for_status()
    data = req.json()
    for pref in data["data"]["prefixes"]:
        yield pref["prefix"]
def get_name(asn: str, timeout: float = 30.0) -> str:
    """Return the holder name RIPEstat reports for an autonomous system.

    Calls the RIPEstat ``as-overview`` data endpoint with *asn* as the
    ``resource`` query parameter and returns ``data.holder`` from the JSON
    response.

    Args:
        asn: Value sent as the ``resource`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` may block forever on a stalled
            connection.

    Returns:
        The ``holder`` field of the response.

    Raises:
        requests.RequestException: On timeout, connection failure, or an
            HTTP error status.
        KeyError: If the JSON payload lacks the expected keys.
    """
    req = requests.get(
        "https://stat.ripe.net/data/as-overview/data.json",
        params={"resource": asn},
        timeout=timeout,
    )
    # Fail loudly on 4xx/5xx instead of trying to parse an error body.
    req.raise_for_status()
    data = req.json()
    return data["data"]["holder"]
if __name__ == "__main__":
    log = logging.getLogger("feed_asn")

    # The script takes no options of its own; parsing still provides
    # argparse's standard usage/help handling.
    parser = argparse.ArgumentParser(
        description="Add the IP ranges associated to the AS in the database"
    )
    args = parser.parse_args()

    DB = database.Database()

    def add_ranges(
        path: database.Path,
        match: database.Match,
    ) -> None:
        """Record the holder name and the IPv4 ranges of one ASN entry.

        Used as the callback handed to ``DB.exec_each_asn``. The holder
        name is stored on *match*; each IPv4 prefix is written to the
        database with *path* as its source. IPv6 prefixes are only
        reported as unimplemented.
        """
        # Narrow the generic callback types to the ASN-specific ones.
        assert isinstance(path, database.AsnPath)
        assert isinstance(match, database.AsnNode)
        asn = database.Database.unpack_asn(path)

        # NOTE(review): enter_step presumably labels the current phase for
        # the database's own bookkeeping - confirm in database.py.
        DB.enter_step("asn_get_name")
        holder = get_name(asn)
        match.name = holder

        DB.enter_step("asn_get_ranges")
        for prefix in get_ranges(asn):
            network: IPNetwork = ipaddress.ip_network(prefix)
            version = network.version
            if version == 6:
                log.warning("Unimplemented prefix version: %s", prefix)
                continue
            if version != 4:
                log.error("Unknown prefix version: %s", prefix)
                continue
            now = int(time.time())
            DB.set_ip4network(prefix, source=path, updated=now)
            log.info("Added %s from %s (%s)", prefix, path, holder)

    # Iterate to exhaustion; the yielded values themselves are not needed.
    for _ in DB.exec_each_asn(add_ranges):
        pass

    DB.save()