Compare commits

..

3 Commits

Author SHA1 Message Date
bsiggel
cb0e170ee9 feat(webhooks): update EspoCRM webhook configurations to use VMH naming and endpoints 2026-03-02 17:34:17 +00:00
bsiggel
bc917bd885 feat(webhooks): add EspoCRM Document Delete and Update webhook handlers with detailed logging
- Implemented document delete webhook handler to process delete notifications from EspoCRM.
- Added detailed logging for incoming payloads, including headers and entity information.
- Extracted and emitted document IDs for further processing.
- Implemented document update webhook handler to handle update notifications from EspoCRM.
- Enhanced logging for update payloads, including changed fields and metadata.
- Both handlers emit events for queue processing to facilitate further actions.

test: add comprehensive xAI Collections API test suite

- Created a test suite for xAI Collections API covering critical operations.
- Included tests for file uploads, CRUD operations on collections and documents, and response structures.
- Verified shared file behavior across multiple collections.
- Implemented cleanup procedures to remove test resources after execution.
2026-03-02 17:16:07 +00:00
bsiggel
0282149613 Fix cron expression in Calendar Sync Cron Job to trigger at second 0 and update entity retrieval methods in Beteiligte Sync steps for consistency 2026-03-02 09:36:09 +00:00
8 changed files with 1369 additions and 7 deletions

View File

@@ -17,7 +17,7 @@ config = {
'description': 'Runs calendar sync automatically every 15 minutes',
'flows': ['advoware-calendar-sync'],
'triggers': [
cron("*/15 * * * * *") # Every 15 minutes at second 0 (6-field: sec min hour day month weekday)
cron("0 */15 * * * *") # Every 15 minutes at second 0 (6-field: sec min hour day month weekday)
],
'enqueues': ['calendar_sync_all']
}

View File

@@ -0,0 +1,6 @@
"""
EspoCRM Generic Webhooks
Empfängt Webhooks von EspoCRM für verschiedene Entities.
Zentrale Anlaufstelle für alle EspoCRM-Events außerhalb VMH-Kontext.
"""

View File

