add ingest_rir_transfers.py

This commit is contained in:
Dionysus 2024-03-13 20:54:36 -04:00
parent 4dc31a5090
commit 7f93a4d8de
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE

View File

@ -4,7 +4,6 @@
import json import json
import ipaddress import ipaddress
import logging
import time import time
try: try:
@ -77,19 +76,17 @@ async def process_data():
async with aiohttp.ClientSession(headers=headers) as session: async with aiohttp.ClientSession(headers=headers) as session:
async with session.get(url) as response: async with session.get(url) as response:
if response.status != 200: if response.status != 200:
logging.error(f'Failed to fetch {registry} delegation data: {response.status}') raise Exception(f'Failed to fetch {registry} delegation data: {response.status}')
raise Exception('errror') #continue
data = await response.text() data = await response.text()
try: try:
json_data = json.loads(data) json_data = json.loads(data)
except aiohttp.ContentTypeError as e: except aiohttp.ContentTypeError as e:
logging.error(f'Failed to parse {registry} delegation data: {e}') raise Exception(f'Failed to parse {registry} delegation data: {e}')
raise Exception('errror') #continue
if 'transfers' not in json_data: if 'transfers' not in json_data:
logging.error(f'Invalid {registry} delegation data: {json_data}') raise Exception(f'Invalid {registry} delegation data: {json_data}')
for record in json_data['transfers']: for record in json_data['transfers']:
@ -102,8 +99,7 @@ async def process_data():
asn = set_block[option] asn = set_block[option]
if type(asn) != int: if type(asn) != int:
if not asn.isdigit(): if not asn.isdigit():
logging.error(f'Invalid {set_type} {option} ASN in {registry} data: {asn}') raise Exception(f'Invalid {set_type} {option} ASN in {registry} data: {asn}')
raise Exception('errror') #continue
else: else:
record['asns'][set_type][count][option] = int(asn) record['asns'][set_type][count][option] = int(asn)
count += 1 count += 1
@ -126,18 +122,16 @@ async def process_data():
ipaddress.ip_address(normalized_ip) ipaddress.ip_address(normalized_ip)
record[ip_type][set_type][count][option] = normalized_ip record[ip_type][set_type][count][option] = normalized_ip
except ValueError as e: except ValueError as e:
logging.error(f'Invalid {set_type} {option} IP in {registry} data: {e}') raise Exception(f'Invalid {set_type} {option} IP in {registry} data: {e}')
raise Exception('errror') #continue
count += 1 count += 1
if record['type'] not in ('MERGER_ACQUISITION', 'RESOURCE_TRANSFER'): if record['type'] not in ('MERGER_ACQUISITION', 'RESOURCE_TRANSFER'):
logging.error(f'Invalid transfer type in {registry} data: {record["type"]}') raise Exception(f'Invalid transfer type in {registry} data: {record["type"]}')
raise Exception('errror') #continue
yield {'_index': default_index, '_source': record} yield {'_index': default_index, '_source': record}
except Exception as e: except Exception as e:
logging.error(f'Error processing {registry} delegation data: {e}') raise Exception(f'Error processing {registry} delegation data: {e}')
async def test(): async def test():