Refactor Akte and Document Sync Logic

- Removed the old VMH Document xAI Sync Handler implementation.
- Introduced new xAI Upload Utilities for shared upload logic across sync flows.
- Created a unified Akte sync structure with cron polling and event handling.
- Implemented Akte Sync Cron Poller to manage pending Aktennummern with a debounce mechanism.
- Developed Akte Sync Event Handler for synchronized processing across Advoware and xAI.
- Enhanced logging and error handling throughout the new sync processes.
- Ensured compatibility with existing Redis and EspoCRM services.
This commit is contained in:
bsiggel
2026-03-26 01:23:16 +00:00
parent 86ec4db9db
commit b4d35b1790
10 changed files with 738 additions and 1805 deletions

View File

@@ -1,545 +0,0 @@
"""
AI Knowledge Sync Utilities
Utility functions for synchronizing CAIKnowledge entities with XAI Collections:
- Collection lifecycle management (create, delete)
- Document synchronization with BLAKE3 hash verification
- Metadata-only updates via PATCH
- Orphan detection and cleanup
"""
import hashlib
import json
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime
from urllib.parse import unquote
from services.sync_utils_base import BaseSyncUtils
from services.models import (
AIKnowledgeActivationStatus,
AIKnowledgeSyncStatus,
JunctionSyncStatus
)
class AIKnowledgeSync(BaseSyncUtils):
"""Utility class for AI Knowledge ↔ XAI Collections synchronization"""
def _get_lock_key(self, entity_id: str) -> str:
"""Redis lock key for AI Knowledge entities"""
return f"sync_lock:aiknowledge:{entity_id}"
async def acquire_sync_lock(self, knowledge_id: str) -> bool:
    """
    Acquire distributed lock via Redis + update EspoCRM syncStatus.

    The Redis lock is taken first. Setting syncStatus to pending_sync is
    best effort: a failed status update is only logged and does not
    invalidate the lock.
    Args:
        knowledge_id: CAIKnowledge entity ID
    Returns:
        True if lock acquired, False if already locked or on error
    """
    # Track ownership so the error path never releases a lock that belongs
    # to another worker (e.g. when the Redis call itself raised).
    acquired = False
    lock_key = None
    try:
        # STEP 1: Atomic Redis lock
        lock_key = self._get_lock_key(knowledge_id)
        if not self._acquire_redis_lock(lock_key):
            self._log(f"Redis lock already active for {knowledge_id}", level='warn')
            return False
        acquired = True

        # STEP 2: Update syncStatus to pending_sync (best effort)
        try:
            await self.espocrm.update_entity('CAIKnowledge', knowledge_id, {
                'syncStatus': AIKnowledgeSyncStatus.PENDING_SYNC.value
            })
        except Exception as e:
            self._log(f"Could not set syncStatus: {e}", level='debug')

        self._log(f"Sync lock acquired for {knowledge_id}")
        return True
    except Exception as e:
        self._log(f"Error acquiring lock: {e}", level='error')
        # Clean up the Redis lock only if this call actually took it
        if acquired:
            self._release_redis_lock(lock_key)
        return False
async def release_sync_lock(
    self,
    knowledge_id: str,
    success: bool = True,
    error_message: Optional[str] = None
) -> None:
    """
    Release sync lock and set final status.

    Writes the final syncStatus (synced/failed) to EspoCRM and then releases
    the Redis lock. The Redis lock is released even when the EspoCRM update
    fails; that failure is logged, not raised.
    Args:
        knowledge_id: CAIKnowledge entity ID
        success: Whether sync succeeded
        error_message: Optional error message (stored truncated to 2000 chars)
    """
    try:
        update_data = {
            'syncStatus': AIKnowledgeSyncStatus.SYNCED.value if success else AIKnowledgeSyncStatus.FAILED.value
        }
        if success:
            update_data['lastSync'] = datetime.now().isoformat()
            update_data['syncError'] = None
        elif error_message:
            update_data['syncError'] = error_message[:2000]
        await self.espocrm.update_entity('CAIKnowledge', knowledge_id, update_data)
        outcome = 'success' if success else 'failed'
        self._log(f"Sync lock released: {knowledge_id} → {outcome}")
    except Exception as e:
        self._log(f"Error releasing lock: {e}", level='error')
    finally:
        # Always release the Redis lock, whatever happened to the status update
        self._release_redis_lock(self._get_lock_key(knowledge_id))
async def sync_knowledge_to_xai(self, knowledge_id: str, ctx) -> None:
    """
    Main sync orchestrator with activation status handling.

    Dispatches on the entity's 'aktivierungsstatus':
    - new: create an XAI collection, store its ID, switch to 'active',
      then sync documents
    - deactivated: delete the XAI collection and reset junction entries
    - paused: do nothing
    - active: verify (or recreate) the collection, then sync documents
    Args:
        knowledge_id: CAIKnowledge entity ID
        ctx: Motia context for logging
    Raises:
        RuntimeError: if no collection ID is available for a new/active entity
        ValueError: if aktivierungsstatus has an unknown value
    """
    from services.espocrm import EspoCRMAPI
    from services.xai_service import XAIService

    espocrm = EspoCRMAPI(ctx)
    xai = XAIService(ctx)

    def _extract_collection_id(collection):
        # XAI API returns 'collection_id' not 'id'; accept either so both
        # the create and the recreate path read the response the same way.
        return collection.get('collection_id') or collection.get('id')

    try:
        # 1. Load knowledge entity
        knowledge = await espocrm.get_entity('CAIKnowledge', knowledge_id)
        activation_status = knowledge.get('aktivierungsstatus')
        collection_id = knowledge.get('datenbankId')

        ctx.logger.info("=" * 80)
        ctx.logger.info(f"📋 Processing: {knowledge['name']}")
        ctx.logger.info(f" aktivierungsstatus: {activation_status}")
        ctx.logger.info(f" datenbankId: {collection_id or 'NONE'}")
        ctx.logger.info("=" * 80)

        # CASE 1: NEW → Create Collection
        if activation_status == AIKnowledgeActivationStatus.NEW.value:
            ctx.logger.info("🆕 Status 'new' → Creating XAI Collection")
            collection = await xai.create_collection(
                name=knowledge['name'],
                metadata={
                    'espocrm_entity_type': 'CAIKnowledge',
                    'espocrm_entity_id': knowledge_id,
                    'created_at': datetime.now().isoformat()
                }
            )
            collection_id = _extract_collection_id(collection)
            if not collection_id:
                # Do not persist 'active' with an empty datenbankId; leaving
                # the entity in 'new' lets the next run retry the creation.
                ctx.logger.error("❌ XAI returned no collection ID for new collection")
                raise RuntimeError("XAI collection creation returned no ID")

            # Update EspoCRM: Set datenbankId + change status to 'active'
            await espocrm.update_entity('CAIKnowledge', knowledge_id, {
                'datenbankId': collection_id,
                'aktivierungsstatus': AIKnowledgeActivationStatus.ACTIVE.value,
                'syncStatus': AIKnowledgeSyncStatus.UNCLEAN.value
            })
            ctx.logger.info(f"✅ Collection created: {collection_id}")
            ctx.logger.info(" Status changed to 'active', now syncing documents...")
            # Continue to document sync immediately (don't return)

        # CASE 2: DEACTIVATED → Delete Collection from XAI
        elif activation_status == AIKnowledgeActivationStatus.DEACTIVATED.value:
            ctx.logger.info("🗑️ Status 'deactivated' → Deleting XAI Collection")
            if collection_id:
                try:
                    await xai.delete_collection(collection_id)
                    ctx.logger.info(f"✅ Collection deleted from XAI: {collection_id}")
                except Exception as e:
                    ctx.logger.error(f"❌ Failed to delete collection: {e}")
            else:
                ctx.logger.info("⏭️ No collection ID, nothing to delete")

            # Reset junction entries
            documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id)
            for doc in documents:
                doc_id = doc['documentId']
                try:
                    await espocrm.update_knowledge_document_junction(
                        knowledge_id,
                        doc_id,
                        {
                            'syncstatus': 'new',
                            'aiDocumentId': None
                        },
                        update_last_sync=False
                    )
                except Exception as e:
                    ctx.logger.warn(f"⚠️ Failed to reset junction for {doc_id}: {e}")
            ctx.logger.info(f"✅ Deactivation complete, {len(documents)} junction entries reset")
            return

        # CASE 3: PAUSED → Skip Sync
        elif activation_status == AIKnowledgeActivationStatus.PAUSED.value:
            ctx.logger.info("⏸️ Status 'paused' → No sync performed")
            return

        # CASE 4: ACTIVE → Normal Sync (or just created from NEW)
        if activation_status in (AIKnowledgeActivationStatus.ACTIVE.value, AIKnowledgeActivationStatus.NEW.value):
            if not collection_id:
                ctx.logger.error("❌ Status 'active' but no datenbankId!")
                raise RuntimeError("Active knowledge without collection ID")

            if activation_status == AIKnowledgeActivationStatus.ACTIVE.value:
                ctx.logger.info(f"🔄 Status 'active' → Syncing documents to {collection_id}")
                # Verify collection exists
                collection = await xai.get_collection(collection_id)
                if not collection:
                    ctx.logger.warn(f"⚠️ Collection {collection_id} not found, recreating")
                    collection = await xai.create_collection(
                        name=knowledge['name'],
                        metadata={
                            'espocrm_entity_type': 'CAIKnowledge',
                            'espocrm_entity_id': knowledge_id
                        }
                    )
                    collection_id = _extract_collection_id(collection)
                    if not collection_id:
                        ctx.logger.error("❌ XAI returned no collection ID for recreated collection")
                        raise RuntimeError("XAI collection creation returned no ID")
                    await espocrm.update_entity('CAIKnowledge', knowledge_id, {
                        'datenbankId': collection_id
                    })

            # Sync documents (both for ACTIVE status and after NEW → ACTIVE transition)
            await self._sync_knowledge_documents(knowledge_id, collection_id, ctx)
        elif activation_status not in (AIKnowledgeActivationStatus.DEACTIVATED.value, AIKnowledgeActivationStatus.PAUSED.value):
            ctx.logger.error(f"❌ Unknown aktivierungsstatus: {activation_status}")
            raise ValueError(f"Invalid aktivierungsstatus: {activation_status}")
    finally:
        await xai.close()
