417 lines
12 KiB
Python
417 lines
12 KiB
Python
"""
Consistent logging wrapper for the BitByLaw integration.

Unifies logging across:
- Standard Python Logger
- Motia FlowContext Logger
- Structured Logging

Usage Guidelines:
=================

FOR SERVICES: Use get_service_logger('service_name', context)
-----------------------------------------------------------------
Example:
    from services.logging_utils import get_service_logger

    class XAIService:
        def __init__(self, ctx=None):
            self.logger = get_service_logger('xai', ctx)

        def upload(self):
            self.logger.info("Uploading file...")

FOR STEPS: Use ctx.logger directly (preferred)
-----------------------------------------------------------------
Steps already have ctx.logger available - use it directly:
    async def handler(event_data, ctx: FlowContext):
        ctx.logger.info("Processing event")

Alternative: Use get_step_logger() for additional loggers:
    step_logger = get_step_logger('beteiligte_sync', ctx)

FOR SYNC UTILS: Inherit from BaseSyncUtils (provides self.logger)
-----------------------------------------------------------------
    from services.sync_utils_base import BaseSyncUtils

    class MySync(BaseSyncUtils):
        def __init__(self, espocrm, redis, context):
            super().__init__(espocrm, redis, context)
            # self.logger is now available

        def sync(self):
            self._log("Syncing...", level='info')

FOR STANDALONE UTILITIES: Use get_logger()
-----------------------------------------------------------------
    from services.logging_utils import get_logger

    logger = get_logger('my_module', context)
    logger.info("Processing...")

CONSISTENCY RULES:
==================
✅ Services: get_service_logger('service_name', ctx)
✅ Steps: ctx.logger (direct) or get_step_logger('step_name', ctx)
✅ Sync Utils: Inherit from BaseSyncUtils → use self._log() or self.logger
✅ Standalone: get_logger('module_name', ctx)

❌ DO NOT: Use module-level logging.getLogger(__name__)
❌ DO NOT: Mix get_logger() and get_service_logger() in same module
"""
|
|
|
|
import functools
import inspect
import logging
import sys
import time
from contextlib import contextmanager
from datetime import datetime
from typing import Optional, Any, Dict
|
|
|
|
|
|
class IntegrationLogger:
    """
    Unified logger for the integration code.

    Every message is written to a standard Python logger named ``name``.
    When a context object exposing a ``logger`` attribute is supplied (for
    example a Motia FlowContext), the message is forwarded to that logger
    as well.

    Features:
    - Structured fields appended to the message as ``key=value`` pairs
    - Timing context managers for operations and API calls
    - Derived loggers carrying additional fields (``with_context``)
    """

    def __init__(
        self,
        name: str,
        context: Optional[Any] = None,
        extra_fields: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize the logger.

        Args:
            name: Logger name (e.g. 'advoware.api')
            context: Optional context object; used only if it has a
                ``logger`` attribute
            extra_fields: Optional fields appended to every message
        """
        self.name = name
        self.context = context
        self.extra_fields = extra_fields or {}
        self._standard_logger = logging.getLogger(name)

    def _format_message(self, message: str, **kwargs) -> str:
        """
        Append structured fields to a log message.

        The instance-level ``extra_fields`` come first; per-call ``kwargs``
        override them on key collision.

        Args:
            message: Base message
            **kwargs: Extra fields for this call

        Returns:
            ``message`` unchanged when there are no fields, otherwise
            ``"message | k1=v1 | k2=v2"``
        """
        fields = {**self.extra_fields, **kwargs}
        if not fields:
            return message

        field_str = " | ".join(f"{k}={v}" for k, v in fields.items())
        return f"{message} | {field_str}"

    def _log(
        self,
        level: str,
        message: str,
        exc_info: bool = False,
        **kwargs
    ) -> None:
        """
        Internal logging method used by all level-specific helpers.

        Args:
            level: Log level method name (debug, info, warning, error, critical)
            message: Log message
            exc_info: Include exception info in the standard logger record
            **kwargs: Extra fields for structured logging
        """
        formatted_msg = self._format_message(message, **kwargs)

        # Forward to the context logger if one is available.
        if self.context and hasattr(self.context, 'logger'):
            try:
                context_logger = self.context.logger
                # Resolve the ``info`` fallback lazily so a context logger that
                # implements ``level`` but not ``info`` still works.
                context_log = getattr(context_logger, level, None) or context_logger.info
                context_log(formatted_msg)
            except Exception:
                # Best effort only: the standard logger below still records the
                # message, so a broken context logger must never raise here.
                self._standard_logger.debug(
                    "Context logger failed; using standard logger only",
                    exc_info=True
                )

        # error()/critical() default to exc_info=True. Outside an ``except``
        # block that would make logging append a bogus "NoneType: None"
        # traceback, so only keep the flag while an exception is being handled.
        if exc_info is True and sys.exc_info()[0] is None:
            exc_info = False

        # Always log to the standard Python logger.
        log_func = getattr(self._standard_logger, level, self._standard_logger.info)
        log_func(formatted_msg, exc_info=exc_info)

    def debug(self, message: str, **kwargs) -> None:
        """Log a debug message."""
        self._log('debug', message, **kwargs)

    def info(self, message: str, **kwargs) -> None:
        """Log an info message."""
        self._log('info', message, **kwargs)

    def warning(self, message: str, **kwargs) -> None:
        """Log a warning message."""
        self._log('warning', message, **kwargs)

    def warn(self, message: str, **kwargs) -> None:
        """Alias for ``warning``."""
        self.warning(message, **kwargs)

    def error(self, message: str, exc_info: bool = True, **kwargs) -> None:
        """Log an error message (with exception info by default, if one is active)."""
        self._log('error', message, exc_info=exc_info, **kwargs)

    def critical(self, message: str, exc_info: bool = True, **kwargs) -> None:
        """Log a critical message (with exception info by default, if one is active)."""
        self._log('critical', message, exc_info=exc_info, **kwargs)

    def exception(self, message: str, **kwargs) -> None:
        """Log at error level with the traceback of the active exception."""
        self._log('error', message, exc_info=True, **kwargs)

    @staticmethod
    def _elapsed_ms(start: float) -> int:
        """Return whole milliseconds elapsed since ``start`` (a ``time.perf_counter()`` value)."""
        return int((time.perf_counter() - start) * 1000)

    @contextmanager
    def operation(self, operation_name: str, **context_fields):
        """
        Context manager that logs start, completion and failure of an
        operation together with its duration in milliseconds.

        Exceptions raised inside the ``with`` body are logged at error level
        and re-raised unchanged.

        Args:
            operation_name: Name of the operation
            **context_fields: Fields added to every message of this operation

        Example:
            with logger.operation('sync_beteiligte', entity_id='123'):
                # Do sync
                pass
        """
        # perf_counter is monotonic, so the duration is not distorted by
        # wall-clock adjustments.
        start_time = time.perf_counter()
        self.info(f"▶️ Starting: {operation_name}", **context_fields)

        try:
            yield
        except Exception as e:
            self.error(
                f"❌ Failed: {operation_name} - {str(e)}",
                duration_ms=self._elapsed_ms(start_time),
                error_type=type(e).__name__,
                **context_fields
            )
            raise
        else:
            self.info(
                f"✅ Completed: {operation_name}",
                duration_ms=self._elapsed_ms(start_time),
                **context_fields
            )

    @contextmanager
    def api_call(self, endpoint: str, method: str = 'GET', **context_fields):
        """
        Context manager for API calls: logs the call and its success at
        debug level, failures at error level, each with the duration in
        milliseconds.

        Exceptions raised inside the ``with`` body are re-raised unchanged.

        Args:
            endpoint: API endpoint
            method: HTTP method
            **context_fields: Extra context fields

        Example:
            with logger.api_call('/api/v1/Beteiligte', method='POST'):
                result = await api.post(...)
        """
        start_time = time.perf_counter()
        self.debug(f"API Call: {method} {endpoint}", **context_fields)

        try:
            yield
        except Exception as e:
            self.error(
                f"API Error: {method} {endpoint} - {str(e)}",
                duration_ms=self._elapsed_ms(start_time),
                error_type=type(e).__name__,
                **context_fields
            )
            raise
        else:
            self.debug(
                f"API Success: {method} {endpoint}",
                duration_ms=self._elapsed_ms(start_time),
                **context_fields
            )

    def with_context(self, **extra_fields) -> 'IntegrationLogger':
        """
        Create a new logger carrying additional context fields.

        The current instance is left unchanged; on key collision the new
        fields win.

        Args:
            **extra_fields: Additional context fields

        Returns:
            New logger instance with merged fields
        """
        merged_fields = {**self.extra_fields, **extra_fields}
        return IntegrationLogger(
            name=self.name,
            context=self.context,
            extra_fields=merged_fields
        )
|
|
|
|
|
|
# ========== Factory Functions ==========
|
|
|
|
def get_logger(
    name: str,
    context: Optional[Any] = None,
    **extra_fields
) -> IntegrationLogger:
    """
    Build an IntegrationLogger for an arbitrary module or utility.

    Args:
        name: Logger name, used unchanged
        context: Optional context object (e.g. a Motia FlowContext)
        **extra_fields: Fields appended to every message of the logger

    Returns:
        The configured logger

    Example:
        logger = get_logger('advoware.sync', context=ctx, entity_id='123')
        logger.info("Starting sync")
    """
    return IntegrationLogger(
        name=name,
        context=context,
        extra_fields=extra_fields,
    )
|
|
|
|
|
|
def get_service_logger(
    service_name: str,
    context: Optional[Any] = None
) -> IntegrationLogger:
    """
    Build the logger for a service.

    The logger name is ``service_name`` prefixed with ``services.``.

    Args:
        service_name: Service name (e.g. 'advoware', 'espocrm')
        context: Optional context object (e.g. a Motia FlowContext)

    Returns:
        Logger named ``services.<service_name>``
    """
    qualified_name = f"services.{service_name}"
    return IntegrationLogger(name=qualified_name, context=context)
|
|
|
|
|
|
def get_step_logger(
    step_name: str,
    context: Optional[Any] = None
) -> IntegrationLogger:
    """
    Build the logger for a step.

    The logger name is ``step_name`` prefixed with ``steps.``.

    Args:
        step_name: Step name
        context: Context object of the step (e.g. a Motia FlowContext);
            the signature allows ``None``

    Returns:
        Logger named ``steps.<step_name>``
    """
    qualified_name = f"steps.{step_name}"
    return IntegrationLogger(name=qualified_name, context=context)
|
|
|
|
|
|
# ========== Decorator for Logging ==========
|
|
|
|
def log_operation(operation_name: str):
    """
    Decorator that wraps a function call in ``logger.operation(...)``.

    Works for both plain and ``async def`` functions. The logger is created
    per call via ``get_logger(func.__module__, context)``, where ``context``
    is the first positional or keyword argument that has a ``logger``
    attribute (``None`` if there is none).

    The wrapper preserves the decorated function's metadata (name,
    docstring, ``__wrapped__``).

    Args:
        operation_name: Name of the operation used in the log messages

    Example:
        @log_operation('sync_beteiligte')
        async def sync_entity(entity_id: str):
            ...
    """
    def _find_context(args, kwargs):
        # First argument exposing a ``logger`` attribute; positional
        # arguments are checked before keyword arguments.
        for candidate in (*args, *kwargs.values()):
            if hasattr(candidate, 'logger'):
                return candidate
        return None

    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                logger = get_logger(func.__module__, _find_context(args, kwargs))
                with logger.operation(operation_name):
                    return await func(*args, **kwargs)

            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            logger = get_logger(func.__module__, _find_context(args, kwargs))
            with logger.operation(operation_name):
                return func(*args, **kwargs)

        return sync_wrapper

    return decorator
|
|
|
|
|
|
# ========== Performance Tracking ==========
|
|
|
|
class PerformanceTracker:
    """Collect durations per operation name and report simple statistics."""

    def __init__(self, logger: "IntegrationLogger"):
        """
        Args:
            logger: Logger used by ``log_summary``; only its ``info`` method
                is called
        """
        self.logger = logger
        # operation name -> list of recorded durations in milliseconds
        self.metrics: Dict[str, list] = {}

    def record(self, operation: str, duration_ms: int) -> None:
        """Record one duration (in milliseconds) for ``operation``."""
        self.metrics.setdefault(operation, []).append(duration_ms)

    def get_stats(self, operation: str) -> Dict[str, float]:
        """
        Get statistics for an operation.

        Returns:
            Dict with ``count``, ``avg_ms``, ``min_ms``, ``max_ms`` and
            ``total_ms``; an empty dict when the operation is unknown or has
            no recorded durations
        """
        durations = self.metrics.get(operation)
        if not durations:
            # Unknown operation or empty list: avoid dividing by zero and
            # calling min()/max() on an empty sequence.
            return {}

        total_ms = sum(durations)
        return {
            'count': len(durations),
            'avg_ms': total_ms / len(durations),
            'min_ms': min(durations),
            'max_ms': max(durations),
            'total_ms': total_ms
        }

    def log_summary(self) -> None:
        """Log one summary line per operation that has recorded durations."""
        self.logger.info("=== Performance Summary ===")
        for operation in self.metrics:
            stats = self.get_stats(operation)
            if not stats:
                continue
            self.logger.info(
                f"{operation}: {stats['count']} calls, "
                f"avg {stats['avg_ms']:.1f}ms, "
                f"min {stats['min_ms']:.1f}ms, "
                f"max {stats['max_ms']:.1f}ms"
            )
|