From 90d6260b99b4395a4b0dcc0a72bd122fd1dd7e74 Mon Sep 17 00:00:00 2001 From: acidvegas Date: Wed, 6 Mar 2024 14:33:21 -0500 Subject: [PATCH] Updated massdns ingestion script with sentinel value checking and using the ip address as the document id --- ingestors/ingest_massdns.py | 57 +++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py index 5c8e121..158fa7b 100644 --- a/ingestors/ingest_massdns.py +++ b/ingestors/ingest_massdns.py @@ -2,6 +2,7 @@ # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris) # ingest_massdns.py +import logging import time try: @@ -17,15 +18,15 @@ def construct_map() -> dict: keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } } mapping = { - 'mappings': { - 'properties': { - 'ip': { 'type': 'ip' }, - 'name': { 'type': 'keyword' }, - 'record': keyword_mapping, - 'seen': { 'type': 'date' } + 'mappings': { + 'properties': { + 'ip' : { 'type' : 'ip' }, + 'name' : { 'type' : 'keyword' }, + 'record' : keyword_mapping, + 'seen' : { 'type' : 'date' } + } } } - } return mapping @@ -41,6 +42,9 @@ async def process_data(file_path: str): async for line in input_file: line = line.strip() + if line == '~eof': # Sentinel value to indicate the end of a process (Used with --watch with FIFO) + break + if not line: continue @@ -49,44 +53,47 @@ async def process_data(file_path: str): if len(parts) < 3: raise ValueError(f'Invalid PTR record: {line}') - name, record_type, data = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.') + name, record_type, record = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.') + # Do we handle CNAME records returned by MassDNS? 
if record_type != 'PTR': continue - #if record_type == 'CNAME': - # if data.endswith('.in-addr.arpa'): - # continue - # Let's not index the PTR record if it's the same as the in-addr.arpa domain - if data == name: + if record == name: + continue + + if not record: # Skip empty records continue ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1]) struct = { - 'ip': ip, - 'record': data, - 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) + 'ip' : ip, + 'record' : record, + 'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) } - yield {'_index': default_index, '_source': struct} - - return None # EOF + yield {'_id': ip, '_index': default_index, '_source': struct} # Store with ip as the unique id to allow the record to be reindexed if it exists. + ''' Example PTR record: -0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com. -0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com. -0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net. +0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com. +0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com. +0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net. 0.6.212.173.in-addr.arpa. PTR 173-212-6-0.cpe.surry.net. 0.6.201.133.in-addr.arpa. PTR flh2-133-201-6-0.tky.mesh.ad.jp. Will be indexed as: { - "ip": "47.229.6.0", - "record": "047-229-006-000.res.spectrum.com.", - "seen": "2021-06-30T18:31:00Z" + "_id" : "47.229.6.0", + "_index" : "ptr-records", + "_source" : { + "ip" : "47.229.6.0", + "record" : "047-229-006-000.res.spectrum.com", + "seen" : "2021-06-30T18:31:00Z" + } } ''' \ No newline at end of file