diff --git a/services/advoware_service.py b/services/advoware_service.py index 954ba9a..79fc0e7 100644 --- a/services/advoware_service.py +++ b/services/advoware_service.py @@ -150,12 +150,16 @@ class AdvowareService: try: endpoint = f"api/v1/advonet/Akten/{akte_id}" result = await self.api.api_call(endpoint, method='GET') - + + # API may return a list (batch response) or a single dict + if isinstance(result, list): + result = result[0] if result else None + if result: self._log(f"[ADVO] ✅ Fetched Akte {akte_id}: {result.get('az', 'N/A')}") - + return result - + except Exception as e: self._log(f"[ADVO] Error loading Akte {akte_id}: {e}", level='error') return None diff --git a/services/espocrm.py b/services/espocrm.py index c576683..29279bc 100644 --- a/services/espocrm.py +++ b/services/espocrm.py @@ -162,11 +162,33 @@ class EspoCRMAPI: self._log(f"⚠️ Could not load entity def for {entity_type}: {e}", level='warn') return {} + @staticmethod + def _flatten_params(data, prefix: str = '') -> list: + """ + Flatten nested dict/list into PHP-style repeated query params. + EspoCRM expects where[0][type]=equals&where[0][attribute]=x format. + """ + result = [] + if isinstance(data, dict): + for k, v in data.items(): + new_key = f"{prefix}[{k}]" if prefix else str(k) + result.extend(EspoCRMAPI._flatten_params(v, new_key)) + elif isinstance(data, (list, tuple)): + for i, v in enumerate(data): + result.extend(EspoCRMAPI._flatten_params(v, f"{prefix}[{i}]")) + elif isinstance(data, bool): + result.append((prefix, 'true' if data else 'false')) + elif data is None: + result.append((prefix, '')) + else: + result.append((prefix, str(data))) + return result + async def api_call( self, endpoint: str, method: str = 'GET', - params: Optional[Dict] = None, + params=None, json_data: Optional[Dict] = None, timeout_seconds: Optional[int] = None ) -> Any: @@ -292,22 +314,22 @@ class EspoCRMAPI: Returns: Dict with 'list' and 'total' keys """ - params = { + search_params: Dict[str, Any] = { 'offset': offset, - 'maxSize': max_size + 'maxSize': max_size, } - if where: - import json - # EspoCRM expects JSON-encoded where clause - params['where'] = where if isinstance(where, str) else json.dumps(where) + search_params['where'] = where if select: - params['select'] = select + search_params['select'] = select if order_by: - params['orderBy'] = order_by - + search_params['orderBy'] = order_by + self._log(f"Listing {entity_type} entities") - return await self.api_call(f"/{entity_type}", method='GET', params=params) + return await self.api_call( + f"/{entity_type}", method='GET', + params=self._flatten_params(search_params) + ) async def list_related( self, @@ -321,23 +343,24 @@ class EspoCRMAPI: offset: int = 0, max_size: int = 50 ) -> Dict[str, Any]: - params = { + search_params: Dict[str, Any] = { 'offset': offset, - 'maxSize': max_size + 'maxSize': max_size, } - if where: - import json - params['where'] = where if isinstance(where, str) else json.dumps(where) + search_params['where'] = where if select: - params['select'] = select + search_params['select'] = select if order_by: - params['orderBy'] = order_by + search_params['orderBy'] = order_by if order: - params['order'] = order + search_params['order'] = order self._log(f"Listing related {entity_type}/{entity_id}/{link}") - return await self.api_call(f"/{entity_type}/{entity_id}/{link}", method='GET', params=params) + return await self.api_call( + f"/{entity_type}/{entity_id}/{link}", method='GET', + params=self._flatten_params(search_params) + ) async def create_entity( self, diff --git a/src/steps/akte/akte_sync_cron_step.py b/src/steps/akte/akte_sync_cron_step.py index b8d8cdd..e68742c 100644 --- a/src/steps/akte/akte_sync_cron_step.py +++ b/src/steps/akte/akte_sync_cron_step.py @@ -1,17 +1,16 @@ """ Akte Sync - Cron Poller -Polls Redis Sorted Set for pending Aktennummern every 10 seconds. -Respects a 10-second debounce window so that rapid filesystem events -(e.g. many files being updated at once) are batched into a single sync. +Polls two Redis Sorted Sets every 10 seconds (10 s debounce each): -Redis keys (same as advoware-watcher writes to): - advoware:pending_aktennummern – Sorted Set { aktennummer → timestamp } - advoware:processing_aktennummern – Set (tracks active syncs) + advoware:pending_aktennummern – written by Windows Advoware Watcher + { aktennummer → timestamp } + akte:pending_entity_ids – written by EspoCRM webhook + { akte_id → timestamp } -Eligibility check (either flag triggers a sync): - syncSchalter == True AND aktivierungsstatus in valid list → Advoware sync - aiAktivierungsstatus in valid list → xAI sync +Eligibility (either flag triggers sync): + syncSchalter AND aktivierungsstatus in valid list → Advoware sync + aiAktivierungsstatus in valid list → xAI sync """ from motia import FlowContext, cron @@ -25,9 +24,15 @@ config = { "enqueues": ["akte.sync"], } -PENDING_KEY = "advoware:pending_aktennummern" -PROCESSING_KEY = "advoware:processing_aktennummern" -DEBOUNCE_SECS = 10 +# Queue 1: written by Windows Advoware Watcher (keyed by Aktennummer) +PENDING_ADVO_KEY = "advoware:pending_aktennummern" +PROCESSING_ADVO_KEY = "advoware:processing_aktennummern" + +# Queue 2: written by EspoCRM webhook (keyed by entity ID) +PENDING_ID_KEY = "akte:pending_entity_ids" +PROCESSING_ID_KEY = "akte:processing_entity_ids" + +DEBOUNCE_SECS = 10 VALID_ADVOWARE_STATUSES = {'import', 'neu', 'new', 'aktiv', 'active'} VALID_AI_STATUSES = {'new', 'neu', 'aktiv', 'active'} @@ -50,86 +55,111 @@ async def handler(input_data: None, ctx: FlowContext) -> None: espocrm = EspoCRMAPI(ctx) cutoff = time.time() - DEBOUNCE_SECS - pending_count = redis_client.zcard(PENDING_KEY) - processing_count = redis_client.scard(PROCESSING_KEY) - ctx.logger.info(f" Pending : {pending_count}") - ctx.logger.info(f" Processing : {processing_count}") + advo_pending = redis_client.zcard(PENDING_ADVO_KEY) + id_pending = redis_client.zcard(PENDING_ID_KEY) + ctx.logger.info(f" Pending (aktennr) : {advo_pending}") + ctx.logger.info(f" Pending (akte_id) : {id_pending}") - # Pull oldest entry that has passed the debounce window - old_entries = redis_client.zrangebyscore(PENDING_KEY, min=0, max=cutoff, start=0, num=1) + processed = False - if not old_entries: - if pending_count > 0: - ctx.logger.info(f"⏸️ {pending_count} pending – all too recent (< {DEBOUNCE_SECS}s)") + # ── Queue 1: Advoware Watcher (by Aktennummer) ───────────────────── + advo_entries = redis_client.zrangebyscore(PENDING_ADVO_KEY, min=0, max=cutoff, start=0, num=1) + if advo_entries: + aktennr = advo_entries[0] + if isinstance(aktennr, bytes): + aktennr = aktennr.decode() + + score = redis_client.zscore(PENDING_ADVO_KEY, aktennr) or 0 + age = time.time() - score + redis_client.zrem(PENDING_ADVO_KEY, aktennr) + redis_client.sadd(PROCESSING_ADVO_KEY, aktennr) + + ctx.logger.info(f"📋 Aktennummer: {aktennr} (age={age:.1f}s)") + processed = True + + try: + result = await espocrm.list_entities( + 'CAkten', + where=[{'type': 'equals', 'attribute': 'aktennummer', 'value': int(aktennr)}], + max_size=1, + ) + if not result or not result.get('list'): + ctx.logger.warn(f"⚠️ No CAkten found for aktennummer={aktennr} – removing") + redis_client.srem(PROCESSING_ADVO_KEY, aktennr) + else: + akte = result['list'][0] + await _emit_if_eligible(akte, aktennr, ctx) + redis_client.srem(PROCESSING_ADVO_KEY, aktennr) + except Exception as e: + ctx.logger.error(f"❌ Error (aktennr queue) {aktennr}: {e}") + redis_client.zadd(PENDING_ADVO_KEY, {aktennr: time.time()}) + redis_client.srem(PROCESSING_ADVO_KEY, aktennr) + raise + + # ── Queue 2: EspoCRM Webhook (by Entity ID) ──────────────────────── + id_entries = redis_client.zrangebyscore(PENDING_ID_KEY, min=0, max=cutoff, start=0, num=1) + if id_entries: + akte_id = id_entries[0] + if isinstance(akte_id, bytes): + akte_id = akte_id.decode() + + score = redis_client.zscore(PENDING_ID_KEY, akte_id) or 0 + age = time.time() - score + redis_client.zrem(PENDING_ID_KEY, akte_id) + redis_client.sadd(PROCESSING_ID_KEY, akte_id) + + ctx.logger.info(f"📋 Entity ID: {akte_id} (age={age:.1f}s)") + processed = True + + try: + akte = await espocrm.get_entity('CAkten', akte_id) + if not akte: + ctx.logger.warn(f"⚠️ No CAkten found for id={akte_id} – removing") + redis_client.srem(PROCESSING_ID_KEY, akte_id) + else: + await _emit_if_eligible(akte, None, ctx) + redis_client.srem(PROCESSING_ID_KEY, akte_id) + except Exception as e: + ctx.logger.error(f"❌ Error (entity-id queue) {akte_id}: {e}") + redis_client.zadd(PENDING_ID_KEY, {akte_id: time.time()}) + redis_client.srem(PROCESSING_ID_KEY, akte_id) + raise + + if not processed: + if advo_pending > 0 or id_pending > 0: + ctx.logger.info(f"⏸️ Entries pending but all too recent (< {DEBOUNCE_SECS}s)") else: - ctx.logger.info("✓ Queue empty") - ctx.logger.info("=" * 60) + ctx.logger.info("✓ Both queues empty") + + ctx.logger.info("=" * 60) + + +async def _emit_if_eligible(akte: dict, aktennr, ctx: FlowContext) -> None: + """Check eligibility and emit akte.sync if applicable.""" + akte_id = akte['id'] + # Prefer aktennr from argument; fall back to entity field + aktennummer = aktennr or akte.get('aktennummer') + sync_schalter = akte.get('syncSchalter', False) + aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower() + ai_status = str(akte.get('aiAktivierungsstatus') or '').lower() + + advoware_eligible = bool(aktennummer) and sync_schalter and aktivierungsstatus in VALID_ADVOWARE_STATUSES + xai_eligible = ai_status in VALID_AI_STATUSES + + ctx.logger.info(f" akte_id : {akte_id}") + ctx.logger.info(f" aktennummer : {aktennummer or '—'}") + ctx.logger.info(f" aktivierungsstatus : {aktivierungsstatus} ({'✅' if advoware_eligible else '⏭️'})") + ctx.logger.info(f" aiAktivierungsstatus : {ai_status} ({'✅' if xai_eligible else '⏭️'})") + + if not advoware_eligible and not xai_eligible: + ctx.logger.warn(f"⚠️ Akte {akte_id} not eligible for any sync") return - aktennr = old_entries[0] - if isinstance(aktennr, bytes): - aktennr = aktennr.decode() - - score = redis_client.zscore(PENDING_KEY, aktennr) or 0 - age = time.time() - score - redis_client.zrem(PENDING_KEY, aktennr) - redis_client.sadd(PROCESSING_KEY, aktennr) - - ctx.logger.info(f"📋 Aktennummer: {aktennr} (age={age:.1f}s)") - - try: - # ── Lookup in EspoCRM ────────────────────────────────────── - result = await espocrm.list_entities( - 'CAkten', - where=[{ - 'type': 'equals', - 'attribute': 'aktennummer', - 'value': aktennr, - }], - max_size=1, - ) - - if not result or not result.get('list'): - ctx.logger.warn(f"⚠️ No CAkten found for aktennummer={aktennr} – removing") - redis_client.srem(PROCESSING_KEY, aktennr) - ctx.logger.info("=" * 60) - return - - akte = result['list'][0] - akte_id = akte['id'] - sync_schalter = akte.get('syncSchalter', False) - aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower() - ai_status = str(akte.get('aiAktivierungsstatus') or '').lower() - - advoware_eligible = sync_schalter and aktivierungsstatus in VALID_ADVOWARE_STATUSES - xai_eligible = ai_status in VALID_AI_STATUSES - - ctx.logger.info(f" Akte ID : {akte_id}") - ctx.logger.info(f" aktivierungsstatus : {aktivierungsstatus} ({'✅' if advoware_eligible else '⏭️'})") - ctx.logger.info(f" aiAktivierungsstatus : {ai_status} ({'✅' if xai_eligible else '⏭️'})") - - if not advoware_eligible and not xai_eligible: - ctx.logger.warn(f"⚠️ Akte {aktennr} not eligible for any sync – removing") - redis_client.srem(PROCESSING_KEY, aktennr) - ctx.logger.info("=" * 60) - return - - # ── Emit sync event ──────────────────────────────────────── - await ctx.enqueue({ - 'topic': 'akte.sync', - 'data': { - 'aktennummer': aktennr, - 'akte_id': akte_id, - }, - }) - ctx.logger.info(f"📤 akte.sync emitted (akte_id={akte_id})") - - except Exception as e: - ctx.logger.error(f"❌ Error processing {aktennr}: {e}") - # Requeue for retry - redis_client.zadd(PENDING_KEY, {aktennr: time.time()}) - redis_client.srem(PROCESSING_KEY, aktennr) - raise - - finally: - ctx.logger.info("=" * 60) + await ctx.enqueue({ + 'topic': 'akte.sync', + 'data': { + 'akte_id': akte_id, + 'aktennummer': aktennummer, # may be None for xAI-only Akten + }, + }) + ctx.logger.info(f"📤 akte.sync emitted (akte_id={akte_id}, aktennummer={aktennummer or '—'})") diff --git a/src/steps/akte/akte_sync_event_step.py b/src/steps/akte/akte_sync_event_step.py index 93f8ef9..20d56a8 100644 --- a/src/steps/akte/akte_sync_event_step.py +++ b/src/steps/akte/akte_sync_event_step.py @@ -54,8 +54,8 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: lock_key = f"akte_sync:{akte_id}" lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=1800) if not lock_acquired: - ctx.logger.warn(f"⏸️ Lock busy for Akte {aktennummer} – requeueing") - raise RuntimeError(f"Lock busy for {aktennummer}") + ctx.logger.warn(f"⏸️ Lock busy for Akte {akte_id} – requeueing") + raise RuntimeError(f"Lock busy for akte_id={akte_id}") espocrm = EspoCRMAPI(ctx) @@ -64,9 +64,13 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: akte = await espocrm.get_entity('CAkten', akte_id) if not akte: ctx.logger.error(f"❌ Akte {akte_id} not found in EspoCRM") - redis_client.srem("akte:processing", aktennummer) return + # aktennummer can come from the event payload OR from the entity + # (Akten without Advoware have no aktennummer) + if not aktennummer: + aktennummer = akte.get('aktennummer') + sync_schalter = akte.get('syncSchalter', False) aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower() ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower() @@ -76,7 +80,8 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: ctx.logger.info(f" aktivierungsstatus : {aktivierungsstatus}") ctx.logger.info(f" aiAktivierungsstatus : {ai_aktivierungsstatus}") - advoware_enabled = sync_schalter and aktivierungsstatus in ('import', 'neu', 'new', 'aktiv', 'active') + # Advoware sync requires an aktennummer (Akten without Advoware won't have one) + advoware_enabled = bool(aktennummer) and sync_schalter and aktivierungsstatus in ('import', 'neu', 'new', 'aktiv', 'active') xai_enabled = ai_aktivierungsstatus in ('new', 'neu', 'aktiv', 'active') ctx.logger.info(f" Advoware sync : {'✅ ON' if advoware_enabled else '⏭️ OFF'}") @@ -84,7 +89,6 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: if not advoware_enabled and not xai_enabled: ctx.logger.info("⏭️ Both syncs disabled – nothing to do") - redis_client.srem("akte:processing", aktennummer) return # ── ADVOWARE SYNC ────────────────────────────────────────────────── @@ -102,12 +106,23 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: if advoware_enabled: final_update['syncStatus'] = 'synced' final_update['lastSync'] = now + # 'import' = erster Sync → danach auf 'aktiv' setzen + if aktivierungsstatus == 'import': + final_update['aktivierungsstatus'] = 'aktiv' + ctx.logger.info("🔄 aktivierungsstatus: import → aktiv") if xai_enabled: final_update['aiSyncStatus'] = 'synced' final_update['aiLastSync'] = now + # 'new' = Collection wurde gerade erstmalig angelegt → auf 'aktiv' setzen + if ai_aktivierungsstatus == 'new': + final_update['aiAktivierungsstatus'] = 'aktiv' + ctx.logger.info("🔄 aiAktivierungsstatus: new → aktiv") await espocrm.update_entity('CAkten', akte_id, final_update) - redis_client.srem("akte:processing", aktennummer) + # Clean up processing sets (both queues may have triggered this sync) + if aktennummer: + redis_client.srem("advoware:processing_aktennummern", aktennummer) + redis_client.srem("akte:processing_entity_ids", akte_id) ctx.logger.info("=" * 80) ctx.logger.info("✅ AKTE SYNC COMPLETE") @@ -120,9 +135,12 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: import traceback ctx.logger.error(traceback.format_exc()) - # Requeue for retry + # Requeue for retry (into the appropriate queue(s)) import time - redis_client.zadd("akte:pending", {aktennummer: time.time()}) + now_ts = time.time() + if aktennummer: + redis_client.zadd("advoware:pending_aktennummern", {aktennummer: now_ts}) + redis_client.zadd("akte:pending_entity_ids", {akte_id: now_ts}) try: await espocrm.update_entity('CAkten', akte_id, { @@ -254,8 +272,8 @@ async def _run_advoware_sync( 'name': filename, 'dokumentId': attachment.get('id'), 'hnr': history_entry.get('hNr') if history_entry else None, - 'advowareArt': history_entry.get('art', 'Schreiben') if history_entry else 'Schreiben', - 'advowareBemerkung': history_entry.get('text', '') if history_entry else '', + 'advowareArt': (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100] if history_entry else 'Schreiben', + 'advowareBemerkung': (history_entry.get('text', '') or '')[:255] if history_entry else '', 'dateipfad': windows_file.get('path', ''), 'blake3hash': blake3_hash, 'syncedHash': blake3_hash, @@ -302,8 +320,8 @@ async def _run_advoware_sync( } if history_entry: update_data['hnr'] = history_entry.get('hNr') - update_data['advowareArt'] = history_entry.get('art', 'Schreiben') - update_data['advowareBemerkung'] = history_entry.get('text', '') + update_data['advowareArt'] = (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100] + update_data['advowareBemerkung'] = (history_entry.get('text', '') or '')[:255] await espocrm.update_entity('CDokumente', espo_doc['id'], update_data) results['updated'] += 1 @@ -324,8 +342,15 @@ async def _run_advoware_sync( elif action.action == 'DELETE': if espo_doc: - await espocrm.delete_entity('CDokumente', espo_doc['id']) - results['deleted'] += 1 + # Only delete if the HNR is genuinely absent from Advoware History + # (not just absent from Windows – avoids deleting docs whose file + # is temporarily unavailable on the Windows share) + if hnr in history_by_hnr: + ctx.logger.warn(f" ⚠️ SKIP DELETE hnr={hnr}: still in Advoware History, only missing from Windows") + results['skipped'] += 1 + else: + await espocrm.delete_entity('CDokumente', espo_doc['id']) + results['deleted'] += 1 except Exception as e: ctx.logger.error(f" ❌ Error for hnr {hnr} ({filename}): {e}") diff --git a/src/steps/akte/webhook/__init__.py b/src/steps/akte/webhook/__init__.py new file mode 100644 index 0000000..d93801b --- /dev/null +++ b/src/steps/akte/webhook/__init__.py @@ -0,0 +1 @@ +# Akte webhook steps diff --git a/src/steps/akte/webhook/akte_webhook_step.py b/src/steps/akte/webhook/akte_webhook_step.py new file mode 100644 index 0000000..466a586 --- /dev/null +++ b/src/steps/akte/webhook/akte_webhook_step.py @@ -0,0 +1,68 @@ +""" +Akte Sync - EspoCRM Webhook + +Empfängt EspoCRM-Webhooks für CAkten (create / update / delete). +Schreibt die Entity-ID in die Redis-Queue `akte:pending_entity_ids` +mit 10-Sekunden-Debounce — der Cron-Poller übernimmt den Rest. + +Route: POST /akte/webhook/update +Payload: { "id": "..." } oder [{ "id": "..." }, ...] +""" + +import json +import time +from typing import Any +from motia import FlowContext, http, ApiRequest, ApiResponse + + +config = { + "name": "Akte Webhook - EspoCRM", + "description": "Empfängt EspoCRM-Webhooks für CAkten und queued Entity-IDs für den Sync", + "flows": ["akte-sync"], + "triggers": [http("POST", "/akte/webhook/update")], + "enqueues": [], +} + +PENDING_KEY = "akte:pending_entity_ids" + + +async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: + try: + payload = request.body or {} + + ctx.logger.info("=" * 60) + ctx.logger.info("📥 AKTE WEBHOOK") + ctx.logger.info(f" Payload: {json.dumps(payload, ensure_ascii=False)[:200]}") + + # ── Collect entity IDs ───────────────────────────────────── + entity_ids: set[str] = set() + if isinstance(payload, list): + for item in payload: + if isinstance(item, dict) and 'id' in item: + entity_ids.add(item['id']) + elif isinstance(payload, dict) and 'id' in payload: + entity_ids.add(payload['id']) + + if not entity_ids: + ctx.logger.warn("⚠️ No entity IDs in payload") + return ApiResponse(status_code=400, body={"error": "No entity ID found in payload"}) + + # ── Push to Redis with current timestamp (debounce in cron) ─ + from services.redis_client import get_redis_client + redis_client = get_redis_client(strict=False) + if not redis_client: + ctx.logger.error("❌ Redis unavailable") + return ApiResponse(status_code=503, body={"error": "Service unavailable"}) + + ts = time.time() + mapping = {eid: ts for eid in entity_ids} + redis_client.zadd(PENDING_KEY, mapping) + + ctx.logger.info(f"✅ Queued {len(entity_ids)} entity ID(s): {entity_ids}") + ctx.logger.info("=" * 60) + + return ApiResponse(status_code=200, body={"queued": len(entity_ids)}) + + except Exception as e: + ctx.logger.error(f"❌ Webhook error: {e}") + return ApiResponse(status_code=500, body={"error": str(e)})