"""
AI Knowledge Sync Utilities

Utility functions for synchronizing CAIKnowledge entities with XAI Collections:
- Collection lifecycle management (create, delete)
- Document synchronization with BLAKE3 hash verification
- Metadata-only updates via PATCH
- Orphan detection and cleanup
"""

import hashlib
import json
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from services.sync_utils_base import BaseSyncUtils
from services.models import (
    AIKnowledgeActivationStatus,
    AIKnowledgeSyncStatus,
    JunctionSyncStatus,
)


class AIKnowledgeSync(BaseSyncUtils):
    """Utility class for AI Knowledge ↔ XAI Collections synchronization."""

    def _get_lock_key(self, entity_id: str) -> str:
        """Build the Redis lock key used to serialize syncs of one entity."""
        return f"sync_lock:aiknowledge:{entity_id}"

    async def acquire_sync_lock(self, knowledge_id: str) -> bool:
        """
        Acquire distributed lock via Redis + update EspoCRM syncStatus.

        Args:
            knowledge_id: CAIKnowledge entity ID

        Returns:
            True if lock acquired, False if already locked
        """
        lock_key = self._get_lock_key(knowledge_id)
        try:
            # STEP 1: atomic Redis lock — bail out early if another worker holds it
            if not self._acquire_redis_lock(lock_key):
                self._log(f"Redis lock already active for {knowledge_id}", level='warn')
                return False

            # STEP 2: best-effort status marker; a failure here must not abort the sync
            try:
                await self.espocrm.update_entity('CAIKnowledge', knowledge_id, {
                    'syncStatus': AIKnowledgeSyncStatus.PENDING_SYNC.value
                })
            except Exception as e:
                self._log(f"Could not set syncStatus: {e}", level='debug')

            self._log(f"Sync lock acquired for {knowledge_id}")
            return True

        except Exception as e:
            self._log(f"Error acquiring lock: {e}", level='error')
            # Never leave a dangling Redis lock behind on an unexpected error.
            self._release_redis_lock(lock_key)
            return False
# AIKnowledgeSync method (belongs to the AIKnowledgeSync class in this module).
async def release_sync_lock(
    self,
    knowledge_id: str,
    success: bool = True,
    error_message: Optional[str] = None
) -> None:
    """
    Release sync lock and set final status.

    Writes the terminal syncStatus ('synced' or 'failed') to EspoCRM, then
    releases the Redis lock.  The Redis release happens in a ``finally``
    block so the lock cannot leak even if the EspoCRM update raises.
    (BUGFIX: the original duplicated the release call in both the success
    and the error branch; ``finally`` guarantees exactly one release on
    every path.)

    Args:
        knowledge_id: CAIKnowledge entity ID
        success: Whether sync succeeded
        error_message: Optional error message (truncated to 2000 chars)
    """
    lock_key = self._get_lock_key(knowledge_id)
    try:
        update_data: Dict[str, Any] = {
            'syncStatus': (AIKnowledgeSyncStatus.SYNCED.value
                           if success
                           else AIKnowledgeSyncStatus.FAILED.value)
        }
        if success:
            update_data['lastSync'] = datetime.now().isoformat()
            update_data['syncError'] = None
        elif error_message:
            # EspoCRM text fields are size-limited; keep the message bounded.
            update_data['syncError'] = error_message[:2000]

        await self.espocrm.update_entity('CAIKnowledge', knowledge_id, update_data)

        self._log(f"Sync lock released: {knowledge_id} β†’ {'success' if success else 'failed'}")
    except Exception as e:
        # Original behavior: log and swallow — callers do not expect a raise here.
        self._log(f"Error releasing lock: {e}", level='error')
    finally:
        self._release_redis_lock(lock_key)
