Initial commit

This commit is contained in:
Dionysus 2024-01-20 02:04:50 -05:00
commit 14b6d1c88a
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
6 changed files with 963 additions and 0 deletions

15
LICENSE Normal file
View File

@ -0,0 +1,15 @@
ISC License
Copyright (c) 2023, acidvegas <acid.vegas@acid.vegas>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

27
README.md Normal file
View File

@ -0,0 +1,27 @@
# Elasticsearch Recon Ingestion Scripts (ERIS)
> A utility for ingesting large-scale reconnaissance data into Elasticsearch
### Work In Progress
## Prerequisites
- [python](https://www.python.org/)
- [elasticsearch](https://pypi.org/project/elasticsearch/) *(`pip install elasticsearch`)*
###### Options
| Argument | Description |
| --------------- | -------------------------------------------------------------------------------------------- |
| `--dry-run` | Perform a dry run without indexing records to Elasticsearch. |
| `--batch_size` | Number of records to index in a batch *(default 50,000)*. |
| `--host` | Elasticsearch host *(default 'localhost')*. |
| `--port` | Elasticsearch port *(default 9200)*. |
| `--user` | Elasticsearch username *(default 'elastic')*. |
| `--password` | Elasticsearch password. If not provided, it checks the environment variable **ES_PASSWORD**. |
| `--api-key` | Elasticsearch API Key for authentication. |
| `--index` | Elasticsearch index name *(default 'zone-files')*. |
| `--filter` | Filter out records by type *(comma-separated list)*. |
| `--self-signed` | Allow self-signed certificates. |
___
###### Mirrors
[acid.vegas](https://git.acid.vegas/eris) • [GitHub](https://github.com/acidvegas/eris) • [GitLab](https://gitlab.com/acidvegas/eris) • [SuperNETs](https://git.supernets.org/acidvegas/eris)

198
ingestors/ingest_httpx.py Normal file
View File

@ -0,0 +1,198 @@
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# HTTPX Log File Ingestion:
#
# This script takes JSON formatted HTTPX logs and indexes them into Elasticsearch.
#
# Saving my "typical" HTTPX command here for reference to myself:
# httpx -l zone.org.txt -t 200 -r nameservers.txt -sc -location -favicon -title -bp -td -ip -cname -mc 200 -stream -sd -j -o output.json -v
import argparse
import json
import logging
import os
try:
from elasticsearch import Elasticsearch, helpers
except ImportError:
raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
# Setting up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')
class ElasticIndexer:
    '''Read JSON formatted HTTPX logs and index them into Elasticsearch.'''

    def __init__(self, es_host: str, es_port: int, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False):
        '''
        Initialize the Elastic Search indexer.

        :param es_host: Elasticsearch host
        :param es_port: Elasticsearch port
        :param es_user: Elasticsearch username
        :param es_password: Elasticsearch password
        :param es_api_key: Elasticsearch API Key (used instead of user/password when set)
        :param es_index: Elasticsearch index name
        :param dry_run: If True, do not initialize Elasticsearch client
        :param self_signed: Forwarded as-is to the client's verify_certs and ssl_show_warn options, so a falsy value turns certificate verification off
        '''

        self.dry_run = dry_run
        self.es = None
        self.es_index = es_index

        if not dry_run:
            # NOTE(review): the host is handed over without a URL scheme - confirm the installed client version accepts that
            if es_api_key:
                self.es = Elasticsearch([f'{es_host}:{es_port}'], headers={'Authorization': f'ApiKey {es_api_key}'}, verify_certs=self_signed, ssl_show_warn=self_signed)
            else:
                self.es = Elasticsearch([f'{es_host}:{es_port}'], basic_auth=(es_user, es_password), verify_certs=self_signed, ssl_show_warn=self_signed)

    @staticmethod
    def _normalize_timestamp(timestamp: str) -> str:
        '''
        Convert an HTTPX timestamp to UTC with second precision.

        :param timestamp: ISO 8601 timestamp, optionally with fractional seconds and a UTC offset
        :return: Timestamp formatted as YYYY-MM-DDTHH:MM:SSZ
        '''

        # Imported here because the module header does not import datetime
        from datetime import datetime, timezone

        head = timestamp[:19] # YYYY-MM-DDTHH:MM:SS
        offset = timestamp[19:].lstrip('.0123456789') or 'Z' # Drop fractional seconds, no offset is treated as UTC

        try:
            moment = datetime.strptime(head + offset, '%Y-%m-%dT%H:%M:%S%z')
        except ValueError:
            return timestamp.split('.')[0] + 'Z' # Best effort for unexpected formats: keep the previous truncation behavior

        return moment.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    def _flush(self, records: list):
        '''
        Send a batch of bulk actions to Elasticsearch and log how many were indexed.

        :param records: Bulk actions to index
        '''

        success, _ = helpers.bulk(self.es, records)
        logging.info(f'Successfully indexed {success} records to {self.es_index}')

    def process_file(self, file_path: str, batch_size: int):
        '''
        Read and index HTTPX records in batches to Elasticsearch, handling large volumes efficiently.

        In dry run mode the converted records are printed instead of being indexed.

        :param file_path: Path to the HTTPX log file
        :param batch_size: Number of records to process before indexing

        Example record:
        {
            "timestamp":"2024-01-14T13:08:15.117348474-05:00", # Renamed to seen and converted to UTC with second precision
            "hash": {
                "body_md5":"4ae9394eb98233b482508cbda3b33a66",
                "body_mmh3":"-4111954",
                "body_sha256":"89e06e8374353469c65adb227b158b265641b424fba7ddb2c67eef0c4c1280d3",
                "body_simhash":"9814303593401624250",
                "header_md5":"980366deb2b2fb5df2ad861fc63e79ce",
                "header_mmh3":"-813072798",
                "header_sha256":"39aea75ad548e38b635421861641ad1919ed3b103b17a33c41e7ad46516f736d",
                "header_simhash":"10962523587435277678"
            },
            "port":"443",
            "url":"https://supernets.org",
            "input":"supernets.org", # Renamed to domain
            "title":"SuperNETs",
            "scheme":"https",
            "webserver":"nginx",
            "body_preview":"SUPERNETS Home About Contact Donate Docs Network IRC Git Invidious Jitsi LibreX Mastodon Matrix Sup",
            "content_type":"text/html",
            "method":"GET",
            "host":"51.89.151.158",
            "path":"/",
            "favicon":"-674048714",
            "favicon_path":"/i/favicon.png",
            "time":"592.907689ms", # Removed
            "a":[
                "51.89.151.158",
                "2001:41d0:801:2000::5ce9"
            ],
            "tech":[
                "Bootstrap:4.0.0",
                "HSTS",
                "Nginx"
            ],
            "words":436,
            "lines":79,
            "status_code":200,
            "content_length":4597,
            "failed":false, # Removed
            "knowledgebase":{ # Removed
                "PageType":"nonerror",
                "pHash":0
            }
        }
        '''

        records = []

        with open(file_path, 'r') as file:
            for line in file:
                line = line.strip()

                if not line:
                    continue

                record = json.loads(line)

                record['seen'] = self._normalize_timestamp(record.pop('timestamp'))
                record['domain'] = record.pop('input')

                # Not every record carries all of these fields, so drop them only when present
                for key in ('failed', 'knowledgebase', 'time'):
                    record.pop(key, None)

                if self.dry_run:
                    print(record)
                    continue

                records.append({'_index': self.es_index, '_source': record})

                if len(records) >= batch_size:
                    self._flush(records)
                    records = []

        if records:
            self._flush(records)
def main():
    '''Parse the command line, validate it and index the given HTTPX log file(s).'''

    cli = argparse.ArgumentParser(description='Index data into Elasticsearch.')
    cli.add_argument('input_path', help='Path to the input file or directory')

    # General arguments
    cli.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
    cli.add_argument('--batch_size', type=int, default=50000, help='Number of records to index in a batch')

    # Elasticsearch connection arguments
    cli.add_argument('--host', default='localhost', help='Elasticsearch host')
    cli.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
    cli.add_argument('--user', default='elastic', help='Elasticsearch username')
    cli.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
    cli.add_argument('--api-key', help='Elasticsearch API Key for authentication')
    # store_false: args.self_signed is True unless the flag is given
    cli.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')

    # Elasticsearch indexing arguments (shards and replicas are parsed but not used by this function)
    cli.add_argument('--index', default='zone-files', help='Elasticsearch index name')
    cli.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
    cli.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')

    args = cli.parse_args()

    if not os.path.exists(args.input_path):
        raise FileNotFoundError(f'Input file {args.input_path} does not exist')

    # Connection settings only matter when records are really sent to Elasticsearch
    if not args.dry_run:
        if args.batch_size < 1:
            raise ValueError('Batch size must be greater than 0')

        if not args.host:
            raise ValueError('Missing required Elasticsearch argument: host')

        has_credentials = args.user and args.password
        if not (args.api_key or has_credentials):
            raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')

    indexer = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed)

    # Single file: index it and stop
    if os.path.isfile(args.input_path):
        logging.info(f'Processing file: {args.input_path}')
        indexer.process_file(args.input_path, args.batch_size)
        return

    if not os.path.isdir(args.input_path):
        raise ValueError(f'Input path {args.input_path} is not a file or directory')

    # Directory: index every regular file directly inside it, in sorted order
    logging.info(f'Processing files in directory: {args.input_path}')
    for entry in sorted(os.listdir(args.input_path)):
        file_path = os.path.join(args.input_path, entry)
        if not os.path.isfile(file_path):
            continue
        logging.info(f'Processing file: {file_path}')
        indexer.process_file(file_path, args.batch_size)


if __name__ == '__main__':
    main()

230
ingestors/ingest_masscan.py Normal file
View File

@ -0,0 +1,230 @@
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# Masscan Log File Ingestion:
#
# This script takes JSON formatted masscan logs with banners and indexes them into Elasticsearch.
#
# Saving my "typical" masscan command here for reference to myself:
# masscan 0.0.0.0/0 -p80,443 --banners --open-only --rate 50000 --shard 1/10 --excludefile exclude.conf -oJ output.json --interactive
import argparse
import json
import logging
import os
import re
import time
try:
from elasticsearch import Elasticsearch, helpers
except ImportError:
raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
# Setting up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')
class ElasticIndexer:
    '''Read JSON formatted masscan logs and index them into Elasticsearch.'''

    # Reference ID carried by some banners, e.g. "(Ref.Id: ?sKfOvsC4M4a2W8PaC4zF?)" - compiled once for all records
    _REF_ID = re.compile(r'\(Ref\.Id: (.*?)\)')

    def __init__(self, es_host: str, es_port: int, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False):
        '''
        Initialize the Elastic Search indexer.

        :param es_host: Elasticsearch host
        :param es_port: Elasticsearch port
        :param es_user: Elasticsearch username
        :param es_password: Elasticsearch password
        :param es_api_key: Elasticsearch API Key (used instead of user/password when set)
        :param es_index: Elasticsearch index name
        :param dry_run: If True, do not initialize Elasticsearch client
        :param self_signed: Forwarded as-is to the client's verify_certs and ssl_show_warn options, so a falsy value turns certificate verification off
        '''

        self.dry_run = dry_run
        self.es = None
        self.es_index = es_index

        if not dry_run:
            # NOTE(review): the host is handed over without a URL scheme - confirm the installed client version accepts that
            if es_api_key:
                self.es = Elasticsearch([f'{es_host}:{es_port}'], headers={'Authorization': f'ApiKey {es_api_key}'}, verify_certs=self_signed, ssl_show_warn=self_signed)
            else:
                self.es = Elasticsearch([f'{es_host}:{es_port}'], basic_auth=(es_user, es_password), verify_certs=self_signed, ssl_show_warn=self_signed)

    def create_index(self, shards: int = 1, replicas: int = 1):
        '''
        Create the Elasticsearch index with the defined mapping.

        Does nothing when no client exists (dry run). An already existing index is left untouched and only logged.

        :param shards: Number of shards for the index
        :param replicas: Number of replicas for the index
        :raises Exception: If Elasticsearch does not acknowledge the index creation
        '''

        if self.es is None:
            return # Dry run: there is no client to talk to

        mapping = {
            'settings': {
                'number_of_shards': shards,
                'number_of_replicas': replicas
            },
            'mappings': {
                'properties': {
                    'ip': { 'type': 'ip' },
                    'port': { 'type': 'integer' },
                    'proto': { 'type': 'keyword' },
                    'service': { 'type': 'keyword' },
                    'banner': { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } },
                    'ref_id': { 'type': 'keyword' },
                    'seen': { 'type': 'date' }
                }
            }
        }

        if self.es.indices.exists(index=self.es_index):
            logging.warning(f'Index \'{self.es_index}\' already exists.')
            return

        response = self.es.indices.create(index=self.es_index, body=mapping)

        if response.get('acknowledged') and response.get('shards_acknowledged'):
            logging.info(f'Index \'{self.es_index}\' successfully created.')
        else:
            raise Exception(f'Failed to create index. ({response})')

    def _flush(self, records: list):
        '''
        Send a batch of bulk actions to Elasticsearch and log how many were indexed.

        :param records: Bulk actions to index
        '''

        success, _ = helpers.bulk(self.es, records)
        logging.info(f'Successfully indexed {success} records to {self.es_index}')

    def process_file(self, file_path: str, batch_size: int):
        '''
        Read and index Masscan records in batches to Elasticsearch, handling large volumes efficiently.

        In dry run mode the converted records are printed instead of being indexed.

        :param file_path: Path to the Masscan log file
        :param batch_size: Number of records to process before indexing

        Example record:
        {
            "ip": "43.134.51.142",
            "timestamp": "1705255468", # Convert to ZULU BABY
            "ports": [ # Typically only one port per record, but we will create a record for each port opened
                {
                    "port": 22,
                    "proto": "tcp",
                    "service": { # This field is optional
                        "name": "ssh",
                        "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
                    }
                }
            ]
        }

        Will be indexed as:
        {
            "ip": "43.134.51.142",
            "port": 22,
            "proto": "tcp",
            "service": "ssh", # Optional: not every record will have a service name ("unknown" is ignored)
            "banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4", # Optional: not every record will have a banner
            "seen": "2021-10-08T02:04:28Z",
            "ref_id": "?sKfOvsC4M4a2W8PaC4zF?" # This is optional and will only be present if the banner contains a reference ID (TCP RST Payload, Might be useful?)
        }
        '''

        records = []

        with open(file_path, 'r') as file:
            for line in file:
                # The log is a JSON array written one record per line, so a record may be followed by a comma
                line = line.strip().rstrip(',')

                # Skips blank lines and the array brackets / separators
                if not line.startswith('{'):
                    continue

                record = json.loads(line)

                for port_info in record['ports']:
                    struct = {
                        'ip': record['ip'],
                        'port': port_info['port'],
                        'proto': port_info['proto'],
                        'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
                    }

                    service = port_info.get('service')

                    if service:
                        if 'name' in service and service['name'] != 'unknown':
                            struct['service'] = service['name']

                        if 'banner' in service:
                            banner = service['banner']
                            match = self._REF_ID.search(banner)

                            # A banner carrying a reference ID is stored as ref_id only, not as banner
                            if match:
                                struct['ref_id'] = match.group(1)
                            else:
                                struct['banner'] = banner

                    if self.dry_run:
                        print(struct)
                        continue

                    records.append({'_index': self.es_index, '_source': struct})

                    if len(records) >= batch_size:
                        self._flush(records)
                        records = []

        if records:
            self._flush(records)
def main():
    '''
    Main function when running this script directly.

    Parses the command line, validates it, creates the index when records are really being indexed and then processes the given masscan log file(s).

    :raises FileNotFoundError: If the input path does not exist
    :raises ValueError: If an argument is invalid or the input path is neither a file nor a directory
    '''

    parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
    parser.add_argument('input_path', help='Path to the input file or directory')

    # General arguments
    parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
    parser.add_argument('--batch_size', type=int, default=50000, help='Number of records to index in a batch')

    # Elasticsearch arguments
    parser.add_argument('--host', default='localhost', help='Elasticsearch host')
    parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
    parser.add_argument('--user', default='elastic', help='Elasticsearch username')
    parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
    parser.add_argument('--api-key', help='Elasticsearch API Key for authentication')
    # store_false: args.self_signed is True unless the flag is given
    parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')

    # Elasticsearch indexing arguments
    parser.add_argument('--index', default='zone-files', help='Elasticsearch index name')
    parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
    parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')

    args = parser.parse_args()

    if not os.path.exists(args.input_path):
        raise FileNotFoundError(f'Input file {args.input_path} does not exist')

    # Connection settings only matter when records are really sent to Elasticsearch
    if not args.dry_run:
        if args.batch_size < 1:
            raise ValueError('Batch size must be greater than 0')

        if not args.host:
            raise ValueError('Missing required Elasticsearch argument: host')

        if not args.api_key and (not args.user or not args.password):
            raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')

    edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed)

    # A dry run has no Elasticsearch client, so there is nothing to create the index with
    if not args.dry_run:
        edx.create_index(args.shards, args.replicas) # Create the index if it does not exist

    if os.path.isfile(args.input_path):
        logging.info(f'Processing file: {args.input_path}')
        edx.process_file(args.input_path, args.batch_size)

    elif os.path.isdir(args.input_path):
        logging.info(f'Processing files in directory: {args.input_path}')
        for file in sorted(os.listdir(args.input_path)):
            file_path = os.path.join(args.input_path, file)
            if os.path.isfile(file_path):
                logging.info(f'Processing file: {file_path}')
                edx.process_file(file_path, args.batch_size)

    else:
        raise ValueError(f'Input path {args.input_path} is not a file or directory')


