# Sync-Strategie: EspoCRM-basiert (ohne PostgreSQL) **Analysiert am**: 2026-02-07 **Anpassung**: EspoCRM als primäre State-Datenbank --- ## 🎯 EspoCRM Felder (CBeteiligte Entity) ```json { "betnr": 1234, // Link zu Advoware betNr (int, unique) "syncStatus": "clean", // Sync-Status (enum) "advowareLastSync": null, // Letzter Sync (datetime oder null) "advowareDeletedAt": null, // Gelöscht in Advoware am (datetime, NEU) "syncErrorMessage": null, // Fehlerdetails (text, NEU) "syncRetryCount": 0, // Anzahl Retry-Versuche (int, NEU) "modifiedAt": "2026-01-23 21:58:41" // EspoCRM Änderungszeit } ``` ### syncStatus-Werte (Enum in EspoCRM): - `"pending_sync"` - Neu erstellt, noch nicht nach Advoware gesynct - `"clean"` - Synchronisiert, keine ausstehenden Änderungen - `"dirty"` - In EspoCRM geändert, wartet auf Sync nach Advoware - `"syncing"` - Sync läuft gerade (verhindert Race Conditions) - `"failed"` - Sync fehlgeschlagen (mit syncErrorMessage + syncRetryCount) - `"conflict"` - Konflikt erkannt → **EspoCRM WINS** (mit Notification) - `"deleted_in_advoware"` - In Advoware gelöscht (Soft-Delete Flag mit Notification) --- ## 🔄 Flow A: EspoCRM Update → Advoware (Webhook) **Trigger**: EspoCRM Webhook bei Create/Update ``` 1. EspoCRM: User ändert CBeteiligte └─> Webhook: POST /vmh/webhook/beteiligte/update Body: [{"id": "68e4af00172be7924"}] 2. beteiligte_update_api_step.py: ├─> Redis Deduplication └─> Emit Event: "vmh.beteiligte.update" 3. beteiligte_sync_event_step.py: ├─> Fetch Entity von EspoCRM: │ GET /api/v1/CBeteiligte/{id} │ { │ "id": "...", │ "firstName": "Angela", │ "lastName": "Mustermann", │ "betnr": 104860, // Bereits vorhanden │ "syncStatus": "clean", │ "advowareLastSync": "2026-02-01T10:00:00", │ "modifiedAt": "2026-02-07T14:30:00" │ } │ ├─> Check syncStatus: │ ├─> IF syncStatus == "syncing": │ │ → Skip (bereits im Sync-Prozess) │ │ │ ├─> IF syncStatus == "pending_sync" AND betnr == NULL: │ │ → NEU: Create in Advoware │ │ ├─> Set syncStatus = "syncing" │ │ ├─> Transform via Mapper │ │ ├─> POST /api/v1/advonet/Beteiligte │ │ │ Response: {betNr: 123456} │ │ └─> Update EspoCRM: │ │ PUT /api/v1/CBeteiligte/{id} │ │ { │ │ betnr: 123456, │ │ syncStatus: "clean", │ │ advowareLastSync: NOW() │ │ } │ │ │ └─> IF betnr != NULL (bereits gesynct): │ → UPDATE: Vergleiche Timestamps │ ├─> Fetch von Advoware: │ │ GET /api/v1/advonet/Beteiligte/{betnr} │ │ {betNr: 104860, geaendertAm: "2026-02-07T12:00:00"} │ │ │ ├─> Vergleiche Timestamps: │ │ espocrm_ts = entity.modifiedAt │ │ advoware_ts = advo_entity.geaendertAm │ │ last_sync_ts = entity.advowareLastSync │ │ │ │ IF espocrm_ts > last_sync_ts AND espocrm_ts > advoware_ts: │ │ → EspoCRM ist neuer → Update Advoware │ │ ├─> Set syncStatus = "syncing" │ │ ├─> PUT /api/v1/advonet/Beteiligte/{betnr} │ │ └─> Update EspoCRM: │ │ syncStatus = "clean" │ │ advowareLastSync = NOW() │ │ syncErrorMessage = NULL │ │ syncRetryCount = 0 │ │ │ │ ELSE IF advoware_ts > last_sync_ts AND advoware_ts > espocrm_ts: │ │ → Advoware ist neuer → Update EspoCRM │ │ ├─> Set syncStatus = "syncing" │ │ ├─> Transform von Advoware │ │ └─> Update EspoCRM mit Advoware-Daten │ │ syncStatus = "clean" │ │ advowareLastSync = NOW() │ │ syncErrorMessage = NULL │ │ syncRetryCount = 0 │ │ │ │ ELSE IF espocrm_ts > last_sync_ts AND advoware_ts > last_sync_ts: │ │ → KONFLIKT: Beide geändert seit last_sync │ │ │ │ **REGEL: EspoCRM WINS!** │ │ │ │ ├─> Set syncStatus = "conflict" │ │ ├─> Überschreibe Advoware mit EspoCRM-Daten: │ │ │ PUT /api/v1/advonet/Beteiligte/{betnr} │ │ │ │ │ ├─> Update EspoCRM: │ │ │ syncStatus = "clean" (gelöst!) │ │ │ advowareLastSync = NOW() │ │ │ syncErrorMessage = "Konflikt am {NOW}: EspoCRM={espocrm_ts}, Advoware={advoware_ts}. EspoCRM hat gewonnen." │ │ │ │ │ └─> Send Notification: │ │ Template: "beteiligte_sync_conflict" │ │ To: Admin-User oder zugewiesener User │ │ │ │ ELSE: │ │ → Keine Änderungen seit last_sync │ │ └─> Skip │ │ │ └─> Bei Fehler: │ syncStatus = "failed" │ syncErrorMessage = Error-Details (inkl. Stack Trace) │ syncRetryCount += 1 │ Log Error │ └─> Handle 404 von Advoware (gelöscht): IF advoware.api_call returns 404: ├─> Update EspoCRM: │ syncStatus = "deleted_in_advoware" │ advowareDeletedAt = NOW() │ syncErrorMessage = "Beteiligter existiert nicht mehr in Advoware" │ └─> Send Notification: Template: "beteiligte_advoware_deleted" To: Admin-User oder zugewiesener User ``` **Timing**: ~2-5 Sekunden nach Webhook oder Cron-Event --- ## 🔄 Flow B: Advoware → EspoCRM (Cron-basiert mit Events) **Trigger**: Cron alle 15 Minuten ``` 1. beteiligte_sync_cron_step.py (*/15 * * * *): ├─> Query EspoCRM: Alle Entities die Sync benötigen │ │ SELECT * FROM CBeteiligte WHERE: │ - syncStatus IN ('pending_sync', 'dirty', 'failed') │ - OR (syncStatus = 'clean' AND betnr IS NOT NULL │ AND advowareLastSync < NOW() - 24 HOURS) │ ├─> Für JEDEN Beteiligten einzeln: │ └─> Emit Event: "vmh.beteiligte.sync_check" │ payload: { │ entity_id: "68e4af00172be7924", │ source: "cron", │ timestamp: "2026-02-07T14:30:00Z" │ } │ └─> Log: "Emitted {count} sync_check events" 2. beteiligte_sync_event_step.py (GLEICHER Handler wie Webhook!): └─> Subscribe zu: "vmh.beteiligte.sync_check" (Dieser Event kommt von Cron oder manuellen Triggers) ├─> Fetch entity_id aus Event-Payload │ └─> Führe GLEICHE Logik aus wie bei Webhook (siehe Flow A oben!) - Lock via syncStatus - Timestamp-Vergleich - Create/Update - Konfliktauflösung (EspoCRM wins) - 404 Handling (deleted_in_advoware) - Update syncStatus + Felder **WICHTIG**: Flow B nutzt Events statt Batch-Processing! - Cron emittiert nur Events für zu syncende Entities - Der normale Sync-Handler (Flow A) verarbeitet beide Quellen gleich - Code-Wiederverwendung: KEIN separater Batch-Handler nötig! ``` **Timing**: - Cron läuft alle 15 Minuten - Events werden sofort verarbeitet (wie Webhooks) --- ## 📊 Optimierung: Nur veraltete checken ### Cron-Query für zu prüfende Entities: ```javascript // In beteiligte_sync_all_event_step.py // 1. Holen von Entities die Sync benötigen const needsSyncFilter = { where: [ { type: 'or', value: [ // Neu und noch nicht gesynct { type: 'and', value: [ {type: 'equals', attribute: 'syncStatus', value: 'pending_sync'}, {type: 'isNull', attribute: 'betnr'} ] }, // Dirty (geändert in EspoCRM) {type: 'equals', attribute: 'syncStatus', value: 'dirty'}, // Failed (Retry) {type: 'equals', attribute: 'syncStatus', value: 'failed'}, // Clean aber lange nicht gesynct (> 24h) { type: 'and', value: [ {type: 'equals', attribute: 'syncStatus', value: 'clean'}, {type: 'isNotNull', attribute: 'betnr'}, { type: 'or', value: [ {type: 'isNull', attribute: 'advowareLastSync'}, {type: 'before', attribute: 'advowareLastSync', value: 'NOW() - 24 HOURS'} ] } ] } ] } ] }; ``` ### Advoware Query-Optimierung: ```python # Nur kürzlich geänderte aus Advoware holen last_full_sync = get_last_full_sync_timestamp() # z.B. vor 7 Tagen if last_full_sync: # Incremental Fetch params = { 'filter': f'geaendertAm gt {last_full_sync.isoformat()}', 'orderBy': 'geaendertAm desc' } else: # Full Fetch (beim ersten Mal oder nach langer Zeit) params = {} result = await advoware.api_call( 'api/v1/advonet/Beteiligte', method='GET', params=params ) ``` --- ## 🔐 Locking via syncStatus **Verhindert Race Conditions ohne Redis Lock**: ```python # Vor Sync-Operation: async def acquire_sync_lock(espocrm_api, entity_id): """ Setzt syncStatus auf "syncing" wenn möglich. Returns: True wenn Lock erhalten, False sonst """ try: # Fetch current entity = await espocrm_api.get_entity('CBeteiligte', entity_id) if entity.get('syncStatus') == 'syncing': # Bereits im Sync-Prozess return False # Atomic Update (EspoCRM sollte Optimistic Locking unterstützen) await espocrm_api.update_entity('CBeteiligte', entity_id, { 'syncStatus': 'syncing' }) return True except Exception as e: logger.error(f"Failed to acquire sync lock: {e}") return False # Nach Sync-Operation (im finally-Block): async def release_sync_lock(espocrm_api, entity_id, new_status='clean'): """Setzt syncStatus zurück""" try: await espocrm_api.update_entity('CBeteiligte', entity_id, { 'syncStatus': new_status, 'advowareLastSync': datetime.now(pytz.UTC).isoformat() }) except Exception as e: logger.error(f"Failed to release sync lock: {e}") ``` --- ## 📋 Status-Übergänge ``` pending_sync → syncing → clean (erfolgreicher Create) pending_sync → syncing → failed (fehlgeschlagener Create) clean → dirty → syncing → clean (Update nach Änderung) clean → dirty → syncing → conflict (Konflikt detektiert) clean → dirty → syncing → failed (Update fehlgeschlagen) failed → syncing → clean (erfolgreicher Retry) failed → syncing → failed (erneuter Fehler) conflict → syncing → clean (manuell aufgelöst) clean → deleted_in_advoware (in Advoware gelöscht) ``` --- ## 🎯 Implementierungs-Checkliste ### Phase 1: Core Sync (Flow A - Webhook + Cron Events) - [ ] **services/espocrm_mapper.py** - [ ] `map_cbeteiligte_to_advoware(espo_entity)` - [ ] `map_advoware_to_cbeteiligte(advo_entity)` - [ ] **steps/vmh/beteiligte_sync_event_step.py** (ZENTRALER Handler!) - [ ] Subscribe zu: `vmh.beteiligte.create`, `vmh.beteiligte.update`, `vmh.beteiligte.delete`, `vmh.beteiligte.sync_check` - [ ] Fetch Entity von EspoCRM - [ ] Lock via syncStatus="syncing" - [ ] Timestamp-Vergleich - [ ] Create/Update in Advoware - [ ] **Konfliktauflösung: EspoCRM wins!** - [ ] **404 Handling: Soft-Delete (deleted_in_advoware)** - [ ] **Notifications: Bei Konflikt + Soft-Delete** - [ ] Update syncStatus + advowareLastSync + syncErrorMessage + syncRetryCount - [ ] Error Handling (→ syncStatus="failed" mit Retry-Counter) - [ ] Redis Cleanup (SREM pending sets) ### Phase 2: Cron Event Emitter (Flow B) - [ ] **steps/vmh/beteiligte_sync_cron_step.py** - [ ] Cron: `*/15 * * * *` - [ ] Query EspoCRM: Entities mit Status `IN (pending_sync, dirty, failed)` ODER `clean + advowareLastSync < NOW() - 24h` - [ ] Für JEDEN Beteiligten: Emit `vmh.beteiligte.sync_check` Event - [ ] Log: Anzahl emittierter Events - [ ] **KEIN** Batch-Processing - Events werden einzeln vom Handler verarbeitet! ### Phase 3: Utilities - [ ] **services/betei & Notifications - [ ] **services/beteiligte_sync_utils.py** - [ ] `acquire_sync_lock(entity_id)` → Setzt syncStatus="syncing" - [ ] `release_sync_lock(entity_id, new_status)` → Setzt syncStatus + Updates - [ ] `compare_timestamps(espo_ts, advo_ts, last_sync)` → Returns: "espocrm_newer", "advoware_newer", "conflict", "no_change" - [ ] `resolve_conflict_espocrm_wins(espo_entity, advo_entity)` → Überschreibt Advoware - [ ] `send_notification(entity_id, template_name, extra_data=None)` → EspoCRM Notification - [ ] `handle_advoware_deleted(entity_id, error_msg)` → Soft-Delete + Notification - [ ] Unit Tests für Mapper - [ ] Integration Tests für beide Flows - [ ] Konflikt-Szenarien testen - [ ] Load-Tests (Performance mit 1000+ Entities) - [ ] CLI Audit-Tool (analog zu calendar_sync audit) → clean (Konflikt → EspoCRM wins → gelöst!) clean → dirty → syncing → failed (Update fehlgeschlagen) dirty → syncing → deleted_in_advoware (404 von Advoware → Soft-Delete) failed → syncing → clean (erfolgreicher Retry) failed → syncing → failed (erneuter Fehler, syncRetryCount++) conflict → clean (automatisch via EspoCRM wins) clean → deleted_in_advoware (Advoware hat gelöscht) deleted_in_advoware → clean (Re-create in Advoware via Manual-Trigger GET /api/v1/CBeteiligte?select=syncStatus&maxSize=1000 → Gruppiere und zähle // Entities die Sync benötigen GET /api/v1/CBeteiligte?where=[ {type: 'in', attribute: 'syncStatus', value: ['pending_sync', 'dirty', 'failed']} ] // Lange nicht gesynct (> 7 Tage) GET /api/v1/CBeteiligte?where=[ {type: 'before', attribute: 'advowareLastSync', value: 'NOW() - 7 DAYS'} ] // Konflikte GET /api/v1/CBeteiligte?where=[ {type: 'equals', attribute: 'syncStatus', value: 'conflict'} ] ``` --- ## 📈 Performance-Überlegungen ### Batch-Größen: ```python # Cron-Job Configuration CRON_BATCH_SIZE = 50 # Max 50 Entities pro Cron-Run CRON_TIMEOUT = 300 # 5 Minuten Timeout # Advoware Fetch ADVOWARE_PAGE_SIZE = 100 # Entities pro API-Request ``` ### Timing: - **Webhook (Flow A)**: 2-5 Sekunden (near real-time) - **Cron (Flow B)**: 15 Minuten Intervall - **Veraltete Check**: 24 Stunden (täglich syncen) - **Full Sync**: 7 Tage (wöchentlich alle prüfen) ### Rate Limiting: ```python # Aus bestehender AdvowareAPI # - Bereits implementiert # - Token-based Rate Limiting via Redis # Für EspoCRM hinzufügen: ESPOCRM_MAX_REQUESTS_PER_MINUTE = 100 ``` --- ## 🎯 Vorteile dieser Architektur ✅ **Kein PostgreSQL nötig** - EspoCRM ist State-Datenbank ✅ **Alle Daten in EspoCRM** - Single Source of Truth ✅ **Status sichtbar** - User können syncStatus in UI sehen ✅ **Optimiert** - Nur veraltete werden geprüft ✅ **Robust** - Locking via syncStatus verhindert Race Conditions ✅ **Konflikt-Tracking** - Konflikte werden explizit markiert ✅ **Wiederverwendbar** - Lock-Pattern nutzbar für andere Syncs --- ## 🔧 Nächste Schritte 1. **Mapper implementieren** (services/espocrm_mapper.py) 2. **Webhook-Handler komplettieren** (Flow A) 3. **Cron + Polling implementieren** (Flow B) 4. **Testing mit echten Daten** 5. **Monitoring & Dashboard** **Geschätzte Zeit**: 5-7 Tage --- Entscheidungen (vom User bestätigt)**: 1. ✅ syncStatus als Enum in EspoCRM mit definierten Werten 2. ✅ Soft-Delete: Nur Flag (deleted_in_advoware + advowareDeletedAt) 3. ✅ Automatisch: **EspoCRM WINS** bei Konflikten 4. ✅ Notifications: Ja, bei Konflikten + Soft-Deletes (EspoCRM Notifications) **Architektur-Entscheidung**: - ✅ Cron emittiert Events (`vmh.beteiligte.sync_check`), statt Batch-Processing - ✅ Ein zentraler Sync-Handler für Webhooks UND Cron-Events - ✅ Code-Wiederverwendung maximiertdvoware wins"? 4. Benachrichtigung bei Konflikten? (Email, Webhook, ...)