# AIKnowledgeSync methods (belong to the AIKnowledgeSync class in this module).
async def sync_knowledge_to_xai(self, knowledge_id: str, ctx) -> None:
    """
    Main sync orchestrator, dispatching on the entity's activationStatus.

    new         β†’ create the XAI collection, flip status to 'active'
    deactivated β†’ delete the collection, reset junction entries
    paused      β†’ do nothing
    active      β†’ (re)verify the collection and sync all documents

    Args:
        knowledge_id: CAIKnowledge entity ID
        ctx: Motia context for logging
    """
    from services.espocrm import EspoCRMAPI
    from services.xai_service import XAIService

    espocrm = EspoCRMAPI(ctx)
    xai = XAIService(ctx)

    try:
        # Load the knowledge entity and its current state.
        knowledge = await espocrm.get_entity('CAIKnowledge', knowledge_id)
        activation_status = knowledge.get('activationStatus')
        collection_id = knowledge.get('datenbankId')

        ctx.logger.info("=" * 80)
        ctx.logger.info(f"πŸ“‹ Processing: {knowledge['name']}")
        ctx.logger.info(f"   activationStatus: {activation_status}")
        ctx.logger.info(f"   datenbankId: {collection_id or 'NONE'}")
        ctx.logger.info("=" * 80)

        # CASE 1: NEW β†’ create the collection; documents sync on the next webhook.
        if activation_status == AIKnowledgeActivationStatus.NEW.value:
            ctx.logger.info("πŸ†• Status 'new' β†’ Creating XAI Collection")

            created = await xai.create_collection(
                name=knowledge['name'],
                metadata={
                    'espocrm_entity_type': 'CAIKnowledge',
                    'espocrm_entity_id': knowledge_id,
                    'created_at': datetime.now().isoformat()
                }
            )
            collection_id = created['id']

            # Persist the collection id and promote the entity to 'active'.
            await espocrm.update_entity('CAIKnowledge', knowledge_id, {
                'datenbankId': collection_id,
                'activationStatus': AIKnowledgeActivationStatus.ACTIVE.value,
                'syncStatus': AIKnowledgeSyncStatus.UNCLEAN.value
            })

            ctx.logger.info(f"βœ… Collection created: {collection_id}")
            ctx.logger.info("   Status changed to 'active', next webhook will sync documents")
            return

        # CASE 2: DEACTIVATED β†’ drop the collection and reset junction state.
        if activation_status == AIKnowledgeActivationStatus.DEACTIVATED.value:
            ctx.logger.info("πŸ—‘οΈ Status 'deactivated' β†’ Deleting XAI Collection")

            if collection_id:
                try:
                    await xai.delete_collection(collection_id)
                    ctx.logger.info(f"βœ… Collection deleted from XAI: {collection_id}")
                except Exception as e:
                    # Best-effort delete: junction cleanup proceeds regardless.
                    ctx.logger.error(f"❌ Failed to delete collection: {e}")
            else:
                ctx.logger.info("⏭️ No collection ID, nothing to delete")
            # NOTE(review): datenbankId is left in place after deletion —
            # presumably harmless because the 'active' path recreates a missing
            # collection, but confirm the stale id is intended.

            junctions = await espocrm.get_junction_entries(
                'CAIKnowledgeCDokumente', 'cAIKnowledgeId', knowledge_id
            )
            for jct in junctions:
                await espocrm.update_junction_entry('CAIKnowledgeCDokumente', jct['id'], {
                    'syncstatus': JunctionSyncStatus.NEW.value,
                    'aiDocumentId': None
                })

            ctx.logger.info(f"βœ… Deactivation complete, {len(junctions)} junction entries reset")
            return

        # CASE 3: PAUSED β†’ skip entirely.
        if activation_status == AIKnowledgeActivationStatus.PAUSED.value:
            ctx.logger.info("⏸️ Status 'paused' β†’ No sync performed")
            return

        # CASE 4: ACTIVE β†’ normal document sync.
        if activation_status == AIKnowledgeActivationStatus.ACTIVE.value:
            if not collection_id:
                ctx.logger.error("❌ Status 'active' but no datenbankId!")
                raise RuntimeError("Active knowledge without collection ID")

            ctx.logger.info(f"πŸ”„ Status 'active' β†’ Syncing documents to {collection_id}")

            # Self-heal: recreate the collection if XAI no longer has it.
            existing = await xai.get_collection(collection_id)
            if not existing:
                ctx.logger.warn(f"⚠️ Collection {collection_id} not found, recreating")
                recreated = await xai.create_collection(
                    name=knowledge['name'],
                    metadata={
                        'espocrm_entity_type': 'CAIKnowledge',
                        'espocrm_entity_id': knowledge_id
                    }
                )
                collection_id = recreated['id']
                await espocrm.update_entity('CAIKnowledge', knowledge_id, {
                    'datenbankId': collection_id
                })

            await self._sync_knowledge_documents(knowledge_id, collection_id, ctx)
            return

        # Unknown status value — fail loudly rather than guessing.
        ctx.logger.error(f"❌ Unknown activationStatus: {activation_status}")
        raise ValueError(f"Invalid activationStatus: {activation_status}")

    finally:
        await xai.close()


async def _sync_knowledge_documents(
    self,
    knowledge_id: str,
    collection_id: str,
    ctx
) -> None:
    """
    Sync all documents of a knowledge base to the XAI collection.

    Loads the junction rows, resolves each referenced CDokumente entity,
    syncs each document individually, removes orphans, and logs a summary.

    Args:
        knowledge_id: CAIKnowledge entity ID
        collection_id: XAI Collection ID
        ctx: Motia context
    """
    from services.espocrm import EspoCRMAPI
    from services.xai_service import XAIService

    espocrm = EspoCRMAPI(ctx)
    xai = XAIService(ctx)

    junctions = await espocrm.get_junction_entries(
        'CAIKnowledgeCDokumente', 'cAIKnowledgeId', knowledge_id
    )
    ctx.logger.info(f"πŸ“Š Found {len(junctions)} junction entries")

    if not junctions:
        ctx.logger.info("βœ… No documents to sync")
        return

    # Resolve documents; a failed load leaves a gap and counts as failed below.
    docs_by_id: Dict[str, Dict] = {}
    for jct in junctions:
        doc_id = jct['cDokumenteId']
        try:
            docs_by_id[doc_id] = await espocrm.get_entity('CDokumente', doc_id)
        except Exception as e:
            ctx.logger.error(f"❌ Failed to load document {doc_id}: {e}")

    ctx.logger.info(f"πŸ“Š Loaded {len(docs_by_id)}/{len(junctions)} documents")

    successful = 0
    failed = 0
    skipped = 0

    for jct in junctions:
        doc_id = jct['cDokumenteId']
        document = docs_by_id.get(doc_id)

        if document is None:
            failed += 1
            continue

        try:
            if await self._sync_single_document(jct, document, collection_id, ctx):
                successful += 1
            else:
                skipped += 1
        except Exception as e:
            failed += 1
            ctx.logger.error(f"❌ Failed to sync document {doc_id}: {e}")
            # Record the failure on the junction row so the next run retries it.
            await espocrm.update_junction_entry('CAIKnowledgeCDokumente', jct['id'], {
                'syncstatus': JunctionSyncStatus.FAILED.value
            })

    # Orphan cleanup is best-effort and must not fail the whole sync.
    try:
        await self._remove_orphaned_documents(collection_id, junctions, ctx)
    except Exception as e:
        ctx.logger.warn(f"⚠️ Failed to remove orphans: {e}")

    ctx.logger.info("=" * 80)
    ctx.logger.info(f"πŸ“Š Sync Statistics:")
    ctx.logger.info(f"   βœ… Synced: {successful}")
    ctx.logger.info(f"   ⏭️ Skipped: {skipped}")
    ctx.logger.info(f"   ❌ Failed: {failed}")
    ctx.logger.info("=" * 80)
