Certstream ingestion improved, still need to test stripping nulls and refactor the index mapping so we only store whats needed

This commit is contained in:
Dionysus 2024-03-06 14:38:34 -05:00
parent 90d6260b99
commit fd617e3c9d
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE

View File

@ -101,26 +101,28 @@ async def process_data(file_path: str = None):
while True: while True:
try: try:
async with websockets.connect('wss://certstream.calidog.io/') as websocket: async with websockets.connect('wss://certstream.calidog.io/') as websocket:
while True: while True:
line = await websocket.recv() line = await websocket.recv()
if line == '~eof': # Sentinel value to indicate the end of a process (Used with --watch with FIFO)
break
try: try:
record = json.loads(line) record = json.loads(line)
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
logging.error(f'Failed to parse JSON record from Certstream! ({line})') logging.error(f'Failed to parse JSON record from Certstream! ({line})')
input('Press Enter to continue...') # Pause the script to allow the user to read the error message input('Press Enter to continue...')
continue continue
yield record yield record
except websockets.ConnectionClosed: except websockets.ConnectionClosed:
logging.error('Connection to Certstream was closed. Attempting to reconnect...') logging.error('Connection to Certstream was closed. Attempting to reconnect...')
await asyncio.sleep(10) await asyncio.sleep(15)
except Exception as e: except Exception as e:
logging.error(f'An error occurred while processing Certstream records! ({e})') logging.error(f'An error occurred while processing Certstream records! ({e})')
await asyncio.sleep(10) await asyncio.sleep(15)
async def strip_struct_empty(data: dict) -> dict: async def strip_struct_empty(data: dict) -> dict: