#!/usr/bin/env python3
"""
xAI Collections API Test Script

Tests all critical operations for our document sync requirements:
1. File upload and ID behavior (collection-specific vs global?)
2. Same file in multiple collections (shared file_id?)
3. CRUD operations on collections
4. CRUD operations on documents
5. Response structures and metadata
6. Update/versioning behavior

Usage:
    export XAI_MANAGEMENT_KEY="xai-token-..."
    export XAI_API_KEY="xai-..."
    python test_xai_collections_api.py
"""

import os
import sys
import json
import asyncio
import base64
import traceback
from typing import Optional, Dict, Any, List
from datetime import datetime
import tempfile

import aiohttp

# Configuration
XAI_MANAGEMENT_URL = os.getenv("XAI_MANAGEMENT_URL", "https://management-api.x.ai")
XAI_FILES_URL = os.getenv("XAI_FILES_URL", "https://api.x.ai")
XAI_MANAGEMENT_KEY = os.getenv("XAI_MANAGEMENT_KEY", "")  # Management API Key
XAI_API_KEY = os.getenv("XAI_API_KEY", "")  # Regular API Key for file upload

if not XAI_MANAGEMENT_KEY:
    print("❌ ERROR: XAI_MANAGEMENT_KEY environment variable not set!")
    print(" export XAI_MANAGEMENT_KEY='xai-token-...'")
    sys.exit(1)

if not XAI_API_KEY:
    print("❌ ERROR: XAI_API_KEY environment variable not set!")
    print(" export XAI_API_KEY='xai-...'")
    sys.exit(1)

# HTTP status codes this script treats as success for each kind of operation.
_OK_CREATED = (200, 201)
_OK_DELETED = (200, 204)


class Colors:
    """ANSI color codes for terminal output"""
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    END = '\033[0m'


def print_header(text: str):
    """Print *text* as a bold cyan banner framed by two rules of 70 '=' characters."""
    rule = '=' * 70
    print(f"\n{Colors.BOLD}{Colors.CYAN}{rule}{Colors.END}")
    print(f"{Colors.BOLD}{Colors.CYAN}{text}{Colors.END}")
    print(f"{Colors.BOLD}{Colors.CYAN}{rule}{Colors.END}\n")


def print_success(text: str):
    """Print *text* in green with a check-mark prefix."""
    print(f"{Colors.GREEN}✅ {text}{Colors.END}")


def print_error(text: str):
    """Print *text* in red with a cross prefix."""
    print(f"{Colors.RED}❌ {text}{Colors.END}")


def print_info(text: str):
    """Print *text* in blue with an info prefix."""
    print(f"{Colors.BLUE}ℹ️ {text}{Colors.END}")


def print_warning(text: str):
    """Print *text* in yellow with a warning prefix."""
    print(f"{Colors.YELLOW}⚠️ {text}{Colors.END}")


def print_json(data: Any, title: Optional[str] = None):
    """Pretty-print *data* as indented JSON, preceded by a bold *title* if given."""
    if title:
        print(f"{Colors.BOLD}{title}:{Colors.END}")
    print(json.dumps(data, indent=2, ensure_ascii=False))


def _mask_secret(secret: str, visible_prefix: int = 8, visible_suffix: int = 4) -> str:
    """Return a display-safe form of *secret*.

    Only a short prefix and suffix are kept. Secrets too short for that to
    hide most of the value are replaced entirely, so the mask never reveals
    the whole secret.
    """
    if len(secret) <= 2 * (visible_prefix + visible_suffix):
        return "***"
    return f"{secret[:visible_prefix]}...{secret[-visible_suffix:]}"