async def _sync_knowledge_documents(
    self,
    knowledge_id: str,
    collection_id: str,
    ctx
) -> None:
    """
    Sync all documents of a knowledge base to XAI collection.

    Loads all linked documents with their junction data, decides per document
    whether an upload is needed (junction status and Blake3 comparison against
    XAI), uploads what changed, then removes files from the XAI collection
    that no processed document refers to. Per-document failures are recorded
    in the junction ('failed') and do not abort the run.
    Args:
        knowledge_id: CAIKnowledge entity ID
        collection_id: XAI Collection ID
        ctx: Motia context
    """
    from services.espocrm import EspoCRMAPI
    from services.xai_service import XAIService

    espocrm = EspoCRMAPI(ctx)
    xai = XAIService(ctx)
    # This method owns its own XAIService instance, so it must close it on
    # every exit path (early return included).
    try:
        # STEP 1: Load all documents with junction data (single API call)
        ctx.logger.info(f"📥 Loading documents with junction data for knowledge {knowledge_id}")
        documents = await espocrm.get_knowledge_documents_with_junction(knowledge_id)
        ctx.logger.info(f"📊 Found {len(documents)} document(s)")
        if not documents:
            ctx.logger.info("✅ No documents to sync")
            return

        # STEP 2: Sync each document based on status/hash
        successful = 0
        failed = 0
        skipped = 0
        # Track aiDocumentIds for orphan detection (collected during sync)
        synced_file_ids: set = set()

        for doc in documents:
            doc_id = doc['documentId']
            doc_name = doc.get('documentName', 'Unknown')
            junction_status = doc.get('syncstatus', 'new')
            ai_document_id = doc.get('aiDocumentId')
            blake3_hash = doc.get('blake3hash')

            ctx.logger.info(f"\n📄 {doc_name} (ID: {doc_id})")
            ctx.logger.info(f" Status: {junction_status}")
            ctx.logger.info(f" aiDocumentId: {ai_document_id or 'N/A'}")
            ctx.logger.info(f" blake3hash: {blake3_hash[:16] if blake3_hash else 'N/A'}...")

            try:
                # Decide if sync needed
                needs_sync = False
                reason = ""
                if junction_status in ['new', 'unclean', 'failed']:
                    needs_sync = True
                    reason = f"status={junction_status}"
                elif junction_status == 'synced':
                    # Synced status should have both blake3_hash and ai_document_id
                    if not blake3_hash:
                        needs_sync = True
                        reason = "inconsistency: synced but no blake3 hash"
                        ctx.logger.warn(f" ⚠️ Synced document missing blake3 hash!")
                    elif not ai_document_id:
                        needs_sync = True
                        reason = "inconsistency: synced but no aiDocumentId"
                        ctx.logger.warn(f" ⚠️ Synced document missing aiDocumentId!")
                    else:
                        # Verify Blake3 hash with XAI (always, since hash from JunctionData API is free)
                        try:
                            xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id)
                            if xai_doc_info:
                                xai_blake3 = xai_doc_info.get('blake3_hash')
                                if xai_blake3 != blake3_hash:
                                    needs_sync = True
                                    reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs EspoCRM: {blake3_hash[:16]}...)"
                                    ctx.logger.info(f" 🔄 Blake3 mismatch detected!")
                                else:
                                    ctx.logger.info(f" ✅ Blake3 hash matches")
                            else:
                                needs_sync = True
                                reason = "file not found in XAI collection"
                                ctx.logger.warn(f" ⚠️ Document marked synced but not in XAI!")
                        except Exception as e:
                            # A failed verification is treated as "re-sync" rather than as an error
                            needs_sync = True
                            reason = f"verification failed: {e}"
                            ctx.logger.warn(f" ⚠️ Failed to verify Blake3, will re-sync: {e}")

                if not needs_sync:
                    ctx.logger.info(f" ⏭️ Skipped (no sync needed)")
                    # Document is already synced, track its aiDocumentId
                    if ai_document_id:
                        synced_file_ids.add(ai_document_id)
                    skipped += 1
                    continue

                ctx.logger.info(f" 🔄 Syncing: {reason}")

                # Get complete document entity with attachment info
                doc_entity = await espocrm.get_entity('CDokumente', doc_id)
                attachment_id = doc_entity.get('dokumentId')
                if not attachment_id:
                    ctx.logger.error(f" ❌ No attachment ID found for document {doc_id}")
                    failed += 1
                    continue

                # Get attachment details for MIME type and original filename
                try:
                    attachment = await espocrm.get_entity('Attachment', attachment_id)
                    mime_type = attachment.get('type', 'application/octet-stream')
                    file_size = attachment.get('size', 0)
                    original_filename = attachment.get('name', doc_name)  # Original filename with extension
                    # URL-decode filename (fixes special chars like §, ä, ö, ü, etc.)
                    original_filename = unquote(original_filename)
                except Exception as e:
                    ctx.logger.warn(f" ⚠️ Failed to get attachment details: {e}, using defaults")
                    mime_type = 'application/octet-stream'
                    file_size = 0
                    original_filename = unquote(doc_name)  # Also decode fallback name

                ctx.logger.info(f" 📎 Attachment: {attachment_id} ({mime_type}, {file_size} bytes)")
                ctx.logger.info(f" 📄 Original filename: {original_filename}")

                # Download document
                file_content = await espocrm.download_attachment(attachment_id)
                ctx.logger.info(f" 📥 Downloaded {len(file_content)} bytes")

                # Upload to XAI with original filename (includes extension)
                filename = original_filename
                xai_file_id = await xai.upload_file(file_content, filename, mime_type)
                ctx.logger.info(f" 📤 Uploaded to XAI: {xai_file_id}")

                # Add to collection
                await xai.add_to_collection(collection_id, xai_file_id)
                ctx.logger.info(f" ✅ Added to collection {collection_id}")

                # Update junction
                await espocrm.update_knowledge_document_junction(
                    knowledge_id,
                    doc_id,
                    {
                        'aiDocumentId': xai_file_id,
                        'syncstatus': 'synced'
                    },
                    update_last_sync=True
                )
                ctx.logger.info(f" ✅ Junction updated")

                # Track the new aiDocumentId for orphan detection
                synced_file_ids.add(xai_file_id)
                successful += 1
            except Exception as e:
                failed += 1
                ctx.logger.error(f" ❌ Sync failed: {e}")
                # Mark as failed in junction
                try:
                    await espocrm.update_knowledge_document_junction(
                        knowledge_id,
                        doc_id,
                        {'syncstatus': 'failed'},
                        update_last_sync=False
                    )
                except Exception as update_err:
                    ctx.logger.error(f" ❌ Failed to update junction status: {update_err}")

        # STEP 3: Remove orphaned documents from XAI collection
        try:
            ctx.logger.info(f"\n🧹 Checking for orphaned documents in XAI collection...")
            # Get all files in XAI collection (normalized structure)
            xai_documents = await xai.list_collection_documents(collection_id)
            xai_file_ids = {doc.get('file_id') for doc in xai_documents if doc.get('file_id')}
            # Use synced_file_ids (collected during this sync) for orphan detection
            # This includes both pre-existing synced docs and newly uploaded ones
            ctx.logger.info(f" XAI has {len(xai_file_ids)} files, we have {len(synced_file_ids)} synced")
            # Find orphans (in XAI but not in our current sync)
            orphans = xai_file_ids - synced_file_ids
            if orphans:
                ctx.logger.info(f" Found {len(orphans)} orphaned file(s)")
                for orphan_id in orphans:
                    try:
                        await xai.remove_from_collection(collection_id, orphan_id)
                        ctx.logger.info(f" 🗑️ Removed {orphan_id}")
                    except Exception as e:
                        ctx.logger.warn(f" ⚠️ Failed to remove {orphan_id}: {e}")
            else:
                ctx.logger.info(f" ✅ No orphans found")
        except Exception as e:
            ctx.logger.warn(f"⚠️ Failed to clean up orphans: {e}")

        # STEP 4: Summary
        ctx.logger.info("")
        ctx.logger.info("=" * 80)
        ctx.logger.info(f"📊 Sync Statistics:")
        ctx.logger.info(f" ✅ Synced: {successful}")
        ctx.logger.info(f" ⏭️ Skipped: {skipped}")
        ctx.logger.info(f" ❌ Failed: {failed}")
        ctx.logger.info(f" Mode: Blake3 hash verification enabled")
        ctx.logger.info("=" * 80)
    finally:
        await xai.close()
def _calculate_metadata_hash(self, document: Dict) -> str:
"""
Calculate hash of sync-relevant metadata.
Args:
document: CDokumente entity
Returns:
MD5 hash (32 chars)
"""
metadata = {
'name': document.get('name', ''),
'description': document.get('description', ''),
}
metadata_str = json.dumps(metadata, sort_keys=True)
return hashlib.md5(metadata_str.encode()).hexdigest()
def _build_xai_metadata(self, document: Dict) -> Dict[str, str]:
"""
Build XAI metadata from CDokumente entity.
Args:
document: CDokumente entity
Returns:
Metadata dict for XAI
"""
return {
'document_name': document.get('name', ''),
'description': document.get('description', ''),
'created_at': document.get('createdAt', ''),
'modified_at': document.get('modifiedAt', ''),
'espocrm_id': document.get('id', '')
}
async def _get_document_download_info(
    self,
    document: Dict,
    ctx
) -> Optional[Dict[str, Any]]:
    """
    Resolve which attachment backs a CDokumente entity and how to name it.

    The attachment reference is taken from 'dokumentId' (with 'dokumentName')
    and, if that is empty, from 'fileId' (with 'fileName'). The Attachment
    entity is then loaded to obtain the MIME type and a fallback filename.
    Args:
        document: CDokumente entity
        ctx: Motia context
    Returns:
        Dict with attachment_id, filename, mime_type, or None when the
        document has no attachment or the Attachment lookup fails
    """
    from services.espocrm import EspoCRMAPI

    espocrm = EspoCRMAPI(ctx)

    # Candidate (id field, name field) pairs; the first non-empty id wins
    candidates = (('dokumentId', 'dokumentName'), ('fileId', 'fileName'))
    attachment_id, filename = None, None
    for id_field, name_field in candidates:
        if document.get(id_field):
            attachment_id = document.get(id_field)
            filename = document.get(name_field)
            break

    if not attachment_id:
        ctx.logger.error(f"❌ No attachment ID for document {document['id']}")
        return None

    # Get attachment details
    try:
        attachment = await espocrm.get_entity('Attachment', attachment_id)
        resolved_name = filename or attachment.get('name', 'unknown')
        resolved_mime = attachment.get('type', 'application/octet-stream')
        return {
            'attachment_id': attachment_id,
            'filename': resolved_name,
            'mime_type': resolved_mime
        }
    except Exception as e:
        ctx.logger.error(f"❌ Failed to get attachment {attachment_id}: {e}")
        return None

View File

@@ -0,0 +1,201 @@
"""
xAI Upload Utilities
Shared logic for uploading documents from EspoCRM to xAI Collections.
Used by all sync flows (Advoware + direct xAI sync).
Handles:
- Blake3 hash-based change detection
- Upload to xAI with correct filename/MIME
- Collection management (create/verify)
- EspoCRM metadata update after sync
"""
from typing import Optional, Dict, Any
from datetime import datetime
class XAIUploadUtils:
"""
Stateless utility class for document upload operations to xAI.
All methods take explicit service instances to remain reusable
across different sync contexts.
"""
def __init__(self, ctx):
    """
    Bind a service logger for this module to the given context.
    Args:
        ctx: context object handed to get_service_logger
    """
    from services.logging_utils import get_service_logger

    service_logger = get_service_logger(__name__, ctx)
    self._log = service_logger
async def ensure_collection(
    self,
    akte: Dict[str, Any],
    xai,
    espocrm,
) -> Optional[str]:
    """
    Ensure xAI collection exists for this Akte.

    If the Akte already carries an 'aiCollectionId', it is verified against
    xAI and returned when found. When it is missing, not found, or cannot be
    verified, a new collection is created and its ID is written back to the
    CAkten entity together with aiSyncStatus='unclean'.
    Args:
        akte: CAkten entity dict (must contain 'id')
        xai: xAI service providing get_collection / create_collection
        espocrm: EspoCRM service providing update_entity
    Returns:
        collection_id or None on failure
    """
    akte_id = akte['id']
    akte_name = akte.get('name', f"Akte {akte.get('aktennummer', akte_id)}")
    collection_id = akte.get('aiCollectionId')

    if collection_id:
        # Verify it still exists in xAI
        try:
            col = await xai.get_collection(collection_id)
            if col:
                self._log.debug(f"Collection {collection_id} verified for '{akte_name}'")
                return collection_id
            self._log.warn(f"Collection {collection_id} not found in xAI, recreating...")
        except Exception as e:
            self._log.warn(f"Could not verify collection {collection_id}: {e}, recreating...")

    # Create new collection
    try:
        self._log.info(f"Creating xAI collection for '{akte_name}'...")
        col = await xai.create_collection(
            name=akte_name,
            metadata={
                'espocrm_entity_type': 'CAkten',
                'espocrm_entity_id': akte_id,
                'aktennummer': str(akte.get('aktennummer', '')),
            }
        )
        # The create response may carry the ID as 'collection_id' or 'id';
        # accept both instead of failing with KeyError on one of them.
        collection_id = col.get('collection_id') or col.get('id')
        if not collection_id:
            raise RuntimeError("create_collection response contains no collection ID")
        self._log.info(f"✅ Collection created: {collection_id}")

        # Save back to EspoCRM
        await espocrm.update_entity('CAkten', akte_id, {
            'aiCollectionId': collection_id,
            'aiSyncStatus': 'unclean',  # Trigger full doc sync
        })
        return collection_id
    except Exception as e:
        self._log.error(f"❌ Failed to create xAI collection: {e}")
        return None
async def sync_document_to_xai(
    self,
    doc: Dict[str, Any],
    collection_id: str,
    xai,
    espocrm,
) -> bool:
    """
    Sync a single CDokumente entity to xAI collection.

    Decision logic (Blake3-based):
    - aiSyncStatus == 'synced' AND aiSyncHash == blake3hash → skip (no change)
    - No attachment (dokumentId missing) → mark unsupported
    - otherwise → download from EspoCRM, upload to xAI, add to collection,
      write the result back to the CDokumente entity

    Failures never propagate: the document is marked 'failed' (best effort)
    and False is returned.
    Args:
        doc: CDokumente entity dict (must contain 'id')
        collection_id: target xAI collection ID
        xai: xAI service (upload_file, add_to_collection, remove_from_collection)
        espocrm: EspoCRM service (download_attachment, update_entity)
    Returns:
        True if synced/skipped successfully, False on error
    """
    import mimetypes
    from urllib.parse import unquote

    doc_id = doc['id']
    doc_name = doc.get('name', doc_id)
    ai_status = doc.get('aiSyncStatus', 'new')
    ai_sync_hash = doc.get('aiSyncHash')
    blake3_hash = doc.get('blake3hash')
    ai_file_id = doc.get('aiFileId')

    self._log.info(f" 📄 {doc_name}")
    self._log.info(f" aiSyncStatus={ai_status}, aiSyncHash={ai_sync_hash[:12] if ai_sync_hash else 'N/A'}..., blake3={blake3_hash[:12] if blake3_hash else 'N/A'}...")

    # Skip if already synced and hash matches
    if ai_status == 'synced' and ai_sync_hash and blake3_hash and ai_sync_hash == blake3_hash:
        self._log.info(f" ⏭️ Skipped (hash match, no change)")
        return True

    # Get attachment info
    attachment_id = doc.get('dokumentId')
    if not attachment_id:
        self._log.warn(f" ⚠️ No attachment (dokumentId missing) - marking unsupported")
        try:
            await espocrm.update_entity('CDokumente', doc_id, {
                'aiSyncStatus': 'unsupported',
                'aiLastSync': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            })
        except Exception as e:
            # Keep the "False on error" contract instead of raising to the caller
            self._log.error(f" ❌ Failed: {e}")
            return False
        return True  # Not an error, just unsupported

    try:
        # Download from EspoCRM
        self._log.info(f" 📥 Downloading attachment {attachment_id}...")
        file_content = await espocrm.download_attachment(attachment_id)
        self._log.info(f" Downloaded {len(file_content)} bytes")

        # Determine filename + MIME type (filename may arrive URL-encoded)
        filename = doc.get('dokumentName') or doc.get('name', 'document.bin')
        filename = unquote(filename)
        mime_type, _ = mimetypes.guess_type(filename)
        if not mime_type:
            mime_type = 'application/octet-stream'

        # Remove old file from collection if updating
        if ai_file_id and ai_status != 'new':
            try:
                await xai.remove_from_collection(collection_id, ai_file_id)
                self._log.info(f" 🗑️ Removed old xAI file {ai_file_id}")
            except Exception as e:
                # Non-fatal - may already be gone; record it instead of hiding it
                self._log.debug(f" Could not remove old xAI file {ai_file_id}: {e}")

        # Upload to xAI
        self._log.info(f" 📤 Uploading '{filename}' ({mime_type})...")
        new_xai_file_id = await xai.upload_file(file_content, filename, mime_type)
        self._log.info(f" Uploaded: xai_file_id={new_xai_file_id}")

        # Add to collection
        await xai.add_to_collection(collection_id, new_xai_file_id)
        self._log.info(f" ✅ Added to collection {collection_id}")

        # Update CDokumente with sync result
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        await espocrm.update_entity('CDokumente', doc_id, {
            'aiFileId': new_xai_file_id,
            'aiCollectionId': collection_id,
            'aiSyncHash': blake3_hash or doc.get('syncedHash'),
            'aiSyncStatus': 'synced',
            'aiLastSync': now,
        })
        self._log.info(f" ✅ EspoCRM updated")
        return True
    except Exception as e:
        self._log.error(f" ❌ Failed: {e}")
        # Best-effort status write-back: if EspoCRM is the component that is
        # down, this call fails too and must not escape the handler.
        try:
            await espocrm.update_entity('CDokumente', doc_id, {
                'aiSyncStatus': 'failed',
                'aiLastSync': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            })
        except Exception as update_err:
            self._log.error(f" ❌ Failed to update aiSyncStatus: {update_err}")
        return False
