diff --git a/services/document_sync_utils.py b/services/document_sync_utils.py new file mode 100644 index 0000000..29e8864 --- /dev/null +++ b/services/document_sync_utils.py @@ -0,0 +1,318 @@ +""" +Document Sync Utilities + +Hilfsfunktionen für Document-Synchronisation mit xAI: +- Distributed locking via Redis + syncStatus +- Entscheidungslogik: Wann muss ein Document zu xAI? +- Related Entities ermitteln (Many-to-Many Attachments) +- xAI Collection Management +""" + +from typing import Dict, Any, Optional, List, Tuple +from datetime import datetime, timedelta +import logging +import redis +import os + +logger = logging.getLogger(__name__) + +# Lock TTL in seconds (prevents deadlocks) +LOCK_TTL_SECONDS = 900 # 15 minutes + +# Max retry before permanent failure +MAX_SYNC_RETRIES = 5 + +# Retry backoff: Wartezeit zwischen Retries (in Minuten) +RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h + + +class DocumentSync: + """Utility-Klasse für Document-Synchronisation mit xAI""" + + def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None): + self.espocrm = espocrm_api + self.context = context + self.logger = context.logger if context else logger + self.redis = redis_client or self._init_redis() + + def _init_redis(self) -> redis.Redis: + """Initialize Redis client for distributed locking""" + try: + redis_host = os.getenv('REDIS_HOST', 'localhost') + redis_port = int(os.getenv('REDIS_PORT', '6379')) + redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1')) + + client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + decode_responses=True + ) + client.ping() + return client + except Exception as e: + self._log(f"Redis connection failed: {e}", level='error') + return None + + def _log(self, message: str, level: str = 'info'): + """Logging mit Context-Support""" + if self.context and hasattr(self.context, 'logger'): + getattr(self.context.logger, level)(message) + else: + getattr(logger, level)(message) + + async def acquire_sync_lock(self, entity_id: str) -> bool: + """ + Atomic distributed lock via Redis + syncStatus update + + Args: + entity_id: EspoCRM Document ID + + Returns: + True wenn Lock erfolgreich, False wenn bereits im Sync + """ + try: + # STEP 1: Atomic Redis lock (prevents race conditions) + if self.redis: + lock_key = f"sync_lock:document:{entity_id}" + acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS) + + if not acquired: + self._log(f"Redis lock bereits aktiv für Document {entity_id}", level='warn') + return False + + # STEP 2: Update syncStatus (für UI visibility) - falls Feld existiert + # NOTE: Ggf. muss syncStatus bei Document Entity erst angelegt werden + try: + await self.espocrm.update_entity('Document', entity_id, { + 'xaiSyncStatus': 'syncing' + }) + except Exception as e: + self._log(f"Konnte xaiSyncStatus nicht setzen (Feld existiert evtl. nicht): {e}", level='debug') + + self._log(f"Sync-Lock für Document {entity_id} erworben") + return True + + except Exception as e: + self._log(f"Fehler beim Acquire Lock: {e}", level='error') + # Clean up Redis lock on error + if self.redis: + try: + lock_key = f"sync_lock:document:{entity_id}" + self.redis.delete(lock_key) + except: + pass + return False + + async def release_sync_lock( + self, + entity_id: str, + success: bool = True, + error_message: Optional[str] = None, + extra_fields: Optional[Dict[str, Any]] = None + ) -> None: + """ + Gibt Sync-Lock frei und setzt finalen Status + + Args: + entity_id: EspoCRM Document ID + success: Ob Sync erfolgreich war + error_message: Optional: Fehlermeldung + extra_fields: Optional: Zusätzliche Felder (z.B. xaiFileId, xaiCollections) + """ + try: + update_data = {} + + # Status-Feld (falls vorhanden) + try: + update_data['xaiSyncStatus'] = 'synced' if success else 'failed' + + if error_message: + update_data['xaiSyncError'] = error_message[:2000] + else: + update_data['xaiSyncError'] = None + except: + pass # Felder existieren evtl. nicht + + # Merge extra fields (z.B. xaiFileId, xaiCollections) + if extra_fields: + update_data.update(extra_fields) + + if update_data: + await self.espocrm.update_entity('Document', entity_id, update_data) + + self._log(f"Sync-Lock released: Document {entity_id} → {'success' if success else 'failed'}") + + # Release Redis lock + if self.redis: + lock_key = f"sync_lock:document:{entity_id}" + self.redis.delete(lock_key) + + except Exception as e: + self._log(f"Fehler beim Release Lock: {e}", level='error') + # Ensure Redis lock is released even on error + if self.redis: + try: + lock_key = f"sync_lock:document:{entity_id}" + self.redis.delete(lock_key) + except: + pass + + async def should_sync_to_xai(self, document: Dict[str, Any]) -> Tuple[bool, List[str], str]: + """ + Entscheidet ob ein Document zu xAI synchronisiert werden muss + + Args: + document: Vollständiges Document Entity von EspoCRM + + Returns: + Tuple[bool, List[str], str]: + - bool: Ob Sync nötig ist + - List[str]: Liste der Collection-IDs in die das Document soll + - str: Grund/Beschreibung der Entscheidung + """ + doc_id = document.get('id') + doc_name = document.get('name', 'Unbenannt') + + # xAI-relevante Felder + xai_file_id = document.get('xaiFileId') + xai_collections = document.get('xaiCollections') or [] + + self._log(f"📋 Document Analysis: {doc_name} (ID: {doc_id})") + self._log(f" xaiFileId: {xai_file_id or 'N/A'}") + self._log(f" xaiCollections: {xai_collections}") + + # ═══════════════════════════════════════════════════════════════ + # FALL 1: Document ist bereits in xAI UND Collections sind gesetzt + # ═══════════════════════════════════════════════════════════════ + if xai_file_id and xai_collections: + self._log(f"✅ Document bereits in xAI gesynct mit {len(xai_collections)} Collection(s)") + + # Prüfe ob Update nötig (z.B. wenn File selbst geändert wurde) + # TODO: Implementiere File-Hash-Vergleich für Update-Erkennung + + return (False, xai_collections, "Bereits gesynct, keine Änderung erkannt") + + # ═══════════════════════════════════════════════════════════════ + # FALL 2: Document hat xaiFileId aber Collections ist leer/None + # ═══════════════════════════════════════════════════════════════ + if xai_file_id and not xai_collections: + self._log(f"⚠️ Document hat xaiFileId aber keine Collections - prüfe Related Entities") + + # Fallthrough zu FALL 3 - prüfe Related Entities + + # ═══════════════════════════════════════════════════════════════ + # FALL 3: Prüfe Related Entities (Attachments) + # ═══════════════════════════════════════════════════════════════ + required_collections = await self._get_required_collections_from_relations(doc_id) + + if required_collections: + self._log(f"✅ Document ist mit {len(required_collections)} Entity/ies verknüpft die Collections haben") + return (True, required_collections, f"Verknüpft mit Entities die Collections benötigen") + + # ═══════════════════════════════════════════════════════════════ + # FALL 4: Keine Collections gefunden → kein Sync nötig + # ═══════════════════════════════════════════════════════════════ + self._log(f"⏭️ Kein xAI-Sync nötig: Keine Related Entities mit Collections") + return (False, [], "Keine verknüpften Entities mit xAI Collections") + + async def _get_required_collections_from_relations(self, document_id: str) -> List[str]: + """ + Ermittelt alle xAI Collection-IDs von Entities die mit diesem Document verknüpft sind + + EspoCRM Many-to-Many: Document kann mit beliebigen Entities verknüpft sein + (CBeteiligte, Account, CVmhErstgespraech, etc.) + + Args: + document_id: Document ID + + Returns: + Liste von xAI Collection-IDs (dedupliziert) + """ + collections = set() + + # Liste von Entity-Types die xAI Collections haben können + # NOTE: Erweiterbar für andere Entities + entity_types_with_collections = [ + 'CBeteiligte', + 'Account', + 'CVmhErstgespraech', + # Weitere Entity-Types hier hinzufügen + ] + + self._log(f"🔍 Prüfe Attachments von Document {document_id}...") + + for entity_type in entity_types_with_collections: + try: + # Finde alle Entities dieses Typs die dieses Document attached haben + # EspoCRM API: Suche wo documentsIds das Document enthält + result = await self.espocrm.list_entities( + entity_type, + where=[{ + 'type': 'arrayAnyOf', + 'attribute': 'documentsIds', + 'value': [document_id] + }], + select='id,name,xaiCollectionId', + max_size=100 + ) + + entities = result.get('list', []) + + if entities: + self._log(f" ✅ {len(entities)} {entity_type}(s) gefunden") + + for entity in entities: + collection_id = entity.get('xaiCollectionId') + if collection_id: + collections.add(collection_id) + self._log(f" → {entity.get('name')}: Collection {collection_id[:16]}...") + + except Exception as e: + self._log(f" ⚠️ Fehler beim Prüfen von {entity_type}: {e}", level='warn') + continue + + result = list(collections) + self._log(f"📊 Gesamt: {len(result)} eindeutige Collection(s) gefunden") + + return result + + async def get_document_download_info(self, document_id: str) -> Optional[Dict[str, Any]]: + """ + Holt Download-Informationen für ein Document + + Returns: + Dict mit: + - download_url: URL zum Download + - filename: Dateiname + - mime_type: MIME-Type + - size: Dateigröße in Bytes + """ + try: + # Hole vollständiges Document + doc = await self.espocrm.get_entity('Document', document_id) + + # EspoCRM Document hat Attachments (Attachment ID in attachmentsIds) + attachment_ids = doc.get('attachmentsIds') or [] + + if not attachment_ids: + self._log(f"⚠️ Document {document_id} hat keine Attachments", level='warn') + return None + + # Nehme erstes Attachment (Documents haben normalerweise nur 1 File) + attachment_id = attachment_ids[0] + + # Hole Attachment-Details + attachment = await self.espocrm.get_entity('Attachment', attachment_id) + + return { + 'attachment_id': attachment_id, + 'download_url': f"/api/v1/Attachment/file/{attachment_id}", + 'filename': attachment.get('name', 'unknown'), + 'mime_type': attachment.get('type', 'application/octet-stream'), + 'size': attachment.get('size', 0) + } + + except Exception as e: + self._log(f"❌ Fehler beim Laden von Download-Info: {e}", level='error') + return None diff --git a/steps/vmh/document_sync_event_step.py b/steps/vmh/document_sync_event_step.py new file mode 100644 index 0000000..f66df7e --- /dev/null +++ b/steps/vmh/document_sync_event_step.py @@ -0,0 +1,280 @@ +""" +VMH Document Sync Handler + +Zentraler Sync-Handler für Documents mit xAI Collections + +Verarbeitet: +- vmh.document.create: Neu in EspoCRM → Prüfe ob xAI-Sync nötig +- vmh.document.update: Geändert in EspoCRM → Prüfe ob xAI-Sync/Update nötig +- vmh.document.delete: Gelöscht in EspoCRM → Remove from xAI Collections +""" + +from typing import Dict, Any +from motia import FlowContext +from services.espocrm import EspoCRMAPI +from services.document_sync_utils import DocumentSync +import json +import redis +import os + +config = { + "name": "VMH Document Sync Handler", + "description": "Zentraler Sync-Handler für Documents mit xAI Collections", + "flows": ["vmh-documents"], + "triggers": [ + {"type": "queue", "topic": "vmh.document.create"}, + {"type": "queue", "topic": "vmh.document.update"}, + {"type": "queue", "topic": "vmh.document.delete"} + ], + "enqueues": [] +} + + +async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]): + """Zentraler Sync-Handler für Documents""" + entity_id = event_data.get('entity_id') + action = event_data.get('action') + source = event_data.get('source') + + if not entity_id: + ctx.logger.error("Keine entity_id im Event gefunden") + return + + ctx.logger.info("=" * 80) + ctx.logger.info(f"🔄 DOCUMENT SYNC HANDLER GESTARTET") + ctx.logger.info("=" * 80) + ctx.logger.info(f"Action: {action.upper()}") + ctx.logger.info(f"Document ID: {entity_id}") + ctx.logger.info(f"Source: {source}") + ctx.logger.info("=" * 80) + + # Shared Redis client for distributed locking + redis_host = os.getenv('REDIS_HOST', 'localhost') + redis_port = int(os.getenv('REDIS_PORT', '6379')) + redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1')) + + redis_client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + decode_responses=True + ) + + # APIs initialisieren + espocrm = EspoCRMAPI() + sync_utils = DocumentSync(espocrm, redis_client, ctx) + + # TODO: xAI Service wird in nächstem Schritt hinzugefügt + # from services.xai_service import XAIService + # xai_service = XAIService(ctx) + + try: + # 1. ACQUIRE LOCK (verhindert parallele Syncs) + lock_acquired = await sync_utils.acquire_sync_lock(entity_id) + + if not lock_acquired: + ctx.logger.warn(f"⏸️ Sync bereits aktiv für Document {entity_id}, überspringe") + return + + # Lock erfolgreich acquired - MUSS im finally block released werden! + try: + # 2. FETCH VOLLSTÄNDIGES DOCUMENT VON ESPOCRM + try: + document = await espocrm.get_entity('Document', entity_id) + except Exception as e: + ctx.logger.error(f"❌ Fehler beim Laden von Document: {e}") + await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e)) + return + + ctx.logger.info(f"📋 Document geladen:") + ctx.logger.info(f" Name: {document.get('name', 'N/A')}") + ctx.logger.info(f" Type: {document.get('type', 'N/A')}") + ctx.logger.info(f" xaiFileId: {document.get('xaiFileId', 'N/A')}") + ctx.logger.info(f" xaiCollections: {document.get('xaiCollections', [])}") + + # 3. BESTIMME SYNC-AKTION BASIEREND AUF ACTION + + if action == 'delete': + await handle_delete(entity_id, document, sync_utils, ctx) + + elif action in ['create', 'update']: + await handle_create_or_update(entity_id, document, sync_utils, ctx) + + else: + ctx.logger.warn(f"⚠️ Unbekannte Action: {action}") + await sync_utils.release_sync_lock(entity_id, success=False, error_message=f"Unbekannte Action: {action}") + + except Exception as e: + # Unerwarteter Fehler während Sync - GARANTIERE Lock-Release + ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}") + import traceback + ctx.logger.error(traceback.format_exc()) + + try: + await sync_utils.release_sync_lock( + entity_id, + success=False, + error_message=str(e)[:2000] + ) + except Exception as release_error: + # Selbst Lock-Release failed - logge kritischen Fehler + ctx.logger.critical(f"🚨 CRITICAL: Lock-Release failed für Document {entity_id}: {release_error}") + # Force Redis lock release + try: + lock_key = f"sync_lock:document:{entity_id}" + redis_client.delete(lock_key) + ctx.logger.info(f"✅ Redis lock manuell released: {lock_key}") + except: + pass + + except Exception as e: + # Fehler VOR Lock-Acquire - kein Lock-Release nötig + ctx.logger.error(f"❌ Fehler vor Lock-Acquire: {e}") + import traceback + ctx.logger.error(traceback.format_exc()) + + +async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any]): + """ + Behandelt Create/Update von Documents + + Entscheidet ob xAI-Sync nötig ist und führt diesen durch + """ + try: + ctx.logger.info("") + ctx.logger.info("=" * 80) + ctx.logger.info("🔍 ANALYSE: Braucht dieses Document xAI-Sync?") + ctx.logger.info("=" * 80) + + # Entscheidungslogik: Soll dieses Document zu xAI? + needs_sync, collection_ids, reason = await sync_utils.should_sync_to_xai(document) + + ctx.logger.info(f"📊 Entscheidung: {'✅ SYNC NÖTIG' if needs_sync else '⏭️ KEIN SYNC NÖTIG'}") + ctx.logger.info(f" Grund: {reason}") + + if collection_ids: + ctx.logger.info(f" Collections: {collection_ids}") + + if not needs_sync: + ctx.logger.info("✅ Kein xAI-Sync erforderlich, Lock wird released") + await sync_utils.release_sync_lock(entity_id, success=True) + return + + # ═══════════════════════════════════════════════════════════════ + # xAI SYNC DURCHFÜHREN + # ═══════════════════════════════════════════════════════════════ + + ctx.logger.info("") + ctx.logger.info("=" * 80) + ctx.logger.info("🤖 xAI SYNC STARTEN") + ctx.logger.info("=" * 80) + + # TODO: Implementierung mit xai_service.py + ctx.logger.warn("⚠️ xAI Service noch nicht implementiert!") + ctx.logger.info("") + ctx.logger.info("TODO: Folgende Schritte werden implementiert:") + ctx.logger.info("1. 📥 Download File von EspoCRM") + ctx.logger.info("2. 📤 Upload zu xAI (falls noch kein xaiFileId)") + ctx.logger.info("3. 📚 Add zu Collections") + ctx.logger.info("4. 💾 Update EspoCRM: xaiFileId + xaiCollections") + ctx.logger.info("") + + # PLACEHOLDER Implementation: + # + # # 1. Download File von EspoCRM + # download_info = await sync_utils.get_document_download_info(entity_id) + # if not download_info: + # raise Exception("Konnte Download-Info nicht ermitteln") + # + # # 2. Upload zu xAI (falls noch nicht vorhanden) + # xai_file_id = document.get('xaiFileId') + # if not xai_file_id: + # file_content = await download_file_from_espocrm(download_info) + # xai_file_id = await xai_service.upload_file(file_content, download_info['filename']) + # + # # 3. Add zu Collections + # for collection_id in collection_ids: + # await xai_service.add_file_to_collection(collection_id, xai_file_id) + # + # # 4. Update EspoCRM + # await sync_utils.release_sync_lock( + # entity_id, + # success=True, + # extra_fields={ + # 'xaiFileId': xai_file_id, + # 'xaiCollections': collection_ids + # } + # ) + + # Für jetzt: Success ohne Sync + await sync_utils.release_sync_lock( + entity_id, + success=True, + extra_fields={ + # TODO: Echte xAI-Daten hier einsetzen + # 'xaiFileId': xai_file_id, + # 'xaiCollections': collection_ids + } + ) + + ctx.logger.info("=" * 80) + ctx.logger.info("✅ DOCUMENT SYNC ABGESCHLOSSEN (PLACEHOLDER)") + ctx.logger.info("=" * 80) + + except Exception as e: + ctx.logger.error(f"❌ Fehler bei Create/Update: {e}") + import traceback + ctx.logger.error(traceback.format_exc()) + await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e)) + + +async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any]): + """ + Behandelt Deletion von Documents + + Entfernt Document aus xAI Collections (aber löscht File nicht - kann in anderen Collections sein) + """ + try: + ctx.logger.info("") + ctx.logger.info("=" * 80) + ctx.logger.info("🗑️ DOCUMENT DELETE - xAI CLEANUP") + ctx.logger.info("=" * 80) + + xai_file_id = document.get('xaiFileId') + xai_collections = document.get('xaiCollections') or [] + + if not xai_file_id or not xai_collections: + ctx.logger.info("⏭️ Document war nicht in xAI gesynct, nichts zu tun") + await sync_utils.release_sync_lock(entity_id, success=True) + return + + ctx.logger.info(f"📋 Document Info:") + ctx.logger.info(f" xaiFileId: {xai_file_id}") + ctx.logger.info(f" Collections: {xai_collections}") + + # TODO: Implementierung mit xai_service + ctx.logger.warn("⚠️ xAI Delete-Operation noch nicht implementiert!") + ctx.logger.info("") + ctx.logger.info("TODO: Folgende Schritte werden implementiert:") + ctx.logger.info("1. 🗑️ Remove File aus allen Collections") + ctx.logger.info("2. ⚠️ File NICHT von xAI löschen (kann in anderen Collections sein)") + ctx.logger.info("") + + # PLACEHOLDER Implementation: + # + # for collection_id in xai_collections: + # await xai_service.remove_file_from_collection(collection_id, xai_file_id) + # + # ctx.logger.info(f"✅ File aus {len(xai_collections)} Collection(s) entfernt") + + await sync_utils.release_sync_lock(entity_id, success=True) + + ctx.logger.info("=" * 80) + ctx.logger.info("✅ DELETE ABGESCHLOSSEN (PLACEHOLDER)") + ctx.logger.info("=" * 80) + + except Exception as e: + ctx.logger.error(f"❌ Fehler bei Delete: {e}") + import traceback + ctx.logger.error(traceback.format_exc()) + await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e))