Vollständige Advoware-EspoCRM Integration implementiert
- Advoware API Proxy für alle HTTP-Methoden (GET/POST/PUT/DELETE) - EspoCRM Webhook-Receiver für Beteiligte CRUD-Operationen - Redis-basierte Deduplikation für Webhook-Events - Event-driven Synchronisations-Handler (Placeholder) - Detaillierte README.md mit Setup und Verwendungsanleitung - Fehlerbehebungen für Context-Attribute und Redis-Verbindungen
This commit is contained in:
225
bitbylaw/README.md
Normal file
225
bitbylaw/README.md
Normal file
@@ -0,0 +1,225 @@
|
|||||||
|
# Motia Advoware-EspoCRM Integration
|
||||||
|
|
||||||
|
Dieses Projekt implementiert eine robuste Integration zwischen Advoware und EspoCRM über das Motia-Framework. Es bietet eine vollständige API-Proxy für Advoware und Webhook-Handler für EspoCRM, um Änderungen an Beteiligte-Entitäten zu synchronisieren.
|
||||||
|
|
||||||
|
## Übersicht
|
||||||
|
|
||||||
|
Das System besteht aus drei Hauptkomponenten:
|
||||||
|
|
||||||
|
1. **Advoware API Proxy**: Vollständige REST-API-Proxy für alle HTTP-Methoden (GET, POST, PUT, DELETE)
|
||||||
|
2. **EspoCRM Webhook Receiver**: Empfängt Webhooks für CRUD-Operationen auf Beteiligte-Entitäten
|
||||||
|
3. **Event-Driven Sync**: Verarbeitet Synchronisationsereignisse mit Redis-basierter Deduplikation
|
||||||
|
|
||||||
|
## Architektur
|
||||||
|
|
||||||
|
### Komponenten
|
||||||
|
|
||||||
|
- **Motia Framework**: Event-driven Backend-Orchestrierung
|
||||||
|
- **Python Steps**: Asynchrone Verarbeitung mit aiohttp und redis-py
|
||||||
|
- **Advoware API Client**: Authentifizierte API-Kommunikation mit Token-Management
|
||||||
|
- **Redis**: Deduplikation von Webhook-Events und Caching
|
||||||
|
- **EspoCRM Integration**: Webhook-Handler für create/update/delete Operationen
|
||||||
|
|
||||||
|
### Datenfluss
|
||||||
|
|
||||||
|
```
|
||||||
|
EspoCRM Webhook → VMH Webhook Receiver → Redis Deduplication → Event Emission → Sync Handler
|
||||||
|
Advoware API → Proxy Steps → Response
|
||||||
|
```
|
||||||
|
|
||||||
|
## Setup
|
||||||
|
|
||||||
|
### Voraussetzungen
|
||||||
|
|
||||||
|
- Python 3.13+
|
||||||
|
- Node.js 18+
|
||||||
|
- Redis Server
|
||||||
|
- Motia CLI
|
||||||
|
|
||||||
|
### Installation
|
||||||
|
|
||||||
|
1. **Repository klonen und Dependencies installieren:**
|
||||||
|
```bash
|
||||||
|
cd /opt/motia-app/bitbylaw
|
||||||
|
npm install
|
||||||
|
pip install -r requirements.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
2. **Umgebungsvariablen konfigurieren:**
|
||||||
|
Erstellen Sie eine `.env`-Datei mit folgenden Variablen:
|
||||||
|
```env
|
||||||
|
ADVOWARE_BASE_URL=https://api.advoware.com
|
||||||
|
ADVOWARE_USERNAME=your_username
|
||||||
|
ADVOWARE_PASSWORD=your_password
|
||||||
|
REDIS_URL=redis://localhost:6379
|
||||||
|
ESPOCRM_WEBHOOK_SECRET=your_webhook_secret
|
||||||
|
```
|
||||||
|
|
||||||
|
3. **Redis starten:**
|
||||||
|
```bash
|
||||||
|
redis-server
|
||||||
|
```
|
||||||
|
|
||||||
|
4. **Motia starten:**
|
||||||
|
```bash
|
||||||
|
motia start
|
||||||
|
```
|
||||||
|
|
||||||
|
## Verwendung
|
||||||
|
|
||||||
|
### Advoware API Proxy
|
||||||
|
|
||||||
|
Die Proxy-Endpunkte spiegeln die Advoware-API wider:
|
||||||
|
|
||||||
|
- `GET /api/advoware/*` - Daten abrufen
|
||||||
|
- `POST /api/advoware/*` - Neue Ressourcen erstellen
|
||||||
|
- `PUT /api/advoware/*` - Ressourcen aktualisieren
|
||||||
|
- `DELETE /api/advoware/*` - Ressourcen löschen
|
||||||
|
|
||||||
|
**Beispiel:**
|
||||||
|
```bash
|
||||||
|
curl -X GET "http://localhost:3000/api/advoware/employees"
|
||||||
|
```
|
||||||
|
|
||||||
|
### EspoCRM Webhooks
|
||||||
|
|
||||||
|
Webhooks werden automatisch von EspoCRM gesendet für Änderungen an Beteiligte-Entitäten:
|
||||||
|
|
||||||
|
- **Create**: `/webhooks/vmh/beteiligte/create`
|
||||||
|
- **Update**: `/webhooks/vmh/beteiligte/update`
|
||||||
|
- **Delete**: `/webhooks/vmh/beteiligte/delete`
|
||||||
|
|
||||||
|
### Synchronisation
|
||||||
|
|
||||||
|
Die Synchronisation läuft event-driven ab:
|
||||||
|
|
||||||
|
1. Webhook-Events werden in Redis-Queues dedupliziert
|
||||||
|
2. Events werden an den Sync-Handler emittiert
|
||||||
|
3. Sync-Handler verarbeitet die Änderungen (aktuell Placeholder)
|
||||||
|
|
||||||
|
## Konfiguration
|
||||||
|
|
||||||
|
### Motia Workbench
|
||||||
|
|
||||||
|
Die Flows sind in `motia-workbench.json` definiert:
|
||||||
|
|
||||||
|
- `advoware-proxy`: API-Proxy-Flows
|
||||||
|
- `vmh-webhook`: Webhook-Receiver-Flows
|
||||||
|
- `beteiligte-sync`: Synchronisations-Flow
|
||||||
|
|
||||||
|
### Redis Keys
|
||||||
|
|
||||||
|
- `vmh:webhook:create`: Create-Event Queue
|
||||||
|
- `vmh:webhook:update`: Update-Event Queue
|
||||||
|
- `vmh:webhook:delete`: Delete-Event Queue
|
||||||
|
|
||||||
|
## Entwicklung
|
||||||
|
|
||||||
|
### Projektstruktur
|
||||||
|
|
||||||
|
```
|
||||||
|
bitbylaw/
|
||||||
|
├── steps/
|
||||||
|
│ ├── advoware_proxy/ # API Proxy Steps
|
||||||
|
│ │ ├── advoware_api_proxy_get_step.py
|
||||||
|
│ │ ├── advoware_api_proxy_post_step.py
|
||||||
|
│ │ ├── advoware_api_proxy_put_step.py
|
||||||
|
│ │ └── advoware_api_proxy_delete_step.py
|
||||||
|
│ └── vmh/
|
||||||
|
│ ├── webhook/ # Webhook Receiver Steps
|
||||||
|
│ │ ├── beteiligte_create_api_step.py
|
||||||
|
│ │ ├── beteiligte_update_api_step.py
|
||||||
|
│ │ └── beteiligte_delete_api_step.py
|
||||||
|
│ └── beteiligte_sync_event_step.py # Sync Handler
|
||||||
|
├── services/
|
||||||
|
│ └── advoware.py # API Client
|
||||||
|
├── config.py # Configuration
|
||||||
|
├── motia-workbench.json # Flow Definitions
|
||||||
|
├── package.json
|
||||||
|
├── requirements.txt
|
||||||
|
└── tsconfig.json
|
||||||
|
```
|
||||||
|
|
||||||
|
### Testing
|
||||||
|
|
||||||
|
**API Proxy testen:**
|
||||||
|
```bash
|
||||||
|
curl -X GET "http://localhost:3000/api/advoware/employees"
|
||||||
|
```
|
||||||
|
|
||||||
|
**Webhook simulieren:**
|
||||||
|
```bash
|
||||||
|
curl -X POST "http://localhost:3000/webhooks/vmh/beteiligte/create" \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d '{"id": "123", "name": "Test Beteiligte"}'
|
||||||
|
```
|
||||||
|
|
||||||
|
### Logging
|
||||||
|
|
||||||
|
Alle Steps enthalten detaillierte Logging-Ausgaben für Debugging:
|
||||||
|
|
||||||
|
- API-Requests/Responses
|
||||||
|
- Redis-Operationen
|
||||||
|
- Event-Emission
|
||||||
|
- Fehlerbehandlung
|
||||||
|
|
||||||
|
## Deployment
|
||||||
|
|
||||||
|
### Docker
|
||||||
|
|
||||||
|
```dockerfile
|
||||||
|
FROM python:3.13-slim
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
COPY requirements.txt .
|
||||||
|
RUN pip install -r requirements.txt
|
||||||
|
|
||||||
|
COPY . .
|
||||||
|
EXPOSE 3000
|
||||||
|
|
||||||
|
CMD ["motia", "start"]
|
||||||
|
```
|
||||||
|
|
||||||
|
### Production Setup
|
||||||
|
|
||||||
|
1. Redis Cluster für Hochverfügbarkeit
|
||||||
|
2. Load Balancer für API-Endpunkte
|
||||||
|
3. Monitoring für Sync-Operationen
|
||||||
|
4. Backup-Strategie für Redis-Daten
|
||||||
|
|
||||||
|
## Fehlerbehebung
|
||||||
|
|
||||||
|
### Häufige Probleme
|
||||||
|
|
||||||
|
1. **Context Attribute Error**: Verwenden Sie `Config` statt `context.config`
|
||||||
|
2. **Redis Connection Failed**: Überprüfen Sie Redis-URL und Netzwerkverbindung
|
||||||
|
3. **Webhook Duplikate**: Redis-Deduplikation verhindert Mehrfachverarbeitung
|
||||||
|
|
||||||
|
### Logs überprüfen
|
||||||
|
|
||||||
|
```bash
|
||||||
|
motia logs
|
||||||
|
```
|
||||||
|
|
||||||
|
## Erweiterungen
|
||||||
|
|
||||||
|
### Geplante Features
|
||||||
|
|
||||||
|
- Vollständige EspoCRM-API-Integration im Sync-Handler
|
||||||
|
- Retry-Logic für fehlgeschlagene Syncs
|
||||||
|
- Metriken und Alerting
|
||||||
|
- Batch-Verarbeitung für große Datenmengen
|
||||||
|
|
||||||
|
### API Erweiterungen
|
||||||
|
|
||||||
|
- Zusätzliche Advoware-Endpunkte
|
||||||
|
- Mehr EspoCRM-Entitäten
|
||||||
|
- Custom Mapping-Regeln
|
||||||
|
|
||||||
|
## Lizenz
|
||||||
|
|
||||||
|
[License Information]
|
||||||
|
|
||||||
|
## Beitrag
|
||||||
|
|
||||||
|
Bitte erstellen Sie Issues für Bugs oder Feature-Requests. Pull-Requests sind willkommen!
|
||||||
@@ -21,14 +21,59 @@
|
|||||||
"x": 15,
|
"x": 15,
|
||||||
"y": 461,
|
"y": 461,
|
||||||
"sourceHandlePosition": "right"
|
"sourceHandlePosition": "right"
|
||||||
|
},
|
||||||
|
"steps/advoware_proxy/advoware_api_proxy_put_step.py": {
|
||||||
|
"x": 12,
|
||||||
|
"y": 408
|
||||||
|
},
|
||||||
|
"steps/advoware_proxy/advoware_api_proxy_get_step.py": {
|
||||||
|
"x": 12,
|
||||||
|
"y": 611
|
||||||
|
},
|
||||||
|
"steps/advoware_proxy/advoware_api_proxy_delete_step.py": {
|
||||||
|
"x": 0,
|
||||||
|
"y": 814
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "vmh",
|
||||||
|
"config": {
|
||||||
|
"steps/vmh/beteiligte_sync_event_step.py": {
|
||||||
|
"x": 805,
|
||||||
|
"y": 188
|
||||||
|
},
|
||||||
|
"steps/vmh/webhook/beteiligte_update_api_step.py": {
|
||||||
|
"x": 13,
|
||||||
|
"y": 154
|
||||||
|
},
|
||||||
|
"steps/vmh/webhook/beteiligte_delete_api_step.py": {
|
||||||
|
"x": 14,
|
||||||
|
"y": -72
|
||||||
|
},
|
||||||
|
"steps/vmh/webhook/beteiligte_create_api_step.py": {
|
||||||
|
"x": 7,
|
||||||
|
"y": 373
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"id": "advoware",
|
"id": "advoware",
|
||||||
"config": {
|
"config": {
|
||||||
"steps/advoware_api_proxy_step.py": {
|
"steps/advoware_proxy/advoware_api_proxy_put_step.py": {
|
||||||
"x": 0,
|
"x": 400,
|
||||||
|
"y": 0
|
||||||
|
},
|
||||||
|
"steps/advoware_proxy/advoware_api_proxy_post_step.py": {
|
||||||
|
"x": 200,
|
||||||
|
"y": 0
|
||||||
|
},
|
||||||
|
"steps/advoware_proxy/advoware_api_proxy_get_step.py": {
|
||||||
|
"x": 12,
|
||||||
|
"y": 406
|
||||||
|
},
|
||||||
|
"steps/advoware_proxy/advoware_api_proxy_delete_step.py": {
|
||||||
|
"x": 600,
|
||||||
"y": 0
|
"y": 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
"scripts": {
|
"scripts": {
|
||||||
"postinstall": "motia install",
|
"postinstall": "motia install",
|
||||||
"dev": "motia dev",
|
"dev": "motia dev",
|
||||||
"start": "motia start --host 0.0.0.0",
|
"start": ". python_modules/bin/activate && motia start --host 0.0.0.0",
|
||||||
"generate-types": "motia generate-types",
|
"generate-types": "motia generate-types",
|
||||||
"build": "motia build",
|
"build": "motia build",
|
||||||
"clean": "rm -rf dist node_modules python_modules .motia .mermaid"
|
"clean": "rm -rf dist node_modules python_modules .motia .mermaid"
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ config = {
|
|||||||
'path': '/advoware/proxy',
|
'path': '/advoware/proxy',
|
||||||
'method': 'DELETE',
|
'method': 'DELETE',
|
||||||
'emits': [],
|
'emits': [],
|
||||||
'flows': ['advoware']
|
'flows': ['basic-tutorial', 'advoware']
|
||||||
}
|
}
|
||||||
|
|
||||||
async def handler(req, context):
|
async def handler(req, context):
|
||||||
@@ -23,7 +23,10 @@ async def handler(req, context):
|
|||||||
json_data = None
|
json_data = None
|
||||||
|
|
||||||
context.logger.info(f"Proxying request to Advoware: {method} {endpoint}")
|
context.logger.info(f"Proxying request to Advoware: {method} {endpoint}")
|
||||||
|
context.logger.info(f"Query params: {params}")
|
||||||
result = await advoware.api_call(endpoint, method=method, params=params, json_data=json_data)
|
result = await advoware.api_call(endpoint, method=method, params=params, json_data=json_data)
|
||||||
|
context.logger.info(f"Advoware API response received, length: {len(str(result)) if result else 0}")
|
||||||
|
context.logger.info(f"Response preview: {str(result)[:500] if result else 'None'}")
|
||||||
|
|
||||||
return {'status': 200, 'body': {'result': result}}
|
return {'status': 200, 'body': {'result': result}}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ config = {
|
|||||||
'path': '/advoware/proxy',
|
'path': '/advoware/proxy',
|
||||||
'method': 'GET',
|
'method': 'GET',
|
||||||
'emits': [],
|
'emits': [],
|
||||||
'flows': ['advoware']
|
'flows': ['basic-tutorial', 'advoware']
|
||||||
}
|
}
|
||||||
|
|
||||||
async def handler(req, context):
|
async def handler(req, context):
|
||||||
@@ -23,7 +23,10 @@ async def handler(req, context):
|
|||||||
json_data = None
|
json_data = None
|
||||||
|
|
||||||
context.logger.info(f"Proxying request to Advoware: {method} {endpoint}")
|
context.logger.info(f"Proxying request to Advoware: {method} {endpoint}")
|
||||||
|
context.logger.info(f"Query params: {params}")
|
||||||
result = await advoware.api_call(endpoint, method=method, params=params, json_data=json_data)
|
result = await advoware.api_call(endpoint, method=method, params=params, json_data=json_data)
|
||||||
|
context.logger.info(f"Advoware API response received, length: {len(str(result)) if result else 0}")
|
||||||
|
context.logger.info(f"Response preview: {str(result)[:500] if result else 'None'}")
|
||||||
|
|
||||||
return {'status': 200, 'body': {'result': result}}
|
return {'status': 200, 'body': {'result': result}}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ config = {
|
|||||||
'path': '/advoware/proxy',
|
'path': '/advoware/proxy',
|
||||||
'method': 'PUT',
|
'method': 'PUT',
|
||||||
'emits': [],
|
'emits': [],
|
||||||
'flows': ['advoware']
|
'flows': ['basic-tutorial', 'advoware']
|
||||||
}
|
}
|
||||||
|
|
||||||
async def handler(req, context):
|
async def handler(req, context):
|
||||||
@@ -23,7 +23,11 @@ async def handler(req, context):
|
|||||||
json_data = req.get('body')
|
json_data = req.get('body')
|
||||||
|
|
||||||
context.logger.info(f"Proxying request to Advoware: {method} {endpoint}")
|
context.logger.info(f"Proxying request to Advoware: {method} {endpoint}")
|
||||||
|
context.logger.info(f"Query params: {params}")
|
||||||
|
context.logger.info(f"Request body: {json_data}")
|
||||||
result = await advoware.api_call(endpoint, method=method, params=params, json_data=json_data)
|
result = await advoware.api_call(endpoint, method=method, params=params, json_data=json_data)
|
||||||
|
context.logger.info(f"Advoware API response received, length: {len(str(result)) if result else 0}")
|
||||||
|
context.logger.info(f"Response preview: {str(result)[:500] if result else 'None'}")
|
||||||
|
|
||||||
return {'status': 200, 'body': {'result': result}}
|
return {'status': 200, 'body': {'result': result}}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
52
bitbylaw/steps/vmh/beteiligte_sync_event_step.py
Normal file
52
bitbylaw/steps/vmh/beteiligte_sync_event_step.py
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
from services.advoware import AdvowareAPI
|
||||||
|
import json
|
||||||
|
import redis
|
||||||
|
from config import Config
|
||||||
|
|
||||||
|
config = {
|
||||||
|
'type': 'event',
|
||||||
|
'name': 'VMH Beteiligte Sync',
|
||||||
|
'description': 'Synchronisiert Beteiligte Entities von Advoware nach Änderungen (Create/Update/Delete)',
|
||||||
|
'subscribes': ['vmh.beteiligte.create', 'vmh.beteiligte.update', 'vmh.beteiligte.delete'],
|
||||||
|
'flows': ['vmh'],
|
||||||
|
'emits': []
|
||||||
|
}
|
||||||
|
|
||||||
|
async def handler(event_data, context):
|
||||||
|
try:
|
||||||
|
entity_id = event_data.get('entity_id')
|
||||||
|
action = event_data.get('action', 'unknown')
|
||||||
|
|
||||||
|
if not entity_id:
|
||||||
|
context.logger.error("Keine entity_id im Event gefunden")
|
||||||
|
return
|
||||||
|
|
||||||
|
context.logger.info(f"Starte {action.upper()} Sync für Beteiligte Entity: {entity_id}")
|
||||||
|
|
||||||
|
# Advoware API initialisieren (für später)
|
||||||
|
# advoware = AdvowareAPI(context)
|
||||||
|
|
||||||
|
# PLATZHALTER: Für jetzt nur loggen, keine API-Anfrage
|
||||||
|
context.logger.info(f"PLATZHALTER: {action.upper()} Sync für Entity {entity_id} würde hier Advoware API aufrufen")
|
||||||
|
context.logger.info(f"PLATZHALTER: Entity-Daten würden hier verarbeitet werden")
|
||||||
|
|
||||||
|
# TODO: Hier die Entity in das Zielsystem syncen (EspoCRM?)
|
||||||
|
# Für Create: Neu anlegen
|
||||||
|
# Für Update: Aktualisieren
|
||||||
|
# Für Delete: Löschen
|
||||||
|
|
||||||
|
# Entferne die ID aus der entsprechenden Pending-Queue
|
||||||
|
redis_client = redis.Redis(
|
||||||
|
host=Config.REDIS_HOST,
|
||||||
|
port=int(Config.REDIS_PORT),
|
||||||
|
db=int(Config.REDIS_DB_ADVOWARE_CACHE),
|
||||||
|
decode_responses=True
|
||||||
|
)
|
||||||
|
|
||||||
|
pending_key = f'vmh:beteiligte:{action}_pending'
|
||||||
|
redis_client.srem(pending_key, entity_id)
|
||||||
|
context.logger.info(f"Entity {entity_id} aus {action.upper()}-Pending-Queue entfernt")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
context.logger.error(f"Fehler beim {event_data.get('action', 'unknown').upper()} Sync von Beteiligte Entity: {e}")
|
||||||
|
context.logger.error(f"Event Data: {event_data}")
|
||||||
96
bitbylaw/steps/vmh/webhook/beteiligte_create_api_step.py
Normal file
96
bitbylaw/steps/vmh/webhook/beteiligte_create_api_step.py
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
from typing import Any, Dict, Set
|
||||||
|
import json
|
||||||
|
import redis
|
||||||
|
from config import Config
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
config = {
|
||||||
|
'type': 'api',
|
||||||
|
'name': 'VMH Webhook Beteiligte Create',
|
||||||
|
'description': 'Empfängt Create-Webhooks von EspoCRM für Beteiligte',
|
||||||
|
'path': '/vmh/webhook/beteiligte/create',
|
||||||
|
'method': 'POST',
|
||||||
|
'flows': ['vmh'],
|
||||||
|
'emits': ['vmh.beteiligte.create']
|
||||||
|
}
|
||||||
|
|
||||||
|
async def handler(req, context):
|
||||||
|
try:
|
||||||
|
# Payload aus dem Request-Body holen
|
||||||
|
payload = req.get('body', [])
|
||||||
|
|
||||||
|
# Detailliertes Logging
|
||||||
|
context.logger.info("VMH Webhook Beteiligte Create empfangen")
|
||||||
|
context.logger.info(f"Headers: {json.dumps(dict(req.get('headers', {})), indent=2)}")
|
||||||
|
context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
|
||||||
|
|
||||||
|
# Sammle alle IDs aus dem Batch
|
||||||
|
ids_to_sync: Set[str] = set()
|
||||||
|
|
||||||
|
if isinstance(payload, list):
|
||||||
|
for entity in payload:
|
||||||
|
if isinstance(entity, dict) and 'id' in entity:
|
||||||
|
entity_id = entity['id']
|
||||||
|
ids_to_sync.add(entity_id)
|
||||||
|
context.logger.info(f"Create Entity ID gefunden: {entity_id}")
|
||||||
|
elif isinstance(payload, dict) and 'id' in payload:
|
||||||
|
ids_to_sync.add(payload['id'])
|
||||||
|
context.logger.info(f"Create Single Entity ID gefunden: {payload['id']}")
|
||||||
|
|
||||||
|
context.logger.info(f"Insgesamt {len(ids_to_sync)} eindeutige IDs zum Create-Sync gefunden")
|
||||||
|
|
||||||
|
# Redis Verbindung für Deduplizierung
|
||||||
|
redis_client = redis.Redis(
|
||||||
|
host=Config.REDIS_HOST,
|
||||||
|
port=int(Config.REDIS_PORT),
|
||||||
|
db=int(Config.REDIS_DB_ADVOWARE_CACHE),
|
||||||
|
decode_responses=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Deduplizierung: Prüfe welche IDs schon in der Queue sind
|
||||||
|
pending_key = 'vmh:beteiligte:create_pending'
|
||||||
|
existing_ids = redis_client.smembers(pending_key)
|
||||||
|
new_ids = ids_to_sync - set(existing_ids)
|
||||||
|
|
||||||
|
if new_ids:
|
||||||
|
# Füge neue IDs zur Pending-Queue hinzu
|
||||||
|
redis_client.sadd(pending_key, *new_ids)
|
||||||
|
context.logger.info(f"{len(new_ids)} neue IDs zur Create-Sync-Queue hinzugefügt: {list(new_ids)}")
|
||||||
|
|
||||||
|
# Emittiere Events für neue IDs
|
||||||
|
for entity_id in new_ids:
|
||||||
|
await context.emit({
|
||||||
|
'topic': 'vmh.beteiligte.create',
|
||||||
|
'data': {
|
||||||
|
'entity_id': entity_id,
|
||||||
|
'action': 'create',
|
||||||
|
'source': 'webhook',
|
||||||
|
'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
context.logger.info(f"Create-Event emittiert für ID: {entity_id}")
|
||||||
|
else:
|
||||||
|
context.logger.info("Keine neuen IDs zum Create-Sync gefunden")
|
||||||
|
|
||||||
|
context.logger.info("VMH Create Webhook erfolgreich verarbeitet")
|
||||||
|
|
||||||
|
return {
|
||||||
|
'status': 200,
|
||||||
|
'body': {
|
||||||
|
'status': 'received',
|
||||||
|
'action': 'create',
|
||||||
|
'new_ids_count': len(new_ids) if 'new_ids' in locals() else 0,
|
||||||
|
'total_ids_in_batch': len(ids_to_sync)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
context.logger.error(f"Fehler beim Verarbeiten des VMH Create Webhooks: {e}")
|
||||||
|
context.logger.error(f"Request: {req}")
|
||||||
|
return {
|
||||||
|
'status': 500,
|
||||||
|
'body': {
|
||||||
|
'error': 'Internal server error',
|
||||||
|
'details': str(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
77
bitbylaw/steps/vmh/webhook/beteiligte_delete_api_step.py
Normal file
77
bitbylaw/steps/vmh/webhook/beteiligte_delete_api_step.py
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
from typing import Any, Dict, Set
|
||||||
|
import json
|
||||||
|
import redis
|
||||||
|
from config import Config
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
config = {
|
||||||
|
'type': 'api',
|
||||||
|
'name': 'VMH Webhook Beteiligte Delete',
|
||||||
|
'description': 'Empfängt Delete-Webhooks von EspoCRM für Beteiligte',
|
||||||
|
'path': '/vmh/webhook/beteiligte/delete',
|
||||||
|
'method': 'POST',
|
||||||
|
'flows': ['vmh'],
|
||||||
|
'emits': ['vmh.beteiligte.delete']
|
||||||
|
}
|
||||||
|
|
||||||
|
async def handler(req, context):
|
||||||
|
try:
|
||||||
|
payload = req.get('body', [])
|
||||||
|
|
||||||
|
context.logger.info("VMH Webhook Beteiligte Delete empfangen")
|
||||||
|
context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
|
||||||
|
|
||||||
|
ids_to_sync: Set[str] = set()
|
||||||
|
|
||||||
|
if isinstance(payload, list):
|
||||||
|
for entity in payload:
|
||||||
|
if isinstance(entity, dict) and 'id' in entity:
|
||||||
|
entity_id = entity['id']
|
||||||
|
ids_to_sync.add(entity_id)
|
||||||
|
elif isinstance(payload, dict) and 'id' in payload:
|
||||||
|
ids_to_sync.add(payload['id'])
|
||||||
|
|
||||||
|
context.logger.info(f"{len(ids_to_sync)} IDs zum Delete-Sync gefunden")
|
||||||
|
|
||||||
|
# Redis Verbindung für Deduplizierung
|
||||||
|
redis_client = redis.Redis(
|
||||||
|
host=Config.REDIS_HOST,
|
||||||
|
port=int(Config.REDIS_PORT),
|
||||||
|
db=int(Config.REDIS_DB_ADVOWARE_CACHE),
|
||||||
|
decode_responses=True
|
||||||
|
)
|
||||||
|
|
||||||
|
pending_key = 'vmh:beteiligte:pending_delete'
|
||||||
|
existing_ids = redis_client.smembers(pending_key)
|
||||||
|
new_ids = ids_to_sync - set(existing_ids)
|
||||||
|
|
||||||
|
if new_ids:
|
||||||
|
redis_client.sadd(pending_key, *new_ids)
|
||||||
|
context.logger.info(f"{len(new_ids)} neue IDs zur Delete-Queue hinzugefügt")
|
||||||
|
|
||||||
|
for entity_id in new_ids:
|
||||||
|
await context.emit({
|
||||||
|
'topic': 'vmh.beteiligte.delete',
|
||||||
|
'data': {
|
||||||
|
'entity_id': entity_id,
|
||||||
|
'action': 'delete',
|
||||||
|
'source': 'webhook',
|
||||||
|
'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
return {
|
||||||
|
'status': 200,
|
||||||
|
'body': {
|
||||||
|
'status': 'received',
|
||||||
|
'action': 'delete',
|
||||||
|
'new_ids_count': len(new_ids) if 'new_ids' in locals() else 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
context.logger.error(f"Fehler beim Delete-Webhook: {e}")
|
||||||
|
return {
|
||||||
|
'status': 500,
|
||||||
|
'body': {'error': 'Internal server error', 'details': str(e)}
|
||||||
|
}
|
||||||
96
bitbylaw/steps/vmh/webhook/beteiligte_update_api_step.py
Normal file
96
bitbylaw/steps/vmh/webhook/beteiligte_update_api_step.py
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
from typing import Any, Dict, Set
|
||||||
|
import json
|
||||||
|
import redis
|
||||||
|
from config import Config
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
config = {
|
||||||
|
'type': 'api',
|
||||||
|
'name': 'VMH Webhook Beteiligte Update',
|
||||||
|
'description': 'Empfängt Update-Webhooks von EspoCRM für Beteiligte',
|
||||||
|
'path': '/vmh/webhook/beteiligte/update',
|
||||||
|
'method': 'POST',
|
||||||
|
'flows': ['vmh'],
|
||||||
|
'emits': ['vmh.beteiligte.update']
|
||||||
|
}
|
||||||
|
|
||||||
|
async def handler(req, context):
|
||||||
|
try:
|
||||||
|
# Payload aus dem Request-Body holen
|
||||||
|
payload = req.get('body', [])
|
||||||
|
|
||||||
|
# Detailliertes Logging
|
||||||
|
context.logger.info("VMH Webhook Beteiligte Update empfangen")
|
||||||
|
context.logger.info(f"Headers: {json.dumps(dict(req.get('headers', {})), indent=2)}")
|
||||||
|
context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
|
||||||
|
|
||||||
|
# Sammle alle IDs aus dem Batch
|
||||||
|
ids_to_sync: Set[str] = set()
|
||||||
|
|
||||||
|
if isinstance(payload, list):
|
||||||
|
for entity in payload:
|
||||||
|
if isinstance(entity, dict) and 'id' in entity:
|
||||||
|
entity_id = entity['id']
|
||||||
|
ids_to_sync.add(entity_id)
|
||||||
|
context.logger.info(f"Update Entity ID gefunden: {entity_id}")
|
||||||
|
elif isinstance(payload, dict) and 'id' in payload:
|
||||||
|
ids_to_sync.add(payload['id'])
|
||||||
|
context.logger.info(f"Update Single Entity ID gefunden: {payload['id']}")
|
||||||
|
|
||||||
|
context.logger.info(f"Insgesamt {len(ids_to_sync)} eindeutige IDs zum Update-Sync gefunden")
|
||||||
|
|
||||||
|
# Redis Verbindung für Deduplizierung
|
||||||
|
redis_client = redis.Redis(
|
||||||
|
host=Config.REDIS_HOST,
|
||||||
|
port=int(Config.REDIS_PORT),
|
||||||
|
db=int(Config.REDIS_DB_ADVOWARE_CACHE),
|
||||||
|
decode_responses=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Deduplizierung: Prüfe welche IDs schon in der Queue sind
|
||||||
|
pending_key = 'vmh:beteiligte:update_pending'
|
||||||
|
existing_ids = redis_client.smembers(pending_key)
|
||||||
|
new_ids = ids_to_sync - set(existing_ids)
|
||||||
|
|
||||||
|
if new_ids:
|
||||||
|
# Füge neue IDs zur Pending-Queue hinzu
|
||||||
|
redis_client.sadd(pending_key, *new_ids)
|
||||||
|
context.logger.info(f"{len(new_ids)} neue IDs zur Update-Sync-Queue hinzugefügt: {list(new_ids)}")
|
||||||
|
|
||||||
|
# Emittiere Events für neue IDs
|
||||||
|
for entity_id in new_ids:
|
||||||
|
await context.emit({
|
||||||
|
'topic': 'vmh.beteiligte.update',
|
||||||
|
'data': {
|
||||||
|
'entity_id': entity_id,
|
||||||
|
'action': 'update',
|
||||||
|
'source': 'webhook',
|
||||||
|
'timestamp': req.get('timestamp') or datetime.datetime.now().isoformat()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
context.logger.info(f"Update-Event emittiert für ID: {entity_id}")
|
||||||
|
else:
|
||||||
|
context.logger.info("Keine neuen IDs zum Update-Sync gefunden")
|
||||||
|
|
||||||
|
context.logger.info("VMH Update Webhook erfolgreich verarbeitet")
|
||||||
|
|
||||||
|
return {
|
||||||
|
'status': 200,
|
||||||
|
'body': {
|
||||||
|
'status': 'received',
|
||||||
|
'action': 'update',
|
||||||
|
'new_ids_count': len(new_ids) if 'new_ids' in locals() else 0,
|
||||||
|
'total_ids_in_batch': len(ids_to_sync)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
context.logger.error(f"Fehler beim Verarbeiten des VMH Update Webhooks: {e}")
|
||||||
|
context.logger.error(f"Request: {req}")
|
||||||
|
return {
|
||||||
|
'status': 500,
|
||||||
|
'body': {
|
||||||
|
'error': 'Internal server error',
|
||||||
|
'details': str(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
4
bitbylaw/types.d.ts
vendored
4
bitbylaw/types.d.ts
vendored
@@ -12,6 +12,10 @@ declare module 'motia' {
|
|||||||
}
|
}
|
||||||
|
|
||||||
interface Handlers {
|
interface Handlers {
|
||||||
|
'VMH Beteiligte Sync': EventHandler<never, never>
|
||||||
|
'VMH Webhook Beteiligte Update': ApiRouteHandler<Record<string, unknown>, unknown, { topic: 'vmh.beteiligte.update'; data: never }>
|
||||||
|
'VMH Webhook Beteiligte Delete': ApiRouteHandler<Record<string, unknown>, unknown, { topic: 'vmh.beteiligte.delete'; data: never }>
|
||||||
|
'VMH Webhook Beteiligte Create': ApiRouteHandler<Record<string, unknown>, unknown, { topic: 'vmh.beteiligte.create'; data: never }>
|
||||||
'StateAuditJob': CronHandler<{ topic: 'notification'; data: { template_id: string; email: string; template_data: Record<string, unknown> } }>
|
'StateAuditJob': CronHandler<{ topic: 'notification'; data: { template_id: string; email: string; template_data: Record<string, unknown> } }>
|
||||||
'ProcessFoodOrder': EventHandler<{ id: string; email: string; quantity: unknown; pet_id: unknown }, { topic: 'notification'; data: { template_id: string; email: string; template_data: Record<string, unknown> } }>
|
'ProcessFoodOrder': EventHandler<{ id: string; email: string; quantity: unknown; pet_id: unknown }, { topic: 'notification'; data: { template_id: string; email: string; template_data: Record<string, unknown> } }>
|
||||||
'Notification': EventHandler<{ template_id: string; email: string; template_data: Record<string, unknown> }, never>
|
'Notification': EventHandler<{ template_id: string; email: string; template_data: Record<string, unknown> }, never>
|
||||||
|
|||||||
Reference in New Issue
Block a user