From bb13d59ddb11fc2d2f113eddae1415fcd829f86d Mon Sep 17 00:00:00 2001 From: bsiggel Date: Fri, 13 Mar 2026 08:40:20 +0000 Subject: [PATCH] fix: Improve orphan detection and Blake3 hash verification in document synchronization --- services/aiknowledge_sync_utils.py | 74 ++++++++++++++++++++---------- 1 file changed, 49 insertions(+), 25 deletions(-) diff --git a/services/aiknowledge_sync_utils.py b/services/aiknowledge_sync_utils.py index 351070d..c8b1044 100644 --- a/services/aiknowledge_sync_utils.py +++ b/services/aiknowledge_sync_utils.py @@ -281,8 +281,9 @@ class AIKnowledgeSync(BaseSyncUtils): # ═══════════════════════════════════════════════════════════════ successful = 0 failed = 0 - skipped = 0 - + skipped = 0 + # Track aiDocumentIds for orphan detection (collected during sync) + synced_file_ids: set = set() for doc in documents: doc_id = doc['documentId'] doc_name = doc.get('documentName', 'Unknown') @@ -303,27 +304,43 @@ class AIKnowledgeSync(BaseSyncUtils): if junction_status in ['new', 'unclean', 'failed']: needs_sync = True reason = f"status={junction_status}" - elif junction_status == 'synced' and blake3_hash and ai_document_id: - # Verify Blake3 hash with XAI (always, since hash from JunctionData API is free) - try: - xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id) - if xai_doc_info: - xai_blake3 = xai_doc_info.get('blake3_hash') - - if xai_blake3 != blake3_hash: - needs_sync = True - reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs EspoCRM: {blake3_hash[:16]}...)" - ctx.logger.info(f" 🔄 Blake3 mismatch detected!") + elif junction_status == 'synced': + # Synced status should have both blake3_hash and ai_document_id + if not blake3_hash: + needs_sync = True + reason = "inconsistency: synced but no blake3 hash" + ctx.logger.warn(f" ⚠️ Synced document missing blake3 hash!") + elif not ai_document_id: + needs_sync = True + reason = "inconsistency: synced but no aiDocumentId" + ctx.logger.warn(f" ⚠️ Synced document missing aiDocumentId!") + else: + # Verify Blake3 hash with XAI (always, since hash from JunctionData API is free) + try: + xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id) + if xai_doc_info: + xai_blake3 = xai_doc_info.get('blake3_hash') + + if xai_blake3 != blake3_hash: + needs_sync = True + reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs EspoCRM: {blake3_hash[:16]}...)" + ctx.logger.info(f" 🔄 Blake3 mismatch detected!") + else: + ctx.logger.info(f" ✅ Blake3 hash matches") else: - ctx.logger.info(f" ✅ Blake3 hash matches") - else: + needs_sync = True + reason = "file not found in XAI collection" + ctx.logger.warn(f" ⚠️ Document marked synced but not in XAI!") + except Exception as e: needs_sync = True - reason = "file not found in XAI collection" - except Exception as e: - ctx.logger.warn(f" ⚠️ Failed to verify Blake3: {e}") + reason = f"verification failed: {e}" + ctx.logger.warn(f" ⚠️ Failed to verify Blake3, will re-sync: {e}") if not needs_sync: ctx.logger.info(f" ⏭️ Skipped (no sync needed)") + # Document is already synced, track its aiDocumentId + if ai_document_id: + synced_file_ids.add(ai_document_id) skipped += 1 continue @@ -338,24 +355,27 @@ class AIKnowledgeSync(BaseSyncUtils): failed += 1 continue - # Get attachment details for MIME type + # Get attachment details for MIME type and original filename try: attachment = await espocrm.get_entity('Attachment', attachment_id) mime_type = attachment.get('type', 'application/octet-stream') file_size = attachment.get('size', 0) + original_filename = attachment.get('name', doc_name) # Original filename with extension except Exception as e: ctx.logger.warn(f" ⚠️ Failed to get attachment details: {e}, using defaults") mime_type = 'application/octet-stream' file_size = 0 + original_filename = doc_name ctx.logger.info(f" 📎 Attachment: {attachment_id} ({mime_type}, {file_size} bytes)") + ctx.logger.info(f" 📄 Original filename: {original_filename}") # Download document file_content = await espocrm.download_attachment(attachment_id) ctx.logger.info(f" 📥 Downloaded {len(file_content)} bytes") - # Upload to XAI - filename = doc_name + # Upload to XAI with original filename (includes extension) + filename = original_filename xai_file_id = await xai.upload_file(file_content, filename, mime_type) ctx.logger.info(f" 📤 Uploaded to XAI: {xai_file_id}") @@ -376,6 +396,9 @@ class AIKnowledgeSync(BaseSyncUtils): ) ctx.logger.info(f" ✅ Junction updated") + # Track the new aiDocumentId for orphan detection + synced_file_ids.add(xai_file_id) + successful += 1 except Exception as e: @@ -403,11 +426,12 @@ class AIKnowledgeSync(BaseSyncUtils): xai_documents = await xai.list_collection_documents(collection_id) xai_file_ids = {doc.get('file_id') for doc in xai_documents if doc.get('file_id')} - # Get all ai_document_ids from junction - junction_file_ids = {doc.get('aiDocumentId') for doc in documents if doc.get('aiDocumentId')} + # Use synced_file_ids (collected during this sync) for orphan detection + # This includes both pre-existing synced docs and newly uploaded ones + ctx.logger.info(f" XAI has {len(xai_file_ids)} files, we have {len(synced_file_ids)} synced") - # Find orphans (in XAI but not in junction) - orphans = xai_file_ids - junction_file_ids + # Find orphans (in XAI but not in our current sync) + orphans = xai_file_ids - synced_file_ids if orphans: ctx.logger.info(f" Found {len(orphans)} orphaned file(s)")