- Added `config.py` for centralized configuration management including Sync, API, Advoware, EspoCRM, Redis, Logging, Calendar Sync, and Feature Flags. - Created `exceptions.py` with a hierarchy of custom exceptions for integration errors, API errors, sync errors, and Redis errors. - Developed `logging_utils.py` for a unified logging wrapper supporting structured logging and performance tracking. - Defined Pydantic models in `models.py` for data validation of Advoware and EspoCRM entities, including sync operation models. - Introduced `redis_client.py` for a centralized Redis client factory with connection pooling, automatic reconnection, and health checks.
667 lines
26 KiB
Python
667 lines
26 KiB
Python
"""
|
|
Beteiligte Sync Utilities
|
|
|
|
Hilfsfunktionen für Sync-Operationen:
|
|
- Distributed locking via Redis + syncStatus
|
|
- Timestamp-Vergleich mit rowId-basierter Änderungserkennung
|
|
- Konfliktauflösung (EspoCRM wins)
|
|
- EspoCRM In-App Notifications
|
|
- Soft-Delete Handling
|
|
- Retry mit Exponential Backoff
|
|
"""
|
|
|
|
from typing import Dict, Any, Optional, Tuple, Literal
|
|
from datetime import datetime, timedelta
|
|
import pytz
|
|
|
|
from services.exceptions import LockAcquisitionError, SyncError, ValidationError
|
|
from services.redis_client import get_redis_client
|
|
from services.config import SYNC_CONFIG, get_lock_key, get_retry_delay_seconds
|
|
from services.logging_utils import get_logger
|
|
|
|
import redis
|
|
|
|
# Timestamp-Vergleich Ergebnis-Typen
|
|
TimestampResult = Literal["espocrm_newer", "advoware_newer", "conflict", "no_change"]
|
|
|
|
|
|
class BeteiligteSync:
|
|
"""Utility-Klasse für Beteiligte-Synchronisation"""
|
|
|
|
def __init__(self, espocrm_api, redis_client: Optional[redis.Redis] = None, context=None):
|
|
self.espocrm = espocrm_api
|
|
self.context = context
|
|
self.logger = get_logger('beteiligte_sync', context)
|
|
|
|
# Use provided Redis client or get from factory
|
|
self.redis = redis_client or get_redis_client(strict=False)
|
|
|
|
if not self.redis:
|
|
self.logger.error(
|
|
"⚠️ KRITISCH: Redis nicht verfügbar! "
|
|
"Distributed Locking deaktiviert - Race Conditions möglich!"
|
|
)
|
|
|
|
# Import NotificationManager only when needed
|
|
from services.notification_utils import NotificationManager
|
|
self.notification_manager = NotificationManager(espocrm_api=self.espocrm, context=context)
|
|
|
|
async def acquire_sync_lock(self, entity_id: str) -> bool:
|
|
"""
|
|
Atomic distributed lock via Redis + syncStatus update
|
|
|
|
Args:
|
|
entity_id: EspoCRM CBeteiligte ID
|
|
|
|
Returns:
|
|
True wenn Lock erfolgreich, False wenn bereits im Sync
|
|
|
|
Raises:
|
|
SyncError: Bei kritischen Sync-Problemen
|
|
"""
|
|
try:
|
|
# STEP 1: Atomic Redis lock (prevents race conditions)
|
|
if self.redis:
|
|
lock_key = get_lock_key('cbeteiligte', entity_id)
|
|
acquired = self.redis.set(
|
|
lock_key,
|
|
"locked",
|
|
nx=True,
|
|
ex=SYNC_CONFIG.lock_ttl_seconds
|
|
)
|
|
|
|
if not acquired:
|
|
self.logger.warning(f"Redis lock bereits aktiv für {entity_id}")
|
|
return False
|
|
else:
|
|
self.logger.error(
|
|
f"⚠️ WARNUNG: Sync ohne Redis-Lock für {entity_id} - Race Condition möglich!"
|
|
)
|
|
|
|
# STEP 2: Update syncStatus (für UI visibility)
|
|
await self.espocrm.update_entity('CBeteiligte', entity_id, {
|
|
'syncStatus': 'syncing'
|
|
})
|
|
|
|
self.logger.info(f"Sync-Lock für {entity_id} erworben")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self._log(f"Fehler beim Acquire Lock: {e}", level='error')
|
|
# Clean up Redis lock on error
|
|
if self.redis:
|
|
try:
|
|
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
|
|
self.redis.delete(lock_key)
|
|
except:
|
|
pass
|
|
return False
|
|
|
|
async def release_sync_lock(
|
|
self,
|
|
entity_id: str,
|
|
new_status: str = 'clean',
|
|
error_message: Optional[str] = None,
|
|
increment_retry: bool = False,
|
|
extra_fields: Optional[Dict[str, Any]] = None
|
|
) -> None:
|
|
"""
|
|
Gibt Sync-Lock frei und setzt finalen Status (kombiniert mit extra fields)
|
|
|
|
Args:
|
|
entity_id: EspoCRM CBeteiligte ID
|
|
new_status: Neuer syncStatus (clean, failed, conflict, etc.)
|
|
error_message: Optional: Fehlermeldung für syncErrorMessage
|
|
increment_retry: Ob syncRetryCount erhöht werden soll
|
|
extra_fields: Optional: Zusätzliche Felder für EspoCRM update (z.B. betnr)
|
|
"""
|
|
try:
|
|
# EspoCRM datetime format: YYYY-MM-DD HH:MM:SS (keine Timezone!)
|
|
now_utc = datetime.now(pytz.UTC)
|
|
espo_datetime = now_utc.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
update_data = {
|
|
'syncStatus': new_status,
|
|
'advowareLastSync': espo_datetime
|
|
}
|
|
|
|
if error_message:
|
|
update_data['syncErrorMessage'] = error_message[:2000] # Max. 2000 chars
|
|
else:
|
|
update_data['syncErrorMessage'] = None
|
|
|
|
# Handle retry count
|
|
if increment_retry:
|
|
# Hole aktuellen Retry-Count
|
|
entity = await self.espocrm.get_entity('CBeteiligte', entity_id)
|
|
current_retry = entity.get('syncRetryCount') or 0
|
|
new_retry = current_retry + 1
|
|
update_data['syncRetryCount'] = new_retry
|
|
|
|
# Exponential backoff - berechne nächsten Retry-Zeitpunkt
|
|
backoff_minutes = SYNC_CONFIG.retry_backoff_minutes
|
|
if new_retry <= len(backoff_minutes):
|
|
backoff_min = backoff_minutes[new_retry - 1]
|
|
else:
|
|
backoff_min = backoff_minutes[-1] # Letzte Backoff-Zeit
|
|
|
|
next_retry = now_utc + timedelta(minutes=backoff_min)
|
|
update_data['syncNextRetry'] = next_retry.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
self.logger.info(
|
|
f"Retry {new_retry}/{SYNC_CONFIG.max_retries}, "
|
|
f"nächster Versuch in {backoff_min} Minuten"
|
|
)
|
|
|
|
# Check max retries - mark as permanently failed
|
|
if new_retry >= SYNC_CONFIG.max_retries:
|
|
update_data['syncStatus'] = 'permanently_failed'
|
|
|
|
# Auto-Reset Timestamp für Wiederherstellung nach 24h
|
|
auto_reset_time = now_utc + timedelta(hours=SYNC_CONFIG.auto_reset_hours)
|
|
update_data['syncAutoResetAt'] = auto_reset_time.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
await self.send_notification(
|
|
entity_id,
|
|
'error',
|
|
extra_data={
|
|
'message': (
|
|
f"Sync fehlgeschlagen nach {SYNC_CONFIG.max_retries} Versuchen. "
|
|
f"Auto-Reset in {SYNC_CONFIG.auto_reset_hours}h."
|
|
)
|
|
}
|
|
)
|
|
self.logger.error(
|
|
f"Max retries ({SYNC_CONFIG.max_retries}) erreicht für {entity_id}, "
|
|
f"Auto-Reset um {auto_reset_time}"
|
|
)
|
|
else:
|
|
update_data['syncRetryCount'] = 0
|
|
update_data['syncNextRetry'] = None
|
|
|
|
# Merge extra fields (e.g., betnr from create operation)
|
|
if extra_fields:
|
|
update_data.update(extra_fields)
|
|
|
|
await self.espocrm.update_entity('CBeteiligte', entity_id, update_data)
|
|
|
|
self.logger.info(f"Sync-Lock released: {entity_id} → {new_status}")
|
|
|
|
# Release Redis lock
|
|
if self.redis:
|
|
lock_key = get_lock_key('cbeteiligte', entity_id)
|
|
self.redis.delete(lock_key)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Fehler beim Release Lock: {e}")
|
|
# Ensure Redis lock is released even on error
|
|
if self.redis:
|
|
try:
|
|
lock_key = get_lock_key('cbeteiligte', entity_id)
|
|
self.redis.delete(lock_key)
|
|
except:
|
|
pass
|
|
|
|
@staticmethod
|
|
def parse_timestamp(ts: Any) -> Optional[datetime]:
|
|
"""
|
|
Parse verschiedene Timestamp-Formate zu datetime
|
|
|
|
Args:
|
|
ts: String, datetime oder None
|
|
|
|
Returns:
|
|
datetime-Objekt oder None
|
|
"""
|
|
if not ts:
|
|
return None
|
|
|
|
if isinstance(ts, datetime):
|
|
return ts
|
|
|
|
if isinstance(ts, str):
|
|
# EspoCRM Format: "2026-02-07 14:30:00"
|
|
# Advoware Format: "2026-02-07T14:30:00" oder "2026-02-07T14:30:00Z"
|
|
try:
|
|
# Entferne trailing Z falls vorhanden
|
|
ts = ts.rstrip('Z')
|
|
|
|
# Versuche verschiedene Formate
|
|
for fmt in [
|
|
'%Y-%m-%d %H:%M:%S',
|
|
'%Y-%m-%dT%H:%M:%S',
|
|
'%Y-%m-%d',
|
|
]:
|
|
try:
|
|
return datetime.strptime(ts, fmt)
|
|
except ValueError:
|
|
continue
|
|
|
|
# Fallback: ISO-Format
|
|
return datetime.fromisoformat(ts)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Konnte Timestamp nicht parsen: {ts} - {e}")
|
|
return None
|
|
|
|
return None
|
|
|
|
def compare_entities(
|
|
self,
|
|
espo_entity: Dict[str, Any],
|
|
advo_entity: Dict[str, Any]
|
|
) -> TimestampResult:
|
|
"""
|
|
Vergleicht Änderungen zwischen EspoCRM und Advoware
|
|
|
|
PRIMÄR: rowId-Vergleich (Advoware rowId ändert sich bei jedem Update - SEHR zuverlässig!)
|
|
FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar)
|
|
|
|
Args:
|
|
espo_entity: EspoCRM CBeteiligte
|
|
advo_entity: Advoware Beteiligte
|
|
|
|
Returns:
|
|
"espocrm_newer": EspoCRM wurde geändert
|
|
"advoware_newer": Advoware wurde geändert
|
|
"conflict": Beide wurden geändert
|
|
"no_change": Keine Änderungen
|
|
"""
|
|
# PRIMÄR: rowId-basierte Änderungserkennung (zuverlässiger!)
|
|
espo_rowid = espo_entity.get('advowareRowId')
|
|
advo_rowid = advo_entity.get('rowId')
|
|
last_sync = espo_entity.get('advowareLastSync')
|
|
espo_modified = espo_entity.get('modifiedAt')
|
|
|
|
# Parse timestamps für Initial Sync Check
|
|
espo_ts = self.parse_timestamp(espo_modified)
|
|
advo_ts = self.parse_timestamp(advo_entity.get('geaendertAm'))
|
|
|
|
# SPECIAL CASE: Kein lastSync → Initial Sync
|
|
if not last_sync:
|
|
self._log(f"Initial Sync (kein lastSync) → Vergleiche Timestamps")
|
|
|
|
# Wenn beide Timestamps vorhanden, vergleiche sie
|
|
if espo_ts and advo_ts:
|
|
if espo_ts > advo_ts:
|
|
self._log(f"Initial Sync: EspoCRM neuer ({espo_ts} > {advo_ts})")
|
|
return 'espocrm_newer'
|
|
elif advo_ts > espo_ts:
|
|
self._log(f"Initial Sync: Advoware neuer ({advo_ts} > {espo_ts})")
|
|
return 'advoware_newer'
|
|
else:
|
|
self._log(f"Initial Sync: Beide gleich alt")
|
|
return 'no_change'
|
|
|
|
# Fallback: Wenn nur einer Timestamp hat, bevorzuge den
|
|
if espo_ts and not advo_ts:
|
|
return 'espocrm_newer'
|
|
if advo_ts and not espo_ts:
|
|
return 'advoware_newer'
|
|
|
|
# Wenn keine Timestamps verfügbar: EspoCRM bevorzugen (default)
|
|
self._log(f"Initial Sync: Keine Timestamps verfügbar → EspoCRM bevorzugt")
|
|
return 'espocrm_newer'
|
|
|
|
if espo_rowid and advo_rowid:
|
|
# Prüfe ob Advoware geändert wurde (rowId)
|
|
advo_changed = (espo_rowid != advo_rowid)
|
|
|
|
# Prüfe ob EspoCRM auch geändert wurde (seit letztem Sync)
|
|
espo_changed = False
|
|
if espo_modified:
|
|
try:
|
|
sync_ts = self.parse_timestamp(last_sync)
|
|
if espo_ts and sync_ts:
|
|
espo_changed = (espo_ts > sync_ts)
|
|
except Exception as e:
|
|
self._log(f"Timestamp-Parse-Fehler: {e}", level='debug')
|
|
|
|
# Konfliktlogik: Beide geändert seit letztem Sync?
|
|
if advo_changed and espo_changed:
|
|
self._log(f"🚨 KONFLIKT: Beide Seiten geändert seit letztem Sync")
|
|
return 'conflict'
|
|
elif advo_changed:
|
|
self._log(f"Advoware rowId geändert: {espo_rowid[:20] if espo_rowid else 'None'}... → {advo_rowid[:20] if advo_rowid else 'None'}...")
|
|
return 'advoware_newer'
|
|
elif espo_changed:
|
|
self._log(f"EspoCRM neuer (modifiedAt > lastSync)")
|
|
return 'espocrm_newer'
|
|
else:
|
|
# Weder Advoware noch EspoCRM geändert
|
|
self._log("Keine Änderungen (rowId identisch)")
|
|
return 'no_change'
|
|
|
|
# FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar)
|
|
self._log("rowId nicht verfügbar, fallback auf Timestamp-Vergleich", level='debug')
|
|
return self.compare_timestamps(
|
|
espo_entity.get('modifiedAt'),
|
|
advo_entity.get('geaendertAm'),
|
|
espo_entity.get('advowareLastSync')
|
|
)
|
|
|
|
def compare_timestamps(
|
|
self,
|
|
espo_modified_at: Any,
|
|
advo_geaendert_am: Any,
|
|
last_sync_ts: Any
|
|
) -> TimestampResult:
|
|
"""
|
|
Vergleicht Timestamps und bestimmt Sync-Richtung (FALLBACK wenn rowId nicht verfügbar)
|
|
|
|
Args:
|
|
espo_modified_at: EspoCRM modifiedAt
|
|
advo_geaendert_am: Advoware geaendertAm
|
|
last_sync_ts: Letzter Sync (advowareLastSync)
|
|
|
|
Returns:
|
|
"espocrm_newer": EspoCRM wurde nach last_sync geändert und ist neuer
|
|
"advoware_newer": Advoware wurde nach last_sync geändert und ist neuer
|
|
"conflict": Beide wurden nach last_sync geändert
|
|
"no_change": Keine Änderungen seit last_sync
|
|
"""
|
|
espo_ts = self.parse_timestamp(espo_modified_at)
|
|
advo_ts = self.parse_timestamp(advo_geaendert_am)
|
|
sync_ts = self.parse_timestamp(last_sync_ts)
|
|
|
|
# Logging
|
|
self._log(
|
|
f"Timestamp-Vergleich: EspoCRM={espo_ts}, Advoware={advo_ts}, LastSync={sync_ts}",
|
|
level='debug'
|
|
)
|
|
|
|
# Falls kein last_sync → erster Sync, vergleiche direkt
|
|
if not sync_ts:
|
|
if not espo_ts or not advo_ts:
|
|
return "no_change"
|
|
|
|
if espo_ts > advo_ts:
|
|
return "espocrm_newer"
|
|
elif advo_ts > espo_ts:
|
|
return "advoware_newer"
|
|
else:
|
|
return "no_change"
|
|
|
|
# Check ob seit last_sync Änderungen
|
|
espo_changed = espo_ts and espo_ts > sync_ts
|
|
advo_changed = advo_ts and advo_ts > sync_ts
|
|
|
|
if espo_changed and advo_changed:
|
|
# Beide geändert seit last_sync → Konflikt
|
|
return "conflict"
|
|
elif espo_changed:
|
|
# Nur EspoCRM geändert
|
|
return "espocrm_newer" if (not advo_ts or espo_ts > advo_ts) else "conflict"
|
|
elif advo_changed:
|
|
# Nur Advoware geändert
|
|
return "advoware_newer"
|
|
else:
|
|
# Keine Änderungen
|
|
return "no_change"
|
|
|
|
def merge_for_advoware_put(
|
|
self,
|
|
advo_entity: Dict[str, Any],
|
|
espo_entity: Dict[str, Any],
|
|
mapper
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Merged EspoCRM updates mit Advoware entity für PUT operation
|
|
|
|
Advoware benötigt vollständige Objekte für PUT (Read-Modify-Write pattern).
|
|
Diese Funktion merged die gemappten EspoCRM-Updates in das bestehende
|
|
Advoware-Objekt.
|
|
|
|
Args:
|
|
advo_entity: Aktuelles Advoware entity (vollständiges Objekt)
|
|
espo_entity: EspoCRM entity mit Updates
|
|
mapper: BeteiligteMapper instance
|
|
|
|
Returns:
|
|
Merged dict für Advoware PUT
|
|
"""
|
|
# Map EspoCRM → Advoware (nur Stammdaten)
|
|
advo_updates = mapper.map_cbeteiligte_to_advoware(espo_entity)
|
|
|
|
# Merge: Advoware entity als Base, überschreibe mit EspoCRM updates
|
|
merged = {**advo_entity, **advo_updates}
|
|
|
|
# Logging
|
|
self._log(
|
|
f"📝 Merge: {len(advo_updates)} Stammdaten-Felder → {len(merged)} Gesamt-Felder",
|
|
level='info'
|
|
)
|
|
self._log(
|
|
f" Gesynct: {', '.join(advo_updates.keys())}",
|
|
level='debug'
|
|
)
|
|
|
|
return merged
|
|
|
|
async def send_notification(
|
|
self,
|
|
entity_id: str,
|
|
notification_type: Literal["conflict", "deleted", "error"],
|
|
extra_data: Optional[Dict[str, Any]] = None
|
|
) -> None:
|
|
"""
|
|
Sendet EspoCRM Notification via NotificationManager
|
|
|
|
Args:
|
|
entity_id: CBeteiligte Entity ID
|
|
notification_type: "conflict", "deleted" oder "error"
|
|
extra_data: Zusätzliche Daten für Nachricht
|
|
"""
|
|
try:
|
|
# Hole Entity-Daten
|
|
entity = await self.espocrm.get_entity('CBeteiligte', entity_id)
|
|
name = entity.get('name', 'Unbekannt')
|
|
betnr = entity.get('betnr')
|
|
|
|
# Map notification_type zu action_type
|
|
if notification_type == "conflict":
|
|
action_type = 'sync_conflict'
|
|
details = {
|
|
'message': f"Sync-Konflikt bei Beteiligten '{name}' (betNr: {betnr})",
|
|
'description': (
|
|
f"EspoCRM hat Vorrang - Änderungen wurden nach Advoware übertragen.\n\n"
|
|
f"Bitte prüfen Sie die Details und stellen Sie sicher, dass die Daten korrekt sind."
|
|
),
|
|
'entity_name': name,
|
|
'betnr': betnr,
|
|
'priority': 'Normal'
|
|
}
|
|
elif notification_type == "deleted":
|
|
deleted_at = entity.get('advowareDeletedAt', 'unbekannt')
|
|
action_type = 'entity_deleted_in_source'
|
|
details = {
|
|
'message': f"Beteiligter '{name}' wurde in Advoware gelöscht",
|
|
'description': (
|
|
f"Der Beteiligte '{name}' (betNr: {betnr}) wurde am {deleted_at} "
|
|
f"in Advoware gelöscht.\n\n"
|
|
f"Der Datensatz wurde in EspoCRM markiert, aber nicht gelöscht. "
|
|
f"Bitte prüfen Sie, ob dies beabsichtigt war."
|
|
),
|
|
'entity_name': name,
|
|
'betnr': betnr,
|
|
'deleted_at': deleted_at,
|
|
'priority': 'High'
|
|
}
|
|
else: # error
|
|
action_type = 'general_manual_action'
|
|
details = {
|
|
'message': f"Benachrichtigung für Beteiligten '{name}'",
|
|
'entity_name': name,
|
|
'betnr': betnr
|
|
}
|
|
|
|
# Merge extra_data if provided
|
|
if extra_data:
|
|
details.update(extra_data)
|
|
|
|
# Sende via NotificationManager
|
|
await self.notification_manager.notify_manual_action_required(
|
|
entity_type='CBeteiligte',
|
|
entity_id=entity_id,
|
|
action_type=action_type,
|
|
details=details,
|
|
create_task=True
|
|
)
|
|
|
|
self._log(f"Notification via NotificationManager gesendet: {notification_type} für {entity_id}")
|
|
|
|
except Exception as e:
|
|
self._log(f"Fehler beim Senden der Notification: {e}", level='error')
|
|
|
|
async def handle_advoware_deleted(
|
|
self,
|
|
entity_id: str,
|
|
error_details: str
|
|
) -> None:
|
|
"""
|
|
Behandelt Fall dass Beteiligter in Advoware gelöscht wurde (404)
|
|
|
|
Args:
|
|
entity_id: CBeteiligte Entity ID
|
|
error_details: Fehlerdetails von Advoware API
|
|
"""
|
|
try:
|
|
now = datetime.now(pytz.UTC).isoformat()
|
|
|
|
# Update Entity: Soft-Delete Flag
|
|
await self.espocrm.update_entity('CBeteiligte', entity_id, {
|
|
'syncStatus': 'deleted_in_advoware',
|
|
'advowareDeletedAt': now,
|
|
'syncErrorMessage': f"Beteiligter existiert nicht mehr in Advoware. {error_details}"
|
|
})
|
|
|
|
self._log(f"Entity {entity_id} als deleted_in_advoware markiert")
|
|
|
|
# Sende Notification
|
|
await self.send_notification(entity_id, 'deleted')
|
|
|
|
except Exception as e:
|
|
self._log(f"Fehler beim Handle Deleted: {e}", level='error')
|
|
|
|
async def validate_sync_result(
|
|
self,
|
|
entity_id: str,
|
|
betnr: int,
|
|
mapper,
|
|
direction: str = 'to_advoware'
|
|
) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Validiert Sync-Ergebnis durch Round-Trip Verification
|
|
|
|
Args:
|
|
entity_id: EspoCRM CBeteiligte ID
|
|
betnr: Advoware betNr
|
|
mapper: BeteiligteMapper instance
|
|
direction: 'to_advoware' oder 'to_espocrm'
|
|
|
|
Returns:
|
|
(success: bool, error_message: Optional[str])
|
|
"""
|
|
try:
|
|
self._log(f"🔍 Validiere Sync-Ergebnis (direction={direction})...", level='debug')
|
|
|
|
# Lade beide Entities erneut
|
|
espo_entity = await self.espocrm.get_entity('CBeteiligte', entity_id)
|
|
|
|
from services.advoware import AdvowareAPI
|
|
advoware_api = AdvowareAPI(self.context)
|
|
advo_result = await advoware_api.api_call(f'api/v1/advonet/Beteiligte/{betnr}', method='GET')
|
|
|
|
if isinstance(advo_result, list):
|
|
advo_entity = advo_result[0] if advo_result else None
|
|
else:
|
|
advo_entity = advo_result
|
|
|
|
if not advo_entity:
|
|
return False, f"Advoware Entity {betnr} nicht gefunden nach Sync"
|
|
|
|
# Validiere Stammdaten
|
|
critical_fields = ['name', 'rechtsform']
|
|
differences = []
|
|
|
|
if direction == 'to_advoware':
|
|
# EspoCRM → Advoware: Prüfe ob Advoware die EspoCRM-Werte hat
|
|
advo_mapped = mapper.map_cbeteiligte_to_advoware(espo_entity)
|
|
|
|
for field in critical_fields:
|
|
espo_val = advo_mapped.get(field)
|
|
advo_val = advo_entity.get(field)
|
|
|
|
if espo_val != advo_val:
|
|
differences.append(f"{field}: expected '{espo_val}', got '{advo_val}'")
|
|
|
|
elif direction == 'to_espocrm':
|
|
# Advoware → EspoCRM: Prüfe ob EspoCRM die Advoware-Werte hat
|
|
espo_mapped = mapper.map_advoware_to_cbeteiligte(advo_entity)
|
|
|
|
for field in critical_fields:
|
|
advo_val = espo_mapped.get(field)
|
|
espo_val = espo_entity.get(field)
|
|
|
|
if advo_val != espo_val:
|
|
differences.append(f"{field}: expected '{advo_val}', got '{espo_val}'")
|
|
|
|
if differences:
|
|
error_msg = f"Validation failed: {', '.join(differences)}"
|
|
self._log(f"❌ {error_msg}", level='error')
|
|
return False, error_msg
|
|
|
|
self._log(f"✅ Validation erfolgreich", level='debug')
|
|
return True, None
|
|
|
|
except Exception as e:
|
|
self._log(f"⚠️ Validation error: {e}", level='error')
|
|
return False, f"Validation exception: {str(e)}"
|
|
|
|
async def resolve_conflict_espocrm_wins(
|
|
self,
|
|
entity_id: str,
|
|
espo_entity: Dict[str, Any],
|
|
advo_entity: Dict[str, Any],
|
|
conflict_details: str,
|
|
extra_fields: Optional[Dict[str, Any]] = None
|
|
) -> None:
|
|
"""
|
|
Löst Konflikt auf: EspoCRM wins (überschreibt Advoware)
|
|
|
|
Args:
|
|
entity_id: CBeteiligte Entity ID
|
|
espo_entity: EspoCRM Entity-Daten
|
|
advo_entity: Advoware Entity-Daten
|
|
conflict_details: Details zum Konflikt
|
|
extra_fields: Zusätzliche Felder (z.B. advowareRowId)
|
|
"""
|
|
try:
|
|
# EspoCRM datetime format
|
|
now_utc = datetime.now(pytz.UTC)
|
|
espo_datetime = now_utc.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
# Markiere als gelöst mit Konflikt-Info
|
|
update_data = {
|
|
'syncStatus': 'clean', # Gelöst!
|
|
'advowareLastSync': espo_datetime,
|
|
'syncErrorMessage': f'Konflikt: {conflict_details}',
|
|
'syncRetryCount': 0
|
|
}
|
|
|
|
# Merge extra fields (z.B. advowareRowId)
|
|
if extra_fields:
|
|
update_data.update(extra_fields)
|
|
|
|
await self.espocrm.update_entity('CBeteiligte', entity_id, update_data)
|
|
|
|
self._log(f"Konflikt gelöst für {entity_id}: EspoCRM wins")
|
|
|
|
# Sende Notification
|
|
await self.send_notification(entity_id, 'conflict', {
|
|
'details': conflict_details
|
|
})
|
|
|
|
except Exception as e:
|
|
self._log(f"Fehler beim Resolve Conflict: {e}", level='error')
|