async def remove_document_from_xai(
    self,
    doc: Dict[str, Any],
    collection_id: str,
    xai,
    espocrm,
) -> None:
    """
    Detach a CDokumente's file from an xAI collection (called on DELETE).

    Does nothing when the document has no 'aiFileId'. Otherwise the file is
    removed from the collection and the entity is reset to aiSyncStatus='new'
    with an empty aiFileId. Any failure is logged as a warning, not raised.
    Args:
        doc: CDokumente entity dict (must contain 'id')
        collection_id: xAI collection to remove the file from
        xai: xAI service providing remove_from_collection
        espocrm: EspoCRM service providing update_entity
    """
    entity_id = doc['id']
    xai_file_id = doc.get('aiFileId')
    if xai_file_id:
        try:
            await xai.remove_from_collection(collection_id, xai_file_id)
            self._log.info(f" 🗑️ Removed {doc.get('name')} from xAI collection")
            reset_fields = {
                'aiFileId': None,
                'aiSyncStatus': 'new',
                'aiLastSync': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            }
            await espocrm.update_entity('CDokumente', entity_id, reset_fields)
        except Exception as e:
            self._log.warn(f" ⚠️ Could not remove from xAI: {e}")

View File

@@ -1,507 +0,0 @@
"""
Advoware Document Sync - Event Handler
Executes 3-way merge sync for one Akte.
PER-AKTE LOCK: Allows parallel syncs of different Akten.
Triggers preview generation for new/changed documents.
Flow:
1. Acquire per-Akte lock (key: advoware_document_sync:akte:{aktennr})
2. Fetch data: EspoCRM docs + Windows files + Advoware history
3. Cleanup file list (filter by History)
4. 3-Way merge per file
5. Sync metadata (always)
6. Check Akte ablage status
7. Update sync status
8. Redis: SREM processing (success) or ZADD to pending Sorted Set (error)
9. Release per-Akte lock (always in finally)
PARALLEL EXECUTION: Multiple Akten can sync simultaneously.
LOCK SCOPE: Only prevents the same Akte from syncing twice at once.
Enqueues:
- document.generate_preview: Bei CREATE/UPDATE_ESPO
"""
from typing import Dict, Any
from datetime import datetime
from motia import FlowContext, queue
# Step configuration for the Advoware document sync event handler.
# It is triggered by the "advoware.document.sync" queue and declares
# "document.generate_preview" as the topic it may enqueue.
# NOTE(review): how the framework interprets each key is defined by motia,
# not visible in this file - confirm against the motia step config docs.
config = {
"name": "Advoware Document Sync - Event Handler",
"description": "Execute 3-way merge sync for Akte",
"flows": ["advoware-document-sync"],
"triggers": [queue("advoware.document.sync")],
"enqueues": ["document.generate_preview"],
}
async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
"""
Execute sync with GLOBAL lock.
Flow:
1. Acquire GLOBAL lock (key: advoware_document_sync_global)
2. Fetch data: EspoCRM docs + Windows files + Advoware history
3. Cleanup file list
4. 3-Way merge per file
5. Sync metadata (always)
6. Check Akte ablage status
7. Update sync status
8. Redis: SREM processing (success) or SMOVE to pending (error)
9. Release GLOBAL lock (always in finally)
"""
aktennummer = event_data.get('aktennummer')
akte_id = event_data.get('akte_id')
status = event_data.get('status', 'Unknown')
ctx.logger.info("=" * 80)
ctx.logger.info(f"🔄 DOCUMENT SYNC STARTED")
ctx.logger.info(f"=" * 80)
ctx.logger.info(f"📋 Akte Details:")
ctx.logger.info(f" ├─ Aktennummer: {aktennummer}")
ctx.logger.info(f" ├─ EspoCRM ID: {akte_id}")
ctx.logger.info(f" ├─ Status: {status}")
ctx.logger.info(f" └─ Triggered: Via cron poller")
ctx.logger.info(f"")
ctx.logger.info(f"🚀 Parallelization: This Akte syncs independently")
ctx.logger.info(f" Other Akten can sync at the same time!")
ctx.logger.info("")
from services.redis_client import get_redis_client
from services.espocrm import EspoCRMAPI
from services.advoware_watcher_service import AdvowareWatcherService
from services.advoware_history_service import AdvowareHistoryService
from services.advoware_service import AdvowareService
from services.advoware_document_sync_utils import AdvowareDocumentSyncUtils
from services.blake3_utils import compute_blake3
redis_client = get_redis_client(strict=False)
lock_acquired = False
lock_key = f"advoware_document_sync:akte:{aktennummer}" # Per-Akte lock
if not redis_client:
ctx.logger.error("❌ Redis unavailable, cannot acquire lock")
return
try:
# 1. PER-AKTE LOCK (allows parallel syncs of different Akten)
ctx.logger.info(f"🔐 Attempting to acquire lock for Akte {aktennummer}...")
lock_acquired = redis_client.set(lock_key, f"sync_{datetime.now().isoformat()}", nx=True, ex=1800)
if not lock_acquired:
current_holder = redis_client.get(lock_key)
ctx.logger.warn(f"")
ctx.logger.warn(f"⏸️ Lock busy for Akte {aktennummer}")
ctx.logger.warn(f" Lock Key: {lock_key}")
ctx.logger.warn(f" Current Holder: {current_holder}")
ctx.logger.warn(f" Action: Requeueing (Motia will retry)")
raise RuntimeError(f"Lock busy for Akte {aktennummer}, retry later")
ctx.logger.info(f"✅ Lock acquired for Akte {aktennummer}")
ctx.logger.info(f" Lock Key: {lock_key}")
ctx.logger.info(f" TTL: 30 minutes")
ctx.logger.info(f" Scope: Only this Akte is locked (other Akten can sync in parallel)")
# 2. Initialize services
espocrm = EspoCRMAPI(ctx)
watcher = AdvowareWatcherService(ctx)
history_service = AdvowareHistoryService(ctx)
advoware_service = AdvowareService(ctx)
sync_utils = AdvowareDocumentSyncUtils(ctx)
# 3. Fetch data
ctx.logger.info("📥 Fetching data...")
# Get Akte from EspoCRM
akte = await espocrm.get_entity('CAdvowareAkten', akte_id)
if not akte:
ctx.logger.error(f"❌ Akte {akte_id} not found in EspoCRM")
redis_client.srem("advoware:processing_aktennummern", aktennummer)
return
# Die Aktennummer IST die Advoware-ID
advoware_id = aktennummer
ctx.logger.info(f"📋 Using Aktennummer as Advoware-ID: {advoware_id}")
# Get linked documents from EspoCRM
espo_docs_result = await espocrm.list_related(
'CAdvowareAkten',
akte_id,
'dokumentes'
)
espo_docs = espo_docs_result.get('list', [])
# Get Windows file list
try:
windows_files = await watcher.get_akte_files(aktennummer)
except Exception as e:
ctx.logger.error(f"❌ Failed to fetch Windows files: {e}")
windows_files = []
# Get Advoware History
try:
advo_history = await history_service.get_akte_history(advoware_id)
except Exception as e:
ctx.logger.error(f"❌ Failed to fetch Advoware History: {e}")
advo_history = []
ctx.logger.info(f"📊 Data fetched:")
ctx.logger.info(f" - {len(espo_docs)} EspoCRM docs")
ctx.logger.info(f" - {len(windows_files)} Windows files")
ctx.logger.info(f" - {len(advo_history)} History entries")
# 4. Cleanup file list (filter by History)
windows_files = sync_utils.cleanup_file_list(windows_files, advo_history)
ctx.logger.info(f"🧹 After cleanup: {len(windows_files)} Windows files with History")
# 5. Build file mapping for 3-way merge based on HNR (stable identifier)
# hnr (History Number) is the stable identifier in Advoware - files can change name/path but hnr stays same
# Index EspoCRM docs by hnr (stable identifier)
espo_docs_by_hnr = {}
espo_docs_by_path = {} # Fallback for docs without hnr
for doc in espo_docs:
hnr = doc.get('hnr')
if hnr:
espo_docs_by_hnr[hnr] = doc
dateipfad = doc.get('dateipfad', '')
if dateipfad:
espo_docs_by_path[dateipfad.lower()] = doc
# Index History by hnr
history_by_hnr = {}
history_by_path = {} # For path-based lookup
for entry in advo_history:
hnr = entry.get('hNr')
datei = entry.get('datei', '')
if hnr:
history_by_hnr[hnr] = entry
if datei:
history_by_path[datei.lower()] = entry
# Index Windows files by path (they don't have hnr directly)
windows_files_by_path = {f.get('path', '').lower(): f for f in windows_files}
# Get all unique hnrs to process
all_hnrs = set(espo_docs_by_hnr.keys()) | set(history_by_hnr.keys())
ctx.logger.info(f"📋 Total unique documents (by hnr): {len(all_hnrs)}")
ctx.logger.info(f" EspoCRM docs with hnr: {len(espo_docs_by_hnr)}")
ctx.logger.info(f" History entries: {len(history_by_hnr)}")
ctx.logger.info(f" Windows files: {len(windows_files_by_path)}")
# 6. 3-Way merge per hnr (stable identifier)
sync_results = {
'created': 0,
'uploaded': 0,
'updated': 0,
'deleted': 0,
'skipped': 0,
'errors': 0
}
for hnr in all_hnrs:
# Get data for this hnr from all sources
espo_doc = espo_docs_by_hnr.get(hnr)
history_entry = history_by_hnr.get(hnr)
# Get Windows file through history path
windows_file = None
file_path = None
if history_entry:
file_path = history_entry.get('datei', '').lower()
windows_file = windows_files_by_path.get(file_path)
# Extract filename for display
if history_entry and history_entry.get('datei'):
filename = history_entry.get('datei').split('\\')[-1]
elif espo_doc:
filename = espo_doc.get('name', f'hnr_{hnr}')
else:
filename = f'hnr_{hnr}'
ctx.logger.info(f"\n{'='*80}")
ctx.logger.info(f"Processing: {filename} (hnr: {hnr})")
ctx.logger.info(f"{'='*80}")
try:
# Perform 3-way merge based on hnr
action = sync_utils.merge_three_way(espo_doc, windows_file, history_entry)
ctx.logger.info(f"📊 Merge decision:")
ctx.logger.info(f" Action: {action.action}")
ctx.logger.info(f" Reason: {action.reason}")
ctx.logger.info(f" Source: {action.source}")
# Execute action
if action.action == 'SKIP':
ctx.logger.info(f"⏭️ Skipping {filename}")
sync_results['skipped'] += 1
elif action.action == 'CREATE':
# Download from Windows and create in EspoCRM
if not windows_file:
ctx.logger.error(f"❌ Cannot CREATE - no Windows file for hnr {hnr}")
sync_results['errors'] += 1
continue
ctx.logger.info(f"📥 Downloading {filename} from Windows...")
content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
# Compute Blake3 hash
blake3_hash = compute_blake3(content)
# Determine MIME type
import mimetypes
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
mime_type = 'application/octet-stream'
# Step 1: Upload attachment for File field
ctx.logger.info(f"📤 Uploading attachment (Step 1/2)...")
try:
attachment = await espocrm.upload_attachment_for_file_field(
file_content=content,
filename=filename,
related_type='CDokumente',
field='dokument',
mime_type=mime_type
)
ctx.logger.info(f"✅ Attachment uploaded: {attachment.get('id')}")
except Exception as e:
ctx.logger.error(f"❌ Failed to upload attachment: {e}")
raise
# Step 2: Create document entity with attachment ID and Advoware fields
ctx.logger.info(f"💾 Creating document entity (Step 2/2)...")
# Extract full Windows path from watcher data
full_path = windows_file.get('path', '')
# Current timestamp for sync tracking (EspoCRM format)
now_iso = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
new_doc = await espocrm.create_entity('CDokumente', {
'name': filename,
'dokumentId': attachment.get('id'), # Link to attachment
# Advoware History fields
'hnr': history_entry.get('hNr') if history_entry else None,
'advowareArt': history_entry.get('art', 'Schreiben') if history_entry else 'Schreiben',
'advowareBemerkung': history_entry.get('text', '') if history_entry else '',
# Windows file sync fields
'dateipfad': full_path,
'blake3hash': blake3_hash,
'syncedHash': blake3_hash,
'usn': windows_file.get('usn', 0),
'syncStatus': 'synced',
'lastSyncTimestamp': now_iso
})
doc_id = new_doc.get('id')
ctx.logger.info(f"✅ Created document with attachment: {doc_id}")
# Link to Akte
await espocrm.link_entities(
'CAdvowareAkten',
akte_id,
'dokumentes',
doc_id
)
sync_results['created'] += 1
# Trigger preview generation
try:
await ctx.emit('document.generate_preview', {
'entity_id': doc_id,
'entity_type': 'CDokumente'
})
ctx.logger.info(f"✅ Preview generation triggered for {doc_id}")
except Exception as e:
ctx.logger.warn(f"⚠️ Failed to trigger preview generation: {e}")
elif action.action == 'UPDATE_ESPO':
# Download from Windows and update EspoCRM
if not windows_file:
ctx.logger.error(f"❌ Cannot UPDATE_ESPO - no Windows file for hnr {hnr}")
sync_results['errors'] += 1
continue
ctx.logger.info(f"📥 Downloading {filename} from Windows...")
content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
# Compute Blake3 hash
blake3_hash = compute_blake3(content)
# Determine MIME type
import mimetypes
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
mime_type = 'application/octet-stream'
# Extract full Windows path
full_path = windows_file.get('path', '')
# Update document in EspoCRM with correct field names
ctx.logger.info(f"💾 Updating document in EspoCRM...")
now_iso = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
update_data = {
'name': filename, # Update name if changed
'blake3hash': blake3_hash,
'syncedHash': blake3_hash,
'usn': windows_file.get('usn', 0),
'dateipfad': full_path, # Update path if changed
'syncStatus': 'synced',
'lastSyncTimestamp': now_iso
}
# Also update History fields if available
if history_entry:
update_data['hnr'] = history_entry.get('hNr')
update_data['advowareArt'] = history_entry.get('art', 'Schreiben')
update_data['advowareBemerkung'] = history_entry.get('text', '')
await espocrm.update_entity('CDokumente', espo_doc.get('id'), update_data)
ctx.logger.info(f"✅ Updated document: {espo_doc.get('id')}")
sync_results['updated'] += 1
# Trigger preview generation
try:
await ctx.emit('document.generate_preview', {
'entity_id': espo_doc.get('id'),
'entity_type': 'CDokumente'
})
ctx.logger.info(f"✅ Preview generation triggered for {espo_doc.get('id')}")
except Exception as e:
ctx.logger.warn(f"⚠️ Failed to trigger preview generation: {e}")
elif action.action == 'UPLOAD_WINDOWS':
# Upload to Windows from EspoCRM
ctx.logger.info(f"📤 Uploading {filename} to Windows...")
# Get file content from EspoCRM (would need attachment download)
# For now, log that this needs implementation
ctx.logger.warn(f"⚠️ Upload to Windows not yet implemented for {filename}")
sync_results['skipped'] += 1
elif action.action == 'DELETE':
# Delete from EspoCRM (file deleted in Windows/Advoware)
ctx.logger.info(f"🗑️ Deleting {filename} from EspoCRM...")
if espo_doc:
doc_id = espo_doc.get('id')
await espocrm.delete_entity('CDokumente', doc_id)
ctx.logger.info(f"✅ Deleted document: {doc_id}")
sync_results['deleted'] += 1
else:
ctx.logger.warn(f"⚠️ No EspoCRM document found for deletion")
sync_results['skipped'] += 1
except Exception as e:
ctx.logger.error(f"❌ Error processing {filename}: {e}")
sync_results['errors'] += 1
# 7. Sync metadata (always update from History)
ctx.logger.info(f"\n{'='*80}")
ctx.logger.info("📋 Syncing metadata from History...")
ctx.logger.info(f"{'='*80}")
metadata_updates = 0
for file_path in all_paths:
# Extract filename for EspoCRM lookup
filename = file_path.split('\\')[-1]
espo_doc = espo_docs_by_name.get(filename.lower())
history_entry = history_by_path.get(file_path)
if espo_doc and history_entry:
needs_update, updates = sync_utils.should_sync_metadata(espo_doc, history_entry)
if needs_update:
try:
await espocrm.update_entity('CDokumente', espo_doc.get('id'), updates)
ctx.logger.info(f"✅ Updated metadata for {filename}: {list(updates.keys())}")
metadata_updates += 1
except Exception as e:
ctx.logger.error(f"❌ Failed to update metadata for {filename}: {e}")
ctx.logger.info(f"📊 Metadata sync: {metadata_updates} updates")
# 8. Check Akte ablage status
ctx.logger.info(f"\n{'='*80}")
ctx.logger.info("🗂️ Checking Akte ablage status...")
ctx.logger.info(f"{'='*80}")
akte_details = await advoware_service.get_akte(advoware_id)
if akte_details and akte_details.get('ablage') == 1:
ctx.logger.info(f"📁 Akte {aktennummer} marked as ablage, deactivating in EspoCRM")
await espocrm.update_entity('CAdvowareAkten', akte_id, {
'Aktivierungsstatus': 'Deaktiviert'
})
# 9. Update sync status
await espocrm.update_entity('CAdvowareAkten', akte_id, {
'syncStatus': 'synced',
'lastSync': datetime.now().isoformat()
})
# 10. SUCCESS: Remove from processing SET
redis_client.srem("advoware:processing_aktennummern", aktennummer)
# Summary
ctx.logger.info(f"\n{'='*80}")
ctx.logger.info(f"✅ Sync complete for Akte {aktennummer}")
ctx.logger.info(f"{'='*80}")
ctx.logger.info(f"📊 Results:")
ctx.logger.info(f" - Created: {sync_results['created']}")
ctx.logger.info(f" - Updated: {sync_results['updated']}")
ctx.logger.info(f" - Deleted: {sync_results['deleted']}")
ctx.logger.info(f" - Uploaded: {sync_results['uploaded']}")
ctx.logger.info(f" - Skipped: {sync_results['skipped']}")
ctx.logger.info(f" - Errors: {sync_results['errors']}")
ctx.logger.info(f" - Metadata updates: {metadata_updates}")
ctx.logger.info(f"{'='*80}")
except Exception as e:
ctx.logger.error(f"❌ Sync failed for {aktennummer}: {e}")
# Move back to pending Sorted Set for retry
if redis_client:
import time
retry_timestamp = time.time()
redis_client.zadd(
"advoware:pending_aktennummern",
{aktennummer: retry_timestamp}
)
ctx.logger.info(f"✓ Moved {aktennummer} back to pending queue for retry")
# Update status in EspoCRM
try:
await espocrm.update_entity('CAdvowareAkten', akte_id, {
'syncStatus': 'failed',
'lastSyncError': str(e)[:500] # Truncate long errors
})
except:
pass
# Re-raise for Motia retry
raise
finally:
# ALWAYS release lock
if lock_acquired and redis_client:
redis_client.delete(lock_key)
ctx.logger.info(f"")
ctx.logger.info(f"🔓 Lock released for Akte {aktennummer}")
ctx.logger.info(f" Lock Key: {lock_key}")
ctx.logger.info(f" Duration: Released after processing")
ctx.logger.info("=" * 80)

