diff --git a/ingestors/ingest_zone.py b/ingestors/ingest_zone.py
index 7cb552d..4b7c6cd 100644
--- a/ingestors/ingest_zone.py
+++ b/ingestors/ingest_zone.py
@@ -9,8 +9,10 @@ try:
 except ImportError:
     raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
 
+
 default_index = 'dns-zones'
-record_types = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','naptr','ns','nsec','nsec3','nsec3param','ptr','rrsig','rp','sshfp','soa','srv','txt','type65534')
+record_types  = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','naptr','ns','nsec','nsec3','nsec3param','ptr','rrsig','rp','sshfp','soa','srv','txt','type65534')
+
 
 def construct_map() -> dict:
     '''Construct the Elasticsearch index mapping for zone file records.'''
@@ -61,6 +63,9 @@ async def process_data(file_path: str):
         async for line in input_file:
             line = line.strip()
 
+            if line == '~eof': # Sentinel value signaling the end of input (used by --watch when reading from a FIFO)
+                break
+
             if not line or line.startswith(';'):
                 continue
@@ -76,14 +81,15 @@ async def process_data(file_path: str):
 
             ttl = int(ttl)
 
+            # Anomaly: it is doubtful any CHAOS/HESIOD class records will be found in zone files
             if record_class != 'in':
-                raise ValueError(f'Unsupported record class: {record_class} with line: {line}') # Anomaly (Doubtful any CHAOS/HESIOD records will be found)
+                raise ValueError(f'Unsupported record class: {record_class} with line: {line}')
 
+            # We do not want unknown record types colliding with our current mapping (again, this is an anomaly)
             if record_type not in record_types:
                 raise ValueError(f'Unsupported record type: {record_type} with line: {line}')
 
-            # Little tidying up for specific record types
+            # Little tidying up for specific record types (removing trailing dots, etc)
             if record_type == 'nsec':
                 data = ' '.join([data.split()[0].rstrip('.'), *data.split()[1:]])
             elif record_type == 'soa':
@@ -93,11 +99,15 @@ async def process_data(file_path: str):
 
             if domain != last_domain:
                 if last_domain:
-                    struct = {'domain': last_domain, 'records': domain_records[last_domain], 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}
+                    struct = {
+                        'domain'  : last_domain,
+                        'records' : domain_records[last_domain],
+                        'seen'    : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) # Zone files do not contain a timestamp, so we use the current time
+                    }
 
                     del domain_records[last_domain]
 
-                    yield {'_index': default_index, '_source': struct}
+                    yield {'_id': last_domain, '_index': default_index, '_source': struct} # Set the ID to the domain name so an existing record is updated in place instead of duplicated
 
                 last_domain = domain
@@ -108,8 +118,6 @@ async def process_data(file_path: str):
 
             domain_records[domain][record_type].append({'ttl': ttl, 'data': data})
 
-    return None # EOF
-
 
 '''
@@ -125,13 +133,17 @@ Example record:
 
 Will be indexed as:
 
 {
-    "domain": "1001.vegas",
-    "records": {
-        "ns": [
-            {"ttl": 3600, "data": "ns11.waterrockdigital.com"},
-            {"ttl": 3600, "data": "ns12.waterrockdigital.com"}
-        ]
-    },
-    "seen": "2021-09-01T00:00:00Z" # Zulu time added upon indexing
+    "_id"     : "1001.vegas",
+    "_index"  : "dns-zones",
+    "_source" : {
+        "domain"  : "1001.vegas",
+        "records" : { # All records are stored in a single dictionary
+            "ns": [
+                {"ttl": 3600, "data": "ns11.waterrockdigital.com"},
+                {"ttl": 3600, "data": "ns12.waterrockdigital.com"}
+            ]
+        },
+        "seen" : "2021-09-01T00:00:00Z" # Zulu time added upon indexing
+    }
 }
 '''
\ No newline at end of file
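
For context on the new '~eof' sentinel: a writer feeding the watched FIFO can signal completion by writing that line, letting process_data() return cleanly instead of blocking on the pipe forever. A minimal sketch of the writer side, assuming a FIFO at /tmp/zones.fifo and a local zone file (both paths are illustrative):

    import os

    fifo_path = '/tmp/zones.fifo'   # Hypothetical FIFO that --watch would be reading from
    zone_path = '1001.vegas.zone'   # Hypothetical local zone file

    if not os.path.exists(fifo_path):
        os.mkfifo(fifo_path)        # Create the FIFO if the reader has not already

    # Opening a FIFO for writing blocks until the ingestor opens it for reading
    with open(fifo_path, 'w') as fifo:
        with open(zone_path) as zone:
            for line in zone:
                fifo.write(line)
        fifo.write('~eof\n')        # Sentinel: tells process_data() to stop reading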
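
The NSEC tidying one-liner is dense: it strips the trailing dot from the first field (the next domain name) while leaving the type bitmap fields untouched. A worked example with made-up record data:

    # Illustrative NSEC record data (not taken from a real zone file)
    data = 'host.example.com. A AAAA RRSIG NSEC'

    # Strip the trailing dot from the first field only; keep the remaining fields as-is
    data = ' '.join([data.split()[0].rstrip('.'), *data.split()[1:]])

    assert data == 'host.example.com A AAAA RRSIG NSEC'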
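
Setting _id on each yielded action is what makes re-ingestion idempotent: a bulk index operation overwrites the document with the same ID rather than creating a duplicate. A sketch of a driver that could consume the generator, assuming an AsyncElasticsearch client on localhost and the module importable under the path shown (both are assumptions, not part of this patch):

    import asyncio

    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_bulk

    from ingestors.ingest_zone import process_data  # Assumed import path

    async def main():
        es = AsyncElasticsearch('https://localhost:9200')  # Hypothetical cluster address
        try:
            # async_bulk() accepts the async generator directly; each action already
            # carries _id, _index, and _source, so no extra wrapping is needed
            success, errors = await async_bulk(es, process_data('1001.vegas.zone'), raise_on_error=False)
            print(f'Indexed {success} documents with {len(errors)} errors')
        finally:
            await es.close()

    asyncio.run(main())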