# AIKnowledgeSync methods (belong to the AIKnowledgeSync class in this module).
async def _sync_single_document(
    self,
    junction_entry: Dict,
    document: Dict,
    collection_id: str,
    ctx
) -> bool:
    """
    Sync one document to the XAI Collection with BLAKE3 verification.

    Decides between skip / full re-upload / metadata-only PATCH by comparing
    the current file hash and metadata hash against the values recorded on
    the junction row at the last successful sync.

    Args:
        junction_entry: Junction table entry
        document: CDokumente entity
        collection_id: XAI Collection ID
        ctx: Motia context

    Returns:
        True if synced, False if skipped

    Raises:
        RuntimeError: on download or upload-verification failure
    """
    from services.espocrm import EspoCRMAPI
    from services.xai_service import XAIService

    espocrm = EspoCRMAPI(ctx)
    xai = XAIService(ctx)

    junction_id = junction_entry['id']
    junction_status = junction_entry.get('syncstatus')
    junction_ai_doc_id = junction_entry.get('aiDocumentId')

    # 1. MIME type gate: mark unsupported types and skip.
    mime_type = document.get('mimeType') or 'application/octet-stream'
    if not xai.is_mime_type_supported(mime_type):
        await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, {
            'syncstatus': JunctionSyncStatus.UNSUPPORTED.value
        })
        ctx.logger.info(f"⏭️ Unsupported MIME: {document['name']}")
        return False

    # 2. Hashes: file content hash from EspoCRM, metadata hash computed locally.
    current_file_hash = document.get('md5') or document.get('sha256')
    if not current_file_hash:
        ctx.logger.error(f"❌ No hash for document {document['id']}")
        return False

    current_metadata_hash = self._calculate_metadata_hash(document)

    synced_file_hash = junction_entry.get('syncedHash')
    synced_metadata_hash = junction_entry.get('syncedMetadataHash')
    xai_blake3_hash = junction_entry.get('xaiBlake3Hash')

    # 3. Determine what changed since the last recorded sync.
    file_changed = (current_file_hash != synced_file_hash)
    metadata_changed = (current_metadata_hash != synced_metadata_hash)

    ctx.logger.info(f"πŸ“‹ {document['name']}")
    ctx.logger.info(f"   File changed: {file_changed}, Metadata changed: {metadata_changed}")

    # 4. Early return if nothing changed — but re-verify presence in XAI first.
    if junction_status == JunctionSyncStatus.SYNCED.value and junction_ai_doc_id:
        if not file_changed and not metadata_changed:
            try:
                doc_info = await xai.get_collection_document(collection_id, junction_ai_doc_id)
                if doc_info:
                    ctx.logger.info(f"   βœ… Already synced (verified)")
                    return False
                ctx.logger.warn(f"   ⚠️ Document missing in XAI, re-uploading")
            except Exception as e:
                # Verification failure falls through to a re-upload attempt.
                ctx.logger.warn(f"   ⚠️ Could not verify: {e}")

    # 5. File content changed (or never uploaded) β†’ full upload.
    if file_changed or not junction_ai_doc_id:
        ctx.logger.info(f"   πŸ”„ {'File changed' if file_changed else 'New file'}, uploading")

        download_info = await self._get_document_download_info(document, ctx)
        if not download_info:
            raise RuntimeError(f"Cannot download document {document['id']}")

        file_content = await espocrm.download_attachment(download_info['attachment_id'])
        metadata = self._build_xai_metadata(document)

        xai_file_id = await xai.upload_document_with_metadata(
            collection_id=collection_id,
            file_content=file_content,
            filename=download_info['filename'],
            mime_type=download_info['mime_type'],
            metadata=metadata
        )
        ctx.logger.info(f"   βœ… Uploaded β†’ {xai_file_id}")

        # Verify integrity via the BLAKE3 hash XAI reports for the stored file.
        ctx.logger.info(f"   πŸ” Verifying upload...")
        success, blake3_hash = await xai.verify_upload_integrity(
            collection_id=collection_id,
            file_id=xai_file_id
        )
        if not success:
            ctx.logger.error(f"   ❌ Upload verification failed!")
            raise RuntimeError("Upload verification failed")

        ctx.logger.info(f"   βœ… Verified: {blake3_hash[:32]}...")

        await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, {
            'aiDocumentId': xai_file_id,
            'syncstatus': JunctionSyncStatus.SYNCED.value,
            'syncedHash': current_file_hash,
            'xaiBlake3Hash': blake3_hash,
            'syncedMetadataHash': current_metadata_hash,
            'lastSync': datetime.now().isoformat()
        })
        return True

    # 6. Metadata-only change β†’ PATCH, falling back to re-upload.
    elif metadata_changed:
        ctx.logger.info(f"   πŸ“ Metadata changed, updating")

        xai_file_id = junction_ai_doc_id
        metadata = self._build_xai_metadata(document)

        try:
            await xai.update_document_metadata(collection_id, xai_file_id, metadata)
            ctx.logger.info(f"   βœ… Metadata updated")

            success, blake3_hash = await xai.verify_upload_integrity(
                collection_id, xai_file_id
            )

            await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, {
                'syncstatus': JunctionSyncStatus.SYNCED.value,
                'syncedMetadataHash': current_metadata_hash,
                # Keep the previously recorded hash if verification failed.
                'xaiBlake3Hash': blake3_hash if success else xai_blake3_hash,
                'lastSync': datetime.now().isoformat()
            })
            return True

        except Exception as e:
            ctx.logger.warn(f"   ⚠️ PATCH failed, re-uploading: {e}")

            download_info = await self._get_document_download_info(document, ctx)
            # BUGFIX: the original subscripted download_info without a None
            # check here (the upload path above has one), so a missing
            # attachment crashed with TypeError instead of a clear error.
            if not download_info:
                raise RuntimeError(f"Cannot download document {document['id']}")

            file_content = await espocrm.download_attachment(download_info['attachment_id'])

            await xai.remove_from_collection(collection_id, xai_file_id)

            xai_file_id = await xai.upload_document_with_metadata(
                collection_id=collection_id,
                file_content=file_content,
                filename=download_info['filename'],
                mime_type=download_info['mime_type'],
                metadata=metadata
            )

            success, blake3_hash = await xai.verify_upload_integrity(
                collection_id, xai_file_id
            )

            await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, {
                'aiDocumentId': xai_file_id,
                'syncstatus': JunctionSyncStatus.SYNCED.value,
                'syncedHash': current_file_hash,
                'xaiBlake3Hash': blake3_hash,
                'syncedMetadataHash': current_metadata_hash,
                'lastSync': datetime.now().isoformat()
            })
            return True

    return False


