diff --git a/pyproject.toml b/pyproject.toml
index 4b6bdf6..cafcd1c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,7 +17,8 @@ dependencies = [
     "asyncpg>=0.29.0",  # PostgreSQL async driver for calendar sync
     "google-api-python-client>=2.100.0",  # Google Calendar API
     "google-auth>=2.23.0",  # Google OAuth2
-    "backoff>=2.2.1",  # Retry/backoff decorator
+    "backoff>=2.2.1",
+    "ragflow-sdk>=0.24.0",  # RAGFlow AI Provider
     "langchain>=0.3.0",  # LangChain framework
     "langchain-xai>=0.2.0",  # xAI integration for LangChain
     "langchain-core>=0.3.0",  # LangChain core
diff --git a/services/aiknowledge_sync_utils.py b/services/aiknowledge_sync_utils.py
new file mode 100644
index 0000000..66dd95c
--- /dev/null
+++ b/services/aiknowledge_sync_utils.py
@@ -0,0 +1,557 @@
+"""
+AI Knowledge Sync Utilities
+
+Provider-agnostic sync logic for CAIKnowledge entities.
+
+Supported providers:
+    - aiProvider = "xai"     → XAIService (Collections API)
+    - aiProvider = "ragflow" → RAGFlowService (Dataset API)
+
+Lifecycle:
+    new         → create dataset/collection → active
+    active      → sync documents
+    paused      → no sync, no delete
+    deactivated → delete dataset/collection, reset junctions
+
+Change detection:
+    - The Blake3 hash in EspoCRM (CDokumente.blake3hash) is the primary source.
+    - xAI:     blake3hash == xaiBlake3Hash       → no re-upload
+    - RAGFlow: blake3hash == junction.syncedHash → no re-upload
+               metadata hash changed             → only a meta_fields update is needed
+"""
+
+from __future__ import annotations
+
+import hashlib
+import mimetypes
+from datetime import datetime
+from typing import Any, Dict, List, Optional, Tuple
+from urllib.parse import unquote
+
+import pytz
+
+from services.models import (
+    AIKnowledgeActivationStatus,
+    AIKnowledgeSyncStatus,
+    JunctionSyncStatus,
+)
+from services.sync_utils_base import BaseSyncUtils
+from services.config import SYNC_CONFIG
+
+
+def _compute_metadata_hash(doc: Dict[str, Any]) -> str:
+    """Computes a deterministic hash over the document metadata."""
+    parts = [
+        doc.get('name', ''),
+        doc.get('beschreibung', '') or '',
+        doc.get('advowareArt', '') or '',
+        doc.get('advowareBemerkung', '') or '',
+    ]
+    return hashlib.md5('|'.join(parts).encode()).hexdigest()
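+
+# Example (a sketch; values are hypothetical): the metadata hash covers only
+# name/beschreibung/advowareArt/advowareBemerkung, so editing the description
+# changes the hash while the file's blake3hash stays the same -- which is
+# exactly the "metadata-only update" case handled in _sync_single_document():
+#
+#     a = _compute_metadata_hash({'name': 'Akte.pdf', 'beschreibung': 'Scan'})
+#     b = _compute_metadata_hash({'name': 'Akte.pdf', 'beschreibung': 'Scan v2'})
+#     assert a != b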
+
+
+class AIKnowledgeSyncUtils(BaseSyncUtils):
+    """
+    Provider-agnostic sync utilities for CAIKnowledge.
+
+    Usage in a step:
+        sync = AIKnowledgeSyncUtils(espocrm, redis_client, ctx)
+        await sync.run_sync(knowledge_id)
+    """
+
+    def _get_lock_key(self, entity_id: str) -> str:
+        return f"sync_lock:aiknowledge:{entity_id}"
+
+    async def acquire_sync_lock(self, entity_id: str, **kwargs) -> bool:
+        """Acquires the Redis lock and sets syncStatus → pending_sync."""
+        lock_key = self._get_lock_key(entity_id)
+        if not self._acquire_redis_lock(lock_key):
+            self._log(f"⏸️ Lock already held for CAIKnowledge {entity_id}", level='warn')
+            return False
+        try:
+            await self.espocrm.update_entity('CAIKnowledge', entity_id, {
+                'syncStatus': AIKnowledgeSyncStatus.PENDING_SYNC.value
+            })
+        except Exception as e:
+            self._log(f"syncStatus update failed: {e}", level='debug')
+        return True
+
+    async def release_sync_lock(
+        self,
+        entity_id: str,
+        success: bool = True,
+        error_message: Optional[str] = None,
+        extra_fields: Optional[Dict[str, Any]] = None,
+        **kwargs,
+    ) -> None:
+        """Releases the Redis lock and writes the final syncStatus."""
+        now = self._get_espocrm_datetime()
+        update: Dict[str, Any] = {
+            'syncStatus': (
+                AIKnowledgeSyncStatus.SYNCED.value
+                if success
+                else AIKnowledgeSyncStatus.FAILED.value
+            ),
+            'lastSync': now,
+        }
+        if error_message:
+            update['syncError'] = error_message[:2000]
+        else:
+            update['syncError'] = None
+        if extra_fields:
+            update.update(extra_fields)
+        try:
+            await self.espocrm.update_entity('CAIKnowledge', entity_id, update)
+        except Exception as e:
+            self._log(f"release_sync_lock update failed: {e}", level='error')
+        finally:
+            self._release_redis_lock(self._get_lock_key(entity_id))
+
+    # =========================================================
+    # Main Entry Point
+    # =========================================================
+
+    async def run_sync(self, knowledge_id: str) -> None:
+        """
+        Full sync pass for one CAIKnowledge entity.
+
+        1. Load entity + acquire lock
+        2. Determine provider (xai / ragflow)
+        3. Lifecycle action:
+           - new         → create dataset/collection
+           - active      → sync documents
+           - paused      → do nothing
+           - deactivated → delete dataset/collection
+        """
+        self._log("=" * 70)
+        self._log(f"🔄 AI KNOWLEDGE SYNC START: {knowledge_id}")
+        self._log("=" * 70)
+
+        # 1. Load entity
+        try:
+            entity = await self.espocrm.get_entity('CAIKnowledge', knowledge_id)
+        except Exception as e:
+            self._log(f"❌ Loading entity failed: {e}", level='error')
+            return
+
+        name = entity.get('name', knowledge_id)
+        activation = entity.get('activationStatus', AIKnowledgeActivationStatus.NEW.value)
+        sync_status = entity.get('syncStatus', AIKnowledgeSyncStatus.UNCLEAN.value)
+        provider = entity.get('aiProvider', 'xai')
+        dataset_id = entity.get('datenbankId')
+
+        self._log(f"📋 Entity: {name}")
+        self._log(f"   activationStatus: {activation}")
+        self._log(f"   syncStatus      : {sync_status}")
+        self._log(f"   aiProvider      : {provider}")
+        self._log(f"   datenbankId     : {dataset_id or 'N/A'}")
+
+        # Paused → do nothing
+        if activation == AIKnowledgeActivationStatus.PAUSED.value:
+            self._log("⏸️ PAUSED – no sync")
+            await self.espocrm.update_entity('CAIKnowledge', knowledge_id, {
+                'syncStatus': AIKnowledgeSyncStatus.SYNCED.value
+            })
+            return
+
+        # Acquire the lock
+        acquired = await self.acquire_sync_lock(knowledge_id)
+        if not acquired:
+            return
+
+        try:
+            # 2. Instantiate the provider service
+            ai = self._build_provider(provider)
+
+            # 3. Lifecycle
+            if activation == AIKnowledgeActivationStatus.NEW.value:
+                dataset_id = await self._handle_new(knowledge_id, entity, ai)
+                if not dataset_id:
+                    await self.release_sync_lock(
+                        knowledge_id, success=False,
+                        error_message="Creating the dataset/collection failed"
+                    )
+                    return
+                # Status moves on to active → sync right away
+                activation = AIKnowledgeActivationStatus.ACTIVE.value
+
+            if activation == AIKnowledgeActivationStatus.ACTIVE.value:
+                error = await self._handle_active(knowledge_id, entity, dataset_id, ai, provider)
+                if error:
+                    await self.release_sync_lock(knowledge_id, success=False, error_message=error)
+                    return
+
+            elif activation == AIKnowledgeActivationStatus.DEACTIVATED.value:
+                await self._handle_deactivated(knowledge_id, entity, dataset_id, ai)
+
+            await self.release_sync_lock(knowledge_id, success=True)
+            self._log(f"✅ AI KNOWLEDGE SYNC DONE: {name}")
+
+        except Exception as e:
+            self._log(f"❌ Unexpected error: {e}", level='error')
+            await self.release_sync_lock(knowledge_id, success=False, error_message=str(e))
+
+    # =========================================================
+    # Lifecycle: NEW
+    # =========================================================
+
+    async def _handle_new(
+        self,
+        knowledge_id: str,
+        entity: Dict[str, Any],
+        ai,
+    ) -> Optional[str]:
+        """Creates the dataset/collection and updates datenbankId + activationStatus."""
+        name = entity.get('name', knowledge_id)
+        self._log(f"🆕 NEW → creating dataset: {name}")
+
+        try:
+            result = await ai.ensure_dataset(name=name, description=f"CAIKnowledge: {name}")
+            dataset_id = result['id']
+            await self.espocrm.update_entity('CAIKnowledge', knowledge_id, {
+                'datenbankId': dataset_id,
+                'activationStatus': AIKnowledgeActivationStatus.ACTIVE.value,
+                'syncStatus': AIKnowledgeSyncStatus.UNCLEAN.value,
+            })
+            self._log(f"✅ Dataset created: {dataset_id} → activationStatus=active")
+            return dataset_id
+        except Exception as e:
+            self._log(f"❌ Creating dataset failed: {e}", level='error')
+            return None
+
+    # =========================================================
+    # Lifecycle: ACTIVE
+    # =========================================================
+
+    async def _handle_active(
+        self,
+        knowledge_id: str,
+        entity: Dict[str, Any],
+        dataset_id: str,
+        ai,
+        provider: str,
+    ) -> Optional[str]:
+        """
+        Synchronizes all linked documents.
+
+        Returns:
+            An error message (str), or None on success.
+ """ + if not dataset_id: + return "datenbankId fehlt – Dataset wurde noch nicht erstellt" + + # Alle verknuepften Dokumente laden (Junction: CAIKnowledgeCDokumente) + self._log(f"📋 Lade verknuepfte Dokumente fuer {knowledge_id}…") + try: + junction_entries = await self.espocrm.list_related_all( + 'CAIKnowledge', knowledge_id, 'dokumentes' + ) + except Exception as e: + return f"Junction laden fehlgeschlagen: {e}" + + self._log(f" {len(junction_entries)} Dokument(e) verknuepft") + + # Remotedokumente holen (fuer Orphan-Detection) + try: + remote_docs = await ai.list_documents(dataset_id) + remote_ids: set = {d['id'] for d in remote_docs if d.get('id')} + except Exception: + remote_ids = set() + + synced_remote_ids: set = set() + has_error = False + + for junction in junction_entries: + doc_id = junction.get('id') + ok = await self._sync_single_document( + doc_id, junction, knowledge_id, dataset_id, ai, provider, synced_remote_ids + ) + if not ok: + has_error = True + + # Orphan-Detection: remote Dokumente ohne Junction → loeschen + orphans = remote_ids - synced_remote_ids + if orphans: + self._log(f"🧹 Orphan-Cleanup: {len(orphans)} Dokument(e) ohne Junction") + for orphan_id in orphans: + try: + await ai.remove_document(dataset_id, orphan_id) + self._log(f" 🗑️ Orphan geloescht: {orphan_id}") + except Exception as e: + self._log(f" ⚠️ Orphan cleanup fehlgeschlagen ({orphan_id}): {e}", level='warn') + + if has_error: + return "Einige Dokumente konnten nicht synchroisert werden (Partial Failure)" + return None + + async def _sync_single_document( + self, + doc_id: str, + junction: Dict[str, Any], + knowledge_id: str, + dataset_id: str, + ai, + provider: str, + synced_remote_ids: set, + ) -> bool: + """ + Synchronisiert ein einzelnes Dokument. + + Entscheidungslogik: + - junction.syncstatus == 'synced' UND Hash unveraendert UND + Metadata-Hash unveraendert → Skip + - junction.syncstatus in ('new', 'unclean', 'failed') → Upload/Update + - Kein Attachment → 'unsupported' + + Returns: + True bei Erfolg/Skip, False bei Fehler + """ + doc_name = junction.get('name', doc_id) + junction_status = junction.get('syncstatus', JunctionSyncStatus.NEW.value) + ai_doc_id = junction.get('aiDocumentId') # RAGFlow doc ID / xAI file ID + synced_hash = junction.get('syncedHash') + synced_meta_hash = junction.get('syncedMetadataHash') + blake3_hash = junction.get('blake3hash') # EspoCRM berechnet + + self._log(f"\n 📄 {doc_name} (junction_status={junction_status})") + + # Metadaten-Hash berechnen + current_meta_hash = _compute_metadata_hash(junction) + + # Skip-Pruefung: synced + Hash unveraendert + if junction_status == JunctionSyncStatus.SYNCED.value and ai_doc_id: + file_unchanged = (synced_hash and blake3_hash and synced_hash == blake3_hash) + meta_unchanged = (synced_meta_hash == current_meta_hash) + if file_unchanged and meta_unchanged: + self._log(f" ✅ Unveraendert – Skip") + synced_remote_ids.add(ai_doc_id) + return True + if file_unchanged and not meta_unchanged: + # Nur Metadaten aendern (kein Re-Upload) + return await self._update_metadata_only( + doc_id, junction, ai_doc_id, dataset_id, + current_meta_hash, ai, provider, synced_remote_ids + ) + + # Upload (neu oder geaendert) + return await self._upload_document( + doc_id, junction, knowledge_id, dataset_id, + current_meta_hash, blake3_hash, ai_doc_id, ai, provider, synced_remote_ids + ) + + async def _upload_document( + self, + doc_id: str, + junction: Dict[str, Any], + knowledge_id: str, + dataset_id: str, + current_meta_hash: str, + blake3_hash: 
+    async def _upload_document(
+        self,
+        doc_id: str,
+        junction: Dict[str, Any],
+        knowledge_id: str,
+        dataset_id: str,
+        current_meta_hash: str,
+        blake3_hash: Optional[str],
+        old_ai_doc_id: Optional[str],
+        ai,
+        provider: str,
+        synced_remote_ids: set,
+    ) -> bool:
+        doc_name = junction.get('name', doc_id)
+        attachment_id = junction.get('dokumentId') or junction.get('attachmentId')
+
+        if not attachment_id:
+            self._log(f"      ⚠️ No attachment – unsupported")
+            await self._update_junction(doc_id, knowledge_id, {
+                'syncstatus': JunctionSyncStatus.UNSUPPORTED.value,
+                'lastSync': self._get_espocrm_datetime(),
+            })
+            return True  # not an error, just unsupported
+
+        # Determine the MIME type
+        filename = unquote(junction.get('dokumentName') or junction.get('name') or 'document.bin')
+        mime_type, _ = mimetypes.guess_type(filename)
+        if not mime_type:
+            mime_type = 'application/octet-stream'
+
+        # Check MIME type support
+        if not ai.is_mime_type_supported(mime_type):
+            self._log(f"      ⚠️ MIME type not supported: {mime_type} – unsupported")
+            await self._update_junction(doc_id, knowledge_id, {
+                'syncstatus': JunctionSyncStatus.UNSUPPORTED.value,
+                'lastSync': self._get_espocrm_datetime(),
+            })
+            return True
+
+        # Download the file from EspoCRM
+        try:
+            self._log(f"      📥 Downloading {filename} ({attachment_id})…")
+            file_content = await self.espocrm.download_attachment(attachment_id)
+            self._log(f"      Downloaded {len(file_content)} bytes")
+        except Exception as e:
+            self._log(f"      ❌ Download failed: {e}", level='error')
+            await self._update_junction(doc_id, knowledge_id, {
+                'syncstatus': JunctionSyncStatus.FAILED.value,
+                'lastSync': self._get_espocrm_datetime(),
+            })
+            return False
+
+        # Delete the old document at the provider (on update)
+        if old_ai_doc_id:
+            try:
+                await ai.remove_document(dataset_id, old_ai_doc_id)
+                self._log(f"      🗑️ Old document deleted: {old_ai_doc_id}")
+            except Exception:
+                pass  # Non-fatal
+
+        # Upload + metadata
+        try:
+            self._log(f"      📤 Uploading to {provider}…")
+            result = await ai.upload_document(
+                dataset_id=dataset_id,
+                file_content=file_content,
+                filename=filename,
+                mime_type=mime_type,
+                blake3_hash=blake3_hash,
+                espocrm_id=doc_id,
+                description=junction.get('beschreibung') or junction.get('description') or '',
+                advoware_art=junction.get('advowareArt') or '',
+                advoware_bemerkung=junction.get('advowareBemerkung') or '',
+            )
+            new_ai_doc_id = result['id']
+            self._log(f"      ✅ Upload OK: {new_ai_doc_id}")
+        except Exception as e:
+            self._log(f"      ❌ Upload failed: {e}", level='error')
+            await self._update_junction(doc_id, knowledge_id, {
+                'syncstatus': JunctionSyncStatus.FAILED.value,
+                'lastSync': self._get_espocrm_datetime(),
+            })
+            return False
+
+        synced_remote_ids.add(new_ai_doc_id)
+
+        # Update the junction entry
+        await self._update_junction(doc_id, knowledge_id, {
+            'aiDocumentId': new_ai_doc_id,
+            'syncstatus': JunctionSyncStatus.SYNCED.value,
+            'syncedHash': blake3_hash or '',
+            'syncedMetadataHash': current_meta_hash,
+            'lastSync': self._get_espocrm_datetime(),
+        })
+        return True
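+
+    # mimetypes.guess_type() works purely on the file extension -- a quick
+    # stdlib sketch of the fallback behaviour used above:
+    #
+    #     >>> import mimetypes
+    #     >>> mimetypes.guess_type('Mandatsvertrag.pdf')
+    #     ('application/pdf', None)
+    #     >>> mimetypes.guess_type('scan_ohne_endung')
+    #     (None, None)    # → falls back to 'application/octet-stream'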
+    async def _update_metadata_only(
+        self,
+        doc_id: str,
+        junction: Dict[str, Any],
+        knowledge_id: str,
+        ai_doc_id: str,
+        dataset_id: str,
+        current_meta_hash: str,
+        ai,
+        provider: str,
+        synced_remote_ids: set,
+    ) -> bool:
+        """Updates only the metadata (no re-upload of the file)."""
+        self._log(f"      ✏️ Metadata-only update (no re-upload)")
+        synced_remote_ids.add(ai_doc_id)
+
+        try:
+            await ai.update_document_meta(
+                dataset_id=dataset_id,
+                doc_id=ai_doc_id,
+                description=junction.get('beschreibung') or junction.get('description') or '',
+                advoware_art=junction.get('advowareArt') or '',
+                advoware_bemerkung=junction.get('advowareBemerkung') or '',
+            )
+            await self._update_junction(doc_id, knowledge_id, {
+                'syncedMetadataHash': current_meta_hash,
+                'lastSync': self._get_espocrm_datetime(),
+            })
+            self._log(f"      ✅ Metadata updated")
+            return True
+        except Exception as e:
+            self._log(f"      ⚠️ Metadata update failed: {e}", level='warn')
+            # Non-fatal: the file is still synced; metadata follows on the next sync
+            return True
+
+    # =========================================================
+    # Lifecycle: DEACTIVATED
+    # =========================================================
+
+    async def _handle_deactivated(
+        self,
+        knowledge_id: str,
+        entity: Dict[str, Any],
+        dataset_id: Optional[str],
+        ai,
+    ) -> None:
+        """Deletes the dataset/collection and resets all junction entries."""
+        self._log(f"🔴 DEACTIVATED → deleting dataset")
+
+        if dataset_id:
+            try:
+                await ai.delete_dataset(dataset_id)
+                self._log(f"✅ Dataset deleted: {dataset_id}")
+            except Exception as e:
+                self._log(f"⚠️ Deleting dataset failed: {e}", level='warn')
+
+        # Clear datenbankId
+        await self.espocrm.update_entity('CAIKnowledge', knowledge_id, {
+            'datenbankId': None,
+        })
+
+        # Reset all junction entries
+        try:
+            junctions = await self.espocrm.list_related_all(
+                'CAIKnowledge', knowledge_id, 'dokumentes'
+            )
+            for j in junctions:
+                await self._update_junction(j['id'], knowledge_id, {
+                    'aiDocumentId': None,
+                    'syncstatus': JunctionSyncStatus.NEW.value,
+                    'syncedHash': None,
+                    'syncedMetadataHash': None,
+                    'lastSync': None,
+                })
+            self._log(f"✅ {len(junctions)} junction(s) reset")
+        except Exception as e:
+            self._log(f"⚠️ Junction reset failed: {e}", level='warn')
+
+    # =========================================================
+    # Provider Factory
+    # =========================================================
+
+    def _build_provider(self, provider: str):
+        """
+        Returns the matching AI provider service.
+
+        xai     → XAIProviderAdapter (wraps XAIService in the provider interface)
+        ragflow → RAGFlowService (implements the interface natively)
+        """
+        if provider == 'ragflow':
+            from services.ragflow_service import RAGFlowService
+            return RAGFlowService(ctx=self.context)
+        else:
+            # Default: xAI
+            from services.xai_upload_utils import XAIProviderAdapter
+            return XAIProviderAdapter(self.context)
+
+    # =========================================================
+    # EspoCRM Helpers
+    # =========================================================
+
+    async def _update_junction(
+        self,
+        doc_id: str,
+        knowledge_id: str,
+        fields: Dict[str, Any],
+    ) -> None:
+        """
+        Updates a CAIKnowledgeCDokumente junction entry.
+
+        EspoCRM junction additional columns are updated via the relationship
+        endpoint: PUT /CAIKnowledge/{id}/dokumentes/{docId}
+        """
+        try:
+            await self.espocrm.api_call(
+                f"/CAIKnowledge/{knowledge_id}/dokumentes/{doc_id}",
+                method='PUT',
+                json_data=fields,
+            )
+        except Exception as e:
+            self._log(f"⚠️ Junction update failed ({doc_id}): {e}", level='warn')
diff --git a/services/config.py b/services/config.py
index 8f04251..0fa5854 100644
--- a/services/config.py
+++ b/services/config.py
@@ -336,3 +336,52 @@ def is_retryable_status_code(status_code: int) -> bool:
         True wenn retryable
     """
     return status_code in API_CONFIG.retry_status_codes
+
+
+# ========== RAGFlow Configuration ==========
+
+@dataclass
+class RAGFlowConfig:
+    """Configuration for the RAGFlow AI provider"""
+
+    # Connection
+    base_url: str = "http://192.168.1.64:9380"
+    """RAGFlow server URL"""
+
+    # Defaults
+    default_chunk_method: str = "laws"
+    """Default chunk method: 'laws' is optimized for legal documents"""
+
+    # Parsing
+    auto_keywords: int = 14
+    """Number of automatically generated keywords per chunk"""
+
+    auto_questions: int = 7
+    """Number of automatically generated questions per chunk"""
+
+    parse_timeout_seconds: int = 120
+    """Timeout while waiting for document parsing"""
+
+    parse_poll_interval: float = 3.0
+    """Poll interval while waiting for parsing (seconds)"""
+
+    # Meta-field keys
+    meta_blake3_key: str = "blake3_hash"
+    """Key for the Blake3 hash in meta_fields (change detection)"""
+
+    meta_espocrm_id_key: str = "espocrm_id"
+    """Key for the EspoCRM document ID in meta_fields"""
+
+    meta_description_key: str = "description"
+    """Key for the document description in meta_fields"""
+
+    @classmethod
+    def from_env(cls) -> 'RAGFlowConfig':
+        """Loads the RAGFlow config from environment variables"""
+        return cls(
+            base_url=os.getenv('RAGFLOW_BASE_URL', 'http://192.168.1.64:9380'),
+            parse_timeout_seconds=int(os.getenv('RAGFLOW_PARSE_TIMEOUT', '120')),
+        )
+
+
+RAGFLOW_CONFIG = RAGFlowConfig.from_env()
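+
+# A small sketch (hypothetical values) of the env override behaviour --
+# from_env() only reads RAGFLOW_BASE_URL and RAGFLOW_PARSE_TIMEOUT; all other
+# fields keep their dataclass defaults:
+#
+#     import os
+#     os.environ['RAGFLOW_BASE_URL'] = 'http://ragflow.example.internal:9380'
+#     os.environ['RAGFLOW_PARSE_TIMEOUT'] = '300'
+#     cfg = RAGFlowConfig.from_env()
+#     assert cfg.base_url == 'http://ragflow.example.internal:9380'
+#     assert cfg.parse_timeout_seconds == 300
+#     assert cfg.default_chunk_method == 'laws'  # untouched default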
diff --git a/services/ragflow_service.py b/services/ragflow_service.py
new file mode 100644
index 0000000..2edb575
--- /dev/null
+++ b/services/ragflow_service.py
@@ -0,0 +1,511 @@
+"""RAGFlow Dataset & Document Service"""
+import os
+import asyncio
+from functools import partial
+from typing import Optional, List, Dict, Any
+from services.logging_utils import get_service_logger
+
+RAGFLOW_DEFAULT_BASE_URL = "http://192.168.1.64:9380"
+
+# Defaults for document analysis
+RAGFLOW_AUTO_KEYWORDS = 14
+RAGFLOW_AUTO_QUESTIONS = 7
+
+
+def _base_to_dict(obj: Any) -> Any:
+    """
+    Recursively converts ragflow_sdk.modules.base.Base to a plain dict.
+    Filters out the internal 'rag' client key.
+    """
+    try:
+        from ragflow_sdk.modules.base import Base
+        if isinstance(obj, Base):
+            return {k: _base_to_dict(v) for k, v in vars(obj).items() if k != 'rag'}
+    except ImportError:
+        pass
+    if isinstance(obj, dict):
+        return {k: _base_to_dict(v) for k, v in obj.items()}
+    if isinstance(obj, list):
+        return [_base_to_dict(i) for i in obj]
+    return obj
+
+
+class RAGFlowService:
+    """
+    Client for the RAGFlow API via ragflow-sdk (Python SDK).
+
+    Wraps the synchronous SDK in asyncio's run_in_executor so it can be
+    used seamlessly inside (async) Motia steps.
+
+    Dataflow on upload:
+        upload_document() →
+            1. upload_documents([{blob}])   # upload the file
+            2. doc.update({meta_fields})    # set blake3 + advoware fields
+            3. async_parse_documents([id])  # start parsing (chunk_method=laws)
+
+    Required environment variables:
+        - RAGFLOW_API_KEY  – API key
+        - RAGFLOW_BASE_URL – optional URL override (default: http://192.168.1.64:9380)
+    """
+
+    SUPPORTED_MIME_TYPES = {
+        'application/pdf',
+        'application/msword',
+        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
+        'application/vnd.ms-excel',
+        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
+        'application/vnd.oasis.opendocument.text',
+        'application/epub+zip',
+        'application/vnd.openxmlformats-officedocument.presentationml.presentation',
+        'text/plain',
+        'text/html',
+        'text/markdown',
+        'text/csv',
+        'text/xml',
+        'application/json',
+        'application/xml',
+    }
+
+    def __init__(self, ctx=None):
+        self.api_key = os.getenv('RAGFLOW_API_KEY', '')
+        base_url_env = os.getenv('RAGFLOW_BASE_URL', '')
+        self.base_url = base_url_env or RAGFLOW_DEFAULT_BASE_URL
+        self.ctx = ctx
+        self.logger = get_service_logger('ragflow', ctx)
+        self._rag = None
+
+        if not self.api_key:
+            raise ValueError("RAGFLOW_API_KEY not configured in environment")
+
+    def _log(self, msg: str, level: str = 'info') -> None:
+        log_func = getattr(self.logger, level, self.logger.info)
+        log_func(msg)
+
+    def _get_client(self):
+        """Returns the RAGFlow SDK client (lazy init, sync)."""
+        if self._rag is None:
+            from ragflow_sdk import RAGFlow
+            self._rag = RAGFlow(api_key=self.api_key, base_url=self.base_url)
+        return self._rag
+
+    async def _run(self, func, *args, **kwargs):
+        """Runs a synchronous SDK function in the default ThreadPoolExecutor."""
+        loop = asyncio.get_event_loop()
+        return await loop.run_in_executor(None, partial(func, *args, **kwargs))
+
+    # ========== Dataset Management ==========
+
+    async def create_dataset(
+        self,
+        name: str,
+        chunk_method: str = 'laws',
+        embedding_model: Optional[str] = None,
+        description: Optional[str] = None,
+        auto_keywords: int = RAGFLOW_AUTO_KEYWORDS,
+        auto_questions: int = RAGFLOW_AUTO_QUESTIONS,
+    ) -> Dict:
+        """
+        Creates a new RAGFlow dataset (the equivalent of an xAI collection).
+
+        Uses chunk_method='laws' by default (optimized for legal documents).
+        After creation, sets auto_keywords=14 and auto_questions=7.
+
+        Returns:
+            dict with 'id', 'name', 'chunk_method', 'parser_config', etc.
+        """
+        self._log(
+            f"📚 Creating dataset: {name} "
+            f"(chunk_method={chunk_method}, keywords={auto_keywords}, questions={auto_questions})"
+        )
+
+        def _create():
+            rag = self._get_client()
+            kwargs = dict(name=name, chunk_method=chunk_method)
+            if embedding_model:
+                kwargs['embedding_model'] = embedding_model
+            if description:
+                kwargs['description'] = description
+            dataset = rag.create_dataset(**kwargs)
+
+            # parser_config can only be set after create, via update()
+            dataset.update({
+                'parser_config': {
+                    'auto_keywords': auto_keywords,
+                    'auto_questions': auto_questions,
+                }
+            })
+            return self._dataset_to_dict(dataset)
+
+        result = await self._run(_create)
+        self._log(f"✅ Dataset created: {result.get('id')} ({name})")
+        return result
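+
+    # A minimal usage sketch (hedged; the dataset name is hypothetical, and
+    # RAGFLOW_API_KEY must be set in the environment):
+    #
+    #     svc = RAGFlowService(ctx=None)
+    #     ds = await svc.create_dataset(name='akte-4711')
+    #     print(ds['id'], ds['parser_config'])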
+    async def get_dataset_by_name(self, name: str) -> Optional[Dict]:
+        """
+        Looks up a dataset by name. Returns None if not found.
+        """
+        def _find():
+            rag = self._get_client()
+            # list_datasets(name=...) has permission bugs – filter locally instead
+            all_datasets = rag.list_datasets(page_size=100)
+            for ds in all_datasets:
+                if getattr(ds, 'name', None) == name:
+                    return self._dataset_to_dict(ds)
+            return None
+
+        result = await self._run(_find)
+        if result:
+            self._log(f"🔍 Dataset found: {result.get('id')} ({name})")
+        return result
+
+    async def ensure_dataset(
+        self,
+        name: str,
+        chunk_method: str = 'laws',
+        embedding_model: Optional[str] = None,
+        description: Optional[str] = None,
+        auto_keywords: int = RAGFLOW_AUTO_KEYWORDS,
+        auto_questions: int = RAGFLOW_AUTO_QUESTIONS,
+    ) -> Dict:
+        """
+        Returns an existing dataset or creates a new one (get-or-create).
+        The idempotent counterpart of xAI create_collection.
+
+        Returns:
+            dict with 'id', 'name', etc.
+        """
+        existing = await self.get_dataset_by_name(name)
+        if existing:
+            self._log(f"✅ Dataset exists: {existing.get('id')} ({name})")
+            return existing
+        return await self.create_dataset(
+            name=name,
+            chunk_method=chunk_method,
+            embedding_model=embedding_model,
+            description=description,
+            auto_keywords=auto_keywords,
+            auto_questions=auto_questions,
+        )
+
+    async def delete_dataset(self, dataset_id: str) -> None:
+        """
+        Deletes a dataset including all of its documents.
+        Equivalent to xAI delete_collection.
+        """
+        self._log(f"🗑️ Deleting dataset: {dataset_id}")
+
+        def _delete():
+            rag = self._get_client()
+            rag.delete_datasets(ids=[dataset_id])
+
+        await self._run(_delete)
+        self._log(f"✅ Dataset deleted: {dataset_id}")
+
+    async def list_datasets(self) -> List[Dict]:
+        """Lists all datasets."""
+        def _list():
+            rag = self._get_client()
+            return [self._dataset_to_dict(d) for d in rag.list_datasets()]
+
+        result = await self._run(_list)
+        self._log(f"📋 Listed {len(result)} datasets")
+        return result
+
+    # ========== Document Management ==========
+
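+    # End-to-end sketch of the document flow below (hedged; IDs, file name
+    # and hash are hypothetical):
+    #
+    #     ds = await svc.ensure_dataset('akte-4711')
+    #     doc = await svc.upload_document(
+    #         dataset_id=ds['id'],
+    #         file_content=b'%PDF-1.7 ...',
+    #         filename='vertrag.pdf',
+    #         mime_type='application/pdf',
+    #         blake3_hash='b3-abc...',
+    #         espocrm_id='64f3...',
+    #     )
+    #     parsed = await svc.wait_for_parsing(ds['id'], doc['id'])
+    #     print(parsed['chunk_count'], parsed['run'])  # e.g. 12 DONE
+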
+ """ + if mime_type == 'application/octet-stream' and filename.lower().endswith('.pdf'): + mime_type = 'application/pdf' + + self._log( + f"📤 Uploading {len(file_content)} bytes to dataset {dataset_id}: " + f"{filename} ({mime_type})" + ) + + def _upload_and_tag(): + rag = self._get_client() + datasets = rag.list_datasets(id=dataset_id) + if not datasets: + raise RuntimeError(f"Dataset not found: {dataset_id}") + dataset = datasets[0] + + # Schritt 1: Upload + dataset.upload_documents([{ + 'display_name': filename, + 'blob': file_content, + }]) + + # Dokument-ID ermitteln (neuestes mit passendem Namen) + base_name = filename.split('/')[-1] + docs = dataset.list_documents(keywords=base_name, page_size=10) + doc = None + for d in docs: + if d.name == filename or d.name == base_name: + doc = d + break + if doc is None and docs: + doc = docs[0] # Fallback + if doc is None: + raise RuntimeError(f"Document not found after upload: {filename}") + + # Schritt 2: Meta-Fields setzen + meta: Dict[str, str] = {} + if blake3_hash: + meta['blake3_hash'] = blake3_hash + if espocrm_id: + meta['espocrm_id'] = espocrm_id + if description: + meta['description'] = description + if advoware_art: + meta['advoware_art'] = advoware_art + if advoware_bemerkung: + meta['advoware_bemerkung'] = advoware_bemerkung + + if meta: + doc.update({'meta_fields': meta}) + + # Schritt 3: Parsing starten + dataset.async_parse_documents([doc.id]) + + return self._document_to_dict(doc) + + result = await self._run(_upload_and_tag) + self._log( + f"✅ Document uploaded & parsing started: {result.get('id')} ({filename})" + ) + return result + + async def update_document_meta( + self, + dataset_id: str, + doc_id: str, + blake3_hash: Optional[str] = None, + description: Optional[str] = None, + advoware_art: Optional[str] = None, + advoware_bemerkung: Optional[str] = None, + ) -> None: + """ + Aktualisiert nur die Metadaten eines Dokuments (ohne Re-Upload). + Entspricht xAI PATCH-Metadata-Only. + Startet Parsing neu, da Chunk-Injection von meta_fields abhaengt. + """ + self._log(f"✏️ Updating metadata for document {doc_id}") + + def _update(): + rag = self._get_client() + datasets = rag.list_datasets(id=dataset_id) + if not datasets: + raise RuntimeError(f"Dataset not found: {dataset_id}") + dataset = datasets[0] + docs = dataset.list_documents(id=doc_id) + if not docs: + raise RuntimeError(f"Document not found: {doc_id}") + doc = docs[0] + + # Bestehende meta_fields lesen und mergen + existing_meta = _base_to_dict(doc.meta_fields) or {} + if blake3_hash is not None: + existing_meta['blake3_hash'] = blake3_hash + if description is not None: + existing_meta['description'] = description + if advoware_art is not None: + existing_meta['advoware_art'] = advoware_art + if advoware_bemerkung is not None: + existing_meta['advoware_bemerkung'] = advoware_bemerkung + + doc.update({'meta_fields': existing_meta}) + # Re-parsing noetig damit Chunks aktualisierte Metadata enthalten + dataset.async_parse_documents([doc.id]) + + await self._run(_update) + self._log(f"✅ Metadata updated and re-parsing started: {doc_id}") + + async def remove_document(self, dataset_id: str, doc_id: str) -> None: + """ + Loescht ein Dokument aus einem Dataset. + Entspricht xAI remove_from_collection. 
+ """ + self._log(f"🗑️ Removing document {doc_id} from dataset {dataset_id}") + + def _delete(): + rag = self._get_client() + datasets = rag.list_datasets(id=dataset_id) + if not datasets: + raise RuntimeError(f"Dataset not found: {dataset_id}") + datasets[0].delete_documents(ids=[doc_id]) + + await self._run(_delete) + self._log(f"✅ Document removed: {doc_id}") + + async def list_documents(self, dataset_id: str) -> List[Dict]: + """ + Listet alle Dokumente in einem Dataset auf (paginiert). + Entspricht xAI list_collection_documents. + """ + self._log(f"📋 Listing documents in dataset {dataset_id}") + + def _list(): + rag = self._get_client() + datasets = rag.list_datasets(id=dataset_id) + if not datasets: + raise RuntimeError(f"Dataset not found: {dataset_id}") + dataset = datasets[0] + docs = [] + page = 1 + while True: + batch = dataset.list_documents(page=page, page_size=100) + if not batch: + break + docs.extend(batch) + if len(batch) < 100: + break + page += 1 + return [self._document_to_dict(d) for d in docs] + + result = await self._run(_list) + self._log(f"✅ Listed {len(result)} documents") + return result + + async def get_document(self, dataset_id: str, doc_id: str) -> Optional[Dict]: + """Holt ein einzelnes Dokument by ID. None wenn nicht gefunden.""" + def _get(): + rag = self._get_client() + datasets = rag.list_datasets(id=dataset_id) + if not datasets: + return None + docs = datasets[0].list_documents(id=doc_id) + if not docs: + return None + return self._document_to_dict(docs[0]) + + result = await self._run(_get) + if result: + self._log(f"📄 Document found: {result.get('name')} (run={result.get('run')})") + return result + + async def wait_for_parsing( + self, + dataset_id: str, + doc_id: str, + timeout_seconds: int = 120, + poll_interval: float = 3.0, + ) -> Dict: + """ + Wartet bis das Parsing eines Dokuments abgeschlossen ist. + + Returns: + Aktueller Dokument-State als dict. + + Raises: + TimeoutError: Wenn Parsing nicht innerhalb timeout_seconds fertig wird. + RuntimeError: Wenn Parsing fehlschlaegt. + """ + self._log(f"⏳ Waiting for parsing: {doc_id} (timeout={timeout_seconds}s)") + elapsed = 0.0 + + while elapsed < timeout_seconds: + doc = await self.get_document(dataset_id, doc_id) + if doc is None: + raise RuntimeError(f"Document disappeared during parsing: {doc_id}") + + run_status = doc.get('run', 'UNSTART') + if run_status == 'DONE': + self._log( + f"✅ Parsing done: {doc_id} " + f"(chunks={doc.get('chunk_count')}, tokens={doc.get('token_count')})" + ) + return doc + elif run_status in ('FAIL', 'CANCEL'): + raise RuntimeError( + f"Parsing failed for {doc_id}: status={run_status}, " + f"msg={doc.get('progress_msg', '')}" + ) + + await asyncio.sleep(poll_interval) + elapsed += poll_interval + + raise TimeoutError( + f"Parsing timeout after {timeout_seconds}s for document {doc_id}" + ) + + # ========== MIME Type Support ========== + + def is_mime_type_supported(self, mime_type: str) -> bool: + """Prueft ob RAGFlow diesen MIME-Type verarbeiten kann.""" + return mime_type.lower().strip() in self.SUPPORTED_MIME_TYPES + + # ========== Internal Helpers ========== + + def _dataset_to_dict(self, dataset) -> Dict: + """Konvertiert RAGFlow DataSet Objekt zu dict (inkl. 
+    # ========== Internal Helpers ==========
+
+    def _dataset_to_dict(self, dataset) -> Dict:
+        """Converts a RAGFlow DataSet object to a dict (incl. parser_config unwrap)."""
+        return {
+            'id': getattr(dataset, 'id', None),
+            'name': getattr(dataset, 'name', None),
+            'chunk_method': getattr(dataset, 'chunk_method', None),
+            'embedding_model': getattr(dataset, 'embedding_model', None),
+            'description': getattr(dataset, 'description', None),
+            'chunk_count': getattr(dataset, 'chunk_count', 0),
+            'document_count': getattr(dataset, 'document_count', 0),
+            'parser_config': _base_to_dict(getattr(dataset, 'parser_config', {})),
+        }
+
+    def _document_to_dict(self, doc) -> Dict:
+        """
+        Converts a RAGFlow Document object to a dict.
+
+        meta_fields is unwrapped into a plain dict via _base_to_dict().
+        Contains blake3_hash, espocrm_id, description, advoware_art and
+        advoware_bemerkung where set.
+        """
+        raw_meta = getattr(doc, 'meta_fields', None)
+        meta_dict = _base_to_dict(raw_meta) if raw_meta is not None else {}
+
+        return {
+            'id': getattr(doc, 'id', None),
+            'name': getattr(doc, 'name', None),
+            'dataset_id': getattr(doc, 'dataset_id', None),
+            'chunk_method': getattr(doc, 'chunk_method', None),
+            'size': getattr(doc, 'size', 0),
+            'token_count': getattr(doc, 'token_count', 0),
+            'chunk_count': getattr(doc, 'chunk_count', 0),
+            'run': getattr(doc, 'run', 'UNSTART'),
+            'progress': getattr(doc, 'progress', 0.0),
+            'progress_msg': getattr(doc, 'progress_msg', ''),
+            'source_type': getattr(doc, 'source_type', 'local'),
+            'created_by': getattr(doc, 'created_by', ''),
+            'process_duration': getattr(doc, 'process_duration', 0.0),
+            # Metadata (blake3_hash lives in here when set)
+            'meta_fields': meta_dict,
+            'blake3_hash': meta_dict.get('blake3_hash'),
+            'espocrm_id': meta_dict.get('espocrm_id'),
+            'parser_config': _base_to_dict(getattr(doc, 'parser_config', None)),
+        }
+ """ + self._log(f"🔍 Looking up collection by name: {name}") + session = await self._get_session() + url = f"{XAI_MANAGEMENT_URL}/v1/collections" + headers = {"Authorization": f"Bearer {self.management_key}"} + + async with session.get(url, headers=headers) as response: + if response.status not in (200,): + raw = await response.text() + self._log(f"⚠️ list collections failed ({response.status}): {raw}", level='warn') + return None + data = await response.json() + + collections = data if isinstance(data, list) else data.get('collections', []) + for col in collections: + if col.get('collection_name') == name or col.get('name') == name: + self._log(f"✅ Collection found: {col.get('collection_id') or col.get('id')}") + return col + + self._log(f"⚠️ Collection not found by name: {name}", level='warn') + return None diff --git a/services/xai_upload_utils.py b/services/xai_upload_utils.py index a241e48..fe284bb 100644 --- a/services/xai_upload_utils.py +++ b/services/xai_upload_utils.py @@ -209,3 +209,106 @@ class XAIUploadUtils: }) except Exception as e: self._log.warn(f" ⚠️ Could not remove from xAI: {e}") + + +class XAIProviderAdapter: + """ + Adapter der XAIService auf das Provider-Interface bringt, + das AIKnowledgeSyncUtils erwartet. + + Interface (identisch mit RAGFlowService): + ensure_dataset(name, description) -> dict mit 'id' + list_documents(dataset_id) -> list[dict] mit 'id', 'name' + upload_document(dataset_id, file_content, filename, mime_type, + blake3_hash, espocrm_id, description, + advoware_art, advoware_bemerkung) -> dict mit 'id' + update_document_meta(dataset_id, doc_id, ...) -> None + remove_document(dataset_id, doc_id) -> None + delete_dataset(dataset_id) -> None + is_mime_type_supported(mime_type) -> bool + """ + + def __init__(self, ctx=None): + from services.xai_service import XAIService + from services.logging_utils import get_service_logger + self._xai = XAIService(ctx) + self._log = get_service_logger('xai_adapter', ctx) + + async def ensure_dataset(self, name: str, description: str = '') -> dict: + """Erstellt oder verifiziert eine xAI Collection. 
diff --git a/services/xai_upload_utils.py b/services/xai_upload_utils.py
index a241e48..fe284bb 100644
--- a/services/xai_upload_utils.py
+++ b/services/xai_upload_utils.py
@@ -209,3 +209,106 @@ class XAIUploadUtils:
                 })
             except Exception as e:
                 self._log.warn(f"   ⚠️ Could not remove from xAI: {e}")
+
+
+class XAIProviderAdapter:
+    """
+    Adapter that maps XAIService onto the provider interface expected by
+    AIKnowledgeSyncUtils.
+
+    Interface (identical to RAGFlowService):
+        ensure_dataset(name, description) -> dict with 'id'
+        list_documents(dataset_id) -> list[dict] with 'id', 'name'
+        upload_document(dataset_id, file_content, filename, mime_type,
+                        blake3_hash, espocrm_id, description,
+                        advoware_art, advoware_bemerkung) -> dict with 'id'
+        update_document_meta(dataset_id, doc_id, ...) -> None
+        remove_document(dataset_id, doc_id) -> None
+        delete_dataset(dataset_id) -> None
+        is_mime_type_supported(mime_type) -> bool
+    """
+
+    def __init__(self, ctx=None):
+        from services.xai_service import XAIService
+        from services.logging_utils import get_service_logger
+        self._xai = XAIService(ctx)
+        self._log = get_service_logger('xai_adapter', ctx)
+
+    async def ensure_dataset(self, name: str, description: str = '') -> dict:
+        """Creates or verifies an xAI collection. Returns {'id': collection_id}."""
+        existing = await self._xai.get_collection_by_name(name)
+        if existing:
+            col_id = existing.get('collection_id') or existing.get('id')
+            return {'id': col_id, 'name': name}
+        result = await self._xai.create_collection(name=name)
+        col_id = result.get('collection_id') or result.get('id')
+        return {'id': col_id, 'name': name}
+
+    async def list_documents(self, dataset_id: str) -> list:
+        """Lists all documents in an xAI collection."""
+        raw = await self._xai.list_collection_documents(dataset_id)
+        return [{'id': d.get('file_id'), 'name': d.get('filename')} for d in raw]
+
+    async def upload_document(
+        self,
+        dataset_id: str,
+        file_content: bytes,
+        filename: str,
+        mime_type: str = 'application/octet-stream',
+        blake3_hash=None,
+        espocrm_id=None,
+        description=None,
+        advoware_art=None,
+        advoware_bemerkung=None,
+    ) -> dict:
+        """Uploads a document into an xAI collection with metadata fields."""
+        fields_raw = {
+            'document_name': filename,
+            'espocrm_id': espocrm_id or '',
+            'description': description or '',
+            'advoware_art': advoware_art or '',
+            'advoware_bemerkung': advoware_bemerkung or '',
+        }
+        if blake3_hash:
+            fields_raw['blake3_hash'] = blake3_hash
+        fields = {k: v for k, v in fields_raw.items() if v}
+
+        file_id = await self._xai.upload_to_collection(
+            collection_id=dataset_id,
+            file_content=file_content,
+            filename=filename,
+            mime_type=mime_type,
+            fields=fields,
+        )
+        return {'id': file_id, 'name': filename}
+
+    async def update_document_meta(
+        self,
+        dataset_id: str,
+        doc_id: str,
+        blake3_hash=None,
+        description=None,
+        advoware_art=None,
+        advoware_bemerkung=None,
+    ) -> None:
+        """
+        xAI does not support a metadata PATCH. Re-uploads are driven by the
+        caller (a changed syncedMetadataHash routes the document through the
+        full upload path). This method is a no-op.
+        """
+        self._log.warn(
+            "XAIProviderAdapter.update_document_meta: xAI does not support a "
+            "metadata PATCH – no-op. The next sync triggers a re-upload."
+        )
+
+    async def remove_document(self, dataset_id: str, doc_id: str) -> None:
+        """Removes a document from an xAI collection (the file stays in the xAI Files API)."""
+        await self._xai.remove_from_collection(dataset_id, doc_id)
+
+    async def delete_dataset(self, dataset_id: str) -> None:
+        """Deletes an xAI collection."""
+        await self._xai.delete_collection(dataset_id)
+
+    def is_mime_type_supported(self, mime_type: str) -> bool:
+        return self._xai.is_mime_type_supported(mime_type)
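+
+
+# The duck-typed contract both providers satisfy, written out as a
+# typing.Protocol -- a documentation sketch only (this Protocol class is
+# hypothetical and not imported anywhere; upload/update keyword arguments
+# are abbreviated as **meta):
+#
+#     from typing import Protocol
+#
+#     class AIKnowledgeProvider(Protocol):
+#         async def ensure_dataset(self, name: str, description: str = '') -> dict: ...
+#         async def list_documents(self, dataset_id: str) -> list: ...
+#         async def upload_document(self, dataset_id: str, file_content: bytes,
+#                                   filename: str, **meta) -> dict: ...
+#         async def update_document_meta(self, dataset_id: str, doc_id: str,
+#                                        **meta) -> None: ...
+#         async def remove_document(self, dataset_id: str, doc_id: str) -> None: ...
+#         async def delete_dataset(self, dataset_id: str) -> None: ...
+#         def is_mime_type_supported(self, mime_type: str) -> bool: ...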
+""" +from typing import Any +from motia import FlowContext, cron + +from services.espocrm import EspoCRMAPI +from services.logging_utils import get_step_logger + +config = { + "name": "AI Knowledge Daily Cron", + "description": "Taeglich: Vollsync aller unclean/failed CAIKnowledge Entities", + "flows": ["vmh-aiknowledge"], + "triggers": [ + cron("0 2 * * *"), # Taeglich 02:00 Uhr + ], + "enqueues": ["aiknowledge.sync"], +} + + +async def handler(event_data: Any, ctx: FlowContext[Any]) -> None: + """ + Cron-Handler: Enqueued aiknowledge.sync fuer alle die Sync brauchen. + """ + step_logger = get_step_logger('aiknowledge_cron', ctx) + + step_logger.info("=" * 70) + step_logger.info("⏰ AI KNOWLEDGE DAILY CRON START") + step_logger.info("=" * 70) + + espocrm = EspoCRMAPI(ctx) + + # Alle active KBs mit unclean oder failed Status + try: + result = await espocrm.list_entities( + 'CAIKnowledge', + where=[ + { + 'type': 'equals', + 'attribute': 'activationStatus', + 'value': 'active', + }, + { + 'type': 'in', + 'attribute': 'syncStatus', + 'value': ['unclean', 'failed'], + }, + ], + max_size=200, + ) + except Exception as e: + step_logger.error(f"❌ EspoCRM-Abfrage fehlgeschlagen: {e}") + return + + entities = result.get('list', []) + total = result.get('total', len(entities)) + step_logger.info(f"📋 {len(entities)}/{total} Entities brauchen Sync") + + enqueued = 0 + for entity in entities: + knowledge_id = entity.get('id') + name = entity.get('name', knowledge_id) + provider = entity.get('aiProvider', 'xai') + sync_status = entity.get('syncStatus', '?') + + if not knowledge_id: + continue + + step_logger.info(f" → Enqueue: {name} ({provider}, status={sync_status})") + await ctx.enqueue({ + 'topic': 'aiknowledge.sync', + 'data': { + 'knowledge_id': knowledge_id, + 'source': 'cron', + 'action': 'update', + }, + }) + enqueued += 1 + + step_logger.info(f"✅ {enqueued} Sync-Events enqueued") + step_logger.info("=" * 70) diff --git a/src/steps/crm/document/aiknowledge_sync_event_step.py b/src/steps/crm/document/aiknowledge_sync_event_step.py new file mode 100644 index 0000000..5ac0386 --- /dev/null +++ b/src/steps/crm/document/aiknowledge_sync_event_step.py @@ -0,0 +1,64 @@ +""" +AI Knowledge Sync Handler + +Verarbeitet aiknowledge.sync Events (Queue). + +Quellen: + - Webhook: EspoCRM CAIKnowledge.afterSave + - Cron: Taeglich 02:00 Uhr (Vollsync) + +Lifecycle: + new → Dataset/Collection erstellen (xAI oder RAGFlow) + active → Dokumente syncen (Change Detection via Blake3) + paused → Skip + deactivated → Dataset/Collection loeschen +""" +from typing import Any, Dict +from motia import FlowContext, queue + +from services.espocrm import EspoCRMAPI +from services.redis_client import get_redis_client +from services.aiknowledge_sync_utils import AIKnowledgeSyncUtils +from services.logging_utils import get_step_logger + +config = { + "name": "AI Knowledge Sync Handler", + "description": "Synchronisiert CAIKnowledge Entities mit xAI oder RAGFlow", + "flows": ["vmh-aiknowledge"], + "triggers": [ + queue("aiknowledge.sync"), + ], + "enqueues": [], +} + + +async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: + """ + Zentraler Sync-Handler fuer CAIKnowledge. 
diff --git a/src/steps/crm/document/aiknowledge_sync_event_step.py b/src/steps/crm/document/aiknowledge_sync_event_step.py
new file mode 100644
index 0000000..5ac0386
--- /dev/null
+++ b/src/steps/crm/document/aiknowledge_sync_event_step.py
@@ -0,0 +1,64 @@
+"""
+AI Knowledge Sync Handler
+
+Processes aiknowledge.sync events (queue).
+
+Sources:
+    - Webhook: EspoCRM CAIKnowledge.afterSave
+    - Cron:    daily at 02:00 (full sync)
+
+Lifecycle:
+    new         → create dataset/collection (xAI or RAGFlow)
+    active      → sync documents (change detection via Blake3)
+    paused      → skip
+    deactivated → delete dataset/collection
+"""
+from typing import Any, Dict
+from motia import FlowContext, queue
+
+from services.espocrm import EspoCRMAPI
+from services.redis_client import get_redis_client
+from services.aiknowledge_sync_utils import AIKnowledgeSyncUtils
+from services.logging_utils import get_step_logger
+
+config = {
+    "name": "AI Knowledge Sync Handler",
+    "description": "Synchronizes CAIKnowledge entities with xAI or RAGFlow",
+    "flows": ["vmh-aiknowledge"],
+    "triggers": [
+        queue("aiknowledge.sync"),
+    ],
+    "enqueues": [],
+}
+
+
+async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
+    """
+    Central sync handler for CAIKnowledge.
+
+    event_data:
+        knowledge_id (str) – EspoCRM CAIKnowledge ID
+        source       (str) – 'webhook' | 'cron'
+        action       (str) – 'create' | 'update'
+    """
+    step_logger = get_step_logger('aiknowledge_sync', ctx)
+
+    knowledge_id = event_data.get('knowledge_id')
+    source = event_data.get('source', 'webhook')
+    action = event_data.get('action', 'update')
+
+    if not knowledge_id:
+        step_logger.error("❌ No knowledge_id in the event")
+        return
+
+    step_logger.info("=" * 70)
+    step_logger.info(f"🔄 AI KNOWLEDGE SYNC EVENT")
+    step_logger.info(f"   ID    : {knowledge_id}")
+    step_logger.info(f"   Source: {source} | Action: {action}")
+    step_logger.info("=" * 70)
+
+    espocrm = EspoCRMAPI(ctx)
+    redis_client = get_redis_client(strict=False)
+    sync = AIKnowledgeSyncUtils(espocrm, redis_client, ctx)
+
+    await sync.run_sync(knowledge_id)