diff --git a/LICENSE b/LICENSE index 016e197..54ec6ab 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ ISC License -Copyright (c) 2023, acidvegas +Copyright (c) 2024, acidvegas Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above diff --git a/README.md b/README.md index fc7108f..e99d93f 100644 --- a/README.md +++ b/README.md @@ -1,26 +1,41 @@ # Elasticsearch Recon Ingestion Scripts (ERIS) -> A utility for ingesting large scale reconnaissance data into Elast Search +> A utility for ingesting various large scale reconnaissance data logs into Elasticsearch ### Work In Progress ## Prerequisites - [python](https://www.python.org/) -- [elasticsearch](https://pypi.org/project/elasticsearch/) *(`pip install elasticsearch`)* + - [elasticsearch](https://pypi.org/project/elasticsearch/) *(`pip install elasticsearch`)* + +## Usage +```shell +python ingest_XXXX.py [options] <input_path> +``` +**Note:** The `<input_path>` can be a file or a directory of files, depending on the ingestion script. ###### Options | Argument | Description | | --------------- | -------------------------------------------------------------------------------------------- | | `--dry-run` | Perform a dry run without indexing records to Elasticsearch. | | `--batch_size` | Number of records to index in a batch *(default 25,000)*. | + +###### Elasticsearch Connection Options +| Argument | Description | +| --------------- | -------------------------------------------------------------------------------------------- | | `--host` | Elasticsearch host *(default 'localhost')*. | | `--port` | Elasticsearch port *(default 9200)*. | | `--user` | Elasticsearch username *(default 'elastic')*. | | `--password` | Elasticsearch password. If not provided, it checks the environment variable **ES_PASSWORD**. | | `--api-key` | Elasticsearch API Key for authentication. | -| `--index` | Elasticsearch index name *(default 'zone_files')*. 
| -| `--filter` | Filter out records by type *(comma-separated list)*. | | `--self-signed` | Allow self-signed certificates. | +###### Elasticsearch Index Options +| Argument | Description | +| --------------- | -------------------------------------------------------------------------------------------- | +| `--index` | Elasticsearch index name *(default 'zone_files')*. | +| `--replicas` | Number of replicas for the index. | +| `--shards` | Number of shards for the index | + ___ ###### Mirrors diff --git a/ingestors/ingest_httpx.py b/ingestors/ingest_httpx.py index b596bfd..9ded692 100644 --- a/ingestors/ingest_httpx.py +++ b/ingestors/ingest_httpx.py @@ -174,9 +174,20 @@ def main(): if not args.api_key and (not args.user or not args.password): raise ValueError('Missing required Elasticsearch argument: either user and password or apikey') + + if args.shards < 1: + raise ValueError('Number of shards must be greater than 0') + + if args.replicas < 1: + raise ValueError('Number of replicas must be greater than 0') + + logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}...') edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed) + if not args.dry_run: + edx.create_index(args.shards, args.replicas) # Create the index if it does not exist + if os.path.isfile(args.input_path): logging.info(f'Processing file: {args.input_path}') edx.process_file(args.input_path, args.batch_size) diff --git a/ingestors/ingest_masscan.py b/ingestors/ingest_masscan.py index 98ad402..9018f40 100644 --- a/ingestors/ingest_masscan.py +++ b/ingestors/ingest_masscan.py @@ -74,7 +74,7 @@ class ElasticIndexer: if not self.es.indices.exists(index=self.es_index): response = self.es.indices.create(index=self.es_index, body=mapping) - + if response.get('acknowledged') and response.get('shards_acknowledged'): logging.info(f'Index \'{self.es_index}\' successfully created.') else: @@ -82,7 +82,7 @@ class 
ElasticIndexer: else: logging.warning(f'Index \'{self.es_index}\' already exists.') - + def process_file(self, file_path: str, batch_size: int): ''' @@ -120,7 +120,7 @@ class ElasticIndexer: ''' records = [] - + with open(file_path, 'r') as file: for line in file: line = line.strip() @@ -150,7 +150,7 @@ class ElasticIndexer: struct['ref_id'] = match.group(1) else: struct['banner'] = banner - + if self.dry_run: print(struct) else: @@ -182,7 +182,7 @@ def main(): parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port') parser.add_argument('--user', default='elastic', help='Elasticsearch username') parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)') - parser.add_argument('--api-key', help='Elasticsearch API Key for authentication') + parser.add_argument('--api-key', help='Elasticsearch API Key for authentication') parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates') # Elasticsearch indexing arguments @@ -201,13 +201,22 @@ def main(): if not args.host: raise ValueError('Missing required Elasticsearch argument: host') - + if not args.api_key and (not args.user or not args.password): raise ValueError('Missing required Elasticsearch argument: either user and password or apikey') + if args.shards < 1: + raise ValueError('Number of shards must be greater than 0') + + if args.replicas < 0: + raise ValueError('Number of replicas must be greater than 0') + + logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}...') + edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed) - edx.create_index(args.shards, args.replicas) # Create the index if it does not exist + if not args.dry_run: + edx.create_index(args.shards, args.replicas) # Create the index if it does not exist if os.path.isfile(args.input_path): 
logging.info(f'Processing file: {args.input_path}') @@ -227,4 +236,4 @@ def main(): if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py index f31cb93..3b9badf 100644 --- a/ingestors/ingest_massdns.py +++ b/ingestors/ingest_massdns.py @@ -203,10 +203,19 @@ def main(): if not args.api_key and (not args.user or not args.password): raise ValueError('Missing required Elasticsearch argument: either user and password or apikey') + + if args.shards < 1: + raise ValueError('Number of shards must be greater than 0') + + if args.replicas < 1: + raise ValueError('Number of replicas must be greater than 0') + + logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}...') edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed) - edx.create_index(args.shards, args.replicas) # Create the index if it does not exist + if not args.dry_run: + edx.create_index(args.shards, args.replicas) # Create the index if it does not exist if os.path.isfile(args.input_path): logging.info(f'Processing file: {args.input_path}') diff --git a/ingestors/ingest_zone.py b/ingestors/ingest_zone.py index cfea315..d2634d1 100644 --- a/ingestors/ingest_zone.py +++ b/ingestors/ingest_zone.py @@ -133,6 +133,7 @@ class ElasticIndexer: } ''' + count = 0 records = [] domain_records = {} last_domain = None @@ -173,7 +174,7 @@ class ElasticIndexer: if domain != last_domain: if last_domain: - source = {'domain': domain, 'records': domain_records[last_domain], 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())} + source = {'domain': last_domain, 'records': domain_records[last_domain], 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())} del domain_records[last_domain] @@ -182,9 +183,10 @@ class ElasticIndexer: else: struct = {'_index': self.es_index, '_source': source} records.append(struct) + count += 1 if len(records) >= 
batch_size: success, _ = helpers.bulk(self.es, records) - logging.info(f'Successfully indexed {success} records to {self.es_index} from {file_path}') + logging.info(f'Successfully indexed {success:,} ({count:,}) records to {self.es_index} from {file_path}') records = [] last_domain = domain @@ -198,7 +200,7 @@ class ElasticIndexer: if records: success, _ = helpers.bulk(self.es, records) - logging.info(f'Successfully indexed {success} records to {self.es_index} from {file_path}') + logging.info(f'Successfully indexed {success:,} ({count:,}) records to {self.es_index} from {file_path}') def main(): @@ -221,8 +223,8 @@ def main(): # Elasticsearch indexing arguments parser.add_argument('--index', default='zone-files', help='Elasticsearch index name') - parser.add_argument('--shards', type=int, default=0, help='Number of shards for the index') # This depends on your cluster configuration - parser.add_argument('--replicas', type=int, default=0, help='Number of replicas for the index') # This depends on your cluster configuration + parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index') # This depends on your cluster configuration + parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index') # This depends on your cluster configuration args = parser.parse_args() @@ -238,10 +240,19 @@ def main(): if not args.api_key and (not args.user or not args.password): raise ValueError('Missing required Elasticsearch argument: either user and password or apikey') + + if args.shards < 1: + raise ValueError('Number of shards must be greater than 0') + + if args.replicas < 1: + raise ValueError('Number of replicas must be greater than 0') + + logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}...') edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed) - edx.create_index(args.shards, args.replicas) # Create the index if it does 
not exist + if not args.dry_run: + edx.create_index(args.shards, args.replicas) # Create the index if it does not exist if os.path.isfile(args.input_path): logging.info(f'Processing file: {args.input_path}')