diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py
index e11d1ca..f3ba1c4 100644
--- a/ingestors/ingest_massdns.py
+++ b/ingestors/ingest_massdns.py
@@ -11,14 +11,17 @@
 except ImportError:
     raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
 
+# Set a default elasticsearch index if one is not provided
 default_index = 'eris-massdns'
 
 
 def construct_map() -> dict:
-    '''Construct the Elasticsearch index mapping for MassDNS records'''
+    '''Construct the Elasticsearch index mapping for records'''
 
+    # Match on exact value or full text search
     keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
 
+    # Construct the index mapping
     mapping = {
         'mappings': {
             'properties': {
@@ -32,91 +35,92 @@ def construct_map() -> dict:
     return mapping
 
 
-async def process_data(file_path: str):
+async def process_data(input_path: str):
     '''
-    Read and process Massdns records from the log file.
+    Read and process the input file
 
-    :param file_path: Path to the Massdns log file
+    :param input_path: Path to the input file
     '''
 
-    async with aiofiles.open(file_path) as input_file:
+    async with aiofiles.open(input_path) as input_file:
+        # Cache the last document to avoid creating a new one for the same IP address
         last = None
 
-        async for line in input_file:
-            line = line.strip()
+        try:
+            # Read the input file line by line
+            async for line in input_file:
+                line = line.strip()
 
-            # Sentinel value to indicate the end of a process (for closing out a FIFO stream)
-            if line == '~eof':
-                yield last
-                break
+                # Sentinel value to indicate the end of a process (for closing out a FIFO stream)
+                if line == '~eof':
+                    yield last
+                    break
 
-            # Skip empty lines
-            if not line:
-                continue
-
-            # Split the line into its parts
-            parts = line.split()
-
-            # Ensure the line has at least 3 parts
-            if len(parts) < 3:
-                logging.warning(f'Invalid PTR record: {line}')
-                continue
-
-            # Split the PTR record into its parts
-            name, record_type, record = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
-
-            # Do not index other records
-            if record_type != 'PTR':
-                continue
-
-            # Do not index PTR records that do not have a record
-            if not record:
-                continue
-
-            # Let's not index the PTR record if it's the same as the in-addr.arpa domain
-            if record == name:
-                continue
-
-            # Get the IP address from the in-addr.arpa domain
-            ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
-
-            # Check if we are still processing the same IP address
-            if last:
-                if ip == last['_id']:
-                    last_record = last['doc']['record']
-                    if isinstance(last_record, list):
-                        if record not in last_record:
-                            last['doc']['record'].append(record)
-                        else:
-                            logging.warning(f'Duplicate PTR record: {line}')
-                    else:
-                        if record != last_record:
-                            last['doc']['record'] = [last_record, record] # IP addresses with more than one PTR record will turn into a list
+                # Skip empty lines (doubtful we will have any, but just in case)
+                if not line:
                     continue
-                else:
-                    yield last # Return the last document and start a new one
 
-            # Cache the this document in-case we have more for the same IP address
-            last = {
-                '_op_type' : 'update',
-                '_id' : ip,
-                '_index' : default_index,
-                'doc' : {
-                    'ip' : ip,
-                    'record' : record,
-                    'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
-                },
-                'doc_as_upsert' : True # This will create the document if it does not exist
-            }
+                # Split the line into its parts
+                parts = line.split()
+
+                # Ensure the line has at least 3 parts
+                if len(parts) < 3:
+                    logging.warning(f'Invalid PTR record: {line}')
+                    continue
+
+                # Split the PTR record into its parts
+                name, record_type, record = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
+
+                # Do not index other records
+                if record_type != 'PTR':
+                    continue
+
+                # Do not index PTR records that do not have a record
+                if not record:
+                    continue
+
+                # Do not index PTR records that have the same record as the in-addr.arpa domain
+                if record == name:
+                    continue
+
+                # Get the IP address from the in-addr.arpa domain
+                ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
+
+                # Check if we are still processing the same IP address
+                if last:
+                    if ip == last['_id']: # This record is for the same IP address as the cached document
+                        last_records = last['doc']['record']
+                        if record not in last_records: # Do not index duplicate records
+                            last['doc']['record'].append(record)
+                        continue
+                    else:
+                        yield last # Return the last document and start a new one
+
+                # Cache the document
+                last = {
+                    '_op_type' : 'update',
+                    '_id' : ip,
+                    '_index' : default_index,
+                    'doc' : {
+                        'ip' : ip,
+                        'record' : [record], # Consider using a painless script to append to the list if it exists (use 'seen' per-record and 'last_seen' for the IP address)
+                        'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
+                    },
+                    'doc_as_upsert' : True # Create the document if it does not exist
+                }
+
+        except Exception as e:
+            logging.error(f'Error processing data: {e}')
 
 
 async def test(input_path: str):
     '''
-    Test the MassDNS ingestion process
+    Test the ingestion process
 
-    :param input_path: Path to the MassDNS log file
+    :param input_path: Path to the input file
     '''
 
+
     async for document in process_data(input_path):
         print(document)
 
@@ -126,7 +130,7 @@
 if __name__ == '__main__':
     import argparse
     import asyncio
 
-    parser = argparse.ArgumentParser(description='MassDNS Ingestor for ERIS')
+    parser = argparse.ArgumentParser(description='Ingestor for ERIS')
     parser.add_argument('input_path', help='Path to the input file or directory')
     args = parser.parse_args()
 
@@ -139,9 +143,9 @@ Deployment:
     sudo apt-get install build-essential gcc make
     git clone --depth 1 https://github.com/blechschmidt/massdns.git $HOME/massdns && cd $HOME/massdns && make
     curl -s https://public-dns.info/nameservers.txt | grep -v ':' > $HOME/massdns/nameservers.txt
-    python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -s 500 -o S -w $HOME/massdns/fifo.json
+    python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/massdns/fifo.json
     or...
-    while true; do python ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -s 1000 -o S -w $HOME/massdns/fifo.json; done
+    while true; do python ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/massdns/fifo.json; done
 
 Output:
     0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
@@ -150,16 +154,16 @@ Output:
 
 Input:
     {
-        "_id" : "47.229.6.0"
-        "_index" : "eris-massdns",
-        "_source" : {
-            "ip" : "47.229.6.0",
-            "record" : "047-229-006-000.res.spectrum.com", # This will be a list if there are more than one PTR record
-            "seen" : "2021-06-30T18:31:00Z"
+        '_id' : '47.229.6.0',
+        '_index' : 'eris-massdns',
+        '_source' : {
+            'ip' : '47.229.6.0',
+            'record' : ['047-229-006-000.res.spectrum.com'], # Stored as a list so IP addresses with multiple PTR records can hold them all
+            'seen' : '2021-06-30T18:31:00Z'
         }
     }
 
 Notes:
-    Why do some IP addresses return a CNAME from a PTR request
+    Why do some IP addresses return an A/CNAME from a PTR request
     What is dns-servfail.net (Frequent CNAME response from PTR requests)
 '''
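
Review notes (sketches, not part of the patch):

in-addr.arpa reversal:
    The one-liner in process_data() that derives the IP address from the PTR
    name is easy to misread, so here is a worked example using the sample
    record from the Output and Input sections above:

        name = '0.6.229.47.in-addr.arpa'
        ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
        print(ip) # 47.229.6.0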
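
Scripted upsert:
    A minimal sketch of the Painless approach floated in the comment on the
    new 'record' field. The 'last_seen' field and the build_ptr_action()
    helper name are assumptions for illustration, not something this patch
    defines; the script deduplicates records inside Elasticsearch itself,
    which would remove the need to cache the previous document in 'last':

        def build_ptr_action(ip: str, record: str, timestamp: str) -> dict:
            '''Build a bulk update action that appends a PTR record only if it is new.'''
            return {
                '_op_type' : 'update',
                '_id'      : ip,
                '_index'   : default_index,
                'script'   : {
                    'lang'   : 'painless',
                    'source' : 'if (!ctx._source.record.contains(params.record)) { ctx._source.record.add(params.record); } ctx._source.last_seen = params.seen;',
                    'params' : { 'record': record, 'seen': timestamp }
                },
                'upsert'   : { 'ip': ip, 'record': [record], 'last_seen': timestamp } # Only used when the document does not exist yet
            }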
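
FIFO teardown:
    process_data() only yields its final cached document when it reads the
    '~eof' sentinel, so whatever drives the massdns pipeline from the
    Deployment section has to write that sentinel once the run finishes.
    A sketch, assuming the FIFO path used in the Deployment commands; note
    that opening a FIFO for writing blocks until a reader (the ingestor)
    has it open:

        import os

        fifo_path = os.path.expanduser('~/massdns/fifo.json')

        if not os.path.exists(fifo_path):
            os.mkfifo(fifo_path) # Named pipe that massdns writes to via -w

        # ...after the massdns process exits...
        with open(fifo_path, 'w') as fifo:
            fifo.write('~eof\n') # Tells process_data() to yield the cached document and stop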