def _redact_headers(headers: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of *headers* whose Authorization credential is masked for logging."""
    redacted = dict(headers)
    auth = redacted.get("Authorization")
    if auth:
        scheme, _, credential = auth.partition(" ")
        # Without a "<scheme> <credential>" shape the whole value may be the secret.
        redacted["Authorization"] = (
            f"{scheme} {_mask_secret(credential)}" if credential else "***"
        )
    return redacted


def _first_present(data: Any, *keys: str) -> Optional[Any]:
    """Return the first truthy ``data[key]`` among *keys*, or None.

    The API payload is parsed JSON and may be a list or None rather than an
    object; anything that is not a dict yields None instead of raising.
    """
    if not isinstance(data, dict):
        return None
    for key in keys:
        value = data.get(key)
        if value:
            return value
    return None


def _listed_file_ids(listing: Any) -> List[Any]:
    """Extract ``file_metadata.file_id`` from each entry of a document listing.

    Reads the ``documents`` array of *listing*; returns an empty list when the
    payload is not a dict or has no such array.
    """
    if not isinstance(listing, dict):
        return []
    documents = listing.get("documents") or []
    return [
        (doc.get("file_metadata") or {}).get("file_id")
        for doc in documents
        if isinstance(doc, dict)
    ]


class XAICollectionsTestClient:
    """Test client for xAI Collections API"""

    def __init__(self):
        self.management_url = XAI_MANAGEMENT_URL
        self.files_url = XAI_FILES_URL
        self.management_key = XAI_MANAGEMENT_KEY
        self.api_key = XAI_API_KEY
        self.session: Optional[aiohttp.ClientSession] = None

        # Test state
        self.created_collections: List[str] = []
        self.uploaded_files: List[str] = []
        self.test_results: Dict[str, bool] = {}

    async def __aenter__(self):
        # Session without default Content-Type (set per-request)
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30)
        )
        return self

    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()

    def _track_file(self, file_id: str) -> None:
        """Remember *file_id* as uploaded, without recording duplicates."""
        if file_id not in self.uploaded_files:
            self.uploaded_files.append(file_id)

    async def _request(self, method: str, path: str, use_files_api: bool = False,
                       **kwargs) -> tuple[int, Any]:
        """Make HTTP request and return (status, response_data).

        Args:
            method: HTTP verb.
            path: Path appended to the selected base URL.
            use_files_api: When true, target the Files API with the regular
                API key; otherwise the Management API with the management key.
            **kwargs: Forwarded to ``aiohttp.ClientSession.request`` (for
                example ``json=``, ``data=``, ``params=``, ``headers=``).

        Returns:
            ``(status, data)``. ``data`` is the parsed JSON body; a non-JSON
            body becomes ``{"_raw_text": text}`` and an empty body ``{}``.
            If the request itself fails, ``(0, {"error": message})``.
        """
        base_url = self.files_url if use_files_api else self.management_url
        url = f"{base_url}{path}"

        # Work on a copy so the caller's headers dict is never mutated.
        headers = dict(kwargs.pop("headers", None) or {})
        token = self.api_key if use_files_api else self.management_key
        headers["Authorization"] = f"Bearer {token}"

        # Set Content-Type for JSON requests only; for multipart bodies
        # aiohttp must generate it itself so that it carries the boundary.
        if "json" in kwargs:
            headers["Content-Type"] = "application/json"

        print_info(f"{method} {url}")
        # Never print the bearer token in full.
        print_info(f"Headers: {_redact_headers(headers)}")

        if self.session is None:
            message = "client session is not open (use 'async with')"
            print_error(f"Request failed: {message}")
            return 0, {"error": message}

        try:
            async with self.session.request(method, url, headers=headers, **kwargs) as response:
                status = response.status

                try:
                    data = await response.json()
                except (aiohttp.ContentTypeError, ValueError):
                    # Not JSON (wrong content type or undecodable body):
                    # fall back to the raw text.
                    text = await response.text()
                    data = {"_raw_text": text} if text else {}

                if data is None:
                    # An empty body with a JSON content type parses to None.
                    data = {}

                if status < 400:
                    print_success(f"Response: {status}")
                else:
                    print_error(f"Response: {status}")

                return status, data
        except Exception as e:
            # Best-effort boundary: report transport failures as status 0
            # so a single failed call does not abort the test run.
            print_error(f"Request failed: {e}")
            return 0, {"error": str(e)}

    # ========================================================================
    # COLLECTION OPERATIONS
    # ========================================================================

    async def create_collection(self, name: str,
                                metadata: Optional[Dict] = None) -> tuple[int, Any]:
        """POST /v1/collections

        On success the new collection's ID is recorded in
        ``created_collections`` so that ``cleanup`` can delete it.
        """
        payload = {
            "collection_name": name,  # xAI uses "collection_name" not "name"
            "metadata": metadata or {}
        }
        status, data = await self._request("POST", "/v1/collections", json=payload)

        if status in _OK_CREATED:
            # Try different possible field names for collection ID
            collection_id = _first_present(data, "id", "collection_id", "collectionId")
            if collection_id:
                self.created_collections.append(collection_id)
                print_success(f"Created collection: {collection_id}")

        return status, data

    async def get_collection(self, collection_id: str) -> tuple[int, Any]:
        """GET /v1/collections/{collection_id}"""
        return await self._request("GET", f"/v1/collections/{collection_id}")

    async def list_collections(self) -> tuple[int, Any]:
        """GET /v1/collections"""
        return await self._request("GET", "/v1/collections")

    async def update_collection(self, collection_id: str, name: Optional[str] = None,
                                metadata: Optional[Dict] = None) -> tuple[int, Any]:
        """PUT /v1/collections/{collection_id}

        Only truthy ``name`` / ``metadata`` values are included in the payload.
        """
        payload = {}
        if name:
            payload["collection_name"] = name  # xAI uses "collection_name"
        if metadata:
            payload["metadata"] = metadata

        return await self._request("PUT", f"/v1/collections/{collection_id}", json=payload)

    async def delete_collection(self, collection_id: str) -> tuple[int, Any]:
        """DELETE /v1/collections/{collection_id}

        On success the ID is dropped from ``created_collections``.
        """
        status, data = await self._request("DELETE", f"/v1/collections/{collection_id}")

        if status in _OK_DELETED and collection_id in self.created_collections:
            self.created_collections.remove(collection_id)

        return status, data

    # ========================================================================
    # FILE OPERATIONS (multiple upload methods)
    # ========================================================================

    async def upload_file_multipart(self, content: bytes, filename: str,
                                    mime_type: str = "text/plain") -> tuple[int, Any]:
        """
        Method 0: Multipart form-data upload (what the server actually expects!)
        POST /v1/files with multipart/form-data
        """
        print_info("METHOD 0: Multipart Form-Data Upload (POST /v1/files)")

        # Create multipart form data
        form = aiohttp.FormData()
        form.add_field('file', content, filename=filename, content_type=mime_type)

        print_info(f"Uploading {len(content)} bytes as multipart/form-data")

        # _request adds the Authorization header and leaves Content-Type
        # alone for non-JSON bodies, so aiohttp sets it with the boundary.
        return await self._request("POST", "/v1/files", use_files_api=True, data=form)

    async def upload_file_direct(self, content: bytes, filename: str,
                                 mime_type: str = "text/plain") -> tuple[int, Any]:
        """
        Method 1: Direct upload to xAI Files API
        POST /v1/files with JSON body containing base64-encoded data
        """
        print_info("METHOD 1: Direct Upload (POST /v1/files with JSON)")

        # Encode file content as base64
        data_b64 = base64.b64encode(content).decode('ascii')

        payload = {
            "name": filename,
            "content_type": mime_type,
            "data": data_b64
        }

        print_info(f"Uploading {len(content)} bytes as base64 ({len(data_b64)} chars)")

        return await self._request(
            "POST", "/v1/files", use_files_api=True, json=payload
        )

    async def upload_file_chunked(self, content: bytes, filename: str,
                                  mime_type: str = "text/plain") -> tuple[int, Any]:
        """
        Method 2: Initialize + Chunk streaming upload
        POST /v1/files:initialize → POST /v1/files:uploadChunks

        On success the file ID is recorded in ``uploaded_files``. Because the
        ID comes from the initialize step, it is also added to the returned
        payload as ``file_id`` when the uploadChunks response lacks one, so
        callers can always find it there.
        """
        print_info("METHOD 2: Initialize + Chunk Streaming")

        # Step 1: Initialize upload
        print_info("Step 1: Initialize upload")
        init_payload = {
            "name": filename,
            "content_type": mime_type
        }

        status, data = await self._request(
            "POST", "/v1/files:initialize", use_files_api=True, json=init_payload
        )
        print_json(data, "Initialize Response")

        if status not in _OK_CREATED:
            print_error("Failed to initialize upload")
            return status, data

        file_id = _first_present(data, "file_id")
        if not file_id:
            print_error("No file_id in initialize response")
            return status, data

        print_success(f"Initialized upload with file_id: {file_id}")

        # Step 2: Upload chunks
        print_info(f"Step 2: Upload {len(content)} bytes in chunks")

        # Encode content as base64 for chunk upload
        chunk_b64 = base64.b64encode(content).decode('ascii')
        chunk_payload = {
            "file_id": file_id,
            "chunk": chunk_b64
        }

        status, data = await self._request(
            "POST", "/v1/files:uploadChunks", use_files_api=True, json=chunk_payload
        )
        # Printed before any augmentation so the raw server response is visible.
        print_json(data, "Upload Chunks Response")

        if status in _OK_CREATED:
            print_success(f"Uploaded file chunks: {file_id}")
            self._track_file(file_id)
            if isinstance(data, dict) and not _first_present(data, "id", "file_id"):
                data = {**data, "file_id": file_id}

        return status, data

    async def upload_file(self, content: bytes, filename: str,
                          mime_type: str = "text/plain") -> tuple[int, Any]:
        """
        Try multiple upload methods until one succeeds

        Order: multipart, JSON/base64, then initialize + chunks. Returns the
        ``(status, data)`` of the first method that succeeded, or the
        multipart attempt's result when all of them failed.
        """
        print_info("Trying upload methods...")

        # Try Method 0: Multipart form-data (what the server really wants!)
        status0, data0 = await self.upload_file_multipart(content, filename, mime_type)
        if status0 in _OK_CREATED:
            file_id = _first_present(data0, "id", "file_id")  # Try both field names
            if file_id:
                self._track_file(file_id)
                print_success(f"✅ Multipart upload succeeded: {file_id}")
                return status0, data0
            print_error("No 'id' or 'file_id' in response")
            print_json(data0, "Response data")

        print_warning(f"Multipart upload failed ({status0}), trying JSON upload...")

        # Try Method 1: Direct upload with JSON
        status1, data1 = await self.upload_file_direct(content, filename, mime_type)
        if status1 in _OK_CREATED:
            # Accept the same ID field names as for the multipart response.
            file_id = _first_present(data1, "id", "file_id")
            if file_id:
                self._track_file(file_id)
                print_success(f"✅ Direct upload succeeded: {file_id}")
                return status1, data1

        print_warning(f"Direct upload failed ({status1}), trying chunked upload...")

        # Try Method 2: Initialize + Chunks
        status2, data2 = await self.upload_file_chunked(content, filename, mime_type)
        if status2 in _OK_CREATED:
            print_success("✅ Chunked upload succeeded")
            return status2, data2

        print_error("❌ All upload methods failed")
        return status0, data0  # Return multipart method's error

    # ========================================================================
    # COLLECTION DOCUMENT OPERATIONS
    # ========================================================================

    async def add_document_to_collection(self, collection_id: str,
                                         file_id: str) -> tuple[int, Any]:
        """POST /v1/collections/{collection_id}/documents/{file_id}"""
        return await self._request(
            "POST", f"/v1/collections/{collection_id}/documents/{file_id}"
        )

    async def get_collection_documents(self, collection_id: str) -> tuple[int, Any]:
        """GET /v1/collections/{collection_id}/documents"""
        return await self._request("GET", f"/v1/collections/{collection_id}/documents")

    async def get_collection_document(self, collection_id: str,
                                      file_id: str) -> tuple[int, Any]:
        """GET /v1/collections/{collection_id}/documents/{file_id}"""
        return await self._request(
            "GET", f"/v1/collections/{collection_id}/documents/{file_id}"
        )

    async def update_collection_document(self, collection_id: str, file_id: str,
                                         metadata: Dict) -> tuple[int, Any]:
        """PATCH /v1/collections/{collection_id}/documents/{file_id}"""
        return await self._request(
            "PATCH",
            f"/v1/collections/{collection_id}/documents/{file_id}",
            json={"metadata": metadata},
        )

    async def remove_document_from_collection(self, collection_id: str,
                                              file_id: str) -> tuple[int, Any]:
        """DELETE /v1/collections/{collection_id}/documents/{file_id}"""
        return await self._request(
            "DELETE", f"/v1/collections/{collection_id}/documents/{file_id}"
        )

    async def batch_get_documents(self, collection_id: str,
                                  file_ids: List[str]) -> tuple[int, Any]:
        """GET /v1/collections/{collection_id}/documents:batchGet"""
        params = {"fileIds": ",".join(file_ids)}
        return await self._request(
            "GET",
            f"/v1/collections/{collection_id}/documents:batchGet",
            params=params,
        )

    # ========================================================================
    # TEST SCENARIOS
    # ========================================================================

    async def test_basic_collection_crud(self):
        """Test 1: Basic Collection CRUD operations

        Returns the new collection's ID, or None when it could not be created.
        The ID is returned even if the read or update step fails, so later
        tests can still run; the recorded result reflects both steps.
        """
        print_header("TEST 1: Basic Collection CRUD")

        # Create
        print_info("Creating collection...")
        status, data = await self.create_collection(
            name="Test Collection 1",
            metadata={"test": True, "purpose": "API testing"}
        )
        print_json(data, "Response")

        if status not in _OK_CREATED:
            print_error("Failed to create collection")
            self.test_results["collection_crud"] = False
            return None

        # Try different possible field names for collection ID
        collection_id = _first_present(data, "id", "collection_id", "collectionId")
        if not collection_id:
            print_error("No collection ID field in response")
            print_json(data, "Response Data")
            self.test_results["collection_crud"] = False
            return None

        print_success(f"Collection created: {collection_id}")

        # Read
        print_info("Reading collection...")
        read_status, data = await self.get_collection(collection_id)
        print_json(data, "Response")
        read_ok = 200 <= read_status < 300
        if not read_ok:
            print_error("Failed to read collection")

        # Update
        print_info("Updating collection...")
        update_status, data = await self.update_collection(
            collection_id,
            name="Test Collection 1 (Updated)",
            metadata={"test": True, "updated": True}
        )
        print_json(data, "Response")
        update_ok = 200 <= update_status < 300
        if not update_ok:
            print_error("Failed to update collection")

        # Only report PASS when every CRUD step actually succeeded.
        self.test_results["collection_crud"] = read_ok and update_ok
        return collection_id

    async def test_file_upload_and_structure(self, collection_id: str):
        """Test 2: File upload (two-step process)

        Uploads a file to the Files API, then attaches it to *collection_id*
        through the Management API. Returns the file ID, or None on failure.
        """
        print_header("TEST 2: File Upload (Two-Step) & Response Structure")

        # Create test file content
        test_content = b"""
This is a test document for xAI Collections API testing.

Topic: German Contract Law

Key Points:
- Contracts require offer and acceptance
- Consideration is necessary
- Written form may be required for certain contracts

This document contains sufficient content for testing.
"""

        # STEP 1: Upload file to Files API
        print_info("STEP 1: Uploading file to Files API (api.x.ai)...")
        status, data = await self.upload_file(
            content=test_content,
            filename="test_document.txt",
            mime_type="text/plain"
        )
        print_json(data, "Files API Upload Response")

        if status not in _OK_CREATED:
            print_error("File upload to Files API failed")
            self.test_results["file_upload"] = False
            return None

        # Try both field names: 'id' (Files API) or 'file_id' (Collections API)
        file_id = _first_present(data, "id", "file_id")
        if not file_id:
            print_error("No 'id' or 'file_id' field in response")
            print_json(data, "Response for debugging")
            self.test_results["file_upload"] = False
            return None

        print_success(f"File uploaded to Files API: {file_id}")

        # STEP 2: Add file to collection using Management API
        print_info("STEP 2: Adding file to collection (management-api.x.ai)...")
        status2, data2 = await self.add_document_to_collection(collection_id, file_id)
        print_json(data2, "Add to Collection Response")

        if status2 not in _OK_CREATED:
            print_error("Failed to add file to collection")
            self.test_results["file_upload"] = False
            return None

        print_success(f"File added to collection: {file_id}")
        self.test_results["file_upload"] = True
        return file_id

    async def test_document_in_collection(self, collection_id: str, file_id: str):
        """Test 3: Verify document is in collection and get details

        Returns True when both the listing and the single-document request
        succeed, False otherwise.
        """
        print_header("TEST 3: Verify Document in Collection")

        # Verify by listing documents
        print_info("Listing collection documents...")
        status, data = await self.get_collection_documents(collection_id)
        print_json(data, "Collection Documents")

        if status not in _OK_CREATED:
            print_error("Failed to list documents")
            self.test_results["add_to_collection"] = False
            return False

        # Get specific document
        print_info("Getting specific document...")
        status, data = await self.get_collection_document(collection_id, file_id)
        print_json(data, "Document Details")

        if status not in _OK_CREATED:
            print_error("Failed to get document details")
            self.test_results["add_to_collection"] = False
            return False

        print_success("Document verified in collection")
        self.test_results["add_to_collection"] = True
        return True

    async def test_shared_file_across_collections(self, file_id: str):
        """Test 4: CRITICAL - Can same file_id be used in multiple collections?

        Creates a second collection, attaches *file_id* to it, then checks
        that the ID shows up in the document listings of both the first
        created collection and the new one. A result is always recorded.
        """
        print_header("TEST 4: Shared File Across Collections (CRITICAL)")

        # Create second collection
        print_info("Creating second collection...")
        status, data = await self.create_collection(
            name="Test Collection 2",
            metadata={"test": True, "purpose": "Multi-collection test"}
        )

        if status not in _OK_CREATED:
            print_error("Failed to create second collection")
            self.test_results["shared_file"] = False
            return

        # Same field-name candidates as everywhere else in this client.
        collection2_id = _first_present(data, "collection_id", "id", "collectionId")
        if not collection2_id:
            print_error("No collection ID field in response")
            print_json(data, "Response Data")
            self.test_results["shared_file"] = False
            return

        print_success(f"Collection 2 created: {collection2_id}")

        # Try to add SAME file_id to second collection
        print_info(f"Adding SAME file_id {file_id} to collection 2...")
        status, data = await self.add_document_to_collection(collection2_id, file_id)
        print_json(data, "Response from adding existing file_id to second collection")

        if status not in _OK_CREATED:
            print_error("Failed to add same file to second collection")
            print_warning("⚠️ Files might be collection-specific (BAD for our use case)")
            self.test_results["shared_file"] = False
            return

        print_success("✅ SAME FILE_ID CAN BE USED IN MULTIPLE COLLECTIONS!")
        print_success("✅ This is PERFECT for our architecture!")

        # Verify both collections have the file
        print_info("Verifying file in both collections...")

        status1, data1 = await self.get_collection_documents(self.created_collections[0])
        status2, data2 = await self.get_collection_documents(collection2_id)

        print_json(data1, "Collection 1 Documents")
        print_json(data2, "Collection 2 Documents")

        # Extract file_ids from both collections to verify they match
        file_ids_1 = _listed_file_ids(data1)
        file_ids_2 = _listed_file_ids(data2)

        if file_id in file_ids_1 and file_id in file_ids_2:
            print_success(f"✅ CONFIRMED: file_id {file_id} is IDENTICAL in both collections!")
            print_info(" → We can store ONE xaiFileId per document!")
            print_info(" → Simply track which collections contain it!")
            self.test_results["shared_file"] = True
        else:
            # Record an outcome instead of silently omitting this test.
            print_warning(
                f"Could not confirm file_id {file_id} in both collection listings"
            )
            self.test_results["shared_file"] = False

    async def test_document_update(self, collection_id: str, file_id: str):
        """Test 5: Update document metadata"""
        print_header("TEST 5: Update Document Metadata")

        print_info("Updating document metadata...")
        status, data = await self.update_collection_document(
            collection_id,
            file_id,
            metadata={"updated_at": datetime.now().isoformat(), "version": 2}
        )
        print_json(data, "Update Response")

        if status not in _OK_CREATED:
            print_error("Failed to update document")
            self.test_results["document_update"] = False
            return

        print_success("Document metadata updated")
        self.test_results["document_update"] = True

    async def test_document_removal(self):
        """Test 6: Remove document from collection

        Removes the first uploaded file from the first created collection.
        Skipped (no result recorded) unless at least two collections and one
        file exist.
        """
        print_header("TEST 6: Remove Document from Collection")

        if len(self.created_collections) < 2 or not self.uploaded_files:
            print_warning("Skipping - need at least 2 collections and 1 file")
            return

        collection_id = self.created_collections[0]
        file_id = self.uploaded_files[0]

        print_info(f"Removing file {file_id} from collection {collection_id}...")
        status, data = await self.remove_document_from_collection(collection_id, file_id)
        print_json(data, "Response")

        if status not in _OK_DELETED:
            print_error("Failed to remove document")
            self.test_results["document_removal"] = False
            return

        print_success("Document removed from collection")

        # Verify removal
        print_info("Verifying removal...")
        status, data = await self.get_collection_documents(collection_id)
        print_json(data, "Remaining Documents")

        self.test_results["document_removal"] = True

    async def test_batch_get(self):
        """Test 7: Batch get documents

        Requests all uploaded files from the most recently created collection.
        Skipped (no result recorded) when there are no collections or files.
        """
        print_header("TEST 7: Batch Get Documents")

        if not self.created_collections or not self.uploaded_files:
            print_warning("Skipping - need collections and files")
            return

        collection_id = self.created_collections[-1]  # Use last collection
        file_ids = self.uploaded_files

        print_info(f"Batch getting {len(file_ids)} documents...")
        status, data = await self.batch_get_documents(collection_id, file_ids)
        print_json(data, "Batch Response")

        self.test_results["batch_get"] = status in _OK_CREATED

    async def cleanup(self):
        """Clean up all created test resources

        Deletes every collection recorded in ``created_collections`` and
        reports any that could not be deleted. Uploaded files are not
        deleted by this method.
        """
        print_header("CLEANUP: Deleting Test Resources")

        # Delete collections (should cascade delete documents?)
        # Iterate over a copy: delete_collection mutates the list on success.
        failed: List[str] = []
        for collection_id in list(self.created_collections):
            print_info(f"Deleting collection {collection_id}...")
            status, _ = await self.delete_collection(collection_id)
            if status not in _OK_DELETED:
                failed.append(collection_id)

        if failed:
            print_warning(f"Cleanup incomplete - could not delete: {', '.join(failed)}")
        else:
            print_success("Cleanup complete")

    def print_summary(self):
        """Print test results summary"""
        print_header("TEST RESULTS SUMMARY")

        total = len(self.test_results)
        passed = sum(1 for outcome in self.test_results.values() if outcome)

        for test_name, outcome in self.test_results.items():
            label = "✅ PASS" if outcome else "❌ FAIL"
            print(f"{label} - {test_name}")

        print(f"\n{Colors.BOLD}Total: {passed}/{total} tests passed{Colors.END}\n")

        # Critical findings
        print_header("CRITICAL FINDINGS")

        if "shared_file" not in self.test_results:
            return

        if self.test_results["shared_file"]:
            print_success("✅ Same file CAN be used in multiple collections")
            print_info(" → We can use a SINGLE xaiFileId per document!")
            print_info(" → Much simpler architecture!")
        else:
            print_error("❌ Files seem to be collection-specific")
            print_warning(" → More complex mapping required")
            print_warning(" → Each collection might need separate file upload")


async def main():
    """Run all tests

    Runs the scenarios in order, then cleans up and prints the summary. If a
    scenario raises, the error is reported and cleanup is still attempted.
    """
    print_header("xAI Collections API Test Suite")
    print_info(f"Management URL: {XAI_MANAGEMENT_URL}")
    print_info(f"Files URL: {XAI_FILES_URL}")
    # Show only a masked form of each key so logs cannot leak credentials.
    print_info(f"Management Key: {_mask_secret(XAI_MANAGEMENT_KEY)}")
    print_info(f"API Key: {_mask_secret(XAI_API_KEY)}")

    async with XAICollectionsTestClient() as client:
        try:
            # Test 1: Basic Collection CRUD
            collection_id = await client.test_basic_collection_crud()

            if not collection_id:
                print_error("Cannot continue without collection. Stopping.")
                return

            # Test 2: File Upload (now two-step process)
            file_id = await client.test_file_upload_and_structure(collection_id)

            if not file_id:
                print_error("File upload failed. Continuing with remaining tests...")
            else:
                # Test 3: Verify document in collection
                await client.test_document_in_collection(collection_id, file_id)

                # Test 4: CRITICAL - Shared file test
                await client.test_shared_file_across_collections(file_id)

                # Test 5: Update document
                await client.test_document_update(collection_id, file_id)

                # Test 6: Remove document
                await client.test_document_removal()

            # Test 7: Batch get
            await client.test_batch_get()

            # Cleanup
            await client.cleanup()

            # Print summary
            client.print_summary()

        except Exception as e:
            print_error(f"Test suite failed: {e}")
            traceback.print_exc()

            # Try cleanup anyway, but report (rather than hide) a failure.
            try:
                await client.cleanup()
            except Exception as cleanup_error:
                print_error(f"Cleanup failed: {cleanup_error}")


if __name__ == "__main__":
    asyncio.run(main())