""" VMH Beteiligte Sync Handler Zentraler Sync-Handler für Beteiligte (Webhooks + Cron Events) Verarbeitet: - vmh.beteiligte.create: Neu in EspoCRM → Create in Advoware - vmh.beteiligte.update: Geändert in EspoCRM → Update in Advoware - vmh.beteiligte.delete: Gelöscht in EspoCRM → Delete in Advoware (TODO) - vmh.beteiligte.sync_check: Cron-Check → Sync wenn nötig """ from typing import Dict, Any, Optional from motia import FlowContext from services.advoware import AdvowareAPI from services.advoware_service import AdvowareService from services.espocrm import EspoCRMAPI from services.espocrm_mapper import BeteiligteMapper from services.beteiligte_sync_utils import BeteiligteSync import json import redis import os config = { "name": "VMH Beteiligte Sync Handler", "description": "Zentraler Sync-Handler für Beteiligte (Webhooks + Cron Events)", "flows": ["vmh-beteiligte"], "triggers": [ {"type": "queue", "topic": "vmh.beteiligte.create"}, {"type": "queue", "topic": "vmh.beteiligte.update"}, {"type": "queue", "topic": "vmh.beteiligte.delete"}, {"type": "queue", "topic": "vmh.beteiligte.sync_check"} ], "enqueues": [] } async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]): """Zentraler Sync-Handler für Beteiligte""" entity_id = event_data.get('entity_id') action = event_data.get('action') source = event_data.get('source') if not entity_id: ctx.logger.error("Keine entity_id im Event gefunden") return ctx.logger.info(f"🔄 Sync-Handler gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}") # Shared Redis client for distributed locking redis_host = os.getenv('REDIS_HOST', 'localhost') redis_port = int(os.getenv('REDIS_PORT', '6379')) redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1')) redis_client = redis.Redis( host=redis_host, port=redis_port, db=redis_db, decode_responses=True ) # APIs initialisieren espocrm = EspoCRMAPI() advoware = AdvowareAPI(ctx) sync_utils = BeteiligteSync(espocrm, redis_client, ctx) mapper = BeteiligteMapper() # NOTE: Kommunikation Sync Manager wird in zukünftiger Version hinzugefügt # wenn kommunikation_sync_utils.py migriert ist # advo_service = AdvowareService(ctx) # komm_sync = KommunikationSyncManager(advo_service, espocrm, ctx) try: # 1. ACQUIRE LOCK (verhindert parallele Syncs) lock_acquired = await sync_utils.acquire_sync_lock(entity_id) if not lock_acquired: ctx.logger.warn(f"⏸️ Sync bereits aktiv für {entity_id}, überspringe") return # Lock erfolgreich acquired - MUSS im finally block released werden! try: # 2. FETCH ENTITY VON ESPOCRM try: espo_entity = await espocrm.get_entity('CBeteiligte', entity_id) except Exception as e: ctx.logger.error(f"❌ Fehler beim Laden von EspoCRM Entity: {e}") await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True) return ctx.logger.info(f"📋 Entity geladen: {espo_entity.get('name')} (betnr: {espo_entity.get('betnr')})") betnr = espo_entity.get('betnr') sync_status = espo_entity.get('syncStatus', 'pending_sync') # Check Retry-Backoff - überspringe wenn syncNextRetry noch nicht erreicht sync_next_retry = espo_entity.get('syncNextRetry') if sync_next_retry and sync_status == 'failed': import datetime import pytz try: next_retry_ts = datetime.datetime.strptime(sync_next_retry, '%Y-%m-%d %H:%M:%S') next_retry_ts = pytz.UTC.localize(next_retry_ts) now_utc = datetime.datetime.now(pytz.UTC) if now_utc < next_retry_ts: remaining_minutes = int((next_retry_ts - now_utc).total_seconds() / 60) ctx.logger.info(f"⏸️ Retry-Backoff aktiv: Nächster Versuch in {remaining_minutes} Minuten") await sync_utils.release_sync_lock(entity_id, sync_status) return except Exception as e: ctx.logger.warn(f"⚠️ Fehler beim Parsen von syncNextRetry: {e}") # 3. BESTIMME SYNC-AKTION # FALL A: Neu (kein betnr) → CREATE in Advoware if not betnr and action in ['create', 'sync_check']: ctx.logger.info(f"🆕 Neuer Beteiligter → CREATE in Advoware") await handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, ctx) # FALL B: Existiert (hat betnr) → UPDATE oder CHECK elif betnr: ctx.logger.info(f"♻️ Existierender Beteiligter (betNr: {betnr}) → UPDATE/CHECK") await handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, ctx) # FALL C: DELETE (TODO: Implementierung später) elif action == 'delete': ctx.logger.warn(f"🗑️ DELETE noch nicht implementiert für {entity_id}") await sync_utils.release_sync_lock(entity_id, 'failed', 'Delete-Operation nicht implementiert') else: ctx.logger.warn(f"⚠️ Unbekannte Kombination: action={action}, betnr={betnr}") await sync_utils.release_sync_lock(entity_id, 'failed', f'Unbekannte Aktion: {action}') except Exception as e: # Unerwarteter Fehler während Sync - GARANTIERE Lock-Release ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}") import traceback ctx.logger.error(traceback.format_exc()) try: await sync_utils.release_sync_lock( entity_id, 'failed', f'Unerwarteter Fehler: {str(e)[:1900]}', increment_retry=True ) except Exception as release_error: # Selbst Lock-Release failed - logge kritischen Fehler ctx.logger.critical(f"🚨 CRITICAL: Lock-Release failed für {entity_id}: {release_error}") # Force Redis lock release try: lock_key = f"sync_lock:cbeteiligte:{entity_id}" redis_client.delete(lock_key) ctx.logger.info(f"✅ Redis lock manuell released: {lock_key}") except: pass except Exception as e: # Fehler VOR Lock-Acquire - kein Lock-Release nötig ctx.logger.error(f"❌ Fehler vor Lock-Acquire: {e}") import traceback ctx.logger.error(traceback.format_exc()) async def handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, ctx): """Erstellt neuen Beteiligten in Advoware""" try: ctx.logger.info(f"🔨 CREATE in Advoware...") # Transform zu Advoware Format advo_data = mapper.map_cbeteiligte_to_advoware(espo_entity) ctx.logger.info(f"📤 Sende an Advoware: {json.dumps(advo_data, ensure_ascii=False)[:200]}...") # POST zu Advoware result = await advoware.api_call( 'api/v1/advonet/Beteiligte', method='POST', json_data=advo_data ) # Extrahiere betNr aus Response (case-insensitive: betNr oder betnr) new_betnr = None if isinstance(result, dict): new_betnr = result.get('betNr') or result.get('betnr') if not new_betnr: raise Exception(f"Keine betNr/betnr in Advoware Response: {result}") ctx.logger.info(f"✅ In Advoware erstellt: betNr={new_betnr}") # Lade Entity nach POST um rowId zu bekommen (WICHTIG für Change Detection!) created_entity = await advoware.api_call( f'api/v1/advonet/Beteiligte/{new_betnr}', method='GET' ) if isinstance(created_entity, list): new_rowid = created_entity[0].get('rowId') if created_entity else None else: new_rowid = created_entity.get('rowId') if not new_rowid: ctx.logger.warn(f"⚠️ Keine rowId nach CREATE - Change Detection nicht möglich!") # OPTIMIERT: Kombiniere release_lock + betnr + rowId update in 1 API call await sync_utils.release_sync_lock( entity_id, 'clean', error_message=None, extra_fields={ 'betnr': new_betnr, 'advowareRowId': new_rowid # WICHTIG für Change Detection! } ) ctx.logger.info(f"✅ CREATE erfolgreich: {entity_id} → betNr {new_betnr}, rowId {new_rowid[:20] if new_rowid else 'N/A'}...") except Exception as e: ctx.logger.error(f"❌ CREATE fehlgeschlagen: {e}") await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True) async def handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, ctx): """Synchronisiert existierenden Beteiligten""" try: ctx.logger.info(f"🔍 Fetch von Advoware betNr={betnr}...") # Fetch von Advoware try: advo_result = await advoware.api_call( f'api/v1/advonet/Beteiligte/{betnr}', method='GET' ) # Advoware gibt manchmal Listen zurück if isinstance(advo_result, list): advo_entity = advo_result[0] if advo_result else None else: advo_entity = advo_result if not advo_entity: raise Exception(f"Beteiligter betNr={betnr} nicht gefunden") except Exception as e: # 404 oder anderer Fehler → Beteiligter wurde in Advoware gelöscht if '404' in str(e) or 'nicht gefunden' in str(e).lower(): ctx.logger.warn(f"🗑️ Beteiligter in Advoware gelöscht: betNr={betnr}") await sync_utils.handle_advoware_deleted(entity_id, str(e)) return else: raise ctx.logger.info(f"📥 Von Advoware geladen: {advo_entity.get('name')}") # ÄNDERUNGSERKENNUNG (Primary: rowId, Fallback: Timestamps) comparison = sync_utils.compare_entities(espo_entity, advo_entity) ctx.logger.info(f"⏱️ Vergleich: {comparison}") # KEIN STAMMDATEN-SYNC NÖTIG if comparison == 'no_change': ctx.logger.info(f"✅ Keine Stammdaten-Änderungen erkannt") # NOTE: Kommunikation-Sync würde hier stattfinden # await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx) await sync_utils.release_sync_lock(entity_id, 'clean') return # ESPOCRM NEUER → Update Advoware if comparison == 'espocrm_newer': ctx.logger.info(f"📤 EspoCRM ist neuer → Update Advoware STAMMDATEN") # OPTIMIERT: Use merge utility merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper) put_result = await advoware.api_call( f'api/v1/advonet/Beteiligte/{betnr}', method='PUT', json_data=merged_data ) # Extrahiere neue rowId aus PUT Response (spart extra GET!) new_rowid = None if isinstance(put_result, list) and len(put_result) > 0: new_rowid = put_result[0].get('rowId') elif isinstance(put_result, dict): new_rowid = put_result.get('rowId') ctx.logger.info(f"✅ Advoware STAMMDATEN aktualisiert, rowId: {new_rowid[:20] if new_rowid else 'N/A'}...") # Validiere Sync-Ergebnis validation_success, validation_error = await sync_utils.validate_sync_result( entity_id, betnr, mapper, direction='to_advoware' ) if not validation_success: ctx.logger.error(f"❌ Sync-Validation fehlgeschlagen: {validation_error}") await sync_utils.release_sync_lock( entity_id, 'failed', error_message=f"Validation failed: {validation_error}", increment_retry=True ) return # NOTE: Kommunikation-Sync würde hier stattfinden # await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx) # Release Lock + Update rowId await sync_utils.release_sync_lock( entity_id, 'clean', extra_fields={'advowareRowId': new_rowid} ) # ADVOWARE NEUER → Update EspoCRM elif comparison == 'advoware_newer': ctx.logger.info(f"📥 Advoware ist neuer → Update EspoCRM STAMMDATEN") espo_data = mapper.map_advoware_to_cbeteiligte(advo_entity) await espocrm.update_entity('CBeteiligte', entity_id, espo_data) ctx.logger.info(f"✅ EspoCRM STAMMDATEN aktualisiert") # Validiere Sync-Ergebnis validation_success, validation_error = await sync_utils.validate_sync_result( entity_id, betnr, mapper, direction='to_espocrm' ) if not validation_success: ctx.logger.error(f"❌ Sync-Validation fehlgeschlagen: {validation_error}") await sync_utils.release_sync_lock( entity_id, 'failed', error_message=f"Validation failed: {validation_error}", increment_retry=True ) return # NOTE: Kommunikation-Sync würde hier stattfinden # await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx) # Release Lock + Update rowId await sync_utils.release_sync_lock( entity_id, 'clean', extra_fields={'advowareRowId': advo_entity.get('rowId')} ) # KONFLIKT → EspoCRM WINS elif comparison == 'conflict': ctx.logger.warn(f"⚠️ KONFLIKT erkannt → EspoCRM WINS (STAMMDATEN)") # OPTIMIERT: Use merge utility merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper) put_result = await advoware.api_call( f'api/v1/advonet/Beteiligte/{betnr}', method='PUT', json_data=merged_data ) # Extrahiere neue rowId aus PUT Response new_rowid = None if isinstance(put_result, list) and len(put_result) > 0: new_rowid = put_result[0].get('rowId') elif isinstance(put_result, dict): new_rowid = put_result.get('rowId') conflict_msg = ( f"EspoCRM: {espo_entity.get('modifiedAt')}, " f"Advoware: {advo_entity.get('geaendertAm')}. " f"EspoCRM hat gewonnen." ) ctx.logger.info(f"✅ Konflikt gelöst (EspoCRM won), neue rowId: {new_rowid[:20] if new_rowid else 'N/A'}...") # Validiere Sync-Ergebnis validation_success, validation_error = await sync_utils.validate_sync_result( entity_id, betnr, mapper, direction='to_advoware' ) if not validation_success: ctx.logger.error(f"❌ Conflict resolution validation fehlgeschlagen: {validation_error}") await sync_utils.release_sync_lock( entity_id, 'failed', error_message=f"Conflict resolution validation failed: {validation_error}", increment_retry=True ) return await sync_utils.resolve_conflict_espocrm_wins( entity_id, espo_entity, advo_entity, conflict_msg, extra_fields={'advowareRowId': new_rowid} ) # NOTE: Kommunikation-Sync (nur EspoCRM→Advoware) würde hier stattfinden # await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx, direction='to_advoware', force_espo_wins=True) await sync_utils.release_sync_lock(entity_id, 'clean') except Exception as e: ctx.logger.error(f"❌ UPDATE fehlgeschlagen: {e}") import traceback ctx.logger.error(traceback.format_exc()) await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)