""" Document Sync Utilities Hilfsfunktionen für Document-Synchronisation mit xAI: - Distributed locking via Redis + syncStatus - Entscheidungslogik: Wann muss ein Document zu xAI? - Related Entities ermitteln (Many-to-Many Attachments) - xAI Collection Management """ from typing import Dict, Any, Optional, List, Tuple from datetime import datetime, timedelta import logging from services.sync_utils_base import BaseSyncUtils logger = logging.getLogger(__name__) # Max retry before permanent failure MAX_SYNC_RETRIES = 5 # Retry backoff: Wartezeit zwischen Retries (in Minuten) RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h class DocumentSync(BaseSyncUtils): """Utility-Klasse für Document-Synchronisation mit xAI""" def _get_lock_key(self, entity_id: str) -> str: """Redis Lock-Key für Documents""" return f"sync_lock:document:{entity_id}" async def acquire_sync_lock(self, entity_id: str, entity_type: str = 'CDokumente') -> bool: """ Atomic distributed lock via Redis + syncStatus update Args: entity_id: EspoCRM Document ID entity_type: Entity-Type (CDokumente oder Document) Returns: True wenn Lock erfolgreich, False wenn bereits im Sync """ try: # STEP 1: Atomic Redis lock (prevents race conditions) lock_key = self._get_lock_key(entity_id) if not self._acquire_redis_lock(lock_key): self._log(f"Redis lock bereits aktiv für {entity_type} {entity_id}", level='warn') return False # STEP 2: Update syncStatus (für UI visibility) - nur bei Document Entity # CDokumente hat dieses Feld nicht - überspringen if entity_type == 'Document': try: await self.espocrm.update_entity(entity_type, entity_id, { 'xaiSyncStatus': 'syncing' }) except Exception as e: self._log(f"Konnte xaiSyncStatus nicht setzen: {e}", level='debug') self._log(f"Sync-Lock für {entity_type} {entity_id} erworben") return True except Exception as e: self._log(f"Fehler beim Acquire Lock: {e}", level='error') # Clean up Redis lock on error lock_key = self._get_lock_key(entity_id) self._release_redis_lock(lock_key) return False async def release_sync_lock( self, entity_id: str, success: bool = True, error_message: Optional[str] = None, extra_fields: Optional[Dict[str, Any]] = None, entity_type: str = 'CDokumente' ) -> None: """ Gibt Sync-Lock frei und setzt finalen Status Args: entity_id: EspoCRM Document ID success: Ob Sync erfolgreich war error_message: Optional: Fehlermeldung extra_fields: Optional: Zusätzliche Felder (z.B. xaiFileId, xaiCollections) entity_type: Entity-Type (CDokumente oder Document) """ try: update_data = {} # Status-Felder nur bei Document Entity (CDokumente hat diese Felder nicht) if entity_type == 'Document': try: update_data['xaiSyncStatus'] = 'synced' if success else 'failed' if error_message: update_data['xaiSyncError'] = error_message[:2000] else: update_data['xaiSyncError'] = None except: pass # Felder existieren evtl. nicht # Merge extra fields (z.B. xaiFileId, xaiCollections) if extra_fields: update_data.update(extra_fields) if update_data: await self.espocrm.update_entity(entity_type, entity_id, update_data) self._log(f"Sync-Lock released: {entity_type} {entity_id} → {'success' if success else 'failed'}") # Release Redis lock lock_key = self._get_lock_key(entity_id) self._release_redis_lock(lock_key) except Exception as e: self._log(f"Fehler beim Release Lock: {e}", level='error') # Ensure Redis lock is released even on error lock_key = self._get_lock_key(entity_id) self._release_redis_lock(lock_key) async def should_sync_to_xai( self, document: Dict[str, Any], entity_type: str = 'CDokumente' ) -> Tuple[bool, List[str], str]: """ Entscheidet ob ein Document zu xAI synchronisiert werden muss Prüft: 1. Datei-Status Feld ("Neu", "Geändert") 2. Hash-Werte für Change Detection 3. Related Entities mit xAI Collections Args: document: Vollständiges Document Entity von EspoCRM Returns: Tuple[bool, List[str], str]: - bool: Ob Sync nötig ist - List[str]: Liste der Collection-IDs in die das Document soll - str: Grund/Beschreibung der Entscheidung """ doc_id = document.get('id') doc_name = document.get('name', 'Unbenannt') # xAI-relevante Felder xai_file_id = document.get('xaiFileId') xai_collections = document.get('xaiCollections') or [] # Datei-Status und Hash-Felder datei_status = document.get('dateiStatus') or document.get('fileStatus') file_md5 = document.get('md5') or document.get('fileMd5') file_sha = document.get('sha') or document.get('fileSha') xai_synced_hash = document.get('xaiSyncedHash') # Hash beim letzten xAI-Sync self._log(f"📋 Document Analysis: {doc_name} (ID: {doc_id})") self._log(f" xaiFileId: {xai_file_id or 'N/A'}") self._log(f" xaiCollections: {xai_collections}") self._log(f" Datei-Status: {datei_status or 'N/A'}") self._log(f" MD5: {file_md5[:16] if file_md5 else 'N/A'}...") self._log(f" SHA: {file_sha[:16] if file_sha else 'N/A'}...") self._log(f" xaiSyncedHash: {xai_synced_hash[:16] if xai_synced_hash else 'N/A'}...") # Determine target collections from relations (CDokumente -> linked entities) target_collections = await self._get_required_collections_from_relations( doc_id, entity_type=entity_type ) if not target_collections: self._log("⏭️ Kein xAI-Sync nötig: Keine Related Entities mit xAI Collections") return (False, [], "Keine verknüpften Entities mit xAI Collections") # ═══════════════════════════════════════════════════════════════ # PRIORITY CHECK: Datei-Status "Neu" oder "Geändert" # ═══════════════════════════════════════════════════════════════ if datei_status in ['Neu', 'Geändert', 'neu', 'geändert', 'New', 'Changed']: self._log(f"🆕 Datei-Status: '{datei_status}' → xAI-Sync ERFORDERLICH") if target_collections: return (True, target_collections, f"Datei-Status: {datei_status}") else: # Datei ist neu/geändert aber keine Collections gefunden self._log(f"⚠️ Datei-Status '{datei_status}' aber keine Collections gefunden - überspringe Sync") return (False, [], f"Datei-Status: {datei_status}, aber keine Collections") # ═══════════════════════════════════════════════════════════════ # FALL 1: Document ist bereits in xAI UND Collections sind gesetzt # ═══════════════════════════════════════════════════════════════ if xai_file_id: self._log(f"✅ Document bereits in xAI gesynct mit {len(target_collections)} Collection(s)") # Prüfe ob File-Inhalt geändert wurde (Hash-Vergleich) current_hash = file_md5 or file_sha if current_hash and xai_synced_hash: if current_hash != xai_synced_hash: self._log(f"🔄 Hash-Änderung erkannt! RESYNC erforderlich") self._log(f" Alt: {xai_synced_hash[:16]}...") self._log(f" Neu: {current_hash[:16]}...") return (True, target_collections, "File-Inhalt geändert (Hash-Mismatch)") else: self._log(f"✅ Hash identisch - keine Änderung") else: self._log(f"⚠️ Keine Hash-Werte verfügbar für Vergleich") return (False, target_collections, "Bereits gesynct, keine Änderung erkannt") # ═══════════════════════════════════════════════════════════════ # FALL 2: Document hat xaiFileId aber Collections ist leer/None # ═══════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════ # FALL 3: Collections vorhanden aber kein Status/Hash-Trigger # ═══════════════════════════════════════════════════════════════ self._log(f"✅ Document ist mit {len(target_collections)} Entity/ies verknüpft die Collections haben") return (True, target_collections, "Verknüpft mit Entities die Collections benötigen") async def _get_required_collections_from_relations( self, document_id: str, entity_type: str = 'Document' ) -> List[str]: """ Ermittelt alle xAI Collection-IDs von Entities die mit diesem Document verknüpft sind EspoCRM Many-to-Many: Document kann mit beliebigen Entities verknüpft sein (CBeteiligte, Account, CVmhErstgespraech, etc.) Args: document_id: Document ID Returns: Liste von xAI Collection-IDs (dedupliziert) """ collections = set() self._log(f"🔍 Prüfe Relations von {entity_type} {document_id}...") try: entity_def = await self.espocrm.get_entity_def(entity_type) links = entity_def.get('links', {}) if isinstance(entity_def, dict) else {} except Exception as e: self._log(f"⚠️ Konnte Metadata fuer {entity_type} nicht laden: {e}", level='warn') links = {} link_types = {'hasMany', 'hasChildren', 'manyMany', 'hasManyThrough'} for link_name, link_def in links.items(): try: if not isinstance(link_def, dict): continue if link_def.get('type') not in link_types: continue related_entity = link_def.get('entity') if not related_entity: continue related_def = await self.espocrm.get_entity_def(related_entity) related_fields = related_def.get('fields', {}) if isinstance(related_def, dict) else {} select_fields = ['id'] if 'xaiCollectionId' in related_fields: select_fields.append('xaiCollectionId') offset = 0 page_size = 100 while True: result = await self.espocrm.list_related( entity_type, document_id, link_name, select=','.join(select_fields), offset=offset, max_size=page_size ) entities = result.get('list', []) if not entities: break for entity in entities: collection_id = entity.get('xaiCollectionId') if collection_id: collections.add(collection_id) if len(entities) < page_size: break offset += page_size except Exception as e: self._log(f" ⚠️ Fehler beim Prüfen von Link {link_name}: {e}", level='warn') continue result = list(collections) self._log(f"📊 Gesamt: {len(result)} eindeutige Collection(s) gefunden") return result async def get_document_download_info(self, document_id: str, entity_type: str = 'CDokumente') -> Optional[Dict[str, Any]]: """ Holt Download-Informationen für ein Document Args: document_id: ID des Documents entity_type: Entity-Type (CDokumente oder Document) Returns: Dict mit: - attachment_id: ID des Attachments - download_url: URL zum Download - filename: Dateiname - mime_type: MIME-Type - size: Dateigröße in Bytes """ try: # Hole vollständiges Document doc = await self.espocrm.get_entity(entity_type, document_id) # EspoCRM Documents können Files auf verschiedene Arten speichern: # CDokumente: dokumentId/dokumentName (Custom Entity) # Document: fileId/fileName ODER attachmentsIds attachment_id = None filename = None # Prüfe zuerst dokumentId (CDokumente Custom Entity) if doc.get('dokumentId'): attachment_id = doc.get('dokumentId') filename = doc.get('dokumentName') self._log(f"📎 CDokumente verwendet dokumentId: {attachment_id}") # Fallback: fileId (Standard Document Entity) elif doc.get('fileId'): attachment_id = doc.get('fileId') filename = doc.get('fileName') self._log(f"📎 Document verwendet fileId: {attachment_id}") # Fallback 2: attachmentsIds (z.B. bei zusätzlichen Attachments) elif doc.get('attachmentsIds'): attachment_ids = doc.get('attachmentsIds') if attachment_ids: attachment_id = attachment_ids[0] self._log(f"📎 Document verwendet attachmentsIds: {attachment_id}") if not attachment_id: self._log(f"⚠️ {entity_type} {document_id} hat weder dokumentId, fileId noch attachmentsIds", level='warn') self._log(f" Verfügbare Felder: {list(doc.keys())}") return None # Hole Attachment-Details attachment = await self.espocrm.get_entity('Attachment', attachment_id) # Filename: Nutze dokumentName/fileName falls vorhanden, sonst aus Attachment final_filename = filename or attachment.get('name', 'unknown') return { 'attachment_id': attachment_id, 'download_url': f"/api/v1/Attachment/file/{attachment_id}", 'filename': final_filename, 'mime_type': attachment.get('type', 'application/octet-stream'), 'size': attachment.get('size', 0) } except Exception as e: self._log(f"❌ Fehler beim Laden von Download-Info: {e}", level='error') return None async def generate_thumbnail(self, file_path: str, mime_type: str, max_width: int = 600, max_height: int = 800) -> Optional[bytes]: """ Generiert Vorschaubild (Preview) für ein Document im WebP-Format Unterstützt: - PDF: Erste Seite als Bild - DOCX/DOC: Konvertierung zu PDF, dann erste Seite - Images: Resize auf Preview-Größe - Andere: Platzhalter-Icon basierend auf MIME-Type Args: file_path: Pfad zur Datei (lokal) mime_type: MIME-Type des Documents max_width: Maximale Breite (default: 600px) max_height: Maximale Höhe (default: 800px) Returns: Preview als WebP bytes oder None bei Fehler """ self._log(f"🖼️ Preview-Generierung für {mime_type} (max: {max_width}x{max_height})") try: from PIL import Image import io thumbnail = None # PDF-Handling if mime_type == 'application/pdf': try: from pdf2image import convert_from_path self._log(" Converting PDF page 1 to image...") images = convert_from_path(file_path, first_page=1, last_page=1, dpi=150) if images: thumbnail = images[0] except ImportError: self._log("⚠️ pdf2image nicht installiert - überspringe PDF-Preview", level='warn') return None except Exception as e: self._log(f"⚠️ PDF-Konvertierung fehlgeschlagen: {e}", level='warn') return None # DOCX/DOC-Handling elif mime_type in ['application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'application/msword']: try: import tempfile import os from docx2pdf import convert from pdf2image import convert_from_path self._log(" Converting DOCX → PDF → Image...") # Temporäres PDF erstellen with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp: pdf_path = tmp.name # DOCX → PDF (benötigt LibreOffice) convert(file_path, pdf_path) # PDF → Image images = convert_from_path(pdf_path, first_page=1, last_page=1, dpi=150) if images: thumbnail = images[0] # Cleanup os.remove(pdf_path) except ImportError: self._log("⚠️ docx2pdf nicht installiert - überspringe DOCX-Preview", level='warn') return None except Exception as e: self._log(f"⚠️ DOCX-Konvertierung fehlgeschlagen: {e}", level='warn') return None # Image-Handling elif mime_type.startswith('image/'): try: self._log(" Processing image file...") thumbnail = Image.open(file_path) except Exception as e: self._log(f"⚠️ Image-Laden fehlgeschlagen: {e}", level='warn') return None else: self._log(f"⚠️ Keine Preview-Generierung für MIME-Type: {mime_type}", level='warn') return None if not thumbnail: return None # Resize auf max dimensions (behält Aspect Ratio) thumbnail.thumbnail((max_width, max_height), Image.Resampling.LANCZOS) # Convert zu WebP bytes buffer = io.BytesIO() thumbnail.save(buffer, format='WEBP', quality=85) webp_bytes = buffer.getvalue() self._log(f"✅ Preview generiert: {len(webp_bytes)} bytes WebP") return webp_bytes except Exception as e: self._log(f"❌ Fehler bei Preview-Generierung: {e}", level='error') import traceback self._log(traceback.format_exc(), level='debug') return None async def update_sync_metadata( self, document_id: str, xai_file_id: Optional[str] = None, collection_ids: Optional[List[str]] = None, file_hash: Optional[str] = None, preview_data: Optional[bytes] = None, reset_file_status: bool = False, entity_type: str = 'CDokumente' ) -> None: """ Updated Document-Metadaten nach erfolgreichem xAI-Sync oder Preview-Generierung Args: document_id: EspoCRM Document ID xai_file_id: xAI File ID (optional - setzt nur wenn vorhanden) collection_ids: Liste der xAI Collection IDs (optional) file_hash: MD5/SHA Hash des gesyncten Files preview_data: Vorschaubild (WebP) als bytes reset_file_status: Ob fileStatus/dateiStatus zurückgesetzt werden soll entity_type: Entity-Type (CDokumente oder Document) """ try: update_data = {} # Nur xAI-Felder updaten wenn vorhanden if xai_file_id: # CDokumente verwendet xaiId, Document verwendet xaiFileId if entity_type == 'CDokumente': update_data['xaiId'] = xai_file_id else: update_data['xaiFileId'] = xai_file_id if collection_ids is not None: update_data['xaiCollections'] = collection_ids # Status zurücksetzen wenn xAI-Sync erfolgt ist ODER explizit angefordert if xai_file_id or reset_file_status: # CDokumente verwendet fileStatus, Document verwendet dateiStatus if entity_type == 'CDokumente': # Bei xAI-Sync: "synced", bei nur Preview: "processed" update_data['fileStatus'] = 'synced' if xai_file_id else 'processed' else: # Bei xAI-Sync: "Gesynct", bei nur Preview: "Verarbeitet" update_data['dateiStatus'] = 'Gesynct' if xai_file_id else 'Verarbeitet' # Hash speichern für zukünftige Change Detection if file_hash: update_data['xaiSyncedHash'] = file_hash # Preview als Attachment hochladen (falls vorhanden) if preview_data: await self._upload_preview_to_espocrm(document_id, preview_data, entity_type) # Nur updaten wenn es etwas zu updaten gibt if update_data: await self.espocrm.update_entity(entity_type, document_id, update_data) self._log(f"✅ Sync-Metadaten aktualisiert für {entity_type} {document_id}: {list(update_data.keys())}") except Exception as e: self._log(f"❌ Fehler beim Update von Sync-Metadaten: {e}", level='error') raise async def _upload_preview_to_espocrm(self, document_id: str, preview_data: bytes, entity_type: str = 'CDokumente') -> None: """ Lädt Preview-Image als Attachment zu EspoCRM hoch Args: document_id: Document ID preview_data: WebP Preview als bytes entity_type: Entity-Type (CDokumente oder Document) """ try: self._log(f"📤 Uploading preview image to {entity_type} ({len(preview_data)} bytes)...") # EspoCRM erwartet base64-encoded file im Format: data:mime/type;base64,xxxxx import base64 import aiohttp # Base64-encode preview data base64_data = base64.b64encode(preview_data).decode('ascii') file_data_uri = f"data:image/webp;base64,{base64_data}" # Upload via JSON POST mit base64-encoded file field url = self.espocrm.api_base_url.rstrip('/') + '/Attachment' headers = { 'X-Api-Key': self.espocrm.api_key, 'Content-Type': 'application/json' } payload = { 'name': 'preview.webp', 'type': 'image/webp', 'role': 'Attachment', 'field': 'preview', 'relatedType': entity_type, 'relatedId': document_id, 'file': file_data_uri } self._log(f"📤 Posting to {url} with base64-encoded file ({len(base64_data)} chars)") self._log(f" relatedType={entity_type}, relatedId={document_id}, field=preview") timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post(url, headers=headers, json=payload) as response: self._log(f"Upload response status: {response.status}") if response.status >= 400: error_text = await response.text() self._log(f"❌ Upload failed: {error_text}", level='error') raise Exception(f"Upload error {response.status}: {error_text}") result = await response.json() attachment_id = result.get('id') self._log(f"✅ Preview Attachment created: {attachment_id}") # Update Entity mit previewId self._log(f"📝 Updating {entity_type} with previewId...") await self.espocrm.update_entity(entity_type, document_id, { 'previewId': attachment_id, 'previewName': 'preview.webp' }) self._log(f"✅ {entity_type} previewId/previewName aktualisiert") except Exception as e: self._log(f"❌ Fehler beim Preview-Upload: {e}", level='error') # Don't raise - Preview ist optional, Sync sollte trotzdem erfolgreich sein