From 52114a3c953aeb7c5411fc300c7b9e3170144671 Mon Sep 17 00:00:00 2001 From: bsiggel Date: Thu, 26 Mar 2026 10:16:33 +0000 Subject: [PATCH] feat(webhooks): Update Akte webhook handlers to trigger immediate synchronization --- .../crm/akte/webhooks/akte_create_api_step.py | 20 +++++-------------- .../crm/akte/webhooks/akte_update_api_step.py | 20 +++++-------------- 2 files changed, 10 insertions(+), 30 deletions(-) diff --git a/src/steps/crm/akte/webhooks/akte_create_api_step.py b/src/steps/crm/akte/webhooks/akte_create_api_step.py index e07943e..a2d8d24 100644 --- a/src/steps/crm/akte/webhooks/akte_create_api_step.py +++ b/src/steps/crm/akte/webhooks/akte_create_api_step.py @@ -1,21 +1,17 @@ """Akte Webhook - Create""" import json -import time -import datetime from typing import Any from motia import FlowContext, http, ApiRequest, ApiResponse config = { "name": "Akte Webhook - Create", - "description": "Empfängt EspoCRM-Create-Webhooks für CAkten und queued Entity-IDs für den Sync", + "description": "Empfängt EspoCRM-Create-Webhooks für CAkten und triggert sofort den Sync", "flows": ["akte-sync"], "triggers": [http("POST", "/crm/akte/webhook/create")], - "enqueues": [], + "enqueues": ["akte.sync"], } -PENDING_KEY = "akte:pending_entity_ids" - async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: try: @@ -37,16 +33,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.warn("⚠️ No entity IDs in payload") return ApiResponse(status_code=400, body={"error": "No entity ID found in payload"}) - from services.redis_client import get_redis_client - redis_client = get_redis_client(strict=False) - if not redis_client: - ctx.logger.error("❌ Redis unavailable") - return ApiResponse(status_code=503, body={"error": "Service unavailable"}) + for eid in entity_ids: + await ctx.enqueue({'topic': 'akte.sync', 'data': {'akte_id': eid, 'aktennummer': None}}) - ts = time.time() - redis_client.zadd(PENDING_KEY, {eid: ts for eid in entity_ids}) - - ctx.logger.info(f"✅ Queued {len(entity_ids)} entity ID(s): {entity_ids}") + ctx.logger.info(f"✅ Emitted akte.sync for {len(entity_ids)} ID(s): {entity_ids}") ctx.logger.info("=" * 60) return ApiResponse(status_code=200, body={"status": "received", "action": "create", "ids_count": len(entity_ids)}) diff --git a/src/steps/crm/akte/webhooks/akte_update_api_step.py b/src/steps/crm/akte/webhooks/akte_update_api_step.py index b792500..d5b27a4 100644 --- a/src/steps/crm/akte/webhooks/akte_update_api_step.py +++ b/src/steps/crm/akte/webhooks/akte_update_api_step.py @@ -1,21 +1,17 @@ """Akte Webhook - Update""" import json -import time -import datetime from typing import Any from motia import FlowContext, http, ApiRequest, ApiResponse config = { "name": "Akte Webhook - Update", - "description": "Empfängt EspoCRM-Update-Webhooks für CAkten und queued Entity-IDs für den Sync", + "description": "Empfängt EspoCRM-Update-Webhooks für CAkten und triggert sofort den Sync", "flows": ["akte-sync"], "triggers": [http("POST", "/crm/akte/webhook/update")], - "enqueues": [], + "enqueues": ["akte.sync"], } -PENDING_KEY = "akte:pending_entity_ids" - async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: try: @@ -37,16 +33,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.warn("⚠️ No entity IDs in payload") return ApiResponse(status_code=400, body={"error": "No entity ID found in payload"}) - from services.redis_client import get_redis_client - redis_client = get_redis_client(strict=False) - if not redis_client: - ctx.logger.error("❌ Redis unavailable") - return ApiResponse(status_code=503, body={"error": "Service unavailable"}) + for eid in entity_ids: + await ctx.enqueue({'topic': 'akte.sync', 'data': {'akte_id': eid, 'aktennummer': None}}) - ts = time.time() - redis_client.zadd(PENDING_KEY, {eid: ts for eid in entity_ids}) - - ctx.logger.info(f"✅ Queued {len(entity_ids)} entity ID(s): {entity_ids}") + ctx.logger.info(f"✅ Emitted akte.sync for {len(entity_ids)} ID(s): {entity_ids}") ctx.logger.info("=" * 60) return ApiResponse(status_code=200, body={"status": "received", "action": "update", "ids_count": len(entity_ids)})