diff --git a/services/aiknowledge_sync_utils.py b/services/aiknowledge_sync_utils.py deleted file mode 100644 index 0d993c9..0000000 --- a/services/aiknowledge_sync_utils.py +++ /dev/null @@ -1,545 +0,0 @@ -""" -AI Knowledge Sync Utilities - -Utility functions for synchronizing CAIKnowledge entities with XAI Collections: -- Collection lifecycle management (create, delete) -- Document synchronization with BLAKE3 hash verification -- Metadata-only updates via PATCH -- Orphan detection and cleanup -""" - -import hashlib -import json -from typing import Dict, Any, Optional, List, Tuple -from datetime import datetime -from urllib.parse import unquote - -from services.sync_utils_base import BaseSyncUtils -from services.models import ( - AIKnowledgeActivationStatus, - AIKnowledgeSyncStatus, - JunctionSyncStatus -) - - -class AIKnowledgeSync(BaseSyncUtils): - """Utility class for AI Knowledge ↔ XAI Collections synchronization""" - - def _get_lock_key(self, entity_id: str) -> str: - """Redis lock key for AI Knowledge entities""" - return f"sync_lock:aiknowledge:{entity_id}" - - async def acquire_sync_lock(self, knowledge_id: str) -> bool: - """ - Acquire distributed lock via Redis + update EspoCRM syncStatus. 
- - Args: - knowledge_id: CAIKnowledge entity ID - - Returns: - True if lock acquired, False if already locked - """ - try: - # STEP 1: Atomic Redis lock - lock_key = self._get_lock_key(knowledge_id) - if not self._acquire_redis_lock(lock_key): - self._log(f"Redis lock already active for {knowledge_id}", level='warn') - return False - - # STEP 2: Update syncStatus to pending_sync - try: - await self.espocrm.update_entity('CAIKnowledge', knowledge_id, { - 'syncStatus': AIKnowledgeSyncStatus.PENDING_SYNC.value - }) - except Exception as e: - self._log(f"Could not set syncStatus: {e}", level='debug') - - self._log(f"Sync lock acquired for {knowledge_id}") - return True - - except Exception as e: - self._log(f"Error acquiring lock: {e}", level='error') - # Clean up Redis lock on error - lock_key = self._get_lock_key(knowledge_id) - self._release_redis_lock(lock_key) - return False - - async def release_sync_lock( - self, - knowledge_id: str, - success: bool = True, - error_message: Optional[str] = None - ) -> None: - """ - Release sync lock and set final status. 
- - Args: - knowledge_id: CAIKnowledge entity ID - success: Whether sync succeeded - error_message: Optional error message - """ - try: - update_data = { - 'syncStatus': AIKnowledgeSyncStatus.SYNCED.value if success else AIKnowledgeSyncStatus.FAILED.value - } - - if success: - update_data['lastSync'] = datetime.now().isoformat() - update_data['syncError'] = None - elif error_message: - update_data['syncError'] = error_message[:2000] - - await self.espocrm.update_entity('CAIKnowledge', knowledge_id, update_data) - - self._log(f"Sync lock released: {knowledge_id} β†’ {'success' if success else 'failed'}") - - # Release Redis lock - lock_key = self._get_lock_key(knowledge_id) - self._release_redis_lock(lock_key) - - except Exception as e: - self._log(f"Error releasing lock: {e}", level='error') - # Ensure Redis lock is released - lock_key = self._get_lock_key(knowledge_id) - self._release_redis_lock(lock_key) - - async def sync_knowledge_to_xai(self, knowledge_id: str, ctx) -> None: - """ - Main sync orchestrator with activation status handling. - - Args: - knowledge_id: CAIKnowledge entity ID - ctx: Motia context for logging - """ - from services.espocrm import EspoCRMAPI - from services.xai_service import XAIService - - espocrm = EspoCRMAPI(ctx) - xai = XAIService(ctx) - - try: - # 1. 
Load knowledge entity - knowledge = await espocrm.get_entity('CAIKnowledge', knowledge_id) - - activation_status = knowledge.get('aktivierungsstatus') - collection_id = knowledge.get('datenbankId') - - ctx.logger.info("=" * 80) - ctx.logger.info(f"πŸ“‹ Processing: {knowledge['name']}") - ctx.logger.info(f" aktivierungsstatus: {activation_status}") - ctx.logger.info(f" datenbankId: {collection_id or 'NONE'}") - ctx.logger.info("=" * 80) - - # ═══════════════════════════════════════════════════════════ - # CASE 1: NEW β†’ Create Collection - # ═══════════════════════════════════════════════════════════ - if activation_status == AIKnowledgeActivationStatus.NEW.value: - ctx.logger.info("πŸ†• Status 'new' β†’ Creating XAI Collection") - - collection = await xai.create_collection( - name=knowledge['name'], - metadata={ - 'espocrm_entity_type': 'CAIKnowledge', - 'espocrm_entity_id': knowledge_id, - 'created_at': datetime.now().isoformat() - } - ) - - # XAI API returns 'collection_id' not 'id' - collection_id = collection.get('collection_id') or collection.get('id') - - # Update EspoCRM: Set datenbankId + change status to 'active' - await espocrm.update_entity('CAIKnowledge', knowledge_id, { - 'datenbankId': collection_id, - 'aktivierungsstatus': AIKnowledgeActivationStatus.ACTIVE.value, - 'syncStatus': AIKnowledgeSyncStatus.UNCLEAN.value - }) - - ctx.logger.info(f"βœ… Collection created: {collection_id}") - ctx.logger.info(" Status changed to 'active', now syncing documents...") - - # Continue to document sync immediately (don't return) - # Fall through to sync logic below - - # ═══════════════════════════════════════════════════════════ - # CASE 2: DEACTIVATED β†’ Delete Collection from XAI - # ═══════════════════════════════════════════════════════════ - elif activation_status == AIKnowledgeActivationStatus.DEACTIVATED.value: - ctx.logger.info("πŸ—‘οΈ Status 'deactivated' β†’ Deleting XAI Collection") - - if collection_id: - try: - await 
xai.delete_collection(collection_id) - ctx.logger.info(f"βœ… Collection deleted from XAI: {collection_id}") - except Exception as e: - ctx.logger.error(f"❌ Failed to delete collection: {e}") - else: - ctx.logger.info("⏭️ No collection ID, nothing to delete") - - # Reset junction entries - documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id) - - for doc in documents: - doc_id = doc['documentId'] - try: - await espocrm.update_knowledge_document_junction( - knowledge_id, - doc_id, - { - 'syncstatus': 'new', - 'aiDocumentId': None - }, - update_last_sync=False - ) - except Exception as e: - ctx.logger.warn(f"⚠️ Failed to reset junction for {doc_id}: {e}") - - ctx.logger.info(f"βœ… Deactivation complete, {len(documents)} junction entries reset") - return - - # ═══════════════════════════════════════════════════════════ - # CASE 3: PAUSED β†’ Skip Sync - # ═══════════════════════════════════════════════════════════ - elif activation_status == AIKnowledgeActivationStatus.PAUSED.value: - ctx.logger.info("⏸️ Status 'paused' β†’ No sync performed") - return - - # ═══════════════════════════════════════════════════════════ - # CASE 4: ACTIVE β†’ Normal Sync (or just created from NEW) - # ═══════════════════════════════════════════════════════════ - if activation_status in (AIKnowledgeActivationStatus.ACTIVE.value, AIKnowledgeActivationStatus.NEW.value): - if not collection_id: - ctx.logger.error("❌ Status 'active' but no datenbankId!") - raise RuntimeError("Active knowledge without collection ID") - - if activation_status == AIKnowledgeActivationStatus.ACTIVE.value: - ctx.logger.info(f"πŸ”„ Status 'active' β†’ Syncing documents to {collection_id}") - - # Verify collection exists - collection = await xai.get_collection(collection_id) - if not collection: - ctx.logger.warn(f"⚠️ Collection {collection_id} not found, recreating") - collection = await xai.create_collection( - name=knowledge['name'], - metadata={ - 'espocrm_entity_type': 'CAIKnowledge', - 
'espocrm_entity_id': knowledge_id - } - ) - collection_id = collection['id'] - await espocrm.update_entity('CAIKnowledge', knowledge_id, { - 'datenbankId': collection_id - }) - - # Sync documents (both for ACTIVE status and after NEW β†’ ACTIVE transition) - await self._sync_knowledge_documents(knowledge_id, collection_id, ctx) - - elif activation_status not in (AIKnowledgeActivationStatus.DEACTIVATED.value, AIKnowledgeActivationStatus.PAUSED.value): - ctx.logger.error(f"❌ Unknown aktivierungsstatus: {activation_status}") - raise ValueError(f"Invalid aktivierungsstatus: {activation_status}") - - finally: - await xai.close() - - async def _sync_knowledge_documents( - self, - knowledge_id: str, - collection_id: str, - ctx - ) -> None: - """ - Sync all documents of a knowledge base to XAI collection. - - Uses efficient JunctionData endpoint to get all documents with junction data - and blake3 hashes in a single API call. Hash comparison is always performed. - - Args: - knowledge_id: CAIKnowledge entity ID - collection_id: XAI Collection ID - ctx: Motia context - """ - from services.espocrm import EspoCRMAPI - from services.xai_service import XAIService - - espocrm = EspoCRMAPI(ctx) - xai = XAIService(ctx) - - # ═══════════════════════════════════════════════════════════════ - # STEP 1: Load all documents with junction data (single API call) - # ═══════════════════════════════════════════════════════════════ - ctx.logger.info(f"πŸ“₯ Loading documents with junction data for knowledge {knowledge_id}") - - documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id) - - ctx.logger.info(f"πŸ“Š Found {len(documents)} document(s)") - - if not documents: - ctx.logger.info("βœ… No documents to sync") - return - - # ═══════════════════════════════════════════════════════════════ - # STEP 2: Sync each document based on status/hash - # ═══════════════════════════════════════════════════════════════ - successful = 0 - failed = 0 - skipped = 0 - # Track 
aiDocumentIds for orphan detection (collected during sync) - synced_file_ids: set = set() - for doc in documents: - doc_id = doc['documentId'] - doc_name = doc.get('documentName', 'Unknown') - junction_status = doc.get('syncstatus', 'new') - ai_document_id = doc.get('aiDocumentId') - blake3_hash = doc.get('blake3hash') - - ctx.logger.info(f"\nπŸ“„ {doc_name} (ID: {doc_id})") - ctx.logger.info(f" Status: {junction_status}") - ctx.logger.info(f" aiDocumentId: {ai_document_id or 'N/A'}") - ctx.logger.info(f" blake3hash: {blake3_hash[:16] if blake3_hash else 'N/A'}...") - - try: - # Decide if sync needed - needs_sync = False - reason = "" - - if junction_status in ['new', 'unclean', 'failed']: - needs_sync = True - reason = f"status={junction_status}" - elif junction_status == 'synced': - # Synced status should have both blake3_hash and ai_document_id - if not blake3_hash: - needs_sync = True - reason = "inconsistency: synced but no blake3 hash" - ctx.logger.warn(f" ⚠️ Synced document missing blake3 hash!") - elif not ai_document_id: - needs_sync = True - reason = "inconsistency: synced but no aiDocumentId" - ctx.logger.warn(f" ⚠️ Synced document missing aiDocumentId!") - else: - # Verify Blake3 hash with XAI (always, since hash from JunctionData API is free) - try: - xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id) - if xai_doc_info: - xai_blake3 = xai_doc_info.get('blake3_hash') - - if xai_blake3 != blake3_hash: - needs_sync = True - reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... 
vs EspoCRM: {blake3_hash[:16]}...)" - ctx.logger.info(f" πŸ”„ Blake3 mismatch detected!") - else: - ctx.logger.info(f" βœ… Blake3 hash matches") - else: - needs_sync = True - reason = "file not found in XAI collection" - ctx.logger.warn(f" ⚠️ Document marked synced but not in XAI!") - except Exception as e: - needs_sync = True - reason = f"verification failed: {e}" - ctx.logger.warn(f" ⚠️ Failed to verify Blake3, will re-sync: {e}") - - if not needs_sync: - ctx.logger.info(f" ⏭️ Skipped (no sync needed)") - # Document is already synced, track its aiDocumentId - if ai_document_id: - synced_file_ids.add(ai_document_id) - skipped += 1 - continue - - ctx.logger.info(f" πŸ”„ Syncing: {reason}") - - # Get complete document entity with attachment info - doc_entity = await espocrm.get_entity('CDokumente', doc_id) - attachment_id = doc_entity.get('dokumentId') - - if not attachment_id: - ctx.logger.error(f" ❌ No attachment ID found for document {doc_id}") - failed += 1 - continue - - # Get attachment details for MIME type and original filename - try: - attachment = await espocrm.get_entity('Attachment', attachment_id) - mime_type = attachment.get('type', 'application/octet-stream') - file_size = attachment.get('size', 0) - original_filename = attachment.get('name', doc_name) # Original filename with extension - # URL-decode filename (fixes special chars like Β§, Γ€, ΓΆ, ΓΌ, etc.) 
- original_filename = unquote(original_filename) - except Exception as e: - ctx.logger.warn(f" ⚠️ Failed to get attachment details: {e}, using defaults") - mime_type = 'application/octet-stream' - file_size = 0 - original_filename = unquote(doc_name) # Also decode fallback name - - ctx.logger.info(f" πŸ“Ž Attachment: {attachment_id} ({mime_type}, {file_size} bytes)") - ctx.logger.info(f" πŸ“„ Original filename: {original_filename}") - - # Download document - file_content = await espocrm.download_attachment(attachment_id) - ctx.logger.info(f" πŸ“₯ Downloaded {len(file_content)} bytes") - - # Upload to XAI with original filename (includes extension) - filename = original_filename - - xai_file_id = await xai.upload_file(file_content, filename, mime_type) - ctx.logger.info(f" πŸ“€ Uploaded to XAI: {xai_file_id}") - - # Add to collection - await xai.add_to_collection(collection_id, xai_file_id) - ctx.logger.info(f" βœ… Added to collection {collection_id}") - - # Update junction - await espocrm.update_knowledge_document_junction( - knowledge_id, - doc_id, - { - 'aiDocumentId': xai_file_id, - 'syncstatus': 'synced' - }, - update_last_sync=True - ) - ctx.logger.info(f" βœ… Junction updated") - - # Track the new aiDocumentId for orphan detection - synced_file_ids.add(xai_file_id) - - successful += 1 - - except Exception as e: - failed += 1 - ctx.logger.error(f" ❌ Sync failed: {e}") - - # Mark as failed in junction - try: - await espocrm.update_knowledge_document_junction( - knowledge_id, - doc_id, - {'syncstatus': 'failed'}, - update_last_sync=False - ) - except Exception as update_err: - ctx.logger.error(f" ❌ Failed to update junction status: {update_err}") - - # ═══════════════════════════════════════════════════════════════ - # STEP 3: Remove orphaned documents from XAI collection - # ═══════════════════════════════════════════════════════════════ - try: - ctx.logger.info(f"\n🧹 Checking for orphaned documents in XAI collection...") - - # Get all files in XAI collection 
(normalized structure) - xai_documents = await xai.list_collection_documents(collection_id) - xai_file_ids = {doc.get('file_id') for doc in xai_documents if doc.get('file_id')} - - # Use synced_file_ids (collected during this sync) for orphan detection - # This includes both pre-existing synced docs and newly uploaded ones - ctx.logger.info(f" XAI has {len(xai_file_ids)} files, we have {len(synced_file_ids)} synced") - - # Find orphans (in XAI but not in our current sync) - orphans = xai_file_ids - synced_file_ids - - if orphans: - ctx.logger.info(f" Found {len(orphans)} orphaned file(s)") - for orphan_id in orphans: - try: - await xai.remove_from_collection(collection_id, orphan_id) - ctx.logger.info(f" πŸ—‘οΈ Removed {orphan_id}") - except Exception as e: - ctx.logger.warn(f" ⚠️ Failed to remove {orphan_id}: {e}") - else: - ctx.logger.info(f" βœ… No orphans found") - - except Exception as e: - ctx.logger.warn(f"⚠️ Failed to clean up orphans: {e}") - - # ═══════════════════════════════════════════════════════════════ - # STEP 4: Summary - # ═══════════════════════════════════════════════════════════════ - ctx.logger.info("") - ctx.logger.info("=" * 80) - ctx.logger.info(f"πŸ“Š Sync Statistics:") - ctx.logger.info(f" βœ… Synced: {successful}") - ctx.logger.info(f" ⏭️ Skipped: {skipped}") - ctx.logger.info(f" ❌ Failed: {failed}") - ctx.logger.info(f" Mode: Blake3 hash verification enabled") - ctx.logger.info("=" * 80) - - def _calculate_metadata_hash(self, document: Dict) -> str: - """ - Calculate hash of sync-relevant metadata. - - Args: - document: CDokumente entity - - Returns: - MD5 hash (32 chars) - """ - metadata = { - 'name': document.get('name', ''), - 'description': document.get('description', ''), - } - - metadata_str = json.dumps(metadata, sort_keys=True) - return hashlib.md5(metadata_str.encode()).hexdigest() - - def _build_xai_metadata(self, document: Dict) -> Dict[str, str]: - """ - Build XAI metadata from CDokumente entity. 
- - Args: - document: CDokumente entity - - Returns: - Metadata dict for XAI - """ - return { - 'document_name': document.get('name', ''), - 'description': document.get('description', ''), - 'created_at': document.get('createdAt', ''), - 'modified_at': document.get('modifiedAt', ''), - 'espocrm_id': document.get('id', '') - } - - async def _get_document_download_info( - self, - document: Dict, - ctx - ) -> Optional[Dict[str, Any]]: - """ - Get download info for CDokumente entity. - - Args: - document: CDokumente entity - ctx: Motia context - - Returns: - Dict with attachment_id, filename, mime_type - """ - from services.espocrm import EspoCRMAPI - - espocrm = EspoCRMAPI(ctx) - - # Check for dokumentId (CDokumente custom field) - attachment_id = None - filename = None - - if document.get('dokumentId'): - attachment_id = document.get('dokumentId') - filename = document.get('dokumentName') - elif document.get('fileId'): - attachment_id = document.get('fileId') - filename = document.get('fileName') - - if not attachment_id: - ctx.logger.error(f"❌ No attachment ID for document {document['id']}") - return None - - # Get attachment details - try: - attachment = await espocrm.get_entity('Attachment', attachment_id) - return { - 'attachment_id': attachment_id, - 'filename': filename or attachment.get('name', 'unknown'), - 'mime_type': attachment.get('type', 'application/octet-stream') - } - except Exception as e: - ctx.logger.error(f"❌ Failed to get attachment {attachment_id}: {e}") - return None diff --git a/services/xai_upload_utils.py b/services/xai_upload_utils.py new file mode 100644 index 0000000..b336d5e --- /dev/null +++ b/services/xai_upload_utils.py @@ -0,0 +1,201 @@ +""" +xAI Upload Utilities + +Shared logic for uploading documents from EspoCRM to xAI Collections. +Used by all sync flows (Advoware + direct xAI sync). 
from typing import Optional, Dict, Any
from datetime import datetime


