feat(sync): Implement orphan cleanup for xAI documents without EspoCRM equivalents
This commit is contained in:
@@ -85,9 +85,7 @@ class XAIService:
|
||||
filename=filename,
|
||||
content_type=mime_type
|
||||
)
|
||||
# CRITICAL: purpose="file_search" enables proper PDF processing
|
||||
# Without this, xAI throws "internal error" on complex PDFs
|
||||
form.add_field('purpose', 'file_search')
|
||||
form.add_field('purpose', 'assistants')
|
||||
|
||||
async with session.post(url, data=form, headers=headers) as response:
|
||||
try:
|
||||
@@ -134,6 +132,85 @@ class XAIService:
|
||||
|
||||
self._log(f"✅ File {file_id} added to collection {collection_id}")
|
||||
|
||||
async def upload_to_collection(
|
||||
self,
|
||||
collection_id: str,
|
||||
file_content: bytes,
|
||||
filename: str,
|
||||
mime_type: str = 'application/octet-stream',
|
||||
fields: Optional[Dict[str, str]] = None,
|
||||
) -> str:
|
||||
"""
|
||||
Lädt eine Datei direkt in eine xAI-Collection hoch (ein Request, inkl. Metadata).
|
||||
|
||||
POST https://management-api.x.ai/v1/collections/{collection_id}/documents
|
||||
Content-Type: multipart/form-data
|
||||
|
||||
Args:
|
||||
collection_id: Ziel-Collection
|
||||
file_content: Dateiinhalt als Bytes
|
||||
filename: Dateiname (inkl. Endung)
|
||||
mime_type: MIME-Type
|
||||
fields: Custom Metadaten-Felder (entsprechen den field_definitions)
|
||||
|
||||
Returns:
|
||||
xAI file_id (str)
|
||||
|
||||
Raises:
|
||||
RuntimeError: bei HTTP-Fehler oder fehlendem file_id in der Antwort
|
||||
"""
|
||||
import json as _json
|
||||
|
||||
if mime_type == 'application/octet-stream' and filename.lower().endswith('.pdf'):
|
||||
mime_type = 'application/pdf'
|
||||
|
||||
self._log(
|
||||
f"📤 Uploading {len(file_content)} bytes to collection {collection_id}: "
|
||||
f"{filename} ({mime_type})"
|
||||
)
|
||||
|
||||
session = await self._get_session()
|
||||
url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents"
|
||||
headers = {"Authorization": f"Bearer {self.management_key}"}
|
||||
|
||||
form = aiohttp.FormData(quote_fields=False)
|
||||
form.add_field('name', filename)
|
||||
form.add_field(
|
||||
'data',
|
||||
file_content,
|
||||
filename=filename,
|
||||
content_type=mime_type,
|
||||
)
|
||||
form.add_field('content_type', mime_type)
|
||||
if fields:
|
||||
form.add_field('fields', _json.dumps(fields))
|
||||
|
||||
async with session.post(url, data=form, headers=headers) as response:
|
||||
try:
|
||||
data = await response.json()
|
||||
except Exception:
|
||||
raw = await response.text()
|
||||
data = {"_raw": raw}
|
||||
|
||||
if response.status not in (200, 201):
|
||||
raise RuntimeError(
|
||||
f"upload_to_collection failed ({response.status}): {data}"
|
||||
)
|
||||
|
||||
# Response may nest the file_id in different places
|
||||
file_id = (
|
||||
data.get('file_id')
|
||||
or (data.get('file_metadata') or {}).get('file_id')
|
||||
or data.get('id')
|
||||
)
|
||||
if not file_id:
|
||||
raise RuntimeError(
|
||||
f"No file_id in upload_to_collection response: {data}"
|
||||
)
|
||||
|
||||
self._log(f"✅ Uploaded to collection {collection_id}: {file_id}")
|
||||
return file_id
|
||||
|
||||
async def remove_from_collection(self, collection_id: str, file_id: str) -> None:
|
||||
"""
|
||||
Entfernt eine Datei aus einer xAI-Collection.
|
||||
@@ -194,7 +271,6 @@ class XAIService:
|
||||
async def create_collection(
|
||||
self,
|
||||
name: str,
|
||||
metadata: Optional[Dict[str, str]] = None,
|
||||
field_definitions: Optional[List[Dict]] = None
|
||||
) -> Dict:
|
||||
"""
|
||||
@@ -204,7 +280,6 @@ class XAIService:
|
||||
|
||||
Args:
|
||||
name: Collection name
|
||||
metadata: Optional metadata dict
|
||||
field_definitions: Optional field definitions for metadata fields
|
||||
|
||||
Returns:
|
||||
@@ -239,10 +314,6 @@ class XAIService:
|
||||
"field_definitions": field_definitions
|
||||
}
|
||||
|
||||
# Add metadata if provided
|
||||
if metadata:
|
||||
body["metadata"] = metadata
|
||||
|
||||
async with session.post(url, json=body, headers=headers) as response:
|
||||
if response.status not in (200, 201):
|
||||
raw = await response.text()
|
||||
@@ -435,44 +506,45 @@ class XAIService:
|
||||
self._log(f"✅ Document info retrieved: {normalized.get('filename', 'N/A')}")
|
||||
return normalized
|
||||
|
||||
async def update_document_metadata(
|
||||
async def rename_file(
|
||||
self,
|
||||
collection_id: str,
|
||||
file_id: str,
|
||||
metadata: Dict[str, str]
|
||||
new_filename: str,
|
||||
) -> None:
|
||||
"""
|
||||
Aktualisiert nur Metadaten eines Documents (kein File-Upload).
|
||||
Benennt eine Datei auf Files-API-Ebene um (kein Re-Upload).
|
||||
|
||||
PATCH https://management-api.x.ai/v1/collections/{collection_id}/documents/{file_id}
|
||||
PUT https://api.x.ai/v1/files/{file_id}
|
||||
|
||||
Laut xAI-Dokumentation können über diesen Endpunkt Dateiname und
|
||||
content_type geändert werden – keine custom metadata-Felder.
|
||||
|
||||
Args:
|
||||
collection_id: XAI Collection ID
|
||||
file_id: XAI file_id
|
||||
metadata: Updated metadata fields
|
||||
file_id: xAI file_id
|
||||
new_filename: Neuer Dateiname
|
||||
|
||||
Raises:
|
||||
RuntimeError: bei HTTP-Fehler
|
||||
"""
|
||||
self._log(f"📝 Updating metadata for document {file_id}")
|
||||
self._log(f"✏️ Renaming file {file_id} → {new_filename}")
|
||||
|
||||
session = await self._get_session()
|
||||
url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}"
|
||||
url = f"{XAI_FILES_URL}/v1/files/{file_id}"
|
||||
headers = {
|
||||
"Authorization": f"Bearer {self.management_key}",
|
||||
"Authorization": f"Bearer {self.api_key}",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
body = {"fields": metadata}
|
||||
body = {"filename": new_filename}
|
||||
|
||||
async with session.patch(url, json=body, headers=headers) as response:
|
||||
async with session.put(url, json=body, headers=headers) as response:
|
||||
if response.status not in (200, 204):
|
||||
raw = await response.text()
|
||||
raise RuntimeError(
|
||||
f"Failed to update document metadata ({response.status}): {raw}"
|
||||
f"Failed to rename file {file_id} ({response.status}): {raw}"
|
||||
)
|
||||
|
||||
self._log(f"✅ Metadata updated for {file_id}")
|
||||
self._log(f"✅ File renamed: {file_id} → {new_filename}")
|
||||
|
||||
def is_mime_type_supported(self, mime_type: str) -> bool:
|
||||
"""
|
||||
|
||||
@@ -60,12 +60,6 @@ class XAIUploadUtils:
|
||||
self._log.info(f"Creating xAI collection for '{akte_name}'...")
|
||||
col = await xai.create_collection(
|
||||
name=akte_name,
|
||||
metadata={
|
||||
'espocrm_entity_type': 'CAkten',
|
||||
'espocrm_entity_id': akte_id,
|
||||
'aktenzeichen': str(akte.get('aktennummer', '')),
|
||||
'rubrum': str(akte.get('rubrum', '') or ''),
|
||||
}
|
||||
)
|
||||
collection_id = col.get('collection_id') or col.get('id')
|
||||
self._log.info(f"✅ Collection created: {collection_id}")
|
||||
@@ -110,9 +104,20 @@ class XAIUploadUtils:
|
||||
self._log.info(f" 📄 {doc_name}")
|
||||
self._log.info(f" aiSyncStatus={ai_status}, aiSyncHash={ai_sync_hash[:12] if ai_sync_hash else 'N/A'}..., blake3={blake3_hash[:12] if blake3_hash else 'N/A'}...")
|
||||
|
||||
# Skip if already synced and hash matches
|
||||
# File content unchanged (hash match) → kein Re-Upload nötig
|
||||
if ai_status == 'synced' and ai_sync_hash and blake3_hash and ai_sync_hash == blake3_hash:
|
||||
self._log.info(f" ⏭️ Skipped (hash match, no change)")
|
||||
if ai_file_id:
|
||||
# Custom metadata (fields) können nach dem Upload nicht mehr geändert werden.
|
||||
# Nur Dateiname ist über PUT /v1/files/{id} änderbar.
|
||||
current_name = doc.get('dokumentName') or doc.get('name', '')
|
||||
if current_name and ai_file_id:
|
||||
try:
|
||||
await xai.rename_file(ai_file_id, current_name)
|
||||
except Exception as e:
|
||||
self._log.warn(f" ⚠️ Rename fehlgeschlagen (non-fatal): {e}")
|
||||
self._log.info(f" ✅ Unverändert – kein Re-Upload (hash match)")
|
||||
else:
|
||||
self._log.info(f" ⏭️ Skipped (hash match, kein aiFileId)")
|
||||
return True
|
||||
|
||||
# Get attachment info
|
||||
@@ -149,27 +154,24 @@ class XAIUploadUtils:
|
||||
except Exception:
|
||||
pass # Non-fatal - may already be gone
|
||||
|
||||
# Upload to xAI
|
||||
self._log.info(f" 📤 Uploading '{filename}' ({mime_type})...")
|
||||
new_xai_file_id = await xai.upload_file(file_content, filename, mime_type)
|
||||
self._log.info(f" Uploaded: xai_file_id={new_xai_file_id}")
|
||||
# Build metadata fields – werden einmalig beim Upload gesetzt;
|
||||
# Custom fields können nachträglich NICHT aktualisiert werden.
|
||||
fields = {
|
||||
'document_name': doc.get('name', filename),
|
||||
'description': str(doc.get('beschreibung', '') or ''),
|
||||
'advoware_art': str(doc.get('advowareArt', '') or ''),
|
||||
'advoware_bemerkung': str(doc.get('advowareBemerkung', '') or ''),
|
||||
'espocrm_id': doc['id'],
|
||||
'created_at': str(doc.get('createdAt', '') or ''),
|
||||
'modified_at': str(doc.get('modifiedAt', '') or ''),
|
||||
}
|
||||
|
||||
# Add to collection
|
||||
await xai.add_to_collection(collection_id, new_xai_file_id)
|
||||
self._log.info(f" ✅ Added to collection {collection_id}")
|
||||
|
||||
# Set document metadata (injected into chunks for better AI context)
|
||||
try:
|
||||
await xai.update_document_metadata(collection_id, new_xai_file_id, {
|
||||
'document_name': doc.get('name', filename),
|
||||
'description': str(doc.get('beschreibung', '') or ''),
|
||||
'advoware_art': str(doc.get('advowareArt', '') or ''),
|
||||
'advoware_bemerkung': str(doc.get('advowareBemerkung', '') or ''),
|
||||
'espocrm_id': doc['id'],
|
||||
})
|
||||
self._log.info(f" ✅ Dokument-Metadaten gesetzt")
|
||||
except Exception as meta_err:
|
||||
self._log.warn(f" ⚠️ Metadaten-Update fehlgeschlagen (non-fatal): {meta_err}")
|
||||
# Single-request upload directly to collection incl. metadata fields
|
||||
self._log.info(f" 📤 Uploading '{filename}' ({mime_type}) with metadata...")
|
||||
new_xai_file_id = await xai.upload_to_collection(
|
||||
collection_id, file_content, filename, mime_type, fields=fields
|
||||
)
|
||||
self._log.info(f" ✅ Uploaded + metadata set: {new_xai_file_id}")
|
||||
|
||||
# Update CDokumente with sync result
|
||||
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
@@ -438,6 +438,22 @@ async def _run_xai_sync(
|
||||
|
||||
ctx.logger.info(f" Documents to check: {len(docs)}")
|
||||
|
||||
# ── Orphan-Cleanup: xAI-Docs löschen die kein EspoCRM-Äquivalent haben ──
|
||||
known_xai_file_ids = {doc.get('aiFileId') for doc in docs if doc.get('aiFileId')}
|
||||
try:
|
||||
xai_docs = await xai.list_collection_documents(collection_id)
|
||||
orphans = [d for d in xai_docs if d.get('file_id') not in known_xai_file_ids]
|
||||
if orphans:
|
||||
ctx.logger.info(f" 🗑️ Orphan-Cleanup: {len(orphans)} Doc(s) in xAI ohne EspoCRM-Eintrag")
|
||||
for orphan in orphans:
|
||||
try:
|
||||
await xai.remove_from_collection(collection_id, orphan['file_id'])
|
||||
ctx.logger.info(f" Gelöscht: {orphan.get('filename', orphan['file_id'])}")
|
||||
except Exception as e:
|
||||
ctx.logger.warn(f" Orphan-Delete fehlgeschlagen: {e}")
|
||||
except Exception as e:
|
||||
ctx.logger.warn(f" ⚠️ Orphan-Cleanup fehlgeschlagen (non-fatal): {e}")
|
||||
|
||||
synced = 0
|
||||
skipped = 0
|
||||
failed = 0
|
||||
|
||||
Reference in New Issue
Block a user