"""
VMH Document Sync Handler

Central sync handler for Documents with xAI Collections.

Processes:
- vmh.document.create: created in EspoCRM -> check whether an xAI sync is needed
- vmh.document.update: changed in EspoCRM -> check whether an xAI sync/update is needed
- vmh.document.delete: deleted in EspoCRM -> remove from xAI Collections
"""

import json
import os
import tempfile
import traceback
from typing import Any, Dict

import redis
from motia import FlowContext

from services.document_sync_utils import DocumentSync
from services.espocrm import EspoCRMAPI


config = {
    "name": "VMH Document Sync Handler",
    "description": "Zentraler Sync-Handler für Documents mit xAI Collections",
    "flows": ["vmh-documents"],
    "triggers": [
        {"type": "queue", "topic": "vmh.document.create"},
        {"type": "queue", "topic": "vmh.document.update"},
        {"type": "queue", "topic": "vmh.document.delete"}
    ],
    "enqueues": []
}

# Visual separator used to frame log sections.
_SEPARATOR = "=" * 80

# Upper bound for error messages handed to release_sync_lock(); the main
# handler already truncated to this length, the sub-handlers now do the same.
_MAX_ERROR_MESSAGE_LENGTH = 2000

# File-status values that trigger (re-)generation of the preview image.
# Kept as a tuple so the membership test also works for unhashable values.
_PREVIEW_FILE_STATUSES = ('Neu', 'Geändert', 'neu', 'geändert', 'New', 'Changed')


def _error_text(exc: BaseException) -> str:
    """Return the exception message, truncated to the maximum stored length."""
    return str(exc)[:_MAX_ERROR_MESSAGE_LENGTH]


def _log_section(ctx: FlowContext[Any], *lines: str, leading_blank: bool = True) -> None:
    """Log *lines* framed by separator rows, optionally preceded by a blank line."""
    if leading_blank:
        ctx.logger.info("")
    ctx.logger.info(_SEPARATOR)
    for line in lines:
        ctx.logger.info(line)
    ctx.logger.info(_SEPARATOR)


