diff --git a/services/aiknowledge_sync_utils.py b/services/aiknowledge_sync_utils.py index c069a9c..0c8f74e 100644 --- a/services/aiknowledge_sync_utils.py +++ b/services/aiknowledge_sync_utils.py @@ -227,7 +227,7 @@ class AIKnowledgeSync(BaseSyncUtils): }) # Sync documents - await self._sync_knowledge_documents(knowledge_id, collection_id, ctx, full_sync=full_sync) + await self._sync_knowledge_documents(knowledge_id, collection_id, ctx) else: ctx.logger.error(f"❌ Unknown activationStatus: {activation_status}") @@ -240,20 +240,18 @@ class AIKnowledgeSync(BaseSyncUtils): self, knowledge_id: str, collection_id: str, - ctx, - full_sync: bool = False + ctx ) -> None: """ Sync all documents of a knowledge base to XAI collection. Uses efficient JunctionData endpoint to get all documents with junction data - and blake3 hashes in a single API call. + and blake3 hashes in a single API call. Hash comparison is always performed. Args: knowledge_id: CAIKnowledge entity ID collection_id: XAI Collection ID ctx: Motia context - full_sync: If True, force Blake3 hash comparison for all documents (nightly cron) """ from services.espocrm import EspoCRMAPI from services.xai_service import XAIService @@ -301,8 +299,8 @@ class AIKnowledgeSync(BaseSyncUtils): if junction_status in ['new', 'unclean', 'failed']: needs_sync = True reason = f"status={junction_status}" - elif full_sync and blake3_hash and ai_document_id: - # Full sync mode: verify Blake3 hash with XAI + elif junction_status == 'synced' and blake3_hash and ai_document_id: + # Verify Blake3 hash with XAI (always, since hash from JunctionData API is free) try: xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id) if xai_doc_info: @@ -310,7 +308,7 @@ class AIKnowledgeSync(BaseSyncUtils): if xai_blake3 != blake3_hash: needs_sync = True - reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs Doc: {blake3_hash[:16]}...)" + reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs EspoCRM: {blake3_hash[:16]}...)" ctx.logger.info(f" 🔄 Blake3 mismatch detected!") else: ctx.logger.info(f" ✅ Blake3 hash matches") diff --git a/services/xai_service.py b/services/xai_service.py index a130e92..eca4758 100644 --- a/services/xai_service.py +++ b/services/xai_service.py @@ -458,107 +458,6 @@ class XAIService: self._log(f"✅ Metadata updated for {file_id}") - # ========== High-Level Operations ========== - - async def upload_document_with_metadata( - self, - collection_id: str, - file_content: bytes, - filename: str, - mime_type: str, - metadata: Dict[str, str] - ) -> str: - """ - Upload file + add to collection with metadata in one operation. - - Args: - collection_id: XAI Collection ID - file_content: File bytes - filename: Filename - mime_type: MIME type - metadata: Metadata fields - - Returns: - XAI file_id - - Raises: - RuntimeError: bei Upload/Add-Fehler - """ - # Step 1: Upload file - file_id = await self.upload_file(file_content, filename, mime_type) - - try: - # Step 2: Add to collection (XAI API automatically handles metadata) - # Note: Metadata muss beim POST mit angegeben werden - session = await self._get_session() - url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}" - headers = { - "Authorization": f"Bearer {self.management_key}", - "Content-Type": "application/json" - } - - body = {"fields": metadata} - - async with session.post(url, json=body, headers=headers) as response: - if response.status not in (200, 201): - raw = await response.text() - raise RuntimeError( - f"Failed to add file to collection with metadata ({response.status}): {raw}" - ) - - self._log(f"✅ File {file_id} added to collection {collection_id} with metadata") - return file_id - - except Exception as e: - # Cleanup: File wurde hochgeladen aber nicht zur Collection hinzugefügt - self._log(f"⚠️ Failed to add to collection, file {file_id} may be orphaned", level='warn') - raise - - async def verify_upload_integrity( - self, - collection_id: str, - file_id: str, - retry_attempts: int = 3 - ) -> Tuple[bool, Optional[str]]: - """ - Verifiziert Upload-Integrität via BLAKE3 Hash von XAI. - - Args: - collection_id: XAI Collection ID - file_id: XAI file_id - retry_attempts: Retry bei temporären Fehlern - - Returns: - (success: bool, blake3_hash: Optional[str]) - """ - for attempt in range(1, retry_attempts + 1): - try: - doc_info = await self.get_collection_document(collection_id, file_id) - - if not doc_info: - self._log(f"⚠️ Document {file_id} not found in collection", level='warn') - return (False, None) - - blake3_hash = doc_info.get('hash') - - if not blake3_hash: - self._log(f"⚠️ No hash returned by XAI API", level='warn') - return (False, None) - - self._log(f"✅ Upload verified, BLAKE3: {blake3_hash[:32]}...") - return (True, blake3_hash) - - except Exception as e: - if attempt < retry_attempts: - delay = 2 ** attempt # Exponential backoff - self._log(f"⚠️ Verification failed (attempt {attempt}), retry in {delay}s", level='warn') - await asyncio.sleep(delay) - else: - self._log(f"❌ Verification failed after {retry_attempts} attempts: {e}", level='error') - return (False, None) - - return (False, None) - def is_mime_type_supported(self, mime_type: str) -> bool: """ Prüft, ob XAI diesen MIME-Type unterstützt. diff --git a/steps/vmh/aiknowledge_full_sync_cron_step.py b/steps/vmh/aiknowledge_full_sync_cron_step.py index cbb1fe8..ebe2ce4 100644 --- a/steps/vmh/aiknowledge_full_sync_cron_step.py +++ b/steps/vmh/aiknowledge_full_sync_cron_step.py @@ -1,11 +1,11 @@ -"""AI Knowledge Full Sync - Daily Cron Job""" +"""AI Knowledge Daily Sync - Cron Job""" from typing import Any from motia import FlowContext, cron config = { - "name": "AI Knowledge Full Sync", - "description": "Daily full sync of all CAIKnowledge entities (catches missed webhooks)", + "name": "AI Knowledge Daily Sync", + "description": "Daily sync of all CAIKnowledge entities (catches missed webhooks, Blake3 verification included)", "flows": ["aiknowledge-full-sync"], "triggers": [ cron("0 0 2 * * *"), # Daily at 2:00 AM @@ -16,16 +16,17 @@ config = { async def handler(input_data: None, ctx: FlowContext[Any]) -> None: """ - Daily full sync handler. + Daily sync handler - ensures all active knowledge bases are synchronized. Loads all CAIKnowledge entities that need sync and emits events. + Blake3 hash verification is always performed (hash available from JunctionData API). Runs every day at 02:00:00. """ from services.espocrm import EspoCRMAPI from services.models import AIKnowledgeActivationStatus, AIKnowledgeSyncStatus ctx.logger.info("=" * 80) - ctx.logger.info("🌙 DAILY FULL SYNC STARTED") + ctx.logger.info("🌙 DAILY AI KNOWLEDGE SYNC STARTED") ctx.logger.info("=" * 80) espocrm = EspoCRMAPI(ctx) @@ -63,20 +64,22 @@ async def handler(input_data: None, ctx: FlowContext[Any]) -> None: ctx.logger.info("=" * 80) return - # Enqueue sync events for all + # Enqueue sync events for all (Blake3 verification always enabled) for i, entity in enumerate(entities, 1): await ctx.enqueue({ 'topic': 'aiknowledge.sync', 'data': { 'knowledge_id': entity['id'], - 'source': 'daily_full_sync', - 'full_sync': True # Enable Blake3 verification + 'source': 'daily_cron' + } + }) + ctx.logger.info( f"📤 [{i}/{total}] Enqueued: {entity['name']} " f"(syncStatus={entity.get('syncStatus')})" ) ctx.logger.info("=" * 80) - ctx.logger.info(f"✅ Full sync complete: {total} events enqueued") + ctx.logger.info(f"✅ Daily sync complete: {total} events enqueued") ctx.logger.info("=" * 80) except Exception as e: diff --git a/steps/vmh/aiknowledge_sync_event_step.py b/steps/vmh/aiknowledge_sync_event_step.py index d1df249..29daca4 100644 --- a/steps/vmh/aiknowledge_sync_event_step.py +++ b/steps/vmh/aiknowledge_sync_event_step.py @@ -36,7 +36,6 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: # Extract data knowledge_id = event_data.get('knowledge_id') source = event_data.get('source', 'unknown') - full_sync = event_data.get('full_sync', False) # Blake3 verification mode if not knowledge_id: ctx.logger.error("❌ Missing knowledge_id in event data") @@ -44,7 +43,6 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: ctx.logger.info(f"📋 Knowledge ID: {knowledge_id}") ctx.logger.info(f"📋 Source: {source}") - ctx.logger.info(f"📋 Full Sync Mode: {full_sync}") ctx.logger.info("=" * 80) # Get Redis for locking @@ -62,8 +60,8 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: raise RuntimeError(f"Lock busy for {knowledge_id}") # Motia will retry try: - # Perform sync - await sync_utils.sync_knowledge_to_xai(knowledge_id, ctx, full_sync=full_sync) + # Perform sync (Blake3 hash verification always enabled) + await sync_utils.sync_knowledge_to_xai(knowledge_id, ctx) ctx.logger.info("=" * 80) ctx.logger.info("✅ AI KNOWLEDGE SYNC COMPLETED")