From 805d8c7362b8894eb264ad669f88b859b0184360 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 20 Oct 2025 11:42:52 +0000 Subject: [PATCH] =?UTF-8?q?Vollst=C3=A4ndige=20Advoware-EspoCRM=20Integrat?= =?UTF-8?q?ion=20implementiert?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Advoware API Proxy für alle HTTP-Methoden (GET/POST/PUT/DELETE) - EspoCRM Webhook-Receiver für Beteiligte CRUD-Operationen - Redis-basierte Deduplikation für Webhook-Events - Event-driven Synchronisations-Handler (Placeholder) - Detaillierte README.md mit Setup und Verwendungsanleitung - Fehlerbehebungen für Context-Attribute und Redis-Verbindungen --- bitbylaw/README.md | 225 ++++++++++++++++++ bitbylaw/motia-workbench.json | 49 +++- bitbylaw/package.json | 2 +- .../advoware_api_proxy_delete_step.py | 5 +- .../advoware_api_proxy_get_step.py | 5 +- .../advoware_api_proxy_put_step.py | 6 +- .../steps/vmh/beteiligte_sync_event_step.py | 52 ++++ .../vmh/webhook/beteiligte_create_api_step.py | 96 ++++++++ .../vmh/webhook/beteiligte_delete_api_step.py | 77 ++++++ .../vmh/webhook/beteiligte_update_api_step.py | 96 ++++++++ bitbylaw/types.d.ts | 4 + 11 files changed, 611 insertions(+), 6 deletions(-) create mode 100644 bitbylaw/README.md create mode 100644 bitbylaw/steps/vmh/beteiligte_sync_event_step.py create mode 100644 bitbylaw/steps/vmh/webhook/beteiligte_create_api_step.py create mode 100644 bitbylaw/steps/vmh/webhook/beteiligte_delete_api_step.py create mode 100644 bitbylaw/steps/vmh/webhook/beteiligte_update_api_step.py diff --git a/bitbylaw/README.md b/bitbylaw/README.md new file mode 100644 index 00000000..f2ed2a8d --- /dev/null +++ b/bitbylaw/README.md @@ -0,0 +1,225 @@ +# Motia Advoware-EspoCRM Integration + +Dieses Projekt implementiert eine robuste Integration zwischen Advoware und EspoCRM über das Motia-Framework. Es bietet eine vollständige API-Proxy für Advoware und Webhook-Handler für EspoCRM, um Änderungen an Beteiligte-Entitäten zu synchronisieren. + +## Übersicht + +Das System besteht aus drei Hauptkomponenten: + +1. **Advoware API Proxy**: Vollständige REST-API-Proxy für alle HTTP-Methoden (GET, POST, PUT, DELETE) +2. **EspoCRM Webhook Receiver**: Empfängt Webhooks für CRUD-Operationen auf Beteiligte-Entitäten +3. **Event-Driven Sync**: Verarbeitet Synchronisationsereignisse mit Redis-basierter Deduplikation + +## Architektur + +### Komponenten + +- **Motia Framework**: Event-driven Backend-Orchestrierung +- **Python Steps**: Asynchrone Verarbeitung mit aiohttp und redis-py +- **Advoware API Client**: Authentifizierte API-Kommunikation mit Token-Management +- **Redis**: Deduplikation von Webhook-Events und Caching +- **EspoCRM Integration**: Webhook-Handler für create/update/delete Operationen + +### Datenfluss + +``` +EspoCRM Webhook → VMH Webhook Receiver → Redis Deduplication → Event Emission → Sync Handler +Advoware API → Proxy Steps → Response +``` + +## Setup + +### Voraussetzungen + +- Python 3.13+ +- Node.js 18+ +- Redis Server +- Motia CLI + +### Installation + +1. **Repository klonen und Dependencies installieren:** + ```bash + cd /opt/motia-app/bitbylaw + npm install + pip install -r requirements.txt + ``` + +2. **Umgebungsvariablen konfigurieren:** + Erstellen Sie eine `.env`-Datei mit folgenden Variablen: + ```env + ADVOWARE_BASE_URL=https://api.advoware.com + ADVOWARE_USERNAME=your_username + ADVOWARE_PASSWORD=your_password + REDIS_URL=redis://localhost:6379 + ESPOCRM_WEBHOOK_SECRET=your_webhook_secret + ``` + +3. **Redis starten:** + ```bash + redis-server + ``` + +4. **Motia starten:** + ```bash + motia start + ``` + +## Verwendung + +### Advoware API Proxy + +Die Proxy-Endpunkte spiegeln die Advoware-API wider: + +- `GET /api/advoware/*` - Daten abrufen +- `POST /api/advoware/*` - Neue Ressourcen erstellen +- `PUT /api/advoware/*` - Ressourcen aktualisieren +- `DELETE /api/advoware/*` - Ressourcen löschen + +**Beispiel:** +```bash +curl -X GET "http://localhost:3000/api/advoware/employees" +``` + +### EspoCRM Webhooks + +Webhooks werden automatisch von EspoCRM gesendet für Änderungen an Beteiligte-Entitäten: + +- **Create**: `/webhooks/vmh/beteiligte/create` +- **Update**: `/webhooks/vmh/beteiligte/update` +- **Delete**: `/webhooks/vmh/beteiligte/delete` + +### Synchronisation + +Die Synchronisation läuft event-driven ab: + +1. Webhook-Events werden in Redis-Queues dedupliziert +2. Events werden an den Sync-Handler emittiert +3. Sync-Handler verarbeitet die Änderungen (aktuell Placeholder) + +## Konfiguration + +### Motia Workbench + +Die Flows sind in `motia-workbench.json` definiert: + +- `advoware-proxy`: API-Proxy-Flows +- `vmh-webhook`: Webhook-Receiver-Flows +- `beteiligte-sync`: Synchronisations-Flow + +### Redis Keys + +- `vmh:webhook:create`: Create-Event Queue +- `vmh:webhook:update`: Update-Event Queue +- `vmh:webhook:delete`: Delete-Event Queue + +## Entwicklung + +### Projektstruktur + +``` +bitbylaw/ +├── steps/ +│ ├── advoware_proxy/ # API Proxy Steps +│ │ ├── advoware_api_proxy_get_step.py +│ │ ├── advoware_api_proxy_post_step.py +│ │ ├── advoware_api_proxy_put_step.py +│ │ └── advoware_api_proxy_delete_step.py +│ └── vmh/ +│ ├── webhook/ # Webhook Receiver Steps +│ │ ├── beteiligte_create_api_step.py +│ │ ├── beteiligte_update_api_step.py +│ │ └── beteiligte_delete_api_step.py +│ └── beteiligte_sync_event_step.py # Sync Handler +├── services/ +│ └── advoware.py # API Client +├── config.py # Configuration +├── motia-workbench.json # Flow Definitions +├── package.json +├── requirements.txt +└── tsconfig.json +``` + +### Testing + +**API Proxy testen:** +```bash +curl -X GET "http://localhost:3000/api/advoware/employees" +``` + +**Webhook simulieren:** +```bash +curl -X POST "http://localhost:3000/webhooks/vmh/beteiligte/create" \ + -H "Content-Type: application/json" \ + -d '{"id": "123", "name": "Test Beteiligte"}' +``` + +### Logging + +Alle Steps enthalten detaillierte Logging-Ausgaben für Debugging: + +- API-Requests/Responses +- Redis-Operationen +- Event-Emission +- Fehlerbehandlung + +## Deployment + +### Docker + +```dockerfile +FROM python:3.13-slim + +WORKDIR /app +COPY requirements.txt . +RUN pip install -r requirements.txt + +COPY . . +EXPOSE 3000 + +CMD ["motia", "start"] +``` + +### Production Setup + +1. Redis Cluster für Hochverfügbarkeit +2. Load Balancer für API-Endpunkte +3. Monitoring für Sync-Operationen +4. Backup-Strategie für Redis-Daten + +## Fehlerbehebung + +### Häufige Probleme + +1. **Context Attribute Error**: Verwenden Sie `Config` statt `context.config` +2. **Redis Connection Failed**: Überprüfen Sie Redis-URL und Netzwerkverbindung +3. **Webhook Duplikate**: Redis-Deduplikation verhindert Mehrfachverarbeitung + +### Logs überprüfen + +```bash +motia logs +``` + +## Erweiterungen + +### Geplante Features + +- Vollständige EspoCRM-API-Integration im Sync-Handler +- Retry-Logic für fehlgeschlagene Syncs +- Metriken und Alerting +- Batch-Verarbeitung für große Datenmengen + +### API Erweiterungen + +- Zusätzliche Advoware-Endpunkte +- Mehr EspoCRM-Entitäten +- Custom Mapping-Regeln + +## Lizenz + +[License Information] + +## Beitrag + +Bitte erstellen Sie Issues für Bugs oder Feature-Requests. Pull-Requests sind willkommen! \ No newline at end of file diff --git a/bitbylaw/motia-workbench.json b/bitbylaw/motia-workbench.json index de56bfba..56e9970f 100644 --- a/bitbylaw/motia-workbench.json +++ b/bitbylaw/motia-workbench.json @@ -21,14 +21,59 @@ "x": 15, "y": 461, "sourceHandlePosition": "right" + }, + "steps/advoware_proxy/advoware_api_proxy_put_step.py": { + "x": 12, + "y": 408 + }, + "steps/advoware_proxy/advoware_api_proxy_get_step.py": { + "x": 12, + "y": 611 + }, + "steps/advoware_proxy/advoware_api_proxy_delete_step.py": { + "x": 0, + "y": 814 + } + } + }, + { + "id": "vmh", + "config": { + "steps/vmh/beteiligte_sync_event_step.py": { + "x": 805, + "y": 188 + }, + "steps/vmh/webhook/beteiligte_update_api_step.py": { + "x": 13, + "y": 154 + }, + "steps/vmh/webhook/beteiligte_delete_api_step.py": { + "x": 14, + "y": -72 + }, + "steps/vmh/webhook/beteiligte_create_api_step.py": { + "x": 7, + "y": 373 } } }, { "id": "advoware", "config": { - "steps/advoware_api_proxy_step.py": { - "x": 0, + "steps/advoware_proxy/advoware_api_proxy_put_step.py": { + "x": 400, + "y": 0 + }, + "steps/advoware_proxy/advoware_api_proxy_post_step.py": { + "x": 200, + "y": 0 + }, + "steps/advoware_proxy/advoware_api_proxy_get_step.py": { + "x": 12, + "y": 406 + }, + "steps/advoware_proxy/advoware_api_proxy_delete_step.py": { + "x": 600, "y": 0 } } diff --git a/bitbylaw/package.json b/bitbylaw/package.json index c1f8c23c..d611dbe6 100644 --- a/bitbylaw/package.json +++ b/bitbylaw/package.json @@ -4,7 +4,7 @@ "scripts": { "postinstall": "motia install", "dev": "motia dev", - "start": "motia start --host 0.0.0.0", + "start": ". python_modules/bin/activate && motia start --host 0.0.0.0", "generate-types": "motia generate-types", "build": "motia build", "clean": "rm -rf dist node_modules python_modules .motia .mermaid" diff --git a/bitbylaw/steps/advoware_proxy/advoware_api_proxy_delete_step.py b/bitbylaw/steps/advoware_proxy/advoware_api_proxy_delete_step.py index 8cc1d9b5..35fc2e1f 100644 --- a/bitbylaw/steps/advoware_proxy/advoware_api_proxy_delete_step.py +++ b/bitbylaw/steps/advoware_proxy/advoware_api_proxy_delete_step.py @@ -7,7 +7,7 @@ config = { 'path': '/advoware/proxy', 'method': 'DELETE', 'emits': [], - 'flows': ['advoware'] + 'flows': ['basic-tutorial', 'advoware'] } async def handler(req, context): @@ -23,7 +23,10 @@ async def handler(req, context): json_data = None context.logger.info(f"Proxying request to Advoware: {method} {endpoint}") + context.logger.info(f"Query params: {params}") result = await advoware.api_call(endpoint, method=method, params=params, json_data=json_data) + context.logger.info(f"Advoware API response received, length: {len(str(result)) if result else 0}") + context.logger.info(f"Response preview: {str(result)[:500] if result else 'None'}") return {'status': 200, 'body': {'result': result}} except Exception as e: diff --git a/bitbylaw/steps/advoware_proxy/advoware_api_proxy_get_step.py b/bitbylaw/steps/advoware_proxy/advoware_api_proxy_get_step.py index 2463f4c8..1b4ca79f 100644 --- a/bitbylaw/steps/advoware_proxy/advoware_api_proxy_get_step.py +++ b/bitbylaw/steps/advoware_proxy/advoware_api_proxy_get_step.py @@ -7,7 +7,7 @@ config = { 'path': '/advoware/proxy', 'method': 'GET', 'emits': [], - 'flows': ['advoware'] + 'flows': ['basic-tutorial', 'advoware'] } async def handler(req, context): @@ -23,7 +23,10 @@ async def handler(req, context): json_data = None context.logger.info(f"Proxying request to Advoware: {method} {endpoint}") + context.logger.info(f"Query params: {params}") result = await advoware.api_call(endpoint, method=method, params=params, json_data=json_data) + context.logger.info(f"Advoware API response received, length: {len(str(result)) if result else 0}") + context.logger.info(f"Response preview: {str(result)[:500] if result else 'None'}") return {'status': 200, 'body': {'result': result}} except Exception as e: diff --git a/bitbylaw/steps/advoware_proxy/advoware_api_proxy_put_step.py b/bitbylaw/steps/advoware_proxy/advoware_api_proxy_put_step.py index c7b6447b..f13b2d65 100644 --- a/bitbylaw/steps/advoware_proxy/advoware_api_proxy_put_step.py +++ b/bitbylaw/steps/advoware_proxy/advoware_api_proxy_put_step.py @@ -7,7 +7,7 @@ config = { 'path': '/advoware/proxy', 'method': 'PUT', 'emits': [], - 'flows': ['advoware'] + 'flows': ['basic-tutorial', 'advoware'] } async def handler(req, context): @@ -23,7 +23,11 @@ async def handler(req, context): json_data = req.get('body') context.logger.info(f"Proxying request to Advoware: {method} {endpoint}") + context.logger.info(f"Query params: {params}") + context.logger.info(f"Request body: {json_data}") result = await advoware.api_call(endpoint, method=method, params=params, json_data=json_data) + context.logger.info(f"Advoware API response received, length: {len(str(result)) if result else 0}") + context.logger.info(f"Response preview: {str(result)[:500] if result else 'None'}") return {'status': 200, 'body': {'result': result}} except Exception as e: diff --git a/bitbylaw/steps/vmh/beteiligte_sync_event_step.py b/bitbylaw/steps/vmh/beteiligte_sync_event_step.py new file mode 100644 index 00000000..c932034c --- /dev/null +++ b/bitbylaw/steps/vmh/beteiligte_sync_event_step.py @@ -0,0 +1,52 @@ +from services.advoware import AdvowareAPI +import json +import redis +from config import Config + +config = { + 'type': 'event', + 'name': 'VMH Beteiligte Sync', + 'description': 'Synchronisiert Beteiligte Entities von Advoware nach Änderungen (Create/Update/Delete)', + 'subscribes': ['vmh.beteiligte.create', 'vmh.beteiligte.update', 'vmh.beteiligte.delete'], + 'flows': ['vmh'], + 'emits': [] +} + +async def handler(event_data, context): + try: + entity_id = event_data.get('entity_id') + action = event_data.get('action', 'unknown') + + if not entity_id: + context.logger.error("Keine entity_id im Event gefunden") + return + + context.logger.info(f"Starte {action.upper()} Sync für Beteiligte Entity: {entity_id}") + + # Advoware API initialisieren (für später) + # advoware = AdvowareAPI(context) + + # PLATZHALTER: Für jetzt nur loggen, keine API-Anfrage + context.logger.info(f"PLATZHALTER: {action.upper()} Sync für Entity {entity_id} würde hier Advoware API aufrufen") + context.logger.info(f"PLATZHALTER: Entity-Daten würden hier verarbeitet werden") + + # TODO: Hier die Entity in das Zielsystem syncen (EspoCRM?) + # Für Create: Neu anlegen + # Für Update: Aktualisieren + # Für Delete: Löschen + + # Entferne die ID aus der entsprechenden Pending-Queue + redis_client = redis.Redis( + host=Config.REDIS_HOST, + port=int(Config.REDIS_PORT), + db=int(Config.REDIS_DB_ADVOWARE_CACHE), + decode_responses=True + ) + + pending_key = f'vmh:beteiligte:{action}_pending' + redis_client.srem(pending_key, entity_id) + context.logger.info(f"Entity {entity_id} aus {action.upper()}-Pending-Queue entfernt") + + except Exception as e: + context.logger.error(f"Fehler beim {event_data.get('action', 'unknown').upper()} Sync von Beteiligte Entity: {e}") + context.logger.error(f"Event Data: {event_data}") \ No newline at end of file diff --git a/bitbylaw/steps/vmh/webhook/beteiligte_create_api_step.py b/bitbylaw/steps/vmh/webhook/beteiligte_create_api_step.py new file mode 100644 index 00000000..7b52ae1b --- /dev/null +++ b/bitbylaw/steps/vmh/webhook/beteiligte_create_api_step.py @@ -0,0 +1,96 @@ +from typing import Any, Dict, Set +import json +import redis +from config import Config +import datetime + +config = { + 'type': 'api', + 'name': 'VMH Webhook Beteiligte Create', + 'description': 'Empfängt Create-Webhooks von EspoCRM für Beteiligte', + 'path': '/vmh/webhook/beteiligte/create', + 'method': 'POST', + 'flows': ['vmh'], + 'emits': ['vmh.beteiligte.create'] +} + +async def handler(req, context): + try: + # Payload aus dem Request-Body holen + payload = req.get('body', []) + + # Detailliertes Logging + context.logger.info("VMH Webhook Beteiligte Create empfangen") + context.logger.info(f"Headers: {json.dumps(dict(req.get('headers', {})), indent=2)}") + context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") + + # Sammle alle IDs aus dem Batch + ids_to_sync: Set[str] = set() + + if isinstance(payload, list): + for entity in payload: + if isinstance(entity, dict) and 'id' in entity: + entity_id = entity['id'] + ids_to_sync.add(entity_id) + context.logger.info(f"Create Entity ID gefunden: {entity_id}") + elif isinstance(payload, dict) and 'id' in payload: + ids_to_sync.add(payload['id']) + context.logger.info(f"Create Single Entity ID gefunden: {payload['id']}") + + context.logger.info(f"Insgesamt {len(ids_to_sync)} eindeutige IDs zum Create-Sync gefunden") + + # Redis Verbindung für Deduplizierung + redis_client = redis.Redis( + host=Config.REDIS_HOST, + port=int(Config.REDIS_PORT), + db=int(Config.REDIS_DB_ADVOWARE_CACHE), + decode_responses=True + ) + + # Deduplizierung: Prüfe welche IDs schon in der Queue sind + pending_key = 'vmh:beteiligte:create_pending' + existing_ids = redis_client.smembers(pending_key) + new_ids = ids_to_sync - set(existing_ids) + + if new_ids: + # Füge neue IDs zur Pending-Queue hinzu + redis_client.sadd(pending_key, *new_ids) + context.logger.info(f"{len(new_ids)} neue IDs zur Create-Sync-Queue hinzugefügt: {list(new_ids)}") + + # Emittiere Events für neue IDs + for entity_id in new_ids: + await context.emit({ + 'topic': 'vmh.beteiligte.create', + 'data': { + 'entity_id': entity_id, + 'action': 'create', + 'source': 'webhook', + 'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat() + } + }) + context.logger.info(f"Create-Event emittiert für ID: {entity_id}") + else: + context.logger.info("Keine neuen IDs zum Create-Sync gefunden") + + context.logger.info("VMH Create Webhook erfolgreich verarbeitet") + + return { + 'status': 200, + 'body': { + 'status': 'received', + 'action': 'create', + 'new_ids_count': len(new_ids) if 'new_ids' in locals() else 0, + 'total_ids_in_batch': len(ids_to_sync) + } + } + + except Exception as e: + context.logger.error(f"Fehler beim Verarbeiten des VMH Create Webhooks: {e}") + context.logger.error(f"Request: {req}") + return { + 'status': 500, + 'body': { + 'error': 'Internal server error', + 'details': str(e) + } + } diff --git a/bitbylaw/steps/vmh/webhook/beteiligte_delete_api_step.py b/bitbylaw/steps/vmh/webhook/beteiligte_delete_api_step.py new file mode 100644 index 00000000..1f8b2558 --- /dev/null +++ b/bitbylaw/steps/vmh/webhook/beteiligte_delete_api_step.py @@ -0,0 +1,77 @@ +from typing import Any, Dict, Set +import json +import redis +from config import Config +import datetime + +config = { + 'type': 'api', + 'name': 'VMH Webhook Beteiligte Delete', + 'description': 'Empfängt Delete-Webhooks von EspoCRM für Beteiligte', + 'path': '/vmh/webhook/beteiligte/delete', + 'method': 'POST', + 'flows': ['vmh'], + 'emits': ['vmh.beteiligte.delete'] +} + +async def handler(req, context): + try: + payload = req.get('body', []) + + context.logger.info("VMH Webhook Beteiligte Delete empfangen") + context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") + + ids_to_sync: Set[str] = set() + + if isinstance(payload, list): + for entity in payload: + if isinstance(entity, dict) and 'id' in entity: + entity_id = entity['id'] + ids_to_sync.add(entity_id) + elif isinstance(payload, dict) and 'id' in payload: + ids_to_sync.add(payload['id']) + + context.logger.info(f"{len(ids_to_sync)} IDs zum Delete-Sync gefunden") + + # Redis Verbindung für Deduplizierung + redis_client = redis.Redis( + host=Config.REDIS_HOST, + port=int(Config.REDIS_PORT), + db=int(Config.REDIS_DB_ADVOWARE_CACHE), + decode_responses=True + ) + + pending_key = 'vmh:beteiligte:pending_delete' + existing_ids = redis_client.smembers(pending_key) + new_ids = ids_to_sync - set(existing_ids) + + if new_ids: + redis_client.sadd(pending_key, *new_ids) + context.logger.info(f"{len(new_ids)} neue IDs zur Delete-Queue hinzugefügt") + + for entity_id in new_ids: + await context.emit({ + 'topic': 'vmh.beteiligte.delete', + 'data': { + 'entity_id': entity_id, + 'action': 'delete', + 'source': 'webhook', + 'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat() + } + }) + + return { + 'status': 200, + 'body': { + 'status': 'received', + 'action': 'delete', + 'new_ids_count': len(new_ids) if 'new_ids' in locals() else 0 + } + } + + except Exception as e: + context.logger.error(f"Fehler beim Delete-Webhook: {e}") + return { + 'status': 500, + 'body': {'error': 'Internal server error', 'details': str(e)} + } \ No newline at end of file diff --git a/bitbylaw/steps/vmh/webhook/beteiligte_update_api_step.py b/bitbylaw/steps/vmh/webhook/beteiligte_update_api_step.py new file mode 100644 index 00000000..6c9db992 --- /dev/null +++ b/bitbylaw/steps/vmh/webhook/beteiligte_update_api_step.py @@ -0,0 +1,96 @@ +from typing import Any, Dict, Set +import json +import redis +from config import Config +import datetime + +config = { + 'type': 'api', + 'name': 'VMH Webhook Beteiligte Update', + 'description': 'Empfängt Update-Webhooks von EspoCRM für Beteiligte', + 'path': '/vmh/webhook/beteiligte/update', + 'method': 'POST', + 'flows': ['vmh'], + 'emits': ['vmh.beteiligte.update'] +} + +async def handler(req, context): + try: + # Payload aus dem Request-Body holen + payload = req.get('body', []) + + # Detailliertes Logging + context.logger.info("VMH Webhook Beteiligte Update empfangen") + context.logger.info(f"Headers: {json.dumps(dict(req.get('headers', {})), indent=2)}") + context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") + + # Sammle alle IDs aus dem Batch + ids_to_sync: Set[str] = set() + + if isinstance(payload, list): + for entity in payload: + if isinstance(entity, dict) and 'id' in entity: + entity_id = entity['id'] + ids_to_sync.add(entity_id) + context.logger.info(f"Update Entity ID gefunden: {entity_id}") + elif isinstance(payload, dict) and 'id' in payload: + ids_to_sync.add(payload['id']) + context.logger.info(f"Update Single Entity ID gefunden: {payload['id']}") + + context.logger.info(f"Insgesamt {len(ids_to_sync)} eindeutige IDs zum Update-Sync gefunden") + + # Redis Verbindung für Deduplizierung + redis_client = redis.Redis( + host=Config.REDIS_HOST, + port=int(Config.REDIS_PORT), + db=int(Config.REDIS_DB_ADVOWARE_CACHE), + decode_responses=True + ) + + # Deduplizierung: Prüfe welche IDs schon in der Queue sind + pending_key = 'vmh:beteiligte:update_pending' + existing_ids = redis_client.smembers(pending_key) + new_ids = ids_to_sync - set(existing_ids) + + if new_ids: + # Füge neue IDs zur Pending-Queue hinzu + redis_client.sadd(pending_key, *new_ids) + context.logger.info(f"{len(new_ids)} neue IDs zur Update-Sync-Queue hinzugefügt: {list(new_ids)}") + + # Emittiere Events für neue IDs + for entity_id in new_ids: + await context.emit({ + 'topic': 'vmh.beteiligte.update', + 'data': { + 'entity_id': entity_id, + 'action': 'update', + 'source': 'webhook', + 'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat() + } + }) + context.logger.info(f"Update-Event emittiert für ID: {entity_id}") + else: + context.logger.info("Keine neuen IDs zum Update-Sync gefunden") + + context.logger.info("VMH Update Webhook erfolgreich verarbeitet") + + return { + 'status': 200, + 'body': { + 'status': 'received', + 'action': 'update', + 'new_ids_count': len(new_ids) if 'new_ids' in locals() else 0, + 'total_ids_in_batch': len(ids_to_sync) + } + } + + except Exception as e: + context.logger.error(f"Fehler beim Verarbeiten des VMH Update Webhooks: {e}") + context.logger.error(f"Request: {req}") + return { + 'status': 500, + 'body': { + 'error': 'Internal server error', + 'details': str(e) + } + } diff --git a/bitbylaw/types.d.ts b/bitbylaw/types.d.ts index 5b2ba16f..6bdc92ee 100644 --- a/bitbylaw/types.d.ts +++ b/bitbylaw/types.d.ts @@ -12,6 +12,10 @@ declare module 'motia' { } interface Handlers { + 'VMH Beteiligte Sync': EventHandler + 'VMH Webhook Beteiligte Update': ApiRouteHandler, unknown, { topic: 'vmh.beteiligte.update'; data: never }> + 'VMH Webhook Beteiligte Delete': ApiRouteHandler, unknown, { topic: 'vmh.beteiligte.delete'; data: never }> + 'VMH Webhook Beteiligte Create': ApiRouteHandler, unknown, { topic: 'vmh.beteiligte.create'; data: never }> 'StateAuditJob': CronHandler<{ topic: 'notification'; data: { template_id: string; email: string; template_data: Record } }> 'ProcessFoodOrder': EventHandler<{ id: string; email: string; quantity: unknown; pet_id: unknown }, { topic: 'notification'; data: { template_id: string; email: string; template_data: Record } }> 'Notification': EventHandler<{ template_id: string; email: string; template_data: Record }, never>