From 00711fe8564ab453d6af953793e3cd6e51273965 Mon Sep 17 00:00:00 2001 From: acidvegas Date: Tue, 12 Mar 2024 17:04:14 -0400 Subject: [PATCH] Created an ingestor for RIR delegations --- ingestors/ingest_rir_delegations.py | 167 ++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) create mode 100644 ingestors/ingest_rir_delegations.py diff --git a/ingestors/ingest_rir_delegations.py b/ingestors/ingest_rir_delegations.py new file mode 100644 index 0000000..be108e5 --- /dev/null +++ b/ingestors/ingest_rir_delegations.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python +# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris) +# ingest_rir_delegations.py + +import csv +import logging +import time + +try: + import aiohttp +except ImportError: + raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)') + + +# Set a default elasticsearch index if one is not provided +default_index = 'rir-delegations-' + time.strftime('%Y-%m-%d') + +# Delegation data sources +delegation_db = { + 'afrinic' : 'https://ftp.afrinic.net/stats/afrinic/delegated-afrinic-extended-latest', + 'apnic' : 'https://ftp.apnic.net/stats/apnic/delegated-apnic-extended-latest', + 'arin' : 'https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest', + 'lacnic' : 'https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest', + 'ripe' : 'https://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-extended-latest' +} + + +def construct_map() -> dict: + '''Construct the Elasticsearch index mapping for records''' + + # Match on exact value or full text search + keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } } + + # Construct the index mapping + mapping = { + 'mappings': { + 'properties': { + 'registry' : { 'type': 'keyword' }, + 'cc' : { 'type': 'keyword' }, + 'type' : { 'type': 'keyword' }, + 'start' : { + 'properties': { + 'asn' : {'type': 'integer' }, + 'ip' : {'type': 'ip' } + } + }, + 'value' : { 'type': 'integer' }, + 'date' : { 'type': 'date' }, + 'status' : { 'type': 'keyword' }, + 'extensions' : keyword_mapping + } + } + } + + return mapping + + +async def process_data(): + '''Read and process the delegation data.''' + + for registry, url in delegation_db.items(): + try: + headers = {'Connection': 'keep-alive'} # This is required for AIOHTTP connections to LACNIC + + async with aiohttp.ClientSession(headers=headers) as session: + async with session.get(url) as response: + if response.status != 200: + logging.error(f'Failed to fetch {registry} delegation data: {response.status}') + continue + + csv_data = await response.text() + rows = [line for line in csv_data.split('\n') if line and not line.startswith('#')] + csv_reader = csv.reader(rows, delimiter='|') + + del rows, csv_data # Cleanup + + # Process the CSV data + for row in csv_reader: + cache = '|'.join(row) # Cache the last row for error handling + + # Heuristic for the header line (not doing anything with it for now) + if len(row) == 7 and row[1] != '*': + header = { + 'version' : row[0], + 'registry' : row[1], + 'serial' : row[2], + 'records' : row[3], + 'startdate' : row[4], + 'enddate' : row[5], + 'UTCoffset' : row[6] + } + continue + + # Heuristic for the summary lines (not doing anything with it for now) + elif row[2] != '*' and row[3] == '*': + summary = { + 'registry' : row[0], + 'type' : row[2], + 'count' : row[4] + } + continue + + # Record lines (this is what we want) + else: + record_type = 'asn' if row[3].isdigit() else 'ip' # Store with the correct mapping type + + record = { + 'registry' : row[0], + 'type' : row[2], + 'start' : { record_type: row[3] }, + 'value' : row[4], + 'status' : row[6] + } + if len(row) == 7: + if row[7]: + record['extensions'] = row[7] + + if (cc := row[1]): + record['cc'] = cc.lower() + + if (date_field := row[5]): + if date_field != '00000000': + record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(date_field, '%Y%m%d')), + + #json_output['records'].append(record) + + # Let's just index the records themself (for now) + yield {'_index': default_index, '_source': record} + + except Exception as e: + logging.error(f'Error processing {registry} delegation data: {e}') + + +async def test(): + '''Test the ingestion process''' + + async for document in process_data(): + print(document) + + + +if __name__ == '__main__': + import asyncio + + asyncio.run(test()) + + + +''' +Output: + arin|US|ipv4|76.15.132.0|1024|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28 + arin|US|ipv4|76.15.136.0|2048|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28 + arin|US|ipv4|76.15.144.0|4096|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28 + arin|US|ipv4|76.15.160.0|8192|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28 + +Input: + { + 'registry' : 'arin', + 'cc' : 'US', + 'type' : 'ipv4', + 'start' : { 'ip': '76.15.132.0' }, + 'value' : 1024, + 'date' : '2007-05-02T00:00:00Z', + 'status' : 'allocated', + 'extensions' : '6c065d5b54b877781f05e7d30ebfff28' + } +'''