async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
    """Entry point for document create/update/delete queue events.

    Reads ``entity_id``, ``action`` and ``source`` from *event_data*, acquires
    the per-document sync lock, loads the Document from EspoCRM and dispatches
    to :func:`handle_delete` or :func:`handle_create_or_update`.

    The lock is released on every path after a successful acquire: by the
    sub-handlers themselves, or by the fallback logic here (including a direct
    delete of the Redis lock key when the regular release fails).

    Args:
        event_data: Event payload; must contain a non-empty ``entity_id``.
        ctx: Motia flow context; only ``ctx.logger`` is used here.
    """
    entity_id = event_data.get('entity_id')
    action = event_data.get('action')
    source = event_data.get('source')

    if not entity_id:
        ctx.logger.error("Keine entity_id im Event gefunden")
        return

    _log_section(ctx, "🔄 DOCUMENT SYNC HANDLER GESTARTET", leading_blank=False)
    # str() guards against a missing/non-string action: calling .upper() on
    # None would crash here, before the "unknown action" branch can handle it.
    ctx.logger.info(f"Action: {str(action).upper()}")
    ctx.logger.info(f"Document ID: {entity_id}")
    ctx.logger.info(f"Source: {source}")
    ctx.logger.info(_SEPARATOR)

    # Shared Redis client for distributed locking.
    # NOTE(review): the client is created per event and never closed
    # explicitly - confirm whether connection cleanup is needed here.
    redis_client = redis.Redis(
        host=os.getenv('REDIS_HOST', 'localhost'),
        port=int(os.getenv('REDIS_PORT', '6379')),
        db=int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1')),
        decode_responses=True
    )

    # Initialise APIs
    espocrm = EspoCRMAPI()
    sync_utils = DocumentSync(espocrm, redis_client, ctx)

    # TODO: the xAI service will be added in the next step
    # from services.xai_service import XAIService
    # xai_service = XAIService(ctx)

    try:
        # 1. ACQUIRE LOCK (prevents parallel syncs)
        lock_acquired = await sync_utils.acquire_sync_lock(entity_id)

        if not lock_acquired:
            ctx.logger.warn(f"⏸️ Sync bereits aktiv für Document {entity_id}, überspringe")
            return

        # Lock acquired - from here on it MUST be released on every path.
        try:
            # 2. FETCH THE FULL DOCUMENT FROM ESPOCRM
            # NOTE(review): this also runs for 'delete' events; if the fetch
            # fails, the xAI cleanup is skipped - confirm this is intended.
            try:
                document = await espocrm.get_entity('Document', entity_id)
            except Exception as e:
                ctx.logger.error(f"❌ Fehler beim Laden von Document: {e}")
                await sync_utils.release_sync_lock(
                    entity_id, success=False, error_message=_error_text(e)
                )
                return

            ctx.logger.info("📋 Document geladen:")
            ctx.logger.info(f" Name: {document.get('name', 'N/A')}")
            ctx.logger.info(f" Type: {document.get('type', 'N/A')}")
            ctx.logger.info(f" xaiFileId: {document.get('xaiFileId', 'N/A')}")
            ctx.logger.info(f" xaiCollections: {document.get('xaiCollections', [])}")

            # 3. DISPATCH BASED ON ACTION
            if action == 'delete':
                await handle_delete(entity_id, document, sync_utils, ctx)
            elif action in ['create', 'update']:
                await handle_create_or_update(entity_id, document, sync_utils, ctx)
            else:
                ctx.logger.warn(f"⚠️ Unbekannte Action: {action}")
                await sync_utils.release_sync_lock(
                    entity_id, success=False, error_message=f"Unbekannte Action: {action}"
                )

        except Exception as e:
            # Unexpected error during sync - GUARANTEE the lock release
            ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
            ctx.logger.error(traceback.format_exc())

            try:
                await sync_utils.release_sync_lock(
                    entity_id,
                    success=False,
                    error_message=_error_text(e)
                )
            except Exception as release_error:
                # Even the regular release failed - log and force-delete the key
                ctx.logger.critical(
                    f"🚨 CRITICAL: Lock-Release failed für Document {entity_id}: {release_error}"
                )
                lock_key = f"sync_lock:document:{entity_id}"
                try:
                    redis_client.delete(lock_key)
                    ctx.logger.info(f"✅ Redis lock manuell released: {lock_key}")
                except Exception as redis_error:
                    # Best effort only, but make the failure visible instead
                    # of swallowing it (and never swallow task cancellation).
                    ctx.logger.error(
                        f"❌ Redis lock konnte nicht manuell released werden: {lock_key}: {redis_error}"
                    )

    except Exception as e:
        # Error BEFORE the lock was acquired - no release necessary
        ctx.logger.error(f"❌ Fehler vor Lock-Acquire: {e}")
        ctx.logger.error(traceback.format_exc())


async def _build_and_upload_preview(entity_id: str, sync_utils: DocumentSync, ctx: FlowContext[Any]) -> None:
    """Download the document's attachment, render a preview and store it.

    Errors propagate to the caller; the temporary file is always removed.
    """
    # 1. Fetch download information
    download_info = await sync_utils.get_document_download_info(entity_id)

    if not download_info:
        ctx.logger.warn("⚠️ Keine Download-Info verfügbar - überspringe Preview")
        return

    ctx.logger.info("📥 Datei-Info:")
    ctx.logger.info(f" Filename: {download_info['filename']}")
    ctx.logger.info(f" MIME-Type: {download_info['mime_type']}")
    ctx.logger.info(f" Size: {download_info['size']} bytes")

    # 2. Download the file from EspoCRM
    ctx.logger.info("📥 Downloading file...")
    file_content = await sync_utils.espocrm.download_attachment(download_info['attachment_id'])
    ctx.logger.info(f"✅ Downloaded {len(file_content)} bytes")

    # 3. Store temporarily for preview generation.
    # Only the base name goes into the suffix: the filename comes from an
    # uploaded attachment and must not inject path separators into the
    # temp-file path.
    safe_name = os.path.basename(str(download_info['filename']))
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{safe_name}") as tmp_file:
            # Remember the path before writing so a failed write is cleaned up too.
            tmp_path = tmp_file.name
            tmp_file.write(file_content)

        # 4. Generate the preview
        ctx.logger.info("🖼️ Generating preview (600x800 WebP)...")
        preview_data = await sync_utils.generate_thumbnail(
            tmp_path,
            download_info['mime_type'],
            max_width=600,
            max_height=800
        )

        if preview_data:
            ctx.logger.info(f"✅ Preview generated: {len(preview_data)} bytes WebP")

            # 5. Upload the preview to EspoCRM
            ctx.logger.info("📤 Uploading preview to EspoCRM...")
            await sync_utils.update_sync_metadata(
                entity_id,
                preview_data=preview_data
                # No xaiFileId/collections - preview-only update
            )
            ctx.logger.info("✅ Preview uploaded successfully")
        else:
            ctx.logger.warn("⚠️ Preview-Generierung lieferte keine Daten")
    finally:
        # Clean up the temp file (best effort)
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError as cleanup_error:
                ctx.logger.warn(
                    f"⚠️ Temporäre Datei konnte nicht gelöscht werden: {tmp_path}: {cleanup_error}"
                )


