feat(espocrm): add caching for entity definitions and implement related entity listing
This commit is contained in:
@@ -120,7 +120,11 @@ class DocumentSync(BaseSyncUtils):
|
||||
lock_key = self._get_lock_key(entity_id)
|
||||
self._release_redis_lock(lock_key)
|
||||
|
||||
async def should_sync_to_xai(self, document: Dict[str, Any]) -> Tuple[bool, List[str], str]:
|
||||
async def should_sync_to_xai(
|
||||
self,
|
||||
document: Dict[str, Any],
|
||||
entity_type: str = 'CDokumente'
|
||||
) -> Tuple[bool, List[str], str]:
|
||||
"""
|
||||
Entscheidet ob ein Document zu xAI synchronisiert werden muss
|
||||
|
||||
@@ -159,18 +163,22 @@ class DocumentSync(BaseSyncUtils):
|
||||
self._log(f" SHA: {file_sha[:16] if file_sha else 'N/A'}...")
|
||||
self._log(f" xaiSyncedHash: {xai_synced_hash[:16] if xai_synced_hash else 'N/A'}...")
|
||||
|
||||
# Determine target collections from relations (CDokumente -> linked entities)
|
||||
target_collections = await self._get_required_collections_from_relations(
|
||||
doc_id,
|
||||
entity_type=entity_type
|
||||
)
|
||||
|
||||
if not target_collections:
|
||||
self._log("⏭️ Kein xAI-Sync nötig: Keine Related Entities mit xAI Collections")
|
||||
return (False, [], "Keine verknüpften Entities mit xAI Collections")
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# PRIORITY CHECK: Datei-Status "Neu" oder "Geändert"
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
if datei_status in ['Neu', 'Geändert', 'neu', 'geändert', 'New', 'Changed']:
|
||||
self._log(f"🆕 Datei-Status: '{datei_status}' → xAI-Sync ERFORDERLICH")
|
||||
|
||||
# Hole Collections (entweder existierende oder von Related Entities)
|
||||
if xai_collections:
|
||||
target_collections = xai_collections
|
||||
else:
|
||||
target_collections = await self._get_required_collections_from_relations(doc_id)
|
||||
|
||||
if target_collections:
|
||||
return (True, target_collections, f"Datei-Status: {datei_status}")
|
||||
else:
|
||||
@@ -181,8 +189,8 @@ class DocumentSync(BaseSyncUtils):
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# FALL 1: Document ist bereits in xAI UND Collections sind gesetzt
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
if xai_file_id and xai_collections:
|
||||
self._log(f"✅ Document bereits in xAI gesynct mit {len(xai_collections)} Collection(s)")
|
||||
if xai_file_id:
|
||||
self._log(f"✅ Document bereits in xAI gesynct mit {len(target_collections)} Collection(s)")
|
||||
|
||||
# Prüfe ob File-Inhalt geändert wurde (Hash-Vergleich)
|
||||
current_hash = file_md5 or file_sha
|
||||
@@ -192,38 +200,28 @@ class DocumentSync(BaseSyncUtils):
|
||||
self._log(f"🔄 Hash-Änderung erkannt! RESYNC erforderlich")
|
||||
self._log(f" Alt: {xai_synced_hash[:16]}...")
|
||||
self._log(f" Neu: {current_hash[:16]}...")
|
||||
return (True, xai_collections, "File-Inhalt geändert (Hash-Mismatch)")
|
||||
return (True, target_collections, "File-Inhalt geändert (Hash-Mismatch)")
|
||||
else:
|
||||
self._log(f"✅ Hash identisch - keine Änderung")
|
||||
else:
|
||||
self._log(f"⚠️ Keine Hash-Werte verfügbar für Vergleich")
|
||||
|
||||
return (False, xai_collections, "Bereits gesynct, keine Änderung erkannt")
|
||||
return (False, target_collections, "Bereits gesynct, keine Änderung erkannt")
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# FALL 2: Document hat xaiFileId aber Collections ist leer/None
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
if xai_file_id and not xai_collections:
|
||||
self._log(f"⚠️ Document hat xaiFileId aber keine Collections - prüfe Related Entities")
|
||||
|
||||
# Fallthrough zu FALL 3 - prüfe Related Entities
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# FALL 3: Prüfe Related Entities (Attachments)
|
||||
# FALL 3: Collections vorhanden aber kein Status/Hash-Trigger
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
required_collections = await self._get_required_collections_from_relations(doc_id)
|
||||
self._log(f"✅ Document ist mit {len(target_collections)} Entity/ies verknüpft die Collections haben")
|
||||
return (True, target_collections, "Verknüpft mit Entities die Collections benötigen")
|
||||
|
||||
if required_collections:
|
||||
self._log(f"✅ Document ist mit {len(required_collections)} Entity/ies verknüpft die Collections haben")
|
||||
return (True, required_collections, f"Verknüpft mit Entities die Collections benötigen")
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# FALL 4: Keine Collections gefunden → kein Sync nötig
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
self._log(f"⏭️ Kein xAI-Sync nötig: Keine Related Entities mit Collections")
|
||||
return (False, [], "Keine verknüpften Entities mit xAI Collections")
|
||||
|
||||
async def _get_required_collections_from_relations(self, document_id: str) -> List[str]:
|
||||
async def _get_required_collections_from_relations(
|
||||
self,
|
||||
document_id: str,
|
||||
entity_type: str = 'Document'
|
||||
) -> List[str]:
|
||||
"""
|
||||
Ermittelt alle xAI Collection-IDs von Entities die mit diesem Document verknüpft sind
|
||||
|
||||
@@ -238,45 +236,63 @@ class DocumentSync(BaseSyncUtils):
|
||||
"""
|
||||
collections = set()
|
||||
|
||||
# Liste von Entity-Types die xAI Collections haben können
|
||||
# NOTE: Erweiterbar für andere Entities
|
||||
entity_types_with_collections = [
|
||||
'CBeteiligte',
|
||||
'Account',
|
||||
'CVmhErstgespraech',
|
||||
# Weitere Entity-Types hier hinzufügen
|
||||
]
|
||||
self._log(f"🔍 Prüfe Relations von {entity_type} {document_id}...")
|
||||
|
||||
self._log(f"🔍 Prüfe Attachments von Document {document_id}...")
|
||||
|
||||
for entity_type in entity_types_with_collections:
|
||||
try:
|
||||
# Finde alle Entities dieses Typs die dieses Document attached haben
|
||||
# EspoCRM API: Suche wo documentsIds das Document enthält
|
||||
result = await self.espocrm.list_entities(
|
||||
entity_def = await self.espocrm.get_entity_def(entity_type)
|
||||
links = entity_def.get('links', {}) if isinstance(entity_def, dict) else {}
|
||||
except Exception as e:
|
||||
self._log(f"⚠️ Konnte Metadata fuer {entity_type} nicht laden: {e}", level='warn')
|
||||
links = {}
|
||||
|
||||
link_types = {'hasMany', 'hasChildren', 'manyMany', 'hasManyThrough'}
|
||||
|
||||
for link_name, link_def in links.items():
|
||||
try:
|
||||
if not isinstance(link_def, dict):
|
||||
continue
|
||||
if link_def.get('type') not in link_types:
|
||||
continue
|
||||
|
||||
related_entity = link_def.get('entity')
|
||||
if not related_entity:
|
||||
continue
|
||||
|
||||
related_def = await self.espocrm.get_entity_def(related_entity)
|
||||
related_fields = related_def.get('fields', {}) if isinstance(related_def, dict) else {}
|
||||
|
||||
select_fields = ['id']
|
||||
if 'xaiCollectionId' in related_fields:
|
||||
select_fields.append('xaiCollectionId')
|
||||
|
||||
offset = 0
|
||||
page_size = 100
|
||||
|
||||
while True:
|
||||
result = await self.espocrm.list_related(
|
||||
entity_type,
|
||||
where=[{
|
||||
'type': 'arrayAnyOf',
|
||||
'attribute': 'documentsIds',
|
||||
'value': [document_id]
|
||||
}],
|
||||
select='id,name,xaiCollectionId',
|
||||
max_size=100
|
||||
document_id,
|
||||
link_name,
|
||||
select=','.join(select_fields),
|
||||
offset=offset,
|
||||
max_size=page_size
|
||||
)
|
||||
|
||||
entities = result.get('list', [])
|
||||
|
||||
if entities:
|
||||
self._log(f" ✅ {len(entities)} {entity_type}(s) gefunden")
|
||||
if not entities:
|
||||
break
|
||||
|
||||
for entity in entities:
|
||||
collection_id = entity.get('xaiCollectionId')
|
||||
if collection_id:
|
||||
collections.add(collection_id)
|
||||
self._log(f" → {entity.get('name')}: Collection {collection_id[:16]}...")
|
||||
|
||||
if len(entities) < page_size:
|
||||
break
|
||||
offset += page_size
|
||||
|
||||
except Exception as e:
|
||||
self._log(f" ⚠️ Fehler beim Prüfen von {entity_type}: {e}", level='warn')
|
||||
self._log(f" ⚠️ Fehler beim Prüfen von Link {link_name}: {e}", level='warn')
|
||||
continue
|
||||
|
||||
result = list(collections)
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import Optional, Dict, Any, List
|
||||
import os
|
||||
|
||||
@@ -56,6 +57,8 @@ class EspoCRMAPI:
|
||||
self.logger.info(f"EspoCRM API initialized with base URL: {self.api_base_url}")
|
||||
|
||||
self._session: Optional[aiohttp.ClientSession] = None
|
||||
self._entity_defs_cache: Dict[str, Dict[str, Any]] = {}
|
||||
self._entity_defs_cache_ttl_seconds = int(os.getenv('ESPOCRM_METADATA_TTL_SECONDS', '300'))
|
||||
|
||||
# Optional Redis for caching/rate limiting (centralized)
|
||||
self.redis_client = get_redis_client(strict=False)
|
||||
@@ -81,6 +84,21 @@ class EspoCRMAPI:
|
||||
if self._session and not self._session.closed:
|
||||
await self._session.close()
|
||||
|
||||
async def get_entity_def(self, entity_type: str) -> Dict[str, Any]:
    """Return the EspoCRM entity definition for ``entity_type``, using a TTL cache.

    Definitions are kept in ``self._entity_defs_cache`` keyed by entity type
    and are reused while they are younger than
    ``self._entity_defs_cache_ttl_seconds``.

    The per-entity endpoint ``/Metadata/EntityDefs/{entity_type}`` is tried
    first. If that call raises ``EspoCRMAPIError``, the complete
    ``/Metadata/EntityDefs`` payload is fetched instead and *every* definition
    it contains is cached, so later lookups for other entity types do not
    repeat the failing request and the full download.

    Args:
        entity_type: Name of the entity type to look up.

    Returns:
        The definition returned by the API, or an empty dict when the
        fallback payload has no entry for ``entity_type``.

    Raises:
        Any exception raised by ``api_call`` for the fallback request, and any
        non-``EspoCRMAPIError`` exception raised by the per-entity request.
    """
    cache = self._entity_defs_cache
    ttl = self._entity_defs_cache_ttl_seconds

    entry = cache.get(entity_type)
    if entry is not None and (time.monotonic() - entry['ts']) < ttl:
        return entry['data']

    try:
        data = await self.api_call(f"/Metadata/EntityDefs/{entity_type}", method='GET')
    except EspoCRMAPIError:
        all_defs = await self.api_call("/Metadata/EntityDefs", method='GET')
        if not isinstance(all_defs, dict):
            all_defs = {}

        # The whole metadata payload is already in hand: cache every entity
        # definition so sibling lookups are served without another request.
        fetched_at = time.monotonic()
        for name, definition in all_defs.items():
            if isinstance(definition, dict):
                cache[name] = {'ts': fetched_at, 'data': definition}

        data = all_defs.get(entity_type, {})

    # Stamp after the fetch so the TTL counts from when the data arrived.
    cache[entity_type] = {'ts': time.monotonic(), 'data': data}
    return data
|
||||
|
||||
async def api_call(
|
||||
self,
|
||||
endpoint: str,
|
||||
@@ -228,6 +246,36 @@ class EspoCRMAPI:
|
||||
self._log(f"Listing {entity_type} entities")
|
||||
return await self.api_call(f"/{entity_type}", method='GET', params=params)
|
||||
|
||||
async def list_related(
    self,
    entity_type: str,
    entity_id: str,
    link: str,
    where: Optional[List[Dict]] = None,
    select: Optional[str] = None,
    order_by: Optional[str] = None,
    order: Optional[str] = None,
    offset: int = 0,
    max_size: int = 50
) -> Dict[str, Any]:
    """List the records reachable from one entity through a named link.

    Issues ``GET /{entity_type}/{entity_id}/{link}`` via ``api_call``.

    Args:
        entity_type: Type of the parent entity.
        entity_id: ID of the parent entity.
        link: Name of the relationship link to follow.
        where: Optional filter; a string is passed through unchanged,
            anything else is JSON-encoded.
        select: Optional value for the ``select`` query parameter.
        order_by: Optional value for the ``orderBy`` query parameter.
        order: Optional value for the ``order`` query parameter.
        offset: Value for the ``offset`` query parameter.
        max_size: Value for the ``maxSize`` query parameter.

    Returns:
        Whatever ``api_call`` returns for the request.
    """
    query = {'offset': offset, 'maxSize': max_size}

    if where:
        import json
        query['where'] = json.dumps(where) if not isinstance(where, str) else where

    # Optional parameters are only sent when they carry a truthy value.
    optional_params = (
        ('select', select),
        ('orderBy', order_by),
        ('order', order),
    )
    for key, value in optional_params:
        if value:
            query[key] = value

    self._log(f"Listing related {entity_type}/{entity_id}/{link}")
    endpoint = f"/{entity_type}/{entity_id}/{link}"
    return await self.api_call(endpoint, method='GET', params=query)
|
||||
|
||||
async def create_entity(
|
||||
self,
|
||||
entity_type: str,
|
||||
|
||||
Reference in New Issue
Block a user