From 014947e9e0e951810cc17d519a8bf2d27bcf0fcc Mon Sep 17 00:00:00 2001 From: bsiggel Date: Sun, 1 Mar 2026 22:19:36 +0000 Subject: [PATCH] Migrate VMH Integration - Phase 3: Core sync handlers & utilities MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Added 5 core service modules: * services/notification_utils.py: NotificationManager for manual actions (412 lines) * services/advoware_service.py: Extended Advoware operations wrapper * services/espocrm_mapper.py: BeteiligteMapper for data transformation (198 lines) * services/bankverbindungen_mapper.py: BankverbindungenMapper (174 lines) * services/beteiligte_sync_utils.py: BeteiligteSync with complex logic (663 lines) - Distributed locking via Redis + syncStatus - rowId-based change detection (primary) + timestamp fallback - Retry with exponential backoff (1min, 5min, 15min, 1h, 4h) - Conflict resolution (EspoCRM wins) - Soft-delete handling & round-trip validation - Added 2 event handler steps: * steps/vmh/beteiligte_sync_event_step.py: Handles vmh.beteiligte.* queue events - CREATE: New Beteiligte → POST to Advoware - UPDATE: Bi-directional sync with conflict resolution - DELETE: Not yet implemented - SYNC_CHECK: Periodic sync check via cron * steps/vmh/bankverbindungen_sync_event_step.py: Handles vmh.bankverbindungen.* events - CREATE: New Bankverbindung → POST to Advoware - UPDATE/DELETE: Send notification (API limitation - no PUT/DELETE support) - Added pytz dependency to pyproject.toml (required for timezone handling) System Status: - 27 steps registered (14 VMH-specific) - 8 queue subscriptions active (4 Beteiligte + 4 Bankverbindungen) - All webhook endpoints operational and emitting queue events - Sync handlers ready for processing Note: Kommunikation sync (kommunikation_sync_utils.py) not yet migrated. Complex bi-directional sync for Telefon/Email/Fax will be added in Phase 4. Updated MIGRATION_STATUS.md --- pyproject.toml | 1 + services/advoware_service.py | 120 ++++ services/bankverbindungen_mapper.py | 174 +++++ services/beteiligte_sync_utils.py | 669 ++++++++++++++++++ services/espocrm_mapper.py | 193 +++++ services/notification_utils.py | 438 ++++++++++++ steps/vmh/bankverbindungen_sync_event_step.py | 264 +++++++ steps/vmh/beteiligte_sync_event_step.py | 413 +++++++++++ uv.lock | 11 + 9 files changed, 2283 insertions(+) create mode 100644 services/advoware_service.py create mode 100644 services/bankverbindungen_mapper.py create mode 100644 services/beteiligte_sync_utils.py create mode 100644 services/espocrm_mapper.py create mode 100644 services/notification_utils.py create mode 100644 steps/vmh/bankverbindungen_sync_event_step.py create mode 100644 steps/vmh/beteiligte_sync_event_step.py diff --git a/pyproject.toml b/pyproject.toml index cefe37f..13c3dc3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,5 +12,6 @@ dependencies = [ "aiohttp>=3.10.0", "redis>=5.2.0", "python-dotenv>=1.0.0", + "pytz>=2025.2", ] diff --git a/services/advoware_service.py b/services/advoware_service.py new file mode 100644 index 0000000..bcd3da3 --- /dev/null +++ b/services/advoware_service.py @@ -0,0 +1,120 @@ +""" +Advoware Service Wrapper +Erweitert AdvowareAPI mit höheren Operations +""" + +import logging +from typing import Dict, Any, Optional +from services.advoware import AdvowareAPI + +logger = logging.getLogger(__name__) + + +class AdvowareService: + """ + Service-Layer für Advoware Operations + Verwendet AdvowareAPI für API-Calls + """ + + def __init__(self, context=None): + self.api = AdvowareAPI(context) + self.context = context + + # ========== BETEILIGTE ========== + + async def get_beteiligter(self, betnr: int) -> Optional[Dict]: + """ + Lädt Beteiligten mit allen Daten + + Returns: + Beteiligte-Objekt + """ + try: + endpoint = f"api/v1/advonet/Beteiligte/{betnr}" + result = await self.api.api_call(endpoint, method='GET') + return result + except Exception as e: + logger.error(f"[ADVO] Fehler beim Laden von Beteiligte {betnr}: {e}", exc_info=True) + return None + + # ========== KOMMUNIKATION ========== + + async def create_kommunikation(self, betnr: int, data: Dict[str, Any]) -> Optional[Dict]: + """ + Erstellt neue Kommunikation + + Args: + betnr: Beteiligten-Nummer + data: { + 'tlf': str, # Required + 'bemerkung': str, # Optional + 'kommKz': int, # Required (1-12) + 'online': bool # Optional + } + + Returns: + Neue Kommunikation mit 'id' + """ + try: + endpoint = f"api/v1/advonet/Beteiligte/{betnr}/Kommunikationen" + result = await self.api.api_call(endpoint, method='POST', json_data=data) + + if result: + logger.info(f"[ADVO] ✅ Created Kommunikation: betnr={betnr}, kommKz={data.get('kommKz')}") + + return result + + except Exception as e: + logger.error(f"[ADVO] Fehler beim Erstellen von Kommunikation: {e}", exc_info=True) + return None + + async def update_kommunikation(self, betnr: int, komm_id: int, data: Dict[str, Any]) -> bool: + """ + Aktualisiert bestehende Kommunikation + + Args: + betnr: Beteiligten-Nummer + komm_id: Kommunikation-ID + data: { + 'tlf': str, # Optional + 'bemerkung': str, # Optional + 'online': bool # Optional + } + + NOTE: kommKz ist READ-ONLY und kann nicht geändert werden + + Returns: + True wenn erfolgreich + """ + try: + endpoint = f"api/v1/advonet/Beteiligte/{betnr}/Kommunikationen/{komm_id}" + await self.api.api_call(endpoint, method='PUT', json_data=data) + + logger.info(f"[ADVO] ✅ Updated Kommunikation: betnr={betnr}, komm_id={komm_id}") + return True + + except Exception as e: + logger.error(f"[ADVO] Fehler beim Update von Kommunikation: {e}", exc_info=True) + return False + + async def delete_kommunikation(self, betnr: int, komm_id: int) -> bool: + """ + Löscht Kommunikation (aktuell 403 Forbidden) + + NOTE: DELETE ist in Advoware API deaktiviert + Verwende stattdessen: Leere Slots mit empty_slot_marker + + Returns: + True wenn erfolgreich + """ + try: + endpoint = f"api/v1/advonet/Beteiligte/{betnr}/Kommunikationen/{komm_id}" + await self.api.api_call(endpoint, method='DELETE') + + logger.info(f"[ADVO] ✅ Deleted Kommunikation: betnr={betnr}, komm_id={komm_id}") + return True + + except Exception as e: + # Expected: 403 Forbidden + logger.warning(f"[ADVO] DELETE not allowed (expected): {e}") + return False diff --git a/services/bankverbindungen_mapper.py b/services/bankverbindungen_mapper.py new file mode 100644 index 0000000..8880b56 --- /dev/null +++ b/services/bankverbindungen_mapper.py @@ -0,0 +1,174 @@ +""" +EspoCRM ↔ Advoware Bankverbindungen Mapper + +Transformiert Bankverbindungen zwischen den beiden Systemen +""" + +from typing import Dict, Any, Optional, List +from datetime import datetime +import logging + +logger = logging.getLogger(__name__) + + +class BankverbindungenMapper: + """Mapper für CBankverbindungen (EspoCRM) ↔ Bankverbindung (Advoware)""" + + @staticmethod + def map_cbankverbindungen_to_advoware(espo_entity: Dict[str, Any]) -> Dict[str, Any]: + """ + Transformiert EspoCRM CBankverbindungen → Advoware Bankverbindung Format + + Args: + espo_entity: CBankverbindungen Entity von EspoCRM + + Returns: + Dict für Advoware API (POST/PUT /api/v1/advonet/Beteiligte/{id}/Bankverbindungen) + """ + logger.debug(f"Mapping EspoCRM → Advoware Bankverbindung: {espo_entity.get('id')}") + + advo_data = {} + + # Bankname + bank = espo_entity.get('bank') + if bank: + advo_data['bank'] = bank + + # Kontonummer (deprecated, aber noch supported) + kto_nr = espo_entity.get('kontoNummer') + if kto_nr: + advo_data['ktoNr'] = kto_nr + + # BLZ (deprecated, aber noch supported) + blz = espo_entity.get('blz') + if blz: + advo_data['blz'] = blz + + # IBAN + iban = espo_entity.get('iban') + if iban: + advo_data['iban'] = iban + + # BIC + bic = espo_entity.get('bic') + if bic: + advo_data['bic'] = bic + + # Kontoinhaber + kontoinhaber = espo_entity.get('kontoinhaber') + if kontoinhaber: + advo_data['kontoinhaber'] = kontoinhaber + + # SEPA Mandat + mandatsreferenz = espo_entity.get('mandatsreferenz') + if mandatsreferenz: + advo_data['mandatsreferenz'] = mandatsreferenz + + mandat_vom = espo_entity.get('mandatVom') + if mandat_vom: + advo_data['mandatVom'] = mandat_vom + + logger.debug(f"Mapped to Advoware: IBAN={advo_data.get('iban')}, Bank={advo_data.get('bank')}") + + return advo_data + + @staticmethod + def map_advoware_to_cbankverbindungen(advo_entity: Dict[str, Any]) -> Dict[str, Any]: + """ + Transformiert Advoware Bankverbindung → EspoCRM CBankverbindungen Format + + Args: + advo_entity: Bankverbindung von Advoware API + + Returns: + Dict für EspoCRM API (POST/PUT /api/v1/CBankverbindungen) + """ + logger.debug(f"Mapping Advoware → EspoCRM: id={advo_entity.get('id')}") + + espo_data = { + 'advowareId': advo_entity.get('id'), # Link zu Advoware + 'advowareRowId': advo_entity.get('rowId'), # Änderungserkennung + } + + # Bankname + bank = advo_entity.get('bank') + if bank: + espo_data['bank'] = bank + + # Kontonummer + kto_nr = advo_entity.get('ktoNr') + if kto_nr: + espo_data['kontoNummer'] = kto_nr + + # BLZ + blz = advo_entity.get('blz') + if blz: + espo_data['blz'] = blz + + # IBAN + iban = advo_entity.get('iban') + if iban: + espo_data['iban'] = iban + + # BIC + bic = advo_entity.get('bic') + if bic: + espo_data['bic'] = bic + + # Kontoinhaber + kontoinhaber = advo_entity.get('kontoinhaber') + if kontoinhaber: + espo_data['kontoinhaber'] = kontoinhaber + + # SEPA Mandat + mandatsreferenz = advo_entity.get('mandatsreferenz') + if mandatsreferenz: + espo_data['mandatsreferenz'] = mandatsreferenz + + mandat_vom = advo_entity.get('mandatVom') + if mandat_vom: + # Konvertiere DateTime zu Date (EspoCRM Format: YYYY-MM-DD) + espo_data['mandatVom'] = mandat_vom.split('T')[0] if 'T' in mandat_vom else mandat_vom + + logger.debug(f"Mapped to EspoCRM: IBAN={espo_data.get('iban')}") + + # Entferne None-Werte (EspoCRM Validierung) + espo_data = {k: v for k, v in espo_data.items() if v is not None} + + return espo_data + + @staticmethod + def get_changed_fields(espo_entity: Dict[str, Any], advo_entity: Dict[str, Any]) -> List[str]: + """ + Vergleicht zwei Entities und gibt Liste der geänderten Felder zurück + + Args: + espo_entity: EspoCRM CBankverbindungen + advo_entity: Advoware Bankverbindung + + Returns: + Liste von Feldnamen die unterschiedlich sind + """ + mapped_advo = BankverbindungenMapper.map_advoware_to_cbankverbindungen(advo_entity) + + changed = [] + + compare_fields = [ + 'bank', 'iban', 'bic', 'kontoNummer', 'blz', + 'kontoinhaber', 'mandatsreferenz', 'mandatVom', + 'advowareId', 'advowareRowId' + ] + + for field in compare_fields: + espo_val = espo_entity.get(field) + advo_val = mapped_advo.get(field) + + # Normalisiere None und leere Strings + espo_val = espo_val if espo_val else None + advo_val = advo_val if advo_val else None + + if espo_val != advo_val: + changed.append(field) + logger.debug(f"Field '{field}' changed: EspoCRM='{espo_val}' vs Advoware='{advo_val}'") + + return changed diff --git a/services/beteiligte_sync_utils.py b/services/beteiligte_sync_utils.py new file mode 100644 index 0000000..6b69ae9 --- /dev/null +++ b/services/beteiligte_sync_utils.py @@ -0,0 +1,669 @@ +""" +Beteiligte Sync Utilities + +Hilfsfunktionen für Sync-Operationen: +- Distributed locking via Redis + syncStatus +- Timestamp-Vergleich mit rowId-basierter Änderungserkennung +- Konfliktauflösung (EspoCRM wins) +- EspoCRM In-App Notifications +- Soft-Delete Handling +- Retry mit Exponential Backoff +""" + +from typing import Dict, Any, Optional, Tuple, Literal +from datetime import datetime, timedelta +import pytz +import logging +import redis +import os + +logger = logging.getLogger(__name__) + +# Timestamp-Vergleich Ergebnis-Typen +TimestampResult = Literal["espocrm_newer", "advoware_newer", "conflict", "no_change"] + +# Max retry before permanent failure +MAX_SYNC_RETRIES = 5 +# Lock TTL in seconds (prevents deadlocks) +LOCK_TTL_SECONDS = 900 # 15 minutes +# Retry backoff: Wartezeit zwischen Retries (in Minuten) +RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h +# Auto-Reset nach 24h (für permanently_failed entities) +AUTO_RESET_HOURS = 24 + + +class BeteiligteSync: + """Utility-Klasse für Beteiligte-Synchronisation""" + + def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None): + self.espocrm = espocrm_api + self.context = context + self.logger = context.logger if context else logger + self.redis = redis_client or self._init_redis() + + # Import NotificationManager only when needed + from services.notification_utils import NotificationManager + self.notification_manager = NotificationManager(espocrm_api=self.espocrm, context=context) + + def _init_redis(self) -> redis.Redis: + """Initialize Redis client for distributed locking""" + try: + redis_host = os.getenv('REDIS_HOST', 'localhost') + redis_port = int(os.getenv('REDIS_PORT', '6379')) + redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1')) + + client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + decode_responses=True + ) + client.ping() + return client + except Exception as e: + self._log(f"Redis connection failed: {e}", level='error') + return None + + def _log(self, message: str, level: str = 'info'): + """Logging mit Context-Support""" + if self.context and hasattr(self.context, 'logger'): + getattr(self.context.logger, level)(message) + else: + getattr(logger, level)(message) + + async def acquire_sync_lock(self, entity_id: str) -> bool: + """ + Atomic distributed lock via Redis + syncStatus update + + Args: + entity_id: EspoCRM CBeteiligte ID + + Returns: + True wenn Lock erfolgreich, False wenn bereits im Sync + """ + try: + # STEP 1: Atomic Redis lock (prevents race conditions) + if self.redis: + lock_key = f"sync_lock:cbeteiligte:{entity_id}" + acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS) + + if not acquired: + self._log(f"Redis lock bereits aktiv für {entity_id}", level='warn') + return False + + # STEP 2: Update syncStatus (für UI visibility) + await self.espocrm.update_entity('CBeteiligte', entity_id, { + 'syncStatus': 'syncing' + }) + + self._log(f"Sync-Lock für {entity_id} erworben") + return True + + except Exception as e: + self._log(f"Fehler beim Acquire Lock: {e}", level='error') + # Clean up Redis lock on error + if self.redis: + try: + lock_key = f"sync_lock:cbeteiligte:{entity_id}" + self.redis.delete(lock_key) + except: + pass + return False + + async def release_sync_lock( + self, + entity_id: str, + new_status: str = 'clean', + error_message: Optional[str] = None, + increment_retry: bool = False, + extra_fields: Optional[Dict[str, Any]] = None + ) -> None: + """ + Gibt Sync-Lock frei und setzt finalen Status (kombiniert mit extra fields) + + Args: + entity_id: EspoCRM CBeteiligte ID + new_status: Neuer syncStatus (clean, failed, conflict, etc.) + error_message: Optional: Fehlermeldung für syncErrorMessage + increment_retry: Ob syncRetryCount erhöht werden soll + extra_fields: Optional: Zusätzliche Felder für EspoCRM update (z.B. betnr) + """ + try: + # EspoCRM datetime format: YYYY-MM-DD HH:MM:SS (keine Timezone!) + now_utc = datetime.now(pytz.UTC) + espo_datetime = now_utc.strftime('%Y-%m-%d %H:%M:%S') + + update_data = { + 'syncStatus': new_status, + 'advowareLastSync': espo_datetime + } + + if error_message: + update_data['syncErrorMessage'] = error_message[:2000] # Max. 2000 chars + else: + update_data['syncErrorMessage'] = None + + # Handle retry count + if increment_retry: + # Hole aktuellen Retry-Count + entity = await self.espocrm.get_entity('CBeteiligte', entity_id) + current_retry = entity.get('syncRetryCount') or 0 + new_retry = current_retry + 1 + update_data['syncRetryCount'] = new_retry + + # Exponential backoff - berechne nächsten Retry-Zeitpunkt + if new_retry <= len(RETRY_BACKOFF_MINUTES): + backoff_minutes = RETRY_BACKOFF_MINUTES[new_retry - 1] + else: + backoff_minutes = RETRY_BACKOFF_MINUTES[-1] # Letzte Backoff-Zeit + + next_retry = now_utc + timedelta(minutes=backoff_minutes) + update_data['syncNextRetry'] = next_retry.strftime('%Y-%m-%d %H:%M:%S') + + self._log(f"Retry {new_retry}/{MAX_SYNC_RETRIES}, nächster Versuch in {backoff_minutes} Minuten") + + # Check max retries - mark as permanently failed + if new_retry >= MAX_SYNC_RETRIES: + update_data['syncStatus'] = 'permanently_failed' + + # Auto-Reset Timestamp für Wiederherstellung nach 24h + auto_reset_time = now_utc + timedelta(hours=AUTO_RESET_HOURS) + update_data['syncAutoResetAt'] = auto_reset_time.strftime('%Y-%m-%d %H:%M:%S') + + await self.send_notification( + entity_id, + 'error', + extra_data={ + 'message': f"Sync fehlgeschlagen nach {MAX_SYNC_RETRIES} Versuchen. Auto-Reset in {AUTO_RESET_HOURS}h." + } + ) + self._log(f"Max retries ({MAX_SYNC_RETRIES}) erreicht für {entity_id}, Auto-Reset um {auto_reset_time}", level='error') + else: + update_data['syncRetryCount'] = 0 + update_data['syncNextRetry'] = None + + # Merge extra fields (e.g., betnr from create operation) + if extra_fields: + update_data.update(extra_fields) + + await self.espocrm.update_entity('CBeteiligte', entity_id, update_data) + + self._log(f"Sync-Lock released: {entity_id} → {new_status}") + + # Release Redis lock + if self.redis: + lock_key = f"sync_lock:cbeteiligte:{entity_id}" + self.redis.delete(lock_key) + + except Exception as e: + self._log(f"Fehler beim Release Lock: {e}", level='error') + # Ensure Redis lock is released even on error + if self.redis: + try: + lock_key = f"sync_lock:cbeteiligte:{entity_id}" + self.redis.delete(lock_key) + except: + pass + + @staticmethod + def parse_timestamp(ts: Any) -> Optional[datetime]: + """ + Parse verschiedene Timestamp-Formate zu datetime + + Args: + ts: String, datetime oder None + + Returns: + datetime-Objekt oder None + """ + if not ts: + return None + + if isinstance(ts, datetime): + return ts + + if isinstance(ts, str): + # EspoCRM Format: "2026-02-07 14:30:00" + # Advoware Format: "2026-02-07T14:30:00" oder "2026-02-07T14:30:00Z" + try: + # Entferne trailing Z falls vorhanden + ts = ts.rstrip('Z') + + # Versuche verschiedene Formate + for fmt in [ + '%Y-%m-%d %H:%M:%S', + '%Y-%m-%dT%H:%M:%S', + '%Y-%m-%d', + ]: + try: + return datetime.strptime(ts, fmt) + except ValueError: + continue + + # Fallback: ISO-Format + return datetime.fromisoformat(ts) + + except Exception as e: + logger.warning(f"Konnte Timestamp nicht parsen: {ts} - {e}") + return None + + return None + + def compare_entities( + self, + espo_entity: Dict[str, Any], + advo_entity: Dict[str, Any] + ) -> TimestampResult: + """ + Vergleicht Änderungen zwischen EspoCRM und Advoware + + PRIMÄR: rowId-Vergleich (Advoware rowId ändert sich bei jedem Update - SEHR zuverlässig!) + FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar) + + Args: + espo_entity: EspoCRM CBeteiligte + advo_entity: Advoware Beteiligte + + Returns: + "espocrm_newer": EspoCRM wurde geändert + "advoware_newer": Advoware wurde geändert + "conflict": Beide wurden geändert + "no_change": Keine Änderungen + """ + # PRIMÄR: rowId-basierte Änderungserkennung (zuverlässiger!) + espo_rowid = espo_entity.get('advowareRowId') + advo_rowid = advo_entity.get('rowId') + last_sync = espo_entity.get('advowareLastSync') + espo_modified = espo_entity.get('modifiedAt') + + # Parse timestamps für Initial Sync Check + espo_ts = self.parse_timestamp(espo_modified) + advo_ts = self.parse_timestamp(advo_entity.get('geaendertAm')) + + # SPECIAL CASE: Kein lastSync → Initial Sync + if not last_sync: + self._log(f"Initial Sync (kein lastSync) → Vergleiche Timestamps") + + # Wenn beide Timestamps vorhanden, vergleiche sie + if espo_ts and advo_ts: + if espo_ts > advo_ts: + self._log(f"Initial Sync: EspoCRM neuer ({espo_ts} > {advo_ts})") + return 'espocrm_newer' + elif advo_ts > espo_ts: + self._log(f"Initial Sync: Advoware neuer ({advo_ts} > {espo_ts})") + return 'advoware_newer' + else: + self._log(f"Initial Sync: Beide gleich alt") + return 'no_change' + + # Fallback: Wenn nur einer Timestamp hat, bevorzuge den + if espo_ts and not advo_ts: + return 'espocrm_newer' + if advo_ts and not espo_ts: + return 'advoware_newer' + + # Wenn keine Timestamps verfügbar: EspoCRM bevorzugen (default) + self._log(f"Initial Sync: Keine Timestamps verfügbar → EspoCRM bevorzugt") + return 'espocrm_newer' + + if espo_rowid and advo_rowid: + # Prüfe ob Advoware geändert wurde (rowId) + advo_changed = (espo_rowid != advo_rowid) + + # Prüfe ob EspoCRM auch geändert wurde (seit letztem Sync) + espo_changed = False + if espo_modified: + try: + sync_ts = self.parse_timestamp(last_sync) + if espo_ts and sync_ts: + espo_changed = (espo_ts > sync_ts) + except Exception as e: + self._log(f"Timestamp-Parse-Fehler: {e}", level='debug') + + # Konfliktlogik: Beide geändert seit letztem Sync? + if advo_changed and espo_changed: + self._log(f"🚨 KONFLIKT: Beide Seiten geändert seit letztem Sync") + return 'conflict' + elif advo_changed: + self._log(f"Advoware rowId geändert: {espo_rowid[:20] if espo_rowid else 'None'}... → {advo_rowid[:20] if advo_rowid else 'None'}...") + return 'advoware_newer' + elif espo_changed: + self._log(f"EspoCRM neuer (modifiedAt > lastSync)") + return 'espocrm_newer' + else: + # Weder Advoware noch EspoCRM geändert + self._log("Keine Änderungen (rowId identisch)") + return 'no_change' + + # FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar) + self._log("rowId nicht verfügbar, fallback auf Timestamp-Vergleich", level='debug') + return self.compare_timestamps( + espo_entity.get('modifiedAt'), + advo_entity.get('geaendertAm'), + espo_entity.get('advowareLastSync') + ) + + def compare_timestamps( + self, + espo_modified_at: Any, + advo_geaendert_am: Any, + last_sync_ts: Any + ) -> TimestampResult: + """ + Vergleicht Timestamps und bestimmt Sync-Richtung (FALLBACK wenn rowId nicht verfügbar) + + Args: + espo_modified_at: EspoCRM modifiedAt + advo_geaendert_am: Advoware geaendertAm + last_sync_ts: Letzter Sync (advowareLastSync) + + Returns: + "espocrm_newer": EspoCRM wurde nach last_sync geändert und ist neuer + "advoware_newer": Advoware wurde nach last_sync geändert und ist neuer + "conflict": Beide wurden nach last_sync geändert + "no_change": Keine Änderungen seit last_sync + """ + espo_ts = self.parse_timestamp(espo_modified_at) + advo_ts = self.parse_timestamp(advo_geaendert_am) + sync_ts = self.parse_timestamp(last_sync_ts) + + # Logging + self._log( + f"Timestamp-Vergleich: EspoCRM={espo_ts}, Advoware={advo_ts}, LastSync={sync_ts}", + level='debug' + ) + + # Falls kein last_sync → erster Sync, vergleiche direkt + if not sync_ts: + if not espo_ts or not advo_ts: + return "no_change" + + if espo_ts > advo_ts: + return "espocrm_newer" + elif advo_ts > espo_ts: + return "advoware_newer" + else: + return "no_change" + + # Check ob seit last_sync Änderungen + espo_changed = espo_ts and espo_ts > sync_ts + advo_changed = advo_ts and advo_ts > sync_ts + + if espo_changed and advo_changed: + # Beide geändert seit last_sync → Konflikt + return "conflict" + elif espo_changed: + # Nur EspoCRM geändert + return "espocrm_newer" if (not advo_ts or espo_ts > advo_ts) else "conflict" + elif advo_changed: + # Nur Advoware geändert + return "advoware_newer" + else: + # Keine Änderungen + return "no_change" + + def merge_for_advoware_put( + self, + advo_entity: Dict[str, Any], + espo_entity: Dict[str, Any], + mapper + ) -> Dict[str, Any]: + """ + Merged EspoCRM updates mit Advoware entity für PUT operation + + Advoware benötigt vollständige Objekte für PUT (Read-Modify-Write pattern). + Diese Funktion merged die gemappten EspoCRM-Updates in das bestehende + Advoware-Objekt. + + Args: + advo_entity: Aktuelles Advoware entity (vollständiges Objekt) + espo_entity: EspoCRM entity mit Updates + mapper: BeteiligteMapper instance + + Returns: + Merged dict für Advoware PUT + """ + # Map EspoCRM → Advoware (nur Stammdaten) + advo_updates = mapper.map_cbeteiligte_to_advoware(espo_entity) + + # Merge: Advoware entity als Base, überschreibe mit EspoCRM updates + merged = {**advo_entity, **advo_updates} + + # Logging + self._log( + f"📝 Merge: {len(advo_updates)} Stammdaten-Felder → {len(merged)} Gesamt-Felder", + level='info' + ) + self._log( + f" Gesynct: {', '.join(advo_updates.keys())}", + level='debug' + ) + + return merged + + async def send_notification( + self, + entity_id: str, + notification_type: Literal["conflict", "deleted", "error"], + extra_data: Optional[Dict[str, Any]] = None + ) -> None: + """ + Sendet EspoCRM Notification via NotificationManager + + Args: + entity_id: CBeteiligte Entity ID + notification_type: "conflict", "deleted" oder "error" + extra_data: Zusätzliche Daten für Nachricht + """ + try: + # Hole Entity-Daten + entity = await self.espocrm.get_entity('CBeteiligte', entity_id) + name = entity.get('name', 'Unbekannt') + betnr = entity.get('betnr') + + # Map notification_type zu action_type + if notification_type == "conflict": + action_type = 'sync_conflict' + details = { + 'message': f"Sync-Konflikt bei Beteiligten '{name}' (betNr: {betnr})", + 'description': ( + f"EspoCRM hat Vorrang - Änderungen wurden nach Advoware übertragen.\n\n" + f"Bitte prüfen Sie die Details und stellen Sie sicher, dass die Daten korrekt sind." + ), + 'entity_name': name, + 'betnr': betnr, + 'priority': 'Normal' + } + elif notification_type == "deleted": + deleted_at = entity.get('advowareDeletedAt', 'unbekannt') + action_type = 'entity_deleted_in_source' + details = { + 'message': f"Beteiligter '{name}' wurde in Advoware gelöscht", + 'description': ( + f"Der Beteiligte '{name}' (betNr: {betnr}) wurde am {deleted_at} " + f"in Advoware gelöscht.\n\n" + f"Der Datensatz wurde in EspoCRM markiert, aber nicht gelöscht. " + f"Bitte prüfen Sie, ob dies beabsichtigt war." + ), + 'entity_name': name, + 'betnr': betnr, + 'deleted_at': deleted_at, + 'priority': 'High' + } + else: # error + action_type = 'general_manual_action' + details = { + 'message': f"Benachrichtigung für Beteiligten '{name}'", + 'entity_name': name, + 'betnr': betnr + } + + # Merge extra_data if provided + if extra_data: + details.update(extra_data) + + # Sende via NotificationManager + await self.notification_manager.notify_manual_action_required( + entity_type='CBeteiligte', + entity_id=entity_id, + action_type=action_type, + details=details, + create_task=True + ) + + self._log(f"Notification via NotificationManager gesendet: {notification_type} für {entity_id}") + + except Exception as e: + self._log(f"Fehler beim Senden der Notification: {e}", level='error') + + async def handle_advoware_deleted( + self, + entity_id: str, + error_details: str + ) -> None: + """ + Behandelt Fall dass Beteiligter in Advoware gelöscht wurde (404) + + Args: + entity_id: CBeteiligte Entity ID + error_details: Fehlerdetails von Advoware API + """ + try: + now = datetime.now(pytz.UTC).isoformat() + + # Update Entity: Soft-Delete Flag + await self.espocrm.update_entity('CBeteiligte', entity_id, { + 'syncStatus': 'deleted_in_advoware', + 'advowareDeletedAt': now, + 'syncErrorMessage': f"Beteiligter existiert nicht mehr in Advoware. {error_details}" + }) + + self._log(f"Entity {entity_id} als deleted_in_advoware markiert") + + # Sende Notification + await self.send_notification(entity_id, 'deleted') + + except Exception as e: + self._log(f"Fehler beim Handle Deleted: {e}", level='error') + + async def validate_sync_result( + self, + entity_id: str, + betnr: int, + mapper, + direction: str = 'to_advoware' + ) -> Tuple[bool, Optional[str]]: + """ + Validiert Sync-Ergebnis durch Round-Trip Verification + + Args: + entity_id: EspoCRM CBeteiligte ID + betnr: Advoware betNr + mapper: BeteiligteMapper instance + direction: 'to_advoware' oder 'to_espocrm' + + Returns: + (success: bool, error_message: Optional[str]) + """ + try: + self._log(f"🔍 Validiere Sync-Ergebnis (direction={direction})...", level='debug') + + # Lade beide Entities erneut + espo_entity = await self.espocrm.get_entity('CBeteiligte', entity_id) + + from services.advoware import AdvowareAPI + advoware_api = AdvowareAPI(self.context) + advo_result = await advoware_api.api_call(f'api/v1/advonet/Beteiligte/{betnr}', method='GET') + + if isinstance(advo_result, list): + advo_entity = advo_result[0] if advo_result else None + else: + advo_entity = advo_result + + if not advo_entity: + return False, f"Advoware Entity {betnr} nicht gefunden nach Sync" + + # Validiere Stammdaten + critical_fields = ['name', 'rechtsform'] + differences = [] + + if direction == 'to_advoware': + # EspoCRM → Advoware: Prüfe ob Advoware die EspoCRM-Werte hat + advo_mapped = mapper.map_cbeteiligte_to_advoware(espo_entity) + + for field in critical_fields: + espo_val = advo_mapped.get(field) + advo_val = advo_entity.get(field) + + if espo_val != advo_val: + differences.append(f"{field}: expected '{espo_val}', got '{advo_val}'") + + elif direction == 'to_espocrm': + # Advoware → EspoCRM: Prüfe ob EspoCRM die Advoware-Werte hat + espo_mapped = mapper.map_advoware_to_cbeteiligte(advo_entity) + + for field in critical_fields: + advo_val = espo_mapped.get(field) + espo_val = espo_entity.get(field) + + if advo_val != espo_val: + differences.append(f"{field}: expected '{advo_val}', got '{espo_val}'") + + if differences: + error_msg = f"Validation failed: {', '.join(differences)}" + self._log(f"❌ {error_msg}", level='error') + return False, error_msg + + self._log(f"✅ Validation erfolgreich", level='debug') + return True, None + + except Exception as e: + self._log(f"⚠️ Validation error: {e}", level='error') + return False, f"Validation exception: {str(e)}" + + async def resolve_conflict_espocrm_wins( + self, + entity_id: str, + espo_entity: Dict[str, Any], + advo_entity: Dict[str, Any], + conflict_details: str, + extra_fields: Optional[Dict[str, Any]] = None + ) -> None: + """ + Löst Konflikt auf: EspoCRM wins (überschreibt Advoware) + + Args: + entity_id: CBeteiligte Entity ID + espo_entity: EspoCRM Entity-Daten + advo_entity: Advoware Entity-Daten + conflict_details: Details zum Konflikt + extra_fields: Zusätzliche Felder (z.B. advowareRowId) + """ + try: + # EspoCRM datetime format + now_utc = datetime.now(pytz.UTC) + espo_datetime = now_utc.strftime('%Y-%m-%d %H:%M:%S') + + # Markiere als gelöst mit Konflikt-Info + update_data = { + 'syncStatus': 'clean', # Gelöst! + 'advowareLastSync': espo_datetime, + 'syncErrorMessage': f'Konflikt: {conflict_details}', + 'syncRetryCount': 0 + } + + # Merge extra fields (z.B. advowareRowId) + if extra_fields: + update_data.update(extra_fields) + + await self.espocrm.update_entity('CBeteiligte', entity_id, update_data) + + self._log(f"Konflikt gelöst für {entity_id}: EspoCRM wins") + + # Sende Notification + await self.send_notification(entity_id, 'conflict', { + 'details': conflict_details + }) + + except Exception as e: + self._log(f"Fehler beim Resolve Conflict: {e}", level='error') diff --git a/services/espocrm_mapper.py b/services/espocrm_mapper.py new file mode 100644 index 0000000..0272f73 --- /dev/null +++ b/services/espocrm_mapper.py @@ -0,0 +1,193 @@ +""" +EspoCRM ↔ Advoware Entity Mapper + +Transformiert Beteiligte zwischen den beiden Systemen basierend auf ENTITY_MAPPING_CBeteiligte_Advoware.md +""" + +from typing import Dict, Any, Optional, List +from datetime import datetime +import logging + +logger = logging.getLogger(__name__) + + +class BeteiligteMapper: + """Mapper für CBeteiligte (EspoCRM) ↔ Beteiligte (Advoware)""" + + @staticmethod + def map_cbeteiligte_to_advoware(espo_entity: Dict[str, Any]) -> Dict[str, Any]: + """ + Transformiert EspoCRM CBeteiligte → Advoware Beteiligte Format (STAMMDATEN) + + WICHTIG: Kontaktdaten (Telefon, Email, Fax, Bankverbindungen) werden über + separate Advoware-Endpoints gesynct und sind NICHT Teil dieser Mapping-Funktion. + + Args: + espo_entity: CBeteiligte Entity von EspoCRM + + Returns: + Dict mit Stammdaten für Advoware API (POST/PUT /api/v1/advonet/Beteiligte) + """ + logger.debug(f"Mapping EspoCRM → Advoware STAMMDATEN: {espo_entity.get('id')}") + + # Bestimme ob Person oder Firma (über firmenname-Feld) + firmenname = espo_entity.get('firmenname') + is_firma = bool(firmenname and firmenname.strip()) + + # Basis-Struktur (nur die funktionierenden Felder!) + advo_data = { + 'rechtsform': espo_entity.get('rechtsform', ''), + } + + # NAME: Person vs. Firma + if is_firma: + # Firma: Lese von firmenname-Feld + advo_data['name'] = firmenname + advo_data['vorname'] = None + else: + # Natürliche Person: Lese von lastName/firstName + advo_data['name'] = espo_entity.get('lastName', '') + advo_data['vorname'] = espo_entity.get('firstName', '') + + # ANREDE & TITEL (funktionierende Felder) + salutation = espo_entity.get('salutationName') + if salutation: + advo_data['anrede'] = salutation + + titel = espo_entity.get('titel') + if titel: + advo_data['titel'] = titel + + # BRIEFANREDE (bAnrede) + brief_anrede = espo_entity.get('briefAnrede') + if brief_anrede: + advo_data['bAnrede'] = brief_anrede + + # ZUSATZ + zusatz = espo_entity.get('zusatz') + if zusatz: + advo_data['zusatz'] = zusatz + + # GEBURTSDATUM + date_of_birth = espo_entity.get('dateOfBirth') + if date_of_birth: + advo_data['geburtsdatum'] = date_of_birth + + # HINWEIS: handelsRegisterNummer und registergericht funktionieren NICHT! + # Advoware ignoriert diese Felder im PUT (trotz Swagger Schema) + + logger.debug(f"Mapped to Advoware STAMMDATEN: name={advo_data.get('name')}, vorname={advo_data.get('vorname')}, rechtsform={advo_data.get('rechtsform')}") + + return advo_data + + @staticmethod + def map_advoware_to_cbeteiligte(advo_entity: Dict[str, Any]) -> Dict[str, Any]: + """ + Transformiert Advoware Beteiligte → EspoCRM CBeteiligte Format + + Args: + advo_entity: Beteiligter von Advoware API + + Returns: + Dict für EspoCRM API (POST/PUT /api/v1/CBeteiligte) + """ + logger.debug(f"Mapping Advoware → EspoCRM: betNr={advo_entity.get('betNr')}") + + # Bestimme ob Person oder Firma + vorname = advo_entity.get('vorname') + is_person = bool(vorname) + + # Basis-Struktur + espo_data = { + 'rechtsform': advo_entity.get('rechtsform', ''), + 'betnr': advo_entity.get('betNr'), # Link zu Advoware + 'advowareRowId': advo_entity.get('rowId'), # Änderungserkennung + } + + # NAME: Person vs. Firma (EspoCRM blendet lastName/firstName aus bei Firmen) + if is_person: + # Natürliche Person → lastName/firstName verwenden + espo_data['firstName'] = vorname + espo_data['lastName'] = advo_entity.get('name', '') + espo_data['name'] = f"{vorname} {advo_entity.get('name', '')}".strip() + espo_data['firmenname'] = None # Firma-Feld leer lassen + else: + # Firma → firmenname verwenden (EspoCRM zeigt dann nur dieses Feld) + firma_name = advo_entity.get('name', '') + espo_data['firmenname'] = firma_name + espo_data['name'] = firma_name + # lastName/firstName nicht setzen (EspoCRM blendet sie aus bei Firmen) + espo_data['firstName'] = None + espo_data['lastName'] = None + + # ANREDE & TITEL + anrede = advo_entity.get('anrede') + if anrede: + espo_data['salutationName'] = anrede + + titel = advo_entity.get('titel') + if titel: + espo_data['titel'] = titel + + # BRIEFANREDE + b_anrede = advo_entity.get('bAnrede') + if b_anrede: + espo_data['briefAnrede'] = b_anrede + + # ZUSATZ + zusatz = advo_entity.get('zusatz') + if zusatz: + espo_data['zusatz'] = zusatz + + # GEBURTSDATUM (nur Datum-Teil ohne Zeit) + geburtsdatum = advo_entity.get('geburtsdatum') + if geburtsdatum: + # Advoware gibt '2001-01-05T00:00:00', EspoCRM will nur '2001-01-05' + espo_data['dateOfBirth'] = geburtsdatum.split('T')[0] if 'T' in geburtsdatum else geburtsdatum + + logger.debug(f"Mapped to EspoCRM STAMMDATEN: name={espo_data.get('name')}") + + # WICHTIG: Entferne None-Werte (EspoCRM mag keine expliziten None bei required fields) + espo_data = {k: v for k, v in espo_data.items() if v is not None} + + return espo_data + + @staticmethod + def get_changed_fields(espo_entity: Dict[str, Any], advo_entity: Dict[str, Any]) -> List[str]: + """ + Vergleicht zwei Entities und gibt Liste der geänderten Felder zurück + + Args: + espo_entity: EspoCRM CBeteiligte + advo_entity: Advoware Beteiligte + + Returns: + Liste von Feldnamen die unterschiedlich sind + """ + # Mappe Advoware zu EspoCRM Format für Vergleich + mapped_advo = BeteiligteMapper.map_advoware_to_cbeteiligte(advo_entity) + + changed = [] + + # Vergleiche wichtige Felder + compare_fields = [ + 'name', 'firstName', 'lastName', 'firmenname', + 'emailAddress', 'phoneNumber', + 'dateOfBirth', 'rechtsform', + 'handelsregisterNummer', 'handelsregisterArt', 'registergericht', + 'betnr', 'advowareRowId' + ] + + for field in compare_fields: + espo_val = espo_entity.get(field) + advo_val = mapped_advo.get(field) + + # Normalisiere None und leere Strings + espo_val = espo_val if espo_val else None + advo_val = advo_val if advo_val else None + + if espo_val != advo_val: + changed.append(field) + logger.debug(f"Field '{field}' changed: EspoCRM='{espo_val}' vs Advoware='{advo_val}'") + + return changed diff --git a/services/notification_utils.py b/services/notification_utils.py new file mode 100644 index 0000000..defd935 --- /dev/null +++ b/services/notification_utils.py @@ -0,0 +1,438 @@ +""" +Zentrale Notification-Utilities für manuelle Eingriffe +======================================================= + +Wenn Advoware-API-Limitierungen existieren (z.B. READ-ONLY Felder), +werden Notifications in EspoCRM erstellt, damit User manuelle Eingriffe +vornehmen können. + +Features: +- Notifications an assigned Users +- Task-Erstellung für manuelle Eingriffe +- Zentrale Verwaltung aller Notification-Types +""" + +from typing import Dict, Any, Optional, Literal, List +from datetime import datetime, timedelta +import logging + + +class NotificationManager: + """ + Zentrale Klasse für Notifications bei Sync-Problemen + """ + + def __init__(self, espocrm_api, context=None): + """ + Args: + espocrm_api: EspoCRMAPI instance + context: Optional context für Logging + """ + self.espocrm = espocrm_api + self.context = context + self.logger = context.logger if context else logging.getLogger(__name__) + + async def notify_manual_action_required( + self, + entity_type: str, + entity_id: str, + action_type: Literal[ + "address_delete_required", + "address_reactivate_required", + "address_field_update_required", + "readonly_field_conflict", + "missing_in_advoware", + "sync_conflict", + "entity_deleted_in_source", + "general_manual_action" + ], + details: Dict[str, Any], + assigned_user_id: Optional[str] = None, + create_task: bool = True + ) -> Dict[str, str]: + """ + Erstellt Notification und optional Task für manuelle Eingriffe + + Args: + entity_type: EspoCRM Entity Type (z.B. 'CAdressen', 'CBeteiligte') + entity_id: Entity ID in EspoCRM + action_type: Art der manuellen Aktion + details: Detaillierte Informationen + assigned_user_id: User der benachrichtigt werden soll (optional) + create_task: Ob zusätzlich ein Task erstellt werden soll + + Returns: + Dict mit notification_id und optional task_id + """ + try: + # Hole Entity-Daten + entity = await self.espocrm.get_entity(entity_type, entity_id) + entity_name = entity.get('name', f"{entity_type} {entity_id}") + + # Falls kein assigned_user, versuche aus Entity zu holen + if not assigned_user_id: + assigned_user_id = entity.get('assignedUserId') + + # Erstelle Notification + notification_data = self._build_notification_message( + action_type, entity_type, entity_name, details + ) + + notification_id = await self._create_notification( + user_id=assigned_user_id, + message=notification_data['message'], + entity_type=entity_type, + entity_id=entity_id + ) + + result = {'notification_id': notification_id} + + # Optional: Task erstellen + if create_task: + task_id = await self._create_task( + name=notification_data['task_name'], + description=notification_data['task_description'], + parent_type=entity_type, + parent_id=entity_id, + assigned_user_id=assigned_user_id, + priority=notification_data['priority'] + ) + result['task_id'] = task_id + + self.logger.info( + f"Manual action notification created: {action_type} for " + f"{entity_type}/{entity_id}" + ) + + return result + + except Exception as e: + self.logger.error(f"Failed to create notification: {e}") + raise + + def _build_notification_message( + self, + action_type: str, + entity_type: str, + entity_name: str, + details: Dict[str, Any] + ) -> Dict[str, str]: + """ + Erstellt Notification-Message basierend auf Action-Type + + Returns: + Dict mit 'message', 'task_name', 'task_description', 'priority' + """ + + if action_type == "address_delete_required": + return { + 'message': ( + f"🗑️ Adresse in Advoware löschen erforderlich\n" + f"Adresse: {entity_name}\n" + f"Grund: Advoware API unterstützt kein DELETE und gueltigBis ist READ-ONLY\n" + f"Bitte manuell in Advoware löschen oder deaktivieren." + ), + 'task_name': f"Adresse in Advoware löschen: {entity_name}", + 'task_description': ( + f"MANUELLE AKTION ERFORDERLICH\n\n" + f"Adresse: {entity_name}\n" + f"BetNr: {details.get('betnr', 'N/A')}\n" + f"Adresse: {details.get('strasse', '')}, {details.get('plz', '')} {details.get('ort', '')}\n\n" + f"GRUND:\n" + f"- DELETE API nicht verfügbar (403 Forbidden)\n" + f"- gueltigBis ist READ-ONLY (kann nicht nachträglich gesetzt werden)\n\n" + f"AKTION:\n" + f"1. In Advoware Web-Interface einloggen\n" + f"2. Beteiligten mit BetNr {details.get('betnr', 'N/A')} öffnen\n" + f"3. Adresse suchen: {details.get('strasse', '')}\n" + f"4. Adresse löschen oder deaktivieren\n\n" + f"Nach Erledigung: Task als 'Completed' markieren." + ), + 'priority': 'Normal' + } + + elif action_type == "address_reactivate_required": + return { + 'message': ( + f"♻️ Adresse-Reaktivierung in Advoware erforderlich\n" + f"Adresse: {entity_name}\n" + f"Grund: gueltigBis kann nicht nachträglich geändert werden\n" + f"Bitte neue Adresse in Advoware erstellen." + ), + 'task_name': f"Neue Adresse in Advoware erstellen: {entity_name}", + 'task_description': ( + f"MANUELLE AKTION ERFORDERLICH\n\n" + f"Adresse: {entity_name}\n" + f"BetNr: {details.get('betnr', 'N/A')}\n\n" + f"GRUND:\n" + f"Diese Adresse wurde reaktiviert, aber die alte Adresse in Advoware " + f"ist abgelaufen (gueltigBis in Vergangenheit). Da gueltigBis READ-ONLY ist, " + f"muss eine neue Adresse erstellt werden.\n\n" + f"AKTION:\n" + f"1. In Advoware Web-Interface einloggen\n" + f"2. Beteiligten mit BetNr {details.get('betnr', 'N/A')} öffnen\n" + f"3. Neue Adresse erstellen:\n" + f" - Straße: {details.get('strasse', '')}\n" + f" - PLZ: {details.get('plz', '')}\n" + f" - Ort: {details.get('ort', '')}\n" + f" - Land: {details.get('land', '')}\n" + f" - Bemerkung: EspoCRM-ID: {details.get('espocrm_id', '')}\n" + f"4. Sync erneut durchführen, damit Mapping aktualisiert wird\n\n" + f"Nach Erledigung: Task als 'Completed' markieren." + ), + 'priority': 'Normal' + } + + elif action_type == "address_field_update_required": + readonly_fields = details.get('readonly_fields', []) + return { + 'message': ( + f"⚠️ Adressfelder in Advoware können nicht aktualisiert werden\n" + f"Adresse: {entity_name}\n" + f"READ-ONLY Felder: {', '.join(readonly_fields)}\n" + f"Bitte manuell in Advoware ändern." + ), + 'task_name': f"Adressfelder in Advoware aktualisieren: {entity_name}", + 'task_description': ( + f"MANUELLE AKTION ERFORDERLICH\n\n" + f"Adresse: {entity_name}\n" + f"BetNr: {details.get('betnr', 'N/A')}\n\n" + f"GRUND:\n" + f"Folgende Felder sind in Advoware API READ-ONLY und können nicht " + f"via PUT geändert werden:\n" + f"- {', '.join(readonly_fields)}\n\n" + f"GEWÜNSCHTE ÄNDERUNGEN:\n" + + '\n'.join([f" - {k}: {v}" for k, v in details.get('changes', {}).items()]) + + f"\n\nAKTION:\n" + f"1. In Advoware Web-Interface einloggen\n" + f"2. Beteiligten mit BetNr {details.get('betnr', 'N/A')} öffnen\n" + f"3. Adresse suchen und obige Felder manuell ändern\n" + f"4. Sync erneut durchführen zur Bestätigung\n\n" + f"Nach Erledigung: Task als 'Completed' markieren." + ), + 'priority': 'Low' + } + + elif action_type == "readonly_field_conflict": + return { + 'message': ( + f"⚠️ Sync-Konflikt bei READ-ONLY Feldern\n" + f"{entity_type}: {entity_name}\n" + f"Änderungen konnten nicht synchronisiert werden." + ), + 'task_name': f"Sync-Konflikt prüfen: {entity_name}", + 'task_description': ( + f"SYNC-KONFLIKT\n\n" + f"{entity_type}: {entity_name}\n\n" + f"PROBLEM:\n" + f"Felder wurden in EspoCRM geändert, sind aber in Advoware READ-ONLY.\n\n" + f"BETROFFENE FELDER:\n" + + '\n'.join([f" - {k}: {v}" for k, v in details.get('conflicts', {}).items()]) + + f"\n\nOPTIONEN:\n" + f"1. Änderungen in EspoCRM rückgängig machen (Advoware = Master)\n" + f"2. Änderungen manuell in Advoware vornehmen\n" + f"3. Feld als 'nicht synchronisiert' akzeptieren\n\n" + f"Nach Entscheidung: Task als 'Completed' markieren." + ), + 'priority': 'Normal' + } + + elif action_type == "sync_conflict": + return { + 'message': ( + f"⚠️ Sync-Konflikt\n" + f"{entity_type}: {entity_name}\n" + f"{details.get('message', 'Beide Systeme haben Änderungen')}" + ), + 'task_name': f"Sync-Konflikt: {entity_name}", + 'task_description': details.get('description', 'Keine Details verfügbar'), + 'priority': details.get('priority', 'Normal') + } + + elif action_type == "entity_deleted_in_source": + return { + 'message': ( + f"🗑️ Element in Quellsystem gelöscht\n" + f"{entity_type}: {entity_name}\n" + f"{details.get('message', 'Wurde im Zielsystem gelöscht')}" + ), + 'task_name': f"Gelöscht: {entity_name}", + 'task_description': details.get('description', 'Element wurde gelöscht'), + 'priority': details.get('priority', 'High') + } + + elif action_type == "missing_in_advoware": + return { + 'message': ( + f"❓ Element fehlt in Advoware\n" + f"{entity_type}: {entity_name}\n" + f"Bitte manuell in Advoware erstellen." + ), + 'task_name': f"In Advoware erstellen: {entity_name}", + 'task_description': ( + f"MANUELLE AKTION ERFORDERLICH\n\n" + f"{entity_type}: {entity_name}\n\n" + f"GRUND:\n" + f"Dieses Element existiert in EspoCRM, aber nicht in Advoware.\n" + f"Möglicherweise wurde es direkt in EspoCRM erstellt.\n\n" + f"DATEN:\n" + + '\n'.join([f" - {k}: {v}" for k, v in details.items() if k != 'espocrm_id']) + + f"\n\nAKTION:\n" + f"1. In Advoware Web-Interface einloggen\n" + f"2. Element mit obigen Daten manuell erstellen\n" + f"3. Sync erneut durchführen für Mapping\n\n" + f"Nach Erledigung: Task als 'Completed' markieren." + ), + 'priority': 'Normal' + } + + else: # general_manual_action + return { + 'message': ( + f"🔧 Manuelle Aktion erforderlich\n" + f"{entity_type}: {entity_name}\n" + f"{details.get('message', 'Bitte prüfen.')}" + ), + 'task_name': f"Manuelle Aktion: {entity_name}", + 'task_description': ( + f"MANUELLE AKTION ERFORDERLICH\n\n" + f"{entity_type}: {entity_name}\n\n" + f"{details.get('description', 'Keine Details verfügbar.')}" + ), + 'priority': details.get('priority', 'Normal') + } + + async def _create_notification( + self, + user_id: Optional[str], + message: str, + entity_type: str, + entity_id: str + ) -> str: + """ + Erstellt EspoCRM Notification (In-App) + + Returns: + notification_id + """ + if not user_id: + self.logger.warning("No user assigned - notification not created") + return None + + notification_data = { + 'type': 'Message', + 'message': message, + 'userId': user_id, + 'relatedType': entity_type, + 'relatedId': entity_id, + 'read': False + } + + try: + result = await self.espocrm.create_entity('Notification', notification_data) + return result.get('id') + except Exception as e: + self.logger.error(f"Failed to create notification: {e}") + return None + + async def _create_task( + self, + name: str, + description: str, + parent_type: str, + parent_id: str, + assigned_user_id: Optional[str], + priority: str = 'Normal' + ) -> str: + """ + Erstellt EspoCRM Task + + Returns: + task_id + """ + # Due Date: 7 Tage in Zukunft + due_date = (datetime.now() + timedelta(days=7)).strftime('%Y-%m-%d') + + task_data = { + 'name': name, + 'description': description, + 'status': 'Not Started', + 'priority': priority, + 'dateEnd': due_date, + 'parentType': parent_type, + 'parentId': parent_id, + 'assignedUserId': assigned_user_id + } + + try: + result = await self.espocrm.create_entity('Task', task_data) + return result.get('id') + except Exception as e: + self.logger.error(f"Failed to create task: {e}") + return None + + async def resolve_task(self, task_id: str) -> bool: + """ + Markiert Task als erledigt + + Args: + task_id: Task ID + + Returns: + True wenn erfolgreich + """ + try: + await self.espocrm.update_entity('Task', task_id, { + 'status': 'Completed' + }) + return True + except Exception as e: + self.logger.error(f"Failed to complete task {task_id}: {e}") + return False + + +# Helper-Funktionen für häufige Use-Cases + +async def notify_address_delete_required( + notification_manager: NotificationManager, + address_entity_id: str, + betnr: str, + address_data: Dict[str, Any] +) -> Dict[str, str]: + """ + Shortcut: Notification für Adresse löschen + """ + return await notification_manager.notify_manual_action_required( + entity_type='CAdressen', + entity_id=address_entity_id, + action_type='address_delete_required', + details={ + 'betnr': betnr, + 'strasse': address_data.get('adresseStreet'), + 'plz': address_data.get('adressePostalCode'), + 'ort': address_data.get('adresseCity'), + 'espocrm_id': address_entity_id + } + ) + + +async def notify_address_readonly_fields( + notification_manager: NotificationManager, + address_entity_id: str, + betnr: str, + readonly_fields: List[str], + changes: Dict[str, Any] +) -> Dict[str, str]: + """ + Shortcut: Notification für READ-ONLY Felder + """ + return await notification_manager.notify_manual_action_required( + entity_type='CAdressen', + entity_id=address_entity_id, + action_type='address_field_update_required', + details={ + 'betnr': betnr, + 'readonly_fields': readonly_fields, + 'changes': changes + } + ) diff --git a/steps/vmh/bankverbindungen_sync_event_step.py b/steps/vmh/bankverbindungen_sync_event_step.py new file mode 100644 index 0000000..4e547bc --- /dev/null +++ b/steps/vmh/bankverbindungen_sync_event_step.py @@ -0,0 +1,264 @@ +""" +VMH Bankverbindungen Sync Handler + +Zentraler Sync-Handler für Bankverbindungen (Webhooks + Cron Events) + +Verarbeitet: +- vmh.bankverbindungen.create: Neu in EspoCRM → Create in Advoware +- vmh.bankverbindungen.update: Geändert in EspoCRM → Notification (nicht unterstützt) +- vmh.bankverbindungen.delete: Gelöscht in EspoCRM → Notification (nicht unterstützt) +- vmh.bankverbindungen.sync_check: Cron-Check → Sync wenn nötig +""" + +from typing import Dict, Any, Optional +from motia import FlowContext +from services.advoware import AdvowareAPI +from services.espocrm import EspoCRMAPI +from services.bankverbindungen_mapper import BankverbindungenMapper +from services.notification_utils import NotificationManager +import json +import redis +import os + +config = { + "name": "VMH Bankverbindungen Sync Handler", + "description": "Zentraler Sync-Handler für Bankverbindungen (Webhooks + Cron Events)", + "flows": ["vmh"], + "triggers": [ + {"type": "queue", "topic": "vmh.bankverbindungen.create"}, + {"type": "queue", "topic": "vmh.bankverbindungen.update"}, + {"type": "queue", "topic": "vmh.bankverbindungen.delete"}, + {"type": "queue", "topic": "vmh.bankverbindungen.sync_check"} + ], + "enqueues": [] +} + + +async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]): + """Zentraler Sync-Handler für Bankverbindungen""" + + entity_id = event_data.get('entity_id') + action = event_data.get('action', 'sync_check') + source = event_data.get('source', 'unknown') + + if not entity_id: + ctx.logger.error("Keine entity_id im Event gefunden") + return + + ctx.logger.info(f"🔄 Bankverbindungen Sync gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}") + + # Shared Redis client + redis_host = os.getenv('REDIS_HOST', 'localhost') + redis_port = int(os.getenv('REDIS_PORT', '6379')) + redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1')) + + redis_client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + decode_responses=True + ) + + # APIs initialisieren + espocrm = EspoCRMAPI() + advoware = AdvowareAPI(ctx) + mapper = BankverbindungenMapper() + notification_mgr = NotificationManager(espocrm_api=espocrm, context=ctx) + + try: + # 1. ACQUIRE LOCK + lock_key = f"sync_lock:cbankverbindungen:{entity_id}" + acquired = redis_client.set(lock_key, "locked", nx=True, ex=900) # 15min TTL + + if not acquired: + ctx.logger.warn(f"⏸️ Sync bereits aktiv für {entity_id}, überspringe") + return + + # 2. FETCH ENTITY VON ESPOCRM + try: + espo_entity = await espocrm.get_entity('CBankverbindungen', entity_id) + except Exception as e: + ctx.logger.error(f"❌ Fehler beim Laden von EspoCRM Entity: {e}") + redis_client.delete(lock_key) + return + + ctx.logger.info(f"📋 Entity geladen: {espo_entity.get('name', 'Unbenannt')} (IBAN: {espo_entity.get('iban', 'N/A')})") + + advoware_id = espo_entity.get('advowareId') + beteiligte_id = espo_entity.get('cBeteiligteId') # Parent Beteiligter + + if not beteiligte_id: + ctx.logger.error(f"❌ Keine cBeteiligteId gefunden - Bankverbindung muss einem Beteiligten zugeordnet sein") + redis_client.delete(lock_key) + return + + # Hole betNr vom Parent + parent = await espocrm.get_entity('CBeteiligte', beteiligte_id) + betnr = parent.get('betnr') + + if not betnr: + ctx.logger.error(f"❌ Parent Beteiligter {beteiligte_id} hat keine betNr") + redis_client.delete(lock_key) + return + + # 3. BESTIMME SYNC-AKTION + + # FALL A: Neu (kein advowareId) → CREATE in Advoware + if not advoware_id and action in ['create', 'sync_check']: + await handle_create(entity_id, betnr, espo_entity, espocrm, advoware, mapper, ctx, redis_client, lock_key) + + # FALL B: Existiert (hat advowareId) → UPDATE oder CHECK (nicht unterstützt!) + elif advoware_id and action in ['update', 'sync_check']: + await handle_update(entity_id, betnr, advoware_id, espo_entity, espocrm, notification_mgr, ctx, redis_client, lock_key) + + # FALL C: DELETE (nicht unterstützt!) + elif action == 'delete': + await handle_delete(entity_id, betnr, advoware_id, espo_entity, espocrm, notification_mgr, ctx, redis_client, lock_key) + + else: + ctx.logger.warn(f"⚠️ Unbekannte Kombination: action={action}, advowareId={advoware_id}") + redis_client.delete(lock_key) + + except Exception as e: + ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}") + import traceback + ctx.logger.error(traceback.format_exc()) + + try: + redis_client.delete(lock_key) + except: + pass + + +async def handle_create(entity_id, betnr, espo_entity, espocrm, advoware, mapper, ctx, redis_client, lock_key): + """Erstellt neue Bankverbindung in Advoware""" + try: + ctx.logger.info(f"🔨 CREATE Bankverbindung in Advoware für Beteiligter {betnr}...") + + advo_data = mapper.map_cbankverbindungen_to_advoware(espo_entity) + + ctx.logger.info(f"📤 Sende an Advoware: {json.dumps(advo_data, ensure_ascii=False)[:200]}...") + + # POST zu Advoware (Beteiligten-spezifischer Endpoint!) + result = await advoware.api_call( + f'api/v1/advonet/Beteiligte/{betnr}/Bankverbindungen', + method='POST', + json_data=advo_data + ) + + # Extrahiere ID und rowId + if isinstance(result, list) and len(result) > 0: + new_entity = result[0] + elif isinstance(result, dict): + new_entity = result + else: + raise Exception(f"Unexpected response format: {result}") + + new_id = new_entity.get('id') + new_rowid = new_entity.get('rowId') + + if not new_id: + raise Exception(f"Keine ID in Advoware Response: {result}") + + ctx.logger.info(f"✅ In Advoware erstellt: ID={new_id}, rowId={new_rowid[:20] if new_rowid else 'N/A'}...") + + # Schreibe advowareId + rowId zurück + await espocrm.update_entity('CBankverbindungen', entity_id, { + 'advowareId': new_id, + 'advowareRowId': new_rowid + }) + + redis_client.delete(lock_key) + ctx.logger.info(f"✅ CREATE erfolgreich: {entity_id} → Advoware ID {new_id}") + + except Exception as e: + ctx.logger.error(f"❌ CREATE fehlgeschlagen: {e}") + redis_client.delete(lock_key) + + +async def handle_update(entity_id, betnr, advoware_id, espo_entity, espocrm, notification_mgr, ctx, redis_client, lock_key): + """Update nicht möglich - Sendet Notification an User""" + try: + ctx.logger.warn(f"⚠️ UPDATE: Advoware API unterstützt kein PUT für Bankverbindungen") + + iban = espo_entity.get('iban', 'N/A') + bank = espo_entity.get('bank', 'N/A') + name = espo_entity.get('name', 'Unbenannt') + + # Sende Notification + await notification_mgr.notify_manual_action_required( + entity_type='CBankverbindungen', + entity_id=entity_id, + action_type='general_manual_action', + details={ + 'message': f'UPDATE nicht möglich für Bankverbindung: {name}', + 'description': ( + f"Die Advoware API unterstützt keine Updates für Bankverbindungen.\n\n" + f"**Details:**\n" + f"- Bank: {bank}\n" + f"- IBAN: {iban}\n" + f"- Beteiligter betNr: {betnr}\n" + f"- Advoware ID: {advoware_id}\n\n" + f"**Workaround:**\n" + f"Löschen Sie die Bankverbindung in EspoCRM und erstellen Sie sie neu. " + f"Die neue Bankverbindung wird dann automatisch in Advoware angelegt." + ), + 'entity_name': name, + 'priority': 'Normal' + }, + create_task=True + ) + + ctx.logger.info(f"📧 Notification gesendet: Update-Limitation") + redis_client.delete(lock_key) + + except Exception as e: + ctx.logger.error(f"❌ UPDATE Notification fehlgeschlagen: {e}") + import traceback + ctx.logger.error(traceback.format_exc()) + redis_client.delete(lock_key) + + +async def handle_delete(entity_id, betnr, advoware_id, espo_entity, espocrm, notification_mgr, ctx, redis_client, lock_key): + """Delete nicht möglich - Sendet Notification an User""" + try: + ctx.logger.warn(f"⚠️ DELETE: Advoware API unterstützt kein DELETE für Bankverbindungen") + + if not advoware_id: + ctx.logger.info(f"ℹ️ Keine advowareId vorhanden, nur EspoCRM-seitiges Delete") + redis_client.delete(lock_key) + return + + iban = espo_entity.get('iban', 'N/A') + bank = espo_entity.get('bank', 'N/A') + name = espo_entity.get('name', 'Unbenannt') + + # Sende Notification + await notification_mgr.notify_manual_action_required( + entity_type='CBankverbindungen', + entity_id=entity_id, + action_type='general_manual_action', + details={ + 'message': f'DELETE erforderlich für Bankverbindung: {name}', + 'description': ( + f"Die Advoware API unterstützt keine Löschungen für Bankverbindungen.\n\n" + f"**Bitte manuell in Advoware löschen:**\n" + f"- Bank: {bank}\n" + f"- IBAN: {iban}\n" + f"- Beteiligter betNr: {betnr}\n" + f"- Advoware ID: {advoware_id}\n\n" + f"Die Bankverbindung wurde in EspoCRM gelöscht, bleibt aber in Advoware " + f"bestehen bis zur manuellen Löschung." + ), + 'entity_name': name, + 'priority': 'Normal' + }, + create_task=True + ) + + ctx.logger.info(f"📧 Notification gesendet: Delete erforderlich") + redis_client.delete(lock_key) + + except Exception as e: + ctx.logger.error(f"❌ DELETE Notification fehlgeschlagen: {e}") + redis_client.delete(lock_key) diff --git a/steps/vmh/beteiligte_sync_event_step.py b/steps/vmh/beteiligte_sync_event_step.py new file mode 100644 index 0000000..ee948b0 --- /dev/null +++ b/steps/vmh/beteiligte_sync_event_step.py @@ -0,0 +1,413 @@ +""" +VMH Beteiligte Sync Handler + +Zentraler Sync-Handler für Beteiligte (Webhooks + Cron Events) + +Verarbeitet: +- vmh.beteiligte.create: Neu in EspoCRM → Create in Advoware +- vmh.beteiligte.update: Geändert in EspoCRM → Update in Advoware +- vmh.beteiligte.delete: Gelöscht in EspoCRM → Delete in Advoware (TODO) +- vmh.beteiligte.sync_check: Cron-Check → Sync wenn nötig +""" + +from typing import Dict, Any, Optional +from motia import FlowContext +from services.advoware import AdvowareAPI +from services.advoware_service import AdvowareService +from services.espocrm import EspoCRMAPI +from services.espocrm_mapper import BeteiligteMapper +from services.beteiligte_sync_utils import BeteiligteSync +import json +import redis +import os + +config = { + "name": "VMH Beteiligte Sync Handler", + "description": "Zentraler Sync-Handler für Beteiligte (Webhooks + Cron Events)", + "flows": ["vmh"], + "triggers": [ + {"type": "queue", "topic": "vmh.beteiligte.create"}, + {"type": "queue", "topic": "vmh.beteiligte.update"}, + {"type": "queue", "topic": "vmh.beteiligte.delete"}, + {"type": "queue", "topic": "vmh.beteiligte.sync_check"} + ], + "enqueues": [] +} + + +async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]): + """Zentraler Sync-Handler für Beteiligte""" + entity_id = event_data.entity_id + action = event_data.action + source = event_data.source + + if not entity_id: + ctx.logger.error("Keine entity_id im Event gefunden") + return + + ctx.logger.info(f"🔄 Sync-Handler gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}") + + # Shared Redis client for distributed locking + redis_host = os.getenv('REDIS_HOST', 'localhost') + redis_port = int(os.getenv('REDIS_PORT', '6379')) + redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1')) + + redis_client = redis.Redis( + host=redis_host, + port=redis_port, + db=redis_db, + decode_responses=True + ) + + # APIs initialisieren + espocrm = EspoCRMAPI() + advoware = AdvowareAPI(ctx) + sync_utils = BeteiligteSync(espocrm, redis_client, ctx) + mapper = BeteiligteMapper() + + # NOTE: Kommunikation Sync Manager wird in zukünftiger Version hinzugefügt + # wenn kommunikation_sync_utils.py migriert ist + # advo_service = AdvowareService(ctx) + # komm_sync = KommunikationSyncManager(advo_service, espocrm, ctx) + + try: + # 1. ACQUIRE LOCK (verhindert parallele Syncs) + lock_acquired = await sync_utils.acquire_sync_lock(entity_id) + + if not lock_acquired: + ctx.logger.warn(f"⏸️ Sync bereits aktiv für {entity_id}, überspringe") + return + + # Lock erfolgreich acquired - MUSS im finally block released werden! + try: + # 2. FETCH ENTITY VON ESPOCRM + try: + espo_entity = await espocrm.get_entity('CBeteiligte', entity_id) + except Exception as e: + ctx.logger.error(f"❌ Fehler beim Laden von EspoCRM Entity: {e}") + await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True) + return + + ctx.logger.info(f"📋 Entity geladen: {espo_entity.get('name')} (betnr: {espo_entity.get('betnr')})") + + betnr = espo_entity.get('betnr') + sync_status = espo_entity.get('syncStatus', 'pending_sync') + + # Check Retry-Backoff - überspringe wenn syncNextRetry noch nicht erreicht + sync_next_retry = espo_entity.get('syncNextRetry') + if sync_next_retry and sync_status == 'failed': + import datetime + import pytz + + try: + next_retry_ts = datetime.datetime.strptime(sync_next_retry, '%Y-%m-%d %H:%M:%S') + next_retry_ts = pytz.UTC.localize(next_retry_ts) + now_utc = datetime.datetime.now(pytz.UTC) + + if now_utc < next_retry_ts: + remaining_minutes = int((next_retry_ts - now_utc).total_seconds() / 60) + ctx.logger.info(f"⏸️ Retry-Backoff aktiv: Nächster Versuch in {remaining_minutes} Minuten") + await sync_utils.release_sync_lock(entity_id, sync_status) + return + except Exception as e: + ctx.logger.warn(f"⚠️ Fehler beim Parsen von syncNextRetry: {e}") + + # 3. BESTIMME SYNC-AKTION + + # FALL A: Neu (kein betnr) → CREATE in Advoware + if not betnr and action in ['create', 'sync_check']: + ctx.logger.info(f"🆕 Neuer Beteiligter → CREATE in Advoware") + await handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, ctx) + + # FALL B: Existiert (hat betnr) → UPDATE oder CHECK + elif betnr: + ctx.logger.info(f"♻️ Existierender Beteiligter (betNr: {betnr}) → UPDATE/CHECK") + await handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, ctx) + + # FALL C: DELETE (TODO: Implementierung später) + elif action == 'delete': + ctx.logger.warn(f"🗑️ DELETE noch nicht implementiert für {entity_id}") + await sync_utils.release_sync_lock(entity_id, 'failed', 'Delete-Operation nicht implementiert') + + else: + ctx.logger.warn(f"⚠️ Unbekannte Kombination: action={action}, betnr={betnr}") + await sync_utils.release_sync_lock(entity_id, 'failed', f'Unbekannte Aktion: {action}') + + except Exception as e: + # Unerwarteter Fehler während Sync - GARANTIERE Lock-Release + ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}") + import traceback + ctx.logger.error(traceback.format_exc()) + + try: + await sync_utils.release_sync_lock( + entity_id, + 'failed', + f'Unerwarteter Fehler: {str(e)[:1900]}', + increment_retry=True + ) + except Exception as release_error: + # Selbst Lock-Release failed - logge kritischen Fehler + ctx.logger.critical(f"🚨 CRITICAL: Lock-Release failed für {entity_id}: {release_error}") + # Force Redis lock release + try: + lock_key = f"sync_lock:cbeteiligte:{entity_id}" + redis_client.delete(lock_key) + ctx.logger.info(f"✅ Redis lock manuell released: {lock_key}") + except: + pass + + except Exception as e: + # Fehler VOR Lock-Acquire - kein Lock-Release nötig + ctx.logger.error(f"❌ Fehler vor Lock-Acquire: {e}") + import traceback + ctx.logger.error(traceback.format_exc()) + + +async def handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, ctx): + """Erstellt neuen Beteiligten in Advoware""" + try: + ctx.logger.info(f"🔨 CREATE in Advoware...") + + # Transform zu Advoware Format + advo_data = mapper.map_cbeteiligte_to_advoware(espo_entity) + + ctx.logger.info(f"📤 Sende an Advoware: {json.dumps(advo_data, ensure_ascii=False)[:200]}...") + + # POST zu Advoware + result = await advoware.api_call( + 'api/v1/advonet/Beteiligte', + method='POST', + json_data=advo_data + ) + + # Extrahiere betNr aus Response (case-insensitive: betNr oder betnr) + new_betnr = None + if isinstance(result, dict): + new_betnr = result.get('betNr') or result.get('betnr') + + if not new_betnr: + raise Exception(f"Keine betNr/betnr in Advoware Response: {result}") + + ctx.logger.info(f"✅ In Advoware erstellt: betNr={new_betnr}") + + # Lade Entity nach POST um rowId zu bekommen (WICHTIG für Change Detection!) + created_entity = await advoware.api_call( + f'api/v1/advonet/Beteiligte/{new_betnr}', + method='GET' + ) + + if isinstance(created_entity, list): + new_rowid = created_entity[0].get('rowId') if created_entity else None + else: + new_rowid = created_entity.get('rowId') + + if not new_rowid: + ctx.logger.warn(f"⚠️ Keine rowId nach CREATE - Change Detection nicht möglich!") + + # OPTIMIERT: Kombiniere release_lock + betnr + rowId update in 1 API call + await sync_utils.release_sync_lock( + entity_id, + 'clean', + error_message=None, + extra_fields={ + 'betnr': new_betnr, + 'advowareRowId': new_rowid # WICHTIG für Change Detection! + } + ) + + ctx.logger.info(f"✅ CREATE erfolgreich: {entity_id} → betNr {new_betnr}, rowId {new_rowid[:20] if new_rowid else 'N/A'}...") + + except Exception as e: + ctx.logger.error(f"❌ CREATE fehlgeschlagen: {e}") + await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True) + + +async def handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, ctx): + """Synchronisiert existierenden Beteiligten""" + try: + ctx.logger.info(f"🔍 Fetch von Advoware betNr={betnr}...") + + # Fetch von Advoware + try: + advo_result = await advoware.api_call( + f'api/v1/advonet/Beteiligte/{betnr}', + method='GET' + ) + + # Advoware gibt manchmal Listen zurück + if isinstance(advo_result, list): + advo_entity = advo_result[0] if advo_result else None + else: + advo_entity = advo_result + + if not advo_entity: + raise Exception(f"Beteiligter betNr={betnr} nicht gefunden") + + except Exception as e: + # 404 oder anderer Fehler → Beteiligter wurde in Advoware gelöscht + if '404' in str(e) or 'nicht gefunden' in str(e).lower(): + ctx.logger.warn(f"🗑️ Beteiligter in Advoware gelöscht: betNr={betnr}") + await sync_utils.handle_advoware_deleted(entity_id, str(e)) + return + else: + raise + + ctx.logger.info(f"📥 Von Advoware geladen: {advo_entity.get('name')}") + + # ÄNDERUNGSERKENNUNG (Primary: rowId, Fallback: Timestamps) + comparison = sync_utils.compare_entities(espo_entity, advo_entity) + + ctx.logger.info(f"⏱️ Vergleich: {comparison}") + + # KEIN STAMMDATEN-SYNC NÖTIG + if comparison == 'no_change': + ctx.logger.info(f"✅ Keine Stammdaten-Änderungen erkannt") + + # NOTE: Kommunikation-Sync würde hier stattfinden + # await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx) + + await sync_utils.release_sync_lock(entity_id, 'clean') + return + + # ESPOCRM NEUER → Update Advoware + if comparison == 'espocrm_newer': + ctx.logger.info(f"📤 EspoCRM ist neuer → Update Advoware STAMMDATEN") + + # OPTIMIERT: Use merge utility + merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper) + + put_result = await advoware.api_call( + f'api/v1/advonet/Beteiligte/{betnr}', + method='PUT', + json_data=merged_data + ) + + # Extrahiere neue rowId aus PUT Response (spart extra GET!) + new_rowid = None + if isinstance(put_result, list) and len(put_result) > 0: + new_rowid = put_result[0].get('rowId') + elif isinstance(put_result, dict): + new_rowid = put_result.get('rowId') + + ctx.logger.info(f"✅ Advoware STAMMDATEN aktualisiert, rowId: {new_rowid[:20] if new_rowid else 'N/A'}...") + + # Validiere Sync-Ergebnis + validation_success, validation_error = await sync_utils.validate_sync_result( + entity_id, betnr, mapper, direction='to_advoware' + ) + + if not validation_success: + ctx.logger.error(f"❌ Sync-Validation fehlgeschlagen: {validation_error}") + await sync_utils.release_sync_lock( + entity_id, + 'failed', + error_message=f"Validation failed: {validation_error}", + increment_retry=True + ) + return + + # NOTE: Kommunikation-Sync würde hier stattfinden + # await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx) + + # Release Lock + Update rowId + await sync_utils.release_sync_lock( + entity_id, + 'clean', + extra_fields={'advowareRowId': new_rowid} + ) + + # ADVOWARE NEUER → Update EspoCRM + elif comparison == 'advoware_newer': + ctx.logger.info(f"📥 Advoware ist neuer → Update EspoCRM STAMMDATEN") + + espo_data = mapper.map_advoware_to_cbeteiligte(advo_entity) + await espocrm.update_entity('CBeteiligte', entity_id, espo_data) + ctx.logger.info(f"✅ EspoCRM STAMMDATEN aktualisiert") + + # Validiere Sync-Ergebnis + validation_success, validation_error = await sync_utils.validate_sync_result( + entity_id, betnr, mapper, direction='to_espocrm' + ) + + if not validation_success: + ctx.logger.error(f"❌ Sync-Validation fehlgeschlagen: {validation_error}") + await sync_utils.release_sync_lock( + entity_id, + 'failed', + error_message=f"Validation failed: {validation_error}", + increment_retry=True + ) + return + + # NOTE: Kommunikation-Sync würde hier stattfinden + # await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx) + + # Release Lock + Update rowId + await sync_utils.release_sync_lock( + entity_id, + 'clean', + extra_fields={'advowareRowId': advo_entity.get('rowId')} + ) + + # KONFLIKT → EspoCRM WINS + elif comparison == 'conflict': + ctx.logger.warn(f"⚠️ KONFLIKT erkannt → EspoCRM WINS (STAMMDATEN)") + + # OPTIMIERT: Use merge utility + merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper) + + put_result = await advoware.api_call( + f'api/v1/advonet/Beteiligte/{betnr}', + method='PUT', + json_data=merged_data + ) + + # Extrahiere neue rowId aus PUT Response + new_rowid = None + if isinstance(put_result, list) and len(put_result) > 0: + new_rowid = put_result[0].get('rowId') + elif isinstance(put_result, dict): + new_rowid = put_result.get('rowId') + + conflict_msg = ( + f"EspoCRM: {espo_entity.get('modifiedAt')}, " + f"Advoware: {advo_entity.get('geaendertAm')}. " + f"EspoCRM hat gewonnen." + ) + + ctx.logger.info(f"✅ Konflikt gelöst (EspoCRM won), neue rowId: {new_rowid[:20] if new_rowid else 'N/A'}...") + + # Validiere Sync-Ergebnis + validation_success, validation_error = await sync_utils.validate_sync_result( + entity_id, betnr, mapper, direction='to_advoware' + ) + + if not validation_success: + ctx.logger.error(f"❌ Conflict resolution validation fehlgeschlagen: {validation_error}") + await sync_utils.release_sync_lock( + entity_id, + 'failed', + error_message=f"Conflict resolution validation failed: {validation_error}", + increment_retry=True + ) + return + + await sync_utils.resolve_conflict_espocrm_wins( + entity_id, + espo_entity, + advo_entity, + conflict_msg, + extra_fields={'advowareRowId': new_rowid} + ) + + # NOTE: Kommunikation-Sync (nur EspoCRM→Advoware) würde hier stattfinden + # await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx, direction='to_advoware', force_espo_wins=True) + + await sync_utils.release_sync_lock(entity_id, 'clean') + + except Exception as e: + ctx.logger.error(f"❌ UPDATE fehlgeschlagen: {e}") + import traceback + ctx.logger.error(traceback.format_exc()) + await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True) diff --git a/uv.lock b/uv.lock index 1899921..1ff8842 100644 --- a/uv.lock +++ b/uv.lock @@ -531,6 +531,7 @@ dependencies = [ { name = "motia", extra = ["otel"] }, { name = "pydantic" }, { name = "python-dotenv" }, + { name = "pytz" }, { name = "redis" }, ] @@ -541,6 +542,7 @@ requires-dist = [ { name = "motia", extras = ["otel"], specifier = "==1.0.0rc24" }, { name = "pydantic", specifier = ">=2.0" }, { name = "python-dotenv", specifier = ">=1.0.0" }, + { name = "pytz", specifier = ">=2025.2" }, { name = "redis", specifier = ">=5.2.0" }, ] @@ -1066,6 +1068,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0b/d7/1959b9648791274998a9c3526f6d0ec8fd2233e4d4acce81bbae76b44b2a/python_dotenv-1.2.2-py3-none-any.whl", hash = "sha256:1d8214789a24de455a8b8bd8ae6fe3c6b69a5e3d64aa8a8e5d68e694bbcb285a", size = 22101, upload-time = "2026-03-01T16:00:25.09Z" }, ] +[[package]] +name = "pytz" +version = "2025.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f8/bf/abbd3cdfb8fbc7fb3d4d38d320f2441b1e7cbe29be4f23797b4a2b5d8aac/pytz-2025.2.tar.gz", hash = "sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3", size = 320884, upload-time = "2025-03-25T02:25:00.538Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/81/c4/34e93fe5f5429d7570ec1fa436f1986fb1f00c3e0f43a589fe2bbcd22c3f/pytz-2025.2-py2.py3-none-any.whl", hash = "sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00", size = 509225, upload-time = "2025-03-25T02:24:58.468Z" }, +] + [[package]] name = "redis" version = "7.2.1"