53 lines
1.4 KiB
Python
53 lines
1.4 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
import database
|
||
|
import argparse
|
||
|
import requests
|
||
|
import typing
|
||
|
import ipaddress
|
||
|
import logging
|
||
|
import time
|
||
|
|
||
|
IPNetwork = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
|
||
|
|
||
|
|
||
|
def get_ranges(asn: str) -> typing.Iterable[str]:
|
||
|
req = requests.get(
|
||
|
'https://stat.ripe.net/data/as-routing-consistency/data.json',
|
||
|
params={'resource': asn}
|
||
|
)
|
||
|
data = req.json()
|
||
|
for pref in data['data']['prefixes']:
|
||
|
yield pref['prefix']
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
|
||
|
log = logging.getLogger('feed_asn')
|
||
|
|
||
|
# Parsing arguments
|
||
|
parser = argparse.ArgumentParser(
|
||
|
description="TODO")
|
||
|
args = parser.parse_args()
|
||
|
|
||
|
DB = database.Database()
|
||
|
DBW = database.Database(write=True)
|
||
|
|
||
|
for asn, entry in DB.list_asn():
|
||
|
DB.enter_step('asn_get_ranges')
|
||
|
for prefix in get_ranges(asn):
|
||
|
parsed_prefix: IPNetwork = ipaddress.ip_network(prefix)
|
||
|
if parsed_prefix.version == 4:
|
||
|
DBW.set_ip4network(
|
||
|
prefix,
|
||
|
source=entry,
|
||
|
updated=int(time.time())
|
||
|
)
|
||
|
log.info('Added %s from %s (id=%s)', prefix, asn, entry)
|
||
|
elif parsed_prefix.version == 6:
|
||
|
log.warning('Unimplemented prefix version: %s', prefix)
|
||
|
else:
|
||
|
log.error('Unknown prefix version: %s', prefix)
|
||
|
|
||
|
DB.close()
|