From bc917bd885cf09c94f84dbb78ebded9669b45086 Mon Sep 17 00:00:00 2001 From: bsiggel Date: Mon, 2 Mar 2026 17:16:07 +0000 Subject: [PATCH] feat(webhooks): add EspoCRM Document Delete and Update webhook handlers with detailed logging - Implemented document delete webhook handler to process delete notifications from EspoCRM. - Added detailed logging for incoming payloads, including headers and entity information. - Extracted and emitted document IDs for further processing. - Implemented document update webhook handler to handle update notifications from EspoCRM. - Enhanced logging for update payloads, including changed fields and metadata. - Both handlers emit events for queue processing to facilitate further actions. test: add comprehensive xAI Collections API test suite - Created a test suite for xAI Collections API covering critical operations. - Included tests for file uploads, CRUD operations on collections and documents, and response structures. - Verified shared file behavior across multiple collections. - Implemented cleanup procedures to remove test resources after execution. 
"""EspoCRM Webhook - Document Create

Receives create webhooks from EspoCRM for Document entities, logs the full
payload for analysis, and emits one queue event per document ID found.
"""
import datetime
import json
import traceback
from typing import Any, Optional

from motia import ApiRequest, ApiResponse, FlowContext, http


config = {
    "name": "EspoCRM Document Create Webhook",
    "description": "Empfängt Create-Webhooks von EspoCRM für Document Entities",
    "flows": ["espocrm-documents"],
    "triggers": [
        http("POST", "/api/espocrm/document/create")
    ],
    "enqueues": ["espocrm.document.create"],
}


