feat: Enhance document synchronization by integrating CAIKnowledge handling and improving error logging

This commit is contained in:
bsiggel
2026-03-12 22:30:11 +00:00
parent 8ed7cca432
commit 6bf2343a12
6 changed files with 492 additions and 362 deletions

View File

@@ -172,20 +172,25 @@ class AIKnowledgeSync(BaseSyncUtils):
else:
ctx.logger.info("⏭️ No collection ID, nothing to delete")
# Update junction entries
junction_entries = await espocrm.get_junction_entries(
'CAIKnowledgeCDokumente',
'cAIKnowledgeId',
knowledge_id
)
# Reset junction entries
documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id)
for doc in documents:
doc_id = doc['documentId']
try:
await espocrm.update_knowledge_document_junction(
knowledge_id,
doc_id,
{
'syncstatus': 'new',
'aiDocumentId': None
},
update_last_sync=False
)
except Exception as e:
ctx.logger.warn(f"⚠️ Failed to reset junction for {doc_id}: {e}")
for junction in junction_entries:
await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction['id'], {
'syncstatus': JunctionSyncStatus.NEW.value,
'aiDocumentId': None
})
ctx.logger.info(f"✅ Deactivation complete, {len(junction_entries)} junction entries reset")
ctx.logger.info(f"✅ Deactivation complete, {len(documents)} junction entries reset")
return
# ═══════════════════════════════════════════════════════════
@@ -235,15 +240,20 @@ class AIKnowledgeSync(BaseSyncUtils):
self,
knowledge_id: str,
collection_id: str,
ctx
ctx,
full_sync: bool = False
) -> None:
"""
Sync all documents of a knowledge base to XAI collection.
Uses efficient JunctionData endpoint to get all documents with junction data
and blake3 hashes in a single API call.
Args:
knowledge_id: CAIKnowledge entity ID
collection_id: XAI Collection ID
ctx: Motia context
full_sync: If True, force Blake3 hash comparison for all documents (nightly cron)
"""
from services.espocrm import EspoCRMAPI
from services.xai_service import XAIService
@@ -251,294 +261,159 @@ class AIKnowledgeSync(BaseSyncUtils):
espocrm = EspoCRMAPI(ctx)
xai = XAIService(ctx)
# Load junction entries
junction_entries = await espocrm.get_junction_entries(
'CAIKnowledgeCDokumente',
'cAIKnowledgeId',
knowledge_id
)
# ═══════════════════════════════════════════════════════════════
# STEP 1: Load all documents with junction data (single API call)
# ═══════════════════════════════════════════════════════════════
ctx.logger.info(f"📥 Loading documents with junction data for knowledge {knowledge_id}")
documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id)
ctx.logger.info(f"📊 Found {len(documents)} document(s)")
ctx.logger.info(f"📊 Found {len(junction_entries)} junction entries")
if not junction_entries:
if not documents:
ctx.logger.info("✅ No documents to sync")
return
# Load documents
documents = {}
for junction in junction_entries:
doc_id = junction['cDokumenteId']
try:
doc = await espocrm.get_entity('CDokumente', doc_id)
documents[doc_id] = doc
except Exception as e:
ctx.logger.error(f"❌ Failed to load document {doc_id}: {e}")
ctx.logger.info(f"📊 Loaded {len(documents)}/{len(junction_entries)} documents")
# Sync each document
# ═══════════════════════════════════════════════════════════════
# STEP 2: Sync each document based on status/hash
# ═══════════════════════════════════════════════════════════════
successful = 0
failed = 0
skipped = 0
for junction in junction_entries:
doc_id = junction['cDokumenteId']
document = documents.get(doc_id)
if not document:
failed += 1
continue
for doc in documents:
doc_id = doc['documentId']
doc_name = doc.get('documentName', 'Unknown')
junction_status = doc.get('syncstatus', 'new')
ai_document_id = doc.get('aiDocumentId')
blake3_hash = doc.get('blake3hash')
ctx.logger.info(f"\n📄 {doc_name} (ID: {doc_id})")
ctx.logger.info(f" Status: {junction_status}")
ctx.logger.info(f" aiDocumentId: {ai_document_id or 'N/A'}")
ctx.logger.info(f" blake3hash: {blake3_hash[:16] if blake3_hash else 'N/A'}...")
try:
synced = await self._sync_single_document(junction, document, collection_id, ctx)
if synced:
successful += 1
else:
# Decide if sync needed
needs_sync = False
reason = ""
if junction_status in ['new', 'unclean', 'failed']:
needs_sync = True
reason = f"status={junction_status}"
elif full_sync and blake3_hash and ai_document_id:
# Full sync mode: verify Blake3 hash with XAI
try:
xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id)
if xai_doc_info:
xai_blake3 = xai_doc_info.get('blake3_hash')
if xai_blake3 != blake3_hash:
needs_sync = True
reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs Doc: {blake3_hash[:16]}...)"
ctx.logger.info(f" 🔄 Blake3 mismatch detected!")
else:
ctx.logger.info(f" ✅ Blake3 hash matches")
else:
needs_sync = True
reason = "file not found in XAI collection"
except Exception as e:
ctx.logger.warn(f" ⚠️ Failed to verify Blake3: {e}")
if not needs_sync:
ctx.logger.info(f" ⏭️ Skipped (no sync needed)")
skipped += 1
continue
ctx.logger.info(f" 🔄 Syncing: {reason}")
# Download document
attachment_id = doc.get('documentId') # TODO: Get correct attachment ID from CDokumente
file_content = await espocrm.download_attachment(attachment_id)
ctx.logger.info(f" 📥 Downloaded {len(file_content)} bytes")
# Upload to XAI
filename = doc_name
mime_type = 'application/octet-stream' # TODO: Get from attachment
xai_file_id = await xai.upload_file(file_content, filename, mime_type)
ctx.logger.info(f" 📤 Uploaded to XAI: {xai_file_id}")
# Add to collection
await xai.add_to_collection(collection_id, xai_file_id)
ctx.logger.info(f" ✅ Added to collection {collection_id}")
# Update junction
await espocrm.update_knowledge_document_junction(
knowledge_id,
doc_id,
{
'aiDocumentId': xai_file_id,
'syncstatus': 'synced'
},
update_last_sync=True
)
ctx.logger.info(f" ✅ Junction updated")
successful += 1
except Exception as e:
failed += 1
ctx.logger.error(f"❌ Failed to sync document {doc_id}: {e}")
ctx.logger.error(f" ❌ Sync failed: {e}")
# Mark as failed in junction
try:
await espocrm.update_knowledge_document_junction(
knowledge_id,
doc_id,
{'syncstatus': 'failed'},
update_last_sync=False
)
except Exception as update_err:
ctx.logger.error(f" ❌ Failed to update junction status: {update_err}")
# Mark as failed
await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction['id'], {
'syncstatus': JunctionSyncStatus.FAILED.value
})
# Remove orphans
# ═══════════════════════════════════════════════════════════════
# STEP 3: Remove orphaned documents from XAI collection
# ═══════════════════════════════════════════════════════════════
try:
await self._remove_orphaned_documents(collection_id, junction_entries, ctx)
ctx.logger.info(f"\n🧹 Checking for orphaned documents in XAI collection...")
# Get all files in XAI collection (normalized structure)
xai_documents = await xai.list_collection_documents(collection_id)
xai_file_ids = {doc.get('file_id') for doc in xai_documents if doc.get('file_id')}
# Get all ai_document_ids from junction
junction_file_ids = {doc.get('aiDocumentId') for doc in documents if doc.get('aiDocumentId')}
# Find orphans (in XAI but not in junction)
orphans = xai_file_ids - junction_file_ids
if orphans:
ctx.logger.info(f" Found {len(orphans)} orphaned file(s)")
for orphan_id in orphans:
try:
await xai.remove_from_collection(collection_id, orphan_id)
ctx.logger.info(f" 🗑️ Removed {orphan_id}")
except Exception as e:
ctx.logger.warn(f" ⚠️ Failed to remove {orphan_id}: {e}")
else:
ctx.logger.info(f" ✅ No orphans found")
except Exception as e:
ctx.logger.warn(f"⚠️ Failed to remove orphans: {e}")
ctx.logger.warn(f"⚠️ Failed to clean up orphans: {e}")
# Summary
# ═══════════════════════════════════════════════════════════════
# STEP 4: Summary
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("")
ctx.logger.info("=" * 80)
ctx.logger.info(f"📊 Sync Statistics:")
ctx.logger.info(f" ✅ Synced: {successful}")
ctx.logger.info(f" ⏭️ Skipped: {skipped}")
ctx.logger.info(f" ❌ Failed: {failed}")
ctx.logger.info(f" Mode: {'FULL SYNC (Blake3 verification)' if full_sync else 'INCREMENTAL'}")
ctx.logger.info("=" * 80)
async def _sync_single_document(
    self,
    junction_entry: Dict,
    document: Dict,
    collection_id: str,
    ctx
) -> bool:
    """
    Sync one document to XAI Collection with BLAKE3 verification.

    Decides between three paths based on junction status and hash comparison:
      1. Skip — already SYNCED, neither file nor metadata hash changed, and
         the document is verified to still exist in the XAI collection.
      2. Full upload — file content changed, or no ``aiDocumentId`` exists yet.
      3. Metadata-only update — only the metadata hash changed; tries a PATCH
         first and falls back to remove + re-upload if the PATCH fails.

    Args:
        junction_entry: CAIKnowledgeCDokumente junction table entry
        document: CDokumente entity
        collection_id: XAI Collection ID
        ctx: Motia context

    Returns:
        True if an upload or metadata update was performed,
        False if skipped (unsupported MIME, missing hash, or unchanged).

    Raises:
        RuntimeError: if the document download info cannot be resolved or
            the post-upload integrity verification fails.
    """
    from services.espocrm import EspoCRMAPI
    from services.xai_service import XAIService

    espocrm = EspoCRMAPI(ctx)
    xai = XAIService(ctx)

    junction_id = junction_entry['id']
    junction_status = junction_entry.get('syncstatus')
    junction_ai_doc_id = junction_entry.get('aiDocumentId')

    # 1. Check MIME type support — unsupported documents are flagged in the
    #    junction so they are not retried on every sync run.
    mime_type = document.get('mimeType') or 'application/octet-stream'
    if not xai.is_mime_type_supported(mime_type):
        await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, {
            'syncstatus': JunctionSyncStatus.UNSUPPORTED.value
        })
        ctx.logger.info(f"⏭️ Unsupported MIME: {document['name']}")
        return False

    # 2. Calculate hashes. md5 preferred, sha256 as fallback; without either
    #    we cannot detect content changes, so skip with an error.
    current_file_hash = document.get('md5') or document.get('sha256')
    if not current_file_hash:
        ctx.logger.error(f"❌ No hash for document {document['id']}")
        return False
    current_metadata_hash = self._calculate_metadata_hash(document)
    synced_file_hash = junction_entry.get('syncedHash')
    synced_metadata_hash = junction_entry.get('syncedMetadataHash')
    xai_blake3_hash = junction_entry.get('xaiBlake3Hash')

    # 3. Determine changes relative to the last successful sync recorded
    #    in the junction entry.
    file_changed = (current_file_hash != synced_file_hash)
    metadata_changed = (current_metadata_hash != synced_metadata_hash)
    ctx.logger.info(f"📋 {document['name']}")
    ctx.logger.info(f" File changed: {file_changed}, Metadata changed: {metadata_changed}")

    # 4. Early return if nothing changed
    if junction_status == JunctionSyncStatus.SYNCED.value and junction_ai_doc_id:
        if not file_changed and not metadata_changed:
            # Verify document still exists in XAI; if verification errors out,
            # fall through to re-upload rather than trusting stale state.
            try:
                doc_info = await xai.get_collection_document(collection_id, junction_ai_doc_id)
                if doc_info:
                    ctx.logger.info(f" ✅ Already synced (verified)")
                    return False
                else:
                    ctx.logger.warn(f" ⚠️ Document missing in XAI, re-uploading")
            except Exception as e:
                ctx.logger.warn(f" ⚠️ Could not verify: {e}")

    # 5. Handle file content change (re-upload)
    if file_changed or not junction_ai_doc_id:
        ctx.logger.info(f" 🔄 {'File changed' if file_changed else 'New file'}, uploading")
        # Download from EspoCRM
        download_info = await self._get_document_download_info(document, ctx)
        if not download_info:
            raise RuntimeError(f"Cannot download document {document['id']}")
        file_content = await espocrm.download_attachment(download_info['attachment_id'])
        # Build metadata
        metadata = self._build_xai_metadata(document)
        # Upload to XAI
        xai_file_id = await xai.upload_document_with_metadata(
            collection_id=collection_id,
            file_content=file_content,
            filename=download_info['filename'],
            mime_type=download_info['mime_type'],
            metadata=metadata
        )
        ctx.logger.info(f" ✅ Uploaded → {xai_file_id}")
        # Verify upload integrity before trusting the new file id; a failed
        # verification raises so the caller can mark the junction as failed.
        ctx.logger.info(f" 🔍 Verifying upload...")
        success, blake3_hash = await xai.verify_upload_integrity(
            collection_id=collection_id,
            file_id=xai_file_id
        )
        if not success:
            ctx.logger.error(f" ❌ Upload verification failed!")
            raise RuntimeError("Upload verification failed")
        ctx.logger.info(f" ✅ Verified: {blake3_hash[:32]}...")
        # Update junction with the new file id and all three hashes so the
        # next run can skip unchanged documents.
        await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, {
            'aiDocumentId': xai_file_id,
            'syncstatus': JunctionSyncStatus.SYNCED.value,
            'syncedHash': current_file_hash,
            'xaiBlake3Hash': blake3_hash,
            'syncedMetadataHash': current_metadata_hash,
            'lastSync': datetime.now().isoformat()
        })
        return True

    # 6. Handle metadata-only change
    elif metadata_changed:
        ctx.logger.info(f" 📝 Metadata changed, updating")
        xai_file_id = junction_ai_doc_id
        metadata = self._build_xai_metadata(document)
        try:
            # Try PATCH — cheaper than a full re-upload when supported.
            await xai.update_document_metadata(collection_id, xai_file_id, metadata)
            ctx.logger.info(f" ✅ Metadata updated")
            # Get BLAKE3 hash; on verification failure keep the previously
            # stored hash rather than writing an unknown value.
            success, blake3_hash = await xai.verify_upload_integrity(
                collection_id, xai_file_id
            )
            # Update junction
            await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, {
                'syncstatus': JunctionSyncStatus.SYNCED.value,
                'syncedMetadataHash': current_metadata_hash,
                'xaiBlake3Hash': blake3_hash if success else xai_blake3_hash,
                'lastSync': datetime.now().isoformat()
            })
            return True
        except Exception as e:
            ctx.logger.warn(f" ⚠️ PATCH failed, re-uploading: {e}")
            # Fallback: Re-upload — remove the stale file first, then upload
            # fresh content with the new metadata.
            download_info = await self._get_document_download_info(document, ctx)
            file_content = await espocrm.download_attachment(download_info['attachment_id'])
            await xai.remove_from_collection(collection_id, xai_file_id)
            xai_file_id = await xai.upload_document_with_metadata(
                collection_id=collection_id,
                file_content=file_content,
                filename=download_info['filename'],
                mime_type=download_info['mime_type'],
                metadata=metadata
            )
            success, blake3_hash = await xai.verify_upload_integrity(
                collection_id, xai_file_id
            )
            # NOTE(review): unlike the primary upload path, `success` is not
            # checked here before storing blake3_hash — presumably intentional
            # best-effort; confirm with the service contract.
            await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, {
                'aiDocumentId': xai_file_id,
                'syncstatus': JunctionSyncStatus.SYNCED.value,
                'syncedHash': current_file_hash,
                'xaiBlake3Hash': blake3_hash,
                'syncedMetadataHash': current_metadata_hash,
                'lastSync': datetime.now().isoformat()
            })
            return True
    return False
