feat(webhooks): Implement Akte webhooks for create, delete, and update operations

This commit is contained in:
bsiggel
2026-03-26 10:16:27 +00:00
parent 3497deeef7
commit bf02b1a4e1
3 changed files with 101 additions and 19 deletions

View File

@@ -0,0 +1,56 @@
"""Akte Webhook - Create"""
import json
import time
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "Akte Webhook - Create",
"description": "Empfängt EspoCRM-Create-Webhooks für CAkten und queued Entity-IDs für den Sync",
"flows": ["akte-sync"],
"triggers": [http("POST", "/crm/akte/webhook/create")],
"enqueues": [],
}
PENDING_KEY = "akte:pending_entity_ids"
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
try:
payload = request.body or {}
ctx.logger.info("=" * 60)
ctx.logger.info("📥 AKTE WEBHOOK: CREATE")
ctx.logger.info(f" Payload: {json.dumps(payload, ensure_ascii=False)[:200]}")
entity_ids: set[str] = set()
if isinstance(payload, list):
for item in payload:
if isinstance(item, dict) and 'id' in item:
entity_ids.add(item['id'])
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
if not entity_ids:
ctx.logger.warn("⚠️ No entity IDs in payload")
return ApiResponse(status_code=400, body={"error": "No entity ID found in payload"})
from services.redis_client import get_redis_client
redis_client = get_redis_client(strict=False)
if not redis_client:
ctx.logger.error("❌ Redis unavailable")
return ApiResponse(status_code=503, body={"error": "Service unavailable"})
ts = time.time()
redis_client.zadd(PENDING_KEY, {eid: ts for eid in entity_ids})
ctx.logger.info(f"✅ Queued {len(entity_ids)} entity ID(s): {entity_ids}")
ctx.logger.info("=" * 60)
return ApiResponse(status_code=200, body={"status": "received", "action": "create", "ids_count": len(entity_ids)})
except Exception as e:
ctx.logger.error(f"❌ Webhook error: {e}")
return ApiResponse(status_code=500, body={"error": str(e)})

View File

@@ -0,0 +1,38 @@
"""Akte Webhook - Delete"""
import json
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "Akte Webhook - Delete",
"description": "Empfängt EspoCRM-Delete-Webhooks für CAkten (kein Sync notwendig)",
"flows": ["akte-sync"],
"triggers": [http("POST", "/crm/akte/webhook/delete")],
"enqueues": [],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
try:
payload = request.body or {}
entity_ids: set[str] = set()
if isinstance(payload, list):
for item in payload:
if isinstance(item, dict) and 'id' in item:
entity_ids.add(item['id'])
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
ctx.logger.info("=" * 60)
ctx.logger.info("📥 AKTE WEBHOOK: DELETE")
ctx.logger.info(f" IDs: {entity_ids}")
ctx.logger.info(" → Kein Sync (Entität gelöscht)")
ctx.logger.info("=" * 60)
return ApiResponse(status_code=200, body={"status": "received", "action": "delete", "ids_count": len(entity_ids)})
except Exception as e:
ctx.logger.error(f"❌ Webhook error: {e}")
return ApiResponse(status_code=500, body={"error": str(e)})

View File

@@ -1,23 +1,14 @@
""" """Akte Webhook - Update"""
Akte Sync - EspoCRM Webhook
Empfängt EspoCRM-Webhooks für CAkten (create / update / delete).
Schreibt die Entity-ID in die Redis-Queue `akte:pending_entity_ids`
mit 10-Sekunden-Debounce der Cron-Poller übernimmt den Rest.
Route: POST /akte/webhook/update
Payload: { "id": "..." } oder [{ "id": "..." }, ...]
"""
import json import json
import time import time
import datetime
from typing import Any from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse from motia import FlowContext, http, ApiRequest, ApiResponse
config = { config = {
"name": "Akte Webhook - EspoCRM", "name": "Akte Webhook - Update",
"description": "Empfängt EspoCRM-Webhooks für CAkten und queued Entity-IDs für den Sync", "description": "Empfängt EspoCRM-Update-Webhooks für CAkten und queued Entity-IDs für den Sync",
"flows": ["akte-sync"], "flows": ["akte-sync"],
"triggers": [http("POST", "/crm/akte/webhook/update")], "triggers": [http("POST", "/crm/akte/webhook/update")],
"enqueues": [], "enqueues": [],
@@ -31,10 +22,9 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
payload = request.body or {} payload = request.body or {}
ctx.logger.info("=" * 60) ctx.logger.info("=" * 60)
ctx.logger.info("📥 AKTE WEBHOOK") ctx.logger.info("📥 AKTE WEBHOOK: UPDATE")
ctx.logger.info(f" Payload: {json.dumps(payload, ensure_ascii=False)[:200]}") ctx.logger.info(f" Payload: {json.dumps(payload, ensure_ascii=False)[:200]}")
# ── Collect entity IDs ─────────────────────────────────────
entity_ids: set[str] = set() entity_ids: set[str] = set()
if isinstance(payload, list): if isinstance(payload, list):
for item in payload: for item in payload:
@@ -47,7 +37,6 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
ctx.logger.warn("⚠️ No entity IDs in payload") ctx.logger.warn("⚠️ No entity IDs in payload")
return ApiResponse(status_code=400, body={"error": "No entity ID found in payload"}) return ApiResponse(status_code=400, body={"error": "No entity ID found in payload"})
# ── Push to Redis with current timestamp (debounce in cron) ─
from services.redis_client import get_redis_client from services.redis_client import get_redis_client
redis_client = get_redis_client(strict=False) redis_client = get_redis_client(strict=False)
if not redis_client: if not redis_client:
@@ -55,13 +44,12 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
return ApiResponse(status_code=503, body={"error": "Service unavailable"}) return ApiResponse(status_code=503, body={"error": "Service unavailable"})
ts = time.time() ts = time.time()
mapping = {eid: ts for eid in entity_ids} redis_client.zadd(PENDING_KEY, {eid: ts for eid in entity_ids})
redis_client.zadd(PENDING_KEY, mapping)
ctx.logger.info(f"✅ Queued {len(entity_ids)} entity ID(s): {entity_ids}") ctx.logger.info(f"✅ Queued {len(entity_ids)} entity ID(s): {entity_ids}")
ctx.logger.info("=" * 60) ctx.logger.info("=" * 60)
return ApiResponse(status_code=200, body={"queued": len(entity_ids)}) return ApiResponse(status_code=200, body={"status": "received", "action": "update", "ids_count": len(entity_ids)})
except Exception as e: except Exception as e:
ctx.logger.error(f"❌ Webhook error: {e}") ctx.logger.error(f"❌ Webhook error: {e}")