# motia-iii/steps/vmh/document_sync_event_step.py
"""
VMH Document Sync Handler
Zentraler Sync-Handler für Documents mit xAI Collections
Verarbeitet:
- vmh.document.create: Neu in EspoCRM → Prüfe ob xAI-Sync nötig
- vmh.document.update: Geändert in EspoCRM → Prüfe ob xAI-Sync/Update nötig
- vmh.document.delete: Gelöscht in EspoCRM → Remove from xAI Collections
"""
from typing import Dict, Any
from motia import FlowContext
from services.espocrm import EspoCRMAPI
from services.document_sync_utils import DocumentSync
from services.xai_service import XAIService
import hashlib
import json
import redis
import os
# Step registration: this handler listens on the three document queue
# topics (create / update / delete) of the "vmh-documents" flow and does not
# enqueue any follow-up events itself.
config = {
    "name": "VMH Document Sync Handler",
    "description": "Zentraler Sync-Handler für Documents mit xAI Collections",
    "flows": ["vmh-documents"],
    "triggers": [
        {"type": "queue", "topic": f"vmh.document.{event_name}"}
        for event_name in ("create", "update", "delete")
    ],
    "enqueues": [],
}
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
    """Central sync handler for document queue events.

    Reads ``entity_id``, ``entity_type`` (default ``'CDokumente'``),
    ``action`` and ``source`` from ``event_data``, acquires the per-document
    sync lock, loads the full entity from EspoCRM and dispatches to
    ``handle_delete`` or ``handle_create_or_update`` depending on ``action``.

    Args:
        event_data: Event payload; the keys listed above are read via
            ``dict.get``.
        ctx: Flow context; only ``ctx.logger`` is used here, the context is
            also passed on to the sync helpers.

    Returns:
        None. Errors raised from lock acquisition onwards are logged and
        swallowed; the lock is released on every failure path.
    """
    import traceback

    entity_id = event_data.get('entity_id')
    entity_type = event_data.get('entity_type', 'CDokumente')  # Default: CDokumente
    action = event_data.get('action')
    source = event_data.get('source')

    if not entity_id:
        ctx.logger.error("Keine entity_id im Event gefunden")
        return

    # A missing action must not crash the start banner: it is reported as
    # "N/A" here and rejected further down as an unknown action.
    action_label = str(action).upper() if action else 'N/A'

    ctx.logger.info("=" * 80)
    ctx.logger.info("🔄 DOCUMENT SYNC HANDLER GESTARTET")
    ctx.logger.info("=" * 80)
    ctx.logger.info(f"Entity Type: {entity_type}")
    ctx.logger.info(f"Action: {action_label}")
    ctx.logger.info(f"Document ID: {entity_id}")
    ctx.logger.info(f"Source: {source}")
    ctx.logger.info("=" * 80)

    # Shared Redis client for distributed locking
    redis_host = os.getenv('REDIS_HOST', 'localhost')
    redis_port = int(os.getenv('REDIS_PORT', '6379'))
    redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
    redis_client = redis.Redis(
        host=redis_host,
        port=redis_port,
        db=redis_db,
        decode_responses=True
    )

    # Initialise the API clients
    espocrm = EspoCRMAPI()
    sync_utils = DocumentSync(espocrm, redis_client, ctx)
    xai_service = XAIService(ctx)

    try:
        # 1. Acquire the lock (prevents parallel syncs of the same document)
        lock_acquired = await sync_utils.acquire_sync_lock(entity_id, entity_type)
        if not lock_acquired:
            ctx.logger.warn(f"⏸️ Sync bereits aktiv für {entity_type} {entity_id}, überspringe")
            return

        # From here on the lock is held: every path below must release it.
        # The dispatched handlers release it themselves; the except branch
        # covers anything unexpected.
        try:
            # 2. Fetch the complete document from EspoCRM
            try:
                document = await espocrm.get_entity(entity_type, entity_id)
            except Exception as e:
                ctx.logger.error(f"❌ Fehler beim Laden von {entity_type}: {e}")
                await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type)
                return

            ctx.logger.info(f"📋 {entity_type} geladen:")
            ctx.logger.info(f" Name: {document.get('name', 'N/A')}")
            ctx.logger.info(f" Type: {document.get('type', 'N/A')}")
            ctx.logger.info(f" fileStatus: {document.get('fileStatus', 'N/A')}")
            ctx.logger.info(f" xaiFileId: {document.get('xaiFileId') or document.get('xaiId', 'N/A')}")
            ctx.logger.info(f" xaiCollections: {document.get('xaiCollections', [])}")

            # 3. Choose the sync operation based on the action
            if action == 'delete':
                await handle_delete(entity_id, document, sync_utils, xai_service, ctx, entity_type)
            elif action in ['create', 'update']:
                await handle_create_or_update(entity_id, document, sync_utils, xai_service, ctx, entity_type)
            else:
                ctx.logger.warn(f"⚠️ Unbekannte Action: {action}")
                await sync_utils.release_sync_lock(entity_id, success=False, error_message=f"Unbekannte Action: {action}", entity_type=entity_type)
        except Exception as e:
            # Unexpected error during sync - guarantee the lock release
            ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
            ctx.logger.error(traceback.format_exc())
            try:
                await sync_utils.release_sync_lock(
                    entity_id,
                    success=False,
                    error_message=str(e)[:2000],
                    entity_type=entity_type
                )
            except Exception as release_error:
                # Even the regular release failed - log it as critical
                ctx.logger.critical(f"🚨 CRITICAL: Lock-Release failed für Document {entity_id}: {release_error}")
                # Last resort: delete the Redis lock key directly
                lock_key = f"sync_lock:document:{entity_id}"
                try:
                    redis_client.delete(lock_key)
                    ctx.logger.info(f"✅ Redis lock manuell released: {lock_key}")
                except Exception as redis_error:
                    # Nothing more can be done here, but a lock that may
                    # still be held must not go unnoticed.
                    ctx.logger.error(f"❌ Manueller Redis-Lock-Release fehlgeschlagen für {lock_key}: {redis_error}")
    except Exception as e:
        # Error before the lock was acquired - nothing to release
        ctx.logger.error(f"❌ Fehler vor Lock-Acquire: {e}")
        ctx.logger.error(traceback.format_exc())
