Refactor and enhance logging in webhook handlers and Redis client

- Translated comments and docstrings from German to English for better clarity.
- Improved logging consistency across various webhook handlers for create, delete, and update operations.
- Centralized logging functionality by utilizing a dedicated logger utility.
- Added new enums for file and XAI sync statuses in models.
- Updated Redis client factory to use a centralized logger and improved error handling.
- Enhanced API responses to include more descriptive messages and status codes.
This commit is contained in:
bsiggel
2026-03-08 21:50:34 +00:00
parent f392ec0f06
commit a0cf845877
17 changed files with 300 additions and 276 deletions

View File

@@ -8,7 +8,6 @@ import hashlib
import base64
import os
import datetime
import logging
from typing import Optional, Dict, Any
from services.exceptions import (
@@ -21,8 +20,6 @@ from services.redis_client import get_redis_client
from services.config import ADVOWARE_CONFIG, API_CONFIG
from services.logging_utils import get_service_logger
logger = logging.getLogger(__name__)
class AdvowareAPI:
"""
@@ -93,7 +90,7 @@ class AdvowareAPI:
try:
api_key_bytes = base64.b64decode(self.api_key)
logger.debug("API Key decoded from base64")
self.logger.debug("API Key decoded from base64")
except Exception as e:
self._log(f"API Key not base64-encoded, using as-is: {e}", level='debug')
api_key_bytes = self.api_key.encode('utf-8') if isinstance(self.api_key, str) else self.api_key

View File

@@ -1,24 +1,29 @@
"""
Advoware Service Wrapper
Erweitert AdvowareAPI mit höheren Operations
Extends AdvowareAPI with higher-level operations for business logic.
"""
import logging
from typing import Dict, Any, Optional
from services.advoware import AdvowareAPI
logger = logging.getLogger(__name__)
from services.logging_utils import get_service_logger
class AdvowareService:
"""
Service-Layer für Advoware Operations
Verwendet AdvowareAPI für API-Calls
Service layer for Advoware operations.
Uses AdvowareAPI for API calls.
"""
def __init__(self, context=None):
self.api = AdvowareAPI(context)
self.context = context
self.logger = get_service_logger('advoware_service', context)
def _log(self, message: str, level: str = 'info') -> None:
"""Internal logging helper"""
log_func = getattr(self.logger, level, self.logger.info)
log_func(message)
async def api_call(self, *args, **kwargs):
"""Delegate api_call to underlying AdvowareAPI"""
@@ -26,29 +31,29 @@ class AdvowareService:
# ========== BETEILIGTE ==========
async def get_beteiligter(self, betnr: int) -> Optional[Dict]:
async def get_beteiligter(self, betnr: int) -> Optional[Dict[str, Any]]:
"""
Lädt Beteiligten mit allen Daten
Load Beteiligte with all data.
Returns:
Beteiligte-Objekt
Beteiligte object or None
"""
try:
endpoint = f"api/v1/advonet/Beteiligte/{betnr}"
result = await self.api.api_call(endpoint, method='GET')
return result
except Exception as e:
logger.error(f"[ADVO] Fehler beim Laden von Beteiligte {betnr}: {e}", exc_info=True)
self._log(f"[ADVO] Error loading Beteiligte {betnr}: {e}", level='error')
return None
# ========== KOMMUNIKATION ==========
async def create_kommunikation(self, betnr: int, data: Dict[str, Any]) -> Optional[Dict]:
async def create_kommunikation(self, betnr: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
Erstellt neue Kommunikation
Create new Kommunikation.
Args:
betnr: Beteiligten-Nummer
betnr: Beteiligte number
data: {
'tlf': str, # Required
'bemerkung': str, # Optional
@@ -57,68 +62,68 @@ class AdvowareService:
}
Returns:
Neue Kommunikation mit 'id'
New Kommunikation with 'id' or None
"""
try:
endpoint = f"api/v1/advonet/Beteiligte/{betnr}/Kommunikationen"
result = await self.api.api_call(endpoint, method='POST', json_data=data)
if result:
logger.info(f"[ADVO] ✅ Created Kommunikation: betnr={betnr}, kommKz={data.get('kommKz')}")
self._log(f"[ADVO] ✅ Created Kommunikation: betnr={betnr}, kommKz={data.get('kommKz')}")
return result
except Exception as e:
logger.error(f"[ADVO] Fehler beim Erstellen von Kommunikation: {e}", exc_info=True)
self._log(f"[ADVO] Error creating Kommunikation: {e}", level='error')
return None
async def update_kommunikation(self, betnr: int, komm_id: int, data: Dict[str, Any]) -> bool:
"""
Aktualisiert bestehende Kommunikation
Update existing Kommunikation.
Args:
betnr: Beteiligten-Nummer
komm_id: Kommunikation-ID
betnr: Beteiligte number
komm_id: Kommunikation ID
data: {
'tlf': str, # Optional
'bemerkung': str, # Optional
'online': bool # Optional
}
NOTE: kommKz ist READ-ONLY und kann nicht geändert werden
NOTE: kommKz is READ-ONLY and cannot be changed
Returns:
True wenn erfolgreich
True if successful
"""
try:
endpoint = f"api/v1/advonet/Beteiligte/{betnr}/Kommunikationen/{komm_id}"
await self.api.api_call(endpoint, method='PUT', json_data=data)
logger.info(f"[ADVO] ✅ Updated Kommunikation: betnr={betnr}, komm_id={komm_id}")
self._log(f"[ADVO] ✅ Updated Kommunikation: betnr={betnr}, komm_id={komm_id}")
return True
except Exception as e:
logger.error(f"[ADVO] Fehler beim Update von Kommunikation: {e}", exc_info=True)
self._log(f"[ADVO] Error updating Kommunikation: {e}", level='error')
return False
async def delete_kommunikation(self, betnr: int, komm_id: int) -> bool:
"""
Löscht Kommunikation (aktuell 403 Forbidden)
Delete Kommunikation (currently returns 403 Forbidden).
NOTE: DELETE ist in Advoware API deaktiviert
Verwende stattdessen: Leere Slots mit empty_slot_marker
NOTE: DELETE is disabled in Advoware API.
Use empty slots with empty_slot_marker instead.
Returns:
True wenn erfolgreich
True if successful
"""
try:
endpoint = f"api/v1/advonet/Beteiligte/{betnr}/Kommunikationen/{komm_id}"
await self.api.api_call(endpoint, method='DELETE')
logger.info(f"[ADVO] ✅ Deleted Kommunikation: betnr={betnr}, komm_id={komm_id}")
self._log(f"[ADVO] ✅ Deleted Kommunikation: betnr={betnr}, komm_id={komm_id}")
return True
except Exception as e:
# Expected: 403 Forbidden
logger.warning(f"[ADVO] DELETE not allowed (expected): {e}")
self._log(f"[ADVO] DELETE not allowed (expected): {e}", level='warning')
return False

View File

@@ -207,16 +207,15 @@ class BeteiligteSync:
except:
pass
@staticmethod
def parse_timestamp(ts: Any) -> Optional[datetime]:
def parse_timestamp(self, ts: Any) -> Optional[datetime]:
"""
Parse verschiedene Timestamp-Formate zu datetime
Parse various timestamp formats to datetime.
Args:
ts: String, datetime oder None
ts: String, datetime or None
Returns:
datetime-Objekt oder None
datetime object or None
"""
if not ts:
return None
@@ -225,13 +224,13 @@ class BeteiligteSync:
return ts
if isinstance(ts, str):
# EspoCRM Format: "2026-02-07 14:30:00"
# Advoware Format: "2026-02-07T14:30:00" oder "2026-02-07T14:30:00Z"
# EspoCRM format: "2026-02-07 14:30:00"
# Advoware format: "2026-02-07T14:30:00" or "2026-02-07T14:30:00Z"
try:
# Entferne trailing Z falls vorhanden
# Remove trailing Z if present
ts = ts.rstrip('Z')
# Versuche verschiedene Formate
# Try various formats
for fmt in [
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%dT%H:%M:%S',
@@ -242,11 +241,11 @@ class BeteiligteSync:
except ValueError:
continue
# Fallback: ISO-Format
# Fallback: ISO format
return datetime.fromisoformat(ts)
except Exception as e:
logger.warning(f"Konnte Timestamp nicht parsen: {ts} - {e}")
self._log(f"Could not parse timestamp: {ts} - {e}", level='warning')
return None
return None

View File

@@ -1,17 +1,18 @@
"""
Document Sync Utilities
Hilfsfunktionen für Document-Synchronisation mit xAI:
Utility functions for document synchronization with xAI:
- Distributed locking via Redis + syncStatus
- Entscheidungslogik: Wann muss ein Document zu xAI?
- Related Entities ermitteln (Many-to-Many Attachments)
- xAI Collection Management
- Decision logic: When does a document need xAI sync?
- Related entities determination (Many-to-Many attachments)
- xAI Collection management
"""
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime, timedelta
from services.sync_utils_base import BaseSyncUtils
from services.models import FileStatus, XAISyncStatus
# Max retry before permanent failure
MAX_SYNC_RETRIES = 5
@@ -21,10 +22,10 @@ RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h
class DocumentSync(BaseSyncUtils):
"""Utility-Klasse für Document-Synchronisation mit xAI"""
"""Utility class for document synchronization with xAI"""
def _get_lock_key(self, entity_id: str) -> str:
"""Redis Lock-Key für Documents"""
"""Redis lock key for documents"""
return f"sync_lock:document:{entity_id}"
async def acquire_sync_lock(self, entity_id: str, entity_type: str = 'CDokumente') -> bool:
@@ -45,13 +46,13 @@ class DocumentSync(BaseSyncUtils):
self._log(f"Redis lock bereits aktiv für {entity_type} {entity_id}", level='warn')
return False
# STEP 2: Update xaiSyncStatus auf pending_sync
# STEP 2: Update xaiSyncStatus to pending_sync
try:
await self.espocrm.update_entity(entity_type, entity_id, {
'xaiSyncStatus': 'pending_sync'
'xaiSyncStatus': XAISyncStatus.PENDING_SYNC.value
})
except Exception as e:
self._log(f"Konnte xaiSyncStatus nicht setzen: {e}", level='debug')
self._log(f"Could not set xaiSyncStatus: {e}", level='debug')
self._log(f"Sync-Lock für {entity_type} {entity_id} erworben")
return True
@@ -84,16 +85,16 @@ class DocumentSync(BaseSyncUtils):
try:
update_data = {}
# xaiSyncStatus setzen: clean bei Erfolg, failed bei Fehler
# Set xaiSyncStatus: clean on success, failed on error
try:
update_data['xaiSyncStatus'] = 'clean' if success else 'failed'
update_data['xaiSyncStatus'] = XAISyncStatus.CLEAN.value if success else XAISyncStatus.FAILED.value
if error_message:
update_data['xaiSyncError'] = error_message[:2000]
else:
update_data['xaiSyncError'] = None
except:
pass # Felder existieren evtl. nicht
pass # Fields may not exist
# Merge extra fields (z.B. xaiFileId, xaiCollections)
if extra_fields:
@@ -120,37 +121,37 @@ class DocumentSync(BaseSyncUtils):
entity_type: str = 'CDokumente'
) -> Tuple[bool, List[str], str]:
"""
Entscheidet ob ein Document zu xAI synchronisiert werden muss
Decide if a document needs to be synchronized to xAI.
Prüft:
1. Datei-Status Feld ("Neu", "Geändert")
2. Hash-Werte für Change Detection
3. Related Entities mit xAI Collections
Checks:
1. File status field ("new", "changed")
2. Hash values for change detection
3. Related entities with xAI collections
Args:
document: Vollständiges Document Entity von EspoCRM
document: Complete document entity from EspoCRM
Returns:
Tuple[bool, List[str], str]:
- bool: Ob Sync nötig ist
- List[str]: Liste der Collection-IDs in die das Document soll
- str: Grund/Beschreibung der Entscheidung
- bool: Whether sync is needed
- List[str]: List of collection IDs where the document should go
- str: Reason/description of the decision
"""
doc_id = document.get('id')
doc_name = document.get('name', 'Unbenannt')
# xAI-relevante Felder
# xAI-relevant fields
xai_file_id = document.get('xaiFileId')
xai_collections = document.get('xaiCollections') or []
xai_sync_status = document.get('xaiSyncStatus')
# Datei-Status und Hash-Felder
# File status and hash fields
datei_status = document.get('dateiStatus') or document.get('fileStatus')
file_md5 = document.get('md5') or document.get('fileMd5')
file_sha = document.get('sha') or document.get('fileSha')
xai_synced_hash = document.get('xaiSyncedHash') # Hash beim letzten xAI-Sync
xai_synced_hash = document.get('xaiSyncedHash') # Hash at last xAI sync
self._log(f"📋 Document Analysis: {doc_name} (ID: {doc_id})")
self._log(f"📋 Document analysis: {doc_name} (ID: {doc_id})")
self._log(f" xaiFileId: {xai_file_id or 'N/A'}")
self._log(f" xaiCollections: {xai_collections}")
self._log(f" xaiSyncStatus: {xai_sync_status or 'N/A'}")
@@ -165,65 +166,74 @@ class DocumentSync(BaseSyncUtils):
entity_type=entity_type
)
# Prüfe xaiSyncStatus="no_sync" → kein Sync für dieses Dokument
if xai_sync_status == 'no_sync':
self._log("⏭️ Kein xAI-Sync nötig: xaiSyncStatus='no_sync'")
return (False, [], "xaiSyncStatus ist 'no_sync'")
# Check xaiSyncStatus="no_sync" -> no sync for this document
if xai_sync_status == XAISyncStatus.NO_SYNC.value:
self._log("⏭️ No xAI sync needed: xaiSyncStatus='no_sync'")
return (False, [], "xaiSyncStatus is 'no_sync'")
if not target_collections:
self._log("⏭️ Kein xAI-Sync nötig: Keine Related Entities mit xAI Collections")
return (False, [], "Keine verknüpften Entities mit xAI Collections")
self._log("⏭️ No xAI sync needed: No related entities with xAI collections")
return (False, [], "No linked entities with xAI collections")
# ═══════════════════════════════════════════════════════════════
# PRIORITY CHECK 1: xaiSyncStatus="unclean" → Dokument wurde geändert
# PRIORITY CHECK 1: xaiSyncStatus="unclean" -> document was changed
# ═══════════════════════════════════════════════════════════════
if xai_sync_status == 'unclean':
self._log(f"🆕 xaiSyncStatus='unclean' → xAI-Sync ERFORDERLICH")
if xai_sync_status == XAISyncStatus.UNCLEAN.value:
self._log(f"🆕 xaiSyncStatus='unclean' → xAI sync REQUIRED")
return (True, target_collections, "xaiSyncStatus='unclean'")
# ═══════════════════════════════════════════════════════════════
# PRIORITY CHECK 2: fileStatus "new" oder "changed"
# PRIORITY CHECK 2: fileStatus "new" or "changed"
# ═══════════════════════════════════════════════════════════════
if datei_status in ['new', 'changed', 'neu', 'geändert', 'New', 'Changed', 'Neu', 'Geändert']:
self._log(f"🆕 fileStatus: '{datei_status}' → xAI-Sync ERFORDERLICH")
if datei_status in [
FileStatus.NEW.value,
FileStatus.CHANGED.value,
'neu', # Legacy German values
'geändert', # Legacy German values
'Neu', # Case variations
'Geändert',
'New',
'Changed'
]:
self._log(f"🆕 fileStatus: '{datei_status}' → xAI sync REQUIRED")
if target_collections:
return (True, target_collections, f"fileStatus: {datei_status}")
else:
# Datei ist neu/geändert aber keine Collections gefunden
self._log(f"⚠️ fileStatus '{datei_status}' aber keine Collections gefunden - überspringe Sync")
return (False, [], f"fileStatus: {datei_status}, aber keine Collections")
# File is new/changed but no collections found
self._log(f"⚠️ fileStatus '{datei_status}' but no collections found - skipping sync")
return (False, [], f"fileStatus: {datei_status}, but no collections")
# ═══════════════════════════════════════════════════════════════
# FALL 1: Document ist bereits in xAI UND Collections sind gesetzt
# CASE 1: Document is already in xAI AND collections are set
# ═══════════════════════════════════════════════════════════════
if xai_file_id:
self._log(f"✅ Document bereits in xAI gesynct mit {len(target_collections)} Collection(s)")
self._log(f"✅ Document already synced to xAI with {len(target_collections)} collection(s)")
# Prüfe ob File-Inhalt geändert wurde (Hash-Vergleich)
# Check if file content was changed (hash comparison)
current_hash = file_md5 or file_sha
if current_hash and xai_synced_hash:
if current_hash != xai_synced_hash:
self._log(f"🔄 Hash-Änderung erkannt! RESYNC erforderlich")
self._log(f" Alt: {xai_synced_hash[:16]}...")
self._log(f" Neu: {current_hash[:16]}...")
return (True, target_collections, "File-Inhalt geändert (Hash-Mismatch)")
self._log(f"🔄 Hash change detected! RESYNC required")
self._log(f" Old: {xai_synced_hash[:16]}...")
self._log(f" New: {current_hash[:16]}...")
return (True, target_collections, "File content changed (hash mismatch)")
else:
self._log(f"✅ Hash identisch - keine Änderung")
self._log(f"✅ Hash identical - no change")
else:
self._log(f"⚠️ Keine Hash-Werte verfügbar für Vergleich")
self._log(f"⚠️ No hash values available for comparison")
return (False, target_collections, "Bereits gesynct, keine Änderung erkannt")
return (False, target_collections, "Already synced, no change detected")
# ═══════════════════════════════════════════════════════════════
# FALL 2: Document hat xaiFileId aber Collections ist leer/None
# CASE 2: Document has xaiFileId but collections is empty/None
# ═══════════════════════════════════════════════════════════════
# ═══════════════════════════════════════════════════════════════
# FALL 3: Collections vorhanden aber kein Status/Hash-Trigger
# CASE 3: Collections present but no status/hash trigger
# ═══════════════════════════════════════════════════════════════
self._log(f"✅ Document ist mit {len(target_collections)} Entity/ies verknüpft die Collections haben")
return (True, target_collections, "Verknüpft mit Entities die Collections benötigen")
self._log(f"✅ Document is linked to {len(target_collections)} entity/ies with collections")
return (True, target_collections, "Linked to entities that require collections")
async def _get_required_collections_from_relations(
self,
@@ -231,20 +241,20 @@ class DocumentSync(BaseSyncUtils):
entity_type: str = 'Document'
) -> List[str]:
"""
Ermittelt alle xAI Collection-IDs von Entities die mit diesem Document verknüpft sind
Determine all xAI collection IDs of entities linked to this document.
EspoCRM Many-to-Many: Document kann mit beliebigen Entities verknüpft sein
EspoCRM Many-to-Many: Document can be linked to arbitrary entities
(CBeteiligte, Account, CVmhErstgespraech, etc.)
Args:
document_id: Document ID
Returns:
Liste von xAI Collection-IDs (dedupliziert)
List of xAI collection IDs (deduplicated)
"""
collections = set()
self._log(f"🔍 Prüfe Relations von {entity_type} {document_id}...")
self._log(f"🔍 Checking relations of {entity_type} {document_id}...")
try:
entity_def = await self.espocrm.get_entity_def(entity_type)

