Ingestion agents are now modular, FIFO live ingestion added
This commit is contained in:
parent
c105db705d
commit
1ee71868ec
45
README.md
45
README.md
@ -9,50 +9,63 @@
|
||||
|
||||
## Usage
|
||||
```shell
|
||||
python ingest_XXXX.py [options] <input>
|
||||
python eris.py [options] <input>
|
||||
```
|
||||
**Note:** The `<input>` can be a file or a directory of files, depending on the ingestion script.
|
||||
|
||||
## Operations
|
||||
This ingestion suite will use the built in node sniffer, so by connecting to a single node, you can load balance across the entire cluster.
|
||||
It is good to know how much nodes you have in the cluster to determine how to fine tune the arguments for the best performance, based on your environment.
|
||||
|
||||
###
|
||||
### Options
|
||||
###### General arguments
|
||||
| Argument | Description |
|
||||
|-------------------|----------------------------------------------------------------|
|
||||
| `input_path` | Path to the input file or directory |
|
||||
| `--dry-run` | Dry run *(do not index records to Elasticsearch)* |
|
||||
| `--watch` | Watch the input file for new lines and index them in real time |
|
||||
| `--watch` | Create or watch a FIFO for real-time indexing |
|
||||
|
||||
###### Elasticsearch arguments
|
||||
| Argument | Description | Default |
|
||||
|-------------------|--------------------------------------------------------|----------------|
|
||||
|-------------------|--------------------------------------------------------------------------------|----------------|
|
||||
| `--host` | Elasticsearch host | `localhost` |
|
||||
| `--port` | Elasticsearch port | `9200` |
|
||||
| `--user` | Elasticsearch username | `elastic` |
|
||||
| `--password` | Elasticsearch password | `$ES_PASSWORD` |
|
||||
| `--api-key` | Elasticsearch API Key for authentication | |
|
||||
| `--api-key` | Elasticsearch API Key for authentication *(format must be api_key:api_secret)* | `$ES_APIKEY` |
|
||||
| `--self-signed` | Elasticsearch connection with a self-signed certificate | |
|
||||
|
||||
###### Elasticsearch indexing arguments
|
||||
| Argument | Description | Default |
|
||||
|-------------------|----------------------------------|---------------------|
|
||||
|-------------------|--------------------------------------|---------------------|
|
||||
| `--index` | Elasticsearch index name | Depends on ingestor |
|
||||
| `--shards` | Number of shards for the index | `1` |
|
||||
| `--pipeline` | Use an ingest pipeline for the index | |
|
||||
| `--replicas` | Number of replicas for the index | `1` |
|
||||
| `--shards` | Number of shards for the index | `1` |
|
||||
|
||||
###### Performance arguments
|
||||
| Argument | Description | Default |
|
||||
|-------------------|----------------------------------------------------------|---------|
|
||||
| `--batch-max` | Maximum size in MB of a batch | `10` |
|
||||
| `--batch-size` | Number of records to index in a batch | `5000` |
|
||||
| `--batch-threads` | Number of threads to use when indexing in batches | `2` |
|
||||
| `--retries` | Number of times to retry indexing a batch before failing | `10` |
|
||||
| `--timeout` | Number of seconds to wait before retrying a batch | `30` |
|
||||
| `--chunk-max` | Maximum size in MB of a chunk | `10` |
|
||||
| `--chunk-size` | Number of records to index in a chunk | `5000` |
|
||||
| `--chunk-threads` | Number of threads to use when indexing in chunks | `2` |
|
||||
| `--retries` | Number of times to retry indexing a chunk before failing | `10` |
|
||||
| `--timeout` | Number of seconds to wait before retrying a chunk | `30` |
|
||||
|
||||
###### Ingestion arguments
|
||||
| Argument | Description |
|
||||
|-------------------|------------------------|
|
||||
| `--httpx` | Index HTTPX records |
|
||||
| `--masscan` | Index Masscan records |
|
||||
| `--massdns` | Index massdns records |
|
||||
| `--zone` | Index zone DNS records |
|
||||
|
||||
Using `--batch-threads` as 4 and `--batch-size` as 10000 with 3 nodes would process 120,000 records before indexing 40,000 per node. Take these kind of metrics int account when consider how much records you want to process at once and the memory limitations of your environment, aswell as the networking constraint it may have ono your node(s), depending on the size of your cluster.
|
||||
|
||||
## Operations
|
||||
This ingestion suite will use the built in node sniffer, so by connecting to a single node, you can load balance across the entire cluster.
|
||||
It is good to know how much nodes you have in the cluster to determine how to fine tune the arguments for the best performance, based on your environment.
|
||||
|
||||
## Changelog
|
||||
- The `--watch` feature now uses a FIFO to do live ingestion.
|
||||
- Isolated eris.py into it's own file and seperated the ingestion agents into their own modules.
|
||||
|
||||
___
|
||||
|
||||
###### Mirrors for this repository: [acid.vegas](https://git.acid.vegas/eris) • [SuperNETs](https://git.supernets.org/acidvegas/eris) • [GitHub](https://github.com/acidvegas/eris) • [GitLab](https://gitlab.com/acidvegas/eris) • [Codeberg](https://codeberg.org/acidvegas/eris)
|
287
eris.py
Normal file
287
eris.py
Normal file
@ -0,0 +1,287 @@
|
||||
#!/usr/bin/env python
|
||||
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import stat
|
||||
import time
|
||||
import sys
|
||||
|
||||
sys.dont_write_bytecode = True
|
||||
|
||||
try:
|
||||
from elasticsearch import Elasticsearch, helpers
|
||||
from elasticsearch.exceptions import NotFoundError
|
||||
except ImportError:
|
||||
raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
|
||||
|
||||
# Setting up logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')
|
||||
|
||||
|
||||
class ElasticIndexer:
|
||||
def __init__(self, args: argparse.Namespace):
|
||||
'''
|
||||
Initialize the Elastic Search indexer.
|
||||
|
||||
:param args: Parsed arguments from argparse
|
||||
'''
|
||||
|
||||
self.chunk_max = args.chunk_max * 1024 * 1024 # MB
|
||||
self.chunk_size = args.chunk_size
|
||||
self.chunk_threads = args.chunk_threads
|
||||
self.dry_run = args.dry_run
|
||||
self.es_index = args.index
|
||||
|
||||
if not args.dry_run:
|
||||
es_config = {
|
||||
'hosts': [f'{args.host}:{args.port}'],
|
||||
'verify_certs': args.self_signed,
|
||||
'ssl_show_warn': args.self_signed,
|
||||
'request_timeout': args.timeout,
|
||||
'max_retries': args.retries,
|
||||
'retry_on_timeout': True,
|
||||
'sniff_on_start': False,
|
||||
'sniff_on_node_failure': True,
|
||||
'min_delay_between_sniffing': 60 # Add config option for this?
|
||||
}
|
||||
|
||||
if args.api_key:
|
||||
es_config['headers'] = {'Authorization': f'ApiKey {args.api_key}'}
|
||||
else:
|
||||
es_config['basic_auth'] = (args.user, args.password)
|
||||
|
||||
# Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
|
||||
import sniff_patch
|
||||
self.es = sniff_patch.init_elasticsearch(**es_config)
|
||||
|
||||
# Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
|
||||
#self.es = Elasticsearch(**es_config)
|
||||
|
||||
|
||||
def create_index(self, map_body: dict, pipeline: str = '', replicas: int = 1, shards: int = 1, ):
|
||||
'''
|
||||
Create the Elasticsearch index with the defined mapping.
|
||||
|
||||
:param pipline: Name of the ingest pipeline to use for the index
|
||||
:param replicas: Number of replicas for the index
|
||||
:param shards: Number of shards for the index
|
||||
'''
|
||||
|
||||
if self.es.indices.exists(index=self.es_index):
|
||||
logging.info(f'Index \'{self.es_index}\' already exists.')
|
||||
return
|
||||
|
||||
mapping = map_body
|
||||
|
||||
mapping['settings'] = {
|
||||
'number_of_shards': shards,
|
||||
'number_of_replicas': replicas
|
||||
}
|
||||
|
||||
if pipeline:
|
||||
try:
|
||||
self.es.ingest.get_pipeline(id=pipeline)
|
||||
logging.info(f'Using ingest pipeline \'{pipeline}\' for index \'{self.es_index}\'')
|
||||
mapping['settings']['index.default_pipeline'] = pipeline
|
||||
except NotFoundError:
|
||||
raise ValueError(f'Ingest pipeline \'{pipeline}\' does not exist.')
|
||||
|
||||
response = self.es.indices.create(index=self.es_index, body=mapping)
|
||||
|
||||
if response.get('acknowledged') and response.get('shards_acknowledged'):
|
||||
logging.info(f'Index \'{self.es_index}\' successfully created.')
|
||||
else:
|
||||
raise Exception(f'Failed to create index. ({response})')
|
||||
|
||||
|
||||
def get_cluster_health(self) -> dict:
|
||||
'''Get the health of the Elasticsearch cluster.'''
|
||||
|
||||
return self.es.cluster.health()
|
||||
|
||||
|
||||
def get_cluster_size(self) -> int:
|
||||
'''Get the number of nodes in the Elasticsearch cluster.'''
|
||||
|
||||
cluster_stats = self.es.cluster.stats()
|
||||
number_of_nodes = cluster_stats['nodes']['count']['total']
|
||||
|
||||
return number_of_nodes
|
||||
|
||||
|
||||
def bulk_index(self, documents: list, file_path: str, count: int):
|
||||
'''
|
||||
Index a batch of documents to Elasticsearch.
|
||||
|
||||
:param documents: List of documents to index
|
||||
:param file_path: Path to the file being indexed
|
||||
:param count: Total number of records processed
|
||||
'''
|
||||
|
||||
remaining_documents = documents
|
||||
|
||||
parallel_bulk_config = {
|
||||
'client': self.es,
|
||||
'chunk_size': self.chunk_size,
|
||||
'max_chunk_bytes': self.chunk_max,
|
||||
'thread_count': self.chunk_threads,
|
||||
'queue_size': 2 # Add config option for this?
|
||||
}
|
||||
|
||||
while remaining_documents:
|
||||
failed_documents = []
|
||||
|
||||
try:
|
||||
for success, response in helpers.parallel_bulk(actions=remaining_documents, **parallel_bulk_config):
|
||||
if not success:
|
||||
failed_documents.append(response)
|
||||
|
||||
if not failed_documents:
|
||||
ingested = parallel_bulk_config['chunk_size'] * parallel_bulk_config['thread_count']
|
||||
logging.info(f'Successfully indexed {ingested:,} ({count:,} processed) records to {self.es_index} from {file_path}')
|
||||
break
|
||||
|
||||
else:
|
||||
logging.warning(f'Failed to index {len(failed_documents):,} failed documents! Retrying...')
|
||||
remaining_documents = failed_documents
|
||||
except Exception as e:
|
||||
logging.error(f'Failed to index documents! ({e})')
|
||||
time.sleep(30) # Should we add a config option for this?
|
||||
|
||||
|
||||
def process_file(self, file_path: str, batch_size: int, ingest_function: callable):
|
||||
'''
|
||||
Read and index records in batches to Elasticsearch.
|
||||
|
||||
:param file_path: Path to the file
|
||||
:param batch_size: Number of records to index per batch
|
||||
:param ingest_function: Function to process the file
|
||||
'''
|
||||
|
||||
count = 0
|
||||
records = []
|
||||
|
||||
for processed in ingest_function(file_path):
|
||||
|
||||
if not processed:
|
||||
break
|
||||
|
||||
if self.dry_run:
|
||||
print(processed)
|
||||
continue
|
||||
|
||||
struct = {'_index': self.es_index, '_source': processed}
|
||||
records.append(struct)
|
||||
count += 1
|
||||
|
||||
if len(records) >= batch_size:
|
||||
self.bulk_index(records, file_path, count)
|
||||
records = []
|
||||
|
||||
if records:
|
||||
self.bulk_index(records, file_path, count)
|
||||
|
||||
|
||||
def main():
|
||||
'''Main function when running this script directly.'''
|
||||
|
||||
parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
|
||||
|
||||
# General arguments
|
||||
parser.add_argument('input_path', help='Path to the input file or directory') # Required
|
||||
parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
|
||||
parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')
|
||||
|
||||
# Elasticsearch arguments
|
||||
parser.add_argument('--host', default='localhost', help='Elasticsearch host')
|
||||
parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
|
||||
parser.add_argument('--user', default='elastic', help='Elasticsearch username')
|
||||
parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
|
||||
parser.add_argument('--api-key', default=os.getenv('ES_APIKEY'), help='Elasticsearch API Key for authentication (if not provided, check environment variable ES_APIKEY)')
|
||||
parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')
|
||||
|
||||
# Elasticsearch indexing arguments
|
||||
parser.add_argument('--index', help='Elasticsearch index name')
|
||||
parser.add_argument('--pipeline', help='Use an ingest pipeline for the index')
|
||||
parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
|
||||
parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
|
||||
|
||||
# Performance arguments
|
||||
parser.add_argument('--chunk-max', type=int, default=10, help='Maximum size in MB of a chunk')
|
||||
parser.add_argument('--chunk-size', type=int, default=5000, help='Number of records to index in a chunk')
|
||||
parser.add_argument('--chunk-threads', type=int, default=2, help='Number of threads to use when indexing in chunks')
|
||||
parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a chunk before failing')
|
||||
parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a chunk')
|
||||
|
||||
# Ingestion arguments
|
||||
parser.add_argument('--httpx', action='store_true', help='Index Httpx records')
|
||||
parser.add_argument('--masscan', action='store_true', help='Index Masscan records')
|
||||
parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
|
||||
parser.add_argument('--zone', action='store_true', help='Index Zone records')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.watch:
|
||||
if not os.path.exists(args.input_path):
|
||||
os.mkfifo(args.input_path)
|
||||
elif os.path.exists(args.input_path) and stat.S_ISFIFO(os.stat(args.input_path).st_mode):
|
||||
raise ValueError(f'Path {args.input_path} is not a FIFO')
|
||||
elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
|
||||
raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
|
||||
|
||||
edx = ElasticIndexer(args)
|
||||
|
||||
if args.httpx:
|
||||
from ingestors import ingest_httpx as ingestor
|
||||
elif args.masscan:
|
||||
from ingestors import ingest_masscan as ingestor
|
||||
elif args.massdns:
|
||||
from ingestors import ingest_massdns as ingestor
|
||||
elif args.zone:
|
||||
from ingestors import ingest_zone as ingestor
|
||||
|
||||
batch_size = 0
|
||||
|
||||
if not args.dry_run:
|
||||
print(edx.get_cluster_health())
|
||||
|
||||
time.sleep(3) # Delay to allow time for sniffing to complete
|
||||
|
||||
nodes = edx.get_cluster_size()
|
||||
logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
|
||||
|
||||
if not edx.es_index:
|
||||
edx.es_index = ingestor.default_index
|
||||
|
||||
map_body = ingestor.construct_map()
|
||||
edx.create_index(map_body, args.pipeline, args.replicas, args.shards)
|
||||
|
||||
batch_size = int(nodes * (args.chunk_size * args.chunk_threads))
|
||||
|
||||
if os.path.isfile(args.input_path):
|
||||
logging.info(f'Processing file: {args.input_path}')
|
||||
edx.process_file(args.input_path, batch_size, ingestor.process_file)
|
||||
|
||||
elif stat.S_ISFIFO(os.stat(args.input_path).st_mode):
|
||||
logging.info(f'Watching FIFO: {args.input_path}')
|
||||
edx.process_file(args.input_path, batch_size, ingestor.process_file)
|
||||
|
||||
elif os.path.isdir(args.input_path):
|
||||
count = 1
|
||||
total = len(os.listdir(args.input_path))
|
||||
logging.info(f'Processing {total:,} files in directory: {args.input_path}')
|
||||
for file in sorted(os.listdir(args.input_path)):
|
||||
file_path = os.path.join(args.input_path, file)
|
||||
if os.path.isfile(file_path):
|
||||
logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
|
||||
edx.process_file(file_path, batch_size, ingestor.process_file)
|
||||
count += 1
|
||||
else:
|
||||
logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
105
helpers/elastictop.py
Normal file
105
helpers/elastictop.py
Normal file
@ -0,0 +1,105 @@
|
||||
#!/usr/bin/env python
|
||||
# estop - developed by acidvegas (https://git.acid.vegas/eris)
|
||||
|
||||
'''
|
||||
Little script to show some basic information about an Elasticsearch cluster.
|
||||
'''
|
||||
|
||||
import argparse
|
||||
import os
|
||||
|
||||
try:
|
||||
from elasticsearch import Elasticsearch
|
||||
except ImportError:
|
||||
raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
|
||||
|
||||
|
||||
def bytes_to_human_readable(num_bytes):
|
||||
'''Convert bytes to a human-readable format.'''
|
||||
for unit in ['bytes', 'kb', 'mb', 'gb', 'tb', 'pb', 'eb', 'zb']:
|
||||
if abs(num_bytes) < 1024.0:
|
||||
return f"{num_bytes:3.1f}{unit}"
|
||||
num_bytes /= 1024.0
|
||||
return f"{num_bytes:.1f}YB"
|
||||
|
||||
|
||||
def main():
|
||||
'''Main function when running this script directly.'''
|
||||
|
||||
parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
|
||||
parser.add_argument('--host', default='localhost', help='Elasticsearch host')
|
||||
parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
|
||||
parser.add_argument('--user', default='elastic', help='Elasticsearch username')
|
||||
parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
|
||||
parser.add_argument('--api-key', help='Elasticsearch API Key for authentication')
|
||||
parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')
|
||||
args = parser.parse_args()
|
||||
|
||||
es_config = {
|
||||
'hosts': [f'{args.host}:{args.port}'],
|
||||
'verify_certs': args.self_signed,
|
||||
'ssl_show_warn': args.self_signed,
|
||||
'basic_auth': (args.user, args.password)
|
||||
}
|
||||
es = Elasticsearch(**es_config)
|
||||
|
||||
stats = es.cluster.stats()
|
||||
|
||||
name = stats['cluster_name']
|
||||
status = stats['status']
|
||||
indices = {
|
||||
'total': stats['indices']['count'],
|
||||
'shards': stats['indices']['shards']['total'],
|
||||
'docs': stats['indices']['docs']['count'],
|
||||
'size': bytes_to_human_readable(stats['indices']['store']['size_in_bytes'])
|
||||
}
|
||||
nodes = {
|
||||
'total': stats['_nodes']['total'],
|
||||
'successful': stats['_nodes']['successful'],
|
||||
'failed': stats['_nodes']['failed']
|
||||
}
|
||||
|
||||
if status == 'green':
|
||||
print(f'Cluster {name} (\033[92m{status}\033[0m)')
|
||||
elif status == 'yellow':
|
||||
print(f'Cluster {name} (\033[93m{status}\033[0m)')
|
||||
elif status == 'red':
|
||||
print(f'Cluster {name} (\033[91m{status}\033[0m)')
|
||||
|
||||
print('')
|
||||
print(f'Nodes {nodes["total"]} Total, {nodes["successful"]} Successful, {nodes["failed"]} Failed')
|
||||
|
||||
nodes_info = es.nodes.info()
|
||||
# Loop through each node and print details
|
||||
for node_id, node_info in nodes_info['nodes'].items():
|
||||
node_name = node_info['name']
|
||||
transport_address = node_info['transport_address']
|
||||
#node_stats = es.nodes.stats(node_id=node_id)
|
||||
version = node_info['version']
|
||||
memory = bytes_to_human_readable(int(node_info['settings']['node']['attr']['ml']['machine_memory']))
|
||||
print(f" {node_name.ljust(7)} | Host: {transport_address.rjust(21)} | Version: {version.ljust(7)} | Processors: {node_info['os']['available_processors']} | Memory: {memory}")
|
||||
|
||||
indices_stats = es.cat.indices(format="json")
|
||||
|
||||
#print(' |')
|
||||
print('')
|
||||
|
||||
print(f'Indices {indices["total"]:,} Total {indices["shards"]:,}, Shards')
|
||||
for index in indices_stats:
|
||||
index_name = index['index']
|
||||
document_count = f'{int(index['docs.count']):,}'
|
||||
store_size = index['store.size']
|
||||
number_of_shards = int(index['pri']) # primary shards
|
||||
number_of_replicas = int(index['rep']) # replicas
|
||||
|
||||
if index_name.startswith('.') or document_count == '0':
|
||||
continue
|
||||
|
||||
print(f" {index_name.ljust(15)} | Documents: {document_count.rjust(15)} | {store_size.rjust(7)} [Shards: {number_of_shards:,}, Replicas: {number_of_replicas:,}]")
|
||||
|
||||
dox = f'{indices["docs"]:,}'
|
||||
print('')
|
||||
print(f'Total {dox.rjust(48)} {indices["size"].rjust(9)}')
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
33
helpers/es_index_dump
Executable file
33
helpers/es_index_dump
Executable file
@ -0,0 +1,33 @@
|
||||
#!/bin/sh
|
||||
# ElasticSearch Index Dumper - developed by acidvegas (https://git.acid.vegas/eris)
|
||||
|
||||
# This script will dump the entire contents of an ElasticSearch index to a JSON file.
|
||||
#
|
||||
# Todo:
|
||||
# - Add authentication support
|
||||
|
||||
# Configuration
|
||||
BATCH_SIZE=10000
|
||||
ES_HOST="https://elastic.change.me:9200"
|
||||
ES_INDEX="juicy_booties"
|
||||
|
||||
SCROLL_ID=$(curl -s -XGET "$ES_HOST/$ES_INDEX/_search?scroll=1m" -H 'Content-Type: application/json' -d'{ "size": $BATCH_SIZE, "query": { "match_all": {} } }' | jq -r '._scroll_id')
|
||||
|
||||
count=0
|
||||
|
||||
while true; do
|
||||
RESPONSE=$(curl -s -XGET "$ES_HOST/_search/scroll" -H 'Content-Type: application/json' -d'{"scroll": "1m", "scroll_id": "'$SCROLL_ID'"}')
|
||||
|
||||
HITS=$(echo $RESPONSE | jq -c '.hits.hits[]')
|
||||
|
||||
if [ -z "$HITS" ] || [ "$HITS" = "null" ]; then
|
||||
break
|
||||
fi
|
||||
|
||||
echo $HITS | jq -c '._source' >> $ES_INDEX.json
|
||||
|
||||
SCROLL_ID=$(echo $RESPONSE | jq -r '._scroll_id')
|
||||
|
||||
count=$(($count + $BATCH_SIZE))
|
||||
echo "Dumped $BATCH_SIZE records ($count total) from $ES_INDEX on $ES_HOST"
|
||||
done
|
140
helpers/notes.md
Normal file
140
helpers/notes.md
Normal file
@ -0,0 +1,140 @@
|
||||
# Create a GeoIP ingestion pipeline
|
||||
|
||||
My notes for creating an ingestion pipeline for geoip usage in Kibanas maps
|
||||
|
||||
###### Create the ingestion pipeline
|
||||
```
|
||||
PUT _ingest/pipeline/geoip
|
||||
{
|
||||
"description" : "Add geoip info",
|
||||
"processors" : [
|
||||
{
|
||||
"geoip" : {
|
||||
"field" : "ip",
|
||||
"ignore_missing": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
###### Update an index
|
||||
```
|
||||
PUT my_ip_locations
|
||||
{
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"geoip": {
|
||||
"properties": {
|
||||
"location": { "type": "geo_point" }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
or...
|
||||
|
||||
###### Create the index
|
||||
```
|
||||
PUT /masscan-data
|
||||
{
|
||||
"settings": {
|
||||
"number_of_shards": 3,
|
||||
"number_of_replicas": 1
|
||||
},
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"banner": {
|
||||
"type": "text",
|
||||
"fields": {
|
||||
"keyword": {
|
||||
"type": "keyword",
|
||||
"ignore_above": 256
|
||||
}
|
||||
}
|
||||
},
|
||||
"geoip": {
|
||||
"properties": {
|
||||
"city_name": {
|
||||
"type": "text",
|
||||
"fields": {
|
||||
"keyword": {
|
||||
"type": "keyword",
|
||||
"ignore_above": 256
|
||||
}
|
||||
}
|
||||
},
|
||||
"continent_name": {
|
||||
"type": "text",
|
||||
"fields": {
|
||||
"keyword": {
|
||||
"type": "keyword",
|
||||
"ignore_above": 256
|
||||
}
|
||||
}
|
||||
},
|
||||
"country_iso_code": {
|
||||
"type": "text",
|
||||
"fields": {
|
||||
"keyword": {
|
||||
"type": "keyword",
|
||||
"ignore_above": 256
|
||||
}
|
||||
}
|
||||
},
|
||||
"country_name": {
|
||||
"type": "text",
|
||||
"fields": {
|
||||
"keyword": {
|
||||
"type": "keyword",
|
||||
"ignore_above": 256
|
||||
}
|
||||
}
|
||||
},
|
||||
"location": {
|
||||
"type": "geo_point"
|
||||
},
|
||||
"region_iso_code": {
|
||||
"type": "text",
|
||||
"fields": {
|
||||
"keyword": {
|
||||
"type": "keyword",
|
||||
"ignore_above": 256
|
||||
}
|
||||
}
|
||||
},
|
||||
"region_name": {
|
||||
"type": "text",
|
||||
"fields": {
|
||||
"keyword": {
|
||||
"type": "keyword",
|
||||
"ignore_above": 256
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"ip": {
|
||||
"type": "ip"
|
||||
},
|
||||
"port": {
|
||||
"type": "integer"
|
||||
},
|
||||
"proto": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"ref_id": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"seen": {
|
||||
"type": "date"
|
||||
},
|
||||
"service": {
|
||||
"type": "keyword"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
0
ingestors/__init__.py
Normal file
0
ingestors/__init__.py
Normal file
@ -1,99 +1,55 @@
|
||||
#!/usr/bin/env python
|
||||
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
|
||||
# ingest_httpx.py
|
||||
|
||||
# HTTPX Log File Ingestion:
|
||||
#
|
||||
# This script takes JSON formatted HTTPX logs and indexes them into Elasticsearch.
|
||||
#
|
||||
# Saving my "typical" HTTPX command here for reference to myself:
|
||||
# httpx -l zone.org.txt -t 200 -r nameservers.txt -sc -location -favicon -title -bp -td -ip -cname -mc 200 -stream -sd -j -o output.json -v
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
|
||||
try:
|
||||
from elasticsearch import Elasticsearch, helpers
|
||||
except ImportError:
|
||||
raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
|
||||
default_index = 'httpx-logs'
|
||||
|
||||
def construct_map() -> dict:
|
||||
'''Construct the Elasticsearch index mapping for Masscan records.'''
|
||||
|
||||
# Setting up logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')
|
||||
keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
|
||||
|
||||
|
||||
class ElasticIndexer:
|
||||
def __init__(self, es_host: str, es_port: int, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False, retries: int = 10, timeout: int = 30):
|
||||
'''
|
||||
Initialize the Elastic Search indexer.
|
||||
|
||||
:param es_host: Elasticsearch host(s)
|
||||
:param es_port: Elasticsearch port
|
||||
:param es_user: Elasticsearch username
|
||||
:param es_password: Elasticsearch password
|
||||
:param es_api_key: Elasticsearch API Key
|
||||
:param es_index: Elasticsearch index name
|
||||
:param dry_run: If True, do not initialize Elasticsearch client
|
||||
:param self_signed: If True, do not verify SSL certificates
|
||||
:param retries: Number of times to retry indexing a batch before failing
|
||||
:param timeout: Number of seconds to wait before retrying a batch
|
||||
'''
|
||||
|
||||
self.dry_run = dry_run
|
||||
self.es = None
|
||||
self.es_index = es_index
|
||||
|
||||
if not dry_run:
|
||||
es_config = {
|
||||
'hosts': [f'{es_host}:{es_port}'],
|
||||
'verify_certs': self_signed,
|
||||
'ssl_show_warn': self_signed,
|
||||
'request_timeout': timeout,
|
||||
'max_retries': retries,
|
||||
'retry_on_timeout': True,
|
||||
'sniff_on_start': False,
|
||||
'sniff_on_node_failure': True,
|
||||
'min_delay_between_sniffing': 60
|
||||
mapping = {
|
||||
'mappings': {
|
||||
'properties': {
|
||||
'change': 'me'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if es_api_key:
|
||||
es_config['headers'] = {'Authorization': f'ApiKey {es_api_key}'}
|
||||
else:
|
||||
es_config['basic_auth'] = (es_user, es_password)
|
||||
|
||||
# Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
|
||||
import sniff_patch
|
||||
self.es = sniff_patch.init_elasticsearch(**es_config)
|
||||
|
||||
# Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
|
||||
#self.es = Elasticsearch(**es_config)
|
||||
return mapping
|
||||
|
||||
|
||||
def get_cluster_health(self) -> dict:
|
||||
'''Get the health of the Elasticsearch cluster.'''
|
||||
|
||||
return self.es.cluster.health()
|
||||
|
||||
|
||||
def get_cluster_size(self) -> int:
|
||||
'''Get the number of nodes in the Elasticsearch cluster.'''
|
||||
|
||||
cluster_stats = self.es.cluster.stats()
|
||||
number_of_nodes = cluster_stats['nodes']['count']['total']
|
||||
|
||||
return number_of_nodes
|
||||
|
||||
|
||||
def process_file(self, file_path: str, watch: bool = False, chunk: dict = {}):
|
||||
def process_file(file_path: str):
|
||||
'''
|
||||
Read and index HTTPX records in batches to Elasticsearch, handling large volumes efficiently.
|
||||
Read and process HTTPX records from the log file.
|
||||
|
||||
:param file_path: Path to the HTTPX log file
|
||||
:param watch: If True, watch the file for new lines and index them in real time
|
||||
:param chunk: Chunking configuration
|
||||
'''
|
||||
|
||||
with open(file_path, 'r') as file:
|
||||
for line in file:
|
||||
line = line.strip()
|
||||
|
||||
if not line:
|
||||
continue
|
||||
|
||||
record = json.loads(line)
|
||||
|
||||
record['seen'] = record.pop('timestamp').split('.')[0] + 'Z' # Hacky solution to maintain ISO 8601 format without milliseconds or offsets
|
||||
record['domain'] = record.pop('input')
|
||||
|
||||
del record['failed'], record['knowledgebase'], record['time']
|
||||
|
||||
yield record
|
||||
|
||||
return None # EOF
|
||||
|
||||
|
||||
|
||||
''''
|
||||
Example record:
|
||||
{
|
||||
"timestamp":"2024-01-14T13:08:15.117348474-05:00", # Rename to seen and remove milliseconds and offset
|
||||
@ -122,8 +78,7 @@ class ElasticIndexer:
|
||||
"favicon_path":"/i/favicon.png",
|
||||
"time":"592.907689ms",
|
||||
"a":[
|
||||
"51.89.151.158",
|
||||
"2001:41d0:801:2000::5ce9"
|
||||
"6.150.220.23"
|
||||
],
|
||||
"tech":[
|
||||
"Bootstrap:4.0.0",
|
||||
@ -141,196 +96,3 @@ class ElasticIndexer:
|
||||
}
|
||||
}
|
||||
'''
|
||||
|
||||
count = 0
|
||||
records = []
|
||||
|
||||
with open(file_path, 'r') as file:
|
||||
for line in (file := follow(file) if watch else file):
|
||||
line = line.strip()
|
||||
|
||||
if not line:
|
||||
continue
|
||||
|
||||
record = json.loads(line)
|
||||
|
||||
record['seen'] = record.pop('timestamp').split('.')[0] + 'Z' # Hacky solution to maintain ISO 8601 format without milliseconds or offsets
|
||||
record['domain'] = record.pop('input')
|
||||
|
||||
del record['failed'], record['knowledgebase'], record['time']
|
||||
|
||||
if self.dry_run:
|
||||
print(record)
|
||||
else:
|
||||
record = {'_index': self.es_index, '_source': record}
|
||||
records.append(record)
|
||||
count += 1
|
||||
if len(records) >= chunk['batch']:
|
||||
self.bulk_index(records, file_path, chunk, count)
|
||||
records = []
|
||||
|
||||
if records:
|
||||
self.bulk_index(records, file_path, chunk, count)
|
||||
|
||||
def bulk_index(self, documents: list, file_path: str, chunk: dict, count: int):
|
||||
'''
|
||||
Index a batch of documents to Elasticsearch.
|
||||
|
||||
:param documents: List of documents to index
|
||||
:param file_path: Path to the file being indexed
|
||||
:param count: Total number of records processed
|
||||
'''
|
||||
|
||||
remaining_documents = documents
|
||||
|
||||
parallel_bulk_config = {
|
||||
'client': self.es,
|
||||
'chunk_size': chunk['size'],
|
||||
'max_chunk_bytes': chunk['max_size'] * 1024 * 1024, # MB
|
||||
'thread_count': chunk['threads'],
|
||||
'queue_size': 2
|
||||
}
|
||||
|
||||
while remaining_documents:
|
||||
failed_documents = []
|
||||
|
||||
try:
|
||||
for success, response in helpers.parallel_bulk(actions=remaining_documents, **parallel_bulk_config):
|
||||
if not success:
|
||||
failed_documents.append(response)
|
||||
|
||||
if not failed_documents:
|
||||
ingested = parallel_bulk_config['chunk_size'] * parallel_bulk_config['thread_count']
|
||||
logging.info(f'Successfully indexed {ingested:,} ({count:,} processed) records to {self.es_index} from {file_path}')
|
||||
break
|
||||
|
||||
else:
|
||||
logging.warning(f'Failed to index {len(failed_documents):,} failed documents! Retrying...')
|
||||
remaining_documents = failed_documents
|
||||
except Exception as e:
|
||||
logging.error(f'Failed to index documents! ({e})')
|
||||
time.sleep(30)
|
||||
|
||||
|
||||
def follow(file) -> str:
|
||||
'''
|
||||
Generator function that yields new lines in a file in real time.
|
||||
|
||||
:param file: File object to read from
|
||||
'''
|
||||
|
||||
file.seek(0,2) # Go to the end of the file
|
||||
|
||||
while True:
|
||||
line = file.readline()
|
||||
|
||||
if not line:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
yield line
|
||||
|
||||
|
||||
def main():
|
||||
'''Main function when running this script directly.'''
|
||||
|
||||
parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
|
||||
|
||||
# General arguments
|
||||
parser.add_argument('input_path', help='Path to the input file or directory') # Required
|
||||
parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
|
||||
parser.add_argument('--watch', action='store_true', help='Watch the input file for new lines and index them in real time')
|
||||
|
||||
# Elasticsearch arguments
|
||||
parser.add_argument('--host', default='localhost', help='Elasticsearch host')
|
||||
parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
|
||||
parser.add_argument('--user', default='elastic', help='Elasticsearch username')
|
||||
parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
|
||||
parser.add_argument('--api-key', help='Elasticsearch API Key for authentication')
|
||||
parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')
|
||||
|
||||
# Elasticsearch indexing arguments
|
||||
parser.add_argument('--index', default='httpx-logs', help='Elasticsearch index name')
|
||||
parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
|
||||
parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
|
||||
|
||||
# Performance arguments
|
||||
parser.add_argument('--batch-max', type=int, default=10, help='Maximum size in MB of a batch')
|
||||
parser.add_argument('--batch-size', type=int, default=5000, help='Number of records to index in a batch')
|
||||
parser.add_argument('--batch-threads', type=int, default=2, help='Number of threads to use when indexing in batches')
|
||||
parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a batch before failing')
|
||||
parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a batch')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Argument validation
|
||||
if not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
|
||||
raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
|
||||
|
||||
if not args.dry_run:
|
||||
if args.batch_size < 1:
|
||||
raise ValueError('Batch size must be greater than 0')
|
||||
|
||||
elif args.retries < 1:
|
||||
raise ValueError('Number of retries must be greater than 0')
|
||||
|
||||
elif args.timeout < 5:
|
||||
raise ValueError('Timeout must be greater than 4')
|
||||
|
||||
elif args.batch_max < 1:
|
||||
raise ValueError('Batch max size must be greater than 0')
|
||||
|
||||
elif args.batch_threads < 1:
|
||||
raise ValueError('Batch threads must be greater than 0')
|
||||
|
||||
elif not args.host:
|
||||
raise ValueError('Missing required Elasticsearch argument: host')
|
||||
|
||||
elif not args.api_key and (not args.user or not args.password):
|
||||
raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')
|
||||
|
||||
elif args.shards < 1:
|
||||
raise ValueError('Number of shards must be greater than 0')
|
||||
|
||||
elif args.replicas < 0:
|
||||
raise ValueError('Number of replicas must be greater than 0')
|
||||
|
||||
edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed, args.retries, args.timeout)
|
||||
|
||||
if not args.dry_run:
|
||||
print(edx.get_cluster_health())
|
||||
|
||||
time.sleep(3) # Delay to allow time for sniffing to complete
|
||||
|
||||
nodes = edx.get_cluster_size()
|
||||
|
||||
logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
|
||||
|
||||
edx.create_index(args.shards, args.replicas) # Create the index if it does not exist
|
||||
|
||||
chunk = {
|
||||
'size': args.batch_size,
|
||||
'max_size': args.batch_max * 1024 * 1024, # MB
|
||||
'threads': args.batch_threads
|
||||
}
|
||||
|
||||
chunk['batch'] = nodes * (chunk['size'] * chunk['threads'])
|
||||
else:
|
||||
chunk = {} # Ugly hack to get this working...
|
||||
|
||||
if os.path.isfile(args.input_path):
|
||||
logging.info(f'Processing file: {args.input_path}')
|
||||
edx.process_file(args.input_path, args.watch, chunk)
|
||||
|
||||
elif os.path.isdir(args.input_path):
|
||||
count = 1
|
||||
total = len(os.listdir(args.input_path))
|
||||
logging.info(f'Processing {total:,} files in directory: {args.input_path}')
|
||||
for file in sorted(os.listdir(args.input_path)):
|
||||
file_path = os.path.join(args.input_path, file)
|
||||
if os.path.isfile(file_path):
|
||||
logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
|
||||
edx.process_file(file_path, args.watch, chunk)
|
||||
count += 1
|
||||
else:
|
||||
logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
|
@ -1,183 +1,67 @@
|
||||
#!/usr/bin/env python
|
||||
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
|
||||
# ingest_masscan.py
|
||||
|
||||
# Masscan Log File Ingestion:
|
||||
#
|
||||
# This script takes JSON formatted masscan logs with banners and indexes them into Elasticsearch.
|
||||
#
|
||||
# Saving my "typical" masscan setup & command here for reference to myself:
|
||||
# apt-get install iptables masscan libpcap-dev screen
|
||||
# /sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP
|
||||
# printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.0.0/29\n192.0.0.170/32\n192.0.0.171/32\n192.0.2.0/24\n192.88.99.0/24\n192.168.0.0/16\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n240.0.0.0/4\n255.255.255.255/32\n" > exclude.conf
|
||||
# screen -S scan
|
||||
# masscan 0.0.0.0/0 -p8080,8888,8000 --banners --source-port 61010 --open-only --rate 35000 --excludefile exclude.conf -oJ output_new.json --interactive
|
||||
#
|
||||
# Note: The above iptables rule is not persistent and will be removed on reboot.
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
|
||||
try:
|
||||
from elasticsearch import Elasticsearch, helpers
|
||||
except ImportError:
|
||||
raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
|
||||
default_index = 'masscan-logs'
|
||||
|
||||
def construct_map() -> dict:
|
||||
'''Construct the Elasticsearch index mapping for Masscan records.'''
|
||||
|
||||
# Setting up logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')
|
||||
|
||||
|
||||
class ElasticIndexer:
|
||||
def __init__(self, es_host: str, es_port: int, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False, retries: int = 10, timeout: int = 30):
|
||||
'''
|
||||
Initialize the Elastic Search indexer.
|
||||
|
||||
:param es_host: Elasticsearch host(s)
|
||||
:param es_port: Elasticsearch port
|
||||
:param es_user: Elasticsearch username
|
||||
:param es_password: Elasticsearch password
|
||||
:param es_api_key: Elasticsearch API Key
|
||||
:param es_index: Elasticsearch index name
|
||||
:param dry_run: If True, do not initialize Elasticsearch client
|
||||
:param self_signed: If True, do not verify SSL certificates
|
||||
:param retries: Number of times to retry indexing a batch before failing
|
||||
:param timeout: Number of seconds to wait before retrying a batch
|
||||
'''
|
||||
|
||||
self.dry_run = dry_run
|
||||
self.es = None
|
||||
self.es_index = es_index
|
||||
|
||||
if not dry_run:
|
||||
es_config = {
|
||||
'hosts': [f'{es_host}:{es_port}'],
|
||||
'verify_certs': self_signed,
|
||||
'ssl_show_warn': self_signed,
|
||||
'request_timeout': timeout,
|
||||
'max_retries': retries,
|
||||
'retry_on_timeout': True,
|
||||
'sniff_on_start': False,
|
||||
'sniff_on_node_failure': True,
|
||||
'min_delay_between_sniffing': 60
|
||||
}
|
||||
|
||||
if es_api_key:
|
||||
es_config['headers'] = {'Authorization': f'ApiKey {es_api_key}'}
|
||||
else:
|
||||
es_config['basic_auth'] = (es_user, es_password)
|
||||
|
||||
# Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
|
||||
import sniff_patch
|
||||
self.es = sniff_patch.init_elasticsearch(**es_config)
|
||||
|
||||
# Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
|
||||
#self.es = Elasticsearch(**es_config)
|
||||
|
||||
|
||||
def create_index(self, shards: int = 1, replicas: int = 1):
|
||||
'''
|
||||
Create the Elasticsearch index with the defined mapping.
|
||||
|
||||
:param shards: Number of shards for the index
|
||||
:param replicas: Number of replicas for the index
|
||||
'''
|
||||
keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
|
||||
|
||||
mapping = {
|
||||
'settings': {
|
||||
'number_of_shards': shards,
|
||||
'number_of_replicas': replicas
|
||||
},
|
||||
'mappings': {
|
||||
'properties': {
|
||||
'ip': { 'type': 'ip' },
|
||||
'port': { 'type': 'integer' },
|
||||
'proto': { 'type': 'keyword' },
|
||||
'service': { 'type': 'keyword' },
|
||||
'banner': { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } },
|
||||
'banner': keyword_mapping,
|
||||
'ref_id': { 'type': 'keyword' },
|
||||
'seen': { 'type': 'date' }
|
||||
'seen': { 'type': 'date' },
|
||||
'geoip': {
|
||||
'properties': {
|
||||
'city_name': keyword_mapping,
|
||||
'continent_name': keyword_mapping,
|
||||
'country_iso_code': keyword_mapping,
|
||||
'country_name': keyword_mapping,
|
||||
'location': { 'type': 'geo_point' },
|
||||
'region_iso_code': keyword_mapping,
|
||||
'region_name': keyword_mapping,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if not self.es.indices.exists(index=self.es_index):
|
||||
response = self.es.indices.create(index=self.es_index, body=mapping)
|
||||
|
||||
if response.get('acknowledged') and response.get('shards_acknowledged'):
|
||||
logging.info(f'Index \'{self.es_index}\' successfully created.')
|
||||
else:
|
||||
raise Exception(f'Failed to create index. ({response})')
|
||||
|
||||
else:
|
||||
logging.warning(f'Index \'{self.es_index}\' already exists.')
|
||||
return mapping
|
||||
|
||||
|
||||
def get_cluster_health(self) -> dict:
|
||||
'''Get the health of the Elasticsearch cluster.'''
|
||||
|
||||
return self.es.cluster.health()
|
||||
|
||||
|
||||
def get_cluster_size(self) -> int:
|
||||
'''Get the number of nodes in the Elasticsearch cluster.'''
|
||||
|
||||
cluster_stats = self.es.cluster.stats()
|
||||
number_of_nodes = cluster_stats['nodes']['count']['total']
|
||||
|
||||
return number_of_nodes
|
||||
|
||||
|
||||
def process_file(self, file_path: str, watch: bool = False, chunk: dict = {}):
|
||||
def process_file(file_path: str):
|
||||
'''
|
||||
Read and index Masscan records in batches to Elasticsearch, handling large volumes efficiently.
|
||||
Read and process Masscan records from the log file.
|
||||
|
||||
:param file_path: Path to the Masscan log file
|
||||
:param watch: If True, input file will be watched for new lines and indexed in real time\
|
||||
:param chunk: Chunk configuration for indexing in batches
|
||||
|
||||
Example record:
|
||||
{
|
||||
"ip": "43.134.51.142",
|
||||
"timestamp": "1705255468", # Convert to ZULU BABY
|
||||
"ports": [ # Typically only one port per record, but we will create a record for each port opened
|
||||
{
|
||||
"port": 22,
|
||||
"proto": "tcp",
|
||||
"service": { # This field is optional
|
||||
"name": "ssh",
|
||||
"banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
Will be indexed as:
|
||||
{
|
||||
"ip": "43.134.51.142",
|
||||
"port": 22,
|
||||
"proto": "tcp",
|
||||
"service": "ssh", # Optional: not every record will have a service name ("unknown" is ignored)
|
||||
"banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4", # Optional: not every record will have a banner
|
||||
"seen": "2021-10-08T02:04:28Z",
|
||||
"ref_id": "?sKfOvsC4M4a2W8PaC4zF?" # This is optional and will only be present if the banner contains a reference ID (TCP RST Payload, Might be useful?)
|
||||
}
|
||||
'''
|
||||
|
||||
count = 0
|
||||
records = []
|
||||
|
||||
with open(file_path, 'r') as file:
|
||||
for line in (file := follow(file) if watch else file):
|
||||
for line in file:
|
||||
line = line.strip()
|
||||
|
||||
if not line or not line.startswith('{'):
|
||||
continue
|
||||
|
||||
try:
|
||||
record = json.loads(line)
|
||||
except json.decoder.JSONDecodeError:
|
||||
logging.error(f'Failed to parse JSON record! ({line})')
|
||||
input('Press Enter to continue...') # Debugging
|
||||
continue
|
||||
|
||||
for port_info in record['ports']:
|
||||
struct = {
|
||||
@ -201,184 +85,51 @@ class ElasticIndexer:
|
||||
else:
|
||||
struct['banner'] = banner
|
||||
|
||||
if self.dry_run:
|
||||
print(struct)
|
||||
else:
|
||||
struct = {'_index': self.es_index, '_source': struct}
|
||||
records.append(struct)
|
||||
count += 1
|
||||
if len(records) >= chunk['batch']:
|
||||
self.bulk_index(records, file_path, chunk, count)
|
||||
records = []
|
||||
yield struct
|
||||
|
||||
return None # EOF
|
||||
|
||||
if records:
|
||||
self.bulk_index(records, file_path, chunk, count)
|
||||
|
||||
|
||||
def bulk_index(self, documents: list, file_path: str, chunk: dict, count: int):
|
||||
'''
|
||||
Index a batch of documents to Elasticsearch.
|
||||
|
||||
:param documents: List of documents to index
|
||||
:param file_path: Path to the file being indexed
|
||||
:param count: Total number of records processed
|
||||
'''
|
||||
|
||||
remaining_documents = documents
|
||||
|
||||
parallel_bulk_config = {
|
||||
'client': self.es,
|
||||
'chunk_size': chunk['size'],
|
||||
'max_chunk_bytes': chunk['max_size'] * 1024 * 1024, # MB
|
||||
'thread_count': chunk['threads'],
|
||||
'queue_size': 2
|
||||
Example record:
|
||||
{
|
||||
"ip": "43.134.51.142",
|
||||
"timestamp": "1705255468", # Convert to ZULU BABY
|
||||
"ports": [ # We will create a record for each port opened
|
||||
{
|
||||
"port": 22,
|
||||
"proto": "tcp",
|
||||
"service": { # This field is optional
|
||||
"name": "ssh",
|
||||
"banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
while remaining_documents:
|
||||
failed_documents = []
|
||||
Will be indexed as:
|
||||
{
|
||||
"ip": "43.134.51.142",
|
||||
"port": 22,
|
||||
"proto": "tcp",
|
||||
"service": "ssh",
|
||||
"banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
|
||||
"seen": "2021-10-08T02:04:28Z",
|
||||
"ref_id": "?sKfOvsC4M4a2W8PaC4zF?", # TCP RST Payload, Might be useful..
|
||||
|
||||
try:
|
||||
for success, response in helpers.parallel_bulk(actions=remaining_documents, **parallel_bulk_config):
|
||||
if not success:
|
||||
failed_documents.append(response)
|
||||
|
||||
if not failed_documents:
|
||||
ingested = parallel_bulk_config['chunk_size'] * parallel_bulk_config['thread_count']
|
||||
logging.info(f'Successfully indexed {ingested:,} ({count:,} processed) records to {self.es_index} from {file_path}')
|
||||
break
|
||||
|
||||
else:
|
||||
logging.warning(f'Failed to index {len(failed_documents):,} failed documents! Retrying...')
|
||||
remaining_documents = failed_documents
|
||||
except Exception as e:
|
||||
logging.error(f'Failed to index documents! ({e})')
|
||||
time.sleep(30)
|
||||
|
||||
|
||||
def follow(file) -> str:
|
||||
'''
|
||||
Generator function that yields new lines in a file in real time.
|
||||
|
||||
:param file: File object to read from
|
||||
'''
|
||||
|
||||
file.seek(0,2) # Go to the end of the file
|
||||
|
||||
while True:
|
||||
line = file.readline()
|
||||
|
||||
if not line:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
yield line
|
||||
|
||||
|
||||
def main():
|
||||
'''Main function when running this script directly.'''
|
||||
|
||||
parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
|
||||
|
||||
# General arguments
|
||||
parser.add_argument('input_path', help='Path to the input file or directory') # Required
|
||||
parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
|
||||
parser.add_argument('--watch', action='store_true', help='Watch the input file for new lines and index them in real time')
|
||||
|
||||
# Elasticsearch arguments
|
||||
parser.add_argument('--host', default='localhost', help='Elasticsearch host')
|
||||
parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
|
||||
parser.add_argument('--user', default='elastic', help='Elasticsearch username')
|
||||
parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
|
||||
parser.add_argument('--api-key', help='Elasticsearch API Key for authentication')
|
||||
parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')
|
||||
|
||||
# Elasticsearch indexing arguments
|
||||
parser.add_argument('--index', default='masscan-logs', help='Elasticsearch index name')
|
||||
parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
|
||||
parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
|
||||
|
||||
# Performance arguments
|
||||
parser.add_argument('--batch-max', type=int, default=10, help='Maximum size in MB of a batch')
|
||||
parser.add_argument('--batch-size', type=int, default=5000, help='Number of records to index in a batch')
|
||||
parser.add_argument('--batch-threads', type=int, default=2, help='Number of threads to use when indexing in batches')
|
||||
parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a batch before failing')
|
||||
parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a batch')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Argument validation
|
||||
if not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
|
||||
raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
|
||||
|
||||
if not args.dry_run:
|
||||
if args.batch_size < 1:
|
||||
raise ValueError('Batch size must be greater than 0')
|
||||
|
||||
elif args.retries < 1:
|
||||
raise ValueError('Number of retries must be greater than 0')
|
||||
|
||||
elif args.timeout < 5:
|
||||
raise ValueError('Timeout must be greater than 4')
|
||||
|
||||
elif args.batch_max < 1:
|
||||
raise ValueError('Batch max size must be greater than 0')
|
||||
|
||||
elif args.batch_threads < 1:
|
||||
raise ValueError('Batch threads must be greater than 0')
|
||||
|
||||
elif not args.host:
|
||||
raise ValueError('Missing required Elasticsearch argument: host')
|
||||
|
||||
elif not args.api_key and (not args.user or not args.password):
|
||||
raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')
|
||||
|
||||
elif args.shards < 1:
|
||||
raise ValueError('Number of shards must be greater than 0')
|
||||
|
||||
elif args.replicas < 0:
|
||||
raise ValueError('Number of replicas must be greater than 0')
|
||||
|
||||
edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed, args.retries, args.timeout)
|
||||
|
||||
if not args.dry_run:
|
||||
print(edx.get_cluster_health())
|
||||
|
||||
time.sleep(3) # Delay to allow time for sniffing to complete
|
||||
|
||||
nodes = edx.get_cluster_size()
|
||||
|
||||
logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
|
||||
|
||||
edx.create_index(args.shards, args.replicas) # Create the index if it does not exist
|
||||
|
||||
chunk = {
|
||||
'size': args.batch_size,
|
||||
'max_size': args.batch_max * 1024 * 1024, # MB
|
||||
'threads': args.batch_threads
|
||||
# GeoIP ingestion pipeline fields
|
||||
"geoip": {
|
||||
"city_name": "City",
|
||||
"continent_name": "Continent",
|
||||
"country_iso_code": "CC",
|
||||
"country_name": "Country",
|
||||
"location": {
|
||||
"lat": 0.0000,
|
||||
"lon": 0.0000
|
||||
},
|
||||
"region_iso_code": "RR",
|
||||
"region_name": "Region"
|
||||
}
|
||||
|
||||
chunk['batch'] = nodes * (chunk['size'] * chunk['threads'])
|
||||
else:
|
||||
chunk = {} # Ugly hack to get this working...
|
||||
|
||||
if os.path.isfile(args.input_path):
|
||||
logging.info(f'Processing file: {args.input_path}')
|
||||
edx.process_file(args.input_path, args.watch, chunk)
|
||||
|
||||
elif os.path.isdir(args.input_path):
|
||||
count = 1
|
||||
total = len(os.listdir(args.input_path))
|
||||
logging.info(f'Processing {total:,} files in directory: {args.input_path}')
|
||||
for file in sorted(os.listdir(args.input_path)):
|
||||
file_path = os.path.join(args.input_path, file)
|
||||
if os.path.isfile(file_path):
|
||||
logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
|
||||
edx.process_file(file_path, args.watch, chunk)
|
||||
count += 1
|
||||
else:
|
||||
logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
}
|
||||
'''
|
@ -1,153 +1,39 @@
|
||||
#!/usr/bin/env python
|
||||
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
|
||||
# ingest_massdns.py
|
||||
|
||||
# Massdns Log File Ingestion:
|
||||
#
|
||||
# This script takes JSON formatted massdns logs and indexes them into Elasticsearch.
|
||||
#
|
||||
# Saving my "typical" massdns command here for reference to myself:
|
||||
# python $HOME/massdns/scripts/ptr.py | massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o J -w $HOME/massdns/ptr.json
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
|
||||
try:
|
||||
from elasticsearch import Elasticsearch, helpers
|
||||
except ImportError:
|
||||
raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
|
||||
default_index = 'ptr-records'
|
||||
|
||||
def construct_map() -> dict:
|
||||
'''Construct the Elasticsearch index mapping for MassDNS records'''
|
||||
|
||||
# Setting up logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')
|
||||
|
||||
|
||||
class ElasticIndexer:
|
||||
def __init__(self, es_host: str, es_port: int, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False, retries: int = 10, timeout: int = 30):
|
||||
'''
|
||||
Initialize the Elastic Search indexer.
|
||||
|
||||
:param es_host: Elasticsearch host(s)
|
||||
:param es_port: Elasticsearch port
|
||||
:param es_user: Elasticsearch username
|
||||
:param es_password: Elasticsearch password
|
||||
:param es_api_key: Elasticsearch API Key
|
||||
:param es_index: Elasticsearch index name
|
||||
:param dry_run: If True, do not initialize Elasticsearch client
|
||||
:param self_signed: If True, do not verify SSL certificates
|
||||
:param retries: Number of times to retry indexing a batch before failing
|
||||
:param timeout: Number of seconds to wait before retrying a batch
|
||||
'''
|
||||
|
||||
self.dry_run = dry_run
|
||||
self.es = None
|
||||
self.es_index = es_index
|
||||
|
||||
if not dry_run:
|
||||
es_config = {
|
||||
'hosts': [f'{es_host}:{es_port}'],
|
||||
'verify_certs': self_signed,
|
||||
'ssl_show_warn': self_signed,
|
||||
'request_timeout': timeout,
|
||||
'max_retries': retries,
|
||||
'retry_on_timeout': True,
|
||||
'sniff_on_start': False,
|
||||
'sniff_on_node_failure': True,
|
||||
'min_delay_between_sniffing': 60
|
||||
}
|
||||
|
||||
if es_api_key:
|
||||
es_config['headers'] = {'Authorization': f'ApiKey {es_api_key}'}
|
||||
else:
|
||||
es_config['basic_auth'] = (es_user, es_password)
|
||||
|
||||
# Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
|
||||
import sniff_patch
|
||||
self.es = sniff_patch.init_elasticsearch(**es_config)
|
||||
|
||||
# Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
|
||||
#self.es = Elasticsearch(**es_config)
|
||||
|
||||
|
||||
def create_index(self, shards: int = 1, replicas: int = 1):
|
||||
'''
|
||||
Create the Elasticsearch index with the defined mapping.
|
||||
|
||||
:param shards: Number of shards for the index
|
||||
:param replicas: Number of replicas for the index
|
||||
'''
|
||||
keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
|
||||
|
||||
mapping = {
|
||||
'settings': {
|
||||
'number_of_shards': shards,
|
||||
'number_of_replicas': replicas
|
||||
},
|
||||
'mappings': {
|
||||
'properties': {
|
||||
'ip': { 'type': 'ip' },
|
||||
'name': { 'type': 'keyword' },
|
||||
'record': { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } },
|
||||
'record': keyword_mapping,
|
||||
'seen': { 'type': 'date' }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if not self.es.indices.exists(index=self.es_index):
|
||||
response = self.es.indices.create(index=self.es_index, body=mapping)
|
||||
|
||||
if response.get('acknowledged') and response.get('shards_acknowledged'):
|
||||
logging.info(f'Index \'{self.es_index}\' successfully created.')
|
||||
else:
|
||||
raise Exception(f'Failed to create index. ({response})')
|
||||
|
||||
else:
|
||||
logging.warning(f'Index \'{self.es_index}\' already exists.')
|
||||
return mapping
|
||||
|
||||
|
||||
def get_cluster_health(self) -> dict:
|
||||
'''Get the health of the Elasticsearch cluster.'''
|
||||
|
||||
return self.es.cluster.health()
|
||||
|
||||
|
||||
def get_cluster_size(self) -> int:
|
||||
'''Get the number of nodes in the Elasticsearch cluster.'''
|
||||
|
||||
cluster_stats = self.es.cluster.stats()
|
||||
number_of_nodes = cluster_stats['nodes']['count']['total']
|
||||
|
||||
return number_of_nodes
|
||||
|
||||
|
||||
def process_file(self, file_path: str, watch: bool = False, chunk: dict = {}):
|
||||
def process_file(file_path: str):
|
||||
'''
|
||||
Read and index PTR records in batches to Elasticsearch, handling large volumes efficiently.
|
||||
Read and process Massdns records from the log file.
|
||||
|
||||
:param file_path: Path to the Masscan log file
|
||||
:param watch: If True, input file will be watched for new lines and indexed in real time\
|
||||
:param chunk: Chunk configuration for indexing in batches
|
||||
|
||||
Example PTR record:
|
||||
0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
|
||||
0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com.
|
||||
0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net.
|
||||
0.6.212.173.in-addr.arpa. PTR 173-212-6-0.cpe.surry.net.
|
||||
0.6.201.133.in-addr.arpa. PTR flh2-133-201-6-0.tky.mesh.ad.jp.
|
||||
|
||||
Will be indexed as:
|
||||
{
|
||||
"ip": "47.229.6.0",
|
||||
"record": "047-229-006-000.res.spectrum.com.",
|
||||
"seen": "2021-06-30T18:31:00Z"
|
||||
}
|
||||
:param file_path: Path to the Massdns log file
|
||||
'''
|
||||
|
||||
count = 0
|
||||
records = []
|
||||
|
||||
with open(file_path, 'r') as file:
|
||||
for line in (file := follow(file) if watch else file):
|
||||
for line in file:
|
||||
line = line.strip()
|
||||
|
||||
if not line:
|
||||
@ -163,197 +49,39 @@ class ElasticIndexer:
|
||||
if record_type != 'PTR':
|
||||
continue
|
||||
|
||||
#if record_type == 'CNAME':
|
||||
# if data.endswith('.in-addr.arpa'):
|
||||
# continue
|
||||
|
||||
# Let's not index the PTR record if it's the same as the in-addr.arpa domain
|
||||
if data == name:
|
||||
continue
|
||||
|
||||
ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
|
||||
|
||||
source = {
|
||||
struct = {
|
||||
'ip': ip,
|
||||
'record': data,
|
||||
'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
|
||||
}
|
||||
|
||||
if self.dry_run:
|
||||
print(source)
|
||||
else:
|
||||
struct = {'_index': self.es_index, '_source': source}
|
||||
records.append(struct)
|
||||
count += 1
|
||||
if len(records) >= chunk['batch']:
|
||||
self.bulk_index(records, file_path, chunk, count)
|
||||
records = []
|
||||
yield struct
|
||||
|
||||
if records:
|
||||
self.bulk_index(records, file_path, chunk, count)
|
||||
return None # EOF
|
||||
|
||||
|
||||
def bulk_index(self, documents: list, file_path: str, chunk: dict, count: int):
|
||||
'''
|
||||
Index a batch of documents to Elasticsearch.
|
||||
Example PTR record:
|
||||
0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
|
||||
0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com.
|
||||
0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net.
|
||||
0.6.212.173.in-addr.arpa. PTR 173-212-6-0.cpe.surry.net.
|
||||
0.6.201.133.in-addr.arpa. PTR flh2-133-201-6-0.tky.mesh.ad.jp.
|
||||
|
||||
:param documents: List of documents to index
|
||||
:param file_path: Path to the file being indexed
|
||||
:param chunk: Chunk configuration
|
||||
:param count: Total number of records processed
|
||||
'''
|
||||
|
||||
remaining_documents = documents
|
||||
|
||||
parallel_bulk_config = {
|
||||
'client': self.es,
|
||||
'chunk_size': chunk['size'],
|
||||
'max_chunk_bytes': chunk['max_size'],
|
||||
'thread_count': chunk['threads'],
|
||||
'queue_size': 2
|
||||
Will be indexed as:
|
||||
{
|
||||
"ip": "47.229.6.0",
|
||||
"record": "047-229-006-000.res.spectrum.com.",
|
||||
"seen": "2021-06-30T18:31:00Z"
|
||||
}
|
||||
|
||||
while remaining_documents:
|
||||
failed_documents = []
|
||||
|
||||
try:
|
||||
for success, response in helpers.parallel_bulk(actions=remaining_documents, **parallel_bulk_config):
|
||||
if not success:
|
||||
failed_documents.append(response)
|
||||
|
||||
if not failed_documents:
|
||||
ingested = parallel_bulk_config['chunk_size'] * parallel_bulk_config['thread_count']
|
||||
logging.info(f'Successfully indexed {ingested:,} ({count:,} processed) records to {self.es_index} from {file_path}')
|
||||
break
|
||||
|
||||
else:
|
||||
logging.warning(f'Failed to index {len(failed_documents):,} failed documents! Retrying...')
|
||||
remaining_documents = failed_documents
|
||||
except Exception as e:
|
||||
logging.error(f'Failed to index documents! ({e})')
|
||||
time.sleep(30)
|
||||
|
||||
|
||||
def follow(file) -> str:
|
||||
'''
|
||||
Generator function that yields new lines in a file in real time.
|
||||
|
||||
:param file: File object to read from
|
||||
'''
|
||||
|
||||
file.seek(0,2) # Go to the end of the file
|
||||
|
||||
while True:
|
||||
line = file.readline()
|
||||
|
||||
if not line:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
yield line
|
||||
|
||||
|
||||
def main():
|
||||
'''Main function when running this script directly.'''
|
||||
|
||||
parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
|
||||
|
||||
# General arguments
|
||||
parser.add_argument('input_path', help='Path to the input file or directory') # Required
|
||||
parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
|
||||
parser.add_argument('--watch', action='store_true', help='Watch the input file for new lines and index them in real time')
|
||||
|
||||
# Elasticsearch arguments
|
||||
parser.add_argument('--host', default='localhost', help='Elasticsearch host')
|
||||
parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
|
||||
parser.add_argument('--user', default='elastic', help='Elasticsearch username')
|
||||
parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
|
||||
parser.add_argument('--api-key', help='Elasticsearch API Key for authentication')
|
||||
parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')
|
||||
|
||||
# Elasticsearch indexing arguments
|
||||
parser.add_argument('--index', default='ptr-records', help='Elasticsearch index name')
|
||||
parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
|
||||
parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
|
||||
|
||||
# Performance arguments
|
||||
parser.add_argument('--batch-max', type=int, default=10, help='Maximum size in MB of a batch')
|
||||
parser.add_argument('--batch-size', type=int, default=5000, help='Number of records to index in a batch')
|
||||
parser.add_argument('--batch-threads', type=int, default=2, help='Number of threads to use when indexing in batches')
|
||||
parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a batch before failing')
|
||||
parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a batch')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Argument validation
|
||||
if not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
|
||||
raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
|
||||
|
||||
if not args.dry_run:
|
||||
if args.batch_size < 1:
|
||||
raise ValueError('Batch size must be greater than 0')
|
||||
|
||||
elif args.retries < 1:
|
||||
raise ValueError('Number of retries must be greater than 0')
|
||||
|
||||
elif args.timeout < 5:
|
||||
raise ValueError('Timeout must be greater than 4')
|
||||
|
||||
elif args.batch_max < 1:
|
||||
raise ValueError('Batch max size must be greater than 0')
|
||||
|
||||
elif args.batch_threads < 1:
|
||||
raise ValueError('Batch threads must be greater than 0')
|
||||
|
||||
elif not args.host:
|
||||
raise ValueError('Missing required Elasticsearch argument: host')
|
||||
|
||||
elif not args.api_key and (not args.user or not args.password):
|
||||
raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')
|
||||
|
||||
elif args.shards < 1:
|
||||
raise ValueError('Number of shards must be greater than 0')
|
||||
|
||||
elif args.replicas < 0:
|
||||
raise ValueError('Number of replicas must be greater than 0')
|
||||
|
||||
edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed, args.retries, args.timeout)
|
||||
|
||||
if not args.dry_run:
|
||||
print(edx.get_cluster_health())
|
||||
|
||||
time.sleep(3) # Delay to allow time for sniffing to complete
|
||||
|
||||
nodes = edx.get_cluster_size()
|
||||
|
||||
logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
|
||||
|
||||
edx.create_index(args.shards, args.replicas) # Create the index if it does not exist
|
||||
|
||||
chunk = {
|
||||
'size': args.batch_size,
|
||||
'max_size': args.batch_max * 1024 * 1024, # MB
|
||||
'threads': args.batch_threads
|
||||
}
|
||||
|
||||
chunk['batch'] = nodes * (chunk['size'] * chunk['threads'])
|
||||
else:
|
||||
chunk = {} # Ugly hack to get this working...
|
||||
|
||||
if os.path.isfile(args.input_path):
|
||||
logging.info(f'Processing file: {args.input_path}')
|
||||
edx.process_file(args.input_path, args.watch, chunk)
|
||||
|
||||
elif os.path.isdir(args.input_path):
|
||||
count = 1
|
||||
total = len(os.listdir(args.input_path))
|
||||
logging.info(f'Processing {total:,} files in directory: {args.input_path}')
|
||||
for file in sorted(os.listdir(args.input_path)):
|
||||
file_path = os.path.join(args.input_path, file)
|
||||
if os.path.isfile(file_path):
|
||||
logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
|
||||
edx.process_file(file_path, args.watch, chunk)
|
||||
count += 1
|
||||
else:
|
||||
logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,106 +1,21 @@
|
||||
#!/usr/bin/env python
|
||||
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
|
||||
# ingest_zone.py
|
||||
|
||||
# DNS Zone File Ingestion:
|
||||
#
|
||||
# This script will read a DNS zone file and index the records to Elasticsearch.
|
||||
#
|
||||
# Zone files must be valid and follow the RFC 1035 standard for proper indexing:
|
||||
# - https://datatracker.ietf.org/doc/html/rfc1035
|
||||
#
|
||||
# Most of my zones come from CZDS but AXFR outputs can be used as well.
|
||||
# Anomaly detection is in place to alert the user of any unexpected records.
|
||||
|
||||
|
||||
# WARNING NOTICE:
|
||||
#
|
||||
# The following zones need reindex due to previous unsupported record types:
|
||||
# - .wang (Contains a completely invalid line) (tjjm6hs65gL9KUFU76J747MB NS)
|
||||
# - .tel (NAPTR records were missing)
|
||||
# - .nu (RP records were missing
|
||||
# - .se (RP records were missing)
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
|
||||
try:
|
||||
from elasticsearch import Elasticsearch, helpers, ConnectionError, TransportError
|
||||
except ImportError:
|
||||
raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
|
||||
|
||||
|
||||
# Setting up logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')
|
||||
|
||||
# Record types to index
|
||||
default_index = 'dns-zones'
|
||||
record_types = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','naptr','ns','nsec','nsec3','nsec3param','ptr','rrsig','rp','sshfp','soa','srv','txt','type65534')
|
||||
|
||||
def construct_map() -> dict:
|
||||
'''Construct the Elasticsearch index mapping for zone file records.'''
|
||||
|
||||
class ElasticIndexer:
|
||||
def __init__(self, es_host: str, es_port: int, es_user: str, es_password: str, es_api_key: str, es_index: str, dry_run: bool = False, self_signed: bool = False, retries: int = 10, timeout: int = 30):
|
||||
'''
|
||||
Initialize the Elastic Search indexer.
|
||||
|
||||
:param es_host: Elasticsearch host(s)
|
||||
:param es_port: Elasticsearch port
|
||||
:param es_user: Elasticsearch username
|
||||
:param es_password: Elasticsearch password
|
||||
:param es_api_key: Elasticsearch API Key
|
||||
:param es_index: Elasticsearch index name
|
||||
:param dry_run: If True, do not initialize Elasticsearch client
|
||||
:param self_signed: If True, do not verify SSL certificates
|
||||
:param retries: Number of times to retry indexing a batch before failing
|
||||
:param timeout: Number of seconds to wait before retrying a batch
|
||||
'''
|
||||
|
||||
self.dry_run = dry_run
|
||||
self.es = None
|
||||
self.es_index = es_index
|
||||
|
||||
if not dry_run:
|
||||
es_config = {
|
||||
'hosts': [f'{es_host}:{es_port}'],
|
||||
'verify_certs': self_signed,
|
||||
'ssl_show_warn': self_signed,
|
||||
'request_timeout': timeout,
|
||||
'max_retries': retries,
|
||||
'retry_on_timeout': True,
|
||||
'sniff_on_start': False,
|
||||
'sniff_on_node_failure': True,
|
||||
'min_delay_between_sniffing': 60
|
||||
}
|
||||
|
||||
if es_api_key:
|
||||
es_config['headers'] = {'Authorization': f'ApiKey {es_api_key}'}
|
||||
else:
|
||||
es_config['basic_auth'] = (es_user, es_password)
|
||||
|
||||
# Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
|
||||
import sniff_patch
|
||||
self.es = sniff_patch.init_elasticsearch(**es_config)
|
||||
|
||||
# Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
|
||||
#self.es = Elasticsearch(**es_config)
|
||||
|
||||
|
||||
def create_index(self, shards: int = 1, replicas: int = 1):
|
||||
'''
|
||||
Create the Elasticsearch index with the defined mapping.
|
||||
|
||||
:param shards: Number of shards for the index
|
||||
:param replicas: Number of replicas for the index
|
||||
'''
|
||||
keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
|
||||
|
||||
mapping = {
|
||||
'settings': {
|
||||
'number_of_shards': shards,
|
||||
'number_of_replicas': replicas
|
||||
},
|
||||
'mappings': {
|
||||
'properties': {
|
||||
'domain': { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } },
|
||||
'domain': keyword_mapping,
|
||||
'records': { 'properties': {} },
|
||||
'seen': {'type': 'date'}
|
||||
}
|
||||
@ -119,71 +34,21 @@ class ElasticIndexer:
|
||||
else:
|
||||
mapping['mappings']['properties']['records']['properties'][item] = {
|
||||
'properties': {
|
||||
'data': { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } },
|
||||
'data': keyword_mapping,
|
||||
'ttl': { 'type': 'integer' }
|
||||
}
|
||||
}
|
||||
|
||||
if not self.es.indices.exists(index=self.es_index):
|
||||
response = self.es.indices.create(index=self.es_index, body=mapping)
|
||||
|
||||
if response.get('acknowledged') and response.get('shards_acknowledged'):
|
||||
logging.info(f'Index \'{self.es_index}\' successfully created.')
|
||||
else:
|
||||
raise Exception(f'Failed to create index. ({response})')
|
||||
|
||||
else:
|
||||
logging.warning(f'Index \'{self.es_index}\' already exists.')
|
||||
return mapping
|
||||
|
||||
|
||||
def get_cluster_health(self) -> dict:
|
||||
'''Get the health of the Elasticsearch cluster.'''
|
||||
|
||||
return self.es.cluster.health()
|
||||
|
||||
|
||||
def get_cluster_size(self) -> int:
|
||||
'''Get the number of nodes in the Elasticsearch cluster.'''
|
||||
|
||||
cluster_stats = self.es.cluster.stats()
|
||||
number_of_nodes = cluster_stats['nodes']['count']['total']
|
||||
|
||||
return number_of_nodes
|
||||
|
||||
|
||||
def process_file(self, file_path: str, watch: bool = False, chunk: dict = {}):
|
||||
def process_file(file_path: str):
|
||||
'''
|
||||
Read and index DNS records in batches to Elasticsearch, handling large volumes efficiently.
|
||||
Read and process zone file records.
|
||||
|
||||
:param file_path: Path to the Masscan log file
|
||||
:param watch: If True, input file will be watched for new lines and indexed in real time\
|
||||
:param chunk: Chunk configuration for indexing in batches
|
||||
|
||||
Example record:
|
||||
0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600 in nsec3 1 1 100 332539EE7F95C32A 10MHUKG4FHIAVEFDOTF6NKU5KFCB2J3A NS DS RRSIG
|
||||
0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600 in rrsig NSEC3 8 2 3600 20240122151947 20240101141947 4125 vegas. hzIvQrZIxBSwRWyiHkb5M2W0R3ikNehv884nilkvTt9DaJSDzDUrCtqwQb3jh6+BesByBqfMQK+L2n9c//ZSmD5/iPqxmTPCuYIB9uBV2qSNSNXxCY7uUt5w7hKUS68SLwOSjaQ8GRME9WQJhY6gck0f8TT24enjXXRnQC8QitY=
|
||||
1-800-flowers.vegas. 3600 in ns dns1.cscdns.net.
|
||||
1-800-flowers.vegas. 3600 in ns dns2.cscdns.net.
|
||||
100.vegas. 3600 in ns ns51.domaincontrol.com.
|
||||
100.vegas. 3600 in ns ns52.domaincontrol.com.
|
||||
1001.vegas. 3600 in ns ns11.waterrockdigital.com.
|
||||
1001.vegas. 3600 in ns ns12.waterrockdigital.com.
|
||||
|
||||
Will be indexed as:
|
||||
{
|
||||
"domain": "1001.vegas",
|
||||
"records": {
|
||||
"ns": [
|
||||
{"ttl": 3600, "data": "ns11.waterrockdigital.com"},
|
||||
{"ttl": 3600, "data": "ns12.waterrockdigital.com"}
|
||||
]
|
||||
},
|
||||
"seen": "2021-09-01T00:00:00Z" # Zulu time added upon indexing
|
||||
}
|
||||
:param file_path: Path to the zone file
|
||||
'''
|
||||
|
||||
count = 0
|
||||
records = []
|
||||
domain_records = {}
|
||||
last_domain = None
|
||||
|
||||
@ -227,15 +92,7 @@ class ElasticIndexer:
|
||||
|
||||
del domain_records[last_domain]
|
||||
|
||||
if self.dry_run:
|
||||
print(source)
|
||||
else:
|
||||
struct = {'_index': self.es_index, '_source': source}
|
||||
records.append(struct)
|
||||
count += 1
|
||||
if len(records) >= chunk['batch']:
|
||||
self.bulk_index(records, file_path, chunk, count)
|
||||
records = []
|
||||
yield source
|
||||
|
||||
last_domain = domain
|
||||
|
||||
@ -246,155 +103,30 @@ class ElasticIndexer:
|
||||
|
||||
domain_records[domain][record_type].append({'ttl': ttl, 'data': data})
|
||||
|
||||
if records:
|
||||
self.bulk_index(records, file_path, chunk, count)
|
||||
return None # EOF
|
||||
|
||||
|
||||
|
||||
def bulk_index(self, documents: list, file_path: str, chunk: dict, count: int):
|
||||
'''
|
||||
Index a batch of documents to Elasticsearch.
|
||||
Example record:
|
||||
0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600 in nsec3 1 1 100 332539EE7F95C32A 10MHUKG4FHIAVEFDOTF6NKU5KFCB2J3A NS DS RRSIG
|
||||
0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600 in rrsig NSEC3 8 2 3600 20240122151947 20240101141947 4125 vegas. hzIvQrZIxBSwRWyiHkb5M2W0R3ikNehv884nilkvTt9DaJSDzDUrCtqwQb3jh6+BesByBqfMQK+L2n9c//ZSmD5/iPqxmTPCuYIB9uBV2qSNSNXxCY7uUt5w7hKUS68SLwOSjaQ8GRME9WQJhY6gck0f8TT24enjXXRnQC8QitY=
|
||||
1-800-flowers.vegas. 3600 in ns dns1.cscdns.net.
|
||||
1-800-flowers.vegas. 3600 in ns dns2.cscdns.net.
|
||||
100.vegas. 3600 in ns ns51.domaincontrol.com.
|
||||
100.vegas. 3600 in ns ns52.domaincontrol.com.
|
||||
1001.vegas. 3600 in ns ns11.waterrockdigital.com.
|
||||
1001.vegas. 3600 in ns ns12.waterrockdigital.com.
|
||||
|
||||
:param documents: List of documents to index
|
||||
:param file_path: Path to the file being indexed
|
||||
:param count: Total number of records processed
|
||||
Will be indexed as:
|
||||
{
|
||||
"domain": "1001.vegas",
|
||||
"records": {
|
||||
"ns": [
|
||||
{"ttl": 3600, "data": "ns11.waterrockdigital.com"},
|
||||
{"ttl": 3600, "data": "ns12.waterrockdigital.com"}
|
||||
]
|
||||
},
|
||||
"seen": "2021-09-01T00:00:00Z" # Zulu time added upon indexing
|
||||
}
|
||||
'''
|
||||
|
||||
remaining_documents = documents
|
||||
|
||||
parallel_bulk_config = {
|
||||
'client': self.es,
|
||||
'chunk_size': chunk['size'],
|
||||
'max_chunk_bytes': chunk['max_size'] * 1024 * 1024, # MB
|
||||
'thread_count': chunk['threads'],
|
||||
'queue_size': 2
|
||||
}
|
||||
|
||||
while remaining_documents:
|
||||
failed_documents = []
|
||||
|
||||
try:
|
||||
for success, response in helpers.parallel_bulk(actions=remaining_documents, **parallel_bulk_config):
|
||||
if not success:
|
||||
failed_documents.append(response)
|
||||
|
||||
if not failed_documents:
|
||||
ingested = parallel_bulk_config['chunk_size'] * parallel_bulk_config['thread_count']
|
||||
logging.info(f'Successfully indexed {ingested:,} ({count:,} processed) records to {self.es_index} from {file_path}')
|
||||
break
|
||||
|
||||
else:
|
||||
logging.warning(f'Failed to index {len(failed_documents):,} failed documents! Retrying...')
|
||||
remaining_documents = failed_documents
|
||||
except Exception as e:
|
||||
logging.error(f'Failed to index documents! ({e})')
|
||||
time.sleep(30)
|
||||
|
||||
|
||||
def main():
|
||||
'''Main function when running this script directly.'''
|
||||
|
||||
parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
|
||||
|
||||
# General arguments
|
||||
parser.add_argument('input_path', help='Path to the input file or directory') # Required
|
||||
parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
|
||||
parser.add_argument('--watch', action='store_true', help='Watch the input file for new lines and index them in real time')
|
||||
|
||||
# Elasticsearch arguments
|
||||
parser.add_argument('--host', default='localhost', help='Elasticsearch host')
|
||||
parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
|
||||
parser.add_argument('--user', default='elastic', help='Elasticsearch username')
|
||||
parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
|
||||
parser.add_argument('--api-key', help='Elasticsearch API Key for authentication')
|
||||
parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')
|
||||
|
||||
# Elasticsearch indexing arguments
|
||||
parser.add_argument('--index', default='dns-zones', help='Elasticsearch index name')
|
||||
parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
|
||||
parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
|
||||
|
||||
# Performance arguments
|
||||
parser.add_argument('--batch-max', type=int, default=10, help='Maximum size in MB of a batch')
|
||||
parser.add_argument('--batch-size', type=int, default=5000, help='Number of records to index in a batch')
|
||||
parser.add_argument('--batch-threads', type=int, default=2, help='Number of threads to use when indexing in batches')
|
||||
parser.add_argument('--retries', type=int, default=10, help='Number of times to retry indexing a batch before failing')
|
||||
parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a batch')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Argument validation
|
||||
if not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
|
||||
raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
|
||||
|
||||
if not args.dry_run:
|
||||
if args.batch_size < 1:
|
||||
raise ValueError('Batch size must be greater than 0')
|
||||
|
||||
elif args.retries < 1:
|
||||
raise ValueError('Number of retries must be greater than 0')
|
||||
|
||||
elif args.timeout < 5:
|
||||
raise ValueError('Timeout must be greater than 4')
|
||||
|
||||
elif args.batch_max < 1:
|
||||
raise ValueError('Batch max size must be greater than 0')
|
||||
|
||||
elif args.batch_threads < 1:
|
||||
raise ValueError('Batch threads must be greater than 0')
|
||||
|
||||
elif not args.host:
|
||||
raise ValueError('Missing required Elasticsearch argument: host')
|
||||
|
||||
elif not args.api_key and (not args.user or not args.password):
|
||||
raise ValueError('Missing required Elasticsearch argument: either user and password or apikey')
|
||||
|
||||
elif args.shards < 1:
|
||||
raise ValueError('Number of shards must be greater than 0')
|
||||
|
||||
elif args.replicas < 0:
|
||||
raise ValueError('Number of replicas must be greater than 0')
|
||||
|
||||
edx = ElasticIndexer(args.host, args.port, args.user, args.password, args.api_key, args.index, args.dry_run, args.self_signed, args.retries, args.timeout)
|
||||
|
||||
if not args.dry_run:
|
||||
print(edx.get_cluster_health())
|
||||
|
||||
time.sleep(3) # Delay to allow time for sniffing to complete
|
||||
|
||||
nodes = edx.get_cluster_size()
|
||||
|
||||
logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
|
||||
|
||||
edx.create_index(args.shards, args.replicas) # Create the index if it does not exist
|
||||
|
||||
chunk = {
|
||||
'size': args.batch_size,
|
||||
'max_size': args.batch_max * 1024 * 1024, # MB
|
||||
'threads': args.batch_threads
|
||||
}
|
||||
|
||||
chunk['batch'] = nodes * (chunk['size'] * chunk['threads'])
|
||||
else:
|
||||
chunk = {} # Ugly hack to get this working...
|
||||
|
||||
if os.path.isfile(args.input_path):
|
||||
logging.info(f'Processing file: {args.input_path}')
|
||||
edx.process_file(args.input_path, args.watch, chunk)
|
||||
|
||||
elif os.path.isdir(args.input_path):
|
||||
count = 1
|
||||
total = len(os.listdir(args.input_path))
|
||||
logging.info(f'Processing {total:,} files in directory: {args.input_path}')
|
||||
for file in sorted(os.listdir(args.input_path)):
|
||||
file_path = os.path.join(args.input_path, file)
|
||||
if os.path.isfile(file_path):
|
||||
logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
|
||||
edx.process_file(file_path, args.watch, chunk)
|
||||
count += 1
|
||||
else:
|
||||
logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,5 +1,6 @@
|
||||
#!/usr/bin/env python
|
||||
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
|
||||
# sniff_patch.py
|
||||
|
||||
# Note:
|
||||
# This is a patch for the elasticsearch 8.x client to fix the sniff_* options.
|
Loading…
Reference in New Issue
Block a user