diff --git a/eris.py b/eris.py
index 638356f..ece7f15 100644
--- a/eris.py
+++ b/eris.py
@@ -9,8 +9,9 @@
 import logging.handlers
 import os
 import stat
 import sys
+import json
 
-sys.dont_write_bytecode = True
+sys.dont_write_bytecode = True # FUCKOFF __pycache__
 try:
 	from elasticsearch import AsyncElasticsearch
@@ -34,8 +35,8 @@ class ElasticIndexer:
 
 		# Sniffing disabled due to an issue with the elasticsearch 8.x client (https://github.com/elastic/elasticsearch-py/issues/2005)
 		es_config = {
-			#'hosts' : [f'{args.host}:{args.port}'],
-			'hosts' : [f'{args.host}:{port}' for port in ('9002', '9003', '9004')], # Temporary alternative to sniffing
+			'hosts' : [f'{args.host}:{args.port}'],
+			#'hosts' : [f'{args.host}:{port}' for port in ('9200',)], # Temporary alternative to sniffing
 			'verify_certs' : args.self_signed,
 			'ssl_show_warn' : args.self_signed,
 			'request_timeout' : args.timeout,
@@ -104,19 +105,27 @@ class ElasticIndexer:
 		Index records in chunks to Elasticsearch.
 
 		:param file_path: Path to the file
-		:param index_name: Name of the index
 		:param data_generator: Generator for the records to index
 		'''
 
-		count = 0
-		total = 0
+		count  = 0
+		total  = 0
+		errors = []
 
 		try:
-			async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size, max_chunk_bytes=self.chunk_max):
+			async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size, max_chunk_bytes=self.chunk_max, raise_on_error=False):
 				action, result = result.popitem()
 
 				if not ok:
-					logging.error(f'Failed to index document ({result["_id"]}) to {self.es_index} from {file_path} ({result})')
+					error_type = result.get('error', {}).get('type', 'unknown')
+					error_reason = result.get('error', {}).get('reason', 'unknown')
+					logging.error('FAILED DOCUMENT:')
+					logging.error(f'Error Type   : {error_type}')
+					logging.error(f'Error Reason : {error_reason}')
+					logging.error('Document     : ')
+					logging.error(json.dumps(result, indent=2))
+					input('Press Enter to continue...')
+					errors.append(result)
 					continue
 
 				count += 1
@@ -126,7 +135,8 @@ class ElasticIndexer:
 					logging.info(f'Successfully indexed {self.chunk_size:,} ({total:,} processed) records to {self.es_index} from {file_path}')
 					count = 0
 
-			logging.info(f'Finished indexing {total:,} records to {self.es_index} from {file_path}')
+			if errors:
+				raise Exception(f'{len(errors):,} document(s) failed to index. Check the logs above for details.')
 
 		except Exception as e:
 			raise Exception(f'Failed to index records to {self.es_index} from {file_path} ({e})')
@@ -148,7 +158,7 @@ def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_
 	logger = logging.getLogger()
 	logger.setLevel(logging.DEBUG) # Minimum level to capture all logs
 
-	# Clear existing handlersaise Exception(f'Failed to fetch zone links: {e}')
+	# Clear existing handlers
 	logger.handlers = []
 
 	# Setup console handler
@@ -203,8 +213,8 @@ async def main():
 	parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')
 
 	# Performance arguments
-	parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk')
-	parser.add_argument('--chunk-max', type=int, default=100, help='Maximum size of a chunk in bytes')
+	parser.add_argument('--chunk-size', type=int, default=5000, help='Number of records to index in a chunk')
+	parser.add_argument('--chunk-max', type=int, default=10485760, help='Maximum size of a chunk in bytes (default 10mb)')
 	parser.add_argument('--retries', type=int, default=30, help='Number of times to retry indexing a chunk before failing')
 	parser.add_argument('--timeout', type=int, default=60, help='Number of seconds to wait before retrying a chunk')
 
@@ -214,6 +224,8 @@ async def main():
 	parser.add_argument('--masscan', action='store_true', help='Index Masscan records')
 	parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
 	parser.add_argument('--zone', action='store_true', help='Index Zone records')
+	parser.add_argument('--rir-delegations', action='store_true', help='Index RIR Delegations records')
+	parser.add_argument('--rir-transfers', action='store_true', help='Index RIR Transfers records')
 
 	args = parser.parse_args()
 
@@ -239,15 +251,19 @@ async def main():
 	edx = ElasticIndexer(args)
 
 	if args.certstream:
-		from ingestors import ingest_certstream as ingestor
+		from ingestors import ingest_certstream as ingestor
 	elif args.httpx:
-		from ingestors import ingest_httpx as ingestor
+		from ingestors import ingest_httpx as ingestor
 	elif args.masscan:
-		from ingestors import ingest_masscan as ingestor
+		from ingestors import ingest_masscan as ingestor
 	elif args.massdns:
-		from ingestors import ingest_massdns as ingestor
+		from ingestors import ingest_massdns as ingestor
+	elif args.rir_delegations:
+		from ingestors import ingest_rir_delegations as ingestor
+	elif args.rir_transfers:
+		from ingestors import ingest_rir_transfers as ingestor
 	elif args.zone:
-		from ingestors import ingest_zone as ingestor
+		from ingestors import ingest_zone as ingestor
 	else:
 		raise ValueError('No ingestor specified')
 
diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py
index b3e0aae..d585bc4 100644
--- a/ingestors/ingest_massdns.py
+++ b/ingestors/ingest_massdns.py
@@ -140,13 +140,25 @@ if __name__ == '__main__':
 '''
 Deployment:
+	printf "\nsession required pam_limits.so" >> /etc/pam.d/su
+	printf "acidvegas hard nofile 65535\nacidvegas soft nofile 65535" >> /etc/security/limits.conf
+	echo "net.netfilter.nf_conntrack_max = 131072" >> /etc/sysctl.conf
+	echo "net.netfilter.nf_conntrack_udp_timeout = 30" >> /etc/sysctl.conf
+	echo "net.netfilter.nf_conntrack_udp_timeout_stream = 120" >> /etc/sysctl.conf
+	echo "net.netfilter.nf_conntrack_tcp_timeout_established = 300" >> /etc/sysctl.conf
+	sysctl -p
+
 	sudo apt-get install build-essential gcc make python3 python3-pip
 	pip install aiofiles aiohttp elasticsearch
 	git clone --depth 1 https://github.com/acidvegas/eris.git $HOME/eris
 	git clone --depth 1 https://github.com/blechschmidt/massdns.git $HOME/massdns && cd $HOME/massdns && make
-	curl -s https://public-dns.info/nameservers.txt | grep -v ':' > $HOME/massdns/nameservers.txt
-	while true; do python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/eris/FIFO; done
+	wget -O $HOME/massdns/resolvers.txt https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt
+	while true; do python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/resolvers.txt -t PTR --filter NOERROR -s 5000 -o S -w $HOME/eris/FIFO; done
+
+	screen -S eris
+	python3 $HOME/eris/eris.py --massdns
+
 Output:
diff --git a/ingestors/ingest_rir_delegations.py b/ingestors/ingest_rir_delegations.py
index 0582315..4b6efa2 100644
--- a/ingestors/ingest_rir_delegations.py
+++ b/ingestors/ingest_rir_delegations.py
@@ -14,7 +14,7 @@ except ImportError:
 
 
 # Set a default elasticsearch index if one is not provided
-default_index = 'rir-delegation-' + time.strftime('%Y-%m-%d')
+default_index = 'eris-rir-delegations'
 
 # Delegation data sources
 delegation_db = {
@@ -52,7 +52,8 @@ def construct_map() -> dict:
 				},
 				'date' : { 'type': 'date' },
 				'status' : { 'type': 'keyword' },
-				'extensions' : keyword_mapping
+				'extensions' : keyword_mapping,
+				'seen' : { 'type': 'date' }
 			}
 		}
 	}
@@ -60,8 +61,12 @@ def construct_map() -> dict:
 	return mapping
 
 
-async def process_data():
-	'''Read and process the delegation data.'''
+async def process_data(place_holder: str = None):
+	'''
+	Read and process the delegation data.
+
+	:param place_holder: Placeholder parameter to match the process_data function signature of other ingestors.
+	'''
 
 	for registry, url in delegation_db.items():
 		try:
@@ -150,12 +155,13 @@ async def process_data():
 					if not record['date'] or record['date'] == '00000000':
 						del record['date']
 					else:
-						record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(record['date'], '%Y%m%d')),
+						record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(record['date'], '%Y%m%d'))
 
 					if record['status'] not in ('allocated', 'assigned', 'available', 'reserved', 'unallocated', 'unknown'):
 						raise ValueError(f'Invalid status: {cache}')
-
-					#json_output['records'].append(record)
+
+					# Set the seen timestamp
+					record['seen'] = time.strftime('%Y-%m-%dT%H:%M:%SZ')
 
 					# Let's just index the records themself (for now)
 					yield {'_index': default_index, '_source': record}
diff --git a/ingestors/ingest_rir_transfers.py b/ingestors/ingest_rir_transfers.py
index 149f519..4ed997c 100644
--- a/ingestors/ingest_rir_transfers.py
+++ b/ingestors/ingest_rir_transfers.py
@@ -13,7 +13,7 @@ except ImportError:
 
 
 # Set a default elasticsearch index if one is not provided
-default_index = 'rir-transfer-' + time.strftime('%Y-%m-%d')
+default_index = 'eris-rir-transfers'
 
 # Transfers data sources
 transfers_db = {
@@ -38,27 +38,28 @@ def construct_map() -> dict:
 				'date' : { 'type': 'date' },
 				'ip4nets' : {
 					'properties': {
-						'original_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } },
-						'transfer_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } }
+						'original_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } },
+						'transfer_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } }
 					}
 				},
 				'ip6nets' : {
 					'properties': {
-						'original_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } },
-						'transfer_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } }
+						'original_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } },
+						'transfer_set': { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } }
 					}
 				},
 				'asns' : {
 					'properties': {
-						'original_set' : { 'properties': { 'start': { 'type': 'integer' }, 'end' : { 'type': 'integer' } } },
-						'transfer_set' : { 'properties': { 'start': { 'type': 'integer' }, 'end' : { 'type': 'integer' } } }
+						'original_set': { 'properties': { 'start': { 'type': 'integer' }, 'end': { 'type': 'integer' } } },
+						'transfer_set': { 'properties': { 'start': { 'type': 'integer' }, 'end': { 'type': 'integer' } } }
 					}
 				},
 				'type' : { 'type': 'keyword' },
-				'source_organization' : { 'properties': { 'name': keyword_mapping, 'country_code' : { 'type': 'keyword' } } },
-				'recipient_organization' : { 'properties': { 'name': keyword_mapping, 'country_code' : { 'type': 'keyword' } } },
+				'source_organization' : { 'properties': { 'name': keyword_mapping, 'country_code': { 'type': 'keyword' } } },
+				'recipient_organization' : { 'properties': { 'name': keyword_mapping, 'country_code': { 'type': 'keyword' } } },
 				'source_rir' : { 'type': 'keyword' },
 				'recipient_rir' : { 'type': 'keyword' },
+				'seen' : { 'type': 'date' }
 			}
 		}
 	}
@@ -89,6 +90,7 @@ async def process_data():
 				raise Exception(f'Invalid {registry} delegation data: {json_data}')
 
 			for record in json_data['transfers']:
+				record['seen'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
 
 				if 'asns' in record:
 					for set_type in ('original_set', 'transfer_set'):
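
For reference, below is a minimal, self-contained sketch (not part of the diff) of the raise_on_error=False error-collection pattern that the updated indexing loop in eris.py relies on. The https://localhost:9200 host, the eris-test index name, the sketch_streaming_bulk.py filename, and the placeholder documents are assumptions for illustration only, not values from this patch.

	# sketch_streaming_bulk.py - standalone illustration of collecting bulk failures
	# instead of aborting on the first bad document (assumed local node and test index)
	import asyncio
	import json
	import logging

	from elasticsearch import AsyncElasticsearch
	from elasticsearch.helpers import async_streaming_bulk


	async def generate_docs():
		# Placeholder records; eris streams these from an ingestor's process_data()
		for i in range(3):
			yield {'_index': 'eris-test', '_source': {'record': i, 'seen': '2024-01-01T00:00:00Z'}}


	async def main():
		es = AsyncElasticsearch(hosts=['https://localhost:9200'], verify_certs=False, ssl_show_warn=False)
		errors = []

		# raise_on_error=False keeps the stream going after a failed document,
		# so failures can be logged and collected instead of raising immediately
		async for ok, result in async_streaming_bulk(es, actions=generate_docs(), chunk_size=2, raise_on_error=False):
			action, outcome = result.popitem()
			if not ok:
				logging.error(json.dumps(outcome, indent=2))
				errors.append(outcome)

		await es.close()

		if errors:
			raise Exception(f'{len(errors):,} document(s) failed to index')


	if __name__ == '__main__':
		asyncio.run(main())

Running it against a disposable node (python3 sketch_streaming_bulk.py) shows the same behavior the patch adds to ElasticIndexer: every rejected document is logged with its error details and counted, and a single exception is raised at the end instead of on the first failure.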