View File

@@ -16,7 +16,7 @@ from enum import Enum
# ========== Enums ==========
class Rechtsform(str, Enum):
"""Rechtsformen für Beteiligte"""
"""Legal forms for Beteiligte"""
NATUERLICHE_PERSON = ""
GMBH = "GmbH"
AG = "AG"
@@ -29,7 +29,7 @@ class Rechtsform(str, Enum):
class SyncStatus(str, Enum):
"""Sync Status für EspoCRM Entities"""
"""Sync status for EspoCRM entities (Beteiligte)"""
PENDING_SYNC = "pending_sync"
SYNCING = "syncing"
CLEAN = "clean"
@@ -38,8 +38,30 @@ class SyncStatus(str, Enum):
PERMANENTLY_FAILED = "permanently_failed"
class FileStatus(str, Enum):
    """Valid values for CDokumente.fileStatus field"""
    # NEW and CHANGED both trigger an xAI re-sync in the document sync
    # decision logic; legacy German values ('neu', 'geändert') may still
    # appear in data and are handled separately by callers.
    NEW = "new"
    CHANGED = "changed"
    # Presumably set once the file has been synced successfully — not
    # referenced in the visible sync logic; TODO confirm against writers.
    SYNCED = "synced"
    def __str__(self) -> str:
        # Render as the raw field value (e.g. "new"), not "FileStatus.NEW".
        return self.value
