feat: Enhance sync process with rowId-based change detection and update handling for Advoware and EspoCRM
This commit is contained in:
@@ -9,13 +9,19 @@ Für neuen Sync von Entity `XYZ`:
|
||||
### 1. EspoCRM Custom Fields
|
||||
```sql
|
||||
-- In EspoCRM Admin → Entity Manager → XYZ
|
||||
advowareId (int, unique) -- Foreign Key
|
||||
syncStatus (enum: clean|dirty|...) -- Status
|
||||
advowareLastSync (datetime) -- Letzter Sync
|
||||
syncErrorMessage (text, 2000) -- Fehler
|
||||
syncRetryCount (int) -- Retries
|
||||
advowareId (int, unique) -- Foreign Key zu Advoware
|
||||
advowareRowId (varchar 50) -- Für Change Detection (WICHTIG!)
|
||||
syncStatus (enum: clean|dirty|...) -- Status tracking
|
||||
advowareLastSync (datetime) -- Timestamp letzter erfolgreicher Sync
|
||||
syncErrorMessage (text, 2000) -- Fehler-Details
|
||||
syncRetryCount (int) -- Anzahl Retry-Versuche
|
||||
```
|
||||
|
||||
**WICHTIG: Change Detection via rowId**
|
||||
- Advoware's `rowId` Feld ändert sich bei **jedem** Update
|
||||
- **EINZIGE** Methode für Advoware Change Detection (Advoware liefert keine Timestamps!)
|
||||
- Base64-kodierte Binary-ID (~40 Zeichen), sehr zuverlässig
|
||||
|
||||
### 2. Mapper erstellen
|
||||
```python
|
||||
# services/xyz_mapper.py
|
||||
@@ -35,6 +41,7 @@ class XYZMapper:
|
||||
return {
|
||||
'espoField1': advo_entity.get('field1'),
|
||||
'espoField2': advo_entity.get('field2'),
|
||||
'advowareRowId': advo_entity.get('rowId'), # WICHTIG für Change Detection!
|
||||
}
|
||||
```
|
||||
|
||||
@@ -74,10 +81,18 @@ class XYZSync:
|
||||
increment_retry: bool = False,
|
||||
extra_fields: Optional[Dict[str, Any]] = None
|
||||
):
|
||||
"""Release lock and update status (combined operation)"""
|
||||
"""
|
||||
Release lock and update status (combined operation)
|
||||
|
||||
WICHTIG: extra_fields verwenden um advowareRowId nach jedem Sync zu speichern!
|
||||
"""
|
||||
# EspoCRM DateTime Format: 'YYYY-MM-DD HH:MM:SS' (kein Timezone!)
|
||||
now_utc = datetime.now(pytz.UTC)
|
||||
espocrm_timestamp = now_utc.strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
update_data = {
|
||||
'syncStatus': new_status,
|
||||
'advowareLastSync': datetime.now(pytz.UTC).isoformat()
|
||||
'advowareLastSync': espocrm_timestamp
|
||||
}
|
||||
|
||||
if error_message:
|
||||
@@ -106,8 +121,75 @@ class XYZSync:
|
||||
|
||||
if self.redis:
|
||||
self.redis.delete(f"sync_lock:xyz:{entity_id}")
|
||||
entities(self, espo_entity: Dict, advo_entity: Dict) -> str:
|
||||
"""
|
||||
Vergleicht EspoCRM und Advoware Entity mit rowId-basierter Change Detection.
|
||||
|
||||
PRIMÄR: rowId-Vergleich (Advoware rowId ändert sich bei jedem Update)
|
||||
FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar)
|
||||
|
||||
Logik:
|
||||
- rowId geändert + EspoCRM geändert (modifiedAt > lastSync) → conflict
|
||||
- Nur rowId geändert → advoware_newer
|
||||
- Nur EspoCRM geändert → espocrm_newer
|
||||
- Keine Änderung → no_change
|
||||
|
||||
Returns:
|
||||
"espocrm_newer": EspoCRM wurde geändert
|
||||
"advoware_newer": Advoware wurde geändert
|
||||
"conflict": Beide wurden geändert
|
||||
"no_change": Keine Änderungen
|
||||
"""
|
||||
espo_rowid = espo_entity.get('advowareRowId')
|
||||
advo_rowid = advo_entity.get('rowId')
|
||||
last_sync = espo_entity.get('advowareLastSync')
|
||||
espo_modified = espo_entity.get('modifiedAt')
|
||||
|
||||
# PRIMÄR: rowId-basierte Änderungserkennung (sehr zuverlässig!)
|
||||
if espo_rowid and advo_rowid and last_sync:
|
||||
# Prüfe ob Advoware geändert wurde (rowId)
|
||||
advo_changed = (espo_rowid != advo_rowid)
|
||||
|
||||
# Prüfe ob EspoCRM auch geändert wurde (seit letztem Sync)
|
||||
espo_changed = False
|
||||
if espo_modified:
|
||||
try:
|
||||
espo_ts = self._parse_ts(espo_modified)
|
||||
sync_ts = self._parse_ts(last_sync)
|
||||
if espo_ts and sync_ts:
|
||||
espo_changed = (espo_ts > sync_ts)
|
||||
except Exception as e:
|
||||
self._log(f"Timestamp-Parse-Fehler: {e}", level='debug')
|
||||
|
||||
# Konfliktlogik
|
||||
if advo_changed and espo_changed:
|
||||
self._log(f"🚨 KONFLIKT: Beide Seiten geändert seit letztem Sync")
|
||||
return 'conflict'
|
||||
elif advo_changed:
|
||||
self._log(f"Advoware rowId geändert: {espo_rowid[:20]}... → {advo_rowid[:20]}...")
|
||||
return 'advoware_newer'
|
||||
elif espo_changed:
|
||||
self._log(f"EspoCRM neuer (modifiedAt > lastSync)")
|
||||
return 'espocrm_newer'
|
||||
else:
|
||||
# Weder Advoware noch EspoCRM geändert
|
||||
return 'no_change'
|
||||
|
||||
# FALLBACK: Timestamp-Vergleich (wenn rowId nicht verfügbar)
|
||||
self._log("⚠️ rowId nicht verfügbar, fallback auf Timestamp-Vergleich", level='warn')
|
||||
return self.compare_timestamps(
|
||||
espo_entity.get('modifiedAt'),
|
||||
advo_entity.get('geaendertAm'), # Advoware Timestamp-Feld
|
||||
espo_entity.get('advowareLastSync')
|
||||
)
|
||||
|
||||
def compare_timestamps(self, espo_ts, advo_ts, last_sync_ts):
|
||||
"""
|
||||
FALLBACK: Timestamp-basierte Änderungserkennung
|
||||
|
||||
ACHTUNG: Weniger zuverlässig als rowId (Timestamps können NULL sein)
|
||||
Nur verwenden wenn rowId nicht verfügbar!
|
||||
nc_ts):
|
||||
"""Compare timestamps and determine sync direction"""
|
||||
# Parse timestamps
|
||||
espo = self._parse_ts(espo_ts)
|
||||
@@ -227,14 +309,24 @@ async def handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, m
|
||||
|
||||
result = await advoware.api_call(
|
||||
'api/v1/advonet/XYZ',
|
||||
method='POST',
|
||||
data=advo_data
|
||||
WICHTIG: Lade Entity nach POST um rowId zu bekommen
|
||||
created_entity = await advoware.api_call(
|
||||
f'api/v1/advonet/XYZ/{new_id}',
|
||||
method='GET'
|
||||
)
|
||||
new_rowid = created_entity.get('rowId') if isinstance(created_entity, dict) else created_entity[0].get('rowId')
|
||||
|
||||
# Combined API call: release lock + save foreign key + rowId
|
||||
await sync_utils.release_sync_lock(
|
||||
entity_id,
|
||||
'clean',
|
||||
extra_fields={
|
||||
'advowareId': new_id,
|
||||
'advowareRowId': new_rowid # WICHTIG für Change Detection!
|
||||
}
|
||||
)
|
||||
|
||||
new_id = result.get('id')
|
||||
if not new_id:
|
||||
raise Exception(f"No ID in response: {result}")
|
||||
|
||||
context.logger.info(f"✅ Created in Advoware: {new_id} (rowId: {new_rowid[:20]}...)
|
||||
# Combined API call: release lock + save foreign key
|
||||
await sync_utils.release_sync_lock(
|
||||
entity_id,
|
||||
@@ -243,12 +335,8 @@ async def handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, m
|
||||
)
|
||||
|
||||
context.logger.info(f"✅ Created in Advoware: {new_id}")
|
||||
|
||||
except Exception as e:
|
||||
context.logger.error(f"❌ Create failed: {e}")
|
||||
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
|
||||
|
||||
|
||||
entities (rowId-basiert, NICHT nur Timestamps!)
|
||||
comparison = sync_utils.compare_entities(espo_entity, advo_entity
|
||||
async def handle_update(entity_id, advoware_id, espo_entity, espocrm, advoware, sync_utils, mapper, context):
|
||||
"""Sync existing entity"""
|
||||
try:
|
||||
@@ -262,31 +350,46 @@ async def handle_update(entity_id, advoware_id, espo_entity, espocrm, advoware,
|
||||
return
|
||||
|
||||
# Compare timestamps
|
||||
comparison = sync_utils.compare_timestamps(
|
||||
espo_entity.get('modifiedAt'),
|
||||
advo_entity.get('modifiedAt'), # Advoware timestamp field
|
||||
espo_entity.get('advowareLastSync')
|
||||
)
|
||||
|
||||
# Initial sync (no last_sync)
|
||||
comparison = sync_utils.compa - Merge EspoCRM → Advoware
|
||||
if not espo_entity.get('advowareLastSync'):
|
||||
merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper)
|
||||
await advoware.api_call(f'api/v1/advonet/XYZ/{advoware_id}', method='PUT', data=merged_data)
|
||||
await sync_utils.release_sync_lock(entity_id, 'clean')
|
||||
return
|
||||
|
||||
# No change
|
||||
if comparison == 'no_change':
|
||||
await sync_utils.release_sync_lock(entity_id, 'clean')
|
||||
return
|
||||
# Lade Entity nach PUT um neue rowId zu bekommen
|
||||
updated_entity = await advoware.api_call(f'api/v1/advonet/XYZ/{advoware_id}', method='GET')
|
||||
new_rowid = updated_entity.get('rowId') if isinstance(updated_entity, dict) else updated_entity[0].get('rowId')
|
||||
|
||||
# EspoCRM newer
|
||||
await sync_utils.release_sync_lock(entity_id, 'clean', extra_fields={'advowareRowId': new_rowid}
|
||||
|
||||
# Initial sync (no last_sync)
|
||||
if not espo_ent → Update Advoware
|
||||
if comparison == 'espocrm_newer':
|
||||
merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper)
|
||||
await advoware.api_call(f'api/v1/advonet/XYZ/{advoware_id}', method='PUT', data=merged_data)
|
||||
|
||||
# WICHTIG: Lade Entity nach PUT um neue rowId zu bekommen
|
||||
updated_entity = await advoware.api_call(f'api/v1/advonet/XYZ/{advoware_id}', method='GET')
|
||||
new_rowid = updated_entity.get('rowId') if isinstance(updated_entity, dict) else updated_entity[0].get('rowId')
|
||||
|
||||
await sync_utils.release_sync_lock(entity_id, 'clean', extra_fields={'advowareRowId': new_rowid})
|
||||
|
||||
# Advoware newer → Update EspoCRM
|
||||
elif comparison == 'advoware_newer':
|
||||
espo_data = mapper.map_advoware_to_espo(advo_entity) # Enthält bereits rowId!
|
||||
await espocrm.update_entity('XYZ', entity_id, espo_data)
|
||||
await sync_utils.release_sync_lock(entity_id, 'clean')
|
||||
|
||||
# Advoware newer
|
||||
# Conflict → EspoCRM wins
|
||||
elif comparison == 'conflict':
|
||||
merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper)
|
||||
await advoware.api_call(f'api/v1/advonet/XYZ/{advoware_id}', method='PUT', data=merged_data)
|
||||
|
||||
# WICHTIG: Auch bei Konflikt rowId aktualisieren
|
||||
updated_entity = await advoware.api_call(f'api/v1/advonet/XYZ/{advoware_id}', method='GET')
|
||||
new_rowid = updated_entity.get('rowId') if isinstance(updated_entity, dict) else updated_entity[0].get('rowId')
|
||||
|
||||
await sync_utils.send_notification(entity_id, "Conflict resolved: EspoCRM won")
|
||||
await sync_utils.release_sync_lock(entity_id, 'clean', extra_fields={'advowareRowId': new_rowid}
|
||||
elif comparison == 'advoware_newer':
|
||||
espo_data = mapper.map_advoware_to_espo(advo_entity)
|
||||
await espocrm.update_entity('XYZ', entity_id, espo_data)
|
||||
@@ -386,6 +489,94 @@ async def handler(context):
|
||||
- Don't emit events sequentially in cron
|
||||
- Don't map every field (performance)
|
||||
- Don't swallow exceptions silently
|
||||
- Don't rely on Advoware timestamps (nicht vorhanden!)
|
||||
|
||||
## Architecture Principles
|
||||
|
||||
1. **Atomicity**: Redis lock + TTL
|
||||
2. **Efficiency**: Combined operations
|
||||
3. **Reusability**: Utility functions
|
||||
4. **Robustness**: Max retries + notifications
|
||||
5. **Scalability**: Batch processing
|
||||
6. **Maintainability**: Clear separation of concerns
|
||||
7. **Reliability**: rowId-basierte Change Detection (EINZIGE Methode)
|
||||
|
||||
## Change Detection Details
|
||||
|
||||
### rowId-basierte Erkennung (EINZIGE METHODE)
|
||||
|
||||
**Warum nur rowId?**
|
||||
- Advoware liefert **KEINE** Timestamps (geaendertAm, modifiedAt etc.)
|
||||
- Advoware's `rowId` Feld ändert sich bei **jedem** Update der Entity
|
||||
- Base64-kodierte Binary-ID (~40 Zeichen)
|
||||
- Sehr zuverlässig, keine Timezone-Probleme, keine NULL-Werte
|
||||
|
||||
**Implementierung:**
|
||||
```python
|
||||
# 1. EspoCRM Feld: advowareRowId (varchar 50)
|
||||
# 2. Im Mapper IMMER rowId mitmappen:
|
||||
'advowareRowId': advo_entity.get('rowId')
|
||||
|
||||
# 3. Nach JEDEM Sync rowId in EspoCRM speichern:
|
||||
await sync_utils.release_sync_lock(
|
||||
entity_id,
|
||||
'clean',
|
||||
extra_fields={'advowareRowId': new_rowid}
|
||||
)
|
||||
|
||||
# 4. Bei Änderungserkennung:
|
||||
if espo_rowid != advo_rowid:
|
||||
# Advoware wurde geändert!
|
||||
if espo_modified > last_sync:
|
||||
# Konflikt: Beide Seiten geändert
|
||||
return 'conflict'
|
||||
else:
|
||||
# Nur Advoware geändert
|
||||
return 'advoware_newer'
|
||||
```
|
||||
|
||||
**Wichtige Sync-Punkte für rowId:**
|
||||
- Nach POST (Create) - GET aufrufen um rowId zu laden
|
||||
- Nach PUT (EspoCRM → Advoware) - GET aufrufen um neue rowId zu laden
|
||||
- Nach PUT (Konfliktlösung) - GET aufrufen um neue rowId zu laden
|
||||
- Bei Advoware → EspoCRM (via Mapper) - rowId ist bereits in Advoware Response
|
||||
|
||||
**WICHTIG:** rowId ist PFLICHT für Change Detection! Ohne rowId können Änderungen nicht erkannt werden.
|
||||
|
||||
### Person vs. Firma Mapping
|
||||
|
||||
**Unterschiedliche Felder je nach Typ:**
|
||||
|
||||
```python
|
||||
# EspoCRM Struktur:
|
||||
# - Natürliche Person: firstName, lastName (firmenname=None)
|
||||
# - Firma: firmenname (firstName=None, lastName=None)
|
||||
|
||||
def map_advoware_to_espo(advo_entity):
|
||||
vorname = advo_entity.get('vorname')
|
||||
is_person = bool(vorname and vorname.strip())
|
||||
|
||||
if is_person:
|
||||
# Natürliche Person
|
||||
return {
|
||||
'firstName': vorname,
|
||||
'lastName': advo_entity.get('name'),
|
||||
'name': f"{vorname} {advo_entity.get('name')}".strip(),
|
||||
'firmenname': None
|
||||
}
|
||||
else:
|
||||
# Firma
|
||||
return {
|
||||
'firmenname': advo_entity.get('name'),
|
||||
'name': advo_entity.get('name'),
|
||||
'firstName': None,
|
||||
'lastName': None # EspoCRM blendet aus bei Firmen
|
||||
}
|
||||
```
|
||||
|
||||
**Wichtig:** EspoCRM blendet `firstName/lastName` im Frontend aus wenn `firmenname` gefüllt ist. Daher sauber trennen!
|
||||
- Don't map every field (performance)
|
||||
- Don't swallow exceptions silently
|
||||
|
||||
## Architecture Principles
|
||||
|
||||
|
||||
Reference in New Issue
Block a user