diff --git a/services/beteiligte_sync_utils.py b/services/beteiligte_sync_utils.py index 1632ced..e88a49b 100644 --- a/services/beteiligte_sync_utils.py +++ b/services/beteiligte_sync_utils.py @@ -17,7 +17,7 @@ import pytz from services.exceptions import LockAcquisitionError, SyncError, ValidationError from services.redis_client import get_redis_client from services.config import SYNC_CONFIG, get_lock_key, get_retry_delay_seconds -from services.logging_utils import get_logger +from services.logging_utils import get_service_logger import redis @@ -31,7 +31,7 @@ class BeteiligteSync: def __init__(self, espocrm_api, redis_client: Optional[redis.Redis] = None, context=None): self.espocrm = espocrm_api self.context = context - self.logger = get_logger('beteiligte_sync', context) + self.logger = get_service_logger('beteiligte_sync', context) # Use provided Redis client or get from factory self.redis = redis_client or get_redis_client(strict=False) @@ -46,6 +46,11 @@ class BeteiligteSync: from services.notification_utils import NotificationManager self.notification_manager = NotificationManager(espocrm_api=self.espocrm, context=context) + def _log(self, message: str, level: str = 'info') -> None: + """Delegate logging to the logger with optional level""" + log_func = getattr(self.logger, level, self.logger.info) + log_func(message) + async def acquire_sync_lock(self, entity_id: str) -> bool: """ Atomic distributed lock via Redis + syncStatus update diff --git a/services/document_sync_utils.py b/services/document_sync_utils.py index 45a4a95..07dc1f3 100644 --- a/services/document_sync_utils.py +++ b/services/document_sync_utils.py @@ -10,12 +10,9 @@ Hilfsfunktionen für Document-Synchronisation mit xAI: from typing import Dict, Any, Optional, List, Tuple from datetime import datetime, timedelta -import logging from services.sync_utils_base import BaseSyncUtils -logger = logging.getLogger(__name__) - # Max retry before permanent failure MAX_SYNC_RETRIES = 5 diff --git 
a/services/logging_utils.py b/services/logging_utils.py index 7f3543a..d8a36ad 100644 --- a/services/logging_utils.py +++ b/services/logging_utils.py @@ -5,6 +5,59 @@ Vereinheitlicht Logging über: - Standard Python Logger - Motia FlowContext Logger - Structured Logging + +Usage Guidelines: +================= + +FOR SERVICES: Use get_service_logger('service_name', context) +----------------------------------------------------------------- +Example: + from services.logging_utils import get_service_logger + + class XAIService: + def __init__(self, ctx=None): + self.logger = get_service_logger('xai', ctx) + + def upload(self): + self.logger.info("Uploading file...") + +FOR STEPS: Use ctx.logger directly (preferred) +----------------------------------------------------------------- +Steps already have ctx.logger available - use it directly: + async def handler(event_data, ctx: FlowContext): + ctx.logger.info("Processing event") + +Alternative: Use get_step_logger() for additional loggers: + step_logger = get_step_logger('beteiligte_sync', ctx) + +FOR SYNC UTILS: Inherit from BaseSyncUtils (provides self.logger) +----------------------------------------------------------------- + from services.sync_utils_base import BaseSyncUtils + + class MySync(BaseSyncUtils): + def __init__(self, espocrm, redis, context): + super().__init__(espocrm, redis, context) + # self.logger is now available + + def sync(self): + self._log("Syncing...", level='info') + +FOR STANDALONE UTILITIES: Use get_logger() +----------------------------------------------------------------- + from services.logging_utils import get_logger + + logger = get_logger('my_module', context) + logger.info("Processing...") + +CONSISTENCY RULES: +================== +✅ Services: get_service_logger('service_name', ctx) +✅ Steps: ctx.logger (direct) or get_step_logger('step_name', ctx) +✅ Sync Utils: Inherit from BaseSyncUtils → use self._log() or self.logger +✅ Standalone: get_logger('module_name', ctx) + +❌ DO NOT: Use 
module-level logging.getLogger(__name__) +❌ DO NOT: Mix get_logger() and get_service_logger() in same module """ import logging diff --git a/services/sync_utils_base.py b/services/sync_utils_base.py index 5f491d2..2226b72 100644 --- a/services/sync_utils_base.py +++ b/services/sync_utils_base.py @@ -14,7 +14,7 @@ import pytz from services.exceptions import RedisConnectionError, LockAcquisitionError from services.redis_client import get_redis_client from services.config import SYNC_CONFIG, get_lock_key -from services.logging_utils import get_logger +from services.logging_utils import get_service_logger import redis @@ -31,7 +31,7 @@ class BaseSyncUtils: """ self.espocrm = espocrm_api self.context = context - self.logger = get_logger('sync_utils', context) + self.logger = get_service_logger('sync_utils', context) # Use provided Redis client or get from factory self.redis = redis_client or get_redis_client(strict=False) diff --git a/services/xai_service.py b/services/xai_service.py index 479f9df..475d33e 100644 --- a/services/xai_service.py +++ b/services/xai_service.py @@ -1,10 +1,8 @@ """xAI Files & Collections Service""" import os import aiohttp -import logging from typing import Optional, List - -logger = logging.getLogger(__name__) +from services.logging_utils import get_service_logger XAI_FILES_URL = "https://api.x.ai" XAI_MANAGEMENT_URL = "https://management-api.x.ai" @@ -23,6 +21,7 @@ class XAIService: self.api_key = os.getenv('XAI_API_KEY', '') self.management_key = os.getenv('XAI_MANAGEMENT_KEY', '') self.ctx = ctx + self.logger = get_service_logger('xai', ctx) self._session: Optional[aiohttp.ClientSession] = None if not self.api_key: @@ -31,10 +30,9 @@ class XAIService: raise ValueError("XAI_MANAGEMENT_KEY not configured in environment") def _log(self, msg: str, level: str = 'info') -> None: - if self.ctx: - getattr(self.ctx.logger, level, self.ctx.logger.info)(msg) - else: - getattr(logger, level, logger.info)(msg) + """Delegate logging to service 
logger""" + log_func = getattr(self.logger, level, self.logger.info) + log_func(msg) async def _get_session(self) -> aiohttp.ClientSession: if self._session is None or self._session.closed: diff --git a/steps/advoware_proxy/advoware_api_proxy_delete_step.py b/steps/advoware_proxy/advoware_api_proxy_delete_step.py index bf8bf4e..a1332f4 100644 --- a/steps/advoware_proxy/advoware_api_proxy_delete_step.py +++ b/steps/advoware_proxy/advoware_api_proxy_delete_step.py @@ -32,23 +32,33 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: body={'error': 'Endpoint required as query parameter'} ) + ctx.logger.info("=" * 80) + ctx.logger.info("🔄 ADVOWARE PROXY: DELETE REQUEST") + ctx.logger.info("=" * 80) + ctx.logger.info(f"Endpoint: {endpoint}") + ctx.logger.info("=" * 80) + # Initialize Advoware client advoware = AdvowareAPI(ctx) # Forward all query params except 'endpoint' params = {k: v for k, v in request.query_params.items() if k != 'endpoint'} - ctx.logger.info(f"Proxying DELETE request to Advoware: {endpoint}") result = await advoware.api_call( endpoint, method='DELETE', params=params ) + ctx.logger.info("✅ Proxy DELETE erfolgreich") return ApiResponse(status=200, body={'result': result}) except Exception as e: - ctx.logger.error(f"Proxy error: {e}") + ctx.logger.error("=" * 80) + ctx.logger.error("❌ ADVOWARE PROXY DELETE FEHLER") + ctx.logger.error(f"Endpoint: {request.query_params.get('endpoint', 'N/A')}") + ctx.logger.error(f"Error: {e}") + ctx.logger.error("=" * 80) return ApiResponse( status=500, body={'error': 'Internal server error', 'details': str(e)} diff --git a/steps/advoware_proxy/advoware_api_proxy_get_step.py b/steps/advoware_proxy/advoware_api_proxy_get_step.py index 820fdcd..a008552 100644 --- a/steps/advoware_proxy/advoware_api_proxy_get_step.py +++ b/steps/advoware_proxy/advoware_api_proxy_get_step.py @@ -32,23 +32,33 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: body={'error': 'Endpoint 
required as query parameter'} ) + ctx.logger.info("=" * 80) + ctx.logger.info("🔄 ADVOWARE PROXY: GET REQUEST") + ctx.logger.info("=" * 80) + ctx.logger.info(f"Endpoint: {endpoint}") + ctx.logger.info("=" * 80) + # Initialize Advoware client advoware = AdvowareAPI(ctx) # Forward all query params except 'endpoint' params = {k: v for k, v in request.query_params.items() if k != 'endpoint'} - ctx.logger.info(f"Proxying GET request to Advoware: {endpoint}") result = await advoware.api_call( endpoint, method='GET', params=params ) + ctx.logger.info("✅ Proxy GET erfolgreich") return ApiResponse(status=200, body={'result': result}) except Exception as e: - ctx.logger.error(f"Proxy error: {e}") + ctx.logger.error("=" * 80) + ctx.logger.error("❌ ADVOWARE PROXY GET FEHLER") + ctx.logger.error(f"Endpoint: {request.query_params.get('endpoint', 'N/A')}") + ctx.logger.error(f"Error: {e}") + ctx.logger.error("=" * 80) return ApiResponse( status=500, body={'error': 'Internal server error', 'details': str(e)} ) diff --git a/steps/advoware_proxy/advoware_api_proxy_post_step.py b/steps/advoware_proxy/advoware_api_proxy_post_step.py index bc20a48..06e31d6 100644 --- a/steps/advoware_proxy/advoware_api_proxy_post_step.py +++ b/steps/advoware_proxy/advoware_api_proxy_post_step.py @@ -34,6 +34,12 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: body={'error': 'Endpoint required as query parameter'} ) + ctx.logger.info("=" * 80) + ctx.logger.info("🔄 ADVOWARE PROXY: POST REQUEST") + ctx.logger.info("=" * 80) + ctx.logger.info(f"Endpoint: {endpoint}") + ctx.logger.info("=" * 80) + # Initialize Advoware client advoware = AdvowareAPI(ctx) @@ -43,7 +49,6 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: # Get request body json_data = request.body - ctx.logger.info(f"Proxying POST request to Advoware: {endpoint}") result = await advoware.api_call( endpoint, method='POST', @@ -51,11 +56,16 @@ async def handler(request: ApiRequest, ctx:
FlowContext[Any]) -> ApiResponse: json_data=json_data ) + ctx.logger.info("✅ Proxy POST erfolgreich") return ApiResponse(status=200, body={'result': result}) except Exception as e: - ctx.logger.error(f"Proxy error: {e}") + ctx.logger.error("=" * 80) + ctx.logger.error("❌ ADVOWARE PROXY POST FEHLER") + ctx.logger.error(f"Endpoint: {request.query_params.get('endpoint', 'N/A')}") + ctx.logger.error(f"Error: {e}") + ctx.logger.error("=" * 80) return ApiResponse( status=500, body={'error': 'Internal server error', 'details': str(e)} ) diff --git a/steps/advoware_proxy/advoware_api_proxy_put_step.py b/steps/advoware_proxy/advoware_api_proxy_put_step.py index 2618ced..90acfda 100644 --- a/steps/advoware_proxy/advoware_api_proxy_put_step.py +++ b/steps/advoware_proxy/advoware_api_proxy_put_step.py @@ -34,6 +34,12 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: body={'error': 'Endpoint required as query parameter'} ) + ctx.logger.info("=" * 80) + ctx.logger.info("🔄 ADVOWARE PROXY: PUT REQUEST") + ctx.logger.info("=" * 80) + ctx.logger.info(f"Endpoint: {endpoint}") + ctx.logger.info("=" * 80) + # Initialize Advoware client advoware = AdvowareAPI(ctx) @@ -43,7 +49,6 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: # Get request body json_data = request.body - ctx.logger.info(f"Proxying PUT request to Advoware: {endpoint}") result = await advoware.api_call( endpoint, method='PUT', @@ -51,11 +56,16 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: json_data=json_data ) + ctx.logger.info("✅ Proxy PUT erfolgreich") return ApiResponse(status=200, body={'result': result}) except Exception as e: - ctx.logger.error(f"Proxy error: {e}") + ctx.logger.error("=" * 80) + ctx.logger.error("❌ ADVOWARE PROXY PUT FEHLER") + ctx.logger.error(f"Endpoint: {request.query_params.get('endpoint', 'N/A')}") + ctx.logger.error(f"Error: {e}") + ctx.logger.error("=" * 80) return ApiResponse( status=500,
body={'error': 'Internal server error', 'details': str(e)} ) diff --git a/steps/vmh/beteiligte_sync_event_step.py b/steps/vmh/beteiligte_sync_event_step.py index 793349b..20cf719 100644 --- a/steps/vmh/beteiligte_sync_event_step.py +++ b/steps/vmh/beteiligte_sync_event_step.py @@ -42,16 +42,13 @@ config = { } -async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> Optional[Dict[str, Any]]: +async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: """ Zentraler Sync-Handler für Beteiligte Args: event_data: Event data mit entity_id, action, source ctx: Motia FlowContext - - Returns: - Optional result dict """ entity_id = event_data.get('entity_id') action = event_data.get('action') @@ -61,11 +58,13 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> Optional if not entity_id: step_logger.error("Keine entity_id im Event gefunden") - return None + return - step_logger.info( - f"🔄 Sync-Handler gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}" - ) + step_logger.info("=" * 80) + step_logger.info(f"🔄 BETEILIGTE SYNC HANDLER: {action.upper()}") + step_logger.info("=" * 80) + step_logger.info(f"Entity: {entity_id} | Source: {source}") + step_logger.info("=" * 80) # Get shared Redis client (centralized) redis_client = get_redis_client(strict=False) diff --git a/steps/vmh/document_sync_event_step.py b/steps/vmh/document_sync_event_step.py index eb8bdfc..6b00573 100644 --- a/steps/vmh/document_sync_event_step.py +++ b/steps/vmh/document_sync_event_step.py @@ -14,10 +14,9 @@ from motia import FlowContext from services.espocrm import EspoCRMAPI from services.document_sync_utils import DocumentSync from services.xai_service import XAIService +from services.redis_client import get_redis_client import hashlib import json -import redis -import os config = { "name": "VMH Document Sync Handler", @@ -32,7 +31,7 @@ config = { } -async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]): +async
def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: """Zentraler Sync-Handler für Documents""" entity_id = event_data.get('entity_id') entity_type = event_data.get('entity_type', 'CDokumente') # Default: CDokumente @@ -52,20 +51,11 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]): ctx.logger.info(f"Source: {source}") ctx.logger.info("=" * 80) - # Shared Redis client for distributed locking - redis_host = os.getenv('REDIS_HOST', 'localhost') - redis_port = int(os.getenv('REDIS_PORT', '6379')) - redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1')) + # Shared Redis client for distributed locking (centralized factory) + redis_client = get_redis_client(strict=False) - redis_client = redis.Redis( - host=redis_host, - port=redis_port, - db=redis_db, - decode_responses=True - ) - - # APIs initialisieren - espocrm = EspoCRMAPI() + # APIs initialisieren (mit Context für besseres Logging) + espocrm = EspoCRMAPI(ctx) sync_utils = DocumentSync(espocrm, redis_client, ctx) xai_service = XAIService(ctx) diff --git a/steps/vmh/webhook/beteiligte_create_api_step.py b/steps/vmh/webhook/beteiligte_create_api_step.py index b5fde55..23d7bb2 100644 --- a/steps/vmh/webhook/beteiligte_create_api_step.py +++ b/steps/vmh/webhook/beteiligte_create_api_step.py @@ -26,8 +26,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: try: payload = request.body or [] - ctx.logger.info("VMH Webhook Beteiligte Create empfangen") + ctx.logger.info("=" * 80) + ctx.logger.info("📥 VMH WEBHOOK: BETEILIGTE CREATE") + ctx.logger.info("=" * 80) ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") + ctx.logger.info("=" * 80) # Sammle alle IDs aus dem Batch entity_ids = set() @@ -53,7 +56,8 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info(f"VMH Create Webhook verarbeitet: {len(entity_ids)} Events emittiert") + ctx.logger.info("✅ VMH Create Webhook 
verarbeitet: " + f"{len(entity_ids)} Events emittiert") return ApiResponse( status=200, @@ -65,7 +69,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ) except Exception as e: - ctx.logger.error(f"Fehler beim Verarbeiten des VMH Create Webhooks: {e}") + ctx.logger.error("=" * 80) + ctx.logger.error("❌ FEHLER: VMH CREATE WEBHOOK") + ctx.logger.error(f"Error: {e}") + ctx.logger.error("=" * 80) return ApiResponse( status=500, body={ diff --git a/steps/vmh/webhook/beteiligte_update_api_step.py b/steps/vmh/webhook/beteiligte_update_api_step.py index 53b0ac9..acb9207 100644 --- a/steps/vmh/webhook/beteiligte_update_api_step.py +++ b/steps/vmh/webhook/beteiligte_update_api_step.py @@ -26,8 +26,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: try: payload = request.body or [] - ctx.logger.info("VMH Webhook Beteiligte Update empfangen") + ctx.logger.info("=" * 80) + ctx.logger.info("📥 VMH WEBHOOK: BETEILIGTE UPDATE") + ctx.logger.info("=" * 80) ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") + ctx.logger.info("=" * 80) # Sammle alle IDs aus dem Batch entity_ids = set() @@ -53,7 +56,8 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info(f"VMH Update Webhook verarbeitet: {len(entity_ids)} Events emittiert") + ctx.logger.info("✅ VMH Update Webhook verarbeitet: " + f"{len(entity_ids)} Events emittiert") return ApiResponse( status=200, @@ -65,7 +69,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ) except Exception as e: - ctx.logger.error(f"Fehler beim Verarbeiten des VMH Update Webhooks: {e}") + ctx.logger.error("=" * 80) + ctx.logger.error("❌ FEHLER: VMH UPDATE WEBHOOK") + ctx.logger.error(f"Error: {e}") + ctx.logger.error("=" * 80) return ApiResponse( status=500, body={ diff --git a/steps/vmh/webhook/document_update_api_step.py b/steps/vmh/webhook/document_update_api_step.py index 
5b3c51b..69aa40d 100644 --- a/steps/vmh/webhook/document_update_api_step.py +++ b/steps/vmh/webhook/document_update_api_step.py @@ -25,17 +25,22 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: try: payload = request.body or [] - ctx.logger.info("VMH Webhook Document Update empfangen") + ctx.logger.info("=" * 80) + ctx.logger.info("📥 VMH WEBHOOK: DOCUMENT UPDATE") + ctx.logger.info("=" * 80) ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") # Sammle alle IDs aus dem Batch entity_ids = set() + entity_type = 'CDokumente' # Default if isinstance(payload, list): for entity in payload: if isinstance(entity, dict) and 'id' in entity: entity_ids.add(entity['id']) - entity_type = entity.get('entityType', 'CDokumente') + # Take entityType from first entity if present + if entity_type == 'CDokumente': + entity_type = entity.get('entityType', 'CDokumente') elif isinstance(payload, dict) and 'id' in payload: entity_ids.add(payload['id']) entity_type = payload.get('entityType', 'CDokumente') @@ -48,12 +53,15 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: 'topic': 'vmh.document.update', 'data': { 'entity_id': entity_id, - 'entity_type': entity_type if 'entity_type' in locals() else 'CDokumente', + 'entity_type': entity_type, 'action': 'update', 'timestamp': payload[0].get('modifiedAt') if isinstance(payload, list) and payload else None } }) + ctx.logger.info("✅ Document Update Webhook verarbeitet: " + f"{len(entity_ids)} Events emittiert") + return ApiResponse( status=200, body={ @@ -64,8 +72,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ) except Exception as e: - ctx.logger.error(f"Fehler im Document Update Webhook: {e}") + ctx.logger.error("=" * 80) + ctx.logger.error("❌ FEHLER: DOCUMENT UPDATE WEBHOOK") + ctx.logger.error(f"Error: {e}") ctx.logger.error(f"Payload: {request.body}") + ctx.logger.error("=" * 80) return ApiResponse( status=500,