feat: Implement Akte webhook for EspoCRM to queue entity IDs for synchronization

fix: Refactor Akte sync logic to handle multiple Redis queues and improve logging
refactor: Enhance parameter flattening for EspoCRM API calls
This commit is contained in:
bsiggel
2026-03-26 09:48:46 +00:00
parent b4d35b1790
commit 3459b9342f
6 changed files with 278 additions and 127 deletions

View File

@@ -150,12 +150,16 @@ class AdvowareService:
try:
endpoint = f"api/v1/advonet/Akten/{akte_id}"
result = await self.api.api_call(endpoint, method='GET')
# API may return a list (batch response) or a single dict
if isinstance(result, list):
result = result[0] if result else None
if result:
self._log(f"[ADVO] ✅ Fetched Akte {akte_id}: {result.get('az', 'N/A')}")
return result
except Exception as e:
self._log(f"[ADVO] Error loading Akte {akte_id}: {e}", level='error')
return None

View File

@@ -162,11 +162,33 @@ class EspoCRMAPI:
self._log(f"⚠️ Could not load entity def for {entity_type}: {e}", level='warn')
return {}
@staticmethod
def _flatten_params(data, prefix: str = '') -> list:
    """
    Turn a nested dict/list/tuple structure into a flat list of
    ``(key, value)`` string pairs using PHP-style bracket keys.

    EspoCRM expects where[0][type]=equals&where[0][attribute]=x format.

    Args:
        data: Value to flatten (dict, list, tuple or a scalar).
        prefix: Key accumulated so far; empty for the top level.

    Returns:
        List of ``(key, value)`` tuples in traversal order. Booleans are
        rendered as 'true'/'false', ``None`` as an empty string, and every
        other scalar via ``str()``.
    """
    pairs = []

    def walk(node, key):
        # Containers recurse; only leaves contribute a pair.
        if isinstance(node, dict):
            for name, child in node.items():
                # Top-level keys stay bare, nested ones get brackets.
                walk(child, f"{key}[{name}]" if key else str(name))
            return
        if isinstance(node, (list, tuple)):
            for index, child in enumerate(node):
                walk(child, f"{key}[{index}]")
            return
        # bool is tested before the generic str() fallback so that it is
        # rendered as 'true'/'false' instead of 'True'/'False'.
        if isinstance(node, bool):
            text = 'true' if node else 'false'
        elif node is None:
            text = ''
        else:
            text = str(node)
        pairs.append((key, text))

    walk(data, prefix)
    return pairs
