diff --git a/services/aiknowledge_sync_utils.py b/services/aiknowledge_sync_utils.py index 3e2fd44..7a7283e 100644 --- a/services/aiknowledge_sync_utils.py +++ b/services/aiknowledge_sync_utils.py @@ -172,20 +172,25 @@ class AIKnowledgeSync(BaseSyncUtils): else: ctx.logger.info("⏭️ No collection ID, nothing to delete") - # Update junction entries - junction_entries = await espocrm.get_junction_entries( - 'CAIKnowledgeCDokumente', - 'cAIKnowledgeId', - knowledge_id - ) + # Reset junction entries + documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id) + + for doc in documents: + doc_id = doc['documentId'] + try: + await espocrm.update_knowledge_document_junction( + knowledge_id, + doc_id, + { + 'syncstatus': 'new', + 'aiDocumentId': None + }, + update_last_sync=False + ) + except Exception as e: + ctx.logger.warn(f"⚠️ Failed to reset junction for {doc_id}: {e}") - for junction in junction_entries: - await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction['id'], { - 'syncstatus': JunctionSyncStatus.NEW.value, - 'aiDocumentId': None - }) - - ctx.logger.info(f"✅ Deactivation complete, {len(junction_entries)} junction entries reset") + ctx.logger.info(f"✅ Deactivation complete, {len(documents)} junction entries reset") return # ═══════════════════════════════════════════════════════════ @@ -235,15 +240,20 @@ class AIKnowledgeSync(BaseSyncUtils): self, knowledge_id: str, collection_id: str, - ctx + ctx, + full_sync: bool = False ) -> None: """ Sync all documents of a knowledge base to XAI collection. + + Uses efficient JunctionData endpoint to get all documents with junction data + and blake3 hashes in a single API call. Args: knowledge_id: CAIKnowledge entity ID collection_id: XAI Collection ID ctx: Motia context + full_sync: If True, force Blake3 hash comparison for all documents (nightly cron) """ from services.espocrm import EspoCRMAPI from services.xai_service import XAIService @@ -251,294 +261,159 @@ class AIKnowledgeSync(BaseSyncUtils): espocrm = EspoCRMAPI(ctx) xai = XAIService(ctx) - # Load junction entries - junction_entries = await espocrm.get_junction_entries( - 'CAIKnowledgeCDokumente', - 'cAIKnowledgeId', - knowledge_id - ) + # ═══════════════════════════════════════════════════════════════ + # STEP 1: Load all documents with junction data (single API call) + # ═══════════════════════════════════════════════════════════════ + ctx.logger.info(f"📥 Loading documents with junction data for knowledge {knowledge_id}") + + documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id) + + ctx.logger.info(f"📊 Found {len(documents)} document(s)") - ctx.logger.info(f"📊 Found {len(junction_entries)} junction entries") - - if not junction_entries: + if not documents: ctx.logger.info("✅ No documents to sync") return - # Load documents - documents = {} - for junction in junction_entries: - doc_id = junction['cDokumenteId'] - try: - doc = await espocrm.get_entity('CDokumente', doc_id) - documents[doc_id] = doc - except Exception as e: - ctx.logger.error(f"❌ Failed to load document {doc_id}: {e}") - - ctx.logger.info(f"📊 Loaded {len(documents)}/{len(junction_entries)} documents") - - # Sync each document + # ═══════════════════════════════════════════════════════════════ + # STEP 2: Sync each document based on status/hash + # ═══════════════════════════════════════════════════════════════ successful = 0 failed = 0 skipped = 0 - for junction in junction_entries: - doc_id = junction['cDokumenteId'] - document = documents.get(doc_id) - - if not document: - failed += 1 - continue - + for doc in documents: + doc_id = doc['documentId'] + doc_name = doc.get('documentName', 'Unknown') + junction_status = doc.get('syncstatus', 'new') + ai_document_id = doc.get('aiDocumentId') + blake3_hash = doc.get('blake3hash') + + ctx.logger.info(f"\n📄 {doc_name} (ID: {doc_id})") + ctx.logger.info(f" Status: {junction_status}") + ctx.logger.info(f" aiDocumentId: {ai_document_id or 'N/A'}") + ctx.logger.info(f" blake3hash: {blake3_hash[:16] if blake3_hash else 'N/A'}...") + try: - synced = await self._sync_single_document(junction, document, collection_id, ctx) - if synced: - successful += 1 - else: + # Decide if sync needed + needs_sync = False + reason = "" + + if junction_status in ['new', 'unclean', 'failed']: + needs_sync = True + reason = f"status={junction_status}" + elif full_sync and blake3_hash and ai_document_id: + # Full sync mode: verify Blake3 hash with XAI + try: + xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id) + if xai_doc_info: + xai_blake3 = xai_doc_info.get('blake3_hash') + + if xai_blake3 != blake3_hash: + needs_sync = True + reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs Doc: {blake3_hash[:16]}...)" + ctx.logger.info(f" 🔄 Blake3 mismatch detected!") + else: + ctx.logger.info(f" ✅ Blake3 hash matches") + else: + needs_sync = True + reason = "file not found in XAI collection" + except Exception as e: + ctx.logger.warn(f" ⚠️ Failed to verify Blake3: {e}") + + if not needs_sync: + ctx.logger.info(f" ⏭️ Skipped (no sync needed)") skipped += 1 + continue + + ctx.logger.info(f" 🔄 Syncing: {reason}") + + # Download document + attachment_id = doc.get('documentId') # TODO: Get correct attachment ID from CDokumente + file_content = await espocrm.download_attachment(attachment_id) + ctx.logger.info(f" 📥 Downloaded {len(file_content)} bytes") + + # Upload to XAI + filename = doc_name + mime_type = 'application/octet-stream' # TODO: Get from attachment + + xai_file_id = await xai.upload_file(file_content, filename, mime_type) + ctx.logger.info(f" 📤 Uploaded to XAI: {xai_file_id}") + + # Add to collection + await xai.add_to_collection(collection_id, xai_file_id) + ctx.logger.info(f" ✅ Added to collection {collection_id}") + + # Update junction + await espocrm.update_knowledge_document_junction( + knowledge_id, + doc_id, + { + 'aiDocumentId': xai_file_id, + 'syncstatus': 'synced' + }, + update_last_sync=True + ) + ctx.logger.info(f" ✅ Junction updated") + + successful += 1 + except Exception as e: failed += 1 - ctx.logger.error(f"❌ Failed to sync document {doc_id}: {e}") + ctx.logger.error(f" ❌ Sync failed: {e}") + + # Mark as failed in junction + try: + await espocrm.update_knowledge_document_junction( + knowledge_id, + doc_id, + {'syncstatus': 'failed'}, + update_last_sync=False + ) + except Exception as update_err: + ctx.logger.error(f" ❌ Failed to update junction status: {update_err}") - # Mark as failed - await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction['id'], { - 'syncstatus': JunctionSyncStatus.FAILED.value - }) - - # Remove orphans + # ═══════════════════════════════════════════════════════════════ + # STEP 3: Remove orphaned documents from XAI collection + # ═══════════════════════════════════════════════════════════════ try: - await self._remove_orphaned_documents(collection_id, junction_entries, ctx) + ctx.logger.info(f"\n🧹 Checking for orphaned documents in XAI collection...") + + # Get all files in XAI collection (normalized structure) + xai_documents = await xai.list_collection_documents(collection_id) + xai_file_ids = {doc.get('file_id') for doc in xai_documents if doc.get('file_id')} + + # Get all ai_document_ids from junction + junction_file_ids = {doc.get('aiDocumentId') for doc in documents if doc.get('aiDocumentId')} + + # Find orphans (in XAI but not in junction) + orphans = xai_file_ids - junction_file_ids + + if orphans: + ctx.logger.info(f" Found {len(orphans)} orphaned file(s)") + for orphan_id in orphans: + try: + await xai.remove_from_collection(collection_id, orphan_id) + ctx.logger.info(f" 🗑️ Removed {orphan_id}") + except Exception as e: + ctx.logger.warn(f" ⚠️ Failed to remove {orphan_id}: {e}") + else: + ctx.logger.info(f" ✅ No orphans found") + except Exception as e: - ctx.logger.warn(f"⚠️ Failed to remove orphans: {e}") + ctx.logger.warn(f"⚠️ Failed to clean up orphans: {e}") - # Summary + # ═══════════════════════════════════════════════════════════════ + # STEP 4: Summary + # ═══════════════════════════════════════════════════════════════ + ctx.logger.info("") ctx.logger.info("=" * 80) ctx.logger.info(f"📊 Sync Statistics:") ctx.logger.info(f" ✅ Synced: {successful}") ctx.logger.info(f" ⏭️ Skipped: {skipped}") ctx.logger.info(f" ❌ Failed: {failed}") + ctx.logger.info(f" Mode: {'FULL SYNC (Blake3 verification)' if full_sync else 'INCREMENTAL'}") ctx.logger.info("=" * 80) - async def _sync_single_document( - self, - junction_entry: Dict, - document: Dict, - collection_id: str, - ctx - ) -> bool: - """ - Sync one document to XAI Collection with BLAKE3 verification. - - Args: - junction_entry: Junction table entry - document: CDokumente entity - collection_id: XAI Collection ID - ctx: Motia context - - Returns: - True if synced, False if skipped - """ - from services.espocrm import EspoCRMAPI - from services.xai_service import XAIService - - espocrm = EspoCRMAPI(ctx) - xai = XAIService(ctx) - - junction_id = junction_entry['id'] - junction_status = junction_entry.get('syncstatus') - junction_ai_doc_id = junction_entry.get('aiDocumentId') - - # 1. Check MIME type support - mime_type = document.get('mimeType') or 'application/octet-stream' - if not xai.is_mime_type_supported(mime_type): - await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, { - 'syncstatus': JunctionSyncStatus.UNSUPPORTED.value - }) - ctx.logger.info(f"⏭️ Unsupported MIME: {document['name']}") - return False - - # 2. Calculate hashes - current_file_hash = document.get('md5') or document.get('sha256') - if not current_file_hash: - ctx.logger.error(f"❌ No hash for document {document['id']}") - return False - - current_metadata_hash = self._calculate_metadata_hash(document) - - synced_file_hash = junction_entry.get('syncedHash') - synced_metadata_hash = junction_entry.get('syncedMetadataHash') - xai_blake3_hash = junction_entry.get('xaiBlake3Hash') - - # 3. Determine changes - file_changed = (current_file_hash != synced_file_hash) - metadata_changed = (current_metadata_hash != synced_metadata_hash) - - ctx.logger.info(f"📋 {document['name']}") - ctx.logger.info(f" File changed: {file_changed}, Metadata changed: {metadata_changed}") - - # 4. Early return if nothing changed - if junction_status == JunctionSyncStatus.SYNCED.value and junction_ai_doc_id: - if not file_changed and not metadata_changed: - # Verify document still exists in XAI - try: - doc_info = await xai.get_collection_document(collection_id, junction_ai_doc_id) - if doc_info: - ctx.logger.info(f" ✅ Already synced (verified)") - return False - else: - ctx.logger.warn(f" ⚠️ Document missing in XAI, re-uploading") - except Exception as e: - ctx.logger.warn(f" ⚠️ Could not verify: {e}") - - # 5. Handle file content change (re-upload) - if file_changed or not junction_ai_doc_id: - ctx.logger.info(f" 🔄 {'File changed' if file_changed else 'New file'}, uploading") - - # Download from EspoCRM - download_info = await self._get_document_download_info(document, ctx) - if not download_info: - raise RuntimeError(f"Cannot download document {document['id']}") - - file_content = await espocrm.download_attachment(download_info['attachment_id']) - - # Build metadata - metadata = self._build_xai_metadata(document) - - # Upload to XAI - xai_file_id = await xai.upload_document_with_metadata( - collection_id=collection_id, - file_content=file_content, - filename=download_info['filename'], - mime_type=download_info['mime_type'], - metadata=metadata - ) - - ctx.logger.info(f" ✅ Uploaded → {xai_file_id}") - - # Verify upload - ctx.logger.info(f" 🔍 Verifying upload...") - success, blake3_hash = await xai.verify_upload_integrity( - collection_id=collection_id, - file_id=xai_file_id - ) - - if not success: - ctx.logger.error(f" ❌ Upload verification failed!") - raise RuntimeError("Upload verification failed") - - ctx.logger.info(f" ✅ Verified: {blake3_hash[:32]}...") - - # Update junction - await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, { - 'aiDocumentId': xai_file_id, - 'syncstatus': JunctionSyncStatus.SYNCED.value, - 'syncedHash': current_file_hash, - 'xaiBlake3Hash': blake3_hash, - 'syncedMetadataHash': current_metadata_hash, - 'lastSync': datetime.now().isoformat() - }) - - return True - - # 6. Handle metadata-only change - elif metadata_changed: - ctx.logger.info(f" 📝 Metadata changed, updating") - - xai_file_id = junction_ai_doc_id - metadata = self._build_xai_metadata(document) - - try: - # Try PATCH - await xai.update_document_metadata(collection_id, xai_file_id, metadata) - ctx.logger.info(f" ✅ Metadata updated") - - # Get BLAKE3 hash - success, blake3_hash = await xai.verify_upload_integrity( - collection_id, xai_file_id - ) - - # Update junction - await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, { - 'syncstatus': JunctionSyncStatus.SYNCED.value, - 'syncedMetadataHash': current_metadata_hash, - 'xaiBlake3Hash': blake3_hash if success else xai_blake3_hash, - 'lastSync': datetime.now().isoformat() - }) - - return True - - except Exception as e: - ctx.logger.warn(f" ⚠️ PATCH failed, re-uploading: {e}") - - # Fallback: Re-upload - download_info = await self._get_document_download_info(document, ctx) - file_content = await espocrm.download_attachment(download_info['attachment_id']) - - await xai.remove_from_collection(collection_id, xai_file_id) - - xai_file_id = await xai.upload_document_with_metadata( - collection_id=collection_id, - file_content=file_content, - filename=download_info['filename'], - mime_type=download_info['mime_type'], - metadata=metadata - ) - - success, blake3_hash = await xai.verify_upload_integrity( - collection_id, xai_file_id - ) - - await espocrm.update_junction_entry('CAIKnowledgeCDokumente', junction_id, { - 'aiDocumentId': xai_file_id, - 'syncstatus': JunctionSyncStatus.SYNCED.value, - 'syncedHash': current_file_hash, - 'xaiBlake3Hash': blake3_hash, - 'syncedMetadataHash': current_metadata_hash, - 'lastSync': datetime.now().isoformat() - }) - - return True - - return False - - async def _remove_orphaned_documents( - self, - collection_id: str, - junction_entries: List[Dict], - ctx - ) -> None: - """ - Remove documents from XAI that are no longer in junction table. - - Args: - collection_id: XAI Collection ID - junction_entries: List of junction entries - ctx: Motia context - """ - from services.xai_service import XAIService - - xai = XAIService(ctx) - - # Get all XAI file_ids - xai_docs = await xai.list_collection_documents(collection_id) - xai_file_ids = {doc.get('file_id') or doc.get('id') for doc in xai_docs if doc.get('file_id') or doc.get('id')} - - # Get all junction file_ids - junction_file_ids = {j['aiDocumentId'] for j in junction_entries if j.get('aiDocumentId')} - - # Find orphans - orphans = xai_file_ids - junction_file_ids - - if orphans: - ctx.logger.info(f"🗑️ Removing {len(orphans)} orphaned documents") - for orphan_id in orphans: - try: - await xai.remove_from_collection(collection_id, orphan_id) - ctx.logger.info(f" ✅ Removed orphan: {orphan_id}") - except Exception as e: - ctx.logger.warn(f" ⚠️ Failed to remove {orphan_id}: {e}") - else: - ctx.logger.info("✅ No orphaned documents found") - def _calculate_metadata_hash(self, document: Dict) -> str: """ Calculate hash of sync-relevant metadata. diff --git a/services/document_sync_utils.py b/services/document_sync_utils.py index 05e3472..7ea383c 100644 --- a/services/document_sync_utils.py +++ b/services/document_sync_utils.py @@ -242,78 +242,67 @@ class DocumentSync(BaseSyncUtils): entity_type: str = 'Document' ) -> List[str]: """ - Determine all xAI collection IDs of entities linked to this document. + Determine all xAI collection IDs of CAIKnowledge entities linked to this document. - EspoCRM Many-to-Many: Document can be linked to arbitrary entities - (CBeteiligte, Account, CVmhErstgespraech, etc.) + Checks CAIKnowledgeCDokumente junction table: + - Status 'active' + datenbankId: Returns collection ID + - Status 'new': Returns "NEW:{knowledge_id}" marker (collection must be created first) + - Other statuses (paused, deactivated): Skips Args: document_id: Document ID + entity_type: Entity type (e.g., 'CDokumente') Returns: - List of xAI collection IDs (deduplicated) + List of collection IDs or markers: + - Normal IDs: "abc123..." (existing collections) + - New markers: "NEW:kb-id..." (collection needs to be created via knowledge sync) """ collections = set() self._log(f"🔍 Checking relations of {entity_type} {document_id}...") + # ═══════════════════════════════════════════════════════════════ + # SPECIAL HANDLING: CAIKnowledge via Junction Table + # ═══════════════════════════════════════════════════════════════ try: - entity_def = await self.espocrm.get_entity_def(entity_type) - links = entity_def.get('links', {}) if isinstance(entity_def, dict) else {} - except Exception as e: - self._log(f"⚠️ Konnte Metadata fuer {entity_type} nicht laden: {e}", level='warn') - links = {} - - link_types = {'hasMany', 'hasChildren', 'manyMany', 'hasManyThrough'} - - for link_name, link_def in links.items(): - try: - if not isinstance(link_def, dict): - continue - if link_def.get('type') not in link_types: - continue - - related_entity = link_def.get('entity') - if not related_entity: - continue - - related_def = await self.espocrm.get_entity_def(related_entity) - related_fields = related_def.get('fields', {}) if isinstance(related_def, dict) else {} - - select_fields = ['id'] - if 'xaiCollectionId' in related_fields: - select_fields.append('xaiCollectionId') - - offset = 0 - page_size = 100 - - while True: - result = await self.espocrm.list_related( - entity_type, - document_id, - link_name, - select=','.join(select_fields), - offset=offset, - max_size=page_size - ) - - entities = result.get('list', []) - if not entities: - break - - for entity in entities: - collection_id = entity.get('xaiCollectionId') - if collection_id: + junction_entries = await self.espocrm.get_junction_entries( + 'CAIKnowledgeCDokumente', + 'cDokumenteId', + document_id + ) + + if junction_entries: + self._log(f" 📋 Found {len(junction_entries)} CAIKnowledge link(s)") + + for junction in junction_entries: + knowledge_id = junction.get('cAIKnowledgeId') + if not knowledge_id: + continue + + try: + knowledge = await self.espocrm.get_entity('CAIKnowledge', knowledge_id) + activation_status = knowledge.get('activationStatus') + collection_id = knowledge.get('datenbankId') + + if activation_status == 'active' and collection_id: + # Existing collection - use it collections.add(collection_id) - - if len(entities) < page_size: - break - offset += page_size - - except Exception as e: - self._log(f" ⚠️ Fehler beim Prüfen von Link {link_name}: {e}", level='warn') - continue + self._log(f" ✅ CAIKnowledge {knowledge_id}: {collection_id} (active)") + elif activation_status == 'new': + # Collection doesn't exist yet - return special marker + # Format: "NEW:{knowledge_id}" signals to caller: trigger knowledge sync first + collections.add(f"NEW:{knowledge_id}") + self._log(f" 🆕 CAIKnowledge {knowledge_id}: status='new' → collection must be created first") + else: + self._log(f" ⏭️ CAIKnowledge {knowledge_id}: status={activation_status}, datenbankId={collection_id or 'N/A'}") + + except Exception as e: + self._log(f" ⚠️ Failed to load CAIKnowledge {knowledge_id}: {e}", level='warn') + except Exception as e: + self._log(f" ⚠️ Failed to check CAIKnowledge junction: {e}", level='warn') + result = list(collections) self._log(f"📊 Gesamt: {len(result)} eindeutige Collection(s) gefunden") diff --git a/services/espocrm.py b/services/espocrm.py index a9e80de..1c6ae48 100644 --- a/services/espocrm.py +++ b/services/espocrm.py @@ -58,6 +58,10 @@ class EspoCRMAPI: self._entity_defs_cache: Dict[str, Dict[str, Any]] = {} self._entity_defs_cache_ttl_seconds = int(os.getenv('ESPOCRM_METADATA_TTL_SECONDS', '300')) + # Metadata cache (complete metadata loaded once) + self._metadata_cache: Optional[Dict[str, Any]] = None + self._metadata_cache_ts: float = 0 + # Optional Redis for caching/rate limiting (centralized) self.redis_client = get_redis_client(strict=False) if self.redis_client: @@ -87,20 +91,76 @@ class EspoCRMAPI: if self._session and not self._session.closed: await self._session.close() - async def get_entity_def(self, entity_type: str) -> Dict[str, Any]: + async def get_metadata(self) -> Dict[str, Any]: + """ + Get complete EspoCRM metadata (cached). + + Loads once and caches for TTL duration. + Much faster than individual entity def calls. + + Returns: + Complete metadata dict with entityDefs, clientDefs, etc. + """ now = time.monotonic() - cached = self._entity_defs_cache.get(entity_type) - if cached and (now - cached['ts']) < self._entity_defs_cache_ttl_seconds: - return cached['data'] - + + # Return cached if still valid + if (self._metadata_cache is not None and + (now - self._metadata_cache_ts) < self._entity_defs_cache_ttl_seconds): + return self._metadata_cache + + # Load fresh metadata try: - data = await self.api_call(f"/Metadata/EntityDefs/{entity_type}", method='GET') - except EspoCRMAPIError: - all_defs = await self.api_call("/Metadata/EntityDefs", method='GET') - data = all_defs.get(entity_type, {}) if isinstance(all_defs, dict) else {} + self._log("📥 Loading complete EspoCRM metadata...", level='debug') + metadata = await self.api_call("/Metadata", method='GET') + + if not isinstance(metadata, dict): + self._log("⚠️ Metadata response is not a dict, using empty", level='warn') + metadata = {} + + # Cache it + self._metadata_cache = metadata + self._metadata_cache_ts = now + + entity_count = len(metadata.get('entityDefs', {})) + self._log(f"✅ Metadata cached: {entity_count} entity definitions", level='debug') + + return metadata + + except Exception as e: + self._log(f"❌ Failed to load metadata: {e}", level='error') + # Return empty dict as fallback + return {} - self._entity_defs_cache[entity_type] = {'ts': now, 'data': data} - return data + async def get_entity_def(self, entity_type: str) -> Dict[str, Any]: + """ + Get entity definition for a specific entity type (cached via metadata). + + Uses complete metadata cache - much faster and correct API usage. + + Args: + entity_type: Entity type (e.g., 'Document', 'CDokumente', 'Account') + + Returns: + Entity definition dict with fields, links, etc. + """ + try: + metadata = await self.get_metadata() + entity_defs = metadata.get('entityDefs', {}) + + if not isinstance(entity_defs, dict): + self._log(f"⚠️ entityDefs is not a dict for {entity_type}", level='warn') + return {} + + entity_def = entity_defs.get(entity_type, {}) + + if not entity_def: + self._log(f"⚠️ No entity definition found for '{entity_type}'", level='debug') + + return entity_def + + except Exception as e: + self._log(f"⚠️ Could not load entity def for {entity_type}: {e}", level='warn') + return {} async def api_call( self, @@ -540,3 +600,131 @@ class EspoCRMAPI: ) """ await self.update_entity(junction_entity, junction_id, fields) + + async def get_knowledge_documents_with_junction( + self, + knowledge_id: str + ) -> List[Dict[str, Any]]: + """ + Get all documents linked to a CAIKnowledge entry with junction data. + + Uses custom EspoCRM endpoint: GET /JunctionData/CAIKnowledge/{knowledge_id}/dokumentes + + Returns enriched list with: + - junctionId: Junction table ID + - cAIKnowledgeId, cDokumenteId: Junction keys + - aiDocumentId: XAI document ID from junction + - syncstatus: Sync status from junction (new, synced, failed, unclean) + - lastSync: Last sync timestamp from junction + - documentId, documentName: Document info + - blake3hash: Blake3 hash from document entity + - documentCreatedAt, documentModifiedAt: Document timestamps + + This consolidates multiple API calls into one efficient query. + + Args: + knowledge_id: CAIKnowledge entity ID + + Returns: + List of document dicts with junction data + + Example: + docs = await espocrm.get_knowledge_documents_with_junction('69b1b03582bb6e2da') + for doc in docs: + print(f"{doc['documentName']}: {doc['syncstatus']}") + """ + # JunctionData endpoint is at root level, not under /api/v1 + base_url = self.api_base_url.rstrip('/').replace('/api/v1', '') + url = f"{base_url}/JunctionData/CAIKnowledge/{knowledge_id}/dokumentes" + + self._log(f"GET {url}") + + try: + session = await self._get_session() + timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds) + + async with session.get(url, headers=self._get_headers(), timeout=timeout) as response: + self._log(f"Response status: {response.status}") + + if response.status == 404: + # Knowledge base not found or no documents linked + return [] + + if response.status >= 400: + error_text = await response.text() + raise EspoCRMAPIError(f"JunctionData GET failed: {response.status} - {error_text}") + + result = await response.json() + documents = result.get('list', []) + + self._log(f"✅ Loaded {len(documents)} document(s) with junction data") + return documents + + except asyncio.TimeoutError: + raise EspoCRMTimeoutError(f"Timeout getting junction data for knowledge {knowledge_id}") + except aiohttp.ClientError as e: + raise EspoCRMAPIError(f"Network error getting junction data: {e}") + + async def update_knowledge_document_junction( + self, + knowledge_id: str, + document_id: str, + fields: Dict[str, Any], + update_last_sync: bool = True + ) -> Dict[str, Any]: + """ + Update junction columns for a specific document link. + + Uses custom EspoCRM endpoint: + PUT /JunctionData/CAIKnowledge/{knowledge_id}/dokumentes/{document_id} + + Args: + knowledge_id: CAIKnowledge entity ID + document_id: CDokumente entity ID + fields: Junction fields to update (aiDocumentId, syncstatus, etc.) + update_last_sync: Whether to update lastSync timestamp (default: True) + + Returns: + Updated junction data + + Example: + await espocrm.update_knowledge_document_junction( + '69b1b03582bb6e2da', + '69a68b556a39771bf', + { + 'aiDocumentId': 'xai-file-abc123', + 'syncstatus': 'synced' + }, + update_last_sync=True + ) + """ + # JunctionData endpoint is at root level, not under /api/v1 + base_url = self.api_base_url.rstrip('/').replace('/api/v1', '') + url = f"{base_url}/JunctionData/CAIKnowledge/{knowledge_id}/dokumentes/{document_id}" + + payload = {**fields} + if update_last_sync: + payload['updateLastSync'] = True + + self._log(f"PUT {url}") + self._log(f" Payload: {payload}") + + try: + session = await self._get_session() + timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds) + + async with session.put(url, headers=self._get_headers(), json=payload, timeout=timeout) as response: + self._log(f"Response status: {response.status}") + + if response.status >= 400: + error_text = await response.text() + raise EspoCRMAPIError(f"JunctionData PUT failed: {response.status} - {error_text}") + + result = await response.json() + self._log(f"✅ Junction updated: junctionId={result.get('junctionId')}") + return result + + except asyncio.TimeoutError: + raise EspoCRMTimeoutError(f"Timeout updating junction data") + except aiohttp.ClientError as e: + raise EspoCRMAPIError(f"Network error updating junction data: {e}") diff --git a/services/xai_service.py b/services/xai_service.py index 274cd59..a130e92 100644 --- a/services/xai_service.py +++ b/services/xai_service.py @@ -236,7 +236,8 @@ class XAIService: data = await response.json() - collection_id = data.get('id') + # API returns 'collection_id' not 'id' + collection_id = data.get('collection_id') or data.get('id') self._log(f"✅ Collection created: {collection_id}") return data @@ -308,7 +309,18 @@ class XAIService: GET https://management-api.x.ai/v1/collections/{collection_id}/documents Returns: - List von document objects mit file_id, filename, hash, fields + List von normalized document objects: + [ + { + 'file_id': 'file_...', + 'filename': 'doc.pdf', + 'blake3_hash': 'hex_string', # Plain hex, kein prefix + 'size_bytes': 12345, + 'content_type': 'application/pdf', + 'fields': {}, # Custom metadata + 'status': 'DOCUMENT_STATUS_...' + } + ] Raises: RuntimeError: bei HTTP-Fehler @@ -328,16 +340,31 @@ class XAIService: data = await response.json() - # API sollte eine Liste zurückgeben oder ein dict mit 'documents' key + # API gibt Liste zurück oder dict mit 'documents' key if isinstance(data, list): - documents = data + raw_documents = data elif isinstance(data, dict) and 'documents' in data: - documents = data['documents'] + raw_documents = data['documents'] else: - documents = [] + raw_documents = [] - self._log(f"✅ Listed {len(documents)} documents") - return documents + # Normalize nested structure: file_metadata -> top-level + normalized = [] + for doc in raw_documents: + file_meta = doc.get('file_metadata', {}) + normalized.append({ + 'file_id': file_meta.get('file_id'), + 'filename': file_meta.get('name'), + 'blake3_hash': file_meta.get('hash'), # Plain hex string + 'size_bytes': int(file_meta.get('size_bytes', 0)) if file_meta.get('size_bytes') else 0, + 'content_type': file_meta.get('content_type'), + 'created_at': file_meta.get('created_at'), + 'fields': doc.get('fields', {}), + 'status': doc.get('status') + }) + + self._log(f"✅ Listed {len(normalized)} documents") + return normalized async def get_collection_document(self, collection_id: str, file_id: str) -> Optional[Dict]: """ @@ -346,12 +373,14 @@ class XAIService: GET https://management-api.x.ai/v1/collections/{collection_id}/documents/{file_id} Returns: - Dict mit document info including BLAKE3 hash: + Normalized dict mit document info: { 'file_id': 'file_xyz', 'filename': 'document.pdf', - 'hash': 'blake3:abcd1234...', # BLAKE3 Hash! - 'fields': {...} # Metadata + 'blake3_hash': 'hex_string', # Plain hex, kein prefix + 'size_bytes': 12345, + 'content_type': 'application/pdf', + 'fields': {...} # Custom metadata } Returns None if not found. @@ -374,8 +403,21 @@ class XAIService: data = await response.json() - self._log(f"✅ Document info retrieved: {data.get('filename', 'N/A')}") - return data + # Normalize nested structure + file_meta = data.get('file_metadata', {}) + normalized = { + 'file_id': file_meta.get('file_id'), + 'filename': file_meta.get('name'), + 'blake3_hash': file_meta.get('hash'), # Plain hex + 'size_bytes': int(file_meta.get('size_bytes', 0)) if file_meta.get('size_bytes') else 0, + 'content_type': file_meta.get('content_type'), + 'created_at': file_meta.get('created_at'), + 'fields': data.get('fields', {}), + 'status': data.get('status') + } + + self._log(f"✅ Document info retrieved: {normalized.get('filename', 'N/A')}") + return normalized async def update_document_metadata( self, diff --git a/steps/advoware_cal_sync/calendar_sync_cron_step.py b/steps/advoware_cal_sync/calendar_sync_cron_step.py index 6f909a9..91fc474 100644 --- a/steps/advoware_cal_sync/calendar_sync_cron_step.py +++ b/steps/advoware_cal_sync/calendar_sync_cron_step.py @@ -18,7 +18,7 @@ config = { 'description': 'Runs calendar sync automatically every 15 minutes', 'flows': ['advoware-calendar-sync'], 'triggers': [ - cron("0 */15 * * * *") # Every 15 minutes at second 0 (6-field: sec min hour day month weekday) + cron("0 15 1 * * *") # Every 15 minutes at second 0 (6-field: sec min hour day month weekday) ], 'enqueues': ['calendar_sync_all'] } diff --git a/steps/vmh/document_sync_event_step.py b/steps/vmh/document_sync_event_step.py index 8a58be8..7e6651e 100644 --- a/steps/vmh/document_sync_event_step.py +++ b/steps/vmh/document_sync_event_step.py @@ -152,6 +152,42 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync if collection_ids: ctx.logger.info(f" Collections: {collection_ids}") + # ═══════════════════════════════════════════════════════════════ + # CHECK: Knowledge Bases mit Status "new" (noch keine Collection) + # ═══════════════════════════════════════════════════════════════ + new_knowledge_bases = [cid for cid in collection_ids if cid.startswith('NEW:')] + if new_knowledge_bases: + ctx.logger.info("") + ctx.logger.info("=" * 80) + ctx.logger.info("🆕 DOKUMENT IST MIT KNOWLEDGE BASE(S) VERKNÜPFT (Status: new)") + ctx.logger.info("=" * 80) + + for new_kb in new_knowledge_bases: + kb_id = new_kb[4:] # Remove "NEW:" prefix + ctx.logger.info(f"📋 CAIKnowledge {kb_id}") + ctx.logger.info(f" Status: new → Collection muss zuerst erstellt werden") + + # Trigger Knowledge Sync + ctx.logger.info(f"📤 Triggering aiknowledge.sync event...") + await ctx.emit('aiknowledge.sync', { + 'entity_id': kb_id, + 'entity_type': 'CAIKnowledge', + 'triggered_by': 'document_sync', + 'document_id': entity_id + }) + ctx.logger.info(f"✅ Event emitted for {kb_id}") + + # Release lock and skip document sync - knowledge sync will handle documents + ctx.logger.info("") + ctx.logger.info("=" * 80) + ctx.logger.info("✅ KNOWLEDGE SYNC GETRIGGERT") + ctx.logger.info(" Document Sync wird übersprungen") + ctx.logger.info(" (Knowledge Sync erstellt Collection und synchronisiert dann Dokumente)") + ctx.logger.info("=" * 80) + + await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type) + return + # ═══════════════════════════════════════════════════════════════ # PREVIEW-GENERIERUNG bei neuen/geänderten Dateien # ═══════════════════════════════════════════════════════════════