diff --git a/src/steps/crm/akte/akte_sync_cron_step.py b/src/steps/crm/akte/akte_sync_cron_step.py index 61970f3..4b5e6d8 100644 --- a/src/steps/crm/akte/akte_sync_cron_step.py +++ b/src/steps/crm/akte/akte_sync_cron_step.py @@ -1,16 +1,17 @@ """ Akte Sync - Cron Poller -Polls two Redis Sorted Sets every 10 seconds (10 s debounce each): +Polls the Advoware Watcher Redis Sorted Set every 10 seconds (10 s debounce): advoware:pending_aktennummern – written by Windows Advoware Watcher { aktennummer → timestamp } - akte:pending_entity_ids – written by EspoCRM webhook - { akte_id → timestamp } Eligibility (either flag triggers sync): syncSchalter AND aktivierungsstatus in valid list → Advoware sync aiAktivierungsstatus in valid list → xAI sync + +EspoCRM webhooks emit akte.sync directly (no queue needed). +Failed akte.sync events are retried by Motia automatically. """ from motia import FlowContext, cron @@ -28,12 +29,8 @@ config = { PENDING_ADVO_KEY = "advoware:pending_aktennummern" PROCESSING_ADVO_KEY = "advoware:processing_aktennummern" -# Queue 2: retry queue for failed akte.sync events (EspoCRM webhooks now emit directly) -PENDING_ID_KEY = "akte:pending_entity_ids" -PROCESSING_ID_KEY = "akte:processing_entity_ids" - DEBOUNCE_SECS = 10 -BATCH_SIZE = 5 # max items to process per queue per cron tick +BATCH_SIZE = 5 # max items to process per cron tick VALID_ADVOWARE_STATUSES = frozenset({'import', 'neu', 'new', 'aktiv', 'active'}) VALID_AI_STATUSES = frozenset({'new', 'neu', 'aktiv', 'active'}) @@ -57,13 +54,11 @@ async def handler(input_data: None, ctx: FlowContext) -> None: cutoff = time.time() - DEBOUNCE_SECS advo_pending = redis_client.zcard(PENDING_ADVO_KEY) - id_pending = redis_client.zcard(PENDING_ID_KEY) ctx.logger.info(f" Pending (aktennr) : {advo_pending}") - ctx.logger.info(f" Pending (akte_id) : {id_pending}") processed_count = 0 - # ── Queue 1: Advoware Watcher (by Aktennummer) ───────────────────── + # ── Queue: Advoware Watcher (by Aktennummer) ─────────────────────── advo_entries = redis_client.zrangebyscore(PENDING_ADVO_KEY, min=0, max=cutoff, start=0, num=BATCH_SIZE) for raw in advo_entries: aktennr = raw.decode() if isinstance(raw, bytes) else raw @@ -90,33 +85,11 @@ async def handler(input_data: None, ctx: FlowContext) -> None: finally: redis_client.srem(PROCESSING_ADVO_KEY, aktennr) - # ── Queue 2: Retry queue for failed syncs ────────────────────────── - id_entries = redis_client.zrangebyscore(PENDING_ID_KEY, min=0, max=cutoff, start=0, num=BATCH_SIZE) - for raw in id_entries: - akte_id = raw.decode() if isinstance(raw, bytes) else raw - score = redis_client.zscore(PENDING_ID_KEY, akte_id) or 0 - age = time.time() - score - redis_client.zrem(PENDING_ID_KEY, akte_id) - redis_client.sadd(PROCESSING_ID_KEY, akte_id) - processed_count += 1 - ctx.logger.info(f"📋 Entity ID (retry): {akte_id} (age={age:.1f}s)") - try: - akte = await espocrm.get_entity('CAkten', akte_id) - if not akte: - ctx.logger.warn(f"⚠️ No CAkten found for id={akte_id} – removing") - else: - await _emit_if_eligible(akte, None, ctx) - except Exception as e: - ctx.logger.error(f"❌ Error (retry queue) {akte_id}: {e}") - redis_client.zadd(PENDING_ID_KEY, {akte_id: time.time()}) - finally: - redis_client.srem(PROCESSING_ID_KEY, akte_id) - if not processed_count: - if advo_pending > 0 or id_pending > 0: + if advo_pending > 0: ctx.logger.info(f"⏸️ Entries pending but all too recent (< {DEBOUNCE_SECS}s)") else: - ctx.logger.info("✓ Both queues empty") + ctx.logger.info("✓ Queue empty") else: ctx.logger.info(f"✓ Processed {processed_count} item(s)") diff --git a/src/steps/crm/akte/akte_sync_event_step.py b/src/steps/crm/akte/akte_sync_event_step.py index ac1b81a..d323b81 100644 --- a/src/steps/crm/akte/akte_sync_event_step.py +++ b/src/steps/crm/akte/akte_sync_event_step.py @@ -127,10 +127,9 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: ctx.logger.info("🔄 aiAktivierungsstatus: new → aktiv") await espocrm.update_entity('CAkten', akte_id, final_update) - # Clean up processing sets (both queues may have triggered this sync) + # Clean up processing set (Advoware Watcher queue) if aktennummer: redis_client.srem("advoware:processing_aktennummern", aktennummer) - redis_client.srem("akte:processing_entity_ids", akte_id) ctx.logger.info("=" * 80) ctx.logger.info("✅ AKTE SYNC COMPLETE") @@ -143,12 +142,10 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: import traceback ctx.logger.error(traceback.format_exc()) - # Requeue for retry (into the appropriate queue(s)) + # Requeue Advoware aktennummer for retry (Motia retries the akte.sync event itself) import time - now_ts = time.time() if aktennummer: - redis_client.zadd("advoware:pending_aktennummern", {aktennummer: now_ts}) - redis_client.zadd("akte:pending_entity_ids", {akte_id: now_ts}) + redis_client.zadd("advoware:pending_aktennummern", {aktennummer: time.time()}) try: await espocrm.update_entity('CAkten', akte_id, {