def _log_created_entity(ctx: Any, entity: dict, idx: Optional[int]) -> None:
    """Log one document entity from a create payload.

    Shared by the list and single-dict payload branches; *idx* is ``None``
    for a single-dict payload (no running number in the log line).
    """
    label = f" #{idx + 1}" if idx is not None else ""
    ctx.logger.info(f"\n  📄 Document{label}:")
    ctx.logger.info(f"     ID: {entity.get('id')}")
    ctx.logger.info(f"     Name: {entity.get('name', 'N/A')}")
    ctx.logger.info(f"     Type: {entity.get('type', 'N/A')}")
    ctx.logger.info(f"     Size: {entity.get('size', 'N/A')} bytes")
    ctx.logger.info(f"     Verfügbare Felder: {', '.join(entity.keys())}")

    # xAI-related fields (if present) — interesting for the xAI sync handler
    xai_fields = {k: v for k, v in entity.items()
                  if 'xai' in k.lower() or 'collection' in k.lower()}
    if xai_fields:
        ctx.logger.info(f"     🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")

    # Parent / relationship fields
    rel_fields = {k: v for k, v in entity.items()
                  if 'parent' in k.lower() or 'related' in k.lower() or
                  'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
    if rel_fields:
        ctx.logger.info(f"     🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """Webhook handler for Document creation in EspoCRM.

    Receives notifications when documents are created and emits queue events
    for processing (xAI sync, etc.).

    Payload Analysis Mode: logs comprehensive details about the webhook
    structure. Returns 200 with a summary body, or 500 on unexpected errors.
    """
    try:
        payload = request.body or []

        ctx.logger.info("=" * 80)
        ctx.logger.info("📥 EspoCRM DOCUMENT CREATE WEBHOOK EMPFANGEN")
        ctx.logger.info("=" * 80)

        # Log request headers (if the transport exposes them)
        ctx.logger.info("\n🔍 REQUEST HEADERS:")
        if hasattr(request, 'headers'):
            for key, value in request.headers.items():
                ctx.logger.info(f"  {key}: {value}")
        else:
            ctx.logger.info("  (keine Headers verfügbar)")

        # Log payload type, size and full content (pretty-printed)
        ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
        ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
        ctx.logger.info("\n📄 FULL PAYLOAD:")
        ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))

        # Normalize: EspoCRM may send a list of entities or a single dict.
        if isinstance(payload, list):
            ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
            entities = payload
            numbered = True
        elif isinstance(payload, dict):
            ctx.logger.info("\n✅ Payload ist SINGLE DICT")
            entities = [payload]
            numbered = False
        else:
            ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
            entities = []
            numbered = False

        entity_ids = set()
        for idx, entity in enumerate(entities):
            if isinstance(entity, dict):
                entity_id = entity.get('id')
                if entity_id:
                    entity_ids.add(entity_id)
                _log_created_entity(ctx, entity, idx if numbered else None)

        ctx.logger.info("\n" + "=" * 80)
        ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
        ctx.logger.info("=" * 80)

        if not entity_ids:
            ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
            return ApiResponse(
                status=200,
                body={
                    'status': 'received',
                    'action': 'create',
                    'ids_count': 0,
                    'warning': 'No document IDs found in payload'
                }
            )

        # Emit events for queue processing (deduplication happens in the
        # event handler via lock).
        for entity_id in entity_ids:
            await ctx.enqueue({
                'topic': 'espocrm.document.create',
                'data': {
                    'entity_id': entity_id,
                    'action': 'create',
                    'source': 'webhook',
                    # timezone-aware UTC — naive now() is ambiguous across hosts
                    'timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat()
                }
            })
            ctx.logger.info(f"✅ Event emittiert: espocrm.document.create für ID {entity_id}")

        ctx.logger.info("\n" + "=" * 80)
        ctx.logger.info("✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
        ctx.logger.info("=" * 80)

        return ApiResponse(
            status=200,
            body={
                'status': 'received',
                'action': 'create',
                'ids_count': len(entity_ids),
                # sorted for a deterministic response body (sets are unordered)
                'document_ids': sorted(entity_ids)
            }
        )

    except Exception as e:
        ctx.logger.error("=" * 80)
        ctx.logger.error("❌ FEHLER beim Verarbeiten des Document Create Webhooks")
        ctx.logger.error("=" * 80)
        ctx.logger.error(f"Error Type: {type(e).__name__}")
        ctx.logger.error(f"Error Message: {str(e)}")
        ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")

        return ApiResponse(
            status=500,
            body={
                'error': 'Internal server error',
                'error_type': type(e).__name__,
                'details': str(e)
            }
        )
"""EspoCRM Webhook - Document Delete

Receives delete webhooks from EspoCRM for Document entities and emits one
queue event per deleted document ID.

Note: on deletion the payload may contain only the ID, not full entity data.
"""
import datetime
import json
import traceback
from typing import Any, Optional

from motia import ApiRequest, ApiResponse, FlowContext, http


config = {
    "name": "EspoCRM Document Delete Webhook",
    "description": "Empfängt Delete-Webhooks von EspoCRM für Document Entities",
    "flows": ["espocrm-documents"],
    "triggers": [
        http("POST", "/api/espocrm/document/delete")
    ],
    "enqueues": ["espocrm.document.delete"],
}


def _log_deleted_entity(ctx: Any, entity: dict, idx: Optional[int]) -> None:
    """Log one entity from a delete payload.

    Delete payloads are often minimal, so name/timestamp lines are only
    emitted when the corresponding keys are present. *idx* is ``None`` for a
    single-dict payload (no running number in the log line).
    """
    label = f" #{idx + 1}" if idx is not None else ""
    ctx.logger.info(f"\n  🗑️ Document{label}:")
    ctx.logger.info(f"     ID: {entity.get('id')}")
    ctx.logger.info(f"     Verfügbare Felder: {', '.join(entity.keys())}")

    if 'name' in entity:
        ctx.logger.info(f"     Name: {entity.get('name')}")
    if 'deletedAt' in entity or 'deleted' in entity:
        ctx.logger.info(f"     Deleted At: {entity.get('deletedAt', entity.get('deleted', 'N/A'))}")

    # xAI-related fields (if present)
    xai_fields = {k: v for k, v in entity.items()
                  if 'xai' in k.lower() or 'collection' in k.lower()}
    if xai_fields:
        ctx.logger.info(f"     🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """Webhook handler for Document deletion in EspoCRM.

    Receives notifications when documents are deleted and emits queue events
    for processing. Returns 200 with a summary body, or 500 on unexpected
    errors.
    """
    try:
        payload = request.body or []

        ctx.logger.info("=" * 80)
        ctx.logger.info("📥 EspoCRM DOCUMENT DELETE WEBHOOK EMPFANGEN")
        ctx.logger.info("=" * 80)

        # Log request headers (if the transport exposes them)
        ctx.logger.info("\n🔍 REQUEST HEADERS:")
        if hasattr(request, 'headers'):
            for key, value in request.headers.items():
                ctx.logger.info(f"  {key}: {value}")
        else:
            ctx.logger.info("  (keine Headers verfügbar)")

        # Log payload type, size and full content (pretty-printed)
        ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
        ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
        ctx.logger.info("\n📄 FULL PAYLOAD:")
        ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))

        # Normalize: EspoCRM may send a list of entities or a single dict.
        if isinstance(payload, list):
            ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
            entities = payload
            numbered = True
        elif isinstance(payload, dict):
            ctx.logger.info("\n✅ Payload ist SINGLE DICT")
            entities = [payload]
            numbered = False
        else:
            ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
            entities = []
            numbered = False

        entity_ids = set()
        for idx, entity in enumerate(entities):
            if isinstance(entity, dict):
                entity_id = entity.get('id')
                if entity_id:
                    entity_ids.add(entity_id)
                _log_deleted_entity(ctx, entity, idx if numbered else None)

        ctx.logger.info("\n" + "=" * 80)
        ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
        ctx.logger.info("=" * 80)

        if not entity_ids:
            ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
            return ApiResponse(
                status=200,
                body={
                    'status': 'received',
                    'action': 'delete',
                    'ids_count': 0,
                    'warning': 'No document IDs found in payload'
                }
            )

        # Emit events for queue processing
        for entity_id in entity_ids:
            await ctx.enqueue({
                'topic': 'espocrm.document.delete',
                'data': {
                    'entity_id': entity_id,
                    'action': 'delete',
                    'source': 'webhook',
                    # timezone-aware UTC — naive now() is ambiguous across hosts
                    'timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat()
                }
            })
            ctx.logger.info(f"✅ Event emittiert: espocrm.document.delete für ID {entity_id}")

        ctx.logger.info("\n" + "=" * 80)
        ctx.logger.info("✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
        ctx.logger.info("=" * 80)

        return ApiResponse(
            status=200,
            body={
                'status': 'received',
                'action': 'delete',
                'ids_count': len(entity_ids),
                # sorted for a deterministic response body (sets are unordered)
                'document_ids': sorted(entity_ids)
            }
        )

    except Exception as e:
        ctx.logger.error("=" * 80)
        ctx.logger.error("❌ FEHLER beim Verarbeiten des Document Delete Webhooks")
        ctx.logger.error("=" * 80)
        ctx.logger.error(f"Error Type: {type(e).__name__}")
        ctx.logger.error(f"Error Message: {str(e)}")
        ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")

        return ApiResponse(
            status=500,
            body={
                'error': 'Internal server error',
                'error_type': type(e).__name__,
                'details': str(e)
            }
        )
+""" +import json +import datetime +from typing import Any +from motia import FlowContext, http, ApiRequest, ApiResponse + + +config = { + "name": "EspoCRM Document Update Webhook", + "description": "Empfängt Update-Webhooks von EspoCRM für Document Entities", + "flows": ["espocrm-documents"], + "triggers": [ + http("POST", "/api/espocrm/document/update") + ], + "enqueues": ["espocrm.document.update"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: + """ + Webhook handler for Document updates in EspoCRM. + + Receives notifications when documents are updated and emits queue events + for processing (xAI sync, etc.). + + Note: Loop-Prevention sollte auf EspoCRM-Seite implementiert werden. + xAI-Feld-Updates sollten keine neuen Webhooks triggern. + """ + try: + payload = request.body or [] + + # ═══════════════════════════════════════════════════════════════ + # DETAILLIERTES LOGGING FÜR ANALYSE + # ═══════════════════════════════════════════════════════════════ + + ctx.logger.info("=" * 80) + ctx.logger.info("📥 EspoCRM DOCUMENT UPDATE WEBHOOK EMPFANGEN") + ctx.logger.info("=" * 80) + + # Log Request Headers + ctx.logger.info("\n🔍 REQUEST HEADERS:") + if hasattr(request, 'headers'): + for key, value in request.headers.items(): + ctx.logger.info(f" {key}: {value}") + else: + ctx.logger.info(" (keine Headers verfügbar)") + + # Log Payload Type & Structure + ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}") + ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}") + + # Log Full Payload (pretty-printed) + ctx.logger.info("\n📄 FULL PAYLOAD:") + ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False)) + + # ═══════════════════════════════════════════════════════════════ + # PAYLOAD ANALYSE & ID EXTRAKTION + # ═══════════════════════════════════════════════════════════════ + + entity_ids = set() + + if isinstance(payload, list): + ctx.logger.info(f"\n✅ Payload ist LIST 
mit {len(payload)} Einträgen") + for idx, entity in enumerate(payload): + if isinstance(entity, dict): + entity_id = entity.get('id') + if entity_id: + entity_ids.add(entity_id) + + ctx.logger.info(f"\n 📄 Document #{idx + 1}:") + ctx.logger.info(f" ID: {entity_id}") + ctx.logger.info(f" Name: {entity.get('name', 'N/A')}") + ctx.logger.info(f" Modified At: {entity.get('modifiedAt', 'N/A')}") + ctx.logger.info(f" Modified By: {entity.get('modifiedById', 'N/A')}") + ctx.logger.info(f" Verfügbare Felder: {', '.join(entity.keys())}") + + # Prüfe ob CHANGED fields mitgeliefert werden + changed_fields = entity.get('changedFields') or entity.get('changed') or entity.get('modifiedFields') + if changed_fields: + ctx.logger.info(f" 🔄 Geänderte Felder: {json.dumps(changed_fields, ensure_ascii=False)}") + + # xAI-relevante Felder + xai_fields = {k: v for k, v in entity.items() + if 'xai' in k.lower() or 'collection' in k.lower()} + if xai_fields: + ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}") + + # Relationship Felder + rel_fields = {k: v for k, v in entity.items() + if 'parent' in k.lower() or 'related' in k.lower() or + 'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')} + if rel_fields: + ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}") + + elif isinstance(payload, dict): + ctx.logger.info("\n✅ Payload ist SINGLE DICT") + entity_id = payload.get('id') + if entity_id: + entity_ids.add(entity_id) + + ctx.logger.info(f"\n 📄 Document:") + ctx.logger.info(f" ID: {entity_id}") + ctx.logger.info(f" Name: {payload.get('name', 'N/A')}") + ctx.logger.info(f" Modified At: {payload.get('modifiedAt', 'N/A')}") + ctx.logger.info(f" Modified By: {payload.get('modifiedById', 'N/A')}") + ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}") + + # Geänderte Felder + changed_fields = payload.get('changedFields') or payload.get('changed') or payload.get('modifiedFields') + if changed_fields: + 
ctx.logger.info(f" 🔄 Geänderte Felder: {json.dumps(changed_fields, ensure_ascii=False)}") + + # xAI-relevante Felder + xai_fields = {k: v for k, v in payload.items() + if 'xai' in k.lower() or 'collection' in k.lower()} + if xai_fields: + ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}") + + # Relationship Felder + rel_fields = {k: v for k, v in payload.items() + if 'parent' in k.lower() or 'related' in k.lower() or + 'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')} + if rel_fields: + ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}") + else: + ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}") + + # ═══════════════════════════════════════════════════════════════ + # QUEUE EVENTS EMITTIEREN + # ═══════════════════════════════════════════════════════════════ + + ctx.logger.info("\n" + "=" * 80) + ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden") + ctx.logger.info("=" * 80) + + if not entity_ids: + ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!") + return ApiResponse( + status=200, + body={ + 'status': 'received', + 'action': 'update', + 'ids_count': 0, + 'warning': 'No document IDs found in payload' + } + ) + + # Emit events für Queue-Processing + for entity_id in entity_ids: + await ctx.enqueue({ + 'topic': 'espocrm.document.update', + 'data': { + 'entity_id': entity_id, + 'action': 'update', + 'source': 'webhook', + 'timestamp': datetime.datetime.now().isoformat() + } + }) + ctx.logger.info(f"✅ Event emittiert: espocrm.document.update für ID {entity_id}") + + ctx.logger.info("\n" + "=" * 80) + ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN") + ctx.logger.info("=" * 80) + + return ApiResponse( + status=200, + body={ + 'status': 'received', + 'action': 'update', + 'ids_count': len(entity_ids), + 'document_ids': list(entity_ids) + } + ) + + except Exception as e: + ctx.logger.error("=" * 80) + ctx.logger.error(f"❌ 
FEHLER beim Verarbeiten des Document Update Webhooks") + ctx.logger.error("=" * 80) + ctx.logger.error(f"Error Type: {type(e).__name__}") + ctx.logger.error(f"Error Message: {str(e)}") + + import traceback + ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}") + + return ApiResponse( + status=500, + body={ + 'error': 'Internal server error', + 'error_type': type(e).__name__, + 'details': str(e) + } + ) diff --git a/test_xai_collections_api.py b/test_xai_collections_api.py new file mode 100755 index 0000000..98a6d23 --- /dev/null +++ b/test_xai_collections_api.py @@ -0,0 +1,788 @@ +#!/usr/bin/env python3 +""" +xAI Collections API Test Script + +Tests all critical operations for our document sync requirements: +1. File upload and ID behavior (collection-specific vs global?) +2. Same file in multiple collections (shared file_id?) +3. CRUD operations on collections +4. CRUD operations on documents +5. Response structures and metadata +6. Update/versioning behavior + +Usage: + export XAI_API_KEY="xai-..." 
#!/usr/bin/env python3
"""
xAI Collections API Test Script

Tests all critical operations for our document sync requirements:
1. File upload and ID behavior (collection-specific vs global?)
2. Same file in multiple collections (shared file_id?)
3. CRUD operations on collections
4. CRUD operations on documents
5. Response structures and metadata
6. Update/versioning behavior

Usage:
    export XAI_MANAGEMENT_KEY="xai-token-..."
    export XAI_API_KEY="xai-..."
    python test_xai_collections_api.py
"""

import asyncio
import base64
import json
import os
import sys
import tempfile
from datetime import datetime
from typing import Any, Dict, List, Optional

import aiohttp

# Configuration (overridable via environment)
XAI_MANAGEMENT_URL = os.getenv("XAI_MANAGEMENT_URL", "https://management-api.x.ai")
XAI_FILES_URL = os.getenv("XAI_FILES_URL", "https://api.x.ai")
XAI_MANAGEMENT_KEY = os.getenv("XAI_MANAGEMENT_KEY", "")  # Management API key
XAI_API_KEY = os.getenv("XAI_API_KEY", "")  # Regular API key for file upload

if not XAI_MANAGEMENT_KEY:
    print("❌ ERROR: XAI_MANAGEMENT_KEY environment variable not set!")
    print("   export XAI_MANAGEMENT_KEY='xai-token-...'")
    sys.exit(1)

if not XAI_API_KEY:
    print("❌ ERROR: XAI_API_KEY environment variable not set!")
    print("   export XAI_API_KEY='xai-...'")
    sys.exit(1)


class Colors:
    """ANSI color codes for terminal output"""
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    END = '\033[0m'


def print_header(text: str) -> None:
    """Print a cyan section banner."""
    print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}")
    print(f"{Colors.BOLD}{Colors.CYAN}{text}{Colors.END}")
    print(f"{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}\n")


def print_success(text: str) -> None:
    print(f"{Colors.GREEN}✅ {text}{Colors.END}")


def print_error(text: str) -> None:
    print(f"{Colors.RED}❌ {text}{Colors.END}")


def print_info(text: str) -> None:
    print(f"{Colors.BLUE}ℹ️ {text}{Colors.END}")


def print_warning(text: str) -> None:
    print(f"{Colors.YELLOW}⚠️ {text}{Colors.END}")


def print_json(data: Any, title: Optional[str] = None) -> None:
    """Pretty-print *data* as JSON, with an optional bold title line."""
    if title:
        print(f"{Colors.BOLD}{title}:{Colors.END}")
    print(json.dumps(data, indent=2, ensure_ascii=False))


class XAICollectionsTestClient:
    """Test client for the xAI Collections API.

    Talks to two endpoints: the management API (collections/documents) and
    the files API (uploads). Tracks created resources so the test run can
    clean up afterwards.
    """

    def __init__(self) -> None:
        self.management_url = XAI_MANAGEMENT_URL
        self.files_url = XAI_FILES_URL
        self.management_key = XAI_MANAGEMENT_KEY
        self.api_key = XAI_API_KEY
        self.session: Optional[aiohttp.ClientSession] = None

        # Test state: resources created during the run, and per-test verdicts
        self.created_collections: List[str] = []
        self.uploaded_files: List[str] = []
        self.test_results: Dict[str, bool] = {}

    async def __aenter__(self) -> "XAICollectionsTestClient":
        # Session without a default Content-Type (set per-request)
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30)
        )
        return self

    async def __aexit__(self, *args: Any) -> None:
        if self.session:
            await self.session.close()

    @staticmethod
    def _safe_headers(headers: Dict[str, str]) -> Dict[str, str]:
        """Copy of *headers* with bearer tokens redacted for console output."""
        return {k: ('Bearer ***' if k.lower() == 'authorization' else v)
                for k, v in headers.items()}

    @staticmethod
    async def _parse_body(response: aiohttp.ClientResponse) -> Any:
        """Decode a response as JSON, falling back to raw text."""
        try:
            return await response.json()
        except (aiohttp.ContentTypeError, ValueError):
            text = await response.text()
            return {"_raw_text": text} if text else {}

    async def _request(self, method: str, path: str, use_files_api: bool = False,
                       **kwargs: Any) -> tuple[int, Any]:
        """Make an HTTP request and return ``(status, response_data)``.

        Chooses the base URL and bearer token depending on *use_files_api*.
        A network-level failure is reported as status 0.
        """
        base_url = self.files_url if use_files_api else self.management_url
        url = f"{base_url}{path}"

        # Per-request headers: auth + Content-Type for JSON bodies
        headers = kwargs.setdefault('headers', {})
        headers['Authorization'] = f"Bearer {self.api_key if use_files_api else self.management_key}"
        if 'json' in kwargs:
            headers['Content-Type'] = 'application/json'

        print_info(f"{method} {url}")
        # Never echo real credentials to the console
        print_info(f"Headers: {self._safe_headers(headers)}")

        try:
            async with self.session.request(method, url, **kwargs) as response:
                status = response.status
                data = await self._parse_body(response)
                if status < 400:
                    print_success(f"Response: {status}")
                else:
                    print_error(f"Response: {status}")
                return status, data
        except Exception as e:
            print_error(f"Request failed: {e}")
            return 0, {"error": str(e)}

    # ------------------------------------------------------------------
    # Collection operations
    # ------------------------------------------------------------------

    async def create_collection(self, name: str,
                                metadata: Optional[Dict] = None) -> tuple[int, Any]:
        """POST /v1/collections — create a collection and track its ID."""
        payload = {
            "collection_name": name,  # xAI uses "collection_name" not "name"
            "metadata": metadata or {}
        }
        status, data = await self._request("POST", "/v1/collections", json=payload)

        if status in (200, 201):
            # Field name for the ID is not consistent; try the known variants
            collection_id = data.get("id") or data.get("collection_id") or data.get("collectionId")
            if collection_id:
                self.created_collections.append(collection_id)
                print_success(f"Created collection: {collection_id}")

        return status, data

    async def get_collection(self, collection_id: str) -> tuple[int, Any]:
        """GET /v1/collections/{collection_id}"""
        return await self._request("GET", f"/v1/collections/{collection_id}")

    async def list_collections(self) -> tuple[int, Any]:
        """GET /v1/collections"""
        return await self._request("GET", "/v1/collections")

    async def update_collection(self, collection_id: str, name: Optional[str] = None,
                                metadata: Optional[Dict] = None) -> tuple[int, Any]:
        """PUT /v1/collections/{collection_id} — update name and/or metadata."""
        payload: Dict[str, Any] = {}
        if name:
            payload["collection_name"] = name  # xAI uses "collection_name"
        if metadata:
            payload["metadata"] = metadata
        return await self._request("PUT", f"/v1/collections/{collection_id}", json=payload)

    async def delete_collection(self, collection_id: str) -> tuple[int, Any]:
        """DELETE /v1/collections/{collection_id} — delete and untrack."""
        status, data = await self._request("DELETE", f"/v1/collections/{collection_id}")
        if status in (200, 204) and collection_id in self.created_collections:
            self.created_collections.remove(collection_id)
        return status, data

    # ------------------------------------------------------------------
    # File operations (multiple upload methods)
    # ------------------------------------------------------------------

    async def upload_file_multipart(self, content: bytes, filename: str,
                                    mime_type: str = "text/plain") -> tuple[int, Any]:
        """Method 0: multipart form-data upload (what the server actually expects!).

        POST /v1/files with multipart/form-data. The Content-Type header
        (including the boundary) must be left for aiohttp to set.
        """
        print_info("METHOD 0: Multipart Form-Data Upload (POST /v1/files)")

        form = aiohttp.FormData()
        form.add_field('file', content, filename=filename, content_type=mime_type)

        print_info(f"Uploading {len(content)} bytes as multipart/form-data")

        url = f"{self.files_url}/v1/files"
        headers = {
            "Authorization": f"Bearer {self.api_key}"
            # Do NOT set Content-Type — aiohttp will set it with the boundary
        }

        print_info(f"POST {url}")
        print_info(f"Headers: {self._safe_headers(headers)}")

        try:
            async with self.session.request("POST", url, data=form, headers=headers) as response:
                status = response.status
                data = await self._parse_body(response)
                if status < 400:
                    print_success(f"Response: {status}")
                else:
                    print_error(f"Response: {status}")
                return status, data
        except Exception as e:
            print_error(f"Request failed: {e}")
            return 0, {"error": str(e)}

    async def upload_file_direct(self, content: bytes, filename: str,
                                 mime_type: str = "text/plain") -> tuple[int, Any]:
        """Method 1: direct upload — POST /v1/files with a base64 JSON body."""
        print_info("METHOD 1: Direct Upload (POST /v1/files with JSON)")

        data_b64 = base64.b64encode(content).decode('ascii')
        payload = {
            "name": filename,
            "content_type": mime_type,
            "data": data_b64
        }
        print_info(f"Uploading {len(content)} bytes as base64 ({len(data_b64)} chars)")

        return await self._request(
            "POST", "/v1/files", use_files_api=True, json=payload
        )

    async def upload_file_chunked(self, content: bytes, filename: str,
                                  mime_type: str = "text/plain") -> tuple[int, Any]:
        """Method 2: initialize + chunk streaming.

        POST /v1/files:initialize → POST /v1/files:uploadChunks
        """
        print_info("METHOD 2: Initialize + Chunk Streaming")

        # Step 1: initialize the upload to obtain a file_id
        print_info("Step 1: Initialize upload")
        init_payload = {"name": filename, "content_type": mime_type}
        status, data = await self._request(
            "POST", "/v1/files:initialize", use_files_api=True, json=init_payload
        )
        print_json(data, "Initialize Response")

        if status not in (200, 201):
            print_error("Failed to initialize upload")
            return status, data

        file_id = data.get("file_id")
        if not file_id:
            print_error("No file_id in initialize response")
            return status, data
        print_success(f"Initialized upload with file_id: {file_id}")

        # Step 2: upload the whole content as a single base64 chunk
        print_info(f"Step 2: Upload {len(content)} bytes in chunks")
        chunk_payload = {
            "file_id": file_id,
            "chunk": base64.b64encode(content).decode('ascii')
        }
        status, data = await self._request(
            "POST", "/v1/files:uploadChunks", use_files_api=True, json=chunk_payload
        )
        print_json(data, "Upload Chunks Response")

        if status in (200, 201):
            print_success(f"Uploaded file chunks: {file_id}")
            self.uploaded_files.append(file_id)

        return status, data

    async def upload_file(self, content: bytes, filename: str,
                          mime_type: str = "text/plain") -> tuple[int, Any]:
        """Try the upload methods in order until one succeeds."""
        print_info("Trying upload methods...")

        # Method 0: multipart form-data (what the server really wants!)
        status0, data0 = await self.upload_file_multipart(content, filename, mime_type)
        if status0 in (200, 201):
            file_id = data0.get("id") or data0.get("file_id")  # try both field names
            if file_id:
                self.uploaded_files.append(file_id)
                print_success(f"✅ Multipart upload succeeded: {file_id}")
                return status0, data0
            print_error("No 'id' or 'file_id' in response")
            print_json(data0, "Response data")

        print_warning(f"Multipart upload failed ({status0}), trying JSON upload...")

        # Method 1: direct JSON upload
        status1, data1 = await self.upload_file_direct(content, filename, mime_type)
        if status1 in (200, 201):
            file_id = data1.get("file_id")
            if file_id:
                self.uploaded_files.append(file_id)
                print_success(f"✅ Direct upload succeeded: {file_id}")
                return status1, data1

        print_warning(f"Direct upload failed ({status1}), trying chunked upload...")

        # Method 2: initialize + chunks
        status2, data2 = await self.upload_file_chunked(content, filename, mime_type)
        if status2 in (200, 201):
            print_success("✅ Chunked upload succeeded")
            return status2, data2

        print_error("❌ All upload methods failed")
        return status0, data0  # report the primary (multipart) method's error

    # ------------------------------------------------------------------
    # Collection document operations
    # ------------------------------------------------------------------

    async def add_document_to_collection(self, collection_id: str,
                                         file_id: str) -> tuple[int, Any]:
        """POST /v1/collections/{collection_id}/documents/{file_id}"""
        return await self._request(
            "POST", f"/v1/collections/{collection_id}/documents/{file_id}")

    async def get_collection_documents(self, collection_id: str) -> tuple[int, Any]:
        """GET /v1/collections/{collection_id}/documents"""
        return await self._request(
            "GET", f"/v1/collections/{collection_id}/documents")

    async def get_collection_document(self, collection_id: str,
                                      file_id: str) -> tuple[int, Any]:
        """GET /v1/collections/{collection_id}/documents/{file_id}"""
        return await self._request(
            "GET", f"/v1/collections/{collection_id}/documents/{file_id}")

    async def update_collection_document(self, collection_id: str, file_id: str,
                                         metadata: Dict) -> tuple[int, Any]:
        """PATCH /v1/collections/{collection_id}/documents/{file_id}"""
        return await self._request(
            "PATCH", f"/v1/collections/{collection_id}/documents/{file_id}",
            json={"metadata": metadata})

    async def remove_document_from_collection(self, collection_id: str,
                                              file_id: str) -> tuple[int, Any]:
        """DELETE /v1/collections/{collection_id}/documents/{file_id}"""
        return await self._request(
            "DELETE", f"/v1/collections/{collection_id}/documents/{file_id}")

    async def batch_get_documents(self, collection_id: str,
                                  file_ids: List[str]) -> tuple[int, Any]:
        """GET /v1/collections/{collection_id}/documents:batchGet"""
        params = {"fileIds": ",".join(file_ids)}
        return await self._request(
            "GET", f"/v1/collections/{collection_id}/documents:batchGet",
            params=params)

    # ------------------------------------------------------------------
    # Test scenarios
    # ------------------------------------------------------------------
