diff --git a/README.md b/README.md index f91f638..9684bfd 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ # Elasticsearch Recon Ingestion Scripts (ERIS) > A utility for ingesting various large scale reconnaissance data logs into Elasticsearch -### Work In Progress - ## Prerequisites - [python](https://www.python.org/) - [elasticsearch](https://pypi.org/project/elasticsearch/) *(`pip install elasticsearch`)* + - [aiofiles](https://pypi.org/project/aiofiles) *(`pip install aiofiles`)* + - [aiohttp](https://pypi.org/projects/aiohttp) *(`pip install aiohttp`)* ## Usage ```shell @@ -15,48 +15,45 @@ python eris.py [options] ### Options ###### General arguments -| Argument | Description | -|-------------------|----------------------------------------------------------------| -| `input_path` | Path to the input file or directory | -| `--dry-run` | Dry run *(do not index records to Elasticsearch)* | -| `--watch` | Create or watch a FIFO for real-time indexing | +| Argument | Description | +|--------------|-----------------------------------------------| +| `input_path` | Path to the input file or directory | +| `--watch` | Create or watch a FIFO for real-time indexing | ###### Elasticsearch arguments -| Argument | Description | Default | -|-------------------|--------------------------------------------------------------------------------|----------------| -| `--host` | Elasticsearch host | `localhost` | -| `--port` | Elasticsearch port | `9200` | -| `--user` | Elasticsearch username | `elastic` | -| `--password` | Elasticsearch password | `$ES_PASSWORD` | -| `--api-key` | Elasticsearch API Key for authentication *(format must be api_key:api_secret)* | `$ES_APIKEY` | -| `--self-signed` | Elasticsearch connection with a self-signed certificate | | +| Argument | Description | Default | +|-----------------|---------------------------------------------------------|---------------------| +| `--host` | Elasticsearch host | `http://localhost/` | +| `--port` | Elasticsearch port | `9200` | +| `--user` | Elasticsearch username | `elastic` | +| `--password` | Elasticsearch password | `$ES_PASSWORD` | +| `--api-key` | Elasticsearch API Key for authentication | `$ES_APIKEY` | +| `--self-signed` | Elasticsearch connection with a self-signed certificate | | ###### Elasticsearch indexing arguments -| Argument | Description | Default | -|-------------------|--------------------------------------|---------------------| -| `--index` | Elasticsearch index name | Depends on ingestor | -| `--pipeline` | Use an ingest pipeline for the index | | -| `--replicas` | Number of replicas for the index | `1` | -| `--shards` | Number of shards for the index | `1` | +| Argument | Description | Default | +|--------------|--------------------------------------|---------------------| +| `--index` | Elasticsearch index name | Depends on ingestor | +| `--pipeline` | Use an ingest pipeline for the index | | +| `--replicas` | Number of replicas for the index | `1` | +| `--shards` | Number of shards for the index | `1` | ###### Performance arguments | Argument | Description | Default | |-------------------|----------------------------------------------------------|---------| -| `--chunk-max` | Maximum size in MB of a chunk | `10` | -| `--chunk-size` | Number of records to index in a chunk | `5000` | -| `--chunk-threads` | Number of threads to use when indexing in chunks | `2` | -| `--retries` | Number of times to retry indexing a chunk before failing | `10` | -| `--timeout` | Number of seconds to wait before retrying a chunk | `30` | +| `--chunk-max` | Maximum size in MB of a chunk | `100` | +| `--chunk-size` | Number of records to index in a chunk | `50000` | +| `--retries` | Number of times to retry indexing a chunk before failing | `100` | +| `--timeout` | Number of seconds to wait before retrying a chunk | `60` | ###### Ingestion arguments -| Argument | Description | -|-------------------|------------------------| -| `--httpx` | Index HTTPX records | -| `--masscan` | Index Masscan records | -| `--massdns` | Index massdns records | -| `--zone` | Index zone DNS records | - -Using `--batch-threads` as 4 and `--batch-size` as 10000 with 3 nodes would process 120,000 records before indexing 40,000 per node. Take these kind of metrics into account when consider how much records you want to process at once and the memory limitations of your environment, aswell as the networking constraint it may have ono your node(s), depending on the size of your cluster. +| Argument | Description | +|-------------|--------------------------| +| `--certs` | Index Certstream records | +| `--httpx` | Index HTTPX records | +| `--masscan` | Index Masscan records | +| `--massdns` | Index massdns records | +| `--zone` | Index zone DNS records | This ingestion suite will use the built in node sniffer, so by connecting to a single node, you can load balance across the entire cluster. It is good to know how much nodes you have in the cluster to determine how to fine tune the arguments for the best performance, based on your environment. @@ -80,11 +77,14 @@ Create & add a geoip pipeline and use the following in your index mappings: ``` ## Changelog +- Added ingestion script for certificate transparency logs in real time using websockets. +- `--dry-run` removed as this nears production level +- Implemented [async elasticsearch](https://elasticsearch-py.readthedocs.io/en/latest/async.html) into the codebase & refactored some of the logic to accomadate. - The `--watch` feature now uses a FIFO to do live ingestion. - Isolated eris.py into it's own file and seperated the ingestion agents into their own modules. ## Roadmap -- Implement [async elasticsearch](https://elasticsearch-py.readthedocs.io/en/v8.12.1/async.html) into the code. +- Fix issue with `ingest_certs.py` and not needing to pass a file to it - WHOIS database ingestion scripts - Dynamically update the batch metrics when the sniffer adds or removes nodes diff --git a/async_dev/eris.py b/async_dev/eris.py index 093901a..e2ecfbe 100644 --- a/async_dev/eris.py +++ b/async_dev/eris.py @@ -12,7 +12,8 @@ import sys sys.dont_write_bytecode = True try: - from elasticsearch import AsyncElasticsearch + # This is commented out because there is a bug with the elasticsearch library that requires a patch (see initialize() method below) + #from elasticsearch import AsyncElasticsearch from elasticsearch.exceptions import NotFoundError from elasticsearch.helpers import async_streaming_bulk except ImportError: @@ -30,10 +31,11 @@ class ElasticIndexer: :param args: Parsed arguments from argparse ''' + self.chunk_max = args.chunk_max * 1024 * 1024 # MB self.chunk_size = args.chunk_size self.es = None self.es_index = args.index - + self.es_config = { 'hosts': [f'{args.host}:{args.port}'], 'verify_certs': args.self_signed, @@ -127,7 +129,7 @@ class ElasticIndexer: count = 0 total = 0 - async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size): + async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size, max_chunk_bytes=self.chunk_max): action, result = result.popitem() if not ok: @@ -155,7 +157,7 @@ async def main(): parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing') # Elasticsearch arguments - parser.add_argument('--host', default='localhost', help='Elasticsearch host') + parser.add_argument('--host', default='http://localhost/', help='Elasticsearch host') parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port') parser.add_argument('--user', default='elastic', help='Elasticsearch username') parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)') @@ -166,12 +168,13 @@ async def main(): parser.add_argument('--index', help='Elasticsearch index name') parser.add_argument('--pipeline', help='Use an ingest pipeline for the index') parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index') - parser.add_argument('--shards', type=int, default=3, help='Number of shards for the index') + parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index') # Performance arguments parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk') - parser.add_argument('--retries', type=int, default=60, help='Number of times to retry indexing a chunk before failing') - parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a chunk') + parser.add_argument('--chunk-max', type=int, default=100, help='Maximum size of a chunk in bytes') + parser.add_argument('--retries', type=int, default=100, help='Number of times to retry indexing a chunk before failing') + parser.add_argument('--timeout', type=int, default=60, help='Number of seconds to wait before retrying a chunk') # Ingestion arguments parser.add_argument('--cert', action='store_true', help='Index Certstream records') diff --git a/async_dev/ingestors/ingest_httpx.py b/async_dev/ingestors/ingest_httpx.py new file mode 100644 index 0000000..93d8b58 --- /dev/null +++ b/async_dev/ingestors/ingest_httpx.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python +# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris) +# ingest_httpx.py + +import json + +try: + import aiofiles +except ImportError: + raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)') + +default_index = 'httpx-logs' + +def construct_map() -> dict: + '''Construct the Elasticsearch index mapping for Masscan records.''' + + keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } } + + mapping = { + 'mappings': { + 'properties': { + 'change': 'me' + } + } + } + + return mapping + + +async def process_data(file_path: str): + ''' + Read and process HTTPX records from the log file. + + :param file_path: Path to the HTTPX log file + ''' + + async with aiofiles.open(file_path, mode='r') as input_file: + async for line in input_file: + line = line.strip() + + if not line: + continue + + record = json.loads(line) + + record['seen'] = record.pop('timestamp').split('.')[0] + 'Z' # Hacky solution to maintain ISO 8601 format without milliseconds or offsets + record['domain'] = record.pop('input') + + del record['failed'], record['knowledgebase'], record['time'] + + yield {'_index': default_index, '_source': record} + + return None # EOF + + + +'''' +Example record: +{ + "timestamp":"2024-01-14T13:08:15.117348474-05:00", # Rename to seen and remove milliseconds and offset + "hash": { # Do we need all of these ? + "body_md5":"4ae9394eb98233b482508cbda3b33a66", + "body_mmh3":"-4111954", + "body_sha256":"89e06e8374353469c65adb227b158b265641b424fba7ddb2c67eef0c4c1280d3", + "body_simhash":"9814303593401624250", + "header_md5":"980366deb2b2fb5df2ad861fc63e79ce", + "header_mmh3":"-813072798", + "header_sha256":"39aea75ad548e38b635421861641ad1919ed3b103b17a33c41e7ad46516f736d", + "header_simhash":"10962523587435277678" + }, + "port":"443", + "url":"https://supernets.org", # Remove this and only use the input field as "domain" maybe + "input":"supernets.org", # rename to domain + "title":"SuperNETs", + "scheme":"https", + "webserver":"nginx", + "body_preview":"SUPERNETS Home About Contact Donate Docs Network IRC Git Invidious Jitsi LibreX Mastodon Matrix Sup", + "content_type":"text/html", + "method":"GET", # Do we need this ? + "host":"51.89.151.158", + "path":"/", + "favicon":"-674048714", + "favicon_path":"/i/favicon.png", + "time":"592.907689ms", # Do we need this ? + "a":[ + "6.150.220.23" + ], + "tech":[ + "Bootstrap:4.0.0", + "HSTS", + "Nginx" + ], + "words":436, # Do we need this ? + "lines":79, # Do we need this ? + "status_code":200, + "content_length":4597, + "failed":false, # Do we need this ? + "knowledgebase":{ # Do we need this ? + "PageType":"nonerror", + "pHash":0 + } +} +''' \ No newline at end of file diff --git a/async_dev/ingestors/ingest_masscan.py b/async_dev/ingestors/ingest_masscan.py index 8fcc0f2..5eb0660 100644 --- a/async_dev/ingestors/ingest_masscan.py +++ b/async_dev/ingestors/ingest_masscan.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris) -# ingest_masscan.py [asyncronous developement] +# ingest_masscan.py ''' apt-get install iptables masscan libpcap-dev screen diff --git a/async_dev/ingestors/ingest_massdns.py b/async_dev/ingestors/ingest_massdns.py new file mode 100644 index 0000000..5c8e121 --- /dev/null +++ b/async_dev/ingestors/ingest_massdns.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris) +# ingest_massdns.py + +import time + +try: + import aiofiles +except ImportError: + raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)') + +default_index = 'ptr-records' + +def construct_map() -> dict: + '''Construct the Elasticsearch index mapping for MassDNS records''' + + keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } } + + mapping = { + 'mappings': { + 'properties': { + 'ip': { 'type': 'ip' }, + 'name': { 'type': 'keyword' }, + 'record': keyword_mapping, + 'seen': { 'type': 'date' } + } + } + } + + return mapping + + +async def process_data(file_path: str): + ''' + Read and process Massdns records from the log file. + + :param file_path: Path to the Massdns log file + ''' + + async with aiofiles.open(file_path, mode='r') as input_file: + async for line in input_file: + line = line.strip() + + if not line: + continue + + parts = line.split() + + if len(parts) < 3: + raise ValueError(f'Invalid PTR record: {line}') + + name, record_type, data = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.') + + if record_type != 'PTR': + continue + + #if record_type == 'CNAME': + # if data.endswith('.in-addr.arpa'): + # continue + + # Let's not index the PTR record if it's the same as the in-addr.arpa domain + if data == name: + continue + + ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1]) + + struct = { + 'ip': ip, + 'record': data, + 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) + } + + yield {'_index': default_index, '_source': struct} + + return None # EOF + + +''' +Example PTR record: +0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com. +0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com. +0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net. +0.6.212.173.in-addr.arpa. PTR 173-212-6-0.cpe.surry.net. +0.6.201.133.in-addr.arpa. PTR flh2-133-201-6-0.tky.mesh.ad.jp. + +Will be indexed as: +{ + "ip": "47.229.6.0", + "record": "047-229-006-000.res.spectrum.com.", + "seen": "2021-06-30T18:31:00Z" +} +''' \ No newline at end of file diff --git a/async_dev/ingestors/ingest_zone.py b/async_dev/ingestors/ingest_zone.py new file mode 100644 index 0000000..7cb552d --- /dev/null +++ b/async_dev/ingestors/ingest_zone.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python +# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris) +# ingest_zone.py + +import time + +try: + import aiofiles +except ImportError: + raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)') + +default_index = 'dns-zones' +record_types = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','naptr','ns','nsec','nsec3','nsec3param','ptr','rrsig','rp','sshfp','soa','srv','txt','type65534') + +def construct_map() -> dict: + '''Construct the Elasticsearch index mapping for zone file records.''' + + keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } } + + mapping = { + 'mappings': { + 'properties': { + 'domain': keyword_mapping, + 'records': { 'properties': {} }, + 'seen': {'type': 'date'} + } + } + } + + # Add record types to mapping dynamically to not clutter the code + for item in record_types: + if item in ('a','aaaa'): + mapping['mappings']['properties']['records']['properties'][item] = { + 'properties': { + 'data': { 'type': 'ip' }, + 'ttl': { 'type': 'integer' } + } + } + else: + mapping['mappings']['properties']['records']['properties'][item] = { + 'properties': { + 'data': keyword_mapping, + 'ttl': { 'type': 'integer' } + } + } + + return mapping + + +async def process_data(file_path: str): + ''' + Read and process zone file records. + + :param file_path: Path to the zone file + ''' + + domain_records = {} + last_domain = None + + async with aiofiles.open(file_path, mode='r') as input_file: + async for line in input_file: + line = line.strip() + + if not line or line.startswith(';'): + continue + + parts = line.split() + + if len(parts) < 5: + raise ValueError(f'Invalid line: {line}') + + domain, ttl, record_class, record_type, data = parts[0].rstrip('.').lower(), parts[1], parts[2].lower(), parts[3].lower(), ' '.join(parts[4:]) + + if not ttl.isdigit(): + raise ValueError(f'Invalid TTL: {ttl} with line: {line}') + + ttl = int(ttl) + + if record_class != 'in': + raise ValueError(f'Unsupported record class: {record_class} with line: {line}') # Anomaly (Doubtful any CHAOS/HESIOD records will be found) + + # We do not want to collide with our current mapping (Again, this is an anomaly) + if record_type not in record_types: + raise ValueError(f'Unsupported record type: {record_type} with line: {line}') + + # Little tidying up for specific record types + if record_type == 'nsec': + data = ' '.join([data.split()[0].rstrip('.'), *data.split()[1:]]) + elif record_type == 'soa': + data = ' '.join([part.rstrip('.') if '.' in part else part for part in data.split()]) + elif data.endswith('.'): + data = data.rstrip('.') + + if domain != last_domain: + if last_domain: + struct = {'domain': last_domain, 'records': domain_records[last_domain], 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())} + + del domain_records[last_domain] + + yield {'_index': default_index, '_source': struct} + + last_domain = domain + + domain_records[domain] = {} + + if record_type not in domain_records[domain]: + domain_records[domain][record_type] = [] + + domain_records[domain][record_type].append({'ttl': ttl, 'data': data}) + + return None # EOF + + + +''' +Example record: +0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600 in nsec3 1 1 100 332539EE7F95C32A 10MHUKG4FHIAVEFDOTF6NKU5KFCB2J3A NS DS RRSIG +0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600 in rrsig NSEC3 8 2 3600 20240122151947 20240101141947 4125 vegas. hzIvQrZIxBSwRWyiHkb5M2W0R3ikNehv884nilkvTt9DaJSDzDUrCtqwQb3jh6+BesByBqfMQK+L2n9c//ZSmD5/iPqxmTPCuYIB9uBV2qSNSNXxCY7uUt5w7hKUS68SLwOSjaQ8GRME9WQJhY6gck0f8TT24enjXXRnQC8QitY= +1-800-flowers.vegas. 3600 in ns dns1.cscdns.net. +1-800-flowers.vegas. 3600 in ns dns2.cscdns.net. +100.vegas. 3600 in ns ns51.domaincontrol.com. +100.vegas. 3600 in ns ns52.domaincontrol.com. +1001.vegas. 3600 in ns ns11.waterrockdigital.com. +1001.vegas. 3600 in ns ns12.waterrockdigital.com. + +Will be indexed as: +{ + "domain": "1001.vegas", + "records": { + "ns": [ + {"ttl": 3600, "data": "ns11.waterrockdigital.com"}, + {"ttl": 3600, "data": "ns12.waterrockdigital.com"} + ] + }, + "seen": "2021-09-01T00:00:00Z" # Zulu time added upon indexing +} +''' \ No newline at end of file