eulaurarien/feed_asn.py

71 lines
1.9 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import database
import argparse
import requests
import typing
import ipaddress
import logging
import time
IPNetwork = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
def get_ranges(asn: str) -> typing.Iterable[str]:
req = requests.get(
'https://stat.ripe.net/data/as-routing-consistency/data.json',
params={'resource': asn}
)
data = req.json()
for pref in data['data']['prefixes']:
yield pref['prefix']
2019-12-17 12:29:02 +00:00
def get_name(asn: str) -> str:
req = requests.get(
'https://stat.ripe.net/data/as-overview/data.json',
params={'resource': asn}
)
data = req.json()
return data['data']['holder']
if __name__ == '__main__':
log = logging.getLogger('feed_asn')
# Parsing arguments
parser = argparse.ArgumentParser(
description="TODO")
args = parser.parse_args()
DB = database.Database()
2019-12-16 13:18:03 +00:00
def add_ranges(path: database.Path,
match: database.Match,
2019-12-17 12:29:02 +00:00
) -> None:
2019-12-16 13:18:03 +00:00
assert isinstance(path, database.AsnPath)
2019-12-17 12:29:02 +00:00
assert isinstance(match, database.AsnNode)
asn_str = database.Database.unpack_asn(path)
2019-12-17 12:29:02 +00:00
DB.enter_step('asn_get_name')
match.name = get_name(asn_str)
DB.enter_step('asn_get_ranges')
for prefix in get_ranges(asn_str):
parsed_prefix: IPNetwork = ipaddress.ip_network(prefix)
if parsed_prefix.version == 4:
DB.set_ip4network(
prefix,
source=path,
updated=int(time.time())
)
log.info('Added %s from %s (%s)', prefix, asn_str, path)
elif parsed_prefix.version == 6:
log.warning('Unimplemented prefix version: %s', prefix)
else:
log.error('Unknown prefix version: %s', prefix)
2019-12-17 12:29:02 +00:00
for _ in DB.exec_each_asn(add_ranges):
2019-12-16 13:18:03 +00:00
pass
DB.save()