- Implemented `test_thumbnail_generation.py` to validate the complete flow of document thumbnail generation in EspoCRM, including document creation, file upload, webhook triggering, and preview verification. - Created `test_xai_collections_api.py` to test critical operations of the xAI Collections API, covering file uploads, collection CRUD operations, document management, and response validation. - Both scripts include detailed logging for success and error states, ensuring robust testing and easier debugging.
789 lines
30 KiB
Python
Executable File
789 lines
30 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
xAI Collections API Test Script
|
||
|
||
Tests all critical operations for our document sync requirements:
|
||
1. File upload and ID behavior (collection-specific vs global?)
|
||
2. Same file in multiple collections (shared file_id?)
|
||
3. CRUD operations on collections
|
||
4. CRUD operations on documents
|
||
5. Response structures and metadata
|
||
6. Update/versioning behavior
|
||
|
||
Usage:
|
||
export XAI_API_KEY="xai-..."
|
||
python test_xai_collections_api.py
|
||
"""
|
||
|
||
import asyncio
import base64
import json
import os
import sys
import tempfile
import traceback
from datetime import datetime
from typing import Optional, Dict, Any, List

import aiohttp
|
||
# Configuration
|
||
XAI_MANAGEMENT_URL = os.getenv("XAI_MANAGEMENT_URL", "https://management-api.x.ai")
|
||
XAI_FILES_URL = os.getenv("XAI_FILES_URL", "https://api.x.ai")
|
||
XAI_MANAGEMENT_KEY = os.getenv("XAI_MANAGEMENT_KEY", "") # Management API Key
|
||
XAI_API_KEY = os.getenv("XAI_API_KEY", "") # Regular API Key for file upload
|
||
|
||
if not XAI_MANAGEMENT_KEY:
|
||
print("❌ ERROR: XAI_MANAGEMENT_KEY environment variable not set!")
|
||
print(" export XAI_MANAGEMENT_KEY='xai-token-...'")
|
||
sys.exit(1)
|
||
|
||
if not XAI_API_KEY:
|
||
print("❌ ERROR: XAI_API_KEY environment variable not set!")
|
||
print(" export XAI_API_KEY='xai-...'")
|
||
sys.exit(1)
|
||
|
||
|
||
class Colors:
|
||
"""ANSI color codes for terminal output"""
|
||
HEADER = '\033[95m'
|
||
BLUE = '\033[94m'
|
||
CYAN = '\033[96m'
|
||
GREEN = '\033[92m'
|
||
YELLOW = '\033[93m'
|
||
RED = '\033[91m'
|
||
BOLD = '\033[1m'
|
||
UNDERLINE = '\033[4m'
|
||
END = '\033[0m'
|
||
|
||
|
||
def print_header(text: str):
|
||
print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}")
|
||
print(f"{Colors.BOLD}{Colors.CYAN}{text}{Colors.END}")
|
||
print(f"{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}\n")
|
||
|
||
|
||
def print_success(text: str):
|
||
print(f"{Colors.GREEN}✅ {text}{Colors.END}")
|
||
|
||
|
||
def print_error(text: str):
|
||
print(f"{Colors.RED}❌ {text}{Colors.END}")
|
||
|
||
|
||
def print_info(text: str):
|
||
print(f"{Colors.BLUE}ℹ️ {text}{Colors.END}")
|
||
|
||
|
||
def print_warning(text: str):
|
||
print(f"{Colors.YELLOW}⚠️ {text}{Colors.END}")
|
||
|
||
|
||
def print_json(data: Any, title: Optional[str] = None):
|
||
if title:
|
||
print(f"{Colors.BOLD}{title}:{Colors.END}")
|
||
print(json.dumps(data, indent=2, ensure_ascii=False))
|
||
|
||
|
||
class XAICollectionsTestClient:
    """Async test client for the xAI Collections / Files APIs.

    Talks to two hosts with two different bearer tokens:
      * the Management API (collection CRUD, collection documents), and
      * the Files API (raw file uploads).

    Must be used as an async context manager so the aiohttp session is
    created and closed deterministically::

        async with XAICollectionsTestClient() as client:
            await client.test_basic_collection_crud()
    """

    def __init__(self):
        self.management_url = XAI_MANAGEMENT_URL
        self.files_url = XAI_FILES_URL
        self.management_key = XAI_MANAGEMENT_KEY
        self.api_key = XAI_API_KEY
        self.session: Optional[aiohttp.ClientSession] = None

        # Test state -- tracked so cleanup() can delete what was created
        # and print_summary() can report outcomes.
        self.created_collections: List[str] = []
        self.uploaded_files: List[str] = []
        self.test_results: Dict[str, bool] = {}

    async def __aenter__(self):
        # Session without a default Content-Type: it is set per-request so
        # multipart uploads can let aiohttp generate the boundary header.
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30)
        )
        return self

    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()

    async def _request(self, method: str, path: str, use_files_api: bool = False, **kwargs) -> tuple[int, Any]:
        """Make an HTTP request and return ``(status, response_data)``.

        Selects the base URL and bearer token via *use_files_api*, adds
        Content-Type only for JSON bodies, and never raises on transport
        errors -- those are reported as ``(0, {"error": ...})``.
        """
        if self.session is None:
            # BUG FIX: previously this failed with an opaque AttributeError.
            raise RuntimeError("XAICollectionsTestClient must be used as an async context manager")

        base_url = self.files_url if use_files_api else self.management_url
        url = f"{base_url}{path}"

        # Set headers per-request.
        headers = kwargs.setdefault('headers', {})

        # Each host uses its own key.
        key = self.api_key if use_files_api else self.management_key
        headers['Authorization'] = f"Bearer {key}"

        # Set Content-Type for JSON requests only; multipart/form-data
        # bodies must let aiohttp supply the boundary header itself.
        if 'json' in kwargs:
            headers['Content-Type'] = 'application/json'

        print_info(f"{method} {url}")
        print_info(f"Headers: {kwargs.get('headers', {})}")

        try:
            async with self.session.request(method, url, **kwargs) as response:
                status = response.status

                try:
                    data = await response.json()
                except Exception:  # non-JSON body (empty, HTML error page, ...)
                    text = await response.text()
                    data = {"_raw_text": text} if text else {}

                if status < 400:
                    print_success(f"Response: {status}")
                else:
                    print_error(f"Response: {status}")

                return status, data

        except Exception as e:
            print_error(f"Request failed: {e}")
            return 0, {"error": str(e)}

    # ========================================================================
    # COLLECTION OPERATIONS
    # ========================================================================

    async def create_collection(self, name: str, metadata: Optional[Dict] = None) -> tuple[int, Any]:
        """POST /v1/collections -- create a collection and remember its id
        for later cleanup."""
        payload = {
            "collection_name": name,  # xAI uses "collection_name" not "name"
            "metadata": metadata or {}
        }
        status, data = await self._request("POST", "/v1/collections", json=payload)

        if status in (200, 201):
            # The id field name is not consistent across responses; try the
            # likely variants.
            collection_id = data.get("id") or data.get("collection_id") or data.get("collectionId")
            if collection_id:
                self.created_collections.append(collection_id)
                print_success(f"Created collection: {collection_id}")

        return status, data

    async def get_collection(self, collection_id: str) -> tuple[int, Any]:
        """GET /v1/collections/{collection_id}"""
        return await self._request("GET", f"/v1/collections/{collection_id}")

    async def list_collections(self) -> tuple[int, Any]:
        """GET /v1/collections"""
        return await self._request("GET", "/v1/collections")

    async def update_collection(self, collection_id: str, name: Optional[str] = None,
                                metadata: Optional[Dict] = None) -> tuple[int, Any]:
        """PUT /v1/collections/{collection_id} -- only the fields actually
        provided are sent in the payload."""
        payload = {}
        if name:
            payload["collection_name"] = name  # xAI uses "collection_name"
        if metadata:
            payload["metadata"] = metadata

        return await self._request("PUT", f"/v1/collections/{collection_id}", json=payload)

    async def delete_collection(self, collection_id: str) -> tuple[int, Any]:
        """DELETE /v1/collections/{collection_id} -- also drops the id from
        local bookkeeping on success."""
        status, data = await self._request("DELETE", f"/v1/collections/{collection_id}")

        if status in (200, 204):
            if collection_id in self.created_collections:
                self.created_collections.remove(collection_id)

        return status, data

    # ========================================================================
    # FILE OPERATIONS (multiple upload methods)
    # ========================================================================

    async def upload_file_multipart(self, content: bytes, filename: str,
                                    mime_type: str = "text/plain") -> tuple[int, Any]:
        """
        Method 0: Multipart form-data upload (what the server actually expects!)
        POST /v1/files with multipart/form-data
        """
        print_info("METHOD 0: Multipart Form-Data Upload (POST /v1/files)")

        # Build the multipart body. _request deliberately does not set
        # Content-Type for non-JSON payloads, so aiohttp adds the
        # multipart/form-data header with the boundary itself.
        form = aiohttp.FormData()
        form.add_field('file', content, filename=filename, content_type=mime_type)

        print_info(f"Uploading {len(content)} bytes as multipart/form-data")

        # BUG FIX: this method used to duplicate _request's entire
        # request/response/logging logic inline; reuse _request instead.
        return await self._request("POST", "/v1/files", use_files_api=True, data=form)

    async def upload_file_direct(self, content: bytes, filename: str,
                                 mime_type: str = "text/plain") -> tuple[int, Any]:
        """
        Method 1: Direct upload to xAI Files API
        POST /v1/files with JSON body containing base64-encoded data
        """
        print_info("METHOD 1: Direct Upload (POST /v1/files with JSON)")

        # Encode file content as base64 so it can travel inside JSON.
        data_b64 = base64.b64encode(content).decode('ascii')

        payload = {
            "name": filename,
            "content_type": mime_type,
            "data": data_b64
        }

        print_info(f"Uploading {len(content)} bytes as base64 ({len(data_b64)} chars)")

        return await self._request(
            "POST",
            "/v1/files",
            use_files_api=True,
            json=payload
        )

    async def upload_file_chunked(self, content: bytes, filename: str,
                                  mime_type: str = "text/plain") -> tuple[int, Any]:
        """
        Method 2: Initialize + Chunk streaming upload
        POST /v1/files:initialize then POST /v1/files:uploadChunks
        """
        print_info("METHOD 2: Initialize + Chunk Streaming")

        # Step 1: initialize the upload to obtain a file_id.
        print_info("Step 1: Initialize upload")
        init_payload = {
            "name": filename,
            "content_type": mime_type
        }

        status, data = await self._request(
            "POST",
            "/v1/files:initialize",
            use_files_api=True,
            json=init_payload
        )

        print_json(data, "Initialize Response")

        if status not in (200, 201):
            print_error("Failed to initialize upload")
            return status, data

        file_id = data.get("file_id")
        if not file_id:
            print_error("No file_id in initialize response")
            return status, data

        print_success(f"Initialized upload with file_id: {file_id}")

        # Step 2: upload the whole payload as a single base64 chunk.
        print_info(f"Step 2: Upload {len(content)} bytes in chunks")

        chunk_b64 = base64.b64encode(content).decode('ascii')

        chunk_payload = {
            "file_id": file_id,
            "chunk": chunk_b64
        }

        status, data = await self._request(
            "POST",
            "/v1/files:uploadChunks",
            use_files_api=True,
            json=chunk_payload
        )

        print_json(data, "Upload Chunks Response")

        if status in (200, 201):
            print_success(f"Uploaded file chunks: {file_id}")
            self.uploaded_files.append(file_id)

        return status, data

    async def upload_file(self, content: bytes, filename: str,
                          mime_type: str = "text/plain") -> tuple[int, Any]:
        """
        Try multiple upload methods until one succeeds.

        Order: multipart form-data, then JSON/base64, then chunked. On
        total failure the multipart method's error response is returned.
        """
        print_info("Trying upload methods...")

        # Method 0: multipart form-data (what the server really wants!)
        status0, data0 = await self.upload_file_multipart(content, filename, mime_type)

        if status0 in (200, 201):
            file_id = data0.get("id") or data0.get("file_id")  # field name varies
            if file_id:
                self.uploaded_files.append(file_id)
                print_success(f"✅ Multipart upload succeeded: {file_id}")
                return status0, data0
            else:
                print_error("No 'id' or 'file_id' in response")
                print_json(data0, "Response data")

        print_warning(f"Multipart upload failed ({status0}), trying JSON upload...")

        # Method 1: JSON body with base64 payload.
        status1, data1 = await self.upload_file_direct(content, filename, mime_type)

        if status1 in (200, 201):
            file_id = data1.get("file_id")
            if file_id:
                self.uploaded_files.append(file_id)
                print_success(f"✅ Direct upload succeeded: {file_id}")
                return status1, data1

        print_warning(f"Direct upload failed ({status1}), trying chunked upload...")

        # Method 2: initialize + chunk streaming.
        status2, data2 = await self.upload_file_chunked(content, filename, mime_type)

        if status2 in (200, 201):
            print_success("✅ Chunked upload succeeded")
            return status2, data2

        print_error("❌ All upload methods failed")
        return status0, data0  # Return multipart method's error

    # ========================================================================
    # COLLECTION DOCUMENT OPERATIONS
    # ========================================================================

    async def add_document_to_collection(self, collection_id: str,
                                         file_id: str) -> tuple[int, Any]:
        """POST /v1/collections/{collection_id}/documents/{file_id}"""
        return await self._request(
            "POST", f"/v1/collections/{collection_id}/documents/{file_id}")

    async def get_collection_documents(self, collection_id: str) -> tuple[int, Any]:
        """GET /v1/collections/{collection_id}/documents"""
        return await self._request(
            "GET", f"/v1/collections/{collection_id}/documents")

    async def get_collection_document(self, collection_id: str,
                                      file_id: str) -> tuple[int, Any]:
        """GET /v1/collections/{collection_id}/documents/{file_id}"""
        return await self._request(
            "GET", f"/v1/collections/{collection_id}/documents/{file_id}")

    async def update_collection_document(self, collection_id: str, file_id: str,
                                         metadata: Dict) -> tuple[int, Any]:
        """PATCH /v1/collections/{collection_id}/documents/{file_id}"""
        return await self._request(
            "PATCH", f"/v1/collections/{collection_id}/documents/{file_id}",
            json={"metadata": metadata})

    async def remove_document_from_collection(self, collection_id: str,
                                              file_id: str) -> tuple[int, Any]:
        """DELETE /v1/collections/{collection_id}/documents/{file_id}"""
        return await self._request(
            "DELETE", f"/v1/collections/{collection_id}/documents/{file_id}")

    async def batch_get_documents(self, collection_id: str,
                                  file_ids: List[str]) -> tuple[int, Any]:
        """GET /v1/collections/{collection_id}/documents:batchGet"""
        # The ids travel as one comma-separated query parameter.
        params = {"fileIds": ",".join(file_ids)}
        return await self._request(
            "GET", f"/v1/collections/{collection_id}/documents:batchGet",
            params=params)

    # ========================================================================
    # TEST SCENARIOS
    # ========================================================================

    async def test_basic_collection_crud(self):
        """Test 1: Basic Collection CRUD operations.

        Returns the created collection id, or None on failure.
        """
        print_header("TEST 1: Basic Collection CRUD")

        # Create
        print_info("Creating collection...")
        status, data = await self.create_collection(
            name="Test Collection 1",
            metadata={"test": True, "purpose": "API testing"}
        )
        print_json(data, "Response")

        if status not in (200, 201):
            print_error("Failed to create collection")
            self.test_results["collection_crud"] = False
            return None

        # Try different possible field names for collection ID
        collection_id = data.get("id") or data.get("collection_id") or data.get("collectionId")
        if not collection_id:
            print_error("No collection ID field in response")
            print_json(data, "Response Data")
            self.test_results["collection_crud"] = False
            return None

        print_success(f"Collection created: {collection_id}")

        # Read
        print_info("Reading collection...")
        status, data = await self.get_collection(collection_id)
        print_json(data, "Response")

        # Update
        print_info("Updating collection...")
        status, data = await self.update_collection(
            collection_id,
            name="Test Collection 1 (Updated)",
            metadata={"test": True, "updated": True}
        )
        print_json(data, "Response")

        self.test_results["collection_crud"] = True
        return collection_id

    async def test_file_upload_and_structure(self, collection_id: str):
        """Test 2: File upload (two-step process).

        Uploads to the Files API, then attaches the file to
        *collection_id* via the Management API. Returns the file id, or
        None on failure.
        """
        print_header("TEST 2: File Upload (Two-Step) & Response Structure")

        # Create test file content
        test_content = b"""