async def _remove_orphaned_documents(
    self,
    collection_id: str,
    junction_entries: List[Dict],
    ctx
) -> None:
    """
    Remove documents from XAI that are no longer in the junction table.

    Args:
        collection_id: XAI Collection ID
        junction_entries: List of junction entries
        ctx: Motia context
    """
    from services.xai_service import XAIService

    xai = XAIService(ctx)

    # Set of file ids XAI currently holds for this collection.
    xai_docs = await xai.list_collection_documents(collection_id)
    xai_file_ids = {
        doc.get('file_id') or doc.get('id')
        for doc in xai_docs
        if doc.get('file_id') or doc.get('id')
    }

    # Set of file ids EspoCRM still references.
    junction_file_ids = {j['aiDocumentId'] for j in junction_entries if j.get('aiDocumentId')}

    orphans = xai_file_ids - junction_file_ids

    if orphans:
        ctx.logger.info(f"πŸ—‘οΈ Removing {len(orphans)} orphaned documents")
        for orphan_id in orphans:
            try:
                await xai.remove_from_collection(collection_id, orphan_id)
                ctx.logger.info(f"   βœ… Removed orphan: {orphan_id}")
            except Exception as e:
                ctx.logger.warn(f"   ⚠️ Failed to remove {orphan_id}: {e}")
    else:
        ctx.logger.info("βœ… No orphaned documents found")


def _calculate_metadata_hash(self, document: Dict) -> str:
    """
    Calculate a hash over the sync-relevant metadata fields.

    Only 'name' and 'description' participate; keys are sorted so the
    serialization (and thus the hash) is stable across runs.

    Args:
        document: CDokumente entity

    Returns:
        MD5 hex digest (32 chars) — used only as a change detector, not
        for security.
    """
    metadata = {
        'name': document.get('name', ''),
        'description': document.get('description', ''),
    }
    metadata_str = json.dumps(metadata, sort_keys=True)
    return hashlib.md5(metadata_str.encode()).hexdigest()


def _build_xai_metadata(self, document: Dict) -> Dict[str, str]:
    """
    Build the XAI metadata fields from a CDokumente entity.

    Args:
        document: CDokumente entity

    Returns:
        Metadata dict for XAI (all values are strings; missing source
        fields become empty strings)
    """
    return {
        'document_name': document.get('name', ''),
        'description': document.get('description', ''),
        'created_at': document.get('createdAt', ''),
        'modified_at': document.get('modifiedAt', ''),
        'espocrm_id': document.get('id', '')
    }
# AIKnowledgeSync method (belongs to the AIKnowledgeSync class in this module).
async def _get_document_download_info(
    self,
    document: Dict,
    ctx
) -> Optional[Dict[str, Any]]:
    """
    Resolve download info (attachment id, filename, MIME type) for a
    CDokumente entity.

    Checks the 'dokumentId' custom field first, then 'fileId'.

    Args:
        document: CDokumente entity
        ctx: Motia context

    Returns:
        Dict with attachment_id, filename, mime_type — or None if no
        attachment could be resolved.
    """
    from services.espocrm import EspoCRMAPI

    espocrm = EspoCRMAPI(ctx)

    # Prefer the CDokumente custom field, fall back to the generic file field.
    if document.get('dokumentId'):
        attachment_id = document.get('dokumentId')
        filename = document.get('dokumentName')
    elif document.get('fileId'):
        attachment_id = document.get('fileId')
        filename = document.get('fileName')
    else:
        attachment_id = None
        filename = None

    if not attachment_id:
        ctx.logger.error(f"❌ No attachment ID for document {document['id']}")
        return None

    try:
        attachment = await espocrm.get_entity('Attachment', attachment_id)
    except Exception as e:
        ctx.logger.error(f"❌ Failed to get attachment {attachment_id}: {e}")
        return None

    return {
        'attachment_id': attachment_id,
        'filename': filename or attachment.get('name', 'unknown'),
        'mime_type': attachment.get('type', 'application/octet-stream')
    }


# ---------------------------------------------------------------------------
# EspoCRMAPI methods (services/espocrm.py — junction table operations).
# ---------------------------------------------------------------------------

