Updated massdns ingestion script with sentinel value checking and using the IP address as the document ID

This commit is contained in:
Dionysus 2024-03-06 14:33:21 -05:00
parent cba51ca2dd
commit 90d6260b99
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE

View File

@ -2,6 +2,7 @@
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris) # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_massdns.py # ingest_massdns.py
import logging
import time import time
try: try:
@ -19,10 +20,10 @@ def construct_map() -> dict:
mapping = { mapping = {
'mappings': { 'mappings': {
'properties': { 'properties': {
'ip': { 'type': 'ip' }, 'ip' : { 'type' : 'ip' },
'name': { 'type': 'keyword' }, 'name' : { 'type' : 'keyword' },
'record': keyword_mapping, 'record' : keyword_mapping,
'seen': { 'type': 'date' } 'seen' : { 'type' : 'date' }
} }
} }
} }
@ -41,6 +42,9 @@ async def process_data(file_path: str):
async for line in input_file: async for line in input_file:
line = line.strip() line = line.strip()
if line == '~eof': # Sentinel value to indicate the end of a process (Used with --watch with FIFO)
break
if not line: if not line:
continue continue
@ -49,30 +53,29 @@ async def process_data(file_path: str):
if len(parts) < 3: if len(parts) < 3:
raise ValueError(f'Invalid PTR record: {line}') raise ValueError(f'Invalid PTR record: {line}')
name, record_type, data = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.') name, record_type, record = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
# Do we handle CNAME records returned by MassDNS?
if record_type != 'PTR': if record_type != 'PTR':
continue continue
#if record_type == 'CNAME':
# if data.endswith('.in-addr.arpa'):
# continue
# Let's not index the PTR record if it's the same as the in-addr.arpa domain # Let's not index the PTR record if it's the same as the in-addr.arpa domain
if data == name: if record == name:
continue
if not record: # Skip empty records
continue continue
ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1]) ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
struct = { struct = {
'ip': ip, 'ip' : ip,
'record': data, 'record' : record,
'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) 'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
} }
yield {'_index': default_index, '_source': struct} yield {'_id': ip, '_index': default_index, '_source': struct} # Store with ip as the unique id to allow the record to be reindexed if it exists.
return None # EOF
''' '''
@ -85,8 +88,12 @@ Example PTR record:
Will be indexed as: Will be indexed as:
{ {
"ip": "47.229.6.0", "_id" : "47.229.6.0",
"record": "047-229-006-000.res.spectrum.com.", "_index" : "ptr-records",
"seen": "2021-06-30T18:31:00Z" "_source" : {
"ip" : "47.229.6.0",
"record" : "047-229-006-000.res.spectrum.com",
"seen" : "2021-06-30T18:31:00Z"
}
} }
''' '''