Modularized each logging functionality into its own plugin

Dionysus 2024-11-26 22:15:05 -05:00
parent e32f49f9b1
commit 0ff3713131
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
8 changed files with 343 additions and 292 deletions

apv/__init__.py

@@ -1,4 +1,4 @@
 from .apv import *
-__version__ = '1.0.2'
+__version__ = '1.0.3'
 __author__ = 'acidvegas'

apv/apv.py

@@ -2,48 +2,8 @@
 # Advanced Python Logging - Developed by acidvegas in Python (https://git.acid.vegas/apv)
 # apv.py
-import gzip
-import json
 import logging
-import logging.handlers
-import os
-import socket
-class LogColors:
-    '''ANSI color codes for log messages.'''
-    RESET     = '\033[0m'
-    DATE      = '\033[90m'         # Dark Grey
-    DEBUG     = '\033[96m'         # Cyan
-    INFO      = '\033[92m'         # Green
-    WARNING   = '\033[93m'         # Yellow
-    ERROR     = '\033[91m'         # Red
-    CRITICAL  = '\033[97m\033[41m' # White on Red
-    FATAL     = '\033[97m\033[41m' # Same as CRITICAL
-    NOTSET    = '\033[97m'         # White text
-    SEPARATOR = '\033[90m'         # Dark Grey
-    MODULE    = '\033[95m'         # Pink
-    FUNCTION  = '\033[94m'         # Blue
-    LINE      = '\033[33m'         # Orange
-class GZipRotatingFileHandler(logging.handlers.RotatingFileHandler):
-    '''RotatingFileHandler that compresses old log files using gzip.'''
-    def doRollover(self):
-        '''Compress old log files using gzip.'''
-        super().doRollover()
-        if self.backupCount > 0:
-            for i in range(self.backupCount, 0, -1):
-                sfn = f'{self.baseFilename}.{i}'
-                if os.path.exists(sfn):
-                    with open(sfn, 'rb') as f_in:
-                        with gzip.open(f'{sfn}.gz', 'wb') as f_out:
-                            f_out.writelines(f_in)
-                    os.remove(sfn)
 class LoggerSetup:
     def __init__(self, level='INFO', date_format='%Y-%m-%d %H:%M:%S',
@@ -113,119 +73,31 @@ class LoggerSetup:
     def setup_console_handler(self, level_num: int):
-        '''
-        Set up the console handler with colored output.
-        :param level_num: The logging level number.
-        '''
-        # Define the colored formatter
-        class ColoredFormatter(logging.Formatter):
-            def __init__(self, datefmt=None, show_details=False):
-                super().__init__(datefmt=datefmt)
-                self.show_details = show_details
-                self.LEVEL_COLORS = {
-                    'NOTSET'   : LogColors.NOTSET,
-                    'DEBUG'    : LogColors.DEBUG,
-                    'INFO'     : LogColors.INFO,
-                    'WARNING'  : LogColors.WARNING,
-                    'ERROR'    : LogColors.ERROR,
-                    'CRITICAL' : LogColors.CRITICAL,
-                    'FATAL'    : LogColors.FATAL
-                }
-            def format(self, record):
-                log_level = record.levelname
-                message   = record.getMessage()
-                asctime   = self.formatTime(record, self.datefmt)
-                color     = self.LEVEL_COLORS.get(log_level, LogColors.RESET)
-                separator = f'{LogColors.SEPARATOR}{LogColors.RESET}'
-                if self.show_details:
-                    module    = record.module
-                    line_no   = record.lineno
-                    func_name = record.funcName
-                    formatted = (
-                        f'{LogColors.DATE}{asctime}{LogColors.RESET}'
-                        f'{separator}'
-                        f'{color}{log_level:<8}{LogColors.RESET}'
-                        f'{separator}'
-                        f'{LogColors.MODULE}{module}{LogColors.RESET}'
-                        f'{separator}'
-                        f'{LogColors.FUNCTION}{func_name}{LogColors.RESET}'
-                        f'{separator}'
-                        f'{LogColors.LINE}{line_no}{LogColors.RESET}'
-                        f'{separator}'
-                        f'{message}'
-                    )
-                else:
-                    formatted = (
-                        f'{LogColors.DATE}{asctime}{LogColors.RESET}'
-                        f'{separator}'
-                        f'{color}{log_level:<8}{LogColors.RESET}'
-                        f'{separator}'
-                        f'{message}'
-                    )
-                return formatted
-        # Create console handler with colored output
-        console_handler = logging.StreamHandler()
-        console_handler.setLevel(level_num)
-        console_formatter = ColoredFormatter(datefmt=self.date_format, show_details=self.show_details)
-        console_handler.setFormatter(console_formatter)
-        logging.getLogger().addHandler(console_handler)
+        '''Set up the console handler.'''
+        try:
+            from apv.plugins.console import setup_console_handler
+            setup_console_handler(level_num, self.date_format, self.show_details)
+        except ImportError:
+            logging.error('Failed to import console handler')
     def setup_file_handler(self, level_num: int):
-        '''
-        Set up the file handler for logging to disk.
-        :param level_num: The logging level number.
-        '''
-        # Create 'logs' directory if it doesn't exist
-        logs_dir = os.path.join(os.getcwd(), 'logs')
-        os.makedirs(logs_dir, exist_ok=True)
-        # Use the specified log file name and set extension based on json_log
-        file_extension = '.json' if self.json_log else '.log'
-        log_file_path = os.path.join(logs_dir, f'{self.log_file_name}{file_extension}')
-        # Create the rotating file handler
-        if self.compress_backups:
-            file_handler = GZipRotatingFileHandler(log_file_path, maxBytes=self.max_log_size, backupCount=self.max_backups)
-        else:
-            file_handler = logging.handlers.RotatingFileHandler(log_file_path, maxBytes=self.max_log_size, backupCount=self.max_backups)
-        file_handler.setLevel(level_num)
-        if self.ecs_log:
-            try:
-                import ecs_logging
-            except ImportError:
-                raise ImportError("The 'ecs-logging' library is required for ECS logging. Install it with 'pip install ecs-logging'.")
-            file_formatter = ecs_logging.StdlibFormatter()
-        elif self.json_log:
-            # Create the JSON formatter
-            class JsonFormatter(logging.Formatter):
-                def format(self, record):
-                    log_record = {
-                        'time'        : self.formatTime(record, self.datefmt),
-                        'level'       : record.levelname,
-                        'module'      : record.module,
-                        'function'    : record.funcName,
-                        'line'        : record.lineno,
-                        'message'     : record.getMessage(),
-                        'name'        : record.name,
-                        'filename'    : record.filename,
-                        'threadName'  : record.threadName,
-                        'processName' : record.processName,
-                    }
-                    return json.dumps(log_record)
-            file_formatter = JsonFormatter(datefmt=self.date_format)
-        else:
-            file_formatter = logging.Formatter(fmt='%(asctime)s%(levelname)-8s%(module)s%(funcName)s%(lineno)d%(message)s', datefmt=self.date_format)
-        file_handler.setFormatter(file_formatter)
-        logging.getLogger().addHandler(file_handler)
+        '''Set up the file handler.'''
+        try:
+            from apv.plugins.file import setup_file_handler
+            setup_file_handler(
+                level_num=level_num,
+                log_to_disk=self.log_to_disk,
+                max_log_size=self.max_log_size,
+                max_backups=self.max_backups,
+                log_file_name=self.log_file_name,
+                json_log=self.json_log,
+                ecs_log=self.ecs_log,
+                date_format=self.date_format,
+                compress_backups=self.compress_backups
+            )
+        except ImportError:
+            logging.error('Failed to import file handler')
     def setup_graylog_handler(self, level_num: int):
@@ -235,57 +107,11 @@ class LoggerSetup:
         :param level_num: The logging level number.
         '''
-        graylog_host = self.graylog_host
-        graylog_port = self.graylog_port
-        if graylog_host is None or graylog_port is None:
-            logging.error('Graylog host and port must be specified for Graylog handler.')
-            return
-        class GraylogHandler(logging.Handler):
-            def __init__(self, graylog_host, graylog_port):
-                super().__init__()
-                self.graylog_host = graylog_host
-                self.graylog_port = graylog_port
-                self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-                # Mapping from Python logging levels to Graylog (syslog) levels
-                self.level_mapping = {
-                    logging.CRITICAL : 2, # Critical
-                    logging.ERROR    : 3, # Error
-                    logging.WARNING  : 4, # Warning
-                    logging.INFO     : 6, # Informational
-                    logging.DEBUG    : 7, # Debug
-                    logging.NOTSET   : 7  # Default to Debug
-                }
-            def emit(self, record):
-                try:
-                    log_entry = self.format(record)
-                    graylog_level = self.level_mapping.get(record.levelno, 7) # Default to Debug
-                    gelf_message = {
-                        'version'       : '1.1',
-                        'host'          : socket.gethostname(),
-                        'short_message' : record.getMessage(),
-                        'full_message'  : log_entry,
-                        'timestamp'     : record.created,
-                        'level'         : graylog_level,
-                        '_logger_name'  : record.name,
-                        '_file'         : record.pathname,
-                        '_line'         : record.lineno,
-                        '_function'     : record.funcName,
-                        '_module'       : record.module,
-                    }
-                    gelf_json = json.dumps(gelf_message).encode('utf-8')
-                    self.sock.sendto(gelf_json, (self.graylog_host, self.graylog_port))
-                except Exception:
-                    self.handleError(record)
-        graylog_handler = GraylogHandler(graylog_host, graylog_port)
-        graylog_handler.setLevel(level_num)
-        graylog_formatter = logging.Formatter(fmt='%(message)s')
-        graylog_handler.setFormatter(graylog_formatter)
-        logging.getLogger().addHandler(graylog_handler)
+        try:
+            from apv.plugins.graylog import setup_graylog_handler
+            setup_graylog_handler(level_num, self.graylog_host, self.graylog_port)
+        except ImportError:
+            logging.error('Failed to import Graylog handler')
     def setup_cloudwatch_handler(self, level_num: int):
@@ -296,96 +122,15 @@ class LoggerSetup:
         '''
         try:
-            import boto3
-            from botocore.exceptions import ClientError
+            from apv.plugins.cloudwatch import setup_cloudwatch_handler
+            setup_cloudwatch_handler(
+                level_num,
+                self.cloudwatch_group_name,
+                self.cloudwatch_stream_name,
+                self.date_format
+            )
         except ImportError:
-            raise ImportError('boto3 is required for CloudWatch logging. (pip install boto3)')
-        log_group_name = self.cloudwatch_group_name
-        log_stream_name = self.cloudwatch_stream_name
-        if not log_group_name or not log_stream_name:
-            logging.error('CloudWatch log group and log stream must be specified for CloudWatch handler.')
-            return
-        class CloudWatchHandler(logging.Handler):
-            def __init__(self, log_group_name, log_stream_name):
-                super().__init__()
-                self.log_group_name = log_group_name
-                self.log_stream_name = log_stream_name
-                self.client = boto3.client('logs')
-                # Create log group if it doesn't exist
-                try:
-                    self.client.create_log_group(logGroupName=self.log_group_name)
-                except ClientError as e:
-                    if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
-                        raise e
-                # Create log stream if it doesn't exist
-                try:
-                    self.client.create_log_stream(logGroupName=self.log_group_name, logStreamName=self.log_stream_name)
-                except ClientError as e:
-                    if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
-                        raise e
-            def _get_sequence_token(self):
-                try:
-                    response = self.client.describe_log_streams(
-                        logGroupName=self.log_group_name,
-                        logStreamNamePrefix=self.log_stream_name,
-                        limit=1
-                    )
-                    log_streams = response.get('logStreams', [])
-                    if log_streams:
-                        return log_streams[0].get('uploadSequenceToken')
-                    else:
-                        return None
-                except Exception:
-                    return None
-            def emit(self, record):
-                try:
-                    log_entry = self.format(record)
-                    timestamp = int(record.created * 1000)
-                    event = {
-                        'timestamp': timestamp,
-                        'message': log_entry
-                    }
-                    sequence_token = self._get_sequence_token()
-                    kwargs = {
-                        'logGroupName': self.log_group_name,
-                        'logStreamName': self.log_stream_name,
-                        'logEvents': [event]
-                    }
-                    if sequence_token:
-                        kwargs['sequenceToken'] = sequence_token
-                    self.client.put_log_events(**kwargs)
-                except Exception:
-                    self.handleError(record)
-        cloudwatch_handler = CloudWatchHandler(log_group_name, log_stream_name)
-        cloudwatch_handler.setLevel(level_num)
-        # Log as JSON
-        class JsonFormatter(logging.Formatter):
-            def format(self, record):
-                log_record = {
-                    'time'        : self.formatTime(record, self.datefmt),
-                    'level'       : record.levelname,
-                    'module'      : record.module,
-                    'function'    : record.funcName,
-                    'line'        : record.lineno,
-                    'message'     : record.getMessage(),
-                    'name'        : record.name,
-                    'filename'    : record.filename,
-                    'threadName'  : record.threadName,
-                    'processName' : record.processName,
-                }
-                return json.dumps(log_record)
-        cloudwatch_formatter = JsonFormatter(datefmt=self.date_format)
-        cloudwatch_handler.setFormatter(cloudwatch_formatter)
-        logging.getLogger().addHandler(cloudwatch_handler)
+            logging.error('Failed to import CloudWatch handler')
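Usage note: the public surface of LoggerSetup is unchanged; each setup_* method now lazily imports its plugin and forwards the call. A minimal sketch of driving a plugin through the façade (the constructor's trailing parameters are truncated in this hunk, so the example assumes they keep sensible defaults):

import logging
from apv.apv import LoggerSetup

setup = LoggerSetup(level='DEBUG')          # remaining kwargs assumed to default
setup.setup_console_handler(logging.DEBUG)  # delegates to apv.plugins.console
logging.getLogger().setLevel(logging.DEBUG)
logging.info('console handler wired through the plugin facade')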

apv/plugins/__init__.py Normal file (+1)

@@ -0,0 +1 @@
# Empty file to make plugins a package

apv/plugins/cloudwatch.py Normal file (+100)

@@ -0,0 +1,100 @@
import logging
import json

import boto3
from botocore.exceptions import ClientError

class CloudWatchHandler(logging.Handler):
    def __init__(self, group_name, stream_name):
        super().__init__()
        self.group_name = group_name
        self.stream_name = stream_name
        self.client = boto3.client('logs')
        self._initialize_log_group_and_stream()

    def _initialize_log_group_and_stream(self):
        # Create log group if it doesn't exist
        try:
            self.client.create_log_group(logGroupName=self.group_name)
        except ClientError as e:
            if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
                raise e

        # Create log stream if it doesn't exist
        try:
            self.client.create_log_stream(
                logGroupName=self.group_name,
                logStreamName=self.stream_name
            )
        except ClientError as e:
            if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
                raise e

    def _get_sequence_token(self):
        try:
            response = self.client.describe_log_streams(
                logGroupName=self.group_name,
                logStreamNamePrefix=self.stream_name,
                limit=1
            )
            log_streams = response.get('logStreams', [])
            return log_streams[0].get('uploadSequenceToken') if log_streams else None
        except Exception:
            return None

    def emit(self, record):
        try:
            log_entry = self.format(record)
            timestamp = int(record.created * 1000)
            event = {
                'timestamp': timestamp,
                'message': log_entry
            }
            kwargs = {
                'logGroupName': self.group_name,
                'logStreamName': self.stream_name,
                'logEvents': [event]
            }
            sequence_token = self._get_sequence_token()
            if sequence_token:
                kwargs['sequenceToken'] = sequence_token
            self.client.put_log_events(**kwargs)
        except Exception:
            self.handleError(record)

def setup_cloudwatch_handler(level_num: int, group_name: str, stream_name: str, date_format: str):
    '''Set up the CloudWatch handler.'''
    try:
        import boto3
    except ImportError:
        raise ImportError('boto3 is required for CloudWatch logging. (pip install boto3)')

    if not group_name or not stream_name:
        logging.error('CloudWatch log group and log stream must be specified for CloudWatch handler.')
        return

    cloudwatch_handler = CloudWatchHandler(group_name, stream_name)
    cloudwatch_handler.setLevel(level_num)

    class JsonFormatter(logging.Formatter):
        def format(self, record):
            log_record = {
                'time'        : self.formatTime(record, date_format),
                'level'       : record.levelname,
                'module'      : record.module,
                'function'    : record.funcName,
                'line'        : record.lineno,
                'message'     : record.getMessage(),
                'name'        : record.name,
                'filename'    : record.filename,
                'threadName'  : record.threadName,
                'processName' : record.processName,
            }
            return json.dumps(log_record)

    cloudwatch_formatter = JsonFormatter(datefmt=date_format)
    cloudwatch_handler.setFormatter(cloudwatch_formatter)
    logging.getLogger().addHandler(cloudwatch_handler)
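Usage note: callers only need the one setup function; boto3 resolves credentials through its normal chain (environment, shared config, instance role). A hedged sketch with placeholder group/stream names. One design point worth knowing: _get_sequence_token() issues a DescribeLogStreams call on every emit, so each log record costs an extra API round-trip.

import logging
from apv.plugins.cloudwatch import setup_cloudwatch_handler

# 'my-app' / 'my-stream' are example names; the log group and stream are
# created on first use if they do not already exist.
setup_cloudwatch_handler(logging.INFO, 'my-app', 'my-stream', '%Y-%m-%d %H:%M:%S')
logging.getLogger().setLevel(logging.INFO)
logging.info('shipped to CloudWatch as a JSON event')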

apv/plugins/console.py Normal file (+70)

@@ -0,0 +1,70 @@
import logging

class LogColors:
    '''ANSI color codes for log messages.'''
    RESET     = '\033[0m'
    DATE      = '\033[90m'         # Dark Grey
    DEBUG     = '\033[96m'         # Cyan
    INFO      = '\033[92m'         # Green
    WARNING   = '\033[93m'         # Yellow
    ERROR     = '\033[91m'         # Red
    CRITICAL  = '\033[97m\033[41m' # White on Red
    FATAL     = '\033[97m\033[41m' # Same as CRITICAL
    NOTSET    = '\033[97m'         # White text
    SEPARATOR = '\033[90m'         # Dark Grey
    MODULE    = '\033[95m'         # Pink
    FUNCTION  = '\033[94m'         # Blue
    LINE      = '\033[33m'         # Orange

class ColoredFormatter(logging.Formatter):
    def __init__(self, datefmt=None, show_details=False):
        super().__init__(datefmt=datefmt)
        self.show_details = show_details
        self.LEVEL_COLORS = {
            'NOTSET'   : LogColors.NOTSET,
            'DEBUG'    : LogColors.DEBUG,
            'INFO'     : LogColors.INFO,
            'WARNING'  : LogColors.WARNING,
            'ERROR'    : LogColors.ERROR,
            'CRITICAL' : LogColors.CRITICAL,
            'FATAL'    : LogColors.FATAL
        }

    def format(self, record):
        log_level = record.levelname
        message   = record.getMessage()
        asctime   = self.formatTime(record, self.datefmt)
        color     = self.LEVEL_COLORS.get(log_level, LogColors.RESET)
        separator = f'{LogColors.SEPARATOR}{LogColors.RESET}'
        if self.show_details:
            formatted = (
                f'{LogColors.DATE}{asctime}{LogColors.RESET}'
                f'{separator}'
                f'{color}{log_level:<8}{LogColors.RESET}'
                f'{separator}'
                f'{LogColors.MODULE}{record.module}{LogColors.RESET}'
                f'{separator}'
                f'{LogColors.FUNCTION}{record.funcName}{LogColors.RESET}'
                f'{separator}'
                f'{LogColors.LINE}{record.lineno}{LogColors.RESET}'
                f'{separator}'
                f'{message}'
            )
        else:
            formatted = (
                f'{LogColors.DATE}{asctime}{LogColors.RESET}'
                f'{separator}'
                f'{color}{log_level:<8}{LogColors.RESET}'
                f'{separator}'
                f'{message}'
            )
        return formatted

def setup_console_handler(level_num: int, date_format: str, show_details: bool):
    '''Set up the console handler with colored output.'''
    console_handler = logging.StreamHandler()
    console_handler.setLevel(level_num)
    console_formatter = ColoredFormatter(datefmt=date_format, show_details=show_details)
    console_handler.setFormatter(console_formatter)
    logging.getLogger().addHandler(console_handler)
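Usage note: the plugin registers its handler on the root logger, so it can be smoke-tested in isolation (the date format string and flag values below are arbitrary examples):

import logging
from apv.plugins.console import setup_console_handler

setup_console_handler(logging.DEBUG, '%H:%M:%S', show_details=True)
logging.getLogger().setLevel(logging.DEBUG)
logging.warning('colored output with module, function, and line details')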

apv/plugins/file.py Normal file (+77)

@@ -0,0 +1,77 @@
import logging
import logging.handlers
import json
import os
import gzip

class GZipRotatingFileHandler(logging.handlers.RotatingFileHandler):
    '''RotatingFileHandler that compresses old log files using gzip.'''

    def doRollover(self):
        '''Compress old log files using gzip.'''
        super().doRollover()
        if self.backupCount > 0:
            for i in range(self.backupCount, 0, -1):
                sfn = f'{self.baseFilename}.{i}'
                if os.path.exists(sfn):
                    with open(sfn, 'rb') as f_in:
                        with gzip.open(f'{sfn}.gz', 'wb') as f_out:
                            f_out.writelines(f_in)
                    os.remove(sfn)

class JsonFormatter(logging.Formatter):
    def __init__(self, date_format):
        super().__init__()
        self.date_format = date_format

    def format(self, record):
        log_record = {
            'time'        : self.formatTime(record, self.date_format),
            'level'       : record.levelname,
            'module'      : record.module,
            'function'    : record.funcName,
            'line'        : record.lineno,
            'message'     : record.getMessage(),
            'name'        : record.name,
            'filename'    : record.filename,
            'threadName'  : record.threadName,
            'processName' : record.processName,
        }
        return json.dumps(log_record)

def setup_file_handler(level_num: int, log_to_disk: bool, max_log_size: int,
                       max_backups: int, log_file_name: str, json_log: bool,
                       ecs_log: bool, date_format: str, compress_backups: bool):
    '''Set up the file handler for logging to disk.'''
    if not log_to_disk:
        return

    # Create 'logs' directory if it doesn't exist
    logs_dir = os.path.join(os.getcwd(), 'logs')
    os.makedirs(logs_dir, exist_ok=True)

    # Use the specified log file name and set extension based on json_log
    file_extension = '.json' if json_log else '.log'
    log_file_path = os.path.join(logs_dir, f'{log_file_name}{file_extension}')

    # Create the rotating file handler
    handler_class = GZipRotatingFileHandler if compress_backups else logging.handlers.RotatingFileHandler
    file_handler = handler_class(log_file_path, maxBytes=max_log_size, backupCount=max_backups)
    file_handler.setLevel(level_num)

    if ecs_log:
        try:
            import ecs_logging
        except ImportError:
            raise ImportError("The 'ecs-logging' library is required for ECS logging. Install it with 'pip install ecs-logging'.")
        file_formatter = ecs_logging.StdlibFormatter()
    elif json_log:
        file_formatter = JsonFormatter(date_format)
    else:
        file_formatter = logging.Formatter(
            fmt='%(asctime)s%(levelname)-8s%(module)s%(funcName)s%(lineno)d%(message)s',
            datefmt=date_format
        )

    file_handler.setFormatter(file_formatter)
    logging.getLogger().addHandler(file_handler)
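Usage note: a sketch exercising rotation plus gzip compression (the file name and sizes are arbitrary examples; the log lands in ./logs/ relative to the working directory):

import logging
from apv.plugins.file import setup_file_handler

setup_file_handler(
    level_num=logging.INFO,
    log_to_disk=True,
    max_log_size=1024 * 1024,   # rotate at 1 MB
    max_backups=3,              # keep up to 3 gzipped backups
    log_file_name='example',    # -> logs/example.json
    json_log=True,
    ecs_log=False,
    date_format='%Y-%m-%d %H:%M:%S',
    compress_backups=True
)
logging.getLogger().setLevel(logging.INFO)
logging.info('rotated, JSON-formatted log line')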

apv/plugins/graylog.py Normal file (+58)

@@ -0,0 +1,58 @@
import logging
import json
import socket
import zlib

class GraylogHandler(logging.Handler):
    def __init__(self, host, port):
        super().__init__()
        self.host = host
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        # Mapping from Python logging levels to Graylog (syslog) levels
        self.level_mapping = {
            logging.CRITICAL : 2, # Critical
            logging.ERROR    : 3, # Error
            logging.WARNING  : 4, # Warning
            logging.INFO     : 6, # Informational
            logging.DEBUG    : 7, # Debug
            logging.NOTSET   : 7  # Default to Debug
        }

    def emit(self, record):
        try:
            log_entry = self.format(record)
            graylog_level = self.level_mapping.get(record.levelno, 7)
            gelf_message = {
                'version'       : '1.1',
                'host'          : socket.gethostname(),
                'short_message' : record.getMessage(),
                'full_message'  : log_entry,
                'timestamp'     : record.created,
                'level'         : graylog_level,
                '_logger_name'  : record.name,
                '_file'         : record.pathname,
                '_line'         : record.lineno,
                '_function'     : record.funcName,
                '_module'       : record.module,
            }
            message = json.dumps(gelf_message).encode('utf-8')
            compressed = zlib.compress(message)
            self.sock.sendto(compressed, (self.host, self.port))
        except Exception:
            self.handleError(record)

def setup_graylog_handler(level_num: int, graylog_host: str, graylog_port: int):
    '''Set up the Graylog handler.'''
    if graylog_host is None or graylog_port is None:
        logging.error('Graylog host and port must be specified for Graylog handler.')
        return

    graylog_handler = GraylogHandler(graylog_host, graylog_port)
    graylog_handler.setLevel(level_num)
    graylog_formatter = logging.Formatter(fmt='%(message)s')
    graylog_handler.setFormatter(graylog_formatter)
    logging.getLogger().addHandler(graylog_handler)
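Usage note: unlike the inlined handler this replaces, the plugin zlib-compresses the GELF payload before the UDP send, which Graylog's GELF UDP input accepts natively. A minimal sketch (the host is a placeholder; 12201 is the conventional GELF UDP port):

import logging
from apv.plugins.graylog import setup_graylog_handler

setup_graylog_handler(logging.INFO, 'graylog.example.com', 12201)
logging.getLogger().setLevel(logging.INFO)
logging.error('delivered as a zlib-compressed GELF message over UDP')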

setup.py

@@ -9,7 +9,7 @@ with open('README.md', 'r', encoding='utf-8') as fh:
 setup(
     name='apv',
-    version='1.0.2',
+    version='1.0.3',
     description='Advanced Python Logging',
     author='acidvegas',
     author_email='acid.vegas@acid.vegas',