async def _remove_orphaned_documents(
    self,
    collection_id: str,
    junction_entries: List[Dict],
    ctx
) -> None:
    """
    Remove documents from XAI that are no longer in junction table.

    Computes the set difference between file ids present in the XAI
    collection and file ids still referenced by junction entries, then
    removes each orphan individually (failures are logged, not raised).

    Args:
        collection_id: XAI Collection ID
        junction_entries: List of junction entries
        ctx: Motia context
    """
    from services.xai_service import XAIService

    xai = XAIService(ctx)

    # Every file id the XAI collection currently knows about.
    collection_docs = await xai.list_collection_documents(collection_id)
    known_in_xai = set()
    for entry in collection_docs:
        file_id = entry.get('file_id') or entry.get('id')
        if file_id:
            known_in_xai.add(file_id)

    # Every file id the junction table still references.
    referenced = set()
    for junction in junction_entries:
        if junction.get('aiDocumentId'):
            referenced.add(junction['aiDocumentId'])

    # Anything in XAI but unreferenced is an orphan.
    orphans = known_in_xai - referenced
    if not orphans:
        ctx.logger.info("✅ No orphaned documents found")
        return

    ctx.logger.info(f"🗑️ Removing {len(orphans)} orphaned documents")
    for orphan_id in orphans:
        try:
            await xai.remove_from_collection(collection_id, orphan_id)
            ctx.logger.info(f" ✅ Removed orphan: {orphan_id}")
        except Exception as e:
            ctx.logger.warn(f" ⚠️ Failed to remove {orphan_id}: {e}")
def _calculate_metadata_hash(self, document: Dict) -> str:
"""
Calculate hash of sync-relevant metadata.