class XAIUploadUtils:
    """
    Utility class for uploading EspoCRM documents to xAI collections.

    Handles:
    - Blake3 hash-based change detection
    - Upload to xAI with URL-decoded filename and guessed MIME type
    - Collection management (create/verify)
    - EspoCRM metadata update after sync

    The only state kept on the instance is the logger. Every method takes
    the xAI and EspoCRM service instances explicitly so it stays reusable
    across different sync contexts.
    """

    # Timestamp format written to the EspoCRM 'aiLastSync' field.
    _ESPO_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, ctx):
        """
        Create the utility with a service logger bound to ``ctx``.

        Args:
            ctx: Context object handed to ``get_service_logger``.
        """
        from services.logging_utils import get_service_logger
        self._log = get_service_logger(__name__, ctx)

    @classmethod
    def _espo_now(cls) -> str:
        """Return the current local time formatted for EspoCRM fields."""
        return datetime.now().strftime(cls._ESPO_DATETIME_FORMAT)

    async def ensure_collection(
        self,
        akte: Dict[str, Any],
        xai,
        espocrm,
    ) -> Optional[str]:
        """
        Ensure an xAI collection exists for this Akte.

        If the Akte already carries an ``aiCollectionId`` it is verified via
        ``xai.get_collection``. When it is missing, not found, or cannot be
        verified, a new collection is created and its ID is written back to
        the ``CAkten`` entity together with ``aiSyncStatus='unclean'``.

        Args:
            akte: CAkten entity dict; must contain ``id``.
            xai: Service exposing ``get_collection`` and ``create_collection``.
            espocrm: Service exposing ``update_entity``.

        Returns:
            The collection ID, or None if creating the collection or saving
            its ID to EspoCRM failed.

        Raises:
            KeyError: If ``akte`` has no ``id`` key.
        """
        akte_id = akte['id']
        akte_name = akte.get('name', f"Akte {akte.get('aktennummer', akte_id)}")
        collection_id = akte.get('aiCollectionId')

        if collection_id:
            # Verify it still exists in xAI
            try:
                col = await xai.get_collection(collection_id)
                if col:
                    self._log.debug(f"Collection {collection_id} verified for '{akte_name}'")
                    return collection_id
                self._log.warn(f"Collection {collection_id} not found in xAI, recreating...")
            except Exception as e:
                # NOTE(review): a transient lookup error also leads to a new
                # collection being created - confirm this is intended.
                self._log.warn(f"Could not verify collection {collection_id}: {e}, recreating...")

        # Create new collection
        try:
            self._log.info(f"Creating xAI collection for '{akte_name}'...")
            col = await xai.create_collection(
                name=akte_name,
                metadata={
                    'espocrm_entity_type': 'CAkten',
                    'espocrm_entity_id': akte_id,
                    'aktennummer': str(akte.get('aktennummer', '')),
                }
            )
            # Accept either key: the previous knowledge-sync implementation
            # documented 'collection_id' as the key actually returned, with
            # 'id' as fallback - TODO confirm against the xAI API.
            collection_id = col.get('collection_id') or col.get('id')
            if not collection_id:
                raise ValueError(f"create_collection response has no collection id: {col!r}")
            self._log.info(f"✅ Collection created: {collection_id}")
        except Exception as e:
            self._log.error(f"❌ Failed to create xAI collection: {e}")
            return None

        # Save back to EspoCRM (reported separately so the log does not blame
        # xAI for a CRM write failure).
        try:
            await espocrm.update_entity('CAkten', akte_id, {
                'aiCollectionId': collection_id,
                'aiSyncStatus': 'unclean',  # Trigger full doc sync
            })
        except Exception as e:
            self._log.error(
                f"❌ Collection {collection_id} created but could not be saved to EspoCRM: {e}"
            )
            return None

        return collection_id

    async def sync_document_to_xai(
        self,
        doc: Dict[str, Any],
        collection_id: str,
        xai,
        espocrm,
    ) -> bool:
        """
        Sync a single CDokumente entity to an xAI collection.

        Decision logic (Blake3-based):
        - aiSyncStatus == 'synced' AND aiSyncHash == blake3hash → skip (no change)
        - No attachment (dokumentId missing) → mark 'unsupported'
        - Anything else (e.g. 'new', 'unclean', 'failed', or hash changed)
          → download, upload, add to collection, update CDokumente

        Args:
            doc: CDokumente entity dict; must contain ``id``.
            collection_id: Target xAI collection ID.
            xai: Service exposing ``upload_file``, ``add_to_collection`` and
                ``remove_from_collection``.
            espocrm: Service exposing ``download_attachment`` and
                ``update_entity``.

        Returns:
            True if synced, skipped or marked unsupported; False if the
            download/upload/update sequence failed.

        Raises:
            KeyError: If ``doc`` has no ``id`` key.
            Exception: Whatever ``espocrm.update_entity`` raises while marking
                a document without attachment as 'unsupported' (that call is
                not guarded).
        """
        doc_id = doc['id']
        doc_name = doc.get('name', doc_id)
        ai_status = doc.get('aiSyncStatus', 'new')
        ai_sync_hash = doc.get('aiSyncHash')
        blake3_hash = doc.get('blake3hash')
        ai_file_id = doc.get('aiFileId')

        self._log.info(f"   📄 {doc_name}")
        self._log.info(
            f"      aiSyncStatus={ai_status}, "
            f"aiSyncHash={ai_sync_hash[:12] if ai_sync_hash else 'N/A'}..., "
            f"blake3={blake3_hash[:12] if blake3_hash else 'N/A'}..."
        )

        # Skip if already synced and hash matches
        if ai_status == 'synced' and ai_sync_hash and blake3_hash and ai_sync_hash == blake3_hash:
            self._log.info("      ⏭️ Skipped (hash match, no change)")
            return True

        # Get attachment info
        attachment_id = doc.get('dokumentId')
        if not attachment_id:
            self._log.warn("      ⚠️ No attachment (dokumentId missing) - marking unsupported")
            await espocrm.update_entity('CDokumente', doc_id, {
                'aiSyncStatus': 'unsupported',
                'aiLastSync': self._espo_now(),
            })
            return True  # Not an error, just unsupported

        try:
            # Download from EspoCRM
            self._log.info(f"      📥 Downloading attachment {attachment_id}...")
            file_content = await espocrm.download_attachment(attachment_id)
            self._log.info(f"      Downloaded {len(file_content)} bytes")

            # Determine filename + MIME type. Chained `or` so that a key that
            # is present but None/empty still falls through to the default.
            filename = doc.get('dokumentName') or doc.get('name') or 'document.bin'
            from urllib.parse import unquote
            filename = unquote(filename)

            import mimetypes
            mime_type, _ = mimetypes.guess_type(filename)
            if not mime_type:
                mime_type = 'application/octet-stream'

            # Remove old file from collection if updating
            if ai_file_id and ai_status != 'new':
                try:
                    await xai.remove_from_collection(collection_id, ai_file_id)
                    self._log.info(f"      🗑️ Removed old xAI file {ai_file_id}")
                except Exception as remove_err:
                    # Non-fatal - may already be gone; logged instead of
                    # silently swallowed so real API problems stay visible.
                    self._log.debug(
                        f"      Could not remove old xAI file {ai_file_id}: {remove_err}"
                    )

            # Upload to xAI
            self._log.info(f"      📤 Uploading '{filename}' ({mime_type})...")
            new_xai_file_id = await xai.upload_file(file_content, filename, mime_type)
            self._log.info(f"      Uploaded: xai_file_id={new_xai_file_id}")

            # Add to collection
            await xai.add_to_collection(collection_id, new_xai_file_id)
            self._log.info(f"      ✅ Added to collection {collection_id}")

            # Update CDokumente with sync result
            await espocrm.update_entity('CDokumente', doc_id, {
                'aiFileId': new_xai_file_id,
                'aiCollectionId': collection_id,
                'aiSyncHash': blake3_hash or doc.get('syncedHash'),
                'aiSyncStatus': 'synced',
                'aiLastSync': self._espo_now(),
            })
            self._log.info("      ✅ EspoCRM updated")
            return True

        except Exception as e:
            self._log.error(f"      ❌ Failed: {e}")
            # Best effort: a failing status update must not mask the original
            # error or break the documented "False on error" contract.
            try:
                await espocrm.update_entity('CDokumente', doc_id, {
                    'aiSyncStatus': 'failed',
                    'aiLastSync': self._espo_now(),
                })
            except Exception as update_err:
                self._log.error(f"      ❌ Failed to update sync status: {update_err}")
            return False

    async def remove_document_from_xai(
        self,
        doc: Dict[str, Any],
        collection_id: str,
        xai,
        espocrm,
    ) -> None:
        """
        Remove a CDokumente from its xAI collection (called on DELETE).

        Does nothing when the document has no ``aiFileId``. Failures are
        logged as warnings and never raised; the EspoCRM sync fields are only
        reset after the removal from xAI succeeded.

        Args:
            doc: CDokumente entity dict; must contain ``id``.
            collection_id: xAI collection the file belongs to.
            xai: Service exposing ``remove_from_collection``.
            espocrm: Service exposing ``update_entity``.

        Raises:
            KeyError: If ``doc`` has no ``id`` key.
        """
        doc_id = doc['id']
        ai_file_id = doc.get('aiFileId')
        if not ai_file_id:
            return

        try:
            await xai.remove_from_collection(collection_id, ai_file_id)
            self._log.info(f"   🗑️ Removed {doc.get('name')} from xAI collection")
        except Exception as e:
            self._log.warn(f"   ⚠️ Could not remove from xAI: {e}")
            return

        try:
            await espocrm.update_entity('CDokumente', doc_id, {
                'aiFileId': None,
                'aiSyncStatus': 'new',
                'aiLastSync': self._espo_now(),
            })
        except Exception as e:
            self._log.warn(f"   ⚠️ Removed from xAI but could not reset EspoCRM sync fields: {e}")
