""" Beteiligte Sync Utilities Hilfsfunktionen für Sync-Operationen: - Locking via syncStatus - Timestamp-Vergleich - Konfliktauflösung (EspoCRM wins) - EspoCRM In-App Notifications - Soft-Delete Handling """ from typing import Dict, Any, Optional, Tuple, Literal from datetime import datetime import pytz import logging import redis from config import Config from services.espocrm import EspoCRMAPI from services.notification_utils import NotificationManager logger = logging.getLogger(__name__) # Timestamp-Vergleich Ergebnis-Typen TimestampResult = Literal["espocrm_newer", "advoware_newer", "conflict", "no_change"] # Max retry before permanent failure MAX_SYNC_RETRIES = 5 # Lock TTL in seconds (prevents deadlocks) LOCK_TTL_SECONDS = 900 # 15 minutes # Retry backoff: Wartezeit zwischen Retries (in Minuten) RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h # Auto-Reset nach 24h (für permanently_failed entities) AUTO_RESET_HOURS = 24 class BeteiligteSync: """Utility-Klasse für Beteiligte-Synchronisation""" def __init__(self, espocrm_api: EspoCRMAPI, redis_client: redis.Redis = None, context=None): self.espocrm = espocrm_api self.context = context self.logger = context.logger if context else logger self.redis = redis_client or self._init_redis() self.notification_manager = NotificationManager(espocrm_api=self.espocrm, context=context) def _init_redis(self) -> redis.Redis: """Initialize Redis client for distributed locking""" try: client = redis.Redis( host=Config.REDIS_HOST, port=int(Config.REDIS_PORT), db=int(Config.REDIS_DB_ADVOWARE_CACHE), decode_responses=True ) client.ping() return client except Exception as e: self._log(f"Redis connection failed: {e}", level='error') return None def _log(self, message: str, level: str = 'info'): """Logging mit Context-Support""" if self.context and hasattr(self.context, 'logger'): getattr(self.context.logger, level)(message) else: getattr(logger, level)(message) async def acquire_sync_lock(self, entity_id: str) -> bool: """ Atomic distributed lock via Redis + syncStatus update Args: entity_id: EspoCRM CBeteiligte ID Returns: True wenn Lock erfolgreich, False wenn bereits im Sync """ try: # STEP 1: Atomic Redis lock (prevents race conditions) if self.redis: lock_key = f"sync_lock:cbeteiligte:{entity_id}" acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS) if not acquired: self._log(f"Redis lock bereits aktiv für {entity_id}", level='warn') return False # STEP 2: Update syncStatus (für UI visibility) await self.espocrm.update_entity('CBeteiligte', entity_id, { 'syncStatus': 'syncing' }) self._log(f"Sync-Lock für {entity_id} erworben") return True except Exception as e: self._log(f"Fehler beim Acquire Lock: {e}", level='error') # Clean up Redis lock on error if self.redis: try: lock_key = f"sync_lock:cbeteiligte:{entity_id}" self.redis.delete(lock_key) except: pass return False async def release_sync_lock( self, entity_id: str, new_status: str = 'clean', error_message: Optional[str] = None, increment_retry: bool = False, extra_fields: Optional[Dict[str, Any]] = None ) -> None: """ Gibt Sync-Lock frei und setzt finalen Status (kombiniert mit extra fields) Args: entity_id: EspoCRM CBeteiligte ID new_status: Neuer syncStatus (clean, failed, conflict, etc.) error_message: Optional: Fehlermeldung für syncErrorMessage increment_retry: Ob syncRetryCount erhöht werden soll extra_fields: Optional: Zusätzliche Felder für EspoCRM update (z.B. betnr) """ try: # EspoCRM datetime format: YYYY-MM-DD HH:MM:SS (keine Timezone!) now_utc = datetime.now(pytz.UTC) espo_datetime = now_utc.strftime('%Y-%m-%d %H:%M:%S') update_data = { 'syncStatus': new_status, 'advowareLastSync': espo_datetime } if error_message: update_data['syncErrorMessage'] = error_message[:2000] # Max. 2000 chars else: update_data['syncErrorMessage'] = None # Handle retry count if increment_retry: # Hole aktuellen Retry-Count entity = await self.espocrm.get_entity('CBeteiligte', entity_id) current_retry = entity.get('syncRetryCount') or 0 new_retry = current_retry + 1 update_data['syncRetryCount'] = new_retry # FIX #12: Exponential backoff - berechne nächsten Retry-Zeitpunkt if new_retry <= len(RETRY_BACKOFF_MINUTES): backoff_minutes = RETRY_BACKOFF_MINUTES[new_retry - 1] else: backoff_minutes = RETRY_BACKOFF_MINUTES[-1] # Letzte Backoff-Zeit from datetime import timedelta next_retry = now_utc + timedelta(minutes=backoff_minutes) update_data['syncNextRetry'] = next_retry.strftime('%Y-%m-%d %H:%M:%S') self._log(f"Retry {new_retry}/{MAX_SYNC_RETRIES}, nächster Versuch in {backoff_minutes} Minuten") # Check max retries - mark as permanently failed if new_retry >= MAX_SYNC_RETRIES: update_data['syncStatus'] = 'permanently_failed' # FIX #12: Auto-Reset Timestamp für Wiederherstellung nach 24h auto_reset_time = now_utc + timedelta(hours=AUTO_RESET_HOURS) update_data['syncAutoResetAt'] = auto_reset_time.strftime('%Y-%m-%d %H:%M:%S') await self.send_notification( entity_id, f"Sync fehlgeschlagen nach {MAX_SYNC_RETRIES} Versuchen. Auto-Reset in {AUTO_RESET_HOURS}h.", notification_type='error' ) self._log(f"Max retries ({MAX_SYNC_RETRIES}) erreicht für {entity_id}, Auto-Reset um {auto_reset_time}", level='error') else: update_data['syncRetryCount'] = 0 update_data['syncNextRetry'] = None # Merge extra fields (e.g., betnr from create operation) if extra_fields: update_data.update(extra_fields) await self.espocrm.update_entity('CBeteiligte', entity_id, update_data) self._log(f"Sync-Lock released: {entity_id} → {new_status}") # Release Redis lock if self.redis: lock_key = f"sync_lock:cbeteiligte:{entity_id}" self.redis.delete(lock_key) except Exception as e: self._log(f"Fehler beim Release Lock: {e}", level='error') # Ensure Redis lock is released even on error if self.redis: try: lock_key = f"sync_lock:cbeteiligte:{entity_id}" self.redis.delete(lock_key) except: pass @staticmethod def parse_timestamp(ts: Any) -> Optional[datetime]: """ Parse verschiedene Timestamp-Formate zu datetime Args: ts: String, datetime oder None Returns: datetime-Objekt oder None """ if not ts: return None if isinstance(ts, datetime): return ts if isinstance(ts, str): # EspoCRM Format: "2026-02-07 14:30:00" # Advoware Format: "2026-02-07T14:30:00" oder "2026-02-07T14:30:00Z" try: # Entferne trailing Z falls vorhanden ts = ts.rstrip('Z') # Versuche verschiedene Formate for fmt in [ '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d', ]: try: return datetime.strptime(ts, fmt) except ValueError: continue # Fallback: ISO-Format return datetime.fromisoformat(ts) except Exception as e: self.logger.warn(f"Konnte Timestamp nicht parsen: {ts} - {e}") return None return None def compare_entities( self, espo_entity: Dict[str, Any], advo_entity: Dict[str, Any] ) -> TimestampResult: """ Vergleicht Änderungen zwischen EspoCRM und Advoware PRIMÄR: rowId-Vergleich (Advoware rowId ändert sich bei jedem Update - SEHR zuverlässig!) FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar) Args: espo_entity: EspoCRM CBeteiligte advo_entity: Advoware Beteiligte Returns: "espocrm_newer": EspoCRM wurde geändert "advoware_newer": Advoware wurde geändert "conflict": Beide wurden geändert "no_change": Keine Änderungen """ # PRIMÄR: rowId-basierte Änderungserkennung (zuverlässiger!) espo_rowid = espo_entity.get('advowareRowId') advo_rowid = advo_entity.get('rowId') last_sync = espo_entity.get('advowareLastSync') espo_modified = espo_entity.get('modifiedAt') # SPECIAL CASE: Kein lastSync → Initial Sync # FIX #11: Vergleiche Timestamps statt blind EspoCRM zu bevorzugen if not last_sync: self._log(f"Initial Sync (kein lastSync) → Vergleiche Timestamps") # Wenn beide Timestamps vorhanden, vergleiche sie if espo_ts and advo_ts: if espo_ts > advo_ts: self._log(f"Initial Sync: EspoCRM neuer ({espo_ts} > {advo_ts})") return 'espocrm_newer' elif advo_ts > espo_ts: self._log(f"Initial Sync: Advoware neuer ({advo_ts} > {espo_ts})") return 'advoware_newer' else: self._log(f"Initial Sync: Beide gleich alt") return 'no_change' # Fallback: Wenn nur einer Timestamp hat, bevorzuge den if espo_ts and not advo_ts: return 'espocrm_newer' if advo_ts and not espo_ts: return 'advoware_newer' # Wenn keine Timestamps verfügbar: EspoCRM bevorzugen (default) self._log(f"Initial Sync: Keine Timestamps verfügbar → EspoCRM bevorzugt") return 'espocrm_newer' if espo_rowid and advo_rowid: # Prüfe ob Advoware geändert wurde (rowId) advo_changed = (espo_rowid != advo_rowid) # Prüfe ob EspoCRM auch geändert wurde (seit letztem Sync) espo_changed = False if espo_modified: try: espo_ts = self.parse_timestamp(espo_modified) sync_ts = self.parse_timestamp(last_sync) if espo_ts and sync_ts: espo_changed = (espo_ts > sync_ts) except Exception as e: self._log(f"Timestamp-Parse-Fehler: {e}", level='debug') # Konfliktlogik: Beide geändert seit letztem Sync? if advo_changed and espo_changed: self._log(f"🚨 KONFLIKT: Beide Seiten geändert seit letztem Sync") return 'conflict' elif advo_changed: self._log(f"Advoware rowId geändert: {espo_rowid[:20]}... → {advo_rowid[:20]}...") return 'advoware_newer' elif espo_changed: self._log(f"EspoCRM neuer (modifiedAt > lastSync)") return 'espocrm_newer' else: # Weder Advoware noch EspoCRM geändert return 'no_change' # Keine Änderungen self._log("Keine Änderungen (rowId identisch)") return 'no_change' # FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar) self._log("rowId nicht verfügbar, fallback auf Timestamp-Vergleich", level='debug') return self.compare_timestamps( espo_entity.get('modifiedAt'), advo_entity.get('geaendertAm'), espo_entity.get('advowareLastSync') ) def compare_timestamps( self, espo_modified_at: Any, advo_geaendert_am: Any, last_sync_ts: Any ) -> TimestampResult: """ Vergleicht Timestamps und bestimmt Sync-Richtung (FALLBACK wenn rowId nicht verfügbar) Args: espo_modified_at: EspoCRM modifiedAt advo_geaendert_am: Advoware geaendertAm last_sync_ts: Letzter Sync (advowareLastSync) Returns: "espocrm_newer": EspoCRM wurde nach last_sync geändert und ist neuer "advoware_newer": Advoware wurde nach last_sync geändert und ist neuer "conflict": Beide wurden nach last_sync geändert "no_change": Keine Änderungen seit last_sync """ espo_ts = self.parse_timestamp(espo_modified_at) advo_ts = self.parse_timestamp(advo_geaendert_am) sync_ts = self.parse_timestamp(last_sync_ts) # Logging self._log( f"Timestamp-Vergleich: EspoCRM={espo_ts}, Advoware={advo_ts}, LastSync={sync_ts}", level='debug' ) # Falls kein last_sync → erster Sync, vergleiche direkt if not sync_ts: if not espo_ts or not advo_ts: return "no_change" if espo_ts > advo_ts: return "espocrm_newer" elif advo_ts > espo_ts: return "advoware_newer" else: return "no_change" # Check ob seit last_sync Änderungen espo_changed = espo_ts and espo_ts > sync_ts advo_changed = advo_ts and advo_ts > sync_ts if espo_changed and advo_changed: # Beide geändert seit last_sync → Konflikt return "conflict" elif espo_changed: # Nur EspoCRM geändert return "espocrm_newer" if (not advo_ts or espo_ts > advo_ts) else "conflict" elif advo_changed: # Nur Advoware geändert return "advoware_newer" else: # Keine Änderungen return "no_change" def merge_for_advoware_put( self, advo_entity: Dict[str, Any], espo_entity: Dict[str, Any], mapper ) -> Dict[str, Any]: """ Merged EspoCRM updates mit Advoware entity für PUT operation Advoware benötigt vollständige Objekte für PUT (Read-Modify-Write pattern). Diese Funktion merged die gemappten EspoCRM-Updates in das bestehende Advoware-Objekt. Args: advo_entity: Aktuelles Advoware entity (vollständiges Objekt) espo_entity: EspoCRM entity mit Updates mapper: BeteiligteMapper instance Returns: Merged dict für Advoware PUT """ # Map EspoCRM → Advoware (nur Stammdaten) advo_updates = mapper.map_cbeteiligte_to_advoware(espo_entity) # Merge: Advoware entity als Base, überschreibe mit EspoCRM updates merged = {**advo_entity, **advo_updates} # Logging self._log( f"📝 Merge: {len(advo_updates)} Stammdaten-Felder → {len(merged)} Gesamt-Felder", level='info' ) self._log( f" Gesynct: {', '.join(advo_updates.keys())}", level='debug' ) return merged async def send_notification( self, entity_id: str, notification_type: Literal["conflict", "deleted", "error"], extra_data: Optional[Dict[str, Any]] = None ) -> None: """ Sendet EspoCRM Notification via NotificationManager Args: entity_id: CBeteiligte Entity ID notification_type: "conflict", "deleted" oder "error" extra_data: Zusätzliche Daten für Nachricht """ try: # Hole Entity-Daten entity = await self.espocrm.get_entity('CBeteiligte', entity_id) name = entity.get('name', 'Unbekannt') betnr = entity.get('betnr') # Map notification_type zu action_type if notification_type == "conflict": action_type = 'sync_conflict' details = { 'message': f"Sync-Konflikt bei Beteiligten '{name}' (betNr: {betnr})", 'description': ( f"EspoCRM hat Vorrang - Änderungen wurden nach Advoware übertragen.\n\n" f"Bitte prüfen Sie die Details und stellen Sie sicher, dass die Daten korrekt sind." ), 'entity_name': name, 'betnr': betnr, 'priority': 'Normal' } elif notification_type == "deleted": deleted_at = entity.get('advowareDeletedAt', 'unbekannt') action_type = 'entity_deleted_in_source' details = { 'message': f"Beteiligter '{name}' wurde in Advoware gelöscht", 'description': ( f"Der Beteiligte '{name}' (betNr: {betnr}) wurde am {deleted_at} " f"in Advoware gelöscht.\n\n" f"Der Datensatz wurde in EspoCRM markiert, aber nicht gelöscht. " f"Bitte prüfen Sie, ob dies beabsichtigt war." ), 'entity_name': name, 'betnr': betnr, 'deleted_at': deleted_at, 'priority': 'High' } else: action_type = 'general_manual_action' details = { 'message': f"Benachrichtigung für Beteiligten '{name}'", 'entity_name': name, 'betnr': betnr } # Merge extra_data if provided if extra_data: details.update(extra_data) # Sende via NotificationManager await self.notification_manager.notify_manual_action_required( entity_type='CBeteiligte', entity_id=entity_id, action_type=action_type, details=details, create_task=True ) self._log(f"Notification via NotificationManager gesendet: {notification_type} für {entity_id}") except Exception as e: self._log(f"Fehler beim Senden der Notification: {e}", level='error') async def handle_advoware_deleted( self, entity_id: str, error_details: str ) -> None: """ Behandelt Fall dass Beteiligter in Advoware gelöscht wurde (404) Args: entity_id: CBeteiligte Entity ID error_details: Fehlerdetails von Advoware API """ try: now = datetime.now(pytz.UTC).isoformat() # Update Entity: Soft-Delete Flag await self.espocrm.update_entity('CBeteiligte', entity_id, { 'syncStatus': 'deleted_in_advoware', 'advowareDeletedAt': now, 'syncErrorMessage': f"Beteiligter existiert nicht mehr in Advoware. {error_details}" }) self._log(f"Entity {entity_id} als deleted_in_advoware markiert") # Sende Notification await self.send_notification(entity_id, 'deleted') except Exception as e: self._log(f"Fehler beim Handle Deleted: {e}", level='error') async def validate_sync_result( self, entity_id: str, betnr: int, mapper, direction: str = 'to_advoware' ) -> Tuple[bool, Optional[str]]: """ FIX #13: Validiert Sync-Ergebnis durch Round-Trip Verification Args: entity_id: EspoCRM CBeteiligte ID betnr: Advoware betNr mapper: BeteiligteMapper instance direction: 'to_advoware' oder 'to_espocrm' Returns: (success: bool, error_message: Optional[str]) """ try: self._log(f"🔍 Validiere Sync-Ergebnis (direction={direction})...", level='debug') # Lade beide Entities erneut espo_entity = await self.espocrm.get_entity('CBeteiligte', entity_id) from services.advoware import AdvowareAPI advoware_api = AdvowareAPI(self.context) advo_result = await advoware_api.api_call(f'api/v1/advonet/Beteiligte/{betnr}', method='GET') if isinstance(advo_result, list): advo_entity = advo_result[0] if advo_result else None else: advo_entity = advo_result if not advo_entity: return False, f"Advoware Entity {betnr} nicht gefunden nach Sync" # Validiere Stammdaten critical_fields = ['name', 'rechtsform'] differences = [] if direction == 'to_advoware': # EspoCRM → Advoware: Prüfe ob Advoware die EspoCRM-Werte hat advo_mapped = mapper.map_cbeteiligte_to_advoware(espo_entity) for field in critical_fields: espo_val = advo_mapped.get(field) advo_val = advo_entity.get(field) if espo_val != advo_val: differences.append(f"{field}: expected '{espo_val}', got '{advo_val}'") elif direction == 'to_espocrm': # Advoware → EspoCRM: Prüfe ob EspoCRM die Advoware-Werte hat espo_mapped = mapper.map_advoware_to_cbeteiligte(advo_entity) for field in critical_fields: advo_val = espo_mapped.get(field) espo_val = espo_entity.get(field) if advo_val != espo_val: differences.append(f"{field}: expected '{advo_val}', got '{espo_val}'") if differences: error_msg = f"Validation failed: {', '.join(differences)}" self._log(f"❌ {error_msg}", level='error') return False, error_msg self._log(f"✅ Validation erfolgreich", level='debug') return True, None except Exception as e: self._log(f"⚠️ Validation error: {e}", level='error') return False, f"Validation exception: {str(e)}" async def resolve_conflict_espocrm_wins( self, entity_id: str, espo_entity: Dict[str, Any], advo_entity: Dict[str, Any], conflict_details: str, extra_fields: Optional[Dict[str, Any]] = None ) -> None: """ Löst Konflikt auf: EspoCRM wins (überschreibt Advoware) Args: entity_id: CBeteiligte Entity ID espo_entity: EspoCRM Entity-Daten advo_entity: Advoware Entity-Daten conflict_details: Details zum Konflikt extra_fields: Zusätzliche Felder (z.B. advowareRowId) """ try: # EspoCRM datetime format now_utc = datetime.now(pytz.UTC) espo_datetime = now_utc.strftime('%Y-%m-%d %H:%M:%S') # Markiere als gelöst mit Konflikt-Info update_data = { 'syncStatus': 'clean', # Gelöst! 'advowareLastSync': espo_datetime, 'syncErrorMessage': f'Konflikt: {conflict_details}', 'syncRetryCount': 0 } # Merge extra fields (z.B. advowareRowId) if extra_fields: update_data.update(extra_fields) await self.espocrm.update_entity('CBeteiligte', entity_id, update_data) self._log(f"Konflikt gelöst für {entity_id}: EspoCRM wins") # Sende Notification await self.send_notification(entity_id, 'conflict', { 'details': conflict_details }) except Exception as e: self._log(f"Fehler beim Resolve Conflict: {e}", level='error')