async def _generate_document_preview(entity_id: str, entity_type: str, datei_status: Any, sync_utils: DocumentSync, ctx: FlowContext[Any]):
    """Best-effort preview generation for a new or changed document file.

    Downloads the attachment, writes it to a temporary file, asks
    ``sync_utils.generate_thumbnail`` for a preview and stores the result via
    ``sync_utils.update_sync_metadata``. All errors are logged and swallowed
    because the preview is optional.

    Returns:
        Tuple ``(download_info, file_content)`` with whatever was fetched
        successfully (either may be ``None``), so the caller can reuse it.
    """
    import tempfile
    import traceback

    download_info = None
    file_content = None

    ctx.logger.info("")
    ctx.logger.info("=" * 80)
    ctx.logger.info("🖼️ PREVIEW-GENERIERUNG STARTEN")
    ctx.logger.info(f" Datei-Status: {datei_status}")
    ctx.logger.info("=" * 80)
    try:
        # 1. Fetch the download information
        download_info = await sync_utils.get_document_download_info(entity_id, entity_type)
        if not download_info:
            ctx.logger.warn("⚠️ Keine Download-Info verfügbar - überspringe Preview")
        else:
            ctx.logger.info("📥 Datei-Info:")
            ctx.logger.info(f" Filename: {download_info['filename']}")
            ctx.logger.info(f" MIME-Type: {download_info['mime_type']}")
            ctx.logger.info(f" Size: {download_info['size']} bytes")

            # 2. Download the file from EspoCRM
            ctx.logger.info("📥 Downloading file...")
            file_content = await sync_utils.espocrm.download_attachment(download_info['attachment_id'])
            ctx.logger.info(f"✅ Downloaded {len(file_content)} bytes")

            # 3. Store it temporarily for the preview generation.
            # Only the base name goes into the suffix: a filename containing
            # path separators would otherwise break (or redirect) the
            # temp-file creation.
            safe_name = os.path.basename(str(download_info['filename']).replace('\\', '/'))
            tmp_path = None
            try:
                with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{safe_name}") as tmp_file:
                    # Remember the path before writing so that a failed write
                    # still gets cleaned up below.
                    tmp_path = tmp_file.name
                    tmp_file.write(file_content)

                # 4. Generate the preview
                ctx.logger.info("🖼️ Generating preview (600x800 WebP)...")
                preview_data = await sync_utils.generate_thumbnail(
                    tmp_path,
                    download_info['mime_type'],
                    max_width=600,
                    max_height=800
                )
                if preview_data:
                    ctx.logger.info(f"✅ Preview generated: {len(preview_data)} bytes WebP")
                    # 5. Upload the preview to EspoCRM
                    # (no xaiFileId/collections - preview update only)
                    ctx.logger.info("📤 Uploading preview to EspoCRM...")
                    await sync_utils.update_sync_metadata(
                        entity_id,
                        preview_data=preview_data,
                        entity_type=entity_type
                    )
                    ctx.logger.info("✅ Preview uploaded successfully")
                else:
                    ctx.logger.warn("⚠️ Preview-Generierung lieferte keine Daten")
            finally:
                # Clean up the temp file
                if tmp_path is not None:
                    try:
                        os.remove(tmp_path)
                    except OSError as cleanup_error:
                        ctx.logger.warn(f"⚠️ Temp-Datei konnte nicht gelöscht werden: {tmp_path} ({cleanup_error})")
    except Exception as e:
        # Continue - the preview is optional
        ctx.logger.error(f"❌ Fehler bei Preview-Generierung: {e}")
        ctx.logger.error(traceback.format_exc())

    ctx.logger.info("")
    ctx.logger.info("=" * 80)
    ctx.logger.info("✅ PREVIEW-VERARBEITUNG ABGESCHLOSSEN")
    ctx.logger.info("=" * 80)
    return download_info, file_content


