From b1fa34f3aae83810b61e219f94e1753c19611d36 Mon Sep 17 00:00:00 2001
From: acidvegas
Date: Tue, 12 Mar 2024 18:19:47 -0400
Subject: [PATCH] Added anomaly detection to RIR delegations ingestor

---
 ingestors/ingest_certstream.py      | 49 +++++++++++++----------------
 ingestors/ingest_rir_delegations.py | 40 ++++++++++++++++++-----
 2 files changed, 53 insertions(+), 36 deletions(-)

diff --git a/ingestors/ingest_certstream.py b/ingestors/ingest_certstream.py
index 8ff8ebe..6e6394c 100644
--- a/ingestors/ingest_certstream.py
+++ b/ingestors/ingest_certstream.py
@@ -43,41 +43,34 @@ async def process_data(place_holder: str = None):
     :param place_holder: Placeholder parameter to match the process_data function signature of other ingestors.
     '''
 
-    while True:
+    async for websocket in websockets.connect('wss://certstream.calidog.io', ping_interval=15, ping_timeout=60):
         try:
-            async with websockets.connect('wss://certstream.calidog.io') as websocket:
-                while True:
-                    # Read a line from the websocket
-                    line = await websocket.recv()
+            async for line in websocket:
+
+                # Parse the JSON record
+                try:
+                    record = json.loads(line)
+                except json.decoder.JSONDecodeError:
+                    logging.error(f'Invalid line from the websocket: {line}')
+                    continue
 
-                    # Parse the JSON record
-                    try:
-                        record = json.loads(line)
-                    except json.decoder.JSONDecodeError:
-                        logging.error(f'Invalid line from the websocket: {line}')
-                        continue
+                # Grab the unique domains from the record (excluding wildcards)
+                domains = record['data']['leaf_cert']['all_domains']
+                domains = set([domain[2:] if domain.startswith('*.') else domain for domain in domains])
 
-                    # Grab the unique domains from the record (excluding wildcards)
-                    domains = record['data']['leaf_cert']['all_domains']
-                    domains = set([domain[2:] if domain.startswith('*.') else domain for domain in domains])
+                # Construct the document
+                for domain in domains:
+                    struct = {
+                        'domain' : domain,
+                        'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
+                    }
 
-                    # Construct the document
-                    for domain in domains:
-                        struct = {
-                            'domain' : domain,
-                            'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
-                        }
+                    yield {'_index': default_index, '_source': struct}
 
-                        yield {'_index': default_index, '_source': struct}
-
-        except websockets.ConnectionClosed:
-            logging.error('Connection to Certstream was closed. Attempting to reconnect...')
+        except websockets.ConnectionClosed as e:
+            logging.error(f'Connection to Certstream was closed. Attempting to reconnect... ({e})')
             await asyncio.sleep(3)
 
-        except Exception as e:
-            logging.error(f'An error occurred while processing Certstream records! ({e})')
-            break
-
 
 async def test():
     '''Test the ingestion process.'''
diff --git a/ingestors/ingest_rir_delegations.py b/ingestors/ingest_rir_delegations.py
index be108e5..c9f6bfb 100644
--- a/ingestors/ingest_rir_delegations.py
+++ b/ingestors/ingest_rir_delegations.py
@@ -3,6 +3,7 @@
 # ingest_rir_delegations.py
 
 import csv
+import ipaddress
 import logging
 import time
 
@@ -21,7 +22,7 @@
 delegation_db = {
     'apnic' : 'https://ftp.apnic.net/stats/apnic/delegated-apnic-extended-latest',
     'arin' : 'https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest',
     'lacnic' : 'https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest',
-    'ripe' : 'https://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-extended-latest'
+    'ripencc' : 'https://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-extended-latest'
 }
 
@@ -36,7 +37,7 @@ def construct_map() -> dict:
         'mappings': {
             'properties': {
                 'registry' : { 'type': 'keyword' },
-                'cc' : { 'type': 'keyword' },
+                'cc' : { 'type': 'keyword' }, # ISO 3166 2-letter code
                 'type' : { 'type': 'keyword' },
                 'start' : {
                     'properties': {
@@ -106,21 +107,44 @@ async def process_data():
 
                 record = {
                     'registry' : row[0],
+                    'cc' : row[1],
                     'type' : row[2],
-                    'start' : { record_type: row[3] },
+                    'start' : row[3],
                     'value' : row[4],
+                    'date' : row[5],
                     'status' : row[6]
                 }
+
                 if len(row) == 7:
                     if row[7]:
                         record['extensions'] = row[7]
 
-                if (cc := row[1]):
-                    record['cc'] = cc.lower()
+                if not record['cc']:
+                    del record['cc']
+                elif len(record['cc']) != 2:
+                    raise ValueError(f'Invalid country code: {record["cc"]} ({cache})')
 
-                if (date_field := row[5]):
-                    if date_field != '00000000':
-                        record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(date_field, '%Y%m%d')),
+                if record['type'] == 'asn':
+                    record['start'] = { record_type : int(record['start']) }
+                elif record['type'] in ('ipv4', 'ipv6'):
+                    try:
+                        ipaddress.ip_address(record['start'])
+                        record['start'] = { record_type : record['start'] }
+                    except ValueError:
+                        raise ValueError(f'Invalid start IP: {record["start"]} ({cache})')
+                else:
+                    raise ValueError(f'Invalid record type: {record["type"]}')
+
+                if not record['value'].isdigit():
+                    raise ValueError(f'Invalid value: {record["value"]} ({cache})')
+
+                if not record['date'] or record['date'] == '00000000':
+                    del record['date']
+                else:
+                    record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(record['date'], '%Y%m%d')),
+
+                if record['status'] not in ('allocated', 'assigned', 'available', 'reserved', 'unallocated', 'unknown'):
+                    raise ValueError(f'Invalid status: {record["status"]} ({cache})')
 
                 #json_output['records'].append(record)
 
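
Note (not part of the patch): the anomaly detection added to the RIR delegations ingestor amounts to per-field sanity checks on each delegation row. Below is a rough standalone sketch of those checks under stated assumptions; the validate_row helper and the sample RIPE NCC row are illustrative only, and the start field is kept flat here rather than nested under the ingestor's per-type key.

# Standalone sketch of the validation rules added above (helper name and sample data are illustrative)
import ipaddress
import time

def validate_row(row: list) -> dict:
    '''Validate one extended-delegation row: registry|cc|type|start|value|date|status[|extensions]'''
    record = {
        'registry' : row[0],
        'cc'       : row[1],
        'type'     : row[2],
        'start'    : row[3],
        'value'    : row[4],
        'date'     : row[5],
        'status'   : row[6]
    }

    # Country code must be empty or a 2-letter code
    if not record['cc']:
        del record['cc']
    elif len(record['cc']) != 2:
        raise ValueError(f'Invalid country code: {record["cc"]}')

    # Start must be an integer ASN or a valid IP address, depending on the record type
    if record['type'] == 'asn':
        record['start'] = int(record['start'])
    elif record['type'] in ('ipv4', 'ipv6'):
        ipaddress.ip_address(record['start']) # raises ValueError on malformed addresses
    else:
        raise ValueError(f'Invalid record type: {record["type"]}')

    # Value (ASN count, address count, or IPv6 prefix length) must be numeric
    if not record['value'].isdigit():
        raise ValueError(f'Invalid value: {record["value"]}')

    # Date is optional; 00000000 means unknown, otherwise convert YYYYMMDD to ISO 8601
    if not record['date'] or record['date'] == '00000000':
        del record['date']
    else:
        record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(record['date'], '%Y%m%d'))

    # Status must be one of the values accepted by the ingestor
    if record['status'] not in ('allocated', 'assigned', 'available', 'reserved', 'unallocated', 'unknown'):
        raise ValueError(f'Invalid status: {record["status"]}')

    return record

# Example: a plausible IPv4 delegation row from a RIR extended stats file
print(validate_row(['ripencc', 'NL', 'ipv4', '193.0.0.0', '2048', '19930901', 'assigned']))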