Data") + self.test_results["collection_crud"] = False + return None + + print_success(f"Collection created: {collection_id}") + + # Read + print_info("Reading collection...") + status, data = await self.get_collection(collection_id) + print_json(data, "Response") + + # Update + print_info("Updating collection...") + status, data = await self.update_collection( + collection_id, + name="Test Collection 1 (Updated)", + metadata={"test": True, "updated": True} + ) + print_json(data, "Response") + + self.test_results["collection_crud"] = True + return collection_id + + async def test_file_upload_and_structure(self, collection_id: str): + """Test 2: File upload (two-step process)""" + print_header("TEST 2: File Upload (Two-Step) & Response Structure") + + # Create test file content + test_content = b""" + This is a test document for xAI Collections API testing. + + Topic: German Contract Law + + Key Points: + - Contracts require offer and acceptance + - Consideration is necessary + - Written form may be required for certain contracts + + This document contains sufficient content for testing. 
+ """ + + # STEP 1: Upload file to Files API + print_info("STEP 1: Uploading file to Files API (api.x.ai)...") + status, data = await self.upload_file( + content=test_content, + filename="test_document.txt", + mime_type="text/plain" + ) + print_json(data, "Files API Upload Response") + + if status not in [200, 201]: + print_error("File upload to Files API failed") + self.test_results["file_upload"] = False + return None + + # Try both field names: 'id' (Files API) or 'file_id' (Collections API) + file_id = data.get("id") or data.get("file_id") + if not file_id: + print_error("No 'id' or 'file_id' field in response") + print_json(data, "Response for debugging") + self.test_results["file_upload"] = False + return None + + print_success(f"File uploaded to Files API: {file_id}") + + # STEP 2: Add file to collection using Management API + print_info("STEP 2: Adding file to collection (management-api.x.ai)...") + status2, data2 = await self.add_document_to_collection(collection_id, file_id) + print_json(data2, "Add to Collection Response") + + if status2 not in [200, 201]: + print_error("Failed to add file to collection") + self.test_results["file_upload"] = False + return None + + print_success(f"File added to collection: {file_id}") + self.test_results["file_upload"] = True + return file_id + + async def test_document_in_collection(self, collection_id: str, file_id: str): + """Test 3: Verify document is in collection and get details""" + print_header("TEST 3: Verify Document in Collection") + + # Verify by listing documents + print_info("Listing collection documents...") + status, data = await self.get_collection_documents(collection_id) + print_json(data, "Collection Documents") + + if status not in [200, 201]: + print_error("Failed to list documents") + self.test_results["add_to_collection"] = False + return False + + # Get specific document + print_info("Getting specific document...") + status, data = await self.get_collection_document(collection_id, file_id) + 
print_json(data, "Document Details") + + if status not in [200, 201]: + print_error("Failed to get document details") + self.test_results["add_to_collection"] = False + return False + + print_success("Document verified in collection") + self.test_results["add_to_collection"] = True + return True + + async def test_shared_file_across_collections(self, file_id: str): + """Test 4: CRITICAL - Can same file_id be used in multiple collections?""" + print_header("TEST 4: Shared File Across Collections (CRITICAL)") + + # Create second collection + print_info("Creating second collection...") + status, data = await self.create_collection( + name="Test Collection 2", + metadata={"test": True, "purpose": "Multi-collection test"} + ) + + if status not in [200, 201]: + print_error("Failed to create second collection") + self.test_results["shared_file"] = False + return + + collection2_id = data.get("collection_id") or data.get("id") + print_success(f"Collection 2 created: {collection2_id}") + + # Try to add SAME file_id to second collection + print_info(f"Adding SAME file_id {file_id} to collection 2...") + + status, data = await self.add_document_to_collection(collection2_id, file_id) + print_json(data, "Response from adding existing file_id to second collection") + + if status not in [200, 201]: + print_error("Failed to add same file to second collection") + print_warning("⚠️ Files might be collection-specific (BAD for our use case)") + self.test_results["shared_file"] = False + return + + print_success("✅ SAME FILE_ID CAN BE USED IN MULTIPLE COLLECTIONS!") + print_success("✅ This is PERFECT for our architecture!") + + # Verify both collections have the file + print_info("Verifying file in both collections...") + + status1, data1 = await self.get_collection_documents(self.created_collections[0]) + status2, data2 = await self.get_collection_documents(collection2_id) + + print_json(data1, "Collection 1 Documents") + print_json(data2, "Collection 2 Documents") + + # Extract 
file_ids from both collections to verify they match + docs1 = data1.get("documents", []) + docs2 = data2.get("documents", []) + + file_ids_1 = [d.get("file_metadata", {}).get("file_id") for d in docs1] + file_ids_2 = [d.get("file_metadata", {}).get("file_id") for d in docs2] + + if file_id in file_ids_1 and file_id in file_ids_2: + print_success(f"✅ CONFIRMED: file_id {file_id} is IDENTICAL in both collections!") + print_info(" → We can store ONE xaiFileId per document!") + print_info(" → Simply track which collections contain it!") + + self.test_results["shared_file"] = True + + async def test_document_update(self, collection_id: str, file_id: str): + """Test 5: Update document metadata""" + print_header("TEST 5: Update Document Metadata") + + print_info("Updating document metadata...") + status, data = await self.update_collection_document( + collection_id, + file_id, + metadata={"updated_at": datetime.now().isoformat(), "version": 2} + ) + print_json(data, "Update Response") + + if status not in [200, 201]: + print_error("Failed to update document") + self.test_results["document_update"] = False + return + + print_success("Document metadata updated") + self.test_results["document_update"] = True + + async def test_document_removal(self): + """Test 6: Remove document from collection""" + print_header("TEST 6: Remove Document from Collection") + + if len(self.created_collections) < 2 or not self.uploaded_files: + print_warning("Skipping - need at least 2 collections and 1 file") + return + + collection_id = self.created_collections[0] + file_id = self.uploaded_files[0] + + print_info(f"Removing file {file_id} from collection {collection_id}...") + status, data = await self.remove_document_from_collection(collection_id, file_id) + print_json(data, "Response") + + if status not in [200, 204]: + print_error("Failed to remove document") + self.test_results["document_removal"] = False + return + + print_success("Document removed from collection") + + # Verify removal + 
print_info("Verifying removal...") + status, data = await self.get_collection_documents(collection_id) + print_json(data, "Remaining Documents") + + self.test_results["document_removal"] = True + + async def test_batch_get(self): + """Test 7: Batch get documents""" + print_header("TEST 7: Batch Get Documents") + + if not self.created_collections or not self.uploaded_files: + print_warning("Skipping - need collections and files") + return + + collection_id = self.created_collections[-1] # Use last collection + file_ids = self.uploaded_files + + if not file_ids: + print_warning("No file IDs to batch get") + return + + print_info(f"Batch getting {len(file_ids)} documents...") + status, data = await self.batch_get_documents(collection_id, file_ids) + print_json(data, "Batch Response") + + self.test_results["batch_get"] = status in [200, 201] + + async def cleanup(self): + """Clean up all created test resources""" + print_header("CLEANUP: Deleting Test Resources") + + # Delete collections (should cascade delete documents?) 
+ for collection_id in list(self.created_collections): + print_info(f"Deleting collection {collection_id}...") + await self.delete_collection(collection_id) + + print_success("Cleanup complete") + + def print_summary(self): + """Print test results summary""" + print_header("TEST RESULTS SUMMARY") + + total = len(self.test_results) + passed = sum(1 for v in self.test_results.values() if v) + + for test_name, result in self.test_results.items(): + status = "✅ PASS" if result else "❌ FAIL" + print(f"{status} - {test_name}") + + print(f"\n{Colors.BOLD}Total: {passed}/{total} tests passed{Colors.END}\n") + + # Critical findings + print_header("CRITICAL FINDINGS") + + if "shared_file" in self.test_results: + if self.test_results["shared_file"]: + print_success("✅ Same file CAN be used in multiple collections") + print_info(" → We can use a SINGLE xaiFileId per document!") + print_info(" → Much simpler architecture!") + else: + print_error("❌ Files seem to be collection-specific") + print_warning(" → More complex mapping required") + print_warning(" → Each collection might need separate file upload") + + +async def main(): + """Run all tests""" + print_header("xAI Collections API Test Suite") + print_info(f"Management URL: {XAI_MANAGEMENT_URL}") + print_info(f"Files URL: {XAI_FILES_URL}") + print_info(f"Management Key: {XAI_MANAGEMENT_KEY[:20]}...{XAI_MANAGEMENT_KEY[-4:]}") + print_info(f"API Key: {XAI_API_KEY[:20]}...{XAI_API_KEY[-4:]}") + + async with XAICollectionsTestClient() as client: + try: + # Test 1: Basic Collection CRUD + collection_id = await client.test_basic_collection_crud() + + if not collection_id: + print_error("Cannot continue without collection. Stopping.") + return + + # Test 2: File Upload (now two-step process) + file_id = await client.test_file_upload_and_structure(collection_id) + + if not file_id: + print_error("File upload failed. 
Continuing with remaining tests...") + else: + # Test 3: Verify document in collection + await client.test_document_in_collection(collection_id, file_id) + + # Test 4: CRITICAL - Shared file test + await client.test_shared_file_across_collections(file_id) + + # Test 5: Update document + await client.test_document_update(collection_id, file_id) + + # Test 6: Remove document + await client.test_document_removal() + + # Test 7: Batch get + await client.test_batch_get() + + # Cleanup + await client.cleanup() + + # Print summary + client.print_summary() + + except Exception as e: + print_error(f"Test suite failed: {e}") + import traceback + traceback.print_exc() + + # Try cleanup anyway + try: + await client.cleanup() + except: + pass + + +if __name__ == "__main__": + asyncio.run(main())