- Removed the old VMH Document xAI Sync Handler implementation.
- Introduced new xAI Upload Utilities for shared upload logic across sync flows.
- Created a unified Akte sync structure with cron polling and event handling.
- Implemented an Akte Sync Cron Poller that manages pending Aktennummern with a debounce mechanism.
- Developed an Akte Sync Event Handler for synchronized processing across Advoware and xAI.
- Enhanced logging and error handling throughout the new sync processes.
- Ensured compatibility with existing Redis and EspoCRM services.
202 lines
7.6 KiB
Python
202 lines
7.6 KiB
Python
"""
xAI Upload Utilities

Shared logic for uploading documents from EspoCRM to xAI Collections.
Used by all sync flows (Advoware + direct xAI sync).

Handles:
- Blake3 hash-based change detection
- Upload to xAI with the correct filename/MIME type
- Collection management (create/verify)
- EspoCRM metadata update after sync
"""
|
|
|
|
import mimetypes
from datetime import datetime
from typing import Optional, Dict, Any
from urllib.parse import unquote
|
|
|
|
|
|
class XAIUploadUtils:
    """
    Stateless utility class for document upload operations to xAI.

    All methods take explicit service instances (``xai``, ``espocrm``) so the
    same helpers can be reused across different sync contexts. The only
    per-instance state is the logger created in ``__init__``.
    """

    # strftime format used for every ``aiLastSync`` value written to EspoCRM.
    _TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, ctx):
        # Local import kept from the original implementation (possibly to
        # avoid an import cycle with the services package - TODO confirm).
        from services.logging_utils import get_service_logger

        self._log = get_service_logger(__name__, ctx)

    @classmethod
    def _timestamp(cls) -> str:
        """Return the current time formatted for EspoCRM datetime fields."""
        # NOTE(review): this is naive *local* time. If EspoCRM expects UTC
        # datetimes, confirm the host runs in UTC (all sync flows should agree).
        return datetime.now().strftime(cls._TIMESTAMP_FORMAT)

    async def ensure_collection(
        self,
        akte: Dict[str, Any],
        xai,
        espocrm,
    ) -> Optional[str]:
        """
        Ensure an xAI collection exists for this Akte.

        If ``akte['aiCollectionId']`` is set and ``xai.get_collection`` returns
        a truthy result, that id is reused. Otherwise a new collection is
        created and its id is written back to the CAkten entity together with
        ``aiSyncStatus='unclean'`` so that a full document sync follows.

        Args:
            akte: CAkten entity dict; must contain ``'id'``.
            xai: xAI service (``get_collection``, ``create_collection``).
            espocrm: EspoCRM service (``update_entity``).

        Returns:
            The collection id, or None if creating the collection or saving
            its id to EspoCRM failed.
        """
        akte_id = akte['id']
        aktennummer = akte.get('aktennummer')
        # `or` instead of a .get() default so null/empty values also fall back.
        label = aktennummer if aktennummer not in (None, '') else akte_id
        akte_name = akte.get('name') or f"Akte {label}"
        collection_id = akte.get('aiCollectionId')

        if collection_id:
            # Verify it still exists in xAI.
            # NOTE(review): any exception (including transient network errors)
            # leads to a new collection being created below. That is intended
            # if get_collection raises on "not found"; otherwise consider
            # returning None here instead - confirm against the xAI service.
            try:
                col = await xai.get_collection(collection_id)
            except Exception as e:
                self._log.warn(f"Could not verify collection {collection_id}: {e}, recreating...")
            else:
                if col:
                    self._log.debug(f"Collection {collection_id} verified for '{akte_name}'")
                    return collection_id
                self._log.warn(f"Collection {collection_id} not found in xAI, recreating...")

        # Create new collection
        self._log.info(f"Creating xAI collection for '{akte_name}'...")
        try:
            col = await xai.create_collection(
                name=akte_name,
                metadata={
                    'espocrm_entity_type': 'CAkten',
                    'espocrm_entity_id': akte_id,
                    'aktennummer': '' if aktennummer is None else str(aktennummer),
                },
            )
            collection_id = col['id']
        except Exception as e:
            self._log.error(f"❌ Failed to create xAI collection: {e}")
            return None
        self._log.info(f"✅ Collection created: {collection_id}")

        # Save back to EspoCRM; 'unclean' triggers a full document sync.
        try:
            await espocrm.update_entity('CAkten', akte_id, {
                'aiCollectionId': collection_id,
                'aiSyncStatus': 'unclean',
            })
        except Exception as e:
            # The collection now exists in xAI but is not referenced by the
            # Akte; log its id so it can be found and cleaned up.
            self._log.error(
                f"❌ Created xAI collection {collection_id} but failed to save it to EspoCRM: {e}"
            )
            return None
        return collection_id

    async def sync_document_to_xai(
        self,
        doc: Dict[str, Any],
        collection_id: str,
        xai,
        espocrm,
    ) -> bool:
        """
        Sync a single CDokumente entity to an xAI collection.

        Decision logic (Blake3-based):
          - aiSyncStatus == 'synced' AND aiSyncHash == blake3hash -> skip (no change)
          - no attachment (dokumentId missing) -> mark 'unsupported'
          - otherwise ('new', 'unclean', 'failed', or 'synced' with a changed
            hash) -> download from EspoCRM, remove the previous xAI file if
            any, upload, add to the collection and record the result

        Args:
            doc: CDokumente entity dict; must contain ``'id'``.
            collection_id: Target xAI collection id.
            xai: xAI service (``remove_from_collection``, ``upload_file``,
                ``add_to_collection``).
            espocrm: EspoCRM service (``download_attachment``, ``update_entity``).

        Returns:
            True if the document was synced, skipped, or marked unsupported;
            False on any error. Errors are logged, never propagated.
        """
        doc_id = doc['id']
        doc_name = doc.get('name', doc_id)
        ai_status = doc.get('aiSyncStatus', 'new')
        ai_sync_hash = doc.get('aiSyncHash')
        blake3_hash = doc.get('blake3hash')
        ai_file_id = doc.get('aiFileId')

        short_sync_hash = ai_sync_hash[:12] if ai_sync_hash else 'N/A'
        short_blake3 = blake3_hash[:12] if blake3_hash else 'N/A'
        self._log.info(f" 📄 {doc_name}")
        self._log.info(f" aiSyncStatus={ai_status}, aiSyncHash={short_sync_hash}..., blake3={short_blake3}...")

        # Skip if already synced and the content hash is unchanged
        if ai_status == 'synced' and ai_sync_hash and ai_sync_hash == blake3_hash:
            self._log.info(" ⏭️ Skipped (hash match, no change)")
            return True

        attachment_id = doc.get('dokumentId')
        if not attachment_id:
            self._log.warn(" ⚠️ No attachment (dokumentId missing) - marking unsupported")
            try:
                await espocrm.update_entity('CDokumente', doc_id, {
                    'aiSyncStatus': 'unsupported',
                    'aiLastSync': self._timestamp(),
                })
            except Exception as e:
                self._log.error(f" ❌ Could not mark document as unsupported: {e}")
                return False
            return True  # Not an error, just unsupported

        try:
            # Download from EspoCRM
            self._log.info(f" 📥 Downloading attachment {attachment_id}...")
            file_content = await espocrm.download_attachment(attachment_id)
            self._log.info(f" Downloaded {len(file_content)} bytes")

            # The stored name may be URL-encoded; fall back to a generic name
            # when both dokumentName and name are missing, null or empty.
            filename = unquote(doc.get('dokumentName') or doc.get('name') or 'document.bin')
            mime_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream'

            # Remove the previous file from the collection when re-syncing
            if ai_file_id and ai_status != 'new':
                try:
                    await xai.remove_from_collection(collection_id, ai_file_id)
                    self._log.info(f" 🗑️ Removed old xAI file {ai_file_id}")
                except Exception as e:
                    # Non-fatal - the file may already be gone.
                    self._log.warn(f" ⚠️ Could not remove old xAI file {ai_file_id}: {e}")

            # Upload to xAI
            self._log.info(f" 📤 Uploading '{filename}' ({mime_type})...")
            new_xai_file_id = await xai.upload_file(file_content, filename, mime_type)
            self._log.info(f" Uploaded: xai_file_id={new_xai_file_id}")

            # Add to collection
            await xai.add_to_collection(collection_id, new_xai_file_id)
            self._log.info(f" ✅ Added to collection {collection_id}")

            # Record the sync result on the CDokumente entity
            await espocrm.update_entity('CDokumente', doc_id, {
                'aiFileId': new_xai_file_id,
                'aiCollectionId': collection_id,
                'aiSyncHash': blake3_hash or doc.get('syncedHash'),
                'aiSyncStatus': 'synced',
                'aiLastSync': self._timestamp(),
            })
            self._log.info(" ✅ EspoCRM updated")
            return True

        except Exception as e:
            self._log.error(f" ❌ Failed: {e}")
            # Best-effort status update; must not mask the original error or
            # break the "returns False on error" contract.
            try:
                await espocrm.update_entity('CDokumente', doc_id, {
                    'aiSyncStatus': 'failed',
                    'aiLastSync': self._timestamp(),
                })
            except Exception as update_err:
                self._log.error(f" ❌ Could not mark document as failed: {update_err}")
            return False

    async def remove_document_from_xai(
        self,
        doc: Dict[str, Any],
        collection_id: str,
        xai,
        espocrm,
    ) -> None:
        """
        Remove a CDokumente from its xAI collection (called on DELETE).

        Does nothing if the document has no ``aiFileId``. After a successful
        removal the entity is reset to ``aiFileId=None`` / ``aiSyncStatus='new'``.
        Failures are logged as warnings and never raised.
        """
        doc_id = doc['id']
        ai_file_id = doc.get('aiFileId')
        if not ai_file_id:
            return

        try:
            await xai.remove_from_collection(collection_id, ai_file_id)
        except Exception as e:
            self._log.warn(f" ⚠️ Could not remove from xAI: {e}")
            return
        self._log.info(f" 🗑️ Removed {doc.get('name')} from xAI collection")

        try:
            await espocrm.update_entity('CDokumente', doc_id, {
                'aiFileId': None,
                'aiSyncStatus': 'new',
                'aiLastSync': self._timestamp(),
            })
        except Exception as e:
            self._log.warn(f" ⚠️ Removed from xAI but could not update EspoCRM: {e}")
|