async def _generate_preview(entity_id: str, datei_status: Any, sync_utils: DocumentSync, ctx: FlowContext[Any]) -> None:
    """Run the optional preview step; failures are logged and never raised."""
    _log_section(
        ctx,
        "🖼️ PREVIEW-GENERIERUNG STARTEN",
        f" Datei-Status: {datei_status}",
    )

    try:
        await _build_and_upload_preview(entity_id, sync_utils, ctx)
    except Exception as e:
        ctx.logger.error(f"❌ Fehler bei Preview-Generierung: {e}")
        ctx.logger.error(traceback.format_exc())
        # Continue - the preview is optional

    _log_section(ctx, "✅ PREVIEW-VERARBEITUNG ABGESCHLOSSEN")


async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any]) -> None:
    """Handle create/update events for a Document.

    Asks ``sync_utils.should_sync_to_xai`` whether an xAI sync is needed,
    generates a preview when the file status marks the file as new or changed,
    and finally releases the sync lock. The xAI sync itself is still a
    placeholder.

    Args:
        entity_id: EspoCRM Document id.
        document: Document entity as loaded from EspoCRM.
        sync_utils: Helper providing lock handling, download info, thumbnail
            generation and metadata updates.
        ctx: Motia flow context; only ``ctx.logger`` is used here.

    Any exception is logged and reported through ``release_sync_lock`` with
    ``success=False``.
    """
    try:
        _log_section(ctx, "🔍 ANALYSE: Braucht dieses Document xAI-Sync?")

        # File status drives preview generation
        datei_status = document.get('dateiStatus') or document.get('fileStatus')

        # Decision logic: should this Document go to xAI?
        needs_sync, collection_ids, reason = await sync_utils.should_sync_to_xai(document)

        ctx.logger.info(f"📊 Entscheidung: {'✅ SYNC NÖTIG' if needs_sync else '⏭️ KEIN SYNC NÖTIG'}")
        ctx.logger.info(f" Grund: {reason}")
        ctx.logger.info(f" Datei-Status: {datei_status or 'N/A'}")

        if collection_ids:
            ctx.logger.info(f" Collections: {collection_ids}")

        # ── PREVIEW GENERATION for new/changed files ──────────────────────
        if datei_status in _PREVIEW_FILE_STATUSES:
            await _generate_preview(entity_id, datei_status, sync_utils, ctx)

        # ── xAI SYNC (if required) ────────────────────────────────────────
        if not needs_sync:
            ctx.logger.info("✅ Kein xAI-Sync erforderlich, Lock wird released")
            await sync_utils.release_sync_lock(entity_id, success=True)
            return

        # ── PERFORM xAI SYNC ──────────────────────────────────────────────
        _log_section(ctx, "🤖 xAI SYNC STARTEN")

        # TODO: implement with xai_service.py
        ctx.logger.warn("⚠️ xAI Service noch nicht implementiert!")
        ctx.logger.info("")
        ctx.logger.info("TODO: Folgende Schritte werden implementiert:")
        ctx.logger.info("1. 📥 Download File von EspoCRM")
        ctx.logger.info("2. 📤 Upload zu xAI (falls noch kein xaiFileId)")
        ctx.logger.info("3. 📚 Add zu Collections")
        ctx.logger.info("4. 💾 Update EspoCRM: xaiFileId + xaiCollections")
        ctx.logger.info("")

        # PLACEHOLDER implementation:
        #
        # # 1. Download the file from EspoCRM
        # download_info = await sync_utils.get_document_download_info(entity_id)
        # if not download_info:
        #     raise Exception("Konnte Download-Info nicht ermitteln")
        #
        # # 2. Upload to xAI (if not present yet)
        # xai_file_id = document.get('xaiFileId')
        # if not xai_file_id:
        #     file_content = await download_file_from_espocrm(download_info)
        #     xai_file_id = await xai_service.upload_file(file_content, download_info['filename'])
        #
        # # 3. Add to collections
        # for collection_id in collection_ids:
        #     await xai_service.add_file_to_collection(collection_id, xai_file_id)
        #
        # # 4. Update EspoCRM
        # await sync_utils.release_sync_lock(
        #     entity_id,
        #     success=True,
        #     extra_fields={
        #         'xaiFileId': xai_file_id,
        #         'xaiCollections': collection_ids
        #     }
        # )

        # For now: report success without syncing
        await sync_utils.release_sync_lock(
            entity_id,
            success=True,
            extra_fields={
                # TODO: insert the real xAI data here
                # 'xaiFileId': xai_file_id,
                # 'xaiCollections': collection_ids
            }
        )

        ctx.logger.info(_SEPARATOR)
        ctx.logger.info("✅ DOCUMENT SYNC ABGESCHLOSSEN (PLACEHOLDER)")
        ctx.logger.info(_SEPARATOR)

    except Exception as e:
        ctx.logger.error(f"❌ Fehler bei Create/Update: {e}")
        ctx.logger.error(traceback.format_exc())
        await sync_utils.release_sync_lock(entity_id, success=False, error_message=_error_text(e))