View File

@@ -1,238 +0,0 @@
"""
Advoware Document Sync - Cron Poller
Polls the Redis Sorted Set of pending Aktennummern every 10 seconds.
Filters by Akte status and emits sync events.
Flow:
1. ZRANGEBYSCORE on advoware:pending_aktennummern (oldest entry past the 10 s debounce), then ZREM
2. SADD to advoware:processing_aktennummern
3. Validate Akte status in EspoCRM
4. Emit event if status valid
5. Remove from processing if invalid
"""
from typing import Dict, Any
from motia import FlowContext, cron
# Motia step configuration for the cron poller: the step is triggered by a
# cron schedule and declares the single topic it enqueues.
config = {
"name": "Advoware Document Sync - Cron Poller",
"description": "Poll Redis for pending Aktennummern and emit sync events",
"flows": ["advoware-document-sync"],
"triggers": [cron("*/10 * * * * *")], # Every 10 seconds
"enqueues": ["advoware.document.sync"],
}
async def handler(input_data: None, ctx: FlowContext) -> None:
    """
    Poll the pending Sorted Set once and emit a sync event for one Akte.

    Flow:
    1. ZRANGEBYSCORE on advoware:pending_aktennummern for the oldest entry
       whose score (a timestamp) is at least 10 seconds old (debounce)
    2. ZREM it from pending and SADD it to advoware:processing_aktennummern
    3. Look the Akte up in EspoCRM and validate its aktivierungsstatus
    4. Enqueue 'advoware.document.sync' if the status is valid
    5. SREM from processing if the Akte is missing or its status is invalid

    Errors are logged and never propagate out of this handler. When the
    lookup or enqueue fails, the Aktennummer is put back into the pending
    Sorted Set and removed from the processing Set.
    """
    import time

    pending_key = "advoware:pending_aktennummern"
    processing_key = "advoware:processing_aktennummern"

    ctx.logger.info("=" * 80)
    ctx.logger.info("🔍 Polling Redis for pending Aktennummern")

    from services.redis_client import get_redis_client
    from services.espocrm import EspoCRMAPI

    redis_client = get_redis_client(strict=False)
    if not redis_client:
        ctx.logger.error("❌ Redis unavailable - cannot poll")
        ctx.logger.info("=" * 80)
        return

    espocrm = EspoCRMAPI(ctx)

    try:
        # Debounce window: only entries older than this are picked up
        debounce_seconds = 10
        cutoff_time = time.time() - debounce_seconds

        # Queue sizes before polling (pending is a Sorted Set -> ZCARD)
        pending_count = redis_client.zcard(pending_key)
        processing_count = redis_client.scard(processing_key)

        ctx.logger.info(f"📊 Queue Status:")
        ctx.logger.info(f" • Pending: {pending_count} Aktennummern (Sorted Set)")
        ctx.logger.info(f" • Processing: {processing_count} Aktennummern (Set)")
        ctx.logger.info(f" • Debounce: {debounce_seconds} seconds")

        # Fetch at most one member whose score lies in [0, cutoff_time]
        old_entries = redis_client.zrangebyscore(
            pending_key,
            min=0,
            max=cutoff_time,
            start=0,
            num=1,
        )

        if not old_entries:
            # Either the queue is empty or every entry is still inside the debounce window
            if pending_count > 0:
                ctx.logger.info(f"⏸️ {pending_count} Aktennummern in queue, but all too recent (< {debounce_seconds}s)")
                ctx.logger.info(f" Waiting for debounce window to pass...")
            else:
                ctx.logger.info("✓ No pending Aktennummern (queue is empty)")
            ctx.logger.info("=" * 80)
            return

        aktennr = old_entries[0]
        if isinstance(aktennr, bytes):
            aktennr = aktennr.decode('utf-8')

        # Read the entry's timestamp before removing it
        score = redis_client.zscore(pending_key, aktennr)
        age_seconds = time.time() - score if score else 0

        # ZREM reports how many members were removed; 0 means another poller
        # run claimed this entry between our read and our remove.
        if not redis_client.zrem(pending_key, aktennr):
            ctx.logger.info(f"⏭️ {aktennr} was already claimed by another poller run")
            return

        ctx.logger.info(f"")
        ctx.logger.info(f"📋 Processing Aktennummer: {aktennr}")
        ctx.logger.info(f" ├─ First Event: {age_seconds:.1f} seconds ago")
        ctx.logger.info(f" ├─ Debounced: ✅ (waited {debounce_seconds}s)")
        ctx.logger.info(f" └─ Removed from pending queue")
        ctx.logger.info(f" ├─ Source: Redis SET 'advoware:pending_aktennummern'")
        ctx.logger.info(f" ├─ Action: Moved to 'advoware:processing_aktennummern'")
        ctx.logger.info(f" └─ Next: Validate Akte status in EspoCRM")

        redis_client.sadd(processing_key, aktennr)
        ctx.logger.info(f"✓ Moved to processing queue")

        ctx.logger.info(f"")
        ctx.logger.info(f"🔍 Looking up Akte in EspoCRM...")

        try:
            # Search for the Akte by its aktennummer
            result = await espocrm.list_entities(
                'CAdvowareAkten',
                where=[{
                    'type': 'equals',
                    'attribute': 'aktennummer',
                    'value': aktennr
                }],
                max_size=1
            )

            if not result or not result.get('list'):
                ctx.logger.warn(f"")
                ctx.logger.warn(f"⚠️ REJECTED: Akte {aktennr} not found in EspoCRM")
                ctx.logger.warn(f" Reason: No CAdvowareAkten entity with aktennummer={aktennr}")
                ctx.logger.warn(f" Action: Removed from processing queue")
                ctx.logger.warn(f" Impact: Will not be synced until re-added to Redis")
                redis_client.srem(processing_key, aktennr)
                return

            akte = result['list'][0]
            akte_id = akte.get('id', '')
            advoware_id = akte.get('advowareId', 'N/A')
            aktivierungsstatus = akte.get('aktivierungsstatus', 'N/A')  # field name is lowercase

            ctx.logger.info(f"✓ Akte found in EspoCRM:")
            ctx.logger.info(f" ├─ EspoCRM ID: {akte_id}")
            ctx.logger.info(f" ├─ Advoware ID: {advoware_id}")
            ctx.logger.info(f" ├─ Aktivierungsstatus RAW: '{aktivierungsstatus}' (type: {type(aktivierungsstatus).__name__})")
            ctx.logger.info(f" └─ All akte fields: {list(akte.keys())[:10]}...")  # debug: show field names

            # German and English spellings are both accepted
            valid_statuses = ['import', 'neu', 'aktiv', 'new', 'active']
            aktivierungsstatus_lower = str(aktivierungsstatus).lower().strip()

            ctx.logger.info(f"🔍 Status validation:")
            ctx.logger.info(f" ├─ Aktivierungsstatus: '{aktivierungsstatus}'")
            ctx.logger.info(f" ├─ Aktivierungsstatus (lowercase): '{aktivierungsstatus_lower}'")
            ctx.logger.info(f" ├─ Valid statuses: {valid_statuses}")
            ctx.logger.info(f" └─ Is valid? {aktivierungsstatus_lower in valid_statuses}")

            if aktivierungsstatus_lower not in valid_statuses:
                ctx.logger.warn(f"")
                ctx.logger.warn(f"⚠️ REJECTED: Akte {aktennr} has invalid aktivierungsstatus")
                ctx.logger.warn(f" Current Aktivierungsstatus: '{aktivierungsstatus}' (lowercased: '{aktivierungsstatus_lower}')")
                ctx.logger.warn(f" Valid Statuses: {valid_statuses}")
                ctx.logger.warn(f" Reason: Only active Akten are synced")
                ctx.logger.warn(f" Action: Removed from processing queue")
                redis_client.srem(processing_key, aktennr)
                return

            ctx.logger.info(f"")
            ctx.logger.info(f"✅ ACCEPTED: Akte {aktennr} is valid for sync")
            ctx.logger.info(f" Aktivierungsstatus: {aktivierungsstatus} (valid)")
            ctx.logger.info(f" Action: Emitting sync event to queue")

            ctx.logger.info(f"📤 Emitting event to topic 'advoware.document.sync'...")
            await ctx.enqueue({
                'topic': 'advoware.document.sync',
                'data': {
                    'aktennummer': aktennr,
                    'akte_id': akte_id,
                    'aktivierungsstatus': aktivierungsstatus
                }
            })
            ctx.logger.info(f"✅ Event emitted successfully")

            ctx.logger.info(f"")
            ctx.logger.info(f"🚀 Sync event emitted successfully")
            ctx.logger.info(f" Topic: advoware.document.sync")
            ctx.logger.info(f" Payload: aktennummer={aktennr}, akte_id={akte_id}, aktivierungsstatus={aktivierungsstatus}")
            ctx.logger.info(f" Next: Event handler will process sync")

        except Exception as e:
            ctx.logger.error(f"")
            ctx.logger.error(f"❌ ERROR: Failed to process {aktennr}")
            ctx.logger.error(f" Error Type: {type(e).__name__}")
            ctx.logger.error(f" Error Message: {str(e)}")
            ctx.logger.error(f" Traceback: ", exc_info=True)
            ctx.logger.error(f" Action: Moving back to pending queue for retry")

            # Re-add with the current time as score: the entry becomes
            # eligible again once the debounce window has elapsed.
            retry_timestamp = time.time()
            redis_client.zadd(pending_key, {aktennr: retry_timestamp})
            # It is pending again, so it must not linger in the processing Set.
            redis_client.srem(processing_key, aktennr)
            ctx.logger.info(f"✓ Moved {aktennr} back to pending queue (timestamp: now)")
            raise

    except Exception as e:
        ctx.logger.error(f"")
        ctx.logger.error(f"❌ CRON POLLER ERROR (non-fatal)")
        ctx.logger.error(f" Error Type: {type(e).__name__}")
        ctx.logger.error(f" Error Message: {str(e)}")
        ctx.logger.error(f" Traceback: ", exc_info=True)
        ctx.logger.error(f" Impact: This iteration failed, will retry in next cycle")
        # Deliberately not re-raised: the next cron iteration retries

    finally:
        # Final queue status is informational only; a Redis error here is ignored
        try:
            pending_final = redis_client.zcard(pending_key)
            processing_final = redis_client.scard(processing_key)
            ctx.logger.info(f"")
            ctx.logger.info(f"📊 Final Queue Status:")
            ctx.logger.info(f" • Pending: {pending_final} Aktennummern")
            ctx.logger.info(f" • Processing: {processing_final} Aktennummern")
        except Exception:
            pass
        ctx.logger.info("=" * 80)

