# File: motia-iii/services/aiknowledge_sync_utils.py
# 515 lines, 23 KiB, Python

"""
AI Knowledge Sync Utilities
Utility functions for synchronizing CAIKnowledge entities with XAI Collections:
- Collection lifecycle management (create, delete)
- Document synchronization with BLAKE3 hash verification
- Metadata-only updates via PATCH
- Orphan detection and cleanup
"""
import hashlib
import json
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime
from services.sync_utils_base import BaseSyncUtils
from services.models import (
AIKnowledgeActivationStatus,
AIKnowledgeSyncStatus,
JunctionSyncStatus
)
class AIKnowledgeSync(BaseSyncUtils):
    """Utility class for AI Knowledge ↔ XAI Collections synchronization.

    Uses ``self.espocrm``, ``self._log``, ``self._acquire_redis_lock`` and
    ``self._release_redis_lock``, none of which are defined in this class
    (presumably provided by ``BaseSyncUtils`` — confirm there).
    """

    def _get_lock_key(self, entity_id: str) -> str:
        """Return the Redis lock key for an AI Knowledge entity."""
        return f"sync_lock:aiknowledge:{entity_id}"

    async def acquire_sync_lock(self, knowledge_id: str) -> bool:
        """
        Acquire distributed lock via Redis + update EspoCRM syncStatus.

        Failing to write ``syncStatus`` is only logged; the lock still counts
        as acquired in that case.

        Args:
            knowledge_id: CAIKnowledge entity ID

        Returns:
            True if lock acquired, False if already locked or on error
        """
        lock_key = self._get_lock_key(knowledge_id)
        # Tracks ownership so the error path never releases a lock that was
        # not taken by this call (it may belong to another worker).
        lock_acquired = False
        try:
            # STEP 1: Atomic Redis lock
            if not self._acquire_redis_lock(lock_key):
                self._log(f"Redis lock already active for {knowledge_id}", level='warn')
                return False
            lock_acquired = True

            # STEP 2: Update syncStatus to pending_sync (best effort)
            try:
                await self.espocrm.update_entity('CAIKnowledge', knowledge_id, {
                    'syncStatus': AIKnowledgeSyncStatus.PENDING_SYNC.value
                })
            except Exception as e:
                self._log(f"Could not set syncStatus: {e}", level='debug')

            self._log(f"Sync lock acquired for {knowledge_id}")
            return True
        except Exception as e:
            self._log(f"Error acquiring lock: {e}", level='error')
            # Clean up Redis lock on error, but only if we actually hold it
            if lock_acquired:
                self._release_redis_lock(lock_key)
            return False

    async def release_sync_lock(
        self,
        knowledge_id: str,
        success: bool = True,
        error_message: Optional[str] = None
    ) -> None:
        """
        Release sync lock and set final status.

        On success ``lastSync`` is stamped and ``syncError`` cleared. On
        failure ``syncError`` is only written when ``error_message`` is given
        (truncated to 2000 characters). The Redis lock is released regardless
        of whether the EspoCRM update succeeds.

        Args:
            knowledge_id: CAIKnowledge entity ID
            success: Whether sync succeeded
            error_message: Optional error message
        """
        try:
            final_status = (
                AIKnowledgeSyncStatus.SYNCED if success else AIKnowledgeSyncStatus.FAILED
            )
            update_data: Dict[str, Any] = {'syncStatus': final_status.value}
            if success:
                update_data['lastSync'] = datetime.now().isoformat()
                update_data['syncError'] = None
            elif error_message:
                update_data['syncError'] = error_message[:2000]

            await self.espocrm.update_entity('CAIKnowledge', knowledge_id, update_data)
            self._log(f"Sync lock released: {knowledge_id} → {'success' if success else 'failed'}")
        except Exception as e:
            self._log(f"Error releasing lock: {e}", level='error')
        finally:
            # Always release the Redis lock, even if the status update failed
            self._release_redis_lock(self._get_lock_key(knowledge_id))

    async def sync_knowledge_to_xai(self, knowledge_id: str, ctx) -> None:
        """
        Main sync orchestrator with activation status handling.

        Dispatches on the entity's ``aktivierungsstatus``:
        - new: create the XAI collection, store its ID, switch to 'active'
        - deactivated: delete the collection and reset all junction entries
        - paused: do nothing
        - active: ensure the collection exists, then sync its documents

        Args:
            knowledge_id: CAIKnowledge entity ID
            ctx: Motia context for logging

        Raises:
            RuntimeError: status is 'active' but no ``datenbankId`` is set
            ValueError: ``aktivierungsstatus`` is none of the handled values
        """
        # Imported locally as in the original (presumably to avoid import cycles)
        from services.espocrm import EspoCRMAPI
        from services.xai_service import XAIService

        espocrm = EspoCRMAPI(ctx)
        xai = XAIService(ctx)
        try:
            # 1. Load knowledge entity
            knowledge = await espocrm.get_entity('CAIKnowledge', knowledge_id)
            activation_status = knowledge.get('aktivierungsstatus')
            collection_id = knowledge.get('datenbankId')

            ctx.logger.info("=" * 80)
            ctx.logger.info(f"📋 Processing: {knowledge['name']}")
            ctx.logger.info(f" aktivierungsstatus: {activation_status}")
            ctx.logger.info(f" datenbankId: {collection_id or 'NONE'}")
            ctx.logger.info("=" * 80)

            # ═══════════════════════════════════════════════════════════
            # CASE 1: NEW → Create Collection
            # ═══════════════════════════════════════════════════════════
            if activation_status == AIKnowledgeActivationStatus.NEW.value:
                ctx.logger.info("🆕 Status 'new' → Creating XAI Collection")
                collection = await xai.create_collection(
                    name=knowledge['name'],
                    metadata={
                        'espocrm_entity_type': 'CAIKnowledge',
                        'espocrm_entity_id': knowledge_id,
                        'created_at': datetime.now().isoformat()
                    }
                )
                collection_id = collection['id']

                # Update EspoCRM: Set datenbankId + change status to 'active'
                await espocrm.update_entity('CAIKnowledge', knowledge_id, {
                    'datenbankId': collection_id,
                    'aktivierungsstatus': AIKnowledgeActivationStatus.ACTIVE.value,
                    'syncStatus': AIKnowledgeSyncStatus.UNCLEAN.value
                })
                ctx.logger.info(f"✅ Collection created: {collection_id}")
                ctx.logger.info(" Status changed to 'active', next webhook will sync documents")
                return

            # ═══════════════════════════════════════════════════════════
            # CASE 2: DEACTIVATED → Delete Collection from XAI
            # ═══════════════════════════════════════════════════════════
            if activation_status == AIKnowledgeActivationStatus.DEACTIVATED.value:
                ctx.logger.info("🗑️ Status 'deactivated' → Deleting XAI Collection")
                if collection_id:
                    # A failed delete is logged only; junction reset still runs
                    try:
                        await xai.delete_collection(collection_id)
                        ctx.logger.info(f"✅ Collection deleted from XAI: {collection_id}")
                    except Exception as e:
                        ctx.logger.error(f"❌ Failed to delete collection: {e}")
                else:
                    ctx.logger.info("⏭️ No collection ID, nothing to delete")

                # Reset junction entries
                documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id)
                for doc in documents:
                    doc_id = doc['documentId']
                    try:
                        await espocrm.update_knowledge_document_junction(
                            knowledge_id,
                            doc_id,
                            {
                                'syncstatus': 'new',
                                'aiDocumentId': None
                            },
                            update_last_sync=False
                        )
                    except Exception as e:
                        ctx.logger.warn(f"⚠️ Failed to reset junction for {doc_id}: {e}")
                ctx.logger.info(f"✅ Deactivation complete, {len(documents)} junction entries reset")
                return

            # ═══════════════════════════════════════════════════════════
            # CASE 3: PAUSED → Skip Sync
            # ═══════════════════════════════════════════════════════════
            if activation_status == AIKnowledgeActivationStatus.PAUSED.value:
                ctx.logger.info("⏸️ Status 'paused' → No sync performed")
                return

            # ═══════════════════════════════════════════════════════════
            # CASE 4: ACTIVE → Normal Sync
            # ═══════════════════════════════════════════════════════════
            if activation_status == AIKnowledgeActivationStatus.ACTIVE.value:
                if not collection_id:
                    ctx.logger.error("❌ Status 'active' but no datenbankId!")
                    raise RuntimeError("Active knowledge without collection ID")

                ctx.logger.info(f"🔄 Status 'active' → Syncing documents to {collection_id}")

                # Verify collection exists; recreate and re-link it if it vanished
                collection = await xai.get_collection(collection_id)
                if not collection:
                    ctx.logger.warn(f"⚠️ Collection {collection_id} not found, recreating")
                    collection = await xai.create_collection(
                        name=knowledge['name'],
                        metadata={
                            'espocrm_entity_type': 'CAIKnowledge',
                            'espocrm_entity_id': knowledge_id
                        }
                    )
                    collection_id = collection['id']
                    await espocrm.update_entity('CAIKnowledge', knowledge_id, {
                        'datenbankId': collection_id
                    })

                # Sync documents
                await self._sync_knowledge_documents(knowledge_id, collection_id, ctx)
                return

            ctx.logger.error(f"❌ Unknown aktivierungsstatus: {activation_status}")
            raise ValueError(f"Invalid aktivierungsstatus: {activation_status}")
        finally:
            await xai.close()

    async def _sync_knowledge_documents(
        self,
        knowledge_id: str,
        collection_id: str,
        ctx
    ) -> None:
        """
        Sync all documents of a knowledge base to XAI collection.

        Uses efficient JunctionData endpoint to get all documents with junction data
        and blake3 hashes in a single API call. Hash comparison is always performed.
        A failure on one document is recorded on its junction entry and does not
        abort the remaining documents.

        Args:
            knowledge_id: CAIKnowledge entity ID
            collection_id: XAI Collection ID
            ctx: Motia context
        """
        from services.espocrm import EspoCRMAPI
        from services.xai_service import XAIService

        espocrm = EspoCRMAPI(ctx)
        xai = XAIService(ctx)
        try:
            # ═══════════════════════════════════════════════════════════════
            # STEP 1: Load all documents with junction data (single API call)
            # ═══════════════════════════════════════════════════════════════
            ctx.logger.info(f"📥 Loading documents with junction data for knowledge {knowledge_id}")
            documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id)
            ctx.logger.info(f"📊 Found {len(documents)} document(s)")
            if not documents:
                # NOTE(review): orphan cleanup is skipped here too, so files left
                # in the collection after all documents were unlinked stay there.
                ctx.logger.info("✅ No documents to sync")
                return

            # ═══════════════════════════════════════════════════════════════
            # STEP 2: Sync each document based on status/hash
            # ═══════════════════════════════════════════════════════════════
            successful = 0
            failed = 0
            skipped = 0
            for doc in documents:
                doc_id = doc['documentId']
                doc_name = doc.get('documentName', 'Unknown')
                # A null status (key present, value None) is treated like a
                # missing one; otherwise such a row would never be synced.
                junction_status = doc.get('syncstatus') or 'new'
                ai_document_id = doc.get('aiDocumentId')
                blake3_hash = doc.get('blake3hash')

                ctx.logger.info(f"\n📄 {doc_name} (ID: {doc_id})")
                ctx.logger.info(f" Status: {junction_status}")
                ctx.logger.info(f" aiDocumentId: {ai_document_id or 'N/A'}")
                ctx.logger.info(f" blake3hash: {blake3_hash[:16] if blake3_hash else 'N/A'}...")

                try:
                    needs_sync, reason = await self._document_needs_sync(
                        xai, collection_id, junction_status, ai_document_id, blake3_hash, ctx
                    )
                    if not needs_sync:
                        ctx.logger.info(" ⏭️ Skipped (no sync needed)")
                        skipped += 1
                        continue

                    ctx.logger.info(f" 🔄 Syncing: {reason}")

                    # Get complete document entity with attachment info
                    doc_entity = await espocrm.get_entity('CDokumente', doc_id)
                    attachment_id = doc_entity.get('dokumentId')
                    if not attachment_id:
                        ctx.logger.error(f" ❌ No attachment ID found for document {doc_id}")
                        failed += 1
                        continue

                    xai_file_id = await self._transfer_attachment_to_collection(
                        espocrm, xai, collection_id, attachment_id, doc_name, ctx
                    )

                    # Update junction
                    await espocrm.update_knowledge_document_junction(
                        knowledge_id,
                        doc_id,
                        {
                            'aiDocumentId': xai_file_id,
                            'syncstatus': 'synced'
                        },
                        update_last_sync=True
                    )
                    ctx.logger.info(" ✅ Junction updated")

                    # Keep the local snapshot in line with the junction so the
                    # orphan check below keeps the new file and drops the old one.
                    doc['aiDocumentId'] = xai_file_id
                    successful += 1
                except Exception as e:
                    failed += 1
                    ctx.logger.error(f" ❌ Sync failed: {e}")
                    # Mark as failed in junction
                    try:
                        await espocrm.update_knowledge_document_junction(
                            knowledge_id,
                            doc_id,
                            {'syncstatus': 'failed'},
                            update_last_sync=False
                        )
                    except Exception as update_err:
                        ctx.logger.error(f" ❌ Failed to update junction status: {update_err}")

            # ═══════════════════════════════════════════════════════════════
            # STEP 3: Remove orphaned documents from XAI collection
            # ═══════════════════════════════════════════════════════════════
            await self._remove_orphaned_documents(xai, collection_id, documents, ctx)

            # ═══════════════════════════════════════════════════════════════
            # STEP 4: Summary
            # ═══════════════════════════════════════════════════════════════
            ctx.logger.info("")
            ctx.logger.info("=" * 80)
            ctx.logger.info("📊 Sync Statistics:")
            ctx.logger.info(f" ✅ Synced: {successful}")
            ctx.logger.info(f" ⏭️ Skipped: {skipped}")
            ctx.logger.info(f" ❌ Failed: {failed}")
            # Hash verification always runs, so there is only one mode to report
            ctx.logger.info(" Mode: FULL SYNC (Blake3 verification)")
            ctx.logger.info("=" * 80)
        finally:
            # This method owns its XAIService instance, so it closes it as well
            await xai.close()

    async def _document_needs_sync(
        self,
        xai,
        collection_id: str,
        junction_status: str,
        ai_document_id: Optional[str],
        blake3_hash: Optional[str],
        ctx
    ) -> Tuple[bool, str]:
        """
        Decide whether a single document has to be (re-)uploaded.

        Args:
            xai: XAI service used to look up the document in the collection
            collection_id: XAI Collection ID
            junction_status: Junction ``syncstatus`` of the document
            ai_document_id: XAI file ID stored on the junction, if any
            blake3_hash: Blake3 hash reported by EspoCRM, if any
            ctx: Motia context

        Returns:
            Tuple of (needs_sync, reason); reason is empty when no sync is needed
        """
        if junction_status in ('new', 'unclean', 'failed'):
            return True, f"status={junction_status}"

        if not (junction_status == 'synced' and blake3_hash and ai_document_id):
            return False, ""

        # Verify Blake3 hash with XAI (always, since hash from JunctionData API is free)
        try:
            xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id)
            if not xai_doc_info:
                return True, "file not found in XAI collection"

            xai_blake3 = xai_doc_info.get('blake3_hash')
            if xai_blake3 != blake3_hash:
                reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs EspoCRM: {blake3_hash[:16]}...)"
                ctx.logger.info(" 🔄 Blake3 mismatch detected!")
                return True, reason

            ctx.logger.info(" ✅ Blake3 hash matches")
        except Exception as e:
            # Best effort: an unverifiable document is left alone, not re-uploaded
            ctx.logger.warn(f" ⚠️ Failed to verify Blake3: {e}")
        return False, ""

    async def _transfer_attachment_to_collection(
        self,
        espocrm,
        xai,
        collection_id: str,
        attachment_id: str,
        filename: str,
        ctx
    ) -> str:
        """
        Download an EspoCRM attachment, upload it to XAI and add it to the collection.

        Args:
            espocrm: EspoCRM API client
            xai: XAI service
            collection_id: XAI Collection ID
            attachment_id: EspoCRM Attachment ID
            filename: File name to use for the upload
            ctx: Motia context

        Returns:
            The file ID returned by ``xai.upload_file``
        """
        # Get attachment details for MIME type; fall back to defaults on failure
        try:
            attachment = await espocrm.get_entity('Attachment', attachment_id)
            mime_type = attachment.get('type', 'application/octet-stream')
            file_size = attachment.get('size', 0)
        except Exception as e:
            ctx.logger.warn(f" ⚠️ Failed to get attachment details: {e}, using defaults")
            mime_type = 'application/octet-stream'
            file_size = 0
        ctx.logger.info(f" 📎 Attachment: {attachment_id} ({mime_type}, {file_size} bytes)")

        # Download document
        file_content = await espocrm.download_attachment(attachment_id)
        ctx.logger.info(f" 📥 Downloaded {len(file_content)} bytes")

        # Upload to XAI
        xai_file_id = await xai.upload_file(file_content, filename, mime_type)
        ctx.logger.info(f" 📤 Uploaded to XAI: {xai_file_id}")

        # Add to collection
        await xai.add_to_collection(collection_id, xai_file_id)
        ctx.logger.info(f" ✅ Added to collection {collection_id}")
        return xai_file_id

    async def _remove_orphaned_documents(
        self,
        xai,
        collection_id: str,
        documents: List[Dict[str, Any]],
        ctx
    ) -> None:
        """
        Remove files from the XAI collection that no junction entry references.

        Never raises: failures are logged as warnings.

        Args:
            xai: XAI service
            collection_id: XAI Collection ID
            documents: Junction rows whose ``aiDocumentId`` reflects the current state
            ctx: Motia context
        """
        try:
            ctx.logger.info("\n🧹 Checking for orphaned documents in XAI collection...")

            # Get all files in XAI collection (normalized structure)
            xai_documents = await xai.list_collection_documents(collection_id)
            xai_file_ids = {entry.get('file_id') for entry in xai_documents if entry.get('file_id')}

            # Get all ai_document_ids from junction
            junction_file_ids = {row.get('aiDocumentId') for row in documents if row.get('aiDocumentId')}

            # Find orphans (in XAI but not in junction)
            orphans = xai_file_ids - junction_file_ids
            if not orphans:
                ctx.logger.info(" ✅ No orphans found")
                return

            ctx.logger.info(f" Found {len(orphans)} orphaned file(s)")
            for orphan_id in orphans:
                try:
                    await xai.remove_from_collection(collection_id, orphan_id)
                    ctx.logger.info(f" 🗑️ Removed {orphan_id}")
                except Exception as e:
                    ctx.logger.warn(f" ⚠️ Failed to remove {orphan_id}: {e}")
        except Exception as e:
            ctx.logger.warn(f"⚠️ Failed to clean up orphans: {e}")

    def _calculate_metadata_hash(self, document: Dict) -> str:
        """
        Calculate hash of sync-relevant metadata.

        Only ``name`` and ``description`` are hashed. MD5 serves as a change
        fingerprint here, not as a security measure.

        Args:
            document: CDokumente entity

        Returns:
            MD5 hash (32 chars)
        """
        metadata = {
            'name': document.get('name', ''),
            'description': document.get('description', ''),
        }
        # sort_keys keeps the serialization (and thus the hash) stable
        metadata_str = json.dumps(metadata, sort_keys=True)
        return hashlib.md5(metadata_str.encode()).hexdigest()

    def _build_xai_metadata(self, document: Dict) -> Dict[str, str]:
        """
        Build XAI metadata from CDokumente entity.

        Missing fields default to an empty string.

        Args:
            document: CDokumente entity

        Returns:
            Metadata dict for XAI
        """
        return {
            'document_name': document.get('name', ''),
            'description': document.get('description', ''),
            'created_at': document.get('createdAt', ''),
            'modified_at': document.get('modifiedAt', ''),
            'espocrm_id': document.get('id', '')
        }

    async def _get_document_download_info(
        self,
        document: Dict,
        ctx
    ) -> Optional[Dict[str, Any]]:
        """
        Get download info for CDokumente entity.

        ``dokumentId``/``dokumentName`` take precedence over ``fileId``/``fileName``.

        Args:
            document: CDokumente entity
            ctx: Motia context

        Returns:
            Dict with attachment_id, filename, mime_type, or None if the
            document has no attachment or the attachment cannot be loaded
        """
        from services.espocrm import EspoCRMAPI

        espocrm = EspoCRMAPI(ctx)

        # Check for dokumentId (CDokumente custom field), then the fileId fallback
        attachment_id = None
        filename = None
        if document.get('dokumentId'):
            attachment_id = document.get('dokumentId')
            filename = document.get('dokumentName')
        elif document.get('fileId'):
            attachment_id = document.get('fileId')
            filename = document.get('fileName')

        if not attachment_id:
            # .get() so a document without 'id' cannot turn this log into a KeyError
            ctx.logger.error(f"❌ No attachment ID for document {document.get('id')}")
            return None

        # Get attachment details
        try:
            attachment = await espocrm.get_entity('Attachment', attachment_id)
            return {
                'attachment_id': attachment_id,
                'filename': filename or attachment.get('name', 'unknown'),
                'mime_type': attachment.get('type', 'application/octet-stream')
            }
        except Exception as e:
            ctx.logger.error(f"❌ Failed to get attachment {attachment_id}: {e}")
            return None