async def api_call(
self,
endpoint: str,
method: str = 'GET',
params: Optional[Dict] = None,
params=None,
json_data: Optional[Dict] = None,
timeout_seconds: Optional[int] = None
) -> Any:
@@ -292,22 +314,22 @@ class EspoCRMAPI:
Returns:
Dict with 'list' and 'total' keys
"""
params = {
search_params: Dict[str, Any] = {
'offset': offset,
'maxSize': max_size
'maxSize': max_size,
}
if where:
import json
# EspoCRM expects JSON-encoded where clause
params['where'] = where if isinstance(where, str) else json.dumps(where)
search_params['where'] = where
if select:
params['select'] = select
search_params['select'] = select
if order_by:
params['orderBy'] = order_by
search_params['orderBy'] = order_by
self._log(f"Listing {entity_type} entities")
return await self.api_call(f"/{entity_type}", method='GET', params=params)
return await self.api_call(
f"/{entity_type}", method='GET',
params=self._flatten_params(search_params)
)
async def list_related(
self,
@@ -321,23 +343,24 @@ class EspoCRMAPI:
offset: int = 0,
max_size: int = 50
) -> Dict[str, Any]:
params = {
search_params: Dict[str, Any] = {
'offset': offset,
'maxSize': max_size
'maxSize': max_size,
}
if where:
import json
params['where'] = where if isinstance(where, str) else json.dumps(where)
search_params['where'] = where
if select:
params['select'] = select
search_params['select'] = select
if order_by:
params['orderBy'] = order_by
search_params['orderBy'] = order_by
if order:
params['order'] = order
search_params['order'] = order
self._log(f"Listing related {entity_type}/{entity_id}/{link}")
return await self.api_call(f"/{entity_type}/{entity_id}/{link}", method='GET', params=params)
return await self.api_call(
f"/{entity_type}/{entity_id}/{link}", method='GET',
params=self._flatten_params(search_params)
)
async def create_entity(
self,

View File

@@ -1,17 +1,16 @@
"""
Akte Sync - Cron Poller
Polls Redis Sorted Set for pending Aktennummern every 10 seconds.
Respects a 10-second debounce window so that rapid filesystem events
(e.g. many files being updated at once) are batched into a single sync.
Polls two Redis Sorted Sets every 10 seconds (10 s debounce each):
Redis keys (same as advoware-watcher writes to):
advoware:pending_aktennummern Sorted Set { aktennummer → timestamp }
advoware:processing_aktennummern Set (tracks active syncs)
advoware:pending_aktennummern written by Windows Advoware Watcher
{ aktennummer → timestamp }
akte:pending_entity_ids written by EspoCRM webhook
{ akte_id → timestamp }
Eligibility check (either flag triggers a sync):
syncSchalter == True AND aktivierungsstatus in valid list → Advoware sync
aiAktivierungsstatus in valid list → xAI sync
Eligibility (either flag triggers sync):
syncSchalter AND aktivierungsstatus in valid list → Advoware sync
aiAktivierungsstatus in valid list → xAI sync
"""
from motia import FlowContext, cron
@@ -25,9 +24,15 @@ config = {
"enqueues": ["akte.sync"],
}
PENDING_KEY = "advoware:pending_aktennummern"
PROCESSING_KEY = "advoware:processing_aktennummern"
DEBOUNCE_SECS = 10
# Queue 1: written by Windows Advoware Watcher (keyed by Aktennummer)
PENDING_ADVO_KEY = "advoware:pending_aktennummern"
PROCESSING_ADVO_KEY = "advoware:processing_aktennummern"
# Queue 2: written by EspoCRM webhook (keyed by entity ID)
PENDING_ID_KEY = "akte:pending_entity_ids"
PROCESSING_ID_KEY = "akte:processing_entity_ids"
DEBOUNCE_SECS = 10
VALID_ADVOWARE_STATUSES = {'import', 'neu', 'new', 'aktiv', 'active'}
VALID_AI_STATUSES = {'new', 'neu', 'aktiv', 'active'}
@@ -50,86 +55,111 @@ async def handler(input_data: None, ctx: FlowContext) -> None:
espocrm = EspoCRMAPI(ctx)
cutoff = time.time() - DEBOUNCE_SECS
pending_count = redis_client.zcard(PENDING_KEY)
processing_count = redis_client.scard(PROCESSING_KEY)
ctx.logger.info(f" Pending : {pending_count}")
ctx.logger.info(f" Processing : {processing_count}")
advo_pending = redis_client.zcard(PENDING_ADVO_KEY)
id_pending = redis_client.zcard(PENDING_ID_KEY)
ctx.logger.info(f" Pending (aktennr) : {advo_pending}")
ctx.logger.info(f" Pending (akte_id) : {id_pending}")
# Pull oldest entry that has passed the debounce window
old_entries = redis_client.zrangebyscore(PENDING_KEY, min=0, max=cutoff, start=0, num=1)
processed = False
if not old_entries:
if pending_count > 0:
ctx.logger.info(f"⏸️ {pending_count} pending all too recent (< {DEBOUNCE_SECS}s)")
# ── Queue 1: Advoware Watcher (by Aktennummer) ─────────────────────
advo_entries = redis_client.zrangebyscore(PENDING_ADVO_KEY, min=0, max=cutoff, start=0, num=1)
if advo_entries:
aktennr = advo_entries[0]
if isinstance(aktennr, bytes):
aktennr = aktennr.decode()
score = redis_client.zscore(PENDING_ADVO_KEY, aktennr) or 0
age = time.time() - score
redis_client.zrem(PENDING_ADVO_KEY, aktennr)
redis_client.sadd(PROCESSING_ADVO_KEY, aktennr)
ctx.logger.info(f"📋 Aktennummer: {aktennr} (age={age:.1f}s)")
processed = True
try:
result = await espocrm.list_entities(
'CAkten',
where=[{'type': 'equals', 'attribute': 'aktennummer', 'value': int(aktennr)}],
max_size=1,
)
if not result or not result.get('list'):
ctx.logger.warn(f"⚠️ No CAkten found for aktennummer={aktennr} removing")
redis_client.srem(PROCESSING_ADVO_KEY, aktennr)
else:
akte = result['list'][0]
await _emit_if_eligible(akte, aktennr, ctx)
redis_client.srem(PROCESSING_ADVO_KEY, aktennr)
except Exception as e:
ctx.logger.error(f"❌ Error (aktennr queue) {aktennr}: {e}")
redis_client.zadd(PENDING_ADVO_KEY, {aktennr: time.time()})
redis_client.srem(PROCESSING_ADVO_KEY, aktennr)
raise
# ── Queue 2: EspoCRM Webhook (by Entity ID) ────────────────────────
id_entries = redis_client.zrangebyscore(PENDING_ID_KEY, min=0, max=cutoff, start=0, num=1)
if id_entries:
akte_id = id_entries[0]
if isinstance(akte_id, bytes):
akte_id = akte_id.decode()
score = redis_client.zscore(PENDING_ID_KEY, akte_id) or 0
age = time.time() - score
redis_client.zrem(PENDING_ID_KEY, akte_id)
redis_client.sadd(PROCESSING_ID_KEY, akte_id)
ctx.logger.info(f"📋 Entity ID: {akte_id} (age={age:.1f}s)")
processed = True
try:
akte = await espocrm.get_entity('CAkten', akte_id)
if not akte:
ctx.logger.warn(f"⚠️ No CAkten found for id={akte_id} removing")
redis_client.srem(PROCESSING_ID_KEY, akte_id)
else:
await _emit_if_eligible(akte, None, ctx)
redis_client.srem(PROCESSING_ID_KEY, akte_id)
except Exception as e:
ctx.logger.error(f"❌ Error (entity-id queue) {akte_id}: {e}")
redis_client.zadd(PENDING_ID_KEY, {akte_id: time.time()})
redis_client.srem(PROCESSING_ID_KEY, akte_id)
raise
if not processed:
if advo_pending > 0 or id_pending > 0:
ctx.logger.info(f"⏸️ Entries pending but all too recent (< {DEBOUNCE_SECS}s)")
else:
ctx.logger.info("Queue empty")
ctx.logger.info("=" * 60)
ctx.logger.info("Both queues empty")
ctx.logger.info("=" * 60)
async def _emit_if_eligible(akte: dict, aktennr, ctx: FlowContext) -> None:
"""Check eligibility and emit akte.sync if applicable."""
akte_id = akte['id']
# Prefer aktennr from argument; fall back to entity field
aktennummer = aktennr or akte.get('aktennummer')
sync_schalter = akte.get('syncSchalter', False)
aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower()
ai_status = str(akte.get('aiAktivierungsstatus') or '').lower()
advoware_eligible = bool(aktennummer) and sync_schalter and aktivierungsstatus in VALID_ADVOWARE_STATUSES
xai_eligible = ai_status in VALID_AI_STATUSES
ctx.logger.info(f" akte_id : {akte_id}")
ctx.logger.info(f" aktennummer : {aktennummer or ''}")
ctx.logger.info(f" aktivierungsstatus : {aktivierungsstatus} ({'' if advoware_eligible else '⏭️'})")
ctx.logger.info(f" aiAktivierungsstatus : {ai_status} ({'' if xai_eligible else '⏭️'})")
if not advoware_eligible and not xai_eligible:
ctx.logger.warn(f"⚠️ Akte {akte_id} not eligible for any sync")
return
aktennr = old_entries[0]
if isinstance(aktennr, bytes):
aktennr = aktennr.decode()
score = redis_client.zscore(PENDING_KEY, aktennr) or 0
age = time.time() - score
redis_client.zrem(PENDING_KEY, aktennr)
redis_client.sadd(PROCESSING_KEY, aktennr)
ctx.logger.info(f"📋 Aktennummer: {aktennr} (age={age:.1f}s)")
try:
# ── Lookup in EspoCRM ──────────────────────────────────────
result = await espocrm.list_entities(
'CAkten',
where=[{
'type': 'equals',
'attribute': 'aktennummer',
'value': aktennr,
}],
max_size=1,
)
if not result or not result.get('list'):
ctx.logger.warn(f"⚠️ No CAkten found for aktennummer={aktennr} removing")
redis_client.srem(PROCESSING_KEY, aktennr)
ctx.logger.info("=" * 60)
return
akte = result['list'][0]
akte_id = akte['id']
sync_schalter = akte.get('syncSchalter', False)
aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower()
ai_status = str(akte.get('aiAktivierungsstatus') or '').lower()
advoware_eligible = sync_schalter and aktivierungsstatus in VALID_ADVOWARE_STATUSES
xai_eligible = ai_status in VALID_AI_STATUSES
ctx.logger.info(f" Akte ID : {akte_id}")
ctx.logger.info(f" aktivierungsstatus : {aktivierungsstatus} ({'' if advoware_eligible else '⏭️'})")
ctx.logger.info(f" aiAktivierungsstatus : {ai_status} ({'' if xai_eligible else '⏭️'})")
if not advoware_eligible and not xai_eligible:
ctx.logger.warn(f"⚠️ Akte {aktennr} not eligible for any sync removing")
redis_client.srem(PROCESSING_KEY, aktennr)
ctx.logger.info("=" * 60)
return
# ── Emit sync event ────────────────────────────────────────
await ctx.enqueue({
'topic': 'akte.sync',
'data': {
'aktennummer': aktennr,
'akte_id': akte_id,
},
})
ctx.logger.info(f"📤 akte.sync emitted (akte_id={akte_id})")
except Exception as e:
ctx.logger.error(f"❌ Error processing {aktennr}: {e}")
# Requeue for retry
redis_client.zadd(PENDING_KEY, {aktennr: time.time()})
redis_client.srem(PROCESSING_KEY, aktennr)
raise
finally:
ctx.logger.info("=" * 60)
await ctx.enqueue({
'topic': 'akte.sync',
'data': {
'akte_id': akte_id,
'aktennummer': aktennummer, # may be None for xAI-only Akten
},
})
ctx.logger.info(f"📤 akte.sync emitted (akte_id={akte_id}, aktennummer={aktennummer or ''})")

View File

@@ -54,8 +54,8 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
lock_key = f"akte_sync:{akte_id}"
lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=1800)
if not lock_acquired:
ctx.logger.warn(f"⏸️ Lock busy for Akte {aktennummer} requeueing")
raise RuntimeError(f"Lock busy for {aktennummer}")
ctx.logger.warn(f"⏸️ Lock busy for Akte {akte_id} requeueing")
raise RuntimeError(f"Lock busy for akte_id={akte_id}")
espocrm = EspoCRMAPI(ctx)
@@ -64,9 +64,13 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
akte = await espocrm.get_entity('CAkten', akte_id)
if not akte:
ctx.logger.error(f"❌ Akte {akte_id} not found in EspoCRM")
redis_client.srem("akte:processing", aktennummer)
return
# aktennummer can come from the event payload OR from the entity
# (Akten without Advoware have no aktennummer)
if not aktennummer:
aktennummer = akte.get('aktennummer')
sync_schalter = akte.get('syncSchalter', False)
aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower()
ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower()
@@ -76,7 +80,8 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
ctx.logger.info(f" aktivierungsstatus : {aktivierungsstatus}")
ctx.logger.info(f" aiAktivierungsstatus : {ai_aktivierungsstatus}")
advoware_enabled = sync_schalter and aktivierungsstatus in ('import', 'neu', 'new', 'aktiv', 'active')
# Advoware sync requires an aktennummer (Akten without Advoware won't have one)
advoware_enabled = bool(aktennummer) and sync_schalter and aktivierungsstatus in ('import', 'neu', 'new', 'aktiv', 'active')
xai_enabled = ai_aktivierungsstatus in ('new', 'neu', 'aktiv', 'active')
ctx.logger.info(f" Advoware sync : {'✅ ON' if advoware_enabled else '⏭️ OFF'}")
@@ -84,7 +89,6 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
if not advoware_enabled and not xai_enabled:
ctx.logger.info("⏭️ Both syncs disabled nothing to do")
redis_client.srem("akte:processing", aktennummer)
return
# ── ADVOWARE SYNC ──────────────────────────────────────────────────
@@ -102,12 +106,23 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
if advoware_enabled:
final_update['syncStatus'] = 'synced'
final_update['lastSync'] = now
# 'import' = erster Sync → danach auf 'aktiv' setzen
if aktivierungsstatus == 'import':
final_update['aktivierungsstatus'] = 'aktiv'
ctx.logger.info("🔄 aktivierungsstatus: import → aktiv")
if xai_enabled:
final_update['aiSyncStatus'] = 'synced'
final_update['aiLastSync'] = now
# 'new' = Collection wurde gerade erstmalig angelegt → auf 'aktiv' setzen
if ai_aktivierungsstatus == 'new':
final_update['aiAktivierungsstatus'] = 'aktiv'
ctx.logger.info("🔄 aiAktivierungsstatus: new → aktiv")
await espocrm.update_entity('CAkten', akte_id, final_update)
redis_client.srem("akte:processing", aktennummer)
# Clean up processing sets (both queues may have triggered this sync)
if aktennummer:
redis_client.srem("advoware:processing_aktennummern", aktennummer)
redis_client.srem("akte:processing_entity_ids", akte_id)
ctx.logger.info("=" * 80)
ctx.logger.info("✅ AKTE SYNC COMPLETE")
@@ -120,9 +135,12 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
import traceback
ctx.logger.error(traceback.format_exc())
# Requeue for retry
# Requeue for retry (into the appropriate queue(s))
import time
redis_client.zadd("akte:pending", {aktennummer: time.time()})
now_ts = time.time()
if aktennummer:
redis_client.zadd("advoware:pending_aktennummern", {aktennummer: now_ts})
redis_client.zadd("akte:pending_entity_ids", {akte_id: now_ts})
try:
await espocrm.update_entity('CAkten', akte_id, {
@@ -254,8 +272,8 @@ async def _run_advoware_sync(
'name': filename,
'dokumentId': attachment.get('id'),
'hnr': history_entry.get('hNr') if history_entry else None,
'advowareArt': history_entry.get('art', 'Schreiben') if history_entry else 'Schreiben',
'advowareBemerkung': history_entry.get('text', '') if history_entry else '',
'advowareArt': (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100] if history_entry else 'Schreiben',
'advowareBemerkung': (history_entry.get('text', '') or '')[:255] if history_entry else '',
'dateipfad': windows_file.get('path', ''),
'blake3hash': blake3_hash,
'syncedHash': blake3_hash,
@@ -302,8 +320,8 @@ async def _run_advoware_sync(
}
if history_entry:
update_data['hnr'] = history_entry.get('hNr')
update_data['advowareArt'] = history_entry.get('art', 'Schreiben')
update_data['advowareBemerkung'] = history_entry.get('text', '')
update_data['advowareArt'] = (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100]
update_data['advowareBemerkung'] = (history_entry.get('text', '') or '')[:255]
await espocrm.update_entity('CDokumente', espo_doc['id'], update_data)
results['updated'] += 1
@@ -324,8 +342,15 @@ async def _run_advoware_sync(
elif action.action == 'DELETE':
if espo_doc:
await espocrm.delete_entity('CDokumente', espo_doc['id'])
results['deleted'] += 1
# Only delete if the HNR is genuinely absent from Advoware History
# (not just absent from Windows avoids deleting docs whose file
# is temporarily unavailable on the Windows share)
if hnr in history_by_hnr:
ctx.logger.warn(f" ⚠️ SKIP DELETE hnr={hnr}: still in Advoware History, only missing from Windows")
results['skipped'] += 1
else:
await espocrm.delete_entity('CDokumente', espo_doc['id'])
results['deleted'] += 1
except Exception as e:
ctx.logger.error(f" ❌ Error for hnr {hnr} ({filename}): {e}")

View File

@@ -0,0 +1 @@
# Akte webhook steps

View File

@@ -0,0 +1,68 @@
"""
Akte Sync - EspoCRM Webhook
Receives EspoCRM webhooks for CAkten (create / update / delete).
Writes the entity ID to the Redis queue `akte:pending_entity_ids`
with a 10-second debounce — the cron poller handles the rest.
Route: POST /akte/webhook/update
Payload: { "id": "..." } or [{ "id": "..." }, ...]
"""
import json
import time
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
# Step configuration for this module (presumably consumed by the motia
# framework — confirm against the framework docs).
config = {
"name": "Akte Webhook - EspoCRM",
"description": "Empfängt EspoCRM-Webhooks für CAkten und queued Entity-IDs für den Sync",
"flows": ["akte-sync"],
# HTTP trigger: POST /akte/webhook/update
"triggers": [http("POST", "/akte/webhook/update")],
# No events are enqueued here; the handler only writes to Redis.
"enqueues": [],
}
# Redis sorted-set key the handler writes to via zadd():
# member = entity ID, score = time.time() at receipt.
PENDING_KEY = "akte:pending_entity_ids"
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Queue CAkten entity IDs received from an EspoCRM webhook.

    The request body may be a single object ``{"id": "..."}`` or a list of
    such objects. Every usable ID is written to the Redis sorted set
    ``PENDING_KEY`` with the current timestamp as its score.

    Only non-empty string IDs are accepted; anything else (missing, ``None``,
    empty, or a non-string JSON value) is ignored so that malformed webhook
    payloads cannot crash the handler or put junk members into the queue.

    Returns:
        ApiResponse with status
        200 and ``{"queued": <count>}`` when at least one ID was queued,
        400 when the payload contains no usable ID,
        503 when no Redis client is available,
        500 on any unexpected error (details are logged, not returned).
    """
    try:
        payload = request.body or {}

        ctx.logger.info("=" * 60)
        ctx.logger.info("📥 AKTE WEBHOOK")
        # default=str keeps this log line from raising on a body that is not
        # JSON-serializable; the output is only a truncated debug preview.
        ctx.logger.info(f" Payload: {json.dumps(payload, ensure_ascii=False, default=str)[:200]}")

        # ── Collect entity IDs ─────────────────────────────────────
        # Treat a single object like a one-element list; non-dict entries
        # (and non-dict/non-list payloads) simply contribute nothing.
        items = payload if isinstance(payload, list) else [payload]
        entity_ids: set[str] = set()
        for item in items:
            if not isinstance(item, dict):
                continue
            entity_id = item.get('id')
            if isinstance(entity_id, str) and entity_id:
                entity_ids.add(entity_id)

        if not entity_ids:
            ctx.logger.warn("⚠️ No entity IDs in payload")
            return ApiResponse(status_code=400, body={"error": "No entity ID found in payload"})

        # ── Push to Redis with current timestamp (debounce in cron) ─
        from services.redis_client import get_redis_client
        redis_client = get_redis_client(strict=False)
        if not redis_client:
            ctx.logger.error("❌ Redis unavailable")
            return ApiResponse(status_code=503, body={"error": "Service unavailable"})

        ts = time.time()
        redis_client.zadd(PENDING_KEY, dict.fromkeys(entity_ids, ts))

        ctx.logger.info(f"✅ Queued {len(entity_ids)} entity ID(s): {entity_ids}")
        ctx.logger.info("=" * 60)
        return ApiResponse(status_code=200, body={"queued": len(entity_ids)})
    except Exception as e:
        # Top-level boundary of the HTTP step: log the detail, but do not
        # echo internal exception text back to the remote caller.
        ctx.logger.error(f"❌ Webhook error: {e}")
        return ApiResponse(status_code=500, body={"error": "Internal server error"})