feat: Refactor webhook handlers for Beteiligte to improve ID processing and logging, and enhance update filtering logic
This commit is contained in:
@@ -25,7 +25,7 @@ TimestampResult = Literal["espocrm_newer", "advoware_newer", "conflict", "no_cha
|
|||||||
# Max retry before permanent failure
|
# Max retry before permanent failure
|
||||||
MAX_SYNC_RETRIES = 5
|
MAX_SYNC_RETRIES = 5
|
||||||
# Lock TTL in seconds (prevents deadlocks)
|
# Lock TTL in seconds (prevents deadlocks)
|
||||||
LOCK_TTL_SECONDS = 300 # 5 minutes
|
LOCK_TTL_SECONDS = 900 # 15 minutes
|
||||||
|
|
||||||
|
|
||||||
class BeteiligteSync:
|
class BeteiligteSync:
|
||||||
|
|||||||
@@ -96,10 +96,6 @@ async def handler(event_data, context):
|
|||||||
context.logger.warning(f"⚠️ Unbekannte Kombination: action={action}, betnr={betnr}")
|
context.logger.warning(f"⚠️ Unbekannte Kombination: action={action}, betnr={betnr}")
|
||||||
await sync_utils.release_sync_lock(entity_id, 'failed', f'Unbekannte Aktion: {action}')
|
await sync_utils.release_sync_lock(entity_id, 'failed', f'Unbekannte Aktion: {action}')
|
||||||
|
|
||||||
# Redis Queue Cleanup
|
|
||||||
pending_key = f'vmh:beteiligte:{action}_pending'
|
|
||||||
redis_client.srem(pending_key, entity_id)
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
context.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
|
context.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
|
||||||
import traceback
|
import traceback
|
||||||
|
|||||||
@@ -1,7 +1,4 @@
|
|||||||
from typing import Any, Dict, Set
|
|
||||||
import json
|
import json
|
||||||
import redis
|
|
||||||
from config import Config
|
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
config = {
|
config = {
|
||||||
@@ -16,71 +13,43 @@ config = {
|
|||||||
|
|
||||||
async def handler(req, context):
|
async def handler(req, context):
|
||||||
try:
|
try:
|
||||||
# Payload aus dem Request-Body holen
|
|
||||||
payload = req.get('body', [])
|
payload = req.get('body', [])
|
||||||
|
|
||||||
# Detailliertes Logging
|
|
||||||
context.logger.info("VMH Webhook Beteiligte Create empfangen")
|
context.logger.info("VMH Webhook Beteiligte Create empfangen")
|
||||||
context.logger.info(f"Headers: {json.dumps(dict(req.get('headers', {})), indent=2)}")
|
|
||||||
context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
|
context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
|
||||||
|
|
||||||
# Sammle alle IDs aus dem Batch
|
# Sammle alle IDs aus dem Batch
|
||||||
ids_to_sync: Set[str] = set()
|
entity_ids = set()
|
||||||
|
|
||||||
if isinstance(payload, list):
|
if isinstance(payload, list):
|
||||||
for entity in payload:
|
for entity in payload:
|
||||||
if isinstance(entity, dict) and 'id' in entity:
|
if isinstance(entity, dict) and 'id' in entity:
|
||||||
entity_id = entity['id']
|
entity_ids.add(entity['id'])
|
||||||
ids_to_sync.add(entity_id)
|
|
||||||
context.logger.info(f"Create Entity ID gefunden: {entity_id}")
|
|
||||||
elif isinstance(payload, dict) and 'id' in payload:
|
elif isinstance(payload, dict) and 'id' in payload:
|
||||||
ids_to_sync.add(payload['id'])
|
entity_ids.add(payload['id'])
|
||||||
context.logger.info(f"Create Single Entity ID gefunden: {payload['id']}")
|
|
||||||
|
|
||||||
context.logger.info(f"Insgesamt {len(ids_to_sync)} eindeutige IDs zum Create-Sync gefunden")
|
context.logger.info(f"{len(entity_ids)} IDs zum Create-Sync gefunden")
|
||||||
|
|
||||||
# Redis Verbindung für Deduplizierung
|
# Emittiere Events direkt (Deduplizierung erfolgt im Event-Handler via Lock)
|
||||||
redis_client = redis.Redis(
|
for entity_id in entity_ids:
|
||||||
host=Config.REDIS_HOST,
|
await context.emit({
|
||||||
port=int(Config.REDIS_PORT),
|
'topic': 'vmh.beteiligte.create',
|
||||||
db=int(Config.REDIS_DB_ADVOWARE_CACHE),
|
'data': {
|
||||||
decode_responses=True
|
'entity_id': entity_id,
|
||||||
)
|
'action': 'create',
|
||||||
|
'source': 'webhook',
|
||||||
|
'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
# Deduplizierung: Prüfe welche IDs schon in der Queue sind
|
context.logger.info(f"VMH Create Webhook verarbeitet: {len(entity_ids)} Events emittiert")
|
||||||
pending_key = 'vmh:beteiligte:create_pending'
|
|
||||||
existing_ids = redis_client.smembers(pending_key)
|
|
||||||
new_ids = ids_to_sync - set(existing_ids)
|
|
||||||
|
|
||||||
if new_ids:
|
|
||||||
# Füge neue IDs zur Pending-Queue hinzu
|
|
||||||
redis_client.sadd(pending_key, *new_ids)
|
|
||||||
context.logger.info(f"{len(new_ids)} neue IDs zur Create-Sync-Queue hinzugefügt: {list(new_ids)}")
|
|
||||||
|
|
||||||
# Emittiere Events für neue IDs
|
|
||||||
for entity_id in new_ids:
|
|
||||||
await context.emit({
|
|
||||||
'topic': 'vmh.beteiligte.create',
|
|
||||||
'data': {
|
|
||||||
'entity_id': entity_id,
|
|
||||||
'action': 'create',
|
|
||||||
'source': 'webhook',
|
|
||||||
'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
context.logger.info(f"Create-Event emittiert für ID: {entity_id}")
|
|
||||||
else:
|
|
||||||
context.logger.info("Keine neuen IDs zum Create-Sync gefunden")
|
|
||||||
|
|
||||||
context.logger.info("VMH Create Webhook erfolgreich verarbeitet")
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'status': 200,
|
'status': 200,
|
||||||
'body': {
|
'body': {
|
||||||
'status': 'received',
|
'status': 'received',
|
||||||
'action': 'create',
|
'action': 'create',
|
||||||
'new_ids_count': len(new_ids) if 'new_ids' in locals() else 0,
|
'ids_count': len(entity_ids)
|
||||||
'total_ids_in_batch': len(ids_to_sync)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,4 @@
|
|||||||
from typing import Any, Dict, Set
|
|
||||||
import json
|
import json
|
||||||
import redis
|
|
||||||
from config import Config
|
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
config = {
|
config = {
|
||||||
@@ -21,51 +18,38 @@ async def handler(req, context):
|
|||||||
context.logger.info("VMH Webhook Beteiligte Delete empfangen")
|
context.logger.info("VMH Webhook Beteiligte Delete empfangen")
|
||||||
context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
|
context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
|
||||||
|
|
||||||
ids_to_sync: Set[str] = set()
|
# Sammle alle IDs aus dem Batch
|
||||||
|
entity_ids = set()
|
||||||
|
|
||||||
if isinstance(payload, list):
|
if isinstance(payload, list):
|
||||||
for entity in payload:
|
for entity in payload:
|
||||||
if isinstance(entity, dict) and 'id' in entity:
|
if isinstance(entity, dict) and 'id' in entity:
|
||||||
entity_id = entity['id']
|
entity_ids.add(entity['id'])
|
||||||
ids_to_sync.add(entity_id)
|
|
||||||
elif isinstance(payload, dict) and 'id' in payload:
|
elif isinstance(payload, dict) and 'id' in payload:
|
||||||
ids_to_sync.add(payload['id'])
|
entity_ids.add(payload['id'])
|
||||||
|
|
||||||
context.logger.info(f"{len(ids_to_sync)} IDs zum Delete-Sync gefunden")
|
context.logger.info(f"{len(entity_ids)} IDs zum Delete-Sync gefunden")
|
||||||
|
|
||||||
# Redis Verbindung für Deduplizierung
|
# Emittiere Events direkt (Deduplizierung erfolgt im Event-Handler via Lock)
|
||||||
redis_client = redis.Redis(
|
for entity_id in entity_ids:
|
||||||
host=Config.REDIS_HOST,
|
await context.emit({
|
||||||
port=int(Config.REDIS_PORT),
|
'topic': 'vmh.beteiligte.delete',
|
||||||
db=int(Config.REDIS_DB_ADVOWARE_CACHE),
|
'data': {
|
||||||
decode_responses=True
|
'entity_id': entity_id,
|
||||||
)
|
'action': 'delete',
|
||||||
|
'source': 'webhook',
|
||||||
|
'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
pending_key = 'vmh:beteiligte:pending_delete'
|
context.logger.info(f"VMH Delete Webhook verarbeitet: {len(entity_ids)} Events emittiert")
|
||||||
existing_ids = redis_client.smembers(pending_key)
|
|
||||||
new_ids = ids_to_sync - set(existing_ids)
|
|
||||||
|
|
||||||
if new_ids:
|
|
||||||
redis_client.sadd(pending_key, *new_ids)
|
|
||||||
context.logger.info(f"{len(new_ids)} neue IDs zur Delete-Queue hinzugefügt")
|
|
||||||
|
|
||||||
for entity_id in new_ids:
|
|
||||||
await context.emit({
|
|
||||||
'topic': 'vmh.beteiligte.delete',
|
|
||||||
'data': {
|
|
||||||
'entity_id': entity_id,
|
|
||||||
'action': 'delete',
|
|
||||||
'source': 'webhook',
|
|
||||||
'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'status': 200,
|
'status': 200,
|
||||||
'body': {
|
'body': {
|
||||||
'status': 'received',
|
'status': 'received',
|
||||||
'action': 'delete',
|
'action': 'delete',
|
||||||
'new_ids_count': len(new_ids) if 'new_ids' in locals() else 0
|
'ids_count': len(entity_ids)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,9 +1,45 @@
|
|||||||
from typing import Any, Dict, Set
|
|
||||||
import json
|
import json
|
||||||
import redis
|
|
||||||
from config import Config
|
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
|
def should_skip_update(entity_data):
|
||||||
|
"""
|
||||||
|
Prüft ob Update gefiltert werden soll (verhindert Webhook-Loop)
|
||||||
|
|
||||||
|
SKIP wenn:
|
||||||
|
- Nur Sync-Felder geändert UND
|
||||||
|
- syncStatus ist "clean" oder "syncing" (normale Sync-Completion)
|
||||||
|
|
||||||
|
EMIT wenn:
|
||||||
|
- Echte Datenänderung (nicht nur Sync-Felder) ODER
|
||||||
|
- syncStatus ist "dirty", "failed", "pending_sync" (braucht Sync)
|
||||||
|
"""
|
||||||
|
if not isinstance(entity_data, dict):
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Felder die von Sync-Handler gesetzt werden
|
||||||
|
sync_fields = {'syncStatus', 'advowareLastSync', 'syncErrorMessage', 'syncRetryCount'}
|
||||||
|
# Meta-Felder die immer vorhanden sind
|
||||||
|
meta_fields = {'id', 'modifiedAt', 'modifiedById', 'modifiedByName'}
|
||||||
|
# Alle ignorierbaren Felder
|
||||||
|
ignorable = sync_fields | meta_fields
|
||||||
|
|
||||||
|
# Prüfe ob es relevante (nicht-sync) Felder gibt
|
||||||
|
entity_keys = set(entity_data.keys())
|
||||||
|
relevant_keys = entity_keys - ignorable
|
||||||
|
|
||||||
|
# Wenn echte Datenänderung → Emit (nicht skippen)
|
||||||
|
if len(relevant_keys) > 0:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Nur Sync-Felder vorhanden → Prüfe syncStatus
|
||||||
|
sync_status = entity_data.get('syncStatus')
|
||||||
|
|
||||||
|
# Skip nur wenn Status "clean" oder "syncing" (normale Completion)
|
||||||
|
# Emit wenn "dirty", "failed", "pending_sync" (braucht Sync)
|
||||||
|
should_skip = sync_status in ['clean', 'syncing']
|
||||||
|
|
||||||
|
return should_skip
|
||||||
|
|
||||||
config = {
|
config = {
|
||||||
'type': 'api',
|
'type': 'api',
|
||||||
'name': 'VMH Webhook Beteiligte Update',
|
'name': 'VMH Webhook Beteiligte Update',
|
||||||
@@ -16,71 +52,56 @@ config = {
|
|||||||
|
|
||||||
async def handler(req, context):
|
async def handler(req, context):
|
||||||
try:
|
try:
|
||||||
# Payload aus dem Request-Body holen
|
|
||||||
payload = req.get('body', [])
|
payload = req.get('body', [])
|
||||||
|
|
||||||
# Detailliertes Logging
|
|
||||||
context.logger.info("VMH Webhook Beteiligte Update empfangen")
|
context.logger.info("VMH Webhook Beteiligte Update empfangen")
|
||||||
context.logger.info(f"Headers: {json.dumps(dict(req.get('headers', {})), indent=2)}")
|
|
||||||
context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
|
context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
|
||||||
|
|
||||||
# Sammle alle IDs aus dem Batch
|
# Sammle alle IDs aus dem Batch (filtere Sync-Only-Updates)
|
||||||
ids_to_sync: Set[str] = set()
|
entity_ids = set()
|
||||||
|
filtered_count = 0
|
||||||
|
|
||||||
if isinstance(payload, list):
|
if isinstance(payload, list):
|
||||||
for entity in payload:
|
for entity in payload:
|
||||||
if isinstance(entity, dict) and 'id' in entity:
|
if isinstance(entity, dict) and 'id' in entity:
|
||||||
entity_id = entity['id']
|
# Prüfe ob Update gefiltert werden soll (verhindert Loop)
|
||||||
ids_to_sync.add(entity_id)
|
if should_skip_update(entity):
|
||||||
context.logger.info(f"Update Entity ID gefunden: {entity_id}")
|
context.logger.info(f"Sync-Completion gefiltert: {entity['id']} (syncStatus={entity.get('syncStatus')})")
|
||||||
|
filtered_count += 1
|
||||||
|
continue
|
||||||
|
entity_ids.add(entity['id'])
|
||||||
elif isinstance(payload, dict) and 'id' in payload:
|
elif isinstance(payload, dict) and 'id' in payload:
|
||||||
ids_to_sync.add(payload['id'])
|
if not should_skip_update(payload):
|
||||||
context.logger.info(f"Update Single Entity ID gefunden: {payload['id']}")
|
entity_ids.add(payload['id'])
|
||||||
|
else:
|
||||||
|
context.logger.info(f"Sync-Completion gefiltert: {payload['id']} (syncStatus={payload.get('syncStatus')})")
|
||||||
|
filtered_count += 1
|
||||||
|
|
||||||
context.logger.info(f"Insgesamt {len(ids_to_sync)} eindeutige IDs zum Update-Sync gefunden")
|
if filtered_count > 0:
|
||||||
|
context.logger.info(f"{len(entity_ids)} IDs zum Update-Sync gefunden ({filtered_count} Sync-Completions gefiltert)")
|
||||||
# Redis Verbindung für Deduplizierung
|
|
||||||
redis_client = redis.Redis(
|
|
||||||
host=Config.REDIS_HOST,
|
|
||||||
port=int(Config.REDIS_PORT),
|
|
||||||
db=int(Config.REDIS_DB_ADVOWARE_CACHE),
|
|
||||||
decode_responses=True
|
|
||||||
)
|
|
||||||
|
|
||||||
# Deduplizierung: Prüfe welche IDs schon in der Queue sind
|
|
||||||
pending_key = 'vmh:beteiligte:update_pending'
|
|
||||||
existing_ids = redis_client.smembers(pending_key)
|
|
||||||
new_ids = ids_to_sync - set(existing_ids)
|
|
||||||
|
|
||||||
if new_ids:
|
|
||||||
# Füge neue IDs zur Pending-Queue hinzu
|
|
||||||
redis_client.sadd(pending_key, *new_ids)
|
|
||||||
context.logger.info(f"{len(new_ids)} neue IDs zur Update-Sync-Queue hinzugefügt: {list(new_ids)}")
|
|
||||||
|
|
||||||
# Emittiere Events für neue IDs
|
|
||||||
for entity_id in new_ids:
|
|
||||||
await context.emit({
|
|
||||||
'topic': 'vmh.beteiligte.update',
|
|
||||||
'data': {
|
|
||||||
'entity_id': entity_id,
|
|
||||||
'action': 'update',
|
|
||||||
'source': 'webhook',
|
|
||||||
'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
context.logger.info(f"Update-Event emittiert für ID: {entity_id}")
|
|
||||||
else:
|
else:
|
||||||
context.logger.info("Keine neuen IDs zum Update-Sync gefunden")
|
context.logger.info(f"{len(entity_ids)} IDs zum Update-Sync gefunden")
|
||||||
|
|
||||||
context.logger.info("VMH Update Webhook erfolgreich verarbeitet")
|
# Emittiere Events direkt (Deduplizierung erfolgt im Event-Handler via Lock)
|
||||||
|
for entity_id in entity_ids:
|
||||||
|
await context.emit({
|
||||||
|
'topic': 'vmh.beteiligte.update',
|
||||||
|
'data': {
|
||||||
|
'entity_id': entity_id,
|
||||||
|
'action': 'update',
|
||||||
|
'source': 'webhook',
|
||||||
|
'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
context.logger.info(f"VMH Update Webhook verarbeitet: {len(entity_ids)} Events emittiert")
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'status': 200,
|
'status': 200,
|
||||||
'body': {
|
'body': {
|
||||||
'status': 'received',
|
'status': 'received',
|
||||||
'action': 'update',
|
'action': 'update',
|
||||||
'new_ids_count': len(new_ids) if 'new_ids' in locals() else 0,
|
'ids_count': len(entity_ids)
|
||||||
'total_ids_in_batch': len(ids_to_sync)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user