"""
AI Knowledge Sync Utilities

Utility functions for synchronizing CAIKnowledge entities with XAI Collections:
- Collection lifecycle management (create, delete)
- Document synchronization with BLAKE3 hash verification
- Metadata-only updates via PATCH
- Orphan detection and cleanup
"""

import hashlib
import json
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime

from services.sync_utils_base import BaseSyncUtils
from services.models import (
    AIKnowledgeActivationStatus,
    AIKnowledgeSyncStatus,
    JunctionSyncStatus
)


class AIKnowledgeSync(BaseSyncUtils):
    """Utility class for AI Knowledge ↔ XAI Collections synchronization"""

    def _get_lock_key(self, entity_id: str) -> str:
        """Redis lock key for AI Knowledge entities"""
        return f"sync_lock:aiknowledge:{entity_id}"

    async def acquire_sync_lock(self, knowledge_id: str) -> bool:
        """
        Acquire distributed lock via Redis + update EspoCRM syncStatus.

        Failing to set syncStatus is tolerated (logged at debug level); the
        lock still counts as acquired.

        Args:
            knowledge_id: CAIKnowledge entity ID

        Returns:
            True if lock acquired, False if already locked or on error
        """
        lock_key = self._get_lock_key(knowledge_id)
        lock_acquired = False
        try:
            # STEP 1: Atomic Redis lock
            if not self._acquire_redis_lock(lock_key):
                self._log(f"Redis lock already active for {knowledge_id}", level='warn')
                return False
            lock_acquired = True

            # STEP 2: Update syncStatus to pending_sync (best effort)
            try:
                await self.espocrm.update_entity('CAIKnowledge', knowledge_id, {
                    'syncStatus': AIKnowledgeSyncStatus.PENDING_SYNC.value
                })
            except Exception as e:
                self._log(f"Could not set syncStatus: {e}", level='debug')

            self._log(f"Sync lock acquired for {knowledge_id}")
            return True

        except Exception as e:
            self._log(f"Error acquiring lock: {e}", level='error')
            # Only release a lock this call actually obtained; otherwise we
            # could drop a lock that is held by another worker.
            if lock_acquired:
                self._release_redis_lock(lock_key)
            return False

    async def release_sync_lock(
        self,
        knowledge_id: str,
        success: bool = True,
        error_message: Optional[str] = None
    ) -> None:
        """
        Release sync lock and set final status.

        The Redis lock is released even when the EspoCRM status update fails.

        Args:
            knowledge_id: CAIKnowledge entity ID
            success: Whether sync succeeded
            error_message: Optional error message (stored truncated to 2000 chars)
        """
        try:
            update_data: Dict[str, Any] = {
                'syncStatus': (
                    AIKnowledgeSyncStatus.SYNCED.value
                    if success
                    else AIKnowledgeSyncStatus.FAILED.value
                )
            }

            if success:
                update_data['lastSync'] = datetime.now().isoformat()
                update_data['syncError'] = None
            elif error_message:
                update_data['syncError'] = error_message[:2000]

            await self.espocrm.update_entity('CAIKnowledge', knowledge_id, update_data)

            self._log(f"Sync lock released: {knowledge_id} → {'success' if success else 'failed'}")

        except Exception as e:
            self._log(f"Error releasing lock: {e}", level='error')

        finally:
            # Always release the Redis lock, whatever happened above
            self._release_redis_lock(self._get_lock_key(knowledge_id))

    async def sync_knowledge_to_xai(self, knowledge_id: str, ctx) -> None:
        """
        Main sync orchestrator with activation status handling.

        Dispatches on the entity's activationStatus:
        - new: create the XAI collection and switch the entity to 'active'
        - deactivated: delete the XAI collection and reset junction entries
        - paused: do nothing
        - active: verify (or recreate) the collection and sync its documents

        Args:
            knowledge_id: CAIKnowledge entity ID
            ctx: Motia context for logging

        Raises:
            RuntimeError: Entity is 'active' but has no datenbankId
            ValueError: activationStatus is not one of the handled values
        """
        from services.espocrm import EspoCRMAPI
        from services.xai_service import XAIService

        espocrm = EspoCRMAPI(ctx)
        xai = XAIService(ctx)

        try:
            # 1. Load knowledge entity
            knowledge = await espocrm.get_entity('CAIKnowledge', knowledge_id)

            activation_status = knowledge.get('activationStatus')
            collection_id = knowledge.get('datenbankId')

            ctx.logger.info("=" * 80)
            ctx.logger.info(f"📋 Processing: {knowledge['name']}")
            ctx.logger.info(f" activationStatus: {activation_status}")
            ctx.logger.info(f" datenbankId: {collection_id or 'NONE'}")
            ctx.logger.info("=" * 80)

            # ═══════════════════════════════════════════════════════════
            # CASE 1: NEW → Create Collection
            # ═══════════════════════════════════════════════════════════
            if activation_status == AIKnowledgeActivationStatus.NEW.value:
                ctx.logger.info("🆕 Status 'new' → Creating XAI Collection")

                collection = await xai.create_collection(
                    name=knowledge['name'],
                    metadata={
                        'espocrm_entity_type': 'CAIKnowledge',
                        'espocrm_entity_id': knowledge_id,
                        'created_at': datetime.now().isoformat()
                    }
                )
                collection_id = collection['id']

                # Update EspoCRM: Set datenbankId + change status to 'active'
                await espocrm.update_entity('CAIKnowledge', knowledge_id, {
                    'datenbankId': collection_id,
                    'activationStatus': AIKnowledgeActivationStatus.ACTIVE.value,
                    'syncStatus': AIKnowledgeSyncStatus.UNCLEAN.value
                })

                ctx.logger.info(f"✅ Collection created: {collection_id}")
                ctx.logger.info(" Status changed to 'active', next webhook will sync documents")
                return

            # ═══════════════════════════════════════════════════════════
            # CASE 2: DEACTIVATED → Delete Collection from XAI
            # ═══════════════════════════════════════════════════════════
            elif activation_status == AIKnowledgeActivationStatus.DEACTIVATED.value:
                ctx.logger.info("🗑️ Status 'deactivated' → Deleting XAI Collection")

                if collection_id:
                    try:
                        await xai.delete_collection(collection_id)
                        ctx.logger.info(f"✅ Collection deleted from XAI: {collection_id}")
                    except Exception as e:
                        # Best effort: junction entries are reset regardless
                        ctx.logger.error(f"❌ Failed to delete collection: {e}")
                else:
                    ctx.logger.info("⏭️ No collection ID, nothing to delete")

                # Update junction entries
                junction_entries = await espocrm.get_junction_entries(
                    'CAIKnowledgeCDokumente',
                    'cAIKnowledgeId',
                    knowledge_id
                )

                for junction in junction_entries:
                    await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction['id'], {
                        'syncstatus': JunctionSyncStatus.NEW.value,
                        'aiDocumentId': None
                    })

                ctx.logger.info(f"✅ Deactivation complete, {len(junction_entries)} junction entries reset")
                return

            # ═══════════════════════════════════════════════════════════
            # CASE 3: PAUSED → Skip Sync
            # ═══════════════════════════════════════════════════════════
            elif activation_status == AIKnowledgeActivationStatus.PAUSED.value:
                ctx.logger.info("⏸️ Status 'paused' → No sync performed")
                return

            # ═══════════════════════════════════════════════════════════
            # CASE 4: ACTIVE → Normal Sync
            # ═══════════════════════════════════════════════════════════
            elif activation_status == AIKnowledgeActivationStatus.ACTIVE.value:
                if not collection_id:
                    ctx.logger.error("❌ Status 'active' but no datenbankId!")
                    raise RuntimeError("Active knowledge without collection ID")

                ctx.logger.info(f"🔄 Status 'active' → Syncing documents to {collection_id}")

                # Verify collection exists
                collection = await xai.get_collection(collection_id)
                if not collection:
                    ctx.logger.warn(f"⚠️ Collection {collection_id} not found, recreating")
                    collection = await xai.create_collection(
                        name=knowledge['name'],
                        metadata={
                            'espocrm_entity_type': 'CAIKnowledge',
                            'espocrm_entity_id': knowledge_id
                        }
                    )
                    collection_id = collection['id']
                    await espocrm.update_entity('CAIKnowledge', knowledge_id, {
                        'datenbankId': collection_id
                    })

                # Sync documents, reusing the clients created above so they
                # are closed exactly once by the finally block below
                await self._sync_knowledge_documents(
                    knowledge_id, collection_id, ctx, espocrm=espocrm, xai=xai
                )

            else:
                ctx.logger.error(f"❌ Unknown activationStatus: {activation_status}")
                raise ValueError(f"Invalid activationStatus: {activation_status}")

        finally:
            await xai.close()

    async def _sync_knowledge_documents(
        self,
        knowledge_id: str,
        collection_id: str,
        ctx,
        *,
        espocrm: Optional[Any] = None,
        xai: Optional[Any] = None
    ) -> None:
        """
        Sync all documents of a knowledge base to XAI collection.

        A failure on one document is logged, the junction is marked failed
        (best effort), and processing continues with the next document.
        Orphan cleanup always runs afterwards, against freshly reloaded
        junction entries.

        Args:
            knowledge_id: CAIKnowledge entity ID
            collection_id: XAI Collection ID
            ctx: Motia context
            espocrm: Optional EspoCRM client to reuse (created if omitted)
            xai: Optional XAI client to reuse (created and closed here if omitted)
        """
        if espocrm is None:
            from services.espocrm import EspoCRMAPI
            espocrm = EspoCRMAPI(ctx)

        owns_xai = xai is None
        if owns_xai:
            from services.xai_service import XAIService
            xai = XAIService(ctx)

        try:
            # Load junction entries
            junction_entries = await espocrm.get_junction_entries(
                'CAIKnowledgeCDokumente',
                'cAIKnowledgeId',
                knowledge_id
            )

            ctx.logger.info(f"📊 Found {len(junction_entries)} junction entries")

            if not junction_entries:
                ctx.logger.info("✅ No documents to sync")
                # Documents may still linger in XAI after the last link was
                # removed, so cleanup must run even without junction entries.
                await self._cleanup_orphans(knowledge_id, collection_id, ctx, espocrm, xai)
                return

            # Load documents
            documents = {}
            for junction in junction_entries:
                doc_id = junction['cDokumenteId']
                try:
                    documents[doc_id] = await espocrm.get_entity('CDokumente', doc_id)
                except Exception as e:
                    ctx.logger.error(f"❌ Failed to load document {doc_id}: {e}")

            ctx.logger.info(f"📊 Loaded {len(documents)}/{len(junction_entries)} documents")

            # Sync each document
            successful = 0
            failed = 0
            skipped = 0

            for junction in junction_entries:
                doc_id = junction['cDokumenteId']
                document = documents.get(doc_id)

                if not document:
                    failed += 1
                    continue

                try:
                    synced = await self._sync_single_document(
                        junction, document, collection_id, ctx,
                        espocrm=espocrm, xai=xai
                    )
                except Exception as e:
                    failed += 1
                    ctx.logger.error(f"❌ Failed to sync document {doc_id}: {e}")

                    # Mark as failed; an error here must not abort the
                    # remaining documents
                    try:
                        await espocrm.update_junction_entry(
                            'CAIKnowledgeCDokumente', junction['id'], {
                                'syncstatus': JunctionSyncStatus.FAILED.value
                            }
                        )
                    except Exception as mark_error:
                        ctx.logger.warn(
                            f"⚠️ Could not mark junction {junction['id']} as failed: {mark_error}"
                        )
                else:
                    if synced:
                        successful += 1
                    else:
                        skipped += 1

            # Remove orphans
            await self._cleanup_orphans(knowledge_id, collection_id, ctx, espocrm, xai)

            # Summary
            ctx.logger.info("=" * 80)
            ctx.logger.info("📊 Sync Statistics:")
            ctx.logger.info(f" ✅ Synced: {successful}")
            ctx.logger.info(f" ⏭️ Skipped: {skipped}")
            ctx.logger.info(f" ❌ Failed: {failed}")
            ctx.logger.info("=" * 80)

        finally:
            if owns_xai:
                await xai.close()

    async def _cleanup_orphans(
        self,
        knowledge_id: str,
        collection_id: str,
        ctx,
        espocrm: Any,
        xai: Any
    ) -> None:
        """
        Reload junction entries and remove XAI documents not referenced by them.

        The entries are reloaded because the sync loop writes new
        aiDocumentId values to EspoCRM; using the pre-sync list would treat
        every freshly uploaded document as an orphan. Errors are logged and
        swallowed (cleanup is best effort).

        Args:
            knowledge_id: CAIKnowledge entity ID
            collection_id: XAI Collection ID
            ctx: Motia context
            espocrm: EspoCRM client
            xai: XAI client
        """
        try:
            current_entries = await espocrm.get_junction_entries(
                'CAIKnowledgeCDokumente',
                'cAIKnowledgeId',
                knowledge_id
            )
            await self._remove_orphaned_documents(
                collection_id, current_entries, ctx, xai=xai
            )
        except Exception as e:
            ctx.logger.warn(f"⚠️ Failed to remove orphans: {e}")

    async def _sync_single_document(
        self,
        junction_entry: Dict,
        document: Dict,
        collection_id: str,
        ctx,
        *,
        espocrm: Optional[Any] = None,
        xai: Optional[Any] = None
    ) -> bool:
        """
        Sync one document to XAI Collection with BLAKE3 verification.

        Decision order:
        1. Unsupported MIME type → junction marked unsupported, skipped
        2. No file hash on the document → skipped
        3. Already synced and unchanged → verified in XAI; skipped if present,
           re-uploaded if missing, skipped if verification itself errors
        4. File changed / never uploaded / missing in XAI → upload
        5. Only metadata changed → PATCH, falling back to remove + re-upload
           when the PATCH call fails

        Args:
            junction_entry: Junction table entry
            document: CDokumente entity
            collection_id: XAI Collection ID
            ctx: Motia context
            espocrm: Optional EspoCRM client to reuse (created if omitted)
            xai: Optional XAI client to reuse (created and closed here if omitted)

        Returns:
            True if synced, False if skipped

        Raises:
            RuntimeError: Document cannot be downloaded or upload verification failed
        """
        if espocrm is None:
            from services.espocrm import EspoCRMAPI
            espocrm = EspoCRMAPI(ctx)

        owns_xai = xai is None
        if owns_xai:
            from services.xai_service import XAIService
            xai = XAIService(ctx)

        try:
            junction_id = junction_entry['id']
            junction_status = junction_entry.get('syncstatus')
            junction_ai_doc_id = junction_entry.get('aiDocumentId')

            # 1. Check MIME type support
            mime_type = document.get('mimeType') or 'application/octet-stream'
            if not xai.is_mime_type_supported(mime_type):
                await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, {
                    'syncstatus': JunctionSyncStatus.UNSUPPORTED.value
                })
                ctx.logger.info(f"⏭️ Unsupported MIME: {document['name']}")
                return False

            # 2. Calculate hashes
            current_file_hash = document.get('md5') or document.get('sha256')
            if not current_file_hash:
                ctx.logger.error(f"❌ No hash for document {document['id']}")
                return False

            current_metadata_hash = self._calculate_metadata_hash(document)

            synced_file_hash = junction_entry.get('syncedHash')
            synced_metadata_hash = junction_entry.get('syncedMetadataHash')
            xai_blake3_hash = junction_entry.get('xaiBlake3Hash')

            # 3. Determine changes
            file_changed = (current_file_hash != synced_file_hash)
            metadata_changed = (current_metadata_hash != synced_metadata_hash)

            ctx.logger.info(f"📋 {document['name']}")
            ctx.logger.info(f" File changed: {file_changed}, Metadata changed: {metadata_changed}")

            # 4. Early return if nothing changed
            missing_in_xai = False
            nothing_changed = not file_changed and not metadata_changed
            if (
                junction_status == JunctionSyncStatus.SYNCED.value
                and junction_ai_doc_id
                and nothing_changed
            ):
                # Verify document still exists in XAI
                try:
                    doc_info = await xai.get_collection_document(collection_id, junction_ai_doc_id)
                except Exception as e:
                    # Cannot tell either way; leave the document untouched
                    ctx.logger.warn(f" ⚠️ Could not verify: {e}")
                else:
                    if doc_info:
                        ctx.logger.info(" ✅ Already synced (verified)")
                        return False
                    ctx.logger.warn(" ⚠️ Document missing in XAI, re-uploading")
                    missing_in_xai = True

            # 5. Handle file content change (re-upload)
            if file_changed or not junction_ai_doc_id or missing_in_xai:
                if file_changed:
                    reason = 'File changed'
                elif missing_in_xai:
                    reason = 'Missing in XAI'
                else:
                    reason = 'New file'
                ctx.logger.info(f" 🔄 {reason}, uploading")

                await self._upload_document(
                    junction_id=junction_id,
                    document=document,
                    collection_id=collection_id,
                    metadata=self._build_xai_metadata(document),
                    file_hash=current_file_hash,
                    metadata_hash=current_metadata_hash,
                    ctx=ctx,
                    espocrm=espocrm,
                    xai=xai
                )
                return True

            # 6. Handle metadata-only change
            if metadata_changed:
                ctx.logger.info(" 📝 Metadata changed, updating")

                metadata = self._build_xai_metadata(document)

                # Only the PATCH call itself triggers the re-upload fallback;
                # later errors propagate so the caller marks the junction
                # failed instead of removing a document that was just patched.
                try:
                    await xai.update_document_metadata(collection_id, junction_ai_doc_id, metadata)
                except Exception as e:
                    ctx.logger.warn(f" ⚠️ PATCH failed, re-uploading: {e}")
                else:
                    ctx.logger.info(" ✅ Metadata updated")

                    # Get BLAKE3 hash
                    success, blake3_hash = await xai.verify_upload_integrity(
                        collection_id,
                        junction_ai_doc_id
                    )

                    # Update junction (keep the stored hash if verification failed)
                    await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, {
                        'syncstatus': JunctionSyncStatus.SYNCED.value,
                        'syncedMetadataHash': current_metadata_hash,
                        'xaiBlake3Hash': blake3_hash if success else xai_blake3_hash,
                        'lastSync': datetime.now().isoformat()
                    })
                    return True

                # Fallback: Re-upload, replacing the existing XAI document
                await self._upload_document(
                    junction_id=junction_id,
                    document=document,
                    collection_id=collection_id,
                    metadata=metadata,
                    file_hash=current_file_hash,
                    metadata_hash=current_metadata_hash,
                    ctx=ctx,
                    espocrm=espocrm,
                    xai=xai,
                    replace_file_id=junction_ai_doc_id
                )
                return True

            return False

        finally:
            if owns_xai:
                await xai.close()

    async def _upload_document(
        self,
        junction_id: str,
        document: Dict,
        collection_id: str,
        metadata: Dict[str, str],
        file_hash: str,
        metadata_hash: str,
        ctx,
        espocrm: Any,
        xai: Any,
        replace_file_id: Optional[str] = None
    ) -> str:
        """
        Download a document from EspoCRM, upload it to XAI, verify, update junction.

        Args:
            junction_id: ID of the junction entry to update
            document: CDokumente entity
            collection_id: XAI Collection ID
            metadata: Metadata dict to attach in XAI
            file_hash: Current file hash, stored as syncedHash
            metadata_hash: Current metadata hash, stored as syncedMetadataHash
            ctx: Motia context
            espocrm: EspoCRM client
            xai: XAI client
            replace_file_id: Existing XAI file to remove before uploading, if any

        Returns:
            The XAI file ID of the uploaded document

        Raises:
            RuntimeError: Document cannot be downloaded or upload verification failed
        """
        # Download from EspoCRM first so nothing is removed from XAI when the
        # source file is unavailable
        download_info = await self._get_document_download_info(document, ctx, espocrm=espocrm)
        if not download_info:
            raise RuntimeError(f"Cannot download document {document['id']}")

        file_content = await espocrm.download_attachment(download_info['attachment_id'])

        if replace_file_id:
            await xai.remove_from_collection(collection_id, replace_file_id)

        # Upload to XAI
        xai_file_id = await xai.upload_document_with_metadata(
            collection_id=collection_id,
            file_content=file_content,
            filename=download_info['filename'],
            mime_type=download_info['mime_type'],
            metadata=metadata
        )

        ctx.logger.info(f" ✅ Uploaded → {xai_file_id}")

        # Verify upload
        ctx.logger.info(" 🔍 Verifying upload...")
        success, blake3_hash = await xai.verify_upload_integrity(
            collection_id=collection_id,
            file_id=xai_file_id
        )

        if not success:
            ctx.logger.error(" ❌ Upload verification failed!")
            raise RuntimeError("Upload verification failed")

        ctx.logger.info(f" ✅ Verified: {(blake3_hash or '')[:32]}...")

        # Update junction
        await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, {
            'aiDocumentId': xai_file_id,
            'syncstatus': JunctionSyncStatus.SYNCED.value,
            'syncedHash': file_hash,
            'xaiBlake3Hash': blake3_hash,
            'syncedMetadataHash': metadata_hash,
            'lastSync': datetime.now().isoformat()
        })

        return xai_file_id

    async def _remove_orphaned_documents(
        self,
        collection_id: str,
        junction_entries: List[Dict],
        ctx,
        *,
        xai: Optional[Any] = None
    ) -> None:
        """
        Remove documents from XAI that are no longer in junction table.

        The junction entries must reflect the current state in EspoCRM: any
        XAI document whose ID is not among their aiDocumentId values is removed.

        Args:
            collection_id: XAI Collection ID
            junction_entries: List of junction entries
            ctx: Motia context
            xai: Optional XAI client to reuse (created and closed here if omitted)
        """
        owns_xai = xai is None
        if owns_xai:
            from services.xai_service import XAIService
            xai = XAIService(ctx)

        try:
            # Get all XAI file_ids (entries expose either 'file_id' or 'id')
            xai_docs = await xai.list_collection_documents(collection_id)
            candidate_ids = (doc.get('file_id') or doc.get('id') for doc in xai_docs or [])
            xai_file_ids = {file_id for file_id in candidate_ids if file_id}

            # Get all junction file_ids
            junction_file_ids = {
                entry['aiDocumentId']
                for entry in junction_entries
                if entry.get('aiDocumentId')
            }

            # Find orphans
            orphans = xai_file_ids - junction_file_ids

            if not orphans:
                ctx.logger.info("✅ No orphaned documents found")
                return

            ctx.logger.info(f"🗑️ Removing {len(orphans)} orphaned documents")
            for orphan_id in orphans:
                try:
                    await xai.remove_from_collection(collection_id, orphan_id)
                    ctx.logger.info(f" ✅ Removed orphan: {orphan_id}")
                except Exception as e:
                    ctx.logger.warn(f" ⚠️ Failed to remove {orphan_id}: {e}")

        finally:
            if owns_xai:
                await xai.close()

    def _calculate_metadata_hash(self, document: Dict) -> str:
        """
        Calculate hash of sync-relevant metadata.

        Only 'name' and 'description' contribute. The serialization is kept
        stable on purpose: changing it would invalidate every stored
        syncedMetadataHash.

        Args:
            document: CDokumente entity

        Returns:
            MD5 hash (32 chars)
        """
        metadata = {
            'name': document.get('name', ''),
            'description': document.get('description', ''),
        }

        metadata_str = json.dumps(metadata, sort_keys=True)
        return hashlib.md5(metadata_str.encode()).hexdigest()

    def _build_xai_metadata(self, document: Dict) -> Dict[str, str]:
        """
        Build XAI metadata from CDokumente entity.

        Missing or null fields become empty strings so every value is a str.

        Args:
            document: CDokumente entity

        Returns:
            Metadata dict for XAI
        """
        return {
            'document_name': document.get('name') or '',
            'description': document.get('description') or '',
            'created_at': document.get('createdAt') or '',
            'modified_at': document.get('modifiedAt') or '',
            'espocrm_id': document.get('id') or ''
        }

    async def _get_document_download_info(
        self,
        document: Dict,
        ctx,
        *,
        espocrm: Optional[Any] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Get download info for CDokumente entity.

        The attachment is taken from 'dokumentId'/'dokumentName' when set,
        otherwise from 'fileId'/'fileName'.

        Args:
            document: CDokumente entity
            ctx: Motia context
            espocrm: Optional EspoCRM client to reuse (created if omitted)

        Returns:
            Dict with attachment_id, filename, mime_type, or None when the
            document has no attachment or the attachment cannot be loaded
        """
        if espocrm is None:
            from services.espocrm import EspoCRMAPI
            espocrm = EspoCRMAPI(ctx)

        # Check for dokumentId (CDokumente custom field)
        attachment_id = None
        filename = None

        if document.get('dokumentId'):
            attachment_id = document.get('dokumentId')
            filename = document.get('dokumentName')
        elif document.get('fileId'):
            attachment_id = document.get('fileId')
            filename = document.get('fileName')

        if not attachment_id:
            ctx.logger.error(f"❌ No attachment ID for document {document['id']}")
            return None

        # Get attachment details
        try:
            attachment = await espocrm.get_entity('Attachment', attachment_id)
            return {
                'attachment_id': attachment_id,
                'filename': filename or attachment.get('name', 'unknown'),
                'mime_type': attachment.get('type', 'application/octet-stream')
            }
        except Exception as e:
            ctx.logger.error(f"❌ Failed to get attachment {attachment_id}: {e}")
            return None