diff --git a/services/advoware.py b/services/advoware.py index a1fc95c..d79bdd6 100644 --- a/services/advoware.py +++ b/services/advoware.py @@ -8,7 +8,6 @@ import hashlib import base64 import os import datetime -import logging from typing import Optional, Dict, Any from services.exceptions import ( @@ -21,8 +20,6 @@ from services.redis_client import get_redis_client from services.config import ADVOWARE_CONFIG, API_CONFIG from services.logging_utils import get_service_logger -logger = logging.getLogger(__name__) - class AdvowareAPI: """ @@ -93,7 +90,7 @@ class AdvowareAPI: try: api_key_bytes = base64.b64decode(self.api_key) - logger.debug("API Key decoded from base64") + self.logger.debug("API Key decoded from base64") except Exception as e: self._log(f"API Key not base64-encoded, using as-is: {e}", level='debug') api_key_bytes = self.api_key.encode('utf-8') if isinstance(self.api_key, str) else self.api_key diff --git a/services/advoware_service.py b/services/advoware_service.py index 8a253dc..90244b9 100644 --- a/services/advoware_service.py +++ b/services/advoware_service.py @@ -1,24 +1,29 @@ """ Advoware Service Wrapper -Erweitert AdvowareAPI mit höheren Operations + +Extends AdvowareAPI with higher-level operations for business logic. """ -import logging from typing import Dict, Any, Optional from services.advoware import AdvowareAPI - -logger = logging.getLogger(__name__) +from services.logging_utils import get_service_logger class AdvowareService: """ - Service-Layer für Advoware Operations - Verwendet AdvowareAPI für API-Calls + Service layer for Advoware operations. + Uses AdvowareAPI for API calls. """ def __init__(self, context=None): self.api = AdvowareAPI(context) self.context = context + self.logger = get_service_logger('advoware_service', context) + + def _log(self, message: str, level: str = 'info') -> None: + """Internal logging helper""" + log_func = getattr(self.logger, level, self.logger.info) + log_func(message) async def api_call(self, *args, **kwargs): """Delegate api_call to underlying AdvowareAPI""" @@ -26,29 +31,29 @@ class AdvowareService: # ========== BETEILIGTE ========== - async def get_beteiligter(self, betnr: int) -> Optional[Dict]: + async def get_beteiligter(self, betnr: int) -> Optional[Dict[str, Any]]: """ - Lädt Beteiligten mit allen Daten + Load Beteiligte with all data. Returns: - Beteiligte-Objekt + Beteiligte object or None """ try: endpoint = f"api/v1/advonet/Beteiligte/{betnr}" result = await self.api.api_call(endpoint, method='GET') return result except Exception as e: - logger.error(f"[ADVO] Fehler beim Laden von Beteiligte {betnr}: {e}", exc_info=True) + self._log(f"[ADVO] Error loading Beteiligte {betnr}: {e}", level='error') return None # ========== KOMMUNIKATION ========== - async def create_kommunikation(self, betnr: int, data: Dict[str, Any]) -> Optional[Dict]: + async def create_kommunikation(self, betnr: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ - Erstellt neue Kommunikation + Create new Kommunikation. Args: - betnr: Beteiligten-Nummer + betnr: Beteiligte number data: { 'tlf': str, # Required 'bemerkung': str, # Optional @@ -57,68 +62,68 @@ class AdvowareService: } Returns: - Neue Kommunikation mit 'id' + New Kommunikation with 'id' or None """ try: endpoint = f"api/v1/advonet/Beteiligte/{betnr}/Kommunikationen" result = await self.api.api_call(endpoint, method='POST', json_data=data) if result: - logger.info(f"[ADVO] ✅ Created Kommunikation: betnr={betnr}, kommKz={data.get('kommKz')}") + self._log(f"[ADVO] ✅ Created Kommunikation: betnr={betnr}, kommKz={data.get('kommKz')}") return result except Exception as e: - logger.error(f"[ADVO] Fehler beim Erstellen von Kommunikation: {e}", exc_info=True) + self._log(f"[ADVO] Error creating Kommunikation: {e}", level='error') return None async def update_kommunikation(self, betnr: int, komm_id: int, data: Dict[str, Any]) -> bool: """ - Aktualisiert bestehende Kommunikation + Update existing Kommunikation. Args: - betnr: Beteiligten-Nummer - komm_id: Kommunikation-ID + betnr: Beteiligte number + komm_id: Kommunikation ID data: { 'tlf': str, # Optional 'bemerkung': str, # Optional 'online': bool # Optional } - NOTE: kommKz ist READ-ONLY und kann nicht geändert werden + NOTE: kommKz is READ-ONLY and cannot be changed Returns: - True wenn erfolgreich + True if successful """ try: endpoint = f"api/v1/advonet/Beteiligte/{betnr}/Kommunikationen/{komm_id}" await self.api.api_call(endpoint, method='PUT', json_data=data) - logger.info(f"[ADVO] ✅ Updated Kommunikation: betnr={betnr}, komm_id={komm_id}") + self._log(f"[ADVO] ✅ Updated Kommunikation: betnr={betnr}, komm_id={komm_id}") return True except Exception as e: - logger.error(f"[ADVO] Fehler beim Update von Kommunikation: {e}", exc_info=True) + self._log(f"[ADVO] Error updating Kommunikation: {e}", level='error') return False async def delete_kommunikation(self, betnr: int, komm_id: int) -> bool: """ - Löscht Kommunikation (aktuell 403 Forbidden) + Delete Kommunikation (currently returns 403 Forbidden). - NOTE: DELETE ist in Advoware API deaktiviert - Verwende stattdessen: Leere Slots mit empty_slot_marker + NOTE: DELETE is disabled in Advoware API. + Use empty slots with empty_slot_marker instead. Returns: - True wenn erfolgreich + True if successful """ try: endpoint = f"api/v1/advonet/Beteiligte/{betnr}/Kommunikationen/{komm_id}" await self.api.api_call(endpoint, method='DELETE') - logger.info(f"[ADVO] ✅ Deleted Kommunikation: betnr={betnr}, komm_id={komm_id}") + self._log(f"[ADVO] ✅ Deleted Kommunikation: betnr={betnr}, komm_id={komm_id}") return True except Exception as e: # Expected: 403 Forbidden - logger.warning(f"[ADVO] DELETE not allowed (expected): {e}") + self._log(f"[ADVO] DELETE not allowed (expected): {e}", level='warning') return False diff --git a/services/beteiligte_sync_utils.py b/services/beteiligte_sync_utils.py index e88a49b..f41d8b3 100644 --- a/services/beteiligte_sync_utils.py +++ b/services/beteiligte_sync_utils.py @@ -207,16 +207,15 @@ class BeteiligteSync: except: pass - @staticmethod - def parse_timestamp(ts: Any) -> Optional[datetime]: + def parse_timestamp(self, ts: Any) -> Optional[datetime]: """ - Parse verschiedene Timestamp-Formate zu datetime + Parse various timestamp formats to datetime. Args: - ts: String, datetime oder None + ts: String, datetime or None Returns: - datetime-Objekt oder None + datetime object or None """ if not ts: return None @@ -225,13 +224,13 @@ class BeteiligteSync: return ts if isinstance(ts, str): - # EspoCRM Format: "2026-02-07 14:30:00" - # Advoware Format: "2026-02-07T14:30:00" oder "2026-02-07T14:30:00Z" + # EspoCRM format: "2026-02-07 14:30:00" + # Advoware format: "2026-02-07T14:30:00" or "2026-02-07T14:30:00Z" try: - # Entferne trailing Z falls vorhanden + # Remove trailing Z if present ts = ts.rstrip('Z') - # Versuche verschiedene Formate + # Try various formats for fmt in [ '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S', @@ -242,11 +241,11 @@ class BeteiligteSync: except ValueError: continue - # Fallback: ISO-Format + # Fallback: ISO format return datetime.fromisoformat(ts) except Exception as e: - logger.warning(f"Konnte Timestamp nicht parsen: {ts} - {e}") + self._log(f"Could not parse timestamp: {ts} - {e}", level='warning') return None return None diff --git a/services/document_sync_utils.py b/services/document_sync_utils.py index 07dc1f3..df8e0df 100644 --- a/services/document_sync_utils.py +++ b/services/document_sync_utils.py @@ -1,17 +1,18 @@ """ Document Sync Utilities -Hilfsfunktionen für Document-Synchronisation mit xAI: +Utility functions for document synchronization with xAI: - Distributed locking via Redis + syncStatus -- Entscheidungslogik: Wann muss ein Document zu xAI? -- Related Entities ermitteln (Many-to-Many Attachments) -- xAI Collection Management +- Decision logic: When does a document need xAI sync? +- Related entities determination (Many-to-Many attachments) +- xAI Collection management """ from typing import Dict, Any, Optional, List, Tuple from datetime import datetime, timedelta from services.sync_utils_base import BaseSyncUtils +from services.models import FileStatus, XAISyncStatus # Max retry before permanent failure MAX_SYNC_RETRIES = 5 @@ -21,10 +22,10 @@ RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h class DocumentSync(BaseSyncUtils): - """Utility-Klasse für Document-Synchronisation mit xAI""" + """Utility class for document synchronization with xAI""" def _get_lock_key(self, entity_id: str) -> str: - """Redis Lock-Key für Documents""" + """Redis lock key for documents""" return f"sync_lock:document:{entity_id}" async def acquire_sync_lock(self, entity_id: str, entity_type: str = 'CDokumente') -> bool: @@ -45,13 +46,13 @@ class DocumentSync(BaseSyncUtils): self._log(f"Redis lock bereits aktiv für {entity_type} {entity_id}", level='warn') return False - # STEP 2: Update xaiSyncStatus auf pending_sync + # STEP 2: Update xaiSyncStatus to pending_sync try: await self.espocrm.update_entity(entity_type, entity_id, { - 'xaiSyncStatus': 'pending_sync' + 'xaiSyncStatus': XAISyncStatus.PENDING_SYNC.value }) except Exception as e: - self._log(f"Konnte xaiSyncStatus nicht setzen: {e}", level='debug') + self._log(f"Could not set xaiSyncStatus: {e}", level='debug') self._log(f"Sync-Lock für {entity_type} {entity_id} erworben") return True @@ -84,16 +85,16 @@ class DocumentSync(BaseSyncUtils): try: update_data = {} - # xaiSyncStatus setzen: clean bei Erfolg, failed bei Fehler + # Set xaiSyncStatus: clean on success, failed on error try: - update_data['xaiSyncStatus'] = 'clean' if success else 'failed' + update_data['xaiSyncStatus'] = XAISyncStatus.CLEAN.value if success else XAISyncStatus.FAILED.value if error_message: update_data['xaiSyncError'] = error_message[:2000] else: update_data['xaiSyncError'] = None except: - pass # Felder existieren evtl. nicht + pass # Fields may not exist # Merge extra fields (z.B. xaiFileId, xaiCollections) if extra_fields: @@ -120,37 +121,37 @@ class DocumentSync(BaseSyncUtils): entity_type: str = 'CDokumente' ) -> Tuple[bool, List[str], str]: """ - Entscheidet ob ein Document zu xAI synchronisiert werden muss + Decide if a document needs to be synchronized to xAI. - Prüft: - 1. Datei-Status Feld ("Neu", "Geändert") - 2. Hash-Werte für Change Detection - 3. Related Entities mit xAI Collections + Checks: + 1. File status field ("new", "changed") + 2. Hash values for change detection + 3. Related entities with xAI collections Args: - document: Vollständiges Document Entity von EspoCRM + document: Complete document entity from EspoCRM Returns: Tuple[bool, List[str], str]: - - bool: Ob Sync nötig ist - - List[str]: Liste der Collection-IDs in die das Document soll - - str: Grund/Beschreibung der Entscheidung + - bool: Whether sync is needed + - List[str]: List of collection IDs where the document should go + - str: Reason/description of the decision """ doc_id = document.get('id') doc_name = document.get('name', 'Unbenannt') - # xAI-relevante Felder + # xAI-relevant fields xai_file_id = document.get('xaiFileId') xai_collections = document.get('xaiCollections') or [] xai_sync_status = document.get('xaiSyncStatus') - # Datei-Status und Hash-Felder + # File status and hash fields datei_status = document.get('dateiStatus') or document.get('fileStatus') file_md5 = document.get('md5') or document.get('fileMd5') file_sha = document.get('sha') or document.get('fileSha') - xai_synced_hash = document.get('xaiSyncedHash') # Hash beim letzten xAI-Sync + xai_synced_hash = document.get('xaiSyncedHash') # Hash at last xAI sync - self._log(f"📋 Document Analysis: {doc_name} (ID: {doc_id})") + self._log(f"📋 Document analysis: {doc_name} (ID: {doc_id})") self._log(f" xaiFileId: {xai_file_id or 'N/A'}") self._log(f" xaiCollections: {xai_collections}") self._log(f" xaiSyncStatus: {xai_sync_status or 'N/A'}") @@ -165,65 +166,74 @@ class DocumentSync(BaseSyncUtils): entity_type=entity_type ) - # Prüfe xaiSyncStatus="no_sync" → kein Sync für dieses Dokument - if xai_sync_status == 'no_sync': - self._log("⏭️ Kein xAI-Sync nötig: xaiSyncStatus='no_sync'") - return (False, [], "xaiSyncStatus ist 'no_sync'") + # Check xaiSyncStatus="no_sync" -> no sync for this document + if xai_sync_status == XAISyncStatus.NO_SYNC.value: + self._log("⏭️ No xAI sync needed: xaiSyncStatus='no_sync'") + return (False, [], "xaiSyncStatus is 'no_sync'") if not target_collections: - self._log("⏭️ Kein xAI-Sync nötig: Keine Related Entities mit xAI Collections") - return (False, [], "Keine verknüpften Entities mit xAI Collections") + self._log("⏭️ No xAI sync needed: No related entities with xAI collections") + return (False, [], "No linked entities with xAI collections") # ═══════════════════════════════════════════════════════════════ - # PRIORITY CHECK 1: xaiSyncStatus="unclean" → Dokument wurde geändert + # PRIORITY CHECK 1: xaiSyncStatus="unclean" -> document was changed # ═══════════════════════════════════════════════════════════════ - if xai_sync_status == 'unclean': - self._log(f"🆕 xaiSyncStatus='unclean' → xAI-Sync ERFORDERLICH") + if xai_sync_status == XAISyncStatus.UNCLEAN.value: + self._log(f"🆕 xaiSyncStatus='unclean' → xAI sync REQUIRED") return (True, target_collections, "xaiSyncStatus='unclean'") # ═══════════════════════════════════════════════════════════════ - # PRIORITY CHECK 2: fileStatus "new" oder "changed" + # PRIORITY CHECK 2: fileStatus "new" or "changed" # ═══════════════════════════════════════════════════════════════ - if datei_status in ['new', 'changed', 'neu', 'geändert', 'New', 'Changed', 'Neu', 'Geändert']: - self._log(f"🆕 fileStatus: '{datei_status}' → xAI-Sync ERFORDERLICH") + if datei_status in [ + FileStatus.NEW.value, + FileStatus.CHANGED.value, + 'neu', # Legacy German values + 'geändert', # Legacy German values + 'Neu', # Case variations + 'Geändert', + 'New', + 'Changed' + ]: + self._log(f"🆕 fileStatus: '{datei_status}' → xAI sync REQUIRED") if target_collections: return (True, target_collections, f"fileStatus: {datei_status}") else: - # Datei ist neu/geändert aber keine Collections gefunden - self._log(f"⚠️ fileStatus '{datei_status}' aber keine Collections gefunden - überspringe Sync") - return (False, [], f"fileStatus: {datei_status}, aber keine Collections") + # File is new/changed but no collections found + self._log(f"⚠️ fileStatus '{datei_status}' but no collections found - skipping sync") + return (False, [], f"fileStatus: {datei_status}, but no collections") # ═══════════════════════════════════════════════════════════════ - # FALL 1: Document ist bereits in xAI UND Collections sind gesetzt + # CASE 1: Document is already in xAI AND collections are set # ═══════════════════════════════════════════════════════════════ if xai_file_id: - self._log(f"✅ Document bereits in xAI gesynct mit {len(target_collections)} Collection(s)") + self._log(f"✅ Document already synced to xAI with {len(target_collections)} collection(s)") - # Prüfe ob File-Inhalt geändert wurde (Hash-Vergleich) + # Check if file content was changed (hash comparison) current_hash = file_md5 or file_sha if current_hash and xai_synced_hash: if current_hash != xai_synced_hash: - self._log(f"🔄 Hash-Änderung erkannt! RESYNC erforderlich") - self._log(f" Alt: {xai_synced_hash[:16]}...") - self._log(f" Neu: {current_hash[:16]}...") - return (True, target_collections, "File-Inhalt geändert (Hash-Mismatch)") + self._log(f"🔄 Hash change detected! RESYNC required") + self._log(f" Old: {xai_synced_hash[:16]}...") + self._log(f" New: {current_hash[:16]}...") + return (True, target_collections, "File content changed (hash mismatch)") else: - self._log(f"✅ Hash identisch - keine Änderung") + self._log(f"✅ Hash identical - no change") else: - self._log(f"⚠️ Keine Hash-Werte verfügbar für Vergleich") + self._log(f"⚠️ No hash values available for comparison") - return (False, target_collections, "Bereits gesynct, keine Änderung erkannt") + return (False, target_collections, "Already synced, no change detected") # ═══════════════════════════════════════════════════════════════ - # FALL 2: Document hat xaiFileId aber Collections ist leer/None + # CASE 2: Document has xaiFileId but collections is empty/None # ═══════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════ - # FALL 3: Collections vorhanden aber kein Status/Hash-Trigger + # CASE 3: Collections present but no status/hash trigger # ═══════════════════════════════════════════════════════════════ - self._log(f"✅ Document ist mit {len(target_collections)} Entity/ies verknüpft die Collections haben") - return (True, target_collections, "Verknüpft mit Entities die Collections benötigen") + self._log(f"✅ Document is linked to {len(target_collections)} entity/ies with collections") + return (True, target_collections, "Linked to entities that require collections") async def _get_required_collections_from_relations( self, @@ -231,20 +241,20 @@ class DocumentSync(BaseSyncUtils): entity_type: str = 'Document' ) -> List[str]: """ - Ermittelt alle xAI Collection-IDs von Entities die mit diesem Document verknüpft sind + Determine all xAI collection IDs of entities linked to this document. - EspoCRM Many-to-Many: Document kann mit beliebigen Entities verknüpft sein + EspoCRM Many-to-Many: Document can be linked to arbitrary entities (CBeteiligte, Account, CVmhErstgespraech, etc.) Args: document_id: Document ID Returns: - Liste von xAI Collection-IDs (dedupliziert) + List of xAI collection IDs (deduplicated) """ collections = set() - self._log(f"🔍 Prüfe Relations von {entity_type} {document_id}...") + self._log(f"🔍 Checking relations of {entity_type} {document_id}...") try: entity_def = await self.espocrm.get_entity_def(entity_type) diff --git a/services/models.py b/services/models.py index b0538bb..f58c1e3 100644 --- a/services/models.py +++ b/services/models.py @@ -16,7 +16,7 @@ from enum import Enum # ========== Enums ========== class Rechtsform(str, Enum): - """Rechtsformen für Beteiligte""" + """Legal forms for Beteiligte""" NATUERLICHE_PERSON = "" GMBH = "GmbH" AG = "AG" @@ -29,7 +29,7 @@ class Rechtsform(str, Enum): class SyncStatus(str, Enum): - """Sync Status für EspoCRM Entities""" + """Sync status for EspoCRM entities (Beteiligte)""" PENDING_SYNC = "pending_sync" SYNCING = "syncing" CLEAN = "clean" @@ -38,8 +38,30 @@ class SyncStatus(str, Enum): PERMANENTLY_FAILED = "permanently_failed" +class FileStatus(str, Enum): + """Valid values for CDokumente.fileStatus field""" + NEW = "new" + CHANGED = "changed" + SYNCED = "synced" + + def __str__(self) -> str: + return self.value + + +class XAISyncStatus(str, Enum): + """Valid values for CDokumente.xaiSyncStatus field""" + NO_SYNC = "no_sync" # Entity has no xAI collections + PENDING_SYNC = "pending_sync" # Sync in progress (locked) + CLEAN = "clean" # Synced successfully + UNCLEAN = "unclean" # Needs re-sync (file changed) + FAILED = "failed" # Sync failed (see xaiSyncError) + + def __str__(self) -> str: + return self.value + + class SalutationType(str, Enum): - """Anredetypen""" + """Salutation types""" HERR = "Herr" FRAU = "Frau" DIVERS = "Divers" diff --git a/services/redis_client.py b/services/redis_client.py index a526648..1106cd5 100644 --- a/services/redis_client.py +++ b/services/redis_client.py @@ -1,51 +1,58 @@ """ Redis Client Factory -Zentralisierte Redis-Client-Verwaltung mit: -- Singleton Pattern -- Connection Pooling -- Automatic Reconnection -- Health Checks +Centralized Redis client management with: +- Singleton pattern +- Connection pooling +- Automatic reconnection +- Health checks """ import redis import os -import logging from typing import Optional from services.exceptions import RedisConnectionError - -logger = logging.getLogger(__name__) +from services.logging_utils import get_service_logger class RedisClientFactory: """ - Singleton Factory für Redis Clients. + Singleton factory for Redis clients. - Vorteile: - - Eine zentrale Konfiguration - - Connection Pooling - - Lazy Initialization - - Besseres Error Handling + Benefits: + - Centralized configuration + - Connection pooling + - Lazy initialization + - Better error handling """ _instance: Optional[redis.Redis] = None _connection_pool: Optional[redis.ConnectionPool] = None + _logger = None + + @classmethod + def _get_logger(cls): + """Get logger instance (lazy initialization)""" + if cls._logger is None: + cls._logger = get_service_logger('redis_factory', None) + return cls._logger @classmethod def get_client(cls, strict: bool = False) -> Optional[redis.Redis]: """ - Gibt Redis Client zurück (erstellt wenn nötig). + Return Redis client (creates if needed). Args: - strict: Wenn True, wirft Exception bei Verbindungsfehlern. - Wenn False, gibt None zurück (für optionale Redis-Nutzung). + strict: If True, raises exception on connection failures. + If False, returns None (for optional Redis usage). Returns: - Redis client oder None (wenn strict=False und Verbindung fehlschlägt) + Redis client or None (if strict=False and connection fails) Raises: - RedisConnectionError: Wenn strict=True und Verbindung fehlschlägt + RedisConnectionError: If strict=True and connection fails """ + logger = cls._get_logger() if cls._instance is None: try: cls._instance = cls._create_client() @@ -65,14 +72,15 @@ class RedisClientFactory: @classmethod def _create_client(cls) -> redis.Redis: """ - Erstellt neuen Redis Client mit Connection Pool. + Create new Redis client with connection pool. Returns: Configured Redis client Raises: - redis.ConnectionError: Bei Verbindungsproblemen + redis.ConnectionError: On connection problems """ + logger = cls._get_logger() # Load configuration from environment redis_host = os.getenv('REDIS_HOST', 'localhost') redis_port = int(os.getenv('REDIS_PORT', '6379')) @@ -94,7 +102,7 @@ class RedisClientFactory: socket_timeout=redis_timeout, socket_connect_timeout=redis_timeout, max_connections=redis_max_connections, - decode_responses=True # Auto-decode bytes zu strings + decode_responses=True # Auto-decode bytes to strings ) # Create client from pool @@ -108,10 +116,11 @@ class RedisClientFactory: @classmethod def reset(cls) -> None: """ - Reset factory state (hauptsächlich für Tests). + Reset factory state (mainly for tests). - Schließt bestehende Verbindungen und setzt Singleton zurück. + Closes existing connections and resets singleton. """ + logger = cls._get_logger() if cls._instance: try: cls._instance.close() @@ -131,11 +140,12 @@ class RedisClientFactory: @classmethod def health_check(cls) -> bool: """ - Prüft Redis-Verbindung. + Check Redis connection. Returns: - True wenn Redis erreichbar, False sonst + True if Redis is reachable, False otherwise """ + logger = cls._get_logger() try: client = cls.get_client(strict=False) if client is None: @@ -150,11 +160,12 @@ class RedisClientFactory: @classmethod def get_info(cls) -> Optional[dict]: """ - Gibt Redis Server Info zurück (für Monitoring). + Return Redis server info (for monitoring). Returns: - Redis info dict oder None bei Fehler + Redis info dict or None on error """ + logger = cls._get_logger() try: client = cls.get_client(strict=False) if client is None: @@ -170,22 +181,22 @@ class RedisClientFactory: def get_redis_client(strict: bool = False) -> Optional[redis.Redis]: """ - Convenience function für Redis Client. + Convenience function for Redis client. Args: - strict: Wenn True, wirft Exception bei Fehler + strict: If True, raises exception on error Returns: - Redis client oder None + Redis client or None """ return RedisClientFactory.get_client(strict=strict) def is_redis_available() -> bool: """ - Prüft ob Redis verfügbar ist. + Check if Redis is available. Returns: - True wenn Redis erreichbar + True if Redis is reachable """ return RedisClientFactory.health_check() diff --git a/steps/advoware_cal_sync/calendar_sync_api_step.py b/steps/advoware_cal_sync/calendar_sync_api_step.py index bb59972..899088a 100644 --- a/steps/advoware_cal_sync/calendar_sync_api_step.py +++ b/steps/advoware_cal_sync/calendar_sync_api_step.py @@ -7,7 +7,7 @@ Supports syncing a single employee or all employees. import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent)) -from calendar_sync_utils import get_redis_client, set_employee_lock, log_operation +from calendar_sync_utils import get_redis_client, set_employee_lock, get_logger from motia import http, ApiRequest, ApiResponse, FlowContext @@ -38,10 +38,10 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: kuerzel = body.get('kuerzel') if not kuerzel: return ApiResponse( - status=400, + status_code=400, body={ 'error': 'kuerzel required', - 'message': 'Bitte kuerzel im Body angeben' + 'message': 'Please provide kuerzel in body' } ) @@ -49,7 +49,7 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: if kuerzel_upper == 'ALL': # Emit sync-all event - log_operation('info', "Calendar Sync API: Emitting sync-all event", context=ctx) + ctx.logger.info("Calendar Sync API: Emitting sync-all event") await ctx.enqueue({ "topic": "calendar_sync_all", "data": { @@ -57,10 +57,10 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: } }) return ApiResponse( - status=200, + status_code=200, body={ 'status': 'triggered', - 'message': 'Calendar sync wurde für alle Mitarbeiter ausgelöst', + 'message': 'Calendar sync triggered for all employees', 'triggered_by': 'api' } ) @@ -69,9 +69,9 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: redis_client = get_redis_client(ctx) if not set_employee_lock(redis_client, kuerzel_upper, 'api', ctx): - log_operation('info', f"Calendar Sync API: Sync already active for {kuerzel_upper}, skipping", context=ctx) + ctx.logger.info(f"Calendar Sync API: Sync already active for {kuerzel_upper}, skipping") return ApiResponse( - status=409, + status_code=409, body={ 'status': 'conflict', 'message': f'Calendar sync already active for {kuerzel_upper}', @@ -80,7 +80,7 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: } ) - log_operation('info', f"Calendar Sync API called for {kuerzel_upper}", context=ctx) + ctx.logger.info(f"Calendar Sync API called for {kuerzel_upper}") # Lock successfully set, now emit event await ctx.enqueue({ @@ -92,19 +92,19 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: }) return ApiResponse( - status=200, + status_code=200, body={ 'status': 'triggered', - 'message': f'Calendar sync was triggered for {kuerzel_upper}', + 'message': f'Calendar sync triggered for {kuerzel_upper}', 'kuerzel': kuerzel_upper, 'triggered_by': 'api' } ) except Exception as e: - log_operation('error', f"Error in API trigger: {e}", context=ctx) + ctx.logger.error(f"Error in API trigger: {e}") return ApiResponse( - status=500, + status_code=500, body={ 'error': 'Internal server error', 'details': str(e) diff --git a/steps/advoware_cal_sync/calendar_sync_utils.py b/steps/advoware_cal_sync/calendar_sync_utils.py index 3148b1f..3c379c4 100644 --- a/steps/advoware_cal_sync/calendar_sync_utils.py +++ b/steps/advoware_cal_sync/calendar_sync_utils.py @@ -3,50 +3,24 @@ Calendar Sync Utilities Shared utility functions for calendar synchronization between Google Calendar and Advoware. """ -import logging import asyncpg import os import redis import time +from typing import Optional, Any, List from googleapiclient.discovery import build from google.oauth2 import service_account - -# Configure logging -logger = logging.getLogger(__name__) +from services.logging_utils import get_service_logger -def log_operation(level: str, message: str, context=None, **context_vars): - """Centralized logging with context, supporting file and console logging.""" - context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None) - full_message = f"{message} {context_str}".strip() - - # Use ctx.logger if context is available (Motia III FlowContext) - if context and hasattr(context, 'logger'): - if level == 'info': - context.logger.info(full_message) - elif level == 'warning': - context.logger.warning(full_message) - elif level == 'error': - context.logger.error(full_message) - elif level == 'debug': - context.logger.debug(full_message) - else: - # Fallback to standard logger - if level == 'info': - logger.info(full_message) - elif level == 'warning': - logger.warning(full_message) - elif level == 'error': - logger.error(full_message) - elif level == 'debug': - logger.debug(full_message) - - # Also log to console for journalctl visibility - print(f"[{level.upper()}] {full_message}") +def get_logger(context=None): + """Get logger for calendar sync operations""" + return get_service_logger('calendar_sync', context) async def connect_db(context=None): """Connect to Postgres DB from environment variables.""" + logger = get_logger(context) try: conn = await asyncpg.connect( host=os.getenv('POSTGRES_HOST', 'localhost'), @@ -57,12 +31,13 @@ async def connect_db(context=None): ) return conn except Exception as e: - log_operation('error', f"Failed to connect to DB: {e}", context=context) + logger.error(f"Failed to connect to DB: {e}") raise async def get_google_service(context=None): """Initialize Google Calendar service.""" + logger = get_logger(context) try: service_account_path = os.getenv('GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH', 'service-account.json') if not os.path.exists(service_account_path): @@ -75,48 +50,53 @@ async def get_google_service(context=None): service = build('calendar', 'v3', credentials=creds) return service except Exception as e: - log_operation('error', f"Failed to initialize Google service: {e}", context=context) + logger.error(f"Failed to initialize Google service: {e}") raise -def get_redis_client(context=None): +def get_redis_client(context=None) -> redis.Redis: """Initialize Redis client for calendar sync operations.""" + logger = get_logger(context) try: redis_client = redis.Redis( host=os.getenv('REDIS_HOST', 'localhost'), port=int(os.getenv('REDIS_PORT', '6379')), db=int(os.getenv('REDIS_DB_CALENDAR_SYNC', '2')), - socket_timeout=int(os.getenv('REDIS_TIMEOUT_SECONDS', '5')) + socket_timeout=int(os.getenv('REDIS_TIMEOUT_SECONDS', '5')), + decode_responses=True ) return redis_client except Exception as e: - log_operation('error', f"Failed to initialize Redis client: {e}", context=context) + logger.error(f"Failed to initialize Redis client: {e}") raise -async def get_advoware_employees(advoware, context=None): +async def get_advoware_employees(advoware, context=None) -> List[Any]: """Fetch list of employees from Advoware.""" + logger = get_logger(context) try: result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'}) employees = result if isinstance(result, list) else [] - log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context) + logger.info(f"Fetched {len(employees)} Advoware employees") return employees except Exception as e: - log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context) + logger.error(f"Failed to fetch Advoware employees: {e}") raise -def set_employee_lock(redis_client, kuerzel: str, triggered_by: str, context=None) -> bool: +def set_employee_lock(redis_client: redis.Redis, kuerzel: str, triggered_by: str, context=None) -> bool: """Set lock for employee sync operation.""" + logger = get_logger(context) employee_lock_key = f'calendar_sync_lock_{kuerzel}' if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None: - log_operation('info', f"Sync already active for {kuerzel}, skipping", context=context) + logger.info(f"Sync already active for {kuerzel}, skipping") return False return True -def clear_employee_lock(redis_client, kuerzel: str, context=None): +def clear_employee_lock(redis_client: redis.Redis, kuerzel: str, context=None) -> None: """Clear lock for employee sync operation and update last-synced timestamp.""" + logger = get_logger(context) try: employee_lock_key = f'calendar_sync_lock_{kuerzel}' employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}' @@ -128,6 +108,6 @@ def clear_employee_lock(redis_client, kuerzel: str, context=None): # Delete the lock redis_client.delete(employee_lock_key) - log_operation('debug', f"Cleared lock and updated last-synced for {kuerzel} to {current_time}", context=context) + logger.debug(f"Cleared lock and updated last-synced for {kuerzel} to {current_time}") except Exception as e: - log_operation('warning', f"Failed to clear lock and update last-synced for {kuerzel}: {e}", context=context) + logger.warning(f"Failed to clear lock and update last-synced for {kuerzel}: {e}") diff --git a/steps/vmh/webhook/bankverbindungen_create_api_step.py b/steps/vmh/webhook/bankverbindungen_create_api_step.py index c7b594a..dc18557 100644 --- a/steps/vmh/webhook/bankverbindungen_create_api_step.py +++ b/steps/vmh/webhook/bankverbindungen_create_api_step.py @@ -7,7 +7,7 @@ from motia import FlowContext, http, ApiRequest, ApiResponse config = { "name": "VMH Webhook Bankverbindungen Create", - "description": "Empfängt Create-Webhooks von EspoCRM für Bankverbindungen", + "description": "Receives create webhooks from EspoCRM for Bankverbindungen", "flows": ["vmh-bankverbindungen"], "triggers": [ http("POST", "/vmh/webhook/bankverbindungen/create") @@ -29,7 +29,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") ctx.logger.info("=" * 80) - # Sammle alle IDs aus dem Batch + # Collect all IDs from batch entity_ids = set() if isinstance(payload, list): @@ -39,7 +39,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: elif isinstance(payload, dict) and 'id' in payload: entity_ids.add(payload['id']) - ctx.logger.info(f"{len(entity_ids)} IDs zum Create-Sync gefunden") + ctx.logger.info(f"{len(entity_ids)} IDs found for create sync") # Emit events for entity_id in entity_ids: @@ -53,11 +53,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info("✅ VMH Create Webhook verarbeitet: " - f"{len(entity_ids)} Events emittiert") + ctx.logger.info("✅ VMH Create Webhook processed: " + f"{len(entity_ids)} events emitted") return ApiResponse( - status=200, + status_code=200, body={ 'status': 'received', 'action': 'create', @@ -67,10 +67,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: except Exception as e: ctx.logger.error("=" * 80) - ctx.logger.error("❌ FEHLER: BANKVERBINDUNGEN CREATE WEBHOOK") + ctx.logger.error("❌ ERROR: BANKVERBINDUNGEN CREATE WEBHOOK") ctx.logger.error(f"Error: {e}") ctx.logger.error("=" * 80) return ApiResponse( - status=500, + status_code=500, body={'error': 'Internal server error', 'details': str(e)} ) diff --git a/steps/vmh/webhook/bankverbindungen_delete_api_step.py b/steps/vmh/webhook/bankverbindungen_delete_api_step.py index 0c06a06..847c486 100644 --- a/steps/vmh/webhook/bankverbindungen_delete_api_step.py +++ b/steps/vmh/webhook/bankverbindungen_delete_api_step.py @@ -7,7 +7,7 @@ from motia import FlowContext, http, ApiRequest, ApiResponse config = { "name": "VMH Webhook Bankverbindungen Delete", - "description": "Empfängt Delete-Webhooks von EspoCRM für Bankverbindungen", + "description": "Receives delete webhooks from EspoCRM for Bankverbindungen", "flows": ["vmh-bankverbindungen"], "triggers": [ http("POST", "/vmh/webhook/bankverbindungen/delete") @@ -29,7 +29,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") ctx.logger.info("=" * 80) - # Sammle alle IDs + # Collect all IDs entity_ids = set() if isinstance(payload, list): @@ -39,7 +39,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: elif isinstance(payload, dict) and 'id' in payload: entity_ids.add(payload['id']) - ctx.logger.info(f"{len(entity_ids)} IDs zum Delete-Sync gefunden") + ctx.logger.info(f"{len(entity_ids)} IDs found for delete sync") # Emit events for entity_id in entity_ids: @@ -53,11 +53,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info("✅ VMH Delete Webhook verarbeitet: " - f"{len(entity_ids)} Events emittiert") + ctx.logger.info("✅ VMH Delete Webhook processed: " + f"{len(entity_ids)} events emitted") return ApiResponse( - status=200, + status_code=200, body={ 'status': 'received', 'action': 'delete', @@ -67,10 +67,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: except Exception as e: ctx.logger.error("=" * 80) - ctx.logger.error("❌ FEHLER: BANKVERBINDUNGEN DELETE WEBHOOK") + ctx.logger.error("❌ ERROR: BANKVERBINDUNGEN DELETE WEBHOOK") ctx.logger.error(f"Error: {e}") ctx.logger.error("=" * 80) return ApiResponse( - status=500, + status_code=500, body={'error': 'Internal server error', 'details': str(e)} ) diff --git a/steps/vmh/webhook/bankverbindungen_update_api_step.py b/steps/vmh/webhook/bankverbindungen_update_api_step.py index 11c24ba..94dbf07 100644 --- a/steps/vmh/webhook/bankverbindungen_update_api_step.py +++ b/steps/vmh/webhook/bankverbindungen_update_api_step.py @@ -7,7 +7,7 @@ from motia import FlowContext, http, ApiRequest, ApiResponse config = { "name": "VMH Webhook Bankverbindungen Update", - "description": "Empfängt Update-Webhooks von EspoCRM für Bankverbindungen", + "description": "Receives update webhooks from EspoCRM for Bankverbindungen", "flows": ["vmh-bankverbindungen"], "triggers": [ http("POST", "/vmh/webhook/bankverbindungen/update") @@ -29,7 +29,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") ctx.logger.info("=" * 80) - # Sammle alle IDs + # Collect all IDs entity_ids = set() if isinstance(payload, list): @@ -39,7 +39,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: elif isinstance(payload, dict) and 'id' in payload: entity_ids.add(payload['id']) - ctx.logger.info(f"{len(entity_ids)} IDs zum Update-Sync gefunden") + ctx.logger.info(f"{len(entity_ids)} IDs found for update sync") # Emit events for entity_id in entity_ids: @@ -53,11 +53,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info("✅ VMH Update Webhook verarbeitet: " - f"{len(entity_ids)} Events emittiert") + ctx.logger.info("✅ VMH Update Webhook processed: " + f"{len(entity_ids)} events emitted") return ApiResponse( - status=200, + status_code=200, body={ 'status': 'received', 'action': 'update', @@ -67,10 +67,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: except Exception as e: ctx.logger.error("=" * 80) - ctx.logger.error("❌ FEHLER: BANKVERBINDUNGEN UPDATE WEBHOOK") + ctx.logger.error("❌ ERROR: BANKVERBINDUNGEN UPDATE WEBHOOK") ctx.logger.error(f"Error: {e}") ctx.logger.error("=" * 80) return ApiResponse( - status=500, + status_code=500, body={'error': 'Internal server error', 'details': str(e)} ) diff --git a/steps/vmh/webhook/beteiligte_create_api_step.py b/steps/vmh/webhook/beteiligte_create_api_step.py index 23d7bb2..b8dfbde 100644 --- a/steps/vmh/webhook/beteiligte_create_api_step.py +++ b/steps/vmh/webhook/beteiligte_create_api_step.py @@ -7,7 +7,7 @@ from motia import FlowContext, http, ApiRequest, ApiResponse config = { "name": "VMH Webhook Beteiligte Create", - "description": "Empfängt Create-Webhooks von EspoCRM für Beteiligte", + "description": "Receives create webhooks from EspoCRM for Beteiligte", "flows": ["vmh-beteiligte"], "triggers": [ http("POST", "/vmh/webhook/beteiligte/create") @@ -32,7 +32,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") ctx.logger.info("=" * 80) - # Sammle alle IDs aus dem Batch + # Collect all IDs from batch entity_ids = set() if isinstance(payload, list): @@ -42,9 +42,9 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: elif isinstance(payload, dict) and 'id' in payload: entity_ids.add(payload['id']) - ctx.logger.info(f"{len(entity_ids)} IDs zum Create-Sync gefunden") + ctx.logger.info(f"{len(entity_ids)} IDs found for create sync") - # Emit events für Queue-Processing (Deduplizierung erfolgt im Event-Handler via Lock) + # Emit events for queue processing (deduplication via lock in event handler) for entity_id in entity_ids: await ctx.enqueue({ 'topic': 'vmh.beteiligte.create', @@ -56,11 +56,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info("✅ VMH Create Webhook verarbeitet: " - f"{len(entity_ids)} Events emittiert") + ctx.logger.info("✅ VMH Create Webhook processed: " + f"{len(entity_ids)} events emitted") return ApiResponse( - status=200, + status_code=200, body={ 'status': 'received', 'action': 'create', @@ -70,11 +70,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: except Exception as e: ctx.logger.error("=" * 80) - ctx.logger.error("❌ FEHLER: VMH CREATE WEBHOOK") + ctx.logger.error("❌ ERROR: VMH CREATE WEBHOOK") ctx.logger.error(f"Error: {e}") ctx.logger.error("=" * 80) return ApiResponse( - status=500, + status_code=500, body={ 'error': 'Internal server error', 'details': str(e) diff --git a/steps/vmh/webhook/beteiligte_delete_api_step.py b/steps/vmh/webhook/beteiligte_delete_api_step.py index d5fe8ad..763f38c 100644 --- a/steps/vmh/webhook/beteiligte_delete_api_step.py +++ b/steps/vmh/webhook/beteiligte_delete_api_step.py @@ -7,7 +7,7 @@ from motia import FlowContext, http, ApiRequest, ApiResponse config = { "name": "VMH Webhook Beteiligte Delete", - "description": "Empfängt Delete-Webhooks von EspoCRM für Beteiligte", + "description": "Receives delete webhooks from EspoCRM for Beteiligte", "flows": ["vmh-beteiligte"], "triggers": [ http("POST", "/vmh/webhook/beteiligte/delete") @@ -29,7 +29,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") ctx.logger.info("=" * 80) - # Sammle alle IDs aus dem Batch + # Collect all IDs from batch entity_ids = set() if isinstance(payload, list): @@ -39,9 +39,9 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: elif isinstance(payload, dict) and 'id' in payload: entity_ids.add(payload['id']) - ctx.logger.info(f"{len(entity_ids)} IDs zum Delete-Sync gefunden") + ctx.logger.info(f"{len(entity_ids)} IDs found for delete sync") - # Emit events für Queue-Processing + # Emit events for queue processing for entity_id in entity_ids: await ctx.enqueue({ 'topic': 'vmh.beteiligte.delete', @@ -53,11 +53,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info("✅ VMH Delete Webhook verarbeitet: " - f"{len(entity_ids)} Events emittiert") + ctx.logger.info("✅ VMH Delete Webhook processed: " + f"{len(entity_ids)} events emitted") return ApiResponse( - status=200, + status_code=200, body={ 'status': 'received', 'action': 'delete', @@ -67,10 +67,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: except Exception as e: ctx.logger.error("=" * 80) - ctx.logger.error("❌ FEHLER: BETEILIGTE DELETE WEBHOOK") + ctx.logger.error("❌ ERROR: BETEILIGTE DELETE WEBHOOK") ctx.logger.error(f"Error: {e}") ctx.logger.error("=" * 80) return ApiResponse( - status=500, + status_code=500, body={'error': 'Internal server error', 'details': str(e)} ) diff --git a/steps/vmh/webhook/beteiligte_update_api_step.py b/steps/vmh/webhook/beteiligte_update_api_step.py index acb9207..b02e0a2 100644 --- a/steps/vmh/webhook/beteiligte_update_api_step.py +++ b/steps/vmh/webhook/beteiligte_update_api_step.py @@ -7,7 +7,7 @@ from motia import FlowContext, http, ApiRequest, ApiResponse config = { "name": "VMH Webhook Beteiligte Update", - "description": "Empfängt Update-Webhooks von EspoCRM für Beteiligte", + "description": "Receives update webhooks from EspoCRM for Beteiligte", "flows": ["vmh-beteiligte"], "triggers": [ http("POST", "/vmh/webhook/beteiligte/update") @@ -20,8 +20,8 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: """ Webhook handler for Beteiligte updates in EspoCRM. - Note: Loop-Prevention ist auf EspoCRM-Seite implementiert. - rowId-Updates triggern keine Webhooks mehr, daher keine Filterung nötig. + Note: Loop prevention is implemented on EspoCRM side. + rowId updates no longer trigger webhooks, so no filtering needed. """ try: payload = request.body or [] @@ -32,7 +32,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") ctx.logger.info("=" * 80) - # Sammle alle IDs aus dem Batch + # Collect all IDs from batch entity_ids = set() if isinstance(payload, list): @@ -42,9 +42,9 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: elif isinstance(payload, dict) and 'id' in payload: entity_ids.add(payload['id']) - ctx.logger.info(f"{len(entity_ids)} IDs zum Update-Sync gefunden") + ctx.logger.info(f"{len(entity_ids)} IDs found for update sync") - # Emit events für Queue-Processing + # Emit events for queue processing for entity_id in entity_ids: await ctx.enqueue({ 'topic': 'vmh.beteiligte.update', @@ -56,11 +56,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info("✅ VMH Update Webhook verarbeitet: " - f"{len(entity_ids)} Events emittiert") + ctx.logger.info("✅ VMH Update Webhook processed: " + f"{len(entity_ids)} events emitted") return ApiResponse( - status=200, + status_code=200, body={ 'status': 'received', 'action': 'update', @@ -70,11 +70,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: except Exception as e: ctx.logger.error("=" * 80) - ctx.logger.error("❌ FEHLER: VMH UPDATE WEBHOOK") + ctx.logger.error("❌ ERROR: VMH UPDATE WEBHOOK") ctx.logger.error(f"Error: {e}") ctx.logger.error("=" * 80) return ApiResponse( - status=500, + status_code=500, body={ 'error': 'Internal server error', 'details': str(e) diff --git a/steps/vmh/webhook/document_create_api_step.py b/steps/vmh/webhook/document_create_api_step.py index bbdac1b..5f58f26 100644 --- a/steps/vmh/webhook/document_create_api_step.py +++ b/steps/vmh/webhook/document_create_api_step.py @@ -30,7 +30,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.info("=" * 80) ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") - # Sammle alle IDs aus dem Batch + # Collect all IDs from batch entity_ids = set() entity_type = 'CDokumente' # Default @@ -45,9 +45,9 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: entity_ids.add(payload['id']) entity_type = payload.get('entityType', 'CDokumente') - ctx.logger.info(f"{len(entity_ids)} Document IDs zum Create-Sync gefunden") + ctx.logger.info(f"{len(entity_ids)} document IDs found for create sync") - # Emit events für Queue-Processing (Deduplizierung erfolgt im Event-Handler via Lock) + # Emit events for queue processing (deduplication via lock in event handler) for entity_id in entity_ids: await ctx.enqueue({ 'topic': 'vmh.document.create', @@ -59,27 +59,27 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info("✅ Document Create Webhook verarbeitet: " - f"{len(entity_ids)} Events emittiert") + ctx.logger.info("✅ Document Create Webhook processed: " + f"{len(entity_ids)} events emitted") return ApiResponse( - status=200, + status_code=200, body={ 'success': True, - 'message': f'{len(entity_ids)} Document(s) zum Sync enqueued', + 'message': f'{len(entity_ids)} document(s) enqueued for sync', 'entity_ids': list(entity_ids) } ) except Exception as e: ctx.logger.error("=" * 80) - ctx.logger.error("❌ FEHLER: DOCUMENT CREATE WEBHOOK") + ctx.logger.error("❌ ERROR: DOCUMENT CREATE WEBHOOK") ctx.logger.error(f"Error: {e}") ctx.logger.error(f"Payload: {request.body}") ctx.logger.error("=" * 80) return ApiResponse( - status=500, + status_code=500, body={ 'success': False, 'error': str(e) diff --git a/steps/vmh/webhook/document_delete_api_step.py b/steps/vmh/webhook/document_delete_api_step.py index 5c199e5..c74899e 100644 --- a/steps/vmh/webhook/document_delete_api_step.py +++ b/steps/vmh/webhook/document_delete_api_step.py @@ -30,7 +30,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.info("=" * 80) ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") - # Sammle alle IDs aus dem Batch + # Collect all IDs from batch entity_ids = set() entity_type = 'CDokumente' # Default @@ -45,9 +45,9 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: entity_ids.add(payload['id']) entity_type = payload.get('entityType', 'CDokumente') - ctx.logger.info(f"{len(entity_ids)} Document IDs zum Delete-Sync gefunden") + ctx.logger.info(f"{len(entity_ids)} document IDs found for delete sync") - # Emit events für Queue-Processing + # Emit events for queue processing for entity_id in entity_ids: await ctx.enqueue({ 'topic': 'vmh.document.delete', @@ -59,27 +59,27 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info("✅ Document Delete Webhook verarbeitet: " - f"{len(entity_ids)} Events emittiert") + ctx.logger.info("✅ Document Delete Webhook processed: " + f"{len(entity_ids)} events emitted") return ApiResponse( - status=200, + status_code=200, body={ 'success': True, - 'message': f'{len(entity_ids)} Document(s) zum Delete enqueued', + 'message': f'{len(entity_ids)} document(s) enqueued for deletion', 'entity_ids': list(entity_ids) } ) except Exception as e: ctx.logger.error("=" * 80) - ctx.logger.error("❌ FEHLER: DOCUMENT DELETE WEBHOOK") + ctx.logger.error("❌ ERROR: DOCUMENT DELETE WEBHOOK") ctx.logger.error(f"Error: {e}") ctx.logger.error(f"Payload: {request.body}") ctx.logger.error("=" * 80) return ApiResponse( - status=500, + status_code=500, body={ 'success': False, 'error': str(e) diff --git a/steps/vmh/webhook/document_update_api_step.py b/steps/vmh/webhook/document_update_api_step.py index 69aa40d..88b3c15 100644 --- a/steps/vmh/webhook/document_update_api_step.py +++ b/steps/vmh/webhook/document_update_api_step.py @@ -30,7 +30,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.info("=" * 80) ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") - # Sammle alle IDs aus dem Batch + # Collect all IDs from batch entity_ids = set() entity_type = 'CDokumente' # Default @@ -45,9 +45,9 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: entity_ids.add(payload['id']) entity_type = payload.get('entityType', 'CDokumente') - ctx.logger.info(f"{len(entity_ids)} Document IDs zum Update-Sync gefunden") + ctx.logger.info(f"{len(entity_ids)} document IDs found for update sync") - # Emit events für Queue-Processing + # Emit events for queue processing for entity_id in entity_ids: await ctx.enqueue({ 'topic': 'vmh.document.update', @@ -59,27 +59,27 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info("✅ Document Update Webhook verarbeitet: " - f"{len(entity_ids)} Events emittiert") + ctx.logger.info("✅ Document Update Webhook processed: " + f"{len(entity_ids)} events emitted") return ApiResponse( - status=200, + status_code=200, body={ 'success': True, - 'message': f'{len(entity_ids)} Document(s) zum Sync enqueued', + 'message': f'{len(entity_ids)} document(s) enqueued for sync', 'entity_ids': list(entity_ids) } ) except Exception as e: ctx.logger.error("=" * 80) - ctx.logger.error("❌ FEHLER: DOCUMENT UPDATE WEBHOOK") + ctx.logger.error("❌ ERROR: DOCUMENT UPDATE WEBHOOK") ctx.logger.error(f"Error: {e}") ctx.logger.error(f"Payload: {request.body}") ctx.logger.error("=" * 80) return ApiResponse( - status=500, + status_code=500, body={ 'success': False, 'error': str(e)