diff --git a/services/xai_service.py b/services/xai_service.py new file mode 100644 index 0000000..479f9df --- /dev/null +++ b/services/xai_service.py @@ -0,0 +1,177 @@ +"""xAI Files & Collections Service""" +import os +import aiohttp +import logging +from typing import Optional, List + +logger = logging.getLogger(__name__) + +XAI_FILES_URL = "https://api.x.ai" +XAI_MANAGEMENT_URL = "https://management-api.x.ai" + + +class XAIService: + """ + Client für xAI Files API und Collections Management API. + + Benötigte Umgebungsvariablen: + - XAI_API_KEY – regulärer API-Key für File-Uploads (api.x.ai) + - XAI_MANAGEMENT_KEY – Management-API-Key für Collection-Operationen (management-api.x.ai) + """ + + def __init__(self, ctx=None): + self.api_key = os.getenv('XAI_API_KEY', '') + self.management_key = os.getenv('XAI_MANAGEMENT_KEY', '') + self.ctx = ctx + self._session: Optional[aiohttp.ClientSession] = None + + if not self.api_key: + raise ValueError("XAI_API_KEY not configured in environment") + if not self.management_key: + raise ValueError("XAI_MANAGEMENT_KEY not configured in environment") + + def _log(self, msg: str, level: str = 'info') -> None: + if self.ctx: + getattr(self.ctx.logger, level, self.ctx.logger.info)(msg) + else: + getattr(logger, level, logger.info)(msg) + + async def _get_session(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=120) + ) + return self._session + + async def close(self) -> None: + if self._session and not self._session.closed: + await self._session.close() + + async def upload_file( + self, + file_content: bytes, + filename: str, + mime_type: str = 'application/octet-stream' + ) -> str: + """ + Lädt eine Datei zur xAI Files API hoch (multipart/form-data). + + POST https://api.x.ai/v1/files + + Returns: + xAI file_id (str) + + Raises: + RuntimeError: bei HTTP-Fehler oder fehlendem file_id in der Antwort + """ + self._log(f"📤 Uploading {len(file_content)} bytes to xAI: {filename}") + + session = await self._get_session() + url = f"{XAI_FILES_URL}/v1/files" + headers = {"Authorization": f"Bearer {self.api_key}"} + + form = aiohttp.FormData() + form.add_field('file', file_content, filename=filename, content_type=mime_type) + + async with session.post(url, data=form, headers=headers) as response: + try: + data = await response.json() + except Exception: + raw = await response.text() + data = {"_raw": raw} + + if response.status not in (200, 201): + raise RuntimeError( + f"xAI file upload failed ({response.status}): {data}" + ) + + file_id = data.get('id') or data.get('file_id') + if not file_id: + raise RuntimeError( + f"No file_id in xAI upload response: {data}" + ) + + self._log(f"✅ xAI file uploaded: {file_id}") + return file_id + + async def add_to_collection(self, collection_id: str, file_id: str) -> None: + """ + Fügt eine Datei einer xAI-Collection hinzu. + + POST https://management-api.x.ai/v1/collections/{collection_id}/documents/{file_id} + + Raises: + RuntimeError: bei HTTP-Fehler + """ + self._log(f"📚 Adding file {file_id} to collection {collection_id}") + + session = await self._get_session() + url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}" + headers = { + "Authorization": f"Bearer {self.management_key}", + "Content-Type": "application/json", + } + + async with session.post(url, headers=headers) as response: + if response.status not in (200, 201): + raw = await response.text() + raise RuntimeError( + f"Failed to add file to collection {collection_id} ({response.status}): {raw}" + ) + + self._log(f"✅ File {file_id} added to collection {collection_id}") + + async def remove_from_collection(self, collection_id: str, file_id: str) -> None: + """ + Entfernt eine Datei aus einer xAI-Collection. + Die Datei selbst wird NICHT gelöscht – sie kann in anderen Collections sein. + + DELETE https://management-api.x.ai/v1/collections/{collection_id}/documents/{file_id} + + Raises: + RuntimeError: bei HTTP-Fehler + """ + self._log(f"🗑️ Removing file {file_id} from collection {collection_id}") + + session = await self._get_session() + url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}" + headers = {"Authorization": f"Bearer {self.management_key}"} + + async with session.delete(url, headers=headers) as response: + if response.status not in (200, 204): + raw = await response.text() + raise RuntimeError( + f"Failed to remove file from collection {collection_id} ({response.status}): {raw}" + ) + + self._log(f"✅ File {file_id} removed from collection {collection_id}") + + async def add_to_collections(self, collection_ids: List[str], file_id: str) -> List[str]: + """ + Fügt eine Datei zu mehreren Collections hinzu. + + Returns: + Liste der erfolgreich hinzugefügten Collection-IDs + """ + added = [] + for collection_id in collection_ids: + try: + await self.add_to_collection(collection_id, file_id) + added.append(collection_id) + except Exception as e: + self._log( + f"⚠️ Fehler beim Hinzufügen zu Collection {collection_id}: {e}", + level='warn' + ) + return added + + async def remove_from_collections(self, collection_ids: List[str], file_id: str) -> None: + """Entfernt eine Datei aus mehreren Collections (ignoriert Fehler pro Collection).""" + for collection_id in collection_ids: + try: + await self.remove_from_collection(collection_id, file_id) + except Exception as e: + self._log( + f"⚠️ Fehler beim Entfernen aus Collection {collection_id}: {e}", + level='warn' + ) diff --git a/steps/vmh/document_sync_event_step.py b/steps/vmh/document_sync_event_step.py index 80d88ba..6d7a2f3 100644 --- a/steps/vmh/document_sync_event_step.py +++ b/steps/vmh/document_sync_event_step.py @@ -13,6 +13,8 @@ from typing import Dict, Any from motia import FlowContext from services.espocrm import EspoCRMAPI from services.document_sync_utils import DocumentSync +from services.xai_service import XAIService +import hashlib import json import redis import os @@ -65,10 +67,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]): # APIs initialisieren espocrm = EspoCRMAPI() sync_utils = DocumentSync(espocrm, redis_client, ctx) - - # TODO: xAI Service wird in nächstem Schritt hinzugefügt - # from services.xai_service import XAIService - # xai_service = XAIService(ctx) + xai_service = XAIService(ctx) try: # 1. ACQUIRE LOCK (verhindert parallele Syncs) @@ -98,10 +97,10 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]): # 3. BESTIMME SYNC-AKTION BASIEREND AUF ACTION if action == 'delete': - await handle_delete(entity_id, document, sync_utils, ctx, entity_type) + await handle_delete(entity_id, document, sync_utils, xai_service, ctx, entity_type) elif action in ['create', 'update']: - await handle_create_or_update(entity_id, document, sync_utils, ctx, entity_type) + await handle_create_or_update(entity_id, document, sync_utils, xai_service, ctx, entity_type) else: ctx.logger.warn(f"⚠️ Unbekannte Action: {action}") @@ -138,7 +137,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]): ctx.logger.error(traceback.format_exc()) -async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any], entity_type: str = 'CDokumente'): +async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, xai_service: XAIService, ctx: FlowContext[Any], entity_type: str = 'CDokumente'): """ Behandelt Create/Update von Documents @@ -257,62 +256,59 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync # ═══════════════════════════════════════════════════════════════ # xAI SYNC DURCHFÜHREN # ═══════════════════════════════════════════════════════════════ - + ctx.logger.info("") ctx.logger.info("=" * 80) ctx.logger.info("🤖 xAI SYNC STARTEN") ctx.logger.info("=" * 80) - - # TODO: Implementierung mit xai_service.py - ctx.logger.warn("⚠️ xAI Service noch nicht implementiert!") - ctx.logger.info("") - ctx.logger.info("TODO: Folgende Schritte werden implementiert:") - ctx.logger.info("1. 📥 Download File von EspoCRM") - ctx.logger.info("2. 📤 Upload zu xAI (falls noch kein xaiFileId)") - ctx.logger.info("3. 📚 Add zu Collections") - ctx.logger.info("4. 💾 Update EspoCRM: xaiFileId + xaiCollections") - ctx.logger.info("") - - # PLACEHOLDER Implementation: - # - # # 1. Download File von EspoCRM - # download_info = await sync_utils.get_document_download_info(entity_id) - # if not download_info: - # raise Exception("Konnte Download-Info nicht ermitteln") - # - # # 2. Upload zu xAI (falls noch nicht vorhanden) - # xai_file_id = document.get('xaiFileId') - # if not xai_file_id: - # file_content = await download_file_from_espocrm(download_info) - # xai_file_id = await xai_service.upload_file(file_content, download_info['filename']) - # - # # 3. Add zu Collections - # for collection_id in collection_ids: - # await xai_service.add_file_to_collection(collection_id, xai_file_id) - # - # # 4. Update EspoCRM - # await sync_utils.release_sync_lock( - # entity_id, - # success=True, - # extra_fields={ - # 'xaiFileId': xai_file_id, - # 'xaiCollections': collection_ids - # } - # ) - - # Für jetzt: Success ohne Sync + + # 1. Hole Download-Informationen (falls nicht schon aus Preview-Schritt vorhanden) + download_info = await sync_utils.get_document_download_info(entity_id, entity_type) + if not download_info: + raise Exception("Konnte Download-Info nicht ermitteln – Datei fehlt?") + + ctx.logger.info(f"📥 Datei: {download_info['filename']} ({download_info['size']} bytes, {download_info['mime_type']})") + + # 2. Download Datei von EspoCRM + espocrm = sync_utils.espocrm + file_content = await espocrm.download_attachment(download_info['attachment_id']) + ctx.logger.info(f"✅ Downloaded {len(file_content)} bytes") + + # 3. MD5-Hash berechnen für Change-Detection + file_hash = hashlib.md5(file_content).hexdigest() + ctx.logger.info(f"🔑 MD5: {file_hash}") + + # 4. Upload zu xAI + # Immer neu hochladen wenn needs_sync=True (neues File oder Hash geändert) + ctx.logger.info("📤 Uploading to xAI...") + xai_file_id = await xai_service.upload_file( + file_content, + download_info['filename'], + download_info['mime_type'] + ) + ctx.logger.info(f"✅ xAI file_id: {xai_file_id}") + + # 5. Zu allen Ziel-Collections hinzufügen + ctx.logger.info(f"📚 Füge zu {len(collection_ids)} Collection(s) hinzu...") + added_collections = await xai_service.add_to_collections(collection_ids, xai_file_id) + ctx.logger.info(f"✅ In {len(added_collections)}/{len(collection_ids)} Collections eingetragen") + + # 6. EspoCRM Metadaten aktualisieren und Lock freigeben + await sync_utils.update_sync_metadata( + entity_id, + xai_file_id=xai_file_id, + collection_ids=added_collections, + file_hash=file_hash, + entity_type=entity_type + ) await sync_utils.release_sync_lock( entity_id, success=True, - extra_fields={ - # TODO: Echte xAI-Daten hier einsetzen - # 'xaiFileId': xai_file_id, - # 'xaiCollections': collection_ids - } + entity_type=entity_type ) - + ctx.logger.info("=" * 80) - ctx.logger.info("✅ DOCUMENT SYNC ABGESCHLOSSEN (PLACEHOLDER)") + ctx.logger.info("✅ DOCUMENT SYNC ABGESCHLOSSEN") ctx.logger.info("=" * 80) except Exception as e: @@ -322,7 +318,7 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e)) -async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any], entity_type: str = 'CDokumente'): +async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, xai_service: XAIService, ctx: FlowContext[Any], entity_type: str = 'CDokumente'): """ Behandelt Delete von Documents @@ -346,25 +342,15 @@ async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: Do ctx.logger.info(f" xaiFileId: {xai_file_id}") ctx.logger.info(f" Collections: {xai_collections}") - # TODO: Implementierung mit xai_service - ctx.logger.warn("⚠️ xAI Delete-Operation noch nicht implementiert!") - ctx.logger.info("") - ctx.logger.info("TODO: Folgende Schritte werden implementiert:") - ctx.logger.info("1. 🗑️ Remove File aus allen Collections") - ctx.logger.info("2. ⚠️ File NICHT von xAI löschen (kann in anderen Collections sein)") - ctx.logger.info("") - - # PLACEHOLDER Implementation: - # - # for collection_id in xai_collections: - # await xai_service.remove_file_from_collection(collection_id, xai_file_id) - # - # ctx.logger.info(f"✅ File aus {len(xai_collections)} Collection(s) entfernt") - + ctx.logger.info(f"🗑️ Entferne aus {len(xai_collections)} Collection(s)...") + await xai_service.remove_from_collections(xai_collections, xai_file_id) + ctx.logger.info(f"✅ File aus {len(xai_collections)} Collection(s) entfernt") + ctx.logger.info(" (File selbst bleibt in xAI – kann in anderen Collections sein)") + await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type) - + ctx.logger.info("=" * 80) - ctx.logger.info("✅ DELETE ABGESCHLOSSEN (PLACEHOLDER)") + ctx.logger.info("✅ DELETE ABGESCHLOSSEN") ctx.logger.info("=" * 80) except Exception as e: