#!/usr/bin/env python3 import database import argparse import requests import typing import ipaddress import logging import time IPNetwork = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network] def get_ranges(asn: str) -> typing.Iterable[str]: req = requests.get( 'https://stat.ripe.net/data/as-routing-consistency/data.json', params={'resource': asn} ) data = req.json() for pref in data['data']['prefixes']: yield pref['prefix'] if __name__ == '__main__': log = logging.getLogger('feed_asn') # Parsing arguments parser = argparse.ArgumentParser( description="TODO") args = parser.parse_args() DB = database.Database() DBW = database.Database(write=True) for asn, entry in DB.list_asn(): DB.enter_step('asn_get_ranges') for prefix in get_ranges(asn): parsed_prefix: IPNetwork = ipaddress.ip_network(prefix) if parsed_prefix.version == 4: DBW.set_ip4network( prefix, source=entry, updated=int(time.time()) ) log.info('Added %s from %s (id=%s)', prefix, asn, entry) elif parsed_prefix.version == 6: log.warning('Unimplemented prefix version: %s', prefix) else: log.error('Unknown prefix version: %s', prefix) DB.close() DBW.close()