From 3354aef93693f5a39f6ef57dd6294e21de75372e Mon Sep 17 00:00:00 2001 From: bitbylaw Date: Sat, 7 Feb 2026 18:53:54 +0000 Subject: [PATCH] feat: Refactor webhook handlers for Beteiligte to improve ID processing and logging, and enhance update filtering logic --- bitbylaw/services/beteiligte_sync_utils.py | 2 +- .../steps/vmh/beteiligte_sync_event_step.py | 4 - .../vmh/webhook/beteiligte_create_api_step.py | 65 +++------- .../vmh/webhook/beteiligte_delete_api_step.py | 52 +++----- .../vmh/webhook/beteiligte_update_api_step.py | 119 ++++++++++-------- 5 files changed, 106 insertions(+), 136 deletions(-) diff --git a/bitbylaw/services/beteiligte_sync_utils.py b/bitbylaw/services/beteiligte_sync_utils.py index 058e3520..259106d9 100644 --- a/bitbylaw/services/beteiligte_sync_utils.py +++ b/bitbylaw/services/beteiligte_sync_utils.py @@ -25,7 +25,7 @@ TimestampResult = Literal["espocrm_newer", "advoware_newer", "conflict", "no_cha # Max retry before permanent failure MAX_SYNC_RETRIES = 5 # Lock TTL in seconds (prevents deadlocks) -LOCK_TTL_SECONDS = 300 # 5 minutes +LOCK_TTL_SECONDS = 900 # 15 minutes class BeteiligteSync: diff --git a/bitbylaw/steps/vmh/beteiligte_sync_event_step.py b/bitbylaw/steps/vmh/beteiligte_sync_event_step.py index 8502de81..92b1952f 100644 --- a/bitbylaw/steps/vmh/beteiligte_sync_event_step.py +++ b/bitbylaw/steps/vmh/beteiligte_sync_event_step.py @@ -96,10 +96,6 @@ async def handler(event_data, context): context.logger.warning(f"⚠️ Unbekannte Kombination: action={action}, betnr={betnr}") await sync_utils.release_sync_lock(entity_id, 'failed', f'Unbekannte Aktion: {action}') - # Redis Queue Cleanup - pending_key = f'vmh:beteiligte:{action}_pending' - redis_client.srem(pending_key, entity_id) - except Exception as e: context.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}") import traceback diff --git a/bitbylaw/steps/vmh/webhook/beteiligte_create_api_step.py b/bitbylaw/steps/vmh/webhook/beteiligte_create_api_step.py index 7b52ae1b..5d44a0ea 100644 --- a/bitbylaw/steps/vmh/webhook/beteiligte_create_api_step.py +++ b/bitbylaw/steps/vmh/webhook/beteiligte_create_api_step.py @@ -1,7 +1,4 @@ -from typing import Any, Dict, Set import json -import redis -from config import Config import datetime config = { @@ -16,71 +13,43 @@ config = { async def handler(req, context): try: - # Payload aus dem Request-Body holen payload = req.get('body', []) - # Detailliertes Logging context.logger.info("VMH Webhook Beteiligte Create empfangen") - context.logger.info(f"Headers: {json.dumps(dict(req.get('headers', {})), indent=2)}") context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") # Sammle alle IDs aus dem Batch - ids_to_sync: Set[str] = set() + entity_ids = set() if isinstance(payload, list): for entity in payload: if isinstance(entity, dict) and 'id' in entity: - entity_id = entity['id'] - ids_to_sync.add(entity_id) - context.logger.info(f"Create Entity ID gefunden: {entity_id}") + entity_ids.add(entity['id']) elif isinstance(payload, dict) and 'id' in payload: - ids_to_sync.add(payload['id']) - context.logger.info(f"Create Single Entity ID gefunden: {payload['id']}") + entity_ids.add(payload['id']) - context.logger.info(f"Insgesamt {len(ids_to_sync)} eindeutige IDs zum Create-Sync gefunden") + context.logger.info(f"{len(entity_ids)} IDs zum Create-Sync gefunden") - # Redis Verbindung für Deduplizierung - redis_client = redis.Redis( - host=Config.REDIS_HOST, - port=int(Config.REDIS_PORT), - db=int(Config.REDIS_DB_ADVOWARE_CACHE), - decode_responses=True - ) + # Emittiere Events direkt (Deduplizierung erfolgt im Event-Handler via Lock) + for entity_id in entity_ids: + await context.emit({ + 'topic': 'vmh.beteiligte.create', + 'data': { + 'entity_id': entity_id, + 'action': 'create', + 'source': 'webhook', + 'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat() + } + }) - # Deduplizierung: Prüfe welche IDs schon in der Queue sind - pending_key = 'vmh:beteiligte:create_pending' - existing_ids = redis_client.smembers(pending_key) - new_ids = ids_to_sync - set(existing_ids) - - if new_ids: - # Füge neue IDs zur Pending-Queue hinzu - redis_client.sadd(pending_key, *new_ids) - context.logger.info(f"{len(new_ids)} neue IDs zur Create-Sync-Queue hinzugefügt: {list(new_ids)}") - - # Emittiere Events für neue IDs - for entity_id in new_ids: - await context.emit({ - 'topic': 'vmh.beteiligte.create', - 'data': { - 'entity_id': entity_id, - 'action': 'create', - 'source': 'webhook', - 'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat() - } - }) - context.logger.info(f"Create-Event emittiert für ID: {entity_id}") - else: - context.logger.info("Keine neuen IDs zum Create-Sync gefunden") - - context.logger.info("VMH Create Webhook erfolgreich verarbeitet") + context.logger.info(f"VMH Create Webhook verarbeitet: {len(entity_ids)} Events emittiert") return { 'status': 200, 'body': { 'status': 'received', 'action': 'create', - 'new_ids_count': len(new_ids) if 'new_ids' in locals() else 0, - 'total_ids_in_batch': len(ids_to_sync) + 'ids_count': len(entity_ids) } } diff --git a/bitbylaw/steps/vmh/webhook/beteiligte_delete_api_step.py b/bitbylaw/steps/vmh/webhook/beteiligte_delete_api_step.py index 1f8b2558..1520be90 100644 --- a/bitbylaw/steps/vmh/webhook/beteiligte_delete_api_step.py +++ b/bitbylaw/steps/vmh/webhook/beteiligte_delete_api_step.py @@ -1,7 +1,4 @@ -from typing import Any, Dict, Set import json -import redis -from config import Config import datetime config = { @@ -21,51 +18,38 @@ async def handler(req, context): context.logger.info("VMH Webhook Beteiligte Delete empfangen") context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") - ids_to_sync: Set[str] = set() + # Sammle alle IDs aus dem Batch + entity_ids = set() if isinstance(payload, list): for entity in payload: if isinstance(entity, dict) and 'id' in entity: - entity_id = entity['id'] - ids_to_sync.add(entity_id) + entity_ids.add(entity['id']) elif isinstance(payload, dict) and 'id' in payload: - ids_to_sync.add(payload['id']) + entity_ids.add(payload['id']) - context.logger.info(f"{len(ids_to_sync)} IDs zum Delete-Sync gefunden") + context.logger.info(f"{len(entity_ids)} IDs zum Delete-Sync gefunden") - # Redis Verbindung für Deduplizierung - redis_client = redis.Redis( - host=Config.REDIS_HOST, - port=int(Config.REDIS_PORT), - db=int(Config.REDIS_DB_ADVOWARE_CACHE), - decode_responses=True - ) + # Emittiere Events direkt (Deduplizierung erfolgt im Event-Handler via Lock) + for entity_id in entity_ids: + await context.emit({ + 'topic': 'vmh.beteiligte.delete', + 'data': { + 'entity_id': entity_id, + 'action': 'delete', + 'source': 'webhook', + 'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat() + } + }) - pending_key = 'vmh:beteiligte:pending_delete' - existing_ids = redis_client.smembers(pending_key) - new_ids = ids_to_sync - set(existing_ids) - - if new_ids: - redis_client.sadd(pending_key, *new_ids) - context.logger.info(f"{len(new_ids)} neue IDs zur Delete-Queue hinzugefügt") - - for entity_id in new_ids: - await context.emit({ - 'topic': 'vmh.beteiligte.delete', - 'data': { - 'entity_id': entity_id, - 'action': 'delete', - 'source': 'webhook', - 'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat() - } - }) + context.logger.info(f"VMH Delete Webhook verarbeitet: {len(entity_ids)} Events emittiert") return { 'status': 200, 'body': { 'status': 'received', 'action': 'delete', - 'new_ids_count': len(new_ids) if 'new_ids' in locals() else 0 + 'ids_count': len(entity_ids) } } diff --git a/bitbylaw/steps/vmh/webhook/beteiligte_update_api_step.py b/bitbylaw/steps/vmh/webhook/beteiligte_update_api_step.py index 6c9db992..bf285dce 100644 --- a/bitbylaw/steps/vmh/webhook/beteiligte_update_api_step.py +++ b/bitbylaw/steps/vmh/webhook/beteiligte_update_api_step.py @@ -1,9 +1,45 @@ -from typing import Any, Dict, Set import json -import redis -from config import Config import datetime +def should_skip_update(entity_data): + """ + Prüft ob Update gefiltert werden soll (verhindert Webhook-Loop) + + SKIP wenn: + - Nur Sync-Felder geändert UND + - syncStatus ist "clean" oder "syncing" (normale Sync-Completion) + + EMIT wenn: + - Echte Datenänderung (nicht nur Sync-Felder) ODER + - syncStatus ist "dirty", "failed", "pending_sync" (braucht Sync) + """ + if not isinstance(entity_data, dict): + return False + + # Felder die von Sync-Handler gesetzt werden + sync_fields = {'syncStatus', 'advowareLastSync', 'syncErrorMessage', 'syncRetryCount'} + # Meta-Felder die immer vorhanden sind + meta_fields = {'id', 'modifiedAt', 'modifiedById', 'modifiedByName'} + # Alle ignorierbaren Felder + ignorable = sync_fields | meta_fields + + # Prüfe ob es relevante (nicht-sync) Felder gibt + entity_keys = set(entity_data.keys()) + relevant_keys = entity_keys - ignorable + + # Wenn echte Datenänderung → Emit (nicht skippen) + if len(relevant_keys) > 0: + return False + + # Nur Sync-Felder vorhanden → Prüfe syncStatus + sync_status = entity_data.get('syncStatus') + + # Skip nur wenn Status "clean" oder "syncing" (normale Completion) + # Emit wenn "dirty", "failed", "pending_sync" (braucht Sync) + should_skip = sync_status in ['clean', 'syncing'] + + return should_skip + config = { 'type': 'api', 'name': 'VMH Webhook Beteiligte Update', @@ -16,71 +52,56 @@ config = { async def handler(req, context): try: - # Payload aus dem Request-Body holen payload = req.get('body', []) - # Detailliertes Logging context.logger.info("VMH Webhook Beteiligte Update empfangen") - context.logger.info(f"Headers: {json.dumps(dict(req.get('headers', {})), indent=2)}") context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") - # Sammle alle IDs aus dem Batch - ids_to_sync: Set[str] = set() + # Sammle alle IDs aus dem Batch (filtere Sync-Only-Updates) + entity_ids = set() + filtered_count = 0 if isinstance(payload, list): for entity in payload: if isinstance(entity, dict) and 'id' in entity: - entity_id = entity['id'] - ids_to_sync.add(entity_id) - context.logger.info(f"Update Entity ID gefunden: {entity_id}") + # Prüfe ob Update gefiltert werden soll (verhindert Loop) + if should_skip_update(entity): + context.logger.info(f"Sync-Completion gefiltert: {entity['id']} (syncStatus={entity.get('syncStatus')})") + filtered_count += 1 + continue + entity_ids.add(entity['id']) elif isinstance(payload, dict) and 'id' in payload: - ids_to_sync.add(payload['id']) - context.logger.info(f"Update Single Entity ID gefunden: {payload['id']}") + if not should_skip_update(payload): + entity_ids.add(payload['id']) + else: + context.logger.info(f"Sync-Completion gefiltert: {payload['id']} (syncStatus={payload.get('syncStatus')})") + filtered_count += 1 - context.logger.info(f"Insgesamt {len(ids_to_sync)} eindeutige IDs zum Update-Sync gefunden") - - # Redis Verbindung für Deduplizierung - redis_client = redis.Redis( - host=Config.REDIS_HOST, - port=int(Config.REDIS_PORT), - db=int(Config.REDIS_DB_ADVOWARE_CACHE), - decode_responses=True - ) - - # Deduplizierung: Prüfe welche IDs schon in der Queue sind - pending_key = 'vmh:beteiligte:update_pending' - existing_ids = redis_client.smembers(pending_key) - new_ids = ids_to_sync - set(existing_ids) - - if new_ids: - # Füge neue IDs zur Pending-Queue hinzu - redis_client.sadd(pending_key, *new_ids) - context.logger.info(f"{len(new_ids)} neue IDs zur Update-Sync-Queue hinzugefügt: {list(new_ids)}") - - # Emittiere Events für neue IDs - for entity_id in new_ids: - await context.emit({ - 'topic': 'vmh.beteiligte.update', - 'data': { - 'entity_id': entity_id, - 'action': 'update', - 'source': 'webhook', - 'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat() - } - }) - context.logger.info(f"Update-Event emittiert für ID: {entity_id}") + if filtered_count > 0: + context.logger.info(f"{len(entity_ids)} IDs zum Update-Sync gefunden ({filtered_count} Sync-Completions gefiltert)") else: - context.logger.info("Keine neuen IDs zum Update-Sync gefunden") + context.logger.info(f"{len(entity_ids)} IDs zum Update-Sync gefunden") - context.logger.info("VMH Update Webhook erfolgreich verarbeitet") + # Emittiere Events direkt (Deduplizierung erfolgt im Event-Handler via Lock) + for entity_id in entity_ids: + await context.emit({ + 'topic': 'vmh.beteiligte.update', + 'data': { + 'entity_id': entity_id, + 'action': 'update', + 'source': 'webhook', + 'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat() + } + }) + + context.logger.info(f"VMH Update Webhook verarbeitet: {len(entity_ids)} Events emittiert") return { 'status': 200, 'body': { 'status': 'received', 'action': 'update', - 'new_ids_count': len(new_ids) if 'new_ids' in locals() else 0, - 'total_ids_in_batch': len(ids_to_sync) + 'ids_count': len(entity_ids) } }