""" Document Sync Utilities Hilfsfunktionen für Document-Synchronisation mit xAI: - Distributed locking via Redis + syncStatus - Entscheidungslogik: Wann muss ein Document zu xAI? - Related Entities ermitteln (Many-to-Many Attachments) - xAI Collection Management """ from typing import Dict, Any, Optional, List, Tuple from datetime import datetime, timedelta import logging import redis import os logger = logging.getLogger(__name__) # Lock TTL in seconds (prevents deadlocks) LOCK_TTL_SECONDS = 900 # 15 minutes # Max retry before permanent failure MAX_SYNC_RETRIES = 5 # Retry backoff: Wartezeit zwischen Retries (in Minuten) RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h class DocumentSync: """Utility-Klasse für Document-Synchronisation mit xAI""" def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None): self.espocrm = espocrm_api self.context = context self.logger = context.logger if context else logger self.redis = redis_client or self._init_redis() def _init_redis(self) -> redis.Redis: """Initialize Redis client for distributed locking""" try: redis_host = os.getenv('REDIS_HOST', 'localhost') redis_port = int(os.getenv('REDIS_PORT', '6379')) redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1')) client = redis.Redis( host=redis_host, port=redis_port, db=redis_db, decode_responses=True ) client.ping() return client except Exception as e: self._log(f"Redis connection failed: {e}", level='error') return None def _log(self, message: str, level: str = 'info'): """Logging mit Context-Support""" if self.context and hasattr(self.context, 'logger'): getattr(self.context.logger, level)(message) else: getattr(logger, level)(message) async def acquire_sync_lock(self, entity_id: str) -> bool: """ Atomic distributed lock via Redis + syncStatus update Args: entity_id: EspoCRM Document ID Returns: True wenn Lock erfolgreich, False wenn bereits im Sync """ try: # STEP 1: Atomic Redis lock (prevents race conditions) if self.redis: lock_key = f"sync_lock:document:{entity_id}" acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS) if not acquired: self._log(f"Redis lock bereits aktiv für Document {entity_id}", level='warn') return False # STEP 2: Update syncStatus (für UI visibility) - falls Feld existiert # NOTE: Ggf. muss syncStatus bei Document Entity erst angelegt werden try: await self.espocrm.update_entity('Document', entity_id, { 'xaiSyncStatus': 'syncing' }) except Exception as e: self._log(f"Konnte xaiSyncStatus nicht setzen (Feld existiert evtl. nicht): {e}", level='debug') self._log(f"Sync-Lock für Document {entity_id} erworben") return True except Exception as e: self._log(f"Fehler beim Acquire Lock: {e}", level='error') # Clean up Redis lock on error if self.redis: try: lock_key = f"sync_lock:document:{entity_id}" self.redis.delete(lock_key) except: pass return False async def release_sync_lock( self, entity_id: str, success: bool = True, error_message: Optional[str] = None, extra_fields: Optional[Dict[str, Any]] = None ) -> None: """ Gibt Sync-Lock frei und setzt finalen Status Args: entity_id: EspoCRM Document ID success: Ob Sync erfolgreich war error_message: Optional: Fehlermeldung extra_fields: Optional: Zusätzliche Felder (z.B. xaiFileId, xaiCollections) """ try: update_data = {} # Status-Feld (falls vorhanden) try: update_data['xaiSyncStatus'] = 'synced' if success else 'failed' if error_message: update_data['xaiSyncError'] = error_message[:2000] else: update_data['xaiSyncError'] = None except: pass # Felder existieren evtl. nicht # Merge extra fields (z.B. xaiFileId, xaiCollections) if extra_fields: update_data.update(extra_fields) if update_data: await self.espocrm.update_entity('Document', entity_id, update_data) self._log(f"Sync-Lock released: Document {entity_id} → {'success' if success else 'failed'}") # Release Redis lock if self.redis: lock_key = f"sync_lock:document:{entity_id}" self.redis.delete(lock_key) except Exception as e: self._log(f"Fehler beim Release Lock: {e}", level='error') # Ensure Redis lock is released even on error if self.redis: try: lock_key = f"sync_lock:document:{entity_id}" self.redis.delete(lock_key) except: pass async def should_sync_to_xai(self, document: Dict[str, Any]) -> Tuple[bool, List[str], str]: """ Entscheidet ob ein Document zu xAI synchronisiert werden muss Prüft: 1. Datei-Status Feld ("Neu", "Geändert") 2. Hash-Werte für Change Detection 3. Related Entities mit xAI Collections Args: document: Vollständiges Document Entity von EspoCRM Returns: Tuple[bool, List[str], str]: - bool: Ob Sync nötig ist - List[str]: Liste der Collection-IDs in die das Document soll - str: Grund/Beschreibung der Entscheidung """ doc_id = document.get('id') doc_name = document.get('name', 'Unbenannt') # xAI-relevante Felder xai_file_id = document.get('xaiFileId') xai_collections = document.get('xaiCollections') or [] # Datei-Status und Hash-Felder datei_status = document.get('dateiStatus') or document.get('fileStatus') file_md5 = document.get('md5') or document.get('fileMd5') file_sha = document.get('sha') or document.get('fileSha') xai_synced_hash = document.get('xaiSyncedHash') # Hash beim letzten xAI-Sync self._log(f"📋 Document Analysis: {doc_name} (ID: {doc_id})") self._log(f" xaiFileId: {xai_file_id or 'N/A'}") self._log(f" xaiCollections: {xai_collections}") self._log(f" Datei-Status: {datei_status or 'N/A'}") self._log(f" MD5: {file_md5[:16] if file_md5 else 'N/A'}...") self._log(f" SHA: {file_sha[:16] if file_sha else 'N/A'}...") self._log(f" xaiSyncedHash: {xai_synced_hash[:16] if xai_synced_hash else 'N/A'}...") # ═══════════════════════════════════════════════════════════════ # PRIORITY CHECK: Datei-Status "Neu" oder "Geändert" # ═══════════════════════════════════════════════════════════════ if datei_status in ['Neu', 'Geändert', 'neu', 'geändert', 'New', 'Changed']: self._log(f"🆕 Datei-Status: '{datei_status}' → xAI-Sync ERFORDERLICH") # Hole Collections (entweder existierende oder von Related Entities) if xai_collections: target_collections = xai_collections else: target_collections = await self._get_required_collections_from_relations(doc_id) if target_collections: return (True, target_collections, f"Datei-Status: {datei_status}") else: # Datei ist neu/geändert aber keine Collections gefunden self._log(f"⚠️ Datei-Status '{datei_status}' aber keine Collections gefunden - überspringe Sync") return (False, [], f"Datei-Status: {datei_status}, aber keine Collections") # ═══════════════════════════════════════════════════════════════ # FALL 1: Document ist bereits in xAI UND Collections sind gesetzt # ═══════════════════════════════════════════════════════════════ if xai_file_id and xai_collections: self._log(f"✅ Document bereits in xAI gesynct mit {len(xai_collections)} Collection(s)") # Prüfe ob File-Inhalt geändert wurde (Hash-Vergleich) current_hash = file_md5 or file_sha if current_hash and xai_synced_hash: if current_hash != xai_synced_hash: self._log(f"🔄 Hash-Änderung erkannt! RESYNC erforderlich") self._log(f" Alt: {xai_synced_hash[:16]}...") self._log(f" Neu: {current_hash[:16]}...") return (True, xai_collections, "File-Inhalt geändert (Hash-Mismatch)") else: self._log(f"✅ Hash identisch - keine Änderung") else: self._log(f"⚠️ Keine Hash-Werte verfügbar für Vergleich") return (False, xai_collections, "Bereits gesynct, keine Änderung erkannt") # ═══════════════════════════════════════════════════════════════ # FALL 2: Document hat xaiFileId aber Collections ist leer/None # ═══════════════════════════════════════════════════════════════ if xai_file_id and not xai_collections: self._log(f"⚠️ Document hat xaiFileId aber keine Collections - prüfe Related Entities") # Fallthrough zu FALL 3 - prüfe Related Entities # ═══════════════════════════════════════════════════════════════ # FALL 3: Prüfe Related Entities (Attachments) # ═══════════════════════════════════════════════════════════════ required_collections = await self._get_required_collections_from_relations(doc_id) if required_collections: self._log(f"✅ Document ist mit {len(required_collections)} Entity/ies verknüpft die Collections haben") return (True, required_collections, f"Verknüpft mit Entities die Collections benötigen") # ═══════════════════════════════════════════════════════════════ # FALL 4: Keine Collections gefunden → kein Sync nötig # ═══════════════════════════════════════════════════════════════ self._log(f"⏭️ Kein xAI-Sync nötig: Keine Related Entities mit Collections") return (False, [], "Keine verknüpften Entities mit xAI Collections") async def _get_required_collections_from_relations(self, document_id: str) -> List[str]: """ Ermittelt alle xAI Collection-IDs von Entities die mit diesem Document verknüpft sind EspoCRM Many-to-Many: Document kann mit beliebigen Entities verknüpft sein (CBeteiligte, Account, CVmhErstgespraech, etc.) Args: document_id: Document ID Returns: Liste von xAI Collection-IDs (dedupliziert) """ collections = set() # Liste von Entity-Types die xAI Collections haben können # NOTE: Erweiterbar für andere Entities entity_types_with_collections = [ 'CBeteiligte', 'Account', 'CVmhErstgespraech', # Weitere Entity-Types hier hinzufügen ] self._log(f"🔍 Prüfe Attachments von Document {document_id}...") for entity_type in entity_types_with_collections: try: # Finde alle Entities dieses Typs die dieses Document attached haben # EspoCRM API: Suche wo documentsIds das Document enthält result = await self.espocrm.list_entities( entity_type, where=[{ 'type': 'arrayAnyOf', 'attribute': 'documentsIds', 'value': [document_id] }], select='id,name,xaiCollectionId', max_size=100 ) entities = result.get('list', []) if entities: self._log(f" ✅ {len(entities)} {entity_type}(s) gefunden") for entity in entities: collection_id = entity.get('xaiCollectionId') if collection_id: collections.add(collection_id) self._log(f" → {entity.get('name')}: Collection {collection_id[:16]}...") except Exception as e: self._log(f" ⚠️ Fehler beim Prüfen von {entity_type}: {e}", level='warn') continue result = list(collections) self._log(f"📊 Gesamt: {len(result)} eindeutige Collection(s) gefunden") return result async def get_document_download_info(self, document_id: str) -> Optional[Dict[str, Any]]: """ Holt Download-Informationen für ein Document Returns: Dict mit: - download_url: URL zum Download - filename: Dateiname - mime_type: MIME-Type - size: Dateigröße in Bytes """ try: # Hole vollständiges Document doc = await self.espocrm.get_entity('Document', document_id) # EspoCRM Document hat Attachments (Attachment ID in attachmentsIds) attachment_ids = doc.get('attachmentsIds') or [] if not attachment_ids: self._log(f"⚠️ Document {document_id} hat keine Attachments", level='warn') return None # Nehme erstes Attachment (Documents haben normalerweise nur 1 File) attachment_id = attachment_ids[0] # Hole Attachment-Details attachment = await self.espocrm.get_entity('Attachment', attachment_id) return { 'attachment_id': attachment_id, 'download_url': f"/api/v1/Attachment/file/{attachment_id}", 'filename': attachment.get('name', 'unknown'), 'mime_type': attachment.get('type', 'application/octet-stream'), 'size': attachment.get('size', 0) } except Exception as e: self._log(f"❌ Fehler beim Laden von Download-Info: {e}", level='error') return None async def generate_thumbnail(self, file_path: str, mime_type: str) -> Optional[bytes]: """ Generiert Vorschaubild (Thumbnail) für ein Document Unterstützt: - PDF: Erste Seite als Bild - DOCX/DOC: Konvertierung zu PDF, dann erste Seite - Images: Resize auf Thumbnail-Größe - Andere: Platzhalter-Icon basierend auf MIME-Type Args: file_path: Pfad zur Datei (lokal oder Download-URL) mime_type: MIME-Type des Documents Returns: Thumbnail als bytes (PNG/JPEG) oder None bei Fehler """ self._log(f"🖼️ Thumbnail-Generierung für {mime_type}") # TODO: Implementierung # # Benötigte Libraries: # - pdf2image (für PDF → Image) # - python-docx + docx2pdf (für DOCX → PDF → Image) # - Pillow (PIL) für Image-Processing # - poppler-utils (System-Dependency für pdf2image) # # Implementierungs-Schritte: # # 1. PDF-Handling: # from pdf2image import convert_from_path # images = convert_from_path(file_path, first_page=1, last_page=1) # thumbnail = images[0].resize((200, 280)) # return thumbnail_to_bytes(thumbnail) # # 2. DOCX-Handling: # - Konvertiere zu temporärem PDF # - Dann wie PDF behandeln # # 3. Image-Handling: # from PIL import Image # img = Image.open(file_path) # img.thumbnail((200, 280)) # return image_to_bytes(img) # # 4. Fallback: # - Generic file-type icon basierend auf MIME-Type self._log(f"⚠️ Thumbnail-Generierung noch nicht implementiert", level='warn') return None async def update_sync_metadata( self, document_id: str, xai_file_id: str, collection_ids: List[str], file_hash: Optional[str] = None, thumbnail_data: Optional[bytes] = None ) -> None: """ Updated Document-Metadaten nach erfolgreichem xAI-Sync Args: document_id: EspoCRM Document ID xai_file_id: xAI File ID collection_ids: Liste der xAI Collection IDs file_hash: MD5/SHA Hash des gesyncten Files thumbnail_data: Vorschaubild als bytes """ try: update_data = { 'xaiFileId': xai_file_id, 'xaiCollections': collection_ids, 'dateiStatus': 'Gesynct', # Status zurücksetzen } # Hash speichern für zukünftige Change Detection if file_hash: update_data['xaiSyncedHash'] = file_hash # Thumbnail als Attachment hochladen (falls vorhanden) if thumbnail_data: # TODO: Implementiere Thumbnail-Upload zu EspoCRM # EspoCRM unterstützt Preview-Images für Documents # Muss als separates Attachment hochgeladen werden self._log(f"⚠️ Thumbnail-Upload noch nicht implementiert", level='warn') await self.espocrm.update_entity('Document', document_id, update_data) self._log(f"✅ Sync-Metadaten aktualisiert für Document {document_id}") except Exception as e: self._log(f"❌ Fehler beim Update von Sync-Metadaten: {e}", level='error') raise