feat(document-sync): add Document Sync Utilities and VMH Document Sync Handler for xAI integration

This commit is contained in:
bsiggel
2026-03-03 06:55:54 +00:00
parent cb0e170ee9
commit ee9aab049f
2 changed files with 598 additions and 0 deletions

View File

@@ -0,0 +1,318 @@
"""
Document Sync Utilities
Hilfsfunktionen für Document-Synchronisation mit xAI:
- Distributed locking via Redis + syncStatus
- Entscheidungslogik: Wann muss ein Document zu xAI?
- Related Entities ermitteln (Many-to-Many Attachments)
- xAI Collection Management
"""
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime, timedelta
import logging
import redis
import os
logger = logging.getLogger(__name__)
# Lock TTL in seconds (prevents deadlocks)
LOCK_TTL_SECONDS = 900 # 15 minutes
# Max retry before permanent failure
MAX_SYNC_RETRIES = 5
# Retry backoff: Wartezeit zwischen Retries (in Minuten)
RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h
class DocumentSync:
"""Utility-Klasse für Document-Synchronisation mit xAI"""
def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None):
self.espocrm = espocrm_api
self.context = context
self.logger = context.logger if context else logger
self.redis = redis_client or self._init_redis()
def _init_redis(self) -> redis.Redis:
"""Initialize Redis client for distributed locking"""
try:
redis_host = os.getenv('REDIS_HOST', 'localhost')
redis_port = int(os.getenv('REDIS_PORT', '6379'))
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True
)
client.ping()
return client
except Exception as e:
self._log(f"Redis connection failed: {e}", level='error')
return None
def _log(self, message: str, level: str = 'info'):
"""Logging mit Context-Support"""
if self.context and hasattr(self.context, 'logger'):
getattr(self.context.logger, level)(message)
else:
getattr(logger, level)(message)
async def acquire_sync_lock(self, entity_id: str) -> bool:
"""
Atomic distributed lock via Redis + syncStatus update
Args:
entity_id: EspoCRM Document ID
Returns:
True wenn Lock erfolgreich, False wenn bereits im Sync
"""
try:
# STEP 1: Atomic Redis lock (prevents race conditions)
if self.redis:
lock_key = f"sync_lock:document:{entity_id}"
acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS)
if not acquired:
self._log(f"Redis lock bereits aktiv für Document {entity_id}", level='warn')
return False
# STEP 2: Update syncStatus (für UI visibility) - falls Feld existiert
# NOTE: Ggf. muss syncStatus bei Document Entity erst angelegt werden
try:
await self.espocrm.update_entity('Document', entity_id, {
'xaiSyncStatus': 'syncing'
})
except Exception as e:
self._log(f"Konnte xaiSyncStatus nicht setzen (Feld existiert evtl. nicht): {e}", level='debug')
self._log(f"Sync-Lock für Document {entity_id} erworben")
return True
except Exception as e:
self._log(f"Fehler beim Acquire Lock: {e}", level='error')
# Clean up Redis lock on error
if self.redis:
try:
lock_key = f"sync_lock:document:{entity_id}"
self.redis.delete(lock_key)
except:
pass
return False
async def release_sync_lock(
self,
entity_id: str,
success: bool = True,
error_message: Optional[str] = None,
extra_fields: Optional[Dict[str, Any]] = None
) -> None:
"""
Gibt Sync-Lock frei und setzt finalen Status
Args:
entity_id: EspoCRM Document ID
success: Ob Sync erfolgreich war
error_message: Optional: Fehlermeldung
extra_fields: Optional: Zusätzliche Felder (z.B. xaiFileId, xaiCollections)
"""
try:
update_data = {}
# Status-Feld (falls vorhanden)
try:
update_data['xaiSyncStatus'] = 'synced' if success else 'failed'
if error_message:
update_data['xaiSyncError'] = error_message[:2000]
else:
update_data['xaiSyncError'] = None
except:
pass # Felder existieren evtl. nicht
# Merge extra fields (z.B. xaiFileId, xaiCollections)
if extra_fields:
update_data.update(extra_fields)
if update_data:
await self.espocrm.update_entity('Document', entity_id, update_data)
self._log(f"Sync-Lock released: Document {entity_id}{'success' if success else 'failed'}")
# Release Redis lock
if self.redis:
lock_key = f"sync_lock:document:{entity_id}"
self.redis.delete(lock_key)
except Exception as e:
self._log(f"Fehler beim Release Lock: {e}", level='error')
# Ensure Redis lock is released even on error
if self.redis:
try:
lock_key = f"sync_lock:document:{entity_id}"
self.redis.delete(lock_key)
except:
pass
async def should_sync_to_xai(self, document: Dict[str, Any]) -> Tuple[bool, List[str], str]:
"""
Entscheidet ob ein Document zu xAI synchronisiert werden muss
Args:
document: Vollständiges Document Entity von EspoCRM
Returns:
Tuple[bool, List[str], str]:
- bool: Ob Sync nötig ist
- List[str]: Liste der Collection-IDs in die das Document soll
- str: Grund/Beschreibung der Entscheidung
"""
doc_id = document.get('id')
doc_name = document.get('name', 'Unbenannt')
# xAI-relevante Felder
xai_file_id = document.get('xaiFileId')
xai_collections = document.get('xaiCollections') or []
self._log(f"📋 Document Analysis: {doc_name} (ID: {doc_id})")
self._log(f" xaiFileId: {xai_file_id or 'N/A'}")
self._log(f" xaiCollections: {xai_collections}")
# ═══════════════════════════════════════════════════════════════
# FALL 1: Document ist bereits in xAI UND Collections sind gesetzt
# ═══════════════════════════════════════════════════════════════
if xai_file_id and xai_collections:
self._log(f"✅ Document bereits in xAI gesynct mit {len(xai_collections)} Collection(s)")
# Prüfe ob Update nötig (z.B. wenn File selbst geändert wurde)
# TODO: Implementiere File-Hash-Vergleich für Update-Erkennung
return (False, xai_collections, "Bereits gesynct, keine Änderung erkannt")
# ═══════════════════════════════════════════════════════════════
# FALL 2: Document hat xaiFileId aber Collections ist leer/None
# ═══════════════════════════════════════════════════════════════
if xai_file_id and not xai_collections:
self._log(f"⚠️ Document hat xaiFileId aber keine Collections - prüfe Related Entities")
# Fallthrough zu FALL 3 - prüfe Related Entities
# ═══════════════════════════════════════════════════════════════
# FALL 3: Prüfe Related Entities (Attachments)
# ═══════════════════════════════════════════════════════════════
required_collections = await self._get_required_collections_from_relations(doc_id)
if required_collections:
self._log(f"✅ Document ist mit {len(required_collections)} Entity/ies verknüpft die Collections haben")
return (True, required_collections, f"Verknüpft mit Entities die Collections benötigen")
# ═══════════════════════════════════════════════════════════════
# FALL 4: Keine Collections gefunden → kein Sync nötig
# ═══════════════════════════════════════════════════════════════
self._log(f"⏭️ Kein xAI-Sync nötig: Keine Related Entities mit Collections")
return (False, [], "Keine verknüpften Entities mit xAI Collections")
async def _get_required_collections_from_relations(self, document_id: str) -> List[str]:
"""
Ermittelt alle xAI Collection-IDs von Entities die mit diesem Document verknüpft sind
EspoCRM Many-to-Many: Document kann mit beliebigen Entities verknüpft sein
(CBeteiligte, Account, CVmhErstgespraech, etc.)
Args:
document_id: Document ID
Returns:
Liste von xAI Collection-IDs (dedupliziert)
"""
collections = set()
# Liste von Entity-Types die xAI Collections haben können
# NOTE: Erweiterbar für andere Entities
entity_types_with_collections = [
'CBeteiligte',
'Account',
'CVmhErstgespraech',
# Weitere Entity-Types hier hinzufügen
]
self._log(f"🔍 Prüfe Attachments von Document {document_id}...")
for entity_type in entity_types_with_collections:
try:
# Finde alle Entities dieses Typs die dieses Document attached haben
# EspoCRM API: Suche wo documentsIds das Document enthält
result = await self.espocrm.list_entities(
entity_type,
where=[{
'type': 'arrayAnyOf',
'attribute': 'documentsIds',
'value': [document_id]
}],
select='id,name,xaiCollectionId',
max_size=100
)
entities = result.get('list', [])
if entities:
self._log(f"{len(entities)} {entity_type}(s) gefunden")
for entity in entities:
collection_id = entity.get('xaiCollectionId')
if collection_id:
collections.add(collection_id)
self._log(f"{entity.get('name')}: Collection {collection_id[:16]}...")
except Exception as e:
self._log(f" ⚠️ Fehler beim Prüfen von {entity_type}: {e}", level='warn')
continue
result = list(collections)
self._log(f"📊 Gesamt: {len(result)} eindeutige Collection(s) gefunden")
return result
async def get_document_download_info(self, document_id: str) -> Optional[Dict[str, Any]]:
"""
Holt Download-Informationen für ein Document
Returns:
Dict mit:
- download_url: URL zum Download
- filename: Dateiname
- mime_type: MIME-Type
- size: Dateigröße in Bytes
"""
try:
# Hole vollständiges Document
doc = await self.espocrm.get_entity('Document', document_id)
# EspoCRM Document hat Attachments (Attachment ID in attachmentsIds)
attachment_ids = doc.get('attachmentsIds') or []
if not attachment_ids:
self._log(f"⚠️ Document {document_id} hat keine Attachments", level='warn')
return None
# Nehme erstes Attachment (Documents haben normalerweise nur 1 File)
attachment_id = attachment_ids[0]
# Hole Attachment-Details
attachment = await self.espocrm.get_entity('Attachment', attachment_id)
return {
'attachment_id': attachment_id,
'download_url': f"/api/v1/Attachment/file/{attachment_id}",
'filename': attachment.get('name', 'unknown'),
'mime_type': attachment.get('type', 'application/octet-stream'),
'size': attachment.get('size', 0)
}
except Exception as e:
self._log(f"❌ Fehler beim Laden von Download-Info: {e}", level='error')
return None

