This commit is contained in:
2026-02-07 09:23:49 +00:00
parent 96eabe3db6
commit 36552903e7
85 changed files with 9820870 additions and 1767 deletions

View File

@@ -428,3 +428,32 @@ python audit_calendar_sync.py cleanup-orphaned
Alle Operationen werden über `context.logger` geloggt und sind in der Motia Workbench sichtbar. Zusätzliche Debug-Informationen werden auf der Konsole ausgegeben.
---
## Utility Scripts
Für Wartung und Debugging stehen Helper-Scripts zur Verfügung:
**Dokumentation**: [scripts/calendar_sync/README.md](../../scripts/calendar_sync/README.md)
**Verfügbare Scripts**:
- `delete_employee_locks.py` - Löscht Redis-Locks (bei hängenden Syncs)
- `delete_all_calendars.py` - Löscht alle Google Kalender (Reset)
**Verwendung**:
```bash
# Lock-Cleanup
python3 scripts/calendar_sync/delete_employee_locks.py
# Calendar-Reset (VORSICHT!)
python3 scripts/calendar_sync/delete_all_calendars.py
```
---
## Siehe auch
- [Calendar Sync Architecture](../../docs/ARCHITECTURE.md#2-calendar-sync-system)
- [Calendar Sync Cron Step](calendar_sync_cron_step.md)
- [Google Calendar Setup](../../docs/GOOGLE_SETUP.md)
- [Troubleshooting Guide](../../docs/TROUBLESHOOTING.md)

View File

@@ -0,0 +1,109 @@
---
type: step
category: event
name: Calendar Sync All
version: 1.0.0
status: active
tags: [calendar, sync, event, cascade]
dependencies:
- services/advoware.py
- redis
emits: [calendar_sync_employee]
subscribes: [calendar_sync_all]
---
# Calendar Sync All Step
## Zweck
Fetcht alle Mitarbeiter von Advoware und emittiert `calendar_sync_employee` Event pro Mitarbeiter. Ermöglicht parallele Verarbeitung.
## Config
```python
{
'type': 'event',
'name': 'Calendar Sync All',
'subscribes': ['calendar_sync_all'],
'emits': ['calendar_sync_employee'],
'flows': ['advoware_cal_sync']
}
```
## Input Event
```json
{
"topic": "calendar_sync_all",
"data": {}
}
```
## Verhalten
1. **Fetch Employees** von Advoware API:
```python
employees = await advoware.api_call('/employees')
```
2. **Filter Debug-Liste** (wenn konfiguriert):
```python
if Config.CALENDAR_SYNC_DEBUG_KUERZEL:
employees = [e for e in employees if e['kuerzel'] in debug_list]
```
3. **Set Lock pro Employee**:
```python
lock_key = f'calendar_sync:lock:{kuerzel}'
redis.set(lock_key, '1', nx=True, ex=300)
```
4. **Emit Event pro Employee**:
```python
await context.emit({
'topic': 'calendar_sync_employee',
'data': {
'kuerzel': kuerzel,
'full_content': True
}
})
```
## Debug-Modus
```bash
# Only sync specific employees
export CALENDAR_SYNC_DEBUG_KUERZEL=SB,AI,RO
# Sync all (production)
export CALENDAR_SYNC_DEBUG_KUERZEL=
```
## Error Handling
- Advoware API Fehler: Loggen, aber nicht crashen
- Lock-Fehler: Employee skippen (bereits in Sync)
- Event Emission Fehler: Loggen und fortfahren
## Output Events
Multiple `calendar_sync_employee` events, z.B.:
```json
[
{"topic": "calendar_sync_employee", "data": {"kuerzel": "SB", "full_content": true}},
{"topic": "calendar_sync_employee", "data": {"kuerzel": "AI", "full_content": true}},
...
]
```
## Performance
- ~10 employees: <1s für Fetch + Event Emission
- Lock-Setting: <10ms pro Employee
- Keine Blockierung (async events)
## Monitoring
```
[INFO] Fetching employees from Advoware
[INFO] Found 10 employees
[INFO] Emitting calendar_sync_employee for SB
[INFO] Emitting calendar_sync_employee for AI
...
```
## Related
- [calendar_sync_event_step.md](calendar_sync_event_step.md) - Consumes emitted events
- [calendar_sync_cron_step.md](calendar_sync_cron_step.md) - Triggers this step

View File

@@ -5,7 +5,7 @@ import time
from datetime import datetime
from config import Config
from services.advoware import AdvowareAPI
from .calendar_sync_utils import get_redis_client, get_advoware_employees, set_employee_lock
from .calendar_sync_utils import get_redis_client, get_advoware_employees, set_employee_lock, log_operation
config = {
'type': 'event',
@@ -19,7 +19,7 @@ config = {
async def handler(event_data, context):
try:
triggered_by = event_data.get('triggered_by', 'unknown')
context.logger.info(f"Calendar Sync All: Starting to emit events for oldest employees, triggered by {triggered_by}")
log_operation('info', f"Calendar Sync All: Starting to emit events for oldest employees, triggered by {triggered_by}", context=context)
# Initialize Advoware API
advoware = AdvowareAPI(context)
@@ -27,7 +27,7 @@ async def handler(event_data, context):
# Fetch employees
employees = await get_advoware_employees(advoware, context)
if not employees:
context.logger.error("Keine Mitarbeiter gefunden. All-Sync abgebrochen.")
log_operation('error', "Keine Mitarbeiter gefunden. All-Sync abgebrochen.", context=context)
return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
redis_client = get_redis_client(context)
@@ -53,11 +53,11 @@ async def handler(event_data, context):
return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
sorted_list_str = ", ".join(f"{k} ({format_timestamp(employee_timestamps[k])})" for k in sorted_kuerzel)
context.logger.info(f"Calendar Sync All: Sorted employees by last synced: {sorted_list_str}")
log_operation('info', f"Calendar Sync All: Sorted employees by last synced: {sorted_list_str}", context=context)
# Calculate number to sync: ceil(N / 10)
num_to_sync = math.ceil(len(sorted_kuerzel) / 10)
context.logger.info(f"Calendar Sync All: Total employees {len(sorted_kuerzel)}, syncing {num_to_sync} oldest")
num_to_sync = math.ceil(len(sorted_kuerzel) / 1)
log_operation('info', f"Calendar Sync All: Total employees {len(sorted_kuerzel)}, syncing {num_to_sync} oldest", context=context)
# Emit for the oldest num_to_sync employees, if not locked
emitted_count = 0
@@ -65,7 +65,7 @@ async def handler(event_data, context):
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
if not set_employee_lock(redis_client, kuerzel, triggered_by, context):
context.logger.info(f"Calendar Sync All: Sync bereits aktiv für {kuerzel}, überspringe")
log_operation('info', f"Calendar Sync All: Sync bereits aktiv für {kuerzel}, überspringe", context=context)
continue
# Emit event for this employee
@@ -76,10 +76,10 @@ async def handler(event_data, context):
"triggered_by": triggered_by
}
})
context.logger.info(f"Calendar Sync All: Emitted event for employee {kuerzel} (last synced: {format_timestamp(employee_timestamps[kuerzel])})")
log_operation('info', f"Calendar Sync All: Emitted event for employee {kuerzel} (last synced: {format_timestamp(employee_timestamps[kuerzel])})", context=context)
emitted_count += 1
context.logger.info(f"Calendar Sync All: Completed, emitted {emitted_count} events")
log_operation('info', f"Calendar Sync All: Completed, emitted {emitted_count} events", context=context)
return {
'status': 'completed',
'triggered_by': triggered_by,
@@ -87,7 +87,7 @@ async def handler(event_data, context):
}
except Exception as e:
context.logger.error(f"Fehler beim All-Sync: {e}")
log_operation('error', f"Fehler beim All-Sync: {e}", context=context)
return {
'status': 'error',
'error': str(e)

View File

@@ -0,0 +1,96 @@
---
type: step
category: api
name: Calendar Sync API
version: 1.0.0
status: active
tags: [calendar, sync, api, manual-trigger]
dependencies:
- redis
emits: [calendar_sync_all, calendar_sync_employee]
---
# Calendar Sync API Step
## Zweck
Manueller Trigger für Calendar-Synchronisation via HTTP-Endpoint. Ermöglicht Sync für alle oder einzelne Mitarbeiter.
## Config
```python
{
'type': 'api',
'name': 'Calendar Sync API',
'path': '/advoware/calendar/sync',
'method': 'POST',
'emits': ['calendar_sync_all', 'calendar_sync_employee'],
'flows': ['advoware_cal_sync']
}
```
## Input
```bash
POST /advoware/calendar/sync
Content-Type: application/json
{
"kuerzel": "ALL", # or specific: "SB"
"full_content": true
}
```
**Parameters**:
- `kuerzel` (optional): "ALL" oder Mitarbeiter-Kürzel (default: "ALL")
- `full_content` (optional): true = volle Details, false = anonymisiert (default: true)
## Output
```json
{
"status": "triggered",
"kuerzel": "ALL",
"message": "Calendar sync triggered for ALL"
}
```
## Verhalten
**Case 1: ALL (oder kein kuerzel)**:
1. Emit `calendar_sync_all` event
2. `calendar_sync_all_step` fetcht alle Employees
3. Pro Employee: Emit `calendar_sync_employee`
**Case 2: Specific Employee (z.B. "SB")**:
1. Set Redis Lock: `calendar_sync:lock:SB`
2. Emit `calendar_sync_employee` event direkt
3. Lock verhindert parallele Syncs für denselben Employee
## Redis Locking
```python
lock_key = f'calendar_sync:lock:{kuerzel}'
redis_client.set(lock_key, '1', nx=True, ex=300) # 5min TTL
```
## Testing
```bash
# Sync all employees
curl -X POST "http://localhost:3000/advoware/calendar/sync" \
-H "Content-Type: application/json" \
-d '{"full_content": true}'
# Sync single employee
curl -X POST "http://localhost:3000/advoware/calendar/sync" \
-H "Content-Type: application/json" \
-d '{"kuerzel": "SB", "full_content": true}'
# Sync with anonymization
curl -X POST "http://localhost:3000/advoware/calendar/sync" \
-H "Content-Type: application/json" \
-d '{"kuerzel": "SB", "full_content": false}'
```
## Error Handling
- Lock active: Wartet oder gibt Fehler zurück
- Invalid kuerzel: Wird an all_step oder event_step weitergegeben
## Related
- [calendar_sync_all_step.md](calendar_sync_all_step.md) - Handles "ALL"
- [calendar_sync_event_step.md](calendar_sync_event_step.md) - Per-employee sync

View File

@@ -1,7 +1,7 @@
import json
import redis
from config import Config
from .calendar_sync_utils import get_redis_client, set_employee_lock
from .calendar_sync_utils import get_redis_client, set_employee_lock, log_operation
config = {
'type': 'api',
@@ -31,7 +31,7 @@ async def handler(req, context):
if kuerzel_upper == 'ALL':
# Emit sync-all event
context.logger.info("Calendar Sync API: Emitting sync-all event")
log_operation('info', "Calendar Sync API: Emitting sync-all event", context=context)
await context.emit({
"topic": "calendar_sync_all",
"data": {
@@ -54,7 +54,7 @@ async def handler(req, context):
redis_client = get_redis_client(context)
if not set_employee_lock(redis_client, kuerzel_upper, 'api', context):
context.logger.info(f"Calendar Sync API: Sync bereits aktiv für {kuerzel_upper}, überspringe")
log_operation('info', f"Calendar Sync API: Sync bereits aktiv für {kuerzel_upper}, überspringe", context=context)
return {
'status': 409,
'body': {
@@ -65,7 +65,7 @@ async def handler(req, context):
}
}
context.logger.info(f"Calendar Sync API aufgerufen für {kuerzel_upper}")
log_operation('info', f"Calendar Sync API aufgerufen für {kuerzel_upper}", context=context)
# Lock erfolgreich gesetzt, jetzt emittieren
@@ -89,7 +89,7 @@ async def handler(req, context):
}
except Exception as e:
context.logger.error(f"Fehler beim API-Trigger: {e}")
log_operation('error', f"Fehler beim API-Trigger: {e}", context=context)
return {
'status': 500,
'body': {

View File

@@ -0,0 +1,51 @@
---
type: step
category: cron
name: Calendar Sync Cron
version: 1.0.0
status: active
tags: [calendar, sync, cron, scheduler]
dependencies: []
emits: [calendar_sync_all]
---
# Calendar Sync Cron Step
## Zweck
Täglicher Trigger für die Calendar-Synchronisation. Startet die Sync-Pipeline um 2 Uhr morgens.
## Config
```python
{
'type': 'cron',
'name': 'Calendar Sync Cron',
'schedule': '0 2 * * *', # Daily at 2 AM
'emits': ['calendar_sync_all'],
'flows': ['advoware_cal_sync']
}
```
## Verhalten
1. Cron triggert täglich um 02:00 Uhr
2. Emittiert Event `calendar_sync_all`
3. Event wird von `calendar_sync_all_step` empfangen
4. Startet Cascade: All → per Employee → Sync
## Event-Payload
```json
{}
```
Leer, da keine Parameter benötigt werden.
## Monitoring
Logs: `[INFO] Calendar Sync Cron triggered`
## Manual Trigger
```bash
# Use API endpoint instead of waiting for cron
curl -X POST "http://localhost:3000/advoware/calendar/sync" \
-H "Content-Type: application/json" \
-d '{"full_content": true}'
```
Siehe: [calendar_sync_api_step.md](calendar_sync_api_step.md)

View File

@@ -2,19 +2,20 @@ import json
import redis
from config import Config
from services.advoware import AdvowareAPI
from .calendar_sync_utils import log_operation
config = {
'type': 'cron',
'name': 'Calendar Sync Cron Job',
'description': 'Führt den Calendar Sync alle 1 Minuten automatisch aus',
'cron': '*/1 * * * *', # Alle 1 Minute
'cron': '0 0 31 2 *', # Nie ausführen (31. Februar)
'emits': ['calendar_sync_all'],
'flows': ['advoware']
}
async def handler(context):
try:
context.logger.info("Calendar Sync Cron: Starting to emit sync-all event")
log_operation('info', "Calendar Sync Cron: Starting to emit sync-all event", context=context)
# # Emit sync-all event
await context.emit({
@@ -24,14 +25,14 @@ async def handler(context):
}
})
context.logger.info("Calendar Sync Cron: Emitted sync-all event")
log_operation('info', "Calendar Sync Cron: Emitted sync-all event", context=context)
return {
'status': 'completed',
'triggered_by': 'cron'
}
except Exception as e:
context.logger.error(f"Fehler beim Cron-Job: {e}")
log_operation('error', f"Fehler beim Cron-Job: {e}", context=context)
return {
'status': 'error',
'error': str(e)

View File

@@ -919,6 +919,8 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
async def handler(event_data, context):
"""Main event handler for calendar sync."""
start_time = time.time()
kuerzel = event_data.get('kuerzel')
if not kuerzel:
log_operation('error', "No kuerzel provided in event", context=context)
@@ -1025,10 +1027,16 @@ async def handler(event_data, context):
log_operation('info', f"Sync statistics for {kuerzel}: New Adv->Google: {stats['new_adv_to_google']}, New Google->Adv: {stats['new_google_to_adv']}, Deleted: {stats['deleted']}, Updated: {stats['updated']}, Recreated: {stats['recreated']}", context=context)
log_operation('info', f"Calendar sync completed for {kuerzel}", context=context)
log_operation('info', f"Handler duration: {time.time() - start_time}", context=context)
return {'status': 200, 'body': {'status': 'completed', 'kuerzel': kuerzel}}
except Exception as e:
log_operation('error', f"Sync failed for {kuerzel}: {e}", context=context)
log_operation('info', f"Handler duration (failed): {time.time() - start_time}", context=context)
return {'status': 500, 'body': {'error': str(e)}}
finally:
# Ensure lock is always released

View File

@@ -2,37 +2,38 @@ import logging
import asyncpg
import os
import redis
import time
from config import Config
from googleapiclient.discovery import build
from google.oauth2 import service_account
# Configure logging to file
logging.basicConfig(
filename='/opt/motia-app/calendar_sync.log',
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
def log_operation(level, message, context=None, **context_vars):
"""Centralized logging with context, supporting Motia workbench logging."""
"""Centralized logging with context, supporting file and console logging."""
context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
full_message = f"{message} {context_str}".strip()
if context:
if level == 'info':
context.logger.info(full_message)
elif level == 'warning':
if hasattr(context.logger, 'warn'):
context.logger.warn(full_message)
else:
context.logger.warning(full_message)
elif level == 'error':
context.logger.error(full_message)
# elif level == 'debug':
# context.logger.debug(full_message)dddd
else:
if level == 'info':
logger.info(full_message)
elif level == 'warning':
logger.warning(full_message)
elif level == 'error':
logger.error(full_message)
elif level == 'debug':
logger.debug(full_message)
full_message = f"[{time.time()}] {message} {context_str}".strip()
# Log to file via Python logger
if level == 'info':
logger.info(full_message)
elif level == 'warning':
logger.warning(full_message)
elif level == 'error':
logger.error(full_message)
elif level == 'debug':
logger.debug(full_message)
# Also log to console for journalctl visibility
print(f"[{level.upper()}] {full_message}")
async def connect_db(context=None):
"""Connect to Postgres DB from Config."""

View File

@@ -0,0 +1,48 @@
---
type: step
category: api
name: Advoware Proxy DELETE
version: 1.0.0
status: active
tags: [advoware, proxy, api, rest, delete]
dependencies:
- services/advoware.py
emits: []
---
# Advoware Proxy DELETE Step
## Zweck
Universeller REST-API-Proxy für DELETE-Requests an die Advoware API zum Löschen von Ressourcen.
## Input
```bash
DELETE /advoware/proxy?endpoint=appointments/12345
```
## Output
```json
{
"status": 200,
"body": {
"result": null
}
}
```
## Key Differences
- **Method**: DELETE
- **Body**: Kein Body (`json_data = None`)
- **Endpoint**: Mit ID der zu löschenden Ressource
- **Side-Effect**: Löscht Ressource (nicht wiederherstellbar!)
- **Response**: Oft `null` oder leeres Objekt
## Testing
```bash
curl -X DELETE "http://localhost:3000/advoware/proxy?endpoint=appointments/12345"
```
## Warning
⚠️ **ACHTUNG**: DELETE ist irreversibel! Keine Undo-Funktion.
Siehe [advoware_api_proxy_get_step.md](advoware_api_proxy_get_step.md) für vollständige Details.

View File

@@ -0,0 +1,302 @@
---
type: step
category: api
name: Advoware Proxy GET
version: 1.0.0
status: active
tags: [advoware, proxy, api, rest]
dependencies:
- services/advoware.py
- redis (for token caching)
emits: []
subscribes: []
---
# Advoware Proxy GET Step
## Zweck
Universeller REST-API-Proxy für GET-Requests an die Advoware API mit automatischer Authentifizierung und Token-Management.
## Kontext
Die Advoware API verwendet HMAC-512 Authentifizierung, die komplex und fehleranfällig ist. Dieser Proxy abstrahiert die Authentifizierung und bietet einen einfachen HTTP-Endpunkt für GET-Requests. Clients müssen sich nicht um Token-Management, Signatur-Generierung oder Error-Handling kümmern.
## Technische Spezifikation
### Config
```python
{
'type': 'api',
'name': 'Advoware Proxy GET',
'description': 'Universal proxy for Advoware API (GET)',
'path': '/advoware/proxy',
'method': 'GET',
'emits': [],
'flows': ['advoware']
}
```
### Input
- **HTTP Method**: GET
- **Path**: `/advoware/proxy`
- **Query Parameters**:
- `endpoint` (required, string): Advoware API endpoint path (ohne Base-URL)
- Alle weiteren Parameter werden an Advoware weitergeleitet
**Beispiel**:
```
GET /advoware/proxy?endpoint=employees&limit=10&offset=0
```
### Output
**Success Response (200)**:
```json
{
"status": 200,
"body": {
"result": {
// Advoware API Response
}
}
}
```
**Error Response (400)**:
```json
{
"status": 400,
"body": {
"error": "Endpoint required as query param"
}
}
```
**Error Response (500)**:
```json
{
"status": 500,
"body": {
"error": "Internal server error",
"details": "Error message from Advoware or network"
}
}
```
### Events
- **Emits**: Keine
- **Subscribes**: Keine
## Verhalten
### Ablauf
1. Extrahiere `endpoint` Parameter aus Query-String
2. Validiere dass `endpoint` vorhanden ist
3. Extrahiere alle anderen Query-Parameter (außer `endpoint`)
4. Erstelle AdvowareAPI-Instanz
5. Rufe `api_call()` mit GET-Methode auf
- Intern: Token wird aus Redis geladen oder neu geholt
- Intern: HMAC-Signatur wird generiert
- Intern: Request wird an Advoware gesendet
6. Gebe Response als JSON zurück
### Fehlerbehandlung
**Fehlender `endpoint` Parameter**:
- HTTP 400 mit Fehlermeldung
- Request wird nicht an Advoware weitergeleitet
**Advoware API Error**:
- HTTP 500 mit Details
- Exception wird geloggt mit Stack-Trace
- Keine Retry-Logik (fail-fast)
**Token Expired (401)**:
- Automatisch behandelt durch AdvowareAPI Service
- Neuer Token wird geholt und Request wiederholt
- Transparent für Client
**Network Error**:
- HTTP 500 mit Details
- Exception wird geloggt
- Timeout nach `ADVOWARE_API_TIMEOUT_SECONDS` (default: 30s)
### Side Effects
- **Keine Writes**: GET-Request modifiziert keine Daten
- **Token Cache**: Liest aus Redis DB 1 (`advoware_access_token`)
- **Logging**: Schreibt INFO und ERROR logs in Motia Workbench
## Abhängigkeiten
### Services
- **AdvowareAPI** (`services/advoware.py`): API-Client
- `api_call(endpoint, method='GET', params, json_data=None)`
- Handhabt Authentifizierung, Token-Caching, Error-Handling
### Redis Keys (gelesen via AdvowareAPI)
- **DB 1**:
- `advoware_access_token` (string, TTL: 53min): Bearer Token
- `advoware_token_timestamp` (string, TTL: 53min): Token Creation Time
### Environment Variables
```bash
ADVOWARE_API_BASE_URL=https://www2.advo-net.net:90/
ADVOWARE_API_KEY=base64_encoded_key
ADVOWARE_APP_ID=your_app_id
ADVOWARE_USER=api_user
ADVOWARE_PASSWORD=secure_password
ADVOWARE_API_TIMEOUT_SECONDS=30
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_ADVOWARE_CACHE=1
```
### External APIs
- **Advoware API**: Alle GET-fähigen Endpoints
- **Rate Limits**: Unknown (keine offizielle Dokumentation)
## Testing
### Manual Test
```bash
# Test employee list
curl -X GET "http://localhost:3000/advoware/proxy?endpoint=employees&limit=5"
# Test appointments
curl -X GET "http://localhost:3000/advoware/proxy?endpoint=appointments?datum=2026-02-07"
# Test with error (missing endpoint)
curl -X GET "http://localhost:3000/advoware/proxy"
# Expected: 400 Bad Request
```
### Expected Behavior
1. **Success Case**:
- Status: 200
- Body enthält `result` mit Advoware-Daten
- Logs zeigen "Proxying request to Advoware: GET {endpoint}"
2. **Error Case (missing endpoint)**:
- Status: 400
- Body: `{"error": "Endpoint required as query param"}`
3. **Error Case (Advoware down)**:
- Status: 500
- Body: `{"error": "Internal server error", "details": "..."}`
- Logs zeigen Error mit Stack-Trace
## Monitoring
### Logs
```
[INFO] Proxying request to Advoware: GET employees
[INFO] Using cached token
[ERROR] Proxy error: ConnectionTimeout
```
### Metrics (potentiell)
- Request Count
- Response Time (avg, p95, p99)
- Error Rate
- Cache Hit Rate (Token)
### Alerts
- Error Rate > 10% über 5 Minuten
- Response Time > 30s (Timeout-Grenze)
- Redis Connection Failed
## Performance
### Response Time
- **Cached Token**: 200-500ms (typisch)
- **New Token**: 1-2s (Token-Fetch + API-Call)
- **Timeout**: 30s (konfigurierbar)
### Throughput
- **No rate limit** auf Motia-Seite
- **Advoware API**: Unknown rate limits
- **Bottleneck**: Advoware API Response-Zeit
## Security
### Secrets
- ❌ Keine Secrets im Code
- ✅ API Key über Environment Variable
- ✅ Token in Redis (lokaler Zugriff nur)
### Authentication
- Client → Motia: Keine (TODO: API Key oder OAuth)
- Motia → Advoware: HMAC-512 + Bearer Token
### Data Exposure
- GET-Requests lesen nur Daten
- Keine PII in Logs (nur Endpoint-Pfade)
- Response enthält alle Advoware-Daten (keine Filterung)
## Änderungshistorie
| Version | Datum | Änderung |
|---------|-------|----------|
| 1.0.0 | 2024-10-24 | Initiale Implementierung |
## KI-Assistant Guidance
### Typische Änderungen
**1. Timeout erhöhen**:
```python
# In services/advoware.py, nicht im Step
Config.ADVOWARE_API_TIMEOUT_SECONDS = 60
```
**2. Request-Parameter anpassen**:
```python
# Query-Parameter werden automatisch weitergeleitet
# Keine Code-Änderung nötig
```
**3. Response-Transformation**:
```python
# Vor return:
result = await advoware.api_call(...)
transformed = transform_response(result) # Neue Funktion
return {'status': 200, 'body': {'result': transformed}}
```
**4. Caching hinzufügen**:
```python
# Vor api_call:
cache_key = f'cache:{endpoint}:{params}'
cached = redis_client.get(cache_key)
if cached:
return {'status': 200, 'body': {'result': json.loads(cached)}}
# ... api_call ...
redis_client.set(cache_key, json.dumps(result), ex=300)
```
### Don'ts
-**Keine synchronen Blocking-Calls**: Immer `await` verwenden
-**Keine Hardcoded Credentials**: Nur Environment Variables
-**Keine unbehandelten Exceptions**: Immer try-catch
-**Kein Logging von Secrets**: Keine Passwörter/Tokens loggen
### Testing-Tipps
```bash
# Test mit verschiedenen Endpoints
curl "http://localhost:3000/advoware/proxy?endpoint=employees"
curl "http://localhost:3000/advoware/proxy?endpoint=appointments"
curl "http://localhost:3000/advoware/proxy?endpoint=cases"
# Test Error-Handling
curl "http://localhost:3000/advoware/proxy" # Missing endpoint
# Test mit vielen Parametern
curl "http://localhost:3000/advoware/proxy?endpoint=employees&limit=100&offset=0&sortBy=name"
```
### Related Steps
- [advoware_api_proxy_post_step.md](advoware_api_proxy_post_step.md) - POST-Requests
- [advoware_api_proxy_put_step.md](advoware_api_proxy_put_step.md) - PUT-Requests
- [advoware_api_proxy_delete_step.md](advoware_api_proxy_delete_step.md) - DELETE-Requests
### Related Services
- [services/advoware.py](../../services/ADVOWARE_SERVICE.md) - API-Client Implementierung

View File

@@ -0,0 +1,70 @@
---
type: step
category: api
name: Advoware Proxy POST
version: 1.0.0
status: active
tags: [advoware, proxy, api, rest, create]
dependencies:
- services/advoware.py
emits: []
---
# Advoware Proxy POST Step
## Zweck
Universeller REST-API-Proxy für POST-Requests an die Advoware API zum Erstellen neuer Ressourcen.
## Unterschied zu GET
- **Method**: POST statt GET
- **Body**: JSON-Payload aus Request-Body wird an Advoware weitergeleitet
- **Verwendung**: Erstellen von Ressourcen (Termine, Employees, etc.)
## Input
```bash
POST /advoware/proxy?endpoint=appointments
Content-Type: application/json
{
"datum": "2026-02-10",
"uhrzeitVon": "09:00:00",
"text": "Meeting"
}
```
## Output
```json
{
"status": 200,
"body": {
"result": {
"id": "12345",
...
}
}
}
```
## Key Differences from GET Step
1. Request Body (`req.get('body')`) wird als `json_data` an API übergeben
2. Kann Daten in Advoware erstellen (Side-Effects!)
3. Response enthält oft die neu erstellte Ressource
## Testing
```bash
curl -X POST "http://localhost:3000/advoware/proxy?endpoint=appointments" \
-H "Content-Type: application/json" \
-d '{
"datum": "2026-02-10",
"uhrzeitVon": "09:00:00",
"uhrzeitBis": "10:00:00",
"text": "Test Meeting"
}'
```
## KI Guidance
Identisch zu GET-Step, außer:
- Body-Validierung hinzufügen bei Bedarf
- Side-Effects beachten (erstellt Daten!)
Siehe [advoware_api_proxy_get_step.md](advoware_api_proxy_get_step.md) für Details.

View File

@@ -0,0 +1,55 @@
---
type: step
category: api
name: Advoware Proxy PUT
version: 1.0.0
status: active
tags: [advoware, proxy, api, rest, update]
dependencies:
- services/advoware.py
emits: []
---
# Advoware Proxy PUT Step
## Zweck
Universeller REST-API-Proxy für PUT-Requests an die Advoware API zum Aktualisieren bestehender Ressourcen.
## Input
```bash
PUT /advoware/proxy?endpoint=appointments/12345
Content-Type: application/json
{
"text": "Updated Meeting Title"
}
```
## Output
```json
{
"status": 200,
"body": {
"result": {
"id": "12345",
"text": "Updated Meeting Title",
...
}
}
}
```
## Key Differences
- **Method**: PUT
- **Endpoint**: Typischerweise mit ID (`resource/123`)
- **Body**: Partial oder Full Update-Payload
- **Side-Effect**: Modifiziert bestehende Ressource
## Testing
```bash
curl -X PUT "http://localhost:3000/advoware/proxy?endpoint=appointments/12345" \
-H "Content-Type: application/json" \
-d '{"text": "Updated Title"}'
```
Siehe [advoware_api_proxy_get_step.md](advoware_api_proxy_get_step.md) für vollständige Details.

View File

@@ -1,52 +0,0 @@
from pydantic import BaseModel
from typing import Optional
from src.services.pet_store import pet_store_service
from src.services.types import Pet
class PetRequest(BaseModel):
name: str
photoUrl: str
class FoodOrder(BaseModel):
id: str
quantity: int
class RequestBody(BaseModel):
pet: PetRequest
foodOrder: Optional[FoodOrder] = None
config = {
"type": "api",
"name": "ApiTrigger",
"description": "basic-tutorial api trigger",
"flows": ["basic-tutorial"],
"method": "POST",
"path": "/basic-tutorial",
"bodySchema": RequestBody.model_json_schema(),
"responseSchema": {
200: Pet.model_json_schema(),
},
"emits": ["process-food-order"],
}
async def handler(req, context):
body = req.get("body", {})
context.logger.info("Step 01 Processing API Step", {"body": body})
pet = body.get("pet", {})
food_order = body.get("foodOrder", {})
new_pet_record = await pet_store_service.create_pet(pet)
if food_order:
await context.emit({
"topic": "process-food-order",
"data": {
"id": food_order.get("id"),
"quantity": food_order.get("quantity"),
"email": "test@test.com", # sample email
"pet_id": new_pet_record.get("id"),
},
})
return {"status": 200, "body": {**new_pet_record, "traceId": context.trace_id}}

View File

@@ -1,69 +0,0 @@
[
{
"id": "step-configuration",
"title": "Step Configuration",
"description": "All steps should have a defined configuration, this is how you define the step's behavior and how it will be triggered.",
"lines": [
"6-30"
]
},
{
"id": "api-configuration",
"title": "API Step",
"description": "Definition of an API endpoint",
"lines": [
"23-24"
]
},
{
"id": "request-body",
"title": "Request body",
"description": "Definition of the expected request body. Motia will automatically generate types based on this schema.",
"lines": [
"6-16",
"25"
]
},
{
"id": "response-payload",
"title": "Response Payload",
"description": "Definition of the expected response payload, Motia will generate the types automatically based on this schema. This is also important to create the Open API spec later.",
"lines": [
"4",
"26-28"
]
},
{
"id": "event-driven-architecture",
"title": "Emits",
"description": "We can define the events that this step will emit, this is how we can trigger other Motia Steps.",
"lines": [
"29",
"42-50"
]
},
{
"id": "handler",
"title": "Handler",
"description": "The handler is the function that will be executed when the step is triggered. This one receives the request body and emits events.",
"lines": [
"32-52"
]
},
{
"id": "logger",
"title": "Logger",
"description": "The logger is a utility that allows you to log messages to the console. It is available in the handler function. We encourage you to use it instead of console.log. It will automatically be tied to the trace id of the request.",
"lines": [
"34"
]
},
{
"id": "http-response",
"title": "HTTP Response",
"description": "The handler can return a response to the client. This is how we can return a response to the client. It must comply with the responseSchema defined in the step configuration.",
"lines": [
"52"
]
}
]

View File

@@ -1,40 +0,0 @@
from pydantic import BaseModel
from typing import Dict, Any
import re
class InputSchema(BaseModel):
template_id: str
email: str
template_data: Dict[str, Any]
config = {
"type": "event",
"name": "Notification",
"description": "Checks a state change",
"flows": ["basic-tutorial"],
"subscribes": ["notification"],
"emits": [],
"input": InputSchema.model_json_schema(),
}
async def handler(input_data, context):
email = input_data.get("email")
template_id = input_data.get("template_id")
template_data = input_data.get("template_data")
redacted_email = re.sub(r'(?<=.{2}).(?=.*@)', '*', email)
context.logger.info("Processing Notification", {
"template_id": template_id,
"template_data": template_data,
"email": redacted_email,
})
# This represents a call to some sort of
# notification service to indicate that a
# new order has been placed
context.logger.info("New notification sent", {
"template_id": template_id,
"email": redacted_email,
"template_data": template_data,
})

View File

@@ -1,50 +0,0 @@
from pydantic import BaseModel
from datetime import datetime
from src.services.pet_store import pet_store_service
class InputSchema(BaseModel):
id: str
email: str
quantity: int
pet_id: int
config = {
"type": "event",
"name": "ProcessFoodOrder",
"description": "basic-tutorial event step, demonstrates how to consume an event from a topic and persist data in state",
"flows": ["basic-tutorial"],
"subscribes": ["process-food-order"],
"emits": ["notification"],
"input": InputSchema.model_json_schema(),
}
async def handler(input_data, context):
context.logger.info("Step 02 Process food order", {"input": input_data})
order = await pet_store_service.create_order({
"id": input_data.get("id"),
"quantity": input_data.get("quantity"),
"pet_id": input_data.get("pet_id"),
"email": input_data.get("email"),
"ship_date": datetime.now().isoformat(),
"status": "placed",
})
context.logger.info("Order created", {"order": order})
await context.state.set("orders_python", order.get("id"), order)
await context.emit({
"topic": "notification",
"data": {
"email": input_data["email"],
"template_id": "new-order",
"template_data": {
"status": order.get("status"),
"ship_date": order.get("shipDate"),
"id": order.get("id"),
"pet_id": order.get("petId"),
"quantity": order.get("quantity"),
},
},
})

View File

@@ -1,68 +0,0 @@
[
{
"id": "step-configuration",
"title": "Step Configuration",
"description": "All steps should have a defined configuration, this is how you define the step's behavior and how it will be triggered.",
"lines": [
"5-19"
]
},
{
"id": "event-configuration",
"title": "Event Step",
"description": "Definition of an event step that subscribes to specific topics",
"lines": [
"12",
"15-16"
]
},
{
"id": "input-schema",
"title": "Input Schema",
"description": "Definition of the expected input data structure from the subscribed topic. Motia will automatically generate types based on this schema.",
"lines": [
"5-9",
"17"
]
},
{
"id": "event-emits",
"title": "Emits",
"description": "We can define the events that this step will emit, triggering other Motia Steps.",
"lines": [
"17"
]
},
{
"id": "handler",
"title": "Handler",
"description": "The handler is the function that will be executed when the step receives an event from its subscribed topic. It processes the input data and can emit new events.",
"lines": [
"21-50"
]
},
{
"id": "state",
"title": "State Management",
"description": "The handler demonstrates state management by storing order data that can be accessed by other steps.",
"lines": [
"35"
]
},
{
"id": "event-emission",
"title": "Event Emission",
"description": "After processing the order, the handler emits a new event to notify other steps about the new order.",
"lines": [
"37-50"
]
},
{
"id": "logger",
"title": "Logger",
"description": "The logger is a utility that allows you to log messages to the console. It is available in the handler function and automatically ties to the trace id of the request.",
"lines": [
"22"
]
}
]

View File

@@ -1,39 +0,0 @@
from datetime import datetime, timezone
config = {
"type": "cron",
"cron": "0 0 * * 1", # run once every Monday at midnight
"name": "StateAuditJob",
"description": "Checks the state for orders that are not complete and have a ship date in the past",
"emits": ["notification"],
"flows": ["basic-tutorial"],
}
async def handler(context):
state_value = await context.state.get_group("orders_python")
for item in state_value:
# check if current date is after item.ship_date
current_date = datetime.now(timezone.utc)
ship_date = datetime.fromisoformat(item.get("shipDate", "").replace('Z', '+00:00'))
if not item.get("complete", False) and current_date > ship_date:
context.logger.warn("Order is not complete and ship date is past", {
"order_id": item.get("id"),
"ship_date": item.get("shipDate"),
"complete": item.get("complete", False),
})
await context.emit({
"topic": "notification",
"data": {
"email": "test@test.com",
"template_id": "order-audit-warning",
"template_data": {
"order_id": item.get("id"),
"status": item.get("status"),
"ship_date": item.get("shipDate"),
"message": "Order is not complete and ship date is past",
},
},
})

View File

@@ -1,26 +0,0 @@
[
{
"id": "step-configuration",
"title": "Step Configuration",
"description": "All steps should have a defined configuration, this is how you define the step's behavior and how it will be triggered.",
"lines": [
"3-10"
]
},
{
"id": "cron-configuration",
"title": "Cron Configuration",
"description": "Cron steps require a specific configuration structure with the 'type' field set to 'cron' and a valid cron expression.",
"lines": [
"4-5"
]
},
{
"id": "handler",
"title": "Cron Step Handler",
"description": "The Cron step handler only receives one argument.",
"lines": [
"12-39"
]
}
]

View File

@@ -0,0 +1,92 @@
---
type: step
category: event
name: VMH Beteiligte Sync
version: 1.0.0
status: placeholder
tags: [sync, vmh, beteiligte, event, todo]
dependencies: []
emits: []
subscribes: [vmh.beteiligte.create, vmh.beteiligte.update, vmh.beteiligte.delete]
---
# VMH Beteiligte Sync Event Step
## Status
⚠️ **PLACEHOLDER** - Implementierung noch ausstehend
## Zweck
Verarbeitet Create/Update/Delete-Events für Beteiligte-Entitäten und synchronisiert zwischen EspoCRM und Zielsystem.
## Config
```python
{
'type': 'event',
'name': 'VMH Beteiligte Sync',
'subscribes': [
'vmh.beteiligte.create',
'vmh.beteiligte.update',
'vmh.beteiligte.delete'
],
'emits': [],
'flows': ['vmh']
}
```
## Geplantes Verhalten
**Input Events**:
```json
{
"topic": "vmh.beteiligte.create",
"data": {
"entity_id": "123",
"action": "create",
"source": "webhook",
"timestamp": "..."
}
}
```
**Processing**:
1. Fetch full entity data from EspoCRM
2. Map to target system format
3. Create/Update/Delete in target system
4. Remove ID from Redis pending set
5. Log success/failure
## Implementierungs-Aufgaben
- [ ] EspoCRM API Client erstellen
- [ ] Entity-Mapping definieren
- [ ] Zielsystem-Integration
- [ ] Error-Handling & Retry-Logic
- [ ] Redis Cleanup (remove from pending sets)
- [ ] Logging & Monitoring
## Redis Cleanup
Nach erfolgreicher Verarbeitung:
```python
redis.srem('vmh:beteiligte:create_pending', entity_id)
redis.srem('vmh:beteiligte:update_pending', entity_id)
redis.srem('vmh:beteiligte:delete_pending', entity_id)
```
## Testing (Future)
```bash
# Manually emit event for testing
# (via Motia CLI or test script)
```
## KI Guidance
Wenn Sie diesen Step implementieren:
1. Erstellen Sie EspoCRM API Client in `services/`
2. Definieren Sie Mapping-Logic
3. Implementieren Sie Retry-Logic mit exponential backoff
4. Cleanen Sie Redis Sets nach Verarbeitung
5. Loggen Sie alle Operationen für Audit
## Related
- [webhook/beteiligte_create_api_step.md](webhook/beteiligte_create_api_step.md) - Emits create events
- [webhook/beteiligte_update_api_step.md](webhook/beteiligte_update_api_step.md) - Emits update events
- [webhook/beteiligte_delete_api_step.md](webhook/beteiligte_delete_api_step.md) - Emits delete events

View File

@@ -0,0 +1,124 @@
---
type: step
category: api
name: VMH Webhook Beteiligte Create
version: 1.0.0
status: active
tags: [webhook, espocrm, vmh, beteiligte, create]
dependencies:
- redis
emits: [vmh.beteiligte.create]
---
# VMH Webhook Beteiligte Create Step
## Zweck
Empfängt Create-Webhooks von EspoCRM für neue Beteiligte-Entitäten. Dedupliziert via Redis und emittiert Events für asynchrone Verarbeitung.
## Config
```python
{
'type': 'api',
'name': 'VMH Webhook Beteiligte Create',
'path': '/vmh/webhook/beteiligte/create',
'method': 'POST',
'emits': ['vmh.beteiligte.create'],
'flows': ['vmh']
}
```
## Input
```bash
POST /vmh/webhook/beteiligte/create
Content-Type: application/json
[
{
"id": "entity-123",
"name": "Max Mustermann",
"createdAt": "2026-02-07T10:00:00Z"
},
{
"id": "entity-456",
"name": "Maria Schmidt"
}
]
```
**Format**: Array von Entitäten (Batch-Support)
## Output
```json
{
"status": "received",
"action": "create",
"new_ids_count": 2,
"total_ids_in_batch": 2
}
```
## Verhalten
1. **Extract IDs** von allen Entitäten im Batch
2. **Redis Deduplication**:
```python
pending_key = 'vmh:beteiligte:create_pending'
existing_ids = redis.smembers(pending_key)
new_ids = input_ids - existing_ids
redis.sadd(pending_key, *new_ids)
```
3. **Emit Events** nur für neue IDs:
```python
for entity_id in new_ids:
await context.emit({
'topic': 'vmh.beteiligte.create',
'data': {
'entity_id': entity_id,
'action': 'create',
'source': 'webhook',
'timestamp': timestamp
}
})
```
## Redis Keys
- `vmh:beteiligte:create_pending` (SET): IDs in create queue
- No TTL (permanent until processed)
## Deduplication Logic
**Problem**: EspoCRM kann Webhooks mehrfach senden
**Solution**: Redis SET speichert alle pending IDs
- Neue IDs → Events emittiert
- Bereits vorhandene IDs → Skipped
## Testing
```bash
# Test webhook
curl -X POST "http://localhost:3000/vmh/webhook/beteiligte/create" \
-H "Content-Type: application/json" \
-d '[{"id": "test-123", "name": "Test"}]'
# Check Redis
redis-cli -n 1 SMEMBERS vmh:beteiligte:create_pending
# Clear Redis (testing)
redis-cli -n 1 DEL vmh:beteiligte:create_pending
```
## Error Handling
- Invalid JSON: 400 error
- Redis unavailable: Loggen, aber nicht crashen (kann zu Duplikaten führen)
- Event emission error: Loggen und fortfahren
## Monitoring
```
[INFO] VMH Webhook Beteiligte Create empfangen
[INFO] Create Entity ID gefunden: entity-123
[INFO] 2 neue IDs zur Create-Sync-Queue hinzugefügt
[INFO] Create-Event emittiert für ID: entity-123
```
## Related Steps
- [beteiligte_update_api_step.md](beteiligte_update_api_step.md) - Update webhooks
- [beteiligte_delete_api_step.md](beteiligte_delete_api_step.md) - Delete webhooks
- [beteiligte_sync_event_step.md](../beteiligte_sync_event_step.md) - Consumes events