View File

@@ -0,0 +1 @@
# Akte sync steps: unified sync across Advoware, EspoCRM, and xAI

View File

@@ -0,0 +1,135 @@
"""
Akte Sync - Cron Poller
Polls Redis Sorted Set for pending Aktennummern every 10 seconds.
Respects a 10-second debounce window so that rapid filesystem events
(e.g. many files being updated at once) are batched into a single sync.
Redis keys (same as advoware-watcher writes to):
advoware:pending_aktennummern Sorted Set { aktennummer → timestamp }
advoware:processing_aktennummern Set (tracks active syncs)
Eligibility check (either flag triggers a sync):
syncSchalter == True AND aktivierungsstatus in valid list → Advoware sync
aiAktivierungsstatus in valid list → xAI sync
"""
from motia import FlowContext, cron
# Motia step configuration: cron-triggered step that enqueues 'akte.sync'.
config = {
"name": "Akte Sync - Cron Poller",
"description": "Poll Redis for pending Aktennummern and emit akte.sync events (10 s debounce)",
"flows": ["akte-sync"],
"triggers": [cron("*/10 * * * * *")],
"enqueues": ["akte.sync"],
}
# Redis Sorted Set of Aktennummern waiting to be synced (score = timestamp)
PENDING_KEY = "advoware:pending_aktennummern"
# Redis Set of Aktennummern that have been handed to the sync handler
PROCESSING_KEY = "advoware:processing_aktennummern"
# Minimum age in seconds an entry must have before it is picked up
DEBOUNCE_SECS = 10
# Lower-cased aktivierungsstatus values that allow the Advoware sync
VALID_ADVOWARE_STATUSES = {'import', 'neu', 'new', 'aktiv', 'active'}
# Lower-cased aiAktivierungsstatus values that allow the xAI sync
VALID_AI_STATUSES = {'new', 'neu', 'aktiv', 'active'}
async def handler(input_data: None, ctx: FlowContext) -> None:
    """
    Poll the pending Sorted Set once and enqueue 'akte.sync' for one Akte.

    Picks the oldest Aktennummer whose score is at least DEBOUNCE_SECS old,
    moves it from PENDING_KEY to PROCESSING_KEY, looks up the matching CAkten
    entity in EspoCRM and enqueues an 'akte.sync' event when the Akte is
    eligible for the Advoware sync and/or the xAI sync.

    Raises:
        Exception: any error from the EspoCRM lookup or the enqueue call is
            re-raised after the Aktennummer has been put back into
            PENDING_KEY and removed from PROCESSING_KEY.
    """
    import time

    from services.redis_client import get_redis_client
    from services.espocrm import EspoCRMAPI

    ctx.logger.info("=" * 60)
    ctx.logger.info("⏰ AKTE CRON POLLER")

    redis_client = get_redis_client(strict=False)
    if not redis_client:
        ctx.logger.error("❌ Redis unavailable")
        ctx.logger.info("=" * 60)
        return

    espocrm = EspoCRMAPI(ctx)
    cutoff = time.time() - DEBOUNCE_SECS

    pending_count = redis_client.zcard(PENDING_KEY)
    processing_count = redis_client.scard(PROCESSING_KEY)
    ctx.logger.info(f" Pending : {pending_count}")
    ctx.logger.info(f" Processing : {processing_count}")

    # Pull the oldest entry that has passed the debounce window
    old_entries = redis_client.zrangebyscore(PENDING_KEY, min=0, max=cutoff, start=0, num=1)
    if not old_entries:
        if pending_count > 0:
            ctx.logger.info(f"⏸️ {pending_count} pending all too recent (< {DEBOUNCE_SECS}s)")
        else:
            ctx.logger.info("✓ Queue empty")
        ctx.logger.info("=" * 60)
        return

    aktennr = old_entries[0]
    if isinstance(aktennr, bytes):
        aktennr = aktennr.decode()

    score = redis_client.zscore(PENDING_KEY, aktennr)
    # A missing score would otherwise yield an age measured from the epoch
    age = time.time() - score if score else 0.0

    # ZREM returns the number of removed members; 0 means another poller run
    # claimed this entry between our read and our remove.
    if not redis_client.zrem(PENDING_KEY, aktennr):
        ctx.logger.info(f"⏭️ {aktennr} already claimed by another poller run")
        ctx.logger.info("=" * 60)
        return
    redis_client.sadd(PROCESSING_KEY, aktennr)
    ctx.logger.info(f"📋 Aktennummer: {aktennr} (age={age:.1f}s)")

    try:
        # ── Lookup in EspoCRM ──────────────────────────────────────
        result = await espocrm.list_entities(
            'CAkten',
            where=[{
                'type': 'equals',
                'attribute': 'aktennummer',
                'value': aktennr,
            }],
            max_size=1,
        )

        if not result or not result.get('list'):
            ctx.logger.warn(f"⚠️ No CAkten found for aktennummer={aktennr} removing")
            redis_client.srem(PROCESSING_KEY, aktennr)
            return  # the closing separator is logged in `finally`

        akte = result['list'][0]
        akte_id = akte['id']
        sync_schalter = akte.get('syncSchalter', False)
        aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower()
        ai_status = str(akte.get('aiAktivierungsstatus') or '').lower()

        # Either flag on its own is enough to trigger a sync
        advoware_eligible = bool(sync_schalter) and aktivierungsstatus in VALID_ADVOWARE_STATUSES
        xai_eligible = ai_status in VALID_AI_STATUSES

        ctx.logger.info(f" Akte ID : {akte_id}")
        ctx.logger.info(f" aktivierungsstatus : {aktivierungsstatus} ({'' if advoware_eligible else '⏭️'})")
        ctx.logger.info(f" aiAktivierungsstatus : {ai_status} ({'' if xai_eligible else '⏭️'})")

        if not advoware_eligible and not xai_eligible:
            ctx.logger.warn(f"⚠️ Akte {aktennr} not eligible for any sync removing")
            redis_client.srem(PROCESSING_KEY, aktennr)
            return

        # ── Emit sync event ────────────────────────────────────────
        await ctx.enqueue({
            'topic': 'akte.sync',
            'data': {
                'aktennummer': aktennr,
                'akte_id': akte_id,
            },
        })
        ctx.logger.info(f"📤 akte.sync emitted (akte_id={akte_id})")

    except Exception as e:
        ctx.logger.error(f"❌ Error processing {aktennr}: {e}")
        # Requeue for retry; a score of "now" makes it wait one debounce window
        redis_client.zadd(PENDING_KEY, {aktennr: time.time()})
        redis_client.srem(PROCESSING_KEY, aktennr)
        raise

    finally:
        ctx.logger.info("=" * 60)

View File

