MassDNS ingestion script now caches the previous record to support IP addresses that yield more than one PTR record (field turned into a list when +1). Records will now upsert so MassDNS can be streaming into ES 24/7=
This commit is contained in:
parent
b15b3d8241
commit
9c6beb71ce
3
.vscode/settings.json
vendored
Normal file
3
.vscode/settings.json
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
{
|
||||
"python.REPL.enableREPLSmartSend": false
|
||||
}
|
27
README.md
27
README.md
@ -60,35 +60,10 @@ python eris.py [options] <input>
|
||||
This ingestion suite will use the built in node sniffer, so by connecting to a single node, you can load balance across the entire cluster.
|
||||
It is good to know how much nodes you have in the cluster to determine how to fine tune the arguments for the best performance, based on your environment.
|
||||
|
||||
## GeoIP Pipeline
|
||||
Create & add a geoip pipeline and use the following in your index mappings:
|
||||
|
||||
```json
|
||||
"geoip": {
|
||||
"city_name": "City",
|
||||
"continent_name": "Continent",
|
||||
"country_iso_code": "CC",
|
||||
"country_name": "Country",
|
||||
"location": {
|
||||
"lat": 0.0000,
|
||||
"lon": 0.0000
|
||||
},
|
||||
"region_iso_code": "RR",
|
||||
"region_name": "Region"
|
||||
}
|
||||
```
|
||||
|
||||
## Changelog
|
||||
- Added ingestion script for certificate transparency logs in real time using websockets.
|
||||
- `--dry-run` removed as this nears production level
|
||||
- Implemented [async elasticsearch](https://elasticsearch-py.readthedocs.io/en/latest/async.html) into the codebase & refactored some of the logic to accomadate.
|
||||
- The `--watch` feature now uses a FIFO to do live ingestion.
|
||||
- Isolated eris.py into it's own file and seperated the ingestion agents into their own modules.
|
||||
|
||||
## Roadmap
|
||||
- Fix issue with `ingest_certs.py` and not needing to pass a file to it.
|
||||
- Create a module for RIR database ingestion *(WHOIS, delegations, transfer, ASN mapping, peering, etc)*
|
||||
- Dynamically update the batch metrics when the sniffer adds or removes nodes.
|
||||
- Fix issue with `ingest_certs.py` and not needing to pass a file to it.
|
||||
|
||||
___
|
||||
|
||||
|
25
helpers/filter.py
Normal file
25
helpers/filter.py
Normal file
@ -0,0 +1,25 @@
|
||||
# All of the errors below are yielded from the "title" service name.
|
||||
bad_banners = [
|
||||
'301 Moved Permanently',
|
||||
'400 Bad Request',
|
||||
'403 Access Denied',
|
||||
'403 Forbidden',
|
||||
'404 Not Found',
|
||||
'408 Request Timeout',
|
||||
'408 Timeout',
|
||||
'500 Internal Server Error',
|
||||
'Access Denied',
|
||||
'Application Server Error',
|
||||
'Bad Request',
|
||||
'Bad Request (400)',
|
||||
'Invalid URL',
|
||||
'Error',
|
||||
'ERROR: The request could not be satisfied',
|
||||
'Fastly error: unknown domain',
|
||||
'HTTP 400',
|
||||
'Request Timeout',
|
||||
'Server Error',
|
||||
'Service Unavailable',
|
||||
'Welcome to nginx!',
|
||||
'Welcome to nginx on Debian!',
|
||||
]
|
@ -11,8 +11,10 @@ try:
|
||||
except ImportError:
|
||||
raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
|
||||
|
||||
|
||||
default_index = 'masscan-logs'
|
||||
|
||||
|
||||
def construct_map() -> dict:
|
||||
'''Construct the Elasticsearch index mapping for Masscan records.'''
|
||||
|
||||
@ -33,12 +35,16 @@ def construct_map() -> dict:
|
||||
'properties': {
|
||||
'ip' : { 'type': 'ip' },
|
||||
'port' : { 'type': 'integer' },
|
||||
'proto' : { 'type': 'keyword' },
|
||||
'service' : { 'type': 'keyword' },
|
||||
'banner' : keyword_mapping,
|
||||
#'geoip' : { 'properties': geoip_mapping } # Used witht he geoip pipeline to enrich the data
|
||||
'seen' : { 'type': 'date' }
|
||||
|
||||
'data' : {
|
||||
'properties': {
|
||||
'proto' : { 'type': 'keyword' },
|
||||
'service' : { 'type': 'keyword' },
|
||||
'banner' : keyword_mapping,
|
||||
'seen' : { 'type': 'date' }
|
||||
}
|
||||
},
|
||||
#'geoip' : { 'properties': geoip_mapping } # Used with the geoip pipeline to enrich the data
|
||||
'last_seen' : { 'type': 'date' }
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -70,8 +76,8 @@ async def process_data(file_path: str):
|
||||
record = json.loads(line)
|
||||
except json.decoder.JSONDecodeError:
|
||||
# In rare cases, the JSON record may be incomplete or malformed:
|
||||
# { "ip": "51.161.12.223", "timestamp": "1707628302", "ports": [ {"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner":
|
||||
# { "ip": "83.66.211.246", "timestamp": "1706557002"
|
||||
# { "ip": "51.161.12.223", "timestamp": "1707628302", "ports": [ {"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner":
|
||||
# { "ip": "83.66.211.246", "timestamp": "1706557002"
|
||||
logging.error(f'Failed to parse JSON record! ({line})')
|
||||
input('Press Enter to continue...') # Pause for review & debugging (remove this in production)
|
||||
continue
|
||||
@ -83,10 +89,13 @@ async def process_data(file_path: str):
|
||||
|
||||
for port_info in record['ports']:
|
||||
struct = {
|
||||
'ip' : record['ip'],
|
||||
'port' : port_info['port'],
|
||||
'proto' : port_info['proto'],
|
||||
'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
|
||||
'ip' : record['ip'],
|
||||
'data' : {
|
||||
'port' : port_info['port'],
|
||||
'proto' : port_info['proto'],
|
||||
'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
|
||||
},
|
||||
'last_seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
|
||||
}
|
||||
|
||||
if 'service' in port_info:
|
||||
|
@ -10,7 +10,9 @@ try:
|
||||
except ImportError:
|
||||
raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
|
||||
|
||||
default_index = 'ptr-records'
|
||||
|
||||
default_index = 'ptr-records-eris'
|
||||
|
||||
|
||||
def construct_map() -> dict:
|
||||
'''Construct the Elasticsearch index mapping for MassDNS records'''
|
||||
@ -19,14 +21,13 @@ def construct_map() -> dict:
|
||||
|
||||
mapping = {
|
||||
'mappings': {
|
||||
'properties': {
|
||||
'ip' : { 'type': 'ip' },
|
||||
'name' : { 'type': 'keyword' },
|
||||
'record' : keyword_mapping,
|
||||
'seen' : { 'type': 'date' }
|
||||
}
|
||||
'properties': {
|
||||
'ip' : { 'type': 'ip' },
|
||||
'record' : keyword_mapping,
|
||||
'seen' : { 'type': 'date' }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return mapping
|
||||
|
||||
@ -39,61 +40,104 @@ async def process_data(file_path: str):
|
||||
'''
|
||||
|
||||
async with aiofiles.open(file_path, mode='r') as input_file:
|
||||
|
||||
last = None
|
||||
|
||||
async for line in input_file:
|
||||
line = line.strip()
|
||||
|
||||
if line == '~eof': # Sentinel value to indicate the end of a process (Used with --watch with FIFO)
|
||||
break
|
||||
# Sentinel value to indicate the end of a process (for closing out a FIFO stream)
|
||||
if line == '~eof':
|
||||
yield last
|
||||
|
||||
# Skip empty lines
|
||||
if not line:
|
||||
continue
|
||||
|
||||
# Split the line into its parts
|
||||
parts = line.split()
|
||||
|
||||
# Ensure the line has at least 3 parts
|
||||
if len(parts) < 3:
|
||||
raise ValueError(f'Invalid PTR record: {line}')
|
||||
logging.warning(f'Invalid PTR record: {line}')
|
||||
continue
|
||||
|
||||
# Split the PTR record into its parts
|
||||
name, record_type, record = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
|
||||
|
||||
# Do we handle CNAME records returned by MassDNS?
|
||||
# Do not index other records
|
||||
if record_type != 'PTR':
|
||||
logging.warning(f'Invalid record type: {record_type}: {line}')
|
||||
continue
|
||||
|
||||
# Do not index PTR records that do not have a record
|
||||
if not record:
|
||||
logging.warning(f'Empty PTR record: {line}')
|
||||
continue
|
||||
|
||||
# Let's not index the PTR record if it's the same as the in-addr.arpa domain
|
||||
if record == name:
|
||||
logging.warning(f'PTR record is the same as the in-addr.arpa domain: {line}')
|
||||
continue
|
||||
|
||||
if not record: # Skip empty records
|
||||
continue
|
||||
|
||||
# Get the IP address from the in-addr.arpa domain
|
||||
ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
|
||||
|
||||
struct = {
|
||||
'ip' : ip,
|
||||
'record' : record,
|
||||
'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
|
||||
# Check if we are still processing the same IP address
|
||||
if last:
|
||||
if ip == last['_id']:
|
||||
last_record = last['_doc']['record']
|
||||
if isinstance(last_record, list):
|
||||
if record not in last_record:
|
||||
last['_doc']['record'].append(record)
|
||||
else:
|
||||
logging.warning(f'Duplicate PTR record: {line}')
|
||||
else:
|
||||
if record != last_record:
|
||||
last['_doc']['record'] = [last_record, record] # IP addresses with more than one PTR record will turn into a list
|
||||
continue
|
||||
else:
|
||||
yield last
|
||||
|
||||
# Cache the this document in-case we have more for the same IP address
|
||||
last = {
|
||||
'_op_type' : 'update',
|
||||
'_id' : ip,
|
||||
'_index' : default_index,
|
||||
'_doc' : {
|
||||
'ip' : ip,
|
||||
'record' : record,
|
||||
'seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
|
||||
},
|
||||
'doc_as_upsert' : True # This will create the document if it does not exist
|
||||
}
|
||||
|
||||
yield {'_id': ip, '_index': default_index, '_source': struct} # Store with ip as the unique id to allow the record to be reindexed if it exists.
|
||||
|
||||
|
||||
|
||||
'''
|
||||
Example PTR record:
|
||||
0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
|
||||
0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com.
|
||||
0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net.
|
||||
0.6.212.173.in-addr.arpa. PTR 173-212-6-0.cpe.surry.net.
|
||||
0.6.201.133.in-addr.arpa. PTR flh2-133-201-6-0.tky.mesh.ad.jp.
|
||||
Deployment:
|
||||
git clone https://github.com/blechschmidt/massdns.git $HOME/massdns && cd $HOME/massdns && make
|
||||
curl -s https://public-dns.info/nameservers.txt | grep -v ':' > $HOME/massdns/nameservers.txt
|
||||
pythons ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/massdns/fifo.json
|
||||
|
||||
Will be indexed as:
|
||||
{
|
||||
"_id" : "47.229.6.0"
|
||||
"_index" : "ptr-records",
|
||||
"_source" : {
|
||||
"ip" : "47.229.6.0",
|
||||
"record" : "047-229-006-000.res.spectrum.com.",
|
||||
"seen" : "2021-06-30T18:31:00Z"
|
||||
Output:
|
||||
0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
|
||||
0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com.
|
||||
0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net.
|
||||
|
||||
Input:
|
||||
{
|
||||
"_id" : "47.229.6.0"
|
||||
"_index" : "ptr-records",
|
||||
"_source" : {
|
||||
"ip" : "47.229.6.0",
|
||||
"record" : "047-229-006-000.res.spectrum.com", # This will be a list if there are more than one PTR record
|
||||
"seen" : "2021-06-30T18:31:00Z"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Notes:
|
||||
- Why do some IP addresses return a CNAME from a PTR request
|
||||
- What is dns-servfail.net (Frequent CNAME response from PTR requests)
|
||||
- Do we need JSON output from massdns?
|
||||
'''
|
287
old/eris.py
287
old/eris.py
@ -1,287 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import stat
|
||||
import time
|
||||
import sys
|
||||
|
||||
sys.dont_write_bytecode = True
|
||||
|
||||
try:
|
||||
from elasticsearch import Elasticsearch, helpers
|
||||
from elasticsearch.exceptions import NotFoundError
|
||||
except ImportError:
|
||||
raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
|
||||
|
||||
# Setting up logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')
|
||||
|
||||
|
||||
class ElasticIndexer:
|
||||
def __init__(self, args: argparse.Namespace):
|
||||
'''
|
||||
Initialize the Elastic Search indexer.
|
||||
|
||||
:param args: Parsed arguments from argparse
|
||||
'''
|
||||
|
||||
self.chunk_max = args.chunk_max * 1024 * 1024 # MB
|
||||
self.chunk_size = args.chunk_size
|
||||
self.chunk_threads = args.chunk_threads
|
||||
self.dry_run = args.dry_run
|
||||
self.es_index = args.index
|
||||
|
||||
if not args.dry_run:
|
||||
es_config = {
|
||||
'hosts': [f'{args.host}:{args.port}'],
|
||||
'verify_certs': args.self_signed,
|
||||
'ssl_show_warn': args.self_signed,
|
||||
'request_timeout': args.timeout,
|
||||
'max_retries': args.retries,
|
||||
'retry_on_timeout': True,
|
||||
'sniff_on_start': False,
|
||||
'sniff_on_node_failure': True,
|
||||
'min_delay_between_sniffing': 60 # Add config option for this?
|
||||
}
|
||||
|
||||
if args.api_key:
|
||||
es_config['headers'] = {'Authorization': f'ApiKey {args.api_key}'}
|
||||
else:
|
||||
es_config['basic_auth'] = (args.user, args.password)
|
||||
|
||||
# Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
|
||||
import sniff_patch
|
||||
self.es = sniff_patch.init_elasticsearch(**es_config)
|
||||
|
||||
# Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
|
||||
#self.es = Elasticsearch(**es_config)
|
||||
|
||||
|
||||
def create_index(self, map_body: dict, pipeline: str = '', replicas: int = 1, shards: int = 1, ):
|
||||
'''
|
||||
Create the Elasticsearch index with the defined mapping.
|
||||
|
||||
:param pipline: Name of the ingest pipeline to use for the index
|
||||
:param replicas: Number of replicas for the index
|
||||
:param shards: Number of shards for the index
|
||||
'''
|
||||
|
||||
if self.es.indices.exists(index=self.es_index):
|
||||
logging.info(f'Index \'{self.es_index}\' already exists.')
|
||||
return
|
||||
|
||||
mapping = map_body
|
||||
|
||||
mapping['settings'] = {
|
||||
'number_of_shards': shards,
|
||||
'number_of_replicas': replicas
|
||||
}
|
||||
|
||||
if pipeline:
|
||||
try:
|
||||
self.es.ingest.get_pipeline(id=pipeline)
|
||||
logging.info(f'Using ingest pipeline \'{pipeline}\' for index \'{self.es_index}\'')
|
||||
mapping['settings']['index.default_pipeline'] = pipeline
|
||||
except NotFoundError:
|
||||
raise ValueError(f'Ingest pipeline \'{pipeline}\' does not exist.')
|
||||
|
||||
response = self.es.indices.create(index=self.es_index, body=mapping)
|
||||
|
||||
if response.get('acknowledged') and response.get('shards_acknowledged'):
|
||||
logging.info(f'Index \'{self.es_index}\' successfully created.')
|
||||
else:
|
||||
raise Exception(f'Failed to create index. ({response})')
|
||||
|
||||
|
||||
def get_cluster_health(self) -> dict:
|
||||
'''Get the health of the Elasticsearch cluster.'''
|
||||
|
||||
return self.es.cluster.health()
|
||||
|
||||
|
||||
def get_cluster_size(self) -> int:
|
||||
'''Get the number of nodes in the Elasticsearch cluster.'''
|
||||
|
||||
cluster_stats = self.es.cluster.stats()
|
||||
number_of_nodes = cluster_stats['nodes']['count']['total']
|
||||
|
||||
return number_of_nodes
|
||||
|
||||
|
||||
def bulk_index(self, documents: list, file_path: str, count: int):
|
||||
'''
|
||||
Index a batch of documents to Elasticsearch.
|
||||
|
||||
:param documents: List of documents to index
|
||||
:param file_path: Path to the file being indexed
|
||||
:param count: Total number of records processed
|
||||
'''
|
||||
|
||||
remaining_documents = documents
|
||||
|
||||
parallel_bulk_config = {
|
||||
'client': self.es,
|
||||
'chunk_size': self.chunk_size,
|
||||
'max_chunk_bytes': self.chunk_max,
|
||||
'thread_count': self.chunk_threads,
|
||||
'queue_size': 2 # Add config option for this?
|
||||
}
|
||||
|
||||
while remaining_documents:
|
||||
failed_documents = []
|
||||
|
||||
try:
|
||||
for success, response in helpers.parallel_bulk(actions=remaining_documents, **parallel_bulk_config):
|
||||
if not success:
|
||||
failed_documents.append(response)
|
||||
|
||||
if not failed_documents:
|
||||
ingested = parallel_bulk_config['chunk_size'] * parallel_bulk_config['thread_count']
|
||||
logging.info(f'Successfully indexed {ingested:,} ({count:,} processed) records to {self.es_index} from {file_path}')
|
||||
break
|
||||
|
||||
else:
|
||||
logging.warning(f'Failed to index {len(failed_documents):,} failed documents! Retrying...')
|
||||
remaining_documents = failed_documents
|
||||
except Exception as e:
|
||||
logging.error(f'Failed to index documents! ({e})')
|
||||
time.sleep(30) # Should we add a config option for this?
|
||||
|
||||
|
||||
def process_file(self, file_path: str, batch_size: int, ingest_function: callable):
|
||||
'''
|
||||
Read and index records in batches to Elasticsearch.
|
||||
|
||||
:param file_path: Path to the file
|
||||
:param batch_size: Number of records to index per batch
|
||||
:param ingest_function: Function to process the file
|
||||
'''
|
||||
|
||||
count = 0
|
||||
records = []
|
||||
|
||||
for processed in ingest_function(file_path):
|
||||
|
||||
if not processed:
|
||||
break
|
||||
|
||||
if self.dry_run:
|
||||
print(processed)
|
||||
continue
|
||||
|
||||
struct = {'_index': self.es_index, '_source': processed}
|
||||
records.append(struct)
|
||||
count += 1
|
||||
|
||||
if len(records) >= batch_size:
|
||||
self.bulk_index(records, file_path, count)
|
||||
records = []
|
||||
|
||||
if records:
|
||||
self.bulk_index(records, file_path, count)
|
||||
|
||||
|
||||
def main():
|
||||
'''Main function when running this script directly.'''
|
||||
|
||||
parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')
|
||||
|
||||
# General arguments
|
||||
parser.add_argument('input_path', help='Path to the input file or directory') # Required
|
||||
parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
|
||||
parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')
|
||||
|
||||
# Elasticsearch arguments
|
||||
parser.add_argument('--host', default='localhost', help='Elasticsearch host')
|
||||
parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
|
||||
parser.add_argument('--user', default='elastic', help='Elasticsearch username')
|
||||
parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
|
||||
parser.add_argument('--api-key', default=os.getenv('ES_APIKEY'), help='Elasticsearch API Key for authentication (if not provided, check environment variable ES_APIKEY)')
|
||||
parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')
|
||||
|
||||
# Elasticsearch indexing arguments
|
||||
parser.add_argument('--index', help='Elasticsearch index name')
|
||||
parser.add_argument('--pipeline', help='Use an ingest pipeline for the index')
|
||||
parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
|
||||
parser.add_argument('--shards', type=int, default=3, help='Number of shards for the index')
|
||||
|
||||
# Performance arguments
|
||||
parser.add_argument('--chunk-max', type=int, default=10, help='Maximum size in MB of a chunk')
|
||||
parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk')
|
||||
parser.add_argument('--chunk-threads', type=int, default=3, help='Number of threads to use when indexing in chunks')
|
||||
parser.add_argument('--retries', type=int, default=60, help='Number of times to retry indexing a chunk before failing')
|
||||
parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a chunk')
|
||||
|
||||
# Ingestion arguments
|
||||
parser.add_argument('--httpx', action='store_true', help='Index Httpx records')
|
||||
parser.add_argument('--masscan', action='store_true', help='Index Masscan records')
|
||||
parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
|
||||
parser.add_argument('--zone', action='store_true', help='Index Zone records')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.watch:
|
||||
if not os.path.exists(args.input_path):
|
||||
os.mkfifo(args.input_path)
|
||||
elif os.path.exists(args.input_path) and stat.S_ISFIFO(os.stat(args.input_path).st_mode):
|
||||
raise ValueError(f'Path {args.input_path} is not a FIFO')
|
||||
elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
|
||||
raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
|
||||
|
||||
edx = ElasticIndexer(args)
|
||||
|
||||
if args.httpx:
|
||||
from ingestors import ingest_httpx as ingestor
|
||||
elif args.masscan:
|
||||
from ingestors import ingest_masscan as ingestor
|
||||
elif args.massdns:
|
||||
from ingestors import ingest_massdns as ingestor
|
||||
elif args.zone:
|
||||
from ingestors import ingest_zone as ingestor
|
||||
|
||||
batch_size = 0
|
||||
|
||||
if not args.dry_run:
|
||||
print(edx.get_cluster_health())
|
||||
|
||||
time.sleep(3) # Delay to allow time for sniffing to complete
|
||||
|
||||
nodes = edx.get_cluster_size()
|
||||
logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
|
||||
|
||||
if not edx.es_index:
|
||||
edx.es_index = ingestor.default_index
|
||||
|
||||
map_body = ingestor.construct_map()
|
||||
edx.create_index(map_body, args.pipeline, args.replicas, args.shards)
|
||||
|
||||
batch_size = int(nodes * (args.chunk_size * args.chunk_threads))
|
||||
|
||||
if os.path.isfile(args.input_path):
|
||||
logging.info(f'Processing file: {args.input_path}')
|
||||
edx.process_file(args.input_path, batch_size, ingestor.process_file)
|
||||
|
||||
elif stat.S_ISFIFO(os.stat(args.input_path).st_mode):
|
||||
logging.info(f'Watching FIFO: {args.input_path}')
|
||||
edx.process_file(args.input_path, batch_size, ingestor.process_file)
|
||||
|
||||
elif os.path.isdir(args.input_path):
|
||||
count = 1
|
||||
total = len(os.listdir(args.input_path))
|
||||
logging.info(f'Processing {total:,} files in directory: {args.input_path}')
|
||||
for file in sorted(os.listdir(args.input_path)):
|
||||
file_path = os.path.join(args.input_path, file)
|
||||
if os.path.isfile(file_path):
|
||||
logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
|
||||
edx.process_file(file_path, batch_size, ingestor.process_file)
|
||||
count += 1
|
||||
else:
|
||||
logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,98 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
|
||||
# ingest_httpx.py
|
||||
|
||||
import json
|
||||
|
||||
default_index = 'httpx-logs'
|
||||
|
||||
def construct_map() -> dict:
|
||||
'''Construct the Elasticsearch index mapping for Masscan records.'''
|
||||
|
||||
keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
|
||||
|
||||
mapping = {
|
||||
'mappings': {
|
||||
'properties': {
|
||||
'change': 'me'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return mapping
|
||||
|
||||
|
||||
def process_file(file_path: str):
|
||||
'''
|
||||
Read and process HTTPX records from the log file.
|
||||
|
||||
:param file_path: Path to the HTTPX log file
|
||||
'''
|
||||
|
||||
with open(file_path, 'r') as file:
|
||||
for line in file:
|
||||
line = line.strip()
|
||||
|
||||
if not line:
|
||||
continue
|
||||
|
||||
record = json.loads(line)
|
||||
|
||||
record['seen'] = record.pop('timestamp').split('.')[0] + 'Z' # Hacky solution to maintain ISO 8601 format without milliseconds or offsets
|
||||
record['domain'] = record.pop('input')
|
||||
|
||||
del record['failed'], record['knowledgebase'], record['time']
|
||||
|
||||
yield record
|
||||
|
||||
return None # EOF
|
||||
|
||||
|
||||
|
||||
''''
|
||||
Example record:
|
||||
{
|
||||
"timestamp":"2024-01-14T13:08:15.117348474-05:00", # Rename to seen and remove milliseconds and offset
|
||||
"hash": { # Do we need all of these ?
|
||||
"body_md5":"4ae9394eb98233b482508cbda3b33a66",
|
||||
"body_mmh3":"-4111954",
|
||||
"body_sha256":"89e06e8374353469c65adb227b158b265641b424fba7ddb2c67eef0c4c1280d3",
|
||||
"body_simhash":"9814303593401624250",
|
||||
"header_md5":"980366deb2b2fb5df2ad861fc63e79ce",
|
||||
"header_mmh3":"-813072798",
|
||||
"header_sha256":"39aea75ad548e38b635421861641ad1919ed3b103b17a33c41e7ad46516f736d",
|
||||
"header_simhash":"10962523587435277678"
|
||||
},
|
||||
"port":"443",
|
||||
"url":"https://supernets.org", # Remove this and only use the input field as "domain" maybe
|
||||
"input":"supernets.org", # rename to domain
|
||||
"title":"SuperNETs",
|
||||
"scheme":"https",
|
||||
"webserver":"nginx",
|
||||
"body_preview":"SUPERNETS Home About Contact Donate Docs Network IRC Git Invidious Jitsi LibreX Mastodon Matrix Sup",
|
||||
"content_type":"text/html",
|
||||
"method":"GET", # Do we need this ?
|
||||
"host":"51.89.151.158",
|
||||
"path":"/",
|
||||
"favicon":"-674048714",
|
||||
"favicon_path":"/i/favicon.png",
|
||||
"time":"592.907689ms", # Do we need this ?
|
||||
"a":[
|
||||
"6.150.220.23"
|
||||
],
|
||||
"tech":[
|
||||
"Bootstrap:4.0.0",
|
||||
"HSTS",
|
||||
"Nginx"
|
||||
],
|
||||
"words":436, # Do we need this ?
|
||||
"lines":79, # Do we need this ?
|
||||
"status_code":200,
|
||||
"content_length":4597,
|
||||
"failed":false, # Do we need this ?
|
||||
"knowledgebase":{ # Do we need this ?
|
||||
"PageType":"nonerror",
|
||||
"pHash":0
|
||||
}
|
||||
}
|
||||
'''
|
@ -1,136 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
|
||||
# ingest_masscan.py
|
||||
|
||||
'''
|
||||
apt-get install iptables masscan libpcap-dev screen
|
||||
setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
|
||||
/sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP
|
||||
printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32" > exclude.conf
|
||||
screen -S scan
|
||||
masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oJ output.json
|
||||
masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oJ output_new.json --shard $i/$TOTAL
|
||||
|
||||
Note: The above iptables rule is not persistent and will be removed on reboot.
|
||||
'''
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
|
||||
default_index = 'masscan-logs'
|
||||
|
||||
def construct_map() -> dict:
|
||||
'''Construct the Elasticsearch index mapping for Masscan records.'''
|
||||
|
||||
keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
|
||||
|
||||
mapping = {
|
||||
'mappings': {
|
||||
'properties': {
|
||||
'ip': { 'type': 'ip' },
|
||||
'port': { 'type': 'integer' },
|
||||
'proto': { 'type': 'keyword' },
|
||||
'service': { 'type': 'keyword' },
|
||||
'banner': keyword_mapping,
|
||||
'ref_id': { 'type': 'keyword' },
|
||||
'seen': { 'type': 'date' }
|
||||
#'geoip': {
|
||||
# 'properties': {
|
||||
# 'city_name': keyword_mapping,
|
||||
# 'continent_name': keyword_mapping,
|
||||
# 'country_iso_code': keyword_mapping,
|
||||
# 'country_name': keyword_mapping,
|
||||
# 'location': { 'type': 'geo_point' },
|
||||
# 'region_iso_code': keyword_mapping,
|
||||
# 'region_name': keyword_mapping,
|
||||
# }
|
||||
#}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return mapping
|
||||
|
||||
|
||||
def process_file(file_path: str):
|
||||
'''
|
||||
Read and process Masscan records from the log file.
|
||||
|
||||
:param file_path: Path to the Masscan log file
|
||||
'''
|
||||
|
||||
with open(file_path, 'r') as file:
|
||||
for line in file:
|
||||
line = line.strip()
|
||||
|
||||
if not line or not line.startswith('{'):
|
||||
continue
|
||||
|
||||
if line.endswith(','):
|
||||
line = line[:-1]
|
||||
|
||||
try:
|
||||
record = json.loads(line)
|
||||
except json.decoder.JSONDecodeError:
|
||||
logging.error(f'Failed to parse JSON record! ({line})')
|
||||
input('Press Enter to continue...') # Debugging
|
||||
continue
|
||||
|
||||
for port_info in record['ports']:
|
||||
struct = {
|
||||
'ip': record['ip'],
|
||||
'port': port_info['port'],
|
||||
'proto': port_info['proto'],
|
||||
'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
|
||||
}
|
||||
|
||||
if 'service' in port_info:
|
||||
if 'name' in port_info['service']:
|
||||
if port_info['service']['name'] != 'unknown':
|
||||
struct['service'] = port_info['service']['name']
|
||||
|
||||
if 'banner' in port_info['service']:
|
||||
banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace
|
||||
if banner:
|
||||
match = re.search(r'\(Ref\.Id: (.*?)\)', banner)
|
||||
if match:
|
||||
struct['ref_id'] = match.group(1)
|
||||
else:
|
||||
struct['banner'] = banner
|
||||
|
||||
yield struct
|
||||
|
||||
return None # EOF
|
||||
|
||||
|
||||
|
||||
'''
|
||||
Example record:
|
||||
{
|
||||
"ip": "43.134.51.142",
|
||||
"timestamp": "1705255468", # Convert to ZULU BABY
|
||||
"ports": [ # We will create a record for each port opened
|
||||
{
|
||||
"port": 22,
|
||||
"proto": "tcp",
|
||||
"service": { # This field is optional
|
||||
"name": "ssh",
|
||||
"banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
Will be indexed as:
|
||||
{
|
||||
"ip": "43.134.51.142",
|
||||
"port": 22,
|
||||
"proto": "tcp",
|
||||
"service": "ssh",
|
||||
"banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
|
||||
"seen": "2021-10-08T02:04:28Z",
|
||||
"ref_id": "?sKfOvsC4M4a2W8PaC4zF?" # TCP RST Payload (Do we need this?)
|
||||
}
|
||||
'''
|
@ -1,87 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
|
||||
# ingest_massdns.py
|
||||
|
||||
import time
|
||||
|
||||
default_index = 'ptr-records'
|
||||
|
||||
def construct_map() -> dict:
|
||||
'''Construct the Elasticsearch index mapping for MassDNS records'''
|
||||
|
||||
keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
|
||||
|
||||
mapping = {
|
||||
'mappings': {
|
||||
'properties': {
|
||||
'ip': { 'type': 'ip' },
|
||||
'name': { 'type': 'keyword' },
|
||||
'record': keyword_mapping,
|
||||
'seen': { 'type': 'date' }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return mapping
|
||||
|
||||
|
||||
def process_file(file_path: str):
|
||||
'''
|
||||
Read and process Massdns records from the log file.
|
||||
|
||||
:param file_path: Path to the Massdns log file
|
||||
'''
|
||||
|
||||
with open(file_path, 'r') as file:
|
||||
for line in file:
|
||||
line = line.strip()
|
||||
|
||||
if not line:
|
||||
continue
|
||||
|
||||
parts = line.split()
|
||||
|
||||
if len(parts) < 3:
|
||||
raise ValueError(f'Invalid PTR record: {line}')
|
||||
|
||||
name, record_type, data = parts[0].rstrip('.'), parts[1], ' '.join(parts[2:]).rstrip('.')
|
||||
|
||||
if record_type != 'PTR':
|
||||
continue
|
||||
|
||||
#if record_type == 'CNAME':
|
||||
# if data.endswith('.in-addr.arpa'):
|
||||
# continue
|
||||
|
||||
# Let's not index the PTR record if it's the same as the in-addr.arpa domain
|
||||
if data == name:
|
||||
continue
|
||||
|
||||
ip = '.'.join(name.replace('.in-addr.arpa', '').split('.')[::-1])
|
||||
|
||||
struct = {
|
||||
'ip': ip,
|
||||
'record': data,
|
||||
'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
|
||||
}
|
||||
|
||||
yield struct
|
||||
|
||||
return None # EOF
|
||||
|
||||
|
||||
'''
|
||||
Example PTR record:
|
||||
0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
|
||||
0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com.
|
||||
0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net.
|
||||
0.6.212.173.in-addr.arpa. PTR 173-212-6-0.cpe.surry.net.
|
||||
0.6.201.133.in-addr.arpa. PTR flh2-133-201-6-0.tky.mesh.ad.jp.
|
||||
|
||||
Will be indexed as:
|
||||
{
|
||||
"ip": "47.229.6.0",
|
||||
"record": "047-229-006-000.res.spectrum.com.",
|
||||
"seen": "2021-06-30T18:31:00Z"
|
||||
}
|
||||
'''
|
@ -1,132 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
|
||||
# ingest_zone.py
|
||||
|
||||
import time
|
||||
|
||||
default_index = 'dns-zones'
|
||||
record_types = ('a','aaaa','caa','cdnskey','cds','cname','dnskey','ds','mx','naptr','ns','nsec','nsec3','nsec3param','ptr','rrsig','rp','sshfp','soa','srv','txt','type65534')
|
||||
|
||||
def construct_map() -> dict:
|
||||
'''Construct the Elasticsearch index mapping for zone file records.'''
|
||||
|
||||
keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
|
||||
|
||||
mapping = {
|
||||
'mappings': {
|
||||
'properties': {
|
||||
'domain': keyword_mapping,
|
||||
'records': { 'properties': {} },
|
||||
'seen': {'type': 'date'}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Add record types to mapping dynamically to not clutter the code
|
||||
for item in record_types:
|
||||
if item in ('a','aaaa'):
|
||||
mapping['mappings']['properties']['records']['properties'][item] = {
|
||||
'properties': {
|
||||
'data': { 'type': 'ip' },
|
||||
'ttl': { 'type': 'integer' }
|
||||
}
|
||||
}
|
||||
else:
|
||||
mapping['mappings']['properties']['records']['properties'][item] = {
|
||||
'properties': {
|
||||
'data': keyword_mapping,
|
||||
'ttl': { 'type': 'integer' }
|
||||
}
|
||||
}
|
||||
|
||||
return mapping
|
||||
|
||||
|
||||
def process_file(file_path: str):
|
||||
'''
|
||||
Read and process zone file records.
|
||||
|
||||
:param file_path: Path to the zone file
|
||||
'''
|
||||
|
||||
domain_records = {}
|
||||
last_domain = None
|
||||
|
||||
with open(file_path, 'r') as file:
|
||||
for line in file:
|
||||
line = line.strip()
|
||||
|
||||
if not line or line.startswith(';'):
|
||||
continue
|
||||
|
||||
parts = line.split()
|
||||
|
||||
if len(parts) < 5:
|
||||
raise ValueError(f'Invalid line: {line}')
|
||||
|
||||
domain, ttl, record_class, record_type, data = parts[0].rstrip('.').lower(), parts[1], parts[2].lower(), parts[3].lower(), ' '.join(parts[4:])
|
||||
|
||||
if not ttl.isdigit():
|
||||
raise ValueError(f'Invalid TTL: {ttl} with line: {line}')
|
||||
|
||||
ttl = int(ttl)
|
||||
|
||||
if record_class != 'in':
|
||||
raise ValueError(f'Unsupported record class: {record_class} with line: {line}') # Anomaly (Doubtful any CHAOS/HESIOD records will be found)
|
||||
|
||||
# We do not want to collide with our current mapping (Again, this is an anomaly)
|
||||
if record_type not in record_types:
|
||||
raise ValueError(f'Unsupported record type: {record_type} with line: {line}')
|
||||
|
||||
# Little tidying up for specific record types
|
||||
if record_type == 'nsec':
|
||||
data = ' '.join([data.split()[0].rstrip('.'), *data.split()[1:]])
|
||||
elif record_type == 'soa':
|
||||
data = ' '.join([part.rstrip('.') if '.' in part else part for part in data.split()])
|
||||
elif data.endswith('.'):
|
||||
data = data.rstrip('.')
|
||||
|
||||
if domain != last_domain:
|
||||
if last_domain:
|
||||
source = {'domain': last_domain, 'records': domain_records[last_domain], 'seen': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())}
|
||||
|
||||
del domain_records[last_domain]
|
||||
|
||||
yield source
|
||||
|
||||
last_domain = domain
|
||||
|
||||
domain_records[domain] = {}
|
||||
|
||||
if record_type not in domain_records[domain]:
|
||||
domain_records[domain][record_type] = []
|
||||
|
||||
domain_records[domain][record_type].append({'ttl': ttl, 'data': data})
|
||||
|
||||
return None # EOF
|
||||
|
||||
|
||||
|
||||
'''
|
||||
Example record:
|
||||
0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600 in nsec3 1 1 100 332539EE7F95C32A 10MHUKG4FHIAVEFDOTF6NKU5KFCB2J3A NS DS RRSIG
|
||||
0so9l9nrl425q3tf7dkv1nmv2r3is6vm.vegas. 3600 in rrsig NSEC3 8 2 3600 20240122151947 20240101141947 4125 vegas. hzIvQrZIxBSwRWyiHkb5M2W0R3ikNehv884nilkvTt9DaJSDzDUrCtqwQb3jh6+BesByBqfMQK+L2n9c//ZSmD5/iPqxmTPCuYIB9uBV2qSNSNXxCY7uUt5w7hKUS68SLwOSjaQ8GRME9WQJhY6gck0f8TT24enjXXRnQC8QitY=
|
||||
1-800-flowers.vegas. 3600 in ns dns1.cscdns.net.
|
||||
1-800-flowers.vegas. 3600 in ns dns2.cscdns.net.
|
||||
100.vegas. 3600 in ns ns51.domaincontrol.com.
|
||||
100.vegas. 3600 in ns ns52.domaincontrol.com.
|
||||
1001.vegas. 3600 in ns ns11.waterrockdigital.com.
|
||||
1001.vegas. 3600 in ns ns12.waterrockdigital.com.
|
||||
|
||||
Will be indexed as:
|
||||
{
|
||||
"domain": "1001.vegas",
|
||||
"records": {
|
||||
"ns": [
|
||||
{"ttl": 3600, "data": "ns11.waterrockdigital.com"},
|
||||
{"ttl": 3600, "data": "ns12.waterrockdigital.com"}
|
||||
]
|
||||
},
|
||||
"seen": "2021-09-01T00:00:00Z" # Zulu time added upon indexing
|
||||
}
|
||||
'''
|
@ -1,96 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
|
||||
# sniff_patch.py
|
||||
|
||||
# Note:
|
||||
# This is a patch for the elasticsearch 8.x client to fix the sniff_* options.
|
||||
# This patch is only needed if you use the sniff_* options and only works with basic auth.
|
||||
# Call init_elasticsearch() with normal Elasticsearch params.
|
||||
#
|
||||
# Source:
|
||||
# - https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960
|
||||
|
||||
import base64
|
||||
|
||||
import elasticsearch._sync.client as client
|
||||
from elasticsearch.exceptions import SerializationError, ConnectionError
|
||||
|
||||
|
||||
def init_elasticsearch(*args, **kwargs):
|
||||
'''
|
||||
Initialize the Elasticsearch client with the sniff patch.
|
||||
|
||||
:param args: Elasticsearch positional arguments.
|
||||
:param kwargs: Elasticsearch keyword arguments.
|
||||
'''
|
||||
client.default_sniff_callback = _override_sniff_callback(kwargs['basic_auth'])
|
||||
|
||||
return client.Elasticsearch(*args, **kwargs)
|
||||
|
||||
|
||||
def _override_sniff_callback(basic_auth):
|
||||
'''
|
||||
Taken from https://github.com/elastic/elasticsearch-py/blob/8.8/elasticsearch/_sync/client/_base.py#L166
|
||||
Completely unmodified except for adding the auth header to the elastic request.
|
||||
Allows us to continue using the sniff_* options while this is broken in the library.
|
||||
|
||||
TODO: Remove this when this issue is patched:
|
||||
- https://github.com/elastic/elasticsearch-py/issues/2005
|
||||
'''
|
||||
auth_str = base64.b64encode(':'.join(basic_auth).encode()).decode()
|
||||
sniffed_node_callback = client._base._default_sniffed_node_callback
|
||||
|
||||
def modified_sniff_callback(transport, sniff_options):
|
||||
for _ in transport.node_pool.all():
|
||||
try:
|
||||
meta, node_infos = transport.perform_request(
|
||||
'GET',
|
||||
'/_nodes/_all/http',
|
||||
headers = {
|
||||
'accept': 'application/vnd.elasticsearch+json; compatible-with=8',
|
||||
'authorization': f'Basic {auth_str}' # This auth header is missing in 8.x releases of the client, and causes 401s
|
||||
},
|
||||
request_timeout = (
|
||||
sniff_options.sniff_timeout
|
||||
if not sniff_options.is_initial_sniff
|
||||
else None
|
||||
),
|
||||
)
|
||||
except (SerializationError, ConnectionError):
|
||||
continue
|
||||
|
||||
if not 200 <= meta.status <= 299:
|
||||
continue
|
||||
|
||||
node_configs = []
|
||||
for node_info in node_infos.get('nodes', {}).values():
|
||||
address = node_info.get('http', {}).get('publish_address')
|
||||
if not address or ':' not in address:
|
||||
continue
|
||||
|
||||
if '/' in address:
|
||||
# Support 7.x host/ip:port behavior where http.publish_host has been set.
|
||||
fqdn, ipaddress = address.split('/', 1)
|
||||
host = fqdn
|
||||
_, port_str = ipaddress.rsplit(':', 1)
|
||||
port = int(port_str)
|
||||
else:
|
||||
host, port_str = address.rsplit(':', 1)
|
||||
port = int(port_str)
|
||||
|
||||
assert sniffed_node_callback is not None
|
||||
sniffed_node = sniffed_node_callback(
|
||||
node_info, meta.node.replace(host=host, port=port)
|
||||
)
|
||||
if sniffed_node is None:
|
||||
continue
|
||||
|
||||
# Use the node which was able to make the request as a base.
|
||||
node_configs.append(sniffed_node)
|
||||
|
||||
if node_configs:
|
||||
return node_configs
|
||||
|
||||
return []
|
||||
|
||||
return modified_sniff_callback
|
Loading…
Reference in New Issue
Block a user