Migrate VMH Integration - Phase 2: Webhook endpoints

- Added services/espocrm.py: EspoCRM API client with Redis support
- Added 6 VMH webhook steps for EspoCRM integration:

  Beteiligte webhooks:
  - POST /vmh/webhook/beteiligte/create
  - POST /vmh/webhook/beteiligte/update
  - POST /vmh/webhook/beteiligte/delete

  Bankverbindungen webhooks:
  - POST /vmh/webhook/bankverbindungen/create
  - POST /vmh/webhook/bankverbindungen/update
  - POST /vmh/webhook/bankverbindungen/delete

All webhook endpoints receive batch/single entity notifications from
EspoCRM and emit queue events for downstream processing.
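All six endpoints share the same receive-and-fan-out pattern; it can be sketched as a standalone snippet (the helper names below are illustrative, not the actual step code):

```python
import datetime

def extract_entity_ids(payload):
    """Collect unique entity IDs from a batch (list) or single-entity (dict) webhook payload."""
    entity_ids = set()
    if isinstance(payload, list):
        for entity in payload:
            if isinstance(entity, dict) and 'id' in entity:
                entity_ids.add(entity['id'])
    elif isinstance(payload, dict) and 'id' in payload:
        entity_ids.add(payload['id'])
    return entity_ids

def build_event(entity_id, action, topic):
    """Shape of the queue event each webhook step emits per entity ID."""
    return {
        'topic': topic,
        'data': {
            'entity_id': entity_id,
            'action': action,
            'source': 'webhook',
            'timestamp': datetime.datetime.now().isoformat(),
        },
    }
```

Duplicate IDs within one batch collapse into a single event; cross-batch deduplication is left to the downstream event handlers.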

Note: Complex sync handlers (event processors) are not yet migrated; they
require additional utility modules (beteiligte_sync_utils.py, mappers,
notification_utils), which will be migrated in Phase 3.

Updated MIGRATION_STATUS.md with Phase 2 completion.
Author: bsiggel
Date: 2026-03-01 21:52:19 +00:00
Parent: 164c90c89d
Commit: 0216c4c3ae
10 changed files with 754 additions and 14 deletions


@@ -163,24 +163,36 @@ From old `requirements.txt` and code analysis:
- ✅ MIGRATION_GUIDE.md reviewed
- ✅ Migration patterns documented
- ✅ New system has example ticketing steps
-**Advoware Proxy Steps migrated** (GET, POST, PUT, DELETE)
-**Phase 1: Advoware Proxy Steps migrated** (GET, POST, PUT, DELETE)
-**Advoware API service module migrated** (services/advoware.py)
-Dependencies updated (aiohttp, redis, python-dotenv)
-ExecModule fixed to use correct Python command
- ✅ All 4 endpoints registered and running:
- `GET /advoware/proxy`
- `POST /advoware/proxy`
- `PUT /advoware/proxy`
- `DELETE /advoware/proxy`
-**Phase 2: VMH Integration - Webhook Steps migrated** (6 endpoints)
-**EspoCRM API service module migrated** (services/espocrm.py)
- ✅ All endpoints registered and running:
- **Advoware Proxy:**
- `GET /advoware/proxy`
- `POST /advoware/proxy`
- `PUT /advoware/proxy`
- `DELETE /advoware/proxy`
- **VMH Webhooks - Beteiligte:**
- `POST /vmh/webhook/beteiligte/create`
- `POST /vmh/webhook/beteiligte/update`
- `POST /vmh/webhook/beteiligte/delete`
- **VMH Webhooks - Bankverbindungen:**
- `POST /vmh/webhook/bankverbindungen/create`
- `POST /vmh/webhook/bankverbindungen/update`
- `POST /vmh/webhook/bankverbindungen/delete`
### Current Status: Phase 1 Complete ✅
### Current Status: Phase 2 Complete ✅
The simple Advoware Proxy steps have been successfully migrated and are running in production. These steps provide a universal proxy interface to the Advoware API with automatic token management via Redis.
VMH Webhook endpoints are now receiving EspoCRM webhook events and emitting queue events for processing. The webhook steps handle batch and single entity notifications and provide deduplication via the event handling system.
### Next Steps
1. Migrate additional service modules (espocrm.py, notification_utils.py, etc.)
2. Migrate VMH integration steps (beteiligte, bankverbindungen sync)
3. Migrate complex calendar sync steps (requires PostgreSQL)
**Note:** The complex sync handlers (beteiligte_sync_event_step.py, bankverbindungen_sync_event_step.py) are NOT yet migrated as they require additional utility modules:
- `services/beteiligte_sync_utils.py` (663 lines - distributed locking, retry logic, notifications)
- `services/bankverbindungen_mapper.py` (data mapping between EspoCRM and Advoware)
- `services/espocrm_mapper.py` (mapping utilities)
- `services/notification_utils.py` (in-app notifications)
These sync handlers process the queue events emitted by the webhook steps and perform the actual synchronization with Advoware. They will be migrated in Phase 3.
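As a rough illustration only (the real handlers live in the not-yet-migrated beteiligte_sync_event_step.py and depend on the utility modules listed above), a Phase 3 consumer for these queue events might take this shape — every name below is hypothetical, with the client and sync callable injected so the sketch stays testable without real services:

```python
async def handle_beteiligte_create(event, espocrm, sync_to_advoware):
    """Hypothetical consumer for 'vmh.beteiligte.create' queue events.

    espocrm: any client exposing get_entity(); sync_to_advoware: the
    mapping/sync callable that Phase 3 will provide.
    """
    entity_id = event['data']['entity_id']
    # Fetch the full entity; the webhook event only carries the ID
    entity = await espocrm.get_entity('CBeteiligte', entity_id)
    if entity is None:
        return {'status': 'skipped', 'entity_id': entity_id}
    # Delegate mapping and writing to the injected sync callable
    result = await sync_to_advoware(entity)
    return {'status': 'synced', 'entity_id': entity_id, 'advoware_id': result}
```

Distributed locking, retries, and notifications (from beteiligte_sync_utils.py) would wrap this core once those modules are migrated.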
## Notes
- Old system was Node.js + Python hybrid (Python steps as child processes)

services/espocrm.py (new file, 300 lines)

@@ -0,0 +1,300 @@
"""EspoCRM API client for Motia III"""
import aiohttp
import asyncio
import json
import logging
import redis
import os
from typing import Optional, Dict, Any, List

logger = logging.getLogger(__name__)


class EspoCRMError(Exception):
    """Base exception for EspoCRM API errors"""
    pass


class EspoCRMAuthError(EspoCRMError):
    """Authentication error"""
    pass


class EspoCRMAPI:
    """
    EspoCRM API client for the BitByLaw integration.

    Supports:
    - API key authentication (X-Api-Key header)
    - Standard REST operations (GET, POST, PUT, DELETE)
    - Entity management (CBeteiligte, CVmhErstgespraech, etc.)

    Environment variables required:
    - ESPOCRM_API_BASE_URL (e.g., https://crm.bitbylaw.com/api/v1)
    - ESPOCRM_API_KEY (Marvin API key)
    - ESPOCRM_API_TIMEOUT_SECONDS (optional, default: 30)
    - REDIS_HOST, REDIS_PORT, REDIS_DB_ADVOWARE_CACHE (for caching)
    """

    def __init__(self, context=None):
        """
        Initialize the EspoCRM API client.

        Args:
            context: Motia FlowContext for logging (optional)
        """
        self.context = context
        self._log("EspoCRMAPI initializing", level='debug')

        # Load configuration from environment
        self.api_base_url = os.getenv('ESPOCRM_API_BASE_URL', 'https://crm.bitbylaw.com/api/v1')
        self.api_key = os.getenv('ESPOCRM_API_KEY', '')
        self.api_timeout_seconds = int(os.getenv('ESPOCRM_API_TIMEOUT_SECONDS', '30'))

        if not self.api_key:
            raise EspoCRMAuthError("ESPOCRM_API_KEY not configured in environment")

        self._log(f"EspoCRM API initialized with base URL: {self.api_base_url}")

        # Optional Redis for caching/rate limiting
        try:
            redis_host = os.getenv('REDIS_HOST', 'localhost')
            redis_port = int(os.getenv('REDIS_PORT', '6379'))
            redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
            redis_timeout = int(os.getenv('REDIS_TIMEOUT_SECONDS', '5'))
            self.redis_client = redis.Redis(
                host=redis_host,
                port=redis_port,
                db=redis_db,
                socket_timeout=redis_timeout,
                socket_connect_timeout=redis_timeout,
                decode_responses=True
            )
            self.redis_client.ping()
            self._log("Connected to Redis for EspoCRM operations")
        except Exception as e:
            self._log(f"Could not connect to Redis: {e}. Continuing without caching.", level='warning')
            self.redis_client = None

    def _log(self, message: str, level: str = 'info'):
        """Log via context.logger if available, otherwise via the module logger"""
        if self.context and hasattr(self.context, 'logger'):
            log_func = getattr(self.context.logger, level, self.context.logger.info)
        else:
            log_func = getattr(logger, level, logger.info)
        log_func(f"[EspoCRM] {message}")

    def _get_headers(self) -> Dict[str, str]:
        """Generate request headers with the API key"""
        return {
            'X-Api-Key': self.api_key,
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }

    async def api_call(
        self,
        endpoint: str,
        method: str = 'GET',
        params: Optional[Dict] = None,
        json_data: Optional[Dict] = None,
        timeout_seconds: Optional[int] = None
    ) -> Any:
        """
        Make an API call to EspoCRM.

        Args:
            endpoint: API endpoint (e.g., '/CBeteiligte/123' or '/CVmhErstgespraech')
            method: HTTP method (GET, POST, PUT, DELETE)
            params: Query parameters
            json_data: JSON body for POST/PUT
            timeout_seconds: Request timeout

        Returns:
            Parsed JSON response or None

        Raises:
            EspoCRMError: On API errors
        """
        # Ensure the endpoint starts with /
        if not endpoint.startswith('/'):
            endpoint = '/' + endpoint
        url = self.api_base_url.rstrip('/') + endpoint
        headers = self._get_headers()
        effective_timeout = aiohttp.ClientTimeout(
            total=timeout_seconds or self.api_timeout_seconds
        )

        self._log(f"API call: {method} {url}", level='debug')
        if params:
            self._log(f"Params: {params}", level='debug')

        async with aiohttp.ClientSession(timeout=effective_timeout) as session:
            try:
                async with session.request(
                    method,
                    url,
                    headers=headers,
                    params=params,
                    json=json_data
                ) as response:
                    # Log response status
                    self._log(f"Response status: {response.status}", level='debug')

                    # Handle errors
                    if response.status == 401:
                        raise EspoCRMAuthError("Authentication failed - check API key")
                    elif response.status == 403:
                        raise EspoCRMError("Access forbidden")
                    elif response.status == 404:
                        raise EspoCRMError(f"Resource not found: {endpoint}")
                    elif response.status >= 400:
                        error_text = await response.text()
                        raise EspoCRMError(f"API error {response.status}: {error_text}")

                    # Parse the response
                    if response.content_type == 'application/json':
                        result = await response.json()
                        self._log("Response received", level='debug')
                        return result
                    # For DELETE or other non-JSON responses
                    return None
            except aiohttp.ClientError as e:
                self._log(f"API call failed: {e}", level='error')
                raise EspoCRMError(f"Request failed: {e}") from e

    async def get_entity(self, entity_type: str, entity_id: str) -> Dict[str, Any]:
        """
        Get a single entity by ID.

        Args:
            entity_type: Entity type (e.g., 'CBeteiligte', 'CVmhErstgespraech')
            entity_id: Entity ID

        Returns:
            Entity data as a dict
        """
        self._log(f"Getting {entity_type} with ID: {entity_id}")
        return await self.api_call(f"/{entity_type}/{entity_id}", method='GET')

    async def list_entities(
        self,
        entity_type: str,
        where: Optional[List[Dict]] = None,
        select: Optional[str] = None,
        order_by: Optional[str] = None,
        offset: int = 0,
        max_size: int = 50
    ) -> Dict[str, Any]:
        """
        List entities with filtering and pagination.

        Args:
            entity_type: Entity type
            where: Filter conditions (EspoCRM format)
            select: Comma-separated field list
            order_by: Sort field
            offset: Pagination offset
            max_size: Max results per page

        Returns:
            Dict with 'list' and 'total' keys
        """
        params = {
            'offset': offset,
            'maxSize': max_size
        }
        if where:
            # EspoCRM expects a JSON-encoded where clause
            params['where'] = where if isinstance(where, str) else json.dumps(where)
        if select:
            params['select'] = select
        if order_by:
            params['orderBy'] = order_by
        self._log(f"Listing {entity_type} entities")
        return await self.api_call(f"/{entity_type}", method='GET', params=params)

    async def create_entity(
        self,
        entity_type: str,
        data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Create a new entity.

        Args:
            entity_type: Entity type
            data: Entity data

        Returns:
            Created entity with ID
        """
        self._log(f"Creating {entity_type} entity")
        return await self.api_call(f"/{entity_type}", method='POST', json_data=data)

    async def update_entity(
        self,
        entity_type: str,
        entity_id: str,
        data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Update an existing entity.

        Args:
            entity_type: Entity type
            entity_id: Entity ID
            data: Updated fields

        Returns:
            Updated entity
        """
        self._log(f"Updating {entity_type} with ID: {entity_id}")
        return await self.api_call(f"/{entity_type}/{entity_id}", method='PUT', json_data=data)

    async def delete_entity(self, entity_type: str, entity_id: str) -> bool:
        """
        Delete an entity.

        Args:
            entity_type: Entity type
            entity_id: Entity ID

        Returns:
            True if successful
        """
        self._log(f"Deleting {entity_type} with ID: {entity_id}")
        await self.api_call(f"/{entity_type}/{entity_id}", method='DELETE')
        return True

    async def search_entities(
        self,
        entity_type: str,
        query: str,
        fields: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """
        Search entities by text query.

        Args:
            entity_type: Entity type
            query: Search query
            fields: Fields to search in

        Returns:
            List of matching entities
        """
        where = [{
            'type': 'textFilter',
            'value': query
        }]
        result = await self.list_entities(entity_type, where=where)
        return result.get('list', [])
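For reference, the `where` filter that `list_entities` JSON-encodes follows EspoCRM's search-parameter format. The following standalone sketch mirrors the query-parameter encoding above (it does not call the API, so it can be exercised without credentials or a network):

```python
import json

def build_list_params(where=None, select=None, order_by=None, offset=0, max_size=50):
    """Mirror of the query-parameter encoding in EspoCRMAPI.list_entities."""
    params = {'offset': offset, 'maxSize': max_size}
    if where:
        # EspoCRM expects the where clause JSON-encoded in the query string
        params['where'] = where if isinstance(where, str) else json.dumps(where)
    if select:
        params['select'] = select
    if order_by:
        params['orderBy'] = order_by
    return params

# Example: a text search over CBeteiligte, as search_entities builds it
params = build_list_params(
    where=[{'type': 'textFilter', 'value': 'Mustermann'}],
    select='id,name',
    max_size=10,
)
```

Passing an already-encoded string for `where` is also accepted, matching the `isinstance(where, str)` branch in the client.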

steps/vmh/__init__.py (new file, 1 line)

@@ -0,0 +1 @@
"""VMH Steps"""


@@ -0,0 +1 @@
"""VMH Webhook Steps"""


@@ -0,0 +1,69 @@
"""VMH Webhook - Bankverbindungen Create"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse

config = {
    "name": "VMH Webhook Bankverbindungen Create",
    "description": "Receives create webhooks from EspoCRM for Bankverbindungen",
    "flows": ["vmh"],
    "triggers": [
        http("POST", "/vmh/webhook/bankverbindungen/create")
    ],
    "enqueues": ["vmh.bankverbindungen.create"],
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Webhook handler for Bankverbindungen creation in EspoCRM.
    """
    try:
        payload = request.body or []
        ctx.logger.info("VMH webhook Bankverbindungen create received")
        ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")

        # Collect all IDs from the batch
        entity_ids = set()
        if isinstance(payload, list):
            for entity in payload:
                if isinstance(entity, dict) and 'id' in entity:
                    entity_ids.add(entity['id'])
        elif isinstance(payload, dict) and 'id' in payload:
            entity_ids.add(payload['id'])
        ctx.logger.info(f"Found {len(entity_ids)} IDs for create sync")

        # Emit events
        for entity_id in entity_ids:
            await ctx.enqueue({
                'topic': 'vmh.bankverbindungen.create',
                'data': {
                    'entity_id': entity_id,
                    'action': 'create',
                    'source': 'webhook',
                    'timestamp': datetime.datetime.now().isoformat()
                }
            })
        ctx.logger.info(f"VMH create webhook processed: {len(entity_ids)} events emitted")

        return ApiResponse(
            status=200,
            body={
                'status': 'received',
                'action': 'create',
                'ids_count': len(entity_ids)
            }
        )
    except Exception as e:
        ctx.logger.error(f"Error processing VMH create webhook: {e}")
        return ApiResponse(
            status=500,
            body={'error': 'Internal server error', 'details': str(e)}
        )


@@ -0,0 +1,69 @@
"""VMH Webhook - Bankverbindungen Delete"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse

config = {
    "name": "VMH Webhook Bankverbindungen Delete",
    "description": "Receives delete webhooks from EspoCRM for Bankverbindungen",
    "flows": ["vmh"],
    "triggers": [
        http("POST", "/vmh/webhook/bankverbindungen/delete")
    ],
    "enqueues": ["vmh.bankverbindungen.delete"],
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Webhook handler for Bankverbindungen deletion in EspoCRM.
    """
    try:
        payload = request.body or []
        ctx.logger.info("VMH webhook Bankverbindungen delete received")
        ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")

        # Collect all IDs
        entity_ids = set()
        if isinstance(payload, list):
            for entity in payload:
                if isinstance(entity, dict) and 'id' in entity:
                    entity_ids.add(entity['id'])
        elif isinstance(payload, dict) and 'id' in payload:
            entity_ids.add(payload['id'])
        ctx.logger.info(f"Found {len(entity_ids)} IDs for delete sync")

        # Emit events
        for entity_id in entity_ids:
            await ctx.enqueue({
                'topic': 'vmh.bankverbindungen.delete',
                'data': {
                    'entity_id': entity_id,
                    'action': 'delete',
                    'source': 'webhook',
                    'timestamp': datetime.datetime.now().isoformat()
                }
            })
        ctx.logger.info(f"VMH delete webhook processed: {len(entity_ids)} events emitted")

        return ApiResponse(
            status=200,
            body={
                'status': 'received',
                'action': 'delete',
                'ids_count': len(entity_ids)
            }
        )
    except Exception as e:
        ctx.logger.error(f"Error processing VMH delete webhook: {e}")
        return ApiResponse(
            status=500,
            body={'error': 'Internal server error', 'details': str(e)}
        )


@@ -0,0 +1,69 @@
"""VMH Webhook - Bankverbindungen Update"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse

config = {
    "name": "VMH Webhook Bankverbindungen Update",
    "description": "Receives update webhooks from EspoCRM for Bankverbindungen",
    "flows": ["vmh"],
    "triggers": [
        http("POST", "/vmh/webhook/bankverbindungen/update")
    ],
    "enqueues": ["vmh.bankverbindungen.update"],
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Webhook handler for Bankverbindungen updates in EspoCRM.
    """
    try:
        payload = request.body or []
        ctx.logger.info("VMH webhook Bankverbindungen update received")
        ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")

        # Collect all IDs
        entity_ids = set()
        if isinstance(payload, list):
            for entity in payload:
                if isinstance(entity, dict) and 'id' in entity:
                    entity_ids.add(entity['id'])
        elif isinstance(payload, dict) and 'id' in payload:
            entity_ids.add(payload['id'])
        ctx.logger.info(f"Found {len(entity_ids)} IDs for update sync")

        # Emit events
        for entity_id in entity_ids:
            await ctx.enqueue({
                'topic': 'vmh.bankverbindungen.update',
                'data': {
                    'entity_id': entity_id,
                    'action': 'update',
                    'source': 'webhook',
                    'timestamp': datetime.datetime.now().isoformat()
                }
            })
        ctx.logger.info(f"VMH update webhook processed: {len(entity_ids)} events emitted")

        return ApiResponse(
            status=200,
            body={
                'status': 'received',
                'action': 'update',
                'ids_count': len(entity_ids)
            }
        )
    except Exception as e:
        ctx.logger.error(f"Error processing VMH update webhook: {e}")
        return ApiResponse(
            status=500,
            body={'error': 'Internal server error', 'details': str(e)}
        )


@@ -0,0 +1,75 @@
"""VMH Webhook - Beteiligte Create"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse

config = {
    "name": "VMH Webhook Beteiligte Create",
    "description": "Receives create webhooks from EspoCRM for Beteiligte",
    "flows": ["vmh"],
    "triggers": [
        http("POST", "/vmh/webhook/beteiligte/create")
    ],
    "enqueues": ["vmh.beteiligte.create"],
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Webhook handler for Beteiligte creation in EspoCRM.

    Receives batch or single entity notifications and emits queue events
    for each entity ID to be synced to Advoware.
    """
    try:
        payload = request.body or []
        ctx.logger.info("VMH webhook Beteiligte create received")
        ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")

        # Collect all IDs from the batch
        entity_ids = set()
        if isinstance(payload, list):
            for entity in payload:
                if isinstance(entity, dict) and 'id' in entity:
                    entity_ids.add(entity['id'])
        elif isinstance(payload, dict) and 'id' in payload:
            entity_ids.add(payload['id'])
        ctx.logger.info(f"Found {len(entity_ids)} IDs for create sync")

        # Emit events for queue processing (deduplication happens in the event handler via a lock)
        for entity_id in entity_ids:
            await ctx.enqueue({
                'topic': 'vmh.beteiligte.create',
                'data': {
                    'entity_id': entity_id,
                    'action': 'create',
                    'source': 'webhook',
                    'timestamp': datetime.datetime.now().isoformat()
                }
            })
        ctx.logger.info(f"VMH create webhook processed: {len(entity_ids)} events emitted")

        return ApiResponse(
            status=200,
            body={
                'status': 'received',
                'action': 'create',
                'ids_count': len(entity_ids)
            }
        )
    except Exception as e:
        ctx.logger.error(f"Error processing VMH create webhook: {e}")
        return ApiResponse(
            status=500,
            body={
                'error': 'Internal server error',
                'details': str(e)
            }
        )


@@ -0,0 +1,69 @@
"""VMH Webhook - Beteiligte Delete"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse

config = {
    "name": "VMH Webhook Beteiligte Delete",
    "description": "Receives delete webhooks from EspoCRM for Beteiligte",
    "flows": ["vmh"],
    "triggers": [
        http("POST", "/vmh/webhook/beteiligte/delete")
    ],
    "enqueues": ["vmh.beteiligte.delete"],
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Webhook handler for Beteiligte deletion in EspoCRM.
    """
    try:
        payload = request.body or []
        ctx.logger.info("VMH webhook Beteiligte delete received")
        ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")

        # Collect all IDs from the batch
        entity_ids = set()
        if isinstance(payload, list):
            for entity in payload:
                if isinstance(entity, dict) and 'id' in entity:
                    entity_ids.add(entity['id'])
        elif isinstance(payload, dict) and 'id' in payload:
            entity_ids.add(payload['id'])
        ctx.logger.info(f"Found {len(entity_ids)} IDs for delete sync")

        # Emit events for queue processing
        for entity_id in entity_ids:
            await ctx.enqueue({
                'topic': 'vmh.beteiligte.delete',
                'data': {
                    'entity_id': entity_id,
                    'action': 'delete',
                    'source': 'webhook',
                    'timestamp': datetime.datetime.now().isoformat()
                }
            })
        ctx.logger.info(f"VMH delete webhook processed: {len(entity_ids)} events emitted")

        return ApiResponse(
            status=200,
            body={
                'status': 'received',
                'action': 'delete',
                'ids_count': len(entity_ids)
            }
        )
    except Exception as e:
        ctx.logger.error(f"Error in delete webhook: {e}")
        return ApiResponse(
            status=500,
            body={'error': 'Internal server error', 'details': str(e)}
        )


@@ -0,0 +1,75 @@
"""VMH Webhook - Beteiligte Update"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse

config = {
    "name": "VMH Webhook Beteiligte Update",
    "description": "Receives update webhooks from EspoCRM for Beteiligte",
    "flows": ["vmh"],
    "triggers": [
        http("POST", "/vmh/webhook/beteiligte/update")
    ],
    "enqueues": ["vmh.beteiligte.update"],
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Webhook handler for Beteiligte updates in EspoCRM.

    Note: loop prevention is implemented on the EspoCRM side.
    rowId updates no longer trigger webhooks, so no filtering is needed here.
    """
    try:
        payload = request.body or []
        ctx.logger.info("VMH webhook Beteiligte update received")
        ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")

        # Collect all IDs from the batch
        entity_ids = set()
        if isinstance(payload, list):
            for entity in payload:
                if isinstance(entity, dict) and 'id' in entity:
                    entity_ids.add(entity['id'])
        elif isinstance(payload, dict) and 'id' in payload:
            entity_ids.add(payload['id'])
        ctx.logger.info(f"Found {len(entity_ids)} IDs for update sync")

        # Emit events for queue processing
        for entity_id in entity_ids:
            await ctx.enqueue({
                'topic': 'vmh.beteiligte.update',
                'data': {
                    'entity_id': entity_id,
                    'action': 'update',
                    'source': 'webhook',
                    'timestamp': datetime.datetime.now().isoformat()
                }
            })
        ctx.logger.info(f"VMH update webhook processed: {len(entity_ids)} events emitted")

        return ApiResponse(
            status=200,
            body={
                'status': 'received',
                'action': 'update',
                'ids_count': len(entity_ids)
            }
        )
    except Exception as e:
        ctx.logger.error(f"Error processing VMH update webhook: {e}")
        return ApiResponse(
            status=500,
            body={
                'error': 'Internal server error',
                'details': str(e)
            }
        )