@@ -0,0 +1,401 @@
"""
Akte Sync - Event Handler
Unified sync for one CAkten entity across all configured backends:
- Advoware (3-way merge: Windows ↔ EspoCRM ↔ History)
- xAI (Blake3 hash-based upload to Collection)
Both run in the same event to keep CDokumente perfectly in sync.
Trigger: akte.sync { akte_id, aktennummer }
Lock: Redis per-Akte (30 min TTL, prevents double-sync of same Akte)
Parallel: Different Akten sync simultaneously.
Enqueues:
- document.generate_preview (after CREATE / UPDATE_ESPO)
"""
from typing import Dict, Any
from datetime import datetime
from motia import FlowContext, queue
# Motia step configuration: queue-triggered step for 'akte.sync' events that
# may enqueue 'document.generate_preview'.
config = {
"name": "Akte Sync - Event Handler",
"description": "Unified sync for one Akte: Advoware 3-way merge + xAI upload",
"flows": ["akte-sync"],
"triggers": [queue("akte.sync")],
"enqueues": ["document.generate_preview"],
}
# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────
async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
    """
    Run the unified sync (Advoware and/or xAI) for one CAkten entity.

    Args:
        event_data: 'akte.sync' payload with the keys 'akte_id' (EspoCRM id)
            and 'aktennummer'.
        ctx: Motia flow context (logger, enqueue).

    Raises:
        RuntimeError: when the per-Akte Redis lock is already held.
        Exception: any sync error is re-raised after the Aktennummer has been
            put back into the pending Sorted Set and the Akte has been marked
            as failed in EspoCRM (best effort).
    """
    import time
    import traceback

    # Queue keys used by the cron poller that feeds this handler. They must
    # match, otherwise processing entries are never cleared and retries are
    # written to a key nobody polls.
    pending_key = "advoware:pending_aktennummern"
    processing_key = "advoware:processing_aktennummern"

    akte_id = event_data.get('akte_id')
    aktennummer = event_data.get('aktennummer')

    ctx.logger.info("=" * 80)
    ctx.logger.info("🔄 AKTE SYNC STARTED")
    ctx.logger.info(f" Aktennummer : {aktennummer}")
    ctx.logger.info(f" EspoCRM ID : {akte_id}")
    ctx.logger.info("=" * 80)

    from services.redis_client import get_redis_client
    from services.espocrm import EspoCRMAPI

    redis_client = get_redis_client(strict=False)
    if not redis_client:
        ctx.logger.error("❌ Redis unavailable")
        return

    # Per-Akte lock with a 30 minute TTL; SET NX fails if the key exists
    lock_key = f"akte_sync:{akte_id}"
    lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=1800)
    if not lock_acquired:
        ctx.logger.warn(f"⏸️ Lock busy for Akte {aktennummer} requeueing")
        raise RuntimeError(f"Lock busy for {aktennummer}")

    espocrm = EspoCRMAPI(ctx)

    try:
        # ── Load Akte ──────────────────────────────────────────────────────
        akte = await espocrm.get_entity('CAkten', akte_id)
        if not akte:
            ctx.logger.error(f"❌ Akte {akte_id} not found in EspoCRM")
            redis_client.srem(processing_key, aktennummer)
            return

        sync_schalter = akte.get('syncSchalter', False)
        aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower()
        ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower()

        ctx.logger.info(f"📋 Akte '{akte.get('name')}'")
        ctx.logger.info(f" syncSchalter : {sync_schalter}")
        ctx.logger.info(f" aktivierungsstatus : {aktivierungsstatus}")
        ctx.logger.info(f" aiAktivierungsstatus : {ai_aktivierungsstatus}")

        advoware_enabled = bool(sync_schalter) and aktivierungsstatus in ('import', 'neu', 'new', 'aktiv', 'active')
        xai_enabled = ai_aktivierungsstatus in ('new', 'neu', 'aktiv', 'active')

        ctx.logger.info(f" Advoware sync : {'✅ ON' if advoware_enabled else '⏭️ OFF'}")
        ctx.logger.info(f" xAI sync : {'✅ ON' if xai_enabled else '⏭️ OFF'}")

        if not advoware_enabled and not xai_enabled:
            ctx.logger.info("⏭️ Both syncs disabled nothing to do")
            redis_client.srem(processing_key, aktennummer)
            return

        # ── ADVOWARE SYNC ──────────────────────────────────────────────────
        advoware_results = None
        if advoware_enabled:
            advoware_results = await _run_advoware_sync(akte, aktennummer, akte_id, espocrm, ctx)

        # ── xAI SYNC ──────────────────────────────────────────────────────
        if xai_enabled:
            await _run_xai_sync(akte, akte_id, espocrm, ctx)

        # ── Final Status ───────────────────────────────────────────────────
        # NOTE(review): the status fields are written as 'synced' whenever the
        # backend was enabled and no exception escaped; per-document errors
        # counted by the Advoware sync, or a status the xAI sync may have
        # written itself, are not taken into account here. Confirm intended.
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        final_update: Dict[str, Any] = {'globalLastSync': now, 'globalSyncStatus': 'synced'}
        if advoware_enabled:
            final_update['syncStatus'] = 'synced'
            final_update['lastSync'] = now
        if xai_enabled:
            final_update['aiSyncStatus'] = 'synced'
            final_update['aiLastSync'] = now

        await espocrm.update_entity('CAkten', akte_id, final_update)
        redis_client.srem(processing_key, aktennummer)

        ctx.logger.info("=" * 80)
        ctx.logger.info("✅ AKTE SYNC COMPLETE")
        if advoware_results:
            ctx.logger.info(f" Advoware: created={advoware_results['created']} updated={advoware_results['updated']} deleted={advoware_results['deleted']} errors={advoware_results['errors']}")
        ctx.logger.info("=" * 80)

    except Exception as e:
        ctx.logger.error(f"❌ Sync failed: {e}")
        ctx.logger.error(traceback.format_exc())

        # Requeue for retry. Guarded so that a Redis error cannot replace the
        # original exception that is re-raised below.
        try:
            redis_client.zadd(pending_key, {aktennummer: time.time()})
            redis_client.srem(processing_key, aktennummer)
        except Exception as redis_err:
            ctx.logger.error(f"❌ Could not requeue {aktennummer}: {redis_err}")

        # Best effort: the original error matters more than a failed status write
        try:
            await espocrm.update_entity('CAkten', akte_id, {
                'syncStatus': 'failed',
                'globalSyncStatus': 'failed',
            })
        except Exception:
            pass
        raise

    finally:
        if lock_acquired and redis_client:
            redis_client.delete(lock_key)
            ctx.logger.info(f"🔓 Lock released for Akte {aktennummer}")
# ─────────────────────────────────────────────────────────────────────────────
# Advoware 3-way merge
# ─────────────────────────────────────────────────────────────────────────────
async def _trigger_preview(ctx: FlowContext, doc_id: str) -> None:
    """Enqueue preview generation for one CDokumente; failures are only logged."""
    try:
        # Same call shape as the other steps of this flow (topic + data dict)
        await ctx.enqueue({
            'topic': 'document.generate_preview',
            'data': {
                'entity_id': doc_id,
                'entity_type': 'CDokumente',
            },
        })
    except Exception as e:
        ctx.logger.warn(f" ⚠️ Preview trigger failed: {e}")


async def _run_advoware_sync(
    akte: Dict[str, Any],
    aktennummer: str,
    akte_id: str,
    espocrm,
    ctx: FlowContext,
) -> Dict[str, int]:
    """
    Run the 3-way merge (EspoCRM documents, Windows files, Advoware History)
    for one Akte and apply the resulting actions to EspoCRM.

    Args:
        akte: the CAkten entity (not read by this function).
        aktennummer: Aktennummer passed to the watcher, history and Advoware services.
        akte_id: EspoCRM id of the CAkten entity.
        espocrm: EspoCRM API client.
        ctx: Motia flow context.

    Returns:
        Counters with the keys 'created', 'updated', 'deleted', 'skipped'
        and 'errors'.

    Errors for a single document are logged and counted in 'errors'; they do
    not abort the remaining documents.
    """
    from services.advoware_watcher_service import AdvowareWatcherService
    from services.advoware_history_service import AdvowareHistoryService
    from services.advoware_service import AdvowareService
    from services.advoware_document_sync_utils import AdvowareDocumentSyncUtils
    from services.blake3_utils import compute_blake3
    import mimetypes

    watcher = AdvowareWatcherService(ctx)
    history_service = AdvowareHistoryService(ctx)
    advoware_service = AdvowareService(ctx)
    sync_utils = AdvowareDocumentSyncUtils(ctx)

    results = {'created': 0, 'updated': 0, 'deleted': 0, 'skipped': 0, 'errors': 0}

    ctx.logger.info("")
    ctx.logger.info("" * 60)
    ctx.logger.info("📂 ADVOWARE SYNC")
    ctx.logger.info("" * 60)

    # ── Fetch from all 3 sources ───────────────────────────────────────
    espo_docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes')
    espo_docs = (espo_docs_result or {}).get('list', [])

    # A failed fetch is replaced by an empty list below, which the merge
    # cannot tell apart from "nothing exists". Remember the failure so that
    # no document is deleted on the basis of incomplete data.
    sources_complete = True

    try:
        windows_files = await watcher.get_akte_files(aktennummer)
    except Exception as e:
        ctx.logger.error(f"❌ Windows watcher failed: {e}")
        windows_files = []
        sources_complete = False

    try:
        advo_history = await history_service.get_akte_history(aktennummer)
    except Exception as e:
        ctx.logger.error(f"❌ Advoware history failed: {e}")
        advo_history = []
        sources_complete = False

    ctx.logger.info(f" EspoCRM docs : {len(espo_docs)}")
    ctx.logger.info(f" Windows files : {len(windows_files)}")
    ctx.logger.info(f" History entries: {len(advo_history)}")

    # ── Cleanup Windows list (only files in History) ───────────────────
    windows_files = sync_utils.cleanup_file_list(windows_files, advo_history)

    # ── Build indexes by HNR (stable identifier from Advoware) ────────
    espo_by_hnr = {doc['hnr']: doc for doc in espo_docs if doc.get('hnr')}
    history_by_hnr = {entry['hNr']: entry for entry in advo_history if entry.get('hNr')}
    # Windows files are matched through the History path, case-insensitively
    windows_by_path = {f.get('path', '').lower(): f for f in windows_files}

    all_hnrs = set(espo_by_hnr) | set(history_by_hnr)
    ctx.logger.info(f" Unique HNRs : {len(all_hnrs)}")

    # ── 3-way merge per HNR ───────────────────────────────────────────
    for hnr in all_hnrs:
        espo_doc = espo_by_hnr.get(hnr)
        history_entry = history_by_hnr.get(hnr)
        datei = history_entry.get('datei') if history_entry else None

        windows_file = windows_by_path.get(datei.lower()) if datei else None

        # Display name: History file name, else EspoCRM name, else the hnr
        if datei:
            filename = datei.split('\\')[-1]
        elif espo_doc:
            filename = espo_doc.get('name', f'hnr_{hnr}')
        else:
            filename = f'hnr_{hnr}'

        try:
            action = sync_utils.merge_three_way(espo_doc, windows_file, history_entry)
            ctx.logger.info(f" [{action.action:12s}] {filename} (hnr={hnr}) {action.reason}")

            if action.action == 'SKIP':
                results['skipped'] += 1

            elif action.action == 'CREATE':
                if not windows_file:
                    ctx.logger.error(f" ❌ CREATE: no Windows file for hnr {hnr}")
                    results['errors'] += 1
                    continue

                content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
                blake3_hash = compute_blake3(content)
                mime_type, _ = mimetypes.guess_type(filename)
                mime_type = mime_type or 'application/octet-stream'
                now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                # Step 1: upload the attachment, step 2: create the entity that links to it
                attachment = await espocrm.upload_attachment_for_file_field(
                    file_content=content,
                    filename=filename,
                    related_type='CDokumente',
                    field='dokument',
                    mime_type=mime_type,
                )
                new_doc = await espocrm.create_entity('CDokumente', {
                    'name': filename,
                    'dokumentId': attachment.get('id'),
                    'hnr': history_entry.get('hNr') if history_entry else None,
                    'advowareArt': history_entry.get('art', 'Schreiben') if history_entry else 'Schreiben',
                    'advowareBemerkung': history_entry.get('text', '') if history_entry else '',
                    'dateipfad': windows_file.get('path', ''),
                    'blake3hash': blake3_hash,
                    'syncedHash': blake3_hash,
                    'usn': windows_file.get('usn', 0),
                    'syncStatus': 'synced',
                    'lastSyncTimestamp': now,
                    'cAktenId': akte_id,  # Direct FK to CAkten
                })
                doc_id = new_doc.get('id')

                # Link to Akte
                await espocrm.link_entities('CAkten', akte_id, 'dokumentes', doc_id)
                results['created'] += 1

                await _trigger_preview(ctx, doc_id)

            elif action.action == 'UPDATE_ESPO':
                if not windows_file:
                    ctx.logger.error(f" ❌ UPDATE_ESPO: no Windows file for hnr {hnr}")
                    results['errors'] += 1
                    continue

                # NOTE(review): the file is downloaded only to compute its
                # hash; the attachment stored in EspoCRM is not replaced here.
                content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
                blake3_hash = compute_blake3(content)
                now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                update_data: Dict[str, Any] = {
                    'name': filename,
                    'blake3hash': blake3_hash,
                    'syncedHash': blake3_hash,
                    'usn': windows_file.get('usn', 0),
                    'dateipfad': windows_file.get('path', ''),
                    'syncStatus': 'synced',
                    'lastSyncTimestamp': now,
                }
                if history_entry:
                    update_data['hnr'] = history_entry.get('hNr')
                    update_data['advowareArt'] = history_entry.get('art', 'Schreiben')
                    update_data['advowareBemerkung'] = history_entry.get('text', '')
                # Hash changed: mark for re-sync to xAI in the same update call
                if espo_doc.get('aiSyncStatus') == 'synced':
                    update_data['aiSyncStatus'] = 'unclean'

                await espocrm.update_entity('CDokumente', espo_doc['id'], update_data)
                results['updated'] += 1

                await _trigger_preview(ctx, espo_doc['id'])

            elif action.action == 'DELETE':
                if not sources_complete:
                    # Never delete when a source could not be read
                    ctx.logger.warn(f" ⚠️ DELETE skipped for hnr {hnr} ({filename}): source data incomplete")
                    results['skipped'] += 1
                elif espo_doc:
                    await espocrm.delete_entity('CDokumente', espo_doc['id'])
                    results['deleted'] += 1

            else:
                # Actions this handler does not implement are counted, not dropped
                ctx.logger.warn(f" ⚠️ Action {action.action} not handled for hnr {hnr} ({filename})")
                results['skipped'] += 1

        except Exception as e:
            ctx.logger.error(f" ❌ Error for hnr {hnr} ({filename}): {e}")
            results['errors'] += 1

    # ── Ablage check ───────────────────────────────────────────────────
    try:
        # NOTE(review): get_akte is called with the Aktennummer; confirm that
        # this is the identifier AdvowareService.get_akte expects.
        akte_details = await advoware_service.get_akte(aktennummer)
        if akte_details and akte_details.get('ablage') == 1:
            ctx.logger.info("📁 Akte marked as ablage → deactivating")
            await espocrm.update_entity('CAkten', akte_id, {
                'aktivierungsstatus': 'deaktiviert',
            })
    except Exception as e:
        ctx.logger.warn(f"⚠️ Ablage check failed: {e}")

    return results