Redis: SREM processing (success) or SMOVE to pending (error) - 9. Release GLOBAL lock (always in finally) - """ - aktennummer = event_data.get('aktennummer') - akte_id = event_data.get('akte_id') - status = event_data.get('status', 'Unknown') - - ctx.logger.info("=" * 80) - ctx.logger.info(f"πŸ”„ DOCUMENT SYNC STARTED") - ctx.logger.info(f"=" * 80) - ctx.logger.info(f"πŸ“‹ Akte Details:") - ctx.logger.info(f" β”œβ”€ Aktennummer: {aktennummer}") - ctx.logger.info(f" β”œβ”€ EspoCRM ID: {akte_id}") - ctx.logger.info(f" β”œβ”€ Status: {status}") - ctx.logger.info(f" └─ Triggered: Via cron poller") - ctx.logger.info(f"") - ctx.logger.info(f"πŸš€ Parallelization: This Akte syncs independently") - ctx.logger.info(f" Other Akten can sync at the same time!") - ctx.logger.info("") - - from services.redis_client import get_redis_client - from services.espocrm import EspoCRMAPI - from services.advoware_watcher_service import AdvowareWatcherService - from services.advoware_history_service import AdvowareHistoryService - from services.advoware_service import AdvowareService - from services.advoware_document_sync_utils import AdvowareDocumentSyncUtils - from services.blake3_utils import compute_blake3 - - redis_client = get_redis_client(strict=False) - lock_acquired = False - lock_key = f"advoware_document_sync:akte:{aktennummer}" # Per-Akte lock - - if not redis_client: - ctx.logger.error("❌ Redis unavailable, cannot acquire lock") - return - - try: - # 1. 
PER-AKTE LOCK (allows parallel syncs of different Akten) - ctx.logger.info(f"πŸ” Attempting to acquire lock for Akte {aktennummer}...") - - lock_acquired = redis_client.set(lock_key, f"sync_{datetime.now().isoformat()}", nx=True, ex=1800) - - if not lock_acquired: - current_holder = redis_client.get(lock_key) - ctx.logger.warn(f"") - ctx.logger.warn(f"⏸️ Lock busy for Akte {aktennummer}") - ctx.logger.warn(f" Lock Key: {lock_key}") - ctx.logger.warn(f" Current Holder: {current_holder}") - ctx.logger.warn(f" Action: Requeueing (Motia will retry)") - raise RuntimeError(f"Lock busy for Akte {aktennummer}, retry later") - - ctx.logger.info(f"βœ… Lock acquired for Akte {aktennummer}") - ctx.logger.info(f" Lock Key: {lock_key}") - ctx.logger.info(f" TTL: 30 minutes") - ctx.logger.info(f" Scope: Only this Akte is locked (other Akten can sync in parallel)") - - # 2. Initialize services - espocrm = EspoCRMAPI(ctx) - watcher = AdvowareWatcherService(ctx) - history_service = AdvowareHistoryService(ctx) - advoware_service = AdvowareService(ctx) - sync_utils = AdvowareDocumentSyncUtils(ctx) - - # 3. 
Fetch data - ctx.logger.info("πŸ“₯ Fetching data...") - - # Get Akte from EspoCRM - akte = await espocrm.get_entity('CAdvowareAkten', akte_id) - - if not akte: - ctx.logger.error(f"❌ Akte {akte_id} not found in EspoCRM") - redis_client.srem("advoware:processing_aktennummern", aktennummer) - return - - # Die Aktennummer IST die Advoware-ID - advoware_id = aktennummer - ctx.logger.info(f"πŸ“‹ Using Aktennummer as Advoware-ID: {advoware_id}") - - # Get linked documents from EspoCRM - espo_docs_result = await espocrm.list_related( - 'CAdvowareAkten', - akte_id, - 'dokumentes' - ) - espo_docs = espo_docs_result.get('list', []) - - # Get Windows file list - try: - windows_files = await watcher.get_akte_files(aktennummer) - except Exception as e: - ctx.logger.error(f"❌ Failed to fetch Windows files: {e}") - windows_files = [] - - # Get Advoware History - try: - advo_history = await history_service.get_akte_history(advoware_id) - except Exception as e: - ctx.logger.error(f"❌ Failed to fetch Advoware History: {e}") - advo_history = [] - - ctx.logger.info(f"πŸ“Š Data fetched:") - ctx.logger.info(f" - {len(espo_docs)} EspoCRM docs") - ctx.logger.info(f" - {len(windows_files)} Windows files") - ctx.logger.info(f" - {len(advo_history)} History entries") - - # 4. Cleanup file list (filter by History) - windows_files = sync_utils.cleanup_file_list(windows_files, advo_history) - ctx.logger.info(f"🧹 After cleanup: {len(windows_files)} Windows files with History") - - # 5. 
Build file mapping for 3-way merge based on HNR (stable identifier) - # hnr (History Number) is the stable identifier in Advoware - files can change name/path but hnr stays same - - # Index EspoCRM docs by hnr (stable identifier) - espo_docs_by_hnr = {} - espo_docs_by_path = {} # Fallback for docs without hnr - for doc in espo_docs: - hnr = doc.get('hnr') - if hnr: - espo_docs_by_hnr[hnr] = doc - dateipfad = doc.get('dateipfad', '') - if dateipfad: - espo_docs_by_path[dateipfad.lower()] = doc - - # Index History by hnr - history_by_hnr = {} - history_by_path = {} # For path-based lookup - for entry in advo_history: - hnr = entry.get('hNr') - datei = entry.get('datei', '') - if hnr: - history_by_hnr[hnr] = entry - if datei: - history_by_path[datei.lower()] = entry - - # Index Windows files by path (they don't have hnr directly) - windows_files_by_path = {f.get('path', '').lower(): f for f in windows_files} - - # Get all unique hnrs to process - all_hnrs = set(espo_docs_by_hnr.keys()) | set(history_by_hnr.keys()) - - ctx.logger.info(f"πŸ“‹ Total unique documents (by hnr): {len(all_hnrs)}") - ctx.logger.info(f" EspoCRM docs with hnr: {len(espo_docs_by_hnr)}") - ctx.logger.info(f" History entries: {len(history_by_hnr)}") - ctx.logger.info(f" Windows files: {len(windows_files_by_path)}") - - # 6. 
3-Way merge per hnr (stable identifier) - sync_results = { - 'created': 0, - 'uploaded': 0, - 'updated': 0, - 'deleted': 0, - 'skipped': 0, - 'errors': 0 - } - - for hnr in all_hnrs: - # Get data for this hnr from all sources - espo_doc = espo_docs_by_hnr.get(hnr) - history_entry = history_by_hnr.get(hnr) - - # Get Windows file through history path - windows_file = None - file_path = None - if history_entry: - file_path = history_entry.get('datei', '').lower() - windows_file = windows_files_by_path.get(file_path) - - # Extract filename for display - if history_entry and history_entry.get('datei'): - filename = history_entry.get('datei').split('\\')[-1] - elif espo_doc: - filename = espo_doc.get('name', f'hnr_{hnr}') - else: - filename = f'hnr_{hnr}' - - ctx.logger.info(f"\n{'='*80}") - ctx.logger.info(f"Processing: {filename} (hnr: {hnr})") - ctx.logger.info(f"{'='*80}") - - try: - # Perform 3-way merge based on hnr - action = sync_utils.merge_three_way(espo_doc, windows_file, history_entry) - - ctx.logger.info(f"πŸ“Š Merge decision:") - ctx.logger.info(f" Action: {action.action}") - ctx.logger.info(f" Reason: {action.reason}") - ctx.logger.info(f" Source: {action.source}") - - # Execute action - if action.action == 'SKIP': - ctx.logger.info(f"⏭️ Skipping {filename}") - sync_results['skipped'] += 1 - - elif action.action == 'CREATE': - # Download from Windows and create in EspoCRM - if not windows_file: - ctx.logger.error(f"❌ Cannot CREATE - no Windows file for hnr {hnr}") - sync_results['errors'] += 1 - continue - - ctx.logger.info(f"πŸ“₯ Downloading {filename} from Windows...") - content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename)) - - # Compute Blake3 hash - blake3_hash = compute_blake3(content) - - # Determine MIME type - import mimetypes - mime_type, _ = mimetypes.guess_type(filename) - if not mime_type: - mime_type = 'application/octet-stream' - - # Step 1: Upload attachment for File field - ctx.logger.info(f"πŸ“€ 
Uploading attachment (Step 1/2)...") - try: - attachment = await espocrm.upload_attachment_for_file_field( - file_content=content, - filename=filename, - related_type='CDokumente', - field='dokument', - mime_type=mime_type - ) - ctx.logger.info(f"βœ… Attachment uploaded: {attachment.get('id')}") - except Exception as e: - ctx.logger.error(f"❌ Failed to upload attachment: {e}") - raise - - # Step 2: Create document entity with attachment ID and Advoware fields - ctx.logger.info(f"πŸ’Ύ Creating document entity (Step 2/2)...") - - # Extract full Windows path from watcher data - full_path = windows_file.get('path', '') - - # Current timestamp for sync tracking (EspoCRM format) - now_iso = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - - new_doc = await espocrm.create_entity('CDokumente', { - 'name': filename, - 'dokumentId': attachment.get('id'), # Link to attachment - # Advoware History fields - 'hnr': history_entry.get('hNr') if history_entry else None, - 'advowareArt': history_entry.get('art', 'Schreiben') if history_entry else 'Schreiben', - 'advowareBemerkung': history_entry.get('text', '') if history_entry else '', - # Windows file sync fields - 'dateipfad': full_path, - 'blake3hash': blake3_hash, - 'syncedHash': blake3_hash, - 'usn': windows_file.get('usn', 0), - 'syncStatus': 'synced', - 'lastSyncTimestamp': now_iso - }) - - doc_id = new_doc.get('id') - ctx.logger.info(f"βœ… Created document with attachment: {doc_id}") - - # Link to Akte - await espocrm.link_entities( - 'CAdvowareAkten', - akte_id, - 'dokumentes', - doc_id - ) - - sync_results['created'] += 1 - - # Trigger preview generation - try: - await ctx.emit('document.generate_preview', { - 'entity_id': doc_id, - 'entity_type': 'CDokumente' - }) - ctx.logger.info(f"βœ… Preview generation triggered for {doc_id}") - except Exception as e: - ctx.logger.warn(f"⚠️ Failed to trigger preview generation: {e}") - - elif action.action == 'UPDATE_ESPO': - # Download from Windows and update EspoCRM - if not 
windows_file: - ctx.logger.error(f"❌ Cannot UPDATE_ESPO - no Windows file for hnr {hnr}") - sync_results['errors'] += 1 - continue - - ctx.logger.info(f"πŸ“₯ Downloading {filename} from Windows...") - content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename)) - - # Compute Blake3 hash - blake3_hash = compute_blake3(content) - - # Determine MIME type - import mimetypes - mime_type, _ = mimetypes.guess_type(filename) - if not mime_type: - mime_type = 'application/octet-stream' - - # Extract full Windows path - full_path = windows_file.get('path', '') - - # Update document in EspoCRM with correct field names - ctx.logger.info(f"πŸ’Ύ Updating document in EspoCRM...") - now_iso = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - - update_data = { - 'name': filename, # Update name if changed - 'blake3hash': blake3_hash, - 'syncedHash': blake3_hash, - 'usn': windows_file.get('usn', 0), - 'dateipfad': full_path, # Update path if changed - 'syncStatus': 'synced', - 'lastSyncTimestamp': now_iso - } - - # Also update History fields if available - if history_entry: - update_data['hnr'] = history_entry.get('hNr') - update_data['advowareArt'] = history_entry.get('art', 'Schreiben') - update_data['advowareBemerkung'] = history_entry.get('text', '') - - await espocrm.update_entity('CDokumente', espo_doc.get('id'), update_data) - - ctx.logger.info(f"βœ… Updated document: {espo_doc.get('id')}") - sync_results['updated'] += 1 - - # Trigger preview generation - try: - await ctx.emit('document.generate_preview', { - 'entity_id': espo_doc.get('id'), - 'entity_type': 'CDokumente' - }) - ctx.logger.info(f"βœ… Preview generation triggered for {espo_doc.get('id')}") - except Exception as e: - ctx.logger.warn(f"⚠️ Failed to trigger preview generation: {e}") - - elif action.action == 'UPLOAD_WINDOWS': - # Upload to Windows from EspoCRM - ctx.logger.info(f"πŸ“€ Uploading {filename} to Windows...") - - # Get file content from EspoCRM (would need attachment 
download) - # For now, log that this needs implementation - ctx.logger.warn(f"⚠️ Upload to Windows not yet implemented for {filename}") - sync_results['skipped'] += 1 - - elif action.action == 'DELETE': - # Delete from EspoCRM (file deleted in Windows/Advoware) - ctx.logger.info(f"πŸ—‘οΈ Deleting {filename} from EspoCRM...") - - if espo_doc: - doc_id = espo_doc.get('id') - await espocrm.delete_entity('CDokumente', doc_id) - ctx.logger.info(f"βœ… Deleted document: {doc_id}") - sync_results['deleted'] += 1 - else: - ctx.logger.warn(f"⚠️ No EspoCRM document found for deletion") - sync_results['skipped'] += 1 - - except Exception as e: - ctx.logger.error(f"❌ Error processing {filename}: {e}") - sync_results['errors'] += 1 - - # 7. Sync metadata (always update from History) - ctx.logger.info(f"\n{'='*80}") - ctx.logger.info("πŸ“‹ Syncing metadata from History...") - ctx.logger.info(f"{'='*80}") - - metadata_updates = 0 - for file_path in all_paths: - # Extract filename for EspoCRM lookup - filename = file_path.split('\\')[-1] - - espo_doc = espo_docs_by_name.get(filename.lower()) - history_entry = history_by_path.get(file_path) - - if espo_doc and history_entry: - needs_update, updates = sync_utils.should_sync_metadata(espo_doc, history_entry) - - if needs_update: - try: - await espocrm.update_entity('CDokumente', espo_doc.get('id'), updates) - ctx.logger.info(f"βœ… Updated metadata for {filename}: {list(updates.keys())}") - metadata_updates += 1 - except Exception as e: - ctx.logger.error(f"❌ Failed to update metadata for {filename}: {e}") - - ctx.logger.info(f"πŸ“Š Metadata sync: {metadata_updates} updates") - - # 8. 
Check Akte ablage status - ctx.logger.info(f"\n{'='*80}") - ctx.logger.info("πŸ—‚οΈ Checking Akte ablage status...") - ctx.logger.info(f"{'='*80}") - - akte_details = await advoware_service.get_akte(advoware_id) - - if akte_details and akte_details.get('ablage') == 1: - ctx.logger.info(f"πŸ“ Akte {aktennummer} marked as ablage, deactivating in EspoCRM") - - await espocrm.update_entity('CAdvowareAkten', akte_id, { - 'Aktivierungsstatus': 'Deaktiviert' - }) - - # 9. Update sync status - await espocrm.update_entity('CAdvowareAkten', akte_id, { - 'syncStatus': 'synced', - 'lastSync': datetime.now().isoformat() - }) - - # 10. SUCCESS: Remove from processing SET - redis_client.srem("advoware:processing_aktennummern", aktennummer) - - # Summary - ctx.logger.info(f"\n{'='*80}") - ctx.logger.info(f"βœ… Sync complete for Akte {aktennummer}") - ctx.logger.info(f"{'='*80}") - ctx.logger.info(f"πŸ“Š Results:") - ctx.logger.info(f" - Created: {sync_results['created']}") - ctx.logger.info(f" - Updated: {sync_results['updated']}") - ctx.logger.info(f" - Deleted: {sync_results['deleted']}") - ctx.logger.info(f" - Uploaded: {sync_results['uploaded']}") - ctx.logger.info(f" - Skipped: {sync_results['skipped']}") - ctx.logger.info(f" - Errors: {sync_results['errors']}") - ctx.logger.info(f" - Metadata updates: {metadata_updates}") - ctx.logger.info(f"{'='*80}") - - except Exception as e: - ctx.logger.error(f"❌ Sync failed for {aktennummer}: {e}") - - # Move back to pending Sorted Set for retry - if redis_client: - import time - retry_timestamp = time.time() - redis_client.zadd( - "advoware:pending_aktennummern", - {aktennummer: retry_timestamp} - ) - ctx.logger.info(f"βœ“ Moved {aktennummer} back to pending queue for retry") - - # Update status in EspoCRM - try: - await espocrm.update_entity('CAdvowareAkten', akte_id, { - 'syncStatus': 'failed', - 'lastSyncError': str(e)[:500] # Truncate long errors - }) - except: - pass - - # Re-raise for Motia retry - raise - - finally: - # ALWAYS 
release lock - if lock_acquired and redis_client: - redis_client.delete(lock_key) - ctx.logger.info(f"") - ctx.logger.info(f"πŸ”“ Lock released for Akte {aktennummer}") - ctx.logger.info(f" Lock Key: {lock_key}") - ctx.logger.info(f" Duration: Released after processing") - - ctx.logger.info("=" * 80) diff --git a/src/steps/advoware_docs/document_sync_cron_step.py b/src/steps/advoware_docs/document_sync_cron_step.py deleted file mode 100644 index b9de5d8..0000000 --- a/src/steps/advoware_docs/document_sync_cron_step.py +++ /dev/null @@ -1,238 +0,0 @@ -""" -Advoware Document Sync - Cron Poller - -Polls Redis SET for pending Aktennummern every 10 seconds. -Filters by Akte status and emits sync events. - -Flow: -1. SPOP from advoware:pending_aktennummern -2. SADD to advoware:processing_aktennummern -3. Validate Akte status in EspoCRM -4. Emit event if status valid -5. Remove from processing if invalid -""" - -from typing import Dict, Any -from motia import FlowContext, cron - - -config = { - "name": "Advoware Document Sync - Cron Poller", - "description": "Poll Redis for pending Aktennummern and emit sync events", - "flows": ["advoware-document-sync"], - "triggers": [cron("*/10 * * * * *")], # Every 10 seconds - "enqueues": ["advoware.document.sync"], -} - - -async def handler(input_data: None, ctx: FlowContext) -> None: - """ - Poll Redis and emit sync events. - - Flow: - 1. SPOP from advoware:pending_aktennummern - 2. SADD to advoware:processing_aktennummern - 3. Validate Akte status in EspoCRM - 4. Emit event if status valid - 5. 
Remove from processing if invalid - """ - ctx.logger.info("=" * 80) - ctx.logger.info("πŸ” Polling Redis for pending Aktennummern") - - from services.redis_client import get_redis_client - from services.espocrm import EspoCRMAPI - - redis_client = get_redis_client(strict=False) - if not redis_client: - ctx.logger.error("❌ Redis unavailable - cannot poll") - ctx.logger.info("=" * 80) - return - - espocrm = EspoCRMAPI(ctx) - - try: - import time - - # Debounce-Zeit: 10 Sekunden - debounce_seconds = 10 - cutoff_time = time.time() - debounce_seconds - - # Check queue sizes BEFORE poll (Sorted Set = ZCARD) - pending_count = redis_client.zcard("advoware:pending_aktennummern") - processing_count = redis_client.scard("advoware:processing_aktennummern") - - ctx.logger.info(f"πŸ“Š Queue Status:") - ctx.logger.info(f" β€’ Pending: {pending_count} Aktennummern (Sorted Set)") - ctx.logger.info(f" β€’ Processing: {processing_count} Aktennummern (Set)") - ctx.logger.info(f" β€’ Debounce: {debounce_seconds} seconds") - - # Poll Redis Sorted Set: Hole EintrΓ€ge Γ€lter als 10 Sekunden - # ZRANGEBYSCORE: Return members with score between min and max (timestamp) - old_entries = redis_client.zrangebyscore( - "advoware:pending_aktennummern", - min=0, # Γ„lteste mΓΆglich - max=cutoff_time, # Maximal cutoff_time (vor 10 Sekunden) - start=0, - num=1 # Nur 1 Eintrag pro Iteration - ) - - if not old_entries or len(old_entries) == 0: - # Entweder Queue leer ODER alle EintrΓ€ge sind zu neu (<10 Sekunden) - if pending_count > 0: - ctx.logger.info(f"⏸️ {pending_count} Aktennummern in queue, but all too recent (< {debounce_seconds}s)") - ctx.logger.info(f" Waiting for debounce window to pass...") - else: - ctx.logger.info("βœ“ No pending Aktennummern (queue is empty)") - ctx.logger.info("=" * 80) - return - - # Aktennummer gefunden (β‰₯10 Sekunden alt) - aktennr = old_entries[0] - - # Decode if bytes - if isinstance(aktennr, bytes): - aktennr = aktennr.decode('utf-8') - - # Hole den Timestamp 
des Eintrags - score = redis_client.zscore("advoware:pending_aktennummern", aktennr) - age_seconds = time.time() - score if score else 0 - - # Entferne aus Sorted Set - redis_client.zrem("advoware:pending_aktennummern", aktennr) - - ctx.logger.info(f"") - ctx.logger.info(f"πŸ“‹ Processing Aktennummer: {aktennr}") - ctx.logger.info(f" β”œβ”€ First Event: {age_seconds:.1f} seconds ago") - ctx.logger.info(f" β”œβ”€ Debounced: βœ… (waited {debounce_seconds}s)") - ctx.logger.info(f" └─ Removed from pending queue") - ctx.logger.info(f" β”œβ”€ Source: Redis SET 'advoware:pending_aktennummern'") - ctx.logger.info(f" β”œβ”€ Action: Moved to 'advoware:processing_aktennummern'") - ctx.logger.info(f" └─ Next: Validate Akte status in EspoCRM") - - # Move to processing SET - redis_client.sadd("advoware:processing_aktennummern", aktennr) - ctx.logger.info(f"βœ“ Moved to processing queue") - - # Validate Akte status in EspoCRM - ctx.logger.info(f"") - ctx.logger.info(f"πŸ” Looking up Akte in EspoCRM...") - - try: - # Search for Akte by aktennummer - result = await espocrm.list_entities( - 'CAdvowareAkten', - where=[{ - 'type': 'equals', - 'attribute': 'aktennummer', - 'value': aktennr - }], - max_size=1 - ) - - if not result or not result.get('list') or len(result['list']) == 0: - ctx.logger.warn(f"") - ctx.logger.warn(f"⚠️ REJECTED: Akte {aktennr} not found in EspoCRM") - ctx.logger.warn(f" Reason: No CAdvowareAkten entity with aktennummer={aktennr}") - ctx.logger.warn(f" Action: Removed from processing queue") - ctx.logger.warn(f" Impact: Will not be synced until re-added to Redis") - redis_client.srem("advoware:processing_aktennummern", aktennr) - return - - akte = result['list'][0] - akte_id = akte.get('id', '') - advoware_id = akte.get('advowareId', 'N/A') - aktivierungsstatus = akte.get('aktivierungsstatus', 'N/A') # Feldname kleingeschrieben! 
- - ctx.logger.info(f"βœ“ Akte found in EspoCRM:") - ctx.logger.info(f" β”œβ”€ EspoCRM ID: {akte_id}") - ctx.logger.info(f" β”œβ”€ Advoware ID: {advoware_id}") - ctx.logger.info(f" β”œβ”€ Aktivierungsstatus RAW: '{aktivierungsstatus}' (type: {type(aktivierungsstatus).__name__})") - ctx.logger.info(f" └─ All akte fields: {list(akte.keys())[:10]}...") # Debug: Zeige Feldnamen - - # Valid statuses: Both German and English variants accepted - # German: import, neu, aktiv - # English: import, new, active - valid_statuses = ['import', 'neu', 'aktiv', 'new', 'active'] - aktivierungsstatus_lower = str(aktivierungsstatus).lower().strip() - - ctx.logger.info(f"πŸ” Status validation:") - ctx.logger.info(f" β”œβ”€ Aktivierungsstatus: '{aktivierungsstatus}'") - ctx.logger.info(f" β”œβ”€ Aktivierungsstatus (lowercase): '{aktivierungsstatus_lower}'") - ctx.logger.info(f" β”œβ”€ Valid statuses: {valid_statuses}") - ctx.logger.info(f" └─ Is valid? {aktivierungsstatus_lower in valid_statuses}") - - if aktivierungsstatus_lower not in valid_statuses: - ctx.logger.warn(f"") - ctx.logger.warn(f"⚠️ REJECTED: Akte {aktennr} has invalid aktivierungsstatus") - ctx.logger.warn(f" Current Aktivierungsstatus: '{aktivierungsstatus}' (lowercased: '{aktivierungsstatus_lower}')") - ctx.logger.warn(f" Valid Statuses: {valid_statuses}") - ctx.logger.warn(f" Reason: Only active Akten are synced") - ctx.logger.warn(f" Action: Removed from processing queue") - redis_client.srem("advoware:processing_aktennummern", aktennr) - return - - ctx.logger.info(f"") - ctx.logger.info(f"βœ… ACCEPTED: Akte {aktennr} is valid for sync") - ctx.logger.info(f" Aktivierungsstatus: {aktivierungsstatus} (valid)") - ctx.logger.info(f" Action: Emitting sync event to queue") - - # Emit sync event - ctx.logger.info(f"πŸ“€ Emitting event to topic 'advoware.document.sync'...") - await ctx.enqueue({ - 'topic': 'advoware.document.sync', - 'data': { - 'aktennummer': aktennr, - 'akte_id': akte_id, - 'aktivierungsstatus': 
aktivierungsstatus # FIXED: war 'status' - } - }) - ctx.logger.info(f"βœ… Event emitted successfully") - - ctx.logger.info(f"") - ctx.logger.info(f"πŸš€ Sync event emitted successfully") - ctx.logger.info(f" Topic: advoware.document.sync") - ctx.logger.info(f" Payload: aktennummer={aktennr}, akte_id={akte_id}, aktivierungsstatus={aktivierungsstatus}") - ctx.logger.info(f" Next: Event handler will process sync") - - except Exception as e: - ctx.logger.error(f"") - ctx.logger.error(f"❌ ERROR: Failed to process {aktennr}") - ctx.logger.error(f" Error Type: {type(e).__name__}") - ctx.logger.error(f" Error Message: {str(e)}") - ctx.logger.error(f" Traceback: ", exc_info=True) # Full traceback - ctx.logger.error(f" Action: Moving back to pending queue for retry") - - # Move back to pending Sorted Set for retry - # Set timestamp to NOW so it gets retried immediately (no debounce on retry) - retry_timestamp = time.time() - redis_client.zadd( - "advoware:pending_aktennummern", - {aktennr: retry_timestamp} - ) - ctx.logger.info(f"βœ“ Moved {aktennr} back to pending queue (timestamp: now)") - - raise - - except Exception as e: - ctx.logger.error(f"") - ctx.logger.error(f"❌ CRON POLLER ERROR (non-fatal)") - ctx.logger.error(f" Error Type: {type(e).__name__}") - ctx.logger.error(f" Error Message: {str(e)}") - ctx.logger.error(f" Traceback: ", exc_info=True) # Full traceback - ctx.logger.error(f" Impact: This iteration failed, will retry in next cycle") - # Don't raise - let next cron iteration retry - - finally: - # Final queue status - try: - pending_final = redis_client.zcard("advoware:pending_aktennummern") - processing_final = redis_client.scard("advoware:processing_aktennummern") - - ctx.logger.info(f"") - ctx.logger.info(f"πŸ“Š Final Queue Status:") - ctx.logger.info(f" β€’ Pending: {pending_final} Aktennummern") - ctx.logger.info(f" β€’ Processing: {processing_final} Aktennummern") - except: - pass - - ctx.logger.info("=" * 80) diff --git a/src/steps/akte/__init__.py 
"""
Akte Sync - Cron Poller

