From c105db705dc01459488f1d9d90a55924cbf789a1 Mon Sep 17 00:00:00 2001 From: acidvegas Date: Sat, 27 Jan 2024 04:28:30 -0500 Subject: [PATCH] Updated README, copied over consistencies across the ingestors, docstring updates to reflect on new arguments --- README.md | 46 +++++--- ingestors/ingest_httpx.py | 229 ++++++++++++++++++++++++++---------- ingestors/ingest_masscan.py | 48 +++++--- ingestors/ingest_massdns.py | 55 +++++---- ingestors/ingest_zone.py | 210 ++++++++++++++++++++++++--------- ingestors/sniff_patch.py | 56 +++++---- 6 files changed, 456 insertions(+), 188 deletions(-) diff --git a/README.md b/README.md index 8798f86..6e40a05 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,10 @@ python ingest_XXXX.py [options] ``` **Note:** The `` can be a file or a directory of files, depending on the ingestion script. +## Operations +This ingestion suite will use the built in node sniffer, so by connecting to a single node, you can load balance across the entire cluster. +It is good to know how much nodes you have in the cluster to determine how to fine tune the arguments for the best performance, based on your environment. + ### ###### General arguments | Argument | Description | @@ -22,28 +26,32 @@ python ingest_XXXX.py [options] | `--watch` | Watch the input file for new lines and index them in real time | ###### Elasticsearch arguments -| Argument | Description | Default | -|-------------------|--------------------------------------------------------------------------------------|---------------| -| `--host` | Elasticsearch host *(Will sniff for other nodes in the cluster)* | `localhost` | -| `--port` | Elasticsearch port | `9200` | -| `--user` | Elasticsearch username | `elastic` | -| `--password` | Elasticsearch password *(if not provided, check environment variable `ES_PASSWORD`)* | | -| `--api-key` | Elasticsearch API Key for authentication | | -| `--self-signed` | Elastic search instance is using a self-signed certificate | `true` | -| `--index` | Elasticsearch index name | `masscan-logs`| -| `--shards` | Number of shards for the index | `1` | -| `--replicas` | Number of replicas for the index | `1` | +| Argument | Description | Default | +|-------------------|--------------------------------------------------------|----------------| +| `--host` | Elasticsearch host | `localhost` | +| `--port` | Elasticsearch port | `9200` | +| `--user` | Elasticsearch username | `elastic` | +| `--password` | Elasticsearch password | `$ES_PASSWORD` | +| `--api-key` | Elasticsearch API Key for authentication | | +| `--self-signed` | Elasticsearch connection with aself-signed certificate | | + +###### Elasticsearch indexing arguments +| Argument | Description | Default | +|-------------------|----------------------------------|---------------------| +| `--index` | Elasticsearch index name | Depends on ingestor | +| `--shards` | Number of shards for the index | `1` | +| `--replicas` | Number of replicas for the index | `1` | ###### Performance arguments -| Argument | Description | Default | -|-------------------|--------------------------------------------------------------------------------------|---------------| -| `--batch-max` | Maximum size in MB of a batch | `10` | -| `--batch-size` | Number of records to index in a batch | `5000` | -| `--batch-threads` | Number of threads to use when indexing in batches | `2` | -| `--retries` | Number of times to retry indexing a batch before failing | `10` | -| `--timeout` | Number of seconds to wait before retrying a batch | `30` | +| Argument | Description | Default | +|-------------------|----------------------------------------------------------|---------| +| `--batch-max` | Maximum size in MB of a batch | `10` | +| `--batch-size` | Number of records to index in a batch | `5000` | +| `--batch-threads` | Number of threads to use when indexing in batches | `2` | +| `--retries` | Number of times to retry indexing a batch before failing | `10` | +| `--timeout` | Number of seconds to wait before retrying a batch | `30` | -**NOTE:** Using `--batch-threads` as 4 and `--batch-size` as 10000 with 3 nodes would process 120,000 records before indexing 40,000 per node. +Using `--batch-threads` as 4 and `--batch-size` as 10000 with 3 nodes would process 120,000 records before indexing 40,000 per node. Take these kind of metrics int account when consider how much records you want to process at once and the memory limitations of your environment, aswell as the networking constraint it may have ono your node(s), depending on the size of your cluster. ___ diff --git a/ingestors/ingest_httpx.py b/ingestors/ingest_httpx.py index 91a851e..717b37f 100644 --- a/ingestors/ingest_httpx.py +++ b/ingestors/ingest_httpx.py @@ -15,7 +15,7 @@ import os import time try: - from elasticsearch import Elasticsearch, helpers, ConnectionError, TransportError + from elasticsearch import Elasticsearch, helpers except ImportError: raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)') @@ -25,11 +25,11 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(level class ElasticIndexer: - def __init__(self, es_host: str, es_port: str, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False): + def __init__(self, es_host: str, es_port: int, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False, retries: int = 10, timeout: int = 30): ''' Initialize the Elastic Search indexer. - :param es_host: Elasticsearch host + :param es_host: Elasticsearch host(s) :param es_port: Elasticsearch port :param es_user: Elasticsearch username :param es_password: Elasticsearch password @@ -37,6 +37,8 @@ class ElasticIndexer: :param es_index: Elasticsearch index name :param dry_run: If True, do not initialize Elasticsearch client :param self_signed: If True, do not verify SSL certificates + :param retries: Number of times to retry indexing a batch before failing + :param timeout: Number of seconds to wait before retrying a batch ''' self.dry_run = dry_run @@ -44,19 +46,53 @@ class ElasticIndexer: self.es_index = es_index if not dry_run: + es_config = { + 'hosts': [f'{es_host}:{es_port}'], + 'verify_certs': self_signed, + 'ssl_show_warn': self_signed, + 'request_timeout': timeout, + 'max_retries': retries, + 'retry_on_timeout': True, + 'sniff_on_start': False, + 'sniff_on_node_failure': True, + 'min_delay_between_sniffing': 60 + } if es_api_key: - self.es = Elasticsearch([f'{es_host}:{es_port}'], headers={'Authorization': f'ApiKey {es_api_key}'}, verify_certs=self_signed, ssl_show_warn=self_signed) + es_config['headers'] = {'Authorization': f'ApiKey {es_api_key}'} else: - self.es = Elasticsearch([f'{es_host}:{es_port}'], basic_auth=(es_user, es_password), verify_certs=self_signed, ssl_show_warn=self_signed) + es_config['basic_auth'] = (es_user, es_password) + + # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960) + import sniff_patch + self.es = sniff_patch.init_elasticsearch(**es_config) + + # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client: + #self.es = Elasticsearch(**es_config) - def process_file(self, file_path: str, batch_size: int): + def get_cluster_health(self) -> dict: + '''Get the health of the Elasticsearch cluster.''' + + return self.es.cluster.health() + + + def get_cluster_size(self) -> int: + '''Get the number of nodes in the Elasticsearch cluster.''' + + cluster_stats = self.es.cluster.stats() + number_of_nodes = cluster_stats['nodes']['count']['total'] + + return number_of_nodes + + + def process_file(self, file_path: str, watch: bool = False, chunk: dict = {}): ''' Read and index HTTPX records in batches to Elasticsearch, handling large volumes efficiently. :param file_path: Path to the HTTPX log file - :param batch_size: Number of records to process before indexing + :param watch: If True, watch the file for new lines and index them in real time + :param chunk: Chunking configuration Example record: { @@ -110,7 +146,7 @@ class ElasticIndexer: records = [] with open(file_path, 'r') as file: - for line in file: + for line in (file := follow(file) if watch else file): line = line.strip() if not line: @@ -129,97 +165,172 @@ class ElasticIndexer: record = {'_index': self.es_index, '_source': record} records.append(record) count += 1 - if len(records) >= batch_size: - while True: - try: - success, _ = helpers.bulk(self.es, records) - except (ConnectionError, TransportError) as e: - logging.error(f'Failed to index records to Elasticsearch. ({e})') - time.sleep(60) - else: - logging.info(f'Successfully indexed {success:,} ({count:,}) records to {self.es_index} from {file_path}') - records = [] - break + if len(records) >= chunk['batch']: + self.bulk_index(records, file_path, chunk, count) + records = [] if records: - while True: - try: - success, _ = helpers.bulk(self.es, records) - except (ConnectionError, TransportError) as e: - logging.error(f'Failed to index records to Elasticsearch. ({e})') - time.sleep(60) - else: - logging.info(f'Successfully indexed {success:,} ({count:,}) records to {self.es_index} from {file_path}') + self.bulk_index(records, file_path, chunk, count) + + def bulk_index(self, documents: list, file_path: str, chunk: dict, count: int): + ''' + Index a batch of documents to Elasticsearch. + + :param documents: List of documents to index + :param file_path: Path to the file being indexed + :param count: Total number of records processed + ''' + + remaining_documents = documents + + parallel_bulk_config = { + 'client': self.es, + 'chunk_size': chunk['size'], + 'max_chunk_bytes': chunk['max_size'] * 1024 * 1024, # MB + 'thread_count': chunk['threads'], + 'queue_size': 2 + } + + while remaining_documents: + failed_documents = [] + + try: + for success, response in helpers.parallel_bulk(actions=remaining_documents, **parallel_bulk_config): + if not success: + failed_documents.append(response) + + if not failed_documents: + ingested = parallel_bulk_config['chunk_size'] * parallel_bulk_config['thread_count'] + logging.info(f'Successfully indexed {ingested:,} ({count:,} processed) records to {self.es_index} from {file_path}') break + else: + logging.warning(f'Failed to index {len(failed_documents):,} failed documents! Retrying...') + remaining_documents = failed_documents + except Exception as e: + logging.error(f'Failed to index documents! ({e})') + time.sleep(30) + + + def follow(file) -> str: + ''' + Generator function that yields new lines in a file in real time. + + :param file: File object to read from + ''' + + file.seek(0,2) # Go to the end of the file + + while True: + line = file.readline() + + if not line: + time.sleep(0.1) + continue + + yield line + def main(): '''Main function when running this script directly.''' parser = argparse.ArgumentParser(description='Index data into Elasticsearch.') - parser.add_argument('input_path', help='Path to the input file or directory') - + # General arguments + parser.add_argument('input_path', help='Path to the input file or directory') # Required parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)') - parser.add_argument('--batch_size', type=int, default=50000, help='Number of records to index in a batch') - - # Elasticsearch connection arguments + parser.add_argument('--watch', action='store_true', help='Watch the input file for new lines and index them in real time') + + # Elasticsearch arguments parser.add_argument('--host', default='localhost', help='Elasticsearch host') parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port') parser.add_argument('--user', default='elastic', help='Elasticsearch username') parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)') - parser.add_argument('--api-key', help='Elasticsearch API Key for authentication') + parser.add_argument('--api-key', help='Elasticsearch API Key for authentication') parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates') # Elasticsearch indexing arguments - parser.add_argument('--index', default='zone-files', help='Elasticsearch index name') + parser.add_argument('--index', default='httpx-logs', help='Elasticsearch index name') parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index') parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index') + # Performance arguments + parser.add_argument('--batch-max', type=int, default=10, help='Maximum size in MB of a batch') + parser.add_argument('--batch-size', type=int, default=5000, help='Number of records to index in a batch') + parser.add_argument('--batch-threads', type=int, default=2, help='Number of threads to use when indexing in batches') + parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a batch before failing') + parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a batch') + args = parser.parse_args() - if not os.path.exists(args.input_path): - raise FileNotFoundError(f'Input file {args.input_path} does not exist') + # Argument validation + if not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path): + raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory') if not args.dry_run: if args.batch_size < 1: raise ValueError('Batch size must be greater than 0') + + elif args.retries < 1: + raise ValueError('Number of retries must be greater than 0') + + elif args.timeout < 5: + raise ValueError('Timeout must be greater than 4') + + elif args.batch_max < 1: + raise ValueError('Batch max size must be greater than 0') + + elif args.batch_threads < 1: + raise ValueError('Batch threads must be greater than 0') - if not args.host: + elif not args.host: raise ValueError('Missing required Elasticsearch argument: host') - - if not args.api_key and (not args.user or not args.password): + + elif not args.api_key and (not args.user or not args.password): raise ValueError('Missing required Elasticsearch argument: either user and password or apikey') - - if args.shards < 1: + + elif args.shards < 1: raise ValueError('Number of shards must be greater than 0') - - if args.replicas < 1: + + elif args.replicas < 0: raise ValueError('Number of replicas must be greater than 0') - - logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}...') - - edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed) + edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed, args.retries, args.timeout) + if not args.dry_run: + print(edx.get_cluster_health()) + + time.sleep(3) # Delay to allow time for sniffing to complete + + nodes = edx.get_cluster_size() + + logging.info(f'Connected to {nodes:,} Elasticsearch node(s)') + edx.create_index(args.shards, args.replicas) # Create the index if it does not exist + chunk = { + 'size': args.batch_size, + 'max_size': args.batch_max * 1024 * 1024, # MB + 'threads': args.batch_threads + } + + chunk['batch'] = nodes * (chunk['size'] * chunk['threads']) + else: + chunk = {} # Ugly hack to get this working... + if os.path.isfile(args.input_path): logging.info(f'Processing file: {args.input_path}') - edx.process_file(args.input_path, args.batch_size) + edx.process_file(args.input_path, args.watch, chunk) elif os.path.isdir(args.input_path): - logging.info(f'Processing files in directory: {args.input_path}') + count = 1 + total = len(os.listdir(args.input_path)) + logging.info(f'Processing {total:,} files in directory: {args.input_path}') for file in sorted(os.listdir(args.input_path)): file_path = os.path.join(args.input_path, file) if os.path.isfile(file_path): - logging.info(f'Processing file: {file_path}') - edx.process_file(file_path, args.batch_size) - - else: - raise ValueError(f'Input path {args.input_path} is not a file or directory') - - - -if __name__ == '__main__': - main() \ No newline at end of file + logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}') + edx.process_file(file_path, args.watch, chunk) + count += 1 + else: + logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}') \ No newline at end of file diff --git a/ingestors/ingest_masscan.py b/ingestors/ingest_masscan.py index 79df82a..ae0a6cd 100644 --- a/ingestors/ingest_masscan.py +++ b/ingestors/ingest_masscan.py @@ -60,7 +60,7 @@ class ElasticIndexer: 'request_timeout': timeout, 'max_retries': retries, 'retry_on_timeout': True, - 'sniff_on_start': True, + 'sniff_on_start': False, 'sniff_on_node_failure': True, 'min_delay_between_sniffing': 60 } @@ -69,12 +69,22 @@ class ElasticIndexer: es_config['headers'] = {'Authorization': f'ApiKey {es_api_key}'} else: es_config['basic_auth'] = (es_user, es_password) + + # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960) + import sniff_patch + self.es = sniff_patch.init_elasticsearch(**es_config) - self.es = Elasticsearch(**es_config) + # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client: + #self.es = Elasticsearch(**es_config) def create_index(self, shards: int = 1, replicas: int = 1): - '''Create the Elasticsearch index with the defined mapping.''' + ''' + Create the Elasticsearch index with the defined mapping. + + :param shards: Number of shards for the index + :param replicas: Number of replicas for the index + ''' mapping = { 'settings': { @@ -106,6 +116,12 @@ class ElasticIndexer: logging.warning(f'Index \'{self.es_index}\' already exists.') + def get_cluster_health(self) -> dict: + '''Get the health of the Elasticsearch cluster.''' + + return self.es.cluster.health() + + def get_cluster_size(self) -> int: '''Get the number of nodes in the Elasticsearch cluster.''' @@ -120,8 +136,8 @@ class ElasticIndexer: Read and index Masscan records in batches to Elasticsearch, handling large volumes efficiently. :param file_path: Path to the Masscan log file - :param batch_size: Number of records to process before indexing - :param watch: If True, input file will be watched for new lines and indexed in real time + :param watch: If True, input file will be watched for new lines and indexed in real time\ + :param chunk: Chunk configuration for indexing in batches Example record: { @@ -207,6 +223,7 @@ class ElasticIndexer: :param file_path: Path to the file being indexed :param count: Total number of records processed ''' + remaining_documents = documents parallel_bulk_config = { @@ -261,9 +278,9 @@ def main(): '''Main function when running this script directly.''' parser = argparse.ArgumentParser(description='Index data into Elasticsearch.') - parser.add_argument('input_path', help='Path to the input file or directory') - + # General arguments + parser.add_argument('input_path', help='Path to the input file or directory') # Required parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)') parser.add_argument('--watch', action='store_true', help='Watch the input file for new lines and index them in real time') @@ -280,22 +297,18 @@ def main(): parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index') parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index') - # Batch arguments (for fine tuning performance) + # Performance arguments parser.add_argument('--batch-max', type=int, default=10, help='Maximum size in MB of a batch') parser.add_argument('--batch-size', type=int, default=5000, help='Number of records to index in a batch') parser.add_argument('--batch-threads', type=int, default=2, help='Number of threads to use when indexing in batches') - # NOTE: Using --batch-threads as 4 and --batch-size as 10000 means we will process 40,000 records per-node before indexing, so 3 nodes would process 120,000 records before indexing - - # Elasticsearch retry arguments parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a batch before failing') parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a batch') args = parser.parse_args() - if not os.path.exists(args.input_path): - raise FileNotFoundError(f'Input file {args.input_path} does not exist') - elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path): - raise ValueError(f'Input path {args.input_path} is not a file or directory') + # Argument validation + if not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path): + raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory') if not args.dry_run: if args.batch_size < 1: @@ -328,16 +341,19 @@ def main(): edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed, args.retries, args.timeout) if not args.dry_run: + print(edx.get_cluster_health()) + time.sleep(3) # Delay to allow time for sniffing to complete nodes = edx.get_cluster_size() + logging.info(f'Connected to {nodes:,} Elasticsearch node(s)') edx.create_index(args.shards, args.replicas) # Create the index if it does not exist chunk = { 'size': args.batch_size, - 'max_size': args.batch_max * 1024 * 1024, + 'max_size': args.batch_max * 1024 * 1024, # MB 'threads': args.batch_threads } diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py index f6f073d..185d021 100644 --- a/ingestors/ingest_massdns.py +++ b/ingestors/ingest_massdns.py @@ -15,7 +15,6 @@ import time try: from elasticsearch import Elasticsearch, helpers - import sniff_patch except ImportError: raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)') @@ -62,13 +61,22 @@ class ElasticIndexer: es_config['headers'] = {'Authorization': f'ApiKey {es_api_key}'} else: es_config['basic_auth'] = (es_user, es_password) - - #self.es = Elasticsearch(**es_config) + + # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960) + import sniff_patch self.es = sniff_patch.init_elasticsearch(**es_config) + # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client: + #self.es = Elasticsearch(**es_config) + def create_index(self, shards: int = 1, replicas: int = 1): - '''Create the Elasticsearch index with the defined mapping.''' + ''' + Create the Elasticsearch index with the defined mapping. + + :param shards: Number of shards for the index + :param replicas: Number of replicas for the index + ''' mapping = { 'settings': { @@ -97,6 +105,12 @@ class ElasticIndexer: logging.warning(f'Index \'{self.es_index}\' already exists.') + def get_cluster_health(self) -> dict: + '''Get the health of the Elasticsearch cluster.''' + + return self.es.cluster.health() + + def get_cluster_size(self) -> int: '''Get the number of nodes in the Elasticsearch cluster.''' @@ -106,13 +120,13 @@ class ElasticIndexer: return number_of_nodes - def process_file(self, file_path: str, watch: bool = False, chunk: dict = {}): ''' Read and index PTR records in batches to Elasticsearch, handling large volumes efficiently. - :param file_path: Path to the PTR log file - :param batch_size: Number of records to process before indexing + :param file_path: Path to the Masscan log file + :param watch: If True, input file will be watched for new lines and indexed in real time\ + :param chunk: Chunk configuration for indexing in batches Example PTR record: 0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com. @@ -181,14 +195,16 @@ class ElasticIndexer: :param documents: List of documents to index :param file_path: Path to the file being indexed + :param chunk: Chunk configuration :param count: Total number of records processed ''' + remaining_documents = documents parallel_bulk_config = { 'client': self.es, 'chunk_size': chunk['size'], - 'max_chunk_bytes': chunk['max_size'] * 1024 * 1024, # MB + 'max_chunk_bytes': chunk['max_size'], 'thread_count': chunk['threads'], 'queue_size': 2 } @@ -237,9 +253,9 @@ def main(): '''Main function when running this script directly.''' parser = argparse.ArgumentParser(description='Index data into Elasticsearch.') - parser.add_argument('input_path', help='Path to the input file or directory') - + # General arguments + parser.add_argument('input_path', help='Path to the input file or directory') # Required parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)') parser.add_argument('--watch', action='store_true', help='Watch the input file for new lines and index them in real time') @@ -256,22 +272,18 @@ def main(): parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index') parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index') - # Batch arguments (for fine tuning performance) + # Performance arguments parser.add_argument('--batch-max', type=int, default=10, help='Maximum size in MB of a batch') parser.add_argument('--batch-size', type=int, default=5000, help='Number of records to index in a batch') parser.add_argument('--batch-threads', type=int, default=2, help='Number of threads to use when indexing in batches') - # NOTE: Using --batch-threads as 4 and --batch-size as 10000 means we will process 40,000 records per-node before indexing, so 3 nodes would process 120,000 records before indexing - - # Elasticsearch retry arguments parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a batch before failing') parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a batch') args = parser.parse_args() - if not os.path.exists(args.input_path): - raise FileNotFoundError(f'Input file {args.input_path} does not exist') - elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path): - raise ValueError(f'Input path {args.input_path} is not a file or directory') + # Argument validation + if not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path): + raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory') if not args.dry_run: if args.batch_size < 1: @@ -304,16 +316,19 @@ def main(): edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed, args.retries, args.timeout) if not args.dry_run: - time.sleep(5) # Delay to allow time for sniffing to complete + print(edx.get_cluster_health()) + + time.sleep(3) # Delay to allow time for sniffing to complete nodes = edx.get_cluster_size() + logging.info(f'Connected to {nodes:,} Elasticsearch node(s)') edx.create_index(args.shards, args.replicas) # Create the index if it does not exist chunk = { 'size': args.batch_size, - 'max_size': args.batch_max * 1024 * 1024, + 'max_size': args.batch_max * 1024 * 1024, # MB 'threads': args.batch_threads } diff --git a/ingestors/ingest_zone.py b/ingestors/ingest_zone.py index 9bc92fa..93ccefe 100644 --- a/ingestors/ingest_zone.py +++ b/ingestors/ingest_zone.py @@ -39,11 +39,11 @@ record_types = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','nap class ElasticIndexer: - def __init__(self, es_host: str, es_port: str, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False): + def __init__(self, es_host: str, es_port: int, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False, retries: int = 10, timeout: int = 30): ''' Initialize the Elastic Search indexer. - :param es_host: Elasticsearch host + :param es_host: Elasticsearch host(s) :param es_port: Elasticsearch port :param es_user: Elasticsearch username :param es_password: Elasticsearch password @@ -51,6 +51,8 @@ class ElasticIndexer: :param es_index: Elasticsearch index name :param dry_run: If True, do not initialize Elasticsearch client :param self_signed: If True, do not verify SSL certificates + :param retries: Number of times to retry indexing a batch before failing + :param timeout: Number of seconds to wait before retrying a batch ''' self.dry_run = dry_run @@ -58,15 +60,38 @@ class ElasticIndexer: self.es_index = es_index if not dry_run: + es_config = { + 'hosts': [f'{es_host}:{es_port}'], + 'verify_certs': self_signed, + 'ssl_show_warn': self_signed, + 'request_timeout': timeout, + 'max_retries': retries, + 'retry_on_timeout': True, + 'sniff_on_start': False, + 'sniff_on_node_failure': True, + 'min_delay_between_sniffing': 60 + } if es_api_key: - self.es = Elasticsearch([f'{es_host}:{es_port}'], headers={'Authorization': f'ApiKey {es_api_key}'}, verify_certs=self_signed, ssl_show_warn=self_signed) + es_config['headers'] = {'Authorization': f'ApiKey {es_api_key}'} else: - self.es = Elasticsearch([f'{es_host}:{es_port}'], basic_auth=(es_user, es_password), verify_certs=self_signed, ssl_show_warn=self_signed) + es_config['basic_auth'] = (es_user, es_password) + + # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960) + import sniff_patch + self.es = sniff_patch.init_elasticsearch(**es_config) + + # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client: + #self.es = Elasticsearch(**es_config) def create_index(self, shards: int = 1, replicas: int = 1): - '''Create the Elasticsearch index with the defined mapping.''' + ''' + Create the Elasticsearch index with the defined mapping. + + :param shards: Number of shards for the index + :param replicas: Number of replicas for the index + ''' mapping = { 'settings': { @@ -111,12 +136,28 @@ class ElasticIndexer: logging.warning(f'Index \'{self.es_index}\' already exists.') - def process_file(self, file_path: str, batch_size: int): + def get_cluster_health(self) -> dict: + '''Get the health of the Elasticsearch cluster.''' + + return self.es.cluster.health() + + + def get_cluster_size(self) -> int: + '''Get the number of nodes in the Elasticsearch cluster.''' + + cluster_stats = self.es.cluster.stats() + number_of_nodes = cluster_stats['nodes']['count']['total'] + + return number_of_nodes + + + def process_file(self, file_path: str, watch: bool = False, chunk: dict = {}): ''' Read and index DNS records in batches to Elasticsearch, handling large volumes efficiently. - :param file_path: Path to the DNS zone file - :param batch_size: Number of records to process before indexing + :param file_path: Path to the Masscan log file + :param watch: If True, input file will be watched for new lines and indexed in real time\ + :param chunk: Chunk configuration for indexing in batches Example record: 0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600 in nsec3 1 1 100 332539EE7F95C32A 10MHUKG4FHIAVEFDOTF6NKU5KFCB2J3A NS DS RRSIG @@ -192,17 +233,9 @@ class ElasticIndexer: struct = {'_index': self.es_index, '_source': source} records.append(struct) count += 1 - if len(records) >= batch_size: - while True: - try: - success, _ = helpers.bulk(self.es, records) - except (ConnectionError, TransportError) as e: - logging.error(f'Failed to index records to Elasticsearch. ({e})') - time.sleep(60) - else: - logging.info(f'Successfully indexed {success:,} ({count:,}) records to {self.es_index} from {file_path}') - records = [] - break + if len(records) >= chunk['batch']: + self.bulk_index(records, file_path, chunk, count) + records = [] last_domain = domain @@ -214,28 +247,60 @@ class ElasticIndexer: domain_records[domain][record_type].append({'ttl': ttl, 'data': data}) if records: - while True: - try: - success, _ = helpers.bulk(self.es, records) - except (ConnectionError, TransportError) as e: - logging.error(f'Failed to index records to Elasticsearch. ({e})') - time.sleep(60) - else: - logging.info(f'Successfully indexed {success:,} ({count:,}) records to {self.es_index} from {file_path}') + self.bulk_index(records, file_path, chunk, count) + + + def bulk_index(self, documents: list, file_path: str, chunk: dict, count: int): + ''' + Index a batch of documents to Elasticsearch. + + :param documents: List of documents to index + :param file_path: Path to the file being indexed + :param count: Total number of records processed + ''' + + remaining_documents = documents + + parallel_bulk_config = { + 'client': self.es, + 'chunk_size': chunk['size'], + 'max_chunk_bytes': chunk['max_size'] * 1024 * 1024, # MB + 'thread_count': chunk['threads'], + 'queue_size': 2 + } + + while remaining_documents: + failed_documents = [] + + try: + for success, response in helpers.parallel_bulk(actions=remaining_documents, **parallel_bulk_config): + if not success: + failed_documents.append(response) + + if not failed_documents: + ingested = parallel_bulk_config['chunk_size'] * parallel_bulk_config['thread_count'] + logging.info(f'Successfully indexed {ingested:,} ({count:,} processed) records to {self.es_index} from {file_path}') break + else: + logging.warning(f'Failed to index {len(failed_documents):,} failed documents! Retrying...') + remaining_documents = failed_documents + except Exception as e: + logging.error(f'Failed to index documents! ({e})') + time.sleep(30) + def main(): '''Main function when running this script directly.''' parser = argparse.ArgumentParser(description='Index data into Elasticsearch.') - parser.add_argument('input_path', help='Path to the input file or directory') - + # General arguments + parser.add_argument('input_path', help='Path to the input file or directory') # Required parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)') - parser.add_argument('--batch_size', type=int, default=50000, help='Number of records to index in a batch') - - # Elasticsearch connection arguments + parser.add_argument('--watch', action='store_true', help='Watch the input file for new lines and index them in real time') + + # Elasticsearch arguments parser.add_argument('--host', default='localhost', help='Elasticsearch host') parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port') parser.add_argument('--user', default='elastic', help='Elasticsearch username') @@ -245,51 +310,90 @@ def main(): # Elasticsearch indexing arguments parser.add_argument('--index', default='dns-zones', help='Elasticsearch index name') - parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index') # This depends on your cluster configuration - parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index') # This depends on your cluster configuration + parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index') + parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index') + + # Performance arguments + parser.add_argument('--batch-max', type=int, default=10, help='Maximum size in MB of a batch') + parser.add_argument('--batch-size', type=int, default=5000, help='Number of records to index in a batch') + parser.add_argument('--batch-threads', type=int, default=2, help='Number of threads to use when indexing in batches') + parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a batch before failing') + parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a batch') args = parser.parse_args() - if not os.path.exists(args.input_path): - raise FileNotFoundError(f'Input file {args.input_path} does not exist') + # Argument validation + if not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path): + raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory') if not args.dry_run: if args.batch_size < 1: raise ValueError('Batch size must be greater than 0') + + elif args.retries < 1: + raise ValueError('Number of retries must be greater than 0') + + elif args.timeout < 5: + raise ValueError('Timeout must be greater than 4') + + elif args.batch_max < 1: + raise ValueError('Batch max size must be greater than 0') + + elif args.batch_threads < 1: + raise ValueError('Batch threads must be greater than 0') - if not args.host: + elif not args.host: raise ValueError('Missing required Elasticsearch argument: host') - - if not args.api_key and (not args.user or not args.password): + + elif not args.api_key and (not args.user or not args.password): raise ValueError('Missing required Elasticsearch argument: either user and password or apikey') - - if args.shards < 1: + + elif args.shards < 1: raise ValueError('Number of shards must be greater than 0') - - if args.replicas < 1: + + elif args.replicas < 0: raise ValueError('Number of replicas must be greater than 0') - - logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}...') - - edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed) + edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed, args.retries, args.timeout) + if not args.dry_run: + print(edx.get_cluster_health()) + + time.sleep(3) # Delay to allow time for sniffing to complete + + nodes = edx.get_cluster_size() + + logging.info(f'Connected to {nodes:,} Elasticsearch node(s)') + edx.create_index(args.shards, args.replicas) # Create the index if it does not exist + chunk = { + 'size': args.batch_size, + 'max_size': args.batch_max * 1024 * 1024, # MB + 'threads': args.batch_threads + } + + chunk['batch'] = nodes * (chunk['size'] * chunk['threads']) + else: + chunk = {} # Ugly hack to get this working... + if os.path.isfile(args.input_path): logging.info(f'Processing file: {args.input_path}') - edx.process_file(args.input_path, args.batch_size) + edx.process_file(args.input_path, args.watch, chunk) elif os.path.isdir(args.input_path): - logging.info(f'Processing files in directory: {args.input_path}') + count = 1 + total = len(os.listdir(args.input_path)) + logging.info(f'Processing {total:,} files in directory: {args.input_path}') for file in sorted(os.listdir(args.input_path)): file_path = os.path.join(args.input_path, file) if os.path.isfile(file_path): - logging.info(f'Processing file: {file_path}') - edx.process_file(file_path, args.batch_size) + logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}') + edx.process_file(file_path, args.watch, chunk) + count += 1 + else: + logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}') - else: - raise ValueError(f'Input path {args.input_path} is not a file or directory') if __name__ == '__main__': diff --git a/ingestors/sniff_patch.py b/ingestors/sniff_patch.py index 3d75402..01b795f 100644 --- a/ingestors/sniff_patch.py +++ b/ingestors/sniff_patch.py @@ -1,26 +1,41 @@ -# sniff_* patch for elastic 8 clients -# Call init_elasticsearch() with normal Elasticsearch params -# Only needed if you use sniff_* options and only works with basic auth, feel free to edit to your needs. +#!/usr/bin/env python +# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris) + +# Note: +# This is a patch for the elasticsearch 8.x client to fix the sniff_* options. +# This patch is only needed if you use the sniff_* options and only works with basic auth. +# Call init_elasticsearch() with normal Elasticsearch params. +# +# Source: +# - https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960 import base64 + import elasticsearch._sync.client as client from elasticsearch.exceptions import SerializationError, ConnectionError def init_elasticsearch(*args, **kwargs): + ''' + Initialize the Elasticsearch client with the sniff patch. + + :param args: Elasticsearch positional arguments. + :param kwargs: Elasticsearch keyword arguments. + ''' client.default_sniff_callback = _override_sniff_callback(kwargs['basic_auth']) + return client.Elasticsearch(*args, **kwargs) - + def _override_sniff_callback(basic_auth): - """ + ''' Taken from https://github.com/elastic/elasticsearch-py/blob/8.8/elasticsearch/_sync/client/_base.py#L166 Completely unmodified except for adding the auth header to the elastic request. Allows us to continue using the sniff_* options while this is broken in the library. TODO: Remove this when this issue is patched: - https://github.com/elastic/elasticsearch-py/issues/2005 - """ + - https://github.com/elastic/elasticsearch-py/issues/2005 + ''' auth_str = base64.b64encode(':'.join(basic_auth).encode()).decode() sniffed_node_callback = client._base._default_sniffed_node_callback @@ -28,14 +43,13 @@ def _override_sniff_callback(basic_auth): for _ in transport.node_pool.all(): try: meta, node_infos = transport.perform_request( - "GET", - "/_nodes/_all/http", - headers={ - "accept": "application/vnd.elasticsearch+json; compatible-with=8", - # This auth header is missing in 8.x releases of the client, and causes 401s - "authorization": f"Basic {auth_str}" + 'GET', + '/_nodes/_all/http', + headers = { + 'accept': 'application/vnd.elasticsearch+json; compatible-with=8', + 'authorization': f'Basic {auth_str}' # This auth header is missing in 8.x releases of the client, and causes 401s }, - request_timeout=( + request_timeout = ( sniff_options.sniff_timeout if not sniff_options.is_initial_sniff else None @@ -48,19 +62,19 @@ def _override_sniff_callback(basic_auth): continue node_configs = [] - for node_info in node_infos.get("nodes", {}).values(): - address = node_info.get("http", {}).get("publish_address") - if not address or ":" not in address: + for node_info in node_infos.get('nodes', {}).values(): + address = node_info.get('http', {}).get('publish_address') + if not address or ':' not in address: continue - if "/" in address: + if '/' in address: # Support 7.x host/ip:port behavior where http.publish_host has been set. - fqdn, ipaddress = address.split("/", 1) + fqdn, ipaddress = address.split('/', 1) host = fqdn - _, port_str = ipaddress.rsplit(":", 1) + _, port_str = ipaddress.rsplit(':', 1) port = int(port_str) else: - host, port_str = address.rsplit(":", 1) + host, port_str = address.rsplit(':', 1) port = int(port_str) assert sniffed_node_callback is not None