From fe49255f69049b7e55330411806795f2960a38c4 Mon Sep 17 00:00:00 2001 From: acidvegas Date: Fri, 15 Mar 2024 01:25:09 -0400 Subject: [PATCH] Added elastic common schema (ecs) logging to file for ingesting eris logs straight into ES --- README.md | 12 ++++--- eris.py | 102 ++++++++++++++++++++++++++++-------------------------- 2 files changed, 61 insertions(+), 53 deletions(-) diff --git a/README.md b/README.md index d52693b..df5e0e9 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ The is a suite of tools to aid in the ingestion of recon data from various sourc ## Prerequisites - [python](https://www.python.org/) - [elasticsearch](https://pypi.org/project/elasticsearch/) *(`pip install elasticsearch`)* + - [ecs_logging](https://pypi.org/project/ecs-logging) *(`pip install ecs-logging`)* - [aiofiles](https://pypi.org/project/aiofiles) *(`pip install aiofiles`)* - [aiohttp](https://pypi.org/projects/aiohttp) *(`pip install aiohttp`)* - [websockets](https://pypi.org/project/websockets/) *(`pip install websockets`) (only required for `--certs` ingestion)* @@ -18,10 +19,13 @@ python eris.py [options] ### Options ###### General arguments -| Argument | Description | -|--------------|-----------------------------------------------| -| `input_path` | Path to the input file or directory | -| `--watch` | Create or watch a FIFO for real-time indexing | +| Argument | Description | +|--------------|------------------------------------------------------------------| +| `input_path` | Path to the input file or directory | +| `--watch` | Create or watch a FIFO for real-time indexing | +| `--log` | Logging level for file *(debug, info, warning, error, critical)* | + +**Note:** File logging is disabled by default. When enabled, it will log using the [Elastic Common Schema](https://www.elastic.co/guide/en/ecs-logging/python/current/intro.html) *(ECS)*. ###### Elasticsearch arguments | Argument | Description | Default | diff --git a/eris.py b/eris.py index 70f8e59..1d86adc 100644 --- a/eris.py +++ b/eris.py @@ -19,6 +19,11 @@ try: except ImportError: raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)') +try: + from ecs_logging import StdlibFormatter +except ImportError: + raise ImportError('Missing required \'ecs-logging\' library. (pip install ecs-logging)') + class ElasticIndexer: def __init__(self, args: argparse.Namespace): @@ -34,13 +39,15 @@ class ElasticIndexer: # Sniffing disabled due to an issue with the elasticsearch 8.x client (https://github.com/elastic/elasticsearch-py/issues/2005) es_config = { - #'hosts' : [f'{args.host}:{args.port}'], - 'hosts' : [f'{args.host}:{port}' for port in ('9002', '9003', '9004')], # Temporary alternative to sniffing - 'verify_certs' : args.self_signed, - 'ssl_show_warn' : args.self_signed, - 'request_timeout' : args.timeout, - 'max_retries' : args.retries, - 'retry_on_timeout' : True + #'hosts' : [f'{args.host}:{args.port}'], + 'hosts' : [f'{args.host}:{port}' for port in ('9002', '9003', '9004')], # Temporary alternative to sniffing + 'verify_certs' : args.self_signed, + 'ssl_show_warn' : args.self_signed, + 'request_timeout' : args.timeout, + 'max_retries' : args.retries, + 'retry_on_timeout' : True, + 'http_compress' : True, + 'connections_per_node' : 3 # Experiment with this value #'sniff_on_start': True, #'sniff_on_node_failure': True, #'min_delay_between_sniffing': 60 @@ -91,23 +98,6 @@ class ElasticIndexer: raise Exception(f'Failed to create index. ({response})') - async def get_cluster_health(self) -> dict: - '''Get the health of the Elasticsearch cluster.''' - - - - return await self.es.cluster.health() - - - async def get_cluster_size(self) -> int: - '''Get the number of nodes in the Elasticsearch cluster.''' - - cluster_stats = await self.es.cluster.stats() - number_of_nodes = cluster_stats['nodes']['count']['total'] - - return number_of_nodes - - async def process_data(self, file_path: str, data_generator: callable): ''' Index records in chunks to Elasticsearch. @@ -141,33 +131,38 @@ class ElasticIndexer: raise Exception(f'Failed to index records to {self.es_index} from {file_path} ({e})') -def setup_logger(level: int = logging.INFO, to_file: bool = False, max_bytes: int = 250000, backups: int = 7) -> logging.Logger: +def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_file: str = 'debug.json', max_file_size: int = 10*1024*1024, backups: int = 5): ''' - Setup a custom logger with options for console and file logging. + Setup the global logger for the application. - :param level: Logging level. - :param to_file: Whether to log to a file. - :param max_bytes: Maximum size in bytes before rotating log file. - :param backups: Number of backup files to keep. + :param console_level: Minimum level to capture logs to the console. + :param file_level: Minimum level to capture logs to the file. + :param log_file: File to write logs to. + :param max_file_size: Maximum size of the log file before it is rotated. + :param backups: Number of backup log files to keep. ''' + # Configure the root logger logger = logging.getLogger() - logger.setLevel(level) + logger.setLevel(logging.DEBUG) # Minimum level to capture all logs - logger.handlers.clear() + # Clear existing handlers + logger.handlers = [] - formatter_console = logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%I:%M:%S') - formatter_file = logging.Formatter('%(asctime)s | %(levelname)9s | %(filename)s.%(funcName)s | %(message)s', '%Y-%m-%d %I:%M:%S') + # Setup console handler + console_handler = logging.StreamHandler() + console_handler.setLevel(console_level) + console_formatter = logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%I:%M:%S') + console_handler.setFormatter(console_formatter) + logger.addHandler(console_handler) - sh = logging.StreamHandler() - sh.setFormatter(formatter_console) - logger.addHandler(sh) - - if to_file: - os.makedirs('logs', exist_ok=True) - fh = logging.handlers.RotatingFileHandler('logs/debug.log', maxBytes=max_bytes, backupCount=backups, encoding='utf-8') - fh.setFormatter(formatter_file) - logger.addHandler(fh) + # Setup rotating file handler if file logging is enabled + if file_level is not None: + file_handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=max_file_size, backupCount=backups) + file_handler.setLevel(file_level) + ecs_formatter = StdlibFormatter() # ECS (Elastic Common Schema) formatter + file_handler.setFormatter(ecs_formatter) + logger.addHandler(file_handler) async def main(): @@ -178,6 +173,7 @@ async def main(): # General arguments parser.add_argument('input_path', help='Path to the input file or directory') # Required parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing') + parser.add_argument('--log', choices=['debug', 'info', 'warning', 'error', 'critical'], help='Logging file level (default: disabled)') # Elasticsearch arguments parser.add_argument('--host', default='http://localhost', help='Elasticsearch host') @@ -206,6 +202,12 @@ async def main(): parser.add_argument('--massdns', action='store_true', help='Index Massdns records') parser.add_argument('--zone', action='store_true', help='Index Zone records') + if args.log: + levels = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL} + setup_logger(file_level=levels[args.log], log_file='eris.log') + else: + setup_logger() + args = parser.parse_args() if args.host.endswith('/'): @@ -219,6 +221,8 @@ async def main(): elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path): raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory') + logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}') + edx = ElasticIndexer(args) if args.certstream: @@ -237,10 +241,7 @@ async def main(): health = await edx.es.cluster.health() logging.info(health) - await asyncio.sleep(5) # Delay to allow time for sniffing to complete - - nodes = await edx.get_cluster_size() - logging.info(f'Connected to {nodes:,} Elasticsearch node(s)') + #await asyncio.sleep(5) # Delay to allow time for sniffing to complete (Sniffer temporarily disabled) if not edx.es_index: edx.es_index = ingestor.default_index @@ -252,6 +253,10 @@ async def main(): logging.info(f'Processing file: {args.input_path}') await edx.process_data(args.input_path, ingestor.process_data) + elif stat.S_ISFIFO(os.stat(args.input_path).st_mode): + logging.info(f'Watching FIFO: {args.input_path}') + await edx.process_data(args.input_path, ingestor.process_data) + elif os.path.isdir(args.input_path): count = 1 total = len(os.listdir(args.input_path)) @@ -268,10 +273,9 @@ async def main(): if __name__ == '__main__': - setup_logger(to_file=True) print('') print('┏┓┳┓┳┏┓ Elasticsearch Recon Ingestion Scripts') print('┣ ┣┫┃┗┓ Developed by Acidvegas in Python') print('┗┛┛┗┻┗┛ https://git.acid.vegas/eris') print('') - asyncio.run(main()) \ No newline at end of file + asyncio.run(main())