From fd617e3c9d45ff29e8b6419ff0869213eeffee25 Mon Sep 17 00:00:00 2001 From: acidvegas Date: Wed, 6 Mar 2024 14:38:34 -0500 Subject: [PATCH] Certstream ingestion improved, still need to test stripping nulls and refactor the index mapping so we only store what's needed --- ingestors/ingest_certs.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/ingestors/ingest_certs.py b/ingestors/ingest_certs.py index 36dccd4..185e60b 100644 --- a/ingestors/ingest_certs.py +++ b/ingestors/ingest_certs.py @@ -101,26 +101,28 @@ async def process_data(file_path: str = None): while True: try: async with websockets.connect('wss://certstream.calidog.io/') as websocket: - while True: line = await websocket.recv() + if line == '~eof': # Sentinel value to indicate the end of a process (Used with --watch with FIFO) + break + try: record = json.loads(line) except json.decoder.JSONDecodeError: logging.error(f'Failed to parse JSON record from Certstream! ({line})') - input('Press Enter to continue...') # Pause the script to allow the user to read the error message + input('Press Enter to continue...') continue yield record except websockets.ConnectionClosed: logging.error('Connection to Certstream was closed. Attempting to reconnect...') - await asyncio.sleep(10) + await asyncio.sleep(15) except Exception as e: logging.error(f'An error occurred while processing Certstream records! ({e})') - await asyncio.sleep(10) + await asyncio.sleep(15) async def strip_struct_empty(data: dict) -> dict: