Asyncronous refactorization of the codebase is complete, testing & metrics and then it will be production ready

This commit is contained in:
Dionysus 2024-03-05 21:40:34 -05:00
parent b6fb68ba3a
commit 4cf976aada
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
6 changed files with 378 additions and 43 deletions

View File

@ -1,11 +1,11 @@
# Elasticsearch Recon Ingestion Scripts (ERIS)
> A utility for ingesting various large scale reconnaissance data logs into Elasticsearch
### Work In Progress
## Prerequisites
- [python](https://www.python.org/)
- [elasticsearch](https://pypi.org/project/elasticsearch/) *(`pip install elasticsearch`)*
- [aiofiles](https://pypi.org/project/aiofiles) *(`pip install aiofiles`)*
- [aiohttp](https://pypi.org/projects/aiohttp) *(`pip install aiohttp`)*
## Usage
```shell
@ -15,48 +15,45 @@ python eris.py [options] <input>
### Options
###### General arguments
| Argument | Description |
|-------------------|----------------------------------------------------------------|
| `input_path` | Path to the input file or directory |
| `--dry-run` | Dry run *(do not index records to Elasticsearch)* |
| `--watch` | Create or watch a FIFO for real-time indexing |
| Argument | Description |
|--------------|-----------------------------------------------|
| `input_path` | Path to the input file or directory |
| `--watch` | Create or watch a FIFO for real-time indexing |
###### Elasticsearch arguments
| Argument | Description | Default |
|-------------------|--------------------------------------------------------------------------------|----------------|
| `--host` | Elasticsearch host | `localhost` |
| `--port` | Elasticsearch port | `9200` |
| `--user` | Elasticsearch username | `elastic` |
| `--password` | Elasticsearch password | `$ES_PASSWORD` |
| `--api-key` | Elasticsearch API Key for authentication *(format must be api_key:api_secret)* | `$ES_APIKEY` |
| `--self-signed` | Elasticsearch connection with a self-signed certificate | |
| Argument | Description | Default |
|-----------------|---------------------------------------------------------|---------------------|
| `--host` | Elasticsearch host | `http://localhost/` |
| `--port` | Elasticsearch port | `9200` |
| `--user` | Elasticsearch username | `elastic` |
| `--password` | Elasticsearch password | `$ES_PASSWORD` |
| `--api-key` | Elasticsearch API Key for authentication | `$ES_APIKEY` |
| `--self-signed` | Elasticsearch connection with a self-signed certificate | |
###### Elasticsearch indexing arguments
| Argument | Description | Default |
|-------------------|--------------------------------------|---------------------|
| `--index` | Elasticsearch index name | Depends on ingestor |
| `--pipeline` | Use an ingest pipeline for the index | |
| `--replicas` | Number of replicas for the index | `1` |
| `--shards` | Number of shards for the index | `1` |
| Argument | Description | Default |
|--------------|--------------------------------------|---------------------|
| `--index` | Elasticsearch index name | Depends on ingestor |
| `--pipeline` | Use an ingest pipeline for the index | |
| `--replicas` | Number of replicas for the index | `1` |
| `--shards` | Number of shards for the index | `1` |
###### Performance arguments
| Argument | Description | Default |
|-------------------|----------------------------------------------------------|---------|
| `--chunk-max` | Maximum size in MB of a chunk | `10` |
| `--chunk-size` | Number of records to index in a chunk | `5000` |
| `--chunk-threads` | Number of threads to use when indexing in chunks | `2` |
| `--retries` | Number of times to retry indexing a chunk before failing | `10` |
| `--timeout` | Number of seconds to wait before retrying a chunk | `30` |
| `--chunk-max` | Maximum size in MB of a chunk | `100` |
| `--chunk-size` | Number of records to index in a chunk | `50000` |
| `--retries` | Number of times to retry indexing a chunk before failing | `100` |
| `--timeout` | Number of seconds to wait before retrying a chunk | `60` |
###### Ingestion arguments
| Argument | Description |
|-------------------|------------------------|
| `--httpx` | Index HTTPX records |
| `--masscan` | Index Masscan records |
| `--massdns` | Index massdns records |
| `--zone` | Index zone DNS records |
Using `--batch-threads` as 4 and `--batch-size` as 10000 with 3 nodes would process 120,000 records before indexing 40,000 per node. Take these kind of metrics into account when consider how much records you want to process at once and the memory limitations of your environment, aswell as the networking constraint it may have ono your node(s), depending on the size of your cluster.
| Argument | Description |
|-------------|--------------------------|
| `--certs` | Index Certstream records |
| `--httpx` | Index HTTPX records |
| `--masscan` | Index Masscan records |
| `--massdns` | Index massdns records |
| `--zone` | Index zone DNS records |
This ingestion suite will use the built in node sniffer, so by connecting to a single node, you can load balance across the entire cluster.
It is good to know how much nodes you have in the cluster to determine how to fine tune the arguments for the best performance, based on your environment.
@ -80,11 +77,14 @@ Create & add a geoip pipeline and use the following in your index mappings:
```
## Changelog
- Added ingestion script for certificate transparency logs in real time using websockets.
- `--dry-run` removed as this nears production level
- Implemented [async elasticsearch](https://elasticsearch-py.readthedocs.io/en/latest/async.html) into the codebase & refactored some of the logic to accomadate.
- The `--watch` feature now uses a FIFO to do live ingestion.
- Isolated eris.py into it's own file and seperated the ingestion agents into their own modules.
## Roadmap
- Implement [async elasticsearch](https://elasticsearch-py.readthedocs.io/en/v8.12.1/async.html) into the code.
- Fix issue with `ingest_certs.py` and not needing to pass a file to it
- WHOIS database ingestion scripts
- Dynamically update the batch metrics when the sniffer adds or removes nodes

View File

@ -12,7 +12,8 @@ import sys
sys.dont_write_bytecode = True
try:
from elasticsearch import AsyncElasticsearch
# This is commented out because there is a bug with the elasticsearch library that requires a patch (see initialize() method below)
#from elasticsearch import AsyncElasticsearch
from elasticsearch.exceptions import NotFoundError
from elasticsearch.helpers import async_streaming_bulk
except ImportError:
@ -30,10 +31,11 @@ class ElasticIndexer:
:param args: Parsed arguments from argparse
'''
self.chunk_max = args.chunk_max * 1024 * 1024 # MB
self.chunk_size = args.chunk_size
self.es = None
self.es_index = args.index
self.es_config = {
'hosts': [f'{args.host}:{args.port}'],
'verify_certs': args.self_signed,
@ -127,7 +129,7 @@ class ElasticIndexer:
count = 0
total = 0
async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size):
async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size, max_chunk_bytes=self.chunk_max):
action, result = result.popitem()
if not ok:
@ -155,7 +157,7 @@ async def main():
parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')
# Elasticsearch arguments
parser.add_argument('--host', default='localhost', help='Elasticsearch host')
parser.add_argument('--host', default='http://localhost/', help='Elasticsearch host')
parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
parser.add_argument('--user', default='elastic', help='Elasticsearch username')
parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
@ -166,12 +168,13 @@ async def main():
parser.add_argument('--index', help='Elasticsearch index name')
parser.add_argument('--pipeline', help='Use an ingest pipeline for the index')
parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
parser.add_argument('--shards', type=int, default=3, help='Number of shards for the index')
parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
# Performance arguments
parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk')
parser.add_argument('--retries', type=int, default=60, help='Number of times to retry indexing a chunk before failing')
parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a chunk')
parser.add_argument('--chunk-max', type=int, default=100, help='Maximum size of a chunk in bytes')
parser.add_argument('--retries', type=int, default=100, help='Number of times to retry indexing a chunk before failing')
parser.add_argument('--timeout', type=int, default=60, help='Number of seconds to wait before retrying a chunk')
# Ingestion arguments
parser.add_argument('--cert', action='store_true', help='Index Certstream records')

View File

@ -0,0 +1,103 @@
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_httpx.py
import json
try:
import aiofiles
except ImportError:
raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
default_index = 'httpx-logs'
def construct_map() -> dict:
'''Construct the Elasticsearch index mapping for Masscan records.'''
keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
mapping = {
'mappings': {
'properties': {
'change': 'me'
}
}
}
return mapping
async def process_data(file_path: str):
'''
Read and process HTTPX records from the log file.
:param file_path: Path to the HTTPX log file
'''
async with aiofiles.open(file_path, mode='r') as input_file:
async for line in input_file:
line = line.strip()
if not line:
continue
record = json.loads(line)
record['seen'] = record.pop('timestamp').split('.')[0] + 'Z' # Hacky solution to maintain ISO 8601 format without milliseconds or offsets
record['domain'] = record.pop('input')
del record['failed'], record['knowledgebase'], record['time']
yield {'_index': default_index, '_source': record}
return None # EOF
''''
Example record:
{
"timestamp":"2024-01-14T13:08:15.117348474-05:00", # Rename to seen and remove milliseconds and offset
"hash": { # Do we need all of these ?
"body_md5":"4ae9394eb98233b482508cbda3b33a66",
"body_mmh3":"-4111954",
"body_sha256":"89e06e8374353469c65adb227b158b265641b424fba7ddb2c67eef0c4c1280d3",
"body_simhash":"9814303593401624250",
"header_md5":"980366deb2b2fb5df2ad861fc63e79ce",
"header_mmh3":"-813072798",
"header_sha256":"39aea75ad548e38b635421861641ad1919ed3b103b17a33c41e7ad46516f736d",
"header_simhash":"10962523587435277678"
},
"port":"443",
"url":"https://supernets.org", # Remove this and only use the input field as "domain" maybe
"input":"supernets.org", # rename to domain
"title":"SuperNETs",
"scheme":"https",
"webserver":"nginx",
"body_preview":"SUPERNETS Home About Contact Donate Docs Network IRC Git Invidious Jitsi LibreX Mastodon Matrix Sup",
"content_type":"text/html",
"method":"GET", # Do we need this ?
"host":"51.89.151.158",
"path":"/",
"favicon":"-674048714",
"favicon_path":"/i/favicon.png",
"time":"592.907689ms", # Do we need this ?
"a":[
"6.150.220.23"
],
"tech":[
"Bootstrap:4.0.0",
"HSTS",
"Nginx"
],
"words":436, # Do we need this ?
"lines":79, # Do we need this ?
"status_code":200,
"content_length":4597,
"failed":false, # Do we need this ?
"knowledgebase":{ # Do we need this ?
"PageType":"nonerror",
"pHash":0
}
}
'''

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_masscan.py [asyncronous developement]
# ingest_masscan.py
'''
apt-get install iptables masscan libpcap-dev screen

View File

@ -0,0 +1,92 @@
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_massdns.py
import time
try:
import aiofiles
except ImportError:
raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
default_index = 'ptr-records'
def construct_map() -> dict:
'''Construct the Elasticsearch index mapping for MassDNS records'''
keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
mapping = {
'mappings': {
'properties': {
'ip': { 'type': 'ip' },
'name': { 'type': 'keyword' },
'record': keyword_mapping,
'seen': { 'type': 'date' }
}
}
}
return mapping
async def process_data(file_path: str):
'''
Read and process Massdns records from the log file.
:param file_path: Path to the Massdns log file
'''
async with aiofiles.open(file_path, mode='r') as input_file:
async for line in input_file:
line = line.strip()
if not line:
continue
parts = line.split()
if len(parts) < 3:
raise ValueError(f'Invalid PTR record: {line}')
name, record_type, data = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
if record_type != 'PTR':
continue
#if record_type == 'CNAME':
# if data.endswith('.in-addr.arpa'):
# continue
# Let's not index the PTR record if it's the same as the in-addr.arpa domain
if data == name:
continue
ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
struct = {
'ip': ip,
'record': data,
'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
}
yield {'_index': default_index, '_source': struct}
return None # EOF
'''
Example PTR record:
0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com.
0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net.
0.6.212.173.in-addr.arpa. PTR 173-212-6-0.cpe.surry.net.
0.6.201.133.in-addr.arpa. PTR flh2-133-201-6-0.tky.mesh.ad.jp.
Will be indexed as:
{
"ip": "47.229.6.0",
"record": "047-229-006-000.res.spectrum.com.",
"seen": "2021-06-30T18:31:00Z"
}
'''

View File

@ -0,0 +1,137 @@
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_zone.py
import time
try:
import aiofiles
except ImportError:
raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
default_index = 'dns-zones'
record_types = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','naptr','ns','nsec','nsec3','nsec3param','ptr','rrsig','rp','sshfp','soa','srv','txt','type65534')
def construct_map() -> dict:
'''Construct the Elasticsearch index mapping for zone file records.'''
keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
mapping = {
'mappings': {
'properties': {
'domain': keyword_mapping,
'records': { 'properties': {} },
'seen': {'type': 'date'}
}
}
}
# Add record types to mapping dynamically to not clutter the code
for item in record_types:
if item in ('a','aaaa'):
mapping['mappings']['properties']['records']['properties'][item] = {
'properties': {
'data': { 'type': 'ip' },
'ttl': { 'type': 'integer' }
}
}
else:
mapping['mappings']['properties']['records']['properties'][item] = {
'properties': {
'data': keyword_mapping,
'ttl': { 'type': 'integer' }
}
}
return mapping
async def process_data(file_path: str):
'''
Read and process zone file records.
:param file_path: Path to the zone file
'''
domain_records = {}
last_domain = None
async with aiofiles.open(file_path, mode='r') as input_file:
async for line in input_file:
line = line.strip()
if not line or line.startswith(';'):
continue
parts = line.split()
if len(parts) < 5:
raise ValueError(f'Invalid line: {line}')
domain, ttl, record_class, record_type, data = parts[0].rstrip('.').lower(), parts[1], parts[2].lower(), parts[3].lower(), ' '.join(parts[4:])
if not ttl.isdigit():
raise ValueError(f'Invalid TTL: {ttl} with line: {line}')
ttl = int(ttl)
if record_class != 'in':
raise ValueError(f'Unsupported record class: {record_class} with line: {line}') # Anomaly (Doubtful any CHAOS/HESIOD records will be found)
# We do not want to collide with our current mapping (Again, this is an anomaly)
if record_type not in record_types:
raise ValueError(f'Unsupported record type: {record_type} with line: {line}')
# Little tidying up for specific record types
if record_type == 'nsec':
data = ' '.join([data.split()[0].rstrip('.'), *data.split()[1:]])
elif record_type == 'soa':
data = ' '.join([part.rstrip('.') if '.' in part else part for part in data.split()])
elif data.endswith('.'):
data = data.rstrip('.')
if domain != last_domain:
if last_domain:
struct = {'domain': last_domain, 'records': domain_records[last_domain], 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}
del domain_records[last_domain]
yield {'_index': default_index, '_source': struct}
last_domain = domain
domain_records[domain] = {}
if record_type not in domain_records[domain]:
domain_records[domain][record_type] = []
domain_records[domain][record_type].append({'ttl': ttl, 'data': data})
return None # EOF
'''
Example record:
0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600 in nsec3 1 1 100 332539EE7F95C32A 10MHUKG4FHIAVEFDOTF6NKU5KFCB2J3A NS DS RRSIG
0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600 in rrsig NSEC3 8 2 3600 20240122151947 20240101141947 4125 vegas. hzIvQrZIxBSwRWyiHkb5M2W0R3ikNehv884nilkvTt9DaJSDzDUrCtqwQb3jh6+BesByBqfMQK+L2n9c//ZSmD5/iPqxmTPCuYIB9uBV2qSNSNXxCY7uUt5w7hKUS68SLwOSjaQ8GRME9WQJhY6gck0f8TT24enjXXRnQC8QitY=
1-800-flowers.vegas. 3600 in ns dns1.cscdns.net.
1-800-flowers.vegas. 3600 in ns dns2.cscdns.net.
100.vegas. 3600 in ns ns51.domaincontrol.com.
100.vegas. 3600 in ns ns52.domaincontrol.com.
1001.vegas. 3600 in ns ns11.waterrockdigital.com.
1001.vegas. 3600 in ns ns12.waterrockdigital.com.
Will be indexed as:
{
"domain": "1001.vegas",
"records": {
"ns": [
{"ttl": 3600, "data": "ns11.waterrockdigital.com"},
{"ttl": 3600, "data": "ns12.waterrockdigital.com"}
]
},
"seen": "2021-09-01T00:00:00Z" # Zulu time added upon indexing
}
'''