View File

@@ -0,0 +1,280 @@
"""
VMH Document Sync Handler
Zentraler Sync-Handler für Documents mit xAI Collections
Verarbeitet:
- vmh.document.create: Neu in EspoCRM → Prüfe ob xAI-Sync nötig
- vmh.document.update: Geändert in EspoCRM → Prüfe ob xAI-Sync/Update nötig
- vmh.document.delete: Gelöscht in EspoCRM → Remove from xAI Collections
"""
from typing import Dict, Any
from motia import FlowContext
from services.espocrm import EspoCRMAPI
from services.document_sync_utils import DocumentSync
import json
import redis
import os
config = {
"name": "VMH Document Sync Handler",
"description": "Zentraler Sync-Handler für Documents mit xAI Collections",
"flows": ["vmh-documents"],
"triggers": [
{"type": "queue", "topic": "vmh.document.create"},
{"type": "queue", "topic": "vmh.document.update"},
{"type": "queue", "topic": "vmh.document.delete"}
],
"enqueues": []
}
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
"""Zentraler Sync-Handler für Documents"""
entity_id = event_data.get('entity_id')
action = event_data.get('action')
source = event_data.get('source')
if not entity_id:
ctx.logger.error("Keine entity_id im Event gefunden")
return
ctx.logger.info("=" * 80)
ctx.logger.info(f"🔄 DOCUMENT SYNC HANDLER GESTARTET")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Action: {action.upper()}")
ctx.logger.info(f"Document ID: {entity_id}")
ctx.logger.info(f"Source: {source}")
ctx.logger.info("=" * 80)
# Shared Redis client for distributed locking
redis_host = os.getenv('REDIS_HOST', 'localhost')
redis_port = int(os.getenv('REDIS_PORT', '6379'))
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True
)
# APIs initialisieren
espocrm = EspoCRMAPI()
sync_utils = DocumentSync(espocrm, redis_client, ctx)
# TODO: xAI Service wird in nächstem Schritt hinzugefügt
# from services.xai_service import XAIService
# xai_service = XAIService(ctx)
try:
# 1. ACQUIRE LOCK (verhindert parallele Syncs)
lock_acquired = await sync_utils.acquire_sync_lock(entity_id)
if not lock_acquired:
ctx.logger.warn(f"⏸️ Sync bereits aktiv für Document {entity_id}, überspringe")
return
# Lock erfolgreich acquired - MUSS im finally block released werden!
try:
# 2. FETCH VOLLSTÄNDIGES DOCUMENT VON ESPOCRM
try:
document = await espocrm.get_entity('Document', entity_id)
except Exception as e:
ctx.logger.error(f"❌ Fehler beim Laden von Document: {e}")
await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e))
return
ctx.logger.info(f"📋 Document geladen:")
ctx.logger.info(f" Name: {document.get('name', 'N/A')}")
ctx.logger.info(f" Type: {document.get('type', 'N/A')}")
ctx.logger.info(f" xaiFileId: {document.get('xaiFileId', 'N/A')}")
ctx.logger.info(f" xaiCollections: {document.get('xaiCollections', [])}")
# 3. BESTIMME SYNC-AKTION BASIEREND AUF ACTION
if action == 'delete':
await handle_delete(entity_id, document, sync_utils, ctx)
elif action in ['create', 'update']:
await handle_create_or_update(entity_id, document, sync_utils, ctx)
else:
ctx.logger.warn(f"⚠️ Unbekannte Action: {action}")
await sync_utils.release_sync_lock(entity_id, success=False, error_message=f"Unbekannte Action: {action}")
except Exception as e:
# Unerwarteter Fehler während Sync - GARANTIERE Lock-Release
ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
try:
await sync_utils.release_sync_lock(
entity_id,
success=False,
error_message=str(e)[:2000]
)
except Exception as release_error:
# Selbst Lock-Release failed - logge kritischen Fehler
ctx.logger.critical(f"🚨 CRITICAL: Lock-Release failed für Document {entity_id}: {release_error}")
# Force Redis lock release
try:
lock_key = f"sync_lock:document:{entity_id}"
redis_client.delete(lock_key)
ctx.logger.info(f"✅ Redis lock manuell released: {lock_key}")
except:
pass
except Exception as e:
# Fehler VOR Lock-Acquire - kein Lock-Release nötig
ctx.logger.error(f"❌ Fehler vor Lock-Acquire: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any]):
"""
Behandelt Create/Update von Documents
Entscheidet ob xAI-Sync nötig ist und führt diesen durch
"""
try:
ctx.logger.info("")
ctx.logger.info("=" * 80)
ctx.logger.info("🔍 ANALYSE: Braucht dieses Document xAI-Sync?")
ctx.logger.info("=" * 80)
# Entscheidungslogik: Soll dieses Document zu xAI?
needs_sync, collection_ids, reason = await sync_utils.should_sync_to_xai(document)
ctx.logger.info(f"📊 Entscheidung: {'✅ SYNC NÖTIG' if needs_sync else '⏭️ KEIN SYNC NÖTIG'}")
ctx.logger.info(f" Grund: {reason}")
if collection_ids:
ctx.logger.info(f" Collections: {collection_ids}")
if not needs_sync:
ctx.logger.info("✅ Kein xAI-Sync erforderlich, Lock wird released")
await sync_utils.release_sync_lock(entity_id, success=True)
return
# ═══════════════════════════════════════════════════════════════
# xAI SYNC DURCHFÜHREN
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("")
ctx.logger.info("=" * 80)
ctx.logger.info("🤖 xAI SYNC STARTEN")
ctx.logger.info("=" * 80)
# TODO: Implementierung mit xai_service.py
ctx.logger.warn("⚠️ xAI Service noch nicht implementiert!")
ctx.logger.info("")
ctx.logger.info("TODO: Folgende Schritte werden implementiert:")
ctx.logger.info("1. 📥 Download File von EspoCRM")
ctx.logger.info("2. 📤 Upload zu xAI (falls noch kein xaiFileId)")
ctx.logger.info("3. 📚 Add zu Collections")
ctx.logger.info("4. 💾 Update EspoCRM: xaiFileId + xaiCollections")
ctx.logger.info("")
# PLACEHOLDER Implementation:
#
# # 1. Download File von EspoCRM
# download_info = await sync_utils.get_document_download_info(entity_id)
# if not download_info:
# raise Exception("Konnte Download-Info nicht ermitteln")
#
# # 2. Upload zu xAI (falls noch nicht vorhanden)
# xai_file_id = document.get('xaiFileId')
# if not xai_file_id:
# file_content = await download_file_from_espocrm(download_info)
# xai_file_id = await xai_service.upload_file(file_content, download_info['filename'])
#
# # 3. Add zu Collections
# for collection_id in collection_ids:
# await xai_service.add_file_to_collection(collection_id, xai_file_id)
#
# # 4. Update EspoCRM
# await sync_utils.release_sync_lock(
# entity_id,
# success=True,
# extra_fields={
# 'xaiFileId': xai_file_id,
# 'xaiCollections': collection_ids
# }
# )
# Für jetzt: Success ohne Sync
await sync_utils.release_sync_lock(
entity_id,
success=True,
extra_fields={
# TODO: Echte xAI-Daten hier einsetzen
# 'xaiFileId': xai_file_id,
# 'xaiCollections': collection_ids
}
)
ctx.logger.info("=" * 80)
ctx.logger.info("✅ DOCUMENT SYNC ABGESCHLOSSEN (PLACEHOLDER)")
ctx.logger.info("=" * 80)
except Exception as e:
ctx.logger.error(f"❌ Fehler bei Create/Update: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e))
async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any]):
"""
Behandelt Deletion von Documents
Entfernt Document aus xAI Collections (aber löscht File nicht - kann in anderen Collections sein)
"""
try:
ctx.logger.info("")
ctx.logger.info("=" * 80)
ctx.logger.info("🗑️ DOCUMENT DELETE - xAI CLEANUP")
ctx.logger.info("=" * 80)
xai_file_id = document.get('xaiFileId')
xai_collections = document.get('xaiCollections') or []
if not xai_file_id or not xai_collections:
ctx.logger.info("⏭️ Document war nicht in xAI gesynct, nichts zu tun")
await sync_utils.release_sync_lock(entity_id, success=True)
return
ctx.logger.info(f"📋 Document Info:")
ctx.logger.info(f" xaiFileId: {xai_file_id}")
ctx.logger.info(f" Collections: {xai_collections}")
# TODO: Implementierung mit xai_service
ctx.logger.warn("⚠️ xAI Delete-Operation noch nicht implementiert!")
ctx.logger.info("")
ctx.logger.info("TODO: Folgende Schritte werden implementiert:")
ctx.logger.info("1. 🗑️ Remove File aus allen Collections")
ctx.logger.info("2. ⚠️ File NICHT von xAI löschen (kann in anderen Collections sein)")
ctx.logger.info("")
# PLACEHOLDER Implementation:
#
# for collection_id in xai_collections:
# await xai_service.remove_file_from_collection(collection_id, xai_file_id)
#
# ctx.logger.info(f"✅ File aus {len(xai_collections)} Collection(s) entfernt")
await sync_utils.release_sync_lock(entity_id, success=True)
ctx.logger.info("=" * 80)
ctx.logger.info("✅ DELETE ABGESCHLOSSEN (PLACEHOLDER)")
ctx.logger.info("=" * 80)
except Exception as e:
ctx.logger.error(f"❌ Fehler bei Delete: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e))