if __name__ == '__main__':
    main()

229
ingestors/ingest_massdns.py Normal file
View File

@ -0,0 +1,229 @@
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# Massdns Log File Ingestion:
#
# This script takes JSON formatted massdns logs and indexes them into Elasticsearch.
#
# Saving my "typical" massdns command here for reference to myself:
# python $HOME/massdns/scripts/ptr.py | massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o J -w $HOME/massdns/ptr.json
import argparse
import json
import logging
import os
import time
try:
from elasticsearch import Elasticsearch, helpers
except ImportError:
raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
# Setting up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')
class ElasticIndexer:
    '''Read JSON formatted massdns PTR logs and index them into Elasticsearch.'''

    def __init__(self, es_host: str, es_port: int, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False):
        '''
        Initialize the Elastic Search indexer.

        :param es_host: Elasticsearch host
        :param es_port: Elasticsearch port
        :param es_user: Elasticsearch username
        :param es_password: Elasticsearch password
        :param es_api_key: Elasticsearch API Key (used instead of user/password when set)
        :param es_index: Elasticsearch index name
        :param dry_run: If True, do not initialize Elasticsearch client
        :param self_signed: Forwarded as-is to the client's verify_certs and ssl_show_warn options, so a falsy value turns certificate verification off
        '''

        self.dry_run = dry_run
        self.es = None
        self.es_index = es_index

        if not dry_run:
            # NOTE(review): the host is handed over without a URL scheme - confirm the installed client version accepts that
            if es_api_key:
                self.es = Elasticsearch([f'{es_host}:{es_port}'], headers={'Authorization': f'ApiKey {es_api_key}'}, verify_certs=self_signed, ssl_show_warn=self_signed)
            else:
                self.es = Elasticsearch([f'{es_host}:{es_port}'], basic_auth=(es_user, es_password), verify_certs=self_signed, ssl_show_warn=self_signed)

    def create_index(self, shards: int = 1, replicas: int = 1):
        '''
        Create the Elasticsearch index with the defined mapping.

        Does nothing when no client exists (dry run). An already existing index is left untouched and only logged.

        :param shards: Number of shards for the index
        :param replicas: Number of replicas for the index
        :raises Exception: If Elasticsearch does not acknowledge the index creation
        '''

        if self.es is None:
            return # Dry run: there is no client to talk to

        # NOTE(review): apart from 'ip' and 'proto' these fields (port, service, banner, ref_id, seen) are never set by
        # process_file, which stores the time under 'timestamp' - the mapping looks unrelated to PTR records, confirm
        mapping = {
            'settings': {
                'number_of_shards': shards,
                'number_of_replicas': replicas
            },
            'mappings': {
                'properties': {
                    'ip': { 'type': 'ip' },
                    'port': { 'type': 'integer' },
                    'proto': { 'type': 'keyword' },
                    'service': { 'type': 'keyword' },
                    'banner': { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } },
                    'ref_id': { 'type': 'keyword' },
                    'seen': { 'type': 'date' }
                }
            }
        }

        if self.es.indices.exists(index=self.es_index):
            logging.warning(f'Index \'{self.es_index}\' already exists.')
            return

        response = self.es.indices.create(index=self.es_index, body=mapping)

        if response.get('acknowledged') and response.get('shards_acknowledged'):
            logging.info(f'Index \'{self.es_index}\' successfully created.')
        else:
            raise Exception(f'Failed to create index. ({response})')

    def _flush(self, records: list):
        '''
        Send a batch of bulk actions to Elasticsearch and log how many were indexed.

        :param records: Bulk actions to index
        '''

        success, _ = helpers.bulk(self.es, records)
        logging.info(f'Successfully indexed {success} records to {self.es_index}')

    def process_file(self, file_path: str, batch_size: int):
        '''
        Read and index PTR records in batches to Elasticsearch, handling large volumes efficiently.

        Each record gains an 'ip' field derived from its reverse name, has the trailing dot removed from
        'name' and has 'rx_ts' replaced by a 'timestamp' in Zulu time. Records whose status is not NOERROR
        are skipped with a warning. In dry run mode the converted records are printed instead of being indexed.

        :param file_path: Path to the PTR log file
        :param batch_size: Number of records to process before indexing
        :raises ValueError: If a record is not of type PTR or not of class IN

        Example PTR record:
        {
            "name":"0.0.50.73.in-addr.arpa.",
            "type":"PTR",
            "class":"IN",
            "status":"NOERROR",
            "rx_ts":1704595370817617348,
            "data": {
                "answers": [
                    {"ttl":3600,"type":"PTR","class":"IN","name":"0.0.50.73.in-addr.arpa.","data":"c-73-50-0-0.hsd1.il.comcast.net."}
                ],
                "authorities": [
                    {"ttl":86400,"type":"NS","class":"IN","name":"73.in-addr.arpa.","data":"dns102.comcast.net."},
                    {"ttl":86400,"type":"NS","class":"IN","name":"73.in-addr.arpa.","data":"dns103.comcast.net."},
                    {"ttl":86400,"type":"NS","class":"IN","name":"73.in-addr.arpa.","data":"dns105.comcast.net."},
                    {"ttl":86400,"type":"NS","class":"IN","name":"73.in-addr.arpa.","data":"dns101.comcast.net."},
                    {"ttl":86400,"type":"NS","class":"IN","name":"73.in-addr.arpa.","data":"dns104.comcast.net."}
                ],
                "additionals":[
                    {"ttl":105542,"type":"A","class":"IN","name":"dns101.comcast.net.","data":"69.252.250.103"},
                    {"ttl":105542,"type":"A","class":"IN","name":"dns102.comcast.net.","data":"68.87.85.132"},
                    {"ttl":105542,"type":"AAAA","class":"IN","name":"dns104.comcast.net.","data":"2001:558:100a:5:68:87:68:244"},
                    {"ttl":105542,"type":"AAAA","class":"IN","name":"dns105.comcast.net.","data":"2001:558:100e:5:68:87:72:244"}
                ]
            },
            "flags": ["rd","ra"],
            "resolver":"103.144.64.173:53",
            "proto":"UDP"
        }
        '''

        records = []

        with open(file_path, 'r') as file:
            for line in file:
                line = line.strip()

                if not line:
                    continue

                record = json.loads(line)

                # Should we keep CNAME records? Can an IP address have a CNAME?
                if record['type'] != 'PTR':
                    logging.error(record)
                    raise ValueError(f'Unsupported record type: {record["type"]}')

                if record['class'] != 'IN':
                    logging.error(record)
                    raise ValueError(f'Unsupported record class: {record["class"]}')

                if record['status'] != 'NOERROR':
                    logging.warning(f'Skipping bad record: {record}')
                    continue

                # Strip the trailing dot first, otherwise the reversed address starts with an empty label
                name = record['name'].rstrip('.')

                # NOTE(review): only in-addr.arpa names are reversed into a proper address here
                record['ip'] = '.'.join(reversed(name.replace('.in-addr.arpa', '').split('.')))
                record['name'] = name

                # rx_ts is in nanoseconds
                record['timestamp'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(record.pop('rx_ts') // 1_000_000_000))

                if self.dry_run:
                    print(record)
                    continue

                records.append({'_index': self.es_index, '_source': record})

                if len(records) >= batch_size:
                    self._flush(records)
                    records = []

        if records:
            self._flush(records)
def main():
    '''
    Main function when running this script directly.

    Parses the command line, validates it, creates the index when records are really being indexed and then processes the given massdns log file(s).

    :raises FileNotFoundError: If the input path does not exist
    :raises ValueError: If an argument is invalid or the input path is neither a file nor a directory
    '''

    parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
    parser.add_argument('input_path', help='Path to the input file or directory')

    # General arguments
    parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
    parser.add_argument('--batch_size', type=int, default=50000, help='Number of records to index in a batch')

    # Elasticsearch connection arguments
    parser.add_argument('--host', default='localhost', help='Elasticsearch host')
    parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
    parser.add_argument('--user', default='elastic', help='Elasticsearch username')
    parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
    parser.add_argument('--api-key', help='Elasticsearch API Key for authentication')
    # store_false: args.self_signed is True unless the flag is given
    parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')

    # Elasticsearch indexing arguments
    parser.add_argument('--index', default='zone-files', help='Elasticsearch index name')
    parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
    parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')

    args = parser.parse_args()

    if not os.path.exists(args.input_path):
        raise FileNotFoundError(f'Input file {args.input_path} does not exist')

    # Connection settings only matter when records are really sent to Elasticsearch
    if not args.dry_run:
        if args.batch_size < 1:
            raise ValueError('Batch size must be greater than 0')

        if not args.host:
            raise ValueError('Missing required Elasticsearch argument: host')

        if not args.api_key and (not args.user or not args.password):
            raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')

    edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed)

    # A dry run has no Elasticsearch client, so there is nothing to create the index with
    if not args.dry_run:
        edx.create_index(args.shards, args.replicas) # Create the index if it does not exist

    if os.path.isfile(args.input_path):
        logging.info(f'Processing file: {args.input_path}')
        edx.process_file(args.input_path, args.batch_size)

    elif os.path.isdir(args.input_path):
        logging.info(f'Processing files in directory: {args.input_path}')
        for file in sorted(os.listdir(args.input_path)):
            file_path = os.path.join(args.input_path, file)
            if os.path.isfile(file_path):
                logging.info(f'Processing file: {file_path}')
                edx.process_file(file_path, args.batch_size)

    else:
        raise ValueError(f'Input path {args.input_path} is not a file or directory')


if __name__ == '__main__':
    main()

264
ingestors/ingest_zone.py Normal file
View File

@ -0,0 +1,264 @@
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# DNS Zone File Ingestion:
#
# This script will read a DNS zone file and index the records to Elasticsearch.
#
# Zone files must be valid and follow the RFC 1035 standard for proper indexing:
# - https://datatracker.ietf.org/doc/html/rfc1035
#
# Most of my zones come from CZDS but AXFR outputs can be used as well.
# Anomaly detection is in place to alert the user of any unexpected records.
import argparse
import logging
import os
import time
try:
from elasticsearch import Elasticsearch, helpers
except ImportError:
raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
# Setting up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')
# Record types to index
record_types = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','ns','nsec','nsec3','nsec3param','ptr','rrsig','sshfp','soa','srv','txt','type65534')


class ElasticIndexer:
    '''Read DNS zone files and index one document per domain into Elasticsearch.'''

    def __init__(self, es_host: str, es_port: int, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False):
        '''
        Initialize the Elastic Search indexer.

        :param es_host: Elasticsearch host
        :param es_port: Elasticsearch port
        :param es_user: Elasticsearch username
        :param es_password: Elasticsearch password
        :param es_api_key: Elasticsearch API Key (used instead of user/password when set)
        :param es_index: Elasticsearch index name
        :param dry_run: If True, do not initialize Elasticsearch client
        :param self_signed: Forwarded as-is to the client's verify_certs and ssl_show_warn options, so a falsy value turns certificate verification off
        '''

        self.dry_run = dry_run
        self.es = None
        self.es_index = es_index

        if not dry_run:
            # NOTE(review): the host is handed over without a URL scheme - confirm the installed client version accepts that
            if es_api_key:
                self.es = Elasticsearch([f'{es_host}:{es_port}'], headers={'Authorization': f'ApiKey {es_api_key}'}, verify_certs=self_signed, ssl_show_warn=self_signed)
            else:
                self.es = Elasticsearch([f'{es_host}:{es_port}'], basic_auth=(es_user, es_password), verify_certs=self_signed, ssl_show_warn=self_signed)

    def create_index(self, shards: int = 1, replicas: int = 1):
        '''
        Create the Elasticsearch index with the defined mapping.

        Does nothing when no client exists (dry run). An already existing index is left untouched and only logged.

        :param shards: Number of shards for the index
        :param replicas: Number of replicas for the index
        :raises Exception: If Elasticsearch does not acknowledge the index creation
        '''

        if self.es is None:
            return # Dry run: there is no client to talk to

        mapping = {
            'settings': {
                'number_of_shards': shards,
                'number_of_replicas': replicas
            },
            'mappings': {
                'properties': {
                    'domain': { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } },
                    'records': { 'properties': {} },
                    'seen': {'type': 'date'}
                }
            }
        }

        # Add record types to mapping dynamically to not clutter the code
        record_mapping = mapping['mappings']['properties']['records']['properties']

        for item in record_types:
            # Address records hold IP addresses, every other type is stored as text
            if item in ('a','aaaa'):
                data_type = { 'type': 'ip' }
            else:
                data_type = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }

            record_mapping[item] = {
                'properties': {
                    'data': data_type,
                    'ttl': { 'type': 'integer' }
                }
            }

        if self.es.indices.exists(index=self.es_index):
            logging.warning(f'Index \'{self.es_index}\' already exists.')
            return

        response = self.es.indices.create(index=self.es_index, body=mapping)

        if response.get('acknowledged') and response.get('shards_acknowledged'):
            logging.info(f'Index \'{self.es_index}\' successfully created.')
        else:
            raise Exception(f'Failed to create index. ({response})')

    def _flush(self, records: list, file_path: str):
        '''
        Send a batch of bulk actions to Elasticsearch and log how many were indexed.

        :param records: Bulk actions to index
        :param file_path: Path of the zone file the records came from (used for logging only)
        '''

        success, _ = helpers.bulk(self.es, records)
        logging.info(f'Successfully indexed {success} records to {self.es_index} from {file_path}')

    def _queue_domain(self, domain: str, domain_records: dict, records: list, batch_size: int, file_path: str):
        '''
        Turn the collected records of one domain into a document and print it (dry run) or queue it for indexing.

        :param domain: Domain the records belong to
        :param domain_records: Records of the domain grouped by record type
        :param records: Pending bulk actions, extended in place and emptied once a batch was sent
        :param batch_size: Number of records to process before indexing
        :param file_path: Path of the zone file being processed (used for logging only)
        '''

        source = {'domain': domain, 'records': domain_records, 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}

        if self.dry_run:
            print(source)
            return

        records.append({'_index': self.es_index, '_source': source})

        if len(records) >= batch_size:
            self._flush(records, file_path)
            records.clear()

    def process_file(self, file_path: str, batch_size: int):
        '''
        Read and index DNS records in batches to Elasticsearch, handling large volumes efficiently.

        Consecutive lines of the same domain are merged into one document, so the zone file is expected
        to list all records of a domain next to each other. In dry run mode the documents are printed
        instead of being indexed.

        :param file_path: Path to the DNS zone file
        :param batch_size: Number of records to process before indexing
        :raises ValueError: If a line is malformed or uses an unsupported TTL, record class or record type

        Example record:
        0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600 in nsec3 1 1 100 332539EE7F95C32A 10MHUKG4FHIAVEFDOTF6NKU5KFCB2J3A NS DS RRSIG
        0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600 in rrsig NSEC3 8 2 3600 20240122151947 20240101141947 4125 vegas. hzIvQrZIxBSwRWyiHkb5M2W0R3ikNehv884nilkvTt9DaJSDzDUrCtqwQb3jh6+BesByBqfMQK+L2n9c//ZSmD5/iPqxmTPCuYIB9uBV2qSNSNXxCY7uUt5w7hKUS68SLwOSjaQ8GRME9WQJhY6gck0f8TT24enjXXRnQC8QitY=
        1-800-flowers.vegas. 3600 in ns dns1.cscdns.net.
        1-800-flowers.vegas. 3600 in ns dns2.cscdns.net.
        100.vegas. 3600 in ns ns51.domaincontrol.com.
        100.vegas. 3600 in ns ns52.domaincontrol.com.
        1001.vegas. 3600 in ns ns11.waterrockdigital.com.
        1001.vegas. 3600 in ns ns12.waterrockdigital.com.

        Will be indexed as:
        {
            "domain": "1001.vegas",
            "records": {
                "ns": [
                    {"ttl": 3600, "data": "ns11.waterrockdigital.com"},
                    {"ttl": 3600, "data": "ns12.waterrockdigital.com"}
                ]
            },
            "seen": "2021-09-01T00:00:00Z" # Zulu time added upon indexing
        }
        '''

        records = []
        domain_records = {} # Records of the domain currently being read, grouped by record type
        last_domain = None

        with open(file_path, 'r') as file:
            for line in file:
                line = line.strip()

                if not line or line.startswith(';'):
                    continue

                parts = line.split()

                if len(parts) < 5:
                    raise ValueError(f'Invalid line: {line}')

                domain, ttl, record_class, record_type, data = parts[0].rstrip('.').lower(), parts[1], parts[2].lower(), parts[3].lower(), ' '.join(parts[4:])

                if not ttl.isdigit():
                    raise ValueError(f'Invalid TTL: {ttl} with line: {line}')

                ttl = int(ttl)

                if record_class != 'in':
                    raise ValueError(f'Unsupported record class: {record_class} with line: {line}') # Anomaly (Doubtful any CHAOS/HESIOD records will be found)

                # We do not want to collide with our current mapping (Again, this is an anomaly)
                if record_type not in record_types:
                    raise ValueError(f'Unsupported record type: {record_type} with line: {line}')

                # Little tidying up for specific record types
                if record_type == 'nsec':
                    data = ' '.join([data.split()[0].rstrip('.'), *data.split()[1:]])
                elif record_type == 'soa':
                    data = ' '.join([part.rstrip('.') if '.' in part else part for part in data.split()])
                elif data.endswith('.'):
                    data = data.rstrip('.')

                if domain != last_domain:
                    # A new domain starts: the previous one is complete and is emitted under its own name
                    if last_domain is not None:
                        self._queue_domain(last_domain, domain_records, records, batch_size, file_path)

                    last_domain = domain
                    domain_records = {}

                domain_records.setdefault(record_type, []).append({'ttl': ttl, 'data': data})

        # The last domain of the file has no following domain to trigger it, so emit it here
        if last_domain is not None:
            self._queue_domain(last_domain, domain_records, records, batch_size, file_path)

        if records:
            self._flush(records, file_path)
