over simplified

This commit is contained in:
Dionysus 2025-05-27 12:04:43 -04:00
parent ccd9642784
commit 69efe86fb9
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
10 changed files with 262 additions and 575 deletions

README.md View File

@@ -14,12 +14,10 @@
- [Console Logging with Details](#console-logging-with-details)
- [File Logging with Rotation](#file-logging-with-rotation)
- [File Logging with Compression and JSON Format](#file-logging-with-compression-and-json-format)
- [Graylog Integration](#graylog-integration)
- [AWS CloudWatch Integration](#aws-cloudwatch-integration)
- [Mixing it all together](#mixing-it-all-together)
## Introduction
APV emerged from a simple observation: despite the abundance of logging solutions, there's a glaring lack of standardization in application logging. As a developer deeply entrenched in Elasticsearch, AWS, and Graylog ecosystems, I found myself repeatedly grappling with inconsistent log formats and cumbersome integrations. APV is my response to this challenge: a logging library that doesn't aim to revolutionize the field, but rather to streamline it.
APV emerged from a simple observation: despite the abundance of logging solutions, there's a glaring lack of standardization in application logging. APV is my response to this challenge: a logging library that doesn't aim to revolutionize the field, but rather to streamline it.
## Requirements
- Python 3.10+
@@ -28,17 +26,7 @@ APV emerged from a simple observation: despite the abundance of logging solution
### From PyPI
```bash
# Basic installation
pip install apv
# With CloudWatch support
pip install apv[cloudwatch]
# With ECS logging support
pip install apv[ecs]
# With all optional dependencies
pip install "apv[cloudwatch,ecs]"
```
### From Source
@@ -53,18 +41,14 @@ pip install .
- **File Logging**: Write logs to files with support for log rotation based on size and number of backups.
- **Log Compression**: Automatically compress old log files using gzip to save disk space.
- **JSON Logging**: Output logs in JSON format for better structure and integration with log management systems.
- **ECS Logging**: Output logs in ECS format for better integration with [Elasticsearch](https://www.elastic.co/elasticsearch/)
- **Detailed Log Messages**: Option to include module name, function name, and line number in log messages.
- **Graylog Integration**: Send logs to a [Graylog](https://www.graylog.org/) server using GELF over UDP.
- **AWS CloudWatch Integration**: Send logs to [AWS CloudWatch Logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/WhatIsCloudWatchLogs.html).
- **Customizable Logging Levels**: Set the logging level to control verbosity.
## Configuration Options
The `setup_logging` function accepts the following keyword arguments to customize logging behavior:
| Name | Default | Description |
|--------------------------|--------------------------|--------------------------------------------------------------------------------------|
|-------------------|--------------------------|-------------------------------------------------------------------------------|
| `level` | `INFO` | The logging level. *(`DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`)* |
| `date_format` | `'%Y-%m-%d %H:%M:%S'` | The date format for log messages. |
| `log_to_disk` | `False` | Whether to log to disk. |
@@ -72,15 +56,8 @@ The `setup_logging` function accepts the following keyword arguments to customiz
| `max_backups` | `7` | The maximum number of backup log files to keep. |
| `log_file_name` | `'app'` | The base name of the log file. |
| `json_log` | `False` | Whether to log in JSON format. |
| `ecs_log` | `False` | Whether to log in ECS format. |
| `show_details` | `False` | Whether to include module name, function name, & line number in log messages. |
| `compress_backups`| `False` | Whether to compress old log files using gzip. |
| `enable_graylog` | `False` | Whether to enable logging to a Graylog server. |
| `graylog_host` | `None` | The Graylog server host. *(Required if `enable_graylog` is `True`)* |
| `graylog_port` | `None` | The Graylog server port. *(Required if `enable_graylog` is `True`)* |
| `enable_cloudwatch` | `False` | Whether to enable logging to AWS CloudWatch Logs. |
| `cloudwatch_group_name` | `None` | The name of the CloudWatch log group. *(Required if `enable_cloudwatch` is `True`)* |
| `cloudwatch_stream_name` | `None` | The name of the CloudWatch log stream. *(Required if `enable_cloudwatch` is `True`)* |
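For a quick start, the sketch below combines a few of the options above; the log file name, rotation size, and backup count are illustrative. Fuller recipes follow under Usage.

```python
import logging
import apv

# Set up JSON file logging with rotation and gzip-compressed backups (illustrative values)
apv.setup_logging(
    level='DEBUG',
    log_to_disk=True,
    max_log_size=5*1024*1024,
    max_backups=3,
    log_file_name='example',
    json_log=True,
    compress_backups=True,
    show_details=True
)

logging.info('Logging is configured.')
```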
## Usage
@@ -147,53 +124,6 @@ apv.setup_logging(
logging.debug('This is a debug message in JSON format.')
```
### Graylog Integration
```python
import logging
import apv
# Set up logging to Graylog server
apv.setup_logging(
level='INFO',
enable_graylog=True,
graylog_host='graylog.example.com',
graylog_port=12201
)
logging.info('This message will be sent to Graylog.')
```
### AWS CloudWatch Integration
```python
import logging
import apv
# Set up logging to AWS CloudWatch Logs
apv.setup_logging(
level='INFO',
enable_cloudwatch=True,
cloudwatch_group_name='my_log_group',
cloudwatch_stream_name='my_log_stream'
)
logging.info('This message will be sent to AWS CloudWatch.')
```
### ECS Logging
```python
import logging
import apv
# Set up ECS logging
apv.setup_logging(
level='INFO',
ecs_log=True
)
```
### Mixing it all together
```python
@@ -209,12 +139,6 @@ apv.setup_logging(
log_file_name='app',
json_log=True,
compress_backups=True,
enable_graylog=True,
graylog_host='graylog.example.com',
graylog_port=12201,
enable_cloudwatch=True,
cloudwatch_group_name='my_log_group',
cloudwatch_stream_name='my_log_stream',
show_details=True
)
```

apv/__init__.py View File

@@ -1,4 +1,3 @@
from .apv import *
__version__ = '1.0.4'
__author__ = 'acidvegas'
#!/usr/bin/env python3
# Advanced Python Logging - Developed by acidvegas in Python (https://git.acid.vegas/apv)
# apv/__init__.py

apv/apv.py View File

@@ -2,37 +2,134 @@
# Advanced Python Logging - Developed by acidvegas in Python (https://git.acid.vegas/apv)
# apv.py
import gzip
import json
import logging
import logging.handlers
import os
import socket
import sys
sys.stdout.reconfigure(encoding='utf-8')
class LogColors:
'''ANSI color codes for log messages'''
NOTSET = '\033[97m' # White text
DEBUG = '\033[96m' # Cyan
INFO = '\033[92m' # Green
WARNING = '\033[93m' # Yellow
ERROR = '\033[91m' # Red
CRITICAL = '\033[97m\033[41m' # White on Red
FATAL = '\033[97m\033[41m' # Same as CRITICAL
DATE = '\033[90m' # Dark Grey
MODULE = '\033[95m' # Pink
FUNCTION = '\033[94m' # Blue
LINE = '\033[33m' # Orange
RESET = '\033[0m'
SEPARATOR = '\033[90m' # Dark Grey
class ConsoleFormatter(logging.Formatter):
'''A formatter for the console that supports colored output'''
def __init__(self, datefmt: str = None, details: bool = False):
super().__init__(datefmt=datefmt)
self.details = details
def format(self, record: logging.LogRecord) -> str:
'''
Format a log record for the console
:param record: The log record to format
'''
# Get the color for the log level
color = getattr(LogColors, record.levelname, LogColors.RESET)
# Format the log level
log_level = f'{color}{record.levelname:<8}{LogColors.RESET}'
# Get the log message
message = record.getMessage()
# Format the timestamp
asctime = f'{LogColors.DATE}{self.formatTime(record, self.datefmt)}'
# Get the separator
separator = f'{LogColors.SEPARATOR}{LogColors.RESET}'
details = f'{LogColors.MODULE}{record.module}{separator}{LogColors.FUNCTION}{record.funcName}{separator}{LogColors.LINE}{record.lineno}{separator}' if self.details else ''
return f'{asctime}{separator}{log_level}{separator}{details}{message}'
class JsonFormatter(logging.Formatter):
'''Formatter for JSON output'''
def __init__(self, datefmt: str = None):
super().__init__(datefmt=datefmt)
def format(self, record: logging.LogRecord) -> str:
'''
Format a log record for JSON output
:param record: The log record to format
'''
# Create a dictionary to store the log record
log_dict = {
'@timestamp' : self.formatTime(record, self.datefmt),
'level' : record.levelname,
'message' : record.getMessage(),
'process_id' : record.process,
'process_name' : record.processName,
'thread_id' : record.thread,
'thread_name' : record.threadName,
'logger_name' : record.name,
'filename' : record.filename,
'line_number' : record.lineno,
'function' : record.funcName,
'module' : record.module,
'hostname' : socket.gethostname()
}
# Add the exception if it exists
if record.exc_info:
log_dict['exception'] = self.formatException(record.exc_info)
# Add any custom attributes that start with an underscore
custom_attrs = {k: v for k, v in record.__dict__.items() if k.startswith('_') and not k.startswith('__')}
log_dict.update(custom_attrs)
return json.dumps(log_dict)
class GZipRotatingFileHandler(logging.handlers.RotatingFileHandler):
'''RotatingFileHandler that compresses rotated log files'''
def rotation_filename(self, default_name: str) -> str:
return default_name + '.gz'
def rotate(self, source: str, dest: str):
with open(source, 'rb') as src, gzip.open(dest, 'wb') as dst:
dst.write(src.read())
os.remove(source) # delete the uncompressed source so the base log file starts fresh after rotation
class LoggerSetup:
def __init__(self, level='INFO', date_format='%Y-%m-%d %H:%M:%S',
log_to_disk=False, max_log_size=10*1024*1024,
max_backups=7, log_file_name='app', json_log=False,
ecs_log=False, show_details=False, compress_backups=False,
enable_graylog=False, graylog_host=None, graylog_port=None,
enable_cloudwatch=False, cloudwatch_group_name=None, cloudwatch_stream_name=None):
def __init__(self, level: str = 'INFO', date_format: str = '%Y-%m-%d %H:%M:%S', log_to_disk: bool = False, max_log_size: int = 10*1024*1024, max_backups: int = 7, log_file_name: str = 'app', json_log: bool = False, show_details: bool = False, compress_backups: bool = False):
'''
Initialize the LoggerSetup with provided parameters.
Initialize the LoggerSetup with provided parameters
:param level: The logging level (e.g., 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL').
:param date_format: The date format for log messages.
:param log_to_disk: Whether to log to disk.
:param max_log_size: The maximum size of log files before rotation.
:param max_backups: The maximum number of backup log files to keep.
:param log_file_name: The base name of the log file.
:param json_log: Whether to log in JSON format.
:param show_details: Whether to show detailed log messages.
:param compress_backups: Whether to compress old log files using gzip.
:param enable_graylog: Whether to enable Graylog logging.
:param graylog_host: The Graylog host.
:param graylog_port: The Graylog port.
:param enable_cloudwatch: Whether to enable CloudWatch logging.
:param cloudwatch_group_name: The CloudWatch log group name.
:param cloudwatch_stream_name: The CloudWatch log stream name.
:param level: The logging level (e.g., 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
:param date_format: The date format for log messages
:param log_to_disk: Whether to log to disk
:param max_log_size: The maximum size of log files before rotation
:param max_backups: The maximum number of backup log files to keep
:param log_file_name: The base name of the log file
:param json_log: Whether to log in JSON format
:param show_details: Whether to show detailed log messages
:param compress_backups: Whether to compress old log files using gzip
'''
self.level = level
@@ -42,103 +139,79 @@ class LoggerSetup:
self.max_backups = max_backups
self.log_file_name = log_file_name
self.json_log = json_log
self.ecs_log = ecs_log
self.show_details = show_details
self.compress_backups = compress_backups
self.enable_graylog = enable_graylog
self.graylog_host = graylog_host
self.graylog_port = graylog_port
self.enable_cloudwatch = enable_cloudwatch
self.cloudwatch_group_name = cloudwatch_group_name
self.cloudwatch_stream_name = cloudwatch_stream_name
def setup(self):
'''Set up logging with various handlers and options.'''
'''Set up logging with various handlers and options'''
# Clear existing handlers
logging.getLogger().handlers.clear()
logging.getLogger().setLevel(logging.DEBUG) # Capture all logs at the root level
logging.getLogger().setLevel(logging.DEBUG)
# Convert the level string to a logging level object
level_num = getattr(logging, self.level.upper(), logging.INFO)
# Setup console handler
self.setup_console_handler(level_num)
# Setup file handler if enabled
if self.log_to_disk:
self.setup_file_handler(level_num)
if self.enable_graylog:
self.setup_graylog_handler(level_num)
if self.enable_cloudwatch:
self.setup_cloudwatch_handler(level_num)
def setup_console_handler(self, level_num: int):
'''Set up the console handler.'''
try:
from apv.plugins.console import setup_console_handler
setup_console_handler(level_num, self.date_format, self.show_details)
except ImportError:
logging.error('Failed to import console handler')
'''
Set up the console handler
:param level_num: The logging level number
'''
# Create the console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(level_num)
# Create the formatter
formatter = JsonFormatter(datefmt=self.date_format) if self.json_log else ConsoleFormatter(datefmt=self.date_format, details=self.show_details)
console_handler.setFormatter(formatter)
# Add the handler to the root logger
logging.getLogger().addHandler(console_handler)
def setup_file_handler(self, level_num: int):
'''Set up the file handler.'''
try:
from apv.plugins.file import setup_file_handler
setup_file_handler(
level_num=level_num,
log_to_disk=self.log_to_disk,
max_log_size=self.max_log_size,
max_backups=self.max_backups,
log_file_name=self.log_file_name,
json_log=self.json_log,
ecs_log=self.ecs_log,
date_format=self.date_format,
compress_backups=self.compress_backups
)
except ImportError:
logging.error('Failed to import file handler')
def setup_graylog_handler(self, level_num: int):
'''
Set up the Graylog handler.
Set up the file handler
:param level_num: The logging level number.
:param level_num: The logging level number
'''
try:
from apv.plugins.graylog import setup_graylog_handler
setup_graylog_handler(level_num, self.graylog_host, self.graylog_port)
except ImportError:
logging.error('Failed to import Graylog handler')
# Create logs directory if it doesn't exist
logs_dir = os.path.join(sys.path[0], 'logs')
os.makedirs(logs_dir, exist_ok=True)
# Set up log file path
file_extension = '.json' if self.json_log else '.log'
log_file_path = os.path.join(logs_dir, f'{self.log_file_name}{file_extension}')
def setup_cloudwatch_handler(self, level_num: int):
'''
Set up the CloudWatch handler.
# Create the rotating file handler
handler_class = GZipRotatingFileHandler if self.compress_backups else logging.handlers.RotatingFileHandler
file_handler = handler_class(log_file_path, maxBytes=self.max_log_size, backupCount=self.max_backups)
file_handler.setLevel(level_num)
:param level_num: The logging level number.
'''
try:
from apv.plugins.cloudwatch import setup_cloudwatch_handler
setup_cloudwatch_handler(
level_num,
self.cloudwatch_group_name,
self.cloudwatch_stream_name,
self.date_format
)
except ImportError:
logging.error('Failed to import CloudWatch handler')
# Set up the appropriate formatter
formatter = JsonFormatter(datefmt=self.date_format) if self.json_log else logging.Formatter(fmt='%(asctime)s%(levelname)-8s%(module)s%(funcName)s%(lineno)d%(message)s', datefmt=self.date_format)
file_handler.setFormatter(formatter)
logging.getLogger().addHandler(file_handler)
def setup_logging(**kwargs):
'''Set up logging with various handlers and options.'''
'''Set up logging with various handlers and options'''
# Create a LoggerSetup instance with the provided keyword arguments
logger_setup = LoggerSetup(**kwargs)
# Set up the logging system
logger_setup.setup()
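A note on the `JsonFormatter` above: any record attribute whose name starts with a single underscore is copied into the JSON document, so callers can attach custom fields through the standard `extra` argument. A minimal sketch, with illustrative field names:

```python
import logging
import apv

# JSON output to the console; underscore-prefixed extras become top-level JSON fields
apv.setup_logging(level='INFO', json_log=True)
logging.info('order processed', extra={'_order_id': 1234, '_customer': 'acme'})
```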

apv/plugins/__init__.py View File

@@ -1 +0,0 @@
# Empty file to make plugins a package

apv/plugins/cloudwatch.py View File

@@ -1,100 +0,0 @@
import logging
import json
import boto3
from botocore.exceptions import ClientError
class CloudWatchHandler(logging.Handler):
def __init__(self, group_name, stream_name):
super().__init__()
self.group_name = group_name
self.stream_name = stream_name
self.client = boto3.client('logs')
self._initialize_log_group_and_stream()
def _initialize_log_group_and_stream(self):
# Create log group if it doesn't exist
try:
self.client.create_log_group(logGroupName=self.group_name)
except ClientError as e:
if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
raise e
# Create log stream if it doesn't exist
try:
self.client.create_log_stream(
logGroupName=self.group_name,
logStreamName=self.stream_name
)
except ClientError as e:
if e.response['Error']['Code'] != 'ResourceAlreadyExistsException':
raise e
def _get_sequence_token(self):
try:
response = self.client.describe_log_streams(
logGroupName=self.group_name,
logStreamNamePrefix=self.stream_name,
limit=1
)
log_streams = response.get('logStreams', [])
return log_streams[0].get('uploadSequenceToken') if log_streams else None
except Exception:
return None
def emit(self, record):
try:
log_entry = self.format(record)
timestamp = int(record.created * 1000)
event = {
'timestamp': timestamp,
'message': log_entry
}
kwargs = {
'logGroupName': self.group_name,
'logStreamName': self.stream_name,
'logEvents': [event]
}
sequence_token = self._get_sequence_token()
if sequence_token:
kwargs['sequenceToken'] = sequence_token
self.client.put_log_events(**kwargs)
except Exception:
self.handleError(record)
def setup_cloudwatch_handler(level_num: int, group_name: str, stream_name: str, date_format: str):
'''Set up the CloudWatch handler.'''
try:
import boto3
except ImportError:
raise ImportError('boto3 is required for CloudWatch logging. (pip install boto3)')
if not group_name or not stream_name:
logging.error('CloudWatch log group and log stream must be specified for CloudWatch handler.')
return
cloudwatch_handler = CloudWatchHandler(group_name, stream_name)
cloudwatch_handler.setLevel(level_num)
class JsonFormatter(logging.Formatter):
def format(self, record):
log_record = {
'time' : self.formatTime(record, date_format),
'level' : record.levelname,
'module' : record.module,
'function' : record.funcName,
'line' : record.lineno,
'message' : record.getMessage(),
'name' : record.name,
'filename' : record.filename,
'threadName' : record.threadName,
'processName' : record.processName,
}
return json.dumps(log_record)
cloudwatch_formatter = JsonFormatter(datefmt=date_format)
cloudwatch_handler.setFormatter(cloudwatch_formatter)
logging.getLogger().addHandler(cloudwatch_handler)

apv/plugins/console.py View File

@@ -1,70 +0,0 @@
import logging
class LogColors:
'''ANSI color codes for log messages.'''
RESET = '\033[0m'
DATE = '\033[90m' # Dark Grey
DEBUG = '\033[96m' # Cyan
INFO = '\033[92m' # Green
WARNING = '\033[93m' # Yellow
ERROR = '\033[91m' # Red
CRITICAL = '\033[97m\033[41m' # White on Red
FATAL = '\033[97m\033[41m' # Same as CRITICAL
NOTSET = '\033[97m' # White text
SEPARATOR = '\033[90m' # Dark Grey
MODULE = '\033[95m' # Pink
FUNCTION = '\033[94m' # Blue
LINE = '\033[33m' # Orange
class ColoredFormatter(logging.Formatter):
def __init__(self, datefmt=None, show_details=False):
super().__init__(datefmt=datefmt)
self.show_details = show_details
self.LEVEL_COLORS = {
'NOTSET' : LogColors.NOTSET,
'DEBUG' : LogColors.DEBUG,
'INFO' : LogColors.INFO,
'WARNING' : LogColors.WARNING,
'ERROR' : LogColors.ERROR,
'CRITICAL' : LogColors.CRITICAL,
'FATAL' : LogColors.FATAL
}
def format(self, record):
log_level = record.levelname
message = record.getMessage()
asctime = self.formatTime(record, self.datefmt)
color = self.LEVEL_COLORS.get(log_level, LogColors.RESET)
separator = f'{LogColors.SEPARATOR}{LogColors.RESET}'
if self.show_details:
formatted = (
f'{LogColors.DATE}{asctime}{LogColors.RESET}'
f'{separator}'
f'{color}{log_level:<8}{LogColors.RESET}'
f'{separator}'
f'{LogColors.MODULE}{record.module}{LogColors.RESET}'
f'{separator}'
f'{LogColors.FUNCTION}{record.funcName}{LogColors.RESET}'
f'{separator}'
f'{LogColors.LINE}{record.lineno}{LogColors.RESET}'
f'{separator}'
f'{message}'
)
else:
formatted = (
f'{LogColors.DATE}{asctime}{LogColors.RESET}'
f'{separator}'
f'{color}{log_level:<8}{LogColors.RESET}'
f'{separator}'
f'{message}'
)
return formatted
def setup_console_handler(level_num: int, date_format: str, show_details: bool):
'''Set up the console handler with colored output.'''
console_handler = logging.StreamHandler()
console_handler.setLevel(level_num)
console_formatter = ColoredFormatter(datefmt=date_format, show_details=show_details)
console_handler.setFormatter(console_formatter)
logging.getLogger().addHandler(console_handler)

apv/plugins/file.py View File

@@ -1,77 +0,0 @@
import logging
import logging.handlers
import json
import os
import gzip
class GZipRotatingFileHandler(logging.handlers.RotatingFileHandler):
'''RotatingFileHandler that compresses old log files using gzip.'''
def doRollover(self):
'''Compress old log files using gzip.'''
super().doRollover()
if self.backupCount > 0:
for i in range(self.backupCount, 0, -1):
sfn = f'{self.baseFilename}.{i}'
if os.path.exists(sfn):
with open(sfn, 'rb') as f_in:
with gzip.open(f'{sfn}.gz', 'wb') as f_out:
f_out.writelines(f_in)
os.remove(sfn)
class JsonFormatter(logging.Formatter):
def __init__(self, date_format):
super().__init__()
self.date_format = date_format
def format(self, record):
log_record = {
'time' : self.formatTime(record, self.date_format),
'level' : record.levelname,
'module' : record.module,
'function' : record.funcName,
'line' : record.lineno,
'message' : record.getMessage(),
'name' : record.name,
'filename' : record.filename,
'threadName' : record.threadName,
'processName' : record.processName,
}
return json.dumps(log_record)
def setup_file_handler(level_num: int, log_to_disk: bool, max_log_size: int,
max_backups: int, log_file_name: str, json_log: bool,
ecs_log: bool, date_format: str, compress_backups: bool):
'''Set up the file handler for logging to disk.'''
if not log_to_disk:
return
# Create 'logs' directory if it doesn't exist
logs_dir = os.path.join(os.getcwd(), 'logs')
os.makedirs(logs_dir, exist_ok=True)
# Use the specified log file name and set extension based on json_log
file_extension = '.json' if json_log else '.log'
log_file_path = os.path.join(logs_dir, f'{log_file_name}{file_extension}')
# Create the rotating file handler
handler_class = GZipRotatingFileHandler if compress_backups else logging.handlers.RotatingFileHandler
file_handler = handler_class(log_file_path, maxBytes=max_log_size, backupCount=max_backups)
file_handler.setLevel(level_num)
if ecs_log:
try:
import ecs_logging
except ImportError:
raise ImportError("The 'ecs-logging' library is required for ECS logging. Install it with 'pip install ecs-logging'.")
file_formatter = ecs_logging.StdlibFormatter()
elif json_log:
file_formatter = JsonFormatter(date_format)
else:
file_formatter = logging.Formatter(
fmt='%(asctime)s%(levelname)-8s%(module)s%(funcName)s%(lineno)d%(message)s',
datefmt=date_format
)
file_handler.setFormatter(file_formatter)
logging.getLogger().addHandler(file_handler)

apv/plugins/graylog.py View File

@@ -1,58 +0,0 @@
import logging
import json
import socket
import zlib
class GraylogHandler(logging.Handler):
def __init__(self, host, port):
super().__init__()
self.host = host
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Mapping from Python logging levels to Graylog (syslog) levels
self.level_mapping = {
logging.CRITICAL : 2, # Critical
logging.ERROR : 3, # Error
logging.WARNING : 4, # Warning
logging.INFO : 6, # Informational
logging.DEBUG : 7, # Debug
logging.NOTSET : 7 # Default to Debug
}
def emit(self, record):
try:
log_entry = self.format(record)
graylog_level = self.level_mapping.get(record.levelno, 7)
gelf_message = {
'version' : '1.1',
'host' : socket.gethostname(),
'short_message' : record.getMessage(),
'full_message' : log_entry,
'timestamp' : record.created,
'level' : graylog_level,
'_logger_name' : record.name,
'_file' : record.pathname,
'_line' : record.lineno,
'_function' : record.funcName,
'_module' : record.module,
}
message = json.dumps(gelf_message).encode('utf-8')
compressed = zlib.compress(message)
self.sock.sendto(compressed, (self.host, self.port))
except Exception:
self.handleError(record)
def setup_graylog_handler(level_num: int, graylog_host: str, graylog_port: int):
'''Set up the Graylog handler.'''
if graylog_host is None or graylog_port is None:
logging.error('Graylog host and port must be specified for Graylog handler.')
return
graylog_handler = GraylogHandler(graylog_host, graylog_port)
graylog_handler.setLevel(level_num)
graylog_formatter = logging.Formatter(fmt='%(message)s')
graylog_handler.setFormatter(graylog_formatter)
logging.getLogger().addHandler(graylog_handler)

setup.py View File

@@ -4,12 +4,13 @@
from setuptools import setup, find_packages
with open('README.md', 'r', encoding='utf-8') as fh:
long_description = fh.read()
setup(
name='apv',
version='1.0.4',
version='4.0.0',
description='Advanced Python Logging',
author='acidvegas',
author_email='acid.vegas@acid.vegas',
@@ -25,10 +26,6 @@ setup(
install_requires=[
# No required dependencies for basic functionality
],
extras_require={
'cloudwatch': ['boto3'],
'ecs' : ['ecs-logging'],
},
classifiers=[
'Programming Language :: Python :: 3',
'License :: OSI Approved :: ISC License (ISCL)',

unit_test.py View File

@@ -1,95 +1,95 @@
#!/usr/bin/env python3
# Advanced Python Logging - Developed by acidvegas in Python (https://git.acid.vegas/apv)
# unittest.py
# unit_test.py
import logging
import os
import random
import sys
import time
# prevent bytecode files (.pyc) from being written
from sys import dont_write_bytecode
dont_write_bytecode = True
sys.dont_write_bytecode = True # FUCKOFF __pycache__
import apv
import apv.apv as apv
# Test console logging with custom date format
def test_console_logging():
'''Test basic console logging functionality'''
print('\nTesting Console Logging...')
apv.setup_logging(level='DEBUG', date_format='%H:%M:%S')
logging.debug('Testing debug message in console.')
logging.info('Testing info message in console.')
logging.warning('Testing warning message in console.')
logging.error('Testing error message in console.')
logging.critical('Testing critical message in console.')
for level in ['debug', 'info', 'warning', 'error', 'critical']:
getattr(logging, level)(f'Testing {level} message in console.')
time.sleep(1)
print()
# Test console logging with details
time.sleep(2)
apv.setup_logging(level='DEBUG', date_format='%Y-%m-%d %H:%M:%S', show_details=True)
logging.debug('Testing debug message in console with details.')
logging.info('Testing info message in console with details.')
logging.warning('Testing warning message in console with details.')
logging.error('Testing error message in console with details.')
logging.critical('Testing critical message in console with details.')
def test_json_console_logging():
'''Test JSON console logging'''
print()
print('\nTesting JSON Console Logging...')
apv.setup_logging(level='DEBUG', date_format='%H:%M:%S', json_log=True, log_to_disk=False)
logging.info('Test JSON console message with custom field', extra={'_custom_field': 'test value'})
logging.warning('Test JSON console warning with error', exc_info=Exception('Test error'))
time.sleep(1)
# Test disk logging with JSON and regular rotation
logging.debug('Starting test: Disk logging with JSON and regular rotation...')
time.sleep(2)
apv.setup_logging(level='DEBUG', log_to_disk=True, max_log_size=1024, max_backups=3, log_file_name='json_log', json_log=True, show_details=True)
def test_detailed_logging():
'''Test console logging with details'''
print('\nTesting Detailed Logging...')
apv.setup_logging(level='DEBUG', show_details=True)
for level in ['debug', 'info', 'warning', 'error', 'critical']:
getattr(logging, level)(f'Testing {level} message with details.')
time.sleep(1)
def test_file_logging():
'''Test file logging with rotation'''
print('\nTesting File Logging...')
log_file = 'logs/test_log.log'
apv.setup_logging(level='DEBUG', log_to_disk=True, max_log_size=1024, max_backups=3, log_file_name='test_log')
for i in range(50):
level = random.choice(['debug', 'info', 'warning', 'error', 'critical'])
getattr(logging, level)(f'File logging test message {i}')
assert os.path.exists(log_file), "Log file was not created"
time.sleep(1)
def test_json_logging():
'''Test JSON format logging'''
print('\nTesting JSON Logging...')
apv.setup_logging(level='DEBUG', log_to_disk=True, log_file_name='json_test', json_log=True)
logging.info('Test JSON formatted log message')
assert os.path.exists('logs/json_test.json'), "JSON log file was not created"
time.sleep(1)
def test_compressed_logging():
'''Test compressed log files'''
print('\nTesting Compressed Logging...')
apv.setup_logging(level='DEBUG', log_to_disk=True, max_log_size=512, max_backups=2, log_file_name='compressed_test', compress_backups=True)
for i in range(100):
log_level = random.choice([logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL])
logging.log(log_level, f'Log entry {i+1} for JSON & regular rotation test.')
time.sleep(0.1)
logging.info(f'Testing compression message {i}')
time.sleep(1)
# Check for .gz files
gz_files = [f for f in os.listdir('logs') if f.startswith('compressed_test') and f.endswith('.gz')]
assert len(gz_files) > 0, 'No compressed log files were created'
print()
# Test disk logging with rotation & compression
logging.debug('Starting test: Disk logging with rotation & compression...')
time.sleep(2)
apv.setup_logging(level='DEBUG', log_to_disk=True, max_log_size=1024, max_backups=3, log_file_name='plain_log', show_details=True, compress_backups=True)
for i in range(100):
log_level = random.choice([logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL])
logging.log(log_level, f'Log entry {i+1} for disk rotation & compression test.')
time.sleep(0.1)
if __name__ == '__main__':
# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)
logging.info('Test completed. Check the logs directory for disk logging & JSON logging tests.')
# Run all tests
test_console_logging()
test_json_console_logging()
test_detailed_logging()
test_file_logging()
test_json_logging()
test_compressed_logging()
print()
try:
import ecs_logging
except ImportError:
pass
else:
# Test ECS logging
logging.debug('Starting test: ECS logging...')
time.sleep(2)
apv.setup_logging(level='DEBUG', ecs_log=True)
logging.debug('This is a test log message to ECS.')
logging.info('This is a test log message to ECS.')
logging.warning('This is a test log message to ECS.')
logging.error('This is a test log message to ECS.')
logging.critical('This is a test log message to ECS.')
print()
# Test Graylog handler (Uncomment & configure to test)
# logging.debug('Starting test: Graylog handler...')
# time.sleep(2)
# apv.setup_logging(level='DEBUG', enable_graylog=True, graylog_host='your_graylog_host', graylog_port=12201)
# logging.debug('This is a test log message to Graylog.')
# logging.info('This is a test log message to Graylog.')
# logging.warning('This is a test log message to Graylog.')
# logging.error('This is a test log message to Graylog.')
# logging.critical('This is a test log message to Graylog.')
# Test CloudWatch handler (Uncomment & configure to test)
# logging.debug('Starting test: CloudWatch handler...')
# time.sleep(2)
# apv.setup_logging(level='DEBUG', enable_cloudwatch=True, cloudwatch_group_name='your_log_group', cloudwatch_stream_name='your_log_stream')
# logging.debug('This is a test log message to CloudWatch.')
# logging.info('This is a test log message to CloudWatch.')
# logging.warning('This is a test log message to CloudWatch.')
# logging.error('This is a test log message to CloudWatch.')
# logging.critical('This is a test log message to CloudWatch.')
print('\nAll tests completed successfully!')