diff --git a/ingestors/ingest_rir_delegations.py b/ingestors/ingest_rir_delegations.py index 33807a8..e28e9aa 100644 --- a/ingestors/ingest_rir_delegations.py +++ b/ingestors/ingest_rir_delegations.py @@ -38,14 +38,18 @@ def construct_map() -> dict: 'properties': { 'registry' : { 'type': 'keyword' }, 'cc' : { 'type': 'keyword' }, # ISO 3166 2-letter code - 'type' : { 'type': 'keyword' }, - 'start' : { - 'properties': { - 'asn' : {'type': 'integer' }, - 'ip' : {'type': 'ip' } + 'asn' : { + 'properties': { + 'start' : { 'type': 'integer' }, + 'end' : { 'type': 'integer' } + } + }, + 'ip' : { + 'properties': { + 'start' : { 'type': 'ip' }, + 'end' : { 'type': 'ip' } } }, - 'value' : { 'type': 'integer' }, 'date' : { 'type': 'date' }, 'status' : { 'type': 'keyword' }, 'extensions' : keyword_mapping @@ -70,7 +74,7 @@ async def process_data(): continue csv_data = await response.text() - rows = [line for line in csv_data.split('\n') if line and not line.startswith('#')] + rows = [line.lower() for line in csv_data.split('\n') if line and not line.startswith('#')] csv_reader = csv.reader(rows, delimiter='|') del rows, csv_data # Cleanup @@ -103,8 +107,6 @@ async def process_data(): # Record lines (this is what we want) else: - record_type = 'asn' if row[3].isdigit() else 'ip' # Store with the correct mapping type - record = { 'registry' : row[0], 'cc' : row[1], @@ -114,7 +116,7 @@ async def process_data(): 'date' : row[5], 'status' : row[6] } - + if len(row) == 7: if row[7]: record['extensions'] = row[7] @@ -122,29 +124,36 @@ async def process_data(): if not record['cc']: del record['cc'] elif len(record['cc']) != 2: - raise ValueError(f'Invalid country code: {record["cc"]} ({cache})') + raise ValueError(f'Invalid country code: {cache}') + + if not record['value'].isdigit(): + raise ValueError(f'Invalid value: {cache}') if record['type'] == 'asn': - record['start'] = { record_type : int(record['start']) } + end = int(record['start']) + int(record['value']) - 1 + record['asn'] = { 'start': int(record['start']), 'end': end } elif record['type'] in ('ipv4', 'ipv6'): try: - ipaddress.ip_address(record['start']) - record['start'] = { record_type : record['start'] } + if record['type'] == 'ipv4': + end = ipaddress.ip_address(record['start']) + int(record['value']) - 1 + elif record['type'] == 'ipv6': + end = ipaddress.ip_network(f'{record["start"]}/{record["value"]}').broadcast_address + end = end.compressed.lower() + record['ip'] = { 'start': record['start'], 'end': str(end) } except ValueError: - raise ValueError(f'Invalid start IP: {record["start"]} ({cache})') + raise ValueError(f'Invalid IP range: {cache}') else: - raise ValueError(f'Invalid record type: {record["type"]}') - - if not record['value'].isdigit(): - raise ValueError(f'Invalid value: {record["value"]} ({cache})') - + raise ValueError(f'Invalid record type: {cache}') + + del record['start'], record['value'], record['type'] # Cleanup variables no longer needed + if not record['date'] or record['date'] == '00000000': del record['date'] else: record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(record['date'], '%Y%m%d')), - + if record['status'] not in ('allocated', 'assigned', 'available', 'reserved', 'unallocated', 'unknown'): - raise ValueError(f'Invalid status: {record["status"]} ({cache})') + raise ValueError(f'Invalid status: {cache}') #json_output['records'].append(record) @@ -180,7 +189,7 @@ Output: Input: { 'registry' : 'arin', - 'cc' : 'US', + 'cc' : 'us', 'type' : 'ipv4', 'start' : { 'ip': '76.15.132.0' }, 'value' : 1024, @@ -188,4 +197,7 @@ Input: 'status' : 'allocated', 'extensions' : '6c065d5b54b877781f05e7d30ebfff28' } + + Notes: + Do we make this handle the database locally or load it into ram? '''