Add comprehensive test scripts for thumbnail generation and xAI collections API

- Implemented `test_thumbnail_generation.py` to validate the complete flow of document thumbnail generation in EspoCRM, including document creation, file upload, webhook triggering, and preview verification.
- Created `test_xai_collections_api.py` to test critical operations of the xAI Collections API, covering file uploads, collection CRUD operations, document management, and response validation.
- Both scripts include detailed logging for success and error states, ensuring robust testing and easier debugging.
bsiggel
2026-03-03 17:03:08 +00:00
parent c45bfb7233
commit bcb6454b2a
15 changed files with 505 additions and 1259 deletions

110
tests/README.md Normal file

@@ -0,0 +1,110 @@
# Test Scripts
This directory contains test scripts for the Motia III integrations with the xAI Collections API and EspoCRM.
## Test Files
### `test_xai_collections_api.py`
Tests xAI Collections API authentication and basic operations.
**Usage:**
```bash
cd /opt/motia-iii/bitbylaw
python tests/test_xai_collections_api.py
```
**Required Environment Variables:**
- `XAI_MANAGEMENT_KEY` - xAI Management API key for collection operations
- `XAI_API_KEY` - xAI Regular API key for file operations
**Tests:**
- ✅ Management API authentication
- ✅ Regular API authentication
- ✅ Collection listing
- ✅ Collection creation
- ✅ File upload
- ✅ Collection deletion
- ✅ Error handling
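The two-key split above can be sketched as follows. This is a minimal illustration, assuming both APIs accept a Bearer token, as the test client in `test_xai_collections_api.py` does. `build_auth_headers` is a hypothetical helper and not part of the script; `XAI_MANAGEMENT_KEY` and `XAI_API_KEY` are the variable names the script reads.

```python
import os


def build_auth_headers(use_files_api: bool, env=os.environ) -> dict:
    """Pick the API key that matches the target API (hypothetical helper)."""
    # Assumption: file operations use XAI_API_KEY and collection operations
    # use XAI_MANAGEMENT_KEY, mirroring the test client's behaviour.
    var = "XAI_API_KEY" if use_files_api else "XAI_MANAGEMENT_KEY"
    key = env.get(var, "")
    if not key:
        raise RuntimeError(f"{var} is not set")
    return {"Authorization": f"Bearer {key}"}
```

Passing `env` explicitly keeps the helper easy to exercise without touching the real environment.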
### `test_preview_upload.py`
Tests preview/thumbnail upload to EspoCRM CDokumente entity.
**Usage:**
```bash
cd /opt/motia-iii/bitbylaw
python tests/test_preview_upload.py <document_id>
```
**Required Environment Variables:**
- `ESPOCRM_API_BASE_URL` - EspoCRM API base URL (default: https://crm.bitbylaw.com/api/v1)
- `ESPOCRM_API_KEY` - EspoCRM API key
**Tests:**
- ✅ Preview image generation (WebP format, 600x800px)
- ✅ Base64 Data URI encoding
- ✅ Attachment upload via JSON POST
- ✅ Entity update with previewId/previewName
**Status:** ✅ Successfully tested - Attachment ID `69a71194c7c6baebf` created
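The encoding and upload steps listed above can be sketched as follows. The payload fields mirror those sent by `test_preview_upload.py`; the helper names `to_data_uri` and `build_attachment_payload` are hypothetical and only illustrate how the JSON body is assembled before it is POSTed to the `Attachment` endpoint.

```python
import base64


def to_data_uri(data: bytes, mime_type: str = "image/webp") -> str:
    """Encode raw bytes as a base64 data URI."""
    encoded = base64.b64encode(data).decode("ascii")
    return f"data:{mime_type};base64,{encoded}"


def build_attachment_payload(document_id: str, data: bytes) -> dict:
    """Assemble the JSON body for the attachment upload (hypothetical helper)."""
    return {
        "name": "preview.webp",
        "type": "image/webp",
        "role": "Attachment",
        "field": "preview",
        "relatedType": "CDokumente",
        "relatedId": document_id,
        "file": to_data_uri(data),
    }
```

The `id` returned by the upload is then written back to the entity as `previewId`.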
### `test_thumbnail_generation.py`
Tests thumbnail generation for various document types.
**Usage:**
```bash
cd /opt/motia-iii/bitbylaw
python tests/test_thumbnail_generation.py
```
**Supported Formats:**
- PDF → WebP (first page)
- DOCX/DOC → PDF → WebP
- Images (JPEG, PNG, etc.) → WebP resize
**Dependencies:**
- `python3-pil` - PIL/Pillow for image processing
- `poppler-utils` - PDF rendering
- `libreoffice` - DOCX to PDF conversion
- `pdf2image` - PDF to image conversion
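As a rough sketch of the resize step, the function below computes the target dimensions for a preview. It assumes previews are bounded by 600x800 px, keep their aspect ratio, and are never upscaled (the behaviour of Pillow's `Image.thumbnail`); the actual generator may differ. `fit_within` is a hypothetical helper.

```python
def fit_within(width: int, height: int, max_w: int = 600, max_h: int = 800) -> tuple:
    """Scale (width, height) down to fit inside max_w x max_h, keeping aspect ratio."""
    # Assumption: images smaller than the bounds are left at their original size.
    scale = min(max_w / width, max_h / height, 1.0)
    return max(1, round(width * scale)), max(1, round(height * scale))
```

For example, a 1200x900 source image would be reduced to 600x450 under these assumptions.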
## Running Tests
### All Tests
```bash
cd /opt/motia-iii/bitbylaw
python -m pytest tests/ -v
```
### Individual Tests
```bash
cd /opt/motia-iii/bitbylaw
python tests/test_xai_collections_api.py
python tests/test_preview_upload.py <document_id>
python tests/test_thumbnail_generation.py
```
## Environment Setup
Create `.env` file in `/opt/motia-iii/bitbylaw/`:
```bash
# xAI Collections API
XAI_MANAGEMENT_KEY=xai-token-xxx...
XAI_API_KEY=xai-xxx...
# EspoCRM API
ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1
ESPOCRM_API_KEY=xxx...
# Redis (for locking)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_ADVOWARE_CACHE=1
```
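Before running the scripts, it can help to check that the required keys are actually set. The sketch below is one way to do that; `missing_env_vars` is a hypothetical helper, and the variable names are the ones the test scripts read from the environment (`XAI_MANAGEMENT_KEY`, `XAI_API_KEY`, `ESPOCRM_API_KEY`).

```python
import os

# Names as read by the test scripts; adjust if your setup differs.
REQUIRED_VARS = ("XAI_MANAGEMENT_KEY", "XAI_API_KEY", "ESPOCRM_API_KEY")


def missing_env_vars(env=os.environ, required=REQUIRED_VARS) -> list:
    """Return the required variables that are unset or empty, in order."""
    return [name for name in required if not env.get(name)]
```

An empty list means all required keys are present.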
## Test Results
Last test run: Successfully validated preview upload functionality
- Preview upload works with base64 Data URI format
- Attachment created with ID: `69a71194c7c6baebf`
- CDokumente entity updated with previewId/previewName
- WebP format at 600x800px confirmed working

279
tests/test_preview_upload.py Executable file

@@ -0,0 +1,279 @@
#!/usr/bin/env python3
"""
Test script: preview image upload to EspoCRM
Tests uploading a preview image (WebP) as an attachment
to a CDokumente entity via the EspoCRM API.
Usage:
python test_preview_upload.py <document_id>
Example:
python test_preview_upload.py 69a68906ac3d0fd25
"""
import asyncio
import aiohttp
import base64
import os
import sys
from io import BytesIO
from PIL import Image
# EspoCRM config (from the environment, with hardcoded defaults for testing)
ESPOCRM_API_BASE_URL = os.getenv('ESPOCRM_API_BASE_URL', 'https://crm.bitbylaw.com/api/v1')
ESPOCRM_API_KEY = os.getenv('ESPOCRM_API_KEY', '')
# Test parameters
ENTITY_TYPE = 'CDokumente'
FIELD_NAME = 'preview'
def generate_test_webp(text: str = "TEST PREVIEW", size: tuple = (600, 800)) -> bytes:
"""
Generate a simple test WebP image
Args:
text: Text rendered into the image
size: Image size as (width, height)
Returns:
WebP image as bytes
"""
print(f"📐 Generating test image ({size[0]}x{size[1]})...")
# Create a plain background image
img = Image.new('RGB', size, color='lightblue')
# Optional: add text (requires PIL ImageDraw)
try:
from PIL import ImageDraw, ImageFont
draw = ImageDraw.Draw(img)
# Try to load a larger font
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 40)
except OSError:
# Font file not available: fall back to Pillow's built-in font
font = ImageFont.load_default()
# Center the text
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
x = (size[0] - text_width) // 2
y = (size[1] - text_height) // 2
draw.text((x, y), text, fill='darkblue', font=font)
except Exception as e:
print(f"⚠️ Text rendering failed: {e}")
# Convert to WebP
buffer = BytesIO()
img.save(buffer, format='WEBP', quality=85)
webp_bytes = buffer.getvalue()
print(f"✅ Test image generated: {len(webp_bytes)} bytes")
return webp_bytes
async def upload_preview_to_espocrm(
document_id: str,
preview_data: bytes,
entity_type: str = 'CDokumente'
) -> dict:
"""
Upload a preview to the EspoCRM Attachment API
Args:
document_id: ID of the CDokumente/Document entity
preview_data: WebP image as bytes
entity_type: Entity type (CDokumente or Document)
Returns:
Response dict containing the attachment ID
"""
print(f"\n📤 Uploading preview to {entity_type}/{document_id}...")
print(f" Preview size: {len(preview_data)} bytes")
# Base64-encode
base64_data = base64.b64encode(preview_data).decode('ascii')
file_data_uri = f"data:image/webp;base64,{base64_data}"
print(f" Base64 encoded: {len(base64_data)} chars")
# API Request
url = ESPOCRM_API_BASE_URL.rstrip('/') + '/Attachment'
headers = {
'X-Api-Key': ESPOCRM_API_KEY,
'Content-Type': 'application/json'
}
payload = {
'name': 'preview.webp',
'type': 'image/webp',
'role': 'Attachment',
'field': FIELD_NAME,
'relatedType': entity_type,
'relatedId': document_id,
'file': file_data_uri
}
print(f"\n🌐 POST {url}")
print(f" Headers: X-Api-Key={ESPOCRM_API_KEY[:4]}...")
print(f" Payload keys: {list(payload.keys())}")
print(f" - name: {payload['name']}")
print(f" - type: {payload['type']}")
print(f" - role: {payload['role']}")
print(f" - field: {payload['field']}")
print(f" - relatedType: {payload['relatedType']}")
print(f" - relatedId: {payload['relatedId']}")
print(f" - file: data:image/webp;base64,... ({len(base64_data)} chars)")
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(url, headers=headers, json=payload) as response:
print(f"\n📥 Response Status: {response.status}")
print(f" Content-Type: {response.content_type}")
response_text = await response.text()
if response.status >= 400:
print(f"\n❌ Upload FAILED!")
print(f" Status: {response.status}")
print(f" Response: {response_text}")
raise Exception(f"Upload error {response.status}: {response_text}")
# Parse JSON response
result = await response.json()
attachment_id = result.get('id')
print(f"\n✅ Upload SUCCESSFUL!")
print(f" Attachment ID: {attachment_id}")
print(f" Full response: {result}")
return result
async def update_entity_with_preview(
document_id: str,
attachment_id: str,
entity_type: str = 'CDokumente'
) -> dict:
"""
Update the entity with previewId and previewName
Args:
document_id: Entity ID
attachment_id: Attachment ID returned by the upload
entity_type: Entity type
Returns:
Updated entity data
"""
print(f"\n📝 Updating {entity_type}/{document_id} with previewId...")
url = f"{ESPOCRM_API_BASE_URL.rstrip('/')}/{entity_type}/{document_id}"
headers = {
'X-Api-Key': ESPOCRM_API_KEY,
'Content-Type': 'application/json'
}
payload = {
'previewId': attachment_id,
'previewName': 'preview.webp'
}
print(f" PUT {url}")
print(f" Payload: {payload}")
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.put(url, headers=headers, json=payload) as response:
print(f" Response Status: {response.status}")
if response.status >= 400:
response_text = await response.text()
print(f"\n❌ Update FAILED!")
print(f" Status: {response.status}")
print(f" Response: {response_text}")
raise Exception(f"Update error {response.status}: {response_text}")
result = await response.json()
print(f"\n✅ Entity updated successfully!")
print(f" previewId: {result.get('previewId')}")
print(f" previewName: {result.get('previewName')}")
return result
async def main():
"""Main test flow"""
print("=" * 80)
print("🖼️ ESPOCRM PREVIEW UPLOAD TEST")
print("=" * 80)
# Check arguments
if len(sys.argv) < 2:
print("\n❌ Error: Document ID required!")
print(f"\nUsage: {sys.argv[0]} <document_id>")
print(f"Example: {sys.argv[0]} 69a68906ac3d0fd25")
sys.exit(1)
document_id = sys.argv[1]
# Check API key
if not ESPOCRM_API_KEY:
print("\n❌ Error: ESPOCRM_API_KEY environment variable not set!")
sys.exit(1)
print(f"\n📋 Test Parameters:")
print(f" API Base URL: {ESPOCRM_API_BASE_URL}")
print(f" API Key: {ESPOCRM_API_KEY[:4]}...")
print(f" Entity Type: {ENTITY_TYPE}")
print(f" Document ID: {document_id}")
print(f" Field: {FIELD_NAME}")
try:
# Step 1: Generate test image
print("\n" + "=" * 80)
print("STEP 1: Generate Test Image")
print("=" * 80)
preview_data = generate_test_webp(f"Preview Test\n{document_id[:8]}", size=(600, 800))
# Step 2: Upload to EspoCRM
print("\n" + "=" * 80)
print("STEP 2: Upload to EspoCRM Attachment API")
print("=" * 80)
result = await upload_preview_to_espocrm(document_id, preview_data, ENTITY_TYPE)
attachment_id = result.get('id')
# Step 3: Update Entity
print("\n" + "=" * 80)
print("STEP 3: Update Entity with Preview Reference")
print("=" * 80)
await update_entity_with_preview(document_id, attachment_id, ENTITY_TYPE)
# Success summary
print("\n" + "=" * 80)
print("✅ TEST SUCCESSFUL!")
print("=" * 80)
print(f"\n📊 Summary:")
print(f" - Attachment ID: {attachment_id}")
print(f" - Entity: {ENTITY_TYPE}/{document_id}")
print(f" - Preview Size: {len(preview_data)} bytes")
print(f"\n🔗 View in EspoCRM:")
print(f" {ESPOCRM_API_BASE_URL.replace('/api/v1', '')}/#CDokumente/view/{document_id}")
except Exception as e:
print("\n" + "=" * 80)
print("❌ TEST FAILED!")
print("=" * 80)
print(f"\nError: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
asyncio.run(main())


@@ -0,0 +1,253 @@
#!/usr/bin/env python3
"""
Test script for Document Thumbnail Generation
Tests the complete flow:
1. Create a test document in EspoCRM
2. Upload a file attachment
3. Trigger the webhook (or wait for automatic trigger)
4. Verify preview generation
"""
import asyncio
import aiohttp
import os
import sys
import json
from pathlib import Path
from io import BytesIO
from PIL import Image
# Add the project root (bitbylaw) to the import path; this script lives in tests/
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from services.espocrm import EspoCRMAPI
async def create_test_image(width: int = 800, height: int = 600) -> bytes:
"""Create a simple test PNG image"""
img = Image.new('RGB', (width, height), color='lightblue')
# Add some text/pattern so it's not just a solid color
from PIL import ImageDraw, ImageFont
draw = ImageDraw.Draw(img)
# Draw some shapes
draw.rectangle([50, 50, width-50, height-50], outline='darkblue', width=5)
draw.ellipse([width//4, height//4, 3*width//4, 3*height//4], outline='red', width=3)
# Add text
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 48)
except OSError:
font = None
text = "TEST IMAGE\nFor Thumbnail\nGeneration"
draw.text((width//2, height//2), text, fill='black', anchor='mm', font=font, align='center')
# Save to bytes
buffer = BytesIO()
img.save(buffer, format='PNG')
return buffer.getvalue()
async def create_test_document(espocrm: EspoCRMAPI) -> str:
"""Create a test document in EspoCRM"""
print("\n📄 Creating test document in EspoCRM...")
document_data = {
"name": f"Test Thumbnail Generation {asyncio.get_event_loop().time()}",
"status": "Active",
"dateiStatus": "Neu", # This should trigger preview generation
"type": "Image",
"description": "Automated test document for thumbnail generation"
}
result = await espocrm.create_entity("Document", document_data)
doc_id = result.get("id")
print(f"✅ Document created: {doc_id}")
print(f" Name: {result.get('name')}")
print(f" Datei-Status: {result.get('dateiStatus')}")
return doc_id
async def upload_test_file(espocrm: EspoCRMAPI, doc_id: str) -> str:
"""Upload a test image file to the document"""
print(f"\n📤 Uploading test image to document {doc_id}...")
# Create test image
image_data = await create_test_image(1200, 900)
print(f" Generated test image: {len(image_data)} bytes")
# Upload to EspoCRM
attachment = await espocrm.upload_attachment(
file_content=image_data,
filename="test_image.png",
parent_type="Document",
parent_id=doc_id,
field="file",
mime_type="image/png",
role="Attachment"
)
attachment_id = attachment.get("id")
print(f"✅ File uploaded: {attachment_id}")
print(f" Filename: {attachment.get('name')}")
print(f" Size: {attachment.get('size')} bytes")
return attachment_id
async def trigger_webhook(doc_id: str, action: str = "update"):
"""Manually trigger the document webhook"""
print(f"\n🔔 Triggering webhook for document {doc_id}...")
webhook_url = f"http://localhost:7777/vmh/webhook/document/{action}"
payload = {
"entityType": "Document",
"entity": {
"id": doc_id,
"entityType": "Document"
},
"data": {
"entity": {
"id": doc_id
}
}
}
async with aiohttp.ClientSession() as session:
async with session.post(webhook_url, json=payload) as response:
status = response.status
text = await response.text()
if status == 200:
print(f"✅ Webhook triggered successfully")
print(f" Response: {text}")
else:
print(f"❌ Webhook failed: {status}")
print(f" Response: {text}")
return status == 200
async def check_preview_generated(espocrm: EspoCRMAPI, doc_id: str, max_wait: int = 30):
"""Check if preview was generated (poll for a few seconds)"""
print(f"\n🔍 Checking for preview generation (max {max_wait}s)...")
for i in range(max_wait):
await asyncio.sleep(1)
# Get document
doc = await espocrm.get_entity("Document", doc_id)
# Check if preview field is populated
preview_id = doc.get("previewId")
if preview_id:
print(f"\n✅ Preview generated!")
print(f" Preview Attachment ID: {preview_id}")
print(f" Preview Name: {doc.get('previewName')}")
print(f" Preview Type: {doc.get('previewType')}")
# Try to download and check the preview
try:
preview_data = await espocrm.download_attachment(preview_id)
print(f" Preview Size: {len(preview_data)} bytes")
# Verify it's a WebP image
from PIL import Image
img = Image.open(BytesIO(preview_data))
print(f" Preview Format: {img.format}")
print(f" Preview Dimensions: {img.width}x{img.height}")
if img.format == "WEBP":
print(" ✅ Format is WebP as expected")
if img.width <= 600 and img.height <= 800:
print(" ✅ Dimensions within expected range")
except Exception as e:
print(f" ⚠️ Could not verify preview: {e}")
return True
if (i + 1) % 5 == 0:
print(f" Still waiting... ({i + 1}s)")
print(f"\n❌ Preview not generated after {max_wait}s")
return False
async def cleanup_test_document(espocrm: EspoCRMAPI, doc_id: str):
"""Delete the test document"""
print(f"\n🗑️ Cleaning up test document {doc_id}...")
try:
await espocrm.delete_entity("Document", doc_id)
print("✅ Test document deleted")
except Exception as e:
print(f"⚠️ Could not delete test document: {e}")
async def main():
print("=" * 80)
print("THUMBNAIL GENERATION TEST")
print("=" * 80)
# Initialize EspoCRM API
espocrm = EspoCRMAPI()
doc_id = None
try:
# Step 1: Create test document
doc_id = await create_test_document(espocrm)
# Step 2: Upload test file
attachment_id = await upload_test_file(espocrm, doc_id)
# Step 3: Update document to trigger webhook (set dateiStatus to trigger sync)
print(f"\n🔄 Updating document to trigger webhook...")
await espocrm.update_entity("Document", doc_id, {
"dateiStatus": "Neu" # This should trigger the webhook
})
print("✅ Document updated")
# Step 4: Wait a bit for webhook to be processed
print("\n⏳ Waiting 3 seconds for webhook processing...")
await asyncio.sleep(3)
# Step 5: Check if preview was generated
success = await check_preview_generated(espocrm, doc_id, max_wait=20)
# Summary
print("\n" + "=" * 80)
if success:
print("✅ TEST PASSED - Preview generation successful!")
else:
print("❌ TEST FAILED - Preview was not generated")
print("\nCheck logs with:")
print(" sudo journalctl -u motia.service --since '2 minutes ago' | grep -E '(PREVIEW|Document)'")
print("=" * 80)
# Ask if we should clean up
print(f"\nTest document ID: {doc_id}")
cleanup = input("\nDelete test document? (y/N): ").strip().lower()
if cleanup == 'y':
await cleanup_test_document(espocrm, doc_id)
else:
print(f" Test document kept: {doc_id}")
print(f" View in EspoCRM: https://crm.bitbylaw.com/#Document/view/{doc_id}")
except Exception as e:
print(f"\n❌ Test failed with error: {e}")
import traceback
traceback.print_exc()
if doc_id:
print(f"\nTest document ID: {doc_id}")
cleanup = input("\nDelete test document? (y/N): ").strip().lower()
if cleanup == 'y':
await cleanup_test_document(espocrm, doc_id)
if __name__ == "__main__":
asyncio.run(main())

788
tests/test_xai_collections_api.py Executable file

@@ -0,0 +1,788 @@
#!/usr/bin/env python3
"""
xAI Collections API Test Script
Tests all critical operations for our document sync requirements:
1. File upload and ID behavior (collection-specific vs global?)
2. Same file in multiple collections (shared file_id?)
3. CRUD operations on collections
4. CRUD operations on documents
5. Response structures and metadata
6. Update/versioning behavior
Usage:
export XAI_MANAGEMENT_KEY="xai-token-..."
export XAI_API_KEY="xai-..."
python test_xai_collections_api.py
"""
import os
import sys
import json
import asyncio
import aiohttp
from typing import Optional, Dict, Any, List
from datetime import datetime
import tempfile
# Configuration
XAI_MANAGEMENT_URL = os.getenv("XAI_MANAGEMENT_URL", "https://management-api.x.ai")
XAI_FILES_URL = os.getenv("XAI_FILES_URL", "https://api.x.ai")
XAI_MANAGEMENT_KEY = os.getenv("XAI_MANAGEMENT_KEY", "") # Management API Key
XAI_API_KEY = os.getenv("XAI_API_KEY", "") # Regular API Key for file upload
if not XAI_MANAGEMENT_KEY:
print("❌ ERROR: XAI_MANAGEMENT_KEY environment variable not set!")
print(" export XAI_MANAGEMENT_KEY='xai-token-...'")
sys.exit(1)
if not XAI_API_KEY:
print("❌ ERROR: XAI_API_KEY environment variable not set!")
print(" export XAI_API_KEY='xai-...'")
sys.exit(1)
class Colors:
"""ANSI color codes for terminal output"""
HEADER = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
END = '\033[0m'
def print_header(text: str):
print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}")
print(f"{Colors.BOLD}{Colors.CYAN}{text}{Colors.END}")
print(f"{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}\n")
def print_success(text: str):
print(f"{Colors.GREEN}{text}{Colors.END}")
def print_error(text: str):
print(f"{Colors.RED}{text}{Colors.END}")
def print_info(text: str):
print(f"{Colors.BLUE} {text}{Colors.END}")
def print_warning(text: str):
print(f"{Colors.YELLOW}⚠️ {text}{Colors.END}")
def print_json(data: Any, title: Optional[str] = None):
if title:
print(f"{Colors.BOLD}{title}:{Colors.END}")
print(json.dumps(data, indent=2, ensure_ascii=False))
class XAICollectionsTestClient:
"""Test client for xAI Collections API"""
def __init__(self):
self.management_url = XAI_MANAGEMENT_URL
self.files_url = XAI_FILES_URL
self.management_key = XAI_MANAGEMENT_KEY
self.api_key = XAI_API_KEY
self.session: Optional[aiohttp.ClientSession] = None
# Test state
self.created_collections: List[str] = []
self.uploaded_files: List[str] = []
self.test_results: Dict[str, bool] = {}
async def __aenter__(self):
# Session without default Content-Type (set per-request)
self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30)
)
return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
async def _request(self, method: str, path: str, use_files_api: bool = False, **kwargs) -> tuple[int, Any]:
"""Make HTTP request and return (status, response_data)"""
base_url = self.files_url if use_files_api else self.management_url
url = f"{base_url}{path}"
# Set headers per-request
if 'headers' not in kwargs:
kwargs['headers'] = {}
# Set authorization
if use_files_api:
kwargs['headers']['Authorization'] = f"Bearer {self.api_key}"
else:
kwargs['headers']['Authorization'] = f"Bearer {self.management_key}"
# Set Content-Type for JSON requests
if 'json' in kwargs:
kwargs['headers']['Content-Type'] = 'application/json'
print_info(f"{method} {url}")
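# Mask the bearer token so API keys do not end up in logs
safe_headers = {k: ("<redacted>" if k == "Authorization" else v) for k, v in kwargs["headers"].items()}
print_info(f"Headers: {safe_headers}")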
try:
async with self.session.request(method, url, **kwargs) as response:
status = response.status
try:
data = await response.json()
except (aiohttp.ContentTypeError, ValueError):
text = await response.text()
data = {"_raw_text": text} if text else {}
if status < 400:
print_success(f"Response: {status}")
else:
print_error(f"Response: {status}")
return status, data
except Exception as e:
print_error(f"Request failed: {e}")
return 0, {"error": str(e)}
# ========================================================================
# COLLECTION OPERATIONS
# ========================================================================
async def create_collection(self, name: str, metadata: Optional[Dict] = None) -> tuple[int, Any]:
"""POST /v1/collections"""
payload = {
"collection_name": name, # xAI uses "collection_name" not "name"
"metadata": metadata or {}
}
status, data = await self._request("POST", "/v1/collections", json=payload)
if status == 200 or status == 201:
# Try different possible field names for collection ID
collection_id = data.get("id") or data.get("collection_id") or data.get("collectionId")
if collection_id:
self.created_collections.append(collection_id)
print_success(f"Created collection: {collection_id}")
return status, data
async def get_collection(self, collection_id: str) -> tuple[int, Any]:
"""GET /v1/collections/{collection_id}"""
return await self._request("GET", f"/v1/collections/{collection_id}")
async def list_collections(self) -> tuple[int, Any]:
"""GET /v1/collections"""
return await self._request("GET", "/v1/collections")
async def update_collection(self, collection_id: str, name: Optional[str] = None,
metadata: Optional[Dict] = None) -> tuple[int, Any]:
"""PUT /v1/collections/{collection_id}"""
payload = {}
if name:
payload["collection_name"] = name # xAI uses "collection_name"
if metadata:
payload["metadata"] = metadata
return await self._request("PUT", f"/v1/collections/{collection_id}", json=payload)
async def delete_collection(self, collection_id: str) -> tuple[int, Any]:
"""DELETE /v1/collections/{collection_id}"""
status, data = await self._request("DELETE", f"/v1/collections/{collection_id}")
if status == 200 or status == 204:
if collection_id in self.created_collections:
self.created_collections.remove(collection_id)
return status, data
# ========================================================================
# FILE OPERATIONS (multiple upload methods)
# ========================================================================
async def upload_file_multipart(self, content: bytes, filename: str,
mime_type: str = "text/plain") -> tuple[int, Any]:
"""
Method 0: Multipart form-data upload (what the server actually expects!)
POST /v1/files with multipart/form-data
"""
print_info("METHOD 0: Multipart Form-Data Upload (POST /v1/files)")
# Create multipart form data
form = aiohttp.FormData()
form.add_field('file', content, filename=filename, content_type=mime_type)
print_info(f"Uploading {len(content)} bytes as multipart/form-data")
# Send the form directly rather than through _request()
base_url = self.files_url
url = f"{base_url}/v1/files"
headers = {
"Authorization": f"Bearer {self.api_key}"
# Do NOT set Content-Type - aiohttp will set it with boundary
}
print_info(f"POST {url}")
# Mask the bearer token so API keys do not end up in logs
print_info("Headers: {'Authorization': 'Bearer <redacted>'}")
try:
async with self.session.request("POST", url, data=form, headers=headers) as response:
status = response.status
try:
data = await response.json()
except (aiohttp.ContentTypeError, ValueError):
text = await response.text()
data = {"_raw_text": text} if text else {}
if status < 400:
print_success(f"Response: {status}")
else:
print_error(f"Response: {status}")
return status, data
except Exception as e:
print_error(f"Request failed: {e}")
return 0, {"error": str(e)}
async def upload_file_direct(self, content: bytes, filename: str,
mime_type: str = "text/plain") -> tuple[int, Any]:
"""
Method 1: Direct upload to xAI Files API
POST /v1/files with JSON body containing base64-encoded data
"""
import base64
print_info("METHOD 1: Direct Upload (POST /v1/files with JSON)")
# Encode file content as base64
data_b64 = base64.b64encode(content).decode('ascii')
payload = {
"name": filename,
"content_type": mime_type,
"data": data_b64
}
print_info(f"Uploading {len(content)} bytes as base64 ({len(data_b64)} chars)")
status, data = await self._request(
"POST",
"/v1/files",
use_files_api=True,
json=payload
)
return status, data
async def upload_file_chunked(self, content: bytes, filename: str,
mime_type: str = "text/plain") -> tuple[int, Any]:
"""
Method 2: Initialize + Chunk streaming upload
POST /v1/files:initialize → POST /v1/files:uploadChunks
"""
import base64
print_info("METHOD 2: Initialize + Chunk Streaming")
# Step 1: Initialize upload
print_info("Step 1: Initialize upload")
init_payload = {
"name": filename,
"content_type": mime_type
}
status, data = await self._request(
"POST",
"/v1/files:initialize",
use_files_api=True,
json=init_payload
)
print_json(data, "Initialize Response")
if status not in [200, 201]:
print_error("Failed to initialize upload")
return status, data
file_id = data.get("file_id")
if not file_id:
print_error("No file_id in initialize response")
return status, data
print_success(f"Initialized upload with file_id: {file_id}")
# Step 2: Upload chunks
print_info(f"Step 2: Upload {len(content)} bytes in chunks")
# Encode content as base64 for chunk upload
chunk_b64 = base64.b64encode(content).decode('ascii')
chunk_payload = {
"file_id": file_id,
"chunk": chunk_b64
}
status, data = await self._request(
"POST",
"/v1/files:uploadChunks",
use_files_api=True,
json=chunk_payload
)
print_json(data, "Upload Chunks Response")
if status in [200, 201]:
print_success(f"Uploaded file chunks: {file_id}")
self.uploaded_files.append(file_id)
return status, data
async def upload_file(self, content: bytes, filename: str,
mime_type: str = "text/plain") -> tuple[int, Any]:
"""
Try multiple upload methods until one succeeds
"""
print_info("Trying upload methods...")
# Try Method 0: Multipart form-data (what the server really wants!)
status0, data0 = await self.upload_file_multipart(content, filename, mime_type)
if status0 in [200, 201]:
file_id = data0.get("id") or data0.get("file_id") # Try both field names
if file_id:
self.uploaded_files.append(file_id)
print_success(f"✅ Multipart upload succeeded: {file_id}")
return status0, data0
else:
print_error("No 'id' or 'file_id' in response")
print_json(data0, "Response data")
print_warning(f"Multipart upload failed ({status0}), trying JSON upload...")
# Try Method 1: Direct upload with JSON
status1, data1 = await self.upload_file_direct(content, filename, mime_type)
if status1 in [200, 201]:
file_id = data1.get("id") or data1.get("file_id") # Try both field names
if file_id:
self.uploaded_files.append(file_id)
print_success(f"✅ Direct upload succeeded: {file_id}")
return status1, data1
print_warning(f"Direct upload failed ({status1}), trying chunked upload...")
# Try Method 2: Initialize + Chunks
status2, data2 = await self.upload_file_chunked(content, filename, mime_type)
if status2 in [200, 201]:
print_success("✅ Chunked upload succeeded")
return status2, data2
print_error("❌ All upload methods failed")
return status0, data0 # Return multipart method's error
# ========================================================================
# COLLECTION DOCUMENT OPERATIONS
# ========================================================================
async def add_document_to_collection(self, collection_id: str,
file_id: str) -> tuple[int, Any]:
"""POST /v1/collections/{collection_id}/documents/{file_id}"""
return await self._request("POST",
f"/v1/collections/{collection_id}/documents/{file_id}")
async def get_collection_documents(self, collection_id: str) -> tuple[int, Any]:
"""GET /v1/collections/{collection_id}/documents"""
return await self._request("GET",
f"/v1/collections/{collection_id}/documents")
async def get_collection_document(self, collection_id: str,
file_id: str) -> tuple[int, Any]:
"""GET /v1/collections/{collection_id}/documents/{file_id}"""
return await self._request("GET",
f"/v1/collections/{collection_id}/documents/{file_id}")
async def update_collection_document(self, collection_id: str, file_id: str,
metadata: Dict) -> tuple[int, Any]:
"""PATCH /v1/collections/{collection_id}/documents/{file_id}"""
return await self._request("PATCH",
f"/v1/collections/{collection_id}/documents/{file_id}",
json={"metadata": metadata})
async def remove_document_from_collection(self, collection_id: str,
file_id: str) -> tuple[int, Any]:
"""DELETE /v1/collections/{collection_id}/documents/{file_id}"""
return await self._request("DELETE",
f"/v1/collections/{collection_id}/documents/{file_id}")
async def batch_get_documents(self, collection_id: str,
file_ids: List[str]) -> tuple[int, Any]:
"""GET /v1/collections/{collection_id}/documents:batchGet"""
params = {"fileIds": ",".join(file_ids)}
return await self._request("GET",
f"/v1/collections/{collection_id}/documents:batchGet",
params=params)
# ========================================================================
# TEST SCENARIOS
# ========================================================================
async def test_basic_collection_crud(self):
"""Test 1: Basic Collection CRUD operations"""
print_header("TEST 1: Basic Collection CRUD")
# Create
print_info("Creating collection...")
status, data = await self.create_collection(
name="Test Collection 1",
metadata={"test": True, "purpose": "API testing"}
)
print_json(data, "Response")
if status not in [200, 201]:
print_error("Failed to create collection")
self.test_results["collection_crud"] = False
return None
# Try different possible field names for collection ID
collection_id = data.get("id") or data.get("collection_id") or data.get("collectionId")
if not collection_id:
print_error("No collection ID field in response")
print_json(data, "Response Data")
self.test_results["collection_crud"] = False
return None
print_success(f"Collection created: {collection_id}")
# Read
print_info("Reading collection...")
status, data = await self.get_collection(collection_id)
print_json(data, "Response")
# Update
print_info("Updating collection...")
status, data = await self.update_collection(
collection_id,
name="Test Collection 1 (Updated)",
metadata={"test": True, "updated": True}
)
print_json(data, "Response")
self.test_results["collection_crud"] = True
return collection_id

    async def test_file_upload_and_structure(self, collection_id: str):
        """Test 2: File upload (two-step process)"""
        print_header("TEST 2: File Upload (Two-Step) & Response Structure")

        # Create test file content
        test_content = b"""
This is a test document for xAI Collections API testing.
Topic: German Contract Law
Key Points:
- Contracts require offer and acceptance
- Consideration is necessary
- Written form may be required for certain contracts
This document contains sufficient content for testing.
"""

        # STEP 1: Upload file to Files API
        print_info("STEP 1: Uploading file to Files API (api.x.ai)...")
        status, data = await self.upload_file(
            content=test_content,
            filename="test_document.txt",
            mime_type="text/plain"
        )
        print_json(data, "Files API Upload Response")
        if status not in [200, 201]:
            print_error("File upload to Files API failed")
            self.test_results["file_upload"] = False
            return None

        # Try both field names: 'id' (Files API) or 'file_id' (Collections API)
        file_id = data.get("id") or data.get("file_id")
        if not file_id:
            print_error("No 'id' or 'file_id' field in response")
            print_json(data, "Response for debugging")
            self.test_results["file_upload"] = False
            return None
        print_success(f"File uploaded to Files API: {file_id}")

        # STEP 2: Add file to collection using Management API
        print_info("STEP 2: Adding file to collection (management-api.x.ai)...")
        status2, data2 = await self.add_document_to_collection(collection_id, file_id)
        print_json(data2, "Add to Collection Response")
        if status2 not in [200, 201]:
            print_error("Failed to add file to collection")
            self.test_results["file_upload"] = False
            return None
        print_success(f"File added to collection: {file_id}")

        self.test_results["file_upload"] = True
        return file_id

    async def test_document_in_collection(self, collection_id: str, file_id: str):
        """Test 3: Verify document is in collection and get details"""
        print_header("TEST 3: Verify Document in Collection")

        # Verify by listing documents
        print_info("Listing collection documents...")
        status, data = await self.get_collection_documents(collection_id)
        print_json(data, "Collection Documents")
        if status not in [200, 201]:
            print_error("Failed to list documents")
            self.test_results["add_to_collection"] = False
            return False

        # Get specific document
        print_info("Getting specific document...")
        status, data = await self.get_collection_document(collection_id, file_id)
        print_json(data, "Document Details")
        if status not in [200, 201]:
            print_error("Failed to get document details")
            self.test_results["add_to_collection"] = False
            return False

        print_success("Document verified in collection")
        self.test_results["add_to_collection"] = True
        return True

    async def test_shared_file_across_collections(self, file_id: str):
        """Test 4: CRITICAL - Can same file_id be used in multiple collections?"""
        print_header("TEST 4: Shared File Across Collections (CRITICAL)")

        # Create second collection
        print_info("Creating second collection...")
        status, data = await self.create_collection(
            name="Test Collection 2",
            metadata={"test": True, "purpose": "Multi-collection test"}
        )
        if status not in [200, 201]:
            print_error("Failed to create second collection")
            self.test_results["shared_file"] = False
            return

        # Accept the same ID field names as in Test 1
        collection2_id = data.get("collection_id") or data.get("id") or data.get("collectionId")
        if not collection2_id:
            print_error("No collection ID field in response")
            print_json(data, "Response Data")
            self.test_results["shared_file"] = False
            return
        print_success(f"Collection 2 created: {collection2_id}")

        # Try to add SAME file_id to second collection
        print_info(f"Adding SAME file_id {file_id} to collection 2...")
        status, data = await self.add_document_to_collection(collection2_id, file_id)
        print_json(data, "Response from adding existing file_id to second collection")
        if status not in [200, 201]:
            print_error("Failed to add same file to second collection")
            print_warning("⚠️ Files might be collection-specific (BAD for our use case)")
            self.test_results["shared_file"] = False
            return
        print_success("✅ SAME FILE_ID CAN BE USED IN MULTIPLE COLLECTIONS!")
        print_success("✅ This is PERFECT for our architecture!")

        # Verify both collections have the file
        print_info("Verifying file in both collections...")
        status1, data1 = await self.get_collection_documents(self.created_collections[0])
        status2, data2 = await self.get_collection_documents(collection2_id)
        print_json(data1, "Collection 1 Documents")
        print_json(data2, "Collection 2 Documents")
        if status1 not in [200, 201] or status2 not in [200, 201]:
            print_error("Failed to list documents for verification")
            self.test_results["shared_file"] = False
            return

        # Extract file_ids from both collections to verify they match
        docs1 = data1.get("documents", [])
        docs2 = data2.get("documents", [])
        file_ids_1 = [d.get("file_metadata", {}).get("file_id") for d in docs1]
        file_ids_2 = [d.get("file_metadata", {}).get("file_id") for d in docs2]
        if file_id in file_ids_1 and file_id in file_ids_2:
            print_success(f"✅ CONFIRMED: file_id {file_id} is IDENTICAL in both collections!")
            print_info(" → We can store ONE xaiFileId per document!")
            print_info(" → Simply track which collections contain it!")
            self.test_results["shared_file"] = True
        else:
            # Record the failure so the summary never omits this critical test
            print_error(f"file_id {file_id} was not listed in both collections")
            self.test_results["shared_file"] = False

    async def test_document_update(self, collection_id: str, file_id: str):
        """Test 5: Update document metadata"""
        print_header("TEST 5: Update Document Metadata")

        print_info("Updating document metadata...")
        status, data = await self.update_collection_document(
            collection_id,
            file_id,
            metadata={"updated_at": datetime.now().isoformat(), "version": 2}
        )
        print_json(data, "Update Response")
        if status not in [200, 201]:
            print_error("Failed to update document")
            self.test_results["document_update"] = False
            return

        print_success("Document metadata updated")
        self.test_results["document_update"] = True
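
    async def test_document_removal(self):
        """Test 6: Remove document from collection"""
        print_header("TEST 6: Remove Document from Collection")

        if len(self.created_collections) < 2 or not self.uploaded_files:
            print_warning("Skipping - need at least 2 collections and 1 file")
            return

        collection_id = self.created_collections[0]
        file_id = self.uploaded_files[0]
        print_info(f"Removing file {file_id} from collection {collection_id}...")
        status, data = await self.remove_document_from_collection(collection_id, file_id)
        print_json(data, "Response")
        if status not in [200, 204]:
            print_error("Failed to remove document")
            self.test_results["document_removal"] = False
            return
        print_success("Document removed from collection")

        # Verify removal: the file must no longer appear in the document list
        print_info("Verifying removal...")
        status, data = await self.get_collection_documents(collection_id)
        print_json(data, "Remaining Documents")
        if status not in [200, 201] or not isinstance(data, dict):
            print_error("Failed to list documents after removal")
            self.test_results["document_removal"] = False
            return

        # Same response shape as used in Test 4 (documents[].file_metadata.file_id)
        remaining_ids = [
            d.get("file_metadata", {}).get("file_id")
            for d in data.get("documents", [])
        ]
        if file_id in remaining_ids:
            print_error(f"File {file_id} is still listed in collection {collection_id}")
            self.test_results["document_removal"] = False
            return

        self.test_results["document_removal"] = True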

    async def test_batch_get(self):
        """Test 7: Batch get documents"""
        print_header("TEST 7: Batch Get Documents")

        if not self.created_collections or not self.uploaded_files:
            print_warning("Skipping - need collections and files")
            return

        collection_id = self.created_collections[-1]  # Use last collection
        file_ids = self.uploaded_files

        print_info(f"Batch getting {len(file_ids)} documents...")
        status, data = await self.batch_get_documents(collection_id, file_ids)
        print_json(data, "Batch Response")
        self.test_results["batch_get"] = status in [200, 201]

    async def cleanup(self):
        """Clean up all created test resources"""
        print_header("CLEANUP: Deleting Test Resources")

        # Delete collections (should cascade delete documents?)
        # Iterate over a copy in case delete_collection mutates the list.
        for collection_id in list(self.created_collections):
            print_info(f"Deleting collection {collection_id}...")
            await self.delete_collection(collection_id)

        print_success("Cleanup complete")

    def print_summary(self):
        """Print test results summary"""
        print_header("TEST RESULTS SUMMARY")

        total = len(self.test_results)
        passed = sum(1 for v in self.test_results.values() if v)
        for test_name, result in self.test_results.items():
            status = "✅ PASS" if result else "❌ FAIL"
            print(f"{status} - {test_name}")
        print(f"\n{Colors.BOLD}Total: {passed}/{total} tests passed{Colors.END}\n")

        # Critical findings
        print_header("CRITICAL FINDINGS")
        if "shared_file" in self.test_results:
            if self.test_results["shared_file"]:
                print_success("✅ Same file CAN be used in multiple collections")
                print_info(" → We can use a SINGLE xaiFileId per document!")
                print_info(" → Much simpler architecture!")
            else:
                print_error("❌ Files seem to be collection-specific")
                print_warning(" → More complex mapping required")
                print_warning(" → Each collection might need separate file upload")


async def main():
    """Run all tests"""
    print_header("xAI Collections API Test Suite")
    print_info(f"Management URL: {XAI_MANAGEMENT_URL}")
    print_info(f"Files URL: {XAI_FILES_URL}")
    # Show only the last 4 characters so the keys do not leak into logs
    print_info(f"Management Key: ...{XAI_MANAGEMENT_KEY[-4:]}")
    print_info(f"API Key: ...{XAI_API_KEY[-4:]}")

    async with XAICollectionsTestClient() as client:
        try:
            # Test 1: Basic Collection CRUD
            collection_id = await client.test_basic_collection_crud()
            if not collection_id:
                print_error("Cannot continue without collection. Stopping.")
                return

            # Test 2: File Upload (now two-step process)
            file_id = await client.test_file_upload_and_structure(collection_id)
            if not file_id:
                print_error("File upload failed. Continuing with remaining tests...")
            else:
                # Test 3: Verify document in collection
                await client.test_document_in_collection(collection_id, file_id)
                # Test 4: CRITICAL - Shared file test
                await client.test_shared_file_across_collections(file_id)
                # Test 5: Update document
                await client.test_document_update(collection_id, file_id)

            # Test 6: Remove document
            await client.test_document_removal()
            # Test 7: Batch get
            await client.test_batch_get()

            # Cleanup
            await client.cleanup()
            # Print summary
            client.print_summary()
        except Exception as e:
            print_error(f"Test suite failed: {e}")
            import traceback
            traceback.print_exc()
            # Try cleanup anyway, but report a cleanup failure instead of hiding it
            try:
                await client.cleanup()
            except Exception as cleanup_error:
                print_warning(f"Cleanup failed: {cleanup_error}")


if __name__ == "__main__":
    asyncio.run(main())