diff --git a/ingestors/ingest_rir_transfers.py b/ingestors/ingest_rir_transfers.py index 4ed997c..b7d2a2f 100644 --- a/ingestors/ingest_rir_transfers.py +++ b/ingestors/ingest_rir_transfers.py @@ -5,6 +5,7 @@ import json import ipaddress import time +from datetime import datetime try: import aiohttp @@ -35,7 +36,9 @@ def construct_map() -> dict: mapping = { 'mappings': { 'properties': { - 'date' : { 'type': 'date' }, + 'transfer_date' : { 'type': 'date' }, + 'source_registration_date': { 'type': 'date' }, + 'recipient_registration_date': { 'type': 'date' }, 'ip4nets' : { 'properties': { 'original_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } }, @@ -67,8 +70,32 @@ def construct_map() -> dict: return mapping -async def process_data(): - '''Read and process the transfers data.''' +def normalize_date(date_str: str) -> str: + ''' + Convert date string to ISO 8601 format + + :param date_str: Date string to convert + ''' + + try: + # Parse the date with various formats + for fmt in ('%Y-%m-%d %H:%M:%S.%f%z', '%Y-%m-%d %H:%M:%S%z', '%Y-%m-%d %H:%M:%S'): + try: + dt = datetime.strptime(date_str, fmt) + return dt.strftime('%Y-%m-%dT%H:%M:%SZ') + except ValueError: + continue + return date_str + except: + return date_str + + +async def process_data(place_holder: str = None): + ''' + Read and process the transfer data. + + :param place_holder: Placeholder parameter to match the process_data function signature of other ingestors. + ''' for registry, url in transfers_db.items(): try: @@ -92,6 +119,11 @@ async def process_data(): for record in json_data['transfers']: record['seen'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) + # Normalize all date fields + for date_field in ('transfer_date', 'source_registration_date', 'recipient_registration_date'): + if date_field in record: + record[date_field] = normalize_date(record[date_field]) + if 'asns' in record: for set_type in ('original_set', 'transfer_set'): if set_type in record['asns']: