16 KiB
Beteiligte Sync - Architektur-Analyse
Stand: 7. Februar 2026
Analysiert: Bidirektionale EspoCRM ↔ Advoware Beteiligte-Synchronisation
🏗️ ARCHITEKTUR-ÜBERSICHT
Komponenten
┌─────────────────────────────────────────────────────────────────┐
│ EspoCRM (Master) │
│ Webhooks → Motia │
└────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ WEBHOOK HANDLER (3 Endpoints) │
│ • beteiligte_create_api_step.py │
│ • beteiligte_update_api_step.py ← Loop-Prevention entfernt │
│ • beteiligte_delete_api_step.py │
└────────────────────────────┬────────────────────────────────────┘
│ emits events
▼
┌─────────────────────────────────────────────────────────────────┐
│ CENTRAL SYNC HANDLER (Event-Based) │
│ beteiligte_sync_event_step.py (~329 lines) │
│ │
│ Subscribes: vmh.beteiligte.{create,update,delete,sync_check} │
│ │
│ Flow: │
│ 1. Distributed Lock (Redis + syncStatus) │
│ 2. Fetch EspoCRM Entity │
│ 3. Route: CREATE, UPDATE/CHECK, DELETE │
│ 4. Release Lock + Update Status │
└────────────────────────────┬────────────────────────────────────┘
│
┌─────────┴─────────┐
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ handle_create │ │ handle_update │
│ │ │ │
│ • Map to Advo │ │ • Fetch Advo │
│ • POST │ │ • Compare │
│ • GET rowId │ │ • Sync/Skip │
│ • Write back │ │ • Update rowId │
└──────────────────┘ └──────────────────┘
│ │
└─────────┬─────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ SUPPORT SERVICES │
│ │
│ • BeteiligteSync (sync_utils) (~524 lines) │
│ - Locking, Compare, Merge, Conflict Resolution │
│ │
│ • BeteiligteMapper (~200 lines) │
│ - EspoCRM ↔ Advoware transformations │
│ - None-value filtering │
│ - Date format conversion │
│ │
│ • AdvowareAPI / EspoCRMAPI │
│ - HTTP clients mit Token-Caching │
└─────────────────────────────────────────────────────────────────┘
✅ STÄRKEN (Was funktioniert gut)
1. Robustheit durch Distributed Locking
# 2-stufiges Locking verhindert Race Conditions:
# 1. Redis Lock (atomic, TTL 15min)
# 2. syncStatus Update (UI visibility)
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS)
✅ Gut: Verhindert parallele Syncs derselben Entity
✅ Gut: TTL verhindert Deadlocks bei Crashes
✅ Gut: UI-Sichtbarkeit via syncStatus
2. Primäre Change Detection: rowId
# rowId ändert sich bei JEDEM Advoware PUT → sehr zuverlässig
if espo_rowid and advo_rowid:
advo_changed = (espo_rowid != advo_rowid)
espo_changed = (espo_modified > last_sync)
✅ Sehr gut: rowId ist Base64, ändert sich immer, keine NULLs
✅ Gut: Timestamp als Fallback vorhanden
✅ Gut: Konfliktlogik (beide geändert) implementiert
3. API-Call-Optimierung (50% Reduktion)
# VORHER: PUT + GET (2 Calls)
# NACHHER: PUT Response enthält neue rowId (1 Call)
put_result = await advoware.api_call(...)
new_rowid = put_result[0].get('rowId') # direkt aus Response!
✅ Exzellent: Keine extra GETs nach PUT nötig
✅ Gut: Funktioniert für CREATE, UPDATE, Conflict Resolution
4. Loop-Prevention auf EspoCRM-Seite
# ENTFERNT: should_skip_update() Filterung
# NEU: EspoCRM triggert keine Webhooks für rowId-Updates
✅ Gut: Vereinfacht Code erheblich
✅ Gut: Keine komplexe Filterlogik mehr nötig
✅ Gut: Vertraut auf EspoCRM-Konfiguration
5. Mapper mit Validierung
# None-Filtering verhindert EspoCRM Validation Errors
espo_data = {k: v for k, v in espo_data.items() if v is not None}
# Datumsformat-Konversion
dateOfBirth = geburtsdatum.split('T')[0] # '2001-01-05T00:00:00' → '2001-01-05'
✅ Gut: Robuste Fehlerbehandlung
✅ Gut: Dokumentiert welche Felder funktionieren (nur 8 von 14)
6. Error Handling mit Retry-Logik
MAX_SYNC_RETRIES = 5
if new_retry >= MAX_SYNC_RETRIES:
update_data['syncStatus'] = 'permanently_failed'
await self.send_notification(...)
✅ Gut: Verhindert Endlos-Retries
✅ Gut: Notification an User bei dauerhaftem Fehler
🔴 SCHWÄCHEN & VERBESSERUNGSPOTENZIALE
1. CREATE-Flow ineffizient (Extra GET nach POST)
Problem:
# Nach POST: Extra GET nur für rowId
result = await advoware.api_call(..., method='POST', data=advo_data)
new_betnr = result.get('betNr')
# Extra GET!
created_entity = await advoware.api_call(f'.../{new_betnr}', method='GET')
new_rowid = created_entity.get('rowId')
Lösung:
# POST Response sollte rowId bereits enthalten (prüfen!)
# Falls ja: Extrahiere direkt wie bei PUT
if isinstance(result, dict):
new_rowid = result.get('rowId')
elif isinstance(result, list):
new_rowid = result[0].get('rowId')
⚠️ TODO: Teste ob Advoware POST auch rowId zurückgibt (wie PUT)
2. Doppeltes rowId-Update nach EspoCRM→Advoware
Problem:
# 1. Update via release_sync_lock extra_fields
await sync_utils.release_sync_lock(entity_id, 'clean',
extra_fields={'advowareRowId': new_rowid})
# 2. Aber VORHER bereits direktes Update!
if new_rowid:
await espocrm.update_entity('CBeteiligte', entity_id, {
'advowareRowId': new_rowid
})
Lösung: Entweder/oder - nicht beides!
# OPTION A: Nur release_lock (weniger Code, eleganter)
await sync_utils.release_sync_lock(
entity_id, 'clean',
extra_fields={'advowareRowId': new_rowid}
)
# OPTION B: Direktes Update + release ohne extra_fields
await espocrm.update_entity('CBeteiligte', entity_id, {
'advowareRowId': new_rowid,
'syncStatus': 'clean',
'advowareLastSync': now()
})
await self.redis.delete(lock_key)
⚠️ Recommendation: Option A ist eleganter (1 API Call statt 2)
3. Initial Sync Special Case nicht nötig?
Problem:
# Separate Logik für "kein lastSync"
if not espo_entity.get('advowareLastSync'):
context.logger.info(f"📤 Initial Sync → ...")
# ... exakt derselbe Code wie bei espocrm_newer!
Lösung: compare_entities sollte das automatisch erkennen
# In compare_entities:
if not last_sync:
# Kein lastSync → EspoCRM neuer (always sync on first run)
return 'espocrm_newer'
✅ Eliminiert: ~30 Zeilen Duplikat-Code in event_step.py
4. Conflict Resolution immer EspoCRM Wins
Problem:
# Hardcoded: EspoCRM gewinnt immer
elif comparison == 'conflict':
context.logger.warn(f"⚠️ KONFLIKT erkannt → EspoCRM WINS")
# ... force update zu Advoware
Überlegungen:
- Für STAMMDATEN ist das sinnvoll (EspoCRM ist Master)
- Für Kontaktdaten könnte Advoware Master sein
- Für Adresse sollte vielleicht Merge stattfinden
✅ Status: OK für aktuelle Scope (nur Stammdaten)
📝 Später: Konfigurierbare Conflict Strategy pro Feld-Gruppe
5. Timestamp-Fallback verwendet geaendertAm (deprecated?)
Code:
return self.compare_timestamps(
espo_entity.get('modifiedAt'),
advo_entity.get('geaendertAm'), # ← Swagger deprecated?
espo_entity.get('advowareLastSync')
)
⚠️ TODO: Prüfe ob geaendertAm zuverlässig ist oder ob Advoware ein anderes Feld hat
6. Keine Batch-Verarbeitung für Webhooks
Problem:
# Webhook-Handler: Emittiert Event pro Entity
for entity_id in entity_ids:
await context.emit({...}) # N Events
Resultat: Bei 100 Updates → 100 separate Event-Handler-Invocations
Lösung (Optional):
# Batch-Event mit allen IDs
await context.emit({
'topic': 'vmh.beteiligte.update_batch',
'data': {
'entity_ids': list(entity_ids), # Alle auf einmal
'source': 'webhook'
}
})
# Handler verarbeitet in Parallel (mit Limit)
async def handler_batch(event_data, context):
entity_ids = event_data['entity_ids']
# Process max 10 parallel
semaphore = asyncio.Semaphore(10)
tasks = [sync_with_semaphore(id, semaphore) for id in entity_ids]
await asyncio.gather(*tasks)
📝 Entscheidung: Aktuell OK (Lock verhindert Probleme), aber bei >50 gleichzeitigen Updates könnte Batch helfen
7. Fehlende Metriken/Monitoring
Was fehlt:
- Durchschnittliche Sync-Dauer pro Entity
- Anzahl Konflikte pro Tag
- API-Call-Count (EspoCRM vs Advoware)
- Failed Sync Ratio
Lösung:
# In sync_utils oder neues monitoring_utils.py
class SyncMetrics:
async def record_sync(self, entity_id, duration, result, comparison):
await redis.hincrby('metrics:sync:daily', 'total', 1)
await redis.hincrby('metrics:sync:daily', f'result_{result}', 1)
await redis.lpush('metrics:sync:durations', duration)
🎯 VERBESSERUNGS-EMPFEHLUNGEN
Priorität 1: SOFORT (Effizienz)
-
✅ Eliminiere doppeltes rowId-Update
# NUR in release_sync_lock, nicht vorher extra UpdateImpact: -1 API Call pro EspoCRM→Advoware Update (ca. 50% weniger EspoCRM calls)
-
✅ Teste POST Response für rowId
# Falls POST auch rowId enthält: Extra GET entfernenImpact: -1 API Call pro CREATE (50% weniger bei Neuanlagen)
Priorität 2: MITTELFRISTIG (Eleganz)
-
📝 Merge Initial Sync in compare_entities
# Eliminiert Special Case, -30 ZeilenImpact: Cleaner Code, leichter wartbar
-
📝 Prüfe geaendertAm Timestamp
# Stelle sicher dass Fallback funktioniertImpact: Robustheit falls rowId mal fehlt
Priorität 3: OPTIONAL (Features)
-
💡 Batch-Processing für Webhooks
- Bei >50 gleichzeitigen Updates könnte Performance leiden
- Aktuell nicht kritisch (Lock verhindert Probleme)
-
💡 Metriken/Dashboard
- Sync-Statistiken für Monitoring
- Nicht kritisch aber nützlich für Ops
📊 PERFORMANCE-SCHÄTZUNG
Aktueller Stand (pro Entity)
CREATE:
- 1× EspoCRM GET (Entity laden)
- 1× Advoware POST
- 1× Advoware GET (rowId holen) ← OPTIMIERBAR
- 1× EspoCRM PUT (betNr + rowId schreiben) ← OPTIMIERBAR
- 1× EspoCRM PUT (syncStatus + lastSync) ← Teil von Lock-Release
Total: 5 API Calls → Mit Optimierung: 3 API Calls (-40%)
UPDATE (EspoCRM→Advoware):
- 1× EspoCRM GET (Entity laden)
- 1× Advoware GET (Vergleich)
- 1× Advoware PUT
- 1× EspoCRM PUT (rowId update) ← DOPPELT
- 1× EspoCRM PUT (Lock-Release mit rowId) ← DOPPELT
Total: 5 API Calls → Mit Optimierung: 4 API Calls (-20%)
UPDATE (Advoware→EspoCRM):
- 1× EspoCRM GET
- 1× Advoware GET
- 1× EspoCRM PUT (Daten)
- 1× EspoCRM PUT (Lock-Release)
Total: 4 API Calls → Bereits optimal
🎨 ARCHITEKTUR-BEWERTUNG
✅ Was ist ROBUST?
- Distributed Locking - Verhindert Race Conditions
- rowId Change Detection - Sehr zuverlässig
- Retry Logic - Graceful Degradation
- Error Handling - Try-catch auf allen Ebenen
- TTL auf Locks - Keine Deadlocks
✅ Was ist EFFIZIENT?
- PUT Response Parsing - Spart GET nach Updates
- None-Filtering - Verhindert unnötige Validierungsfehler
- Early Returns - "no_change" skipped sofort
- Redis Token Caching - Nicht bei jedem Call neu authentifizieren
✅ Was ist ELEGANT?
- Event-Driven Architecture - Entkoppelt Webhook von Sync-Logik
- Mapper Pattern - Transformationen zentral
- Utility Class - Wiederverwendbare Funktionen
- Descriptive Logging - Mit Emojis, sehr lesbar
⚠️ Was könnte ELEGANTER sein?
- Doppelte rowId-Updates - Redundant
- Initial Sync Special Case - Unnötige Duplikation
- Keine Config für Conflict Strategy - Hardcoded
- Fehlende Metriken - Monitoring schwierig
🏆 GESAMTBEWERTUNG
| Kategorie | Bewertung | Note |
|---|---|---|
| Robustheit | ⭐⭐⭐⭐⭐ | 9/10 - Sehr stabil durch Locking + Retry |
| Effizienz | ⭐⭐⭐⭐☆ | 7/10 - Gut, aber 2 klare Optimierungen möglich |
| Eleganz | ⭐⭐⭐⭐☆ | 8/10 - Sauber strukturiert, kleine Code-Duplikate |
| Wartbarkeit | ⭐⭐⭐⭐⭐ | 9/10 - Gut dokumentiert, klare Struktur |
| Erweiterbarkeit | ⭐⭐⭐⭐☆ | 8/10 - Event-Driven macht Extensions einfach |
Gesamt: 8.2/10 - SEHR GUT
🚀 EMPFOHLENE NÄCHSTE SCHRITTE
Sofort (1-2h Aufwand):
- ✅ Eliminiere doppeltes rowId-Update (5min)
- ✅ Teste Advoware POST Response auf rowId (15min)
- ✅ Falls ja: Entferne GET nach CREATE (5min)
Mittelfristig (2-4h):
- 📝 Merge Initial Sync in compare_entities (30min)
- 📝 Add Metrics Collection (1-2h)
Optional:
- 💡 Batch-Processing (nur wenn Performance-Problem)
- 💡 Configurable Conflict Strategy (bei neuen Requirements)
📝 FAZIT
Das System ist produktionsreif und robust.
- Stärken: Exzellentes Locking, zuverlässige Change Detection, gutes Error Handling
- Optimierungen: 2-3 kleine Fixes können 20-40% API Calls sparen
- Architektur: Sauber, wartbar, erweiterbar
Recommendation: Ship it mit den 2 Quick-Fixes (doppeltes Update + POST rowId-Check).