async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, xai_service: XAIService, ctx: FlowContext[Any], entity_type: str = 'CDokumente'):
    """Handle create/update of a document.

    Asks ``sync_utils.should_sync_to_xai`` whether the document needs an xAI
    sync, generates a preview when the file status marks the file as
    new/changed, and - if a sync is needed - uploads the file to xAI, adds it
    to the target collections and writes the sync metadata back.

    The sync lock is released on every path of this function (success and
    failure); exceptions are logged and not re-raised.

    Args:
        entity_id: ID of the document entity.
        document: Entity data as loaded from EspoCRM.
        sync_utils: Sync helper (lock handling, download info, metadata).
        xai_service: xAI client used for upload and collection membership.
        ctx: Flow context; ``ctx.logger`` is used for logging.
        entity_type: EspoCRM entity type, default ``'CDokumente'``.
    """
    import traceback

    try:
        ctx.logger.info("")
        ctx.logger.info("=" * 80)
        ctx.logger.info("🔍 ANALYSE: Braucht dieses Document xAI-Sync?")
        ctx.logger.info("=" * 80)

        # File status drives the preview generation (two field names supported)
        datei_status = document.get('fileStatus') or document.get('dateiStatus')

        # Decision logic: should this document go to xAI?
        needs_sync, collection_ids, reason = await sync_utils.should_sync_to_xai(document)
        ctx.logger.info(f"📊 Entscheidung: {'✅ SYNC NÖTIG' if needs_sync else '⏭️ KEIN SYNC NÖTIG'}")
        ctx.logger.info(f" Grund: {reason}")
        ctx.logger.info(f" File-Status: {datei_status or 'N/A'}")
        if collection_ids:
            ctx.logger.info(f" Collections: {collection_ids}")

        # ---------------------------------------------------------------
        # Preview generation for new/changed files
        # ---------------------------------------------------------------
        download_info = None
        file_content = None
        # Case-insensitive check of the file status
        datei_status_lower = (datei_status or '').lower()
        if datei_status_lower in ['neu', 'geändert', 'new', 'changed']:
            download_info, file_content = await _generate_document_preview(
                entity_id, entity_type, datei_status, sync_utils, ctx
            )

        # ---------------------------------------------------------------
        # xAI sync (only if required)
        # ---------------------------------------------------------------
        if not needs_sync:
            ctx.logger.info("✅ Kein xAI-Sync erforderlich, Lock wird released")
            await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
            return

        ctx.logger.info("")
        ctx.logger.info("=" * 80)
        ctx.logger.info("🤖 xAI SYNC STARTEN")
        ctx.logger.info("=" * 80)

        # 1. Fetch the download information unless the preview step already did
        if not download_info:
            download_info = await sync_utils.get_document_download_info(entity_id, entity_type)
        if not download_info:
            raise Exception("Konnte Download-Info nicht ermitteln Datei fehlt?")
        ctx.logger.info(f"📥 Datei: {download_info['filename']} ({download_info['size']} bytes, {download_info['mime_type']})")

        # 2. Download the file from EspoCRM unless the preview step already did
        if file_content is None:
            file_content = await sync_utils.espocrm.download_attachment(download_info['attachment_id'])
            ctx.logger.info(f"✅ Downloaded {len(file_content)} bytes")
        else:
            ctx.logger.info(f"♻️ Verwende bereits geladene Datei ({len(file_content)} bytes)")

        # 3. MD5 hash for change detection (not used for security)
        file_hash = hashlib.md5(file_content).hexdigest()
        ctx.logger.info(f"🔑 MD5: {file_hash}")

        # 4. Upload to xAI - always re-upload when needs_sync is set
        ctx.logger.info("📤 Uploading to xAI...")
        xai_file_id = await xai_service.upload_file(
            file_content,
            download_info['filename'],
            download_info['mime_type']
        )
        ctx.logger.info(f"✅ xAI file_id: {xai_file_id}")

        # 5. Add the file to all target collections
        ctx.logger.info(f"📚 Füge zu {len(collection_ids)} Collection(s) hinzu...")
        added_collections = await xai_service.add_to_collections(collection_ids, xai_file_id)
        ctx.logger.info(f"✅ In {len(added_collections)}/{len(collection_ids)} Collections eingetragen")

        # 6. Update the EspoCRM metadata and release the lock
        await sync_utils.update_sync_metadata(
            entity_id,
            xai_file_id=xai_file_id,
            collection_ids=added_collections,
            file_hash=file_hash,
            entity_type=entity_type
        )
        await sync_utils.release_sync_lock(
            entity_id,
            success=True,
            entity_type=entity_type
        )
        ctx.logger.info("=" * 80)
        ctx.logger.info("✅ DOCUMENT SYNC ABGESCHLOSSEN")
        ctx.logger.info("=" * 80)
    except Exception as e:
        ctx.logger.error(f"❌ Fehler bei Create/Update: {e}")
        ctx.logger.error(traceback.format_exc())
        # Pass entity_type like every other release call, so the failure is
        # recorded against the entity type that was actually locked.
        await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type)
