diff --git a/src/steps/crm/akte/akte_sync_cron_step.py b/src/steps/crm/akte/akte_sync_cron_step.py index e68742c..61970f3 100644 --- a/src/steps/crm/akte/akte_sync_cron_step.py +++ b/src/steps/crm/akte/akte_sync_cron_step.py @@ -28,14 +28,15 @@ config = { PENDING_ADVO_KEY = "advoware:pending_aktennummern" PROCESSING_ADVO_KEY = "advoware:processing_aktennummern" -# Queue 2: written by EspoCRM webhook (keyed by entity ID) +# Queue 2: retry queue for failed akte.sync events (EspoCRM webhooks now emit directly) PENDING_ID_KEY = "akte:pending_entity_ids" PROCESSING_ID_KEY = "akte:processing_entity_ids" DEBOUNCE_SECS = 10 +BATCH_SIZE = 5 # max items to process per queue per cron tick -VALID_ADVOWARE_STATUSES = {'import', 'neu', 'new', 'aktiv', 'active'} -VALID_AI_STATUSES = {'new', 'neu', 'aktiv', 'active'} +VALID_ADVOWARE_STATUSES = frozenset({'import', 'neu', 'new', 'aktiv', 'active'}) +VALID_AI_STATUSES = frozenset({'new', 'neu', 'aktiv', 'active'}) async def handler(input_data: None, ctx: FlowContext) -> None: @@ -60,23 +61,18 @@ async def handler(input_data: None, ctx: FlowContext) -> None: ctx.logger.info(f" Pending (aktennr) : {advo_pending}") ctx.logger.info(f" Pending (akte_id) : {id_pending}") - processed = False + processed_count = 0 # ── Queue 1: Advoware Watcher (by Aktennummer) ───────────────────── - advo_entries = redis_client.zrangebyscore(PENDING_ADVO_KEY, min=0, max=cutoff, start=0, num=1) - if advo_entries: - aktennr = advo_entries[0] - if isinstance(aktennr, bytes): - aktennr = aktennr.decode() - + advo_entries = redis_client.zrangebyscore(PENDING_ADVO_KEY, min=0, max=cutoff, start=0, num=BATCH_SIZE) + for raw in advo_entries: + aktennr = raw.decode() if isinstance(raw, bytes) else raw score = redis_client.zscore(PENDING_ADVO_KEY, aktennr) or 0 age = time.time() - score redis_client.zrem(PENDING_ADVO_KEY, aktennr) redis_client.sadd(PROCESSING_ADVO_KEY, aktennr) - + processed_count += 1 ctx.logger.info(f"📋 Aktennummer: {aktennr} (age={age:.1f}s)") - processed = True - try: result = await espocrm.list_entities( 'CAkten', @@ -85,51 +81,44 @@ async def handler(input_data: None, ctx: FlowContext) -> None: ) if not result or not result.get('list'): ctx.logger.warn(f"⚠️ No CAkten found for aktennummer={aktennr} – removing") - redis_client.srem(PROCESSING_ADVO_KEY, aktennr) else: akte = result['list'][0] await _emit_if_eligible(akte, aktennr, ctx) - redis_client.srem(PROCESSING_ADVO_KEY, aktennr) except Exception as e: ctx.logger.error(f"❌ Error (aktennr queue) {aktennr}: {e}") redis_client.zadd(PENDING_ADVO_KEY, {aktennr: time.time()}) + finally: redis_client.srem(PROCESSING_ADVO_KEY, aktennr) - raise - - # ── Queue 2: EspoCRM Webhook (by Entity ID) ──────────────────────── - id_entries = redis_client.zrangebyscore(PENDING_ID_KEY, min=0, max=cutoff, start=0, num=1) - if id_entries: - akte_id = id_entries[0] - if isinstance(akte_id, bytes): - akte_id = akte_id.decode() + # ── Queue 2: Retry queue for failed syncs ────────────────────────── + id_entries = redis_client.zrangebyscore(PENDING_ID_KEY, min=0, max=cutoff, start=0, num=BATCH_SIZE) + for raw in id_entries: + akte_id = raw.decode() if isinstance(raw, bytes) else raw score = redis_client.zscore(PENDING_ID_KEY, akte_id) or 0 age = time.time() - score redis_client.zrem(PENDING_ID_KEY, akte_id) redis_client.sadd(PROCESSING_ID_KEY, akte_id) - - ctx.logger.info(f"📋 Entity ID: {akte_id} (age={age:.1f}s)") - processed = True - + processed_count += 1 + ctx.logger.info(f"📋 Entity ID (retry): {akte_id} (age={age:.1f}s)") try: akte = await espocrm.get_entity('CAkten', akte_id) if not akte: ctx.logger.warn(f"⚠️ No CAkten found for id={akte_id} – removing") - redis_client.srem(PROCESSING_ID_KEY, akte_id) else: await _emit_if_eligible(akte, None, ctx) - redis_client.srem(PROCESSING_ID_KEY, akte_id) except Exception as e: - ctx.logger.error(f"❌ Error (entity-id queue) {akte_id}: {e}") + ctx.logger.error(f"❌ Error (retry queue) {akte_id}: {e}") redis_client.zadd(PENDING_ID_KEY, {akte_id: time.time()}) + finally: redis_client.srem(PROCESSING_ID_KEY, akte_id) - raise - if not processed: + if not processed_count: if advo_pending > 0 or id_pending > 0: ctx.logger.info(f"⏸️ Entries pending but all too recent (< {DEBOUNCE_SECS}s)") else: ctx.logger.info("✓ Both queues empty") + else: + ctx.logger.info(f"✓ Processed {processed_count} item(s)") ctx.logger.info("=" * 60) diff --git a/src/steps/crm/akte/akte_sync_event_step.py b/src/steps/crm/akte/akte_sync_event_step.py index 27c8ff5..ac1b81a 100644 --- a/src/steps/crm/akte/akte_sync_event_step.py +++ b/src/steps/crm/akte/akte_sync_event_step.py @@ -28,6 +28,8 @@ config = { "enqueues": ["document.generate_preview"], } +VALID_ADVOWARE_STATUSES = frozenset({'import', 'neu', 'new', 'aktiv', 'active'}) +VALID_AI_STATUSES = frozenset({'new', 'neu', 'aktiv', 'active'}) # ───────────────────────────────────────────────────────────────────────────── # Entry point @@ -52,7 +54,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: return lock_key = f"akte_sync:{akte_id}" - lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=1800) + lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=600) if not lock_acquired: ctx.logger.warn(f"⏸️ Lock busy for Akte {akte_id} – requeueing") raise RuntimeError(f"Lock busy for akte_id={akte_id}") @@ -81,8 +83,8 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: ctx.logger.info(f" aiAktivierungsstatus : {ai_aktivierungsstatus}") # Advoware sync requires an aktennummer (Akten without Advoware won't have one) - advoware_enabled = bool(aktennummer) and sync_schalter and aktivierungsstatus in ('import', 'neu', 'new', 'aktiv', 'active') - xai_enabled = ai_aktivierungsstatus in ('new', 'neu', 'aktiv', 'active') + advoware_enabled = bool(aktennummer) and sync_schalter and aktivierungsstatus in VALID_ADVOWARE_STATUSES + xai_enabled = ai_aktivierungsstatus in VALID_AI_STATUSES ctx.logger.info(f" Advoware sync : {'✅ ON' if advoware_enabled else '⏭️ OFF'}") ctx.logger.info(f" xAI sync : {'✅ ON' if xai_enabled else '⏭️ OFF'}") @@ -91,14 +93,20 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: ctx.logger.info("⏭️ Both syncs disabled – nothing to do") return - # ── ADVOWARE SYNC ────────────────────────────────────────────────── + # ── Load CDokumente once (shared by Advoware + xAI sync) ───────────────── + espo_docs: list = [] + if advoware_enabled or xai_enabled: + docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes', max_size=1000) + espo_docs = docs_result.get('list', []) + + # ── ADVOWARE SYNC ──────────────────────────────────────────── advoware_results = None if advoware_enabled: - advoware_results = await _run_advoware_sync(akte, aktennummer, akte_id, espocrm, ctx) + advoware_results = await _run_advoware_sync(akte, aktennummer, akte_id, espocrm, ctx, espo_docs) - # ── xAI SYNC ────────────────────────────────────────────────────── + # ── xAI SYNC ──────────────────────────────────────────────── if xai_enabled: - await _run_xai_sync(akte, akte_id, espocrm, ctx) + await _run_xai_sync(akte, akte_id, espocrm, ctx, espo_docs) # ── Final Status ─────────────────────────────────────────────────── now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') @@ -154,7 +162,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: finally: if lock_acquired and redis_client: redis_client.delete(lock_key) - ctx.logger.info(f"🔓 Lock released for Akte {aktennummer}") + ctx.logger.info(f"🔓 Lock released for Akte {akte_id}") # ───────────────────────────────────────────────────────────────────────────── @@ -167,6 +175,7 @@ async def _run_advoware_sync( akte_id: str, espocrm, ctx: FlowContext, + espo_docs: list, ) -> Dict[str, int]: from services.advoware_watcher_service import AdvowareWatcherService from services.advoware_history_service import AdvowareHistoryService @@ -187,10 +196,7 @@ async def _run_advoware_sync( ctx.logger.info("📂 ADVOWARE SYNC") ctx.logger.info("─" * 60) - # ── Fetch from all 3 sources ─────────────────────────────────────── - espo_docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes') - espo_docs = espo_docs_result.get('list', []) - + # ── Fetch Windows files + Advoware History ─────────────────────────── try: windows_files = await watcher.get_akte_files(aktennummer) except Exception as e: @@ -225,7 +231,7 @@ async def _run_advoware_sync( all_hnrs = set(espo_by_hnr.keys()) | set(history_by_hnr.keys()) ctx.logger.info(f" Unique HNRs : {len(all_hnrs)}") - + now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # ── 3-way merge per HNR ─────────────────────────────────────────── for hnr in all_hnrs: espo_doc = espo_by_hnr.get(hnr) @@ -259,7 +265,6 @@ async def _run_advoware_sync( blake3_hash = compute_blake3(content) mime_type, _ = mimetypes.guess_type(filename) mime_type = mime_type or 'application/octet-stream' - now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') attachment = await espocrm.upload_attachment_for_file_field( file_content=content, @@ -307,7 +312,6 @@ async def _run_advoware_sync( blake3_hash = compute_blake3(content) mime_type, _ = mimetypes.guess_type(filename) mime_type = mime_type or 'application/octet-stream' - now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') update_data: Dict[str, Any] = { 'name': filename, @@ -323,15 +327,12 @@ async def _run_advoware_sync( update_data['advowareArt'] = (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100] update_data['advowareBemerkung'] = (history_entry.get('text', '') or '')[:255] + # Mark for re-sync to xAI if content changed + if espo_doc.get('aiSyncStatus') == 'synced': + update_data['aiSyncStatus'] = 'unclean' await espocrm.update_entity('CDokumente', espo_doc['id'], update_data) results['updated'] += 1 - # Mark for re-sync to xAI (hash changed) - if espo_doc.get('aiSyncStatus') == 'synced': - await espocrm.update_entity('CDokumente', espo_doc['id'], { - 'aiSyncStatus': 'unclean', - }) - try: await ctx.emit('document.generate_preview', { 'entity_id': espo_doc['id'], @@ -388,6 +389,7 @@ async def _run_xai_sync( akte_id: str, espocrm, ctx: FlowContext, + docs: list, ) -> None: from services.xai_service import XAIService from services.xai_upload_utils import XAIUploadUtils @@ -408,9 +410,6 @@ async def _run_xai_sync( await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'}) return - # ── Load all linked documents ────────────────────────────────── - docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes') - docs = docs_result.get('list', []) ctx.logger.info(f" Documents to check: {len(docs)}") synced = 0 @@ -418,9 +417,16 @@ async def _run_xai_sync( failed = 0 for doc in docs: + # Determine skip condition based on pre-sync state (avoids stale-dict stats bug) + will_skip = ( + doc.get('aiSyncStatus') == 'synced' + and doc.get('aiSyncHash') + and doc.get('blake3hash') + and doc.get('aiSyncHash') == doc.get('blake3hash') + ) ok = await upload_utils.sync_document_to_xai(doc, collection_id, xai, espocrm) if ok: - if doc.get('aiSyncStatus') == 'synced' and doc.get('aiSyncHash') == doc.get('blake3hash'): + if will_skip: skipped += 1 else: synced += 1