Compare commits

..

2 Commits

6 changed files with 77 additions and 23 deletions

View File

@ -1,6 +1,6 @@
ISC License ISC License
Copyright (c) 2023, acidvegas <acid.vegas@acid.vegas> Copyright (c) 2024, acidvegas <acid.vegas@acid.vegas>
Permission to use, copy, modify, and/or distribute this software for any Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above purpose with or without fee is hereby granted, provided that the above

View File

@ -1,27 +1,41 @@
# Elasticsearch Recon Ingestion Scripts (ERIS) # Elasticsearch Recon Ingestion Scripts (ERIS)
> A utility for ingesting large scale reconnaissance data into Elast Search > A utility for ingesting various large scale reconnaissance data logs into Elasticsearch
### Work In Progress ### Work In Progress
## Prerequisites ## Prerequisites
- [python](https://www.python.org/) - [python](https://www.python.org/)
- [elasticsearch](https://pypi.org/project/elasticsearch/) *(`pip install elasticsearch`)* - [elasticsearch](https://pypi.org/project/elasticsearch/) *(`pip install elasticsearch`)*
## Usage
```shell
python ingest_XXXX.py [options] <input>
```
**Note:** The `<input>` can be a file or a directory of files, depending on the ingestion script.
###### Options ###### Options
| Argument | Description | | Argument | Description |
| --------------- | -------------------------------------------------------------------------------------------- | | --------------- | -------------------------------------------------------------------------------------------- |
| `--dry-run` | Perform a dry run without indexing records to Elasticsearch. | | `--dry-run` | Perform a dry run without indexing records to Elasticsearch. |
| `--batch_size` | Number of records to index in a batch *(default 25,000)*. | | `--batch_size` | Number of records to index in a batch *(default 25,000)*. |
###### Elasticsearch Connnection Options
| Argument | Description |
| --------------- | -------------------------------------------------------------------------------------------- |
| `--host` | Elasticsearch host *(default 'localhost')*. | | `--host` | Elasticsearch host *(default 'localhost')*. |
| `--port` | Elasticsearch port *(default 9200)*. | | `--port` | Elasticsearch port *(default 9200)*. |
| `--user` | Elasticsearch username *(default 'elastic')*. | | `--user` | Elasticsearch username *(default 'elastic')*. |
| `--password` | Elasticsearch password. If not provided, it checks the environment variable **ES_PASSWORD**. | | `--password` | Elasticsearch password. If not provided, it checks the environment variable **ES_PASSWORD**. |
| `--api-key` | Elasticsearch API Key for authentication. | | `--api-key` | Elasticsearch API Key for authentication. |
| `--index` | Elasticsearch index name *(default 'zone_files')*. |
| `--filter` | Filter out records by type *(comma-separated list)*. |
| `--self-signed` | Allow self-signed certificates. | | `--self-signed` | Allow self-signed certificates. |
###### Elasticsearch Index Options
| Argument | Description |
| --------------- | -------------------------------------------------------------------------------------------- |
| `--index` | Elasticsearch index name *(default 'zone_files')*. |
| `--replicas` | Number of replicas for the index. |
| `--shards` | Number of shards for the index |
___ ___
###### Mirrors ###### Mirrors for this repository: [acid.vegas](https://git.acid.vegas/eris) • [SuperNETs](https://git.supernets.org/acidvegas/eris) • [GitHub](https://github.com/acidvegas/eris) • [GitLab](https://gitlab.com/acidvegas/eris) • [Codeberg](https://codeberg.org/acidvegas/eris)
[acid.vegas](https://git.acid.vegas/eris) • [GitHub](https://github.com/acidvegas/eris) • [GitLab](https://gitlab.com/acidvegas/eris) • [SuperNETs](https://git.supernets.org/acidvegas/eris)

View File

@ -174,9 +174,20 @@ def main():
if not args.api_key and (not args.user or not args.password): if not args.api_key and (not args.user or not args.password):
raise ValueError('Missing required Elasticsearch argument: either user and password or apikey') raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')
if args.shards < 1:
raise ValueError('Number of shards must be greater than 0')
if args.replicas < 1:
raise ValueError('Number of replicas must be greater than 0')
logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}...')
edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed) edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed)
if not args.dry_run:
edx.create_index(args.shards, args.replicas) # Create the index if it does not exist
if os.path.isfile(args.input_path): if os.path.isfile(args.input_path):
logging.info(f'Processing file: {args.input_path}') logging.info(f'Processing file: {args.input_path}')
edx.process_file(args.input_path, args.batch_size) edx.process_file(args.input_path, args.batch_size)