class XAISyncStatus(str, Enum):
    """Valid values for CDokumente.xaiSyncStatus field.

    Drives the xAI document-sync decision logic: NO_SYNC skips the
    document entirely, UNCLEAN forces a re-sync, PENDING_SYNC marks a
    sync in progress (set when the sync lock is acquired), and
    CLEAN/FAILED record the outcome of the last sync attempt.
    """
    NO_SYNC = "no_sync"  # Entity has no xAI collections
    PENDING_SYNC = "pending_sync"  # Sync in progress (locked)
    CLEAN = "clean"  # Synced successfully
    UNCLEAN = "unclean"  # Needs re-sync (file changed)
    FAILED = "failed"  # Sync failed (see xaiSyncError)
    def __str__(self) -> str:
        # Render as the raw field value (e.g. "clean"), not "XAISyncStatus.CLEAN".
        return self.value
class SalutationType(str, Enum):
"""Anredetypen"""
"""Salutation types"""
HERR = "Herr"
FRAU = "Frau"
DIVERS = "Divers"

View File

@@ -1,51 +1,58 @@
"""
Redis Client Factory
Zentralisierte Redis-Client-Verwaltung mit:
- Singleton Pattern
- Connection Pooling
- Automatic Reconnection
- Health Checks
Centralized Redis client management with:
- Singleton pattern
- Connection pooling
- Automatic reconnection
- Health checks
"""
import redis
import os
import logging
from typing import Optional
from services.exceptions import RedisConnectionError
logger = logging.getLogger(__name__)
from services.logging_utils import get_service_logger
class RedisClientFactory:
"""
Singleton Factory für Redis Clients.
Singleton factory for Redis clients.
Vorteile:
- Eine zentrale Konfiguration
- Connection Pooling
- Lazy Initialization
- Besseres Error Handling
Benefits:
- Centralized configuration
- Connection pooling
- Lazy initialization
- Better error handling
"""
_instance: Optional[redis.Redis] = None
_connection_pool: Optional[redis.ConnectionPool] = None
_logger = None
@classmethod
def _get_logger(cls):
"""Get logger instance (lazy initialization)"""
if cls._logger is None:
cls._logger = get_service_logger('redis_factory', None)
return cls._logger
@classmethod
def get_client(cls, strict: bool = False) -> Optional[redis.Redis]:
"""
Gibt Redis Client zurück (erstellt wenn nötig).
Return Redis client (creates if needed).
Args:
strict: Wenn True, wirft Exception bei Verbindungsfehlern.
Wenn False, gibt None zurück (für optionale Redis-Nutzung).
strict: If True, raises exception on connection failures.
If False, returns None (for optional Redis usage).
Returns:
Redis client oder None (wenn strict=False und Verbindung fehlschlägt)
Redis client or None (if strict=False and connection fails)
Raises:
RedisConnectionError: Wenn strict=True und Verbindung fehlschlägt
RedisConnectionError: If strict=True and connection fails
"""
logger = cls._get_logger()
if cls._instance is None:
try:
cls._instance = cls._create_client()
@@ -65,14 +72,15 @@ class RedisClientFactory:
@classmethod
def _create_client(cls) -> redis.Redis:
"""
Erstellt neuen Redis Client mit Connection Pool.
Create new Redis client with connection pool.
Returns:
Configured Redis client
Raises:
redis.ConnectionError: Bei Verbindungsproblemen
redis.ConnectionError: On connection problems
"""
logger = cls._get_logger()
# Load configuration from environment
redis_host = os.getenv('REDIS_HOST', 'localhost')
redis_port = int(os.getenv('REDIS_PORT', '6379'))
@@ -94,7 +102,7 @@ class RedisClientFactory:
socket_timeout=redis_timeout,
socket_connect_timeout=redis_timeout,
max_connections=redis_max_connections,
decode_responses=True # Auto-decode bytes zu strings
decode_responses=True # Auto-decode bytes to strings
)
# Create client from pool
@@ -108,10 +116,11 @@ class RedisClientFactory:
@classmethod
def reset(cls) -> None:
"""
Reset factory state (hauptsächlich für Tests).
Reset factory state (mainly for tests).
Schließt bestehende Verbindungen und setzt Singleton zurück.
Closes existing connections and resets singleton.
"""
logger = cls._get_logger()
if cls._instance:
try:
cls._instance.close()
@@ -131,11 +140,12 @@ class RedisClientFactory:
@classmethod
def health_check(cls) -> bool:
"""
Prüft Redis-Verbindung.
Check Redis connection.
Returns:
True wenn Redis erreichbar, False sonst
True if Redis is reachable, False otherwise
"""
logger = cls._get_logger()
try:
client = cls.get_client(strict=False)
if client is None:
@@ -150,11 +160,12 @@ class RedisClientFactory:
@classmethod
def get_info(cls) -> Optional[dict]:
"""
Gibt Redis Server Info zurück (für Monitoring).
Return Redis server info (for monitoring).
Returns:
Redis info dict oder None bei Fehler
Redis info dict or None on error
"""
logger = cls._get_logger()
try:
client = cls.get_client(strict=False)
if client is None:
@@ -170,22 +181,22 @@ class RedisClientFactory:
def get_redis_client(strict: bool = False) -> Optional[redis.Redis]:
"""
Convenience function für Redis Client.
Convenience function for Redis client.
Args:
strict: Wenn True, wirft Exception bei Fehler
strict: If True, raises exception on error
Returns:
Redis client oder None
Redis client or None
"""
return RedisClientFactory.get_client(strict=strict)
def is_redis_available() -> bool:
"""
Prüft ob Redis verfügbar ist.
Check if Redis is available.
Returns:
True wenn Redis erreichbar
True if Redis is reachable
"""
return RedisClientFactory.health_check()

