"""
xAI Upload Utilities

Shared logic for uploading documents from EspoCRM to xAI Collections.
Used by all sync flows (Advoware + direct xAI sync).

Handles:
- Blake3 hash-based change detection
- Upload to xAI with correct filename/MIME
- Collection management (create/verify)
- EspoCRM metadata update after sync
"""

from typing import Optional, Dict, Any
|
||
from datetime import datetime
|
||
|
||
|
||
class XAIUploadUtils:
    """
    Utility class for document upload operations to xAI.

    The only instance state is a logger. All methods take the xAI and
    EspoCRM service instances explicitly to remain reusable across
    different sync contexts.
    """

    # Format of the timestamps written to the ``aiLastSync`` field.
    _TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, ctx):
        """Create the service logger for this module from ``ctx``."""
        # Function-scope import kept exactly as in the original code.
        from services.logging_utils import get_service_logger
        self._log = get_service_logger(__name__, ctx)

    @classmethod
    def _timestamp(cls) -> str:
        """Return the current time formatted for the ``aiLastSync`` field."""
        # NOTE(review): this is naive local time - confirm that EspoCRM
        # expects local time here and not UTC.
        return datetime.now().strftime(cls._TIMESTAMP_FORMAT)

    @staticmethod
    def _build_fields(doc: Dict[str, Any], filename: str) -> Dict[str, str]:
        """Build the metadata fields sent along with the upload."""
        # Metadata fields are set once at upload time; custom fields can
        # NOT be updated afterwards.
        candidates = {
            'document_name': doc.get('name') or filename,
            'description': str(doc.get('beschreibung', '') or ''),
            'advoware_art': str(doc.get('advowareArt', '') or ''),
            'advoware_bemerkung': str(doc.get('advowareBemerkung', '') or ''),
            'espocrm_id': doc['id'],
            'created_at': str(doc.get('createdAt', '') or ''),
            'modified_at': str(doc.get('modifiedAt', '') or ''),
        }
        # xAI does NOT allow empty strings as field values, so only
        # populated fields are sent.
        return {key: value for key, value in candidates.items() if value}

    async def _mark_failed(self, doc_id: str, espocrm) -> None:
        """Best-effort: flag the document as 'failed' in EspoCRM."""
        try:
            await espocrm.update_entity('CDokumente', doc_id, {
                'aiSyncStatus': 'failed',
                'aiLastSync': self._timestamp(),
            })
        except Exception as update_error:
            # Recording the failure must never raise out of the error
            # path; the caller still reports the sync as failed.
            self._log.error(
                f" ❌ Could not mark document {doc_id} as failed: {update_error}"
            )

    async def ensure_collection(
        self,
        akte: Dict[str, Any],
        xai,
        espocrm,
    ) -> Optional[str]:
        """
        Ensure an xAI collection exists for this Akte.

        An existing ``aiCollectionId`` is verified via
        ``xai.get_collection``. If it is missing, not found, or cannot be
        verified, a new collection is created and its id is written back
        to the ``CAkten`` entity together with ``aiSyncStatus='unclean'``.

        Args:
            akte: Akte entity dict; must contain ``id``.
            xai: Service providing ``get_collection`` / ``create_collection``.
            espocrm: Service providing ``update_entity``.

        Returns:
            collection_id, or None on failure.
        """
        akte_id = akte['id']
        # ``or`` (not a .get default) so a present-but-empty name also
        # falls back to the generated one.
        akte_name = akte.get('name') or f"Akte {akte.get('aktennummer', akte_id)}"
        collection_id = akte.get('aiCollectionId')

        if collection_id:
            # Verify it still exists in xAI
            try:
                existing = await xai.get_collection(collection_id)
                if existing:
                    self._log.debug(f"Collection {collection_id} verified for '{akte_name}'")
                    return collection_id
                self._log.warn(f"Collection {collection_id} not found in xAI, recreating...")
            except Exception as e:
                self._log.warn(f"Could not verify collection {collection_id}: {e}, recreating...")

        # Create new collection
        try:
            self._log.info(f"Creating xAI collection for '{akte_name}'...")
            created = await xai.create_collection(name=akte_name)
            collection_id = created.get('collection_id') or created.get('id')
            if not collection_id:
                # Do not report success or overwrite the stored id when
                # the response carries no usable collection id.
                self._log.error("❌ Failed to create xAI collection: response contains no collection id")
                return None
            self._log.info(f"✅ Collection created: {collection_id}")

            # Save back to EspoCRM
            await espocrm.update_entity('CAkten', akte_id, {
                'aiCollectionId': collection_id,
                'aiSyncStatus': 'unclean',  # Trigger full doc sync
            })
            return collection_id

        except Exception as e:
            self._log.error(f"❌ Failed to create xAI collection: {e}")
            return None

    async def sync_document_to_xai(
        self,
        doc: Dict[str, Any],
        collection_id: str,
        xai,
        espocrm,
    ) -> bool:
        """
        Sync a single CDokumente entity to an xAI collection.

        Decision logic (Blake3-based):
        - aiSyncStatus == 'synced' AND aiSyncHash == blake3hash -> skip (no change)
        - No attachment (dokumentId missing) -> mark 'unsupported'
        - Anything else -> download, (re-)upload, then mark 'synced'

        Args:
            doc: CDokumente entity dict; must contain ``id``.
            collection_id: Target xAI collection.
            xai: Service providing ``upload_to_collection`` /
                ``remove_from_collection``.
            espocrm: Service providing ``download_attachment`` /
                ``update_entity``.

        Returns:
            True if synced/skipped successfully, False on error.

        Raises:
            Whatever ``espocrm.update_entity`` raises while marking a
            document without attachment as 'unsupported'; that call is
            outside the guarded upload path.
        """
        doc_id = doc['id']
        doc_name = doc.get('name') or doc_id
        ai_status = doc.get('aiSyncStatus', 'new')
        ai_sync_hash = doc.get('aiSyncHash')
        blake3_hash = doc.get('blake3hash')
        ai_file_id = doc.get('aiFileId')

        self._log.info(f" 📄 {doc_name}")
        self._log.info(f" aiSyncStatus={ai_status}, aiSyncHash={ai_sync_hash[:12] if ai_sync_hash else 'N/A'}..., blake3={blake3_hash[:12] if blake3_hash else 'N/A'}...")

        # File content unchanged (hash match) -> no re-upload needed
        if ai_status == 'synced' and ai_sync_hash and blake3_hash and ai_sync_hash == blake3_hash:
            if ai_file_id:
                self._log.info(" ✅ Unverändert – kein Re-Upload (hash match)")
            else:
                self._log.info(" ⏭️ Skipped (hash match, kein aiFileId)")
            return True

        # Get attachment info
        attachment_id = doc.get('dokumentId')
        if not attachment_id:
            self._log.warn(" ⚠️ No attachment (dokumentId missing) - marking unsupported")
            await espocrm.update_entity('CDokumente', doc_id, {
                'aiSyncStatus': 'unsupported',
                'aiLastSync': self._timestamp(),
            })
            return True  # Not an error, just unsupported

        try:
            # Download from EspoCRM
            self._log.info(f" 📥 Downloading attachment {attachment_id}...")
            file_content = await espocrm.download_attachment(attachment_id)
            self._log.info(f" Downloaded {len(file_content)} bytes")

            # Determine filename + MIME type. Chained ``or`` so that a
            # present-but-null name still falls back to the default.
            from urllib.parse import unquote
            filename = unquote(
                doc.get('dokumentName') or doc.get('name') or 'document.bin'
            )

            import mimetypes
            mime_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream'

            # Remove old file from collection if updating
            if ai_file_id and ai_status != 'new':
                try:
                    await xai.remove_from_collection(collection_id, ai_file_id)
                    self._log.info(f" 🗑️ Removed old xAI file {ai_file_id}")
                except Exception as remove_error:
                    # Non-fatal (the file may already be gone), but log it
                    # so a stale duplicate in the collection is traceable.
                    self._log.warn(
                        f" ⚠️ Could not remove old xAI file {ai_file_id}: {remove_error}"
                    )

            fields = self._build_fields(doc, filename)

            # Single-request upload directly to collection incl. metadata fields
            self._log.info(f" 📤 Uploading '{filename}' ({mime_type}) with metadata...")
            new_xai_file_id = await xai.upload_to_collection(
                collection_id, file_content, filename, mime_type, fields=fields
            )
            self._log.info(f" ✅ Uploaded + metadata set: {new_xai_file_id}")

            # Update CDokumente with sync result
            await espocrm.update_entity('CDokumente', doc_id, {
                'aiFileId': new_xai_file_id,
                'aiCollectionId': collection_id,
                'aiSyncHash': blake3_hash or doc.get('syncedHash'),
                'aiSyncStatus': 'synced',
                'aiLastSync': self._timestamp(),
            })
            self._log.info(" ✅ EspoCRM updated")
            return True

        except Exception as e:
            self._log.error(f" ❌ Failed: {e}")
            await self._mark_failed(doc_id, espocrm)
            return False

    async def remove_document_from_xai(
        self,
        doc: Dict[str, Any],
        collection_id: str,
        xai,
        espocrm,
    ) -> None:
        """
        Remove a CDokumente from its xAI collection (called on DELETE).

        Does nothing when the document has no ``aiFileId``. After a
        successful removal the entity is reset to ``aiSyncStatus='new'``
        with ``aiFileId`` cleared. Failures are logged, never raised.
        """
        doc_id = doc['id']
        ai_file_id = doc.get('aiFileId')
        if not ai_file_id:
            return

        try:
            await xai.remove_from_collection(collection_id, ai_file_id)
            self._log.info(f" 🗑️ Removed {doc.get('name')} from xAI collection")
            await espocrm.update_entity('CDokumente', doc_id, {
                'aiFileId': None,
                'aiSyncStatus': 'new',
                'aiLastSync': self._timestamp(),
            })
        except Exception as e:
            self._log.warn(f" ⚠️ Could not remove from xAI: {e}")