async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any]) -> None:
    """Handle delete events for a Document.

    Intended to remove the Document from its xAI collections without deleting
    the file itself (it may belong to other collections). The xAI call is
    still a placeholder; currently the function only logs and releases the
    sync lock.

    Args:
        entity_id: EspoCRM Document id.
        document: Document entity; ``xaiFileId`` and ``xaiCollections`` are read.
        sync_utils: Helper providing ``release_sync_lock``.
        ctx: Motia flow context; only ``ctx.logger`` is used here.

    Any exception is logged and reported through ``release_sync_lock`` with
    ``success=False``.
    """
    try:
        _log_section(ctx, "🗑️ DOCUMENT DELETE - xAI CLEANUP")

        xai_file_id = document.get('xaiFileId')
        xai_collections = document.get('xaiCollections') or []

        if not xai_file_id or not xai_collections:
            ctx.logger.info("⏭️ Document war nicht in xAI gesynct, nichts zu tun")
            await sync_utils.release_sync_lock(entity_id, success=True)
            return

        ctx.logger.info("📋 Document Info:")
        ctx.logger.info(f" xaiFileId: {xai_file_id}")
        ctx.logger.info(f" Collections: {xai_collections}")

        # TODO: implement with xai_service
        ctx.logger.warn("⚠️ xAI Delete-Operation noch nicht implementiert!")
        ctx.logger.info("")
        ctx.logger.info("TODO: Folgende Schritte werden implementiert:")
        ctx.logger.info("1. 🗑️ Remove File aus allen Collections")
        ctx.logger.info("2. ⚠️ File NICHT von xAI löschen (kann in anderen Collections sein)")
        ctx.logger.info("")

        # PLACEHOLDER implementation:
        #
        # for collection_id in xai_collections:
        #     await xai_service.remove_file_from_collection(collection_id, xai_file_id)
        #
        # ctx.logger.info(f"✅ File aus {len(xai_collections)} Collection(s) entfernt")

        await sync_utils.release_sync_lock(entity_id, success=True)

        ctx.logger.info(_SEPARATOR)
        ctx.logger.info("✅ DELETE ABGESCHLOSSEN (PLACEHOLDER)")
        ctx.logger.info(_SEPARATOR)

    except Exception as e:
        ctx.logger.error(f"❌ Fehler bei Delete: {e}")
        ctx.logger.error(traceback.format_exc())
        await sync_utils.release_sync_lock(entity_id, success=False, error_message=_error_text(e))