"""
AI Knowledge Sync Utilities

Utility functions for synchronizing CAIKnowledge entities with XAI Collections:

- Collection lifecycle management (create, delete)
- Document synchronization with BLAKE3 hash verification
- Metadata-only updates via PATCH
- Orphan detection and cleanup
"""
|
|
|
|
import hashlib
|
|
import json
|
|
from typing import Dict, Any, Optional, List, Tuple
|
|
from datetime import datetime
|
|
|
|
from services.sync_utils_base import BaseSyncUtils
|
|
from services.models import (
|
|
AIKnowledgeActivationStatus,
|
|
AIKnowledgeSyncStatus,
|
|
JunctionSyncStatus
|
|
)
|
|
|
|
|
|
class AIKnowledgeSync(BaseSyncUtils):
    """Synchronization helpers linking CAIKnowledge entities to XAI Collections."""

    def _get_lock_key(self, entity_id: str) -> str:
        """Return the Redis key that serializes sync work for one AI Knowledge entity."""
        return "sync_lock:aiknowledge:{}".format(entity_id)
async def acquire_sync_lock(self, knowledge_id: str) -> bool:
|
|
"""
|
|
Acquire distributed lock via Redis + update EspoCRM syncStatus.
|
|
|
|
Args:
|
|
knowledge_id: CAIKnowledge entity ID
|
|
|
|
Returns:
|
|
True if lock acquired, False if already locked
|
|
"""
|
|
try:
|
|
# STEP 1: Atomic Redis lock
|
|
lock_key = self._get_lock_key(knowledge_id)
|
|
if not self._acquire_redis_lock(lock_key):
|
|
self._log(f"Redis lock already active for {knowledge_id}", level='warn')
|
|
return False
|
|
|
|
# STEP 2: Update syncStatus to pending_sync
|
|
try:
|
|
await self.espocrm.update_entity('CAIKnowledge', knowledge_id, {
|
|
'syncStatus': AIKnowledgeSyncStatus.PENDING_SYNC.value
|
|
})
|
|
except Exception as e:
|
|
self._log(f"Could not set syncStatus: {e}", level='debug')
|
|
|
|
self._log(f"Sync lock acquired for {knowledge_id}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self._log(f"Error acquiring lock: {e}", level='error')
|
|
# Clean up Redis lock on error
|
|
lock_key = self._get_lock_key(knowledge_id)
|
|
self._release_redis_lock(lock_key)
|
|
return False
|
|
|
|
async def release_sync_lock(
|
|
self,
|
|
knowledge_id: str,
|
|
success: bool = True,
|
|
error_message: Optional[str] = None
|
|
) -> None:
|
|
"""
|
|
Release sync lock and set final status.
|
|
|
|
Args:
|
|
knowledge_id: CAIKnowledge entity ID
|
|
success: Whether sync succeeded
|
|
error_message: Optional error message
|
|
"""
|
|
try:
|
|
update_data = {
|
|
'syncStatus': AIKnowledgeSyncStatus.SYNCED.value if success else AIKnowledgeSyncStatus.FAILED.value
|
|
}
|
|
|
|
if success:
|
|
update_data['lastSync'] = datetime.now().isoformat()
|
|
update_data['syncError'] = None
|
|
elif error_message:
|
|
update_data['syncError'] = error_message[:2000]
|
|
|
|
await self.espocrm.update_entity('CAIKnowledge', knowledge_id, update_data)
|
|
|
|
self._log(f"Sync lock released: {knowledge_id} → {'success' if success else 'failed'}")
|
|
|
|
# Release Redis lock
|
|
lock_key = self._get_lock_key(knowledge_id)
|
|
self._release_redis_lock(lock_key)
|
|
|
|
except Exception as e:
|
|
self._log(f"Error releasing lock: {e}", level='error')
|
|
# Ensure Redis lock is released
|
|
lock_key = self._get_lock_key(knowledge_id)
|
|
self._release_redis_lock(lock_key)
|
|
|
|
async def sync_knowledge_to_xai(self, knowledge_id: str, ctx) -> None:
|
|
"""
|
|
Main sync orchestrator with activation status handling.
|
|
|
|
Args:
|
|
knowledge_id: CAIKnowledge entity ID
|
|
ctx: Motia context for logging
|
|
"""
|
|
from services.espocrm import EspoCRMAPI
|
|
from services.xai_service import XAIService
|
|
|
|
espocrm = EspoCRMAPI(ctx)
|
|
xai = XAIService(ctx)
|
|
|
|
try:
|
|
# 1. Load knowledge entity
|
|
knowledge = await espocrm.get_entity('CAIKnowledge', knowledge_id)
|
|
|
|
activation_status = knowledge.get('activationStatus')
|
|
collection_id = knowledge.get('datenbankId')
|
|
|
|
ctx.logger.info("=" * 80)
|
|
ctx.logger.info(f"📋 Processing: {knowledge['name']}")
|
|
ctx.logger.info(f" activationStatus: {activation_status}")
|
|
ctx.logger.info(f" datenbankId: {collection_id or 'NONE'}")
|
|
ctx.logger.info("=" * 80)
|
|
|
|
# ═══════════════════════════════════════════════════════════
|
|
# CASE 1: NEW → Create Collection
|
|
# ═══════════════════════════════════════════════════════════
|
|
if activation_status == AIKnowledgeActivationStatus.NEW.value:
|
|
ctx.logger.info("🆕 Status 'new' → Creating XAI Collection")
|
|
|
|
collection = await xai.create_collection(
|
|
name=knowledge['name'],
|
|
metadata={
|
|
'espocrm_entity_type': 'CAIKnowledge',
|
|
'espocrm_entity_id': knowledge_id,
|
|
'created_at': datetime.now().isoformat()
|
|
}
|
|
)
|
|
|
|
collection_id = collection['id']
|
|
|
|
# Update EspoCRM: Set datenbankId + change status to 'active'
|
|
await espocrm.update_entity('CAIKnowledge', knowledge_id, {
|
|
'datenbankId': collection_id,
|
|
'activationStatus': AIKnowledgeActivationStatus.ACTIVE.value,
|
|
'syncStatus': AIKnowledgeSyncStatus.UNCLEAN.value
|
|
})
|
|
|
|
ctx.logger.info(f"✅ Collection created: {collection_id}")
|
|
ctx.logger.info(" Status changed to 'active', next webhook will sync documents")
|
|
return
|
|
|
|
# ═══════════════════════════════════════════════════════════
|
|
# CASE 2: DEACTIVATED → Delete Collection from XAI
|
|
# ═══════════════════════════════════════════════════════════
|
|
elif activation_status == AIKnowledgeActivationStatus.DEACTIVATED.value:
|
|
ctx.logger.info("🗑️ Status 'deactivated' → Deleting XAI Collection")
|
|
|
|
if collection_id:
|
|
try:
|
|
await xai.delete_collection(collection_id)
|
|
ctx.logger.info(f"✅ Collection deleted from XAI: {collection_id}")
|
|
except Exception as e:
|
|
ctx.logger.error(f"❌ Failed to delete collection: {e}")
|
|
else:
|
|
ctx.logger.info("⏭️ No collection ID, nothing to delete")
|
|
|
|
# Reset junction entries
|
|
documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id)
|
|
|
|
for doc in documents:
|
|
doc_id = doc['documentId']
|
|
try:
|
|
await espocrm.update_knowledge_document_junction(
|
|
knowledge_id,
|
|
doc_id,
|
|
{
|
|
'syncstatus': 'new',
|
|
'aiDocumentId': None
|
|
},
|
|
update_last_sync=False
|
|
)
|
|
except Exception as e:
|
|
ctx.logger.warn(f"⚠️ Failed to reset junction for {doc_id}: {e}")
|
|
|
|
ctx.logger.info(f"✅ Deactivation complete, {len(documents)} junction entries reset")
|
|
return
|
|
|
|
# ═══════════════════════════════════════════════════════════
|
|
# CASE 3: PAUSED → Skip Sync
|
|
# ═══════════════════════════════════════════════════════════
|
|
elif activation_status == AIKnowledgeActivationStatus.PAUSED.value:
|
|
ctx.logger.info("⏸️ Status 'paused' → No sync performed")
|
|
return
|
|
|
|
# ═══════════════════════════════════════════════════════════
|
|
# CASE 4: ACTIVE → Normal Sync
|
|
# ═══════════════════════════════════════════════════════════
|
|
elif activation_status == AIKnowledgeActivationStatus.ACTIVE.value:
|
|
if not collection_id:
|
|
ctx.logger.error("❌ Status 'active' but no datenbankId!")
|
|
raise RuntimeError("Active knowledge without collection ID")
|
|
|
|
ctx.logger.info(f"🔄 Status 'active' → Syncing documents to {collection_id}")
|
|
|
|
# Verify collection exists
|
|
collection = await xai.get_collection(collection_id)
|
|
if not collection:
|
|
ctx.logger.warn(f"⚠️ Collection {collection_id} not found, recreating")
|
|
collection = await xai.create_collection(
|
|
name=knowledge['name'],
|
|
metadata={
|
|
'espocrm_entity_type': 'CAIKnowledge',
|
|
'espocrm_entity_id': knowledge_id
|
|
}
|
|
)
|
|
collection_id = collection['id']
|
|
await espocrm.update_entity('CAIKnowledge', knowledge_id, {
|
|
'datenbankId': collection_id
|
|
})
|
|
|
|
# Sync documents
|
|
await self._sync_knowledge_documents(knowledge_id, collection_id, ctx, full_sync=full_sync)
|
|
|
|
else:
|
|
ctx.logger.error(f"❌ Unknown activationStatus: {activation_status}")
|
|
raise ValueError(f"Invalid activationStatus: {activation_status}")
|
|
|
|
finally:
|
|
await xai.close()
|
|
|
|
async def _sync_knowledge_documents(
|
|
self,
|
|
knowledge_id: str,
|
|
collection_id: str,
|
|
ctx,
|
|
full_sync: bool = False
|
|
) -> None:
|
|
"""
|
|
Sync all documents of a knowledge base to XAI collection.
|
|
|
|
Uses efficient JunctionData endpoint to get all documents with junction data
|
|
and blake3 hashes in a single API call.
|
|
|
|
Args:
|
|
knowledge_id: CAIKnowledge entity ID
|
|
collection_id: XAI Collection ID
|
|
ctx: Motia context
|
|
full_sync: If True, force Blake3 hash comparison for all documents (nightly cron)
|
|
"""
|
|
from services.espocrm import EspoCRMAPI
|
|
from services.xai_service import XAIService
|
|
|
|
espocrm = EspoCRMAPI(ctx)
|
|
xai = XAIService(ctx)
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# STEP 1: Load all documents with junction data (single API call)
|
|
# ═══════════════════════════════════════════════════════════════
|
|
ctx.logger.info(f"📥 Loading documents with junction data for knowledge {knowledge_id}")
|
|
|
|
documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id)
|
|
|
|
ctx.logger.info(f"📊 Found {len(documents)} document(s)")
|
|
|
|
if not documents:
|
|
ctx.logger.info("✅ No documents to sync")
|
|
return
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# STEP 2: Sync each document based on status/hash
|
|
# ═══════════════════════════════════════════════════════════════
|
|
successful = 0
|
|
failed = 0
|
|
skipped = 0
|
|
|
|
for doc in documents:
|
|
doc_id = doc['documentId']
|
|
doc_name = doc.get('documentName', 'Unknown')
|
|
junction_status = doc.get('syncstatus', 'new')
|
|
ai_document_id = doc.get('aiDocumentId')
|
|
blake3_hash = doc.get('blake3hash')
|
|
|
|
ctx.logger.info(f"\n📄 {doc_name} (ID: {doc_id})")
|
|
ctx.logger.info(f" Status: {junction_status}")
|
|
ctx.logger.info(f" aiDocumentId: {ai_document_id or 'N/A'}")
|
|
ctx.logger.info(f" blake3hash: {blake3_hash[:16] if blake3_hash else 'N/A'}...")
|
|
|
|
try:
|
|
# Decide if sync needed
|
|
needs_sync = False
|
|
reason = ""
|
|
|
|
if junction_status in ['new', 'unclean', 'failed']:
|
|
needs_sync = True
|
|
reason = f"status={junction_status}"
|
|
elif full_sync and blake3_hash and ai_document_id:
|
|
# Full sync mode: verify Blake3 hash with XAI
|
|
try:
|
|
xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id)
|
|
if xai_doc_info:
|
|
xai_blake3 = xai_doc_info.get('blake3_hash')
|
|
|
|
if xai_blake3 != blake3_hash:
|
|
needs_sync = True
|
|
reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs Doc: {blake3_hash[:16]}...)"
|
|
ctx.logger.info(f" 🔄 Blake3 mismatch detected!")
|
|
else:
|
|
ctx.logger.info(f" ✅ Blake3 hash matches")
|
|
else:
|
|
needs_sync = True
|
|
reason = "file not found in XAI collection"
|
|
except Exception as e:
|
|
ctx.logger.warn(f" ⚠️ Failed to verify Blake3: {e}")
|
|
|
|
if not needs_sync:
|
|
ctx.logger.info(f" ⏭️ Skipped (no sync needed)")
|
|
skipped += 1
|
|
continue
|
|
|
|
ctx.logger.info(f" 🔄 Syncing: {reason}")
|
|
|
|
# Get complete document entity with attachment info
|
|
doc_entity = await espocrm.get_entity('CDokumente', doc_id)
|
|
attachment_id = doc_entity.get('dokumentId')
|
|
|
|
if not attachment_id:
|
|
ctx.logger.error(f" ❌ No attachment ID found for document {doc_id}")
|
|
failed += 1
|
|
continue
|
|
|
|
# Get attachment details for MIME type
|
|
try:
|
|
attachment = await espocrm.get_entity('Attachment', attachment_id)
|
|
mime_type = attachment.get('type', 'application/octet-stream')
|
|
file_size = attachment.get('size', 0)
|
|
except Exception as e:
|
|
ctx.logger.warn(f" ⚠️ Failed to get attachment details: {e}, using defaults")
|
|
mime_type = 'application/octet-stream'
|
|
file_size = 0
|
|
|
|
ctx.logger.info(f" 📎 Attachment: {attachment_id} ({mime_type}, {file_size} bytes)")
|
|
|
|
# Download document
|
|
file_content = await espocrm.download_attachment(attachment_id)
|
|
ctx.logger.info(f" 📥 Downloaded {len(file_content)} bytes")
|
|
|
|
# Upload to XAI
|
|
filename = doc_name
|
|
|
|
xai_file_id = await xai.upload_file(file_content, filename, mime_type)
|
|
ctx.logger.info(f" 📤 Uploaded to XAI: {xai_file_id}")
|
|
|
|
# Add to collection
|
|
await xai.add_to_collection(collection_id, xai_file_id)
|
|
ctx.logger.info(f" ✅ Added to collection {collection_id}")
|
|
|
|
# Update junction
|
|
await espocrm.update_knowledge_document_junction(
|
|
knowledge_id,
|
|
doc_id,
|
|
{
|
|
'aiDocumentId': xai_file_id,
|
|
'syncstatus': 'synced'
|
|
},
|
|
update_last_sync=True
|
|
)
|
|
ctx.logger.info(f" ✅ Junction updated")
|
|
|
|
successful += 1
|
|
|
|
except Exception as e:
|
|
failed += 1
|
|
ctx.logger.error(f" ❌ Sync failed: {e}")
|
|
|
|
# Mark as failed in junction
|
|
try:
|
|
await espocrm.update_knowledge_document_junction(
|
|
knowledge_id,
|
|
doc_id,
|
|
{'syncstatus': 'failed'},
|
|
update_last_sync=False
|
|
)
|
|
except Exception as update_err:
|
|
ctx.logger.error(f" ❌ Failed to update junction status: {update_err}")
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# STEP 3: Remove orphaned documents from XAI collection
|
|
# ═══════════════════════════════════════════════════════════════
|
|
try:
|
|
ctx.logger.info(f"\n🧹 Checking for orphaned documents in XAI collection...")
|
|
|
|
# Get all files in XAI collection (normalized structure)
|
|
xai_documents = await xai.list_collection_documents(collection_id)
|
|
xai_file_ids = {doc.get('file_id') for doc in xai_documents if doc.get('file_id')}
|
|
|
|
# Get all ai_document_ids from junction
|
|
junction_file_ids = {doc.get('aiDocumentId') for doc in documents if doc.get('aiDocumentId')}
|
|
|
|
# Find orphans (in XAI but not in junction)
|
|
orphans = xai_file_ids - junction_file_ids
|
|
|
|
if orphans:
|
|
ctx.logger.info(f" Found {len(orphans)} orphaned file(s)")
|
|
for orphan_id in orphans:
|
|
try:
|
|
await xai.remove_from_collection(collection_id, orphan_id)
|
|
ctx.logger.info(f" 🗑️ Removed {orphan_id}")
|
|
except Exception as e:
|
|
ctx.logger.warn(f" ⚠️ Failed to remove {orphan_id}: {e}")
|
|
else:
|
|
ctx.logger.info(f" ✅ No orphans found")
|
|
|
|
except Exception as e:
|
|
ctx.logger.warn(f"⚠️ Failed to clean up orphans: {e}")
|
|
|
|
# ═══════════════════════════════════════════════════════════════
|
|
# STEP 4: Summary
|
|
# ═══════════════════════════════════════════════════════════════
|
|
ctx.logger.info("")
|
|
ctx.logger.info("=" * 80)
|
|
ctx.logger.info(f"📊 Sync Statistics:")
|
|
ctx.logger.info(f" ✅ Synced: {successful}")
|
|
ctx.logger.info(f" ⏭️ Skipped: {skipped}")
|
|
ctx.logger.info(f" ❌ Failed: {failed}")
|
|
ctx.logger.info(f" Mode: {'FULL SYNC (Blake3 verification)' if full_sync else 'INCREMENTAL'}")
|
|
ctx.logger.info("=" * 80)
|
|
|
|
def _calculate_metadata_hash(self, document: Dict) -> str:
|
|
"""
|
|
Calculate hash of sync-relevant metadata.
|
|
|
|
Args:
|
|
document: CDokumente entity
|
|
|
|
Returns:
|
|
MD5 hash (32 chars)
|
|
"""
|
|
metadata = {
|
|
'name': document.get('name', ''),
|
|
'description': document.get('description', ''),
|
|
}
|
|
|
|
metadata_str = json.dumps(metadata, sort_keys=True)
|
|
return hashlib.md5(metadata_str.encode()).hexdigest()
|
|
|
|
def _build_xai_metadata(self, document: Dict) -> Dict[str, str]:
|
|
"""
|
|
Build XAI metadata from CDokumente entity.
|
|
|
|
Args:
|
|
document: CDokumente entity
|
|
|
|
Returns:
|
|
Metadata dict for XAI
|
|
"""
|
|
return {
|
|
'document_name': document.get('name', ''),
|
|
'description': document.get('description', ''),
|
|
'created_at': document.get('createdAt', ''),
|
|
'modified_at': document.get('modifiedAt', ''),
|
|
'espocrm_id': document.get('id', '')
|
|
}
|
|
|
|
async def _get_document_download_info(
|
|
self,
|
|
document: Dict,
|
|
ctx
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get download info for CDokumente entity.
|
|
|
|
Args:
|
|
document: CDokumente entity
|
|
ctx: Motia context
|
|
|
|
Returns:
|
|
Dict with attachment_id, filename, mime_type
|
|
"""
|
|
from services.espocrm import EspoCRMAPI
|
|
|
|
espocrm = EspoCRMAPI(ctx)
|
|
|
|
# Check for dokumentId (CDokumente custom field)
|
|
attachment_id = None
|
|
filename = None
|
|
|
|
if document.get('dokumentId'):
|
|
attachment_id = document.get('dokumentId')
|
|
filename = document.get('dokumentName')
|
|
elif document.get('fileId'):
|
|
attachment_id = document.get('fileId')
|
|
filename = document.get('fileName')
|
|
|
|
if not attachment_id:
|
|
ctx.logger.error(f"❌ No attachment ID for document {document['id']}")
|
|
return None
|
|
|
|
# Get attachment details
|
|
try:
|
|
attachment = await espocrm.get_entity('Attachment', attachment_id)
|
|
return {
|
|
'attachment_id': attachment_id,
|
|
'filename': filename or attachment.get('name', 'unknown'),
|
|
'mime_type': attachment.get('type', 'application/octet-stream')
|
|
}
|
|
except Exception as e:
|
|
ctx.logger.error(f"❌ Failed to get attachment {attachment_id}: {e}")
|
|
return None
|