async def get_junction_entries(
    self,
    junction_entity: str,
    filter_field: str,
    filter_value: str,
    max_size: int = 1000
) -> List[Dict[str, Any]]:
    """
    Load junction table entries with filtering.

    Args:
        junction_entity: Junction entity name (e.g., 'CAIKnowledgeCDokumente')
        filter_field: Field to filter on (e.g., 'cAIKnowledgeId')
        filter_value: Value to match
        max_size: Maximum entries to return (no pagination beyond this —
            TODO confirm 1000 is enough for the largest knowledge base)

    Returns:
        List of junction records with ALL additionalColumns

    Example:
        entries = await espocrm.get_junction_entries(
            'CAIKnowledgeCDokumente',
            'cAIKnowledgeId',
            'kb-123'
        )
    """
    self._log(f"Loading junction entries: {junction_entity} where {filter_field}={filter_value}")

    result = await self.list_entities(
        junction_entity,
        where=[{
            'type': 'equals',
            'attribute': filter_field,
            'value': filter_value
        }],
        max_size=max_size
    )

    entries = result.get('list', [])
    self._log(f"βœ… Loaded {len(entries)} junction entries")
    return entries


async def update_junction_entry(
    self,
    junction_entity: str,
    junction_id: str,
    fields: Dict[str, Any]
) -> None:
    """
    Update a junction table entry (thin delegate to update_entity).

    Args:
        junction_entity: Junction entity name
        junction_id: Junction entry ID
        fields: Fields to update

    Example:
        await espocrm.update_junction_entry(
            'CAIKnowledgeCDokumente',
            'jct-123',
            {'syncstatus': 'synced', 'lastSync': '2026-03-11T20:00:00Z'}
        )
    """
    await self.update_entity(junction_entity, junction_id, fields)
# ---------------------------------------------------------------------------
# services/models.py — status enums for the AI Knowledge sync pipeline.
# ---------------------------------------------------------------------------

class AIKnowledgeActivationStatus(str, Enum):
    """Activation status for CAIKnowledge collections."""
    NEW = "new"                  # collection not yet created in XAI
    ACTIVE = "active"            # collection active, sync running
    PAUSED = "paused"            # collection exists, but no sync
    DEACTIVATED = "deactivated"  # collection deleted from XAI

    def __str__(self) -> str:
        return self.value


class AIKnowledgeSyncStatus(str, Enum):
    """Sync status for CAIKnowledge."""
    UNCLEAN = "unclean"            # changes pending
    PENDING_SYNC = "pending_sync"  # sync in progress (locked)
    SYNCED = "synced"              # everything synced
    FAILED = "failed"              # sync failed

    def __str__(self) -> str:
        return self.value


class JunctionSyncStatus(str, Enum):
    """Sync status for junction tables (CAIKnowledgeCDokumente)."""
    NEW = "new"
    UNCLEAN = "unclean"
    SYNCED = "synced"
    FAILED = "failed"
    UNSUPPORTED = "unsupported"

    def __str__(self) -> str:
        return self.value


# ---------------------------------------------------------------------------
# XAIService methods (services/xai_service.py — collection management).
# Module header additions there: `import asyncio`, typing Dict/Tuple.
# ---------------------------------------------------------------------------

async def create_collection(
    self,
    name: str,
    metadata: Optional[Dict[str, str]] = None,
    field_definitions: Optional[List[Dict]] = None
) -> Dict:
    """
    Create a new xAI Collection.

    POST https://management-api.x.ai/v1/collections

    Args:
        name: Collection name
        metadata: Optional metadata dict
        field_definitions: Optional field definitions for metadata fields;
            defaults to the standard document-metadata schema below

    Returns:
        Collection object with an 'id' field

    Raises:
        RuntimeError: on HTTP error
    """
    self._log(f"πŸ“š Creating collection: {name}")

    # Default field definitions: names/descriptions are injected into the
    # chunk text for retrieval, timestamps and ids are stored only.
    if field_definitions is None:
        field_definitions = [
            {"key": "document_name", "inject_into_chunk": True},
            {"key": "description", "inject_into_chunk": True},
            {"key": "created_at", "inject_into_chunk": False},
            {"key": "modified_at", "inject_into_chunk": False},
            {"key": "espocrm_id", "inject_into_chunk": False}
        ]

    session = await self._get_session()
    url = f"{XAI_MANAGEMENT_URL}/v1/collections"
    headers = {
        "Authorization": f"Bearer {self.management_key}",
        "Content-Type": "application/json"
    }

    body = {
        "collection_name": name,
        "field_definitions": field_definitions
    }
    if metadata:
        body["metadata"] = metadata

    async with session.post(url, json=body, headers=headers) as response:
        if response.status not in (200, 201):
            raw = await response.text()
            raise RuntimeError(
                f"Failed to create collection ({response.status}): {raw}"
            )
        data = await response.json()

    self._log(f"βœ… Collection created: {data.get('id')}")
    return data


async def get_collection(self, collection_id: str) -> Optional[Dict]:
    """
    Fetch collection details.

    GET https://management-api.x.ai/v1/collections/{collection_id}

    Returns:
        Collection object, or None if not found (404)

    Raises:
        RuntimeError: on any HTTP error other than 404
    """
    self._log(f"πŸ“„ Getting collection: {collection_id}")

    session = await self._get_session()
    url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}"
    headers = {"Authorization": f"Bearer {self.management_key}"}

    async with session.get(url, headers=headers) as response:
        if response.status == 404:
            self._log(f"⚠️ Collection not found: {collection_id}", level='warn')
            return None
        if response.status not in (200,):
            raw = await response.text()
            raise RuntimeError(
                f"Failed to get collection ({response.status}): {raw}"
            )
        data = await response.json()

    self._log(f"βœ… Collection retrieved: {data.get('collection_name', 'N/A')}")
    return data
# XAIService methods (belong to the XAIService class in this module).

