""" Beteiligte Sync Utilities Hilfsfunktionen für Sync-Operationen: - Distributed locking via Redis + syncStatus - Timestamp-Vergleich mit rowId-basierter Änderungserkennung - Konfliktauflösung (EspoCRM wins) - EspoCRM In-App Notifications - Soft-Delete Handling - Retry mit Exponential Backoff """ from typing import Dict, Any, Optional, Tuple, Literal from datetime import datetime, timedelta import pytz from services.exceptions import LockAcquisitionError, SyncError, ValidationError from services.redis_client import get_redis_client from services.config import SYNC_CONFIG, get_lock_key, get_retry_delay_seconds from services.logging_utils import get_logger import redis # Timestamp-Vergleich Ergebnis-Typen TimestampResult = Literal["espocrm_newer", "advoware_newer", "conflict", "no_change"] class BeteiligteSync: """Utility-Klasse für Beteiligte-Synchronisation""" def __init__(self, espocrm_api, redis_client: Optional[redis.Redis] = None, context=None): self.espocrm = espocrm_api self.context = context self.logger = get_logger('beteiligte_sync', context) # Use provided Redis client or get from factory self.redis = redis_client or get_redis_client(strict=False) if not self.redis: self.logger.error( "⚠️ KRITISCH: Redis nicht verfügbar! " "Distributed Locking deaktiviert - Race Conditions möglich!" ) # Import NotificationManager only when needed from services.notification_utils import NotificationManager self.notification_manager = NotificationManager(espocrm_api=self.espocrm, context=context) async def acquire_sync_lock(self, entity_id: str) -> bool: """ Atomic distributed lock via Redis + syncStatus update Args: entity_id: EspoCRM CBeteiligte ID Returns: True wenn Lock erfolgreich, False wenn bereits im Sync Raises: SyncError: Bei kritischen Sync-Problemen """ try: # STEP 1: Atomic Redis lock (prevents race conditions) if self.redis: lock_key = get_lock_key('cbeteiligte', entity_id) acquired = self.redis.set( lock_key, "locked", nx=True, ex=SYNC_CONFIG.lock_ttl_seconds ) if not acquired: self.logger.warning(f"Redis lock bereits aktiv für {entity_id}") return False else: self.logger.error( f"⚠️ WARNUNG: Sync ohne Redis-Lock für {entity_id} - Race Condition möglich!" ) # STEP 2: Update syncStatus (für UI visibility) await self.espocrm.update_entity('CBeteiligte', entity_id, { 'syncStatus': 'syncing' }) self.logger.info(f"Sync-Lock für {entity_id} erworben") return True except Exception as e: self._log(f"Fehler beim Acquire Lock: {e}", level='error') # Clean up Redis lock on error if self.redis: try: lock_key = f"sync_lock:cbeteiligte:{entity_id}" self.redis.delete(lock_key) except: pass return False async def release_sync_lock( self, entity_id: str, new_status: str = 'clean', error_message: Optional[str] = None, increment_retry: bool = False, extra_fields: Optional[Dict[str, Any]] = None ) -> None: """ Gibt Sync-Lock frei und setzt finalen Status (kombiniert mit extra fields) Args: entity_id: EspoCRM CBeteiligte ID new_status: Neuer syncStatus (clean, failed, conflict, etc.) error_message: Optional: Fehlermeldung für syncErrorMessage increment_retry: Ob syncRetryCount erhöht werden soll extra_fields: Optional: Zusätzliche Felder für EspoCRM update (z.B. betnr) """ try: # EspoCRM datetime format: YYYY-MM-DD HH:MM:SS (keine Timezone!) now_utc = datetime.now(pytz.UTC) espo_datetime = now_utc.strftime('%Y-%m-%d %H:%M:%S') update_data = { 'syncStatus': new_status, 'advowareLastSync': espo_datetime } if error_message: update_data['syncErrorMessage'] = error_message[:2000] # Max. 2000 chars else: update_data['syncErrorMessage'] = None # Handle retry count if increment_retry: # Hole aktuellen Retry-Count entity = await self.espocrm.get_entity('CBeteiligte', entity_id) current_retry = entity.get('syncRetryCount') or 0 new_retry = current_retry + 1 update_data['syncRetryCount'] = new_retry # Exponential backoff - berechne nächsten Retry-Zeitpunkt backoff_minutes = SYNC_CONFIG.retry_backoff_minutes if new_retry <= len(backoff_minutes): backoff_min = backoff_minutes[new_retry - 1] else: backoff_min = backoff_minutes[-1] # Letzte Backoff-Zeit next_retry = now_utc + timedelta(minutes=backoff_min) update_data['syncNextRetry'] = next_retry.strftime('%Y-%m-%d %H:%M:%S') self.logger.info( f"Retry {new_retry}/{SYNC_CONFIG.max_retries}, " f"nächster Versuch in {backoff_min} Minuten" ) # Check max retries - mark as permanently failed if new_retry >= SYNC_CONFIG.max_retries: update_data['syncStatus'] = 'permanently_failed' # Auto-Reset Timestamp für Wiederherstellung nach 24h auto_reset_time = now_utc + timedelta(hours=SYNC_CONFIG.auto_reset_hours) update_data['syncAutoResetAt'] = auto_reset_time.strftime('%Y-%m-%d %H:%M:%S') await self.send_notification( entity_id, 'error', extra_data={ 'message': ( f"Sync fehlgeschlagen nach {SYNC_CONFIG.max_retries} Versuchen. " f"Auto-Reset in {SYNC_CONFIG.auto_reset_hours}h." ) } ) self.logger.error( f"Max retries ({SYNC_CONFIG.max_retries}) erreicht für {entity_id}, " f"Auto-Reset um {auto_reset_time}" ) else: update_data['syncRetryCount'] = 0 update_data['syncNextRetry'] = None # Merge extra fields (e.g., betnr from create operation) if extra_fields: update_data.update(extra_fields) await self.espocrm.update_entity('CBeteiligte', entity_id, update_data) self.logger.info(f"Sync-Lock released: {entity_id} → {new_status}") # Release Redis lock if self.redis: lock_key = get_lock_key('cbeteiligte', entity_id) self.redis.delete(lock_key) except Exception as e: self.logger.error(f"Fehler beim Release Lock: {e}") # Ensure Redis lock is released even on error if self.redis: try: lock_key = get_lock_key('cbeteiligte', entity_id) self.redis.delete(lock_key) except: pass @staticmethod def parse_timestamp(ts: Any) -> Optional[datetime]: """ Parse verschiedene Timestamp-Formate zu datetime Args: ts: String, datetime oder None Returns: datetime-Objekt oder None """ if not ts: return None if isinstance(ts, datetime): return ts if isinstance(ts, str): # EspoCRM Format: "2026-02-07 14:30:00" # Advoware Format: "2026-02-07T14:30:00" oder "2026-02-07T14:30:00Z" try: # Entferne trailing Z falls vorhanden ts = ts.rstrip('Z') # Versuche verschiedene Formate for fmt in [ '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d', ]: try: return datetime.strptime(ts, fmt) except ValueError: continue # Fallback: ISO-Format return datetime.fromisoformat(ts) except Exception as e: logger.warning(f"Konnte Timestamp nicht parsen: {ts} - {e}") return None return None def compare_entities( self, espo_entity: Dict[str, Any], advo_entity: Dict[str, Any] ) -> TimestampResult: """ Vergleicht Änderungen zwischen EspoCRM und Advoware PRIMÄR: rowId-Vergleich (Advoware rowId ändert sich bei jedem Update - SEHR zuverlässig!) FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar) Args: espo_entity: EspoCRM CBeteiligte advo_entity: Advoware Beteiligte Returns: "espocrm_newer": EspoCRM wurde geändert "advoware_newer": Advoware wurde geändert "conflict": Beide wurden geändert "no_change": Keine Änderungen """ # PRIMÄR: rowId-basierte Änderungserkennung (zuverlässiger!) espo_rowid = espo_entity.get('advowareRowId') advo_rowid = advo_entity.get('rowId') last_sync = espo_entity.get('advowareLastSync') espo_modified = espo_entity.get('modifiedAt') # Parse timestamps für Initial Sync Check espo_ts = self.parse_timestamp(espo_modified) advo_ts = self.parse_timestamp(advo_entity.get('geaendertAm')) # SPECIAL CASE: Kein lastSync → Initial Sync if not last_sync: self._log(f"Initial Sync (kein lastSync) → Vergleiche Timestamps") # Wenn beide Timestamps vorhanden, vergleiche sie if espo_ts and advo_ts: if espo_ts > advo_ts: self._log(f"Initial Sync: EspoCRM neuer ({espo_ts} > {advo_ts})") return 'espocrm_newer' elif advo_ts > espo_ts: self._log(f"Initial Sync: Advoware neuer ({advo_ts} > {espo_ts})") return 'advoware_newer' else: self._log(f"Initial Sync: Beide gleich alt") return 'no_change' # Fallback: Wenn nur einer Timestamp hat, bevorzuge den if espo_ts and not advo_ts: return 'espocrm_newer' if advo_ts and not espo_ts: return 'advoware_newer' # Wenn keine Timestamps verfügbar: EspoCRM bevorzugen (default) self._log(f"Initial Sync: Keine Timestamps verfügbar → EspoCRM bevorzugt") return 'espocrm_newer' if espo_rowid and advo_rowid: # Prüfe ob Advoware geändert wurde (rowId) advo_changed = (espo_rowid != advo_rowid) # Prüfe ob EspoCRM auch geändert wurde (seit letztem Sync) espo_changed = False if espo_modified: try: sync_ts = self.parse_timestamp(last_sync) if espo_ts and sync_ts: espo_changed = (espo_ts > sync_ts) except Exception as e: self._log(f"Timestamp-Parse-Fehler: {e}", level='debug') # Konfliktlogik: Beide geändert seit letztem Sync? if advo_changed and espo_changed: self._log(f"🚨 KONFLIKT: Beide Seiten geändert seit letztem Sync") return 'conflict' elif advo_changed: self._log(f"Advoware rowId geändert: {espo_rowid[:20] if espo_rowid else 'None'}... → {advo_rowid[:20] if advo_rowid else 'None'}...") return 'advoware_newer' elif espo_changed: self._log(f"EspoCRM neuer (modifiedAt > lastSync)") return 'espocrm_newer' else: # Weder Advoware noch EspoCRM geändert self._log("Keine Änderungen (rowId identisch)") return 'no_change' # FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar) self._log("rowId nicht verfügbar, fallback auf Timestamp-Vergleich", level='debug') return self.compare_timestamps( espo_entity.get('modifiedAt'), advo_entity.get('geaendertAm'), espo_entity.get('advowareLastSync') ) def compare_timestamps( self, espo_modified_at: Any, advo_geaendert_am: Any, last_sync_ts: Any ) -> TimestampResult: """ Vergleicht Timestamps und bestimmt Sync-Richtung (FALLBACK wenn rowId nicht verfügbar) Args: espo_modified_at: EspoCRM modifiedAt advo_geaendert_am: Advoware geaendertAm last_sync_ts: Letzter Sync (advowareLastSync) Returns: "espocrm_newer": EspoCRM wurde nach last_sync geändert und ist neuer "advoware_newer": Advoware wurde nach last_sync geändert und ist neuer "conflict": Beide wurden nach last_sync geändert "no_change": Keine Änderungen seit last_sync """ espo_ts = self.parse_timestamp(espo_modified_at) advo_ts = self.parse_timestamp(advo_geaendert_am) sync_ts = self.parse_timestamp(last_sync_ts) # Logging self._log( f"Timestamp-Vergleich: EspoCRM={espo_ts}, Advoware={advo_ts}, LastSync={sync_ts}", level='debug' ) # Falls kein last_sync → erster Sync, vergleiche direkt if not sync_ts: if not espo_ts or not advo_ts: return "no_change" if espo_ts > advo_ts: return "espocrm_newer" elif advo_ts > espo_ts: return "advoware_newer" else: return "no_change" # Check ob seit last_sync Änderungen espo_changed = espo_ts and espo_ts > sync_ts advo_changed = advo_ts and advo_ts > sync_ts if espo_changed and advo_changed: # Beide geändert seit last_sync → Konflikt return "conflict" elif espo_changed: # Nur EspoCRM geändert return "espocrm_newer" if (not advo_ts or espo_ts > advo_ts) else "conflict" elif advo_changed: # Nur Advoware geändert return "advoware_newer" else: # Keine Änderungen return "no_change" def merge_for_advoware_put( self, advo_entity: Dict[str, Any], espo_entity: Dict[str, Any], mapper ) -> Dict[str, Any]: """ Merged EspoCRM updates mit Advoware entity für PUT operation Advoware benötigt vollständige Objekte für PUT (Read-Modify-Write pattern). Diese Funktion merged die gemappten EspoCRM-Updates in das bestehende Advoware-Objekt. Args: advo_entity: Aktuelles Advoware entity (vollständiges Objekt) espo_entity: EspoCRM entity mit Updates mapper: BeteiligteMapper instance Returns: Merged dict für Advoware PUT """ # Map EspoCRM → Advoware (nur Stammdaten) advo_updates = mapper.map_cbeteiligte_to_advoware(espo_entity) # Merge: Advoware entity als Base, überschreibe mit EspoCRM updates merged = {**advo_entity, **advo_updates} # Logging self._log( f"📝 Merge: {len(advo_updates)} Stammdaten-Felder → {len(merged)} Gesamt-Felder", level='info' ) self._log( f" Gesynct: {', '.join(advo_updates.keys())}", level='debug' ) return merged async def send_notification( self, entity_id: str, notification_type: Literal["conflict", "deleted", "error"], extra_data: Optional[Dict[str, Any]] = None ) -> None: """ Sendet EspoCRM Notification via NotificationManager Args: entity_id: CBeteiligte Entity ID notification_type: "conflict", "deleted" oder "error" extra_data: Zusätzliche Daten für Nachricht """ try: # Hole Entity-Daten entity = await self.espocrm.get_entity('CBeteiligte', entity_id) name = entity.get('name', 'Unbekannt') betnr = entity.get('betnr') # Map notification_type zu action_type if notification_type == "conflict": action_type = 'sync_conflict' details = { 'message': f"Sync-Konflikt bei Beteiligten '{name}' (betNr: {betnr})", 'description': ( f"EspoCRM hat Vorrang - Änderungen wurden nach Advoware übertragen.\n\n" f"Bitte prüfen Sie die Details und stellen Sie sicher, dass die Daten korrekt sind." ), 'entity_name': name, 'betnr': betnr, 'priority': 'Normal' } elif notification_type == "deleted": deleted_at = entity.get('advowareDeletedAt', 'unbekannt') action_type = 'entity_deleted_in_source' details = { 'message': f"Beteiligter '{name}' wurde in Advoware gelöscht", 'description': ( f"Der Beteiligte '{name}' (betNr: {betnr}) wurde am {deleted_at} " f"in Advoware gelöscht.\n\n" f"Der Datensatz wurde in EspoCRM markiert, aber nicht gelöscht. " f"Bitte prüfen Sie, ob dies beabsichtigt war." ), 'entity_name': name, 'betnr': betnr, 'deleted_at': deleted_at, 'priority': 'High' } else: # error action_type = 'general_manual_action' details = { 'message': f"Benachrichtigung für Beteiligten '{name}'", 'entity_name': name, 'betnr': betnr } # Merge extra_data if provided if extra_data: details.update(extra_data) # Sende via NotificationManager await self.notification_manager.notify_manual_action_required( entity_type='CBeteiligte', entity_id=entity_id, action_type=action_type, details=details, create_task=True ) self._log(f"Notification via NotificationManager gesendet: {notification_type} für {entity_id}") except Exception as e: self._log(f"Fehler beim Senden der Notification: {e}", level='error') async def handle_advoware_deleted( self, entity_id: str, error_details: str ) -> None: """ Behandelt Fall dass Beteiligter in Advoware gelöscht wurde (404) Args: entity_id: CBeteiligte Entity ID error_details: Fehlerdetails von Advoware API """ try: now = datetime.now(pytz.UTC).isoformat() # Update Entity: Soft-Delete Flag await self.espocrm.update_entity('CBeteiligte', entity_id, { 'syncStatus': 'deleted_in_advoware', 'advowareDeletedAt': now, 'syncErrorMessage': f"Beteiligter existiert nicht mehr in Advoware. {error_details}" }) self._log(f"Entity {entity_id} als deleted_in_advoware markiert") # Sende Notification await self.send_notification(entity_id, 'deleted') except Exception as e: self._log(f"Fehler beim Handle Deleted: {e}", level='error') async def validate_sync_result( self, entity_id: str, betnr: int, mapper, direction: str = 'to_advoware' ) -> Tuple[bool, Optional[str]]: """ Validiert Sync-Ergebnis durch Round-Trip Verification Args: entity_id: EspoCRM CBeteiligte ID betnr: Advoware betNr mapper: BeteiligteMapper instance direction: 'to_advoware' oder 'to_espocrm' Returns: (success: bool, error_message: Optional[str]) """ try: self._log(f"🔍 Validiere Sync-Ergebnis (direction={direction})...", level='debug') # Lade beide Entities erneut espo_entity = await self.espocrm.get_entity('CBeteiligte', entity_id) from services.advoware import AdvowareAPI advoware_api = AdvowareAPI(self.context) advo_result = await advoware_api.api_call(f'api/v1/advonet/Beteiligte/{betnr}', method='GET') if isinstance(advo_result, list): advo_entity = advo_result[0] if advo_result else None else: advo_entity = advo_result if not advo_entity: return False, f"Advoware Entity {betnr} nicht gefunden nach Sync" # Validiere Stammdaten critical_fields = ['name', 'rechtsform'] differences = [] if direction == 'to_advoware': # EspoCRM → Advoware: Prüfe ob Advoware die EspoCRM-Werte hat advo_mapped = mapper.map_cbeteiligte_to_advoware(espo_entity) for field in critical_fields: espo_val = advo_mapped.get(field) advo_val = advo_entity.get(field) if espo_val != advo_val: differences.append(f"{field}: expected '{espo_val}', got '{advo_val}'") elif direction == 'to_espocrm': # Advoware → EspoCRM: Prüfe ob EspoCRM die Advoware-Werte hat espo_mapped = mapper.map_advoware_to_cbeteiligte(advo_entity) for field in critical_fields: advo_val = espo_mapped.get(field) espo_val = espo_entity.get(field) if advo_val != espo_val: differences.append(f"{field}: expected '{advo_val}', got '{espo_val}'") if differences: error_msg = f"Validation failed: {', '.join(differences)}" self._log(f"❌ {error_msg}", level='error') return False, error_msg self._log(f"✅ Validation erfolgreich", level='debug') return True, None except Exception as e: self._log(f"⚠️ Validation error: {e}", level='error') return False, f"Validation exception: {str(e)}" async def resolve_conflict_espocrm_wins( self, entity_id: str, espo_entity: Dict[str, Any], advo_entity: Dict[str, Any], conflict_details: str, extra_fields: Optional[Dict[str, Any]] = None ) -> None: """ Löst Konflikt auf: EspoCRM wins (überschreibt Advoware) Args: entity_id: CBeteiligte Entity ID espo_entity: EspoCRM Entity-Daten advo_entity: Advoware Entity-Daten conflict_details: Details zum Konflikt extra_fields: Zusätzliche Felder (z.B. advowareRowId) """ try: # EspoCRM datetime format now_utc = datetime.now(pytz.UTC) espo_datetime = now_utc.strftime('%Y-%m-%d %H:%M:%S') # Markiere als gelöst mit Konflikt-Info update_data = { 'syncStatus': 'clean', # Gelöst! 'advowareLastSync': espo_datetime, 'syncErrorMessage': f'Konflikt: {conflict_details}', 'syncRetryCount': 0 } # Merge extra fields (z.B. advowareRowId) if extra_fields: update_data.update(extra_fields) await self.espocrm.update_entity('CBeteiligte', entity_id, update_data) self._log(f"Konflikt gelöst für {entity_id}: EspoCRM wins") # Sende Notification await self.send_notification(entity_id, 'conflict', { 'details': conflict_details }) except Exception as e: self._log(f"Fehler beim Resolve Conflict: {e}", level='error')