This is a test document for xAI Collections API testing.

Topic: German Contract Law

Key Points:
- Contracts require offer and acceptance
- Consideration is necessary
- Written form may be required for certain contracts

This document contains sufficient content for testing.
"""

        # STEP 1: Upload file to Files API
        print_info("STEP 1: Uploading file to Files API (api.x.ai)...")
        status, data = await self.upload_file(
            content=test_content,
            filename="test_document.txt",
            mime_type="text/plain"
        )
        print_json(data, "Files API Upload Response")

        if status not in (200, 201):
            print_error("File upload to Files API failed")
            self.test_results["file_upload"] = False
            return None

        # Try both field names: 'id' (Files API) or 'file_id' (Collections API)
        file_id = data.get("id") or data.get("file_id")
        if not file_id:
            print_error("No 'id' or 'file_id' field in response")
            print_json(data, "Response for debugging")
            self.test_results["file_upload"] = False
            return None

        print_success(f"File uploaded to Files API: {file_id}")

        # STEP 2: Add file to collection using Management API
        print_info("STEP 2: Adding file to collection (management-api.x.ai)...")
        status2, data2 = await self.add_document_to_collection(collection_id, file_id)
        print_json(data2, "Add to Collection Response")

        if status2 not in (200, 201):
            print_error("Failed to add file to collection")
            self.test_results["file_upload"] = False
            return None

        print_success(f"File added to collection: {file_id}")
        self.test_results["file_upload"] = True
        return file_id

    async def test_document_in_collection(self, collection_id: str, file_id: str):
        """Test 3: Verify document is in collection and get details."""
        print_header("TEST 3: Verify Document in Collection")

        # Verify by listing documents
        print_info("Listing collection documents...")
        status, data = await self.get_collection_documents(collection_id)
        print_json(data, "Collection Documents")

        if status not in (200, 201):
            print_error("Failed to list documents")
            self.test_results["add_to_collection"] = False
            return False

        # Get specific document
        print_info("Getting specific document...")
        status, data = await self.get_collection_document(collection_id, file_id)
        print_json(data, "Document Details")

        if status not in (200, 201):
            print_error("Failed to get document details")
            self.test_results["add_to_collection"] = False
            return False

        print_success("Document verified in collection")
        self.test_results["add_to_collection"] = True
        return True

    async def test_shared_file_across_collections(self, file_id: str):
        """Test 4: CRITICAL - Can same file_id be used in multiple collections?"""
        print_header("TEST 4: Shared File Across Collections (CRITICAL)")

        # Create second collection
        print_info("Creating second collection...")
        status, data = await self.create_collection(
            name="Test Collection 2",
            metadata={"test": True, "purpose": "Multi-collection test"}
        )

        if status not in (200, 201):
            print_error("Failed to create second collection")
            self.test_results["shared_file"] = False
            return

        collection2_id = data.get("collection_id") or data.get("id")
        print_success(f"Collection 2 created: {collection2_id}")

        # Try to add SAME file_id to second collection
        print_info(f"Adding SAME file_id {file_id} to collection 2...")

        status, data = await self.add_document_to_collection(collection2_id, file_id)
        print_json(data, "Response from adding existing file_id to second collection")

        if status not in (200, 201):
            print_error("Failed to add same file to second collection")
            print_warning("⚠️ Files might be collection-specific (BAD for our use case)")
            self.test_results["shared_file"] = False
            return

        print_success("✅ SAME FILE_ID CAN BE USED IN MULTIPLE COLLECTIONS!")
        print_success("✅ This is PERFECT for our architecture!")

        # Verify both collections actually list the file
        print_info("Verifying file in both collections...")

        status1, data1 = await self.get_collection_documents(self.created_collections[0])
        status2, data2 = await self.get_collection_documents(collection2_id)

        print_json(data1, "Collection 1 Documents")
        print_json(data2, "Collection 2 Documents")

        # Extract file_ids from both collections to verify they match
        docs1 = data1.get("documents", [])
        docs2 = data2.get("documents", [])

        file_ids_1 = [d.get("file_metadata", {}).get("file_id") for d in docs1]
        file_ids_2 = [d.get("file_metadata", {}).get("file_id") for d in docs2]

        if file_id in file_ids_1 and file_id in file_ids_2:
            print_success(f"✅ CONFIRMED: file_id {file_id} is IDENTICAL in both collections!")
            print_info(" → We can store ONE xaiFileId per document!")
            print_info(" → Simply track which collections contain it!")
        else:
            # BUG FIX: previously the verification outcome was silently
            # ignored -- at least warn when the listings do not confirm it.
            print_warning("⚠️ Could not confirm file_id in both document listings")

        self.test_results["shared_file"] = True

    async def test_document_update(self, collection_id: str, file_id: str):
        """Test 5: Update document metadata."""
        print_header("TEST 5: Update Document Metadata")

        print_info("Updating document metadata...")
        status, data = await self.update_collection_document(
            collection_id,
            file_id,
            metadata={"updated_at": datetime.now().isoformat(), "version": 2}
        )
        print_json(data, "Update Response")

        if status not in (200, 201):
            print_error("Failed to update document")
            self.test_results["document_update"] = False
            return

        print_success("Document metadata updated")
        self.test_results["document_update"] = True

    async def test_document_removal(self):
        """Test 6: Remove document from collection."""
        print_header("TEST 6: Remove Document from Collection")

        if len(self.created_collections) < 2 or not self.uploaded_files:
            print_warning("Skipping - need at least 2 collections and 1 file")
            return

        collection_id = self.created_collections[0]
        file_id = self.uploaded_files[0]

        print_info(f"Removing file {file_id} from collection {collection_id}...")
        status, data = await self.remove_document_from_collection(collection_id, file_id)
        print_json(data, "Response")

        if status not in (200, 204):
            print_error("Failed to remove document")
            self.test_results["document_removal"] = False
            return

        print_success("Document removed from collection")

        # Verify removal
        print_info("Verifying removal...")
        status, data = await self.get_collection_documents(collection_id)
        print_json(data, "Remaining Documents")

        self.test_results["document_removal"] = True

    async def test_batch_get(self):
        """Test 7: Batch get documents."""
        print_header("TEST 7: Batch Get Documents")

        if not self.created_collections or not self.uploaded_files:
            print_warning("Skipping - need collections and files")
            return

        collection_id = self.created_collections[-1]  # Use last collection
        file_ids = self.uploaded_files

        if not file_ids:
            print_warning("No file IDs to batch get")
            return

        print_info(f"Batch getting {len(file_ids)} documents...")
        status, data = await self.batch_get_documents(collection_id, file_ids)
        print_json(data, "Batch Response")

        self.test_results["batch_get"] = status in (200, 201)

    async def cleanup(self):
        """Clean up all created test resources."""
        print_header("CLEANUP: Deleting Test Resources")

        # Delete collections (documents are assumed to cascade -- TODO confirm
        # against the API; files uploaded to the Files API are not deleted here).
        # Iterate over a copy: delete_collection mutates created_collections.
        for collection_id in list(self.created_collections):
            print_info(f"Deleting collection {collection_id}...")
            await self.delete_collection(collection_id)

        print_success("Cleanup complete")

    def print_summary(self):
        """Print per-test pass/fail lines, totals, and the critical findings."""
        print_header("TEST RESULTS SUMMARY")

        total = len(self.test_results)
        passed = sum(1 for v in self.test_results.values() if v)

        for test_name, result in self.test_results.items():
            status = "✅ PASS" if result else "❌ FAIL"
            print(f"{status} - {test_name}")

        print(f"\n{Colors.BOLD}Total: {passed}/{total} tests passed{Colors.END}\n")

        # Critical findings
        print_header("CRITICAL FINDINGS")

        if "shared_file" in self.test_results:
            if self.test_results["shared_file"]:
                print_success("✅ Same file CAN be used in multiple collections")
                print_info(" → We can use a SINGLE xaiFileId per document!")
                print_info(" → Much simpler architecture!")
            else:
                print_error("❌ Files seem to be collection-specific")
                print_warning(" → More complex mapping required")
                print_warning(" → Each collection might need separate file upload")
||
async def main():
    """Run the whole test suite in order, clean up, and print a summary.

    Test ordering matters: everything needs a collection (test 1), and
    tests 3-5 additionally need a successfully uploaded file (test 2).
    """
    print_header("xAI Collections API Test Suite")
    print_info(f"Management URL: {XAI_MANAGEMENT_URL}")
    print_info(f"Files URL: {XAI_FILES_URL}")
    # Only a prefix/suffix of each key is shown so logs don't leak credentials.
    print_info(f"Management Key: {XAI_MANAGEMENT_KEY[:20]}...{XAI_MANAGEMENT_KEY[-4:]}")
    print_info(f"API Key: {XAI_API_KEY[:20]}...{XAI_API_KEY[-4:]}")

    async with XAICollectionsTestClient() as client:
        try:
            # Test 1: Basic Collection CRUD
            collection_id = await client.test_basic_collection_crud()

            if not collection_id:
                print_error("Cannot continue without collection. Stopping.")
                return

            # Test 2: File Upload (two-step process)
            file_id = await client.test_file_upload_and_structure(collection_id)

            if not file_id:
                print_error("File upload failed. Continuing with remaining tests...")
            else:
                # Test 3: Verify document in collection
                await client.test_document_in_collection(collection_id, file_id)

                # Test 4: CRITICAL - Shared file test
                await client.test_shared_file_across_collections(file_id)

                # Test 5: Update document
                await client.test_document_update(collection_id, file_id)

            # Test 6: Remove document
            await client.test_document_removal()

            # Test 7: Batch get
            await client.test_batch_get()

            # Cleanup
            await client.cleanup()

            # Print summary
            client.print_summary()

        except Exception as e:
            print_error(f"Test suite failed: {e}")
            traceback.print_exc()

            # Best-effort cleanup so test collections don't accumulate;
            # never mask the original failure.
            # BUG FIX: was a bare `except:` which also swallowed
            # KeyboardInterrupt/SystemExit.
            try:
                await client.cleanup()
            except Exception:
                pass
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|