Polls Redis Sorted Set for pending Aktennummern every 10 seconds.
Respects a 10-second debounce window so that rapid filesystem events
(e.g. many files being updated at once) are batched into a single sync.

Redis keys (same as advoware-watcher writes to):
  advoware:pending_aktennummern    – Sorted Set { aktennummer → timestamp }
  advoware:processing_aktennummern – Set (tracks active syncs)

Eligibility check (either flag triggers a sync):
  syncSchalter == True  AND  aktivierungsstatus in valid list  → Advoware sync
  aiAktivierungsstatus in valid list                            → xAI sync
"""

from motia import FlowContext, cron


config = {
    "name": "Akte Sync - Cron Poller",
    "description": "Poll Redis for pending Aktennummern and emit akte.sync events (10 s debounce)",
    "flows": ["akte-sync"],
    "triggers": [cron("*/10 * * * * *")],
    "enqueues": ["akte.sync"],
}

PENDING_KEY = "advoware:pending_aktennummern"
PROCESSING_KEY = "advoware:processing_aktennummern"
DEBOUNCE_SECS = 10

VALID_ADVOWARE_STATUSES = {'import', 'neu', 'new', 'aktiv', 'active'}
VALID_AI_STATUSES = {'new', 'neu', 'aktiv', 'active'}


async def handler(input_data: None, ctx: FlowContext) -> None:
    """Claim one debounced Aktennummer from Redis and emit ``akte.sync`` for it.

    At most one entry is processed per invocation: the oldest member of
    ``PENDING_KEY`` whose score (a timestamp) is at least ``DEBOUNCE_SECS``
    old. The entry is moved to ``PROCESSING_KEY``, the matching ``CAkten``
    record is looked up in EspoCRM, and an ``akte.sync`` event is enqueued
    when the record is eligible for the Advoware and/or the xAI sync.

    Args:
        input_data: Unused (cron trigger carries no payload).
        ctx: Motia flow context providing ``logger`` and ``enqueue``.

    Raises:
        Exception: Any error raised while looking up the Akte or enqueueing
            the event is re-raised after the entry was put back into the
            pending set.
    """
    import time
    from services.redis_client import get_redis_client
    from services.espocrm import EspoCRMAPI

    ctx.logger.info("=" * 60)
    ctx.logger.info("⏰ AKTE CRON POLLER")

    redis_client = get_redis_client(strict=False)
    if not redis_client:
        ctx.logger.error("❌ Redis unavailable")
        ctx.logger.info("=" * 60)
        return

    espocrm = EspoCRMAPI(ctx)
    cutoff = time.time() - DEBOUNCE_SECS

    pending_count = redis_client.zcard(PENDING_KEY)
    processing_count = redis_client.scard(PROCESSING_KEY)
    ctx.logger.info(f" Pending : {pending_count}")
    ctx.logger.info(f" Processing : {processing_count}")

    # Pull the oldest entry that has passed the debounce window. Fetching the
    # score in the same call avoids a second round trip and guarantees the
    # age below is computed from the score that made the entry eligible.
    old_entries = redis_client.zrangebyscore(
        PENDING_KEY, min=0, max=cutoff, start=0, num=1, withscores=True
    )

    if not old_entries:
        if pending_count > 0:
            ctx.logger.info(f"⏸️ {pending_count} pending – all too recent (< {DEBOUNCE_SECS}s)")
        else:
            ctx.logger.info("✓ Queue empty")
        ctx.logger.info("=" * 60)
        return

    aktennr, score = old_entries[0]
    if isinstance(aktennr, bytes):
        aktennr = aktennr.decode()
    age = time.time() - score

    # ZREM reports how many members it removed. Zero means another poller
    # claimed this entry between our read and our remove, so we must not
    # enqueue a second sync for the same Akte.
    if not redis_client.zrem(PENDING_KEY, aktennr):
        ctx.logger.info(f"⏭️ {aktennr} already claimed by another poller")
        ctx.logger.info("=" * 60)
        return
    redis_client.sadd(PROCESSING_KEY, aktennr)

    ctx.logger.info(f"📋 Aktennummer: {aktennr} (age={age:.1f}s)")

    try:
        # ── Lookup in EspoCRM ──────────────────────────────────────
        result = await espocrm.list_entities(
            'CAkten',
            where=[{
                'type': 'equals',
                'attribute': 'aktennummer',
                'value': aktennr,
            }],
            max_size=1,
        )

        if not result or not result.get('list'):
            ctx.logger.warn(f"⚠️ No CAkten found for aktennummer={aktennr} – removing")
            redis_client.srem(PROCESSING_KEY, aktennr)
            return

        akte = result['list'][0]
        akte_id = akte['id']
        sync_schalter = akte.get('syncSchalter', False)
        aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower()
        ai_status = str(akte.get('aiAktivierungsstatus') or '').lower()

        advoware_eligible = sync_schalter and aktivierungsstatus in VALID_ADVOWARE_STATUSES
        xai_eligible = ai_status in VALID_AI_STATUSES

        ctx.logger.info(f" Akte ID : {akte_id}")
        ctx.logger.info(f" aktivierungsstatus : {aktivierungsstatus} ({'✅' if advoware_eligible else '⏭️'})")
        ctx.logger.info(f" aiAktivierungsstatus : {ai_status} ({'✅' if xai_eligible else '⏭️'})")

        if not advoware_eligible and not xai_eligible:
            ctx.logger.warn(f"⚠️ Akte {aktennr} not eligible for any sync – removing")
            redis_client.srem(PROCESSING_KEY, aktennr)
            return

        # ── Emit sync event ────────────────────────────────────────
        await ctx.enqueue({
            'topic': 'akte.sync',
            'data': {
                'aktennummer': aktennr,
                'akte_id': akte_id,
            },
        })
        ctx.logger.info(f"📤 akte.sync emitted (akte_id={akte_id})")

    except Exception as e:
        ctx.logger.error(f"❌ Error processing {aktennr}: {e}")
        # Requeue for retry; the fresh timestamp re-applies the debounce.
        redis_client.zadd(PENDING_KEY, {aktennr: time.time()})
        redis_client.srem(PROCESSING_KEY, aktennr)
        raise

    finally:
        # Single closing separator for every path through the try block.
        ctx.logger.info("=" * 60)
"""
Akte Sync - Event Handler

