Files
motia/bitbylaw/services/beteiligte_sync_utils.py

519 lines
19 KiB
Python

"""
Beteiligte Sync Utilities
Hilfsfunktionen für Sync-Operationen:
- Locking via syncStatus
- Timestamp-Vergleich
- Konfliktauflösung (EspoCRM wins)
- EspoCRM In-App Notifications
- Soft-Delete Handling
"""
from typing import Dict, Any, Optional, Tuple, Literal
from datetime import datetime
import pytz
import logging
import redis
from config import Config
from services.espocrm import EspoCRMAPI
logger = logging.getLogger(__name__)
# Timestamp-Vergleich Ergebnis-Typen
TimestampResult = Literal["espocrm_newer", "advoware_newer", "conflict", "no_change"]
# Max retry before permanent failure
MAX_SYNC_RETRIES = 5
# Lock TTL in seconds (prevents deadlocks)
LOCK_TTL_SECONDS = 900 # 15 minutes
class BeteiligteSync:
"""Utility-Klasse für Beteiligte-Synchronisation"""
def __init__(self, espocrm_api: EspoCRMAPI, redis_client: redis.Redis = None, context=None):
self.espocrm = espocrm_api
self.context = context
self.redis = redis_client or self._init_redis()
def _init_redis(self) -> redis.Redis:
"""Initialize Redis client for distributed locking"""
try:
client = redis.Redis(
host=Config.REDIS_HOST,
port=int(Config.REDIS_PORT),
db=int(Config.REDIS_DB_ADVOWARE_CACHE),
decode_responses=True
)
client.ping()
return client
except Exception as e:
self._log(f"Redis connection failed: {e}", level='error')
return None
def _log(self, message: str, level: str = 'info'):
"""Logging mit Context-Support"""
if self.context and hasattr(self.context, 'logger'):
getattr(self.context.logger, level)(message)
else:
getattr(logger, level)(message)
async def acquire_sync_lock(self, entity_id: str) -> bool:
"""
Atomic distributed lock via Redis + syncStatus update
Args:
entity_id: EspoCRM CBeteiligte ID
Returns:
True wenn Lock erfolgreich, False wenn bereits im Sync
"""
try:
# STEP 1: Atomic Redis lock (prevents race conditions)
if self.redis:
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS)
if not acquired:
self._log(f"Redis lock bereits aktiv für {entity_id}", level='warning')
return False
# STEP 2: Update syncStatus (für UI visibility)
await self.espocrm.update_entity('CBeteiligte', entity_id, {
'syncStatus': 'syncing'
})
self._log(f"Sync-Lock für {entity_id} erworben")
return True
except Exception as e:
self._log(f"Fehler beim Acquire Lock: {e}", level='error')
# Clean up Redis lock on error
if self.redis:
try:
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
self.redis.delete(lock_key)
except:
pass
return False
async def release_sync_lock(
self,
entity_id: str,
new_status: str = 'clean',
error_message: Optional[str] = None,
increment_retry: bool = False,
extra_fields: Optional[Dict[str, Any]] = None
) -> None:
"""
Gibt Sync-Lock frei und setzt finalen Status (kombiniert mit extra fields)
Args:
entity_id: EspoCRM CBeteiligte ID
new_status: Neuer syncStatus (clean, failed, conflict, etc.)
error_message: Optional: Fehlermeldung für syncErrorMessage
increment_retry: Ob syncRetryCount erhöht werden soll
extra_fields: Optional: Zusätzliche Felder für EspoCRM update (z.B. betnr)
"""
try:
# EspoCRM datetime format: YYYY-MM-DD HH:MM:SS (keine Timezone!)
now_utc = datetime.now(pytz.UTC)
espo_datetime = now_utc.strftime('%Y-%m-%d %H:%M:%S')
update_data = {
'syncStatus': new_status,
'advowareLastSync': espo_datetime
}
if error_message:
update_data['syncErrorMessage'] = error_message[:2000] # Max. 2000 chars
else:
update_data['syncErrorMessage'] = None
# Handle retry count
if increment_retry:
# Hole aktuellen Retry-Count
entity = await self.espocrm.get_entity('CBeteiligte', entity_id)
current_retry = entity.get('syncRetryCount') or 0
new_retry = current_retry + 1
update_data['syncRetryCount'] = new_retry
# Check max retries - mark as permanently failed
if new_retry >= MAX_SYNC_RETRIES:
update_data['syncStatus'] = 'permanently_failed'
await self.send_notification(
entity_id,
f"Sync fehlgeschlagen nach {MAX_SYNC_RETRIES} Versuchen. Manuelle Prüfung erforderlich.",
notification_type='error'
)
self._log(f"Max retries ({MAX_SYNC_RETRIES}) erreicht für {entity_id}", level='error')
else:
update_data['syncRetryCount'] = 0
# Merge extra fields (e.g., betnr from create operation)
if extra_fields:
update_data.update(extra_fields)
await self.espocrm.update_entity('CBeteiligte', entity_id, update_data)
self._log(f"Sync-Lock released: {entity_id}{new_status}")
# Release Redis lock
if self.redis:
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
self.redis.delete(lock_key)
except Exception as e:
self._log(f"Fehler beim Release Lock: {e}", level='error')
# Ensure Redis lock is released even on error
if self.redis:
try:
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
self.redis.delete(lock_key)
except:
pass
@staticmethod
def parse_timestamp(ts: Any) -> Optional[datetime]:
"""
Parse verschiedene Timestamp-Formate zu datetime
Args:
ts: String, datetime oder None
Returns:
datetime-Objekt oder None
"""
if not ts:
return None
if isinstance(ts, datetime):
return ts
if isinstance(ts, str):
# EspoCRM Format: "2026-02-07 14:30:00"
# Advoware Format: "2026-02-07T14:30:00" oder "2026-02-07T14:30:00Z"
try:
# Entferne trailing Z falls vorhanden
ts = ts.rstrip('Z')
# Versuche verschiedene Formate
for fmt in [
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%d',
]:
try:
return datetime.strptime(ts, fmt)
except ValueError:
continue
# Fallback: ISO-Format
return datetime.fromisoformat(ts)
except Exception as e:
logger.warning(f"Konnte Timestamp nicht parsen: {ts} - {e}")
return None
return None
def compare_entities(
self,
espo_entity: Dict[str, Any],
advo_entity: Dict[str, Any]
) -> TimestampResult:
"""
Vergleicht Änderungen zwischen EspoCRM und Advoware
PRIMÄR: rowId-Vergleich (Advoware rowId ändert sich bei jedem Update - SEHR zuverlässig!)
FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar)
Args:
espo_entity: EspoCRM CBeteiligte
advo_entity: Advoware Beteiligte
Returns:
"espocrm_newer": EspoCRM wurde geändert
"advoware_newer": Advoware wurde geändert
"conflict": Beide wurden geändert
"no_change": Keine Änderungen
"""
# PRIMÄR: rowId-basierte Änderungserkennung (zuverlässiger!)
espo_rowid = espo_entity.get('advowareRowId')
advo_rowid = advo_entity.get('rowId')
if espo_rowid and advo_rowid:
if espo_rowid != advo_rowid:
# rowId unterschiedlich → Advoware wurde geändert
self._log(f"Advoware rowId geändert: {espo_rowid[:20]}... → {advo_rowid[:20]}...")
return 'advoware_newer'
else:
# rowId gleich → keine Änderung in Advoware
# Prüfe ob EspoCRM geändert wurde (via modifiedAt)
espo_modified = espo_entity.get('modifiedAt')
last_sync = espo_entity.get('advowareLastSync')
if espo_modified and last_sync:
try:
espo_ts = self.parse_timestamp(espo_modified)
sync_ts = self.parse_timestamp(last_sync)
if espo_ts and sync_ts and espo_ts > sync_ts:
self._log(f"EspoCRM neuer (rowId gleich, aber modifiedAt > lastSync)")
return 'espocrm_newer'
except Exception as e:
self._log(f"Timestamp-Parse-Fehler: {e}", level='debug')
# Keine Änderungen
self._log("Keine Änderungen (rowId identisch)")
return 'no_change'
# FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar)
self._log("rowId nicht verfügbar, fallback auf Timestamp-Vergleich", level='debug')
return self.compare_timestamps(
espo_entity.get('modifiedAt'),
advo_entity.get('geaendertAm'),
espo_entity.get('advowareLastSync')
)
def compare_timestamps(
self,
espo_modified_at: Any,
advo_geaendert_am: Any,
last_sync_ts: Any
) -> TimestampResult:
"""
Vergleicht Timestamps und bestimmt Sync-Richtung (FALLBACK wenn rowId nicht verfügbar)
Args:
espo_modified_at: EspoCRM modifiedAt
advo_geaendert_am: Advoware geaendertAm
last_sync_ts: Letzter Sync (advowareLastSync)
Returns:
"espocrm_newer": EspoCRM wurde nach last_sync geändert und ist neuer
"advoware_newer": Advoware wurde nach last_sync geändert und ist neuer
"conflict": Beide wurden nach last_sync geändert
"no_change": Keine Änderungen seit last_sync
"""
espo_ts = self.parse_timestamp(espo_modified_at)
advo_ts = self.parse_timestamp(advo_geaendert_am)
sync_ts = self.parse_timestamp(last_sync_ts)
# Logging
self._log(
f"Timestamp-Vergleich: EspoCRM={espo_ts}, Advoware={advo_ts}, LastSync={sync_ts}",
level='debug'
)
# Falls kein last_sync → erster Sync, vergleiche direkt
if not sync_ts:
if not espo_ts or not advo_ts:
return "no_change"
if espo_ts > advo_ts:
return "espocrm_newer"
elif advo_ts > espo_ts:
return "advoware_newer"
else:
return "no_change"
# Check ob seit last_sync Änderungen
espo_changed = espo_ts and espo_ts > sync_ts
advo_changed = advo_ts and advo_ts > sync_ts
if espo_changed and advo_changed:
# Beide geändert seit last_sync → Konflikt
return "conflict"
elif espo_changed:
# Nur EspoCRM geändert
return "espocrm_newer" if (not advo_ts or espo_ts > advo_ts) else "conflict"
elif advo_changed:
# Nur Advoware geändert
return "advoware_newer"
else:
# Keine Änderungen
return "no_change"
def merge_for_advoware_put(
self,
advo_entity: Dict[str, Any],
espo_entity: Dict[str, Any],
mapper
) -> Dict[str, Any]:
"""
Merged EspoCRM updates mit Advoware entity für PUT operation
Advoware benötigt vollständige Objekte für PUT (Read-Modify-Write pattern).
Diese Funktion merged die gemappten EspoCRM-Updates in das bestehende
Advoware-Objekt.
Args:
advo_entity: Aktuelles Advoware entity (vollständiges Objekt)
espo_entity: EspoCRM entity mit Updates
mapper: BeteiligteMapper instance
Returns:
Merged dict für Advoware PUT
"""
# Map EspoCRM → Advoware (nur Stammdaten)
advo_updates = mapper.map_cbeteiligte_to_advoware(espo_entity)
# Merge: Advoware entity als Base, überschreibe mit EspoCRM updates
merged = {**advo_entity, **advo_updates}
# Logging
self._log(
f"📝 Merge: {len(advo_updates)} Stammdaten-Felder → {len(merged)} Gesamt-Felder",
level='info'
)
self._log(
f" Gesynct: {', '.join(advo_updates.keys())}",
level='debug'
)
return merged
async def send_notification(
self,
entity_id: str,
notification_type: Literal["conflict", "deleted", "error"],
extra_data: Optional[Dict[str, Any]] = None
) -> None:
"""
Sendet EspoCRM In-App Notification
Args:
entity_id: CBeteiligte Entity ID
notification_type: "conflict" oder "deleted"
extra_data: Zusätzliche Daten für Nachricht
"""
try:
# Hole Entity-Daten
entity = await self.espocrm.get_entity('CBeteiligte', entity_id)
name = entity.get('name', 'Unbekannt')
betnr = entity.get('betnr')
assigned_user = entity.get('assignedUserId')
# Erstelle Nachricht basierend auf Typ
if notification_type == "conflict":
message = (
f"⚠️ Sync-Konflikt bei Beteiligten '{name}' (betNr: {betnr}). "
f"EspoCRM hat Vorrang - Änderungen wurden nach Advoware übertragen. "
f"Bitte prüfen Sie die Details."
)
elif notification_type == "deleted":
deleted_at = entity.get('advowareDeletedAt', 'unbekannt')
message = (
f"🗑️ Beteiligter '{name}' (betNr: {betnr}) wurde in Advoware gelöscht "
f"(am {deleted_at}). Der Datensatz wurde in EspoCRM markiert, aber nicht gelöscht. "
f"Bitte prüfen Sie, ob dies beabsichtigt war."
)
else:
message = f"Benachrichtigung für Beteiligten '{name}'"
# Erstelle Notification in EspoCRM
notification_data = {
'type': 'message',
'message': message,
'relatedType': 'CBeteiligte',
'relatedId': entity_id,
}
# Wenn assigned user vorhanden, sende an diesen
if assigned_user:
notification_data['userId'] = assigned_user
# Sende via API
result = await self.espocrm.api_call(
'Notification',
method='POST',
data=notification_data
)
self._log(f"Notification gesendet für {entity_id}: {notification_type}")
except Exception as e:
self._log(f"Fehler beim Senden der Notification: {e}", level='error')
async def handle_advoware_deleted(
self,
entity_id: str,
error_details: str
) -> None:
"""
Behandelt Fall dass Beteiligter in Advoware gelöscht wurde (404)
Args:
entity_id: CBeteiligte Entity ID
error_details: Fehlerdetails von Advoware API
"""
try:
now = datetime.now(pytz.UTC).isoformat()
# Update Entity: Soft-Delete Flag
await self.espocrm.update_entity('CBeteiligte', entity_id, {
'syncStatus': 'deleted_in_advoware',
'advowareDeletedAt': now,
'syncErrorMessage': f"Beteiligter existiert nicht mehr in Advoware. {error_details}"
})
self._log(f"Entity {entity_id} als deleted_in_advoware markiert")
# Sende Notification
await self.send_notification(entity_id, 'deleted')
except Exception as e:
self._log(f"Fehler beim Handle Deleted: {e}", level='error')
async def resolve_conflict_espocrm_wins(
self,
entity_id: str,
espo_entity: Dict[str, Any],
advo_entity: Dict[str, Any],
conflict_details: str,
extra_fields: Optional[Dict[str, Any]] = None
) -> None:
"""
Löst Konflikt auf: EspoCRM wins (überschreibt Advoware)
Args:
entity_id: CBeteiligte Entity ID
espo_entity: EspoCRM Entity-Daten
advo_entity: Advoware Entity-Daten
conflict_details: Details zum Konflikt
extra_fields: Zusätzliche Felder (z.B. advowareRowId)
"""
try:
# EspoCRM datetime format
now_utc = datetime.now(pytz.UTC)
espo_datetime = now_utc.strftime('%Y-%m-%d %H:%M:%S')
# Markiere als gelöst mit Konflikt-Info
update_data = {
'syncStatus': 'clean', # Gelöst!
'advowareLastSync': espo_datetime,
'syncErrorMessage': f'Konflikt: {conflict_details}',
'syncRetryCount': 0
}
# Merge extra fields (z.B. advowareRowId)
if extra_fields:
update_data.update(extra_fields)
await self.espocrm.update_entity('CBeteiligte', entity_id, update_data)
'advowareLastSync': now,
'syncErrorMessage': f"Konflikt am {now}: {conflict_details}. EspoCRM hat gewonnen.",
'syncRetryCount': 0
})
self._log(f"Konflikt gelöst für {entity_id}: EspoCRM wins")
# Sende Notification
await self.send_notification(entity_id, 'conflict', {
'details': conflict_details
})
except Exception as e:
self._log(f"Fehler beim Resolve Conflict: {e}", level='error')