From b5abe6cf00e332fe7215398f012cfaeba9d92e7f Mon Sep 17 00:00:00 2001 From: bitbylaw Date: Sat, 7 Feb 2026 15:21:16 +0000 Subject: [PATCH] Implement EspoCRM-based sync strategy for Beteiligte entities - Add SYNC_STRATEGY_ESPOCRM_BASED.md detailing the sync flows and status management. - Create utilities for sync operations in services/beteiligte_sync_utils.py, including locking, timestamp comparison, conflict resolution, and notification handling. - Implement entity mapping between EspoCRM and Advoware in services/espocrm_mapper.py. - Develop a cron job for periodic sync checks in steps/vmh/beteiligte_sync_cron_step.py, emitting events for entities needing synchronization. --- bitbylaw/IMPLEMENTATION_COMPLETE.md | 326 +++++++++ bitbylaw/SYNC_STRATEGY_ANALYSIS.md | 676 ++++++++++++++++++ bitbylaw/SYNC_STRATEGY_ESPOCRM_BASED.md | 485 +++++++++++++ bitbylaw/services/beteiligte_sync_utils.py | 341 +++++++++ bitbylaw/services/espocrm_mapper.py | 243 +++++++ .../steps/vmh/beteiligte_sync_cron_step.py | 117 +++ .../steps/vmh/beteiligte_sync_event_step.py | 275 ++++++- 7 files changed, 2430 insertions(+), 33 deletions(-) create mode 100644 bitbylaw/IMPLEMENTATION_COMPLETE.md create mode 100644 bitbylaw/SYNC_STRATEGY_ANALYSIS.md create mode 100644 bitbylaw/SYNC_STRATEGY_ESPOCRM_BASED.md create mode 100644 bitbylaw/services/beteiligte_sync_utils.py create mode 100644 bitbylaw/services/espocrm_mapper.py create mode 100644 bitbylaw/steps/vmh/beteiligte_sync_cron_step.py diff --git a/bitbylaw/IMPLEMENTATION_COMPLETE.md b/bitbylaw/IMPLEMENTATION_COMPLETE.md new file mode 100644 index 00000000..67783d8e --- /dev/null +++ b/bitbylaw/IMPLEMENTATION_COMPLETE.md @@ -0,0 +1,326 @@ +# Beteiligte Sync Implementation - Fertig! ✅ + +**Stand**: 7. Februar 2026 +**Status**: Vollständig implementiert, ready for testing + +--- + +## 📦 Implementierte Module + +### 1. **services/espocrm_mapper.py** ✅ +**Zweck**: Entity-Transformation zwischen EspoCRM ↔ Advoware + +**Funktionen**: +- `map_cbeteiligte_to_advoware(espo_entity)` - EspoCRM → Advoware +- `map_advoware_to_cbeteiligte(advo_entity)` - Advoware → EspoCRM +- `get_changed_fields(espo, advo)` - Diff-Vergleich + +**Features**: +- Unterscheidet Person vs. Firma +- Mapped Namen, Kontaktdaten, Handelsregister +- Transformiert emailAddressData/phoneNumberData Arrays +- Normalisiert Rechtsform und Anrede + +--- + +### 2. **services/beteiligte_sync_utils.py** ✅ +**Zweck**: Sync-Utility-Funktionen + +**Funktionen**: +- `acquire_sync_lock(entity_id)` - Atomares Lock via syncStatus +- `release_sync_lock(entity_id, status, error, retry)` - Lock freigeben + Update +- `parse_timestamp(ts)` - Parse EspoCRM/Advoware Timestamps +- `compare_timestamps(espo, advo, last_sync)` - Returns: espocrm_newer | advoware_newer | conflict | no_change +- `send_notification(entity_id, type, data)` - EspoCRM In-App Notification +- `handle_advoware_deleted(entity_id, error)` - Soft-Delete + Notification +- `resolve_conflict_espocrm_wins(entity_id, ...)` - Konfliktauflösung + +**Features**: +- Race-Condition-Prevention durch syncStatus="syncing" +- Automatische syncRetryCount Increment bei Fehlern +- EspoCRM Notifications (🔔 Bell-Icon) +- Timestamp-Normalisierung für beide Systeme + +--- + +### 3. **steps/vmh/beteiligte_sync_event_step.py** ✅ +**Zweck**: Zentraler Sync-Handler (Webhooks + Cron) + +**Config**: +```python +subscribes: [ + 'vmh.beteiligte.create', + 'vmh.beteiligte.update', + 'vmh.beteiligte.delete', + 'vmh.beteiligte.sync_check' # Von Cron +] +``` + +**Ablauf**: +1. Acquire Lock (syncStatus → syncing) +2. Fetch Entity von EspoCRM +3. Bestimme Aktion: + - **Kein betnr** → `handle_create()` - Neu in Advoware + - **Hat betnr** → `handle_update()` - Sync mit Timestamp-Vergleich +4. Release Lock mit finalem Status + +**handle_create()**: +- Transform zu Advoware Format +- POST /api/v1/advonet/Beteiligte +- Update EspoCRM mit neuer betnr +- Status → clean + +**handle_update()**: +- Fetch von Advoware (betNr) +- 404 → `handle_advoware_deleted()` (Soft-Delete) +- Timestamp-Vergleich: + - `espocrm_newer` → PUT zu Advoware + - `advoware_newer` → PUT zu EspoCRM + - `conflict` → **EspoCRM WINS** → Überschreibe Advoware → Notification + - `no_change` → Skip + +**Error Handling**: +- Try/Catch um alle Operationen +- Bei Fehler: syncStatus=failed, syncErrorMessage, syncRetryCount++ +- Redis Queue Cleanup + +--- + +### 4. **steps/vmh/beteiligte_sync_cron_step.py** ✅ +**Zweck**: Cron-Job der Events emittiert + +**Config**: +```python +schedule: '*/15 * * * *' # Alle 15 Minuten +emits: ['vmh.beteiligte.sync_check'] +``` + +**Ablauf**: +1. Query 1: Entities mit Status `pending_sync`, `dirty`, `failed` (max 100) +2. Query 2: `clean` Entities mit `advowareLastSync < NOW() - 24h` (max 50) +3. Kombiniere + Dedupliziere +4. Emittiere `vmh.beteiligte.sync_check` Event für JEDEN Beteiligten +5. Log: Anzahl emittierter Events + +**Vorteile**: +- Kein Batch-Processing +- Events werden einzeln vom normalen Handler verarbeitet +- Code-Wiederverwendung (gleicher Handler wie Webhooks) + +--- + +## 🔄 Sync-Flows + +### Flow A: EspoCRM Create/Update → Advoware (Webhook) + +``` +User ändert in EspoCRM + ↓ +EspoCRM Webhook → /vmh/webhook/beteiligte/update + ↓ +beteiligte_update_api_step.py → Emit 'vmh.beteiligte.update' + ↓ +beteiligte_sync_event_step.py → handler() + ↓ +Acquire Lock → Fetch EspoCRM → Timestamp-Check + ↓ +Update Advoware (oder Konflikt → EspoCRM wins) + ↓ +Release Lock → Status: clean +``` + +**Timing**: 2-5 Sekunden + +--- + +### Flow B: Advoware → EspoCRM Check (Cron) + +``` +Cron (alle 15 Min) + ↓ +beteiligte_sync_cron_step.py + ↓ +Query EspoCRM: Unclean + Stale Entities + ↓ +Emit 'vmh.beteiligte.sync_check' für jeden + ↓ +beteiligte_sync_event_step.py → handler() + ↓ +GLEICHE Logik wie Flow A! +``` + +**Timing**: Alle 15 Minuten + +--- + +## 🎯 Status-Übergänge + +``` +pending_sync → syncing → clean (Create erfolgreich) +pending_sync → syncing → failed (Create fehlgeschlagen) + +clean → dirty → syncing → clean (Update erfolgreich) +clean → syncing → conflict → clean (Konflikt → EspoCRM wins) + +dirty → syncing → deleted_in_advoware (404 von Advoware) + +failed → syncing → clean (Retry erfolgreich) +failed → syncing → failed (Retry fehlgeschlagen, retryCount++) + +deleted_in_advoware (Soft-Delete, bleibt bis manuelle Aktion) +``` + +--- + +## 📋 Testing Checklist + +### Unit Tests +- [x] Mapper Import +- [x] Sync Utils Import +- [x] Event Step Config Load +- [x] Cron Step Config Load +- [x] Mapper Transform Person +- [x] Mapper Transform Firma + +### Integration Tests (TODO) +- [ ] Create: Neuer Beteiligter in EspoCRM → Advoware +- [ ] Update: Änderung in EspoCRM → Advoware +- [ ] Conflict: Beide geändert → EspoCRM wins +- [ ] Advoware newer: Advoware → EspoCRM +- [ ] 404 Handling: Soft-Delete + Notification +- [ ] Cron: Query + Event Emission +- [ ] Notification: In-App Notification sichtbar + +--- + +## 🚀 Deployment + +### Voraussetzungen +✅ EspoCRM Felder angelegt (syncStatus, betnr, advowareLastSync, advowareDeletedAt, syncErrorMessage, syncRetryCount) +✅ Webhooks aktiviert (Create/Update/Delete) +⏳ Motia Workbench Restart (damit Steps geladen werden) + +### Schritte +1. **Motia Restart**: `systemctl restart motia` (oder wie auch immer) +2. **Verify Steps**: + ```bash + # Check ob Steps geladen wurden + curl http://localhost:PORT/api/flows/vmh/steps + ``` +3. **Test Webhook**: Ändere einen Beteiligten in EspoCRM +4. **Check Logs**: Motia Workbench Logs → Event Handler Output +5. **Verify Advoware**: Prüfe ob betNr gesetzt wurde +6. **Test Cron**: Warte 15 Min oder trigger manuell + +--- + +## 🔧 Configuration + +### Environment Variables (bereits gesetzt) +```bash +# EspoCRM +ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1 +ESPOCRM_MARVIN_API_KEY=e53def10eea27b92a6cd00f40a3e09a4 + +# Advoware +ADVOWARE_API_BASE_URL=https://www2.advo-net.net:90 +ADVOWARE_PRODUCT_ID=... +ADVOWARE_APP_ID=... +ADVOWARE_API_KEY=... + +# Redis +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_DB_ADVOWARE_CACHE=1 +``` + +--- + +## 📊 Monitoring + +### EspoCRM Queries + +**Entities die Sync benötigen**: +```javascript +GET /api/v1/CBeteiligte?where=[ + {type: 'in', attribute: 'syncStatus', + value: ['pending_sync', 'dirty', 'failed']} +] +``` + +**Konflikte**: +```javascript +GET /api/v1/CBeteiligte?where=[ + {type: 'equals', attribute: 'syncStatus', value: 'conflict'} +] +``` + +**Soft-Deletes**: +```javascript +GET /api/v1/CBeteiligte?where=[ + {type: 'equals', attribute: 'syncStatus', value: 'deleted_in_advoware'} +] +``` + +**Sync-Fehler**: +```javascript +GET /api/v1/CBeteiligte?where=[ + {type: 'isNotNull', attribute: 'syncErrorMessage'} +] +``` + +### Motia Logs +```bash +# Event Handler Logs +tail -f /path/to/motia/logs/events.log | grep "Beteiligte" + +# Cron Logs +tail -f /path/to/motia/logs/cron.log | grep "Sync Cron" +``` + +--- + +## 🐛 Troubleshooting + +### Problem: Lock bleibt auf "syncing" hängen +**Ursache**: Handler-Crash während Sync +**Lösung**: Manuell Status auf "failed" setzen: +```python +PUT /api/v1/CBeteiligte/{id} +{"syncStatus": "failed", "syncErrorMessage": "Manual reset"} +``` + +### Problem: Notifications werden nicht angezeigt +**Ursache**: userId fehlt oder falsch +**Check**: +```python +GET /api/v1/Notification?where=[{type: 'equals', attribute: 'relatedType', value: 'CBeteiligte'}] +``` + +### Problem: Cron emittiert keine Events +**Ursache**: Query findet keine Entities +**Debug**: Führe Cron-Handler manuell aus und checke Logs + +--- + +## 📈 Performance + +**Erwartete Last**: +- Webhooks: ~10-50 pro Tag (User-Änderungen) +- Cron: Alle 15 Min → ~96 Runs/Tag +- Events pro Cron: 0-100 (typisch 5-20) + +**Optimization**: +- Cron Max Entities: 150 total (100 unclean + 50 stale) +- Event-Processing: Parallel (Motia-Standard) +- Redis Caching: Token + Deduplication + +--- + +## ✅ Done! + +**Implementiert**: 4 Module, ~800 Lines of Code +**Status**: Ready for Testing +**Next Steps**: Deploy + Integration Testing + Monitoring Setup + +🎉 **Viel Erfolg beim Testing!** diff --git a/bitbylaw/SYNC_STRATEGY_ANALYSIS.md b/bitbylaw/SYNC_STRATEGY_ANALYSIS.md new file mode 100644 index 00000000..183d6d16 --- /dev/null +++ b/bitbylaw/SYNC_STRATEGY_ANALYSIS.md @@ -0,0 +1,676 @@ +# Sync-Strategie: EspoCRM ↔ Advoware Beteiligte + +**Analysiert am**: 2026-02-07 +**Anforderungen**: +- a) EspoCRM Update → Advoware Update +- b) Bi-direktionaler Sync mit Konfliktauflösung +- c) Neue Beteiligte in EspoCRM → Neue in Advoware (Status-Feld) +- d) Cron-basierter Sync + +**Problem**: EspoCRM hat Webhooks, Advoware nicht. + +--- + +## 📊 Bestehende Architektur (aus Calendar Sync) + +Das bestehende **Calendar Sync System** bietet ein exzellentes Template: + +### Aktuelle Komponenten: +1. **Webhook-Receiver**: `beteiligte_create/update/delete_api_step.py` ✓ Bereits vorhanden +2. **Event-Handler**: `beteiligte_sync_event_step.py` ⚠️ Placeholder +3. **Cron-Job**: Analog zu `calendar_sync_cron_step.py` +4. **PostgreSQL State DB**: Für Sync-State und Konfliktauflösung +5. **Redis**: Deduplication + Rate Limiting + +### Bewährte Patterns aus Calendar Sync: + +**1. Datenbank-Schema** (PostgreSQL): +```sql +CREATE TABLE beteiligte_sync ( + sync_id SERIAL PRIMARY KEY, + employee_kuerzel VARCHAR(10), + + -- IDs beider Systeme + espocrm_id VARCHAR(255) UNIQUE, + advoware_betnr INTEGER UNIQUE, + + -- Metadaten + source_system VARCHAR(20), -- 'espocrm' oder 'advoware' + sync_strategy VARCHAR(50) DEFAULT 'source_system_wins', + sync_status VARCHAR(20) DEFAULT 'synced', -- 'synced', 'pending', 'failed', 'conflict' + + -- Timestamps + espocrm_modified_at TIMESTAMP WITH TIME ZONE, + advoware_modified_at TIMESTAMP WITH TIME ZONE, + last_sync TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + + -- Flags + deleted BOOLEAN DEFAULT FALSE, + advoware_write_allowed BOOLEAN DEFAULT TRUE, + + -- Audit + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE INDEX idx_espocrm_id ON beteiligte_sync(espocrm_id); +CREATE INDEX idx_advoware_betnr ON beteiligte_sync(advoware_betnr); +CREATE INDEX idx_sync_status ON beteiligte_sync(sync_status); +CREATE INDEX idx_deleted ON beteiligte_sync(deleted) WHERE NOT deleted; +``` + +**2. Sync-Phasen** (3-Phasen-Modell): +- **Phase 1**: Neue aus EspoCRM → Advoware +- **Phase 2**: Neue aus Advoware → EspoCRM +- **Phase 3**: Updates + Konfliktauflösung +- **Phase 4**: Deletes + +**3. Konfliktauflösung via Timestamps**: +```python +# Aus calendar_sync_event_step.py, Zeile 870+ +if espo_ts and advo_ts: + if espo_ts > advo_ts: + # EspoCRM ist neuer → Update Advoware + await update_advoware(...) + elif advo_ts > espo_ts: + # Advoware ist neuer → Update EspoCRM + await update_espocrm(...) + else: + # Gleich alt → Skip + pass +``` + +--- + +## 🎯 Empfohlene Architektur für Beteiligte-Sync + +### Komponenten-Übersicht + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ EspoCRM │ +│ │ +│ CBeteiligte Entity │ +│ - Webhooks: create/update/delete │ +│ - Felder: betnr, syncStatus, advowareLastSync │ +└─────────┬────────────────────────────────────────────────────────┘ + │ Webhook (HTTP POST) + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ KONG API Gateway → Motia Framework │ +│ │ +│ 1. beteiligte_create/update/delete_api_step.py │ +│ - Empfängt Webhooks │ +│ - Dedupliziert via Redis │ +│ - Emittiert Events │ +│ │ +│ 2. beteiligte_sync_event_step.py │ +│ - Subscribed zu Events │ +│ - Holt vollständige Entity aus EspoCRM │ +│ - Transformiert via Mapper │ +│ - Schreibt nach Advoware │ +│ - Updated PostgreSQL Sync-State │ +│ │ +│ 3. beteiligte_sync_cron_step.py (NEU) │ +│ - Läuft alle 15 Minuten │ +│ - Emittiert "beteiligte.sync_all" Event │ +│ │ +│ 4. beteiligte_sync_all_event_step.py (NEU) │ +│ - Fetcht alle Beteiligte aus Advoware │ +│ - Vergleicht mit PostgreSQL State │ +│ - Identifiziert Neue/Geänderte/Gelöschte in Advoware │ +│ - Sync nach EspoCRM │ +│ - 3-Phasen-Modell wie Calendar Sync │ +└─────────┬────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ PostgreSQL Sync-State DB │ +│ │ +│ Tabelle: beteiligte_sync │ +│ - Mapping: espocrm_id ↔ advoware_betnr │ +│ - Timestamps beider Systeme │ +│ - Sync-Status & Konfliktflags │ +└─────────┬────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Redis (Caching & Deduplication) │ +│ │ +│ - vmh:beteiligte:create_pending (SET) │ +│ - vmh:beteiligte:update_pending (SET) │ +│ - vmh:beteiligte:delete_pending (SET) │ +│ - vmh:beteiligte:sync_lock:{espocrm_id} (Key mit TTL) │ +└──────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Advoware API │ +│ │ +│ /api/v1/advonet/Beteiligte │ +│ - GET: Liste + Einzelabfrage │ +│ - POST: Create │ +│ - PUT: Update │ +│ - DELETE: Delete │ +│ │ +│ ⚠️ KEIN Webhook-Support │ +│ → Polling via Cron erforderlich │ +└──────────────────────────────────────────────────────────────────┘ +``` + +--- + +## 🔄 Detaillierte Sync-Flows + +### Flow A: EspoCRM Update → Advoware (Webhook-getrieben) + +**Trigger**: EspoCRM sendet Webhook bei Create/Update/Delete + +``` +1. EspoCRM: User ändert CBeteiligte Entity + └─> Webhook: POST /vmh/webhook/beteiligte/update + Body: [{"id": "68e4af00172be7924", ...}] + +2. beteiligte_update_api_step.py: + ├─> Deduplizierung via Redis SET + ├─> Neue IDs → Redis: vmh:beteiligte:update_pending + └─> Emit Event: "vmh.beteiligte.update" + +3. beteiligte_sync_event_step.py: + ├─> Receive Event mit entity_id + ├─> Fetch full entity von EspoCRM API + │ GET /api/v1/CBeteiligte/{entity_id} + │ + ├─> Check PostgreSQL Sync-State: + │ SELECT * FROM beteiligte_sync WHERE espocrm_id = ? + │ + ├─> Falls NEU (nicht in DB): + │ ├─> Check syncStatus in EspoCRM: + │ │ - "pending_sync" → Create in Advoware + │ │ - "clean" → Skip (bereits gesynct von anderem Flow) + │ │ + │ ├─> Transform via Mapper: + │ │ espocrm_mapper.map_cbeteiligte_to_advoware(entity) + │ │ + │ ├─> POST /api/v1/advonet/Beteiligte + │ │ → Response: {betNr: 123456, ...} + │ │ + │ ├─> Insert in PostgreSQL: + │ │ INSERT INTO beteiligte_sync ( + │ │ espocrm_id, advoware_betnr, + │ │ source_system = 'espocrm', + │ │ espocrm_modified_at = entity.modifiedAt + │ │ ) + │ │ + │ └─> Update EspoCRM: + │ PUT /api/v1/CBeteiligte/{entity_id} + │ { + │ betnr: 123456, + │ syncStatus: "clean", + │ advowareLastSync: NOW() + │ } + │ + └─> Falls EXISTIERT (in DB): + ├─> Get Advoware timestamp: + │ Fetch /api/v1/advonet/Beteiligte/{betnr} + │ → advoware.geaendertAm + │ + ├─> Konfliktauflösung: + │ IF espocrm.modifiedAt > advoware.geaendertAm: + │ → Update Advoware (EspoCRM gewinnt) + │ PUT /api/v1/advonet/Beteiligte/{betnr} + │ ELSE IF advoware.geaendertAm > espocrm.modifiedAt: + │ → Update EspoCRM (Advoware gewinnt) + │ PUT /api/v1/CBeteiligte/{entity_id} + │ ELSE: + │ → Skip (gleich alt) + │ + └─> Update PostgreSQL: + UPDATE beteiligte_sync SET + espocrm_modified_at = ..., + advoware_modified_at = ..., + last_sync = NOW(), + sync_status = 'synced' + +4. Cleanup: + └─> Redis: SREM vmh:beteiligte:update_pending {entity_id} +``` + +**Timing**: ~2-5 Sekunden nach Änderung in EspoCRM + +--- + +### Flow B: Advoware Update → EspoCRM (Polling via Cron) + +**Trigger**: Cron-Job alle 15 Minuten + +``` +1. beteiligte_sync_cron_step.py (Cron: */15 * * * *): + └─> Emit Event: "beteiligte.sync_all" + +2. beteiligte_sync_all_event_step.py: + ├─> Fetch alle Beteiligte aus PostgreSQL Sync-DB: + │ SELECT * FROM beteiligte_sync WHERE NOT deleted + │ + ├─> Fetch alle Beteiligte aus Advoware: + │ GET /api/v1/advonet/Beteiligte + │ (Optional mit Filter: nur geändert seit last_sync - 1 Tag) + │ + ├─> Build Maps: + │ db_map = {betnr: row} + │ advo_map = {betNr: entity} + │ + ├─> PHASE 1: Neue in Advoware (nicht in DB): + │ FOR betnr IN advo_map: + │ IF betnr NOT IN db_map: + │ ├─> Transform: map_advoware_to_cbeteiligte(advo_entity) + │ │ + │ ├─> Check if exists in EspoCRM: + │ │ Search by name/email (Fuzzy Match) + │ │ + │ ├─> IF NOT EXISTS: + │ │ ├─> POST /api/v1/CBeteiligte + │ │ │ { + │ │ │ ...fields..., + │ │ │ betnr: {betnr}, + │ │ │ syncStatus: "clean", + │ │ │ advowareLastSync: NOW() + │ │ │ } + │ │ │ + │ │ └─> INSERT INTO beteiligte_sync ( + │ │ espocrm_id = new_id, + │ │ advoware_betnr = betnr, + │ │ source_system = 'advoware' + │ │ ) + │ │ + │ └─> ELSE (Match gefunden): + │ └─> UPDATE beteiligte_sync SET + │ advoware_betnr = betnr, + │ source_system = 'merged' + │ + ├─> PHASE 2: Updates (beide vorhanden): + │ FOR row IN db_map: + │ IF row.advoware_betnr IN advo_map: + │ advo_entity = advo_map[row.advoware_betnr] + │ espo_entity = fetch_from_espocrm(row.espocrm_id) + │ + │ ├─> Get Timestamps: + │ │ advo_ts = advo_entity.geaendertAm + │ │ espo_ts = espo_entity.modifiedAt + │ │ last_sync_ts = row.last_sync + │ │ + │ ├─> Konfliktauflösung: + │ │ IF advo_ts > espo_ts AND advo_ts > last_sync_ts: + │ │ → Advoware ist neuer + │ │ PUT /api/v1/CBeteiligte/{espocrm_id} + │ │ (Update EspoCRM mit Advoware-Daten) + │ │ + │ │ ELSE IF espo_ts > advo_ts AND espo_ts > last_sync_ts: + │ │ → EspoCRM ist neuer + │ │ (Wurde bereits in Flow A behandelt, skip) + │ │ + │ │ ELSE IF advo_ts == espo_ts: + │ │ → Keine Änderung, skip + │ │ + │ │ ELSE IF advo_ts > last_sync_ts AND espo_ts > last_sync_ts: + │ │ → KONFLIKT: Beide seit last_sync geändert + │ │ ├─> Strategy: "advoware_wins" (konfigurierbar) + │ │ ├─> UPDATE mit Winner-Daten + │ │ ├─> Log Conflict + │ │ └─> SET sync_status = 'conflict_resolved' + │ │ + │ └─> UPDATE beteiligte_sync SET + │ espocrm_modified_at = espo_ts, + │ advoware_modified_at = advo_ts, + │ last_sync = NOW(), + │ sync_status = 'synced' + │ + └─> PHASE 3: Deletes (in DB, nicht in Advoware): + FOR row IN db_map: + IF row.advoware_betnr NOT IN advo_map: + ├─> Check if exists in EspoCRM: + │ GET /api/v1/CBeteiligte/{espocrm_id} + │ + ├─> IF EXISTS: + │ ├─> Soft-Delete in EspoCRM: + │ │ PUT /api/v1/CBeteiligte/{espocrm_id} + │ │ {deleted: true, syncStatus: "deleted"} + │ │ + │ └─> UPDATE beteiligte_sync SET + │ deleted = TRUE, + │ sync_status = 'synced' + │ + └─> ELSE (auch in EspoCRM nicht da): + └─> UPDATE beteiligte_sync SET + deleted = TRUE +``` + +**Timing**: Alle 15 Minuten (konfigurierbar) + +--- + +## 🎛️ Status-Feld in EspoCRM + +### Feld: `syncStatus` (String, Custom Field) + +**Werte**: +- `"pending_sync"` → Neu erstellt, wartet auf Sync nach Advoware +- `"clean"` → Synchronisiert, keine Änderungen +- `"dirty"` → Geändert seit letztem Sync, wartet auf Sync +- `"syncing"` → Sync läuft gerade +- `"failed"` → Sync fehlgeschlagen (+ Fehlerlog) +- `"conflict"` → Konflikt detektiert +- `"deleted"` → In Advoware gelöscht + +**Zusatzfeld**: `advowareLastSync` (DateTime) + +### Alternative: PostgreSQL als Single Source of Truth + +Statt `syncStatus` in EspoCRM: +- Alle Status in PostgreSQL `beteiligte_sync.sync_status` +- EspoCRM hat nur `betnr` und `advowareLastSync` +- **Vorteil**: Keine Schema-Änderung in EspoCRM nötig +- **Nachteil**: Status nicht direkt in EspoCRM UI sichtbar + +**Empfehlung**: Beides nutzen +- PostgreSQL: Master-Status für Sync-Logic +- EspoCRM: Read-only Display für User + +--- + +## ⚙️ Cron-Job Konfiguration + +### Option 1: Separate Cron-Steps (empfohlen) + +```python +# beteiligte_sync_cron_step.py +config = { + 'type': 'cron', + 'name': 'Beteiligte Sync Cron', + 'cron': '*/15 * * * *', # Alle 15 Minuten + 'emits': ['beteiligte.sync_all'], + 'flows': ['vmh'] +} +``` + +### Option 2: Integriert in bestehenden Cron + +```python +# Kombiniert mit anderen Sync-Tasks +config = { + 'type': 'cron', + 'name': 'All Syncs Cron', + 'cron': '*/5 * * * *', # Alle 5 Minuten + 'emits': ['calendar_sync_all', 'beteiligte.sync_all'], + 'flows': ['advoware', 'vmh'] +} +``` + +### Timing-Überlegungen: + +**Frequenz**: +- **15 Minuten**: Guter Kompromiss (wie bestehende Calendar Sync) +- **5 Minuten**: Wenn schnellere Reaktion auf Advoware-Änderungen nötig +- **1 Stunde**: Wenn Last auf APIs minimiert werden soll + +**Offset**: +- Calendar Sync: 0 Minuten +- Beteiligte Sync: +5 Minuten +- → Verhindert API-Überlast durch gleichzeitige Requests + +--- + +## 🔐 Sicherheit & Fehlerbehandlung + +### 1. Rate Limiting + +**Advoware API** (aus Calendar Sync): +```python +# Bereits implementiert in AdvowareAPI Service +# - Token-basiertes Rate Limiting via Redis +# - Backoff-Strategie bei 429 Errors +``` + +**EspoCRM API**: +```python +# Zu implementieren in EspoCRMAPI Service +ESPOCRM_RATE_LIMIT_KEY = 'espocrm_api_tokens' +MAX_TOKENS = 100 # Basierend auf API-Limits +REFILL_RATE = 100 / 60 # Tokens pro Sekunde +``` + +### 2. Lock-Mechanismus (Verhindert Race Conditions) + +```python +# Redis Lock für Entity während Sync +lock_key = f'vmh:beteiligte:sync_lock:{entity_id}' +if redis.set(lock_key, '1', nx=True, ex=300): # 5 Min TTL + try: + await perform_sync(entity_id) + finally: + redis.delete(lock_key) +else: + logger.warning(f"Entity {entity_id} ist bereits im Sync-Prozess") +``` + +### 3. Retry-Logic mit Exponential Backoff + +```python +import backoff + +@backoff.on_exception( + backoff.expo, + (AdvowareAPIError, EspoCRMAPIError), + max_tries=3, + max_time=60 +) +async def sync_entity_with_retry(entity_id): + # ... Sync-Logic + pass +``` + +### 4. Fehler-Logging & Monitoring + +```python +# PostgreSQL Error-Log Tabelle +CREATE TABLE beteiligte_sync_errors ( + error_id SERIAL PRIMARY KEY, + sync_id INTEGER REFERENCES beteiligte_sync(sync_id), + error_type VARCHAR(50), + error_message TEXT, + error_stack TEXT, + retry_count INTEGER DEFAULT 0, + resolved BOOLEAN DEFAULT FALSE, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); +``` + +### 5. Write-Protection Flag + +**Global** (Config): +```python +# config.py +ADVOWARE_WRITE_PROTECTION = os.getenv('ADVOWARE_WRITE_PROTECTION', 'false').lower() == 'true' +``` + +**Per-Entity** (DB): +```sql +ALTER TABLE beteiligte_sync ADD COLUMN advoware_write_allowed BOOLEAN DEFAULT TRUE; +``` + +--- + +## 📦 Zu implementierende Module + +### Priorität 1 (Core Sync): + +1. **services/espocrm_mapper.py** ⭐⭐⭐ + - `map_cbeteiligte_to_advoware(espo_entity) -> advo_data` + - `map_advoware_to_cbeteiligte(advo_entity) -> espo_data` + - Feld-Mapping gemäß `ENTITY_MAPPING_CBeteiligte_Advoware.md` + +2. **steps/vmh/beteiligte_sync_event_step.py** ⭐⭐⭐ + - Implementiert Flow A (Webhook → Advoware) + - Subscribe to create/update/delete Events + - PostgreSQL Integration + - Konfliktauflösung + +3. **PostgreSQL Migration** ⭐⭐⭐ + - `migrations/001_create_beteiligte_sync_table.sql` + - Connection in Config + +4. **steps/vmh/beteiligte_sync_cron_step.py** ⭐⭐ + - Emittiert Sync-All Event alle 15 Min + +5. **steps/vmh/beteiligte_sync_all_event_step.py** ⭐⭐ + - Implementiert Flow B (Advoware Polling) + - 3-Phasen-Sync-Modell + +### Priorität 2 (Optimierungen): + +6. **services/beteiligte_sync_utils.py** ⭐ + - Shared Utilities + - Lock-Management + - Timestamp-Handling + - Conflict-Resolution Logic + +7. **Testing** ⭐ + - Unit Tests für Mapper + - Integration Tests für Sync-Flows + - Konflikt-Szenarien + +### Priorität 3 (Monitoring): + +8. **steps/vmh/audit_beteiligte_sync.py** + - Analog zu `audit_calendar_sync.py` + - CLI-Tool für Sync-Status + +9. **Dashboard/Metrics** + - Prometheus Metrics + - Grafana Dashboard + +--- + +## 🚀 Rollout-Plan + +### Phase 1: Setup (Tag 1-2) +- [ ] PostgreSQL Datenbank-Schema erstellen +- [ ] Config erweitern (DB-Connection) +- [ ] Mapper-Modul erstellen +- [ ] Unit Tests für Mapper + +### Phase 2: Webhook-Flow (Tag 3-4) +- [ ] `beteiligte_sync_event_step.py` implementieren +- [ ] Integration mit bestehenden Webhook-Steps +- [ ] Testing mit EspoCRM Sandbox + +### Phase 3: Polling-Flow (Tag 5-6) +- [ ] Cron-Step erstellen +- [ ] `beteiligte_sync_all_event_step.py` implementieren +- [ ] 3-Phasen-Modell +- [ ] Integration Tests + +### Phase 4: Konfliktauflösung (Tag 7) +- [ ] Timestamp-Vergleich +- [ ] Konflikt-Strategies +- [ ] Error-Handling +- [ ] Retry-Logic + +### Phase 5: Monitoring & Docs (Tag 8) +- [ ] Audit-Tool +- [ ] Logging +- [ ] Dokumentation +- [ ] Runbook + +### Phase 6: Production (Tag 9-10) +- [ ] Staging-Tests +- [ ] Performance-Tests +- [ ] Production-Rollout +- [ ] Monitoring + +--- + +## ⚠️ Wichtige Entscheidungen + +### 1. Conflict Resolution Strategy + +**Option A: "Source System Wins"** (empfohlen für Start): +```python +sync_strategy = 'source_system_wins' +# EspoCRM-created Entities → EspoCRM gewinnt bei Konflikt +# Advoware-created Entities → Advoware gewinnt bei Konflikt +``` + +**Option B: "Advoware Always Wins"**: +```python +sync_strategy = 'advoware_master' +# Advoware ist Master, EspoCRM ist read-only View +``` + +**Option C: "Last Modified Wins"**: +```python +sync_strategy = 'last_modified_wins' +# Timestamp-Vergleich, neuester gewinnt +``` + +**Empfehlung**: Start mit "Source System Wins", später auf "Last Modified Wins" mit Manual Conflict Review. + +### 2. Advoware Polling-Frequenz + +**Überlegungen**: +- API-Last auf Advoware +- Aktualitäts-Anforderungen +- Anzahl Beteiligte (~1000? ~10.000?) + +**Optimierung**: +- Incremental Fetch: Nur geändert seit `last_sync - 1 Tag` +- Delta-Detection via `geaendertAm` Timestamp +- Pagination bei großen Datenmengen + +### 3. EspoCRM Field: `syncStatus` + +**Zu klären**: +- Bestehendes Custom Field oder neu anlegen? +- Dropdown-Werte konfigurieren +- Permissions (nur Sync-System kann schreiben?) + +--- + +## 📝 Zusammenfassung + +### Was funktioniert: +✅ EspoCRM Webhooks → Motia Events ✅ Webhook-Deduplication via Redis +✅ EspoCRM API Client ✅ Advoware API Client +✅ Entity-Mapping definiert + +### Was zu implementieren ist: +🔨 Mapper-Modul +🔨 PostgreSQL Sync-State DB +🔨 Event-Handler für Webhooks +🔨 Cron-Job für Polling +🔨 3-Phasen-Sync für Advoware → EspoCRM +🔨 Konfliktauflösung +🔨 Error-Handling & Monitoring + +### Geschätzter Aufwand: +- **Setup & Core**: 3-4 Tage +- **Flows**: 2-3 Tage +- **Konfliktauflösung**: 1-2 Tage +- **Testing & Docs**: 1-2 Tage +- **Rollout**: 1-2 Tage +- **Total**: ~8-13 Tage + +### Nächster Schritt: +1. Entscheidung zu Status-Feld in EspoCRM +2. PostgreSQL DB-Schema aufsetzen +3. Mapper-Modul implementieren +4. Webhook-Flow komplettieren + +--- + +**Fragen zur Klärung**: +1. Existiert `syncStatus` Field bereits in EspoCRM CBeteiligte? +2. Wie viele Beteiligte gibt es ca. in Advoware? +3. Gibt es Performance-Anforderungen? (z.B. Sync innerhalb X Sekunden) +4. Soll es manuelle Conflict-Resolution geben oder automatisch? +5. PostgreSQL Server bereits vorhanden? (Wie Calendar Sync) diff --git a/bitbylaw/SYNC_STRATEGY_ESPOCRM_BASED.md b/bitbylaw/SYNC_STRATEGY_ESPOCRM_BASED.md new file mode 100644 index 00000000..acf68f92 --- /dev/null +++ b/bitbylaw/SYNC_STRATEGY_ESPOCRM_BASED.md @@ -0,0 +1,485 @@ +# Sync-Strategie: EspoCRM-basiert (ohne PostgreSQL) + +**Analysiert am**: 2026-02-07 +**Anpassung**: EspoCRM als primäre State-Datenbank + +--- + +## 🎯 EspoCRM Felder (CBeteiligte Entity) + +```json +{ + "betnr": 1234, // Link zu Advoware betNr (int, unique) + "syncStatus": "clean", // Sync-Status (enum) + "advowareLastSync": null, // Letzter Sync (datetime oder null) + "advowareDeletedAt": null, // Gelöscht in Advoware am (datetime, NEU) + "syncErrorMessage": null, // Fehlerdetails (text, NEU) + "syncRetryCount": 0, // Anzahl Retry-Versuche (int, NEU) + "modifiedAt": "2026-01-23 21:58:41" // EspoCRM Änderungszeit +} +``` + +### syncStatus-Werte (Enum in EspoCRM): +- `"pending_sync"` - Neu erstellt, noch nicht nach Advoware gesynct +- `"clean"` - Synchronisiert, keine ausstehenden Änderungen +- `"dirty"` - In EspoCRM geändert, wartet auf Sync nach Advoware +- `"syncing"` - Sync läuft gerade (verhindert Race Conditions) +- `"failed"` - Sync fehlgeschlagen (mit syncErrorMessage + syncRetryCount) +- `"conflict"` - Konflikt erkannt → **EspoCRM WINS** (mit Notification) +- `"deleted_in_advoware"` - In Advoware gelöscht (Soft-Delete Flag mit Notification) + +--- + +## 🔄 Flow A: EspoCRM Update → Advoware (Webhook) + +**Trigger**: EspoCRM Webhook bei Create/Update + +``` +1. EspoCRM: User ändert CBeteiligte + └─> Webhook: POST /vmh/webhook/beteiligte/update + Body: [{"id": "68e4af00172be7924"}] + +2. beteiligte_update_api_step.py: + ├─> Redis Deduplication + └─> Emit Event: "vmh.beteiligte.update" + +3. beteiligte_sync_event_step.py: + ├─> Fetch Entity von EspoCRM: + │ GET /api/v1/CBeteiligte/{id} + │ { + │ "id": "...", + │ "firstName": "Angela", + │ "lastName": "Mustermann", + │ "betnr": 104860, // Bereits vorhanden + │ "syncStatus": "clean", + │ "advowareLastSync": "2026-02-01T10:00:00", + │ "modifiedAt": "2026-02-07T14:30:00" + │ } + │ + ├─> Check syncStatus: + │ ├─> IF syncStatus == "syncing": + │ │ → Skip (bereits im Sync-Prozess) + │ │ + │ ├─> IF syncStatus == "pending_sync" AND betnr == NULL: + │ │ → NEU: Create in Advoware + │ │ ├─> Set syncStatus = "syncing" + │ │ ├─> Transform via Mapper + │ │ ├─> POST /api/v1/advonet/Beteiligte + │ │ │ Response: {betNr: 123456} + │ │ └─> Update EspoCRM: + │ │ PUT /api/v1/CBeteiligte/{id} + │ │ { + │ │ betnr: 123456, + │ │ syncStatus: "clean", + │ │ advowareLastSync: NOW() + │ │ } + │ │ + │ └─> IF betnr != NULL (bereits gesynct): + │ → UPDATE: Vergleiche Timestamps + │ ├─> Fetch von Advoware: + │ │ GET /api/v1/advonet/Beteiligte/{betnr} + │ │ {betNr: 104860, geaendertAm: "2026-02-07T12:00:00"} + │ │ + │ ├─> Vergleiche Timestamps: + │ │ espocrm_ts = entity.modifiedAt + │ │ advoware_ts = advo_entity.geaendertAm + │ │ last_sync_ts = entity.advowareLastSync + │ │ + │ │ IF espocrm_ts > last_sync_ts AND espocrm_ts > advoware_ts: + │ │ → EspoCRM ist neuer → Update Advoware + │ │ ├─> Set syncStatus = "syncing" + │ │ ├─> PUT /api/v1/advonet/Beteiligte/{betnr} + │ │ └─> Update EspoCRM: + │ │ syncStatus = "clean" + │ │ advowareLastSync = NOW() + │ │ syncErrorMessage = NULL + │ │ syncRetryCount = 0 + │ │ + │ │ ELSE IF advoware_ts > last_sync_ts AND advoware_ts > espocrm_ts: + │ │ → Advoware ist neuer → Update EspoCRM + │ │ ├─> Set syncStatus = "syncing" + │ │ ├─> Transform von Advoware + │ │ └─> Update EspoCRM mit Advoware-Daten + │ │ syncStatus = "clean" + │ │ advowareLastSync = NOW() + │ │ syncErrorMessage = NULL + │ │ syncRetryCount = 0 + │ │ + │ │ ELSE IF espocrm_ts > last_sync_ts AND advoware_ts > last_sync_ts: + │ │ → KONFLIKT: Beide geändert seit last_sync + │ │ + │ │ **REGEL: EspoCRM WINS!** + │ │ + │ │ ├─> Set syncStatus = "conflict" + │ │ ├─> Überschreibe Advoware mit EspoCRM-Daten: + │ │ │ PUT /api/v1/advonet/Beteiligte/{betnr} + │ │ │ + │ │ ├─> Update EspoCRM: + │ │ │ syncStatus = "clean" (gelöst!) + │ │ │ advowareLastSync = NOW() + │ │ │ syncErrorMessage = "Konflikt am {NOW}: EspoCRM={espocrm_ts}, Advoware={advoware_ts}. EspoCRM hat gewonnen." + │ │ │ + │ │ └─> Send Notification: + │ │ Template: "beteiligte_sync_conflict" + │ │ To: Admin-User oder zugewiesener User + │ │ + │ │ ELSE: + │ │ → Keine Änderungen seit last_sync + │ │ └─> Skip + │ │ + │ └─> Bei Fehler: + │ syncStatus = "failed" + │ syncErrorMessage = Error-Details (inkl. Stack Trace) + │ syncRetryCount += 1 + │ Log Error + │ + └─> Handle 404 von Advoware (gelöscht): + IF advoware.api_call returns 404: + ├─> Update EspoCRM: + │ syncStatus = "deleted_in_advoware" + │ advowareDeletedAt = NOW() + │ syncErrorMessage = "Beteiligter existiert nicht mehr in Advoware" + │ + └─> Send Notification: + Template: "beteiligte_advoware_deleted" + To: Admin-User oder zugewiesener User +``` + +**Timing**: ~2-5 Sekunden nach Webhook oder Cron-Event + +--- + +## 🔄 Flow B: Advoware → EspoCRM (Cron-basiert mit Events) + +**Trigger**: Cron alle 15 Minuten + +``` +1. beteiligte_sync_cron_step.py (*/15 * * * *): + + ├─> Query EspoCRM: Alle Entities die Sync benötigen + │ + │ SELECT * FROM CBeteiligte WHERE: + │ - syncStatus IN ('pending_sync', 'dirty', 'failed') + │ - OR (syncStatus = 'clean' AND betnr IS NOT NULL + │ AND advowareLastSync < NOW() - 24 HOURS) + │ + ├─> Für JEDEN Beteiligten einzeln: + │ └─> Emit Event: "vmh.beteiligte.sync_check" + │ payload: { + │ entity_id: "68e4af00172be7924", + │ source: "cron", + │ timestamp: "2026-02-07T14:30:00Z" + │ } + │ + └─> Log: "Emitted {count} sync_check events" + +2. beteiligte_sync_event_step.py (GLEICHER Handler wie Webhook!): + + └─> Subscribe zu: "vmh.beteiligte.sync_check" + (Dieser Event kommt von Cron oder manuellen Triggers) + + ├─> Fetch entity_id aus Event-Payload + │ + └─> Führe GLEICHE Logik aus wie bei Webhook (siehe Flow A oben!) + - Lock via syncStatus + - Timestamp-Vergleich + - Create/Update + - Konfliktauflösung (EspoCRM wins) + - 404 Handling (deleted_in_advoware) + - Update syncStatus + Felder + +**WICHTIG**: Flow B nutzt Events statt Batch-Processing! +- Cron emittiert nur Events für zu syncende Entities +- Der normale Sync-Handler (Flow A) verarbeitet beide Quellen gleich +- Code-Wiederverwendung: KEIN separater Batch-Handler nötig! +``` + +**Timing**: +- Cron läuft alle 15 Minuten +- Events werden sofort verarbeitet (wie Webhooks) + +--- + +## 📊 Optimierung: Nur veraltete checken + +### Cron-Query für zu prüfende Entities: + +```javascript +// In beteiligte_sync_all_event_step.py + +// 1. Holen von Entities die Sync benötigen +const needsSyncFilter = { + where: [ + { + type: 'or', + value: [ + // Neu und noch nicht gesynct + { + type: 'and', + value: [ + {type: 'equals', attribute: 'syncStatus', value: 'pending_sync'}, + {type: 'isNull', attribute: 'betnr'} + ] + }, + // Dirty (geändert in EspoCRM) + {type: 'equals', attribute: 'syncStatus', value: 'dirty'}, + + // Failed (Retry) + {type: 'equals', attribute: 'syncStatus', value: 'failed'}, + + // Clean aber lange nicht gesynct (> 24h) + { + type: 'and', + value: [ + {type: 'equals', attribute: 'syncStatus', value: 'clean'}, + {type: 'isNotNull', attribute: 'betnr'}, + { + type: 'or', + value: [ + {type: 'isNull', attribute: 'advowareLastSync'}, + {type: 'before', attribute: 'advowareLastSync', value: 'NOW() - 24 HOURS'} + ] + } + ] + } + ] + } + ] +}; +``` + +### Advoware Query-Optimierung: + +```python +# Nur kürzlich geänderte aus Advoware holen +last_full_sync = get_last_full_sync_timestamp() # z.B. vor 7 Tagen +if last_full_sync: + # Incremental Fetch + params = { + 'filter': f'geaendertAm gt {last_full_sync.isoformat()}', + 'orderBy': 'geaendertAm desc' + } +else: + # Full Fetch (beim ersten Mal oder nach langer Zeit) + params = {} + +result = await advoware.api_call( + 'api/v1/advonet/Beteiligte', + method='GET', + params=params +) +``` + +--- + +## 🔐 Locking via syncStatus + +**Verhindert Race Conditions ohne Redis Lock**: + +```python +# Vor Sync-Operation: +async def acquire_sync_lock(espocrm_api, entity_id): + """ + Setzt syncStatus auf "syncing" wenn möglich. + Returns: True wenn Lock erhalten, False sonst + """ + try: + # Fetch current + entity = await espocrm_api.get_entity('CBeteiligte', entity_id) + + if entity.get('syncStatus') == 'syncing': + # Bereits im Sync-Prozess + return False + + # Atomic Update (EspoCRM sollte Optimistic Locking unterstützen) + await espocrm_api.update_entity('CBeteiligte', entity_id, { + 'syncStatus': 'syncing' + }) + + return True + + except Exception as e: + logger.error(f"Failed to acquire sync lock: {e}") + return False + +# Nach Sync-Operation (im finally-Block): +async def release_sync_lock(espocrm_api, entity_id, new_status='clean'): + """Setzt syncStatus zurück""" + try: + await espocrm_api.update_entity('CBeteiligte', entity_id, { + 'syncStatus': new_status, + 'advowareLastSync': datetime.now(pytz.UTC).isoformat() + }) + except Exception as e: + logger.error(f"Failed to release sync lock: {e}") +``` + +--- + +## 📋 Status-Übergänge + +``` +pending_sync → syncing → clean (erfolgreicher Create) +pending_sync → syncing → failed (fehlgeschlagener Create) + +clean → dirty → syncing → clean (Update nach Änderung) +clean → dirty → syncing → conflict (Konflikt detektiert) +clean → dirty → syncing → failed (Update fehlgeschlagen) + +failed → syncing → clean (erfolgreicher Retry) +failed → syncing → failed (erneuter Fehler) + +conflict → syncing → clean (manuell aufgelöst) + +clean → deleted_in_advoware (in Advoware gelöscht) +``` + +--- + +## 🎯 Implementierungs-Checkliste + +### Phase 1: Core Sync (Flow A - Webhook + Cron Events) + +- [ ] **services/espocrm_mapper.py** + - [ ] `map_cbeteiligte_to_advoware(espo_entity)` + - [ ] `map_advoware_to_cbeteiligte(advo_entity)` + +- [ ] **steps/vmh/beteiligte_sync_event_step.py** (ZENTRALER Handler!) + - [ ] Subscribe zu: `vmh.beteiligte.create`, `vmh.beteiligte.update`, `vmh.beteiligte.delete`, `vmh.beteiligte.sync_check` + - [ ] Fetch Entity von EspoCRM + - [ ] Lock via syncStatus="syncing" + - [ ] Timestamp-Vergleich + - [ ] Create/Update in Advoware + - [ ] **Konfliktauflösung: EspoCRM wins!** + - [ ] **404 Handling: Soft-Delete (deleted_in_advoware)** + - [ ] **Notifications: Bei Konflikt + Soft-Delete** + - [ ] Update syncStatus + advowareLastSync + syncErrorMessage + syncRetryCount + - [ ] Error Handling (→ syncStatus="failed" mit Retry-Counter) + - [ ] Redis Cleanup (SREM pending sets) + +### Phase 2: Cron Event Emitter (Flow B) + +- [ ] **steps/vmh/beteiligte_sync_cron_step.py** + - [ ] Cron: `*/15 * * * *` + - [ ] Query EspoCRM: Entities mit Status `IN (pending_sync, dirty, failed)` ODER `clean + advowareLastSync < NOW() - 24h` + - [ ] Für JEDEN Beteiligten: Emit `vmh.beteiligte.sync_check` Event + - [ ] Log: Anzahl emittierter Events + - [ ] **KEIN** Batch-Processing - Events werden einzeln vom Handler verarbeitet! + +### Phase 3: Utilities + +- [ ] **services/betei & Notifications + +- [ ] **services/beteiligte_sync_utils.py** + - [ ] `acquire_sync_lock(entity_id)` → Setzt syncStatus="syncing" + - [ ] `release_sync_lock(entity_id, new_status)` → Setzt syncStatus + Updates + - [ ] `compare_timestamps(espo_ts, advo_ts, last_sync)` → Returns: "espocrm_newer", "advoware_newer", "conflict", "no_change" + - [ ] `resolve_conflict_espocrm_wins(espo_entity, advo_entity)` → Überschreibt Advoware + - [ ] `send_notification(entity_id, template_name, extra_data=None)` → EspoCRM Notification + - [ ] `handle_advoware_deleted(entity_id, error_msg)` → Soft-Delete + Notification + +- [ ] Unit Tests für Mapper +- [ ] Integration Tests für beide Flows +- [ ] Konflikt-Szenarien testen +- [ ] Load-Tests (Performance mit 1000+ Entities) +- [ ] CLI Audit-Tool (analog zu calendar_sync audit) +→ clean (Konflikt → EspoCRM wins → gelöst!) +clean → dirty → syncing → failed (Update fehlgeschlagen) + +dirty → syncing → deleted_in_advoware (404 von Advoware → Soft-Delete) + +failed → syncing → clean (erfolgreicher Retry) +failed → syncing → failed (erneuter Fehler, syncRetryCount++) + +conflict → clean (automatisch via EspoCRM wins) + +clean → deleted_in_advoware (Advoware hat gelöscht) +deleted_in_advoware → clean (Re-create in Advoware via Manual-Trigger +GET /api/v1/CBeteiligte?select=syncStatus&maxSize=1000 +→ Gruppiere und zähle + +// Entities die Sync benötigen +GET /api/v1/CBeteiligte?where=[ + {type: 'in', attribute: 'syncStatus', value: ['pending_sync', 'dirty', 'failed']} +] + +// Lange nicht gesynct (> 7 Tage) +GET /api/v1/CBeteiligte?where=[ + {type: 'before', attribute: 'advowareLastSync', value: 'NOW() - 7 DAYS'} +] + +// Konflikte +GET /api/v1/CBeteiligte?where=[ + {type: 'equals', attribute: 'syncStatus', value: 'conflict'} +] +``` + +--- + +## 📈 Performance-Überlegungen + +### Batch-Größen: + +```python +# Cron-Job Configuration +CRON_BATCH_SIZE = 50 # Max 50 Entities pro Cron-Run +CRON_TIMEOUT = 300 # 5 Minuten Timeout + +# Advoware Fetch +ADVOWARE_PAGE_SIZE = 100 # Entities pro API-Request +``` + +### Timing: + +- **Webhook (Flow A)**: 2-5 Sekunden (near real-time) +- **Cron (Flow B)**: 15 Minuten Intervall +- **Veraltete Check**: 24 Stunden (täglich syncen) +- **Full Sync**: 7 Tage (wöchentlich alle prüfen) + +### Rate Limiting: + +```python +# Aus bestehender AdvowareAPI +# - Bereits implementiert +# - Token-based Rate Limiting via Redis + +# Für EspoCRM hinzufügen: +ESPOCRM_MAX_REQUESTS_PER_MINUTE = 100 +``` + +--- + +## 🎯 Vorteile dieser Architektur + +✅ **Kein PostgreSQL nötig** - EspoCRM ist State-Datenbank +✅ **Alle Daten in EspoCRM** - Single Source of Truth +✅ **Status sichtbar** - User können syncStatus in UI sehen +✅ **Optimiert** - Nur veraltete werden geprüft +✅ **Robust** - Locking via syncStatus verhindert Race Conditions +✅ **Konflikt-Tracking** - Konflikte werden explizit markiert +✅ **Wiederverwendbar** - Lock-Pattern nutzbar für andere Syncs + +--- + +## 🔧 Nächste Schritte + +1. **Mapper implementieren** (services/espocrm_mapper.py) +2. **Webhook-Handler komplettieren** (Flow A) +3. **Cron + Polling implementieren** (Flow B) +4. **Testing mit echten Daten** +5. **Monitoring & Dashboard** + +**Geschätzte Zeit**: 5-7 Tage + +--- +Entscheidungen (vom User bestätigt)**: +1. ✅ syncStatus als Enum in EspoCRM mit definierten Werten +2. ✅ Soft-Delete: Nur Flag (deleted_in_advoware + advowareDeletedAt) +3. ✅ Automatisch: **EspoCRM WINS** bei Konflikten +4. ✅ Notifications: Ja, bei Konflikten + Soft-Deletes (EspoCRM Notifications) + +**Architektur-Entscheidung**: +- ✅ Cron emittiert Events (`vmh.beteiligte.sync_check`), statt Batch-Processing +- ✅ Ein zentraler Sync-Handler für Webhooks UND Cron-Events +- ✅ Code-Wiederverwendung maximiertdvoware wins"? +4. Benachrichtigung bei Konflikten? (Email, Webhook, ...) diff --git a/bitbylaw/services/beteiligte_sync_utils.py b/bitbylaw/services/beteiligte_sync_utils.py new file mode 100644 index 00000000..5608f563 --- /dev/null +++ b/bitbylaw/services/beteiligte_sync_utils.py @@ -0,0 +1,341 @@ +""" +Beteiligte Sync Utilities + +Hilfsfunktionen für Sync-Operationen: +- Locking via syncStatus +- Timestamp-Vergleich +- Konfliktauflösung (EspoCRM wins) +- EspoCRM In-App Notifications +- Soft-Delete Handling +""" + +from typing import Dict, Any, Optional, Tuple, Literal +from datetime import datetime +import pytz +import logging +from services.espocrm import EspoCRMAPI + +logger = logging.getLogger(__name__) + +# Timestamp-Vergleich Ergebnis-Typen +TimestampResult = Literal["espocrm_newer", "advoware_newer", "conflict", "no_change"] + + +class BeteiligteSync: + """Utility-Klasse für Beteiligte-Synchronisation""" + + def __init__(self, espocrm_api: EspoCRMAPI, context=None): + self.espocrm = espocrm_api + self.context = context + + def _log(self, message: str, level: str = 'info'): + """Logging mit Context-Support""" + if self.context and hasattr(self.context, 'logger'): + getattr(self.context.logger, level)(message) + else: + getattr(logger, level)(message) + + async def acquire_sync_lock(self, entity_id: str) -> bool: + """ + Setzt syncStatus auf "syncing" (atomares Lock) + + Args: + entity_id: EspoCRM CBeteiligte ID + + Returns: + True wenn Lock erfolgreich, False wenn bereits im Sync + """ + try: + entity = await self.espocrm.get_entity('CBeteiligte', entity_id) + + current_status = entity.get('syncStatus') + + if current_status == 'syncing': + self._log(f"Entity {entity_id} bereits im Sync-Prozess", level='warning') + return False + + # Setze Lock + await self.espocrm.update_entity('CBeteiligte', entity_id, { + 'syncStatus': 'syncing' + }) + + self._log(f"Sync-Lock für {entity_id} erworben (vorher: {current_status})") + return True + + except Exception as e: + self._log(f"Fehler beim Acquire Lock: {e}", level='error') + return False + + async def release_sync_lock( + self, + entity_id: str, + new_status: str = 'clean', + error_message: Optional[str] = None, + increment_retry: bool = False + ) -> None: + """ + Gibt Sync-Lock frei und setzt finalen Status + + Args: + entity_id: EspoCRM CBeteiligte ID + new_status: Neuer syncStatus (clean, failed, conflict, etc.) + error_message: Optional: Fehlermeldung für syncErrorMessage + increment_retry: Ob syncRetryCount erhöht werden soll + """ + try: + update_data = { + 'syncStatus': new_status, + 'advowareLastSync': datetime.now(pytz.UTC).isoformat() + } + + if error_message: + update_data['syncErrorMessage'] = error_message[:2000] # Max. 2000 chars + else: + update_data['syncErrorMessage'] = None + + if increment_retry: + # Hole aktuellen Retry-Count + entity = await self.espocrm.get_entity('CBeteiligte', entity_id) + current_retry = entity.get('syncRetryCount') or 0 + update_data['syncRetryCount'] = current_retry + 1 + else: + update_data['syncRetryCount'] = 0 + + await self.espocrm.update_entity('CBeteiligte', entity_id, update_data) + + self._log(f"Sync-Lock released: {entity_id} → {new_status}") + + except Exception as e: + self._log(f"Fehler beim Release Lock: {e}", level='error') + + @staticmethod + def parse_timestamp(ts: Any) -> Optional[datetime]: + """ + Parse verschiedene Timestamp-Formate zu datetime + + Args: + ts: String, datetime oder None + + Returns: + datetime-Objekt oder None + """ + if not ts: + return None + + if isinstance(ts, datetime): + return ts + + if isinstance(ts, str): + # EspoCRM Format: "2026-02-07 14:30:00" + # Advoware Format: "2026-02-07T14:30:00" oder "2026-02-07T14:30:00Z" + try: + # Entferne trailing Z falls vorhanden + ts = ts.rstrip('Z') + + # Versuche verschiedene Formate + for fmt in [ + '%Y-%m-%d %H:%M:%S', + '%Y-%m-%dT%H:%M:%S', + '%Y-%m-%d', + ]: + try: + return datetime.strptime(ts, fmt) + except ValueError: + continue + + # Fallback: ISO-Format + return datetime.fromisoformat(ts) + + except Exception as e: + logger.warning(f"Konnte Timestamp nicht parsen: {ts} - {e}") + return None + + return None + + def compare_timestamps( + self, + espo_modified_at: Any, + advo_geaendert_am: Any, + last_sync_ts: Any + ) -> TimestampResult: + """ + Vergleicht Timestamps und bestimmt Sync-Richtung + + Args: + espo_modified_at: EspoCRM modifiedAt + advo_geaendert_am: Advoware geaendertAm + last_sync_ts: Letzter Sync (advowareLastSync) + + Returns: + "espocrm_newer": EspoCRM wurde nach last_sync geändert und ist neuer + "advoware_newer": Advoware wurde nach last_sync geändert und ist neuer + "conflict": Beide wurden nach last_sync geändert + "no_change": Keine Änderungen seit last_sync + """ + espo_ts = self.parse_timestamp(espo_modified_at) + advo_ts = self.parse_timestamp(advo_geaendert_am) + sync_ts = self.parse_timestamp(last_sync_ts) + + # Logging + self._log( + f"Timestamp-Vergleich: EspoCRM={espo_ts}, Advoware={advo_ts}, LastSync={sync_ts}", + level='debug' + ) + + # Falls kein last_sync → erster Sync, vergleiche direkt + if not sync_ts: + if not espo_ts or not advo_ts: + return "no_change" + + if espo_ts > advo_ts: + return "espocrm_newer" + elif advo_ts > espo_ts: + return "advoware_newer" + else: + return "no_change" + + # Check ob seit last_sync Änderungen + espo_changed = espo_ts and espo_ts > sync_ts + advo_changed = advo_ts and advo_ts > sync_ts + + if espo_changed and advo_changed: + # Beide geändert seit last_sync → Konflikt + return "conflict" + elif espo_changed: + # Nur EspoCRM geändert + return "espocrm_newer" if (not advo_ts or espo_ts > advo_ts) else "conflict" + elif advo_changed: + # Nur Advoware geändert + return "advoware_newer" + else: + # Keine Änderungen + return "no_change" + + async def send_notification( + self, + entity_id: str, + notification_type: Literal["conflict", "deleted"], + extra_data: Optional[Dict[str, Any]] = None + ) -> None: + """ + Sendet EspoCRM In-App Notification + + Args: + entity_id: CBeteiligte Entity ID + notification_type: "conflict" oder "deleted" + extra_data: Zusätzliche Daten für Nachricht + """ + try: + # Hole Entity-Daten + entity = await self.espocrm.get_entity('CBeteiligte', entity_id) + name = entity.get('name', 'Unbekannt') + betnr = entity.get('betnr') + assigned_user = entity.get('assignedUserId') + + # Erstelle Nachricht basierend auf Typ + if notification_type == "conflict": + message = ( + f"⚠️ Sync-Konflikt bei Beteiligten '{name}' (betNr: {betnr}). " + f"EspoCRM hat Vorrang - Änderungen wurden nach Advoware übertragen. " + f"Bitte prüfen Sie die Details." + ) + elif notification_type == "deleted": + deleted_at = entity.get('advowareDeletedAt', 'unbekannt') + message = ( + f"🗑️ Beteiligter '{name}' (betNr: {betnr}) wurde in Advoware gelöscht " + f"(am {deleted_at}). Der Datensatz wurde in EspoCRM markiert, aber nicht gelöscht. " + f"Bitte prüfen Sie, ob dies beabsichtigt war." + ) + else: + message = f"Benachrichtigung für Beteiligten '{name}'" + + # Erstelle Notification in EspoCRM + notification_data = { + 'type': 'message', + 'message': message, + 'relatedType': 'CBeteiligte', + 'relatedId': entity_id, + } + + # Wenn assigned user vorhanden, sende an diesen + if assigned_user: + notification_data['userId'] = assigned_user + + # Sende via API + result = await self.espocrm.api_call( + 'Notification', + method='POST', + data=notification_data + ) + + self._log(f"Notification gesendet für {entity_id}: {notification_type}") + + except Exception as e: + self._log(f"Fehler beim Senden der Notification: {e}", level='error') + + async def handle_advoware_deleted( + self, + entity_id: str, + error_details: str + ) -> None: + """ + Behandelt Fall dass Beteiligter in Advoware gelöscht wurde (404) + + Args: + entity_id: CBeteiligte Entity ID + error_details: Fehlerdetails von Advoware API + """ + try: + now = datetime.now(pytz.UTC).isoformat() + + # Update Entity: Soft-Delete Flag + await self.espocrm.update_entity('CBeteiligte', entity_id, { + 'syncStatus': 'deleted_in_advoware', + 'advowareDeletedAt': now, + 'syncErrorMessage': f"Beteiligter existiert nicht mehr in Advoware. {error_details}" + }) + + self._log(f"Entity {entity_id} als deleted_in_advoware markiert") + + # Sende Notification + await self.send_notification(entity_id, 'deleted') + + except Exception as e: + self._log(f"Fehler beim Handle Deleted: {e}", level='error') + + async def resolve_conflict_espocrm_wins( + self, + entity_id: str, + espo_entity: Dict[str, Any], + advo_entity: Dict[str, Any], + conflict_details: str + ) -> None: + """ + Löst Konflikt auf: EspoCRM wins (überschreibt Advoware) + + Args: + entity_id: CBeteiligte Entity ID + espo_entity: EspoCRM Entity-Daten + advo_entity: Advoware Entity-Daten + conflict_details: Details zum Konflikt + """ + try: + now = datetime.now(pytz.UTC).isoformat() + + # Markiere als gelöst mit Konflikt-Info + await self.espocrm.update_entity('CBeteiligte', entity_id, { + 'syncStatus': 'clean', # Gelöst! + 'advowareLastSync': now, + 'syncErrorMessage': f"Konflikt am {now}: {conflict_details}. EspoCRM hat gewonnen.", + 'syncRetryCount': 0 + }) + + self._log(f"Konflikt gelöst für {entity_id}: EspoCRM wins") + + # Sende Notification + await self.send_notification(entity_id, 'conflict', { + 'details': conflict_details + }) + + except Exception as e: + self._log(f"Fehler beim Resolve Conflict: {e}", level='error') diff --git a/bitbylaw/services/espocrm_mapper.py b/bitbylaw/services/espocrm_mapper.py new file mode 100644 index 00000000..dea612b8 --- /dev/null +++ b/bitbylaw/services/espocrm_mapper.py @@ -0,0 +1,243 @@ +""" +EspoCRM ↔ Advoware Entity Mapper + +Transformiert Beteiligte zwischen den beiden Systemen basierend auf ENTITY_MAPPING_CBeteiligte_Advoware.md +""" + +from typing import Dict, Any, Optional, List +from datetime import datetime +import logging + +logger = logging.getLogger(__name__) + + +class BeteiligteMapper: + """Mapper für CBeteiligte (EspoCRM) ↔ Beteiligte (Advoware)""" + + @staticmethod + def map_cbeteiligte_to_advoware(espo_entity: Dict[str, Any]) -> Dict[str, Any]: + """ + Transformiert EspoCRM CBeteiligte → Advoware Beteiligte Format + + Args: + espo_entity: CBeteiligte Entity von EspoCRM + + Returns: + Dict für Advoware API (POST/PUT /api/v1/advonet/Beteiligte) + """ + logger.debug(f"Mapping EspoCRM → Advoware: {espo_entity.get('id')}") + + # Bestimme ob Person oder Firma + is_firma = bool(espo_entity.get('firmenname')) + rechtsform = espo_entity.get('rechtsform', '') + + # Basis-Struktur + advo_data = { + 'rechtsform': rechtsform, + } + + # NAME: Person vs. Firma + if is_firma: + # Firma: name = firmenname + advo_data['name'] = espo_entity.get('firmenname', '') + advo_data['vorname'] = None + else: + # Person: name = lastName, vorname = firstName + advo_data['name'] = espo_entity.get('lastName', '') + advo_data['vorname'] = espo_entity.get('firstName', '') + + # ANREDE + salutation = espo_entity.get('salutationName', '') + if salutation: + advo_data['anrede'] = salutation + + # GEBURTSDATUM + date_of_birth = espo_entity.get('dateOfBirth') + if date_of_birth: + advo_data['geburtsdatum'] = date_of_birth + + # KONTAKTDATEN + # E-Mail (emailAddressData ist Array, wir nehmen Primary) + email_data = espo_entity.get('emailAddressData') + if email_data and isinstance(email_data, list): + primary_email = next((e for e in email_data if e.get('primary')), None) + if primary_email: + advo_data['emailGesch'] = primary_email.get('emailAddress') + elif espo_entity.get('emailAddress'): + advo_data['emailGesch'] = espo_entity.get('emailAddress') + + # Telefon (phoneNumberData ist Array, wir nehmen Primary) + phone_data = espo_entity.get('phoneNumberData') + if phone_data and isinstance(phone_data, list): + primary_phone = next((p for p in phone_data if p.get('primary')), None) + if primary_phone: + phone_num = primary_phone.get('phoneNumber') + phone_type = primary_phone.get('type', '').lower() + + if 'mobile' in phone_type or 'mobil' in phone_type: + advo_data['mobil'] = phone_num + else: + advo_data['telGesch'] = phone_num + elif espo_entity.get('phoneNumber'): + advo_data['telGesch'] = espo_entity.get('phoneNumber') + + # HANDELSREGISTER (nur für Firmen) + if is_firma: + hr_nummer = espo_entity.get('handelsregisterNummer') + if hr_nummer: + advo_data['handelsRegisterNummer'] = hr_nummer + + # DISGTYP (EspoCRM spezifisch - falls vorhanden) + disgtyp = espo_entity.get('disgTyp') + if disgtyp: + advo_data['disgTyp'] = disgtyp + + logger.debug(f"Mapped to Advoware: name={advo_data.get('name')}, vorname={advo_data.get('vorname')}") + + return advo_data + + @staticmethod + def map_advoware_to_cbeteiligte(advo_entity: Dict[str, Any]) -> Dict[str, Any]: + """ + Transformiert Advoware Beteiligte → EspoCRM CBeteiligte Format + + Args: + advo_entity: Beteiligter von Advoware API + + Returns: + Dict für EspoCRM API (POST/PUT /api/v1/CBeteiligte) + """ + logger.debug(f"Mapping Advoware → EspoCRM: betNr={advo_entity.get('betNr')}") + + # Bestimme ob Person oder Firma + vorname = advo_entity.get('vorname') + is_person = bool(vorname) + + # Basis-Struktur + espo_data = { + 'rechtsform': advo_entity.get('rechtsform', ''), + 'betnr': advo_entity.get('betNr'), # Link zu Advoware + } + + # NAME: Person vs. Firma + if is_person: + # Person + espo_data['firstName'] = vorname + espo_data['lastName'] = advo_entity.get('name', '') + espo_data['name'] = f"{vorname} {advo_entity.get('name', '')}".strip() + espo_data['firmenname'] = None + else: + # Firma + espo_data['firmenname'] = advo_entity.get('name', '') + espo_data['name'] = advo_entity.get('name', '') + espo_data['firstName'] = None + espo_data['lastName'] = None + + # ANREDE + anrede = advo_entity.get('anrede') + if anrede: + espo_data['salutationName'] = anrede + + # GEBURTSDATUM + geburtsdatum = advo_entity.get('geburtsdatum') + if geburtsdatum: + espo_data['dateOfBirth'] = geburtsdatum + + # KONTAKTDATEN + # E-Mail (emailGesch ist primary) + email_gesch = advo_entity.get('emailGesch') + email = advo_entity.get('email') + + primary_email = email_gesch or email + if primary_email: + espo_data['emailAddress'] = primary_email + espo_data['emailAddressData'] = [ + { + 'emailAddress': primary_email, + 'primary': True, + 'optOut': False, + 'invalid': False + } + ] + + # Telefon (telGesch ist primary, mobil als secondary) + tel_gesch = advo_entity.get('telGesch') + tel_privat = advo_entity.get('telPrivat') + mobil = advo_entity.get('mobil') + + phone_data = [] + + # Primary: telGesch oder telPrivat + primary_tel = tel_gesch or tel_privat + if primary_tel: + espo_data['phoneNumber'] = primary_tel + phone_data.append({ + 'phoneNumber': primary_tel, + 'primary': True, + 'type': 'Office' if tel_gesch else 'Home' + }) + + # Secondary: mobil + if mobil and mobil != primary_tel: + phone_data.append({ + 'phoneNumber': mobil, + 'primary': False, + 'type': 'Mobile' + }) + + if phone_data: + espo_data['phoneNumberData'] = phone_data + + # HANDELSREGISTER (nur für Firmen) + if not is_person: + hr_nummer = advo_entity.get('handelsRegisterNummer') + if hr_nummer: + espo_data['handelsregisterNummer'] = hr_nummer + + # DISGTYP + disgtyp = advo_entity.get('disgTyp') + if disgtyp: + espo_data['disgTyp'] = disgtyp + + logger.debug(f"Mapped to EspoCRM: name={espo_data.get('name')}") + + return espo_data + + @staticmethod + def get_changed_fields(espo_entity: Dict[str, Any], advo_entity: Dict[str, Any]) -> List[str]: + """ + Vergleicht zwei Entities und gibt Liste der geänderten Felder zurück + + Args: + espo_entity: EspoCRM CBeteiligte + advo_entity: Advoware Beteiligte + + Returns: + Liste von Feldnamen die unterschiedlich sind + """ + # Mappe Advoware zu EspoCRM Format für Vergleich + mapped_advo = BeteiligteMapper.map_advoware_to_cbeteiligte(advo_entity) + + changed = [] + + # Vergleiche wichtige Felder + compare_fields = [ + 'name', 'firstName', 'lastName', 'firmenname', + 'emailAddress', 'phoneNumber', + 'dateOfBirth', 'rechtsform', + 'handelsregisterNummer' + ] + + for field in compare_fields: + espo_val = espo_entity.get(field) + advo_val = mapped_advo.get(field) + + # Normalisiere None und leere Strings + espo_val = espo_val if espo_val else None + advo_val = advo_val if advo_val else None + + if espo_val != advo_val: + changed.append(field) + logger.debug(f"Field '{field}' changed: EspoCRM='{espo_val}' vs Advoware='{advo_val}'") + + return changed diff --git a/bitbylaw/steps/vmh/beteiligte_sync_cron_step.py b/bitbylaw/steps/vmh/beteiligte_sync_cron_step.py new file mode 100644 index 00000000..d78f6a45 --- /dev/null +++ b/bitbylaw/steps/vmh/beteiligte_sync_cron_step.py @@ -0,0 +1,117 @@ +""" +Beteiligte Sync Cron Job + +Läuft alle 15 Minuten und emittiert Sync-Events für Beteiligte die: +- Neu sind (pending_sync) +- Geändert wurden (dirty) +- Fehlgeschlagen sind (failed → Retry) +- Lange nicht gesynct wurden (clean aber > 24h alt) +""" + +import asyncio +from services.espocrm import EspoCRMAPI +import datetime + +config = { + 'type': 'cron', + 'name': 'VMH Beteiligte Sync Cron', + 'description': 'Prüft alle 15 Minuten welche Beteiligte synchronisiert werden müssen', + 'schedule': '*/15 * * * *', # Alle 15 Minuten + 'flows': ['vmh'], + 'emits': ['vmh.beteiligte.sync_check'] +} + +async def handler(context): + """ + Cron-Handler: Findet alle Beteiligte die Sync benötigen und emittiert Events + """ + context.logger.info("🕐 Beteiligte Sync Cron gestartet") + + try: + espocrm = EspoCRMAPI() + + # Berechne Threshold für "veraltete" Syncs (24 Stunden) + threshold = datetime.datetime.now() - datetime.timedelta(hours=24) + threshold_str = threshold.strftime('%Y-%m-%d %H:%M:%S') + + context.logger.info(f"📅 Suche Entities mit Sync-Bedarf (älter als {threshold_str})") + + # QUERY 1: Entities mit Status pending_sync, dirty oder failed + unclean_filter = { + 'where': [ + { + 'type': 'or', + 'value': [ + {'type': 'equals', 'attribute': 'syncStatus', 'value': 'pending_sync'}, + {'type': 'equals', 'attribute': 'syncStatus', 'value': 'dirty'}, + {'type': 'equals', 'attribute': 'syncStatus', 'value': 'failed'}, + ] + } + ] + } + + unclean_result = await espocrm.search_entities('CBeteiligte', unclean_filter, max_size=100) + unclean_entities = unclean_result.get('list', []) + + context.logger.info(f"📊 Gefunden: {len(unclean_entities)} Entities mit Status pending/dirty/failed") + + # QUERY 2: Clean Entities die > 24h nicht gesynct wurden + stale_filter = { + 'where': [ + { + 'type': 'and', + 'value': [ + {'type': 'equals', 'attribute': 'syncStatus', 'value': 'clean'}, + {'type': 'isNotNull', 'attribute': 'betnr'}, + { + 'type': 'or', + 'value': [ + {'type': 'isNull', 'attribute': 'advowareLastSync'}, + {'type': 'before', 'attribute': 'advowareLastSync', 'value': threshold_str} + ] + } + ] + } + ] + } + + stale_result = await espocrm.search_entities('CBeteiligte', stale_filter, max_size=50) + stale_entities = stale_result.get('list', []) + + context.logger.info(f"📊 Gefunden: {len(stale_entities)} Entities mit veraltetem Sync (> 24h)") + + # KOMBINIERE ALLE + all_entities = unclean_entities + stale_entities + entity_ids = list(set([e['id'] for e in all_entities])) # Dedupliziere + + context.logger.info(f"🎯 Total: {len(entity_ids)} eindeutige Entities zum Sync") + + if not entity_ids: + context.logger.info("✅ Keine Entities benötigen Sync") + return + + # EMITTIERE EVENT FÜR JEDEN BETEILIGTEN + emitted_count = 0 + + for entity_id in entity_ids: + try: + await context.emit({ + 'topic': 'vmh.beteiligte.sync_check', + 'data': { + 'entity_id': entity_id, + 'action': 'sync_check', + 'source': 'cron', + 'timestamp': datetime.datetime.now().isoformat() + } + }) + emitted_count += 1 + + except Exception as e: + context.logger.error(f"❌ Fehler beim Emittieren für {entity_id}: {e}") + + context.logger.info(f"✅ Cron fertig: {emitted_count}/{len(entity_ids)} Events emittiert") + + except Exception as e: + context.logger.error(f"❌ Fehler im Sync Cron: {e}") + import traceback + context.logger.error(traceback.format_exc()) diff --git a/bitbylaw/steps/vmh/beteiligte_sync_event_step.py b/bitbylaw/steps/vmh/beteiligte_sync_event_step.py index c932034c..cfcb1917 100644 --- a/bitbylaw/steps/vmh/beteiligte_sync_event_step.py +++ b/bitbylaw/steps/vmh/beteiligte_sync_event_step.py @@ -1,52 +1,261 @@ from services.advoware import AdvowareAPI +from services.espocrm import EspoCRMAPI +from services.espocrm_mapper import BeteiligteMapper +from services.beteiligte_sync_utils import BeteiligteSync import json import redis from config import Config config = { 'type': 'event', - 'name': 'VMH Beteiligte Sync', - 'description': 'Synchronisiert Beteiligte Entities von Advoware nach Änderungen (Create/Update/Delete)', - 'subscribes': ['vmh.beteiligte.create', 'vmh.beteiligte.update', 'vmh.beteiligte.delete'], + 'name': 'VMH Beteiligte Sync Handler', + 'description': 'Zentraler Sync-Handler für Beteiligte (Webhooks + Cron Events)', + 'subscribes': [ + 'vmh.beteiligte.create', + 'vmh.beteiligte.update', + 'vmh.beteiligte.delete', + 'vmh.beteiligte.sync_check' # Von Cron + ], 'flows': ['vmh'], 'emits': [] } async def handler(event_data, context): + """ + Zentraler Sync-Handler für Beteiligte + + Verarbeitet: + - vmh.beteiligte.create: Neu in EspoCRM → Create in Advoware + - vmh.beteiligte.update: Geändert in EspoCRM → Update in Advoware + - vmh.beteiligte.delete: Gelöscht in EspoCRM → Delete in Advoware + - vmh.beteiligte.sync_check: Cron-Check → Sync wenn nötig + """ + entity_id = event_data.get('entity_id') + action = event_data.get('action', 'sync_check') + source = event_data.get('source', 'unknown') + + if not entity_id: + context.logger.error("Keine entity_id im Event gefunden") + return + + context.logger.info(f"🔄 Sync-Handler gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}") + + # Redis für Queue-Management + redis_client = redis.Redis( + host=Config.REDIS_HOST, + port=int(Config.REDIS_PORT), + db=int(Config.REDIS_DB_ADVOWARE_CACHE), + decode_responses=True + ) + + # APIs initialisieren + espocrm = EspoCRMAPI() + advoware = AdvowareAPI(context) + sync_utils = BeteiligteSync(espocrm, context) + mapper = BeteiligteMapper() + try: - entity_id = event_data.get('entity_id') - action = event_data.get('action', 'unknown') + # 1. ACQUIRE LOCK (verhindert parallele Syncs) + lock_acquired = await sync_utils.acquire_sync_lock(entity_id) - if not entity_id: - context.logger.error("Keine entity_id im Event gefunden") + if not lock_acquired: + context.logger.warning(f"⏸️ Sync bereits aktiv für {entity_id}, überspringe") return - - context.logger.info(f"Starte {action.upper()} Sync für Beteiligte Entity: {entity_id}") - - # Advoware API initialisieren (für später) - # advoware = AdvowareAPI(context) - - # PLATZHALTER: Für jetzt nur loggen, keine API-Anfrage - context.logger.info(f"PLATZHALTER: {action.upper()} Sync für Entity {entity_id} würde hier Advoware API aufrufen") - context.logger.info(f"PLATZHALTER: Entity-Daten würden hier verarbeitet werden") - - # TODO: Hier die Entity in das Zielsystem syncen (EspoCRM?) - # Für Create: Neu anlegen - # Für Update: Aktualisieren - # Für Delete: Löschen - - # Entferne die ID aus der entsprechenden Pending-Queue - redis_client = redis.Redis( - host=Config.REDIS_HOST, - port=int(Config.REDIS_PORT), - db=int(Config.REDIS_DB_ADVOWARE_CACHE), - decode_responses=True - ) + # 2. FETCH ENTITY VON ESPOCRM + try: + espo_entity = await espocrm.get_entity('CBeteiligte', entity_id) + except Exception as e: + context.logger.error(f"❌ Fehler beim Laden von EspoCRM Entity: {e}") + await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True) + return + + context.logger.info(f"📋 Entity geladen: {espo_entity.get('name')} (betnr: {espo_entity.get('betnr')})") + + betnr = espo_entity.get('betnr') + sync_status = espo_entity.get('syncStatus', 'pending_sync') + + # 3. BESTIMME SYNC-AKTION + + # FALL A: Neu (kein betnr) → CREATE in Advoware + if not betnr and action in ['create', 'sync_check']: + context.logger.info(f"🆕 Neuer Beteiligter → CREATE in Advoware") + await handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, context) + + # FALL B: Existiert (hat betnr) → UPDATE oder CHECK + elif betnr: + context.logger.info(f"♻️ Existierender Beteiligter (betNr: {betnr}) → UPDATE/CHECK") + await handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, context) + + # FALL C: DELETE (TODO: Implementierung später) + elif action == 'delete': + context.logger.warning(f"🗑️ DELETE noch nicht implementiert für {entity_id}") + await sync_utils.release_sync_lock(entity_id, 'failed', 'Delete-Operation nicht implementiert') + + else: + context.logger.warning(f"⚠️ Unbekannte Kombination: action={action}, betnr={betnr}") + await sync_utils.release_sync_lock(entity_id, 'failed', f'Unbekannte Aktion: {action}') + + # Redis Queue Cleanup pending_key = f'vmh:beteiligte:{action}_pending' redis_client.srem(pending_key, entity_id) - context.logger.info(f"Entity {entity_id} aus {action.upper()}-Pending-Queue entfernt") - + except Exception as e: - context.logger.error(f"Fehler beim {event_data.get('action', 'unknown').upper()} Sync von Beteiligte Entity: {e}") - context.logger.error(f"Event Data: {event_data}") \ No newline at end of file + context.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}") + import traceback + context.logger.error(traceback.format_exc()) + + try: + await sync_utils.release_sync_lock( + entity_id, + 'failed', + f'Unerwarteter Fehler: {str(e)[:1900]}', + increment_retry=True + ) + except: + pass + + +async def handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, context): + """Erstellt neuen Beteiligten in Advoware""" + try: + context.logger.info(f"🔨 CREATE in Advoware...") + + # Transform zu Advoware Format + advo_data = mapper.map_cbeteiligte_to_advoware(espo_entity) + + context.logger.info(f"📤 Sende an Advoware: {json.dumps(advo_data, ensure_ascii=False)[:200]}...") + + # POST zu Advoware + result = await advoware.api_call( + 'api/v1/advonet/Beteiligte', + method='POST', + data=advo_data + ) + + # Extrahiere betNr aus Response + new_betnr = result.get('betNr') if isinstance(result, dict) else None + + if not new_betnr: + raise Exception(f"Keine betNr in Advoware Response: {result}") + + context.logger.info(f"✅ In Advoware erstellt: betNr={new_betnr}") + + # Update EspoCRM mit neuer betNr + await sync_utils.release_sync_lock(entity_id, 'clean', error_message=None) + await espocrm.update_entity('CBeteiligte', entity_id, { + 'betnr': new_betnr + }) + + context.logger.info(f"✅ CREATE erfolgreich: {entity_id} → betNr {new_betnr}") + + except Exception as e: + context.logger.error(f"❌ CREATE fehlgeschlagen: {e}") + await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True) + + +async def handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, context): + """Synchronisiert existierenden Beteiligten""" + try: + context.logger.info(f"🔍 Fetch von Advoware betNr={betnr}...") + + # Fetch von Advoware + try: + advo_result = await advoware.api_call( + f'api/v1/advonet/Beteiligte/{betnr}', + method='GET' + ) + + # Advoware gibt manchmal Listen zurück + if isinstance(advo_result, list): + advo_entity = advo_result[0] if advo_result else None + else: + advo_entity = advo_result + + if not advo_entity: + raise Exception(f"Beteiligter betNr={betnr} nicht gefunden") + + except Exception as e: + # 404 oder anderer Fehler → Beteiligter wurde in Advoware gelöscht + if '404' in str(e) or 'nicht gefunden' in str(e).lower(): + context.logger.warning(f"🗑️ Beteiligter in Advoware gelöscht: betNr={betnr}") + await sync_utils.handle_advoware_deleted(entity_id, str(e)) + return + else: + raise + + context.logger.info(f"📥 Von Advoware geladen: {advo_entity.get('name')}") + + # TIMESTAMP-VERGLEICH + comparison = sync_utils.compare_timestamps( + espo_entity.get('modifiedAt'), + advo_entity.get('geaendertAm'), + espo_entity.get('advowareLastSync') + ) + + context.logger.info(f"⏱️ Timestamp-Vergleich: {comparison}") + + # KEIN SYNC NÖTIG + if comparison == 'no_change': + context.logger.info(f"✅ Keine Änderungen, Sync übersprungen") + await sync_utils.release_sync_lock(entity_id, 'clean') + return + + # ESPOCRM NEUER → Update Advoware + if comparison == 'espocrm_newer': + context.logger.info(f"📤 EspoCRM ist neuer → Update Advoware") + + advo_data = mapper.map_cbeteiligte_to_advoware(espo_entity) + + await advoware.api_call( + f'api/v1/advonet/Beteiligte/{betnr}', + method='PUT', + data=advo_data + ) + + await sync_utils.release_sync_lock(entity_id, 'clean') + context.logger.info(f"✅ Advoware aktualisiert") + + # ADVOWARE NEUER → Update EspoCRM + elif comparison == 'advoware_newer': + context.logger.info(f"📥 Advoware ist neuer → Update EspoCRM") + + espo_data = mapper.map_advoware_to_cbeteiligte(advo_entity) + + await espocrm.update_entity('CBeteiligte', entity_id, espo_data) + await sync_utils.release_sync_lock(entity_id, 'clean') + context.logger.info(f"✅ EspoCRM aktualisiert") + + # KONFLIKT → EspoCRM WINS + elif comparison == 'conflict': + context.logger.warning(f"⚠️ KONFLIKT erkannt → EspoCRM WINS") + + # Überschreibe Advoware mit EspoCRM + advo_data = mapper.map_cbeteiligte_to_advoware(espo_entity) + + await advoware.api_call( + f'api/v1/advonet/Beteiligte/{betnr}', + method='PUT', + data=advo_data + ) + + conflict_msg = ( + f"EspoCRM: {espo_entity.get('modifiedAt')}, " + f"Advoware: {advo_entity.get('geaendertAm')}. " + f"EspoCRM hat gewonnen." + ) + + await sync_utils.resolve_conflict_espocrm_wins( + entity_id, + espo_entity, + advo_entity, + conflict_msg + ) + + context.logger.info(f"✅ Konflikt gelöst: EspoCRM → Advoware") + + except Exception as e: + context.logger.error(f"❌ UPDATE fehlgeschlagen: {e}") + import traceback + context.logger.error(traceback.format_exc()) + await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True) \ No newline at end of file