View File

@ -6,7 +6,7 @@
# This script takes JSON formatted masscan logs with banners and indexes them into Elasticsearch. # This script takes JSON formatted masscan logs with banners and indexes them into Elasticsearch.
# #
# Saving my "typical" masscan command here for reference to myself: # Saving my "typical" masscan command here for reference to myself:
# masscan 0.0.0.0/0 -p80,443 --banners --open-only --rate 50000 --shard 1/10 --excludefile exclude.conf -oJ output.json --interactive # masscan 0.0.0.0/0 -p3559,1900 --banners --open-only --rate 25000 --excludefile exclude.conf -oJ output.json --interactive
import argparse import argparse
import json import json
@ -74,7 +74,7 @@ class ElasticIndexer:
if not self.es.indices.exists(index=self.es_index): if not self.es.indices.exists(index=self.es_index):
response = self.es.indices.create(index=self.es_index, body=mapping) response = self.es.indices.create(index=self.es_index, body=mapping)
if response.get('acknowledged') and response.get('shards_acknowledged'): if response.get('acknowledged') and response.get('shards_acknowledged'):
logging.info(f'Index \'{self.es_index}\' successfully created.') logging.info(f'Index \'{self.es_index}\' successfully created.')
else: else:
@ -82,7 +82,7 @@ class ElasticIndexer:
else: else:
logging.warning(f'Index \'{self.es_index}\' already exists.') logging.warning(f'Index \'{self.es_index}\' already exists.')
def process_file(self, file_path: str, batch_size: int): def process_file(self, file_path: str, batch_size: int):
''' '''
@ -120,7 +120,7 @@ class ElasticIndexer:
''' '''
records = [] records = []
with open(file_path, 'r') as file: with open(file_path, 'r') as file:
for line in file: for line in file:
line = line.strip() line = line.strip()
@ -150,7 +150,7 @@ class ElasticIndexer:
struct['ref_id'] = match.group(1) struct['ref_id'] = match.group(1)
else: else:
struct['banner'] = banner struct['banner'] = banner
if self.dry_run: if self.dry_run:
print(struct) print(struct)
else: else:
@ -182,7 +182,7 @@ def main():
parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port') parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
parser.add_argument('--user', default='elastic', help='Elasticsearch username') parser.add_argument('--user', default='elastic', help='Elasticsearch username')
parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)') parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
parser.add_argument('--api-key', help='Elasticsearch API Key for authentication') parser.add_argument('--api-key', help='Elasticsearch API Key for authentication')
parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates') parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')
# Elasticsearch indexing arguments # Elasticsearch indexing arguments
@ -201,13 +201,22 @@ def main():
if not args.host: if not args.host:
raise ValueError('Missing required Elasticsearch argument: host') raise ValueError('Missing required Elasticsearch argument: host')
if not args.api_key and (not args.user or not args.password): if not args.api_key and (not args.user or not args.password):
raise ValueError('Missing required Elasticsearch argument: either user and password or apikey') raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')
if args.shards < 1:
raise ValueError('Number of shards must be greater than 0')
if args.replicas < 0:
raise ValueError('Number of replicas must be greater than 0')
logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}...')
edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed) edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed)
edx.create_index(args.shards, args.replicas) # Create the index if it does not exist if not args.dry_run:
edx.create_index(args.shards, args.replicas) # Create the index if it does not exist
if os.path.isfile(args.input_path): if os.path.isfile(args.input_path):
logging.info(f'Processing file: {args.input_path}') logging.info(f'Processing file: {args.input_path}')
@ -227,4 +236,4 @@ def main():
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -203,10 +203,19 @@ def main():
if not args.api_key and (not args.user or not args.password): if not args.api_key and (not args.user or not args.password):
raise ValueError('Missing required Elasticsearch argument: either user and password or apikey') raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')
if args.shards < 1:
raise ValueError('Number of shards must be greater than 0')
if args.replicas < 1:
raise ValueError('Number of replicas must be greater than 0')
logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}...')
edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed) edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed)
edx.create_index(args.shards, args.replicas) # Create the index if it does not exist if not args.dry_run:
edx.create_index(args.shards, args.replicas) # Create the index if it does not exist
if os.path.isfile(args.input_path): if os.path.isfile(args.input_path):
logging.info(f'Processing file: {args.input_path}') logging.info(f'Processing file: {args.input_path}')

