feat(webhooks): add EspoCRM Document Delete and Update webhook handlers with detailed logging

- Implemented document delete webhook handler to process delete notifications from EspoCRM.
- Added detailed logging for incoming payloads, including headers and entity information.
- Extracted and emitted document IDs for further processing.
- Implemented document update webhook handler to handle update notifications from EspoCRM.
- Enhanced logging for update payloads, including changed fields and metadata.
- Both handlers emit events for queue processing to facilitate further actions.

test: add comprehensive xAI Collections API test suite

- Created a test suite for xAI Collections API covering critical operations.
- Included tests for file uploads, CRUD operations on collections and documents, and response structures.
- Verified shared file behavior across multiple collections.
- Implemented cleanup procedures to remove test resources after execution.
This commit is contained in:
bsiggel
2026-03-02 17:16:07 +00:00
parent 0282149613
commit bc917bd885
5 changed files with 1362 additions and 0 deletions

View File

@@ -0,0 +1,6 @@
"""
EspoCRM Generic Webhooks
Empfängt Webhooks von EspoCRM für verschiedene Entities.
Zentrale Anlaufstelle für alle EspoCRM-Events außerhalb VMH-Kontext.
"""

View File

@@ -0,0 +1,198 @@
"""EspoCRM Webhook - Document Create
Empfängt Create-Webhooks von EspoCRM für Documents.
Loggt detailliert alle Payload-Informationen für Analyse.
"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "EspoCRM Document Create Webhook",
"description": "Empfängt Create-Webhooks von EspoCRM für Document Entities",
"flows": ["espocrm-documents"],
"triggers": [
http("POST", "/api/espocrm/document/create")
],
"enqueues": ["espocrm.document.create"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document creation in EspoCRM.
Receives notifications when documents are created and emits queue events
for processing (xAI sync, etc.).
Payload Analysis Mode: Logs comprehensive details about webhook structure.
"""
try:
payload = request.body or []
# ═══════════════════════════════════════════════════════════════
# DETAILLIERTES LOGGING FÜR ANALYSE
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("=" * 80)
ctx.logger.info("📥 EspoCRM DOCUMENT CREATE WEBHOOK EMPFANGEN")
ctx.logger.info("=" * 80)
# Log Request Headers
ctx.logger.info("\n🔍 REQUEST HEADERS:")
if hasattr(request, 'headers'):
for key, value in request.headers.items():
ctx.logger.info(f" {key}: {value}")
else:
ctx.logger.info(" (keine Headers verfügbar)")
# Log Payload Type & Structure
ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
# Log Full Payload (pretty-printed)
ctx.logger.info("\n📄 FULL PAYLOAD:")
ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
# ═══════════════════════════════════════════════════════════════
# PAYLOAD ANALYSE & ID EXTRAKTION
# ═══════════════════════════════════════════════════════════════
entity_ids = set()
payload_details = []
if isinstance(payload, list):
ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
for idx, entity in enumerate(payload):
if isinstance(entity, dict):
entity_id = entity.get('id')
if entity_id:
entity_ids.add(entity_id)
# Sammle Details für Logging
detail = {
'index': idx,
'id': entity_id,
'name': entity.get('name', 'N/A'),
'type': entity.get('type', 'N/A'),
'size': entity.get('size', 'N/A'),
'all_fields': list(entity.keys())
}
payload_details.append(detail)
ctx.logger.info(f"\n 📄 Document #{idx + 1}:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Name: {entity.get('name', 'N/A')}")
ctx.logger.info(f" Type: {entity.get('type', 'N/A')}")
ctx.logger.info(f" Size: {entity.get('size', 'N/A')} bytes")
ctx.logger.info(f" Verfügbare Felder: {', '.join(entity.keys())}")
# xAI-relevante Felder (falls vorhanden)
xai_fields = {k: v for k, v in entity.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
# Parent/Relationship Felder
rel_fields = {k: v for k, v in entity.items()
if 'parent' in k.lower() or 'related' in k.lower() or
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
if rel_fields:
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
elif isinstance(payload, dict):
ctx.logger.info("\n✅ Payload ist SINGLE DICT")
entity_id = payload.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 📄 Document:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Name: {payload.get('name', 'N/A')}")
ctx.logger.info(f" Type: {payload.get('type', 'N/A')}")
ctx.logger.info(f" Size: {payload.get('size', 'N/A')} bytes")
ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}")
# xAI-relevante Felder
xai_fields = {k: v for k, v in payload.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
# Relationship Felder
rel_fields = {k: v for k, v in payload.items()
if 'parent' in k.lower() or 'related' in k.lower() or
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
if rel_fields:
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
else:
ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
# ═══════════════════════════════════════════════════════════════
# QUEUE EVENTS EMITTIEREN
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
ctx.logger.info("=" * 80)
if not entity_ids:
ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'create',
'ids_count': 0,
'warning': 'No document IDs found in payload'
}
)
# Emit events für Queue-Processing (Deduplizierung erfolgt im Event-Handler via Lock)
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'espocrm.document.create',
'data': {
'entity_id': entity_id,
'action': 'create',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info(f"✅ Event emittiert: espocrm.document.create für ID {entity_id}")
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
ctx.logger.info("=" * 80)
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'create',
'ids_count': len(entity_ids),
'document_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Create Webhooks")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error Type: {type(e).__name__}")
ctx.logger.error(f"Error Message: {str(e)}")
# Log Stack Trace
import traceback
ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")
return ApiResponse(
status=500,
body={
'error': 'Internal server error',
'error_type': type(e).__name__,
'details': str(e)
}
)

View File

@@ -0,0 +1,174 @@
"""EspoCRM Webhook - Document Delete
Empfängt Delete-Webhooks von EspoCRM für Documents.
Loggt detailliert alle Payload-Informationen für Analyse.
"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "EspoCRM Document Delete Webhook",
"description": "Empfängt Delete-Webhooks von EspoCRM für Document Entities",
"flows": ["espocrm-documents"],
"triggers": [
http("POST", "/api/espocrm/document/delete")
],
"enqueues": ["espocrm.document.delete"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document deletion in EspoCRM.
Receives notifications when documents are deleted.
Note: Bei Deletion haben wir ggf. nur die ID, keine vollständigen Entity-Daten.
"""
try:
payload = request.body or []
# ═══════════════════════════════════════════════════════════════
# DETAILLIERTES LOGGING FÜR ANALYSE
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("=" * 80)
ctx.logger.info("📥 EspoCRM DOCUMENT DELETE WEBHOOK EMPFANGEN")
ctx.logger.info("=" * 80)
# Log Request Headers
ctx.logger.info("\n🔍 REQUEST HEADERS:")
if hasattr(request, 'headers'):
for key, value in request.headers.items():
ctx.logger.info(f" {key}: {value}")
else:
ctx.logger.info(" (keine Headers verfügbar)")
# Log Payload Type & Structure
ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
# Log Full Payload (pretty-printed)
ctx.logger.info("\n📄 FULL PAYLOAD:")
ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
# ═══════════════════════════════════════════════════════════════
# PAYLOAD ANALYSE & ID EXTRAKTION
# ═══════════════════════════════════════════════════════════════
entity_ids = set()
if isinstance(payload, list):
ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
for idx, entity in enumerate(payload):
if isinstance(entity, dict):
entity_id = entity.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 🗑️ Document #{idx + 1}:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Verfügbare Felder: {', '.join(entity.keys())}")
# Bei Delete haben wir oft nur minimale Daten
if 'name' in entity:
ctx.logger.info(f" Name: {entity.get('name')}")
if 'deletedAt' in entity or 'deleted' in entity:
ctx.logger.info(f" Deleted At: {entity.get('deletedAt', entity.get('deleted', 'N/A'))}")
# xAI-relevante Felder (falls vorhanden)
xai_fields = {k: v for k, v in entity.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
elif isinstance(payload, dict):
ctx.logger.info("\n✅ Payload ist SINGLE DICT")
entity_id = payload.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 🗑️ Document:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}")
if 'name' in payload:
ctx.logger.info(f" Name: {payload.get('name')}")
if 'deletedAt' in payload or 'deleted' in payload:
ctx.logger.info(f" Deleted At: {payload.get('deletedAt', payload.get('deleted', 'N/A'))}")
# xAI-relevante Felder
xai_fields = {k: v for k, v in payload.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
else:
ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
# ═══════════════════════════════════════════════════════════════
# QUEUE EVENTS EMITTIEREN
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
ctx.logger.info("=" * 80)
if not entity_ids:
ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'delete',
'ids_count': 0,
'warning': 'No document IDs found in payload'
}
)
# Emit events für Queue-Processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'espocrm.document.delete',
'data': {
'entity_id': entity_id,
'action': 'delete',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info(f"✅ Event emittiert: espocrm.document.delete für ID {entity_id}")
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
ctx.logger.info("=" * 80)
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'delete',
'ids_count': len(entity_ids),
'document_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Delete Webhooks")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error Type: {type(e).__name__}")
ctx.logger.error(f"Error Message: {str(e)}")
import traceback
ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")
return ApiResponse(
status=500,
body={
'error': 'Internal server error',
'error_type': type(e).__name__,
'details': str(e)
}
)

View File

@@ -0,0 +1,196 @@
"""EspoCRM Webhook - Document Update
Empfängt Update-Webhooks von EspoCRM für Documents.
Loggt detailliert alle Payload-Informationen für Analyse.
"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "EspoCRM Document Update Webhook",
"description": "Empfängt Update-Webhooks von EspoCRM für Document Entities",
"flows": ["espocrm-documents"],
"triggers": [
http("POST", "/api/espocrm/document/update")
],
"enqueues": ["espocrm.document.update"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document updates in EspoCRM.
Receives notifications when documents are updated and emits queue events
for processing (xAI sync, etc.).
Note: Loop-Prevention sollte auf EspoCRM-Seite implementiert werden.
xAI-Feld-Updates sollten keine neuen Webhooks triggern.
"""
try:
payload = request.body or []
# ═══════════════════════════════════════════════════════════════
# DETAILLIERTES LOGGING FÜR ANALYSE
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("=" * 80)
ctx.logger.info("📥 EspoCRM DOCUMENT UPDATE WEBHOOK EMPFANGEN")
ctx.logger.info("=" * 80)
# Log Request Headers
ctx.logger.info("\n🔍 REQUEST HEADERS:")
if hasattr(request, 'headers'):
for key, value in request.headers.items():
ctx.logger.info(f" {key}: {value}")
else:
ctx.logger.info(" (keine Headers verfügbar)")
# Log Payload Type & Structure
ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
# Log Full Payload (pretty-printed)
ctx.logger.info("\n📄 FULL PAYLOAD:")
ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
# ═══════════════════════════════════════════════════════════════
# PAYLOAD ANALYSE & ID EXTRAKTION
# ═══════════════════════════════════════════════════════════════
entity_ids = set()
if isinstance(payload, list):
ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
for idx, entity in enumerate(payload):
if isinstance(entity, dict):
entity_id = entity.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 📄 Document #{idx + 1}:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Name: {entity.get('name', 'N/A')}")
ctx.logger.info(f" Modified At: {entity.get('modifiedAt', 'N/A')}")
ctx.logger.info(f" Modified By: {entity.get('modifiedById', 'N/A')}")
ctx.logger.info(f" Verfügbare Felder: {', '.join(entity.keys())}")
# Prüfe ob CHANGED fields mitgeliefert werden
changed_fields = entity.get('changedFields') or entity.get('changed') or entity.get('modifiedFields')
if changed_fields:
ctx.logger.info(f" 🔄 Geänderte Felder: {json.dumps(changed_fields, ensure_ascii=False)}")
# xAI-relevante Felder
xai_fields = {k: v for k, v in entity.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
# Relationship Felder
rel_fields = {k: v for k, v in entity.items()
if 'parent' in k.lower() or 'related' in k.lower() or
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
if rel_fields:
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
elif isinstance(payload, dict):
ctx.logger.info("\n✅ Payload ist SINGLE DICT")
entity_id = payload.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 📄 Document:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Name: {payload.get('name', 'N/A')}")
ctx.logger.info(f" Modified At: {payload.get('modifiedAt', 'N/A')}")
ctx.logger.info(f" Modified By: {payload.get('modifiedById', 'N/A')}")
ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}")
# Geänderte Felder
changed_fields = payload.get('changedFields') or payload.get('changed') or payload.get('modifiedFields')
if changed_fields:
ctx.logger.info(f" 🔄 Geänderte Felder: {json.dumps(changed_fields, ensure_ascii=False)}")
# xAI-relevante Felder
xai_fields = {k: v for k, v in payload.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
# Relationship Felder
rel_fields = {k: v for k, v in payload.items()
if 'parent' in k.lower() or 'related' in k.lower() or
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
if rel_fields:
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
else:
ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
# ═══════════════════════════════════════════════════════════════
# QUEUE EVENTS EMITTIEREN
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
ctx.logger.info("=" * 80)
if not entity_ids:
ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'update',
'ids_count': 0,
'warning': 'No document IDs found in payload'
}
)
# Emit events für Queue-Processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'espocrm.document.update',
'data': {
'entity_id': entity_id,
'action': 'update',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info(f"✅ Event emittiert: espocrm.document.update für ID {entity_id}")
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
ctx.logger.info("=" * 80)
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'update',
'ids_count': len(entity_ids),
'document_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Update Webhooks")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error Type: {type(e).__name__}")
ctx.logger.error(f"Error Message: {str(e)}")
import traceback
ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")
return ApiResponse(
status=500,
body={
'error': 'Internal server error',
'error_type': type(e).__name__,
'details': str(e)
}
)