def main():
    '''
    Main function when running this script directly.

    Parses the command line, validates it, creates the index when records are really being indexed and then processes the given zone file(s).

    :raises FileNotFoundError: If the input path does not exist
    :raises ValueError: If an argument is invalid or the input path is neither a file nor a directory
    '''

    parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
    parser.add_argument('input_path', help='Path to the input file or directory')

    # General arguments
    parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
    parser.add_argument('--batch_size', type=int, default=50000, help='Number of records to index in a batch')

    # Elasticsearch connection arguments
    parser.add_argument('--host', default='localhost', help='Elasticsearch host')
    parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
    parser.add_argument('--user', default='elastic', help='Elasticsearch username')
    parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
    parser.add_argument('--api-key', help='Elasticsearch API Key for authentication')
    # store_false: args.self_signed is True unless the flag is given
    parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')

    # Elasticsearch indexing arguments
    parser.add_argument('--index', default='zone-files', help='Elasticsearch index name')
    # An index needs at least one primary shard, so the default is 1 (0 is only meaningful for replicas)
    parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index') # This depends on your cluster configuration
    parser.add_argument('--replicas', type=int, default=0, help='Number of replicas for the index') # This depends on your cluster configuration

    args = parser.parse_args()

    if not os.path.exists(args.input_path):
        raise FileNotFoundError(f'Input file {args.input_path} does not exist')

    # Connection settings only matter when records are really sent to Elasticsearch
    if not args.dry_run:
        if args.batch_size < 1:
            raise ValueError('Batch size must be greater than 0')

        if not args.host:
            raise ValueError('Missing required Elasticsearch argument: host')

        if not args.api_key and (not args.user or not args.password):
            raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')

    edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed)

    # A dry run has no Elasticsearch client, so there is nothing to create the index with
    if not args.dry_run:
        edx.create_index(args.shards, args.replicas) # Create the index if it does not exist

    if os.path.isfile(args.input_path):
        logging.info(f'Processing file: {args.input_path}')
        edx.process_file(args.input_path, args.batch_size)

    elif os.path.isdir(args.input_path):
        logging.info(f'Processing files in directory: {args.input_path}')
        for file in sorted(os.listdir(args.input_path)):
            file_path = os.path.join(args.input_path, file)
            if os.path.isfile(file_path):
                logging.info(f'Processing file: {file_path}')
                edx.process_file(file_path, args.batch_size)

    else:
        raise ValueError(f'Input path {args.input_path} is not a file or directory')


if __name__ == '__main__':
    main()