View File

@@ -7,7 +7,7 @@ Supports syncing a single employee or all employees.
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from calendar_sync_utils import get_redis_client, set_employee_lock, log_operation
from calendar_sync_utils import get_redis_client, set_employee_lock, get_logger
from motia import http, ApiRequest, ApiResponse, FlowContext
@@ -38,10 +38,10 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
kuerzel = body.get('kuerzel')
if not kuerzel:
return ApiResponse(
status=400,
status_code=400,
body={
'error': 'kuerzel required',
'message': 'Bitte kuerzel im Body angeben'
'message': 'Please provide kuerzel in body'
}
)
@@ -49,7 +49,7 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
if kuerzel_upper == 'ALL':
# Emit sync-all event
log_operation('info', "Calendar Sync API: Emitting sync-all event", context=ctx)
ctx.logger.info("Calendar Sync API: Emitting sync-all event")
await ctx.enqueue({
"topic": "calendar_sync_all",
"data": {
@@ -57,10 +57,10 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
}
})
return ApiResponse(
status=200,
status_code=200,
body={
'status': 'triggered',
'message': 'Calendar sync wurde für alle Mitarbeiter ausgelöst',
'message': 'Calendar sync triggered for all employees',
'triggered_by': 'api'
}
)
@@ -69,9 +69,9 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
redis_client = get_redis_client(ctx)
if not set_employee_lock(redis_client, kuerzel_upper, 'api', ctx):
log_operation('info', f"Calendar Sync API: Sync already active for {kuerzel_upper}, skipping", context=ctx)
ctx.logger.info(f"Calendar Sync API: Sync already active for {kuerzel_upper}, skipping")
return ApiResponse(
status=409,
status_code=409,
body={
'status': 'conflict',
'message': f'Calendar sync already active for {kuerzel_upper}',
@@ -80,7 +80,7 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
}
)
log_operation('info', f"Calendar Sync API called for {kuerzel_upper}", context=ctx)
ctx.logger.info(f"Calendar Sync API called for {kuerzel_upper}")
# Lock successfully set, now emit event
await ctx.enqueue({
@@ -92,19 +92,19 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
})
return ApiResponse(
status=200,
status_code=200,
body={
'status': 'triggered',
'message': f'Calendar sync was triggered for {kuerzel_upper}',
'message': f'Calendar sync triggered for {kuerzel_upper}',
'kuerzel': kuerzel_upper,
'triggered_by': 'api'
}
)
except Exception as e:
log_operation('error', f"Error in API trigger: {e}", context=ctx)
ctx.logger.error(f"Error in API trigger: {e}")
return ApiResponse(
status=500,
status_code=500,
body={
'error': 'Internal server error',
'details': str(e)

View File

@@ -3,50 +3,24 @@ Calendar Sync Utilities
Shared utility functions for calendar synchronization between Google Calendar and Advoware.
"""
import logging
import asyncpg
import os
import redis
import time
from typing import Optional, Any, List
from googleapiclient.discovery import build
from google.oauth2 import service_account
# Configure logging
logger = logging.getLogger(__name__)
from services.logging_utils import get_service_logger
def log_operation(level: str, message: str, context=None, **context_vars):
    """Centralized logging with context, supporting file and console logging.

    Args:
        level: Log level name ('debug', 'info', 'warning', 'error').
            Unknown level names fall back to 'info' instead of being
            silently dropped (the old if/elif chain logged nothing for
            e.g. 'warn' or 'critical').
        message: Main log message.
        context: Optional Motia FlowContext. If it exposes a ``logger``
            attribute that logger is used, otherwise the module logger.
        **context_vars: Extra key/value pairs appended to the message as
            ``k=v``; pairs whose value is None are omitted.
    """
    context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
    full_message = f"{message} {context_str}".strip()
    # Use ctx.logger if context is available (Motia III FlowContext),
    # otherwise fall back to the module-level logger.
    if context and hasattr(context, 'logger'):
        target = context.logger
    else:
        target = logger
    # Dispatch by level name; default to info so no message is ever
    # silently swallowed because of an unexpected level string.
    log_func = getattr(target, level, None)
    if not callable(log_func):
        log_func = target.info
    log_func(full_message)
    # Also log to console for journalctl visibility
    print(f"[{level.upper()}] {full_message}")
def get_logger(context=None):
    """Return the shared service logger for calendar-sync operations.

    Args:
        context: Optional FlowContext forwarded to the logger factory.
    """
    service_name = 'calendar_sync'
    return get_service_logger(service_name, context)
async def connect_db(context=None):
"""Connect to Postgres DB from environment variables."""
logger = get_logger(context)
try:
conn = await asyncpg.connect(
host=os.getenv('POSTGRES_HOST', 'localhost'),
@@ -57,12 +31,13 @@ async def connect_db(context=None):
)
return conn
except Exception as e:
log_operation('error', f"Failed to connect to DB: {e}", context=context)
logger.error(f"Failed to connect to DB: {e}")
raise
async def get_google_service(context=None):
"""Initialize Google Calendar service."""
logger = get_logger(context)
try:
service_account_path = os.getenv('GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH', 'service-account.json')
if not os.path.exists(service_account_path):
@@ -75,48 +50,53 @@ async def get_google_service(context=None):
service = build('calendar', 'v3', credentials=creds)
return service
except Exception as e:
log_operation('error', f"Failed to initialize Google service: {e}", context=context)
logger.error(f"Failed to initialize Google service: {e}")
raise
def get_redis_client(context=None):
def get_redis_client(context=None) -> redis.Redis:
"""Initialize Redis client for calendar sync operations."""
logger = get_logger(context)
try:
redis_client = redis.Redis(
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', '6379')),
db=int(os.getenv('REDIS_DB_CALENDAR_SYNC', '2')),
socket_timeout=int(os.getenv('REDIS_TIMEOUT_SECONDS', '5'))
socket_timeout=int(os.getenv('REDIS_TIMEOUT_SECONDS', '5')),
decode_responses=True
)
return redis_client
except Exception as e:
log_operation('error', f"Failed to initialize Redis client: {e}", context=context)
logger.error(f"Failed to initialize Redis client: {e}")
raise
async def get_advoware_employees(advoware, context=None):
async def get_advoware_employees(advoware, context=None) -> List[Any]:
"""Fetch list of employees from Advoware."""
logger = get_logger(context)
try:
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
employees = result if isinstance(result, list) else []
log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
logger.info(f"Fetched {len(employees)} Advoware employees")
return employees
except Exception as e:
log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context)
logger.error(f"Failed to fetch Advoware employees: {e}")
raise
def set_employee_lock(redis_client, kuerzel: str, triggered_by: str, context=None) -> bool:
def set_employee_lock(redis_client: redis.Redis, kuerzel: str, triggered_by: str, context=None) -> bool:
"""Set lock for employee sync operation."""
logger = get_logger(context)
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None:
log_operation('info', f"Sync already active for {kuerzel}, skipping", context=context)
logger.info(f"Sync already active for {kuerzel}, skipping")
return False
return True
def clear_employee_lock(redis_client, kuerzel: str, context=None):
def clear_employee_lock(redis_client: redis.Redis, kuerzel: str, context=None) -> None:
"""Clear lock for employee sync operation and update last-synced timestamp."""
logger = get_logger(context)
try:
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
@@ -128,6 +108,6 @@ def clear_employee_lock(redis_client, kuerzel: str, context=None):
# Delete the lock
redis_client.delete(employee_lock_key)
log_operation('debug', f"Cleared lock and updated last-synced for {kuerzel} to {current_time}", context=context)
logger.debug(f"Cleared lock and updated last-synced for {kuerzel} to {current_time}")
except Exception as e:
log_operation('warning', f"Failed to clear lock and update last-synced for {kuerzel}: {e}", context=context)
logger.warning(f"Failed to clear lock and update last-synced for {kuerzel}: {e}")

View File

@@ -7,7 +7,7 @@ from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Bankverbindungen Create",
"description": "Empfängt Create-Webhooks von EspoCRM für Bankverbindungen",
"description": "Receives create webhooks from EspoCRM for Bankverbindungen",
"flows": ["vmh-bankverbindungen"],
"triggers": [
http("POST", "/vmh/webhook/bankverbindungen/create")
@@ -29,7 +29,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
ctx.logger.info("=" * 80)
# Sammle alle IDs aus dem Batch
# Collect all IDs from batch
entity_ids = set()
if isinstance(payload, list):
@@ -39,7 +39,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
ctx.logger.info(f"{len(entity_ids)} IDs zum Create-Sync gefunden")
ctx.logger.info(f"{len(entity_ids)} IDs found for create sync")
# Emit events
for entity_id in entity_ids:
@@ -53,11 +53,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
}
})
ctx.logger.info("✅ VMH Create Webhook verarbeitet: "
f"{len(entity_ids)} Events emittiert")
ctx.logger.info("✅ VMH Create Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
status_code=200,
body={
'status': 'received',
'action': 'create',
@@ -67,10 +67,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("FEHLER: BANKVERBINDUNGEN CREATE WEBHOOK")
ctx.logger.error("ERROR: BANKVERBINDUNGEN CREATE WEBHOOK")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
status_code=500,
body={'error': 'Internal server error', 'details': str(e)}
)

View File

@@ -7,7 +7,7 @@ from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Bankverbindungen Delete",
"description": "Empfängt Delete-Webhooks von EspoCRM für Bankverbindungen",
"description": "Receives delete webhooks from EspoCRM for Bankverbindungen",
"flows": ["vmh-bankverbindungen"],
"triggers": [
http("POST", "/vmh/webhook/bankverbindungen/delete")
@@ -29,7 +29,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
ctx.logger.info("=" * 80)
# Sammle alle IDs
# Collect all IDs
entity_ids = set()
if isinstance(payload, list):
@@ -39,7 +39,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
ctx.logger.info(f"{len(entity_ids)} IDs zum Delete-Sync gefunden")
ctx.logger.info(f"{len(entity_ids)} IDs found for delete sync")
# Emit events
for entity_id in entity_ids:
@@ -53,11 +53,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
}
})
ctx.logger.info("✅ VMH Delete Webhook verarbeitet: "
f"{len(entity_ids)} Events emittiert")
ctx.logger.info("✅ VMH Delete Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
status_code=200,
body={
'status': 'received',
'action': 'delete',
@@ -67,10 +67,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("FEHLER: BANKVERBINDUNGEN DELETE WEBHOOK")
ctx.logger.error("ERROR: BANKVERBINDUNGEN DELETE WEBHOOK")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
status_code=500,
body={'error': 'Internal server error', 'details': str(e)}
)

View File

@@ -7,7 +7,7 @@ from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Bankverbindungen Update",
"description": "Empfängt Update-Webhooks von EspoCRM für Bankverbindungen",
"description": "Receives update webhooks from EspoCRM for Bankverbindungen",
"flows": ["vmh-bankverbindungen"],
"triggers": [
http("POST", "/vmh/webhook/bankverbindungen/update")
@@ -29,7 +29,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
ctx.logger.info("=" * 80)
# Sammle alle IDs
# Collect all IDs
entity_ids = set()
if isinstance(payload, list):
@@ -39,7 +39,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
ctx.logger.info(f"{len(entity_ids)} IDs zum Update-Sync gefunden")
ctx.logger.info(f"{len(entity_ids)} IDs found for update sync")
# Emit events
for entity_id in entity_ids:
@@ -53,11 +53,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
}
})
ctx.logger.info("✅ VMH Update Webhook verarbeitet: "
f"{len(entity_ids)} Events emittiert")
ctx.logger.info("✅ VMH Update Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
status_code=200,
body={
'status': 'received',
'action': 'update',
@@ -67,10 +67,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("FEHLER: BANKVERBINDUNGEN UPDATE WEBHOOK")
ctx.logger.error("ERROR: BANKVERBINDUNGEN UPDATE WEBHOOK")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
status_code=500,
body={'error': 'Internal server error', 'details': str(e)}
)

View File

@@ -7,7 +7,7 @@ from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Beteiligte Create",
"description": "Empfängt Create-Webhooks von EspoCRM für Beteiligte",
"description": "Receives create webhooks from EspoCRM for Beteiligte",
"flows": ["vmh-beteiligte"],
"triggers": [
http("POST", "/vmh/webhook/beteiligte/create")
@@ -32,7 +32,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
ctx.logger.info("=" * 80)
# Sammle alle IDs aus dem Batch
# Collect all IDs from batch
entity_ids = set()
if isinstance(payload, list):
@@ -42,9 +42,9 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
ctx.logger.info(f"{len(entity_ids)} IDs zum Create-Sync gefunden")
ctx.logger.info(f"{len(entity_ids)} IDs found for create sync")
# Emit events für Queue-Processing (Deduplizierung erfolgt im Event-Handler via Lock)
# Emit events for queue processing (deduplication via lock in event handler)
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.beteiligte.create',
@@ -56,11 +56,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
}
})
ctx.logger.info("✅ VMH Create Webhook verarbeitet: "
f"{len(entity_ids)} Events emittiert")
ctx.logger.info("✅ VMH Create Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
status_code=200,
body={
'status': 'received',
'action': 'create',
@@ -70,11 +70,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("FEHLER: VMH CREATE WEBHOOK")
ctx.logger.error("ERROR: VMH CREATE WEBHOOK")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
status_code=500,
body={
'error': 'Internal server error',
'details': str(e)

View File

@@ -7,7 +7,7 @@ from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Beteiligte Delete",
"description": "Empfängt Delete-Webhooks von EspoCRM für Beteiligte",
"description": "Receives delete webhooks from EspoCRM for Beteiligte",
"flows": ["vmh-beteiligte"],
"triggers": [
http("POST", "/vmh/webhook/beteiligte/delete")
@@ -29,7 +29,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
ctx.logger.info("=" * 80)
# Sammle alle IDs aus dem Batch
# Collect all IDs from batch
entity_ids = set()
if isinstance(payload, list):
@@ -39,9 +39,9 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
ctx.logger.info(f"{len(entity_ids)} IDs zum Delete-Sync gefunden")
ctx.logger.info(f"{len(entity_ids)} IDs found for delete sync")
# Emit events für Queue-Processing
# Emit events for queue processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.beteiligte.delete',
@@ -53,11 +53,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
}
})
ctx.logger.info("✅ VMH Delete Webhook verarbeitet: "
f"{len(entity_ids)} Events emittiert")
ctx.logger.info("✅ VMH Delete Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
status_code=200,
body={
'status': 'received',
'action': 'delete',
@@ -67,10 +67,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("FEHLER: BETEILIGTE DELETE WEBHOOK")
ctx.logger.error("ERROR: BETEILIGTE DELETE WEBHOOK")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
status_code=500,
body={'error': 'Internal server error', 'details': str(e)}
)

View File

@@ -7,7 +7,7 @@ from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Beteiligte Update",
"description": "Empfängt Update-Webhooks von EspoCRM für Beteiligte",
"description": "Receives update webhooks from EspoCRM for Beteiligte",
"flows": ["vmh-beteiligte"],
"triggers": [
http("POST", "/vmh/webhook/beteiligte/update")
@@ -20,8 +20,8 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Beteiligte updates in EspoCRM.
Note: Loop-Prevention ist auf EspoCRM-Seite implementiert.
rowId-Updates triggern keine Webhooks mehr, daher keine Filterung nötig.
Note: Loop prevention is implemented on EspoCRM side.
rowId updates no longer trigger webhooks, so no filtering needed.
"""
try:
payload = request.body or []
@@ -32,7 +32,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
ctx.logger.info("=" * 80)
# Sammle alle IDs aus dem Batch
# Collect all IDs from batch
entity_ids = set()
if isinstance(payload, list):
@@ -42,9 +42,9 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
ctx.logger.info(f"{len(entity_ids)} IDs zum Update-Sync gefunden")
ctx.logger.info(f"{len(entity_ids)} IDs found for update sync")
# Emit events für Queue-Processing
# Emit events for queue processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.beteiligte.update',
@@ -56,11 +56,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
}
})
ctx.logger.info("✅ VMH Update Webhook verarbeitet: "
f"{len(entity_ids)} Events emittiert")
ctx.logger.info("✅ VMH Update Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
status_code=200,
body={
'status': 'received',
'action': 'update',
@@ -70,11 +70,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("FEHLER: VMH UPDATE WEBHOOK")
ctx.logger.error("ERROR: VMH UPDATE WEBHOOK")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
status_code=500,
body={
'error': 'Internal server error',
'details': str(e)

View File

@@ -30,7 +30,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
ctx.logger.info("=" * 80)
ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
# Sammle alle IDs aus dem Batch
# Collect all IDs from batch
entity_ids = set()
entity_type = 'CDokumente' # Default
@@ -45,9 +45,9 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
entity_ids.add(payload['id'])
entity_type = payload.get('entityType', 'CDokumente')
ctx.logger.info(f"{len(entity_ids)} Document IDs zum Create-Sync gefunden")
ctx.logger.info(f"{len(entity_ids)} document IDs found for create sync")
# Emit events für Queue-Processing (Deduplizierung erfolgt im Event-Handler via Lock)
# Emit events for queue processing (deduplication via lock in event handler)
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.create',
@@ -59,27 +59,27 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
}
})
ctx.logger.info("✅ Document Create Webhook verarbeitet: "
f"{len(entity_ids)} Events emittiert")
ctx.logger.info("✅ Document Create Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
status_code=200,
body={
'success': True,
'message': f'{len(entity_ids)} Document(s) zum Sync enqueued',
'message': f'{len(entity_ids)} document(s) enqueued for sync',
'entity_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("FEHLER: DOCUMENT CREATE WEBHOOK")
ctx.logger.error("ERROR: DOCUMENT CREATE WEBHOOK")
ctx.logger.error(f"Error: {e}")
ctx.logger.error(f"Payload: {request.body}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
status_code=500,
body={
'success': False,
'error': str(e)

View File

@@ -30,7 +30,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
ctx.logger.info("=" * 80)
ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
# Sammle alle IDs aus dem Batch
# Collect all IDs from batch
entity_ids = set()
entity_type = 'CDokumente' # Default
@@ -45,9 +45,9 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
entity_ids.add(payload['id'])
entity_type = payload.get('entityType', 'CDokumente')
ctx.logger.info(f"{len(entity_ids)} Document IDs zum Delete-Sync gefunden")
ctx.logger.info(f"{len(entity_ids)} document IDs found for delete sync")
# Emit events für Queue-Processing
# Emit events for queue processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.delete',
@@ -59,27 +59,27 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
}
})
ctx.logger.info("✅ Document Delete Webhook verarbeitet: "
f"{len(entity_ids)} Events emittiert")
ctx.logger.info("✅ Document Delete Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
status_code=200,
body={
'success': True,
'message': f'{len(entity_ids)} Document(s) zum Delete enqueued',
'message': f'{len(entity_ids)} document(s) enqueued for deletion',
'entity_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("FEHLER: DOCUMENT DELETE WEBHOOK")
ctx.logger.error("ERROR: DOCUMENT DELETE WEBHOOK")
ctx.logger.error(f"Error: {e}")
ctx.logger.error(f"Payload: {request.body}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
status_code=500,
body={
'success': False,
'error': str(e)

View File

@@ -30,7 +30,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
ctx.logger.info("=" * 80)
ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
# Sammle alle IDs aus dem Batch
# Collect all IDs from batch
entity_ids = set()
entity_type = 'CDokumente' # Default
@@ -45,9 +45,9 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
entity_ids.add(payload['id'])
entity_type = payload.get('entityType', 'CDokumente')
ctx.logger.info(f"{len(entity_ids)} Document IDs zum Update-Sync gefunden")
ctx.logger.info(f"{len(entity_ids)} document IDs found for update sync")
# Emit events für Queue-Processing
# Emit events for queue processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.update',
@@ -59,27 +59,27 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
}
})
ctx.logger.info("✅ Document Update Webhook verarbeitet: "
f"{len(entity_ids)} Events emittiert")
ctx.logger.info("✅ Document Update Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
status_code=200,
body={
'success': True,
'message': f'{len(entity_ids)} Document(s) zum Sync enqueued',
'message': f'{len(entity_ids)} document(s) enqueued for sync',
'entity_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("FEHLER: DOCUMENT UPDATE WEBHOOK")
ctx.logger.error("ERROR: DOCUMENT UPDATE WEBHOOK")
ctx.logger.error(f"Error: {e}")
ctx.logger.error(f"Payload: {request.body}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
status_code=500,
body={
'success': False,
'error': str(e)