From d7b2b5543f1a664a05c48a225d096991ff4f8181 Mon Sep 17 00:00:00 2001 From: bsiggel Date: Sun, 8 Mar 2026 17:51:36 +0000 Subject: [PATCH] feat(espocrm): add caching for entity definitions and implement related entity listing --- services/document_sync_utils.py | 140 ++++++++++++++++++-------------- services/espocrm.py | 48 +++++++++++ 2 files changed, 126 insertions(+), 62 deletions(-) diff --git a/services/document_sync_utils.py b/services/document_sync_utils.py index 660167f..5e083e6 100644 --- a/services/document_sync_utils.py +++ b/services/document_sync_utils.py @@ -120,7 +120,11 @@ class DocumentSync(BaseSyncUtils): lock_key = self._get_lock_key(entity_id) self._release_redis_lock(lock_key) - async def should_sync_to_xai(self, document: Dict[str, Any]) -> Tuple[bool, List[str], str]: + async def should_sync_to_xai( + self, + document: Dict[str, Any], + entity_type: str = 'CDokumente' + ) -> Tuple[bool, List[str], str]: """ Entscheidet ob ein Document zu xAI synchronisiert werden muss @@ -158,6 +162,16 @@ class DocumentSync(BaseSyncUtils): self._log(f" MD5: {file_md5[:16] if file_md5 else 'N/A'}...") self._log(f" SHA: {file_sha[:16] if file_sha else 'N/A'}...") self._log(f" xaiSyncedHash: {xai_synced_hash[:16] if xai_synced_hash else 'N/A'}...") + + # Determine target collections from relations (CDokumente -> linked entities) + target_collections = await self._get_required_collections_from_relations( + doc_id, + entity_type=entity_type + ) + + if not target_collections: + self._log("⏭️ Kein xAI-Sync nötig: Keine Related Entities mit xAI Collections") + return (False, [], "Keine verknüpften Entities mit xAI Collections") # ═══════════════════════════════════════════════════════════════ # PRIORITY CHECK: Datei-Status "Neu" oder "Geändert" @@ -165,12 +179,6 @@ class DocumentSync(BaseSyncUtils): if datei_status in ['Neu', 'Geändert', 'neu', 'geändert', 'New', 'Changed']: self._log(f"🆕 Datei-Status: '{datei_status}' → xAI-Sync ERFORDERLICH") - # Hole Collections (entweder existierende oder von Related Entities) - if xai_collections: - target_collections = xai_collections - else: - target_collections = await self._get_required_collections_from_relations(doc_id) - if target_collections: return (True, target_collections, f"Datei-Status: {datei_status}") else: @@ -181,8 +189,8 @@ class DocumentSync(BaseSyncUtils): # ═══════════════════════════════════════════════════════════════ # FALL 1: Document ist bereits in xAI UND Collections sind gesetzt # ═══════════════════════════════════════════════════════════════ - if xai_file_id and xai_collections: - self._log(f"✅ Document bereits in xAI gesynct mit {len(xai_collections)} Collection(s)") + if xai_file_id: + self._log(f"✅ Document bereits in xAI gesynct mit {len(target_collections)} Collection(s)") # Prüfe ob File-Inhalt geändert wurde (Hash-Vergleich) current_hash = file_md5 or file_sha @@ -192,38 +200,28 @@ class DocumentSync(BaseSyncUtils): self._log(f"🔄 Hash-Änderung erkannt! RESYNC erforderlich") self._log(f" Alt: {xai_synced_hash[:16]}...") self._log(f" Neu: {current_hash[:16]}...") - return (True, xai_collections, "File-Inhalt geändert (Hash-Mismatch)") + return (True, target_collections, "File-Inhalt geändert (Hash-Mismatch)") else: self._log(f"✅ Hash identisch - keine Änderung") else: self._log(f"⚠️ Keine Hash-Werte verfügbar für Vergleich") - return (False, xai_collections, "Bereits gesynct, keine Änderung erkannt") + return (False, target_collections, "Bereits gesynct, keine Änderung erkannt") # ═══════════════════════════════════════════════════════════════ # FALL 2: Document hat xaiFileId aber Collections ist leer/None # ═══════════════════════════════════════════════════════════════ - if xai_file_id and not xai_collections: - self._log(f"⚠️ Document hat xaiFileId aber keine Collections - prüfe Related Entities") - - # Fallthrough zu FALL 3 - prüfe Related Entities - # ═══════════════════════════════════════════════════════════════ - # FALL 3: Prüfe Related Entities (Attachments) + # FALL 3: Collections vorhanden aber kein Status/Hash-Trigger # ═══════════════════════════════════════════════════════════════ - required_collections = await self._get_required_collections_from_relations(doc_id) - - if required_collections: - self._log(f"✅ Document ist mit {len(required_collections)} Entity/ies verknüpft die Collections haben") - return (True, required_collections, f"Verknüpft mit Entities die Collections benötigen") - - # ═══════════════════════════════════════════════════════════════ - # FALL 4: Keine Collections gefunden → kein Sync nötig - # ═══════════════════════════════════════════════════════════════ - self._log(f"⏭️ Kein xAI-Sync nötig: Keine Related Entities mit Collections") - return (False, [], "Keine verknüpften Entities mit xAI Collections") + self._log(f"✅ Document ist mit {len(target_collections)} Entity/ies verknüpft die Collections haben") + return (True, target_collections, "Verknüpft mit Entities die Collections benötigen") - async def _get_required_collections_from_relations(self, document_id: str) -> List[str]: + async def _get_required_collections_from_relations( + self, + document_id: str, + entity_type: str = 'Document' + ) -> List[str]: """ Ermittelt alle xAI Collection-IDs von Entities die mit diesem Document verknüpft sind @@ -238,45 +236,63 @@ class DocumentSync(BaseSyncUtils): """ collections = set() - # Liste von Entity-Types die xAI Collections haben können - # NOTE: Erweiterbar für andere Entities - entity_types_with_collections = [ - 'CBeteiligte', - 'Account', - 'CVmhErstgespraech', - # Weitere Entity-Types hier hinzufügen - ] - - self._log(f"🔍 Prüfe Attachments von Document {document_id}...") - - for entity_type in entity_types_with_collections: + self._log(f"🔍 Prüfe Relations von {entity_type} {document_id}...") + + try: + entity_def = await self.espocrm.get_entity_def(entity_type) + links = entity_def.get('links', {}) if isinstance(entity_def, dict) else {} + except Exception as e: + self._log(f"⚠️ Konnte Metadata fuer {entity_type} nicht laden: {e}", level='warn') + links = {} + + link_types = {'hasMany', 'hasChildren', 'manyMany', 'hasManyThrough'} + + for link_name, link_def in links.items(): try: - # Finde alle Entities dieses Typs die dieses Document attached haben - # EspoCRM API: Suche wo documentsIds das Document enthält - result = await self.espocrm.list_entities( - entity_type, - where=[{ - 'type': 'arrayAnyOf', - 'attribute': 'documentsIds', - 'value': [document_id] - }], - select='id,name,xaiCollectionId', - max_size=100 - ) - - entities = result.get('list', []) - - if entities: - self._log(f" ✅ {len(entities)} {entity_type}(s) gefunden") - + if not isinstance(link_def, dict): + continue + if link_def.get('type') not in link_types: + continue + + related_entity = link_def.get('entity') + if not related_entity: + continue + + related_def = await self.espocrm.get_entity_def(related_entity) + related_fields = related_def.get('fields', {}) if isinstance(related_def, dict) else {} + + select_fields = ['id'] + if 'xaiCollectionId' in related_fields: + select_fields.append('xaiCollectionId') + + offset = 0 + page_size = 100 + + while True: + result = await self.espocrm.list_related( + entity_type, + document_id, + link_name, + select=','.join(select_fields), + offset=offset, + max_size=page_size + ) + + entities = result.get('list', []) + if not entities: + break + for entity in entities: collection_id = entity.get('xaiCollectionId') if collection_id: collections.add(collection_id) - self._log(f" → {entity.get('name')}: Collection {collection_id[:16]}...") - + + if len(entities) < page_size: + break + offset += page_size + except Exception as e: - self._log(f" ⚠️ Fehler beim Prüfen von {entity_type}: {e}", level='warn') + self._log(f" ⚠️ Fehler beim Prüfen von Link {link_name}: {e}", level='warn') continue result = list(collections) diff --git a/services/espocrm.py b/services/espocrm.py index 4794963..001d7f0 100644 --- a/services/espocrm.py +++ b/services/espocrm.py @@ -2,6 +2,7 @@ import aiohttp import asyncio import logging +import time from typing import Optional, Dict, Any, List import os @@ -56,6 +57,8 @@ class EspoCRMAPI: self.logger.info(f"EspoCRM API initialized with base URL: {self.api_base_url}") self._session: Optional[aiohttp.ClientSession] = None + self._entity_defs_cache: Dict[str, Dict[str, Any]] = {} + self._entity_defs_cache_ttl_seconds = int(os.getenv('ESPOCRM_METADATA_TTL_SECONDS', '300')) # Optional Redis for caching/rate limiting (centralized) self.redis_client = get_redis_client(strict=False) @@ -81,6 +84,21 @@ class EspoCRMAPI: if self._session and not self._session.closed: await self._session.close() + async def get_entity_def(self, entity_type: str) -> Dict[str, Any]: + now = time.monotonic() + cached = self._entity_defs_cache.get(entity_type) + if cached and (now - cached['ts']) < self._entity_defs_cache_ttl_seconds: + return cached['data'] + + try: + data = await self.api_call(f"/Metadata/EntityDefs/{entity_type}", method='GET') + except EspoCRMAPIError: + all_defs = await self.api_call("/Metadata/EntityDefs", method='GET') + data = all_defs.get(entity_type, {}) if isinstance(all_defs, dict) else {} + + self._entity_defs_cache[entity_type] = {'ts': now, 'data': data} + return data + async def api_call( self, endpoint: str, @@ -228,6 +246,36 @@ class EspoCRMAPI: self._log(f"Listing {entity_type} entities") return await self.api_call(f"/{entity_type}", method='GET', params=params) + async def list_related( + self, + entity_type: str, + entity_id: str, + link: str, + where: Optional[List[Dict]] = None, + select: Optional[str] = None, + order_by: Optional[str] = None, + order: Optional[str] = None, + offset: int = 0, + max_size: int = 50 + ) -> Dict[str, Any]: + params = { + 'offset': offset, + 'maxSize': max_size + } + + if where: + import json + params['where'] = where if isinstance(where, str) else json.dumps(where) + if select: + params['select'] = select + if order_by: + params['orderBy'] = order_by + if order: + params['order'] = order + + self._log(f"Listing related {entity_type}/{entity_id}/{link}") + return await self.api_call(f"/{entity_type}/{entity_id}/{link}", method='GET', params=params) + async def create_entity( self, entity_type: str,