# ─────────────────────────────────────────────────────────────────────────────
# xAI sync
# ─────────────────────────────────────────────────────────────────────────────
async def _run_xai_sync(
    akte: Dict[str, Any],
    akte_id: str,
    espocrm,
    ctx: FlowContext,
) -> None:
    """
    Push every document linked to one Akte through the xAI upload utilities.

    Obtains the collection id via XAIUploadUtils.ensure_collection. If none
    is returned, the Akte's aiSyncStatus is set to 'failed' and the function
    returns. Otherwise each linked CDokumente is handed to
    sync_document_to_xai and the outcome is tallied for the log. The xAI
    service is closed in every case.
    """
    from services.xai_service import XAIService
    from services.xai_upload_utils import XAIUploadUtils

    xai = XAIService(ctx)
    upload_utils = XAIUploadUtils(ctx)

    for banner_line in ("", "" * 60, "🤖 xAI SYNC", "" * 60):
        ctx.logger.info(banner_line)

    try:
        # ── Ensure collection exists ───────────────────────────────────
        collection_id = await upload_utils.ensure_collection(akte, xai, espocrm)
        if not collection_id:
            ctx.logger.error("❌ Could not obtain xAI collection aborting xAI sync")
            await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
            return

        # ── Load all linked documents ──────────────────────────────────
        related = await espocrm.list_related('CAkten', akte_id, 'dokumentes')
        documents = related.get('list', [])
        ctx.logger.info(f" Documents to check: {len(documents)}")

        tally = {'synced': 0, 'skipped': 0, 'failed': 0}
        for document in documents:
            succeeded = await upload_utils.sync_document_to_xai(document, collection_id, xai, espocrm)
            if not succeeded:
                tally['failed'] += 1
                continue
            # A successful call on a document already marked synced with a
            # matching hash is reported as skipped, anything else as synced.
            already_current = (
                document.get('aiSyncStatus') == 'synced'
                and document.get('aiSyncHash') == document.get('blake3hash')
            )
            tally['skipped' if already_current else 'synced'] += 1

        ctx.logger.info(f" ✅ Synced : {tally['synced']}")
        ctx.logger.info(f" ⏭️ Skipped : {tally['skipped']}")
        ctx.logger.info(f" ❌ Failed : {tally['failed']}")

    finally:
        await xai.close()

View File

@@ -1,90 +0,0 @@
"""AI Knowledge Daily Sync - Cron Job"""
from typing import Any
from motia import FlowContext, cron
# Motia step configuration for the daily catch-up job.
# - "triggers": a single cron trigger; the handler docstring in this file
#   describes it as firing every day at 02:00:00.
# - "enqueues": the one topic this step publishes to.
config = {
"name": "AI Knowledge Daily Sync",
"description": "Daily sync of all CAIKnowledge entities (catches missed webhooks, Blake3 verification included)",
"flows": ["aiknowledge-full-sync"],
"triggers": [
cron("0 0 2 * * *"), # Daily at 2:00 AM
],
"enqueues": ["aiknowledge.sync"],
}
async def handler(input_data: None, ctx: FlowContext[Any]) -> None:
    """
    Cron entry point: queue a sync event for every knowledge base needing one.

    Asks EspoCRM for CAIKnowledge records whose ``aktivierungsstatus`` is
    active and whose ``syncStatus`` is unclean or failed (at most 1000 records
    are requested), then enqueues one ``aiknowledge.sync`` event per record
    with ``source='daily_cron'``.

    Args:
        input_data: Not used by this handler.
        ctx: Motia flow context providing the logger and ``enqueue``.

    Raises:
        Exception: Any error while listing or enqueueing is logged and
            re-raised.
    """
    from services.espocrm import EspoCRMAPI
    from services.models import AIKnowledgeActivationStatus, AIKnowledgeSyncStatus

    log = ctx.logger
    banner = "=" * 80

    log.info(banner)
    log.info("🌙 DAILY AI KNOWLEDGE SYNC STARTED")
    log.info(banner)

    espocrm = EspoCRMAPI(ctx)

    try:
        # Only active knowledge bases ...
        is_active = {
            'type': 'equals',
            'attribute': 'aktivierungsstatus',
            'value': AIKnowledgeActivationStatus.ACTIVE.value
        }
        # ... whose last sync left them unclean or failed.
        needs_sync = {
            'type': 'in',
            'attribute': 'syncStatus',
            'value': [
                AIKnowledgeSyncStatus.UNCLEAN.value,
                AIKnowledgeSyncStatus.FAILED.value
            ]
        }
        result = await espocrm.list_entities(
            'CAIKnowledge',
            where=[is_active, needs_sync],
            select='id,name,syncStatus',
            max_size=1000  # Adjust if you have more
        )

        entities = result.get('list', [])
        total = len(entities)
        log.info(f"📊 Found {total} knowledge bases needing sync")

        if not entities:
            log.info("✅ All knowledge bases are synced")
            log.info(banner)
            return

        # One event per knowledge base (Blake3 verification always enabled)
        for position, knowledge_base in enumerate(entities, start=1):
            payload = {
                'knowledge_id': knowledge_base['id'],
                'source': 'daily_cron'
            }
            await ctx.enqueue({'topic': 'aiknowledge.sync', 'data': payload})
            log.info(
                f"📤 [{position}/{total}] Enqueued: {knowledge_base['name']} "
                f"(syncStatus={knowledge_base.get('syncStatus')})"
            )

        log.info(banner)
        log.info(f"✅ Daily sync complete: {total} events enqueued")
        log.info(banner)

    except Exception as e:
        log.error(banner)
        log.error("❌ FULL SYNC FAILED")
        log.error(banner)
        log.error(f"Error: {e}", exc_info=True)
        raise

View File

@@ -1,89 +0,0 @@
"""AI Knowledge Sync Event Handler"""
from typing import Dict, Any
from redis import Redis
from motia import FlowContext, queue
# Motia step configuration: this step is triggered by messages on the
# "aiknowledge.sync" queue topic and belongs to the "vmh-aiknowledge" flow.
config = {
"name": "AI Knowledge Sync",
"description": "Synchronizes CAIKnowledge entities with XAI Collections",
"flows": ["vmh-aiknowledge"],
"triggers": [
queue("aiknowledge.sync")
],
}
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
    """
    Synchronize one CAIKnowledge entity with its xAI collection.

    Takes the per-entity sync lock, runs
    ``AIKnowledgeSync.sync_knowledge_to_xai`` and releases the lock with the
    outcome.

    Args:
        event_data: Event payload. The knowledge base id is read from
            ``knowledge_id``; ``entity_id`` is accepted as a fallback because
            the document sync handler publishes this topic with that key.
            ``source`` (fallback ``triggered_by``) is used for logging only.
        ctx: Motia flow context.

    Raises:
        RuntimeError: The lock is already held; raised so the queue retries.
        Exception: Any sync error, re-raised after the lock has been released
            with ``success=False``.
    """
    from services.redis_client import RedisClientFactory
    from services.aiknowledge_sync_utils import AIKnowledgeSync

    ctx.logger.info("=" * 80)
    ctx.logger.info("🔄 AI KNOWLEDGE SYNC STARTED")
    ctx.logger.info("=" * 80)

    # Extract data. Producers are not uniform: the daily cron sends
    # knowledge_id/source, the document sync handler sends
    # entity_id/triggered_by. Accept both so neither event is dropped.
    knowledge_id = event_data.get('knowledge_id') or event_data.get('entity_id')
    source = event_data.get('source') or event_data.get('triggered_by') or 'unknown'

    if not knowledge_id:
        ctx.logger.error("❌ Missing knowledge_id in event data")
        return

    ctx.logger.info(f"📋 Knowledge ID: {knowledge_id}")
    ctx.logger.info(f"📋 Source: {source}")
    ctx.logger.info("=" * 80)

    # Get Redis for locking
    redis_client = RedisClientFactory.get_client(strict=False)

    # Initialize sync utils
    sync_utils = AIKnowledgeSync(ctx, redis_client)

    # Acquire lock
    lock_acquired = await sync_utils.acquire_sync_lock(knowledge_id)
    if not lock_acquired:
        ctx.logger.warn(f"⏸️ Lock already held for {knowledge_id}, skipping")
        ctx.logger.info(" (Will be retried by Motia queue)")
        raise RuntimeError(f"Lock busy for {knowledge_id}")  # Motia will retry

    try:
        # Perform sync (Blake3 hash verification always enabled)
        await sync_utils.sync_knowledge_to_xai(knowledge_id, ctx)

        ctx.logger.info("=" * 80)
        ctx.logger.info("✅ AI KNOWLEDGE SYNC COMPLETED")
        ctx.logger.info("=" * 80)

        # Release lock with success=True
        await sync_utils.release_sync_lock(knowledge_id, success=True)

    except Exception as e:
        ctx.logger.error("=" * 80)
        ctx.logger.error("❌ AI KNOWLEDGE SYNC FAILED")
        ctx.logger.error("=" * 80)
        ctx.logger.error(f"Error: {e}")
        ctx.logger.error(f"Knowledge ID: {knowledge_id}")
        ctx.logger.error("=" * 80)

        # Release lock with failure
        await sync_utils.release_sync_lock(
            knowledge_id,
            success=False,
            error_message=str(e)
        )

        # Re-raise to let Motia retry
        raise

View File

