update to iii 0.90 and change directory structure

This commit is contained in:
bsiggel
2026-03-19 20:33:49 +00:00
parent 2ac83df1e0
commit 46085bd8dd
38 changed files with 0 additions and 49 deletions

src/steps/__init__.py Normal file

@@ -0,0 +1 @@
"""Motia iii Example Steps."""


@@ -0,0 +1,268 @@
# Advoware Calendar Sync - Event-Driven Design
This section implements the bidirectional synchronization between Advoware appointments and Google Calendar. The system uses an event-driven approach with **Motia III v1.0-RC** based on direct API calls, with Redis for locking and deduplication. It keeps appointments consistent, with a focus on robustness, error handling, and correct treatment of multi-day appointments.
## Overview
The system synchronizes appointments between:
- **Advoware**: Central appointment management with detailed information.
- **Google Calendar**: User-friendly calendar view for each employee.
## Architecture
### Event-Driven Design
- **Direct API synchronization**: No central hub; the sync runs directly between the APIs.
- **Redis locking**: Per-employee locking prevents race conditions.
- **Event emission**: Cron → all-step → employee-step for scalable processing.
- **Fault tolerance**: Individual failures do not stop the overall sync.
- **Logging**: All logs appear in the iii Console via `ctx.logger`.
### Sync Phases
1. **Cron step**: Triggers automatically every 15 minutes.
2. **All step**: Fetches all employees and emits one event per employee.
3. **Employee step**: Synchronizes the appointments of a single employee.
### Data Mapping and Standardization
Both systems are normalized to a common format (Berlin TZ):
```python
{
'start': datetime, # Berlin TZ
'end': datetime,
'text': str,
'notiz': str,
'ort': str,
'dauertermin': int, # 0/1
'turnus': int, # 0/1
'turnusArt': int,
'recurrence': str # RRULE oder None
}
```
#### Advoware → Standard
- Start: `datum` + `uhrzeitVon` (fallback 09:00), or `datum` as a datetime.
- End: `datumBis` + `uhrzeitBis` (fallback 10:00), or `datum` + 1 h.
- All-day: `dauertermin=1` or duration > 1 day.
- Recurring: `turnus`/`turnusArt` (simplified, no RRULE).
#### Google → Standard
- Start/end: `dateTime` or `date` (all-day).
- All-day: `dauertermin=1` if all-day or duration > 1 day.
- Recurring: RRULE from `recurrence`.
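As a rough sketch of the Google → Standard rules above (the helper name `google_times` is illustrative; the event shape follows the Google Calendar API's `start`/`end` fields):

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

BERLIN = ZoneInfo("Europe/Berlin")

def google_times(event: dict):
    """Extract start/end from a Google event body: `dateTime` for timed
    events, plain `date` for all-day events (sketch of the rules above)."""
    def parse(part: dict) -> datetime:
        if "dateTime" in part:
            return datetime.fromisoformat(part["dateTime"]).astimezone(BERLIN)
        # all-day events carry a plain YYYY-MM-DD in `date`
        return datetime.fromisoformat(part["date"]).replace(tzinfo=BERLIN)

    start, end = parse(event["start"]), parse(event["end"])
    all_day = "date" in event["start"] or (end - start) > timedelta(days=1)
    return start, end, all_day
```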
#### Standard → Advoware
- POST/PUT: `datum`/`uhrzeitBis`/`datumBis` derived from start/end.
- Defaults: `vorbereitungsDauer='00:00:00'`, `sb`/`anwalt` = employee_kuerzel.
#### Standard → Google
- All-day: `date` instead of `dateTime`, end + 1 day.
- Recurring: RRULE from `recurrence`.
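A minimal sketch of the Advoware → Standard normalization (field names come from the rules above; the helper itself is illustrative and does not expand recurrences):

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

BERLIN = ZoneInfo("Europe/Berlin")

def advoware_to_standard(termin: dict) -> dict:
    """Normalize an Advoware appointment per the mapping rules above (sketch)."""
    datum = termin["datum"]                               # 'YYYY-MM-DD'
    von = termin.get("uhrzeitVon") or "09:00:00"          # fallback 09:00
    start = datetime.fromisoformat(f"{datum}T{von}").replace(tzinfo=BERLIN)

    datum_bis = termin.get("datumBis") or datum
    bis = termin.get("uhrzeitBis") or "10:00:00"          # fallback 10:00
    end = datetime.fromisoformat(f"{datum_bis}T{bis}").replace(tzinfo=BERLIN)
    if end <= start:
        end = start + timedelta(hours=1)                  # fallback: datum + 1 h

    all_day = termin.get("dauertermin") == 1 or (end - start) > timedelta(days=1)
    return {
        "start": start,
        "end": end,
        "text": termin.get("text", ""),
        "notiz": termin.get("notiz", ""),
        "ort": termin.get("ort", ""),
        "dauertermin": 1 if all_day else 0,
        "turnus": termin.get("turnus", 0),
        "turnusArt": termin.get("turnusArt", 0),
        "recurrence": None,   # simplified: turnus/turnusArt not expanded to RRULE
    }
```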
## Functionality
### Automatic Calendar Creation
- A Google Calendar named `AW-{Kuerzel}` is created for every Advoware employee.
- Example: an employee with the abbreviation "SB" → calendar "AW-SB".
- The calendar is shared with the main Google account (`lehmannundpartner@gmail.com`) as owner.
### Sync Details
#### Cron Step (calendar_sync_cron_step.py)
- Runs every 15 minutes and enqueues "calendar_sync_all".
- **Trigger**: `cron("0 */15 * * * *")` (6-field: second minute hour day month weekday)
#### All Step (calendar_sync_all_step.py)
- Fetches all employees from Advoware.
- Applies the debug employee filter (if configured).
- Sets a Redis lock per employee.
- Enqueues one "calendar_sync_employee" event per employee.
- **Trigger**: `queue('calendar_sync_all')`
#### Employee Step (calendar_sync_event_step.py)
- Fetches the Advoware appointments for the employee.
- Fetches the Google events for the employee.
- Synchronizes: creates new events, applies updates, handles deletes.
- Uses locking to prevent parallel syncs.
- **Trigger**: `queue('calendar_sync_employee')`
#### API Step (calendar_sync_api_step.py)
- Manual trigger for a single employee or "ALL".
- For "ALL": enqueues "calendar_sync_all".
- For a single employee: sets the lock and enqueues "calendar_sync_employee".
- **Trigger**: `http('POST', '/advoware/calendar/sync')`
## API Quirks and Fixes
### Advoware API
- **Multi-day appointments**: `datumBis` is used correctly as the end date; '00:00:00' is interpreted as '23:59:59'.
- **Time formats**: Robust parsing with fallbacks.
- **No 24-hour limit**: Appointments can be longer than 24 h; Google Calendar supports that.
### Google Calendar API
- **Time ranges**: Accepts events longer than 24 h without issues.
- **Rate limits**: Backoff retry implemented.
## Step Configuration (Motia III)
### calendar_sync_cron_step.py
```python
config = {
'name': 'Calendar Sync Cron Job',
'flows': ['advoware'],
'triggers': [
cron("0 */15 * * * *") # Every 15 minutes (6-field format)
],
'enqueues': ['calendar_sync_all']
}
```
### calendar_sync_all_step.py
```python
config = {
'name': 'Calendar Sync All Step',
'flows': ['advoware'],
'triggers': [
queue('calendar_sync_all')
],
'enqueues': ['calendar_sync_employee']
}
```
### calendar_sync_event_step.py
```python
config = {
'name': 'Calendar Sync Event Step',
'flows': ['advoware'],
'triggers': [
queue('calendar_sync_employee')
],
'enqueues': []
}
```
### calendar_sync_api_step.py
```python
config = {
'name': 'Calendar Sync API Trigger',
'flows': ['advoware'],
'triggers': [
http('POST', '/advoware/calendar/sync')
],
'enqueues': ['calendar_sync_employee', 'calendar_sync_all']
}
```
## Setup
### Environment Variables
```env
# Google Calendar
GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH=service-account.json
# Advoware API
ADVOWARE_API_BASE_URL=https://www2.advo-net.net:90/
ADVOWARE_PRODUCT_ID=64
ADVOWARE_APP_ID=your_app_id
ADVOWARE_API_KEY=your_api_key
ADVOWARE_KANZLEI=your_kanzlei
ADVOWARE_DATABASE=your_database
ADVOWARE_USER=your_user
ADVOWARE_ROLE=2
ADVOWARE_PASSWORD=your_password
ADVOWARE_TOKEN_LIFETIME_MINUTES=55
ADVOWARE_API_TIMEOUT_SECONDS=30
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_CALENDAR_SYNC=1
REDIS_TIMEOUT_SECONDS=5
# Debug
CALENDAR_SYNC_DEBUG_EMPLOYEES=PB,AI # Optional, filter employees
```
## Usage
### Manual Sync
```bash
# Sync a specific employee
curl -X POST "http://localhost:3111/advoware/calendar/sync" \
-H "Content-Type: application/json" \
-d '{"kuerzel": "PB"}'
# Sync all employees
curl -X POST "http://localhost:3111/advoware/calendar/sync" \
-H "Content-Type: application/json" \
-d '{"kuerzel": "ALL"}'
```
### Automatic Sync
The cron step runs automatically every 15 minutes.
## Error Handling and Logging
- **Locking**: Redis NX/EX prevents parallel syncs.
- **Logging**: `ctx.logger` for visibility in the iii Console.
- **API errors**: Retry with backoff.
- **Parsing errors**: Robust fallbacks.
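The "retry with backoff" mentioned above might look roughly like this; the retry count and delays are illustrative, not taken from the actual steps:

```python
import asyncio
import random

async def with_backoff(call, retries: int = 3, base_delay: float = 1.0):
    """Retry an async API call with exponential backoff and jitter (sketch)."""
    for attempt in range(retries + 1):
        try:
            return await call()
        except Exception:
            if attempt == retries:
                raise  # out of retries: propagate to the step's error handling
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)
```

Usage would be along the lines of `await with_backoff(lambda: advoware.api_call(...))`.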
## Security
- Service account for the Google Calendar API.
- HMAC-512 authentication for the Advoware API.
- Redis for concurrency control.
## Known Issues
- **Recurring events**: Limited support for complex recurrence patterns.
- **Performance**: Mind pagination when there are many appointments.
- **Timezone handling**: All operations run in the Europe/Berlin TZ.
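Since every operation runs in Europe/Berlin, a small parsing helper (illustrative, not from the codebase) might look like:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

def to_berlin(dt_str: str) -> datetime:
    """Parse an ISO timestamp (e.g. a Google `dateTime` value) into Europe/Berlin."""
    # fromisoformat() on Python < 3.11 rejects a trailing 'Z', so normalize it
    dt = datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
    return dt.astimezone(ZoneInfo("Europe/Berlin"))
```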
## Data Flow
```
Cron (every 15 min)
→ calendar_sync_cron_step
→ ctx.enqueue(topic: "calendar_sync_all")
→ calendar_sync_all_step
→ Fetch employees from Advoware
→ For each employee:
→ Set Redis lock (key: calendar_sync_lock_{kuerzel})
→ ctx.enqueue(topic: "calendar_sync_employee", data: {kuerzel, ...})
→ calendar_sync_event_step
→ Fetch Advoware appointments (frNr, datum, text, etc.)
→ Fetch Google Calendar events
→ 4-phase sync:
1. New from Advoware → Google
2. New from Google → Advoware
3. Process deletes
4. Process updates
→ Clear Redis lock
```
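The employee step's full diff is suppressed in this commit view, but the 4-phase sync can be sketched as a keyed diff. The `previously_synced` set is an assumption standing in for whatever persisted sync state the real step consults:

```python
def plan_sync(advoware: dict, google: dict, previously_synced: set):
    """Naive sketch of the 4-phase sync. Inputs map a stable event key to the
    event body; `previously_synced` holds keys synced in an earlier run so a
    one-sided disappearance is treated as a delete, not a create."""
    a_keys, g_keys = set(advoware), set(google)
    new_in_advoware = a_keys - g_keys - previously_synced   # phase 1: -> Google
    new_in_google = g_keys - a_keys - previously_synced     # phase 2: -> Advoware
    deleted = previously_synced - (a_keys & g_keys)         # phase 3: deletes
    updates = {k for k in a_keys & g_keys if advoware[k] != google[k]}  # phase 4
    return new_in_advoware, new_in_google, deleted, updates
```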
## Further Documentation
- **Individual Step Docs**: See the `docs/` folder in this directory
- **Architecture Overview**: [../../docs/ARCHITECTURE.md](../../docs/ARCHITECTURE.md)
- **Google Setup Guide**: [../../docs/GOOGLE_SETUP.md](../../docs/GOOGLE_SETUP.md)
- **Troubleshooting**: [../../docs/TROUBLESHOOTING.md](../../docs/TROUBLESHOOTING.md)
## Migration Notes
This system was migrated from **Motia v0.17** to **Motia III v1.0-RC**:
### Key Changes:
- ✅ `type: 'event'` → `triggers: [queue('topic')]`
- ✅ `type: 'cron'` → `triggers: [cron('expression')]` (6-field format)
- ✅ `type: 'api'` → `triggers: [http('METHOD', 'path')]`
- ✅ `context.emit()` → `ctx.enqueue()`
- ✅ `emits: [...]` → `enqueues: [...]`
- ✅ Relative imports → absolute imports with `sys.path.insert()`
- ✅ Motia Workbench → iii Console
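As a minimal before/after sketch of that config change (the trigger is shown as a plain string here; the real steps import `queue` from `motia`):

```python
# Old Motia v0.17 event-step config (sketch):
old_config = {
    'type': 'event',
    'name': 'Calendar Sync Event Step',
    'subscribes': ['calendar_sync'],
    'emits': [],
}

# Equivalent Motia III shape; "queue('calendar_sync')" stands in for the
# real queue() trigger object.
new_config = {
    'name': old_config['name'],
    'flows': ['advoware'],
    'triggers': [f"queue('{topic}')" for topic in old_config['subscribes']],
    'enqueues': old_config['emits'],
}
```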
### Compatibility:
- ✅ All 4 steps fully migrated
- ✅ Google Calendar API integration unchanged
- ✅ Advoware API integration unchanged
- ✅ Redis locking mechanism unchanged
- ✅ Database schema compatible


@@ -0,0 +1,5 @@
"""
Advoware Calendar Sync Module
Bidirectional synchronization between Google Calendar and Advoware appointments.
"""


@@ -0,0 +1,113 @@
"""
Calendar Sync All Step
Handles calendar_sync_all event and emits individual sync events for oldest employees.
Uses Redis to track last sync times and distribute work.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from calendar_sync_utils import (
get_redis_client,
get_advoware_employees,
set_employee_lock,
log_operation
)
import math
from datetime import datetime
from typing import Any, Dict
from motia import queue, FlowContext
from services.advoware_service import AdvowareService
config = {
'name': 'Calendar Sync All Step',
'description': 'Receives sync-all event and emits individual events for oldest employees',
'flows': ['advoware-calendar-sync'],
'triggers': [
queue('calendar_sync_all')
],
'enqueues': ['calendar_sync_employee']
}
async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> Dict[str, Any]:
"""
Handler that fetches all employees, sorts by last sync time,
and emits calendar_sync_employee events for the oldest ones.
"""
try:
triggered_by = input_data.get('triggered_by', 'unknown')
log_operation('info', f"Calendar Sync All: Starting to emit events for oldest employees, triggered by {triggered_by}", context=ctx)
# Initialize Advoware service
advoware = AdvowareService(ctx)
# Fetch employees
employees = await get_advoware_employees(advoware, ctx)
if not employees:
log_operation('error', "Keine Mitarbeiter gefunden. All-Sync abgebrochen.", context=ctx)
return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
redis_client = get_redis_client(ctx)
# Collect last_synced timestamps
employee_timestamps = {}
for employee in employees:
kuerzel = employee.get('kuerzel')
if not kuerzel:
continue
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
timestamp_str = redis_client.get(employee_last_synced_key)
timestamp = int(timestamp_str) if timestamp_str else 0 # 0 if no timestamp (very old)
employee_timestamps[kuerzel] = timestamp
# Sort employees by last_synced (ascending, oldest first), then by kuerzel alphabetically
sorted_kuerzel = sorted(employee_timestamps.keys(), key=lambda k: (employee_timestamps[k], k))
# Log the sorted list with timestamps
def format_timestamp(ts):
if ts == 0:
return "never"
return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
sorted_list_str = ", ".join(f"{k} ({format_timestamp(employee_timestamps[k])})" for k in sorted_kuerzel)
log_operation('info', f"Calendar Sync All: Sorted employees by last synced: {sorted_list_str}", context=ctx)
# Calculate number to sync: ceil(N / 10)
num_to_sync = math.ceil(len(sorted_kuerzel) / 10)
log_operation('info', f"Calendar Sync All: Total employees {len(sorted_kuerzel)}, syncing {num_to_sync} oldest", context=ctx)
# Emit for the oldest num_to_sync employees, if not locked
emitted_count = 0
for kuerzel in sorted_kuerzel[:num_to_sync]:
if not set_employee_lock(redis_client, kuerzel, triggered_by, ctx):
log_operation('info', f"Calendar Sync All: Sync already active for {kuerzel}, skipping", context=ctx)
continue
# Emit event for this employee
await ctx.enqueue({
"topic": "calendar_sync_employee",
"data": {
"kuerzel": kuerzel,
"triggered_by": triggered_by
}
})
log_operation('info', f"Calendar Sync All: Emitted event for employee {kuerzel} (last synced: {format_timestamp(employee_timestamps[kuerzel])})", context=ctx)
emitted_count += 1
log_operation('info', f"Calendar Sync All: Completed, emitted {emitted_count} events", context=ctx)
return {
'status': 'completed',
'triggered_by': triggered_by,
'emitted_count': emitted_count
}
except Exception as e:
log_operation('error', f"Fehler beim All-Sync: {e}", context=ctx)
return {
'status': 'error',
'error': str(e)
}


@@ -0,0 +1,112 @@
"""
Calendar Sync API Step
HTTP API endpoint for manual calendar sync triggering.
Supports syncing a single employee or all employees.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from calendar_sync_utils import get_redis_client, set_employee_lock, get_logger
from motia import http, ApiRequest, ApiResponse, FlowContext
config = {
'name': 'Calendar Sync API Trigger',
'description': 'API endpoint for manual calendar sync triggering',
'flows': ['advoware-calendar-sync'],
'triggers': [
http('POST', '/advoware/calendar/sync')
],
'enqueues': ['calendar_sync_employee', 'calendar_sync_all']
}
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
"""
HTTP handler for manual calendar sync triggering.
Request body:
{
"kuerzel": "SB" // or "ALL" for all employees
}
"""
try:
# Get kuerzel from request body
body = request.body
kuerzel = body.get('kuerzel')
if not kuerzel:
return ApiResponse(
status=400,
body={
'error': 'kuerzel required',
'message': 'Please provide kuerzel in body'
}
)
kuerzel_upper = kuerzel.upper()
if kuerzel_upper == 'ALL':
# Emit sync-all event
ctx.logger.info("Calendar Sync API: Emitting sync-all event")
await ctx.enqueue({
"topic": "calendar_sync_all",
"data": {
"triggered_by": "api"
}
})
return ApiResponse(
status=200,
body={
'status': 'triggered',
'message': 'Calendar sync triggered for all employees',
'triggered_by': 'api'
}
)
else:
# Single employee sync
redis_client = get_redis_client(ctx)
if not set_employee_lock(redis_client, kuerzel_upper, 'api', ctx):
ctx.logger.info(f"Calendar Sync API: Sync already active for {kuerzel_upper}, skipping")
return ApiResponse(
status=409,
body={
'status': 'conflict',
'message': f'Calendar sync already active for {kuerzel_upper}',
'kuerzel': kuerzel_upper,
'triggered_by': 'api'
}
)
ctx.logger.info(f"Calendar Sync API called for {kuerzel_upper}")
# Lock successfully set, now emit event
await ctx.enqueue({
"topic": "calendar_sync_employee",
"data": {
"kuerzel": kuerzel_upper,
"triggered_by": "api"
}
})
return ApiResponse(
status=200,
body={
'status': 'triggered',
'message': f'Calendar sync triggered for {kuerzel_upper}',
'kuerzel': kuerzel_upper,
'triggered_by': 'api'
}
)
except Exception as e:
ctx.logger.error(f"Error in API trigger: {e}")
return ApiResponse(
status=500,
body={
'error': 'Internal server error',
'details': str(e)
}
)


@@ -0,0 +1,50 @@
"""
Calendar Sync Cron Step
Cron trigger for automatic calendar synchronization.
Emits calendar_sync_all event to start sync cascade.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from calendar_sync_utils import log_operation
from typing import Dict, Any
from motia import cron, FlowContext
config = {
'name': 'Calendar Sync Cron Job',
'description': 'Runs calendar sync automatically every 15 minutes',
'flows': ['advoware-calendar-sync'],
'triggers': [
cron("0 */15 * * * *") # Every 15 minutes at second 0 (6-field: sec min hour day month weekday)
],
'enqueues': ['calendar_sync_all']
}
async def handler(input_data: None, ctx: FlowContext) -> None:
"""Cron handler that triggers the calendar sync cascade."""
try:
ctx.logger.info("=" * 80)
ctx.logger.info("🕐 CALENDAR SYNC CRON: STARTING")
ctx.logger.info("=" * 80)
ctx.logger.info("Emitting sync-all event")
# Enqueue sync-all event
await ctx.enqueue({
"topic": "calendar_sync_all",
"data": {
"triggered_by": "cron"
}
})
ctx.logger.info("✅ Calendar sync-all event emitted successfully")
ctx.logger.info("=" * 80)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ERROR: CALENDAR SYNC CRON")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)

File diff suppressed because it is too large


@@ -0,0 +1,133 @@
"""
Calendar Sync Utilities
Shared utility functions for calendar synchronization between Google Calendar and Advoware.
"""
import asyncpg
import os
import redis
import time
from typing import Optional, Any, List
from googleapiclient.discovery import build
from google.oauth2 import service_account
from services.logging_utils import get_service_logger
def get_logger(context=None):
"""Get logger for calendar sync operations"""
return get_service_logger('calendar_sync', context)
def log_operation(level: str, message: str, context=None, **extra):
"""
Log calendar sync operations with structured context.
Args:
level: Log level ('debug', 'info', 'warning', 'error')
message: Log message
context: FlowContext if available
**extra: Additional key-value pairs to log
"""
logger = get_logger(context)
log_func = getattr(logger, level.lower(), logger.info)
if extra:
extra_str = " | " + " | ".join(f"{k}={v}" for k, v in extra.items())
log_func(message + extra_str)
else:
log_func(message)
async def connect_db(context=None):
"""Connect to Postgres DB from environment variables."""
logger = get_logger(context)
try:
conn = await asyncpg.connect(
host=os.getenv('POSTGRES_HOST', 'localhost'),
user=os.getenv('POSTGRES_USER', 'calendar_sync_user'),
password=os.getenv('POSTGRES_PASSWORD', 'default_password'),
database=os.getenv('POSTGRES_DB_NAME', 'calendar_sync_db'),
timeout=10
)
return conn
except Exception as e:
logger.error(f"Failed to connect to DB: {e}")
raise
async def get_google_service(context=None):
"""Initialize Google Calendar service."""
logger = get_logger(context)
try:
service_account_path = os.getenv('GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH', 'service-account.json')
if not os.path.exists(service_account_path):
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
scopes = ['https://www.googleapis.com/auth/calendar']
creds = service_account.Credentials.from_service_account_file(
service_account_path, scopes=scopes
)
service = build('calendar', 'v3', credentials=creds)
return service
except Exception as e:
logger.error(f"Failed to initialize Google service: {e}")
raise
def get_redis_client(context=None) -> redis.Redis:
"""Initialize Redis client for calendar sync operations."""
logger = get_logger(context)
try:
redis_client = redis.Redis(
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', '6379')),
db=int(os.getenv('REDIS_DB_CALENDAR_SYNC', '2')),
socket_timeout=int(os.getenv('REDIS_TIMEOUT_SECONDS', '5')),
decode_responses=True
)
return redis_client
except Exception as e:
logger.error(f"Failed to initialize Redis client: {e}")
raise
async def get_advoware_employees(advoware, context=None) -> List[Any]:
"""Fetch list of employees from Advoware."""
logger = get_logger(context)
try:
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
employees = result if isinstance(result, list) else []
logger.info(f"Fetched {len(employees)} Advoware employees")
return employees
except Exception as e:
logger.error(f"Failed to fetch Advoware employees: {e}")
raise
def set_employee_lock(redis_client: redis.Redis, kuerzel: str, triggered_by: str, context=None) -> bool:
"""Set lock for employee sync operation."""
logger = get_logger(context)
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None:
logger.info(f"Sync already active for {kuerzel}, skipping")
return False
return True
def clear_employee_lock(redis_client: redis.Redis, kuerzel: str, context=None) -> None:
"""Clear lock for employee sync operation and update last-synced timestamp."""
logger = get_logger(context)
try:
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
# Update last-synced timestamp (no TTL, persistent)
current_time = int(time.time())
redis_client.set(employee_last_synced_key, current_time)
# Delete the lock
redis_client.delete(employee_lock_key)
logger.debug(f"Cleared lock and updated last-synced for {kuerzel} to {current_time}")
except Exception as e:
logger.warning(f"Failed to clear lock and update last-synced for {kuerzel}: {e}")


@@ -0,0 +1,314 @@
# Advoware API Proxy Steps
This folder contains the API proxy steps for the Advoware integration. Each step implements one HTTP method as a universal proxy to the Advoware API, built with **Motia III v1.0-RC**.
## Overview
The proxy steps act as a transparent interface between clients and the Advoware API. They handle authentication, error handling, and logging automatically.
## Steps
### 1. GET Proxy (`advoware_api_proxy_get_step.py`)
**Purpose:** Universal proxy for GET requests to the Advoware API.
**Configuration:**
```python
config = {
'name': 'Advoware Proxy GET',
'flows': ['advoware'],
'triggers': [
http('GET', '/advoware/proxy')
],
'enqueues': []
}
```
**Functionality:**
- Extracts the target endpoint from the `endpoint` query parameter
- Forwards all other query parameters as API parameters
- Returns the result as JSON
**Example request:**
```bash
GET /advoware/proxy?endpoint=employees&limit=10&offset=0
```
**Response:**
```json
{
"result": {
"data": [...],
"total": 100
}
}
```
### 2. POST Proxy (`advoware_api_proxy_post_step.py`)
**Purpose:** Universal proxy for POST requests to the Advoware API.
**Configuration:**
```python
config = {
'name': 'Advoware Proxy POST',
'flows': ['advoware'],
'triggers': [
http('POST', '/advoware/proxy')
],
'enqueues': []
}
```
**Functionality:**
- Extracts the target endpoint from the `endpoint` query parameter
- Uses the request body as JSON data for the API
- Creates new resources in Advoware
**Example request:**
```bash
POST /advoware/proxy?endpoint=employees
Content-Type: application/json
{
"name": "John Doe",
"email": "john@example.com"
}
```
### 3. PUT Proxy (`advoware_api_proxy_put_step.py`)
**Purpose:** Universal proxy for PUT requests to the Advoware API.
**Configuration:**
```python
config = {
'name': 'Advoware Proxy PUT',
'flows': ['advoware'],
'triggers': [
http('PUT', '/advoware/proxy')
],
'enqueues': []
}
```
**Functionality:**
- Extracts the target endpoint from the `endpoint` query parameter
- Uses the request body as JSON data for updates
- Updates existing resources in Advoware
**Example request:**
```bash
PUT /advoware/proxy?endpoint=employees/123
Content-Type: application/json
{
"name": "John Smith",
"email": "johnsmith@example.com"
}
```
### 4. DELETE Proxy (`advoware_api_proxy_delete_step.py`)
**Purpose:** Universal proxy for DELETE requests to the Advoware API.
**Configuration:**
```python
config = {
'name': 'Advoware Proxy DELETE',
'flows': ['advoware'],
'triggers': [
http('DELETE', '/advoware/proxy')
],
'enqueues': []
}
```
**Functionality:**
- Extracts the target endpoint from the `endpoint` query parameter
- Deletes resources in Advoware
**Example request:**
```bash
DELETE /advoware/proxy?endpoint=employees/123
```
## Shared Features
### Authentication
All steps use `AdvowareService` for automatic token management and authentication:
- HMAC-512 based signature
- Token caching in Redis (55-minute lifetime)
- Automatic token refresh on 401 errors
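That token caching might be sketched as follows; the Redis key name and helper are illustrative, and the real HMAC-512 signing lives inside `AdvowareService`:

```python
TOKEN_TTL_SECONDS = 55 * 60  # matches ADVOWARE_TOKEN_LIFETIME_MINUTES=55

def get_cached_token(redis_client, fetch_token) -> str:
    """Return the cached Advoware token, fetching a fresh one if missing/expired."""
    token = redis_client.get("advoware_token")  # key name illustrative
    if token is None:
        token = fetch_token()  # performs the HMAC-512 auth request
        redis_client.set("advoware_token", token, ex=TOKEN_TTL_SECONDS)
    return token
```

The TTL keeps cached tokens strictly shorter-lived than the server-side lifetime, so a cached token is never used after it expires.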
### Error Handling
- **400 Bad Request:** Missing `endpoint` parameter
- **500 Internal Server Error:** API errors or exceptions
- **401 Unauthorized:** Automatic token refresh and retry
### Logging
Detailed logs via `ctx.logger` cover:
- Incoming requests
- API calls to Advoware
- Errors and exceptions
- Token management
All logs are visible in the **iii Console**.
### Security
- No direct pass-through of sensitive data
- Authentication handled by the service layer
- Input validation for required parameters
- HMAC-512 signature for all API requests
## Handler Structure (Motia III)
All steps follow the same pattern:
```python
from motia import http, ApiRequest, ApiResponse, FlowContext
from services.advoware_service import AdvowareService
config = {
'name': 'Advoware Proxy {METHOD}',
'flows': ['advoware'],
'triggers': [
http('{METHOD}', '/advoware/proxy')
],
'enqueues': []
}
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
# Extract endpoint from query params
endpoint = request.query_params.get('endpoint')
if not endpoint:
return ApiResponse(
status=400,
body={'error': 'Missing required query parameter: endpoint'}
)
# Call Advoware API
advoware_service = AdvowareService()
result = await advoware_service.{method}(endpoint, **params)
return ApiResponse(status=200, body={'result': result})
```
## Testing
### Smoke Tests (curl)
```bash
# Test GET Proxy
curl -X GET "http://localhost:3111/advoware/proxy?endpoint=employees"
# Test POST Proxy
curl -X POST "http://localhost:3111/advoware/proxy?endpoint=employees" \
-H "Content-Type: application/json" \
-d '{"name": "Test Employee"}'
# Test PUT Proxy
curl -X PUT "http://localhost:3111/advoware/proxy?endpoint=employees/1" \
-H "Content-Type: application/json" \
-d '{"name": "Updated Employee"}'
# Test DELETE Proxy
curl -X DELETE "http://localhost:3111/advoware/proxy?endpoint=employees/1"
```
### Integration Tests
Check the logs in the iii Console:
```bash
# Check logs
curl http://localhost:3111/_console/logs
```
## Configuration
### Environment Variables
Make sure the following variables are set:
```env
ADVOWARE_API_BASE_URL=https://www2.advo-net.net:90/
ADVOWARE_PRODUCT_ID=64
ADVOWARE_APP_ID=your_app_id
ADVOWARE_API_KEY=your_api_key
ADVOWARE_KANZLEI=your_kanzlei
ADVOWARE_DATABASE=your_database
ADVOWARE_USER=your_user
ADVOWARE_ROLE=2
ADVOWARE_PASSWORD=your_password
ADVOWARE_TOKEN_LIFETIME_MINUTES=55
ADVOWARE_API_TIMEOUT_SECONDS=30
# Redis (für Token-Caching)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
```
### Dependencies
- `services/advoware_service.py` - Advoware API client with HMAC auth
- `config.py` - Configuration management
- `motia` - Motia III Python SDK
- `asyncpg` - PostgreSQL client
- `redis` - Redis client for token caching
## Extensions
### Planned Features
- Request/response caching for frequent queries
- Per-client rate limiting
- Request validation schemas with Pydantic
- Batch operations support
### Custom Endpoints
For specific endpoints, additional steps can be created that access particular resources directly and provide extended validation/transformation.
## Architektur
```
Client Request
HTTP Trigger (http('METHOD', '/advoware/proxy'))
Handler (ApiRequest → ApiResponse)
├─► Extract 'endpoint' from query params
├─► Extract other params/body
AdvowareService
├─► Check Redis for valid token
├─► If expired: Get new token (HMAC-512 auth)
├─► Build HTTP request
Advoware API
Response → Transform → Return ApiResponse
```
## Migration Notes
This system was migrated from **Motia v0.17** to **Motia III v1.0-RC**:
### Key Changes:
- ✅ `type: 'api'` → `triggers: [http('METHOD', 'path')]`
- ✅ `ApiRouteConfig` → `StepConfig` with `as const satisfies`
- ✅ `Handlers['StepName']` → `Handlers<typeof config>`
- ✅ `context` → `ctx`
- ✅ `req` dict → `ApiRequest` typed object
- ✅ Return dict → `ApiResponse` typed object
- ✅ `method`, `path` moved into the trigger
- ✅ Motia Workbench → iii Console
- ✅ Motia Workbench → iii Console
### Compatibility:
- ✅ All 4 proxy steps fully migrated
- ✅ AdvowareService compatible (no changes)
- ✅ Redis token caching unchanged
- ✅ HMAC-512 auth unchanged
- ✅ API endpoints identical


@@ -0,0 +1 @@
"""Advoware Proxy Steps"""


@@ -0,0 +1,65 @@
"""Advoware API Proxy - DELETE requests"""
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
from services.advoware import AdvowareAPI
config = {
"name": "Advoware Proxy DELETE",
"description": "Universal proxy for Advoware API (DELETE requests)",
"flows": ["advoware-proxy"],
"triggers": [
http("DELETE", "/advoware/proxy")
],
"enqueues": [],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Proxy DELETE requests to Advoware API.
Query parameters:
- endpoint: Advoware API endpoint (required)
- any other params are forwarded to Advoware
"""
try:
# Extract endpoint from query parameters
endpoint = request.query_params.get('endpoint', '')
if not endpoint:
return ApiResponse(
status=400,
body={'error': 'Endpoint required as query parameter'}
)
ctx.logger.info("=" * 80)
ctx.logger.info("🔄 ADVOWARE PROXY: DELETE REQUEST")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Endpoint: {endpoint}")
ctx.logger.info("=" * 80)
# Initialize Advoware client
advoware = AdvowareAPI(ctx)
# Forward all query params except 'endpoint'
params = {k: v for k, v in request.query_params.items() if k != 'endpoint'}
result = await advoware.api_call(
endpoint,
method='DELETE',
params=params
)
ctx.logger.info("✅ Proxy DELETE erfolgreich")
return ApiResponse(status=200, body={'result': result})
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ADVOWARE PROXY DELETE FEHLER")
ctx.logger.error(f"Endpoint: {request.query_params.get('endpoint', 'N/A')}")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={'error': 'Internal server error', 'details': str(e)}
)


@@ -0,0 +1,65 @@
"""Advoware API Proxy - GET requests"""
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
from services.advoware import AdvowareAPI
config = {
"name": "Advoware Proxy GET",
"description": "Universal proxy for Advoware API (GET requests)",
"flows": ["advoware-proxy"],
"triggers": [
http("GET", "/advoware/proxy")
],
"enqueues": [],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Proxy GET requests to Advoware API.
Query parameters:
- endpoint: Advoware API endpoint (required)
- any other params are forwarded to Advoware
"""
try:
# Extract endpoint from query parameters
endpoint = request.query_params.get('endpoint', '')
if not endpoint:
return ApiResponse(
status=400,
body={'error': 'Endpoint required as query parameter'}
)
ctx.logger.info("=" * 80)
ctx.logger.info("🔄 ADVOWARE PROXY: GET REQUEST")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Endpoint: {endpoint}")
ctx.logger.info("=" * 80)
# Initialize Advoware client
advoware = AdvowareAPI(ctx)
# Forward all query params except 'endpoint'
params = {k: v for k, v in request.query_params.items() if k != 'endpoint'}
result = await advoware.api_call(
endpoint,
method='GET',
params=params
)
ctx.logger.info("✅ Proxy GET erfolgreich")
return ApiResponse(status=200, body={'result': result})
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ADVOWARE PROXY GET FEHLER")
ctx.logger.error(f"Endpoint: {request.query_params.get('endpoint', 'N/A')}")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={'error': 'Internal server error', 'details': str(e)}
)


@@ -0,0 +1,71 @@
"""Advoware API Proxy - POST requests"""
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
from services.advoware import AdvowareAPI
config = {
"name": "Advoware Proxy POST",
"description": "Universal proxy for Advoware API (POST requests)",
"flows": ["advoware-proxy"],
"triggers": [
http("POST", "/advoware/proxy")
],
"enqueues": [],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Proxy POST requests to Advoware API.
Query parameters:
- endpoint: Advoware API endpoint (required)
- any other params are forwarded to Advoware
Body: JSON payload to forward to Advoware
"""
try:
# Extract endpoint from query parameters
endpoint = request.query_params.get('endpoint', '')
if not endpoint:
return ApiResponse(
status=400,
body={'error': 'Endpoint required as query parameter'}
)
ctx.logger.info("=" * 80)
ctx.logger.info("🔄 ADVOWARE PROXY: POST REQUEST")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Endpoint: {endpoint}")
ctx.logger.info("=" * 80)
# Initialize Advoware client
advoware = AdvowareAPI(ctx)
# Forward all query params except 'endpoint'
params = {k: v for k, v in request.query_params.items() if k != 'endpoint'}
# Get request body
json_data = request.body
result = await advoware.api_call(
endpoint,
method='POST',
params=params,
json_data=json_data
)
ctx.logger.info("✅ Proxy POST erfolgreich")
return ApiResponse(status=200, body={'result': result})
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ADVOWARE PROXY POST FEHLER")
ctx.logger.error(f"Endpoint: {request.query_params.get('endpoint', 'N/A')}")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={'error': 'Internal server error', 'details': str(e)}
)

View File

@@ -0,0 +1,71 @@
"""Advoware API Proxy - PUT requests"""
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
from services.advoware import AdvowareAPI
config = {
"name": "Advoware Proxy PUT",
"description": "Universal proxy for Advoware API (PUT requests)",
"flows": ["advoware-proxy"],
"triggers": [
http("PUT", "/advoware/proxy")
],
"enqueues": [],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Proxy PUT requests to Advoware API.
Query parameters:
- endpoint: Advoware API endpoint (required)
- any other params are forwarded to Advoware
Body: JSON payload to forward to Advoware
"""
try:
# Extract endpoint from query parameters
endpoint = request.query_params.get('endpoint', '')
if not endpoint:
return ApiResponse(
status=400,
body={'error': 'Endpoint required as query parameter'}
)
ctx.logger.info("=" * 80)
ctx.logger.info("🔄 ADVOWARE PROXY: PUT REQUEST")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Endpoint: {endpoint}")
ctx.logger.info("=" * 80)
# Initialize Advoware client
advoware = AdvowareAPI(ctx)
# Forward all query params except 'endpoint'
params = {k: v for k, v in request.query_params.items() if k != 'endpoint'}
# Get request body
json_data = request.body
result = await advoware.api_call(
endpoint,
method='PUT',
params=params,
json_data=json_data
)
ctx.logger.info("✅ Proxy PUT erfolgreich")
return ApiResponse(status=200, body={'result': result})
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ADVOWARE PROXY PUT FEHLER")
ctx.logger.error(f"Endpoint: {request.query_params.get('endpoint', 'N/A')}")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={'error': 'Internal server error', 'details': str(e)}
)

0
src/steps/ai/__init__.py Normal file
View File

View File

@@ -0,0 +1,386 @@
"""AI Chat Completions API
OpenAI-compatible Chat Completions endpoint with xAI/LangChain backend.
Features:
- File Search (RAG) via xAI Collections
- Web Search via xAI web_search tool
- Aktenzeichen-based automatic collection lookup
- Multiple tools simultaneously
- Clean, reusable architecture for future LLM endpoints
Note: Streaming is not supported (Motia limitation - returns clear error).
Reusability:
- extract_request_params(): Parse requests for any LLM endpoint
- resolve_collection_id(): Auto-detect Aktenzeichen, lookup collection
- initialize_model_with_tools(): Bind tools to any LangChain model
- invoke_and_format_response(): Standard OpenAI response formatting
"""
import time
from typing import Any, Dict, List, Optional
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "AI Chat Completions API",
"description": "OpenAI-compatible Chat Completions API with xAI backend",
"flows": ["ai-general"],
"triggers": [
http("POST", "/ai/v1/chat/completions"),
http("POST", "/v1/chat/completions")
],
}
# ============================================================================
# MAIN HANDLER
# ============================================================================
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
OpenAI-compatible Chat Completions endpoint.
Returns:
ApiResponse with chat completion or error
"""
ctx.logger.info("=" * 80)
ctx.logger.info("🤖 AI Chat Completions API")
ctx.logger.info("=" * 80)
try:
# 1. Parse and validate request
params = extract_request_params(request, ctx)
# 2. Check streaming (not supported)
if params['stream']:
return ApiResponse(
status=501,
body={
'error': {
'message': 'Streaming is not supported. Please set stream=false.',
'type': 'not_implemented',
'param': 'stream'
}
}
)
# 3. Resolve collection (explicit ID or Aktenzeichen lookup)
collection_id = await resolve_collection_id(
params['collection_id'],
params['messages'],
params['enable_web_search'],
ctx
)
# 4. Validate: collection or web_search required
if not collection_id and not params['enable_web_search']:
return ApiResponse(
status=400,
body={
'error': {
'message': 'Either collection_id or enable_web_search must be provided',
'type': 'invalid_request_error'
}
}
)
# 5. Initialize LLM with tools
model_with_tools = await initialize_model_with_tools(
model_name=params['model'],
temperature=params['temperature'],
max_tokens=params['max_tokens'],
collection_id=collection_id,
enable_web_search=params['enable_web_search'],
web_search_config=params['web_search_config'],
ctx=ctx
)
# 6. Invoke LLM
completion_id = f"chatcmpl-{int(time.time())}"
response = await invoke_and_format_response(
model=model_with_tools,
messages=params['messages'],
completion_id=completion_id,
model_name=params['model'],
ctx=ctx
)
ctx.logger.info(f"✅ Completion successful {len(response.body['choices'][0]['message']['content'])} chars")
return response
except ValueError as e:
ctx.logger.error(f"❌ Validation error: {e}")
return ApiResponse(
status=400,
body={'error': {'message': str(e), 'type': 'invalid_request_error'}}
)
except Exception as e:
ctx.logger.error(f"❌ Error: {e}")
return ApiResponse(
status=500,
body={'error': {'message': 'Internal server error', 'type': 'server_error'}}
)
# ============================================================================
# REUSABLE HELPER FUNCTIONS
# ============================================================================
def extract_request_params(request: ApiRequest, ctx: FlowContext) -> Dict[str, Any]:
"""
Extract and validate request parameters.
Returns:
Dict with validated parameters
Raises:
ValueError: If validation fails
"""
body = request.body or {}
if not isinstance(body, dict):
raise ValueError("Request body must be JSON object")
messages = body.get('messages', [])
if not messages or not isinstance(messages, list):
raise ValueError("messages must be non-empty array")
# Extract parameters with defaults
params = {
'model': body.get('model', 'grok-4-1-fast-reasoning'),
'messages': messages,
'temperature': body.get('temperature', 0.7),
'max_tokens': body.get('max_tokens'),
'stream': body.get('stream', False),
'extra_body': body.get('extra_body', {}),
}
# Handle enable_web_search (body or extra_body)
params['enable_web_search'] = body.get(
'enable_web_search',
params['extra_body'].get('enable_web_search', False)
)
# Handle web_search_config
params['web_search_config'] = body.get(
'web_search_config',
params['extra_body'].get('web_search_config', {})
)
# Handle collection_id (multiple sources)
params['collection_id'] = (
body.get('collection_id') or
body.get('custom_collection_id') or
params['extra_body'].get('collection_id')
)
# Log concisely
ctx.logger.info(f"📋 Model: {params['model']} | Stream: {params['stream']}")
ctx.logger.info(f"📋 Web Search: {params['enable_web_search']} | Collection: {params['collection_id'] or 'auto'}")
ctx.logger.info(f"📨 Messages: {len(messages)}")
return params
async def resolve_collection_id(
explicit_collection_id: Optional[str],
messages: List[Dict[str, Any]],
enable_web_search: bool,
ctx: FlowContext
) -> Optional[str]:
"""
Resolve collection ID from explicit ID or Aktenzeichen auto-detection.
Args:
explicit_collection_id: Explicitly provided collection ID
messages: Chat messages (for Aktenzeichen extraction)
enable_web_search: Whether web search is enabled
ctx: Motia context
Returns:
Collection ID or None
"""
# Explicit collection ID takes precedence
if explicit_collection_id:
ctx.logger.info(f"🔍 Using explicit collection: {explicit_collection_id}")
return explicit_collection_id
# Try Aktenzeichen auto-detection from the first user message containing one
from services.aktenzeichen_utils import (
extract_aktenzeichen,
normalize_aktenzeichen,
remove_aktenzeichen
)
for msg in messages:
if msg.get('role') == 'user':
content = msg.get('content', '')
aktenzeichen_raw = extract_aktenzeichen(content)
if aktenzeichen_raw:
aktenzeichen = normalize_aktenzeichen(aktenzeichen_raw)
ctx.logger.info(f"🔍 Aktenzeichen detected: {aktenzeichen}")
collection_id = await lookup_collection_by_aktenzeichen(aktenzeichen, ctx)
if collection_id:
# Clean Aktenzeichen from message
msg['content'] = remove_aktenzeichen(content)
ctx.logger.info(f"✅ Collection found: {collection_id}")
return collection_id
else:
ctx.logger.warning(f"⚠️ No collection for Aktenzeichen: {aktenzeichen}")
break # Stop after the first message containing an Aktenzeichen
return None
async def initialize_model_with_tools(
model_name: str,
temperature: float,
max_tokens: Optional[int],
collection_id: Optional[str],
enable_web_search: bool,
web_search_config: Dict[str, Any],
ctx: FlowContext
) -> Any:
"""
Initialize LangChain model with tool bindings (file_search, web_search).
Returns:
Model instance with tools bound
"""
from services.langchain_xai_service import LangChainXAIService
service = LangChainXAIService(ctx)
# Create base model
model = service.get_chat_model(
model=model_name,
temperature=temperature,
max_tokens=max_tokens
)
# Bind tools
model_with_tools = service.bind_tools(
model=model,
collection_id=collection_id,
enable_web_search=enable_web_search,
web_search_config=web_search_config,
max_num_results=10
)
return model_with_tools
async def invoke_and_format_response(
model: Any,
messages: List[Dict[str, Any]],
completion_id: str,
model_name: str,
ctx: FlowContext
) -> ApiResponse:
"""
Invoke LLM and format response in OpenAI-compatible format.
Returns:
ApiResponse with chat completion
"""
from services.langchain_xai_service import LangChainXAIService
service = LangChainXAIService(ctx)
result = await service.invoke_chat(model, messages)
# Extract content (handle structured responses)
if hasattr(result, 'content'):
raw = result.content
if isinstance(raw, list):
# Extract text parts from structured response
text_parts = [
item.get('text', '')
for item in raw
if isinstance(item, dict) and item.get('type') == 'text'
]
content = ''.join(text_parts) or str(raw)
else:
content = raw
else:
content = str(result)
# Extract usage metadata (if available)
usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
if hasattr(result, 'usage_metadata'):
u = result.usage_metadata
usage = {
"prompt_tokens": getattr(u, 'input_tokens', 0),
"completion_tokens": getattr(u, 'output_tokens', 0),
"total_tokens": getattr(u, 'input_tokens', 0) + getattr(u, 'output_tokens', 0)
}
# Log complete LLM response
ctx.logger.info("=" * 80)
ctx.logger.info("📤 LLM RESPONSE")
ctx.logger.info("-" * 80)
ctx.logger.info(f"Model: {model_name}")
ctx.logger.info(f"Completion ID: {completion_id}")
ctx.logger.info(f"Usage: {usage['prompt_tokens']} prompt + {usage['completion_tokens']} completion = {usage['total_tokens']} total tokens")
ctx.logger.info("-" * 80)
ctx.logger.info("Content:")
ctx.logger.info(content)
ctx.logger.info("=" * 80)
# Format OpenAI-compatible response
response_body = {
'id': completion_id,
'object': 'chat.completion',
'created': int(time.time()),
'model': model_name,
'choices': [{
'index': 0,
'message': {'role': 'assistant', 'content': content},
'finish_reason': 'stop'
}],
'usage': usage
}
return ApiResponse(status=200, body=response_body)
async def lookup_collection_by_aktenzeichen(
aktenzeichen: str,
ctx: FlowContext
) -> Optional[str]:
"""
Lookup xAI Collection ID by Aktenzeichen via EspoCRM.
Args:
aktenzeichen: Normalized Aktenzeichen (e.g., "1234/56")
ctx: Motia context
Returns:
Collection ID or None if not found
"""
try:
from services.espocrm import EspoCRMAPI
espocrm = EspoCRMAPI(ctx)
search_result = await espocrm.search_entities(
entity_type='Raeumungsklage',
where=[{
'type': 'equals',
'attribute': 'advowareAkteBezeichner',
'value': aktenzeichen
}],
select=['id', 'xaiCollectionId'],
maxSize=1
)
if search_result and len(search_result) > 0:
return search_result[0].get('xaiCollectionId')
return None
except Exception as e:
ctx.logger.error(f"❌ Collection lookup failed: {e}")
return None
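Die Text-Extraktion aus strukturierten LLM-Antworten in `invoke_and_format_response` (Liste von Content-Parts vs. einfacher String) lässt sich als eigenständige Funktion skizzieren:

```python
def flatten_content(raw):
    """Extrahiert reinen Text aus LangChain-Content: String oder Liste von Parts."""
    if isinstance(raw, list):
        # Nur Parts vom Typ 'text' einsammeln und zusammenfuegen
        text_parts = [
            item.get('text', '')
            for item in raw
            if isinstance(item, dict) and item.get('type') == 'text'
        ]
        # Fallback auf str(raw), wenn keine Text-Parts enthalten sind
        return ''.join(text_parts) or str(raw)
    return raw
```

Der Fallback auf `str(raw)` sorgt dafür, dass auch unerwartete Strukturen nie zu einer leeren Antwort führen.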

View File

@@ -0,0 +1,124 @@
"""AI Models List API
OpenAI-compatible models list endpoint for OpenWebUI and other clients.
Returns all available AI models that can be used with /ai/chat/completions.
"""
import time
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "AI Models List API",
"description": "OpenAI-compatible models endpoint - lists available AI models",
"flows": ["ai-general"],
"triggers": [
http("GET", "/ai/v1/models"),
http("GET", "/v1/models"),
http("GET", "/ai/models")
],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
OpenAI-compatible models list endpoint.
Returns list of available models for OpenWebUI and other clients.
Response Format (OpenAI compatible):
{
"object": "list",
"data": [
{
"id": "grok-4.20-beta-0309-reasoning",
"object": "model",
"created": 1735689600,
"owned_by": "xai",
"permission": [],
"root": "grok-4.20-beta-0309-reasoning",
"parent": null
}
]
}
"""
ctx.logger.info("📋 Models list requested")
try:
# Define available models
# These correspond to models supported by /ai/chat/completions
current_timestamp = int(time.time())
models = [
{
"id": "grok-4.20-beta-0309-reasoning",
"object": "model",
"created": current_timestamp,
"owned_by": "xai",
"permission": [],
"root": "grok-4.20-beta-0309-reasoning",
"parent": None,
"capabilities": {
"file_search": True,
"web_search": True,
"streaming": True,
"reasoning": True
}
},
{
"id": "grok-4.20-multi-agent-beta-0309",
"object": "model",
"created": current_timestamp,
"owned_by": "xai",
"permission": [],
"root": "grok-4.20-multi-agent-beta-0309",
"parent": None,
"capabilities": {
"file_search": True,
"web_search": True,
"streaming": True,
"reasoning": True,
"multi_agent": True
}
},
{
"id": "grok-4-1-fast-reasoning",
"object": "model",
"created": current_timestamp,
"owned_by": "xai",
"permission": [],
"root": "grok-4-1-fast-reasoning",
"parent": None,
"capabilities": {
"file_search": True,
"web_search": True,
"streaming": True,
"reasoning": True
}
}
]
# Build OpenAI-compatible response
response_body = {
"object": "list",
"data": models
}
ctx.logger.info(f"✅ Returned {len(models)} models")
return ApiResponse(
status=200,
body=response_body
)
except Exception as e:
ctx.logger.error(f"❌ Error listing models: {e}")
return ApiResponse(
status=500,
body={
"error": {
"message": str(e),
"type": "server_error"
}
}
)
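Das OpenAI-kompatible Antwortformat lässt sich kompakt aus einer Liste von Modell-IDs erzeugen; die Feldbelegung folgt dem obigen Handler, das Zusatzfeld `capabilities` ist hier der Einfachheit halber weggelassen:

```python
def to_models_response(model_ids, created: int) -> dict:
    """Baut die OpenAI-kompatible /v1/models-Antwort aus Modell-IDs."""
    return {
        "object": "list",
        "data": [
            {
                "id": m,
                "object": "model",
                "created": created,
                "owned_by": "xai",
                "permission": [],
                "root": m,
                "parent": None,
            }
            for m in model_ids
        ],
    }
```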

View File

@@ -0,0 +1 @@
"""VMH Steps"""

View File

@@ -0,0 +1,90 @@
"""AI Knowledge Daily Sync - Cron Job"""
from typing import Any
from motia import FlowContext, cron
config = {
"name": "AI Knowledge Daily Sync",
"description": "Daily sync of all CAIKnowledge entities (catches missed webhooks, Blake3 verification included)",
"flows": ["aiknowledge-full-sync"],
"triggers": [
cron("0 0 2 * * *"), # Daily at 2:00 AM
],
"enqueues": ["aiknowledge.sync"],
}
async def handler(input_data: None, ctx: FlowContext[Any]) -> None:
"""
Daily sync handler - ensures all active knowledge bases are synchronized.
Loads all CAIKnowledge entities that need sync and emits events.
Blake3 hash verification is always performed (hash available from JunctionData API).
Runs every day at 02:00:00.
"""
from services.espocrm import EspoCRMAPI
from services.models import AIKnowledgeActivationStatus, AIKnowledgeSyncStatus
ctx.logger.info("=" * 80)
ctx.logger.info("🌙 DAILY AI KNOWLEDGE SYNC STARTED")
ctx.logger.info("=" * 80)
espocrm = EspoCRMAPI(ctx)
try:
# Load all CAIKnowledge entities with status 'active' that need sync
result = await espocrm.list_entities(
'CAIKnowledge',
where=[
{
'type': 'equals',
'attribute': 'aktivierungsstatus',
'value': AIKnowledgeActivationStatus.ACTIVE.value
},
{
'type': 'in',
'attribute': 'syncStatus',
'value': [
AIKnowledgeSyncStatus.UNCLEAN.value,
AIKnowledgeSyncStatus.FAILED.value
]
}
],
select='id,name,syncStatus',
max_size=1000 # Adjust if you have more
)
entities = result.get('list', [])
total = len(entities)
ctx.logger.info(f"📊 Found {total} knowledge bases needing sync")
if total == 0:
ctx.logger.info("✅ All knowledge bases are synced")
ctx.logger.info("=" * 80)
return
# Enqueue sync events for all (Blake3 verification always enabled)
for i, entity in enumerate(entities, 1):
await ctx.enqueue({
'topic': 'aiknowledge.sync',
'data': {
'knowledge_id': entity['id'],
'source': 'daily_cron'
}
})
ctx.logger.info(
f"📤 [{i}/{total}] Enqueued: {entity['name']} "
f"(syncStatus={entity.get('syncStatus')})"
)
ctx.logger.info("=" * 80)
ctx.logger.info(f"✅ Daily sync complete: {total} events enqueued")
ctx.logger.info("=" * 80)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ FULL SYNC FAILED")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error: {e}", exc_info=True)
raise
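Das Auswahlkriterium des Cron-Jobs (aktiv, syncStatus `unclean` oder `failed`) entspricht lokal diesem Prädikat; die String-Werte sind hier Annahmen, im Handler kommen sie aus den Enums in `services.models`:

```python
def needs_daily_sync(entity: dict) -> bool:
    """Prueft, ob eine CAIKnowledge-Entity vom Daily Sync erfasst wuerde."""
    # Statuswerte als Strings angenommen (real: AIKnowledgeActivationStatus/AIKnowledgeSyncStatus)
    return (
        entity.get('aktivierungsstatus') == 'active'
        and entity.get('syncStatus') in ('unclean', 'failed')
    )
```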

View File

@@ -0,0 +1,89 @@
"""AI Knowledge Sync Event Handler"""
from typing import Dict, Any
from redis import Redis
from motia import FlowContext, queue
config = {
"name": "AI Knowledge Sync",
"description": "Synchronizes CAIKnowledge entities with XAI Collections",
"flows": ["vmh-aiknowledge"],
"triggers": [
queue("aiknowledge.sync")
],
}
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
"""
Event handler for AI Knowledge synchronization.
Emitted by:
- Webhook on CAIKnowledge update
- Daily full sync cron job
Args:
event_data: Event payload with knowledge_id
ctx: Motia context
"""
from services.redis_client import RedisClientFactory
from services.aiknowledge_sync_utils import AIKnowledgeSync
ctx.logger.info("=" * 80)
ctx.logger.info("🔄 AI KNOWLEDGE SYNC STARTED")
ctx.logger.info("=" * 80)
# Extract data
knowledge_id = event_data.get('knowledge_id')
source = event_data.get('source', 'unknown')
if not knowledge_id:
ctx.logger.error("❌ Missing knowledge_id in event data")
return
ctx.logger.info(f"📋 Knowledge ID: {knowledge_id}")
ctx.logger.info(f"📋 Source: {source}")
ctx.logger.info("=" * 80)
# Get Redis for locking
redis_client = RedisClientFactory.get_client(strict=False)
# Initialize sync utils
sync_utils = AIKnowledgeSync(ctx, redis_client)
# Acquire lock
lock_acquired = await sync_utils.acquire_sync_lock(knowledge_id)
if not lock_acquired:
ctx.logger.warn(f"⏸️ Lock already held for {knowledge_id}, skipping")
ctx.logger.info(" (Will be retried by Motia queue)")
raise RuntimeError(f"Lock busy for {knowledge_id}") # Motia will retry
try:
# Perform sync (Blake3 hash verification always enabled)
await sync_utils.sync_knowledge_to_xai(knowledge_id, ctx)
ctx.logger.info("=" * 80)
ctx.logger.info("✅ AI KNOWLEDGE SYNC COMPLETED")
ctx.logger.info("=" * 80)
# Release lock with success=True
await sync_utils.release_sync_lock(knowledge_id, success=True)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ AI KNOWLEDGE SYNC FAILED")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error: {e}")
ctx.logger.error(f"Knowledge ID: {knowledge_id}")
ctx.logger.error("=" * 80)
# Release lock with failure
await sync_utils.release_sync_lock(
knowledge_id,
success=False,
error_message=str(e)
)
# Re-raise to let Motia retry
raise
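Das Locking hinter `acquire_sync_lock`/`release_sync_lock` beruht auf atomarem `SET NX EX`; eine minimale Skizze mit einem Dict-basierten Ersatz-Client. Schlüsselformat und TTL sind hier Annahmen, die echte Implementierung liegt in `AIKnowledgeSync`:

```python
class InMemoryRedis:
    """Dict-basierter Ersatz fuer redis.Redis, nur set(nx=..., ex=...) und delete."""
    def __init__(self):
        self.store = {}

    def set(self, key, value, nx=False, ex=None):
        if nx and key in self.store:
            return None  # Lock bereits vergeben
        self.store[key] = value
        return True

    def delete(self, key):
        self.store.pop(key, None)

def acquire_sync_lock(client, knowledge_id, ttl=900):
    # Atomar: nur der erste Aufrufer erhaelt den Lock
    return bool(client.set(f"sync_lock:aiknowledge:{knowledge_id}", "locked", nx=True, ex=ttl))

def release_sync_lock(client, knowledge_id):
    client.delete(f"sync_lock:aiknowledge:{knowledge_id}")
```

Die TTL (im echten Client via `ex`) verhindert, dass ein abgestürzter Worker den Lock dauerhaft hält.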

View File

@@ -0,0 +1,254 @@
"""
VMH Bankverbindungen Sync Handler
Zentraler Sync-Handler für Bankverbindungen (Webhooks + Cron Events)
Verarbeitet:
- vmh.bankverbindungen.create: Neu in EspoCRM → Create in Advoware
- vmh.bankverbindungen.update: Geändert in EspoCRM → Notification (nicht unterstützt)
- vmh.bankverbindungen.delete: Gelöscht in EspoCRM → Notification (nicht unterstützt)
- vmh.bankverbindungen.sync_check: Cron-Check → Sync wenn nötig
"""
from typing import Dict, Any, Optional
from motia import FlowContext, queue
from services.advoware import AdvowareAPI
from services.espocrm import EspoCRMAPI
from services.bankverbindungen_mapper import BankverbindungenMapper
from services.notification_utils import NotificationManager
from services.redis_client import get_redis_client
import json
config = {
"name": "VMH Bankverbindungen Sync Handler",
"description": "Zentraler Sync-Handler für Bankverbindungen (Webhooks + Cron Events)",
"flows": ["vmh-bankverbindungen"],
"triggers": [
queue("vmh.bankverbindungen.create"),
queue("vmh.bankverbindungen.update"),
queue("vmh.bankverbindungen.delete"),
queue("vmh.bankverbindungen.sync_check")
],
"enqueues": []
}
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
"""Zentraler Sync-Handler für Bankverbindungen"""
entity_id = event_data.get('entity_id')
action = event_data.get('action', 'sync_check')
source = event_data.get('source', 'unknown')
if not entity_id:
ctx.logger.error("Keine entity_id im Event gefunden")
return
ctx.logger.info(f"🔄 Bankverbindungen Sync gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}")
# Shared Redis client (centralized factory)
redis_client = get_redis_client(strict=False)
# APIs initialisieren (mit Context für besseres Logging)
espocrm = EspoCRMAPI(ctx)
advoware = AdvowareAPI(ctx)
mapper = BankverbindungenMapper()
notification_mgr = NotificationManager(espocrm_api=espocrm, context=ctx)
try:
# 1. ACQUIRE LOCK
lock_key = f"sync_lock:cbankverbindungen:{entity_id}"
acquired = redis_client.set(lock_key, "locked", nx=True, ex=900) # 15min TTL
if not acquired:
ctx.logger.warn(f"⏸️ Sync bereits aktiv für {entity_id}, überspringe")
return
# 2. FETCH ENTITY VON ESPOCRM
try:
espo_entity = await espocrm.get_entity('CBankverbindungen', entity_id)
except Exception as e:
ctx.logger.error(f"❌ Fehler beim Laden von EspoCRM Entity: {e}")
redis_client.delete(lock_key)
return
ctx.logger.info(f"📋 Entity geladen: {espo_entity.get('name', 'Unbenannt')} (IBAN: {espo_entity.get('iban', 'N/A')})")
advoware_id = espo_entity.get('advowareId')
beteiligte_id = espo_entity.get('cBeteiligteId') # Parent Beteiligter
if not beteiligte_id:
ctx.logger.error(f"❌ Keine cBeteiligteId gefunden - Bankverbindung muss einem Beteiligten zugeordnet sein")
redis_client.delete(lock_key)
return
# Hole betNr vom Parent
parent = await espocrm.get_entity('CBeteiligte', beteiligte_id)
betnr = parent.get('betnr')
if not betnr:
ctx.logger.error(f"❌ Parent Beteiligter {beteiligte_id} hat keine betNr")
redis_client.delete(lock_key)
return
# 3. BESTIMME SYNC-AKTION
# FALL A: Neu (kein advowareId) → CREATE in Advoware
if not advoware_id and action in ['create', 'sync_check']:
await handle_create(entity_id, betnr, espo_entity, espocrm, advoware, mapper, ctx, redis_client, lock_key)
# FALL B: Existiert (hat advowareId) → UPDATE oder CHECK (nicht unterstützt!)
elif advoware_id and action in ['update', 'sync_check']:
await handle_update(entity_id, betnr, advoware_id, espo_entity, espocrm, notification_mgr, ctx, redis_client, lock_key)
# FALL C: DELETE (nicht unterstützt!)
elif action == 'delete':
await handle_delete(entity_id, betnr, advoware_id, espo_entity, espocrm, notification_mgr, ctx, redis_client, lock_key)
else:
ctx.logger.warn(f"⚠️ Unbekannte Kombination: action={action}, advowareId={advoware_id}")
redis_client.delete(lock_key)
except Exception as e:
ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
try:
redis_client.delete(lock_key)
except Exception:
pass
async def handle_create(entity_id, betnr, espo_entity, espocrm, advoware, mapper, ctx, redis_client, lock_key) -> None:
"""Erstellt neue Bankverbindung in Advoware"""
try:
ctx.logger.info(f"🔨 CREATE Bankverbindung in Advoware für Beteiligter {betnr}...")
advo_data = mapper.map_cbankverbindungen_to_advoware(espo_entity)
ctx.logger.info(f"📤 Sende an Advoware: {json.dumps(advo_data, ensure_ascii=False)[:200]}...")
# POST zu Advoware (Beteiligten-spezifischer Endpoint!)
result = await advoware.api_call(
f'api/v1/advonet/Beteiligte/{betnr}/Bankverbindungen',
method='POST',
json_data=advo_data
)
# Extrahiere ID und rowId
if isinstance(result, list) and len(result) > 0:
new_entity = result[0]
elif isinstance(result, dict):
new_entity = result
else:
raise RuntimeError(f"Unexpected response format: {result}")
new_id = new_entity.get('id')
new_rowid = new_entity.get('rowId')
if not new_id:
raise RuntimeError(f"Keine ID in Advoware Response: {result}")
ctx.logger.info(f"✅ In Advoware erstellt: ID={new_id}, rowId={new_rowid[:20] if new_rowid else 'N/A'}...")
# Schreibe advowareId + rowId zurück
await espocrm.update_entity('CBankverbindungen', entity_id, {
'advowareId': new_id,
'advowareRowId': new_rowid
})
redis_client.delete(lock_key)
ctx.logger.info(f"✅ CREATE erfolgreich: {entity_id} → Advoware ID {new_id}")
except Exception as e:
ctx.logger.error(f"❌ CREATE fehlgeschlagen: {e}")
redis_client.delete(lock_key)
async def handle_update(entity_id, betnr, advoware_id, espo_entity, espocrm, notification_mgr, ctx, redis_client, lock_key) -> None:
"""Update nicht möglich - Sendet Notification an User"""
try:
ctx.logger.warn(f"⚠️ UPDATE: Advoware API unterstützt kein PUT für Bankverbindungen")
iban = espo_entity.get('iban', 'N/A')
bank = espo_entity.get('bank', 'N/A')
name = espo_entity.get('name', 'Unbenannt')
# Sende Notification
await notification_mgr.notify_manual_action_required(
entity_type='CBankverbindungen',
entity_id=entity_id,
action_type='general_manual_action',
details={
'message': f'UPDATE nicht möglich für Bankverbindung: {name}',
'description': (
f"Die Advoware API unterstützt keine Updates für Bankverbindungen.\n\n"
f"**Details:**\n"
f"- Bank: {bank}\n"
f"- IBAN: {iban}\n"
f"- Beteiligter betNr: {betnr}\n"
f"- Advoware ID: {advoware_id}\n\n"
f"**Workaround:**\n"
f"Löschen Sie die Bankverbindung in EspoCRM und erstellen Sie sie neu. "
f"Die neue Bankverbindung wird dann automatisch in Advoware angelegt."
),
'entity_name': name,
'priority': 'Normal'
},
create_task=True
)
ctx.logger.info(f"📧 Notification gesendet: Update-Limitation")
redis_client.delete(lock_key)
except Exception as e:
ctx.logger.error(f"❌ UPDATE Notification fehlgeschlagen: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
redis_client.delete(lock_key)
async def handle_delete(entity_id, betnr, advoware_id, espo_entity, espocrm, notification_mgr, ctx, redis_client, lock_key) -> None:
"""Delete nicht möglich - Sendet Notification an User"""
try:
ctx.logger.warn(f"⚠️ DELETE: Advoware API unterstützt kein DELETE für Bankverbindungen")
if not advoware_id:
ctx.logger.info(f" Keine advowareId vorhanden, nur EspoCRM-seitiges Delete")
redis_client.delete(lock_key)
return
iban = espo_entity.get('iban', 'N/A')
bank = espo_entity.get('bank', 'N/A')
name = espo_entity.get('name', 'Unbenannt')
# Sende Notification
await notification_mgr.notify_manual_action_required(
entity_type='CBankverbindungen',
entity_id=entity_id,
action_type='general_manual_action',
details={
'message': f'DELETE erforderlich für Bankverbindung: {name}',
'description': (
f"Die Advoware API unterstützt keine Löschungen für Bankverbindungen.\n\n"
f"**Bitte manuell in Advoware löschen:**\n"
f"- Bank: {bank}\n"
f"- IBAN: {iban}\n"
f"- Beteiligter betNr: {betnr}\n"
f"- Advoware ID: {advoware_id}\n\n"
f"Die Bankverbindung wurde in EspoCRM gelöscht, bleibt aber in Advoware "
f"bestehen bis zur manuellen Löschung."
),
'entity_name': name,
'priority': 'Normal'
},
create_task=True
)
ctx.logger.info(f"📧 Notification gesendet: Delete erforderlich")
redis_client.delete(lock_key)
except Exception as e:
ctx.logger.error(f"❌ DELETE Notification fehlgeschlagen: {e}")
redis_client.delete(lock_key)
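Die Normalisierung der Advoware-Antwort aus `handle_create` (Liste vs. Einzelobjekt) als eigenständige, testbare Funktion; der Name `first_entity` ist nur illustrativ:

```python
def first_entity(result):
    """Liefert das erste Entity-Objekt aus einer Advoware-Antwort (Liste oder Dict)."""
    if isinstance(result, list) and result:
        return result[0]
    if isinstance(result, dict):
        return result
    raise ValueError(f"Unexpected response format: {result!r}")
```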

View File

@@ -0,0 +1,164 @@
"""
Beteiligte Sync Cron Job
Läuft alle 15 Minuten und emittiert Sync-Events für Beteiligte die:
- Neu sind (pending_sync)
- Geändert wurden (dirty)
- Fehlgeschlagen sind (failed → Retry)
- Lange nicht gesynct wurden (clean aber > 24h alt)
"""
import asyncio
from typing import Dict, Any
from motia import FlowContext, cron
from services.espocrm import EspoCRMAPI
import datetime
config = {
"name": "VMH Beteiligte Sync Cron",
"description": "Prüft alle 15 Minuten welche Beteiligte synchronisiert werden müssen",
"flows": ["vmh-beteiligte"],
"triggers": [
cron("0 */15 1 * * *") # Alle 15 Minuten (6-field format!)
],
"enqueues": ["vmh.beteiligte.sync_check"]
}
async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None:
"""
Cron-Handler: Findet alle Beteiligte die Sync benötigen und emittiert Events
"""
ctx.logger.info("🕐 Beteiligte Sync Cron gestartet")
try:
espocrm = EspoCRMAPI(ctx)
# Berechne Threshold für "veraltete" Syncs (24 Stunden)
threshold = datetime.datetime.now() - datetime.timedelta(hours=24)
threshold_str = threshold.strftime('%Y-%m-%d %H:%M:%S')
ctx.logger.info(f"📅 Suche Entities mit Sync-Bedarf (älter als {threshold_str})")
# QUERY 1: Entities mit Status pending_sync, dirty oder failed
unclean_filter = {
'where': [
{
'type': 'or',
'value': [
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'pending_sync'},
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'dirty'},
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'failed'},
]
}
]
}
unclean_result = await espocrm.list_entities('CBeteiligte', where=unclean_filter['where'], max_size=100)
unclean_entities = unclean_result.get('list', [])
ctx.logger.info(f"📊 Gefunden: {len(unclean_entities)} Entities mit Status pending/dirty/failed")
# QUERY 1b: permanently_failed Entities die Auto-Reset erreicht haben
permanently_failed_filter = {
'where': [
{
'type': 'and',
'value': [
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'permanently_failed'},
{'type': 'isNotNull', 'attribute': 'syncAutoResetAt'},
{'type': 'before', 'attribute': 'syncAutoResetAt', 'value': threshold_str}
]
}
]
}
reset_result = await espocrm.list_entities('CBeteiligte', where=permanently_failed_filter['where'], max_size=50)
reset_entities = reset_result.get('list', [])
# Reset permanently_failed entities
for entity in reset_entities:
entity_id = entity['id']
ctx.logger.info(f"🔄 Auto-Reset für permanently_failed Entity {entity_id}")
# Reset Status und Retry-Count
await espocrm.update_entity('CBeteiligte', entity_id, {
'syncStatus': 'failed', # Zurück zu 'failed' für normalen Retry
'syncRetryCount': 0,
'syncAutoResetAt': None,
'syncErrorMessage': f"Auto-Reset nach 24h - vorheriger Fehler: {entity.get('syncErrorMessage', 'N/A')}"
})
ctx.logger.info(f"📊 Auto-Reset: {len(reset_entities)} permanently_failed Entities")
# QUERY 2: Clean Entities die > 24h nicht gesynct wurden
stale_filter = {
'where': [
{
'type': 'and',
'value': [
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'clean'},
{'type': 'isNotNull', 'attribute': 'betnr'},
{
'type': 'or',
'value': [
{'type': 'isNull', 'attribute': 'advowareLastSync'},
{'type': 'before', 'attribute': 'advowareLastSync', 'value': threshold_str}
]
}
]
}
]
}
stale_result = await espocrm.list_entities('CBeteiligte', where=stale_filter['where'], max_size=50)
stale_entities = stale_result.get('list', [])
ctx.logger.info(f"📊 Gefunden: {len(stale_entities)} Entities mit veraltetem Sync (> 24h)")
# KOMBINIERE ALLE (inkl. reset_entities)
all_entities = unclean_entities + stale_entities + reset_entities
entity_ids = list(set([e['id'] for e in all_entities])) # Dedupliziere
ctx.logger.info(f"🎯 Total: {len(entity_ids)} eindeutige Entities zum Sync")
if not entity_ids:
ctx.logger.info("✅ Keine Entities benötigen Sync")
return
# Emittiere Events parallel
ctx.logger.info(f"🚀 Emittiere {len(entity_ids)} Events parallel...")
emit_tasks = [
ctx.enqueue({
'topic': 'vmh.beteiligte.sync_check',
'data': {
'entity_id': entity_id,
'action': 'sync_check',
'source': 'cron',
'timestamp': datetime.datetime.now().isoformat()
}
})
for entity_id in entity_ids
]
# Parallel emit mit error handling
results = await asyncio.gather(*emit_tasks, return_exceptions=True)
# Count successes and failures
emitted_count = sum(1 for r in results if not isinstance(r, Exception))
failed_count = sum(1 for r in results if isinstance(r, Exception))
if failed_count > 0:
ctx.logger.warn(f"⚠️ {failed_count} Events konnten nicht emittiert werden")
# Logge max. 5 Fehler - über alle Ergebnisse, nicht nur die ersten 5
failed_pairs = [(eid, r) for eid, r in zip(entity_ids, results) if isinstance(r, Exception)]
for entity_id, error in failed_pairs[:5]:
ctx.logger.error(f"  Entity {entity_id}: {error}")
ctx.logger.info(f"✅ Cron fertig: {emitted_count}/{len(entity_ids)} Events emittiert")
except Exception as e:
ctx.logger.error(f"❌ Fehler im Sync Cron: {e}")
import traceback
ctx.logger.error(traceback.format_exc())

View File

@@ -0,0 +1,423 @@
"""
VMH Beteiligte Sync Handler
Zentraler Sync-Handler für Beteiligte (Webhooks + Cron Events)
Verarbeitet:
- vmh.beteiligte.create: Neu in EspoCRM → Create in Advoware
- vmh.beteiligte.update: Geändert in EspoCRM → Update in Advoware
- vmh.beteiligte.delete: Gelöscht in EspoCRM → Delete in Advoware (TODO)
- vmh.beteiligte.sync_check: Cron-Check → Sync wenn nötig
"""
from typing import Dict, Any, Optional
from motia import FlowContext, queue
from services.advoware import AdvowareAPI
from services.advoware_service import AdvowareService
from services.espocrm import EspoCRMAPI
from services.espocrm_mapper import BeteiligteMapper
from services.beteiligte_sync_utils import BeteiligteSync
from services.redis_client import get_redis_client
from services.exceptions import (
AdvowareAPIError,
EspoCRMAPIError,
SyncError,
RetryableError,
is_retryable
)
from services.logging_utils import get_step_logger
import json
config = {
"name": "VMH Beteiligte Sync Handler",
"description": "Zentraler Sync-Handler für Beteiligte (Webhooks + Cron Events)",
"flows": ["vmh-beteiligte"],
"triggers": [
queue("vmh.beteiligte.create"),
queue("vmh.beteiligte.update"),
queue("vmh.beteiligte.delete"),
queue("vmh.beteiligte.sync_check")
],
"enqueues": []
}
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
"""
Zentraler Sync-Handler für Beteiligte
Args:
event_data: Event data mit entity_id, action, source
ctx: Motia FlowContext
"""
entity_id = event_data.get('entity_id')
action = event_data.get('action')
source = event_data.get('source')
step_logger = get_step_logger('beteiligte_sync', ctx)
if not entity_id:
step_logger.error("Keine entity_id im Event gefunden")
return
step_logger.info("=" * 80)
step_logger.info(f"🔄 BETEILIGTE SYNC HANDLER: {(action or 'UNKNOWN').upper()}")
step_logger.info("=" * 80)
step_logger.info(f"Entity: {entity_id} | Source: {source}")
step_logger.info("=" * 80)
# Get shared Redis client (centralized)
redis_client = get_redis_client(strict=False)
# APIs initialisieren
espocrm = EspoCRMAPI(ctx)
advoware = AdvowareAPI(ctx)
sync_utils = BeteiligteSync(espocrm, redis_client, ctx)
mapper = BeteiligteMapper()
# NOTE: Kommunikation Sync Manager wird in zukünftiger Version hinzugefügt
# wenn kommunikation_sync_utils.py migriert ist
# advo_service = AdvowareService(ctx)
# komm_sync = KommunikationSyncManager(advo_service, espocrm, ctx)
try:
# 1. ACQUIRE LOCK (verhindert parallele Syncs)
lock_acquired = await sync_utils.acquire_sync_lock(entity_id)
if not lock_acquired:
ctx.logger.warn(f"⏸️ Sync bereits aktiv für {entity_id}, überspringe")
return
# Lock erfolgreich acquired - muss in jedem Pfad (auch im Fehlerfall) wieder released werden!
try:
# 2. FETCH ENTITY VON ESPOCRM
try:
espo_entity = await espocrm.get_entity('CBeteiligte', entity_id)
except Exception as e:
ctx.logger.error(f"❌ Fehler beim Laden von EspoCRM Entity: {e}")
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
return
ctx.logger.info(f"📋 Entity geladen: {espo_entity.get('name')} (betnr: {espo_entity.get('betnr')})")
betnr = espo_entity.get('betnr')
sync_status = espo_entity.get('syncStatus', 'pending_sync')
# Check Retry-Backoff - überspringe wenn syncNextRetry noch nicht erreicht
sync_next_retry = espo_entity.get('syncNextRetry')
if sync_next_retry and sync_status == 'failed':
import datetime
import pytz
try:
next_retry_ts = datetime.datetime.strptime(sync_next_retry, '%Y-%m-%d %H:%M:%S')
next_retry_ts = pytz.UTC.localize(next_retry_ts)
now_utc = datetime.datetime.now(pytz.UTC)
if now_utc < next_retry_ts:
remaining_minutes = int((next_retry_ts - now_utc).total_seconds() / 60)
ctx.logger.info(f"⏸️ Retry-Backoff aktiv: Nächster Versuch in {remaining_minutes} Minuten")
await sync_utils.release_sync_lock(entity_id, sync_status)
return
except Exception as e:
ctx.logger.warn(f"⚠️ Fehler beim Parsen von syncNextRetry: {e}")
# 3. BESTIMME SYNC-AKTION
# FALL A: Neu (kein betnr) → CREATE in Advoware
if not betnr and action in ['create', 'sync_check']:
ctx.logger.info(f"🆕 Neuer Beteiligter → CREATE in Advoware")
await handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, ctx)
# FALL B: Existiert (hat betnr) → UPDATE oder CHECK
elif betnr:
ctx.logger.info(f"♻️ Existierender Beteiligter (betNr: {betnr}) → UPDATE/CHECK")
await handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, ctx)
# FALL C: DELETE (TODO: Implementierung später)
elif action == 'delete':
ctx.logger.warn(f"🗑️ DELETE noch nicht implementiert für {entity_id}")
await sync_utils.release_sync_lock(entity_id, 'failed', 'Delete-Operation nicht implementiert')
else:
ctx.logger.warn(f"⚠️ Unbekannte Kombination: action={action}, betnr={betnr}")
await sync_utils.release_sync_lock(entity_id, 'failed', f'Unbekannte Aktion: {action}')
except Exception as e:
# Unerwarteter Fehler während Sync - GARANTIERE Lock-Release
ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
try:
await sync_utils.release_sync_lock(
entity_id,
'failed',
f'Unerwarteter Fehler: {str(e)[:1900]}',
increment_retry=True
)
except Exception as release_error:
# Selbst Lock-Release failed - logge kritischen Fehler
ctx.logger.critical(f"🚨 CRITICAL: Lock-Release failed für {entity_id}: {release_error}")
# Force Redis lock release
try:
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
redis_client.delete(lock_key)
ctx.logger.info(f"✅ Redis lock manuell released: {lock_key}")
except Exception:
pass
except Exception as e:
# Fehler VOR Lock-Acquire - kein Lock-Release nötig
ctx.logger.error(f"❌ Fehler vor Lock-Acquire: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
async def handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, ctx) -> None:
"""Erstellt neuen Beteiligten in Advoware"""
try:
ctx.logger.info(f"🔨 CREATE in Advoware...")
# Transform zu Advoware Format
advo_data = mapper.map_cbeteiligte_to_advoware(espo_entity)
ctx.logger.info(f"📤 Sende an Advoware: {json.dumps(advo_data, ensure_ascii=False)[:200]}...")
# POST zu Advoware
result = await advoware.api_call(
'api/v1/advonet/Beteiligte',
method='POST',
json_data=advo_data
)
# Extrahiere betNr aus Response (case-insensitive: betNr oder betnr)
new_betnr = None
if isinstance(result, dict):
new_betnr = result.get('betNr') or result.get('betnr')
if not new_betnr:
raise Exception(f"Keine betNr/betnr in Advoware Response: {result}")
ctx.logger.info(f"✅ In Advoware erstellt: betNr={new_betnr}")
# Lade Entity nach POST um rowId zu bekommen (WICHTIG für Change Detection!)
created_entity = await advoware.api_call(
f'api/v1/advonet/Beteiligte/{new_betnr}',
method='GET'
)
if isinstance(created_entity, list):
new_rowid = created_entity[0].get('rowId') if created_entity else None
else:
new_rowid = created_entity.get('rowId')
if not new_rowid:
ctx.logger.warn(f"⚠️ Keine rowId nach CREATE - Change Detection nicht möglich!")
# OPTIMIERT: Kombiniere release_lock + betnr + rowId update in 1 API call
await sync_utils.release_sync_lock(
entity_id,
'clean',
error_message=None,
extra_fields={
'betnr': new_betnr,
'advowareRowId': new_rowid # WICHTIG für Change Detection!
}
)
ctx.logger.info(f"✅ CREATE erfolgreich: {entity_id} → betNr {new_betnr}, rowId {new_rowid[:20] if new_rowid else 'N/A'}...")
except Exception as e:
ctx.logger.error(f"❌ CREATE fehlgeschlagen: {e}")
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
async def handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, ctx) -> None:
"""Synchronisiert existierenden Beteiligten"""
try:
ctx.logger.info(f"🔍 Fetch von Advoware betNr={betnr}...")
# Fetch von Advoware
try:
advo_result = await advoware.api_call(
f'api/v1/advonet/Beteiligte/{betnr}',
method='GET'
)
# Advoware gibt manchmal Listen zurück
if isinstance(advo_result, list):
advo_entity = advo_result[0] if advo_result else None
else:
advo_entity = advo_result
if not advo_entity:
raise Exception(f"Beteiligter betNr={betnr} nicht gefunden")
except Exception as e:
# 404 oder anderer Fehler → Beteiligter wurde in Advoware gelöscht
if '404' in str(e) or 'nicht gefunden' in str(e).lower():
ctx.logger.warn(f"🗑️ Beteiligter in Advoware gelöscht: betNr={betnr}")
await sync_utils.handle_advoware_deleted(entity_id, str(e))
return
else:
raise
ctx.logger.info(f"📥 Von Advoware geladen: {advo_entity.get('name')}")
# ÄNDERUNGSERKENNUNG (Primary: rowId, Fallback: Timestamps)
comparison = sync_utils.compare_entities(espo_entity, advo_entity)
ctx.logger.info(f"⏱️ Vergleich: {comparison}")
# KEIN STAMMDATEN-SYNC NÖTIG
if comparison == 'no_change':
ctx.logger.info(f"✅ Keine Stammdaten-Änderungen erkannt")
# NOTE: Kommunikation-Sync würde hier stattfinden
# await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx)
await sync_utils.release_sync_lock(entity_id, 'clean')
return
# ESPOCRM NEUER → Update Advoware
if comparison == 'espocrm_newer':
ctx.logger.info(f"📤 EspoCRM ist neuer → Update Advoware STAMMDATEN")
# OPTIMIERT: Use merge utility
merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper)
put_result = await advoware.api_call(
f'api/v1/advonet/Beteiligte/{betnr}',
method='PUT',
json_data=merged_data
)
# Extrahiere neue rowId aus PUT Response (spart extra GET!)
new_rowid = None
if isinstance(put_result, list) and len(put_result) > 0:
new_rowid = put_result[0].get('rowId')
elif isinstance(put_result, dict):
new_rowid = put_result.get('rowId')
ctx.logger.info(f"✅ Advoware STAMMDATEN aktualisiert, rowId: {new_rowid[:20] if new_rowid else 'N/A'}...")
# Validiere Sync-Ergebnis
validation_success, validation_error = await sync_utils.validate_sync_result(
entity_id, betnr, mapper, direction='to_advoware'
)
if not validation_success:
ctx.logger.error(f"❌ Sync-Validation fehlgeschlagen: {validation_error}")
await sync_utils.release_sync_lock(
entity_id,
'failed',
error_message=f"Validation failed: {validation_error}",
increment_retry=True
)
return
# NOTE: Kommunikation-Sync würde hier stattfinden
# await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx)
# Release Lock + Update rowId
await sync_utils.release_sync_lock(
entity_id,
'clean',
extra_fields={'advowareRowId': new_rowid}
)
# ADVOWARE NEUER → Update EspoCRM
elif comparison == 'advoware_newer':
ctx.logger.info(f"📥 Advoware ist neuer → Update EspoCRM STAMMDATEN")
espo_data = mapper.map_advoware_to_cbeteiligte(advo_entity)
await espocrm.update_entity('CBeteiligte', entity_id, espo_data)
ctx.logger.info(f"✅ EspoCRM STAMMDATEN aktualisiert")
# Validiere Sync-Ergebnis
validation_success, validation_error = await sync_utils.validate_sync_result(
entity_id, betnr, mapper, direction='to_espocrm'
)
if not validation_success:
ctx.logger.error(f"❌ Sync-Validation fehlgeschlagen: {validation_error}")
await sync_utils.release_sync_lock(
entity_id,
'failed',
error_message=f"Validation failed: {validation_error}",
increment_retry=True
)
return
# NOTE: Kommunikation-Sync würde hier stattfinden
# await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx)
# Release Lock + Update rowId
await sync_utils.release_sync_lock(
entity_id,
'clean',
extra_fields={'advowareRowId': advo_entity.get('rowId')}
)
# KONFLIKT → EspoCRM WINS
elif comparison == 'conflict':
ctx.logger.warn(f"⚠️ KONFLIKT erkannt → EspoCRM WINS (STAMMDATEN)")
# OPTIMIERT: Use merge utility
merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper)
put_result = await advoware.api_call(
f'api/v1/advonet/Beteiligte/{betnr}',
method='PUT',
json_data=merged_data
)
# Extrahiere neue rowId aus PUT Response
new_rowid = None
if isinstance(put_result, list) and len(put_result) > 0:
new_rowid = put_result[0].get('rowId')
elif isinstance(put_result, dict):
new_rowid = put_result.get('rowId')
conflict_msg = (
f"EspoCRM: {espo_entity.get('modifiedAt')}, "
f"Advoware: {advo_entity.get('geaendertAm')}. "
f"EspoCRM hat gewonnen."
)
ctx.logger.info(f"✅ Konflikt gelöst (EspoCRM won), neue rowId: {new_rowid[:20] if new_rowid else 'N/A'}...")
# Validiere Sync-Ergebnis
validation_success, validation_error = await sync_utils.validate_sync_result(
entity_id, betnr, mapper, direction='to_advoware'
)
if not validation_success:
ctx.logger.error(f"❌ Conflict resolution validation fehlgeschlagen: {validation_error}")
await sync_utils.release_sync_lock(
entity_id,
'failed',
error_message=f"Conflict resolution validation failed: {validation_error}",
increment_retry=True
)
return
await sync_utils.resolve_conflict_espocrm_wins(
entity_id,
espo_entity,
advo_entity,
conflict_msg,
extra_fields={'advowareRowId': new_rowid}
)
# NOTE: Kommunikation-Sync (nur EspoCRM→Advoware) würde hier stattfinden
# await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx, direction='to_advoware', force_espo_wins=True)
await sync_utils.release_sync_lock(entity_id, 'clean')
except Exception as e:
ctx.logger.error(f"❌ UPDATE fehlgeschlagen: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
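
Die Retry-Backoff-Prüfung aus dem Handler (Vergleich von `syncNextRetry` gegen die aktuelle UTC-Zeit) als eigenständige Skizze; hier mit `datetime.timezone.utc` aus der Standardbibliothek statt `pytz` wie im Handler:

```python
import datetime

UTC = datetime.timezone.utc

def backoff_active(sync_next_retry: str, now_utc: datetime.datetime) -> bool:
    """True, solange der nächste Retry-Zeitpunkt noch nicht erreicht ist."""
    next_retry = datetime.datetime.strptime(
        sync_next_retry, '%Y-%m-%d %H:%M:%S'
    ).replace(tzinfo=UTC)
    return now_utc < next_retry

now = datetime.datetime(2026, 3, 19, 12, 0, 0, tzinfo=UTC)
print(backoff_active('2026-03-19 12:30:00', now))  # True
print(backoff_active('2026-03-19 11:00:00', now))  # False
```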

View File

@@ -0,0 +1,394 @@
"""
VMH Document Sync Handler
Zentraler Sync-Handler für Documents mit xAI Collections
Verarbeitet:
- vmh.document.create: Neu in EspoCRM → Prüfe ob xAI-Sync nötig
- vmh.document.update: Geändert in EspoCRM → Prüfe ob xAI-Sync/Update nötig
- vmh.document.delete: Gelöscht in EspoCRM → Remove from xAI Collections
"""
from typing import Dict, Any
from motia import FlowContext, queue
from services.espocrm import EspoCRMAPI
from services.document_sync_utils import DocumentSync
from services.xai_service import XAIService
from services.redis_client import get_redis_client
import hashlib
import json
config = {
"name": "VMH Document Sync Handler",
"description": "Zentraler Sync-Handler für Documents mit xAI Collections",
"flows": ["vmh-documents"],
"triggers": [
queue("vmh.document.create"),
queue("vmh.document.update"),
queue("vmh.document.delete")
],
"enqueues": ["aiknowledge.sync"]
}
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
"""Zentraler Sync-Handler für Documents"""
entity_id = event_data.get('entity_id')
entity_type = event_data.get('entity_type', 'CDokumente') # Default: CDokumente
action = event_data.get('action')
source = event_data.get('source')
if not entity_id:
ctx.logger.error("Keine entity_id im Event gefunden")
return
ctx.logger.info("=" * 80)
ctx.logger.info(f"🔄 DOCUMENT SYNC HANDLER GESTARTET")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Entity Type: {entity_type}")
ctx.logger.info(f"Action: {(action or 'UNKNOWN').upper()}")
ctx.logger.info(f"Document ID: {entity_id}")
ctx.logger.info(f"Source: {source}")
ctx.logger.info("=" * 80)
# Shared Redis client for distributed locking (centralized factory)
redis_client = get_redis_client(strict=False)
# APIs initialisieren (mit Context für besseres Logging)
espocrm = EspoCRMAPI(ctx)
sync_utils = DocumentSync(espocrm, redis_client, ctx)
xai_service = XAIService(ctx)
try:
# 1. ACQUIRE LOCK (verhindert parallele Syncs)
lock_acquired = await sync_utils.acquire_sync_lock(entity_id, entity_type)
if not lock_acquired:
ctx.logger.warn(f"⏸️ Sync bereits aktiv für {entity_type} {entity_id}, überspringe")
return
# Lock erfolgreich acquired - muss in jedem Pfad (auch im Fehlerfall) wieder released werden!
try:
# 2. FETCH VOLLSTÄNDIGES DOCUMENT VON ESPOCRM
try:
document = await espocrm.get_entity(entity_type, entity_id)
except Exception as e:
ctx.logger.error(f"❌ Fehler beim Laden von {entity_type}: {e}")
await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type)
return
ctx.logger.info(f"📋 {entity_type} geladen:")
ctx.logger.info(f" Name: {document.get('name', 'N/A')}")
ctx.logger.info(f" Type: {document.get('type', 'N/A')}")
ctx.logger.info(f" fileStatus: {document.get('fileStatus', 'N/A')}")
ctx.logger.info(f" xaiFileId: {document.get('xaiFileId') or document.get('xaiId', 'N/A')}")
ctx.logger.info(f" xaiCollections: {document.get('xaiCollections', [])}")
# 3. BESTIMME SYNC-AKTION BASIEREND AUF ACTION
if action == 'delete':
await handle_delete(entity_id, document, sync_utils, xai_service, ctx, entity_type)
elif action in ['create', 'update']:
await handle_create_or_update(entity_id, document, sync_utils, xai_service, ctx, entity_type)
else:
ctx.logger.warn(f"⚠️ Unbekannte Action: {action}")
await sync_utils.release_sync_lock(entity_id, success=False, error_message=f"Unbekannte Action: {action}", entity_type=entity_type)
except Exception as e:
# Unerwarteter Fehler während Sync - GARANTIERE Lock-Release
ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
try:
await sync_utils.release_sync_lock(
entity_id,
success=False,
error_message=str(e)[:2000],
entity_type=entity_type
)
except Exception as release_error:
# Selbst Lock-Release failed - logge kritischen Fehler
ctx.logger.critical(f"🚨 CRITICAL: Lock-Release failed für Document {entity_id}: {release_error}")
# Force Redis lock release
try:
lock_key = f"sync_lock:document:{entity_id}"
redis_client.delete(lock_key)
ctx.logger.info(f"✅ Redis lock manuell released: {lock_key}")
except Exception:
pass
except Exception as e:
# Fehler VOR Lock-Acquire - kein Lock-Release nötig
ctx.logger.error(f"❌ Fehler vor Lock-Acquire: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, xai_service: XAIService, ctx: FlowContext[Any], entity_type: str = 'CDokumente') -> None:
"""
Behandelt Create/Update von Documents
Entscheidet ob xAI-Sync nötig ist und führt diesen durch
"""
try:
ctx.logger.info("")
ctx.logger.info("=" * 80)
ctx.logger.info("🔍 ANALYSE: Braucht dieses Document xAI-Sync?")
ctx.logger.info("=" * 80)
# Datei-Status für Preview-Generierung (verschiedene Feld-Namen unterstützen)
datei_status = document.get('fileStatus') or document.get('dateiStatus')
# Entscheidungslogik: Soll dieses Document zu xAI?
needs_sync, collection_ids, reason = await sync_utils.should_sync_to_xai(document)
ctx.logger.info(f"📊 Entscheidung: {'✅ SYNC NÖTIG' if needs_sync else '⏭️ KEIN SYNC NÖTIG'}")
ctx.logger.info(f" Grund: {reason}")
ctx.logger.info(f" File-Status: {datei_status or 'N/A'}")
if collection_ids:
ctx.logger.info(f" Collections: {collection_ids}")
# ═══════════════════════════════════════════════════════════════
# CHECK: Knowledge Bases mit Status "new" (noch keine Collection)
# ═══════════════════════════════════════════════════════════════
new_knowledge_bases = [cid for cid in (collection_ids or []) if cid.startswith('NEW:')]
if new_knowledge_bases:
ctx.logger.info("")
ctx.logger.info("=" * 80)
ctx.logger.info("🆕 DOKUMENT IST MIT KNOWLEDGE BASE(S) VERKNÜPFT (Status: new)")
ctx.logger.info("=" * 80)
for new_kb in new_knowledge_bases:
kb_id = new_kb[4:] # Remove "NEW:" prefix
ctx.logger.info(f"📋 CAIKnowledge {kb_id}")
ctx.logger.info(f" Status: new → Collection muss zuerst erstellt werden")
# Trigger Knowledge Sync
ctx.logger.info(f"📤 Triggering aiknowledge.sync event...")
await ctx.enqueue({
'topic': 'aiknowledge.sync',
'data': {
'entity_id': kb_id,
'entity_type': 'CAIKnowledge',
'triggered_by': 'document_sync',
'document_id': entity_id
}
})
ctx.logger.info(f"✅ Event enqueued for {kb_id}")
# Release lock and skip document sync - knowledge sync will handle documents
ctx.logger.info("")
ctx.logger.info("=" * 80)
ctx.logger.info("✅ KNOWLEDGE SYNC GETRIGGERT")
ctx.logger.info(" Document Sync wird übersprungen")
ctx.logger.info(" (Knowledge Sync erstellt Collection und synchronisiert dann Dokumente)")
ctx.logger.info("=" * 80)
await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
return
# ═══════════════════════════════════════════════════════════════
# PREVIEW-GENERIERUNG bei neuen/geänderten Dateien
# ═══════════════════════════════════════════════════════════════
# Case-insensitive check für Datei-Status
datei_status_lower = (datei_status or '').lower()
if datei_status_lower in ['neu', 'geändert', 'new', 'changed']:
ctx.logger.info("")
ctx.logger.info("=" * 80)
ctx.logger.info("🖼️ PREVIEW-GENERIERUNG STARTEN")
ctx.logger.info(f" Datei-Status: {datei_status}")
ctx.logger.info("=" * 80)
try:
# 1. Hole Download-Informationen
download_info = await sync_utils.get_document_download_info(entity_id, entity_type)
if not download_info:
ctx.logger.warn("⚠️ Keine Download-Info verfügbar - überspringe Preview")
else:
ctx.logger.info(f"📥 Datei-Info:")
ctx.logger.info(f" Filename: {download_info['filename']}")
ctx.logger.info(f" MIME-Type: {download_info['mime_type']}")
ctx.logger.info(f" Size: {download_info['size']} bytes")
# 2. Download File von EspoCRM
ctx.logger.info(f"📥 Downloading file...")
espocrm = sync_utils.espocrm
file_content = await espocrm.download_attachment(download_info['attachment_id'])
ctx.logger.info(f"✅ Downloaded {len(file_content)} bytes")
# 3. Speichere temporär für Preview-Generierung
import tempfile
import os
with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{download_info['filename']}") as tmp_file:
tmp_file.write(file_content)
tmp_path = tmp_file.name
try:
# 4. Generiere Preview
ctx.logger.info(f"🖼️ Generating preview (600x800 WebP)...")
preview_data = await sync_utils.generate_thumbnail(
tmp_path,
download_info['mime_type'],
max_width=600,
max_height=800
)
if preview_data:
ctx.logger.info(f"✅ Preview generated: {len(preview_data)} bytes WebP")
# 5. Upload Preview zu EspoCRM und reset file status
ctx.logger.info(f"📤 Uploading preview to EspoCRM...")
await sync_utils.update_sync_metadata(
entity_id,
preview_data=preview_data,
reset_file_status=True, # Reset status nach Preview-Generierung
entity_type=entity_type
)
ctx.logger.info(f"✅ Preview uploaded successfully")
else:
ctx.logger.warn("⚠️ Preview-Generierung lieferte keine Daten")
# Auch bei fehlgeschlagener Preview-Generierung Status zurücksetzen
await sync_utils.update_sync_metadata(
entity_id,
reset_file_status=True,
entity_type=entity_type
)
finally:
# Cleanup temp file
try:
os.remove(tmp_path)
except OSError:
pass
except Exception as e:
ctx.logger.error(f"❌ Fehler bei Preview-Generierung: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
# Continue - Preview ist optional
ctx.logger.info("")
ctx.logger.info("=" * 80)
ctx.logger.info("✅ PREVIEW-VERARBEITUNG ABGESCHLOSSEN")
ctx.logger.info("=" * 80)
# ═══════════════════════════════════════════════════════════════
# xAI SYNC (falls erforderlich)
# ═══════════════════════════════════════════════════════════════
if not needs_sync:
ctx.logger.info("✅ Kein xAI-Sync erforderlich, Lock wird released")
# Wenn Preview generiert wurde aber kein xAI sync nötig,
# wurde Status bereits in Preview-Schritt zurückgesetzt
await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
return
# ═══════════════════════════════════════════════════════════════
# xAI SYNC DURCHFÜHREN
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("")
ctx.logger.info("=" * 80)
ctx.logger.info("🤖 xAI SYNC STARTEN")
ctx.logger.info("=" * 80)
# 1. Hole Download-Informationen (falls nicht schon aus Preview-Schritt vorhanden)
download_info = await sync_utils.get_document_download_info(entity_id, entity_type)
if not download_info:
raise Exception("Konnte Download-Info nicht ermitteln - Datei fehlt?")
ctx.logger.info(f"📥 Datei: {download_info['filename']} ({download_info['size']} bytes, {download_info['mime_type']})")
# 2. Download Datei von EspoCRM
espocrm = sync_utils.espocrm
file_content = await espocrm.download_attachment(download_info['attachment_id'])
ctx.logger.info(f"✅ Downloaded {len(file_content)} bytes")
# 3. MD5-Hash berechnen für Change-Detection
file_hash = hashlib.md5(file_content).hexdigest()
ctx.logger.info(f"🔑 MD5: {file_hash}")
# 4. Upload zu xAI
# Immer neu hochladen wenn needs_sync=True (neues File oder Hash geändert)
ctx.logger.info("📤 Uploading to xAI...")
xai_file_id = await xai_service.upload_file(
file_content,
download_info['filename'],
download_info['mime_type']
)
ctx.logger.info(f"✅ xAI file_id: {xai_file_id}")
# 5. Zu allen Ziel-Collections hinzufügen
ctx.logger.info(f"📚 Füge zu {len(collection_ids)} Collection(s) hinzu...")
added_collections = await xai_service.add_to_collections(collection_ids, xai_file_id)
ctx.logger.info(f"✅ In {len(added_collections)}/{len(collection_ids)} Collections eingetragen")
# 6. EspoCRM Metadaten aktualisieren und Lock freigeben
await sync_utils.update_sync_metadata(
entity_id,
xai_file_id=xai_file_id,
collection_ids=added_collections,
file_hash=file_hash,
entity_type=entity_type
)
await sync_utils.release_sync_lock(
entity_id,
success=True,
entity_type=entity_type
)
ctx.logger.info("=" * 80)
ctx.logger.info("✅ DOCUMENT SYNC ABGESCHLOSSEN")
ctx.logger.info("=" * 80)
except Exception as e:
ctx.logger.error(f"❌ Fehler bei Create/Update: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type)
async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, xai_service: XAIService, ctx: FlowContext[Any], entity_type: str = 'CDokumente') -> None:
"""
Behandelt Delete von Documents
Entfernt Document aus xAI Collections (aber löscht File nicht - kann in anderen Collections sein)
"""
try:
ctx.logger.info("")
ctx.logger.info("=" * 80)
ctx.logger.info("🗑️ DOCUMENT DELETE - xAI CLEANUP")
ctx.logger.info("=" * 80)
xai_file_id = document.get('xaiFileId') or document.get('xaiId')
xai_collections = document.get('xaiCollections') or []
if not xai_file_id or not xai_collections:
ctx.logger.info("⏭️ Document war nicht in xAI gesynct, nichts zu tun")
await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
return
ctx.logger.info(f"📋 Document Info:")
ctx.logger.info(f" xaiFileId: {xai_file_id}")
ctx.logger.info(f" Collections: {xai_collections}")
ctx.logger.info(f"🗑️ Entferne aus {len(xai_collections)} Collection(s)...")
await xai_service.remove_from_collections(xai_collections, xai_file_id)
ctx.logger.info(f"✅ File aus {len(xai_collections)} Collection(s) entfernt")
ctx.logger.info("   (File selbst bleibt in xAI - kann in anderen Collections sein)")
await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
ctx.logger.info("=" * 80)
ctx.logger.info("✅ DELETE ABGESCHLOSSEN")
ctx.logger.info("=" * 80)
except Exception as e:
ctx.logger.error(f"❌ Fehler bei Delete: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type)
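
Die MD5-basierte Change-Detection aus Schritt 3 des xAI-Syncs als Mini-Skizze; der Parameter `stored_hash` ist hier nur illustrativ für den zuletzt in EspoCRM gespeicherten Hash:

```python
import hashlib
from typing import Optional, Tuple

def needs_reupload(file_content: bytes, stored_hash: Optional[str]) -> Tuple[bool, str]:
    """Vergleicht den aktuellen MD5-Hash mit dem zuletzt gespeicherten Hash."""
    current = hashlib.md5(file_content).hexdigest()
    return current != stored_hash, current

changed, h = needs_reupload(b'Vertrag v1', None)
print(changed)  # True (noch nie gesynct)
changed_again, _ = needs_reupload(b'Vertrag v1', h)
print(changed_again)  # False (unverändert)
```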

View File

@@ -0,0 +1 @@
"""VMH Webhook Steps"""

View File

@@ -0,0 +1,91 @@
"""VMH Webhook - AI Knowledge Update"""
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook AI Knowledge Update",
"description": "Receives update webhooks from EspoCRM for CAIKnowledge entities",
"flows": ["vmh-aiknowledge"],
"triggers": [
http("POST", "/vmh/webhook/aiknowledge/update")
],
"enqueues": ["aiknowledge.sync"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for CAIKnowledge updates in EspoCRM.
Triggered when:
- activationStatus changes
- syncStatus changes (e.g., set to 'unclean')
- Documents linked/unlinked
"""
try:
ctx.logger.info("=" * 80)
ctx.logger.info("🔔 AI Knowledge Update Webhook")
ctx.logger.info("=" * 80)
# Extract payload
payload = request.body
# Handle case where payload is a list (e.g., from array-based webhook)
if isinstance(payload, list):
if not payload:
ctx.logger.error("❌ Empty payload list")
return ApiResponse(
status=400,
body={'success': False, 'error': 'Empty payload'}
)
payload = payload[0] # Take first item
# Ensure payload is a dict
if not isinstance(payload, dict):
ctx.logger.error(f"❌ Invalid payload type: {type(payload)}")
return ApiResponse(
status=400,
body={'success': False, 'error': f'Invalid payload type: {type(payload).__name__}'}
)
# Validate required fields
knowledge_id = payload.get('entity_id') or payload.get('id')
entity_type = payload.get('entity_type', 'CAIKnowledge')
action = payload.get('action', 'update')
if not knowledge_id:
ctx.logger.error("❌ Missing entity_id in payload")
return ApiResponse(
status=400,
body={'success': False, 'error': 'Missing entity_id'}
)
ctx.logger.info(f"📋 Entity Type: {entity_type}")
ctx.logger.info(f"📋 Entity ID: {knowledge_id}")
ctx.logger.info(f"📋 Action: {action}")
# Enqueue sync event
await ctx.enqueue({
'topic': 'aiknowledge.sync',
'data': {
'knowledge_id': knowledge_id,
'source': 'webhook',
'action': action
}
})
ctx.logger.info(f"✅ Sync event enqueued for {knowledge_id}")
ctx.logger.info("=" * 80)
return ApiResponse(
status=200,
body={'success': True, 'knowledge_id': knowledge_id}
)
except Exception as e:
ctx.logger.error(f"❌ Webhook error: {e}")
return ApiResponse(
status=500,
body={'success': False, 'error': str(e)}
)
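
Die Payload-Normalisierung des Webhooks (EspoCRM kann ein einzelnes Objekt oder ein Batch-Array senden) als eigenständige Funktion skizziert:

```python
from typing import Any, Optional

def normalize_payload(payload: Any) -> Optional[dict]:
    """Liefert das erste Dict aus Objekt- oder Array-Payloads, sonst None."""
    if isinstance(payload, list):
        payload = payload[0] if payload else None
    return payload if isinstance(payload, dict) else None

print(normalize_payload([{'id': 'k1'}]))  # {'id': 'k1'}
print(normalize_payload({'id': 'k2'}))    # {'id': 'k2'}
print(normalize_payload([]))              # None
print(normalize_payload('kaputt'))        # None
```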

View File

@@ -0,0 +1,76 @@
"""VMH Webhook - Bankverbindungen Create"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Bankverbindungen Create",
"description": "Receives create webhooks from EspoCRM for Bankverbindungen",
"flows": ["vmh-bankverbindungen"],
"triggers": [
http("POST", "/vmh/webhook/bankverbindungen/create")
],
"enqueues": ["vmh.bankverbindungen.create"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Bankverbindungen creation in EspoCRM.
"""
try:
payload = request.body or []
ctx.logger.info("=" * 80)
ctx.logger.info("📥 VMH WEBHOOK: BANKVERBINDUNGEN CREATE")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
ctx.logger.info("=" * 80)
# Collect all IDs from batch
entity_ids = set()
if isinstance(payload, list):
for entity in payload:
if isinstance(entity, dict) and 'id' in entity:
entity_ids.add(entity['id'])
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
ctx.logger.info(f"{len(entity_ids)} IDs found for create sync")
# Emit events
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.bankverbindungen.create',
'data': {
'entity_id': entity_id,
'action': 'create',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info("✅ VMH Create Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'create',
'ids_count': len(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ERROR: BANKVERBINDUNGEN CREATE WEBHOOK")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={'error': 'Internal server error', 'details': str(e)}
)
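
Das Sammeln der IDs aus Batch-Payloads (dedupliziert über ein Set, defekte Einträge werden ignoriert) als Skizze:

```python
def collect_ids(payload) -> set:
    """Sammelt alle 'id'-Werte aus einem Einzel-Objekt oder Batch-Array."""
    entities = payload if isinstance(payload, list) else [payload]
    return {e['id'] for e in entities if isinstance(e, dict) and 'id' in e}

print(sorted(collect_ids([{'id': 'a'}, {'id': 'a'}, {'id': 'b'}, 'defekt'])))  # ['a', 'b']
print(collect_ids({'id': 'x'}))  # {'x'}
```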

View File

@@ -0,0 +1,76 @@
"""VMH Webhook - Bankverbindungen Delete"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Bankverbindungen Delete",
"description": "Receives delete webhooks from EspoCRM for Bankverbindungen",
"flows": ["vmh-bankverbindungen"],
"triggers": [
http("POST", "/vmh/webhook/bankverbindungen/delete")
],
"enqueues": ["vmh.bankverbindungen.delete"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Bankverbindungen deletion in EspoCRM.
"""
try:
payload = request.body or []
ctx.logger.info("=" * 80)
ctx.logger.info("📥 VMH WEBHOOK: BANKVERBINDUNGEN DELETE")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
ctx.logger.info("=" * 80)
# Collect all IDs
entity_ids = set()
if isinstance(payload, list):
for entity in payload:
if isinstance(entity, dict) and 'id' in entity:
entity_ids.add(entity['id'])
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
ctx.logger.info(f"{len(entity_ids)} IDs found for delete sync")
# Emit events
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.bankverbindungen.delete',
'data': {
'entity_id': entity_id,
'action': 'delete',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info("✅ VMH Delete Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'delete',
'ids_count': len(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ERROR: BANKVERBINDUNGEN DELETE WEBHOOK")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={'error': 'Internal server error', 'details': str(e)}
)

@@ -0,0 +1,76 @@
"""VMH Webhook - Bankverbindungen Update"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Bankverbindungen Update",
"description": "Receives update webhooks from EspoCRM for Bankverbindungen",
"flows": ["vmh-bankverbindungen"],
"triggers": [
http("POST", "/vmh/webhook/bankverbindungen/update")
],
"enqueues": ["vmh.bankverbindungen.update"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Bankverbindungen updates in EspoCRM.
"""
try:
payload = request.body or []
ctx.logger.info("=" * 80)
ctx.logger.info("📥 VMH WEBHOOK: BANKVERBINDUNGEN UPDATE")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
ctx.logger.info("=" * 80)
# Collect all IDs
entity_ids = set()
if isinstance(payload, list):
for entity in payload:
if isinstance(entity, dict) and 'id' in entity:
entity_ids.add(entity['id'])
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
ctx.logger.info(f"{len(entity_ids)} IDs found for update sync")
# Emit events
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.bankverbindungen.update',
'data': {
'entity_id': entity_id,
'action': 'update',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info("✅ VMH Update Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'update',
'ids_count': len(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ERROR: BANKVERBINDUNGEN UPDATE WEBHOOK")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={'error': 'Internal server error', 'details': str(e)}
)

@@ -0,0 +1,86 @@
"""VMH Webhook - Beteiligte Create"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Beteiligte Create",
"description": "Receives create webhooks from EspoCRM for Beteiligte",
"flows": ["vmh-beteiligte"],
"triggers": [
http("POST", "/vmh/webhook/beteiligte/create")
],
"enqueues": ["vmh.beteiligte.create"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Beteiligte creation in EspoCRM.
Receives batch or single entity notifications and emits queue events
for each entity ID to be synced to Advoware.
"""
try:
payload = request.body or []
ctx.logger.info("=" * 80)
ctx.logger.info("📥 VMH WEBHOOK: BETEILIGTE CREATE")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
ctx.logger.info("=" * 80)
# Collect all IDs from batch
entity_ids = set()
if isinstance(payload, list):
for entity in payload:
if isinstance(entity, dict) and 'id' in entity:
entity_ids.add(entity['id'])
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
ctx.logger.info(f"{len(entity_ids)} IDs found for create sync")
# Emit events for queue processing (deduplication via lock in event handler)
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.beteiligte.create',
'data': {
'entity_id': entity_id,
'action': 'create',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info("✅ VMH Create Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'create',
'ids_count': len(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ERROR: VMH CREATE WEBHOOK")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error: {e}")
ctx.logger.error(f"Entity IDs attempted: {list(entity_ids) if 'entity_ids' in locals() else 'N/A'}")
ctx.logger.error(f"Full Payload: {json.dumps(request.body, indent=2, ensure_ascii=False)}")
ctx.logger.error(f"Timestamp: {datetime.datetime.now().isoformat()}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={
'error': 'Internal server error',
'details': str(e)
}
)

@@ -0,0 +1,76 @@
"""VMH Webhook - Beteiligte Delete"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Beteiligte Delete",
"description": "Receives delete webhooks from EspoCRM for Beteiligte",
"flows": ["vmh-beteiligte"],
"triggers": [
http("POST", "/vmh/webhook/beteiligte/delete")
],
"enqueues": ["vmh.beteiligte.delete"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Beteiligte deletion in EspoCRM.
"""
try:
payload = request.body or []
ctx.logger.info("=" * 80)
ctx.logger.info("📥 VMH WEBHOOK: BETEILIGTE DELETE")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
ctx.logger.info("=" * 80)
# Collect all IDs from batch
entity_ids = set()
if isinstance(payload, list):
for entity in payload:
if isinstance(entity, dict) and 'id' in entity:
entity_ids.add(entity['id'])
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
ctx.logger.info(f"{len(entity_ids)} IDs found for delete sync")
# Emit events for queue processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.beteiligte.delete',
'data': {
'entity_id': entity_id,
'action': 'delete',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info("✅ VMH Delete Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'delete',
'ids_count': len(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ERROR: BETEILIGTE DELETE WEBHOOK")
ctx.logger.error(f"Error: {e}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={'error': 'Internal server error', 'details': str(e)}
)

@@ -0,0 +1,86 @@
"""VMH Webhook - Beteiligte Update"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Beteiligte Update",
"description": "Receives update webhooks from EspoCRM for Beteiligte",
"flows": ["vmh-beteiligte"],
"triggers": [
http("POST", "/vmh/webhook/beteiligte/update")
],
"enqueues": ["vmh.beteiligte.update"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Beteiligte updates in EspoCRM.
Note: Loop prevention is implemented on EspoCRM side.
rowId updates no longer trigger webhooks, so no filtering needed.
"""
try:
payload = request.body or []
ctx.logger.info("=" * 80)
ctx.logger.info("📥 VMH WEBHOOK: BETEILIGTE UPDATE")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
ctx.logger.info("=" * 80)
# Collect all IDs from batch
entity_ids = set()
if isinstance(payload, list):
for entity in payload:
if isinstance(entity, dict) and 'id' in entity:
entity_ids.add(entity['id'])
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
ctx.logger.info(f"{len(entity_ids)} IDs found for update sync")
# Emit events for queue processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.beteiligte.update',
'data': {
'entity_id': entity_id,
'action': 'update',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info("✅ VMH Update Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'update',
'ids_count': len(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ERROR: VMH UPDATE WEBHOOK")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error: {e}")
ctx.logger.error(f"Entity IDs attempted: {list(entity_ids) if 'entity_ids' in locals() else 'N/A'}")
ctx.logger.error(f"Full Payload: {json.dumps(request.body, indent=2, ensure_ascii=False)}")
ctx.logger.error(f"Timestamp: {datetime.datetime.now().isoformat()}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={
'error': 'Internal server error',
'details': str(e)
}
)

@@ -0,0 +1,91 @@
"""VMH Webhook - Document Create"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Document Create",
"description": "Empfängt Create-Webhooks von EspoCRM für Documents",
"flows": ["vmh-documents"],
"triggers": [
http("POST", "/vmh/webhook/document/create")
],
"enqueues": ["vmh.document.create"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document creation in EspoCRM.
Receives batch or single entity notifications and emits queue events
for each entity ID to be synced to xAI.
"""
try:
payload = request.body or []
ctx.logger.info("=" * 80)
ctx.logger.info("📥 VMH WEBHOOK: DOCUMENT CREATE")
ctx.logger.info("=" * 80)
ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
# Collect all IDs from batch
entity_ids = set()
entity_type = 'CDokumente' # Default
if isinstance(payload, list):
for entity in payload:
if isinstance(entity, dict) and 'id' in entity:
entity_ids.add(entity['id'])
# Take entityType from first entity if present
if entity_type == 'CDokumente':
entity_type = entity.get('entityType', 'CDokumente')
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
entity_type = payload.get('entityType', 'CDokumente')
ctx.logger.info(f"{len(entity_ids)} document IDs found for create sync")
# Emit events for queue processing (deduplication via lock in event handler)
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.create',
'data': {
'entity_id': entity_id,
'entity_type': entity_type,
'action': 'create',
'timestamp': payload[0].get('modifiedAt') if isinstance(payload, list) and payload else (payload.get('modifiedAt') if isinstance(payload, dict) else None)
}
})
ctx.logger.info("✅ Document Create Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
body={
'success': True,
'message': f'{len(entity_ids)} document(s) enqueued for sync',
'entity_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ERROR: DOCUMENT CREATE WEBHOOK")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error: {e}")
ctx.logger.error(f"Entity IDs attempted: {list(entity_ids) if 'entity_ids' in locals() else 'N/A'}")
ctx.logger.error(f"Full Payload: {json.dumps(request.body, indent=2, ensure_ascii=False)}")
ctx.logger.error(f"Timestamp: {datetime.datetime.now().isoformat()}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={
'success': False,
'error': str(e)
}
)
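The document webhooks extend the shared ID-collection pattern with an `entityType` fallback (default `CDokumente`) and a batch timestamp taken from the first entity. A pure-function sketch of that parsing, under the assumption that it mirrors the handler above (the function name is hypothetical):

```python
def build_document_events(payload, action='create'):
    """Build queue-event payloads from a document webhook body.

    Collects unique IDs, takes entityType from the first entity that
    provides one (default 'CDokumente'), and uses the first entity's
    modifiedAt as the batch timestamp.
    """
    entity_ids = set()
    entity_type = 'CDokumente'
    if isinstance(payload, list):
        for entity in payload:
            if isinstance(entity, dict) and 'id' in entity:
                entity_ids.add(entity['id'])
                if entity_type == 'CDokumente':
                    entity_type = entity.get('entityType', 'CDokumente')
    elif isinstance(payload, dict) and 'id' in payload:
        entity_ids.add(payload['id'])
        entity_type = payload.get('entityType', 'CDokumente')
    timestamp = payload[0].get('modifiedAt') if isinstance(payload, list) and payload else None
    return [
        {'entity_id': eid, 'entity_type': entity_type,
         'action': action, 'timestamp': timestamp}
        for eid in sorted(entity_ids)
    ]
```

One event is produced per unique document ID, so the downstream `vmh.document.*` steps can process each document independently.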

@@ -0,0 +1,91 @@
"""VMH Webhook - Document Delete"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Document Delete",
"description": "Empfängt Delete-Webhooks von EspoCRM für Documents",
"flows": ["vmh-documents"],
"triggers": [
http("POST", "/vmh/webhook/document/delete")
],
"enqueues": ["vmh.document.delete"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document deletion in EspoCRM.
Receives batch or single entity notifications and emits queue events
for each entity ID to be removed from xAI.
"""
try:
payload = request.body or []
ctx.logger.info("=" * 80)
ctx.logger.info("📥 VMH WEBHOOK: DOCUMENT DELETE")
ctx.logger.info("=" * 80)
ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
# Collect all IDs from batch
entity_ids = set()
entity_type = 'CDokumente' # Default
if isinstance(payload, list):
for entity in payload:
if isinstance(entity, dict) and 'id' in entity:
entity_ids.add(entity['id'])
# Take entityType from first entity if present
if entity_type == 'CDokumente':
entity_type = entity.get('entityType', 'CDokumente')
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
entity_type = payload.get('entityType', 'CDokumente')
ctx.logger.info(f"{len(entity_ids)} document IDs found for delete sync")
# Emit events for queue processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.delete',
'data': {
'entity_id': entity_id,
'entity_type': entity_type,
'action': 'delete',
'timestamp': payload[0].get('deletedAt') if isinstance(payload, list) and payload else (payload.get('deletedAt') if isinstance(payload, dict) else None)
}
})
ctx.logger.info("✅ Document Delete Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
body={
'success': True,
'message': f'{len(entity_ids)} document(s) enqueued for deletion',
'entity_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ERROR: DOCUMENT DELETE WEBHOOK")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error: {e}")
ctx.logger.error(f"Entity IDs attempted: {list(entity_ids) if 'entity_ids' in locals() else 'N/A'}")
ctx.logger.error(f"Full Payload: {json.dumps(request.body, indent=2, ensure_ascii=False)}")
ctx.logger.error(f"Timestamp: {datetime.datetime.now().isoformat()}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={
'success': False,
'error': str(e)
}
)

@@ -0,0 +1,91 @@
"""VMH Webhook - Document Update"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Document Update",
"description": "Empfängt Update-Webhooks von EspoCRM für Documents",
"flows": ["vmh-documents"],
"triggers": [
http("POST", "/vmh/webhook/document/update")
],
"enqueues": ["vmh.document.update"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document updates in EspoCRM.
Receives batch or single entity notifications and emits queue events
for each entity ID to be synced to xAI.
"""
try:
payload = request.body or []
ctx.logger.info("=" * 80)
ctx.logger.info("📥 VMH WEBHOOK: DOCUMENT UPDATE")
ctx.logger.info("=" * 80)
ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
# Collect all IDs from batch
entity_ids = set()
entity_type = 'CDokumente' # Default
if isinstance(payload, list):
for entity in payload:
if isinstance(entity, dict) and 'id' in entity:
entity_ids.add(entity['id'])
# Take entityType from first entity if present
if entity_type == 'CDokumente':
entity_type = entity.get('entityType', 'CDokumente')
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
entity_type = payload.get('entityType', 'CDokumente')
ctx.logger.info(f"{len(entity_ids)} document IDs found for update sync")
# Emit events for queue processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.update',
'data': {
'entity_id': entity_id,
'entity_type': entity_type,
'action': 'update',
'timestamp': payload[0].get('modifiedAt') if isinstance(payload, list) and payload else (payload.get('modifiedAt') if isinstance(payload, dict) else None)
}
})
ctx.logger.info("✅ Document Update Webhook processed: "
f"{len(entity_ids)} events emitted")
return ApiResponse(
status=200,
body={
'success': True,
'message': f'{len(entity_ids)} document(s) enqueued for sync',
'entity_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ERROR: DOCUMENT UPDATE WEBHOOK")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error: {e}")
ctx.logger.error(f"Entity IDs attempted: {list(entity_ids) if 'entity_ids' in locals() else 'N/A'}")
ctx.logger.error(f"Full Payload: {json.dumps(request.body, indent=2, ensure_ascii=False)}")
ctx.logger.error(f"Timestamp: {datetime.datetime.now().isoformat()}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={
'success': False,
'error': str(e)
}
)

@@ -0,0 +1,523 @@
"""VMH xAI Chat Completions API
OpenAI-kompatible Chat Completions API mit xAI/LangChain Backend.
Unterstützt file_search über xAI Collections (RAG).
"""
import json
import time
from typing import Any, Dict, List, Optional
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH xAI Chat Completions API",
"description": "OpenAI-compatible Chat Completions API with xAI LangChain backend",
"flows": ["vmh-chat"],
"triggers": [
http("POST", "/vmh/v1/chat/completions")
],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
OpenAI-compatible Chat Completions endpoint.
Request Body (OpenAI format):
{
"model": "grok-2-latest",
"messages": [
{"role": "system", "content": "You are helpful"},
{"role": "user", "content": "1234/56 Was ist der Stand?"}
],
"temperature": 0.7,
"max_tokens": 2000,
"stream": false,
"extra_body": {
"collection_id": "col_abc123", // Optional: override auto-detection
"enable_web_search": true, // Optional: enable web search (default: false)
"web_search_config": { // Optional: web search configuration
"allowed_domains": ["example.com"],
"excluded_domains": ["spam.com"],
"enable_image_understanding": true
}
}
}
Aktenzeichen detection (priority):
1. extra_body.collection_id (explicit override)
2. First user message starts with Aktenzeichen (e.g., "1234/56 ...")
3. Error 400 if no collection_id is found and web search is disabled (strict mode)
Response (OpenAI format):
Non-Streaming:
{
"id": "chatcmpl-...",
"object": "chat.completion",
"created": 1234567890,
"model": "grok-2-latest",
"choices": [{
"index": 0,
"message": {"role": "assistant", "content": "..."},
"finish_reason": "stop"
}],
"usage": {"prompt_tokens": X, "completion_tokens": Y, "total_tokens": Z}
}
Streaming (SSE):
data: {"id":"chatcmpl-...","choices":[{"delta":{"content":"Hello"},...}]}
data: {"id":"chatcmpl-...","choices":[{"delta":{"content":" world"},...}]}
data: {"choices":[{"delta":{},"finish_reason":"stop"}]}
data: [DONE]
"""
from services.langchain_xai_service import LangChainXAIService
from services.aktenzeichen_utils import extract_aktenzeichen, normalize_aktenzeichen
from services.espocrm import EspoCRMAPI
ctx.logger.info("=" * 80)
ctx.logger.info("💬 VMH CHAT COMPLETIONS API")
ctx.logger.info("=" * 80)
try:
# Parse request body
body = request.body or {}
if not isinstance(body, dict):
ctx.logger.error(f"❌ Invalid request body type: {type(body)}")
return ApiResponse(
status=400,
body={'error': 'Request body must be JSON object'}
)
# Extract parameters
model_name = body.get('model', 'grok-4.20-beta-0309-reasoning')
messages = body.get('messages', [])
temperature = body.get('temperature', 0.7)
max_tokens = body.get('max_tokens')
stream = body.get('stream', False)
extra_body = body.get('extra_body', {})
# Web Search parameters (default: disabled)
enable_web_search = extra_body.get('enable_web_search', False)
web_search_config = extra_body.get('web_search_config', {})
ctx.logger.info(f"📋 Model: {model_name}")
ctx.logger.info(f"📋 Messages: {len(messages)}")
ctx.logger.info(f"📋 Stream: {stream}")
ctx.logger.info(f"📋 Web Search: {'enabled' if enable_web_search else 'disabled'}")
if enable_web_search and web_search_config:
ctx.logger.debug(f"Web Search Config: {json.dumps(web_search_config, indent=2)}")
# Log full conversation messages
ctx.logger.info("-" * 80)
ctx.logger.info("📨 REQUEST MESSAGES:")
for i, msg in enumerate(messages, 1):
role = msg.get('role', 'unknown')
content = msg.get('content', '')
preview = content[:150] + "..." if len(content) > 150 else content
ctx.logger.info(f" [{i}] {role}: {preview}")
ctx.logger.info("-" * 80)
# Validate messages
if not messages or not isinstance(messages, list):
ctx.logger.error("❌ Missing or invalid messages array")
return ApiResponse(
status=400,
body={'error': 'messages must be non-empty array'}
)
# Determine collection_id (Priority: extra_body > Aktenzeichen > error)
collection_id: Optional[str] = None
aktenzeichen: Optional[str] = None
# Priority 1: Explicit collection_id in extra_body
if 'collection_id' in extra_body:
collection_id = extra_body['collection_id']
ctx.logger.info(f"🔍 Collection ID from extra_body: {collection_id}")
# Priority 2: Extract Aktenzeichen from first user message
else:
for msg in messages:
if msg.get('role') == 'user':
content = msg.get('content', '')
aktenzeichen_raw = extract_aktenzeichen(content)
if aktenzeichen_raw:
aktenzeichen = normalize_aktenzeichen(aktenzeichen_raw)
ctx.logger.info(f"🔍 Aktenzeichen detected: {aktenzeichen}")
# Lookup collection_id via EspoCRM
collection_id = await lookup_collection_by_aktenzeichen(
aktenzeichen, ctx
)
if collection_id:
ctx.logger.info(f"✅ Collection found: {collection_id}")
# Remove Aktenzeichen from message (clean prompt)
from services.aktenzeichen_utils import remove_aktenzeichen
msg['content'] = remove_aktenzeichen(content)
ctx.logger.debug(f"Cleaned message: {msg['content']}")
else:
ctx.logger.warn(f"⚠️ No collection found for {aktenzeichen}")
break # Only check first user message
# Priority 3: Error if no collection_id AND web_search disabled
if not collection_id and not enable_web_search:
ctx.logger.error("❌ No collection_id found and web_search disabled")
ctx.logger.error(" Provide collection_id, enable web_search, or both")
return ApiResponse(
status=400,
body={
'error': 'collection_id or web_search required',
'message': 'Provide collection_id in extra_body, enable web_search, or start message with Aktenzeichen (e.g., "1234/56 question")'
}
)
# Initialize LangChain xAI Service
try:
langchain_service = LangChainXAIService(ctx)
except ValueError as e:
ctx.logger.error(f"❌ Service initialization failed: {e}")
return ApiResponse(
status=500,
body={'error': 'Service configuration error', 'details': str(e)}
)
# Create ChatXAI model
model = langchain_service.get_chat_model(
model=model_name,
temperature=temperature,
max_tokens=max_tokens
)
# Bind tools (file_search and/or web_search)
model_with_tools = langchain_service.bind_tools(
model=model,
collection_id=collection_id,
enable_web_search=enable_web_search,
web_search_config=web_search_config,
max_num_results=10
)
# Generate completion_id
completion_id = f"chatcmpl-{ctx.traceId[:12]}" if hasattr(ctx, 'traceId') else f"chatcmpl-{int(time.time())}"
created_ts = int(time.time())
# Branch: Streaming vs Non-Streaming
if stream:
ctx.logger.info("🌊 Starting streaming response...")
return await handle_streaming_response(
model_with_tools=model_with_tools,
messages=messages,
completion_id=completion_id,
created_ts=created_ts,
model_name=model_name,
langchain_service=langchain_service,
ctx=ctx
)
else:
ctx.logger.info("📦 Starting non-streaming response...")
return await handle_non_streaming_response(
model_with_tools=model_with_tools,
messages=messages,
completion_id=completion_id,
created_ts=created_ts,
model_name=model_name,
langchain_service=langchain_service,
ctx=ctx
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error("❌ ERROR: CHAT COMPLETIONS API")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error: {e}", exc_info=True)
ctx.logger.error(f"Request body: {json.dumps(request.body, indent=2, ensure_ascii=False)}")
ctx.logger.error("=" * 80)
return ApiResponse(
status=500,
body={
'error': 'Internal server error',
'message': str(e)
}
)
async def handle_non_streaming_response(
model_with_tools,
messages: List[Dict[str, Any]],
completion_id: str,
created_ts: int,
model_name: str,
langchain_service,
ctx: FlowContext
) -> ApiResponse:
"""
Handle non-streaming chat completion.
Returns:
ApiResponse with OpenAI-format JSON body
"""
try:
# Invoke model
result = await langchain_service.invoke_chat(model_with_tools, messages)
# Extract content - handle both string and structured responses
if hasattr(result, 'content'):
raw_content = result.content
# If content is a list (tool-call items plus text segments), join the text parts
if isinstance(raw_content, list):
text_segments = [
item.get('text', '')
for item in raw_content
if isinstance(item, dict) and item.get('type') == 'text'
]
content = ''.join(text_segments) if text_segments else str(raw_content)
else:
content = raw_content
else:
content = str(result)
# Build OpenAI-compatible response
response_body = {
'id': completion_id,
'object': 'chat.completion',
'created': created_ts,
'model': model_name,
'choices': [{
'index': 0,
'message': {
'role': 'assistant',
'content': content
},
'finish_reason': 'stop'
}],
'usage': {
'prompt_tokens': 0,  # Placeholder; overwritten below when usage_metadata is available
'completion_tokens': 0,
'total_tokens': 0
}
}
# Log token usage (if available)
if hasattr(result, 'usage_metadata'):
usage = result.usage_metadata
prompt_tokens = getattr(usage, 'input_tokens', 0)
completion_tokens = getattr(usage, 'output_tokens', 0)
response_body['usage'] = {
'prompt_tokens': prompt_tokens,
'completion_tokens': completion_tokens,
'total_tokens': prompt_tokens + completion_tokens
}
ctx.logger.info(f"📊 Token Usage: prompt={prompt_tokens}, completion={completion_tokens}")
# Log citations if available (from tool response annotations)
if hasattr(result, 'content') and isinstance(result.content, list):
# Extract citations from structured response
for item in result.content:
if isinstance(item, dict) and item.get('type') == 'text':
annotations = item.get('annotations', [])
if annotations:
ctx.logger.info(f"🔗 Citations: {len(annotations)}")
for i, citation in enumerate(annotations[:10], 1): # Log first 10
url = citation.get('url', 'N/A')
title = citation.get('title', '')
if url.startswith('collections://'):
# Internal collection reference
ctx.logger.debug(f" [{i}] Collection Document: {title}")
else:
# External URL
ctx.logger.debug(f" [{i}] {url}")
# Log complete response content
ctx.logger.info(f"✅ Chat completion: {len(content)} chars")
ctx.logger.info("=" * 80)
ctx.logger.info("📝 COMPLETE RESPONSE:")
ctx.logger.info("-" * 80)
ctx.logger.info(content)
ctx.logger.info("-" * 80)
ctx.logger.info("=" * 80)
return ApiResponse(
status=200,
body=response_body
)
except Exception as e:
ctx.logger.error(f"❌ Non-streaming completion failed: {e}", exc_info=True)
raise
async def handle_streaming_response(
model_with_tools,
messages: List[Dict[str, Any]],
completion_id: str,
created_ts: int,
model_name: str,
langchain_service,
ctx: FlowContext
):
"""
Handle streaming chat completion via SSE.
Returns:
Streaming response generator
"""
async def stream_generator():
try:
# Set SSE headers
await ctx.response.status(200)
await ctx.response.headers({
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive"
})
ctx.logger.info("🌊 Streaming started")
# Stream chunks
chunk_count = 0
total_content = ""
async for chunk in langchain_service.astream_chat(model_with_tools, messages):
# Extract delta content - handle structured chunks
if hasattr(chunk, "content"):
chunk_content = chunk.content
# If chunk content is a list (tool calls), extract text parts
if isinstance(chunk_content, list):
# Accumulate only text deltas
text_parts = [
item.get('text', '')
for item in chunk_content
if isinstance(item, dict) and item.get('type') == 'text'
]
delta = ''.join(text_parts)
else:
delta = chunk_content
else:
delta = ""
if delta:
total_content += delta
chunk_count += 1
# Build SSE data
data = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": created_ts,
"model": model_name,
"choices": [{
"index": 0,
"delta": {"content": delta},
"finish_reason": None
}]
}
# Send SSE event
await ctx.response.stream(f"data: {json.dumps(data, ensure_ascii=False)}\n\n")
# Send finish event
finish_data = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": created_ts,
"model": model_name,
"choices": [{
"index": 0,
"delta": {},
"finish_reason": "stop"
}]
}
await ctx.response.stream(f"data: {json.dumps(finish_data)}\n\n")
# Send [DONE]
await ctx.response.stream("data: [DONE]\n\n")
# Close stream
await ctx.response.close()
# Log complete streamed response
ctx.logger.info(f"✅ Streaming completed: {chunk_count} chunks, {len(total_content)} chars")
ctx.logger.info("=" * 80)
ctx.logger.info("📝 COMPLETE STREAMED RESPONSE:")
ctx.logger.info("-" * 80)
ctx.logger.info(total_content)
ctx.logger.info("-" * 80)
ctx.logger.info("=" * 80)
except Exception as e:
ctx.logger.error(f"❌ Streaming failed: {e}", exc_info=True)
# Send error event
error_data = {
"error": {
"message": str(e),
"type": "server_error"
}
}
await ctx.response.stream(f"data: {json.dumps(error_data)}\n\n")
await ctx.response.close()
return stream_generator()
async def lookup_collection_by_aktenzeichen(
aktenzeichen: str,
ctx: FlowContext
) -> Optional[str]:
"""
Lookup xAI Collection ID for Aktenzeichen via EspoCRM.
Search strategy:
1. Search for Raeumungsklage with matching advowareAkteBezeichner
2. Return xaiCollectionId if found
Args:
aktenzeichen: Normalized Aktenzeichen (e.g., "1234/56")
ctx: Motia context
Returns:
Collection ID or None if not found
"""
try:
# Initialize EspoCRM API
espocrm = EspoCRMAPI(ctx)
# Search Räumungsklage by advowareAkteBezeichner
ctx.logger.info(f"🔍 Searching Räumungsklage for Aktenzeichen: {aktenzeichen}")
search_result = await espocrm.search_entities(
entity_type='Raeumungsklage',
where=[{
'type': 'equals',
'attribute': 'advowareAkteBezeichner',
'value': aktenzeichen
}],
select=['id', 'xaiCollectionId', 'advowareAkteBezeichner'],
maxSize=1
)
if search_result and len(search_result) > 0:
entity = search_result[0]
collection_id = entity.get('xaiCollectionId')
if collection_id:
ctx.logger.info(f"✅ Found Räumungsklage: {entity.get('id')}")
return collection_id
else:
ctx.logger.warn(f"⚠️ Räumungsklage found but no xaiCollectionId: {entity.get('id')}")
else:
ctx.logger.warn(f"⚠️ No Räumungsklage found for {aktenzeichen}")
return None
except Exception as e:
ctx.logger.error(f"❌ Collection lookup failed: {e}", exc_info=True)
return None
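The chat endpoint relies on `extract_aktenzeichen`, `normalize_aktenzeichen`, and `remove_aktenzeichen` from `services.aktenzeichen_utils`, which are not shown in this section. A hedged sketch of what such helpers could look like, assuming only the "1234/56" prefix format given in the endpoint docstring (the regex and function names here are hypothetical, not the real implementation):

```python
import re

# Assumed format: up to six digits, a slash, a two-digit year suffix,
# at the very start of the message (e.g. "1234/56 Was ist der Stand?").
AKTENZEICHEN_RE = re.compile(r'^\s*(\d{1,6})\s*/\s*(\d{2})\b')

def extract_aktenzeichen_sketch(text):
    """Return the leading Aktenzeichen of a message, or None."""
    m = AKTENZEICHEN_RE.match(text)
    return f"{m.group(1)}/{m.group(2)}" if m else None

def remove_aktenzeichen_sketch(text):
    """Strip the leading Aktenzeichen so the model sees a clean prompt."""
    return AKTENZEICHEN_RE.sub('', text, count=1).lstrip()
```

With helpers of this shape, a message like `"1234/56 Was ist der Stand?"` yields the Aktenzeichen `"1234/56"` for the EspoCRM collection lookup, and the cleaned prompt `"Was ist der Stand?"` is what gets sent to the model.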