diff --git a/eris.py b/eris.py
index 1d86adc..638356f 100644
--- a/eris.py
+++ b/eris.py
@@ -19,11 +19,6 @@ try:
 except ImportError:
     raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
 
-try:
-    from ecs_logging import StdlibFormatter
-except ImportError:
-    raise ImportError('Missing required \'ecs-logging\' library. (pip install ecs-logging)')
-
 
 class ElasticIndexer:
     def __init__(self, args: argparse.Namespace):
@@ -61,6 +56,12 @@ class ElasticIndexer:
 
         self.es = AsyncElasticsearch(**es_config)
 
+    async def close_connect(self):
+        '''Close the Elasticsearch connection.'''
+
+        await self.es.close()
+
+
     async def create_index(self, map_body: dict, pipeline: str = None, replicas: int = 1, shards: int = 1):
         '''
         Create the Elasticsearch index with the defined mapping.
@@ -131,7 +132,7 @@ class ElasticIndexer:
             raise Exception(f'Failed to index records to {self.es_index} from {file_path} ({e})')
 
 
-def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_file: str = 'debug.json', max_file_size: int = 10*1024*1024, backups: int = 5):
+def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_file: str = 'debug.json', max_file_size: int = 10*1024*1024, backups: int = 5, ecs_format: bool = False):
     '''
     Setup the global logger for the application.
 
@@ -140,13 +141,14 @@ def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_
     :param log_file: File to write logs to.
     :param max_file_size: Maximum size of the log file before it is rotated.
     :param backups: Number of backup log files to keep.
+    :param ecs_format: Use the Elastic Common Schema (ECS) format for logs.
     '''
 
     # Configure the root logger
     logger = logging.getLogger()
     logger.setLevel(logging.DEBUG) # Minimum level to capture all logs
 
     # Clear existing handlers
     logger.handlers = []
 
     # Setup console handler
@@ -160,8 +162,18 @@ def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_
     if file_level is not None:
         file_handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=max_file_size, backupCount=backups)
         file_handler.setLevel(file_level)
-        ecs_formatter = StdlibFormatter() # ECS (Elastic Common Schema) formatter
-        file_handler.setFormatter(ecs_formatter)
+
+        # Use the ECS formatter if enabled, otherwise the default format
+        if ecs_format:
+            try:
+                from ecs_logging import StdlibFormatter
+            except ImportError:
+                raise ImportError('Missing required \'ecs-logging\' library. (pip install ecs-logging)')
+            file_formatter = StdlibFormatter() # ECS formatter
+        else:
+            file_formatter = logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%Y-%m-%d %H:%M:%S')
+
+        file_handler.setFormatter(file_formatter)
         logger.addHandler(file_handler)
 
@@ -174,6 +186,7 @@ async def main():
     parser.add_argument('input_path', help='Path to the input file or directory') # Required
     parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')
     parser.add_argument('--log', choices=['debug', 'info', 'warning', 'error', 'critical'], help='Logging file level (default: disabled)')
+    parser.add_argument('--ecs', action='store_true', default=False, help='Use the Elastic Common Schema (ECS) for logging')
 
     # Elasticsearch arguments
     parser.add_argument('--host', default='http://localhost', help='Elasticsearch host')
@@ -202,14 +215,14 @@ async def main():
     parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
     parser.add_argument('--zone', action='store_true', help='Index Zone records')
 
+    args = parser.parse_args()
+
     if args.log:
         levels = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL}
-        setup_logger(file_level=levels[args.log], log_file='eris.log')
+        setup_logger(file_level=levels[args.log], log_file='eris.log', ecs_format=args.ecs)
     else:
         setup_logger()
 
-    args = parser.parse_args()
-
     if args.host.endswith('/'):
         args.host = args.host[:-1]
 
@@ -270,6 +283,8 @@ async def main():
     else:
         logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
 
+    await edx.close_connect() # Close the Elasticsearch connection to stop "Unclosed client session" warnings
+
 
 if __name__ == '__main__':
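
Note on the close_connect() addition: AsyncElasticsearch holds an aiohttp client session, and if the process exits without awaiting es.close(), aiohttp emits the "Unclosed client session" warning this patch silences. As added, the call is only reached on the happy path at the end of main(); a try/finally around the indexing work would cover error exits as well. A minimal sketch of that pattern (do_indexing() is a hypothetical stand-in for the per-file loop above, not a real method):

    edx = ElasticIndexer(args)
    try:
        await do_indexing(edx)      # hypothetical stand-in for the indexing calls in main()
    finally:
        await edx.close_connect()   # runs even if indexing raises, so the session always closes
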
diff --git a/ingestors/ingest_certstream.py b/ingestors/ingest_certstream.py
index 523cea2..31946bf 100644
--- a/ingestors/ingest_certstream.py
+++ b/ingestors/ingest_certstream.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
-# ingest_certs.py
+# ingest_certstream.py
 
 import asyncio
 import json
@@ -16,6 +16,10 @@ except ImportError:
 # Set a default elasticsearch index if one is not provided
 default_index = 'eris-certstream'
 
+# Cache of recently indexed Certstream domains, used to prevent duplicates
+cache = []
+cache_size = 5000
+
 
 def construct_map() -> dict:
     '''Construct the Elasticsearch index mapping for Certstream records.'''
@@ -56,14 +60,14 @@ async def process_data(place_holder: str = None):
                     continue
 
                 # Grab the unique domains from the records
-                all_domains = record['data']['leaf_cert']['all_domains']
+                all_domains = set(record['data']['leaf_cert']['all_domains'])
 
                 domains = list()
 
                 # We only care about subdomains (excluding www. and wildcards)
                 for domain in all_domains:
                     if domain.startswith('*.'):
                         domain = domain[2:]
-                    elif domain.startswith('www.') and domain.count('.') == 2:
+                    if domain.startswith('www.') and domain.count('.') == 2:
                         continue
                     if domain.count('.') > 1: # TODO: Add a check for PSL TLDs...domain.co.uk, domain.com.au, etc. (we want to ignore these if they are not subdomains)
@@ -72,6 +76,14 @@ async def process_data(place_holder: str = None):
 
                 # Construct the document
                 for domain in domains:
+                    if domain in cache:
+                        continue # Skip the domain if it is already in the cache
+
+                    if len(cache) >= cache_size:
+                        cache.pop(0) # Remove the oldest domain from the cache
+
+                    cache.append(domain) # Add the domain to the cache
+
                     struct = {
                         'domain' : domain,
                         'seen'   : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
@@ -160,6 +172,13 @@
 Output:
     "message_type": "certificate_update"
     }
+
+Input:
+    {
+        "domain" : "d7zdnegbre53n.amplifyapp.com",
+        "seen"   : "2022-01-02T12:00:00Z"
+    }
+
 Notes:
 - Fix the "no close frame received or sent" error
 '''
diff --git a/ingestors/ingest_ixps.py b/ingestors/ingest_ixps.py
index ba4102f..25aa23c 100644
--- a/ingestors/ingest_ixps.py
+++ b/ingestors/ingest_ixps.py
@@ -26,10 +26,15 @@ def construct_map() -> dict:
     mapping = {
         'mappings': {
             'properties': {
-                'name'            : {'type': 'keyword'},
-                'alternatenames'  : {'type': 'keyword'},
-                'sources'         : {'type': 'keyword'},
-                'prefixes'        : { 'properties': { 'ipv4' : {'type': 'ip'}, 'ipv6' : {'type': 'ip_range'} } },
+                'name'            : { 'type': 'keyword' },
+                'alternatenames'  : { 'type': 'keyword' },
+                'sources'         : { 'type': 'keyword' },
+                'prefixes'        : {
+                    'properties': {
+                        'ipv4' : { 'type': 'ip' },
+                        'ipv6' : { 'type': 'ip_range' }
+                    }
+                },
                 'url'             : { 'type': 'keyword' },
                 'region'          : { 'type': 'keyword' },
                 'country'         : { 'type': 'keyword' },
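
Note on the Certstream cache: with a plain list, both the `domain in cache` membership test and `cache.pop(0)` are O(n) at the 5,000-entry cap, and both run for every domain on every certificate update. That is probably fine at Certstream volumes, but an equivalent bounded FIFO cache with O(1) lookups is a small change if it ever shows up in profiling. A sketch, not part of this patch, assuming the same module-level usage:

    from collections import deque

    cache      = deque(maxlen=5000) # Evicts the oldest domain automatically when full
    cache_seen = set()              # Mirror of the deque for O(1) membership tests

    def cache_domain(domain: str) -> bool:
        '''Return True if the domain was already cached, otherwise cache it.'''
        if domain in cache_seen:
            return True
        if len(cache) == cache.maxlen:
            cache_seen.discard(cache[0]) # Drop the entry the append below will evict
        cache.append(domain)
        cache_seen.add(domain)
        return False

The per-domain block in the diff would then collapse to a single `if cache_domain(domain): continue`.
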
diff --git a/ingestors/ingest_masscan.py b/ingestors/ingest_masscan.py
index d89a8fe..cd739bf 100644
--- a/ingestors/ingest_masscan.py
+++ b/ingestors/ingest_masscan.py
@@ -94,7 +94,7 @@ async def process_data(input_path: str):
             # Process each port in the record
             for port_info in record['ports']:
                 struct = {
-                    'ip' : record['ip'],
+                    'ip'    : record['ip'],
                     'port'  : port_info['port'],
                     'proto' : port_info['proto'],
                     'seen'  : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp'])))
diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py
index e4372df..b3e0aae 100644
--- a/ingestors/ingest_massdns.py
+++ b/ingestors/ingest_massdns.py
@@ -104,7 +104,7 @@ async def process_data(input_path: str):
                 '_index'        : default_index,
                 'doc'           : {
                     'ip'     : ip,
-                    'record' : [record], # Consider using painless script to add to list if it exists (Use 'seen' per-record and 'last_seen' for the IP address)
+                    'record' : [record],
                     'seen'   : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
                 },
                 'doc_as_upsert' : True # Create the document if it does not exist
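
Note on the massdns change: the removed comment pointed at a Painless scripted upsert as an eventual replacement for doc_as_upsert, which overwrites the record list on every update rather than appending to it. For reference, that alternative bulk action could look roughly like the untested sketch below; it assumes documents are keyed by IP, assumes `seen` is precomputed as in the struct above, and follows the removed comment's suggestion of a per-record 'seen' plus a top-level 'last_seen':

    action = {
        '_op_type' : 'update',
        '_index'   : default_index,
        '_id'      : ip, # Assumes the document id is the IP address
        'script'   : {
            'lang'   : 'painless',
            'source' : '''
                if (!ctx._source.record.contains(params.record)) {
                    ctx._source.record.add(params.record); // Append instead of overwrite
                }
                ctx._source.last_seen = params.seen;
            ''',
            'params' : {'record': record, 'seen': seen}
        },
        'upsert'   : {'ip': ip, 'record': [record], 'last_seen': seen}
    }
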