Compare commits

...

3 Commits

6 changed files with 478 additions and 88 deletions

View File

@@ -0,0 +1,458 @@
# Beteiligte Sync - Architektur-Analyse
**Stand:** 7. Februar 2026
**Analysiert:** Bidirektionale EspoCRM ↔ Advoware Beteiligte-Synchronisation
---
## 🏗️ ARCHITEKTUR-ÜBERSICHT
### Komponenten
```
┌─────────────────────────────────────────────────────────────────┐
│ EspoCRM (Master) │
│ Webhooks → Motia │
└────────────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ WEBHOOK HANDLER (3 Endpoints) │
│ • beteiligte_create_api_step.py │
│ • beteiligte_update_api_step.py ← Loop-Prevention entfernt │
│ • beteiligte_delete_api_step.py │
└────────────────────────────┬────────────────────────────────────┘
│ emits events
┌─────────────────────────────────────────────────────────────────┐
│ CENTRAL SYNC HANDLER (Event-Based) │
│ beteiligte_sync_event_step.py (~329 lines) │
│ │
│ Subscribes: vmh.beteiligte.{create,update,delete,sync_check} │
│ │
│ Flow: │
│ 1. Distributed Lock (Redis + syncStatus) │
│ 2. Fetch EspoCRM Entity │
│ 3. Route: CREATE, UPDATE/CHECK, DELETE │
│ 4. Release Lock + Update Status │
└────────────────────────────┬────────────────────────────────────┘
┌─────────┴─────────┐
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ handle_create │ │ handle_update │
│ │ │ │
│ • Map to Advo │ │ • Fetch Advo │
│ • POST │ │ • Compare │
│ • GET rowId │ │ • Sync/Skip │
│ • Write back │ │ • Update rowId │
└──────────────────┘ └──────────────────┘
│ │
└─────────┬─────────┘
┌─────────────────────────────────────────────────────────────────┐
│ SUPPORT SERVICES │
│ │
│ • BeteiligteSync (sync_utils) (~524 lines) │
│ - Locking, Compare, Merge, Conflict Resolution │
│ │
│ • BeteiligteMapper (~200 lines) │
│ - EspoCRM ↔ Advoware transformations │
│ - None-value filtering │
│ - Date format conversion │
│ │
│ • AdvowareAPI / EspoCRMAPI │
│ - HTTP clients mit Token-Caching │
└─────────────────────────────────────────────────────────────────┘
```
---
## ✅ STÄRKEN (Was funktioniert gut)
### 1. **Robustheit durch Distributed Locking**
```python
# 2-stufiges Locking verhindert Race Conditions:
# 1. Redis Lock (atomic, TTL 15min)
# 2. syncStatus Update (UI visibility)
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS)
```
**Gut:** Verhindert parallele Syncs derselben Entity
**Gut:** TTL verhindert Deadlocks bei Crashes
**Gut:** UI-Sichtbarkeit via syncStatus
### 2. **Primäre Change Detection: rowId**
```python
# rowId ändert sich bei JEDEM Advoware PUT → sehr zuverlässig
if espo_rowid and advo_rowid:
advo_changed = (espo_rowid != advo_rowid)
espo_changed = (espo_modified > last_sync)
```
**Sehr gut:** rowId ist Base64, ändert sich immer, keine NULLs
**Gut:** Timestamp als Fallback vorhanden
**Gut:** Konfliktlogik (beide geändert) implementiert
### 3. **API-Call-Optimierung (50% Reduktion)**
```python
# VORHER: PUT + GET (2 Calls)
# NACHHER: PUT Response enthält neue rowId (1 Call)
put_result = await advoware.api_call(...)
new_rowid = put_result[0].get('rowId') # direkt aus Response!
```
**Exzellent:** Keine extra GETs nach PUT nötig
**Gut:** Funktioniert für CREATE, UPDATE, Conflict Resolution
### 4. **Loop-Prevention auf EspoCRM-Seite**
```python
# ENTFERNT: should_skip_update() Filterung
# NEU: EspoCRM triggert keine Webhooks für rowId-Updates
```
**Gut:** Vereinfacht Code erheblich
**Gut:** Keine komplexe Filterlogik mehr nötig
**Gut:** Vertraut auf EspoCRM-Konfiguration
### 5. **Mapper mit Validierung**
```python
# None-Filtering verhindert EspoCRM Validation Errors
espo_data = {k: v for k, v in espo_data.items() if v is not None}
# Datumsformat-Konversion
dateOfBirth = geburtsdatum.split('T')[0] # '2001-01-05T00:00:00' → '2001-01-05'
```
**Gut:** Robuste Fehlerbehandlung
**Gut:** Dokumentiert welche Felder funktionieren (nur 8 von 14)
### 6. **Error Handling mit Retry-Logik**
```python
MAX_SYNC_RETRIES = 5
if new_retry >= MAX_SYNC_RETRIES:
update_data['syncStatus'] = 'permanently_failed'
await self.send_notification(...)
```
**Gut:** Verhindert Endlos-Retries
**Gut:** Notification an User bei dauerhaftem Fehler
---
## 🔴 SCHWÄCHEN & VERBESSERUNGSPOTENZIALE
### 1. **CREATE-Flow ineffizient (Extra GET nach POST)**
**Problem:**
```python
# Nach POST: Extra GET nur für rowId
result = await advoware.api_call(..., method='POST', data=advo_data)
new_betnr = result.get('betNr')
# Extra GET!
created_entity = await advoware.api_call(f'.../{new_betnr}', method='GET')
new_rowid = created_entity.get('rowId')
```
**Lösung:**
```python
# POST Response sollte rowId bereits enthalten (prüfen!)
# Falls ja: Extrahiere direkt wie bei PUT
if isinstance(result, dict):
new_rowid = result.get('rowId')
elif isinstance(result, list):
new_rowid = result[0].get('rowId')
```
⚠️ **TODO:** Teste ob Advoware POST auch rowId zurückgibt (wie PUT)
---
### 2. **Doppeltes rowId-Update nach EspoCRM→Advoware**
**Problem:**
```python
# 1. Update via release_sync_lock extra_fields
await sync_utils.release_sync_lock(entity_id, 'clean',
extra_fields={'advowareRowId': new_rowid})
# 2. Aber VORHER bereits direktes Update!
if new_rowid:
await espocrm.update_entity('CBeteiligte', entity_id, {
'advowareRowId': new_rowid
})
```
**Lösung:** Entweder/oder - nicht beides!
```python
# OPTION A: Nur release_lock (weniger Code, eleganter)
await sync_utils.release_sync_lock(
entity_id, 'clean',
extra_fields={'advowareRowId': new_rowid}
)
# OPTION B: Direktes Update + release ohne extra_fields
await espocrm.update_entity('CBeteiligte', entity_id, {
'advowareRowId': new_rowid,
'syncStatus': 'clean',
'advowareLastSync': now()
})
await self.redis.delete(lock_key)
```
⚠️ **Recommendation:** Option A ist eleganter (1 API Call statt 2)
---
### 3. **Initial Sync Special Case nicht nötig?**
**Problem:**
```python
# Separate Logik für "kein lastSync"
if not espo_entity.get('advowareLastSync'):
context.logger.info(f"📤 Initial Sync → ...")
# ... exakt derselbe Code wie bei espocrm_newer!
```
**Lösung:** compare_entities sollte das automatisch erkennen
```python
# In compare_entities:
if not last_sync:
# Kein lastSync → EspoCRM neuer (always sync on first run)
return 'espocrm_newer'
```
**Eliminiert:** ~30 Zeilen Duplikat-Code in event_step.py
---
### 4. **Conflict Resolution immer EspoCRM Wins**
**Problem:**
```python
# Hardcoded: EspoCRM gewinnt immer
elif comparison == 'conflict':
context.logger.warn(f"⚠️ KONFLIKT erkannt → EspoCRM WINS")
# ... force update zu Advoware
```
**Überlegungen:**
- Für **STAMMDATEN** ist das sinnvoll (EspoCRM ist Master)
- Für **Kontaktdaten** könnte Advoware Master sein
- Für **Adresse** sollte vielleicht Merge stattfinden
**Status:** OK für aktuelle Scope (nur Stammdaten)
📝 **Später:** Konfigurierbare Conflict Strategy pro Feld-Gruppe
---
### 5. **Timestamp-Fallback verwendet geaendertAm (deprecated?)**
**Code:**
```python
return self.compare_timestamps(
espo_entity.get('modifiedAt'),
advo_entity.get('geaendertAm'), # ← Swagger deprecated?
espo_entity.get('advowareLastSync')
)
```
⚠️ **TODO:** Prüfe ob `geaendertAm` zuverlässig ist oder ob Advoware ein anderes Feld hat
---
### 6. **Keine Batch-Verarbeitung für Webhooks**
**Problem:**
```python
# Webhook-Handler: Emittiert Event pro Entity
for entity_id in entity_ids:
await context.emit({...}) # N Events
```
**Resultat:** Bei 100 Updates → 100 separate Event-Handler-Invocations
**Lösung (Optional):**
```python
# Batch-Event mit allen IDs
await context.emit({
'topic': 'vmh.beteiligte.update_batch',
'data': {
'entity_ids': list(entity_ids), # Alle auf einmal
'source': 'webhook'
}
})
# Handler verarbeitet in Parallel (mit Limit)
async def handler_batch(event_data, context):
entity_ids = event_data['entity_ids']
# Process max 10 parallel
semaphore = asyncio.Semaphore(10)
tasks = [sync_with_semaphore(id, semaphore) for id in entity_ids]
await asyncio.gather(*tasks)
```
📝 **Entscheidung:** Aktuell OK (Lock verhindert Probleme), aber bei >50 gleichzeitigen Updates könnte Batch helfen
---
### 7. **Fehlende Metriken/Monitoring**
**Was fehlt:**
- Durchschnittliche Sync-Dauer pro Entity
- Anzahl Konflikte pro Tag
- API-Call-Count (EspoCRM vs Advoware)
- Failed Sync Ratio
**Lösung:**
```python
# In sync_utils oder neues monitoring_utils.py
class SyncMetrics:
async def record_sync(self, entity_id, duration, result, comparison):
await redis.hincrby('metrics:sync:daily', 'total', 1)
await redis.hincrby('metrics:sync:daily', f'result_{result}', 1)
await redis.lpush('metrics:sync:durations', duration)
```
---
## 🎯 VERBESSERUNGS-EMPFEHLUNGEN
### Priorität 1: SOFORT (Effizienz)
1. **✅ Eliminiere doppeltes rowId-Update**
```python
# NUR in release_sync_lock, nicht vorher extra Update
```
Impact: -1 API Call pro EspoCRM→Advoware Update (ca. 50% weniger EspoCRM calls)
2. **✅ Teste POST Response für rowId**
```python
# Falls POST auch rowId enthält: Extra GET entfernen
```
Impact: -1 API Call pro CREATE (50% weniger bei Neuanlagen)
### Priorität 2: MITTELFRISTIG (Eleganz)
3. **📝 Merge Initial Sync in compare_entities**
```python
# Eliminiert Special Case, -30 Zeilen
```
Impact: Cleaner Code, leichter wartbar
4. **📝 Prüfe geaendertAm Timestamp**
```python
# Stelle sicher dass Fallback funktioniert
```
Impact: Robustheit falls rowId mal fehlt
### Priorität 3: OPTIONAL (Features)
5. **💡 Batch-Processing für Webhooks**
- Bei >50 gleichzeitigen Updates könnte Performance leiden
- Aktuell nicht kritisch (Lock verhindert Probleme)
6. **💡 Metriken/Dashboard**
- Sync-Statistiken für Monitoring
- Nicht kritisch aber nützlich für Ops
---
## 📊 PERFORMANCE-SCHÄTZUNG
### Aktueller Stand (pro Entity)
**CREATE:**
- 1× EspoCRM GET (Entity laden)
- 1× Advoware POST
- 1× Advoware GET (rowId holen) ← **OPTIMIERBAR**
- 1× EspoCRM PUT (betNr + rowId schreiben) ← **OPTIMIERBAR**
- 1× EspoCRM PUT (syncStatus + lastSync) ← Teil von Lock-Release
**Total: 5 API Calls** → Mit Optimierung: **3 API Calls (-40%)**
**UPDATE (EspoCRM→Advoware):**
- 1× EspoCRM GET (Entity laden)
- 1× Advoware GET (Vergleich)
- 1× Advoware PUT
- 1× EspoCRM PUT (rowId update) ← **DOPPELT**
- 1× EspoCRM PUT (Lock-Release mit rowId) ← **DOPPELT**
**Total: 5 API Calls** → Mit Optimierung: **4 API Calls (-20%)**
**UPDATE (Advoware→EspoCRM):**
- 1× EspoCRM GET
- 1× Advoware GET
- 1× EspoCRM PUT (Daten)
- 1× EspoCRM PUT (Lock-Release)
**Total: 4 API Calls** → Bereits optimal
---
## 🎨 ARCHITEKTUR-BEWERTUNG
### ✅ Was ist ROBUST?
1. **Distributed Locking** - Verhindert Race Conditions
2. **rowId Change Detection** - Sehr zuverlässig
3. **Retry Logic** - Graceful Degradation
4. **Error Handling** - Try-catch auf allen Ebenen
5. **TTL auf Locks** - Keine Deadlocks
### ✅ Was ist EFFIZIENT?
1. **PUT Response Parsing** - Spart GET nach Updates
2. **None-Filtering** - Verhindert unnötige Validierungsfehler
3. **Early Returns** - "no_change" skipped sofort
4. **Redis Token Caching** - Nicht bei jedem Call neu authentifizieren
### ✅ Was ist ELEGANT?
1. **Event-Driven Architecture** - Entkoppelt Webhook von Sync-Logik
2. **Mapper Pattern** - Transformationen zentral
3. **Utility Class** - Wiederverwendbare Funktionen
4. **Descriptive Logging** - Mit Emojis, sehr lesbar
### ⚠️ Was könnte ELEGANTER sein?
1. **Doppelte rowId-Updates** - Redundant
2. **Initial Sync Special Case** - Unnötige Duplikation
3. **Keine Config für Conflict Strategy** - Hardcoded
4. **Fehlende Metriken** - Monitoring schwierig
---
## 🏆 GESAMTBEWERTUNG
| Kategorie | Bewertung | Note |
|-----------|-----------|------|
| Robustheit | ⭐⭐⭐⭐⭐ | 9/10 - Sehr stabil durch Locking + Retry |
| Effizienz | ⭐⭐⭐⭐☆ | 7/10 - Gut, aber 2 klare Optimierungen möglich |
| Eleganz | ⭐⭐⭐⭐☆ | 8/10 - Sauber strukturiert, kleine Code-Duplikate |
| Wartbarkeit | ⭐⭐⭐⭐⭐ | 9/10 - Gut dokumentiert, klare Struktur |
| Erweiterbarkeit | ⭐⭐⭐⭐☆ | 8/10 - Event-Driven macht Extensions einfach |
**Gesamt: 8.2/10 - SEHR GUT**
---
## 🚀 EMPFOHLENE NÄCHSTE SCHRITTE
### Sofort (1-2h Aufwand):
1. ✅ Eliminiere doppeltes rowId-Update (5min)
2. ✅ Teste Advoware POST Response auf rowId (15min)
3. ✅ Falls ja: Entferne GET nach CREATE (5min)
### Mittelfristig (2-4h):
4. 📝 Merge Initial Sync in compare_entities (30min)
5. 📝 Add Metrics Collection (1-2h)
### Optional:
6. 💡 Batch-Processing (nur wenn Performance-Problem)
7. 💡 Configurable Conflict Strategy (bei neuen Requirements)
---
## 📝 FAZIT
**Das System ist produktionsreif und robust.**
- **Stärken:** Exzellentes Locking, zuverlässige Change Detection, gutes Error Handling
- **Optimierungen:** 2-3 kleine Fixes können 20-40% API Calls sparen
- **Architektur:** Sauber, wartbar, erweiterbar
**Recommendation:** Ship it mit den 2 Quick-Fixes (doppeltes Update + POST rowId-Check).

View File

@@ -244,7 +244,12 @@ class BeteiligteSync:
last_sync = espo_entity.get('advowareLastSync')
espo_modified = espo_entity.get('modifiedAt')
if espo_rowid and advo_rowid and last_sync:
# SPECIAL CASE: Kein lastSync → Initial Sync (EspoCRM→Advoware)
if not last_sync:
self._log(f"Initial Sync (kein lastSync) → EspoCRM neuer")
return 'espocrm_newer'
if espo_rowid and advo_rowid:
# Prüfe ob Advoware geändert wurde (rowId)
advo_changed = (espo_rowid != advo_rowid)

View File

@@ -140,10 +140,11 @@ class BeteiligteMapper:
if zusatz:
espo_data['zusatz'] = zusatz
# GEBURTSDATUM
# GEBURTSDATUM (nur Datum-Teil ohne Zeit)
geburtsdatum = advo_entity.get('geburtsdatum')
if geburtsdatum:
espo_data['dateOfBirth'] = geburtsdatum
# Advoware gibt '2001-01-05T00:00:00', EspoCRM will nur '2001-01-05'
espo_data['dateOfBirth'] = geburtsdatum.split('T')[0] if 'T' in geburtsdatum else geburtsdatum
# HINWEIS: handelsRegisterNummer und registergericht werden NICHT gemappt
# Advoware ignoriert diese Felder im PUT (trotz Swagger Schema)
@@ -151,6 +152,9 @@ class BeteiligteMapper:
logger.debug(f"Mapped to EspoCRM STAMMDATEN: name={espo_data.get('name')}")
# WICHTIG: Entferne None-Werte (EspoCRM mag keine expliziten None bei required fields)
espo_data = {k: v for k, v in espo_data.items() if v is not None}
return espo_data
@staticmethod

