diff --git a/ingestors/ingest_certs.py b/ingestors/ingest_certs.py index 36dccd4..185e60b 100644 --- a/ingestors/ingest_certs.py +++ b/ingestors/ingest_certs.py @@ -101,26 +101,28 @@ async def process_data(file_path: str = None): while True: try: async with websockets.connect('wss://certstream.calidog.io/') as websocket: - while True: line = await websocket.recv() + if line == '~eof': # Sentinel value to indicate the end of a process (Used with --watch with FIFO) + break + try: record = json.loads(line) except json.decoder.JSONDecodeError: logging.error(f'Failed to parse JSON record from Certstream! ({line})') - input('Press Enter to continue...') # Pause the script to allow the user to read the error message + input('Press Enter to continue...') continue yield record except websockets.ConnectionClosed: logging.error('Connection to Certstream was closed. Attempting to reconnect...') - await asyncio.sleep(10) + await asyncio.sleep(15) except Exception as e: logging.error(f'An error occurred while processing Certstream records! ({e})') - await asyncio.sleep(10) + await asyncio.sleep(15) async def strip_struct_empty(data: dict) -> dict: