feat(sync): Update Akte sync process to remove unused entity ID queue and streamline processing
This commit is contained in:
@@ -1,16 +1,17 @@
|
||||
"""
|
||||
Akte Sync - Cron Poller
|
||||
|
||||
Polls two Redis Sorted Sets every 10 seconds (10 s debounce each):
|
||||
Polls the Advoware Watcher Redis Sorted Set every 10 seconds (10 s debounce):
|
||||
|
||||
advoware:pending_aktennummern – written by Windows Advoware Watcher
|
||||
{ aktennummer → timestamp }
|
||||
akte:pending_entity_ids – written by EspoCRM webhook
|
||||
{ akte_id → timestamp }
|
||||
|
||||
Eligibility (either flag triggers sync):
|
||||
syncSchalter AND aktivierungsstatus in valid list → Advoware sync
|
||||
aiAktivierungsstatus in valid list → xAI sync
|
||||
|
||||
EspoCRM webhooks emit akte.sync directly (no queue needed).
|
||||
Failed akte.sync events are retried by Motia automatically.
|
||||
"""
|
||||
|
||||
from motia import FlowContext, cron
|
||||
@@ -28,12 +29,8 @@ config = {
|
||||
PENDING_ADVO_KEY = "advoware:pending_aktennummern"
|
||||
PROCESSING_ADVO_KEY = "advoware:processing_aktennummern"
|
||||
|
||||
# Queue 2: retry queue for failed akte.sync events (EspoCRM webhooks now emit directly)
|
||||
PENDING_ID_KEY = "akte:pending_entity_ids"
|
||||
PROCESSING_ID_KEY = "akte:processing_entity_ids"
|
||||
|
||||
DEBOUNCE_SECS = 10
|
||||
BATCH_SIZE = 5 # max items to process per queue per cron tick
|
||||
BATCH_SIZE = 5 # max items to process per cron tick
|
||||
|
||||
VALID_ADVOWARE_STATUSES = frozenset({'import', 'neu', 'new', 'aktiv', 'active'})
|
||||
VALID_AI_STATUSES = frozenset({'new', 'neu', 'aktiv', 'active'})
|
||||
@@ -57,13 +54,11 @@ async def handler(input_data: None, ctx: FlowContext) -> None:
|
||||
cutoff = time.time() - DEBOUNCE_SECS
|
||||
|
||||
advo_pending = redis_client.zcard(PENDING_ADVO_KEY)
|
||||
id_pending = redis_client.zcard(PENDING_ID_KEY)
|
||||
ctx.logger.info(f" Pending (aktennr) : {advo_pending}")
|
||||
ctx.logger.info(f" Pending (akte_id) : {id_pending}")
|
||||
|
||||
processed_count = 0
|
||||
|
||||
# ── Queue 1: Advoware Watcher (by Aktennummer) ─────────────────────
|
||||
# ── Queue: Advoware Watcher (by Aktennummer) ───────────────────────
|
||||
advo_entries = redis_client.zrangebyscore(PENDING_ADVO_KEY, min=0, max=cutoff, start=0, num=BATCH_SIZE)
|
||||
for raw in advo_entries:
|
||||
aktennr = raw.decode() if isinstance(raw, bytes) else raw
|
||||
@@ -90,33 +85,11 @@ async def handler(input_data: None, ctx: FlowContext) -> None:
|
||||
finally:
|
||||
redis_client.srem(PROCESSING_ADVO_KEY, aktennr)
|
||||
|
||||
# ── Queue 2: Retry queue for failed syncs ──────────────────────────
|
||||
id_entries = redis_client.zrangebyscore(PENDING_ID_KEY, min=0, max=cutoff, start=0, num=BATCH_SIZE)
|
||||
for raw in id_entries:
|
||||
akte_id = raw.decode() if isinstance(raw, bytes) else raw
|
||||
score = redis_client.zscore(PENDING_ID_KEY, akte_id) or 0
|
||||
age = time.time() - score
|
||||
redis_client.zrem(PENDING_ID_KEY, akte_id)
|
||||
redis_client.sadd(PROCESSING_ID_KEY, akte_id)
|
||||
processed_count += 1
|
||||
ctx.logger.info(f"📋 Entity ID (retry): {akte_id} (age={age:.1f}s)")
|
||||
try:
|
||||
akte = await espocrm.get_entity('CAkten', akte_id)
|
||||
if not akte:
|
||||
ctx.logger.warn(f"⚠️ No CAkten found for id={akte_id} – removing")
|
||||
else:
|
||||
await _emit_if_eligible(akte, None, ctx)
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ Error (retry queue) {akte_id}: {e}")
|
||||
redis_client.zadd(PENDING_ID_KEY, {akte_id: time.time()})
|
||||
finally:
|
||||
redis_client.srem(PROCESSING_ID_KEY, akte_id)
|
||||
|
||||
if not processed_count:
|
||||
if advo_pending > 0 or id_pending > 0:
|
||||
if advo_pending > 0:
|
||||
ctx.logger.info(f"⏸️ Entries pending but all too recent (< {DEBOUNCE_SECS}s)")
|
||||
else:
|
||||
ctx.logger.info("✓ Both queues empty")
|
||||
ctx.logger.info("✓ Queue empty")
|
||||
else:
|
||||
ctx.logger.info(f"✓ Processed {processed_count} item(s)")
|
||||
|
||||
|
||||
@@ -127,10 +127,9 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
|
||||
ctx.logger.info("🔄 aiAktivierungsstatus: new → aktiv")
|
||||
|
||||
await espocrm.update_entity('CAkten', akte_id, final_update)
|
||||
# Clean up processing sets (both queues may have triggered this sync)
|
||||
# Clean up processing set (Advoware Watcher queue)
|
||||
if aktennummer:
|
||||
redis_client.srem("advoware:processing_aktennummern", aktennummer)
|
||||
redis_client.srem("akte:processing_entity_ids", akte_id)
|
||||
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("✅ AKTE SYNC COMPLETE")
|
||||
@@ -143,12 +142,10 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
|
||||
import traceback
|
||||
ctx.logger.error(traceback.format_exc())
|
||||
|
||||
# Requeue for retry (into the appropriate queue(s))
|
||||
# Requeue Advoware aktennummer for retry (Motia retries the akte.sync event itself)
|
||||
import time
|
||||
now_ts = time.time()
|
||||
if aktennummer:
|
||||
redis_client.zadd("advoware:pending_aktennummern", {aktennummer: now_ts})
|
||||
redis_client.zadd("akte:pending_entity_ids", {akte_id: now_ts})
|
||||
redis_client.zadd("advoware:pending_aktennummern", {aktennummer: time.time()})
|
||||
|
||||
try:
|
||||
await espocrm.update_entity('CAkten', akte_id, {
|
||||
|
||||
Reference in New Issue
Block a user