"""
AI Knowledge Sync Utilities

Utility functions for synchronizing CAIKnowledge entities with XAI Collections:
- Collection lifecycle management (create, delete)
- Document synchronization with BLAKE3 hash verification
- Metadata-only updates via PATCH
- Orphan detection and cleanup
"""

import hashlib
import json
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime
from urllib.parse import unquote

from services.sync_utils_base import BaseSyncUtils
from services.models import (
    AIKnowledgeActivationStatus,
    AIKnowledgeSyncStatus,
    JunctionSyncStatus
)


class AIKnowledgeSync(BaseSyncUtils):
    """Utility class for AI Knowledge ↔ XAI Collections synchronization"""

    def _get_lock_key(self, entity_id: str) -> str:
        """Return the Redis lock key for an AI Knowledge entity."""
        return f"sync_lock:aiknowledge:{entity_id}"

    @staticmethod
    def _extract_collection_id(collection: Optional[Dict[str, Any]]) -> Optional[str]:
        """
        Return the collection ID from an XAI collection response.

        The XAI API returns the ID under 'collection_id'; 'id' is accepted as
        a fallback. Returns None when the response is empty or has neither key.
        """
        if not collection:
            return None
        return collection.get('collection_id') or collection.get('id')

    async def acquire_sync_lock(self, knowledge_id: str) -> bool:
        """
        Acquire distributed lock via Redis + update EspoCRM syncStatus.

        The Redis lock is the authoritative guard. Setting syncStatus to
        'pending_sync' is best effort: a failure there is logged at debug
        level and does not prevent the lock from being acquired.

        Args:
            knowledge_id: CAIKnowledge entity ID

        Returns:
            True if lock acquired, False if already locked or on error
        """
        lock_key = self._get_lock_key(knowledge_id)
        # Track ownership so the error path never releases a lock that
        # another worker holds.
        acquired = False
        try:
            # STEP 1: Atomic Redis lock
            acquired = self._acquire_redis_lock(lock_key)
            if not acquired:
                self._log(f"Redis lock already active for {knowledge_id}", level='warn')
                return False

            # STEP 2: Update syncStatus to pending_sync (best effort)
            try:
                await self.espocrm.update_entity('CAIKnowledge', knowledge_id, {
                    'syncStatus': AIKnowledgeSyncStatus.PENDING_SYNC.value
                })
            except Exception as e:
                self._log(f"Could not set syncStatus: {e}", level='debug')

            self._log(f"Sync lock acquired for {knowledge_id}")
            return True

        except Exception as e:
            self._log(f"Error acquiring lock: {e}", level='error')
            # Clean up the Redis lock only if this call actually took it
            if acquired:
                self._release_redis_lock(lock_key)
            return False

    async def release_sync_lock(
        self,
        knowledge_id: str,
        success: bool = True,
        error_message: Optional[str] = None
    ) -> None:
        """
        Release sync lock and set final status.

        Writes syncStatus ('synced' or 'failed') to EspoCRM. On success it
        also sets lastSync and clears syncError. On failure it stores the
        error message, truncated to 2000 characters. The Redis lock is
        released in every case, even if the EspoCRM update fails.

        Args:
            knowledge_id: CAIKnowledge entity ID
            success: Whether sync succeeded
            error_message: Optional error message
        """
        try:
            update_data = {
                'syncStatus': (
                    AIKnowledgeSyncStatus.SYNCED.value if success
                    else AIKnowledgeSyncStatus.FAILED.value
                )
            }

            if success:
                update_data['lastSync'] = datetime.now().isoformat()
                update_data['syncError'] = None
            elif error_message:
                update_data['syncError'] = error_message[:2000]

            await self.espocrm.update_entity('CAIKnowledge', knowledge_id, update_data)

            self._log(f"Sync lock released: {knowledge_id} → {'success' if success else 'failed'}")

        except Exception as e:
            self._log(f"Error releasing lock: {e}", level='error')

        finally:
            # Always release the Redis lock exactly once
            self._release_redis_lock(self._get_lock_key(knowledge_id))

    async def sync_knowledge_to_xai(self, knowledge_id: str, ctx) -> None:
        """
        Main sync orchestrator with activation status handling.

        Dispatches on the entity's 'aktivierungsstatus':
        - 'new': create an XAI collection, store its ID in 'datenbankId',
          switch the entity to 'active' and continue with the document sync.
        - 'deactivated': delete the XAI collection (best effort) and reset
          every junction entry to syncstatus='new' / aiDocumentId=None.
        - 'paused': do nothing.
        - 'active': verify the collection exists (recreating it if missing)
          and sync all documents.

        Args:
            knowledge_id: CAIKnowledge entity ID
            ctx: Motia context for logging

        Raises:
            RuntimeError: If the entity is active without a collection ID, or
                XAI returns no ID for a newly created collection.
            ValueError: If 'aktivierungsstatus' has an unknown value.
        """
        from services.espocrm import EspoCRMAPI
        from services.xai_service import XAIService

        espocrm = EspoCRMAPI(ctx)
        xai = XAIService(ctx)

        try:
            # 1. Load knowledge entity
            knowledge = await espocrm.get_entity('CAIKnowledge', knowledge_id)
            activation_status = knowledge.get('aktivierungsstatus')
            collection_id = knowledge.get('datenbankId')

            ctx.logger.info("=" * 80)
            ctx.logger.info(f"📋 Processing: {knowledge['name']}")
            ctx.logger.info(f" aktivierungsstatus: {activation_status}")
            ctx.logger.info(f" datenbankId: {collection_id or 'NONE'}")
            ctx.logger.info("=" * 80)

            # ═══════════════════════════════════════════════════════════
            # CASE 1: NEW → Create Collection
            # ═══════════════════════════════════════════════════════════
            if activation_status == AIKnowledgeActivationStatus.NEW.value:
                ctx.logger.info("🆕 Status 'new' → Creating XAI Collection")

                collection = await xai.create_collection(
                    name=knowledge['name'],
                    metadata={
                        'espocrm_entity_type': 'CAIKnowledge',
                        'espocrm_entity_id': knowledge_id,
                        'created_at': datetime.now().isoformat()
                    }
                )

                # XAI API returns 'collection_id' not 'id'
                collection_id = self._extract_collection_id(collection)
                if not collection_id:
                    # Fail before marking the entity active, so it stays
                    # 'new' and creation is retried on the next run.
                    raise RuntimeError("XAI returned no ID for newly created collection")

                # Update EspoCRM: Set datenbankId + change status to 'active'
                await espocrm.update_entity('CAIKnowledge', knowledge_id, {
                    'datenbankId': collection_id,
                    'aktivierungsstatus': AIKnowledgeActivationStatus.ACTIVE.value,
                    'syncStatus': AIKnowledgeSyncStatus.UNCLEAN.value
                })

                ctx.logger.info(f"✅ Collection created: {collection_id}")
                ctx.logger.info(" Status changed to 'active', now syncing documents...")

                # Continue to document sync immediately (don't return).
                # Fall through to sync logic below.

            # ═══════════════════════════════════════════════════════════
            # CASE 2: DEACTIVATED → Delete Collection from XAI
            # ═══════════════════════════════════════════════════════════
            elif activation_status == AIKnowledgeActivationStatus.DEACTIVATED.value:
                ctx.logger.info("🗑️ Status 'deactivated' → Deleting XAI Collection")

                if collection_id:
                    try:
                        await xai.delete_collection(collection_id)
                        ctx.logger.info(f"✅ Collection deleted from XAI: {collection_id}")
                    except Exception as e:
                        ctx.logger.error(f"❌ Failed to delete collection: {e}")
                else:
                    ctx.logger.info("⏭️ No collection ID, nothing to delete")

                # Reset junction entries
                documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id)

                for doc in documents:
                    doc_id = doc['documentId']
                    try:
                        await espocrm.update_knowledge_document_junction(
                            knowledge_id,
                            doc_id,
                            {
                                'syncstatus': 'new',
                                'aiDocumentId': None
                            },
                            update_last_sync=False
                        )
                    except Exception as e:
                        ctx.logger.warn(f"⚠️ Failed to reset junction for {doc_id}: {e}")

                ctx.logger.info(f"✅ Deactivation complete, {len(documents)} junction entries reset")
                return

            # ═══════════════════════════════════════════════════════════
            # CASE 3: PAUSED → Skip Sync
            # ═══════════════════════════════════════════════════════════
            elif activation_status == AIKnowledgeActivationStatus.PAUSED.value:
                ctx.logger.info("⏸️ Status 'paused' → No sync performed")
                return

            # ═══════════════════════════════════════════════════════════
            # CASE 4: ACTIVE → Normal Sync (or just created from NEW)
            # ═══════════════════════════════════════════════════════════
            if activation_status in (AIKnowledgeActivationStatus.ACTIVE.value,
                                     AIKnowledgeActivationStatus.NEW.value):
                if not collection_id:
                    ctx.logger.error("❌ Status 'active' but no datenbankId!")
                    raise RuntimeError("Active knowledge without collection ID")

                if activation_status == AIKnowledgeActivationStatus.ACTIVE.value:
                    ctx.logger.info(f"🔄 Status 'active' → Syncing documents to {collection_id}")

                # Verify collection exists
                collection = await xai.get_collection(collection_id)
                if not collection:
                    ctx.logger.warn(f"⚠️ Collection {collection_id} not found, recreating")
                    collection = await xai.create_collection(
                        name=knowledge['name'],
                        metadata={
                            'espocrm_entity_type': 'CAIKnowledge',
                            'espocrm_entity_id': knowledge_id
                        }
                    )
                    # Same response shape as in CASE 1 ('collection_id', not 'id')
                    collection_id = self._extract_collection_id(collection)
                    if not collection_id:
                        raise RuntimeError("XAI returned no ID for recreated collection")
                    await espocrm.update_entity('CAIKnowledge', knowledge_id, {
                        'datenbankId': collection_id
                    })

                # Sync documents (both for ACTIVE status and after NEW → ACTIVE transition)
                await self._sync_knowledge_documents(knowledge_id, collection_id, ctx)

            else:
                # DEACTIVATED and PAUSED returned above, so anything reaching
                # here is an unknown status.
                ctx.logger.error(f"❌ Unknown aktivierungsstatus: {activation_status}")
                raise ValueError(f"Invalid aktivierungsstatus: {activation_status}")

        finally:
            await xai.close()

    async def _sync_knowledge_documents(
        self,
        knowledge_id: str,
        collection_id: str,
        ctx
    ) -> None:
        """
        Sync all documents of a knowledge base to XAI collection.

        Uses the JunctionData endpoint to get all documents, with junction
        data and blake3 hashes, in a single API call. For documents marked
        'synced', the stored blake3 hash is always compared against XAI.
        Afterwards, files in the XAI collection that were not kept or
        uploaded during this run are removed as orphans.

        Per-document failures are logged and the junction is marked
        'failed'; they do not abort the run.

        Args:
            knowledge_id: CAIKnowledge entity ID
            collection_id: XAI Collection ID
            ctx: Motia context
        """
        from services.espocrm import EspoCRMAPI
        from services.xai_service import XAIService

        espocrm = EspoCRMAPI(ctx)
        xai = XAIService(ctx)

        try:
            # ═══════════════════════════════════════════════════════════════
            # STEP 1: Load all documents with junction data (single API call)
            # ═══════════════════════════════════════════════════════════════
            ctx.logger.info(f"📥 Loading documents with junction data for knowledge {knowledge_id}")

            documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id)

            ctx.logger.info(f"📊 Found {len(documents)} document(s)")

            if not documents:
                ctx.logger.info("✅ No documents to sync")
                return

            # ═══════════════════════════════════════════════════════════════
            # STEP 2: Sync each document based on status/hash
            # ═══════════════════════════════════════════════════════════════
            successful = 0
            failed = 0
            skipped = 0
            # Track aiDocumentIds for orphan detection (collected during sync)
            synced_file_ids: set = set()

            for doc in documents:
                doc_id = doc['documentId']
                doc_name = doc.get('documentName', 'Unknown')
                junction_status = doc.get('syncstatus', 'new')
                ai_document_id = doc.get('aiDocumentId')
                blake3_hash = doc.get('blake3hash')

                ctx.logger.info(f"\n📄 {doc_name} (ID: {doc_id})")
                ctx.logger.info(f" Status: {junction_status}")
                ctx.logger.info(f" aiDocumentId: {ai_document_id or 'N/A'}")
                ctx.logger.info(f" blake3hash: {blake3_hash[:16] if blake3_hash else 'N/A'}...")

                try:
                    # Decide if sync needed
                    needs_sync = False
                    reason = ""

                    if junction_status in ('new', 'unclean', 'failed'):
                        needs_sync = True
                        reason = f"status={junction_status}"

                    elif junction_status == 'synced':
                        # Synced status should have both blake3_hash and ai_document_id
                        if not blake3_hash:
                            needs_sync = True
                            reason = "inconsistency: synced but no blake3 hash"
                            ctx.logger.warn(" ⚠️ Synced document missing blake3 hash!")
                        elif not ai_document_id:
                            needs_sync = True
                            reason = "inconsistency: synced but no aiDocumentId"
                            ctx.logger.warn(" ⚠️ Synced document missing aiDocumentId!")
                        else:
                            # Verify Blake3 hash with XAI (always, since hash from JunctionData API is free)
                            try:
                                xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id)
                                if xai_doc_info:
                                    xai_blake3 = xai_doc_info.get('blake3_hash')

                                    if xai_blake3 != blake3_hash:
                                        needs_sync = True
                                        reason = (
                                            f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... "
                                            f"vs EspoCRM: {blake3_hash[:16]}...)"
                                        )
                                        ctx.logger.info(" 🔄 Blake3 mismatch detected!")
                                    else:
                                        ctx.logger.info(" ✅ Blake3 hash matches")
                                else:
                                    needs_sync = True
                                    reason = "file not found in XAI collection"
                                    ctx.logger.warn(" ⚠️ Document marked synced but not in XAI!")
                            except Exception as e:
                                needs_sync = True
                                reason = f"verification failed: {e}"
                                ctx.logger.warn(f" ⚠️ Failed to verify Blake3, will re-sync: {e}")

                    if not needs_sync:
                        ctx.logger.info(" ⏭️ Skipped (no sync needed)")
                        # Document is already synced, track its aiDocumentId
                        if ai_document_id:
                            synced_file_ids.add(ai_document_id)
                        skipped += 1
                        continue

                    ctx.logger.info(f" 🔄 Syncing: {reason}")

                    # Get complete document entity with attachment info
                    doc_entity = await espocrm.get_entity('CDokumente', doc_id)
                    attachment_id = doc_entity.get('dokumentId')

                    if not attachment_id:
                        ctx.logger.error(f" ❌ No attachment ID found for document {doc_id}")
                        failed += 1
                        continue

                    # Get attachment details for MIME type and original filename
                    try:
                        attachment = await espocrm.get_entity('Attachment', attachment_id)
                        mime_type = attachment.get('type', 'application/octet-stream')
                        file_size = attachment.get('size', 0)
                        original_filename = attachment.get('name', doc_name)  # Original filename with extension

                        # URL-decode filename (fixes special chars like §, ä, ö, ü, etc.)
                        original_filename = unquote(original_filename)
                    except Exception as e:
                        ctx.logger.warn(f" ⚠️ Failed to get attachment details: {e}, using defaults")
                        mime_type = 'application/octet-stream'
                        file_size = 0
                        original_filename = unquote(doc_name)  # Also decode fallback name

                    ctx.logger.info(f" 📎 Attachment: {attachment_id} ({mime_type}, {file_size} bytes)")
                    ctx.logger.info(f" 📄 Original filename: {original_filename}")

                    # Download document
                    file_content = await espocrm.download_attachment(attachment_id)
                    ctx.logger.info(f" 📥 Downloaded {len(file_content)} bytes")

                    # Upload to XAI with original filename (includes extension)
                    xai_file_id = await xai.upload_file(file_content, original_filename, mime_type)
                    ctx.logger.info(f" 📤 Uploaded to XAI: {xai_file_id}")

                    # Add to collection
                    await xai.add_to_collection(collection_id, xai_file_id)
                    ctx.logger.info(f" ✅ Added to collection {collection_id}")

                    # Update junction
                    await espocrm.update_knowledge_document_junction(
                        knowledge_id,
                        doc_id,
                        {
                            'aiDocumentId': xai_file_id,
                            'syncstatus': 'synced'
                        },
                        update_last_sync=True
                    )
                    ctx.logger.info(" ✅ Junction updated")

                    # Track the new aiDocumentId for orphan detection
                    synced_file_ids.add(xai_file_id)

                    successful += 1

                except Exception as e:
                    failed += 1
                    ctx.logger.error(f" ❌ Sync failed: {e}")

                    # Mark as failed in junction
                    try:
                        await espocrm.update_knowledge_document_junction(
                            knowledge_id,
                            doc_id,
                            {'syncstatus': 'failed'},
                            update_last_sync=False
                        )
                    except Exception as update_err:
                        ctx.logger.error(f" ❌ Failed to update junction status: {update_err}")

            # ═══════════════════════════════════════════════════════════════
            # STEP 3: Remove orphaned documents from XAI collection
            # ═══════════════════════════════════════════════════════════════
            try:
                ctx.logger.info("\n🧹 Checking for orphaned documents in XAI collection...")

                # Get all files in XAI collection (normalized structure)
                xai_documents = await xai.list_collection_documents(collection_id)
                xai_file_ids = {d.get('file_id') for d in xai_documents if d.get('file_id')}

                # Use synced_file_ids (collected during this sync) for orphan detection.
                # This includes both pre-existing synced docs and newly uploaded ones.
                ctx.logger.info(f" XAI has {len(xai_file_ids)} files, we have {len(synced_file_ids)} synced")

                # Find orphans (in XAI but not in our current sync)
                orphans = xai_file_ids - synced_file_ids

                if orphans:
                    ctx.logger.info(f" Found {len(orphans)} orphaned file(s)")
                    for orphan_id in orphans:
                        try:
                            await xai.remove_from_collection(collection_id, orphan_id)
                            ctx.logger.info(f" 🗑️ Removed {orphan_id}")
                        except Exception as e:
                            ctx.logger.warn(f" ⚠️ Failed to remove {orphan_id}: {e}")
                else:
                    ctx.logger.info(" ✅ No orphans found")

            except Exception as e:
                ctx.logger.warn(f"⚠️ Failed to clean up orphans: {e}")

            # ═══════════════════════════════════════════════════════════════
            # STEP 4: Summary
            # ═══════════════════════════════════════════════════════════════
            ctx.logger.info("")
            ctx.logger.info("=" * 80)
            ctx.logger.info("📊 Sync Statistics:")
            ctx.logger.info(f" ✅ Synced: {successful}")
            ctx.logger.info(f" ⏭️ Skipped: {skipped}")
            ctx.logger.info(f" ❌ Failed: {failed}")
            ctx.logger.info(" Mode: Blake3 hash verification enabled")
            ctx.logger.info("=" * 80)

        finally:
            # This method owns its own XAIService instance; release it.
            await xai.close()

    def _calculate_metadata_hash(self, document: Dict) -> str:
        """
        Calculate hash of sync-relevant metadata.

        Only 'name' and 'description' are included. They are serialized as
        JSON with sorted keys, so the hash is independent of key order.

        Args:
            document: CDokumente entity

        Returns:
            MD5 hex digest (32 chars)
        """
        metadata = {
            'name': document.get('name', ''),
            'description': document.get('description', ''),
        }
        metadata_str = json.dumps(metadata, sort_keys=True)
        return hashlib.md5(metadata_str.encode()).hexdigest()

    def _build_xai_metadata(self, document: Dict) -> Dict[str, str]:
        """
        Build XAI metadata from CDokumente entity.

        Missing fields default to an empty string.

        Args:
            document: CDokumente entity

        Returns:
            Metadata dict for XAI
        """
        return {
            'document_name': document.get('name', ''),
            'description': document.get('description', ''),
            'created_at': document.get('createdAt', ''),
            'modified_at': document.get('modifiedAt', ''),
            'espocrm_id': document.get('id', '')
        }

    async def _get_document_download_info(
        self,
        document: Dict,
        ctx
    ) -> Optional[Dict[str, Any]]:
        """
        Get download info for CDokumente entity.

        Prefers the custom 'dokumentId'/'dokumentName' fields and falls back
        to 'fileId'/'fileName'.

        Args:
            document: CDokumente entity
            ctx: Motia context

        Returns:
            Dict with attachment_id, filename, mime_type; None if the document
            has no attachment or the attachment cannot be loaded
        """
        from services.espocrm import EspoCRMAPI

        espocrm = EspoCRMAPI(ctx)

        # Check for dokumentId (CDokumente custom field)
        attachment_id = None
        filename = None

        if document.get('dokumentId'):
            attachment_id = document.get('dokumentId')
            filename = document.get('dokumentName')
        elif document.get('fileId'):
            attachment_id = document.get('fileId')
            filename = document.get('fileName')

        if not attachment_id:
            # .get() so a document without 'id' cannot turn logging into a KeyError
            ctx.logger.error(f"❌ No attachment ID for document {document.get('id')}")
            return None

        # Get attachment details
        try:
            attachment = await espocrm.get_entity('Attachment', attachment_id)
            return {
                'attachment_id': attachment_id,
                'filename': filename or attachment.get('name', 'unknown'),
                'mime_type': attachment.get('type', 'application/octet-stream')
            }
        except Exception as e:
            ctx.logger.error(f"❌ Failed to get attachment {attachment_id}: {e}")
            return None