View File

@ -133,6 +133,7 @@ class ElasticIndexer:
} }
''' '''
count = 0
records = [] records = []
domain_records = {} domain_records = {}
last_domain = None last_domain = None
@ -173,7 +174,7 @@ class ElasticIndexer:
if domain != last_domain: if domain != last_domain:
if last_domain: if last_domain:
source = {'domain': domain, 'records': domain_records[last_domain], 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())} source = {'domain': last_domain, 'records': domain_records[last_domain], 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}
del domain_records[last_domain] del domain_records[last_domain]
@ -182,9 +183,10 @@ class ElasticIndexer:
else: else:
struct = {'_index': self.es_index, '_source': source} struct = {'_index': self.es_index, '_source': source}
records.append(struct) records.append(struct)
count += 1
if len(records) >= batch_size: if len(records) >= batch_size:
success, _ = helpers.bulk(self.es, records) success, _ = helpers.bulk(self.es, records)
logging.info(f'Successfully indexed {success} records to {self.es_index} from {file_path}') logging.info(f'Successfully indexed {success:,} ({count:,}) records to {self.es_index} from {file_path}')
records = [] records = []
last_domain = domain last_domain = domain
@ -198,7 +200,7 @@ class ElasticIndexer:
if records: if records:
success, _ = helpers.bulk(self.es, records) success, _ = helpers.bulk(self.es, records)
logging.info(f'Successfully indexed {success} records to {self.es_index} from {file_path}') logging.info(f'Successfully indexed {success:,} ({count:,}) records to {self.es_index} from {file_path}')
def main(): def main():
@ -221,8 +223,8 @@ def main():
# Elasticsearch indexing arguments # Elasticsearch indexing arguments
parser.add_argument('--index', default='zone-files', help='Elasticsearch index name') parser.add_argument('--index', default='zone-files', help='Elasticsearch index name')
parser.add_argument('--shards', type=int, default=0, help='Number of shards for the index') # This depends on your cluster configuration parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index') # This depends on your cluster configuration
parser.add_argument('--replicas', type=int, default=0, help='Number of replicas for the index') # This depends on your cluster configuration parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index') # This depends on your cluster configuration
args = parser.parse_args() args = parser.parse_args()
@ -238,10 +240,19 @@ def main():
if not args.api_key and (not args.user or not args.password): if not args.api_key and (not args.user or not args.password):
raise ValueError('Missing required Elasticsearch argument: either user and password or apikey') raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')
if args.shards < 1:
raise ValueError('Number of shards must be greater than 0')
if args.replicas < 1:
raise ValueError('Number of replicas must be greater than 0')
logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}...')
edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed) edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed)
edx.create_index(args.shards, args.replicas) # Create the index if it does not exist if not args.dry_run:
edx.create_index(args.shards, args.replicas) # Create the index if it does not exist
if os.path.isfile(args.input_path): if os.path.isfile(args.input_path):
logging.info(f'Processing file: {args.input_path}') logging.info(f'Processing file: {args.input_path}')