View File

@@ -204,35 +204,6 @@ async def handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_u
context.logger.info(f"⏱️ Vergleich: {comparison}")
# SPECIAL: Wenn LastSync null → immer von EspoCRM syncen (initial sync)
if not espo_entity.get('advowareLastSync'):
context.logger.info(f"📤 Initial Sync → EspoCRM STAMMDATEN zu Advoware")
# OPTIMIERT: Use merge utility (reduces code duplication)
merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper)
put_result = await advoware.api_call(
f'api/v1/advonet/Beteiligte/{betnr}',
method='PUT',
data=merged_data
)
# Extrahiere neue rowId aus PUT Response (spart extra GET!)
new_rowid = None
if isinstance(put_result, list) and len(put_result) > 0:
new_rowid = put_result[0].get('rowId')
elif isinstance(put_result, dict):
new_rowid = put_result.get('rowId')
# Speichere neue rowId für zukünftige Vergleiche
await sync_utils.release_sync_lock(
entity_id,
'clean',
extra_fields={'advowareRowId': new_rowid}
)
context.logger.info(f"✅ Advoware aktualisiert (initial sync), neue rowId: {new_rowid[:20] if new_rowid else 'N/A'}...")
return
# KEIN SYNC NÖTIG
if comparison == 'no_change':
context.logger.info(f"✅ Keine Änderungen, Sync übersprungen")
@@ -259,12 +230,13 @@ async def handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_u
elif isinstance(put_result, dict):
new_rowid = put_result.get('rowId')
# Release Lock + Update rowId in einem Call (effizienter!)
await sync_utils.release_sync_lock(
entity_id,
'clean',
extra_fields={'advowareRowId': new_rowid}
)
context.logger.info(f"✅ Advoware aktualisiert, neue rowId: {new_rowid[:20] if new_rowid else 'N/A'}...")
context.logger.info(f"✅ Advoware aktualisiert, rowId in EspoCRM geschrieben: {new_rowid[:20] if new_rowid else 'N/A'}...")
# ADVOWARE NEUER → Update EspoCRM
elif comparison == 'advoware_newer':