async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, xai_service: XAIService, ctx: FlowContext[Any], entity_type: str = 'CDokumente'):
    """Handle the deletion of a document.

    Removes the document's xAI file from the collections listed on the
    entity. The file itself is not deleted in xAI. The sync lock is released
    on every path; exceptions are logged and not re-raised.

    Args:
        entity_id: ID of the document entity.
        document: Entity data; ``xaiFileId``/``xaiId`` and ``xaiCollections``
            are read from it.
        sync_utils: Sync helper used to release the lock.
        xai_service: xAI client used to remove the collection membership.
        ctx: Flow context; ``ctx.logger`` is used for logging.
        entity_type: EspoCRM entity type, default ``'CDokumente'``.
    """
    import traceback

    separator = "=" * 80
    try:
        ctx.logger.info("")
        ctx.logger.info(separator)
        ctx.logger.info("🗑️ DOCUMENT DELETE - xAI CLEANUP")
        ctx.logger.info(separator)

        # Both field names are supported for the xAI file id
        file_id = document.get('xaiFileId') or document.get('xaiId')
        collections = document.get('xaiCollections') or []

        # Guard: without a file id and at least one collection there is
        # nothing to clean up.
        if not (file_id and collections):
            ctx.logger.info("⏭️ Document war nicht in xAI gesynct, nichts zu tun")
            await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
            return

        collection_count = len(collections)
        ctx.logger.info("📋 Document Info:")
        ctx.logger.info(f" xaiFileId: {file_id}")
        ctx.logger.info(f" Collections: {collections}")
        ctx.logger.info(f"🗑️ Entferne aus {collection_count} Collection(s)...")

        await xai_service.remove_from_collections(collections, file_id)

        ctx.logger.info(f"✅ File aus {collection_count} Collection(s) entfernt")
        ctx.logger.info(" (File selbst bleibt in xAI kann in anderen Collections sein)")

        await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)

        ctx.logger.info(separator)
        ctx.logger.info("✅ DELETE ABGESCHLOSSEN")
        ctx.logger.info(separator)
    except Exception as error:
        ctx.logger.error(f"❌ Fehler bei Delete: {error}")
        ctx.logger.error(traceback.format_exc())
        await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(error), entity_type=entity_type)