Files
motia-iii/src/steps/crm/akte/akte_sync_cron_step.py
bsiggel 0c97d97726 feat(webhooks): Add webhook handlers for Beteiligte and Document entities
- Implemented create, update, and delete webhook handlers for Beteiligte.
- Implemented create, update, and delete webhook handlers for Document entities.
- Added logging and error handling for each webhook handler.
- Created a universal step for generating document previews.
- Ensured payload validation and entity ID extraction for batch processing.
2026-03-26 10:07:42 +00:00

166 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Akte Sync - Cron Poller
Polls two Redis Sorted Sets every 10 seconds (10 s debounce each):
advoware:pending_aktennummern written by Windows Advoware Watcher
{ aktennummer → timestamp }
akte:pending_entity_ids written by EspoCRM webhook
{ akte_id → timestamp }
Eligibility (either flag triggers sync):
syncSchalter AND aktivierungsstatus in valid list → Advoware sync
aiAktivierungsstatus in valid list → xAI sync
"""
from motia import FlowContext, cron
# Step configuration: a cron-triggered step in the "akte-sync" flow that may
# enqueue "akte.sync" events.
config = {
    "name": "Akte Sync - Cron Poller",
    "description": "Poll Redis for pending Aktennummern and emit akte.sync events (10 s debounce)",
    "flows": ["akte-sync"],
    # Six-field cron expression with a */10 step in the first field
    # (described in the module docstring as "every 10 seconds").
    "triggers": [cron("*/10 * * * * *")],
    "enqueues": ["akte.sync"],
}
# --- Queue 1 -----------------------------------------------------------------
# Filled by the Windows Advoware Watcher; members are Aktennummern.
PENDING_ADVO_KEY = "advoware:pending_aktennummern"
PROCESSING_ADVO_KEY = "advoware:processing_aktennummern"

# --- Queue 2 -----------------------------------------------------------------
# Filled by the EspoCRM webhook; members are entity IDs.
PENDING_ID_KEY = "akte:pending_entity_ids"
PROCESSING_ID_KEY = "akte:processing_entity_ids"

# An entry is only picked up once its score (a timestamp) is at least this
# many seconds in the past.
DEBOUNCE_SECS = 10

# Lower-cased status values that make an Akte eligible for each sync target.
VALID_ADVOWARE_STATUSES = {"import", "neu", "new", "aktiv", "active"}
VALID_AI_STATUSES = {"new", "neu", "aktiv", "active"}
async def handler(input_data: None, ctx: FlowContext) -> None:
    """Poll both pending queues once and emit ``akte.sync`` for due entries.

    Per run, at most one member is taken from each sorted set: the
    lowest-scored one whose score is at least ``DEBOUNCE_SECS`` in the past.
    The member is moved from the pending sorted set into the matching
    processing set, the corresponding ``CAkten`` entity is loaded from
    EspoCRM, and ``_emit_if_eligible`` decides whether an event is enqueued.
    The member is removed from the processing set afterwards.

    Args:
        input_data: Unused (annotated as ``None``).
        ctx: Flow context used for logging and enqueueing.

    Raises:
        Exception: Any error raised while handling a claimed member is
            re-raised after the member has been put back into its pending
            set with a fresh timestamp. An error in queue 1 therefore ends
            the run before queue 2 is polled.
    """
    import time

    from services.redis_client import get_redis_client
    from services.espocrm import EspoCRMAPI

    ctx.logger.info("=" * 60)
    ctx.logger.info("⏰ AKTE CRON POLLER")

    redis_client = get_redis_client(strict=False)
    if not redis_client:
        # The factory yields a falsy value when no client can be provided.
        ctx.logger.error("❌ Redis unavailable")
        ctx.logger.info("=" * 60)
        return

    espocrm = EspoCRMAPI(ctx)
    # Only members whose score (timestamp) is <= cutoff are old enough.
    cutoff = time.time() - DEBOUNCE_SECS

    advo_pending = redis_client.zcard(PENDING_ADVO_KEY)
    id_pending = redis_client.zcard(PENDING_ID_KEY)
    ctx.logger.info(f" Pending (aktennr) : {advo_pending}")
    ctx.logger.info(f" Pending (akte_id) : {id_pending}")

    processed = False

    # ── Queue 1: Advoware Watcher (by Aktennummer) ─────────────────────
    claimed = _claim_oldest_entry(redis_client, PENDING_ADVO_KEY, PROCESSING_ADVO_KEY, cutoff)
    if claimed:
        aktennr, age = claimed
        ctx.logger.info(f"📋 Aktennummer: {aktennr} (age={age:.1f}s)")
        processed = True
        try:
            aktennr_value = _parse_aktennummer(aktennr)
            if aktennr_value is None:
                # A member that is not an integer can never match the lookup
                # below. Drop it instead of letting int() raise, which would
                # re-queue it and fail the step again on every later run.
                ctx.logger.warn(f"⚠️ Non-numeric aktennummer={aktennr!r} in queue, removing")
            else:
                result = await espocrm.list_entities(
                    'CAkten',
                    where=[{'type': 'equals', 'attribute': 'aktennummer', 'value': aktennr_value}],
                    max_size=1,
                )
                if not result or not result.get('list'):
                    ctx.logger.warn(f"⚠️ No CAkten found for aktennummer={aktennr} removing")
                else:
                    await _emit_if_eligible(result['list'][0], aktennr, ctx)
            redis_client.srem(PROCESSING_ADVO_KEY, aktennr)
        except Exception as e:
            ctx.logger.error(f"❌ Error (aktennr queue) {aktennr}: {e}")
            # Put the member back with a fresh timestamp so a later run
            # retries it after another debounce period.
            redis_client.zadd(PENDING_ADVO_KEY, {aktennr: time.time()})
            redis_client.srem(PROCESSING_ADVO_KEY, aktennr)
            raise

    # ── Queue 2: EspoCRM Webhook (by Entity ID) ────────────────────────
    claimed = _claim_oldest_entry(redis_client, PENDING_ID_KEY, PROCESSING_ID_KEY, cutoff)
    if claimed:
        akte_id, age = claimed
        ctx.logger.info(f"📋 Entity ID: {akte_id} (age={age:.1f}s)")
        processed = True
        try:
            akte = await espocrm.get_entity('CAkten', akte_id)
            if not akte:
                ctx.logger.warn(f"⚠️ No CAkten found for id={akte_id} removing")
            else:
                # No queue-supplied Aktennummer here; the entity field is used.
                await _emit_if_eligible(akte, None, ctx)
            redis_client.srem(PROCESSING_ID_KEY, akte_id)
        except Exception as e:
            ctx.logger.error(f"❌ Error (entity-id queue) {akte_id}: {e}")
            redis_client.zadd(PENDING_ID_KEY, {akte_id: time.time()})
            redis_client.srem(PROCESSING_ID_KEY, akte_id)
            raise

    if not processed:
        if advo_pending > 0 or id_pending > 0:
            ctx.logger.info(f"⏸️ Entries pending but all too recent (< {DEBOUNCE_SECS}s)")
        else:
            ctx.logger.info("✓ Both queues empty")
    ctx.logger.info("=" * 60)


def _claim_oldest_entry(redis_client, pending_key, processing_key, cutoff):
    """Move the lowest-scored debounced member from pending to processing.

    The member and its score are read in a single ``ZRANGEBYSCORE ...
    WITHSCORES`` call, so the reported age always belongs to the member that
    was actually returned.

    Args:
        redis_client: Redis client exposing ``zrangebyscore``, ``zrem`` and
            ``sadd``.
        pending_key: Sorted set holding ``{member → timestamp}``.
        processing_key: Set that receives the claimed member.
        cutoff: Highest score (timestamp) that counts as old enough.

    Returns:
        ``(member, age_seconds)`` with ``member`` decoded to ``str``, or
        ``None`` when no member has a score between 0 and ``cutoff``.
    """
    import time

    entries = redis_client.zrangebyscore(
        pending_key, min=0, max=cutoff, start=0, num=1, withscores=True,
    )
    if not entries:
        return None

    member, score = entries[0]
    if isinstance(member, bytes):
        member = member.decode()
    age = time.time() - score

    redis_client.zrem(pending_key, member)
    redis_client.sadd(processing_key, member)
    return member, age


def _parse_aktennummer(raw):
    """Return ``raw`` converted with ``int()``, or ``None`` if it is not an integer."""
    try:
        return int(raw)
    except ValueError:
        return None
async def _emit_if_eligible(akte: dict, aktennr, ctx: FlowContext) -> None:
    """Enqueue ``akte.sync`` when the Akte qualifies for at least one sync.

    Two independent checks are made:

    * Advoware: an Aktennummer is known, ``syncSchalter`` is truthy and the
      lower-cased ``aktivierungsstatus`` is in ``VALID_ADVOWARE_STATUSES``.
    * xAI: the lower-cased ``aiAktivierungsstatus`` is in
      ``VALID_AI_STATUSES``.

    Args:
        akte: CAkten entity as a dict; must contain ``'id'``.
        aktennr: Aktennummer taken from the queue, or a falsy value to fall
            back to the entity's own ``aktennummer`` field.
        ctx: Flow context used for logging and enqueueing.
    """
    entity_id = akte['id']
    # The queue-supplied number wins; otherwise use what the entity carries.
    number = aktennr or akte.get('aktennummer')
    shown_number = number or ''

    advoware_status = str(akte.get('aktivierungsstatus') or '').lower()
    xai_status = str(akte.get('aiAktivierungsstatus') or '').lower()

    advoware_ok = (
        bool(number)
        and akte.get('syncSchalter', False)
        and advoware_status in VALID_ADVOWARE_STATUSES
    )
    xai_ok = xai_status in VALID_AI_STATUSES

    # Marker shown in parentheses behind each status in the log output.
    advoware_mark = '' if advoware_ok else '⏭️'
    xai_mark = '' if xai_ok else '⏭️'

    log = ctx.logger
    log.info(f" akte_id : {entity_id}")
    log.info(f" aktennummer : {shown_number}")
    log.info(f" aktivierungsstatus : {advoware_status} ({advoware_mark})")
    log.info(f" aiAktivierungsstatus : {xai_status} ({xai_mark})")

    if not (advoware_ok or xai_ok):
        log.warn(f"⚠️ Akte {entity_id} not eligible for any sync")
        return

    payload = {
        'akte_id': entity_id,
        'aktennummer': number,  # may be None for xAI-only Akten
    }
    await ctx.enqueue({'topic': 'akte.sync', 'data': payload})
    log.info(f"📤 akte.sync emitted (akte_id={entity_id}, aktennummer={shown_number})")