@@ -0,0 +1,198 @@
"""EspoCRM Webhook - Document Create
Empfängt Create-Webhooks von EspoCRM für Documents.
Loggt detailliert alle Payload-Informationen für Analyse.
"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Document Create",
"description": "Empfängt Create-Webhooks von EspoCRM für Document Entities",
"flows": ["vmh-documents"],
"triggers": [
http("POST", "/vmh/webhook/document/create")
],
"enqueues": ["vmh.document.create"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document creation in EspoCRM.
Receives notifications when documents are created and emits queue events
for processing (xAI sync, etc.).
Payload Analysis Mode: Logs comprehensive details about webhook structure.
"""
try:
payload = request.body or []
# ═══════════════════════════════════════════════════════════════
# DETAILLIERTES LOGGING FÜR ANALYSE
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("=" * 80)
ctx.logger.info("📥 EspoCRM DOCUMENT CREATE WEBHOOK EMPFANGEN")
ctx.logger.info("=" * 80)
# Log Request Headers
ctx.logger.info("\n🔍 REQUEST HEADERS:")
if hasattr(request, 'headers'):
for key, value in request.headers.items():
ctx.logger.info(f" {key}: {value}")
else:
ctx.logger.info(" (keine Headers verfügbar)")
# Log Payload Type & Structure
ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
# Log Full Payload (pretty-printed)
ctx.logger.info("\n📄 FULL PAYLOAD:")
ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
# ═══════════════════════════════════════════════════════════════
# PAYLOAD ANALYSE & ID EXTRAKTION
# ═══════════════════════════════════════════════════════════════
entity_ids = set()
payload_details = []
if isinstance(payload, list):
ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
for idx, entity in enumerate(payload):
if isinstance(entity, dict):
entity_id = entity.get('id')
if entity_id:
entity_ids.add(entity_id)
# Sammle Details für Logging
detail = {
'index': idx,
'id': entity_id,
'name': entity.get('name', 'N/A'),
'type': entity.get('type', 'N/A'),
'size': entity.get('size', 'N/A'),
'all_fields': list(entity.keys())
}
payload_details.append(detail)
ctx.logger.info(f"\n 📄 Document #{idx + 1}:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Name: {entity.get('name', 'N/A')}")
ctx.logger.info(f" Type: {entity.get('type', 'N/A')}")
ctx.logger.info(f" Size: {entity.get('size', 'N/A')} bytes")
ctx.logger.info(f" Verfügbare Felder: {', '.join(entity.keys())}")
# xAI-relevante Felder (falls vorhanden)
xai_fields = {k: v for k, v in entity.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
# Parent/Relationship Felder
rel_fields = {k: v for k, v in entity.items()
if 'parent' in k.lower() or 'related' in k.lower() or
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
if rel_fields:
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
elif isinstance(payload, dict):
ctx.logger.info("\n✅ Payload ist SINGLE DICT")
entity_id = payload.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 📄 Document:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Name: {payload.get('name', 'N/A')}")
ctx.logger.info(f" Type: {payload.get('type', 'N/A')}")
ctx.logger.info(f" Size: {payload.get('size', 'N/A')} bytes")
ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}")
# xAI-relevante Felder
xai_fields = {k: v for k, v in payload.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
# Relationship Felder
rel_fields = {k: v for k, v in payload.items()
if 'parent' in k.lower() or 'related' in k.lower() or
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
if rel_fields:
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
else:
ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
# ═══════════════════════════════════════════════════════════════
# QUEUE EVENTS EMITTIEREN
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
ctx.logger.info("=" * 80)
if not entity_ids:
ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'create',
'ids_count': 0,
'warning': 'No document IDs found in payload'
}
)
# Emit events für Queue-Processing (Deduplizierung erfolgt im Event-Handler via Lock)
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.create',
'data': {
'entity_id': entity_id,
'action': 'create',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info(f"✅ Event emittiert: vmh.document.create für ID {entity_id}")
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
ctx.logger.info("=" * 80)
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'create',
'ids_count': len(entity_ids),
'document_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Create Webhooks")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error Type: {type(e).__name__}")
ctx.logger.error(f"Error Message: {str(e)}")
# Log Stack Trace
import traceback
ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")
return ApiResponse(
status=500,
body={
'error': 'Internal server error',
'error_type': type(e).__name__,
'details': str(e)
}
)

View File

@@ -0,0 +1,174 @@
"""EspoCRM Webhook - Document Delete
Empfängt Delete-Webhooks von EspoCRM für Documents.
Loggt detailliert alle Payload-Informationen für Analyse.
"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Document Delete",
"description": "Empfängt Delete-Webhooks von EspoCRM für Document Entities",
"flows": ["vmh-documents"],
"triggers": [
http("POST", "/vmh/webhook/document/delete")
],
"enqueues": ["vmh.document.delete"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document deletion in EspoCRM.
Receives notifications when documents are deleted.
Note: Bei Deletion haben wir ggf. nur die ID, keine vollständigen Entity-Daten.
"""
try:
payload = request.body or []
# ═══════════════════════════════════════════════════════════════
# DETAILLIERTES LOGGING FÜR ANALYSE
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("=" * 80)
ctx.logger.info("📥 EspoCRM DOCUMENT DELETE WEBHOOK EMPFANGEN")
ctx.logger.info("=" * 80)
# Log Request Headers
ctx.logger.info("\n🔍 REQUEST HEADERS:")
if hasattr(request, 'headers'):
for key, value in request.headers.items():
ctx.logger.info(f" {key}: {value}")
else:
ctx.logger.info(" (keine Headers verfügbar)")
# Log Payload Type & Structure
ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
# Log Full Payload (pretty-printed)
ctx.logger.info("\n📄 FULL PAYLOAD:")
ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
# ═══════════════════════════════════════════════════════════════
# PAYLOAD ANALYSE & ID EXTRAKTION
# ═══════════════════════════════════════════════════════════════
entity_ids = set()
if isinstance(payload, list):
ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
for idx, entity in enumerate(payload):
if isinstance(entity, dict):
entity_id = entity.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 🗑️ Document #{idx + 1}:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Verfügbare Felder: {', '.join(entity.keys())}")
# Bei Delete haben wir oft nur minimale Daten
if 'name' in entity:
ctx.logger.info(f" Name: {entity.get('name')}")
if 'deletedAt' in entity or 'deleted' in entity:
ctx.logger.info(f" Deleted At: {entity.get('deletedAt', entity.get('deleted', 'N/A'))}")
# xAI-relevante Felder (falls vorhanden)
xai_fields = {k: v for k, v in entity.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
elif isinstance(payload, dict):
ctx.logger.info("\n✅ Payload ist SINGLE DICT")
entity_id = payload.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 🗑️ Document:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}")
if 'name' in payload:
ctx.logger.info(f" Name: {payload.get('name')}")
if 'deletedAt' in payload or 'deleted' in payload:
ctx.logger.info(f" Deleted At: {payload.get('deletedAt', payload.get('deleted', 'N/A'))}")
# xAI-relevante Felder
xai_fields = {k: v for k, v in payload.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
else:
ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
# ═══════════════════════════════════════════════════════════════
# QUEUE EVENTS EMITTIEREN
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
ctx.logger.info("=" * 80)
if not entity_ids:
ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'delete',
'ids_count': 0,
'warning': 'No document IDs found in payload'
}
)
# Emit events für Queue-Processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.delete',
'data': {
'entity_id': entity_id,
'action': 'delete',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info(f"✅ Event emittiert: vmh.document.delete für ID {entity_id}")
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
ctx.logger.info("=" * 80)
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'delete',
'ids_count': len(entity_ids),
'document_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Delete Webhooks")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error Type: {type(e).__name__}")
ctx.logger.error(f"Error Message: {str(e)}")
import traceback
ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")
return ApiResponse(
status=500,
body={
'error': 'Internal server error',
'error_type': type(e).__name__,
'details': str(e)
}
)

View File

@@ -0,0 +1,196 @@
"""EspoCRM Webhook - Document Update
Empfängt Update-Webhooks von EspoCRM für Documents.
Loggt detailliert alle Payload-Informationen für Analyse.
"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Document Update",
"description": "Empfängt Update-Webhooks von EspoCRM für Document Entities",
"flows": ["vmh-documents"],
"triggers": [
http("POST", "/vmh/webhook/document/update")
],
"enqueues": ["vmh.document.update"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document updates in EspoCRM.
Receives notifications when documents are updated and emits queue events
for processing (xAI sync, etc.).
Note: Loop-Prevention sollte auf EspoCRM-Seite implementiert werden.
xAI-Feld-Updates sollten keine neuen Webhooks triggern.
"""
try:
payload = request.body or []
# ═══════════════════════════════════════════════════════════════
# DETAILLIERTES LOGGING FÜR ANALYSE
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("=" * 80)
ctx.logger.info("📥 EspoCRM DOCUMENT UPDATE WEBHOOK EMPFANGEN")
ctx.logger.info("=" * 80)
# Log Request Headers
ctx.logger.info("\n🔍 REQUEST HEADERS:")
if hasattr(request, 'headers'):
for key, value in request.headers.items():
ctx.logger.info(f" {key}: {value}")
else:
ctx.logger.info(" (keine Headers verfügbar)")
# Log Payload Type & Structure
ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
# Log Full Payload (pretty-printed)
ctx.logger.info("\n📄 FULL PAYLOAD:")
ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
# ═══════════════════════════════════════════════════════════════
# PAYLOAD ANALYSE & ID EXTRAKTION
# ═══════════════════════════════════════════════════════════════
entity_ids = set()
if isinstance(payload, list):
ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
for idx, entity in enumerate(payload):
if isinstance(entity, dict):
entity_id = entity.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 📄 Document #{idx + 1}:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Name: {entity.get('name', 'N/A')}")
ctx.logger.info(f" Modified At: {entity.get('modifiedAt', 'N/A')}")
ctx.logger.info(f" Modified By: {entity.get('modifiedById', 'N/A')}")
ctx.logger.info(f" Verfügbare Felder: {', '.join(entity.keys())}")
# Prüfe ob CHANGED fields mitgeliefert werden
changed_fields = entity.get('changedFields') or entity.get('changed') or entity.get('modifiedFields')
if changed_fields:
ctx.logger.info(f" 🔄 Geänderte Felder: {json.dumps(changed_fields, ensure_ascii=False)}")
# xAI-relevante Felder
xai_fields = {k: v for k, v in entity.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
# Relationship Felder
rel_fields = {k: v for k, v in entity.items()
if 'parent' in k.lower() or 'related' in k.lower() or
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
if rel_fields:
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
elif isinstance(payload, dict):
ctx.logger.info("\n✅ Payload ist SINGLE DICT")
entity_id = payload.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 📄 Document:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Name: {payload.get('name', 'N/A')}")
ctx.logger.info(f" Modified At: {payload.get('modifiedAt', 'N/A')}")
ctx.logger.info(f" Modified By: {payload.get('modifiedById', 'N/A')}")
ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}")
# Geänderte Felder
changed_fields = payload.get('changedFields') or payload.get('changed') or payload.get('modifiedFields')
if changed_fields:
ctx.logger.info(f" 🔄 Geänderte Felder: {json.dumps(changed_fields, ensure_ascii=False)}")
# xAI-relevante Felder
xai_fields = {k: v for k, v in payload.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
# Relationship Felder
rel_fields = {k: v for k, v in payload.items()
if 'parent' in k.lower() or 'related' in k.lower() or
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
if rel_fields:
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
else:
ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
# ═══════════════════════════════════════════════════════════════
# QUEUE EVENTS EMITTIEREN
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
ctx.logger.info("=" * 80)
if not entity_ids:
ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'update',
'ids_count': 0,
'warning': 'No document IDs found in payload'
}
)
# Emit events für Queue-Processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.update',
'data': {
'entity_id': entity_id,
'action': 'update',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info(f"✅ Event emittiert: vmh.document.update für ID {entity_id}")
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
ctx.logger.info("=" * 80)
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'update',
'ids_count': len(entity_ids),
'document_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Update Webhooks")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error Type: {type(e).__name__}")
ctx.logger.error(f"Error Message: {str(e)}")
import traceback
ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")
return ApiResponse(
status=500,
body={
'error': 'Internal server error',
'error_type': type(e).__name__,
'details': str(e)
}
)

View File

@@ -54,7 +54,7 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext):
]
}
unclean_result = await espocrm.search_entities('CBeteiligte', unclean_filter, max_size=100)
unclean_result = await espocrm.list_entities('CBeteiligte', where=unclean_filter['where'], max_size=100)
unclean_entities = unclean_result.get('list', [])
ctx.logger.info(f"📊 Gefunden: {len(unclean_entities)} Entities mit Status pending/dirty/failed")
@@ -73,7 +73,7 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext):
]
}
reset_result = await espocrm.search_entities('CBeteiligte', permanently_failed_filter, max_size=50)
reset_result = await espocrm.list_entities('CBeteiligte', where=permanently_failed_filter['where'], max_size=50)
reset_entities = reset_result.get('list', [])
# Reset permanently_failed entities
@@ -111,7 +111,7 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext):
]
}
stale_result = await espocrm.search_entities('CBeteiligte', stale_filter, max_size=50)
stale_result = await espocrm.list_entities('CBeteiligte', where=stale_filter['where'], max_size=50)
stale_entities = stale_result.get('list', [])
ctx.logger.info(f"📊 Gefunden: {len(stale_entities)} Entities mit veraltetem Sync (> 24h)")

View File

@@ -37,9 +37,9 @@ config = {
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
"""Zentraler Sync-Handler für Beteiligte"""
entity_id = event_data.entity_id
action = event_data.action
source = event_data.source
entity_id = event_data.get('entity_id')
action = event_data.get('action')
source = event_data.get('source')
if not entity_id:
ctx.logger.error("Keine entity_id im Event gefunden")

788
test_xai_collections_api.py Executable file
View File

@@ -0,0 +1,788 @@
#!/usr/bin/env python3
"""
xAI Collections API Test Script
Tests all critical operations for our document sync requirements:
1. File upload and ID behavior (collection-specific vs global?)
2. Same file in multiple collections (shared file_id?)
3. CRUD operations on collections
4. CRUD operations on documents
5. Response structures and metadata
6. Update/versioning behavior
Usage:
export XAI_API_KEY="xai-..."
python test_xai_collections_api.py
"""
import os
import sys
import json
import asyncio
import aiohttp
from typing import Optional, Dict, Any, List
from datetime import datetime
import tempfile
# Configuration
XAI_MANAGEMENT_URL = os.getenv("XAI_MANAGEMENT_URL", "https://management-api.x.ai")
XAI_FILES_URL = os.getenv("XAI_FILES_URL", "https://api.x.ai")
XAI_MANAGEMENT_KEY = os.getenv("XAI_MANAGEMENT_KEY", "") # Management API Key
XAI_API_KEY = os.getenv("XAI_API_KEY", "") # Regular API Key for file upload
if not XAI_MANAGEMENT_KEY:
print("❌ ERROR: XAI_MANAGEMENT_KEY environment variable not set!")
print(" export XAI_MANAGEMENT_KEY='xai-token-...'")
sys.exit(1)
if not XAI_API_KEY:
print("❌ ERROR: XAI_API_KEY environment variable not set!")
print(" export XAI_API_KEY='xai-...'")
sys.exit(1)
class Colors:
"""ANSI color codes for terminal output"""
HEADER = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
END = '\033[0m'
def print_header(text: str):
print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}")
print(f"{Colors.BOLD}{Colors.CYAN}{text}{Colors.END}")
print(f"{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}\n")
def print_success(text: str):
print(f"{Colors.GREEN}{text}{Colors.END}")
def print_error(text: str):
print(f"{Colors.RED}{text}{Colors.END}")
def print_info(text: str):
print(f"{Colors.BLUE} {text}{Colors.END}")
def print_warning(text: str):
print(f"{Colors.YELLOW}⚠️ {text}{Colors.END}")
def print_json(data: Any, title: Optional[str] = None):
if title:
print(f"{Colors.BOLD}{title}:{Colors.END}")
print(json.dumps(data, indent=2, ensure_ascii=False))
class XAICollectionsTestClient:
"""Test client for xAI Collections API"""
def __init__(self):
self.management_url = XAI_MANAGEMENT_URL
self.files_url = XAI_FILES_URL
self.management_key = XAI_MANAGEMENT_KEY
self.api_key = XAI_API_KEY
self.session: Optional[aiohttp.ClientSession] = None
# Test state
self.created_collections: List[str] = []
self.uploaded_files: List[str] = []
self.test_results: Dict[str, bool] = {}
async def __aenter__(self):
# Session without default Content-Type (set per-request)
self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30)
)
return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
async def _request(self, method: str, path: str, use_files_api: bool = False, **kwargs) -> tuple[int, Any]:
"""Make HTTP request and return (status, response_data)"""
base_url = self.files_url if use_files_api else self.management_url
url = f"{base_url}{path}"
# Set headers per-request
if 'headers' not in kwargs:
kwargs['headers'] = {}
# Set authorization
if use_files_api:
kwargs['headers']['Authorization'] = f"Bearer {self.api_key}"
else:
kwargs['headers']['Authorization'] = f"Bearer {self.management_key}"
# Set Content-Type for JSON requests
if 'json' in kwargs:
kwargs['headers']['Content-Type'] = 'application/json'
print_info(f"{method} {url}")
print_info(f"Headers: {kwargs.get('headers', {})}")
try:
async with self.session.request(method, url, **kwargs) as response:
status = response.status
try:
data = await response.json()
except:
text = await response.text()
data = {"_raw_text": text} if text else {}
if status < 400:
print_success(f"Response: {status}")
else:
print_error(f"Response: {status}")
return status, data
except Exception as e:
print_error(f"Request failed: {e}")
return 0, {"error": str(e)}
# ========================================================================
# COLLECTION OPERATIONS
# ========================================================================
async def create_collection(self, name: str, metadata: Optional[Dict] = None) -> tuple[int, Any]:
"""POST /v1/collections"""
payload = {
"collection_name": name, # xAI uses "collection_name" not "name"
"metadata": metadata or {}
}
status, data = await self._request("POST", "/v1/collections", json=payload)
if status == 200 or status == 201:
# Try different possible field names for collection ID
collection_id = data.get("id") or data.get("collection_id") or data.get("collectionId")
if collection_id:
self.created_collections.append(collection_id)
print_success(f"Created collection: {collection_id}")
return status, data
async def get_collection(self, collection_id: str) -> tuple[int, Any]:
"""GET /v1/collections/{collection_id}"""
return await self._request("GET", f"/v1/collections/{collection_id}")
async def list_collections(self) -> tuple[int, Any]:
"""GET /v1/collections"""
return await self._request("GET", "/v1/collections")
async def update_collection(self, collection_id: str, name: Optional[str] = None,
metadata: Optional[Dict] = None) -> tuple[int, Any]:
"""PUT /v1/collections/{collection_id}"""
payload = {}
if name:
payload["collection_name"] = name # xAI uses "collection_name"
if metadata:
payload["metadata"] = metadata
return await self._request("PUT", f"/v1/collections/{collection_id}", json=payload)
async def delete_collection(self, collection_id: str) -> tuple[int, Any]:
"""DELETE /v1/collections/{collection_id}"""
status, data = await self._request("DELETE", f"/v1/collections/{collection_id}")
if status == 200 or status == 204:
if collection_id in self.created_collections:
self.created_collections.remove(collection_id)
return status, data
# ========================================================================
# FILE OPERATIONS (multiple upload methods)
# ========================================================================
async def upload_file_multipart(self, content: bytes, filename: str,
mime_type: str = "text/plain") -> tuple[int, Any]:
"""
Method 0: Multipart form-data upload (what the server actually expects!)
POST /v1/files with multipart/form-data
"""
print_info("METHOD 0: Multipart Form-Data Upload (POST /v1/files)")
# Create multipart form data
form = aiohttp.FormData()
form.add_field('file', content, filename=filename, content_type=mime_type)
print_info(f"Uploading {len(content)} bytes as multipart/form-data")
# Use _request but with form data instead of json
base_url = self.files_url
url = f"{base_url}/v1/files"
headers = {
"Authorization": f"Bearer {self.api_key}"
# Do NOT set Content-Type - aiohttp will set it with boundary
}
print_info(f"POST {url}")
print_info(f"Headers: {headers}")
try:
async with self.session.request("POST", url, data=form, headers=headers) as response:
status = response.status
try:
data = await response.json()
except:
text = await response.text()
data = {"_raw_text": text} if text else {}
if status < 400:
print_success(f"Response: {status}")
else:
print_error(f"Response: {status}")
return status, data
except Exception as e:
print_error(f"Request failed: {e}")
return 0, {"error": str(e)}
async def upload_file_direct(self, content: bytes, filename: str,
mime_type: str = "text/plain") -> tuple[int, Any]:
"""
Method 1: Direct upload to xAI Files API
POST /v1/files with JSON body containing base64-encoded data
"""
import base64
print_info("METHOD 1: Direct Upload (POST /v1/files with JSON)")
# Encode file content as base64
data_b64 = base64.b64encode(content).decode('ascii')
payload = {
"name": filename,
"content_type": mime_type,
"data": data_b64
}
print_info(f"Uploading {len(content)} bytes as base64 ({len(data_b64)} chars)")
status, data = await self._request(
"POST",
"/v1/files",
use_files_api=True,
json=payload
)
return status, data
async def upload_file_chunked(self, content: bytes, filename: str,
mime_type: str = "text/plain") -> tuple[int, Any]:
"""
Method 2: Initialize + Chunk streaming upload
POST /v1/files:initialize → POST /v1/files:uploadChunks
"""
import base64
print_info("METHOD 2: Initialize + Chunk Streaming")
# Step 1: Initialize upload
print_info("Step 1: Initialize upload")
init_payload = {
"name": filename,
"content_type": mime_type
}
status, data = await self._request(
"POST",
"/v1/files:initialize",
use_files_api=True,
json=init_payload
)
print_json(data, "Initialize Response")
if status not in [200, 201]:
print_error("Failed to initialize upload")
return status, data
file_id = data.get("file_id")
if not file_id:
print_error("No file_id in initialize response")
return status, data
print_success(f"Initialized upload with file_id: {file_id}")
# Step 2: Upload chunks
print_info(f"Step 2: Upload {len(content)} bytes in chunks")
# Encode content as base64 for chunk upload
chunk_b64 = base64.b64encode(content).decode('ascii')
chunk_payload = {
"file_id": file_id,
"chunk": chunk_b64
}
status, data = await self._request(
"POST",
"/v1/files:uploadChunks",
use_files_api=True,
json=chunk_payload
)
print_json(data, "Upload Chunks Response")
if status in [200, 201]:
print_success(f"Uploaded file chunks: {file_id}")
self.uploaded_files.append(file_id)
return status, data
async def upload_file(self, content: bytes, filename: str,
mime_type: str = "text/plain") -> tuple[int, Any]:
"""
Try multiple upload methods until one succeeds
"""
print_info("Trying upload methods...")
# Try Method 0: Multipart form-data (what the server really wants!)
status0, data0 = await self.upload_file_multipart(content, filename, mime_type)
if status0 in [200, 201]:
file_id = data0.get("id") or data0.get("file_id") # Try both field names
if file_id:
self.uploaded_files.append(file_id)
print_success(f"✅ Multipart upload succeeded: {file_id}")
return status0, data0
else:
print_error("No 'id' or 'file_id' in response")
print_json(data0, "Response data")
print_warning(f"Multipart upload failed ({status0}), trying JSON upload...")
# Try Method 1: Direct upload with JSON
status1, data1 = await self.upload_file_direct(content, filename, mime_type)
if status1 in [200, 201]:
file_id = data1.get("file_id")
if file_id:
self.uploaded_files.append(file_id)
print_success(f"✅ Direct upload succeeded: {file_id}")
return status1, data1
print_warning(f"Direct upload failed ({status1}), trying chunked upload...")
# Try Method 2: Initialize + Chunks
status2, data2 = await self.upload_file_chunked(content, filename, mime_type)
if status2 in [200, 201]:
print_success("✅ Chunked upload succeeded")
return status2, data2
print_error("❌ All upload methods failed")
return status0, data0 # Return multipart method's error
# ========================================================================
# COLLECTION DOCUMENT OPERATIONS
# ========================================================================
async def add_document_to_collection(self, collection_id: str,
file_id: str) -> tuple[int, Any]:
"""POST /v1/collections/{collection_id}/documents/{file_id}"""
return await self._request("POST",
f"/v1/collections/{collection_id}/documents/{file_id}")
async def get_collection_documents(self, collection_id: str) -> tuple[int, Any]:
"""GET /v1/collections/{collection_id}/documents"""
return await self._request("GET",
f"/v1/collections/{collection_id}/documents")
async def get_collection_document(self, collection_id: str,
file_id: str) -> tuple[int, Any]:
"""GET /v1/collections/{collection_id}/documents/{file_id}"""
return await self._request("GET",
f"/v1/collections/{collection_id}/documents/{file_id}")
async def update_collection_document(self, collection_id: str, file_id: str,
metadata: Dict) -> tuple[int, Any]:
"""PATCH /v1/collections/{collection_id}/documents/{file_id}"""
return await self._request("PATCH",
f"/v1/collections/{collection_id}/documents/{file_id}",
json={"metadata": metadata})
async def remove_document_from_collection(self, collection_id: str,
file_id: str) -> tuple[int, Any]:
"""DELETE /v1/collections/{collection_id}/documents/{file_id}"""
return await self._request("DELETE",
f"/v1/collections/{collection_id}/documents/{file_id}")
async def batch_get_documents(self, collection_id: str,
file_ids: List[str]) -> tuple[int, Any]:
"""GET /v1/collections/{collection_id}/documents:batchGet"""
params = {"fileIds": ",".join(file_ids)}
return await self._request("GET",
f"/v1/collections/{collection_id}/documents:batchGet",
params=params)
# ========================================================================
# TEST SCENARIOS
# ========================================================================
async def test_basic_collection_crud(self):
"""Test 1: Basic Collection CRUD operations"""
print_header("TEST 1: Basic Collection CRUD")
# Create
print_info("Creating collection...")
status, data = await self.create_collection(
name="Test Collection 1",
metadata={"test": True, "purpose": "API testing"}
)
print_json(data, "Response")
if status not in [200, 201]:
print_error("Failed to create collection")
self.test_results["collection_crud"] = False
return None
# Try different possible field names for collection ID
collection_id = data.get("id") or data.get("collection_id") or data.get("collectionId")
if not collection_id:
print_error("No collection ID field in response")
print_json(data, "Response Data")
self.test_results["collection_crud"] = False
return None
print_success(f"Collection created: {collection_id}")
# Read
print_info("Reading collection...")
status, data = await self.get_collection(collection_id)
print_json(data, "Response")
# Update
print_info("Updating collection...")
status, data = await self.update_collection(
collection_id,
name="Test Collection 1 (Updated)",
metadata={"test": True, "updated": True}
)
print_json(data, "Response")
self.test_results["collection_crud"] = True
return collection_id
async def test_file_upload_and_structure(self, collection_id: str):
"""Test 2: File upload (two-step process)"""
print_header("TEST 2: File Upload (Two-Step) & Response Structure")
# Create test file content
test_content = b"""
This is a test document for xAI Collections API testing.
Topic: German Contract Law
Key Points:
- Contracts require offer and acceptance
- Consideration is necessary
- Written form may be required for certain contracts
This document contains sufficient content for testing.
"""
# STEP 1: Upload file to Files API
print_info("STEP 1: Uploading file to Files API (api.x.ai)...")
status, data = await self.upload_file(
content=test_content,
filename="test_document.txt",
mime_type="text/plain"
)
print_json(data, "Files API Upload Response")
if status not in [200, 201]:
print_error("File upload to Files API failed")
self.test_results["file_upload"] = False
return None
# Try both field names: 'id' (Files API) or 'file_id' (Collections API)
file_id = data.get("id") or data.get("file_id")
if not file_id:
print_error("No 'id' or 'file_id' field in response")
print_json(data, "Response for debugging")
self.test_results["file_upload"] = False
return None
print_success(f"File uploaded to Files API: {file_id}")
# STEP 2: Add file to collection using Management API
print_info("STEP 2: Adding file to collection (management-api.x.ai)...")
status2, data2 = await self.add_document_to_collection(collection_id, file_id)
print_json(data2, "Add to Collection Response")
if status2 not in [200, 201]:
print_error("Failed to add file to collection")
self.test_results["file_upload"] = False
return None
print_success(f"File added to collection: {file_id}")
self.test_results["file_upload"] = True
return file_id
async def test_document_in_collection(self, collection_id: str, file_id: str):
"""Test 3: Verify document is in collection and get details"""
print_header("TEST 3: Verify Document in Collection")
# Verify by listing documents
print_info("Listing collection documents...")
status, data = await self.get_collection_documents(collection_id)
print_json(data, "Collection Documents")
if status not in [200, 201]:
print_error("Failed to list documents")
self.test_results["add_to_collection"] = False
return False
# Get specific document
print_info("Getting specific document...")
status, data = await self.get_collection_document(collection_id, file_id)
print_json(data, "Document Details")
if status not in [200, 201]:
print_error("Failed to get document details")
self.test_results["add_to_collection"] = False
return False
print_success("Document verified in collection")
self.test_results["add_to_collection"] = True
return True
async def test_shared_file_across_collections(self, file_id: str):
"""Test 4: CRITICAL - Can same file_id be used in multiple collections?"""
print_header("TEST 4: Shared File Across Collections (CRITICAL)")
# Create second collection
print_info("Creating second collection...")
status, data = await self.create_collection(
name="Test Collection 2",
metadata={"test": True, "purpose": "Multi-collection test"}
)
if status not in [200, 201]:
print_error("Failed to create second collection")
self.test_results["shared_file"] = False
return
collection2_id = data.get("collection_id") or data.get("id")
print_success(f"Collection 2 created: {collection2_id}")
# Try to add SAME file_id to second collection
print_info(f"Adding SAME file_id {file_id} to collection 2...")
status, data = await self.add_document_to_collection(collection2_id, file_id)
print_json(data, "Response from adding existing file_id to second collection")
if status not in [200, 201]:
print_error("Failed to add same file to second collection")
print_warning("⚠️ Files might be collection-specific (BAD for our use case)")
self.test_results["shared_file"] = False
return
print_success("✅ SAME FILE_ID CAN BE USED IN MULTIPLE COLLECTIONS!")
print_success("✅ This is PERFECT for our architecture!")
# Verify both collections have the file
print_info("Verifying file in both collections...")
status1, data1 = await self.get_collection_documents(self.created_collections[0])
status2, data2 = await self.get_collection_documents(collection2_id)
print_json(data1, "Collection 1 Documents")
print_json(data2, "Collection 2 Documents")
# Extract file_ids from both collections to verify they match
docs1 = data1.get("documents", [])
docs2 = data2.get("documents", [])
file_ids_1 = [d.get("file_metadata", {}).get("file_id") for d in docs1]
file_ids_2 = [d.get("file_metadata", {}).get("file_id") for d in docs2]
if file_id in file_ids_1 and file_id in file_ids_2:
print_success(f"✅ CONFIRMED: file_id {file_id} is IDENTICAL in both collections!")
print_info(" → We can store ONE xaiFileId per document!")
print_info(" → Simply track which collections contain it!")
self.test_results["shared_file"] = True
async def test_document_update(self, collection_id: str, file_id: str):
"""Test 5: Update document metadata"""
print_header("TEST 5: Update Document Metadata")
print_info("Updating document metadata...")
status, data = await self.update_collection_document(
collection_id,
file_id,
metadata={"updated_at": datetime.now().isoformat(), "version": 2}
)
print_json(data, "Update Response")
if status not in [200, 201]:
print_error("Failed to update document")
self.test_results["document_update"] = False
return
print_success("Document metadata updated")
self.test_results["document_update"] = True
async def test_document_removal(self):
"""Test 6: Remove document from collection"""
print_header("TEST 6: Remove Document from Collection")
if len(self.created_collections) < 2 or not self.uploaded_files:
print_warning("Skipping - need at least 2 collections and 1 file")
return
collection_id = self.created_collections[0]
file_id = self.uploaded_files[0]
print_info(f"Removing file {file_id} from collection {collection_id}...")
status, data = await self.remove_document_from_collection(collection_id, file_id)
print_json(data, "Response")
if status not in [200, 204]:
print_error("Failed to remove document")
self.test_results["document_removal"] = False
return
print_success("Document removed from collection")
# Verify removal
print_info("Verifying removal...")
status, data = await self.get_collection_documents(collection_id)
print_json(data, "Remaining Documents")
self.test_results["document_removal"] = True
async def test_batch_get(self):
"""Test 7: Batch get documents"""
print_header("TEST 7: Batch Get Documents")
if not self.created_collections or not self.uploaded_files:
print_warning("Skipping - need collections and files")
return
collection_id = self.created_collections[-1] # Use last collection
file_ids = self.uploaded_files
if not file_ids:
print_warning("No file IDs to batch get")
return
print_info(f"Batch getting {len(file_ids)} documents...")
status, data = await self.batch_get_documents(collection_id, file_ids)
print_json(data, "Batch Response")
self.test_results["batch_get"] = status in [200, 201]
async def cleanup(self):
"""Clean up all created test resources"""
print_header("CLEANUP: Deleting Test Resources")
# Delete collections (should cascade delete documents?)
for collection_id in list(self.created_collections):
print_info(f"Deleting collection {collection_id}...")
await self.delete_collection(collection_id)
print_success("Cleanup complete")
def print_summary(self):
"""Print test results summary"""
print_header("TEST RESULTS SUMMARY")
total = len(self.test_results)
passed = sum(1 for v in self.test_results.values() if v)
for test_name, result in self.test_results.items():
status = "✅ PASS" if result else "❌ FAIL"
print(f"{status} - {test_name}")
print(f"\n{Colors.BOLD}Total: {passed}/{total} tests passed{Colors.END}\n")
# Critical findings
print_header("CRITICAL FINDINGS")
if "shared_file" in self.test_results:
if self.test_results["shared_file"]:
print_success("✅ Same file CAN be used in multiple collections")
print_info(" → We can use a SINGLE xaiFileId per document!")
print_info(" → Much simpler architecture!")
else:
print_error("❌ Files seem to be collection-specific")
print_warning(" → More complex mapping required")
print_warning(" → Each collection might need separate file upload")
async def main():
"""Run all tests"""
print_header("xAI Collections API Test Suite")
print_info(f"Management URL: {XAI_MANAGEMENT_URL}")
print_info(f"Files URL: {XAI_FILES_URL}")
print_info(f"Management Key: {XAI_MANAGEMENT_KEY[:20]}...{XAI_MANAGEMENT_KEY[-4:]}")
print_info(f"API Key: {XAI_API_KEY[:20]}...{XAI_API_KEY[-4:]}")
async with XAICollectionsTestClient() as client:
try:
# Test 1: Basic Collection CRUD
collection_id = await client.test_basic_collection_crud()
if not collection_id:
print_error("Cannot continue without collection. Stopping.")
return
# Test 2: File Upload (now two-step process)
file_id = await client.test_file_upload_and_structure(collection_id)
if not file_id:
print_error("File upload failed. Continuing with remaining tests...")
else:
# Test 3: Verify document in collection
await client.test_document_in_collection(collection_id, file_id)
# Test 4: CRITICAL - Shared file test
await client.test_shared_file_across_collections(file_id)
# Test 5: Update document
await client.test_document_update(collection_id, file_id)
# Test 6: Remove document
await client.test_document_removal()
# Test 7: Batch get
await client.test_batch_get()
# Cleanup
await client.cleanup()
# Print summary
client.print_summary()
except Exception as e:
print_error(f"Test suite failed: {e}")
import traceback
traceback.print_exc()
# Try cleanup anyway
try:
await client.cleanup()
except:
pass
if __name__ == "__main__":
asyncio.run(main())