async def delete_collection(self, collection_id: str) -> None:
    """
    Delete an XAI Collection.

    DELETE https://management-api.x.ai/v1/collections/{collection_id}

    NOTE: Documents in the collection are NOT deleted — they may still
    belong to other collections.

    Raises:
        RuntimeError: on HTTP error
    """
    self._log(f"πŸ—‘οΈ Deleting collection {collection_id}")

    session = await self._get_session()
    url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}"
    headers = {"Authorization": f"Bearer {self.management_key}"}

    async with session.delete(url, headers=headers) as response:
        if response.status not in (200, 204):
            raw = await response.text()
            raise RuntimeError(
                f"Failed to delete collection {collection_id} ({response.status}): {raw}"
            )

    self._log(f"βœ… Collection deleted: {collection_id}")


async def list_collection_documents(self, collection_id: str) -> List[Dict]:
    """
    List all documents in a collection.

    GET https://management-api.x.ai/v1/collections/{collection_id}/documents

    Returns:
        List of document objects (file_id, filename, hash, fields).
        Tolerates both a bare JSON list and a {'documents': [...]} wrapper;
        any other shape yields an empty list.

    Raises:
        RuntimeError: on HTTP error
    """
    self._log(f"πŸ“‹ Listing documents in collection {collection_id}")

    session = await self._get_session()
    url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents"
    headers = {"Authorization": f"Bearer {self.management_key}"}

    async with session.get(url, headers=headers) as response:
        if response.status not in (200,):
            raw = await response.text()
            raise RuntimeError(
                f"Failed to list documents ({response.status}): {raw}"
            )
        data = await response.json()

    if isinstance(data, list):
        documents = data
    elif isinstance(data, dict) and 'documents' in data:
        documents = data['documents']
    else:
        documents = []

    self._log(f"βœ… Listed {len(documents)} documents")
    return documents


async def get_collection_document(self, collection_id: str, file_id: str) -> Optional[Dict]:
    """
    Fetch document details from an XAI Collection.

    GET https://management-api.x.ai/v1/collections/{collection_id}/documents/{file_id}

    Returns:
        Dict with document info including the BLAKE3 hash:
        {
            'file_id': 'file_xyz',
            'filename': 'document.pdf',
            'hash': 'blake3:abcd1234...',
            'fields': {...}
        }
        Returns None if not found (404).

    Raises:
        RuntimeError: on any other HTTP error
    """
    self._log(f"πŸ“„ Getting document {file_id} from collection {collection_id}")

    session = await self._get_session()
    url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}"
    headers = {"Authorization": f"Bearer {self.management_key}"}

    async with session.get(url, headers=headers) as response:
        if response.status == 404:
            return None
        if response.status not in (200,):
            raw = await response.text()
            raise RuntimeError(
                f"Failed to get document from collection ({response.status}): {raw}"
            )
        data = await response.json()

    self._log(f"βœ… Document info retrieved: {data.get('filename', 'N/A')}")
    return data
