Compare commits
3 Commits
0740952063
...
cb0e170ee9
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb0e170ee9 | ||
|
|
bc917bd885 | ||
|
|
0282149613 |
@@ -17,7 +17,7 @@ config = {
|
||||
'description': 'Runs calendar sync automatically every 15 minutes',
|
||||
'flows': ['advoware-calendar-sync'],
|
||||
'triggers': [
|
||||
cron("*/15 * * * * *") # Every 15 minutes at second 0 (6-field: sec min hour day month weekday)
|
||||
cron("0 */15 * * * *") # Every 15 minutes at second 0 (6-field: sec min hour day month weekday)
|
||||
],
|
||||
'enqueues': ['calendar_sync_all']
|
||||
}
|
||||
|
||||
6
steps/espocrm_webhooks/__init__.py
Normal file
6
steps/espocrm_webhooks/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""
|
||||
EspoCRM Generic Webhooks
|
||||
|
||||
Empfängt Webhooks von EspoCRM für verschiedene Entities.
|
||||
Zentrale Anlaufstelle für alle EspoCRM-Events außerhalb VMH-Kontext.
|
||||
"""
|
||||
198
steps/espocrm_webhooks/document_create_webhook_api_step.py
Normal file
198
steps/espocrm_webhooks/document_create_webhook_api_step.py
Normal file
@@ -0,0 +1,198 @@
|
||||
"""EspoCRM Webhook - Document Create
|
||||
|
||||
Empfängt Create-Webhooks von EspoCRM für Documents.
|
||||
Loggt detailliert alle Payload-Informationen für Analyse.
|
||||
"""
|
||||
import json
|
||||
import datetime
|
||||
from typing import Any
|
||||
from motia import FlowContext, http, ApiRequest, ApiResponse
|
||||
|
||||
|
||||
config = {
|
||||
"name": "VMH Webhook Document Create",
|
||||
"description": "Empfängt Create-Webhooks von EspoCRM für Document Entities",
|
||||
"flows": ["vmh-documents"],
|
||||
"triggers": [
|
||||
http("POST", "/vmh/webhook/document/create")
|
||||
],
|
||||
"enqueues": ["vmh.document.create"],
|
||||
}
|
||||
|
||||
|
||||
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
"""
|
||||
Webhook handler for Document creation in EspoCRM.
|
||||
|
||||
Receives notifications when documents are created and emits queue events
|
||||
for processing (xAI sync, etc.).
|
||||
|
||||
Payload Analysis Mode: Logs comprehensive details about webhook structure.
|
||||
"""
|
||||
try:
|
||||
payload = request.body or []
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# DETAILLIERTES LOGGING FÜR ANALYSE
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("📥 EspoCRM DOCUMENT CREATE WEBHOOK EMPFANGEN")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
# Log Request Headers
|
||||
ctx.logger.info("\n🔍 REQUEST HEADERS:")
|
||||
if hasattr(request, 'headers'):
|
||||
for key, value in request.headers.items():
|
||||
ctx.logger.info(f" {key}: {value}")
|
||||
else:
|
||||
ctx.logger.info(" (keine Headers verfügbar)")
|
||||
|
||||
# Log Payload Type & Structure
|
||||
ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
|
||||
ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
|
||||
|
||||
# Log Full Payload (pretty-printed)
|
||||
ctx.logger.info("\n📄 FULL PAYLOAD:")
|
||||
ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# PAYLOAD ANALYSE & ID EXTRAKTION
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
entity_ids = set()
|
||||
payload_details = []
|
||||
|
||||
if isinstance(payload, list):
|
||||
ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
|
||||
for idx, entity in enumerate(payload):
|
||||
if isinstance(entity, dict):
|
||||
entity_id = entity.get('id')
|
||||
if entity_id:
|
||||
entity_ids.add(entity_id)
|
||||
|
||||
# Sammle Details für Logging
|
||||
detail = {
|
||||
'index': idx,
|
||||
'id': entity_id,
|
||||
'name': entity.get('name', 'N/A'),
|
||||
'type': entity.get('type', 'N/A'),
|
||||
'size': entity.get('size', 'N/A'),
|
||||
'all_fields': list(entity.keys())
|
||||
}
|
||||
payload_details.append(detail)
|
||||
|
||||
ctx.logger.info(f"\n 📄 Document #{idx + 1}:")
|
||||
ctx.logger.info(f" ID: {entity_id}")
|
||||
ctx.logger.info(f" Name: {entity.get('name', 'N/A')}")
|
||||
ctx.logger.info(f" Type: {entity.get('type', 'N/A')}")
|
||||
ctx.logger.info(f" Size: {entity.get('size', 'N/A')} bytes")
|
||||
ctx.logger.info(f" Verfügbare Felder: {', '.join(entity.keys())}")
|
||||
|
||||
# xAI-relevante Felder (falls vorhanden)
|
||||
xai_fields = {k: v for k, v in entity.items()
|
||||
if 'xai' in k.lower() or 'collection' in k.lower()}
|
||||
if xai_fields:
|
||||
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
|
||||
|
||||
# Parent/Relationship Felder
|
||||
rel_fields = {k: v for k, v in entity.items()
|
||||
if 'parent' in k.lower() or 'related' in k.lower() or
|
||||
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
|
||||
if rel_fields:
|
||||
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
|
||||
|
||||
elif isinstance(payload, dict):
|
||||
ctx.logger.info("\n✅ Payload ist SINGLE DICT")
|
||||
entity_id = payload.get('id')
|
||||
if entity_id:
|
||||
entity_ids.add(entity_id)
|
||||
|
||||
ctx.logger.info(f"\n 📄 Document:")
|
||||
ctx.logger.info(f" ID: {entity_id}")
|
||||
ctx.logger.info(f" Name: {payload.get('name', 'N/A')}")
|
||||
ctx.logger.info(f" Type: {payload.get('type', 'N/A')}")
|
||||
ctx.logger.info(f" Size: {payload.get('size', 'N/A')} bytes")
|
||||
ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}")
|
||||
|
||||
# xAI-relevante Felder
|
||||
xai_fields = {k: v for k, v in payload.items()
|
||||
if 'xai' in k.lower() or 'collection' in k.lower()}
|
||||
if xai_fields:
|
||||
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
|
||||
|
||||
# Relationship Felder
|
||||
rel_fields = {k: v for k, v in payload.items()
|
||||
if 'parent' in k.lower() or 'related' in k.lower() or
|
||||
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
|
||||
if rel_fields:
|
||||
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
|
||||
else:
|
||||
ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# QUEUE EVENTS EMITTIEREN
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
ctx.logger.info("\n" + "=" * 80)
|
||||
ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
if not entity_ids:
|
||||
ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
|
||||
return ApiResponse(
|
||||
status=200,
|
||||
body={
|
||||
'status': 'received',
|
||||
'action': 'create',
|
||||
'ids_count': 0,
|
||||
'warning': 'No document IDs found in payload'
|
||||
}
|
||||
)
|
||||
|
||||
# Emit events für Queue-Processing (Deduplizierung erfolgt im Event-Handler via Lock)
|
||||
for entity_id in entity_ids:
|
||||
await ctx.enqueue({
|
||||
'topic': 'vmh.document.create',
|
||||
'data': {
|
||||
'entity_id': entity_id,
|
||||
'action': 'create',
|
||||
'source': 'webhook',
|
||||
'timestamp': datetime.datetime.now().isoformat()
|
||||
}
|
||||
})
|
||||
ctx.logger.info(f"✅ Event emittiert: vmh.document.create für ID {entity_id}")
|
||||
|
||||
ctx.logger.info("\n" + "=" * 80)
|
||||
ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
return ApiResponse(
|
||||
status=200,
|
||||
body={
|
||||
'status': 'received',
|
||||
'action': 'create',
|
||||
'ids_count': len(entity_ids),
|
||||
'document_ids': list(entity_ids)
|
||||
}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error("=" * 80)
|
||||
ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Create Webhooks")
|
||||
ctx.logger.error("=" * 80)
|
||||
ctx.logger.error(f"Error Type: {type(e).__name__}")
|
||||
ctx.logger.error(f"Error Message: {str(e)}")
|
||||
|
||||
# Log Stack Trace
|
||||
import traceback
|
||||
ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")
|
||||
|
||||
return ApiResponse(
|
||||
status=500,
|
||||
body={
|
||||
'error': 'Internal server error',
|
||||
'error_type': type(e).__name__,
|
||||
'details': str(e)
|
||||
}
|
||||
)
|
||||
174
steps/espocrm_webhooks/document_delete_webhook_api_step.py
Normal file
174
steps/espocrm_webhooks/document_delete_webhook_api_step.py
Normal file
@@ -0,0 +1,174 @@
|
||||
"""EspoCRM Webhook - Document Delete
|
||||
|
||||
Empfängt Delete-Webhooks von EspoCRM für Documents.
|
||||
Loggt detailliert alle Payload-Informationen für Analyse.
|
||||
"""
|
||||
import json
|
||||
import datetime
|
||||
from typing import Any
|
||||
from motia import FlowContext, http, ApiRequest, ApiResponse
|
||||
|
||||
|
||||
config = {
|
||||
"name": "VMH Webhook Document Delete",
|
||||
"description": "Empfängt Delete-Webhooks von EspoCRM für Document Entities",
|
||||
"flows": ["vmh-documents"],
|
||||
"triggers": [
|
||||
http("POST", "/vmh/webhook/document/delete")
|
||||
],
|
||||
"enqueues": ["vmh.document.delete"],
|
||||
}
|
||||
|
||||
|
||||
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
"""
|
||||
Webhook handler for Document deletion in EspoCRM.
|
||||
|
||||
Receives notifications when documents are deleted.
|
||||
Note: Bei Deletion haben wir ggf. nur die ID, keine vollständigen Entity-Daten.
|
||||
"""
|
||||
try:
|
||||
payload = request.body or []
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# DETAILLIERTES LOGGING FÜR ANALYSE
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("📥 EspoCRM DOCUMENT DELETE WEBHOOK EMPFANGEN")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
# Log Request Headers
|
||||
ctx.logger.info("\n🔍 REQUEST HEADERS:")
|
||||
if hasattr(request, 'headers'):
|
||||
for key, value in request.headers.items():
|
||||
ctx.logger.info(f" {key}: {value}")
|
||||
else:
|
||||
ctx.logger.info(" (keine Headers verfügbar)")
|
||||
|
||||
# Log Payload Type & Structure
|
||||
ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
|
||||
ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
|
||||
|
||||
# Log Full Payload (pretty-printed)
|
||||
ctx.logger.info("\n📄 FULL PAYLOAD:")
|
||||
ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# PAYLOAD ANALYSE & ID EXTRAKTION
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
entity_ids = set()
|
||||
|
||||
if isinstance(payload, list):
|
||||
ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
|
||||
for idx, entity in enumerate(payload):
|
||||
if isinstance(entity, dict):
|
||||
entity_id = entity.get('id')
|
||||
if entity_id:
|
||||
entity_ids.add(entity_id)
|
||||
|
||||
ctx.logger.info(f"\n 🗑️ Document #{idx + 1}:")
|
||||
ctx.logger.info(f" ID: {entity_id}")
|
||||
ctx.logger.info(f" Verfügbare Felder: {', '.join(entity.keys())}")
|
||||
|
||||
# Bei Delete haben wir oft nur minimale Daten
|
||||
if 'name' in entity:
|
||||
ctx.logger.info(f" Name: {entity.get('name')}")
|
||||
if 'deletedAt' in entity or 'deleted' in entity:
|
||||
ctx.logger.info(f" Deleted At: {entity.get('deletedAt', entity.get('deleted', 'N/A'))}")
|
||||
|
||||
# xAI-relevante Felder (falls vorhanden)
|
||||
xai_fields = {k: v for k, v in entity.items()
|
||||
if 'xai' in k.lower() or 'collection' in k.lower()}
|
||||
if xai_fields:
|
||||
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
|
||||
|
||||
elif isinstance(payload, dict):
|
||||
ctx.logger.info("\n✅ Payload ist SINGLE DICT")
|
||||
entity_id = payload.get('id')
|
||||
if entity_id:
|
||||
entity_ids.add(entity_id)
|
||||
|
||||
ctx.logger.info(f"\n 🗑️ Document:")
|
||||
ctx.logger.info(f" ID: {entity_id}")
|
||||
ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}")
|
||||
|
||||
if 'name' in payload:
|
||||
ctx.logger.info(f" Name: {payload.get('name')}")
|
||||
if 'deletedAt' in payload or 'deleted' in payload:
|
||||
ctx.logger.info(f" Deleted At: {payload.get('deletedAt', payload.get('deleted', 'N/A'))}")
|
||||
|
||||
# xAI-relevante Felder
|
||||
xai_fields = {k: v for k, v in payload.items()
|
||||
if 'xai' in k.lower() or 'collection' in k.lower()}
|
||||
if xai_fields:
|
||||
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
|
||||
else:
|
||||
ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# QUEUE EVENTS EMITTIEREN
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
ctx.logger.info("\n" + "=" * 80)
|
||||
ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
if not entity_ids:
|
||||
ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
|
||||
return ApiResponse(
|
||||
status=200,
|
||||
body={
|
||||
'status': 'received',
|
||||
'action': 'delete',
|
||||
'ids_count': 0,
|
||||
'warning': 'No document IDs found in payload'
|
||||
}
|
||||
)
|
||||
|
||||
# Emit events für Queue-Processing
|
||||
for entity_id in entity_ids:
|
||||
await ctx.enqueue({
|
||||
'topic': 'vmh.document.delete',
|
||||
'data': {
|
||||
'entity_id': entity_id,
|
||||
'action': 'delete',
|
||||
'source': 'webhook',
|
||||
'timestamp': datetime.datetime.now().isoformat()
|
||||
}
|
||||
})
|
||||
ctx.logger.info(f"✅ Event emittiert: vmh.document.delete für ID {entity_id}")
|
||||
|
||||
ctx.logger.info("\n" + "=" * 80)
|
||||
ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
return ApiResponse(
|
||||
status=200,
|
||||
body={
|
||||
'status': 'received',
|
||||
'action': 'delete',
|
||||
'ids_count': len(entity_ids),
|
||||
'document_ids': list(entity_ids)
|
||||
}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error("=" * 80)
|
||||
ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Delete Webhooks")
|
||||
ctx.logger.error("=" * 80)
|
||||
ctx.logger.error(f"Error Type: {type(e).__name__}")
|
||||
ctx.logger.error(f"Error Message: {str(e)}")
|
||||
|
||||
import traceback
|
||||
ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")
|
||||
|
||||
return ApiResponse(
|
||||
status=500,
|
||||
body={
|
||||
'error': 'Internal server error',
|
||||
'error_type': type(e).__name__,
|
||||
'details': str(e)
|
||||
}
|
||||
)
|
||||
196
steps/espocrm_webhooks/document_update_webhook_api_step.py
Normal file
196
steps/espocrm_webhooks/document_update_webhook_api_step.py
Normal file
@@ -0,0 +1,196 @@
|
||||
"""EspoCRM Webhook - Document Update
|
||||
|
||||
Empfängt Update-Webhooks von EspoCRM für Documents.
|
||||
Loggt detailliert alle Payload-Informationen für Analyse.
|
||||
"""
|
||||
import json
|
||||
import datetime
|
||||
from typing import Any
|
||||
from motia import FlowContext, http, ApiRequest, ApiResponse
|
||||
|
||||
|
||||
config = {
|
||||
"name": "VMH Webhook Document Update",
|
||||
"description": "Empfängt Update-Webhooks von EspoCRM für Document Entities",
|
||||
"flows": ["vmh-documents"],
|
||||
"triggers": [
|
||||
http("POST", "/vmh/webhook/document/update")
|
||||
],
|
||||
"enqueues": ["vmh.document.update"],
|
||||
}
|
||||
|
||||
|
||||
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
"""
|
||||
Webhook handler for Document updates in EspoCRM.
|
||||
|
||||
Receives notifications when documents are updated and emits queue events
|
||||
for processing (xAI sync, etc.).
|
||||
|
||||
Note: Loop-Prevention sollte auf EspoCRM-Seite implementiert werden.
|
||||
xAI-Feld-Updates sollten keine neuen Webhooks triggern.
|
||||
"""
|
||||
try:
|
||||
payload = request.body or []
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# DETAILLIERTES LOGGING FÜR ANALYSE
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("📥 EspoCRM DOCUMENT UPDATE WEBHOOK EMPFANGEN")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
# Log Request Headers
|
||||
ctx.logger.info("\n🔍 REQUEST HEADERS:")
|
||||
if hasattr(request, 'headers'):
|
||||
for key, value in request.headers.items():
|
||||
ctx.logger.info(f" {key}: {value}")
|
||||
else:
|
||||
ctx.logger.info(" (keine Headers verfügbar)")
|
||||
|
||||
# Log Payload Type & Structure
|
||||
ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
|
||||
ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
|
||||
|
||||
# Log Full Payload (pretty-printed)
|
||||
ctx.logger.info("\n📄 FULL PAYLOAD:")
|
||||
ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# PAYLOAD ANALYSE & ID EXTRAKTION
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
entity_ids = set()
|
||||
|
||||
if isinstance(payload, list):
|
||||
ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
|
||||
for idx, entity in enumerate(payload):
|
||||
if isinstance(entity, dict):
|
||||
entity_id = entity.get('id')
|
||||
if entity_id:
|
||||
entity_ids.add(entity_id)
|
||||
|
||||
ctx.logger.info(f"\n 📄 Document #{idx + 1}:")
|
||||
ctx.logger.info(f" ID: {entity_id}")
|
||||
ctx.logger.info(f" Name: {entity.get('name', 'N/A')}")
|
||||
ctx.logger.info(f" Modified At: {entity.get('modifiedAt', 'N/A')}")
|
||||
ctx.logger.info(f" Modified By: {entity.get('modifiedById', 'N/A')}")
|
||||
ctx.logger.info(f" Verfügbare Felder: {', '.join(entity.keys())}")
|
||||
|
||||
# Prüfe ob CHANGED fields mitgeliefert werden
|
||||
changed_fields = entity.get('changedFields') or entity.get('changed') or entity.get('modifiedFields')
|
||||
if changed_fields:
|
||||
ctx.logger.info(f" 🔄 Geänderte Felder: {json.dumps(changed_fields, ensure_ascii=False)}")
|
||||
|
||||
# xAI-relevante Felder
|
||||
xai_fields = {k: v for k, v in entity.items()
|
||||
if 'xai' in k.lower() or 'collection' in k.lower()}
|
||||
if xai_fields:
|
||||
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
|
||||
|
||||
# Relationship Felder
|
||||
rel_fields = {k: v for k, v in entity.items()
|
||||
if 'parent' in k.lower() or 'related' in k.lower() or
|
||||
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
|
||||
if rel_fields:
|
||||
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
|
||||
|
||||
elif isinstance(payload, dict):
|
||||
ctx.logger.info("\n✅ Payload ist SINGLE DICT")
|
||||
entity_id = payload.get('id')
|
||||
if entity_id:
|
||||
entity_ids.add(entity_id)
|
||||
|
||||
ctx.logger.info(f"\n 📄 Document:")
|
||||
ctx.logger.info(f" ID: {entity_id}")
|
||||
ctx.logger.info(f" Name: {payload.get('name', 'N/A')}")
|
||||
ctx.logger.info(f" Modified At: {payload.get('modifiedAt', 'N/A')}")
|
||||
ctx.logger.info(f" Modified By: {payload.get('modifiedById', 'N/A')}")
|
||||
ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}")
|
||||
|
||||
# Geänderte Felder
|
||||
changed_fields = payload.get('changedFields') or payload.get('changed') or payload.get('modifiedFields')
|
||||
if changed_fields:
|
||||
ctx.logger.info(f" 🔄 Geänderte Felder: {json.dumps(changed_fields, ensure_ascii=False)}")
|
||||
|
||||
# xAI-relevante Felder
|
||||
xai_fields = {k: v for k, v in payload.items()
|
||||
if 'xai' in k.lower() or 'collection' in k.lower()}
|
||||
if xai_fields:
|
||||
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
|
||||
|
||||
# Relationship Felder
|
||||
rel_fields = {k: v for k, v in payload.items()
|
||||
if 'parent' in k.lower() or 'related' in k.lower() or
|
||||
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
|
||||
if rel_fields:
|
||||
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
|
||||
else:
|
||||
ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# QUEUE EVENTS EMITTIEREN
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
ctx.logger.info("\n" + "=" * 80)
|
||||
ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
if not entity_ids:
|
||||
ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
|
||||
return ApiResponse(
|
||||
status=200,
|
||||
body={
|
||||
'status': 'received',
|
||||
'action': 'update',
|
||||
'ids_count': 0,
|
||||
'warning': 'No document IDs found in payload'
|
||||
}
|
||||
)
|
||||
|
||||
# Emit events für Queue-Processing
|
||||
for entity_id in entity_ids:
|
||||
await ctx.enqueue({
|
||||
'topic': 'vmh.document.update',
|
||||
'data': {
|
||||
'entity_id': entity_id,
|
||||
'action': 'update',
|
||||
'source': 'webhook',
|
||||
'timestamp': datetime.datetime.now().isoformat()
|
||||
}
|
||||
})
|
||||
ctx.logger.info(f"✅ Event emittiert: vmh.document.update für ID {entity_id}")
|
||||
|
||||
ctx.logger.info("\n" + "=" * 80)
|
||||
ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
return ApiResponse(
|
||||
status=200,
|
||||
body={
|
||||
'status': 'received',
|
||||
'action': 'update',
|
||||
'ids_count': len(entity_ids),
|
||||
'document_ids': list(entity_ids)
|
||||
}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error("=" * 80)
|
||||
ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Update Webhooks")
|
||||
ctx.logger.error("=" * 80)
|
||||
ctx.logger.error(f"Error Type: {type(e).__name__}")
|
||||
ctx.logger.error(f"Error Message: {str(e)}")
|
||||
|
||||
import traceback
|
||||
ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")
|
||||
|
||||
return ApiResponse(
|
||||
status=500,
|
||||
body={
|
||||
'error': 'Internal server error',
|
||||
'error_type': type(e).__name__,
|
||||
'details': str(e)
|
||||
}
|
||||
)
|
||||
@@ -54,7 +54,7 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext):
|
||||
]
|
||||
}
|
||||
|
||||
unclean_result = await espocrm.search_entities('CBeteiligte', unclean_filter, max_size=100)
|
||||
unclean_result = await espocrm.list_entities('CBeteiligte', where=unclean_filter['where'], max_size=100)
|
||||
unclean_entities = unclean_result.get('list', [])
|
||||
|
||||
ctx.logger.info(f"📊 Gefunden: {len(unclean_entities)} Entities mit Status pending/dirty/failed")
|
||||
@@ -73,7 +73,7 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext):
|
||||
]
|
||||
}
|
||||
|
||||
reset_result = await espocrm.search_entities('CBeteiligte', permanently_failed_filter, max_size=50)
|
||||
reset_result = await espocrm.list_entities('CBeteiligte', where=permanently_failed_filter['where'], max_size=50)
|
||||
reset_entities = reset_result.get('list', [])
|
||||
|
||||
# Reset permanently_failed entities
|
||||
@@ -111,7 +111,7 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext):
|
||||
]
|
||||
}
|
||||
|
||||
stale_result = await espocrm.search_entities('CBeteiligte', stale_filter, max_size=50)
|
||||
stale_result = await espocrm.list_entities('CBeteiligte', where=stale_filter['where'], max_size=50)
|
||||
stale_entities = stale_result.get('list', [])
|
||||
|
||||
ctx.logger.info(f"📊 Gefunden: {len(stale_entities)} Entities mit veraltetem Sync (> 24h)")
|
||||
|
||||
@@ -37,9 +37,9 @@ config = {
|
||||
|
||||
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
|
||||
"""Zentraler Sync-Handler für Beteiligte"""
|
||||
entity_id = event_data.entity_id
|
||||
action = event_data.action
|
||||
source = event_data.source
|
||||
entity_id = event_data.get('entity_id')
|
||||
action = event_data.get('action')
|
||||
source = event_data.get('source')
|
||||
|
||||
if not entity_id:
|
||||
ctx.logger.error("Keine entity_id im Event gefunden")
|
||||
|
||||
788
test_xai_collections_api.py
Executable file
788
test_xai_collections_api.py
Executable file
@@ -0,0 +1,788 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
xAI Collections API Test Script
|
||||
|
||||
Tests all critical operations for our document sync requirements:
|
||||
1. File upload and ID behavior (collection-specific vs global?)
|
||||
2. Same file in multiple collections (shared file_id?)
|
||||
3. CRUD operations on collections
|
||||
4. CRUD operations on documents
|
||||
5. Response structures and metadata
|
||||
6. Update/versioning behavior
|
||||
|
||||
Usage:
|
||||
export XAI_API_KEY="xai-..."
|
||||
python test_xai_collections_api.py
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import asyncio
|
||||
import aiohttp
|
||||
from typing import Optional, Dict, Any, List
|
||||
from datetime import datetime
|
||||
import tempfile
|
||||
|
||||
# Configuration
|
||||
XAI_MANAGEMENT_URL = os.getenv("XAI_MANAGEMENT_URL", "https://management-api.x.ai")
|
||||
XAI_FILES_URL = os.getenv("XAI_FILES_URL", "https://api.x.ai")
|
||||
XAI_MANAGEMENT_KEY = os.getenv("XAI_MANAGEMENT_KEY", "") # Management API Key
|
||||
XAI_API_KEY = os.getenv("XAI_API_KEY", "") # Regular API Key for file upload
|
||||
|
||||
if not XAI_MANAGEMENT_KEY:
|
||||
print("❌ ERROR: XAI_MANAGEMENT_KEY environment variable not set!")
|
||||
print(" export XAI_MANAGEMENT_KEY='xai-token-...'")
|
||||
sys.exit(1)
|
||||
|
||||
if not XAI_API_KEY:
|
||||
print("❌ ERROR: XAI_API_KEY environment variable not set!")
|
||||
print(" export XAI_API_KEY='xai-...'")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
class Colors:
|
||||
"""ANSI color codes for terminal output"""
|
||||
HEADER = '\033[95m'
|
||||
BLUE = '\033[94m'
|
||||
CYAN = '\033[96m'
|
||||
GREEN = '\033[92m'
|
||||
YELLOW = '\033[93m'
|
||||
RED = '\033[91m'
|
||||
BOLD = '\033[1m'
|
||||
UNDERLINE = '\033[4m'
|
||||
END = '\033[0m'
|
||||
|
||||
|
||||
def print_header(text: str):
|
||||
print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}")
|
||||
print(f"{Colors.BOLD}{Colors.CYAN}{text}{Colors.END}")
|
||||
print(f"{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}\n")
|
||||
|
||||
|
||||
def print_success(text: str):
|
||||
print(f"{Colors.GREEN}✅ {text}{Colors.END}")
|
||||
|
||||
|
||||
def print_error(text: str):
|
||||
print(f"{Colors.RED}❌ {text}{Colors.END}")
|
||||
|
||||
|
||||
def print_info(text: str):
|
||||
print(f"{Colors.BLUE}ℹ️ {text}{Colors.END}")
|
||||
|
||||
|
||||
def print_warning(text: str):
|
||||
print(f"{Colors.YELLOW}⚠️ {text}{Colors.END}")
|
||||
|
||||
|
||||
def print_json(data: Any, title: Optional[str] = None):
|
||||
if title:
|
||||
print(f"{Colors.BOLD}{title}:{Colors.END}")
|
||||
print(json.dumps(data, indent=2, ensure_ascii=False))
|
||||
|
||||
|
||||
class XAICollectionsTestClient:
|
||||
"""Test client for xAI Collections API"""
|
||||
|
||||
def __init__(self):
|
||||
self.management_url = XAI_MANAGEMENT_URL
|
||||
self.files_url = XAI_FILES_URL
|
||||
self.management_key = XAI_MANAGEMENT_KEY
|
||||
self.api_key = XAI_API_KEY
|
||||
self.session: Optional[aiohttp.ClientSession] = None
|
||||
|
||||
# Test state
|
||||
self.created_collections: List[str] = []
|
||||
self.uploaded_files: List[str] = []
|
||||
self.test_results: Dict[str, bool] = {}
|
||||
|
||||
async def __aenter__(self):
|
||||
# Session without default Content-Type (set per-request)
|
||||
self.session = aiohttp.ClientSession(
|
||||
timeout=aiohttp.ClientTimeout(total=30)
|
||||
)
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *args):
|
||||
if self.session:
|
||||
await self.session.close()
|
||||
|
||||
async def _request(self, method: str, path: str, use_files_api: bool = False, **kwargs) -> tuple[int, Any]:
|
||||
"""Make HTTP request and return (status, response_data)"""
|
||||
base_url = self.files_url if use_files_api else self.management_url
|
||||
url = f"{base_url}{path}"
|
||||
|
||||
# Set headers per-request
|
||||
if 'headers' not in kwargs:
|
||||
kwargs['headers'] = {}
|
||||
|
||||
# Set authorization
|
||||
if use_files_api:
|
||||
kwargs['headers']['Authorization'] = f"Bearer {self.api_key}"
|
||||
else:
|
||||
kwargs['headers']['Authorization'] = f"Bearer {self.management_key}"
|
||||
|
||||
# Set Content-Type for JSON requests
|
||||
if 'json' in kwargs:
|
||||
kwargs['headers']['Content-Type'] = 'application/json'
|
||||
|
||||
print_info(f"{method} {url}")
|
||||
print_info(f"Headers: {kwargs.get('headers', {})}")
|
||||
|
||||
try:
|
||||
async with self.session.request(method, url, **kwargs) as response:
|
||||
status = response.status
|
||||
|
||||
try:
|
||||
data = await response.json()
|
||||
except:
|
||||
text = await response.text()
|
||||
data = {"_raw_text": text} if text else {}
|
||||
|
||||
if status < 400:
|
||||
print_success(f"Response: {status}")
|
||||
else:
|
||||
print_error(f"Response: {status}")
|
||||
|
||||
return status, data
|
||||
|
||||
except Exception as e:
|
||||
print_error(f"Request failed: {e}")
|
||||
return 0, {"error": str(e)}
|
||||
|
||||
# ========================================================================
|
||||
# COLLECTION OPERATIONS
|
||||
# ========================================================================
|
||||
|
||||
async def create_collection(self, name: str, metadata: Optional[Dict] = None) -> tuple[int, Any]:
|
||||
"""POST /v1/collections"""
|
||||
payload = {
|
||||
"collection_name": name, # xAI uses "collection_name" not "name"
|
||||
"metadata": metadata or {}
|
||||
}
|
||||
status, data = await self._request("POST", "/v1/collections", json=payload)
|
||||
|
||||
if status == 200 or status == 201:
|
||||
# Try different possible field names for collection ID
|
||||
collection_id = data.get("id") or data.get("collection_id") or data.get("collectionId")
|
||||
if collection_id:
|
||||
self.created_collections.append(collection_id)
|
||||
print_success(f"Created collection: {collection_id}")
|
||||
|
||||
return status, data
|
||||
|
||||
async def get_collection(self, collection_id: str) -> tuple[int, Any]:
|
||||
"""GET /v1/collections/{collection_id}"""
|
||||
return await self._request("GET", f"/v1/collections/{collection_id}")
|
||||
|
||||
async def list_collections(self) -> tuple[int, Any]:
|
||||
"""GET /v1/collections"""
|
||||
return await self._request("GET", "/v1/collections")
|
||||
|
||||
async def update_collection(self, collection_id: str, name: Optional[str] = None,
|
||||
metadata: Optional[Dict] = None) -> tuple[int, Any]:
|
||||
"""PUT /v1/collections/{collection_id}"""
|
||||
payload = {}
|
||||
if name:
|
||||
payload["collection_name"] = name # xAI uses "collection_name"
|
||||
if metadata:
|
||||
payload["metadata"] = metadata
|
||||
|
||||
return await self._request("PUT", f"/v1/collections/{collection_id}", json=payload)
|
||||
|
||||
async def delete_collection(self, collection_id: str) -> tuple[int, Any]:
|
||||
"""DELETE /v1/collections/{collection_id}"""
|
||||
status, data = await self._request("DELETE", f"/v1/collections/{collection_id}")
|
||||
|
||||
if status == 200 or status == 204:
|
||||
if collection_id in self.created_collections:
|
||||
self.created_collections.remove(collection_id)
|
||||
|
||||
return status, data
|
||||
|
||||
# ========================================================================
|
||||
# FILE OPERATIONS (multiple upload methods)
|
||||
# ========================================================================
|
||||
|
||||
async def upload_file_multipart(self, content: bytes, filename: str,
|
||||
mime_type: str = "text/plain") -> tuple[int, Any]:
|
||||
"""
|
||||
Method 0: Multipart form-data upload (what the server actually expects!)
|
||||
POST /v1/files with multipart/form-data
|
||||
"""
|
||||
print_info("METHOD 0: Multipart Form-Data Upload (POST /v1/files)")
|
||||
|
||||
# Create multipart form data
|
||||
form = aiohttp.FormData()
|
||||
form.add_field('file', content, filename=filename, content_type=mime_type)
|
||||
|
||||
print_info(f"Uploading {len(content)} bytes as multipart/form-data")
|
||||
|
||||
# Use _request but with form data instead of json
|
||||
base_url = self.files_url
|
||||
url = f"{base_url}/v1/files"
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {self.api_key}"
|
||||
# Do NOT set Content-Type - aiohttp will set it with boundary
|
||||
}
|
||||
|
||||
print_info(f"POST {url}")
|
||||
print_info(f"Headers: {headers}")
|
||||
|
||||
try:
|
||||
async with self.session.request("POST", url, data=form, headers=headers) as response:
|
||||
status = response.status
|
||||
|
||||
try:
|
||||
data = await response.json()
|
||||
except:
|
||||
text = await response.text()
|
||||
data = {"_raw_text": text} if text else {}
|
||||
|
||||
if status < 400:
|
||||
print_success(f"Response: {status}")
|
||||
else:
|
||||
print_error(f"Response: {status}")
|
||||
|
||||
return status, data
|
||||
|
||||
except Exception as e:
|
||||
print_error(f"Request failed: {e}")
|
||||
return 0, {"error": str(e)}
|
||||
|
||||
async def upload_file_direct(self, content: bytes, filename: str,
|
||||
mime_type: str = "text/plain") -> tuple[int, Any]:
|
||||
"""
|
||||
Method 1: Direct upload to xAI Files API
|
||||
POST /v1/files with JSON body containing base64-encoded data
|
||||
"""
|
||||
import base64
|
||||
|
||||
print_info("METHOD 1: Direct Upload (POST /v1/files with JSON)")
|
||||
|
||||
# Encode file content as base64
|
||||
data_b64 = base64.b64encode(content).decode('ascii')
|
||||
|
||||
payload = {
|
||||
"name": filename,
|
||||
"content_type": mime_type,
|
||||
"data": data_b64
|
||||
}
|
||||
|
||||
print_info(f"Uploading {len(content)} bytes as base64 ({len(data_b64)} chars)")
|
||||
|
||||
status, data = await self._request(
|
||||
"POST",
|
||||
"/v1/files",
|
||||
use_files_api=True,
|
||||
json=payload
|
||||
)
|
||||
|
||||
return status, data
|
||||
|
||||
async def upload_file_chunked(self, content: bytes, filename: str,
|
||||
mime_type: str = "text/plain") -> tuple[int, Any]:
|
||||
"""
|
||||
Method 2: Initialize + Chunk streaming upload
|
||||
POST /v1/files:initialize → POST /v1/files:uploadChunks
|
||||
"""
|
||||
import base64
|
||||
|
||||
print_info("METHOD 2: Initialize + Chunk Streaming")
|
||||
|
||||
# Step 1: Initialize upload
|
||||
print_info("Step 1: Initialize upload")
|
||||
init_payload = {
|
||||
"name": filename,
|
||||
"content_type": mime_type
|
||||
}
|
||||
|
||||
status, data = await self._request(
|
||||
"POST",
|
||||
"/v1/files:initialize",
|
||||
use_files_api=True,
|
||||
json=init_payload
|
||||
)
|
||||
|
||||
print_json(data, "Initialize Response")
|
||||
|
||||
if status not in [200, 201]:
|
||||
print_error("Failed to initialize upload")
|
||||
return status, data
|
||||
|
||||
file_id = data.get("file_id")
|
||||
if not file_id:
|
||||
print_error("No file_id in initialize response")
|
||||
return status, data
|
||||
|
||||
print_success(f"Initialized upload with file_id: {file_id}")
|
||||
|
||||
# Step 2: Upload chunks
|
||||
print_info(f"Step 2: Upload {len(content)} bytes in chunks")
|
||||
|
||||
# Encode content as base64 for chunk upload
|
||||
chunk_b64 = base64.b64encode(content).decode('ascii')
|
||||
|
||||
chunk_payload = {
|
||||
"file_id": file_id,
|
||||
"chunk": chunk_b64
|
||||
}
|
||||
|
||||
status, data = await self._request(
|
||||
"POST",
|
||||
"/v1/files:uploadChunks",
|
||||
use_files_api=True,
|
||||
json=chunk_payload
|
||||
)
|
||||
|
||||
print_json(data, "Upload Chunks Response")
|
||||
|
||||
if status in [200, 201]:
|
||||
print_success(f"Uploaded file chunks: {file_id}")
|
||||
self.uploaded_files.append(file_id)
|
||||
|
||||
return status, data
|
||||
|
||||
async def upload_file(self, content: bytes, filename: str,
|
||||
mime_type: str = "text/plain") -> tuple[int, Any]:
|
||||
"""
|
||||
Try multiple upload methods until one succeeds
|
||||
"""
|
||||
print_info("Trying upload methods...")
|
||||
|
||||
# Try Method 0: Multipart form-data (what the server really wants!)
|
||||
status0, data0 = await self.upload_file_multipart(content, filename, mime_type)
|
||||
|
||||
if status0 in [200, 201]:
|
||||
file_id = data0.get("id") or data0.get("file_id") # Try both field names
|
||||
if file_id:
|
||||
self.uploaded_files.append(file_id)
|
||||
print_success(f"✅ Multipart upload succeeded: {file_id}")
|
||||
return status0, data0
|
||||
else:
|
||||
print_error("No 'id' or 'file_id' in response")
|
||||
print_json(data0, "Response data")
|
||||
|
||||
print_warning(f"Multipart upload failed ({status0}), trying JSON upload...")
|
||||
|
||||
# Try Method 1: Direct upload with JSON
|
||||
status1, data1 = await self.upload_file_direct(content, filename, mime_type)
|
||||
|
||||
if status1 in [200, 201]:
|
||||
file_id = data1.get("file_id")
|
||||
if file_id:
|
||||
self.uploaded_files.append(file_id)
|
||||
print_success(f"✅ Direct upload succeeded: {file_id}")
|
||||
return status1, data1
|
||||
|
||||
print_warning(f"Direct upload failed ({status1}), trying chunked upload...")
|
||||
|
||||
# Try Method 2: Initialize + Chunks
|
||||
status2, data2 = await self.upload_file_chunked(content, filename, mime_type)
|
||||
|
||||
if status2 in [200, 201]:
|
||||
print_success("✅ Chunked upload succeeded")
|
||||
return status2, data2
|
||||
|
||||
print_error("❌ All upload methods failed")
|
||||
return status0, data0 # Return multipart method's error
|
||||
|
||||
# ========================================================================
|
||||
# COLLECTION DOCUMENT OPERATIONS
|
||||
# ========================================================================
|
||||
|
||||
async def add_document_to_collection(self, collection_id: str,
|
||||
file_id: str) -> tuple[int, Any]:
|
||||
"""POST /v1/collections/{collection_id}/documents/{file_id}"""
|
||||
return await self._request("POST",
|
||||
f"/v1/collections/{collection_id}/documents/{file_id}")
|
||||
|
||||
async def get_collection_documents(self, collection_id: str) -> tuple[int, Any]:
|
||||
"""GET /v1/collections/{collection_id}/documents"""
|
||||
return await self._request("GET",
|
||||
f"/v1/collections/{collection_id}/documents")
|
||||
|
||||
async def get_collection_document(self, collection_id: str,
|
||||
file_id: str) -> tuple[int, Any]:
|
||||
"""GET /v1/collections/{collection_id}/documents/{file_id}"""
|
||||
return await self._request("GET",
|
||||
f"/v1/collections/{collection_id}/documents/{file_id}")
|
||||
|
||||
async def update_collection_document(self, collection_id: str, file_id: str,
|
||||
metadata: Dict) -> tuple[int, Any]:
|
||||
"""PATCH /v1/collections/{collection_id}/documents/{file_id}"""
|
||||
return await self._request("PATCH",
|
||||
f"/v1/collections/{collection_id}/documents/{file_id}",
|
||||
json={"metadata": metadata})
|
||||
|
||||
async def remove_document_from_collection(self, collection_id: str,
|
||||
file_id: str) -> tuple[int, Any]:
|
||||
"""DELETE /v1/collections/{collection_id}/documents/{file_id}"""
|
||||
return await self._request("DELETE",
|
||||
f"/v1/collections/{collection_id}/documents/{file_id}")
|
||||
|
||||
async def batch_get_documents(self, collection_id: str,
|
||||
file_ids: List[str]) -> tuple[int, Any]:
|
||||
"""GET /v1/collections/{collection_id}/documents:batchGet"""
|
||||
params = {"fileIds": ",".join(file_ids)}
|
||||
return await self._request("GET",
|
||||
f"/v1/collections/{collection_id}/documents:batchGet",
|
||||
params=params)
|
||||
|
||||
# ========================================================================
|
||||
# TEST SCENARIOS
|
||||
# ========================================================================
|
||||
|
||||
async def test_basic_collection_crud(self):
|
||||
"""Test 1: Basic Collection CRUD operations"""
|
||||
print_header("TEST 1: Basic Collection CRUD")
|
||||
|
||||
# Create
|
||||
print_info("Creating collection...")
|
||||
status, data = await self.create_collection(
|
||||
name="Test Collection 1",
|
||||
metadata={"test": True, "purpose": "API testing"}
|
||||
)
|
||||
print_json(data, "Response")
|
||||
|
||||
if status not in [200, 201]:
|
||||
print_error("Failed to create collection")
|
||||
self.test_results["collection_crud"] = False
|
||||
return None
|
||||
|
||||
# Try different possible field names for collection ID
|
||||
collection_id = data.get("id") or data.get("collection_id") or data.get("collectionId")
|
||||
if not collection_id:
|
||||
print_error("No collection ID field in response")
|
||||
print_json(data, "Response Data")
|
||||
self.test_results["collection_crud"] = False
|
||||
return None
|
||||
|
||||
print_success(f"Collection created: {collection_id}")
|
||||
|
||||
# Read
|
||||
print_info("Reading collection...")
|
||||
status, data = await self.get_collection(collection_id)
|
||||
print_json(data, "Response")
|
||||
|
||||
# Update
|
||||
print_info("Updating collection...")
|
||||
status, data = await self.update_collection(
|
||||
collection_id,
|
||||
name="Test Collection 1 (Updated)",
|
||||
metadata={"test": True, "updated": True}
|
||||
)
|
||||
print_json(data, "Response")
|
||||
|
||||
self.test_results["collection_crud"] = True
|
||||
return collection_id
|
||||
|
||||
async def test_file_upload_and_structure(self, collection_id: str):
|
||||
"""Test 2: File upload (two-step process)"""
|
||||
print_header("TEST 2: File Upload (Two-Step) & Response Structure")
|
||||
|
||||
# Create test file content
|
||||
test_content = b"""
|
||||
This is a test document for xAI Collections API testing.
|
||||
|
||||
Topic: German Contract Law
|
||||
|
||||
Key Points:
|
||||
- Contracts require offer and acceptance
|
||||
- Consideration is necessary
|
||||
- Written form may be required for certain contracts
|
||||
|
||||
This document contains sufficient content for testing.
|
||||
"""
|
||||
|
||||
# STEP 1: Upload file to Files API
|
||||
print_info("STEP 1: Uploading file to Files API (api.x.ai)...")
|
||||
status, data = await self.upload_file(
|
||||
content=test_content,
|
||||
filename="test_document.txt",
|
||||
mime_type="text/plain"
|
||||
)
|
||||
print_json(data, "Files API Upload Response")
|
||||
|
||||
if status not in [200, 201]:
|
||||
print_error("File upload to Files API failed")
|
||||
self.test_results["file_upload"] = False
|
||||
return None
|
||||
|
||||
# Try both field names: 'id' (Files API) or 'file_id' (Collections API)
|
||||
file_id = data.get("id") or data.get("file_id")
|
||||
if not file_id:
|
||||
print_error("No 'id' or 'file_id' field in response")
|
||||
print_json(data, "Response for debugging")
|
||||
self.test_results["file_upload"] = False
|
||||
return None
|
||||
|
||||
print_success(f"File uploaded to Files API: {file_id}")
|
||||
|
||||
# STEP 2: Add file to collection using Management API
|
||||
print_info("STEP 2: Adding file to collection (management-api.x.ai)...")
|
||||
status2, data2 = await self.add_document_to_collection(collection_id, file_id)
|
||||
print_json(data2, "Add to Collection Response")
|
||||
|
||||
if status2 not in [200, 201]:
|
||||
print_error("Failed to add file to collection")
|
||||
self.test_results["file_upload"] = False
|
||||
return None
|
||||
|
||||
print_success(f"File added to collection: {file_id}")
|
||||
self.test_results["file_upload"] = True
|
||||
return file_id
|
||||
|
||||
async def test_document_in_collection(self, collection_id: str, file_id: str):
|
||||
"""Test 3: Verify document is in collection and get details"""
|
||||
print_header("TEST 3: Verify Document in Collection")
|
||||
|
||||
# Verify by listing documents
|
||||
print_info("Listing collection documents...")
|
||||
status, data = await self.get_collection_documents(collection_id)
|
||||
print_json(data, "Collection Documents")
|
||||
|
||||
if status not in [200, 201]:
|
||||
print_error("Failed to list documents")
|
||||
self.test_results["add_to_collection"] = False
|
||||
return False
|
||||
|
||||
# Get specific document
|
||||
print_info("Getting specific document...")
|
||||
status, data = await self.get_collection_document(collection_id, file_id)
|
||||
print_json(data, "Document Details")
|
||||
|
||||
if status not in [200, 201]:
|
||||
print_error("Failed to get document details")
|
||||
self.test_results["add_to_collection"] = False
|
||||
return False
|
||||
|
||||
print_success("Document verified in collection")
|
||||
self.test_results["add_to_collection"] = True
|
||||
return True
|
||||
|
||||
async def test_shared_file_across_collections(self, file_id: str):
|
||||
"""Test 4: CRITICAL - Can same file_id be used in multiple collections?"""
|
||||
print_header("TEST 4: Shared File Across Collections (CRITICAL)")
|
||||
|
||||
# Create second collection
|
||||
print_info("Creating second collection...")
|
||||
status, data = await self.create_collection(
|
||||
name="Test Collection 2",
|
||||
metadata={"test": True, "purpose": "Multi-collection test"}
|
||||
)
|
||||
|
||||
if status not in [200, 201]:
|
||||
print_error("Failed to create second collection")
|
||||
self.test_results["shared_file"] = False
|
||||
return
|
||||
|
||||
collection2_id = data.get("collection_id") or data.get("id")
|
||||
print_success(f"Collection 2 created: {collection2_id}")
|
||||
|
||||
# Try to add SAME file_id to second collection
|
||||
print_info(f"Adding SAME file_id {file_id} to collection 2...")
|
||||
|
||||
status, data = await self.add_document_to_collection(collection2_id, file_id)
|
||||
print_json(data, "Response from adding existing file_id to second collection")
|
||||
|
||||
if status not in [200, 201]:
|
||||
print_error("Failed to add same file to second collection")
|
||||
print_warning("⚠️ Files might be collection-specific (BAD for our use case)")
|
||||
self.test_results["shared_file"] = False
|
||||
return
|
||||
|
||||
print_success("✅ SAME FILE_ID CAN BE USED IN MULTIPLE COLLECTIONS!")
|
||||
print_success("✅ This is PERFECT for our architecture!")
|
||||
|
||||
# Verify both collections have the file
|
||||
print_info("Verifying file in both collections...")
|
||||
|
||||
status1, data1 = await self.get_collection_documents(self.created_collections[0])
|
||||
status2, data2 = await self.get_collection_documents(collection2_id)
|
||||
|
||||
print_json(data1, "Collection 1 Documents")
|
||||
print_json(data2, "Collection 2 Documents")
|
||||
|
||||
# Extract file_ids from both collections to verify they match
|
||||
docs1 = data1.get("documents", [])
|
||||
docs2 = data2.get("documents", [])
|
||||
|
||||
file_ids_1 = [d.get("file_metadata", {}).get("file_id") for d in docs1]
|
||||
file_ids_2 = [d.get("file_metadata", {}).get("file_id") for d in docs2]
|
||||
|
||||
if file_id in file_ids_1 and file_id in file_ids_2:
|
||||
print_success(f"✅ CONFIRMED: file_id {file_id} is IDENTICAL in both collections!")
|
||||
print_info(" → We can store ONE xaiFileId per document!")
|
||||
print_info(" → Simply track which collections contain it!")
|
||||
|
||||
self.test_results["shared_file"] = True
|
||||
|
||||
async def test_document_update(self, collection_id: str, file_id: str):
|
||||
"""Test 5: Update document metadata"""
|
||||
print_header("TEST 5: Update Document Metadata")
|
||||
|
||||
print_info("Updating document metadata...")
|
||||
status, data = await self.update_collection_document(
|
||||
collection_id,
|
||||
file_id,
|
||||
metadata={"updated_at": datetime.now().isoformat(), "version": 2}
|
||||
)
|
||||
print_json(data, "Update Response")
|
||||
|
||||
if status not in [200, 201]:
|
||||
print_error("Failed to update document")
|
||||
self.test_results["document_update"] = False
|
||||
return
|
||||
|
||||
print_success("Document metadata updated")
|
||||
self.test_results["document_update"] = True
|
||||
|
||||
async def test_document_removal(self):
|
||||
"""Test 6: Remove document from collection"""
|
||||
print_header("TEST 6: Remove Document from Collection")
|
||||
|
||||
if len(self.created_collections) < 2 or not self.uploaded_files:
|
||||
print_warning("Skipping - need at least 2 collections and 1 file")
|
||||
return
|
||||
|
||||
collection_id = self.created_collections[0]
|
||||
file_id = self.uploaded_files[0]
|
||||
|
||||
print_info(f"Removing file {file_id} from collection {collection_id}...")
|
||||
status, data = await self.remove_document_from_collection(collection_id, file_id)
|
||||
print_json(data, "Response")
|
||||
|
||||
if status not in [200, 204]:
|
||||
print_error("Failed to remove document")
|
||||
self.test_results["document_removal"] = False
|
||||
return
|
||||
|
||||
print_success("Document removed from collection")
|
||||
|
||||
# Verify removal
|
||||
print_info("Verifying removal...")
|
||||
status, data = await self.get_collection_documents(collection_id)
|
||||
print_json(data, "Remaining Documents")
|
||||
|
||||
self.test_results["document_removal"] = True
|
||||
|
||||
async def test_batch_get(self):
|
||||
"""Test 7: Batch get documents"""
|
||||
print_header("TEST 7: Batch Get Documents")
|
||||
|
||||
if not self.created_collections or not self.uploaded_files:
|
||||
print_warning("Skipping - need collections and files")
|
||||
return
|
||||
|
||||
collection_id = self.created_collections[-1] # Use last collection
|
||||
file_ids = self.uploaded_files
|
||||
|
||||
if not file_ids:
|
||||
print_warning("No file IDs to batch get")
|
||||
return
|
||||
|
||||
print_info(f"Batch getting {len(file_ids)} documents...")
|
||||
status, data = await self.batch_get_documents(collection_id, file_ids)
|
||||
print_json(data, "Batch Response")
|
||||
|
||||
self.test_results["batch_get"] = status in [200, 201]
|
||||
|
||||
async def cleanup(self):
|
||||
"""Clean up all created test resources"""
|
||||
print_header("CLEANUP: Deleting Test Resources")
|
||||
|
||||
# Delete collections (should cascade delete documents?)
|
||||
for collection_id in list(self.created_collections):
|
||||
print_info(f"Deleting collection {collection_id}...")
|
||||
await self.delete_collection(collection_id)
|
||||
|
||||
print_success("Cleanup complete")
|
||||
|
||||
def print_summary(self):
|
||||
"""Print test results summary"""
|
||||
print_header("TEST RESULTS SUMMARY")
|
||||
|
||||
total = len(self.test_results)
|
||||
passed = sum(1 for v in self.test_results.values() if v)
|
||||
|
||||
for test_name, result in self.test_results.items():
|
||||
status = "✅ PASS" if result else "❌ FAIL"
|
||||
print(f"{status} - {test_name}")
|
||||
|
||||
print(f"\n{Colors.BOLD}Total: {passed}/{total} tests passed{Colors.END}\n")
|
||||
|
||||
# Critical findings
|
||||
print_header("CRITICAL FINDINGS")
|
||||
|
||||
if "shared_file" in self.test_results:
|
||||
if self.test_results["shared_file"]:
|
||||
print_success("✅ Same file CAN be used in multiple collections")
|
||||
print_info(" → We can use a SINGLE xaiFileId per document!")
|
||||
print_info(" → Much simpler architecture!")
|
||||
else:
|
||||
print_error("❌ Files seem to be collection-specific")
|
||||
print_warning(" → More complex mapping required")
|
||||
print_warning(" → Each collection might need separate file upload")
|
||||
|
||||
|
||||
async def main():
|
||||
"""Run all tests"""
|
||||
print_header("xAI Collections API Test Suite")
|
||||
print_info(f"Management URL: {XAI_MANAGEMENT_URL}")
|
||||
print_info(f"Files URL: {XAI_FILES_URL}")
|
||||
print_info(f"Management Key: {XAI_MANAGEMENT_KEY[:20]}...{XAI_MANAGEMENT_KEY[-4:]}")
|
||||
print_info(f"API Key: {XAI_API_KEY[:20]}...{XAI_API_KEY[-4:]}")
|
||||
|
||||
async with XAICollectionsTestClient() as client:
|
||||
try:
|
||||
# Test 1: Basic Collection CRUD
|
||||
collection_id = await client.test_basic_collection_crud()
|
||||
|
||||
if not collection_id:
|
||||
print_error("Cannot continue without collection. Stopping.")
|
||||
return
|
||||
|
||||
# Test 2: File Upload (now two-step process)
|
||||
file_id = await client.test_file_upload_and_structure(collection_id)
|
||||
|
||||
if not file_id:
|
||||
print_error("File upload failed. Continuing with remaining tests...")
|
||||
else:
|
||||
# Test 3: Verify document in collection
|
||||
await client.test_document_in_collection(collection_id, file_id)
|
||||
|
||||
# Test 4: CRITICAL - Shared file test
|
||||
await client.test_shared_file_across_collections(file_id)
|
||||
|
||||
# Test 5: Update document
|
||||
await client.test_document_update(collection_id, file_id)
|
||||
|
||||
# Test 6: Remove document
|
||||
await client.test_document_removal()
|
||||
|
||||
# Test 7: Batch get
|
||||
await client.test_batch_get()
|
||||
|
||||
# Cleanup
|
||||
await client.cleanup()
|
||||
|
||||
# Print summary
|
||||
client.print_summary()
|
||||
|
||||
except Exception as e:
|
||||
print_error(f"Test suite failed: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
# Try cleanup anyway
|
||||
try:
|
||||
await client.cleanup()
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user