diff --git a/README.md b/README.md index accbb03..f91f638 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,7 @@ Create & add a geoip pipeline and use the following in your index mappings: ## Roadmap - Implement [async elasticsearch](https://elasticsearch-py.readthedocs.io/en/v8.12.1/async.html) into the code. - WHOIS database ingestion scripts +- Dynamically update the batch metrics when the sniffer adds or removes nodes ___ diff --git a/async_dev/eris.py b/async_dev/eris.py index 183b21f..093901a 100644 --- a/async_dev/eris.py +++ b/async_dev/eris.py @@ -2,11 +2,11 @@ # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris) # eris.py [asyncronous developement] +import asyncio import argparse import logging import os import stat -import time import sys sys.dont_write_bytecode = True @@ -31,41 +31,44 @@ class ElasticIndexer: ''' self.chunk_size = args.chunk_size - self.chunk_threads = args.chunk_threads - self.dry_run = args.dry_run + self.es = None self.es_index = args.index + + self.es_config = { + 'hosts': [f'{args.host}:{args.port}'], + 'verify_certs': args.self_signed, + 'ssl_show_warn': args.self_signed, + 'request_timeout': args.timeout, + 'max_retries': args.retries, + 'retry_on_timeout': True, + 'sniff_on_start': True, # Is this problematic? + 'sniff_on_node_failure': True, + 'min_delay_between_sniffing': 60 # Add config option for this? + } - if not args.dry_run: - es_config = { - 'hosts': [f'{args.host}:{args.port}'], - 'verify_certs': args.self_signed, - 'ssl_show_warn': args.self_signed, - 'request_timeout': args.timeout, - 'max_retries': args.retries, - 'retry_on_timeout': True, - 'sniff_on_start': True, # Is this problematic? - 'sniff_on_node_failure': True, - 'min_delay_between_sniffing': 60 # Add config option for this? - } + if args.api_key: + self.es_config['api_key'] = (args.api_key, '') # Verify this is correct + else: + self.es_config['basic_auth'] = (args.user, args.password) + + + async def initialize(self): + '''Initialize the Elasticsearch client.''' - if args.api_key: - es_config['api_key'] = (args.key, '') # Verify this is correct - else: - es_config['basic_auth'] = (args.user, args.password) - - # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960) - import sniff_patch - self.es = sniff_patch.init_elasticsearch(**es_config) + # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960) + import sniff_patch + self.es = sniff_patch.init_elasticsearch(**self.es_config) - # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client: - #self.es = AsyncElasticsearch(**es_config) + # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client: + #self.es = AsyncElasticsearch(**es_config) - async def create_index(self, map_body: dict, pipeline: str = '', replicas: int = 1, shards: int = 1, ): + async def create_index(self, map_body: dict, pipeline: str = '', replicas: int = 1, shards: int = 1): ''' Create the Elasticsearch index with the defined mapping. - :param pipline: Name of the ingest pipeline to use for the index + :param map_body: Mapping for the index + :param pipeline: Name of the ingest pipeline to use for the index :param replicas: Number of replicas for the index :param shards: Number of shards for the index ''' @@ -112,7 +115,7 @@ class ElasticIndexer: return number_of_nodes - async def async_bulk_index_data(self, file_path: str, index_name: str, data_generator: callable): + async def process_data(self, file_path: str, data_generator: callable): ''' Index records in chunks to Elasticsearch. @@ -124,11 +127,11 @@ class ElasticIndexer: count = 0 total = 0 - async for ok, result in async_streaming_bulk(self.es, index_name=self.es_index, actions=data_generator(file_path), chunk_size=self.chunk_size): + async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size): action, result = result.popitem() if not ok: - logging.error(f'Failed to index document ({result["_id"]}) to {index_name} from {file_path} ({result})') + logging.error(f'Failed to index document ({result["_id"]}) to {self.es_index} from {file_path} ({result})') input('Press Enter to continue...') # Debugging (will possibly remove this since we have retries enabled) continue @@ -139,41 +142,16 @@ class ElasticIndexer: logging.info(f'Successfully indexed {self.chunk_size:,} ({total:,} processed) records to {self.es_index} from {file_path}') count = 0 - logging.info(f'Finished indexing {self.total:,} records to {self.es_index} from {file_path}') + logging.info(f'Finished indexing {total:,} records to {self.es_index} from {file_path}') - async def process_file(self, file_path: str, ingest_function: callable): - ''' - Read and index records in batches to Elasticsearch. - - :param file_path: Path to the file - :param batch_size: Number of records to index per batch - :param ingest_function: Function to process the file - ''' - - count = 0 - - async for processed in ingest_function(file_path): - if not processed: - break - - if self.dry_run: - print(processed) - continue - - count += 1 - - yield {'_index': self.es_index, '_source': processed} - - -def main(): +async def main(): '''Main function when running this script directly.''' parser = argparse.ArgumentParser(description='Index data into Elasticsearch.') # General arguments parser.add_argument('input_path', help='Path to the input file or directory') # Required - parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)') parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing') # Elasticsearch arguments @@ -192,7 +170,6 @@ def main(): # Performance arguments parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk') - parser.add_argument('--chunk-threads', type=int, default=3, help='Number of threads to use when indexing in chunks') parser.add_argument('--retries', type=int, default=60, help='Number of times to retry indexing a chunk before failing') parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a chunk') @@ -214,6 +191,7 @@ def main(): raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory') edx = ElasticIndexer(args) + await edx.initialize() # Initialize the Elasticsearch client asyncronously if args.cert: from ingestors import ingest_certs as ingestor @@ -225,32 +203,28 @@ def main(): from ingestors import ingest_massdns as ingestor elif args.zone: from ingestors import ingest_zone as ingestor - - batch_size = 0 - if not args.dry_run: - print(edx.get_cluster_health()) + health = await edx.get_cluster_health() + print(health) - time.sleep(3) # Delay to allow time for sniffing to complete - - nodes = edx.get_cluster_size() - logging.info(f'Connected to {nodes:,} Elasticsearch node(s)') + await asyncio.sleep(5) # Delay to allow time for sniffing to complete + + nodes = await edx.get_cluster_size() + logging.info(f'Connected to {nodes:,} Elasticsearch node(s)') - if not edx.es_index: - edx.es_index = ingestor.default_index + if not edx.es_index: + edx.es_index = ingestor.default_index - map_body = ingestor.construct_map() - edx.create_index(map_body, args.pipeline, args.replicas, args.shards) - - batch_size = int(nodes * (args.chunk_size * args.chunk_threads)) + map_body = ingestor.construct_map() + await edx.create_index(map_body, args.pipeline, args.replicas, args.shards) if os.path.isfile(args.input_path): logging.info(f'Processing file: {args.input_path}') - edx.process_file(args.input_path, batch_size, ingestor.process_file) + await edx.process_data(args.input_path, ingestor.process_data) elif stat.S_ISFIFO(os.stat(args.input_path).st_mode): logging.info(f'Watching FIFO: {args.input_path}') - edx.process_file(args.input_path, batch_size, ingestor.process_file) + await edx.process_data(args.input_path, ingestor.process_data) elif os.path.isdir(args.input_path): count = 1 @@ -260,7 +234,7 @@ def main(): file_path = os.path.join(args.input_path, file) if os.path.isfile(file_path): logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}') - edx.process_file(file_path, batch_size, ingestor.process_file) + await edx.process_data(file_path, ingestor.process_data) count += 1 else: logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}') @@ -268,4 +242,4 @@ def main(): if __name__ == '__main__': - main() \ No newline at end of file + asyncio.run(main()) \ No newline at end of file diff --git a/async_dev/ingestors/ingest_certs.py b/async_dev/ingestors/ingest_certs.py index 5a5e300..40404d4 100644 --- a/async_dev/ingestors/ingest_certs.py +++ b/async_dev/ingestors/ingest_certs.py @@ -91,8 +91,12 @@ def construct_map() -> dict: return mapping -async def process(): - '''Read and process Certsream records live from the Websocket stream.''' +async def process_data(file_path: str = None): + ''' + Read and process Certsream records live from the Websocket stream. + + :param file_path: Path to the Certstream log file (unused, placeholder for consistency with other ingestors) + ''' while True: try: @@ -105,6 +109,7 @@ async def process(): record = json.loads(line) except json.decoder.JSONDecodeError: logging.error(f'Failed to parse JSON record from Certstream! ({line})') + input('Press Enter to continue...') # Pause the script to allow the user to read the error message continue yield record diff --git a/async_dev/ingestors/ingest_masscan_async.py b/async_dev/ingestors/ingest_masscan.py similarity index 78% rename from async_dev/ingestors/ingest_masscan_async.py rename to async_dev/ingestors/ingest_masscan.py index df0c2a3..8fcc0f2 100644 --- a/async_dev/ingestors/ingest_masscan_async.py +++ b/async_dev/ingestors/ingest_masscan.py @@ -59,7 +59,7 @@ def construct_map() -> dict: return mapping -async def process_file(file_path: str): +async def process_data(file_path: str): ''' Read and process Masscan records from the log file. @@ -79,22 +79,29 @@ async def process_file(file_path: str): try: record = json.loads(line) except json.decoder.JSONDecodeError: + # In rare cases, the JSON record may be incomplete or malformed: + # { "ip": "51.161.12.223", "timestamp": "1707628302", "ports": [ {"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner": + # { "ip": "83.66.211.246", "timestamp": "1706557002" logging.error(f'Failed to parse JSON record! ({line})') - input('Press Enter to continue...') # Debugging + input('Press Enter to continue...') # Pause for review & debugging (Will remove pausing in production, still investigating the cause of this issue.) continue + if len(record['ports']) > 1: + logging.warning(f'Multiple ports found for record! ({record})') + input('Press Enter to continue...') # Pause for review (Will remove pausing in production, still investigating if you ever seen more than one port in a record.) + for port_info in record['ports']: struct = { - 'ip': record['ip'], - 'port': port_info['port'], - 'proto': port_info['proto'], - 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))), + 'ip' : record['ip'], + 'port' : port_info['port'], + 'proto' : port_info['proto'], + 'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))), } if 'service' in port_info: if 'name' in port_info['service']: - if port_info['service']['name'] != 'unknown': - struct['service'] = port_info['service']['name'] + if (service_name := port_info['service']['name']) not in ('unknown',''): + struct['service'] = service_name if 'banner' in port_info['service']: banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace