""" Beteiligte Sync Utilities Hilfsfunktionen für Sync-Operationen: - Locking via syncStatus - Timestamp-Vergleich - Konfliktauflösung (EspoCRM wins) - EspoCRM In-App Notifications - Soft-Delete Handling """ from typing import Dict, Any, Optional, Tuple, Literal from datetime import datetime import pytz import logging import redis from config import Config from services.espocrm import EspoCRMAPI from services.notification_utils import NotificationManager logger = logging.getLogger(__name__) # Timestamp-Vergleich Ergebnis-Typen TimestampResult = Literal["espocrm_newer", "advoware_newer", "conflict", "no_change"] # Max retry before permanent failure MAX_SYNC_RETRIES = 5 # Lock TTL in seconds (prevents deadlocks) LOCK_TTL_SECONDS = 900 # 15 minutes class BeteiligteSync: """Utility-Klasse für Beteiligte-Synchronisation""" def __init__(self, espocrm_api: EspoCRMAPI, redis_client: redis.Redis = None, context=None): self.espocrm = espocrm_api self.context = context self.redis = redis_client or self._init_redis() self.notification_manager = NotificationManager(espocrm_api=self.espocrm, context=context) def _init_redis(self) -> redis.Redis: """Initialize Redis client for distributed locking""" try: client = redis.Redis( host=Config.REDIS_HOST, port=int(Config.REDIS_PORT), db=int(Config.REDIS_DB_ADVOWARE_CACHE), decode_responses=True ) client.ping() return client except Exception as e: self._log(f"Redis connection failed: {e}", level='error') return None def _log(self, message: str, level: str = 'info'): """Logging mit Context-Support""" if self.context and hasattr(self.context, 'logger'): getattr(self.context.logger, level)(message) else: getattr(logger, level)(message) async def acquire_sync_lock(self, entity_id: str) -> bool: """ Atomic distributed lock via Redis + syncStatus update Args: entity_id: EspoCRM CBeteiligte ID Returns: True wenn Lock erfolgreich, False wenn bereits im Sync """ try: # STEP 1: Atomic Redis lock (prevents race conditions) if self.redis: lock_key = f"sync_lock:cbeteiligte:{entity_id}" acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS) if not acquired: self._log(f"Redis lock bereits aktiv für {entity_id}", level='warning') return False # STEP 2: Update syncStatus (für UI visibility) await self.espocrm.update_entity('CBeteiligte', entity_id, { 'syncStatus': 'syncing' }) self._log(f"Sync-Lock für {entity_id} erworben") return True except Exception as e: self._log(f"Fehler beim Acquire Lock: {e}", level='error') # Clean up Redis lock on error if self.redis: try: lock_key = f"sync_lock:cbeteiligte:{entity_id}" self.redis.delete(lock_key) except: pass return False async def release_sync_lock( self, entity_id: str, new_status: str = 'clean', error_message: Optional[str] = None, increment_retry: bool = False, extra_fields: Optional[Dict[str, Any]] = None ) -> None: """ Gibt Sync-Lock frei und setzt finalen Status (kombiniert mit extra fields) Args: entity_id: EspoCRM CBeteiligte ID new_status: Neuer syncStatus (clean, failed, conflict, etc.) error_message: Optional: Fehlermeldung für syncErrorMessage increment_retry: Ob syncRetryCount erhöht werden soll extra_fields: Optional: Zusätzliche Felder für EspoCRM update (z.B. betnr) """ try: # EspoCRM datetime format: YYYY-MM-DD HH:MM:SS (keine Timezone!) now_utc = datetime.now(pytz.UTC) espo_datetime = now_utc.strftime('%Y-%m-%d %H:%M:%S') update_data = { 'syncStatus': new_status, 'advowareLastSync': espo_datetime } if error_message: update_data['syncErrorMessage'] = error_message[:2000] # Max. 2000 chars else: update_data['syncErrorMessage'] = None # Handle retry count if increment_retry: # Hole aktuellen Retry-Count entity = await self.espocrm.get_entity('CBeteiligte', entity_id) current_retry = entity.get('syncRetryCount') or 0 new_retry = current_retry + 1 update_data['syncRetryCount'] = new_retry # Check max retries - mark as permanently failed if new_retry >= MAX_SYNC_RETRIES: update_data['syncStatus'] = 'permanently_failed' await self.send_notification( entity_id, f"Sync fehlgeschlagen nach {MAX_SYNC_RETRIES} Versuchen. Manuelle Prüfung erforderlich.", notification_type='error' ) self._log(f"Max retries ({MAX_SYNC_RETRIES}) erreicht für {entity_id}", level='error') else: update_data['syncRetryCount'] = 0 # Merge extra fields (e.g., betnr from create operation) if extra_fields: update_data.update(extra_fields) await self.espocrm.update_entity('CBeteiligte', entity_id, update_data) self._log(f"Sync-Lock released: {entity_id} → {new_status}") # Release Redis lock if self.redis: lock_key = f"sync_lock:cbeteiligte:{entity_id}" self.redis.delete(lock_key) except Exception as e: self._log(f"Fehler beim Release Lock: {e}", level='error') # Ensure Redis lock is released even on error if self.redis: try: lock_key = f"sync_lock:cbeteiligte:{entity_id}" self.redis.delete(lock_key) except: pass @staticmethod def parse_timestamp(ts: Any) -> Optional[datetime]: """ Parse verschiedene Timestamp-Formate zu datetime Args: ts: String, datetime oder None Returns: datetime-Objekt oder None """ if not ts: return None if isinstance(ts, datetime): return ts if isinstance(ts, str): # EspoCRM Format: "2026-02-07 14:30:00" # Advoware Format: "2026-02-07T14:30:00" oder "2026-02-07T14:30:00Z" try: # Entferne trailing Z falls vorhanden ts = ts.rstrip('Z') # Versuche verschiedene Formate for fmt in [ '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d', ]: try: return datetime.strptime(ts, fmt) except ValueError: continue # Fallback: ISO-Format return datetime.fromisoformat(ts) except Exception as e: logger.warning(f"Konnte Timestamp nicht parsen: {ts} - {e}") return None return None def compare_entities( self, espo_entity: Dict[str, Any], advo_entity: Dict[str, Any] ) -> TimestampResult: """ Vergleicht Änderungen zwischen EspoCRM und Advoware PRIMÄR: rowId-Vergleich (Advoware rowId ändert sich bei jedem Update - SEHR zuverlässig!) FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar) Args: espo_entity: EspoCRM CBeteiligte advo_entity: Advoware Beteiligte Returns: "espocrm_newer": EspoCRM wurde geändert "advoware_newer": Advoware wurde geändert "conflict": Beide wurden geändert "no_change": Keine Änderungen """ # PRIMÄR: rowId-basierte Änderungserkennung (zuverlässiger!) espo_rowid = espo_entity.get('advowareRowId') advo_rowid = advo_entity.get('rowId') last_sync = espo_entity.get('advowareLastSync') espo_modified = espo_entity.get('modifiedAt') # SPECIAL CASE: Kein lastSync → Initial Sync (EspoCRM→Advoware) if not last_sync: self._log(f"Initial Sync (kein lastSync) → EspoCRM neuer") return 'espocrm_newer' if espo_rowid and advo_rowid: # Prüfe ob Advoware geändert wurde (rowId) advo_changed = (espo_rowid != advo_rowid) # Prüfe ob EspoCRM auch geändert wurde (seit letztem Sync) espo_changed = False if espo_modified: try: espo_ts = self.parse_timestamp(espo_modified) sync_ts = self.parse_timestamp(last_sync) if espo_ts and sync_ts: espo_changed = (espo_ts > sync_ts) except Exception as e: self._log(f"Timestamp-Parse-Fehler: {e}", level='debug') # Konfliktlogik: Beide geändert seit letztem Sync? if advo_changed and espo_changed: self._log(f"🚨 KONFLIKT: Beide Seiten geändert seit letztem Sync") return 'conflict' elif advo_changed: self._log(f"Advoware rowId geändert: {espo_rowid[:20]}... → {advo_rowid[:20]}...") return 'advoware_newer' elif espo_changed: self._log(f"EspoCRM neuer (modifiedAt > lastSync)") return 'espocrm_newer' else: # Weder Advoware noch EspoCRM geändert return 'no_change' # Keine Änderungen self._log("Keine Änderungen (rowId identisch)") return 'no_change' # FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar) self._log("rowId nicht verfügbar, fallback auf Timestamp-Vergleich", level='debug') return self.compare_timestamps( espo_entity.get('modifiedAt'), advo_entity.get('geaendertAm'), espo_entity.get('advowareLastSync') ) def compare_timestamps( self, espo_modified_at: Any, advo_geaendert_am: Any, last_sync_ts: Any ) -> TimestampResult: """ Vergleicht Timestamps und bestimmt Sync-Richtung (FALLBACK wenn rowId nicht verfügbar) Args: espo_modified_at: EspoCRM modifiedAt advo_geaendert_am: Advoware geaendertAm last_sync_ts: Letzter Sync (advowareLastSync) Returns: "espocrm_newer": EspoCRM wurde nach last_sync geändert und ist neuer "advoware_newer": Advoware wurde nach last_sync geändert und ist neuer "conflict": Beide wurden nach last_sync geändert "no_change": Keine Änderungen seit last_sync """ espo_ts = self.parse_timestamp(espo_modified_at) advo_ts = self.parse_timestamp(advo_geaendert_am) sync_ts = self.parse_timestamp(last_sync_ts) # Logging self._log( f"Timestamp-Vergleich: EspoCRM={espo_ts}, Advoware={advo_ts}, LastSync={sync_ts}", level='debug' ) # Falls kein last_sync → erster Sync, vergleiche direkt if not sync_ts: if not espo_ts or not advo_ts: return "no_change" if espo_ts > advo_ts: return "espocrm_newer" elif advo_ts > espo_ts: return "advoware_newer" else: return "no_change" # Check ob seit last_sync Änderungen espo_changed = espo_ts and espo_ts > sync_ts advo_changed = advo_ts and advo_ts > sync_ts if espo_changed and advo_changed: # Beide geändert seit last_sync → Konflikt return "conflict" elif espo_changed: # Nur EspoCRM geändert return "espocrm_newer" if (not advo_ts or espo_ts > advo_ts) else "conflict" elif advo_changed: # Nur Advoware geändert return "advoware_newer" else: # Keine Änderungen return "no_change" def merge_for_advoware_put( self, advo_entity: Dict[str, Any], espo_entity: Dict[str, Any], mapper ) -> Dict[str, Any]: """ Merged EspoCRM updates mit Advoware entity für PUT operation Advoware benötigt vollständige Objekte für PUT (Read-Modify-Write pattern). Diese Funktion merged die gemappten EspoCRM-Updates in das bestehende Advoware-Objekt. Args: advo_entity: Aktuelles Advoware entity (vollständiges Objekt) espo_entity: EspoCRM entity mit Updates mapper: BeteiligteMapper instance Returns: Merged dict für Advoware PUT """ # Map EspoCRM → Advoware (nur Stammdaten) advo_updates = mapper.map_cbeteiligte_to_advoware(espo_entity) # Merge: Advoware entity als Base, überschreibe mit EspoCRM updates merged = {**advo_entity, **advo_updates} # Logging self._log( f"📝 Merge: {len(advo_updates)} Stammdaten-Felder → {len(merged)} Gesamt-Felder", level='info' ) self._log( f" Gesynct: {', '.join(advo_updates.keys())}", level='debug' ) return merged async def send_notification( self, entity_id: str, notification_type: Literal["conflict", "deleted", "error"], extra_data: Optional[Dict[str, Any]] = None ) -> None: """ Sendet EspoCRM Notification via NotificationManager Args: entity_id: CBeteiligte Entity ID notification_type: "conflict", "deleted" oder "error" extra_data: Zusätzliche Daten für Nachricht """ try: # Hole Entity-Daten entity = await self.espocrm.get_entity('CBeteiligte', entity_id) name = entity.get('name', 'Unbekannt') betnr = entity.get('betnr') # Map notification_type zu action_type if notification_type == "conflict": action_type = 'sync_conflict' details = { 'message': f"Sync-Konflikt bei Beteiligten '{name}' (betNr: {betnr})", 'description': ( f"EspoCRM hat Vorrang - Änderungen wurden nach Advoware übertragen.\n\n" f"Bitte prüfen Sie die Details und stellen Sie sicher, dass die Daten korrekt sind." ), 'entity_name': name, 'betnr': betnr, 'priority': 'Normal' } elif notification_type == "deleted": deleted_at = entity.get('advowareDeletedAt', 'unbekannt') action_type = 'entity_deleted_in_source' details = { 'message': f"Beteiligter '{name}' wurde in Advoware gelöscht", 'description': ( f"Der Beteiligte '{name}' (betNr: {betnr}) wurde am {deleted_at} " f"in Advoware gelöscht.\n\n" f"Der Datensatz wurde in EspoCRM markiert, aber nicht gelöscht. " f"Bitte prüfen Sie, ob dies beabsichtigt war." ), 'entity_name': name, 'betnr': betnr, 'deleted_at': deleted_at, 'priority': 'High' } else: action_type = 'general_manual_action' details = { 'message': f"Benachrichtigung für Beteiligten '{name}'", 'entity_name': name, 'betnr': betnr } # Merge extra_data if provided if extra_data: details.update(extra_data) # Sende via NotificationManager await self.notification_manager.notify_manual_action_required( entity_type='CBeteiligte', entity_id=entity_id, action_type=action_type, details=details, create_task=True ) self._log(f"Notification via NotificationManager gesendet: {notification_type} für {entity_id}") except Exception as e: self._log(f"Fehler beim Senden der Notification: {e}", level='error') async def handle_advoware_deleted( self, entity_id: str, error_details: str ) -> None: """ Behandelt Fall dass Beteiligter in Advoware gelöscht wurde (404) Args: entity_id: CBeteiligte Entity ID error_details: Fehlerdetails von Advoware API """ try: now = datetime.now(pytz.UTC).isoformat() # Update Entity: Soft-Delete Flag await self.espocrm.update_entity('CBeteiligte', entity_id, { 'syncStatus': 'deleted_in_advoware', 'advowareDeletedAt': now, 'syncErrorMessage': f"Beteiligter existiert nicht mehr in Advoware. {error_details}" }) self._log(f"Entity {entity_id} als deleted_in_advoware markiert") # Sende Notification await self.send_notification(entity_id, 'deleted') except Exception as e: self._log(f"Fehler beim Handle Deleted: {e}", level='error') async def resolve_conflict_espocrm_wins( self, entity_id: str, espo_entity: Dict[str, Any], advo_entity: Dict[str, Any], conflict_details: str, extra_fields: Optional[Dict[str, Any]] = None ) -> None: """ Löst Konflikt auf: EspoCRM wins (überschreibt Advoware) Args: entity_id: CBeteiligte Entity ID espo_entity: EspoCRM Entity-Daten advo_entity: Advoware Entity-Daten conflict_details: Details zum Konflikt extra_fields: Zusätzliche Felder (z.B. advowareRowId) """ try: # EspoCRM datetime format now_utc = datetime.now(pytz.UTC) espo_datetime = now_utc.strftime('%Y-%m-%d %H:%M:%S') # Markiere als gelöst mit Konflikt-Info update_data = { 'syncStatus': 'clean', # Gelöst! 'advowareLastSync': espo_datetime, 'syncErrorMessage': f'Konflikt: {conflict_details}', 'syncRetryCount': 0 } # Merge extra fields (z.B. advowareRowId) if extra_fields: update_data.update(extra_fields) await self.espocrm.update_entity('CBeteiligte', entity_id, update_data) self._log(f"Konflikt gelöst für {entity_id}: EspoCRM wins") # Sende Notification await self.send_notification(entity_id, 'conflict', { 'details': conflict_details }) except Exception as e: self._log(f"Fehler beim Resolve Conflict: {e}", level='error')