feat: Refactor AI Knowledge sync processes to remove full sync parameter and ensure Blake3 verification is always performed

This commit is contained in:
bsiggel
2026-03-12 22:41:19 +00:00
parent 8f1533337c
commit 46c0bbf381
4 changed files with 20 additions and 122 deletions

View File

@@ -227,7 +227,7 @@ class AIKnowledgeSync(BaseSyncUtils):
}) })
# Sync documents # Sync documents
await self._sync_knowledge_documents(knowledge_id, collection_id, ctx, full_sync=full_sync) await self._sync_knowledge_documents(knowledge_id, collection_id, ctx)
else: else:
ctx.logger.error(f"❌ Unknown activationStatus: {activation_status}") ctx.logger.error(f"❌ Unknown activationStatus: {activation_status}")
@@ -240,20 +240,18 @@ class AIKnowledgeSync(BaseSyncUtils):
self, self,
knowledge_id: str, knowledge_id: str,
collection_id: str, collection_id: str,
ctx, ctx
full_sync: bool = False
) -> None: ) -> None:
""" """
Sync all documents of a knowledge base to XAI collection. Sync all documents of a knowledge base to XAI collection.
Uses efficient JunctionData endpoint to get all documents with junction data Uses efficient JunctionData endpoint to get all documents with junction data
and blake3 hashes in a single API call. and blake3 hashes in a single API call. Hash comparison is always performed.
Args: Args:
knowledge_id: CAIKnowledge entity ID knowledge_id: CAIKnowledge entity ID
collection_id: XAI Collection ID collection_id: XAI Collection ID
ctx: Motia context ctx: Motia context
full_sync: If True, force Blake3 hash comparison for all documents (nightly cron)
""" """
from services.espocrm import EspoCRMAPI from services.espocrm import EspoCRMAPI
from services.xai_service import XAIService from services.xai_service import XAIService
@@ -301,8 +299,8 @@ class AIKnowledgeSync(BaseSyncUtils):
if junction_status in ['new', 'unclean', 'failed']: if junction_status in ['new', 'unclean', 'failed']:
needs_sync = True needs_sync = True
reason = f"status={junction_status}" reason = f"status={junction_status}"
elif full_sync and blake3_hash and ai_document_id: elif junction_status == 'synced' and blake3_hash and ai_document_id:
# Full sync mode: verify Blake3 hash with XAI # Verify Blake3 hash with XAI (always, since hash from JunctionData API is free)
try: try:
xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id) xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id)
if xai_doc_info: if xai_doc_info:
@@ -310,7 +308,7 @@ class AIKnowledgeSync(BaseSyncUtils):
if xai_blake3 != blake3_hash: if xai_blake3 != blake3_hash:
needs_sync = True needs_sync = True
reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs Doc: {blake3_hash[:16]}...)" reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs EspoCRM: {blake3_hash[:16]}...)"
ctx.logger.info(f" 🔄 Blake3 mismatch detected!") ctx.logger.info(f" 🔄 Blake3 mismatch detected!")
else: else:
ctx.logger.info(f" ✅ Blake3 hash matches") ctx.logger.info(f" ✅ Blake3 hash matches")

View File

@@ -458,107 +458,6 @@ class XAIService:
self._log(f"✅ Metadata updated for {file_id}") self._log(f"✅ Metadata updated for {file_id}")
# ========== High-Level Operations ==========
async def upload_document_with_metadata(
self,
collection_id: str,
file_content: bytes,
filename: str,
mime_type: str,
metadata: Dict[str, str]
) -> str:
"""
Upload file + add to collection with metadata in one operation.
Args:
collection_id: XAI Collection ID
file_content: File bytes
filename: Filename
mime_type: MIME type
metadata: Metadata fields
Returns:
XAI file_id
Raises:
RuntimeError: bei Upload/Add-Fehler
"""
# Step 1: Upload file
file_id = await self.upload_file(file_content, filename, mime_type)
try:
# Step 2: Add to collection (XAI API automatically handles metadata)
# Note: Metadata muss beim POST mit angegeben werden
session = await self._get_session()
url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}"
headers = {
"Authorization": f"Bearer {self.management_key}",
"Content-Type": "application/json"
}
body = {"fields": metadata}
async with session.post(url, json=body, headers=headers) as response:
if response.status not in (200, 201):
raw = await response.text()
raise RuntimeError(
f"Failed to add file to collection with metadata ({response.status}): {raw}"
)
self._log(f"✅ File {file_id} added to collection {collection_id} with metadata")
return file_id
except Exception as e:
# Cleanup: File wurde hochgeladen aber nicht zur Collection hinzugefügt
self._log(f"⚠️ Failed to add to collection, file {file_id} may be orphaned", level='warn')
raise
async def verify_upload_integrity(
self,
collection_id: str,
file_id: str,
retry_attempts: int = 3
) -> Tuple[bool, Optional[str]]:
"""
Verifiziert Upload-Integrität via BLAKE3 Hash von XAI.
Args:
collection_id: XAI Collection ID
file_id: XAI file_id
retry_attempts: Retry bei temporären Fehlern
Returns:
(success: bool, blake3_hash: Optional[str])
"""
for attempt in range(1, retry_attempts + 1):
try:
doc_info = await self.get_collection_document(collection_id, file_id)
if not doc_info:
self._log(f"⚠️ Document {file_id} not found in collection", level='warn')
return (False, None)
blake3_hash = doc_info.get('hash')
if not blake3_hash:
self._log(f"⚠️ No hash returned by XAI API", level='warn')
return (False, None)
self._log(f"✅ Upload verified, BLAKE3: {blake3_hash[:32]}...")
return (True, blake3_hash)
except Exception as e:
if attempt < retry_attempts:
delay = 2 ** attempt # Exponential backoff
self._log(f"⚠️ Verification failed (attempt {attempt}), retry in {delay}s", level='warn')
await asyncio.sleep(delay)
else:
self._log(f"❌ Verification failed after {retry_attempts} attempts: {e}", level='error')
return (False, None)
return (False, None)
def is_mime_type_supported(self, mime_type: str) -> bool: def is_mime_type_supported(self, mime_type: str) -> bool:
""" """
Prüft, ob XAI diesen MIME-Type unterstützt. Prüft, ob XAI diesen MIME-Type unterstützt.