Unified sync for one CAkten entity across all configured backends:
  - Advoware (3-way merge: Windows ↔ EspoCRM ↔ History)
  - xAI      (Blake3 hash-based upload to Collection)

Both run in the same event to keep CDokumente perfectly in sync.

Trigger:  akte.sync  { akte_id, aktennummer }
Lock:     Redis per-Akte (30 min TTL, prevents double-sync of same Akte)
Parallel: Different Akten sync simultaneously.

Enqueues:
  - document.generate_preview  (after CREATE / UPDATE_ESPO)
"""

from typing import Dict, Any
from datetime import datetime
from motia import FlowContext, queue


config = {
    "name": "Akte Sync - Event Handler",
    "description": "Unified sync for one Akte: Advoware 3-way merge + xAI upload",
    "flows": ["akte-sync"],
    "triggers": [queue("akte.sync")],
    "enqueues": ["document.generate_preview"],
}

# Must be the same keys the cron poller reads and writes; otherwise finished
# Akten are never removed from the processing set and failed Akten are never
# picked up again.
PENDING_KEY = "advoware:pending_aktennummern"
PROCESSING_KEY = "advoware:processing_aktennummern"
LOCK_TTL_SECS = 1800

VALID_ADVOWARE_STATUSES = ('import', 'neu', 'new', 'aktiv', 'active')
VALID_AI_STATUSES = ('new', 'neu', 'aktiv', 'active')


# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────

async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
    """Run the Advoware and/or xAI sync for the Akte named in the event.

    Takes a per-Akte Redis lock, loads the ``CAkten`` record, runs whichever
    backends its flags enable, and writes the resulting sync status back to
    EspoCRM.

    Args:
        event_data: Event payload; ``akte_id`` and ``aktennummer`` are read.
        ctx: Motia flow context providing ``logger`` and ``enqueue``.

    Raises:
        RuntimeError: The per-Akte lock is already held (lets the queue retry).
        Exception: Any sync error is re-raised after the Akte was requeued and
            marked as failed.
    """
    import time
    import traceback
    from services.redis_client import get_redis_client
    from services.espocrm import EspoCRMAPI

    akte_id = event_data.get('akte_id')
    aktennummer = event_data.get('aktennummer')

    ctx.logger.info("=" * 80)
    ctx.logger.info("🔄 AKTE SYNC STARTED")
    ctx.logger.info(f" Aktennummer : {aktennummer}")
    ctx.logger.info(f" EspoCRM ID : {akte_id}")
    ctx.logger.info("=" * 80)

    if not akte_id:
        # Without an id every Akte would share the lock key "akte_sync:None".
        ctx.logger.error("❌ akte.sync event without akte_id – dropping")
        return

    redis_client = get_redis_client(strict=False)
    if not redis_client:
        ctx.logger.error("❌ Redis unavailable")
        return

    lock_key = f"akte_sync:{akte_id}"
    lock_acquired = redis_client.set(
        lock_key, datetime.now().isoformat(), nx=True, ex=LOCK_TTL_SECS
    )
    if not lock_acquired:
        ctx.logger.warn(f"⏸️ Lock busy for Akte {aktennummer} – requeueing")
        raise RuntimeError(f"Lock busy for {aktennummer}")

    espocrm = EspoCRMAPI(ctx)

    try:
        # ── Load Akte ──────────────────────────────────────────────────────
        akte = await espocrm.get_entity('CAkten', akte_id)
        if not akte:
            ctx.logger.error(f"❌ Akte {akte_id} not found in EspoCRM")
            redis_client.srem(PROCESSING_KEY, aktennummer)
            return

        sync_schalter = akte.get('syncSchalter', False)
        aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower()
        ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower()

        ctx.logger.info(f"📋 Akte '{akte.get('name')}'")
        ctx.logger.info(f" syncSchalter : {sync_schalter}")
        ctx.logger.info(f" aktivierungsstatus : {aktivierungsstatus}")
        ctx.logger.info(f" aiAktivierungsstatus : {ai_aktivierungsstatus}")

        advoware_enabled = sync_schalter and aktivierungsstatus in VALID_ADVOWARE_STATUSES
        xai_enabled = ai_aktivierungsstatus in VALID_AI_STATUSES

        ctx.logger.info(f" Advoware sync : {'✅ ON' if advoware_enabled else '⏭️ OFF'}")
        ctx.logger.info(f" xAI sync : {'✅ ON' if xai_enabled else '⏭️ OFF'}")

        if not advoware_enabled and not xai_enabled:
            ctx.logger.info("⏭️ Both syncs disabled – nothing to do")
            redis_client.srem(PROCESSING_KEY, aktennummer)
            return

        # ── ADVOWARE SYNC ──────────────────────────────────────────────────
        advoware_results = None
        if advoware_enabled:
            advoware_results = await _run_advoware_sync(akte, aktennummer, akte_id, espocrm, ctx)

        # ── xAI SYNC ───────────────────────────────────────────────────────
        xai_ok = True
        if xai_enabled:
            xai_ok = await _run_xai_sync(akte, akte_id, espocrm, ctx)

        # ── Final Status ───────────────────────────────────────────────────
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        final_update: Dict[str, Any] = {
            'globalLastSync': now,
            'globalSyncStatus': 'synced' if xai_ok else 'failed',
        }
        if advoware_enabled:
            final_update['syncStatus'] = 'synced'
            final_update['lastSync'] = now
        if xai_enabled:
            if xai_ok:
                final_update['aiSyncStatus'] = 'synced'
                final_update['aiLastSync'] = now
            else:
                # Do not overwrite the failure reported by the xAI sync.
                final_update['aiSyncStatus'] = 'failed'

        await espocrm.update_entity('CAkten', akte_id, final_update)
        redis_client.srem(PROCESSING_KEY, aktennummer)

        ctx.logger.info("=" * 80)
        ctx.logger.info("✅ AKTE SYNC COMPLETE")
        if advoware_results:
            ctx.logger.info(f" Advoware: created={advoware_results['created']} updated={advoware_results['updated']} deleted={advoware_results['deleted']} errors={advoware_results['errors']}")
        ctx.logger.info("=" * 80)

    except Exception as e:
        ctx.logger.error(f"❌ Sync failed: {e}")
        ctx.logger.error(traceback.format_exc())

        # Requeue into the sorted set the cron poller actually reads.
        redis_client.zadd(PENDING_KEY, {aktennummer: time.time()})

        try:
            await espocrm.update_entity('CAkten', akte_id, {
                'syncStatus': 'failed',
                'globalSyncStatus': 'failed',
            })
        except Exception as status_err:
            # Best effort only: the original error below is the one to surface.
            ctx.logger.warn(f"⚠️ Could not mark Akte as failed: {status_err}")
        raise

    finally:
        # Reached only after the lock was acquired (a busy lock raises above).
        redis_client.delete(lock_key)
        ctx.logger.info(f"🔓 Lock released for Akte {aktennummer}")


# ─────────────────────────────────────────────────────────────────────────────
# Advoware 3-way merge
# ─────────────────────────────────────────────────────────────────────────────

async def _enqueue_preview(ctx: FlowContext, doc_id: str) -> None:
    """Request preview generation for one CDokumente record (best effort)."""
    try:
        # Same dict form the cron poller uses for ctx.enqueue.
        await ctx.enqueue({
            'topic': 'document.generate_preview',
            'data': {
                'entity_id': doc_id,
                'entity_type': 'CDokumente',
            },
        })
    except Exception as e:
        ctx.logger.warn(f" ⚠️ Preview trigger failed: {e}")


async def _run_advoware_sync(
    akte: Dict[str, Any],
    aktennummer: str,
    akte_id: str,
    espocrm,
    ctx: FlowContext,
) -> Dict[str, int]:
    """Merge EspoCRM documents, Windows files and Advoware history per HNR.

    Args:
        akte: The loaded ``CAkten`` record (not read by this function).
        aktennummer: Passed to the watcher, history and Advoware services.
        akte_id: EspoCRM id of the Akte that documents are linked to.
        espocrm: EspoCRM API client.
        ctx: Motia flow context.

    Returns:
        Counters with the keys ``created``, ``updated``, ``deleted``,
        ``skipped`` and ``errors``.
    """
    from services.advoware_watcher_service import AdvowareWatcherService
    from services.advoware_history_service import AdvowareHistoryService
    from services.advoware_service import AdvowareService
    from services.advoware_document_sync_utils import AdvowareDocumentSyncUtils
    from services.blake3_utils import compute_blake3
    import mimetypes

    watcher = AdvowareWatcherService(ctx)
    history_service = AdvowareHistoryService(ctx)
    advoware_service = AdvowareService(ctx)
    sync_utils = AdvowareDocumentSyncUtils(ctx)

    results = {'created': 0, 'updated': 0, 'deleted': 0, 'skipped': 0, 'errors': 0}

    ctx.logger.info("")
    ctx.logger.info("─" * 60)
    ctx.logger.info("📂 ADVOWARE SYNC")
    ctx.logger.info("─" * 60)

    # ── Fetch from all 3 sources ───────────────────────────────────────
    espo_docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes')
    espo_docs = (espo_docs_result or {}).get('list') or []

    # A failed fetch is indistinguishable from "no files" once replaced by an
    # empty list, so remember it and refuse to delete on incomplete data.
    sources_complete = True

    try:
        windows_files = await watcher.get_akte_files(aktennummer)
    except Exception as e:
        ctx.logger.error(f"❌ Windows watcher failed: {e}")
        windows_files = []
        sources_complete = False

    try:
        advo_history = await history_service.get_akte_history(aktennummer)
    except Exception as e:
        ctx.logger.error(f"❌ Advoware history failed: {e}")
        advo_history = []
        sources_complete = False

    ctx.logger.info(f" EspoCRM docs : {len(espo_docs)}")
    ctx.logger.info(f" Windows files : {len(windows_files)}")
    ctx.logger.info(f" History entries: {len(advo_history)}")
    if not sources_complete:
        ctx.logger.warn("⚠️ Source data incomplete – DELETE actions are skipped in this run")

    # ── Cleanup Windows list (only files in History) ───────────────────
    windows_files = sync_utils.cleanup_file_list(windows_files, advo_history)

    # ── Build indexes by HNR (stable identifier from Advoware) ─────────
    espo_by_hnr = {doc['hnr']: doc for doc in espo_docs if doc.get('hnr')}
    history_by_hnr = {entry['hNr']: entry for entry in advo_history if entry.get('hNr')}
    windows_by_path = {f.get('path', '').lower(): f for f in windows_files}

    all_hnrs = set(espo_by_hnr) | set(history_by_hnr)
    ctx.logger.info(f" Unique HNRs : {len(all_hnrs)}")

    # ── 3-way merge per HNR ────────────────────────────────────────────
    for hnr in all_hnrs:
        espo_doc = espo_by_hnr.get(hnr)
        history_entry = history_by_hnr.get(hnr)
        history_path = history_entry.get('datei') if history_entry else None

        # Windows files carry no hnr; they are matched via the history path.
        windows_file = windows_by_path.get(history_path.lower()) if history_path else None

        if history_path:
            filename = history_path.split('\\')[-1]
        elif espo_doc:
            filename = espo_doc.get('name', f'hnr_{hnr}')
        else:
            filename = f'hnr_{hnr}'

        try:
            action = sync_utils.merge_three_way(espo_doc, windows_file, history_entry)
            ctx.logger.info(f" [{action.action:12s}] {filename} (hnr={hnr}) – {action.reason}")

            if action.action == 'SKIP':
                results['skipped'] += 1

            elif action.action == 'CREATE':
                if not windows_file:
                    ctx.logger.error(f" ❌ CREATE: no Windows file for hnr {hnr}")
                    results['errors'] += 1
                    continue

                content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
                blake3_hash = compute_blake3(content)
                mime_type, _ = mimetypes.guess_type(filename)
                mime_type = mime_type or 'application/octet-stream'
                now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                attachment = await espocrm.upload_attachment_for_file_field(
                    file_content=content,
                    filename=filename,
                    related_type='CDokumente',
                    field='dokument',
                    mime_type=mime_type,
                )
                new_doc = await espocrm.create_entity('CDokumente', {
                    'name': filename,
                    'dokumentId': attachment.get('id'),
                    'hnr': history_entry.get('hNr') if history_entry else None,
                    'advowareArt': history_entry.get('art', 'Schreiben') if history_entry else 'Schreiben',
                    'advowareBemerkung': history_entry.get('text', '') if history_entry else '',
                    'dateipfad': windows_file.get('path', ''),
                    'blake3hash': blake3_hash,
                    'syncedHash': blake3_hash,
                    'usn': windows_file.get('usn', 0),
                    'syncStatus': 'synced',
                    'lastSyncTimestamp': now,
                    'cAktenId': akte_id,  # Direct FK to CAkten
                })
                doc_id = new_doc.get('id')

                # Link to Akte
                await espocrm.link_entities('CAkten', akte_id, 'dokumentes', doc_id)
                results['created'] += 1

                await _enqueue_preview(ctx, doc_id)

            elif action.action == 'UPDATE_ESPO':
                if not windows_file:
                    ctx.logger.error(f" ❌ UPDATE_ESPO: no Windows file for hnr {hnr}")
                    results['errors'] += 1
                    continue

                content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
                blake3_hash = compute_blake3(content)
                now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                # NOTE(review): only hashes and metadata are written here; the
                # downloaded content is not uploaded as a new attachment.
                # Confirm whether the attachment is refreshed elsewhere.
                update_data: Dict[str, Any] = {
                    'name': filename,
                    'blake3hash': blake3_hash,
                    'syncedHash': blake3_hash,
                    'usn': windows_file.get('usn', 0),
                    'dateipfad': windows_file.get('path', ''),
                    'syncStatus': 'synced',
                    'lastSyncTimestamp': now,
                }
                if history_entry:
                    update_data['hnr'] = history_entry.get('hNr')
                    update_data['advowareArt'] = history_entry.get('art', 'Schreiben')
                    update_data['advowareBemerkung'] = history_entry.get('text', '')

                # Mark for re-sync to xAI (hash changed); folded into the same
                # request instead of a second update_entity call.
                if espo_doc.get('aiSyncStatus') == 'synced':
                    update_data['aiSyncStatus'] = 'unclean'

                await espocrm.update_entity('CDokumente', espo_doc['id'], update_data)
                results['updated'] += 1

                await _enqueue_preview(ctx, espo_doc['id'])

            elif action.action == 'DELETE':
                if not sources_complete:
                    ctx.logger.warn(f" ⚠️ DELETE skipped for {filename} – source data incomplete")
                    results['skipped'] += 1
                elif espo_doc:
                    await espocrm.delete_entity('CDokumente', espo_doc['id'])
                    results['deleted'] += 1
                else:
                    results['skipped'] += 1

            else:
                # e.g. UPLOAD_WINDOWS, which this step does not implement.
                ctx.logger.warn(f" ⚠️ Action {action.action} not implemented for {filename} – skipping")
                results['skipped'] += 1

        except Exception as e:
            ctx.logger.error(f" ❌ Error for hnr {hnr} ({filename}): {e}")
            results['errors'] += 1

    # ── Ablage check ───────────────────────────────────────────────────
    try:
        akte_details = await advoware_service.get_akte(aktennummer)
        if akte_details and akte_details.get('ablage') == 1:
            ctx.logger.info("📁 Akte marked as ablage → deactivating")
            await espocrm.update_entity('CAkten', akte_id, {
                'aktivierungsstatus': 'deaktiviert',
            })
    except Exception as e:
        ctx.logger.warn(f"⚠️ Ablage check failed: {e}")

    return results


# ─────────────────────────────────────────────────────────────────────────────
# xAI sync
# ─────────────────────────────────────────────────────────────────────────────

async def _run_xai_sync(
    akte: Dict[str, Any],
    akte_id: str,
    espocrm,
    ctx: FlowContext,
) -> bool:
    """Sync every document linked to the Akte into its xAI collection.

    Args:
        akte: The loaded ``CAkten`` record, passed to ``ensure_collection``.
        akte_id: EspoCRM id of the Akte.
        espocrm: EspoCRM API client.
        ctx: Motia flow context.

    Returns:
        ``False`` when no collection could be obtained (``aiSyncStatus`` is
        then already set to ``failed``), ``True`` otherwise.
    """
    from services.xai_service import XAIService
    from services.xai_upload_utils import XAIUploadUtils

    xai = XAIService(ctx)
    upload_utils = XAIUploadUtils(ctx)

    ctx.logger.info("")
    ctx.logger.info("─" * 60)
    ctx.logger.info("🤖 xAI SYNC")
    ctx.logger.info("─" * 60)

    try:
        # ── Ensure collection exists ───────────────────────────────────
        collection_id = await upload_utils.ensure_collection(akte, xai, espocrm)
        if not collection_id:
            ctx.logger.error("❌ Could not obtain xAI collection – aborting xAI sync")
            await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
            return False

        # ── Load all linked documents ──────────────────────────────────
        docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes')
        docs = (docs_result or {}).get('list') or []
        ctx.logger.info(f" Documents to check: {len(docs)}")

        synced = 0
        skipped = 0
        failed = 0

        for doc in docs:
            ok = await upload_utils.sync_document_to_xai(doc, collection_id, xai, espocrm)
            if not ok:
                failed += 1
            elif doc.get('aiSyncStatus') == 'synced' and doc.get('aiSyncHash') == doc.get('blake3hash'):
                # Already in sync before this run, so nothing was uploaded.
                skipped += 1
            else:
                synced += 1

        ctx.logger.info(f" ✅ Synced : {synced}")
        ctx.logger.info(f" ⏭️ Skipped : {skipped}")
        ctx.logger.info(f" ❌ Failed : {failed}")
        return True

    finally:
        await xai.close()
- Runs every day at 02:00:00. - """ - from services.espocrm import EspoCRMAPI - from services.models import AIKnowledgeActivationStatus, AIKnowledgeSyncStatus - - ctx.logger.info("=" * 80) - ctx.logger.info("πŸŒ™ DAILY AI KNOWLEDGE SYNC STARTED") - ctx.logger.info("=" * 80) - - espocrm = EspoCRMAPI(ctx) - - try: - # Load all CAIKnowledge entities with status 'active' that need sync - result = await espocrm.list_entities( - 'CAIKnowledge', - where=[ - { - 'type': 'equals', - 'attribute': 'aktivierungsstatus', - 'value': AIKnowledgeActivationStatus.ACTIVE.value - }, - { - 'type': 'in', - 'attribute': 'syncStatus', - 'value': [ - AIKnowledgeSyncStatus.UNCLEAN.value, - AIKnowledgeSyncStatus.FAILED.value - ] - } - ], - select='id,name,syncStatus', - max_size=1000 # Adjust if you have more - ) - - entities = result.get('list', []) - total = len(entities) - - ctx.logger.info(f"πŸ“Š Found {total} knowledge bases needing sync") - - if total == 0: - ctx.logger.info("βœ… All knowledge bases are synced") - ctx.logger.info("=" * 80) - return - - # Enqueue sync events for all (Blake3 verification always enabled) - for i, entity in enumerate(entities, 1): - await ctx.enqueue({ - 'topic': 'aiknowledge.sync', - 'data': { - 'knowledge_id': entity['id'], - 'source': 'daily_cron' - } - }) - ctx.logger.info( - f"πŸ“€ [{i}/{total}] Enqueued: {entity['name']} " - f"(syncStatus={entity.get('syncStatus')})" - ) - - ctx.logger.info("=" * 80) - ctx.logger.info(f"βœ… Daily sync complete: {total} events enqueued") - ctx.logger.info("=" * 80) - - except Exception as e: - ctx.logger.error("=" * 80) - ctx.logger.error("❌ FULL SYNC FAILED") - ctx.logger.error("=" * 80) - ctx.logger.error(f"Error: {e}", exc_info=True) - raise diff --git a/src/steps/vmh/aiknowledge_sync_event_step.py b/src/steps/vmh/aiknowledge_sync_event_step.py deleted file mode 100644 index 2a51d33..0000000 --- a/src/steps/vmh/aiknowledge_sync_event_step.py +++ /dev/null @@ -1,89 +0,0 @@ -"""AI Knowledge Sync Event Handler""" 
-from typing import Dict, Any -from redis import Redis -from motia import FlowContext, queue - - -config = { - "name": "AI Knowledge Sync", - "description": "Synchronizes CAIKnowledge entities with XAI Collections", - "flows": ["vmh-aiknowledge"], - "triggers": [ - queue("aiknowledge.sync") - ], -} - - -async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: - """ - Event handler for AI Knowledge synchronization. - - Emitted by: - - Webhook on CAIKnowledge update - - Daily full sync cron job - - Args: - event_data: Event payload with knowledge_id - ctx: Motia context - """ - from services.redis_client import RedisClientFactory - from services.aiknowledge_sync_utils import AIKnowledgeSync - - ctx.logger.info("=" * 80) - ctx.logger.info("πŸ”„ AI KNOWLEDGE SYNC STARTED") - ctx.logger.info("=" * 80) - - # Extract data - knowledge_id = event_data.get('knowledge_id') - source = event_data.get('source', 'unknown') - - if not knowledge_id: - ctx.logger.error("❌ Missing knowledge_id in event data") - return - - ctx.logger.info(f"πŸ“‹ Knowledge ID: {knowledge_id}") - ctx.logger.info(f"πŸ“‹ Source: {source}") - ctx.logger.info("=" * 80) - - # Get Redis for locking - redis_client = RedisClientFactory.get_client(strict=False) - - # Initialize sync utils - sync_utils = AIKnowledgeSync(ctx, redis_client) - - # Acquire lock - lock_acquired = await sync_utils.acquire_sync_lock(knowledge_id) - - if not lock_acquired: - ctx.logger.warn(f"⏸️ Lock already held for {knowledge_id}, skipping") - ctx.logger.info(" (Will be retried by Motia queue)") - raise RuntimeError(f"Lock busy for {knowledge_id}") # Motia will retry - - try: - # Perform sync (Blake3 hash verification always enabled) - await sync_utils.sync_knowledge_to_xai(knowledge_id, ctx) - - ctx.logger.info("=" * 80) - ctx.logger.info("βœ… AI KNOWLEDGE SYNC COMPLETED") - ctx.logger.info("=" * 80) - - # Release lock with success=True - await sync_utils.release_sync_lock(knowledge_id, success=True) - - except 
Exception as e: - ctx.logger.error("=" * 80) - ctx.logger.error("❌ AI KNOWLEDGE SYNC FAILED") - ctx.logger.error("=" * 80) - ctx.logger.error(f"Error: {e}") - ctx.logger.error(f"Knowledge ID: {knowledge_id}") - ctx.logger.error("=" * 80) - - # Release lock with failure - await sync_utils.release_sync_lock( - knowledge_id, - success=False, - error_message=str(e) - ) - - # Re-raise to let Motia retry - raise diff --git a/src/steps/vmh/document_xai_sync_step.py b/src/steps/vmh/document_xai_sync_step.py deleted file mode 100644 index 55fb68a..0000000 --- a/src/steps/vmh/document_xai_sync_step.py +++ /dev/null @@ -1,336 +0,0 @@ -""" -VMH Document xAI Sync Handler - -Zentraler Sync-Handler fΓΌr Documents mit xAI Collections. -Triggers preview generation for new/changed files. - -Verarbeitet: -- vmh.document.create: Neu in EspoCRM β†’ PrΓΌfe ob xAI-Sync nΓΆtig -- vmh.document.update: GeΓ€ndert in EspoCRM β†’ PrΓΌfe ob xAI-Sync/Update nΓΆtig -- vmh.document.delete: GelΓΆscht in EspoCRM β†’ Remove from xAI Collections - -Enqueues: -- document.generate_preview: Bei new/changed Status -""" - -from typing import Dict, Any -from motia import FlowContext, queue -from services.espocrm import EspoCRMAPI -from services.document_sync_utils import DocumentSync -from services.xai_service import XAIService -from services.redis_client import get_redis_client -import hashlib -import json - -config = { - "name": "VMH Document xAI Sync Handler", - "description": "Zentraler Sync-Handler fΓΌr Documents mit xAI Collections", - "flows": ["vmh-documents"], - "triggers": [ - queue("vmh.document.create"), - queue("vmh.document.update"), - queue("vmh.document.delete") - ], - "enqueues": ["document.generate_preview"] -} - - -async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: - """Zentraler Sync-Handler fΓΌr Documents""" - entity_id = event_data.get('entity_id') - entity_type = event_data.get('entity_type', 'CDokumente') # Default: CDokumente - action = 
event_data.get('action') - source = event_data.get('source') - - if not entity_id: - ctx.logger.error("Keine entity_id im Event gefunden") - return - - ctx.logger.info("=" * 80) - ctx.logger.info(f"πŸ”„ DOCUMENT SYNC HANDLER GESTARTET") - ctx.logger.info("=" * 80) - ctx.logger.info(f"Entity Type: {entity_type}") - ctx.logger.info(f"Action: {action.upper()}") - ctx.logger.info(f"Document ID: {entity_id}") - ctx.logger.info(f"Source: {source}") - ctx.logger.info("=" * 80) - - # Shared Redis client for distributed locking (centralized factory) - redis_client = get_redis_client(strict=False) - - # APIs initialisieren (mit Context fΓΌr besseres Logging) - espocrm = EspoCRMAPI(ctx) - sync_utils = DocumentSync(espocrm, redis_client, ctx) - xai_service = XAIService(ctx) - - try: - # 1. ACQUIRE LOCK (verhindert parallele Syncs) - lock_acquired = await sync_utils.acquire_sync_lock(entity_id, entity_type) - - if not lock_acquired: - ctx.logger.warn(f"⏸️ Sync bereits aktiv fΓΌr {entity_type} {entity_id}, ΓΌberspringe") - return - - # Lock erfolgreich acquired - MUSS im finally block released werden! - try: - # 2. FETCH VOLLSTΓ„NDIGES DOCUMENT VON ESPOCRM - try: - document = await espocrm.get_entity(entity_type, entity_id) - except Exception as e: - ctx.logger.error(f"❌ Fehler beim Laden von {entity_type}: {e}") - await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type) - return - - ctx.logger.info(f"πŸ“‹ {entity_type} geladen:") - ctx.logger.info(f" Name: {document.get('name', 'N/A')}") - ctx.logger.info(f" Type: {document.get('type', 'N/A')}") - ctx.logger.info(f" fileStatus: {document.get('fileStatus', 'N/A')}") - ctx.logger.info(f" xaiFileId: {document.get('xaiFileId') or document.get('xaiId', 'N/A')}") - ctx.logger.info(f" xaiCollections: {document.get('xaiCollections', [])}") - - # 3. 
BESTIMME SYNC-AKTION BASIEREND AUF ACTION - - if action == 'delete': - await handle_delete(entity_id, document, sync_utils, xai_service, ctx, entity_type) - - elif action in ['create', 'update']: - await handle_create_or_update(entity_id, document, sync_utils, xai_service, ctx, entity_type) - - else: - ctx.logger.warn(f"⚠️ Unbekannte Action: {action}") - await sync_utils.release_sync_lock(entity_id, success=False, error_message=f"Unbekannte Action: {action}", entity_type=entity_type) - - except Exception as e: - # Unerwarteter Fehler wΓ€hrend Sync - GARANTIERE Lock-Release - ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}") - import traceback - ctx.logger.error(traceback.format_exc()) - - try: - await sync_utils.release_sync_lock( - entity_id, - success=False, - error_message=str(e)[:2000], - entity_type=entity_type - ) - except Exception as release_error: - # Selbst Lock-Release failed - logge kritischen Fehler - ctx.logger.critical(f"🚨 CRITICAL: Lock-Release failed fΓΌr Document {entity_id}: {release_error}") - # Force Redis lock release - try: - lock_key = f"sync_lock:document:{entity_id}" - redis_client.delete(lock_key) - ctx.logger.info(f"βœ… Redis lock manuell released: {lock_key}") - except: - pass - - except Exception as e: - # Fehler VOR Lock-Acquire - kein Lock-Release nΓΆtig - ctx.logger.error(f"❌ Fehler vor Lock-Acquire: {e}") - import traceback - ctx.logger.error(traceback.format_exc()) - - -async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, xai_service: XAIService, ctx: FlowContext[Any], entity_type: str = 'CDokumente') -> None: - """ - Behandelt Create/Update von Documents - - Entscheidet ob xAI-Sync nΓΆtig ist und fΓΌhrt diesen durch - """ - try: - ctx.logger.info("") - ctx.logger.info("=" * 80) - ctx.logger.info("πŸ” ANALYSE: Braucht dieses Document xAI-Sync?") - ctx.logger.info("=" * 80) - - # Datei-Status fΓΌr Preview-Generierung (verschiedene Feld-Namen unterstΓΌtzen) - 
datei_status = document.get('fileStatus') or document.get('dateiStatus') - - # Entscheidungslogik: Soll dieses Document zu xAI? - needs_sync, collection_ids, reason = await sync_utils.should_sync_to_xai(document) - - ctx.logger.info(f"πŸ“Š Entscheidung: {'βœ… SYNC NΓ–TIG' if needs_sync else '⏭️ KEIN SYNC NΓ–TIG'}") - ctx.logger.info(f" Grund: {reason}") - ctx.logger.info(f" File-Status: {datei_status or 'N/A'}") - - if collection_ids: - ctx.logger.info(f" Collections: {collection_ids}") - - # ═══════════════════════════════════════════════════════════════ - # CHECK: Knowledge Bases mit Status "new" (noch keine Collection) - # ═══════════════════════════════════════════════════════════════ - new_knowledge_bases = [cid for cid in collection_ids if cid.startswith('NEW:')] - if new_knowledge_bases: - ctx.logger.info("") - ctx.logger.info("=" * 80) - ctx.logger.info("πŸ†• DOKUMENT IST MIT KNOWLEDGE BASE(S) VERKNÜPFT (Status: new)") - ctx.logger.info("=" * 80) - - for new_kb in new_knowledge_bases: - kb_id = new_kb[4:] # Remove "NEW:" prefix - ctx.logger.info(f"πŸ“‹ CAIKnowledge {kb_id}") - ctx.logger.info(f" Status: new β†’ Collection muss zuerst erstellt werden") - - # Trigger Knowledge Sync - ctx.logger.info(f"πŸ“€ Triggering aiknowledge.sync event...") - await ctx.emit('aiknowledge.sync', { - 'entity_id': kb_id, - 'entity_type': 'CAIKnowledge', - 'triggered_by': 'document_sync', - 'document_id': entity_id - }) - ctx.logger.info(f"βœ… Event emitted for {kb_id}") - - # Release lock and skip document sync - knowledge sync will handle documents - ctx.logger.info("") - ctx.logger.info("=" * 80) - ctx.logger.info("βœ… KNOWLEDGE SYNC GETRIGGERT") - ctx.logger.info(" Document Sync wird ΓΌbersprungen") - ctx.logger.info(" (Knowledge Sync erstellt Collection und synchronisiert dann Dokumente)") - ctx.logger.info("=" * 80) - - await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type) - return - - # 
═══════════════════════════════════════════════════════════════ - # PREVIEW-GENERIERUNG bei neuen/geΓ€nderten Dateien - # ═══════════════════════════════════════════════════════════════ - - # Case-insensitive check fΓΌr Datei-Status - datei_status_lower = (datei_status or '').lower() - if datei_status_lower in ['neu', 'geΓ€ndert', 'new', 'changed']: - ctx.logger.info("") - ctx.logger.info("=" * 80) - ctx.logger.info("πŸ–ΌοΈ TRIGGER PREVIEW-GENERIERUNG") - ctx.logger.info(f" Datei-Status: {datei_status}") - ctx.logger.info("=" * 80) - - try: - # Enqueue preview generation event - await ctx.emit('document.generate_preview', { - 'entity_id': entity_id, - 'entity_type': entity_type - }) - ctx.logger.info(f"βœ… Preview generation event emitted for {entity_id}") - except Exception as e: - ctx.logger.error(f"❌ Fehler beim Triggern der Preview-Generierung: {e}") - # Continue - Preview ist optional - - ctx.logger.info("=" * 80) - - # ═══════════════════════════════════════════════════════════════ - # xAI SYNC (falls erforderlich) - # ═══════════════════════════════════════════════════════════════ - - if not needs_sync: - ctx.logger.info("βœ… Kein xAI-Sync erforderlich, Lock wird released") - # Wenn Preview generiert wurde aber kein xAI sync nΓΆtig, - # wurde Status bereits in Preview-Schritt zurΓΌckgesetzt - await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type) - return - - # ═══════════════════════════════════════════════════════════════ - # xAI SYNC DURCHFÜHREN - # ═══════════════════════════════════════════════════════════════ - - ctx.logger.info("") - ctx.logger.info("=" * 80) - ctx.logger.info("πŸ€– xAI SYNC STARTEN") - ctx.logger.info("=" * 80) - - # 1. 
Hole Download-Informationen (falls nicht schon aus Preview-Schritt vorhanden) - download_info = await sync_utils.get_document_download_info(entity_id, entity_type) - if not download_info: - raise Exception("Konnte Download-Info nicht ermitteln – Datei fehlt?") - - ctx.logger.info(f"πŸ“₯ Datei: {download_info['filename']} ({download_info['size']} bytes, {download_info['mime_type']})") - - # 2. Download Datei von EspoCRM - espocrm = sync_utils.espocrm - file_content = await espocrm.download_attachment(download_info['attachment_id']) - ctx.logger.info(f"βœ… Downloaded {len(file_content)} bytes") - - # 3. MD5-Hash berechnen fΓΌr Change-Detection - file_hash = hashlib.md5(file_content).hexdigest() - ctx.logger.info(f"πŸ”‘ MD5: {file_hash}") - - # 4. Upload zu xAI - # Immer neu hochladen wenn needs_sync=True (neues File oder Hash geΓ€ndert) - ctx.logger.info("πŸ“€ Uploading to xAI...") - xai_file_id = await xai_service.upload_file( - file_content, - download_info['filename'], - download_info['mime_type'] - ) - ctx.logger.info(f"βœ… xAI file_id: {xai_file_id}") - - # 5. Zu allen Ziel-Collections hinzufΓΌgen - ctx.logger.info(f"πŸ“š FΓΌge zu {len(collection_ids)} Collection(s) hinzu...") - added_collections = await xai_service.add_to_collections(collection_ids, xai_file_id) - ctx.logger.info(f"βœ… In {len(added_collections)}/{len(collection_ids)} Collections eingetragen") - - # 6. 
EspoCRM Metadaten aktualisieren und Lock freigeben - await sync_utils.update_sync_metadata( - entity_id, - xai_file_id=xai_file_id, - collection_ids=added_collections, - file_hash=file_hash, - entity_type=entity_type - ) - await sync_utils.release_sync_lock( - entity_id, - success=True, - entity_type=entity_type - ) - - ctx.logger.info("=" * 80) - ctx.logger.info("βœ… DOCUMENT SYNC ABGESCHLOSSEN") - ctx.logger.info("=" * 80) - - except Exception as e: - ctx.logger.error(f"❌ Fehler bei Create/Update: {e}") - import traceback - ctx.logger.error(traceback.format_exc()) - await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e)) - - -async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, xai_service: XAIService, ctx: FlowContext[Any], entity_type: str = 'CDokumente') -> None: - """ - Behandelt Delete von Documents - - Entfernt Document aus xAI Collections (aber lΓΆscht File nicht - kann in anderen Collections sein) - """ - try: - ctx.logger.info("") - ctx.logger.info("=" * 80) - ctx.logger.info("πŸ—‘οΈ DOCUMENT DELETE - xAI CLEANUP") - ctx.logger.info("=" * 80) - - xai_file_id = document.get('xaiFileId') or document.get('xaiId') - xai_collections = document.get('xaiCollections') or [] - - if not xai_file_id or not xai_collections: - ctx.logger.info("⏭️ Document war nicht in xAI gesynct, nichts zu tun") - await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type) - return - - ctx.logger.info(f"πŸ“‹ Document Info:") - ctx.logger.info(f" xaiFileId: {xai_file_id}") - ctx.logger.info(f" Collections: {xai_collections}") - - ctx.logger.info(f"πŸ—‘οΈ Entferne aus {len(xai_collections)} Collection(s)...") - await xai_service.remove_from_collections(xai_collections, xai_file_id) - ctx.logger.info(f"βœ… File aus {len(xai_collections)} Collection(s) entfernt") - ctx.logger.info(" (File selbst bleibt in xAI – kann in anderen Collections sein)") - - await sync_utils.release_sync_lock(entity_id, 
success=True, entity_type=entity_type) - - ctx.logger.info("=" * 80) - ctx.logger.info("βœ… DELETE ABGESCHLOSSEN") - ctx.logger.info("=" * 80) - - except Exception as e: - ctx.logger.error(f"❌ Fehler bei Delete: {e}") - import traceback - ctx.logger.error(traceback.format_exc()) - await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type)