@@ -1,336 +0,0 @@
"""
VMH Document xAI Sync Handler
Zentraler Sync-Handler für Documents mit xAI Collections.
Triggers preview generation for new/changed files.
Verarbeitet:
- vmh.document.create: Neu in EspoCRM → Prüfe ob xAI-Sync nötig
- vmh.document.update: Geändert in EspoCRM → Prüfe ob xAI-Sync/Update nötig
- vmh.document.delete: Gelöscht in EspoCRM → Remove from xAI Collections
Enqueues:
- document.generate_preview: Bei new/changed Status
"""
from typing import Dict, Any
from motia import FlowContext, queue
from services.espocrm import EspoCRMAPI
from services.document_sync_utils import DocumentSync
from services.xai_service import XAIService
from services.redis_client import get_redis_client
import hashlib
import json
# Motia step configuration: one handler serves the create, update and delete
# queue topics for documents; "enqueues" names the only topic it publishes to.
config = {
"name": "VMH Document xAI Sync Handler",
"description": "Zentraler Sync-Handler für Documents mit xAI Collections",
"flows": ["vmh-documents"],
"triggers": [
queue("vmh.document.create"),
queue("vmh.document.update"),
queue("vmh.document.delete")
],
"enqueues": ["document.generate_preview"]
}
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
    """
    Central sync handler for document create/update/delete events.

    Acquires the per-document sync lock, loads the document from EspoCRM and
    dispatches to ``handle_delete`` or ``handle_create_or_update``. Those
    functions release the lock themselves; this handler releases it only for
    load failures, unknown actions and unexpected errors.

    Args:
        event_data: Event payload with ``entity_id`` (required), and optional
            ``entity_type`` (default ``'CDokumente'``), ``action`` and
            ``source``.
        ctx: Motia flow context.

    Errors are logged and swallowed; nothing is raised to the caller.
    """
    entity_id = event_data.get('entity_id')
    entity_type = event_data.get('entity_type', 'CDokumente')  # Default: CDokumente
    action = event_data.get('action')
    source = event_data.get('source')

    if not entity_id:
        ctx.logger.error("Keine entity_id im Event gefunden")
        return

    ctx.logger.info("=" * 80)
    ctx.logger.info(f"🔄 DOCUMENT SYNC HANDLER GESTARTET")
    ctx.logger.info("=" * 80)
    ctx.logger.info(f"Entity Type: {entity_type}")
    # str() keeps a missing action from raising AttributeError here; the
    # unknown-action branch below then reports it properly.
    ctx.logger.info(f"Action: {str(action).upper()}")
    ctx.logger.info(f"Document ID: {entity_id}")
    ctx.logger.info(f"Source: {source}")
    ctx.logger.info("=" * 80)

    # Shared Redis client for distributed locking (centralized factory)
    redis_client = get_redis_client(strict=False)

    # Initialise the APIs (with context for better logging)
    espocrm = EspoCRMAPI(ctx)
    sync_utils = DocumentSync(espocrm, redis_client, ctx)
    xai_service = XAIService(ctx)

    try:
        # 1. ACQUIRE LOCK (prevents parallel syncs)
        lock_acquired = await sync_utils.acquire_sync_lock(entity_id, entity_type)

        if not lock_acquired:
            ctx.logger.warn(f"⏸️ Sync bereits aktiv für {entity_type} {entity_id}, überspringe")
            return

        # Lock acquired: every path below must release it, either in the
        # dispatched handler or in the except clause.
        try:
            # 2. FETCH THE FULL DOCUMENT FROM ESPOCRM
            try:
                document = await espocrm.get_entity(entity_type, entity_id)
            except Exception as e:
                ctx.logger.error(f"❌ Fehler beim Laden von {entity_type}: {e}")
                await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type)
                return

            ctx.logger.info(f"📋 {entity_type} geladen:")
            ctx.logger.info(f" Name: {document.get('name', 'N/A')}")
            ctx.logger.info(f" Type: {document.get('type', 'N/A')}")
            ctx.logger.info(f" fileStatus: {document.get('fileStatus', 'N/A')}")
            ctx.logger.info(f" xaiFileId: {document.get('xaiFileId') or document.get('xaiId', 'N/A')}")
            ctx.logger.info(f" xaiCollections: {document.get('xaiCollections', [])}")

            # 3. CHOOSE THE SYNC ACTION BASED ON THE EVENT ACTION
            if action == 'delete':
                await handle_delete(entity_id, document, sync_utils, xai_service, ctx, entity_type)
            elif action in ['create', 'update']:
                await handle_create_or_update(entity_id, document, sync_utils, xai_service, ctx, entity_type)
            else:
                ctx.logger.warn(f"⚠️ Unbekannte Action: {action}")
                await sync_utils.release_sync_lock(entity_id, success=False, error_message=f"Unbekannte Action: {action}", entity_type=entity_type)

        except Exception as e:
            # Unexpected error during sync: make sure the lock is released
            ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
            import traceback
            ctx.logger.error(traceback.format_exc())

            try:
                await sync_utils.release_sync_lock(
                    entity_id,
                    success=False,
                    error_message=str(e)[:2000],
                    entity_type=entity_type
                )
            except Exception as release_error:
                # Even the regular release failed: log it as critical
                ctx.logger.critical(f"🚨 CRITICAL: Lock-Release failed für Document {entity_id}: {release_error}")
                # Last resort: delete the Redis lock key directly. The client
                # is requested with strict=False, so guard against None
                # (presumably returned when Redis is unavailable) and report
                # a failed delete instead of silently swallowing it.
                lock_key = f"sync_lock:document:{entity_id}"
                if redis_client is None:
                    ctx.logger.error(f"❌ Kein Redis-Client verfügbar, Lock bleibt bestehen: {lock_key}")
                else:
                    try:
                        redis_client.delete(lock_key)
                        ctx.logger.info(f"✅ Redis lock manuell released: {lock_key}")
                    except Exception as redis_error:
                        ctx.logger.error(f"❌ Manueller Redis-Lock-Release fehlgeschlagen ({lock_key}): {redis_error}")

    except Exception as e:
        # Error before the lock was acquired: nothing to release
        ctx.logger.error(f"❌ Fehler vor Lock-Acquire: {e}")
        import traceback
        ctx.logger.error(traceback.format_exc())

    finally:
        # Close the xAI client on every exit path, as the Akte xAI sync does.
        # Best effort: a failing close must not turn a finished sync into an
        # error.
        try:
            await xai_service.close()
        except Exception as close_error:
            ctx.logger.warn(f"⚠️ xAI-Service konnte nicht geschlossen werden: {close_error}")
async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, xai_service: XAIService, ctx: FlowContext[Any], entity_type: str = 'CDokumente') -> None:
    """
    Handle a create/update event for a document.

    Steps, in order:
      1. Ask ``sync_utils.should_sync_to_xai`` whether an xAI sync is needed
         and for which collections.
      2. If any target is a ``NEW:<id>`` placeholder, emit ``aiknowledge.sync``
         for each of them, release the lock and stop; the document sync is
         skipped in favour of the knowledge sync.
      3. If the file status is new/changed, emit ``document.generate_preview``
         (best effort; failures are only logged).
      4. If no sync is needed, release the lock and stop.
      5. Otherwise download the attachment, upload it to xAI, add it to the
         target collections, store the sync metadata and release the lock.

    Args:
        entity_id: EspoCRM id of the document.
        document: Document record as loaded from EspoCRM.
        sync_utils: Sync helper that owns the lock and the EspoCRM client.
        xai_service: xAI client used for upload and collection membership.
        ctx: Motia flow context.
        entity_type: EspoCRM entity type of the document.

    Errors are logged and the lock is released with ``success=False``; nothing
    is raised to the caller.
    """
    try:
        ctx.logger.info("")
        ctx.logger.info("=" * 80)
        ctx.logger.info("🔍 ANALYSE: Braucht dieses Document xAI-Sync?")
        ctx.logger.info("=" * 80)

        # File status drives preview generation (two field names are supported)
        datei_status = document.get('fileStatus') or document.get('dateiStatus')

        # Decision logic: should this document go to xAI?
        needs_sync, collection_ids, reason = await sync_utils.should_sync_to_xai(document)

        ctx.logger.info(f"📊 Entscheidung: {'✅ SYNC NÖTIG' if needs_sync else '⏭️ KEIN SYNC NÖTIG'}")
        ctx.logger.info(f" Grund: {reason}")
        ctx.logger.info(f" File-Status: {datei_status or 'N/A'}")

        if collection_ids:
            ctx.logger.info(f" Collections: {collection_ids}")

        # ═══════════════════════════════════════════════════════════════
        # CHECK: knowledge bases with status "new" (no collection yet)
        # ═══════════════════════════════════════════════════════════════
        # `or []` guards against a None result; the truthiness check above
        # already treats collection_ids as possibly empty.
        new_knowledge_bases = [cid for cid in (collection_ids or []) if cid.startswith('NEW:')]

        if new_knowledge_bases:
            ctx.logger.info("")
            ctx.logger.info("=" * 80)
            ctx.logger.info("🆕 DOKUMENT IST MIT KNOWLEDGE BASE(S) VERKNÜPFT (Status: new)")
            ctx.logger.info("=" * 80)

            for new_kb in new_knowledge_bases:
                kb_id = new_kb[4:]  # Remove "NEW:" prefix
                ctx.logger.info(f"📋 CAIKnowledge {kb_id}")
                ctx.logger.info(f" Status: new → Collection muss zuerst erstellt werden")

                # Trigger Knowledge Sync. The consumer of this topic reads
                # `knowledge_id` and `source`, so send those as well as the
                # previously used keys.
                ctx.logger.info(f"📤 Triggering aiknowledge.sync event...")
                await ctx.emit('aiknowledge.sync', {
                    'knowledge_id': kb_id,
                    'source': 'document_sync',
                    'entity_id': kb_id,
                    'entity_type': 'CAIKnowledge',
                    'triggered_by': 'document_sync',
                    'document_id': entity_id
                })
                ctx.logger.info(f"✅ Event emitted for {kb_id}")

            # Release lock and skip document sync - knowledge sync will handle documents
            ctx.logger.info("")
            ctx.logger.info("=" * 80)
            ctx.logger.info("✅ KNOWLEDGE SYNC GETRIGGERT")
            ctx.logger.info(" Document Sync wird übersprungen")
            ctx.logger.info(" (Knowledge Sync erstellt Collection und synchronisiert dann Dokumente)")
            ctx.logger.info("=" * 80)

            await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
            return

        # ═══════════════════════════════════════════════════════════════
        # PREVIEW GENERATION for new/changed files
        # ═══════════════════════════════════════════════════════════════
        # Case-insensitive check of the file status
        datei_status_lower = (datei_status or '').lower()
        if datei_status_lower in ['neu', 'geändert', 'new', 'changed']:
            ctx.logger.info("")
            ctx.logger.info("=" * 80)
            ctx.logger.info("🖼️ TRIGGER PREVIEW-GENERIERUNG")
            ctx.logger.info(f" Datei-Status: {datei_status}")
            ctx.logger.info("=" * 80)

            try:
                # Enqueue preview generation event
                await ctx.emit('document.generate_preview', {
                    'entity_id': entity_id,
                    'entity_type': entity_type
                })
                ctx.logger.info(f"✅ Preview generation event emitted for {entity_id}")
            except Exception as e:
                ctx.logger.error(f"❌ Fehler beim Triggern der Preview-Generierung: {e}")
                # Continue - the preview is optional

            ctx.logger.info("=" * 80)

        # ═══════════════════════════════════════════════════════════════
        # xAI SYNC (if required)
        # ═══════════════════════════════════════════════════════════════
        if not needs_sync:
            ctx.logger.info("✅ Kein xAI-Sync erforderlich, Lock wird released")
            await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
            return

        # ═══════════════════════════════════════════════════════════════
        # PERFORM xAI SYNC
        # ═══════════════════════════════════════════════════════════════
        ctx.logger.info("")
        ctx.logger.info("=" * 80)
        ctx.logger.info("🤖 xAI SYNC STARTEN")
        ctx.logger.info("=" * 80)

        # 1. Resolve the download information for the attachment
        download_info = await sync_utils.get_document_download_info(entity_id, entity_type)
        if not download_info:
            raise Exception("Konnte Download-Info nicht ermitteln Datei fehlt?")

        ctx.logger.info(f"📥 Datei: {download_info['filename']} ({download_info['size']} bytes, {download_info['mime_type']})")

        # 2. Download the file from EspoCRM
        espocrm = sync_utils.espocrm
        file_content = await espocrm.download_attachment(download_info['attachment_id'])
        ctx.logger.info(f"✅ Downloaded {len(file_content)} bytes")

        # 3. MD5 hash, used for change detection only
        file_hash = hashlib.md5(file_content).hexdigest()
        ctx.logger.info(f"🔑 MD5: {file_hash}")

        # 4. Upload to xAI
        # Always re-upload when needs_sync is True (new file or changed hash)
        ctx.logger.info("📤 Uploading to xAI...")
        xai_file_id = await xai_service.upload_file(
            file_content,
            download_info['filename'],
            download_info['mime_type']
        )
        ctx.logger.info(f"✅ xAI file_id: {xai_file_id}")

        # 5. Add the file to every target collection
        ctx.logger.info(f"📚 Füge zu {len(collection_ids)} Collection(s) hinzu...")
        added_collections = await xai_service.add_to_collections(collection_ids, xai_file_id)
        ctx.logger.info(f"✅ In {len(added_collections)}/{len(collection_ids)} Collections eingetragen")

        # 6. Update the EspoCRM metadata and release the lock
        await sync_utils.update_sync_metadata(
            entity_id,
            xai_file_id=xai_file_id,
            collection_ids=added_collections,
            file_hash=file_hash,
            entity_type=entity_type
        )
        await sync_utils.release_sync_lock(
            entity_id,
            success=True,
            entity_type=entity_type
        )

        ctx.logger.info("=" * 80)
        ctx.logger.info("✅ DOCUMENT SYNC ABGESCHLOSSEN")
        ctx.logger.info("=" * 80)

    except Exception as e:
        ctx.logger.error(f"❌ Fehler bei Create/Update: {e}")
        import traceback
        ctx.logger.error(traceback.format_exc())
        # Pass entity_type like every other release call, so the failure is
        # recorded against the entity type that was actually locked.
        await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type)
async def handle_delete(
    entity_id: str,
    document: Dict[str, Any],
    sync_utils: DocumentSync,
    xai_service: XAIService,
    ctx: FlowContext[Any],
    entity_type: str = 'CDokumente',
) -> None:
    """
    Handle a delete event for a document.

    Removes the document's xAI file from the collections listed on the record
    via ``xai_service.remove_from_collections``. If the record carries no file
    id or no collections, nothing is removed. The sync lock is released in
    every case, with ``success=False`` when an error occurred.

    Args:
        entity_id: EspoCRM id of the document.
        document: Document record; ``xaiFileId`` (or ``xaiId``) and
            ``xaiCollections`` are read from it.
        sync_utils: Sync helper that owns the lock.
        xai_service: xAI client used for the removal.
        ctx: Motia flow context.
        entity_type: EspoCRM entity type of the document.
    """
    log = ctx.logger
    separator = "=" * 80

    try:
        log.info("")
        log.info(separator)
        log.info("🗑️ DOCUMENT DELETE - xAI CLEANUP")
        log.info(separator)

        file_id = document.get('xaiFileId') or document.get('xaiId')
        collections = document.get('xaiCollections') or []

        # Nothing to clean up unless both a file id and collections are known.
        if not (file_id and collections):
            log.info("⏭️ Document war nicht in xAI gesynct, nichts zu tun")
            await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
            return

        collection_count = len(collections)
        log.info("📋 Document Info:")
        log.info(f" xaiFileId: {file_id}")
        log.info(f" Collections: {collections}")

        log.info(f"🗑️ Entferne aus {collection_count} Collection(s)...")
        await xai_service.remove_from_collections(collections, file_id)
        log.info(f"✅ File aus {collection_count} Collection(s) entfernt")
        log.info(" (File selbst bleibt in xAI kann in anderen Collections sein)")

        await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)

        log.info(separator)
        log.info("✅ DELETE ABGESCHLOSSEN")
        log.info(separator)

    except Exception as e:
        log.error(f"❌ Fehler bei Delete: {e}")
        import traceback
        log.error(traceback.format_exc())
        await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type)