View File

@@ -1,11 +1,11 @@
"""AI Knowledge Full Sync - Daily Cron Job""" """AI Knowledge Daily Sync - Cron Job"""
from typing import Any from typing import Any
from motia import FlowContext, cron from motia import FlowContext, cron
config = { config = {
"name": "AI Knowledge Full Sync", "name": "AI Knowledge Daily Sync",
"description": "Daily full sync of all CAIKnowledge entities (catches missed webhooks)", "description": "Daily sync of all CAIKnowledge entities (catches missed webhooks, Blake3 verification included)",
"flows": ["aiknowledge-full-sync"], "flows": ["aiknowledge-full-sync"],
"triggers": [ "triggers": [
cron("0 0 2 * * *"), # Daily at 2:00 AM cron("0 0 2 * * *"), # Daily at 2:00 AM
@@ -16,16 +16,17 @@ config = {
async def handler(input_data: None, ctx: FlowContext[Any]) -> None: async def handler(input_data: None, ctx: FlowContext[Any]) -> None:
""" """
Daily full sync handler. Daily sync handler - ensures all active knowledge bases are synchronized.
Loads all CAIKnowledge entities that need sync and emits events. Loads all CAIKnowledge entities that need sync and emits events.
Blake3 hash verification is always performed (hash available from JunctionData API).
Runs every day at 02:00:00. Runs every day at 02:00:00.
""" """
from services.espocrm import EspoCRMAPI from services.espocrm import EspoCRMAPI
from services.models import AIKnowledgeActivationStatus, AIKnowledgeSyncStatus from services.models import AIKnowledgeActivationStatus, AIKnowledgeSyncStatus
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
ctx.logger.info("🌙 DAILY FULL SYNC STARTED") ctx.logger.info("🌙 DAILY AI KNOWLEDGE SYNC STARTED")
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
espocrm = EspoCRMAPI(ctx) espocrm = EspoCRMAPI(ctx)
@@ -63,20 +64,22 @@ async def handler(input_data: None, ctx: FlowContext[Any]) -> None:
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
return return
# Enqueue sync events for all # Enqueue sync events for all (Blake3 verification always enabled)
for i, entity in enumerate(entities, 1): for i, entity in enumerate(entities, 1):
await ctx.enqueue({ await ctx.enqueue({
'topic': 'aiknowledge.sync', 'topic': 'aiknowledge.sync',
'data': { 'data': {
'knowledge_id': entity['id'], 'knowledge_id': entity['id'],
'source': 'daily_full_sync', 'source': 'daily_cron'
'full_sync': True # Enable Blake3 verification }
})
ctx.logger.info(
f"📤 [{i}/{total}] Enqueued: {entity['name']} " f"📤 [{i}/{total}] Enqueued: {entity['name']} "
f"(syncStatus={entity.get('syncStatus')})" f"(syncStatus={entity.get('syncStatus')})"
) )
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
ctx.logger.info(f"Full sync complete: {total} events enqueued") ctx.logger.info(f"Daily sync complete: {total} events enqueued")
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
except Exception as e: except Exception as e:

View File

@@ -36,7 +36,6 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
# Extract data # Extract data
knowledge_id = event_data.get('knowledge_id') knowledge_id = event_data.get('knowledge_id')
source = event_data.get('source', 'unknown') source = event_data.get('source', 'unknown')
full_sync = event_data.get('full_sync', False) # Blake3 verification mode
if not knowledge_id: if not knowledge_id:
ctx.logger.error("❌ Missing knowledge_id in event data") ctx.logger.error("❌ Missing knowledge_id in event data")
@@ -44,7 +43,6 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
ctx.logger.info(f"📋 Knowledge ID: {knowledge_id}") ctx.logger.info(f"📋 Knowledge ID: {knowledge_id}")
ctx.logger.info(f"📋 Source: {source}") ctx.logger.info(f"📋 Source: {source}")
ctx.logger.info(f"📋 Full Sync Mode: {full_sync}")
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
# Get Redis for locking # Get Redis for locking
@@ -62,8 +60,8 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
raise RuntimeError(f"Lock busy for {knowledge_id}") # Motia will retry raise RuntimeError(f"Lock busy for {knowledge_id}") # Motia will retry
try: try:
# Perform sync # Perform sync (Blake3 hash verification always enabled)
await sync_utils.sync_knowledge_to_xai(knowledge_id, ctx, full_sync=full_sync) await sync_utils.sync_knowledge_to_xai(knowledge_id, ctx)
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
ctx.logger.info("✅ AI KNOWLEDGE SYNC COMPLETED") ctx.logger.info("✅ AI KNOWLEDGE SYNC COMPLETED")