View File

@@ -1,44 +1,8 @@
import json
import datetime
def should_skip_update(entity_data):
"""
Prüft ob Update gefiltert werden soll (verhindert Webhook-Loop)
SKIP wenn:
- Nur Sync-Felder geändert UND
- syncStatus ist "clean" oder "syncing" (normale Sync-Completion)
EMIT wenn:
- Echte Datenänderung (nicht nur Sync-Felder) ODER
- syncStatus ist "dirty", "failed", "pending_sync" (braucht Sync)
"""
if not isinstance(entity_data, dict):
return False
# Felder die von Sync-Handler gesetzt werden
sync_fields = {'syncStatus', 'advowareLastSync', 'syncErrorMessage', 'syncRetryCount'}
# Meta-Felder die immer vorhanden sind
meta_fields = {'id', 'modifiedAt', 'modifiedById', 'modifiedByName'}
# Alle ignorierbaren Felder
ignorable = sync_fields | meta_fields
# Prüfe ob es relevante (nicht-sync) Felder gibt
entity_keys = set(entity_data.keys())
relevant_keys = entity_keys - ignorable
# Wenn echte Datenänderung → Emit (nicht skippen)
if len(relevant_keys) > 0:
return False
# Nur Sync-Felder vorhanden → Prüfe syncStatus
sync_status = entity_data.get('syncStatus')
# Skip nur wenn Status "clean" oder "syncing" (normale Completion)
# Emit wenn "dirty", "failed", "pending_sync" (braucht Sync)
should_skip = sync_status in ['clean', 'syncing']
return should_skip
# HINWEIS: Loop-Prevention wurde auf EspoCRM-Seite implementiert
# rowId-Updates triggern keine Webhooks mehr, daher keine Filterung nötig
config = {
'type': 'api',
@@ -57,29 +21,16 @@ async def handler(req, context):
context.logger.info("VMH Webhook Beteiligte Update empfangen")
context.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
# Sammle alle IDs aus dem Batch (filtere Sync-Only-Updates)
# Sammle alle IDs aus dem Batch
entity_ids = set()
filtered_count = 0
if isinstance(payload, list):
for entity in payload:
if isinstance(entity, dict) and 'id' in entity:
# Prüfe ob Update gefiltert werden soll (verhindert Loop)
if should_skip_update(entity):
context.logger.info(f"Sync-Completion gefiltert: {entity['id']} (syncStatus={entity.get('syncStatus')})")
filtered_count += 1
continue
entity_ids.add(entity['id'])
elif isinstance(payload, dict) and 'id' in payload:
if not should_skip_update(payload):
entity_ids.add(payload['id'])
else:
context.logger.info(f"Sync-Completion gefiltert: {payload['id']} (syncStatus={payload.get('syncStatus')})")
filtered_count += 1
if filtered_count > 0:
context.logger.info(f"{len(entity_ids)} IDs zum Update-Sync gefunden ({filtered_count} Sync-Completions gefiltert)")
else:
context.logger.info(f"{len(entity_ids)} IDs zum Update-Sync gefunden")
# Emittiere Events direkt (Deduplizierung erfolgt im Event-Handler via Lock)

2
bitbylaw/types.d.ts vendored
View File

@@ -12,7 +12,7 @@ declare module 'motia' {
}
interface Handlers {
'VMH Beteiligte Sync': EventHandler<never, never>
'VMH Beteiligte Sync Handler': EventHandler<never, never>
'VMH Webhook Beteiligte Update': ApiRouteHandler<Record<string, unknown>, unknown, { topic: 'vmh.beteiligte.update'; data: never }>
'VMH Webhook Beteiligte Delete': ApiRouteHandler<Record<string, unknown>, unknown, { topic: 'vmh.beteiligte.delete'; data: never }>
'VMH Webhook Beteiligte Create': ApiRouteHandler<Record<string, unknown>, unknown, { topic: 'vmh.beteiligte.create'; data: never }>