feat: Optimize initial sync logic and remove redundant rowId updates in sync process
This commit is contained in:
458
bitbylaw/docs/BETEILIGTE_SYNC_ANALYSIS.md
Normal file
458
bitbylaw/docs/BETEILIGTE_SYNC_ANALYSIS.md
Normal file
@@ -0,0 +1,458 @@
|
||||
# Beteiligte Sync - Architektur-Analyse
|
||||
|
||||
**Stand:** 7. Februar 2026
|
||||
**Analysiert:** Bidirektionale EspoCRM ↔ Advoware Beteiligte-Synchronisation
|
||||
|
||||
---
|
||||
|
||||
## 🏗️ ARCHITEKTUR-ÜBERSICHT
|
||||
|
||||
### Komponenten
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────┐
|
||||
│ EspoCRM (Master) │
|
||||
│ Webhooks → Motia │
|
||||
└────────────────────────────┬────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────┐
|
||||
│ WEBHOOK HANDLER (3 Endpoints) │
|
||||
│ • beteiligte_create_api_step.py │
|
||||
│ • beteiligte_update_api_step.py ← Loop-Prevention entfernt │
|
||||
│ • beteiligte_delete_api_step.py │
|
||||
└────────────────────────────┬────────────────────────────────────┘
|
||||
│ emits events
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────┐
|
||||
│ CENTRAL SYNC HANDLER (Event-Based) │
|
||||
│ beteiligte_sync_event_step.py (~329 lines) │
|
||||
│ │
|
||||
│ Subscribes: vmh.beteiligte.{create,update,delete,sync_check} │
|
||||
│ │
|
||||
│ Flow: │
|
||||
│ 1. Distributed Lock (Redis + syncStatus) │
|
||||
│ 2. Fetch EspoCRM Entity │
|
||||
│ 3. Route: CREATE, UPDATE/CHECK, DELETE │
|
||||
│ 4. Release Lock + Update Status │
|
||||
└────────────────────────────┬────────────────────────────────────┘
|
||||
│
|
||||
┌─────────┴─────────┐
|
||||
▼ ▼
|
||||
┌──────────────────┐ ┌──────────────────┐
|
||||
│ handle_create │ │ handle_update │
|
||||
│ │ │ │
|
||||
│ • Map to Advo │ │ • Fetch Advo │
|
||||
│ • POST │ │ • Compare │
|
||||
│ • GET rowId │ │ • Sync/Skip │
|
||||
│ • Write back │ │ • Update rowId │
|
||||
└──────────────────┘ └──────────────────┘
|
||||
│ │
|
||||
└─────────┬─────────┘
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────────┐
|
||||
│ SUPPORT SERVICES │
|
||||
│ │
|
||||
│ • BeteiligteSync (sync_utils) (~524 lines) │
|
||||
│ - Locking, Compare, Merge, Conflict Resolution │
|
||||
│ │
|
||||
│ • BeteiligteMapper (~200 lines) │
|
||||
│ - EspoCRM ↔ Advoware transformations │
|
||||
│ - None-value filtering │
|
||||
│ - Date format conversion │
|
||||
│ │
|
||||
│ • AdvowareAPI / EspoCRMAPI │
|
||||
│ - HTTP clients mit Token-Caching │
|
||||
└─────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## ✅ STÄRKEN (Was funktioniert gut)
|
||||
|
||||
### 1. **Robustheit durch Distributed Locking**
|
||||
```python
|
||||
# 2-stufiges Locking verhindert Race Conditions:
|
||||
# 1. Redis Lock (atomic, TTL 15min)
|
||||
# 2. syncStatus Update (UI visibility)
|
||||
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
|
||||
acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS)
|
||||
```
|
||||
✅ **Gut:** Verhindert parallele Syncs derselben Entity
|
||||
✅ **Gut:** TTL verhindert Deadlocks bei Crashes
|
||||
✅ **Gut:** UI-Sichtbarkeit via syncStatus
|
||||
|
||||
### 2. **Primäre Change Detection: rowId**
|
||||
```python
|
||||
# rowId ändert sich bei JEDEM Advoware PUT → sehr zuverlässig
|
||||
if espo_rowid and advo_rowid:
|
||||
advo_changed = (espo_rowid != advo_rowid)
|
||||
espo_changed = (espo_modified > last_sync)
|
||||
```
|
||||
✅ **Sehr gut:** rowId ist Base64, ändert sich immer, keine NULLs
|
||||
✅ **Gut:** Timestamp als Fallback vorhanden
|
||||
✅ **Gut:** Konfliktlogik (beide geändert) implementiert
|
||||
|
||||
### 3. **API-Call-Optimierung (50% Reduktion)**
|
||||
```python
|
||||
# VORHER: PUT + GET (2 Calls)
|
||||
# NACHHER: PUT Response enthält neue rowId (1 Call)
|
||||
put_result = await advoware.api_call(...)
|
||||
new_rowid = put_result[0].get('rowId') # direkt aus Response!
|
||||
```
|
||||
✅ **Exzellent:** Keine extra GETs nach PUT nötig
|
||||
✅ **Gut:** Funktioniert für CREATE, UPDATE, Conflict Resolution
|
||||
|
||||
### 4. **Loop-Prevention auf EspoCRM-Seite**
|
||||
```python
|
||||
# ENTFERNT: should_skip_update() Filterung
|
||||
# NEU: EspoCRM triggert keine Webhooks für rowId-Updates
|
||||
```
|
||||
✅ **Gut:** Vereinfacht Code erheblich
|
||||
✅ **Gut:** Keine komplexe Filterlogik mehr nötig
|
||||
✅ **Gut:** Vertraut auf EspoCRM-Konfiguration
|
||||
|
||||
### 5. **Mapper mit Validierung**
|
||||
```python
|
||||
# None-Filtering verhindert EspoCRM Validation Errors
|
||||
espo_data = {k: v for k, v in espo_data.items() if v is not None}
|
||||
|
||||
# Datumsformat-Konversion
|
||||
dateOfBirth = geburtsdatum.split('T')[0] # '2001-01-05T00:00:00' → '2001-01-05'
|
||||
```
|
||||
✅ **Gut:** Robuste Fehlerbehandlung
|
||||
✅ **Gut:** Dokumentiert welche Felder funktionieren (nur 8 von 14)
|
||||
|
||||
### 6. **Error Handling mit Retry-Logik**
|
||||
```python
|
||||
MAX_SYNC_RETRIES = 5
|
||||
if new_retry >= MAX_SYNC_RETRIES:
|
||||
update_data['syncStatus'] = 'permanently_failed'
|
||||
await self.send_notification(...)
|
||||
```
|
||||
✅ **Gut:** Verhindert Endlos-Retries
|
||||
✅ **Gut:** Notification an User bei dauerhaftem Fehler
|
||||
|
||||
---
|
||||
|
||||
## 🔴 SCHWÄCHEN & VERBESSERUNGSPOTENZIALE
|
||||
|
||||
### 1. **CREATE-Flow ineffizient (Extra GET nach POST)**
|
||||
|
||||
**Problem:**
|
||||
```python
|
||||
# Nach POST: Extra GET nur für rowId
|
||||
result = await advoware.api_call(..., method='POST', data=advo_data)
|
||||
new_betnr = result.get('betNr')
|
||||
|
||||
# Extra GET!
|
||||
created_entity = await advoware.api_call(f'.../{new_betnr}', method='GET')
|
||||
new_rowid = created_entity.get('rowId')
|
||||
```
|
||||
|
||||
**Lösung:**
|
||||
```python
|
||||
# POST Response sollte rowId bereits enthalten (prüfen!)
|
||||
# Falls ja: Extrahiere direkt wie bei PUT
|
||||
if isinstance(result, dict):
|
||||
new_rowid = result.get('rowId')
|
||||
elif isinstance(result, list):
|
||||
new_rowid = result[0].get('rowId')
|
||||
```
|
||||
⚠️ **TODO:** Teste ob Advoware POST auch rowId zurückgibt (wie PUT)
|
||||
|
||||
---
|
||||
|
||||
### 2. **Doppeltes rowId-Update nach EspoCRM→Advoware**
|
||||
|
||||
**Problem:**
|
||||
```python
|
||||
# 1. Update via release_sync_lock extra_fields
|
||||
await sync_utils.release_sync_lock(entity_id, 'clean',
|
||||
extra_fields={'advowareRowId': new_rowid})
|
||||
|
||||
# 2. Aber VORHER bereits direktes Update!
|
||||
if new_rowid:
|
||||
await espocrm.update_entity('CBeteiligte', entity_id, {
|
||||
'advowareRowId': new_rowid
|
||||
})
|
||||
```
|
||||
|
||||
**Lösung:** Entweder/oder - nicht beides!
|
||||
```python
|
||||
# OPTION A: Nur release_lock (weniger Code, eleganter)
|
||||
await sync_utils.release_sync_lock(
|
||||
entity_id, 'clean',
|
||||
extra_fields={'advowareRowId': new_rowid}
|
||||
)
|
||||
|
||||
# OPTION B: Direktes Update + release ohne extra_fields
|
||||
await espocrm.update_entity('CBeteiligte', entity_id, {
|
||||
'advowareRowId': new_rowid,
|
||||
'syncStatus': 'clean',
|
||||
'advowareLastSync': now()
|
||||
})
|
||||
await self.redis.delete(lock_key)
|
||||
```
|
||||
⚠️ **Recommendation:** Option A ist eleganter (1 API Call statt 2)
|
||||
|
||||
---
|
||||
|
||||
### 3. **Initial Sync Special Case nicht nötig?**
|
||||
|
||||
**Problem:**
|
||||
```python
|
||||
# Separate Logik für "kein lastSync"
|
||||
if not espo_entity.get('advowareLastSync'):
|
||||
context.logger.info(f"📤 Initial Sync → ...")
|
||||
# ... exakt derselbe Code wie bei espocrm_newer!
|
||||
```
|
||||
|
||||
**Lösung:** compare_entities sollte das automatisch erkennen
|
||||
```python
|
||||
# In compare_entities:
|
||||
if not last_sync:
|
||||
# Kein lastSync → EspoCRM neuer (always sync on first run)
|
||||
return 'espocrm_newer'
|
||||
```
|
||||
✅ **Eliminiert:** ~30 Zeilen Duplikat-Code in event_step.py
|
||||
|
||||
---
|
||||
|
||||
### 4. **Conflict Resolution immer EspoCRM Wins**
|
||||
|
||||
**Problem:**
|
||||
```python
|
||||
# Hardcoded: EspoCRM gewinnt immer
|
||||
elif comparison == 'conflict':
|
||||
context.logger.warn(f"⚠️ KONFLIKT erkannt → EspoCRM WINS")
|
||||
# ... force update zu Advoware
|
||||
```
|
||||
|
||||
**Überlegungen:**
|
||||
- Für **STAMMDATEN** ist das sinnvoll (EspoCRM ist Master)
|
||||
- Für **Kontaktdaten** könnte Advoware Master sein
|
||||
- Für **Adresse** sollte vielleicht Merge stattfinden
|
||||
|
||||
✅ **Status:** OK für aktuelle Scope (nur Stammdaten)
|
||||
📝 **Später:** Konfigurierbare Conflict Strategy pro Feld-Gruppe
|
||||
|
||||
---
|
||||
|
||||
### 5. **Timestamp-Fallback verwendet geaendertAm (deprecated?)**
|
||||
|
||||
**Code:**
|
||||
```python
|
||||
return self.compare_timestamps(
|
||||
espo_entity.get('modifiedAt'),
|
||||
advo_entity.get('geaendertAm'), # ← Swagger deprecated?
|
||||
espo_entity.get('advowareLastSync')
|
||||
)
|
||||
```
|
||||
|
||||
⚠️ **TODO:** Prüfe ob `geaendertAm` zuverlässig ist oder ob Advoware ein anderes Feld hat
|
||||
|
||||
---
|
||||
|
||||
### 6. **Keine Batch-Verarbeitung für Webhooks**
|
||||
|
||||
**Problem:**
|
||||
```python
|
||||
# Webhook-Handler: Emittiert Event pro Entity
|
||||
for entity_id in entity_ids:
|
||||
await context.emit({...}) # N Events
|
||||
```
|
||||
|
||||
**Resultat:** Bei 100 Updates → 100 separate Event-Handler-Invocations
|
||||
|
||||
**Lösung (Optional):**
|
||||
```python
|
||||
# Batch-Event mit allen IDs
|
||||
await context.emit({
|
||||
'topic': 'vmh.beteiligte.update_batch',
|
||||
'data': {
|
||||
'entity_ids': list(entity_ids), # Alle auf einmal
|
||||
'source': 'webhook'
|
||||
}
|
||||
})
|
||||
|
||||
# Handler verarbeitet in Parallel (mit Limit)
|
||||
async def handler_batch(event_data, context):
|
||||
entity_ids = event_data['entity_ids']
|
||||
|
||||
# Process max 10 parallel
|
||||
semaphore = asyncio.Semaphore(10)
|
||||
tasks = [sync_with_semaphore(id, semaphore) for id in entity_ids]
|
||||
await asyncio.gather(*tasks)
|
||||
```
|
||||
📝 **Entscheidung:** Aktuell OK (Lock verhindert Probleme), aber bei >50 gleichzeitigen Updates könnte Batch helfen
|
||||
|
||||
---
|
||||
|
||||
### 7. **Fehlende Metriken/Monitoring**
|
||||
|
||||
**Was fehlt:**
|
||||
- Durchschnittliche Sync-Dauer pro Entity
|
||||
- Anzahl Konflikte pro Tag
|
||||
- API-Call-Count (EspoCRM vs Advoware)
|
||||
- Failed Sync Ratio
|
||||
|
||||
**Lösung:**
|
||||
```python
|
||||
# In sync_utils oder neues monitoring_utils.py
|
||||
class SyncMetrics:
|
||||
async def record_sync(self, entity_id, duration, result, comparison):
|
||||
await redis.hincrby('metrics:sync:daily', 'total', 1)
|
||||
await redis.hincrby('metrics:sync:daily', f'result_{result}', 1)
|
||||
await redis.lpush('metrics:sync:durations', duration)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 🎯 VERBESSERUNGS-EMPFEHLUNGEN
|
||||
|
||||
### Priorität 1: SOFORT (Effizienz)
|
||||
|
||||
1. **✅ Eliminiere doppeltes rowId-Update**
|
||||
```python
|
||||
# NUR in release_sync_lock, nicht vorher extra Update
|
||||
```
|
||||
Impact: -1 API Call pro EspoCRM→Advoware Update (ca. 50% weniger EspoCRM calls)
|
||||
|
||||
2. **✅ Teste POST Response für rowId**
|
||||
```python
|
||||
# Falls POST auch rowId enthält: Extra GET entfernen
|
||||
```
|
||||
Impact: -1 API Call pro CREATE (50% weniger bei Neuanlagen)
|
||||
|
||||
### Priorität 2: MITTELFRISTIG (Eleganz)
|
||||
|
||||
3. **📝 Merge Initial Sync in compare_entities**
|
||||
```python
|
||||
# Eliminiert Special Case, -30 Zeilen
|
||||
```
|
||||
Impact: Cleaner Code, leichter wartbar
|
||||
|
||||
4. **📝 Prüfe geaendertAm Timestamp**
|
||||
```python
|
||||
# Stelle sicher dass Fallback funktioniert
|
||||
```
|
||||
Impact: Robustheit falls rowId mal fehlt
|
||||
|
||||
### Priorität 3: OPTIONAL (Features)
|
||||
|
||||
5. **💡 Batch-Processing für Webhooks**
|
||||
- Bei >50 gleichzeitigen Updates könnte Performance leiden
|
||||
- Aktuell nicht kritisch (Lock verhindert Probleme)
|
||||
|
||||
6. **💡 Metriken/Dashboard**
|
||||
- Sync-Statistiken für Monitoring
|
||||
- Nicht kritisch aber nützlich für Ops
|
||||
|
||||
---
|
||||
|
||||
## 📊 PERFORMANCE-SCHÄTZUNG
|
||||
|
||||
### Aktueller Stand (pro Entity)
|
||||
|
||||
**CREATE:**
|
||||
- 1× EspoCRM GET (Entity laden)
|
||||
- 1× Advoware POST
|
||||
- 1× Advoware GET (rowId holen) ← **OPTIMIERBAR**
|
||||
- 1× EspoCRM PUT (betNr + rowId schreiben) ← **OPTIMIERBAR**
|
||||
- 1× EspoCRM PUT (syncStatus + lastSync) ← Teil von Lock-Release
|
||||
|
||||
**Total: 5 API Calls** → Mit Optimierung: **3 API Calls (-40%)**
|
||||
|
||||
**UPDATE (EspoCRM→Advoware):**
|
||||
- 1× EspoCRM GET (Entity laden)
|
||||
- 1× Advoware GET (Vergleich)
|
||||
- 1× Advoware PUT
|
||||
- 1× EspoCRM PUT (rowId update) ← **DOPPELT**
|
||||
- 1× EspoCRM PUT (Lock-Release mit rowId) ← **DOPPELT**
|
||||
|
||||
**Total: 5 API Calls** → Mit Optimierung: **4 API Calls (-20%)**
|
||||
|
||||
**UPDATE (Advoware→EspoCRM):**
|
||||
- 1× EspoCRM GET
|
||||
- 1× Advoware GET
|
||||
- 1× EspoCRM PUT (Daten)
|
||||
- 1× EspoCRM PUT (Lock-Release)
|
||||
|
||||
**Total: 4 API Calls** → Bereits optimal
|
||||
|
||||
---
|
||||
|
||||
## 🎨 ARCHITEKTUR-BEWERTUNG
|
||||
|
||||
### ✅ Was ist ROBUST?
|
||||
|
||||
1. **Distributed Locking** - Verhindert Race Conditions
|
||||
2. **rowId Change Detection** - Sehr zuverlässig
|
||||
3. **Retry Logic** - Graceful Degradation
|
||||
4. **Error Handling** - Try-catch auf allen Ebenen
|
||||
5. **TTL auf Locks** - Keine Deadlocks
|
||||
|
||||
### ✅ Was ist EFFIZIENT?
|
||||
|
||||
1. **PUT Response Parsing** - Spart GET nach Updates
|
||||
2. **None-Filtering** - Verhindert unnötige Validierungsfehler
|
||||
3. **Early Returns** - "no_change" skipped sofort
|
||||
4. **Redis Token Caching** - Nicht bei jedem Call neu authentifizieren
|
||||
|
||||
### ✅ Was ist ELEGANT?
|
||||
|
||||
1. **Event-Driven Architecture** - Entkoppelt Webhook von Sync-Logik
|
||||
2. **Mapper Pattern** - Transformationen zentral
|
||||
3. **Utility Class** - Wiederverwendbare Funktionen
|
||||
4. **Descriptive Logging** - Mit Emojis, sehr lesbar
|
||||
|
||||
### ⚠️ Was könnte ELEGANTER sein?
|
||||
|
||||
1. **Doppelte rowId-Updates** - Redundant
|
||||
2. **Initial Sync Special Case** - Unnötige Duplikation
|
||||
3. **Keine Config für Conflict Strategy** - Hardcoded
|
||||
4. **Fehlende Metriken** - Monitoring schwierig
|
||||
|
||||
---
|
||||
|
||||
## 🏆 GESAMTBEWERTUNG
|
||||
|
||||
| Kategorie | Bewertung | Note |
|
||||
|-----------|-----------|------|
|
||||
| Robustheit | ⭐⭐⭐⭐⭐ | 9/10 - Sehr stabil durch Locking + Retry |
|
||||
| Effizienz | ⭐⭐⭐⭐☆ | 7/10 - Gut, aber 2 klare Optimierungen möglich |
|
||||
| Eleganz | ⭐⭐⭐⭐☆ | 8/10 - Sauber strukturiert, kleine Code-Duplikate |
|
||||
| Wartbarkeit | ⭐⭐⭐⭐⭐ | 9/10 - Gut dokumentiert, klare Struktur |
|
||||
| Erweiterbarkeit | ⭐⭐⭐⭐☆ | 8/10 - Event-Driven macht Extensions einfach |
|
||||
|
||||
**Gesamt: 8.2/10 - SEHR GUT**
|
||||
|
||||
---
|
||||
|
||||
## 🚀 EMPFOHLENE NÄCHSTE SCHRITTE
|
||||
|
||||
### Sofort (1-2h Aufwand):
|
||||
1. ✅ Eliminiere doppeltes rowId-Update (5min)
|
||||
2. ✅ Teste Advoware POST Response auf rowId (15min)
|
||||
3. ✅ Falls ja: Entferne GET nach CREATE (5min)
|
||||
|
||||
### Mittelfristig (2-4h):
|
||||
4. 📝 Merge Initial Sync in compare_entities (30min)
|
||||
5. 📝 Add Metrics Collection (1-2h)
|
||||
|
||||
### Optional:
|
||||
6. 💡 Batch-Processing (nur wenn Performance-Problem)
|
||||
7. 💡 Configurable Conflict Strategy (bei neuen Requirements)
|
||||
|
||||
---
|
||||
|
||||
## 📝 FAZIT
|
||||
|
||||
**Das System ist produktionsreif und robust.**
|
||||
|
||||
- **Stärken:** Exzellentes Locking, zuverlässige Change Detection, gutes Error Handling
|
||||
- **Optimierungen:** 2-3 kleine Fixes können 20-40% API Calls sparen
|
||||
- **Architektur:** Sauber, wartbar, erweiterbar
|
||||
|
||||
**Recommendation:** Ship it mit den 2 Quick-Fixes (doppeltes Update + POST rowId-Check).
|
||||
Reference in New Issue
Block a user