+ """ + self._log(f"πŸ“„ Getting document {file_id} from collection {collection_id}") + + session = await self._get_session() + url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}" + headers = {"Authorization": f"Bearer {self.management_key}"} + + async with session.get(url, headers=headers) as response: + if response.status == 404: + return None + + if response.status not in (200,): + raw = await response.text() + raise RuntimeError( + f"Failed to get document from collection ({response.status}): {raw}" + ) + + data = await response.json() + + self._log(f"βœ… Document info retrieved: {data.get('filename', 'N/A')}") + return data + + async def update_document_metadata( + self, + collection_id: str, + file_id: str, + metadata: Dict[str, str] + ) -> None: + """ + Aktualisiert nur Metadaten eines Documents (kein File-Upload). + + PATCH https://management-api.x.ai/v1/collections/{collection_id}/documents/{file_id} + + Args: + collection_id: XAI Collection ID + file_id: XAI file_id + metadata: Updated metadata fields + + Raises: + RuntimeError: bei HTTP-Fehler + """ + self._log(f"πŸ“ Updating metadata for document {file_id}") + + session = await self._get_session() + url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}" + headers = { + "Authorization": f"Bearer {self.management_key}", + "Content-Type": "application/json" + } + + body = {"fields": metadata} + + async with session.patch(url, json=body, headers=headers) as response: + if response.status not in (200, 204): + raw = await response.text() + raise RuntimeError( + f"Failed to update document metadata ({response.status}): {raw}" + ) + + self._log(f"βœ… Metadata updated for {file_id}") + + # ========== High-Level Operations ========== + + async def upload_document_with_metadata( + self, + collection_id: str, + file_content: bytes, + filename: str, + mime_type: str, + metadata: Dict[str, str] + ) -> str: + """ + Upload file + add to collection with metadata in 
one operation. + + Args: + collection_id: XAI Collection ID + file_content: File bytes + filename: Filename + mime_type: MIME type + metadata: Metadata fields + + Returns: + XAI file_id + + Raises: + RuntimeError: bei Upload/Add-Fehler + """ + # Step 1: Upload file + file_id = await self.upload_file(file_content, filename, mime_type) + + try: + # Step 2: Add to collection (XAI API automatically handles metadata) + # Note: Metadata muss beim POST mit angegeben werden + session = await self._get_session() + url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}" + headers = { + "Authorization": f"Bearer {self.management_key}", + "Content-Type": "application/json" + } + + body = {"fields": metadata} + + async with session.post(url, json=body, headers=headers) as response: + if response.status not in (200, 201): + raw = await response.text() + raise RuntimeError( + f"Failed to add file to collection with metadata ({response.status}): {raw}" + ) + + self._log(f"βœ… File {file_id} added to collection {collection_id} with metadata") + return file_id + + except Exception as e: + # Cleanup: File wurde hochgeladen aber nicht zur Collection hinzugefΓΌgt + self._log(f"⚠️ Failed to add to collection, file {file_id} may be orphaned", level='warn') + raise + + async def verify_upload_integrity( + self, + collection_id: str, + file_id: str, + retry_attempts: int = 3 + ) -> Tuple[bool, Optional[str]]: + """ + Verifiziert Upload-IntegritΓ€t via BLAKE3 Hash von XAI. 
+ + Args: + collection_id: XAI Collection ID + file_id: XAI file_id + retry_attempts: Retry bei temporΓ€ren Fehlern + + Returns: + (success: bool, blake3_hash: Optional[str]) + """ + for attempt in range(1, retry_attempts + 1): + try: + doc_info = await self.get_collection_document(collection_id, file_id) + + if not doc_info: + self._log(f"⚠️ Document {file_id} not found in collection", level='warn') + return (False, None) + + blake3_hash = doc_info.get('hash') + + if not blake3_hash: + self._log(f"⚠️ No hash returned by XAI API", level='warn') + return (False, None) + + self._log(f"βœ… Upload verified, BLAKE3: {blake3_hash[:32]}...") + return (True, blake3_hash) + + except Exception as e: + if attempt < retry_attempts: + delay = 2 ** attempt # Exponential backoff + self._log(f"⚠️ Verification failed (attempt {attempt}), retry in {delay}s", level='warn') + await asyncio.sleep(delay) + else: + self._log(f"❌ Verification failed after {retry_attempts} attempts: {e}", level='error') + return (False, None) + + return (False, None) + + def is_mime_type_supported(self, mime_type: str) -> bool: + """ + PrΓΌft, ob XAI diesen MIME-Type unterstΓΌtzt. 
+ + Args: + mime_type: MIME type string + + Returns: + True wenn unterstΓΌtzt, False sonst + """ + # Liste der unterstΓΌtzten MIME-Types basierend auf XAI Dokumentation + supported_types = { + # Documents + 'application/pdf', + 'application/msword', + 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', + 'application/vnd.ms-excel', + 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', + 'application/vnd.oasis.opendocument.text', + 'application/epub+zip', + 'application/vnd.openxmlformats-officedocument.presentationml.presentation', + + # Text + 'text/plain', + 'text/html', + 'text/markdown', + 'text/csv', + 'text/xml', + + # Code + 'text/javascript', + 'application/json', + 'application/xml', + 'text/x-python', + 'text/x-java-source', + 'text/x-c', + 'text/x-c++src', + + # Other + 'application/zip', + } + + # Normalisiere MIME-Type (lowercase, strip whitespace) + normalized = mime_type.lower().strip() + + return normalized in supported_types diff --git a/steps/vmh/aiknowledge_full_sync_cron_step.py b/steps/vmh/aiknowledge_full_sync_cron_step.py new file mode 100644 index 0000000..18450db --- /dev/null +++ b/steps/vmh/aiknowledge_full_sync_cron_step.py @@ -0,0 +1,90 @@ +"""AI Knowledge Full Sync - Daily Cron Job""" +from typing import Any +from motia import FlowContext, cron + + +config = { + "name": "AI Knowledge Full Sync", + "description": "Daily full sync of all CAIKnowledge entities (catches missed webhooks)", + "flows": ["aiknowledge-full-sync"], + "triggers": [ + cron("0 0 2 * * *"), # Daily at 2:00 AM + ], + "enqueues": ["aiknowledge.sync"], +} + + +async def handler(input_data: None, ctx: FlowContext[Any]) -> None: + """ + Daily full sync handler. + + Loads all CAIKnowledge entities that need sync and emits events. + Runs every day at 02:00:00. 
+ """ + from services.espocrm import EspoCRMAPI + from services.models import AIKnowledgeActivationStatus, AIKnowledgeSyncStatus + + ctx.logger.info("=" * 80) + ctx.logger.info("πŸŒ™ DAILY FULL SYNC STARTED") + ctx.logger.info("=" * 80) + + espocrm = EspoCRMAPI(ctx) + + try: + # Load all CAIKnowledge entities with status 'active' that need sync + result = await espocrm.list_entities( + 'CAIKnowledge', + where=[ + { + 'type': 'equals', + 'attribute': 'activationStatus', + 'value': AIKnowledgeActivationStatus.ACTIVE.value + }, + { + 'type': 'in', + 'attribute': 'syncStatus', + 'value': [ + AIKnowledgeSyncStatus.UNCLEAN.value, + AIKnowledgeSyncStatus.FAILED.value + ] + } + ], + select='id,name,syncStatus', + max_size=1000 # Adjust if you have more + ) + + entities = result.get('list', []) + total = len(entities) + + ctx.logger.info(f"πŸ“Š Found {total} knowledge bases needing sync") + + if total == 0: + ctx.logger.info("βœ… All knowledge bases are synced") + ctx.logger.info("=" * 80) + return + + # Enqueue sync events for all + for i, entity in enumerate(entities, 1): + await ctx.enqueue({ + 'topic': 'aiknowledge.sync', + 'data': { + 'knowledge_id': entity['id'], + 'source': 'daily_full_sync' + } + }) + + ctx.logger.info( + f"πŸ“€ [{i}/{total}] Enqueued: {entity['name']} " + f"(syncStatus={entity.get('syncStatus')})" + ) + + ctx.logger.info("=" * 80) + ctx.logger.info(f"βœ… Full sync complete: {total} events enqueued") + ctx.logger.info("=" * 80) + + except Exception as e: + ctx.logger.error("=" * 80) + ctx.logger.error("❌ FULL SYNC FAILED") + ctx.logger.error("=" * 80) + ctx.logger.error(f"Error: {e}", exc_info=True) + raise diff --git a/steps/vmh/aiknowledge_sync_event_step.py b/steps/vmh/aiknowledge_sync_event_step.py new file mode 100644 index 0000000..7663b68 --- /dev/null +++ b/steps/vmh/aiknowledge_sync_event_step.py @@ -0,0 +1,89 @@ +"""AI Knowledge Sync Event Handler""" +from typing import Dict, Any +from redis import Redis +from motia import FlowContext, 
queue + + +config = { + "name": "AI Knowledge Sync", + "description": "Synchronizes CAIKnowledge entities with XAI Collections", + "flows": ["vmh-aiknowledge"], + "triggers": [ + queue("aiknowledge.sync") + ], +} + + +async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: + """ + Event handler for AI Knowledge synchronization. + + Emitted by: + - Webhook on CAIKnowledge update + - Daily full sync cron job + + Args: + event_data: Event payload with knowledge_id + ctx: Motia context + """ + from services.config import get_redis_client + from services.aiknowledge_sync_utils import AIKnowledgeSync + + ctx.logger.info("=" * 80) + ctx.logger.info("πŸ”„ AI KNOWLEDGE SYNC STARTED") + ctx.logger.info("=" * 80) + + # Extract data + knowledge_id = event_data.get('knowledge_id') + source = event_data.get('source', 'unknown') + + if not knowledge_id: + ctx.logger.error("❌ Missing knowledge_id in event data") + return + + ctx.logger.info(f"πŸ“‹ Knowledge ID: {knowledge_id}") + ctx.logger.info(f"πŸ“‹ Source: {source}") + ctx.logger.info("=" * 80) + + # Get Redis for locking + redis_client: Redis = get_redis_client(strict=False) + + # Initialize sync utils + sync_utils = AIKnowledgeSync(ctx, redis_client) + + # Acquire lock + lock_acquired = await sync_utils.acquire_sync_lock(knowledge_id) + + if not lock_acquired: + ctx.logger.warning(f"⏸️ Lock already held for {knowledge_id}, skipping") + ctx.logger.info(" (Will be retried by Motia queue)") + raise RuntimeError(f"Lock busy for {knowledge_id}") # Motia will retry + + try: + # Perform sync + await sync_utils.sync_knowledge_to_xai(knowledge_id, ctx) + + ctx.logger.info("=" * 80) + ctx.logger.info("βœ… AI KNOWLEDGE SYNC COMPLETED") + ctx.logger.info("=" * 80) + + # Release lock with success=True + await sync_utils.release_sync_lock(knowledge_id, success=True) + + except Exception as e: + ctx.logger.error("=" * 80) + ctx.logger.error("❌ AI KNOWLEDGE SYNC FAILED") + ctx.logger.error("=" * 80) + 
ctx.logger.error(f"Error: {e}", exc_info=True) + ctx.logger.error(f"Knowledge ID: {knowledge_id}") + ctx.logger.error("=" * 80) + + # Release lock with failure + await sync_utils.release_sync_lock( + knowledge_id, + success=False, + error_message=str(e) + ) + + # Re-raise to let Motia retry + raise diff --git a/steps/vmh/webhook/aiknowledge_update_api_step.py b/steps/vmh/webhook/aiknowledge_update_api_step.py new file mode 100644 index 0000000..76e258d --- /dev/null +++ b/steps/vmh/webhook/aiknowledge_update_api_step.py @@ -0,0 +1,73 @@ +"""VMH Webhook - AI Knowledge Update""" +from typing import Any +from motia import FlowContext, http, ApiRequest, ApiResponse + + +config = { + "name": "VMH Webhook AI Knowledge Update", + "description": "Receives update webhooks from EspoCRM for CAIKnowledge entities", + "flows": ["vmh-aiknowledge"], + "triggers": [ + http("POST", "/vmh/webhook/aiknowledge/update") + ], + "enqueues": ["aiknowledge.sync"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: + """ + Webhook handler for CAIKnowledge updates in EspoCRM. 
+ + Triggered when: + - activationStatus changes + - syncStatus changes (e.g., set to 'unclean') + - Documents linked/unlinked + """ + try: + ctx.logger.info("=" * 80) + ctx.logger.info("πŸ”” AI Knowledge Update Webhook") + ctx.logger.info("=" * 80) + + # Extract payload + payload = request.body + + # Validate required fields + knowledge_id = payload.get('entity_id') or payload.get('id') + entity_type = payload.get('entity_type', 'CAIKnowledge') + action = payload.get('action', 'update') + + if not knowledge_id: + ctx.logger.error("❌ Missing entity_id in payload") + return ApiResponse( + status_code=400, + body={'success': False, 'error': 'Missing entity_id'} + ) + + ctx.logger.info(f"πŸ“‹ Entity Type: {entity_type}") + ctx.logger.info(f"πŸ“‹ Entity ID: {knowledge_id}") + ctx.logger.info(f"πŸ“‹ Action: {action}") + + # Enqueue sync event + await ctx.enqueue({ + 'topic': 'aiknowledge.sync', + 'data': { + 'knowledge_id': knowledge_id, + 'source': 'webhook', + 'action': action + } + }) + + ctx.logger.info(f"βœ… Sync event enqueued for {knowledge_id}") + ctx.logger.info("=" * 80) + + return ApiResponse( + status_code=200, + body={'success': True, 'knowledge_id': knowledge_id} + ) + + except Exception as e: + ctx.logger.error(f"❌ Webhook error: {e}") + return ApiResponse( + status_code=500, + body={'success': False, 'error': str(e)} + )