- Add SYNC_STRATEGY_ESPOCRM_BASED.md detailing the sync flows and status management. - Create utilities for sync operations in services/beteiligte_sync_utils.py, including locking, timestamp comparison, conflict resolution, and notification handling. - Implement entity mapping between EspoCRM and Advoware in services/espocrm_mapper.py. - Develop a cron job for periodic sync checks in steps/vmh/beteiligte_sync_cron_step.py, emitting events for entities needing synchronization.
23 KiB
Sync-Strategie: EspoCRM ↔ Advoware Beteiligte
Analysiert am: 2026-02-07
Anforderungen:
- a) EspoCRM Update → Advoware Update
- b) Bi-direktionaler Sync mit Konfliktauflösung
- c) Neue Beteiligte in EspoCRM → Neue in Advoware (Status-Feld)
- d) Cron-basierter Sync
Problem: EspoCRM hat Webhooks, Advoware nicht.
📊 Bestehende Architektur (aus Calendar Sync)
Das bestehende Calendar Sync System bietet ein exzellentes Template:
Aktuelle Komponenten:
- Webhook-Receiver:
beteiligte_create/update/delete_api_step.py✓ Bereits vorhanden - Event-Handler:
beteiligte_sync_event_step.py⚠️ Placeholder - Cron-Job: Analog zu
calendar_sync_cron_step.py - PostgreSQL State DB: Für Sync-State und Konfliktauflösung
- Redis: Deduplication + Rate Limiting
Bewährte Patterns aus Calendar Sync:
1. Datenbank-Schema (PostgreSQL):
CREATE TABLE beteiligte_sync (
sync_id SERIAL PRIMARY KEY,
employee_kuerzel VARCHAR(10),
-- IDs beider Systeme
espocrm_id VARCHAR(255) UNIQUE,
advoware_betnr INTEGER UNIQUE,
-- Metadaten
source_system VARCHAR(20), -- 'espocrm' oder 'advoware'
sync_strategy VARCHAR(50) DEFAULT 'source_system_wins',
sync_status VARCHAR(20) DEFAULT 'synced', -- 'synced', 'pending', 'failed', 'conflict'
-- Timestamps
espocrm_modified_at TIMESTAMP WITH TIME ZONE,
advoware_modified_at TIMESTAMP WITH TIME ZONE,
last_sync TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
-- Flags
deleted BOOLEAN DEFAULT FALSE,
advoware_write_allowed BOOLEAN DEFAULT TRUE,
-- Audit
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_espocrm_id ON beteiligte_sync(espocrm_id);
CREATE INDEX idx_advoware_betnr ON beteiligte_sync(advoware_betnr);
CREATE INDEX idx_sync_status ON beteiligte_sync(sync_status);
CREATE INDEX idx_deleted ON beteiligte_sync(deleted) WHERE NOT deleted;
2. Sync-Phasen (3-Phasen-Modell):
- Phase 1: Neue aus EspoCRM → Advoware
- Phase 2: Neue aus Advoware → EspoCRM
- Phase 3: Updates + Konfliktauflösung
- Phase 4: Deletes
3. Konfliktauflösung via Timestamps:
# Aus calendar_sync_event_step.py, Zeile 870+
if espo_ts and advo_ts:
if espo_ts > advo_ts:
# EspoCRM ist neuer → Update Advoware
await update_advoware(...)
elif advo_ts > espo_ts:
# Advoware ist neuer → Update EspoCRM
await update_espocrm(...)
else:
# Gleich alt → Skip
pass
🎯 Empfohlene Architektur für Beteiligte-Sync
Komponenten-Übersicht
┌─────────────────────────────────────────────────────────────────┐
│ EspoCRM │
│ │
│ CBeteiligte Entity │
│ - Webhooks: create/update/delete │
│ - Felder: betnr, syncStatus, advowareLastSync │
└─────────┬────────────────────────────────────────────────────────┘
│ Webhook (HTTP POST)
▼
┌─────────────────────────────────────────────────────────────────┐
│ KONG API Gateway → Motia Framework │
│ │
│ 1. beteiligte_create/update/delete_api_step.py │
│ - Empfängt Webhooks │
│ - Dedupliziert via Redis │
│ - Emittiert Events │
│ │
│ 2. beteiligte_sync_event_step.py │
│ - Subscribed zu Events │
│ - Holt vollständige Entity aus EspoCRM │
│ - Transformiert via Mapper │
│ - Schreibt nach Advoware │
│ - Updated PostgreSQL Sync-State │
│ │
│ 3. beteiligte_sync_cron_step.py (NEU) │
│ - Läuft alle 15 Minuten │
│ - Emittiert "beteiligte.sync_all" Event │
│ │
│ 4. beteiligte_sync_all_event_step.py (NEU) │
│ - Fetcht alle Beteiligte aus Advoware │
│ - Vergleicht mit PostgreSQL State │
│ - Identifiziert Neue/Geänderte/Gelöschte in Advoware │
│ - Sync nach EspoCRM │
│ - 3-Phasen-Modell wie Calendar Sync │
└─────────┬────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ PostgreSQL Sync-State DB │
│ │
│ Tabelle: beteiligte_sync │
│ - Mapping: espocrm_id ↔ advoware_betnr │
│ - Timestamps beider Systeme │
│ - Sync-Status & Konfliktflags │
└─────────┬────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Redis (Caching & Deduplication) │
│ │
│ - vmh:beteiligte:create_pending (SET) │
│ - vmh:beteiligte:update_pending (SET) │
│ - vmh:beteiligte:delete_pending (SET) │
│ - vmh:beteiligte:sync_lock:{espocrm_id} (Key mit TTL) │
└──────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Advoware API │
│ │
│ /api/v1/advonet/Beteiligte │
│ - GET: Liste + Einzelabfrage │
│ - POST: Create │
│ - PUT: Update │
│ - DELETE: Delete │
│ │
│ ⚠️ KEIN Webhook-Support │
│ → Polling via Cron erforderlich │
└──────────────────────────────────────────────────────────────────┘
🔄 Detaillierte Sync-Flows
Flow A: EspoCRM Update → Advoware (Webhook-getrieben)
Trigger: EspoCRM sendet Webhook bei Create/Update/Delete
1. EspoCRM: User ändert CBeteiligte Entity
└─> Webhook: POST /vmh/webhook/beteiligte/update
Body: [{"id": "68e4af00172be7924", ...}]
2. beteiligte_update_api_step.py:
├─> Deduplizierung via Redis SET
├─> Neue IDs → Redis: vmh:beteiligte:update_pending
└─> Emit Event: "vmh.beteiligte.update"
3. beteiligte_sync_event_step.py:
├─> Receive Event mit entity_id
├─> Fetch full entity von EspoCRM API
│ GET /api/v1/CBeteiligte/{entity_id}
│
├─> Check PostgreSQL Sync-State:
│ SELECT * FROM beteiligte_sync WHERE espocrm_id = ?
│
├─> Falls NEU (nicht in DB):
│ ├─> Check syncStatus in EspoCRM:
│ │ - "pending_sync" → Create in Advoware
│ │ - "clean" → Skip (bereits gesynct von anderem Flow)
│ │
│ ├─> Transform via Mapper:
│ │ espocrm_mapper.map_cbeteiligte_to_advoware(entity)
│ │
│ ├─> POST /api/v1/advonet/Beteiligte
│ │ → Response: {betNr: 123456, ...}
│ │
│ ├─> Insert in PostgreSQL:
│ │ INSERT INTO beteiligte_sync (
│ │ espocrm_id, advoware_betnr,
│ │ source_system = 'espocrm',
│ │ espocrm_modified_at = entity.modifiedAt
│ │ )
│ │
│ └─> Update EspoCRM:
│ PUT /api/v1/CBeteiligte/{entity_id}
│ {
│ betnr: 123456,
│ syncStatus: "clean",
│ advowareLastSync: NOW()
│ }
│
└─> Falls EXISTIERT (in DB):
├─> Get Advoware timestamp:
│ Fetch /api/v1/advonet/Beteiligte/{betnr}
│ → advoware.geaendertAm
│
├─> Konfliktauflösung:
│ IF espocrm.modifiedAt > advoware.geaendertAm:
│ → Update Advoware (EspoCRM gewinnt)
│ PUT /api/v1/advonet/Beteiligte/{betnr}
│ ELSE IF advoware.geaendertAm > espocrm.modifiedAt:
│ → Update EspoCRM (Advoware gewinnt)
│ PUT /api/v1/CBeteiligte/{entity_id}
│ ELSE:
│ → Skip (gleich alt)
│
└─> Update PostgreSQL:
UPDATE beteiligte_sync SET
espocrm_modified_at = ...,
advoware_modified_at = ...,
last_sync = NOW(),
sync_status = 'synced'
4. Cleanup:
└─> Redis: SREM vmh:beteiligte:update_pending {entity_id}
Timing: ~2-5 Sekunden nach Änderung in EspoCRM
Flow B: Advoware Update → EspoCRM (Polling via Cron)
Trigger: Cron-Job alle 15 Minuten
1. beteiligte_sync_cron_step.py (Cron: */15 * * * *):
└─> Emit Event: "beteiligte.sync_all"
2. beteiligte_sync_all_event_step.py:
├─> Fetch alle Beteiligte aus PostgreSQL Sync-DB:
│ SELECT * FROM beteiligte_sync WHERE NOT deleted
│
├─> Fetch alle Beteiligte aus Advoware:
│ GET /api/v1/advonet/Beteiligte
│ (Optional mit Filter: nur geändert seit last_sync - 1 Tag)
│
├─> Build Maps:
│ db_map = {betnr: row}
│ advo_map = {betNr: entity}
│
├─> PHASE 1: Neue in Advoware (nicht in DB):
│ FOR betnr IN advo_map:
│ IF betnr NOT IN db_map:
│ ├─> Transform: map_advoware_to_cbeteiligte(advo_entity)
│ │
│ ├─> Check if exists in EspoCRM:
│ │ Search by name/email (Fuzzy Match)
│ │
│ ├─> IF NOT EXISTS:
│ │ ├─> POST /api/v1/CBeteiligte
│ │ │ {
│ │ │ ...fields...,
│ │ │ betnr: {betnr},
│ │ │ syncStatus: "clean",
│ │ │ advowareLastSync: NOW()
│ │ │ }
│ │ │
│ │ └─> INSERT INTO beteiligte_sync (
│ │ espocrm_id = new_id,
│ │ advoware_betnr = betnr,
│ │ source_system = 'advoware'
│ │ )
│ │
│ └─> ELSE (Match gefunden):
│ └─> UPDATE beteiligte_sync SET
│ advoware_betnr = betnr,
│ source_system = 'merged'
│
├─> PHASE 2: Updates (beide vorhanden):
│ FOR row IN db_map:
│ IF row.advoware_betnr IN advo_map:
│ advo_entity = advo_map[row.advoware_betnr]
│ espo_entity = fetch_from_espocrm(row.espocrm_id)
│
│ ├─> Get Timestamps:
│ │ advo_ts = advo_entity.geaendertAm
│ │ espo_ts = espo_entity.modifiedAt
│ │ last_sync_ts = row.last_sync
│ │
│ ├─> Konfliktauflösung:
│ │ IF advo_ts > espo_ts AND advo_ts > last_sync_ts:
│ │ → Advoware ist neuer
│ │ PUT /api/v1/CBeteiligte/{espocrm_id}
│ │ (Update EspoCRM mit Advoware-Daten)
│ │
│ │ ELSE IF espo_ts > advo_ts AND espo_ts > last_sync_ts:
│ │ → EspoCRM ist neuer
│ │ (Wurde bereits in Flow A behandelt, skip)
│ │
│ │ ELSE IF advo_ts == espo_ts:
│ │ → Keine Änderung, skip
│ │
│ │ ELSE IF advo_ts > last_sync_ts AND espo_ts > last_sync_ts:
│ │ → KONFLIKT: Beide seit last_sync geändert
│ │ ├─> Strategy: "advoware_wins" (konfigurierbar)
│ │ ├─> UPDATE mit Winner-Daten
│ │ ├─> Log Conflict
│ │ └─> SET sync_status = 'conflict_resolved'
│ │
│ └─> UPDATE beteiligte_sync SET
│ espocrm_modified_at = espo_ts,
│ advoware_modified_at = advo_ts,
│ last_sync = NOW(),
│ sync_status = 'synced'
│
└─> PHASE 3: Deletes (in DB, nicht in Advoware):
FOR row IN db_map:
IF row.advoware_betnr NOT IN advo_map:
├─> Check if exists in EspoCRM:
│ GET /api/v1/CBeteiligte/{espocrm_id}
│
├─> IF EXISTS:
│ ├─> Soft-Delete in EspoCRM:
│ │ PUT /api/v1/CBeteiligte/{espocrm_id}
│ │ {deleted: true, syncStatus: "deleted"}
│ │
│ └─> UPDATE beteiligte_sync SET
│ deleted = TRUE,
│ sync_status = 'synced'
│
└─> ELSE (auch in EspoCRM nicht da):
└─> UPDATE beteiligte_sync SET
deleted = TRUE
Timing: Alle 15 Minuten (konfigurierbar)
🎛️ Status-Feld in EspoCRM
Feld: syncStatus (String, Custom Field)
Werte:
"pending_sync"→ Neu erstellt, wartet auf Sync nach Advoware"clean"→ Synchronisiert, keine Änderungen"dirty"→ Geändert seit letztem Sync, wartet auf Sync"syncing"→ Sync läuft gerade"failed"→ Sync fehlgeschlagen (+ Fehlerlog)"conflict"→ Konflikt detektiert"deleted"→ In Advoware gelöscht
Zusatzfeld: advowareLastSync (DateTime)
Alternative: PostgreSQL als Single Source of Truth
Statt syncStatus in EspoCRM:
- Alle Status in PostgreSQL
beteiligte_sync.sync_status - EspoCRM hat nur
betnrundadvowareLastSync - Vorteil: Keine Schema-Änderung in EspoCRM nötig
- Nachteil: Status nicht direkt in EspoCRM UI sichtbar
Empfehlung: Beides nutzen
- PostgreSQL: Master-Status für Sync-Logic
- EspoCRM: Read-only Display für User
⚙️ Cron-Job Konfiguration
Option 1: Separate Cron-Steps (empfohlen)
# beteiligte_sync_cron_step.py
config = {
'type': 'cron',
'name': 'Beteiligte Sync Cron',
'cron': '*/15 * * * *', # Alle 15 Minuten
'emits': ['beteiligte.sync_all'],
'flows': ['vmh']
}
Option 2: Integriert in bestehenden Cron
# Kombiniert mit anderen Sync-Tasks
config = {
'type': 'cron',
'name': 'All Syncs Cron',
'cron': '*/5 * * * *', # Alle 5 Minuten
'emits': ['calendar_sync_all', 'beteiligte.sync_all'],
'flows': ['advoware', 'vmh']
}
Timing-Überlegungen:
Frequenz:
- 15 Minuten: Guter Kompromiss (wie bestehende Calendar Sync)
- 5 Minuten: Wenn schnellere Reaktion auf Advoware-Änderungen nötig
- 1 Stunde: Wenn Last auf APIs minimiert werden soll
Offset:
- Calendar Sync: 0 Minuten
- Beteiligte Sync: +5 Minuten
- → Verhindert API-Überlast durch gleichzeitige Requests
🔐 Sicherheit & Fehlerbehandlung
1. Rate Limiting
Advoware API (aus Calendar Sync):
# Bereits implementiert in AdvowareAPI Service
# - Token-basiertes Rate Limiting via Redis
# - Backoff-Strategie bei 429 Errors
EspoCRM API:
# Zu implementieren in EspoCRMAPI Service
ESPOCRM_RATE_LIMIT_KEY = 'espocrm_api_tokens'
MAX_TOKENS = 100 # Basierend auf API-Limits
REFILL_RATE = 100 / 60 # Tokens pro Sekunde
2. Lock-Mechanismus (Verhindert Race Conditions)
# Redis Lock für Entity während Sync
lock_key = f'vmh:beteiligte:sync_lock:{entity_id}'
if redis.set(lock_key, '1', nx=True, ex=300): # 5 Min TTL
try:
await perform_sync(entity_id)
finally:
redis.delete(lock_key)
else:
logger.warning(f"Entity {entity_id} ist bereits im Sync-Prozess")
3. Retry-Logic mit Exponential Backoff
import backoff
@backoff.on_exception(
backoff.expo,
(AdvowareAPIError, EspoCRMAPIError),
max_tries=3,
max_time=60
)
async def sync_entity_with_retry(entity_id):
# ... Sync-Logic
pass
4. Fehler-Logging & Monitoring
# PostgreSQL Error-Log Tabelle
CREATE TABLE beteiligte_sync_errors (
error_id SERIAL PRIMARY KEY,
sync_id INTEGER REFERENCES beteiligte_sync(sync_id),
error_type VARCHAR(50),
error_message TEXT,
error_stack TEXT,
retry_count INTEGER DEFAULT 0,
resolved BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
5. Write-Protection Flag
Global (Config):
# config.py
ADVOWARE_WRITE_PROTECTION = os.getenv('ADVOWARE_WRITE_PROTECTION', 'false').lower() == 'true'
Per-Entity (DB):
ALTER TABLE beteiligte_sync ADD COLUMN advoware_write_allowed BOOLEAN DEFAULT TRUE;
📦 Zu implementierende Module
Priorität 1 (Core Sync):
-
services/espocrm_mapper.py ⭐⭐⭐
map_cbeteiligte_to_advoware(espo_entity) -> advo_datamap_advoware_to_cbeteiligte(advo_entity) -> espo_data- Feld-Mapping gemäß
ENTITY_MAPPING_CBeteiligte_Advoware.md
-
steps/vmh/beteiligte_sync_event_step.py ⭐⭐⭐
- Implementiert Flow A (Webhook → Advoware)
- Subscribe to create/update/delete Events
- PostgreSQL Integration
- Konfliktauflösung
-
PostgreSQL Migration ⭐⭐⭐
migrations/001_create_beteiligte_sync_table.sql- Connection in Config
-
steps/vmh/beteiligte_sync_cron_step.py ⭐⭐
- Emittiert Sync-All Event alle 15 Min
-
steps/vmh/beteiligte_sync_all_event_step.py ⭐⭐
- Implementiert Flow B (Advoware Polling)
- 3-Phasen-Sync-Modell
Priorität 2 (Optimierungen):
-
services/beteiligte_sync_utils.py ⭐
- Shared Utilities
- Lock-Management
- Timestamp-Handling
- Conflict-Resolution Logic
-
Testing ⭐
- Unit Tests für Mapper
- Integration Tests für Sync-Flows
- Konflikt-Szenarien
Priorität 3 (Monitoring):
-
steps/vmh/audit_beteiligte_sync.py
- Analog zu
audit_calendar_sync.py - CLI-Tool für Sync-Status
- Analog zu
-
Dashboard/Metrics
- Prometheus Metrics
- Grafana Dashboard
🚀 Rollout-Plan
Phase 1: Setup (Tag 1-2)
- PostgreSQL Datenbank-Schema erstellen
- Config erweitern (DB-Connection)
- Mapper-Modul erstellen
- Unit Tests für Mapper
Phase 2: Webhook-Flow (Tag 3-4)
beteiligte_sync_event_step.pyimplementieren- Integration mit bestehenden Webhook-Steps
- Testing mit EspoCRM Sandbox
Phase 3: Polling-Flow (Tag 5-6)
- Cron-Step erstellen
beteiligte_sync_all_event_step.pyimplementieren- 3-Phasen-Modell
- Integration Tests
Phase 4: Konfliktauflösung (Tag 7)
- Timestamp-Vergleich
- Konflikt-Strategies
- Error-Handling
- Retry-Logic
Phase 5: Monitoring & Docs (Tag 8)
- Audit-Tool
- Logging
- Dokumentation
- Runbook
Phase 6: Production (Tag 9-10)
- Staging-Tests
- Performance-Tests
- Production-Rollout
- Monitoring
⚠️ Wichtige Entscheidungen
1. Conflict Resolution Strategy
Option A: "Source System Wins" (empfohlen für Start):
sync_strategy = 'source_system_wins'
# EspoCRM-created Entities → EspoCRM gewinnt bei Konflikt
# Advoware-created Entities → Advoware gewinnt bei Konflikt
Option B: "Advoware Always Wins":
sync_strategy = 'advoware_master'
# Advoware ist Master, EspoCRM ist read-only View
Option C: "Last Modified Wins":
sync_strategy = 'last_modified_wins'
# Timestamp-Vergleich, neuester gewinnt
Empfehlung: Start mit "Source System Wins", später auf "Last Modified Wins" mit Manual Conflict Review.
2. Advoware Polling-Frequenz
Überlegungen:
- API-Last auf Advoware
- Aktualitäts-Anforderungen
- Anzahl Beteiligte (~1000? ~10.000?)
Optimierung:
- Incremental Fetch: Nur geändert seit
last_sync - 1 Tag - Delta-Detection via
geaendertAmTimestamp - Pagination bei großen Datenmengen
3. EspoCRM Field: syncStatus
Zu klären:
- Bestehendes Custom Field oder neu anlegen?
- Dropdown-Werte konfigurieren
- Permissions (nur Sync-System kann schreiben?)
📝 Zusammenfassung
Was funktioniert:
✅ EspoCRM Webhooks → Motia Events ✅ Webhook-Deduplication via Redis
✅ EspoCRM API Client ✅ Advoware API Client
✅ Entity-Mapping definiert
Was zu implementieren ist:
🔨 Mapper-Modul
🔨 PostgreSQL Sync-State DB
🔨 Event-Handler für Webhooks
🔨 Cron-Job für Polling
🔨 3-Phasen-Sync für Advoware → EspoCRM
🔨 Konfliktauflösung
🔨 Error-Handling & Monitoring
Geschätzter Aufwand:
- Setup & Core: 3-4 Tage
- Flows: 2-3 Tage
- Konfliktauflösung: 1-2 Tage
- Testing & Docs: 1-2 Tage
- Rollout: 1-2 Tage
- Total: ~8-13 Tage
Nächster Schritt:
- Entscheidung zu Status-Feld in EspoCRM
- PostgreSQL DB-Schema aufsetzen
- Mapper-Modul implementieren
- Webhook-Flow komplettieren
Fragen zur Klärung:
- Existiert
syncStatusField bereits in EspoCRM CBeteiligte? - Wie viele Beteiligte gibt es ca. in Advoware?
- Gibt es Performance-Anforderungen? (z.B. Sync innerhalb X Sekunden)
- Soll es manuelle Conflict-Resolution geben oder automatisch?
- PostgreSQL Server bereits vorhanden? (Wie Calendar Sync)