From 459fa4103394bdb310612d7ffc4d110dfc4f3fa8 Mon Sep 17 00:00:00 2001 From: bsiggel Date: Thu, 26 Mar 2026 13:06:32 +0000 Subject: [PATCH] feat(sync): Refactor Akte sync status handling and remove deprecated event step --- services/xai_upload_utils.py | 2 +- src/steps/akte/akte_sync_event_step.py | 435 --------------------- src/steps/crm/akte/akte_sync_cron_step.py | 4 +- src/steps/crm/akte/akte_sync_event_step.py | 14 +- 4 files changed, 10 insertions(+), 445 deletions(-) delete mode 100644 src/steps/akte/akte_sync_event_step.py diff --git a/services/xai_upload_utils.py b/services/xai_upload_utils.py index 5808c8b..981c5a3 100644 --- a/services/xai_upload_utils.py +++ b/services/xai_upload_utils.py @@ -67,7 +67,7 @@ class XAIUploadUtils: 'rubrum': str(akte.get('rubrum', '') or ''), } ) - collection_id = col['id'] + collection_id = col.get('collection_id') or col.get('id') self._log.info(f"✅ Collection created: {collection_id}") # Save back to EspoCRM diff --git a/src/steps/akte/akte_sync_event_step.py b/src/steps/akte/akte_sync_event_step.py deleted file mode 100644 index 27c8ff5..0000000 --- a/src/steps/akte/akte_sync_event_step.py +++ /dev/null @@ -1,435 +0,0 @@ -""" -Akte Sync - Event Handler - -Unified sync for one CAkten entity across all configured backends: - - Advoware (3-way merge: Windows ↔ EspoCRM ↔ History) - - xAI (Blake3 hash-based upload to Collection) - -Both run in the same event to keep CDokumente perfectly in sync. - -Trigger: akte.sync { akte_id, aktennummer } -Lock: Redis per-Akte (30 min TTL, prevents double-sync of same Akte) -Parallel: Different Akten sync simultaneously. - -Enqueues: - - document.generate_preview (after CREATE / UPDATE_ESPO) -""" - -from typing import Dict, Any -from datetime import datetime -from motia import FlowContext, queue - - -config = { - "name": "Akte Sync - Event Handler", - "description": "Unified sync for one Akte: Advoware 3-way merge + xAI upload", - "flows": ["akte-sync"], - "triggers": [queue("akte.sync")], - "enqueues": ["document.generate_preview"], -} - - -# ───────────────────────────────────────────────────────────────────────────── -# Entry point -# ───────────────────────────────────────────────────────────────────────────── - -async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: - akte_id = event_data.get('akte_id') - aktennummer = event_data.get('aktennummer') - - ctx.logger.info("=" * 80) - ctx.logger.info("🔄 AKTE SYNC STARTED") - ctx.logger.info(f" Aktennummer : {aktennummer}") - ctx.logger.info(f" EspoCRM ID : {akte_id}") - ctx.logger.info("=" * 80) - - from services.redis_client import get_redis_client - from services.espocrm import EspoCRMAPI - - redis_client = get_redis_client(strict=False) - if not redis_client: - ctx.logger.error("❌ Redis unavailable") - return - - lock_key = f"akte_sync:{akte_id}" - lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=1800) - if not lock_acquired: - ctx.logger.warn(f"⏸️ Lock busy for Akte {akte_id} – requeueing") - raise RuntimeError(f"Lock busy for akte_id={akte_id}") - - espocrm = EspoCRMAPI(ctx) - - try: - # ── Load Akte ────────────────────────────────────────────────────── - akte = await espocrm.get_entity('CAkten', akte_id) - if not akte: - ctx.logger.error(f"❌ Akte {akte_id} not found in EspoCRM") - return - - # aktennummer can come from the event payload OR from the entity - # (Akten without Advoware have no aktennummer) - if not aktennummer: - aktennummer = akte.get('aktennummer') - - sync_schalter = akte.get('syncSchalter', False) - aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower() - ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower() - - ctx.logger.info(f"📋 Akte '{akte.get('name')}'") - ctx.logger.info(f" syncSchalter : {sync_schalter}") - ctx.logger.info(f" aktivierungsstatus : {aktivierungsstatus}") - ctx.logger.info(f" aiAktivierungsstatus : {ai_aktivierungsstatus}") - - # Advoware sync requires an aktennummer (Akten without Advoware won't have one) - advoware_enabled = bool(aktennummer) and sync_schalter and aktivierungsstatus in ('import', 'neu', 'new', 'aktiv', 'active') - xai_enabled = ai_aktivierungsstatus in ('new', 'neu', 'aktiv', 'active') - - ctx.logger.info(f" Advoware sync : {'✅ ON' if advoware_enabled else '⏭️ OFF'}") - ctx.logger.info(f" xAI sync : {'✅ ON' if xai_enabled else '⏭️ OFF'}") - - if not advoware_enabled and not xai_enabled: - ctx.logger.info("⏭️ Both syncs disabled – nothing to do") - return - - # ── ADVOWARE SYNC ────────────────────────────────────────────────── - advoware_results = None - if advoware_enabled: - advoware_results = await _run_advoware_sync(akte, aktennummer, akte_id, espocrm, ctx) - - # ── xAI SYNC ────────────────────────────────────────────────────── - if xai_enabled: - await _run_xai_sync(akte, akte_id, espocrm, ctx) - - # ── Final Status ─────────────────────────────────────────────────── - now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - final_update: Dict[str, Any] = {'globalLastSync': now, 'globalSyncStatus': 'synced'} - if advoware_enabled: - final_update['syncStatus'] = 'synced' - final_update['lastSync'] = now - # 'import' = erster Sync → danach auf 'aktiv' setzen - if aktivierungsstatus == 'import': - final_update['aktivierungsstatus'] = 'aktiv' - ctx.logger.info("🔄 aktivierungsstatus: import → aktiv") - if xai_enabled: - final_update['aiSyncStatus'] = 'synced' - final_update['aiLastSync'] = now - # 'new' = Collection wurde gerade erstmalig angelegt → auf 'aktiv' setzen - if ai_aktivierungsstatus == 'new': - final_update['aiAktivierungsstatus'] = 'aktiv' - ctx.logger.info("🔄 aiAktivierungsstatus: new → aktiv") - - await espocrm.update_entity('CAkten', akte_id, final_update) - # Clean up processing sets (both queues may have triggered this sync) - if aktennummer: - redis_client.srem("advoware:processing_aktennummern", aktennummer) - redis_client.srem("akte:processing_entity_ids", akte_id) - - ctx.logger.info("=" * 80) - ctx.logger.info("✅ AKTE SYNC COMPLETE") - if advoware_results: - ctx.logger.info(f" Advoware: created={advoware_results['created']} updated={advoware_results['updated']} deleted={advoware_results['deleted']} errors={advoware_results['errors']}") - ctx.logger.info("=" * 80) - - except Exception as e: - ctx.logger.error(f"❌ Sync failed: {e}") - import traceback - ctx.logger.error(traceback.format_exc()) - - # Requeue for retry (into the appropriate queue(s)) - import time - now_ts = time.time() - if aktennummer: - redis_client.zadd("advoware:pending_aktennummern", {aktennummer: now_ts}) - redis_client.zadd("akte:pending_entity_ids", {akte_id: now_ts}) - - try: - await espocrm.update_entity('CAkten', akte_id, { - 'syncStatus': 'failed', - 'globalSyncStatus': 'failed', - }) - except Exception: - pass - raise - - finally: - if lock_acquired and redis_client: - redis_client.delete(lock_key) - ctx.logger.info(f"🔓 Lock released for Akte {aktennummer}") - - -# ───────────────────────────────────────────────────────────────────────────── -# Advoware 3-way merge -# ───────────────────────────────────────────────────────────────────────────── - -async def _run_advoware_sync( - akte: Dict[str, Any], - aktennummer: str, - akte_id: str, - espocrm, - ctx: FlowContext, -) -> Dict[str, int]: - from services.advoware_watcher_service import AdvowareWatcherService - from services.advoware_history_service import AdvowareHistoryService - from services.advoware_service import AdvowareService - from services.advoware_document_sync_utils import AdvowareDocumentSyncUtils - from services.blake3_utils import compute_blake3 - import mimetypes - - watcher = AdvowareWatcherService(ctx) - history_service = AdvowareHistoryService(ctx) - advoware_service = AdvowareService(ctx) - sync_utils = AdvowareDocumentSyncUtils(ctx) - - results = {'created': 0, 'updated': 0, 'deleted': 0, 'skipped': 0, 'errors': 0} - - ctx.logger.info("") - ctx.logger.info("─" * 60) - ctx.logger.info("📂 ADVOWARE SYNC") - ctx.logger.info("─" * 60) - - # ── Fetch from all 3 sources ─────────────────────────────────────── - espo_docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes') - espo_docs = espo_docs_result.get('list', []) - - try: - windows_files = await watcher.get_akte_files(aktennummer) - except Exception as e: - ctx.logger.error(f"❌ Windows watcher failed: {e}") - windows_files = [] - - try: - advo_history = await history_service.get_akte_history(aktennummer) - except Exception as e: - ctx.logger.error(f"❌ Advoware history failed: {e}") - advo_history = [] - - ctx.logger.info(f" EspoCRM docs : {len(espo_docs)}") - ctx.logger.info(f" Windows files : {len(windows_files)}") - ctx.logger.info(f" History entries: {len(advo_history)}") - - # ── Cleanup Windows list (only files in History) ─────────────────── - windows_files = sync_utils.cleanup_file_list(windows_files, advo_history) - - # ── Build indexes by HNR (stable identifier from Advoware) ──────── - espo_by_hnr = {} - for doc in espo_docs: - if doc.get('hnr'): - espo_by_hnr[doc['hnr']] = doc - - history_by_hnr = {} - for entry in advo_history: - if entry.get('hNr'): - history_by_hnr[entry['hNr']] = entry - - windows_by_path = {f.get('path', '').lower(): f for f in windows_files} - - all_hnrs = set(espo_by_hnr.keys()) | set(history_by_hnr.keys()) - ctx.logger.info(f" Unique HNRs : {len(all_hnrs)}") - - # ── 3-way merge per HNR ─────────────────────────────────────────── - for hnr in all_hnrs: - espo_doc = espo_by_hnr.get(hnr) - history_entry = history_by_hnr.get(hnr) - - windows_file = None - if history_entry and history_entry.get('datei'): - windows_file = windows_by_path.get(history_entry['datei'].lower()) - - if history_entry and history_entry.get('datei'): - filename = history_entry['datei'].split('\\')[-1] - elif espo_doc: - filename = espo_doc.get('name', f'hnr_{hnr}') - else: - filename = f'hnr_{hnr}' - - try: - action = sync_utils.merge_three_way(espo_doc, windows_file, history_entry) - ctx.logger.info(f" [{action.action:12s}] {filename} (hnr={hnr}) – {action.reason}") - - if action.action == 'SKIP': - results['skipped'] += 1 - - elif action.action == 'CREATE': - if not windows_file: - ctx.logger.error(f" ❌ CREATE: no Windows file for hnr {hnr}") - results['errors'] += 1 - continue - - content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename)) - blake3_hash = compute_blake3(content) - mime_type, _ = mimetypes.guess_type(filename) - mime_type = mime_type or 'application/octet-stream' - now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - - attachment = await espocrm.upload_attachment_for_file_field( - file_content=content, - filename=filename, - related_type='CDokumente', - field='dokument', - mime_type=mime_type, - ) - new_doc = await espocrm.create_entity('CDokumente', { - 'name': filename, - 'dokumentId': attachment.get('id'), - 'hnr': history_entry.get('hNr') if history_entry else None, - 'advowareArt': (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100] if history_entry else 'Schreiben', - 'advowareBemerkung': (history_entry.get('text', '') or '')[:255] if history_entry else '', - 'dateipfad': windows_file.get('path', ''), - 'blake3hash': blake3_hash, - 'syncedHash': blake3_hash, - 'usn': windows_file.get('usn', 0), - 'syncStatus': 'synced', - 'lastSyncTimestamp': now, - 'cAktenId': akte_id, # Direct FK to CAkten - }) - doc_id = new_doc.get('id') - - # Link to Akte - await espocrm.link_entities('CAkten', akte_id, 'dokumentes', doc_id) - results['created'] += 1 - - # Trigger preview - try: - await ctx.emit('document.generate_preview', { - 'entity_id': doc_id, - 'entity_type': 'CDokumente', - }) - except Exception as e: - ctx.logger.warn(f" ⚠️ Preview trigger failed: {e}") - - elif action.action == 'UPDATE_ESPO': - if not windows_file: - ctx.logger.error(f" ❌ UPDATE_ESPO: no Windows file for hnr {hnr}") - results['errors'] += 1 - continue - - content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename)) - blake3_hash = compute_blake3(content) - mime_type, _ = mimetypes.guess_type(filename) - mime_type = mime_type or 'application/octet-stream' - now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - - update_data: Dict[str, Any] = { - 'name': filename, - 'blake3hash': blake3_hash, - 'syncedHash': blake3_hash, - 'usn': windows_file.get('usn', 0), - 'dateipfad': windows_file.get('path', ''), - 'syncStatus': 'synced', - 'lastSyncTimestamp': now, - } - if history_entry: - update_data['hnr'] = history_entry.get('hNr') - update_data['advowareArt'] = (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100] - update_data['advowareBemerkung'] = (history_entry.get('text', '') or '')[:255] - - await espocrm.update_entity('CDokumente', espo_doc['id'], update_data) - results['updated'] += 1 - - # Mark for re-sync to xAI (hash changed) - if espo_doc.get('aiSyncStatus') == 'synced': - await espocrm.update_entity('CDokumente', espo_doc['id'], { - 'aiSyncStatus': 'unclean', - }) - - try: - await ctx.emit('document.generate_preview', { - 'entity_id': espo_doc['id'], - 'entity_type': 'CDokumente', - }) - except Exception as e: - ctx.logger.warn(f" ⚠️ Preview trigger failed: {e}") - - elif action.action == 'DELETE': - if espo_doc: - # Only delete if the HNR is genuinely absent from Advoware History - # (not just absent from Windows – avoids deleting docs whose file - # is temporarily unavailable on the Windows share) - if hnr in history_by_hnr: - ctx.logger.warn(f" ⚠️ SKIP DELETE hnr={hnr}: still in Advoware History, only missing from Windows") - results['skipped'] += 1 - else: - await espocrm.delete_entity('CDokumente', espo_doc['id']) - results['deleted'] += 1 - - except Exception as e: - ctx.logger.error(f" ❌ Error for hnr {hnr} ({filename}): {e}") - results['errors'] += 1 - - # ── Ablage check + Rubrum sync ───────────────────────────────────── - try: - akte_details = await advoware_service.get_akte(aktennummer) - if akte_details: - espo_update: Dict[str, Any] = {} - - if akte_details.get('ablage') == 1: - ctx.logger.info("📁 Akte marked as ablage → deactivating") - espo_update['aktivierungsstatus'] = 'deaktiviert' - - rubrum = akte_details.get('rubrum') - if rubrum and rubrum != akte.get('rubrum'): - espo_update['rubrum'] = rubrum - ctx.logger.info(f"📝 Rubrum synced: {rubrum[:80]}") - - if espo_update: - await espocrm.update_entity('CAkten', akte_id, espo_update) - except Exception as e: - ctx.logger.warn(f"⚠️ Ablage/Rubrum check failed: {e}") - - return results - - -# ───────────────────────────────────────────────────────────────────────────── -# xAI sync -# ───────────────────────────────────────────────────────────────────────────── - -async def _run_xai_sync( - akte: Dict[str, Any], - akte_id: str, - espocrm, - ctx: FlowContext, -) -> None: - from services.xai_service import XAIService - from services.xai_upload_utils import XAIUploadUtils - - xai = XAIService(ctx) - upload_utils = XAIUploadUtils(ctx) - - ctx.logger.info("") - ctx.logger.info("─" * 60) - ctx.logger.info("🤖 xAI SYNC") - ctx.logger.info("─" * 60) - - try: - # ── Ensure collection exists ─────────────────────────────────── - collection_id = await upload_utils.ensure_collection(akte, xai, espocrm) - if not collection_id: - ctx.logger.error("❌ Could not obtain xAI collection – aborting xAI sync") - await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'}) - return - - # ── Load all linked documents ────────────────────────────────── - docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes') - docs = docs_result.get('list', []) - ctx.logger.info(f" Documents to check: {len(docs)}") - - synced = 0 - skipped = 0 - failed = 0 - - for doc in docs: - ok = await upload_utils.sync_document_to_xai(doc, collection_id, xai, espocrm) - if ok: - if doc.get('aiSyncStatus') == 'synced' and doc.get('aiSyncHash') == doc.get('blake3hash'): - skipped += 1 - else: - synced += 1 - else: - failed += 1 - - ctx.logger.info(f" ✅ Synced : {synced}") - ctx.logger.info(f" ⏭️ Skipped : {skipped}") - ctx.logger.info(f" ❌ Failed : {failed}") - - finally: - await xai.close() diff --git a/src/steps/crm/akte/akte_sync_cron_step.py b/src/steps/crm/akte/akte_sync_cron_step.py index 4b5e6d8..0ac6879 100644 --- a/src/steps/crm/akte/akte_sync_cron_step.py +++ b/src/steps/crm/akte/akte_sync_cron_step.py @@ -32,8 +32,8 @@ PROCESSING_ADVO_KEY = "advoware:processing_aktennummern" DEBOUNCE_SECS = 10 BATCH_SIZE = 5 # max items to process per cron tick -VALID_ADVOWARE_STATUSES = frozenset({'import', 'neu', 'new', 'aktiv', 'active'}) -VALID_AI_STATUSES = frozenset({'new', 'neu', 'aktiv', 'active'}) +VALID_ADVOWARE_STATUSES = frozenset({'import', 'new', 'active'}) +VALID_AI_STATUSES = frozenset({'new', 'active'}) async def handler(input_data: None, ctx: FlowContext) -> None: diff --git a/src/steps/crm/akte/akte_sync_event_step.py b/src/steps/crm/akte/akte_sync_event_step.py index c618cdc..73cf49d 100644 --- a/src/steps/crm/akte/akte_sync_event_step.py +++ b/src/steps/crm/akte/akte_sync_event_step.py @@ -28,8 +28,8 @@ config = { "enqueues": ["document.generate_preview"], } -VALID_ADVOWARE_STATUSES = frozenset({'import', 'neu', 'new', 'aktiv', 'active'}) -VALID_AI_STATUSES = frozenset({'new', 'neu', 'aktiv', 'active'}) +VALID_ADVOWARE_STATUSES = frozenset({'import', 'new', 'active'}) +VALID_AI_STATUSES = frozenset({'new', 'active'}) # ───────────────────────────────────────────────────────────────────────────── # Entry point @@ -115,15 +115,15 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: final_update['lastSync'] = now # 'import' = erster Sync → danach auf 'aktiv' setzen if aktivierungsstatus == 'import': - final_update['aktivierungsstatus'] = 'aktiv' - ctx.logger.info("🔄 aktivierungsstatus: import → aktiv") + final_update['aktivierungsstatus'] = 'active' + ctx.logger.info("🔄 aktivierungsstatus: import → active") if xai_enabled: final_update['aiSyncStatus'] = 'synced' final_update['aiLastSync'] = now # 'new' = Collection wurde gerade erstmalig angelegt → auf 'aktiv' setzen if ai_aktivierungsstatus == 'new': - final_update['aiAktivierungsstatus'] = 'aktiv' - ctx.logger.info("🔄 aiAktivierungsstatus: new → aktiv") + final_update['aiAktivierungsstatus'] = 'active' + ctx.logger.info("🔄 aiAktivierungsstatus: new → active") await espocrm.update_entity('CAkten', akte_id, final_update) # Clean up processing set (Advoware Watcher queue) @@ -361,7 +361,7 @@ async def _run_advoware_sync( if akte_details.get('ablage') == 1: ctx.logger.info("📁 Akte marked as ablage → deactivating") - espo_update['aktivierungsstatus'] = 'deaktiviert' + espo_update['aktivierungsstatus'] = 'inactive' rubrum = akte_details.get('rubrum') if rubrum and rubrum != akte.get('rubrum'):