#!/usr/bin/env python3 # Advanced Python Logging - Developed by acidvegas in Python (https://git.acid.vegas/apv) # apv.py import gzip import json import logging import logging.handlers import os import socket class LogColors: '''ANSI color codes for log messages.''' RESET = '\033[0m' DATE = '\033[90m' # Dark Grey DEBUG = '\033[96m' # Cyan INFO = '\033[92m' # Green WARNING = '\033[93m' # Yellow ERROR = '\033[91m' # Red CRITICAL = '\033[97m\033[41m' # White on Red FATAL = '\033[97m\033[41m' # Same as CRITICAL NOTSET = '\033[97m' # White text SEPARATOR = '\033[90m' # Dark Grey MODULE = '\033[95m' # Pink FUNCTION = '\033[94m' # Blue LINE = '\033[33m' # Orange class GZipRotatingFileHandler(logging.handlers.RotatingFileHandler): '''RotatingFileHandler that compresses old log files using gzip.''' def doRollover(self): '''Compress old log files using gzip.''' super().doRollover() if self.backupCount > 0: for i in range(self.backupCount, 0, -1): sfn = f'{self.baseFilename}.{i}' if os.path.exists(sfn): with open(sfn, 'rb') as f_in: with gzip.open(f'{sfn}.gz', 'wb') as f_out: f_out.writelines(f_in) os.remove(sfn) class LoggerSetup: def __init__(self, level='INFO', date_format='%Y-%m-%d %H:%M:%S', log_to_disk=False, max_log_size=10*1024*1024, max_backups=7, log_file_name='app', json_log=False, ecs_log=False, show_details=False, compress_backups=False, enable_graylog=False, graylog_host=None, graylog_port=None, enable_cloudwatch=False, cloudwatch_group_name=None, cloudwatch_stream_name=None): ''' Initialize the LoggerSetup with provided parameters. :param level: The logging level (e.g., 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'). :param date_format: The date format for log messages. :param log_to_disk: Whether to log to disk. :param max_log_size: The maximum size of log files before rotation. :param max_backups: The maximum number of backup log files to keep. :param log_file_name: The base name of the log file. :param json_log: Whether to log in JSON format. :param show_details: Whether to show detailed log messages. :param compress_backups: Whether to compress old log files using gzip. :param enable_graylog: Whether to enable Graylog logging. :param graylog_host: The Graylog host. :param graylog_port: The Graylog port. :param enable_cloudwatch: Whether to enable CloudWatch logging. :param cloudwatch_group_name: The CloudWatch log group name. :param cloudwatch_stream_name: The CloudWatch log stream name. ''' self.level = level self.date_format = date_format self.log_to_disk = log_to_disk self.max_log_size = max_log_size self.max_backups = max_backups self.log_file_name = log_file_name self.json_log = json_log self.ecs_log = ecs_log self.show_details = show_details self.compress_backups = compress_backups self.enable_graylog = enable_graylog self.graylog_host = graylog_host self.graylog_port = graylog_port self.enable_cloudwatch = enable_cloudwatch self.cloudwatch_group_name = cloudwatch_group_name self.cloudwatch_stream_name = cloudwatch_stream_name def setup(self): '''Set up logging with various handlers and options.''' # Clear existing handlers logging.getLogger().handlers.clear() logging.getLogger().setLevel(logging.DEBUG) # Capture all logs at the root level # Convert the level string to a logging level object level_num = getattr(logging, self.level.upper(), logging.INFO) self.setup_console_handler(level_num) if self.log_to_disk: self.setup_file_handler(level_num) if self.enable_graylog: self.setup_graylog_handler(level_num) if self.enable_cloudwatch: self.setup_cloudwatch_handler(level_num) def setup_console_handler(self, level_num: int): ''' Set up the console handler with colored output. :param level_num: The logging level number. ''' # Define the colored formatter class ColoredFormatter(logging.Formatter): def __init__(self, datefmt=None, show_details=False): super().__init__(datefmt=datefmt) self.show_details = show_details self.LEVEL_COLORS = { 'NOTSET' : LogColors.NOTSET, 'DEBUG' : LogColors.DEBUG, 'INFO' : LogColors.INFO, 'WARNING' : LogColors.WARNING, 'ERROR' : LogColors.ERROR, 'CRITICAL' : LogColors.CRITICAL, 'FATAL' : LogColors.FATAL } def format(self, record): log_level = record.levelname message = record.getMessage() asctime = self.formatTime(record, self.datefmt) color = self.LEVEL_COLORS.get(log_level, LogColors.RESET) separator = f'{LogColors.SEPARATOR} ┃ {LogColors.RESET}' if self.show_details: module = record.module line_no = record.lineno func_name = record.funcName formatted = ( f'{LogColors.DATE}{asctime}{LogColors.RESET}' f'{separator}' f'{color}{log_level:<8}{LogColors.RESET}' f'{separator}' f'{LogColors.MODULE}{module}{LogColors.RESET}' f'{separator}' f'{LogColors.FUNCTION}{func_name}{LogColors.RESET}' f'{separator}' f'{LogColors.LINE}{line_no}{LogColors.RESET}' f'{separator}' f'{message}' ) else: formatted = ( f'{LogColors.DATE}{asctime}{LogColors.RESET}' f'{separator}' f'{color}{log_level:<8}{LogColors.RESET}' f'{separator}' f'{message}' ) return formatted # Create console handler with colored output console_handler = logging.StreamHandler() console_handler.setLevel(level_num) console_formatter = ColoredFormatter(datefmt=self.date_format, show_details=self.show_details) console_handler.setFormatter(console_formatter) logging.getLogger().addHandler(console_handler) def setup_file_handler(self, level_num: int): ''' Set up the file handler for logging to disk. :param level_num: The logging level number. ''' # Create 'logs' directory if it doesn't exist logs_dir = os.path.join(os.getcwd(), 'logs') os.makedirs(logs_dir, exist_ok=True) # Use the specified log file name and set extension based on json_log file_extension = '.json' if self.json_log else '.log' log_file_path = os.path.join(logs_dir, f'{self.log_file_name}{file_extension}') # Create the rotating file handler if self.compress_backups: file_handler = GZipRotatingFileHandler(log_file_path, maxBytes=self.max_log_size, backupCount=self.max_backups) else: file_handler = logging.handlers.RotatingFileHandler(log_file_path, maxBytes=self.max_log_size, backupCount=self.max_backups) file_handler.setLevel(level_num) if self.ecs_log: try: import ecs_logging except ImportError: raise ImportError("The 'ecs-logging' library is required for ECS logging. Install it with 'pip install ecs-logging'.") file_formatter = ecs_logging.StdlibFormatter() elif self.json_log: # Create the JSON formatter class JsonFormatter(logging.Formatter): def format(self, record): log_record = { 'time' : self.formatTime(record, self.datefmt), 'level' : record.levelname, 'module' : record.module, 'function' : record.funcName, 'line' : record.lineno, 'message' : record.getMessage(), 'name' : record.name, 'filename' : record.filename, 'threadName' : record.threadName, 'processName' : record.processName, } return json.dumps(log_record) file_formatter = JsonFormatter(datefmt=self.date_format) else: file_formatter = logging.Formatter(fmt='%(asctime)s ┃ %(levelname)-8s ┃ %(module)s ┃ %(funcName)s ┃ %(lineno)d ┃ %(message)s', datefmt=self.date_format) file_handler.setFormatter(file_formatter) logging.getLogger().addHandler(file_handler) def setup_graylog_handler(self, level_num: int): ''' Set up the Graylog handler. :param level_num: The logging level number. ''' graylog_host = self.graylog_host graylog_port = self.graylog_port if graylog_host is None or graylog_port is None: logging.error('Graylog host and port must be specified for Graylog handler.') return class GraylogHandler(logging.Handler): def __init__(self, graylog_host, graylog_port): super().__init__() self.graylog_host = graylog_host self.graylog_port = graylog_port self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Mapping from Python logging levels to Graylog (syslog) levels self.level_mapping = { logging.CRITICAL : 2, # Critical logging.ERROR : 3, # Error logging.WARNING : 4, # Warning logging.INFO : 6, # Informational logging.DEBUG : 7, # Debug logging.NOTSET : 7 # Default to Debug } def emit(self, record): try: log_entry = self.format(record) graylog_level = self.level_mapping.get(record.levelno, 7) # Default to Debug gelf_message = { 'version' : '1.1', 'host' : socket.gethostname(), 'short_message' : record.getMessage(), 'full_message' : log_entry, 'timestamp' : record.created, 'level' : graylog_level, '_logger_name' : record.name, '_file' : record.pathname, '_line' : record.lineno, '_function' : record.funcName, '_module' : record.module, } gelf_json = json.dumps(gelf_message).encode('utf-8') self.sock.sendto(gelf_json, (self.graylog_host, self.graylog_port)) except Exception: self.handleError(record) graylog_handler = GraylogHandler(graylog_host, graylog_port) graylog_handler.setLevel(level_num) graylog_formatter = logging.Formatter(fmt='%(message)s') graylog_handler.setFormatter(graylog_formatter) logging.getLogger().addHandler(graylog_handler) def setup_cloudwatch_handler(self, level_num: int): ''' Set up the CloudWatch handler. :param level_num: The logging level number. ''' try: import boto3 from botocore.exceptions import ClientError except ImportError: raise ImportError('boto3 is required for CloudWatch logging. (pip install boto3)') log_group_name = self.cloudwatch_group_name log_stream_name = self.cloudwatch_stream_name if not log_group_name or not log_stream_name: logging.error('CloudWatch log group and log stream must be specified for CloudWatch handler.') return class CloudWatchHandler(logging.Handler): def __init__(self, log_group_name, log_stream_name): super().__init__() self.log_group_name = log_group_name self.log_stream_name = log_stream_name self.client = boto3.client('logs') # Create log group if it doesn't exist try: self.client.create_log_group(logGroupName=self.log_group_name) except ClientError as e: if e.response['Error']['Code'] != 'ResourceAlreadyExistsException': raise e # Create log stream if it doesn't exist try: self.client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name) except ClientError as e: if e.response['Error']['Code'] != 'ResourceAlreadyExistsException': raise e def _get_sequence_token(self): try: response = self.client.describe_log_streams( logGroupName=self.log_group_name, logStreamNamePrefix=self.log_stream_name, limit=1 ) log_streams = response.get('logStreams', []) if log_streams: return log_streams[0].get('uploadSequenceToken') else: return None except Exception: return None def emit(self, record): try: log_entry = self.format(record) timestamp = int(record.created * 1000) event = { 'timestamp': timestamp, 'message': log_entry } sequence_token = self._get_sequence_token() kwargs = { 'logGroupName': self.log_group_name, 'logStreamName': self.log_stream_name, 'logEvents': [event] } if sequence_token: kwargs['sequenceToken'] = sequence_token self.client.put_log_events(**kwargs) except Exception: self.handleError(record) cloudwatch_handler = CloudWatchHandler(log_group_name, log_stream_name) cloudwatch_handler.setLevel(level_num) # Log as JSON class JsonFormatter(logging.Formatter): def format(self, record): log_record = { 'time' : self.formatTime(record, self.datefmt), 'level' : record.levelname, 'module' : record.module, 'function' : record.funcName, 'line' : record.lineno, 'message' : record.getMessage(), 'name' : record.name, 'filename' : record.filename, 'threadName' : record.threadName, 'processName' : record.processName, } return json.dumps(log_record) cloudwatch_formatter = JsonFormatter(datefmt=self.date_format) cloudwatch_handler.setFormatter(cloudwatch_formatter) logging.getLogger().addHandler(cloudwatch_handler) def setup_logging(**kwargs): '''Set up logging with various handlers and options.''' logger_setup = LoggerSetup(**kwargs) logger_setup.setup()