Migrate VMH Integration - Phase 3: Core sync handlers & utilities
- Added 5 core service modules:
* services/notification_utils.py: NotificationManager for manual actions (412 lines)
* services/advoware_service.py: Extended Advoware operations wrapper
* services/espocrm_mapper.py: BeteiligteMapper for data transformation (198 lines)
* services/bankverbindungen_mapper.py: BankverbindungenMapper (174 lines)
* services/beteiligte_sync_utils.py: BeteiligteSync with complex logic (663 lines)
- Distributed locking via Redis + syncStatus
- rowId-based change detection (primary) + timestamp fallback
- Retry with exponential backoff (1min, 5min, 15min, 1h, 4h)
- Conflict resolution (EspoCRM wins)
- Soft-delete handling & round-trip validation
- Added 2 event handler steps:
* steps/vmh/beteiligte_sync_event_step.py: Handles vmh.beteiligte.* queue events
- CREATE: New Beteiligte → POST to Advoware
- UPDATE: Bi-directional sync with conflict resolution
- DELETE: Not yet implemented
- SYNC_CHECK: Periodic sync check via cron
* steps/vmh/bankverbindungen_sync_event_step.py: Handles vmh.bankverbindungen.* events
- CREATE: New Bankverbindung → POST to Advoware
- UPDATE/DELETE: Send notification (API limitation - no PUT/DELETE support)
- Added pytz dependency to pyproject.toml (required for timezone handling)
System Status:
- 27 steps registered (14 VMH-specific)
- 8 queue subscriptions active (4 Beteiligte + 4 Bankverbindungen)
- All webhook endpoints operational and emitting queue events
- Sync handlers ready for processing
Note: Kommunikation sync (kommunikation_sync_utils.py) not yet migrated.
Complex bi-directional sync for Telefon/Email/Fax will be added in Phase 4.
Updated MIGRATION_STATUS.md
This commit is contained in:
413
steps/vmh/beteiligte_sync_event_step.py
Normal file
413
steps/vmh/beteiligte_sync_event_step.py
Normal file
@@ -0,0 +1,413 @@
|
||||
"""
|
||||
VMH Beteiligte Sync Handler
|
||||
|
||||
Zentraler Sync-Handler für Beteiligte (Webhooks + Cron Events)
|
||||
|
||||
Verarbeitet:
|
||||
- vmh.beteiligte.create: Neu in EspoCRM → Create in Advoware
|
||||
- vmh.beteiligte.update: Geändert in EspoCRM → Update in Advoware
|
||||
- vmh.beteiligte.delete: Gelöscht in EspoCRM → Delete in Advoware (TODO)
|
||||
- vmh.beteiligte.sync_check: Cron-Check → Sync wenn nötig
|
||||
"""
|
||||
|
||||
from typing import Dict, Any, Optional
|
||||
from motia import FlowContext
|
||||
from services.advoware import AdvowareAPI
|
||||
from services.advoware_service import AdvowareService
|
||||
from services.espocrm import EspoCRMAPI
|
||||
from services.espocrm_mapper import BeteiligteMapper
|
||||
from services.beteiligte_sync_utils import BeteiligteSync
|
||||
import json
|
||||
import redis
|
||||
import os
|
||||
|
||||
config = {
|
||||
"name": "VMH Beteiligte Sync Handler",
|
||||
"description": "Zentraler Sync-Handler für Beteiligte (Webhooks + Cron Events)",
|
||||
"flows": ["vmh"],
|
||||
"triggers": [
|
||||
{"type": "queue", "topic": "vmh.beteiligte.create"},
|
||||
{"type": "queue", "topic": "vmh.beteiligte.update"},
|
||||
{"type": "queue", "topic": "vmh.beteiligte.delete"},
|
||||
{"type": "queue", "topic": "vmh.beteiligte.sync_check"}
|
||||
],
|
||||
"enqueues": []
|
||||
}
|
||||
|
||||
|
||||
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
|
||||
"""Zentraler Sync-Handler für Beteiligte"""
|
||||
entity_id = event_data.entity_id
|
||||
action = event_data.action
|
||||
source = event_data.source
|
||||
|
||||
if not entity_id:
|
||||
ctx.logger.error("Keine entity_id im Event gefunden")
|
||||
return
|
||||
|
||||
ctx.logger.info(f"🔄 Sync-Handler gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}")
|
||||
|
||||
# Shared Redis client for distributed locking
|
||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
||||
|
||||
redis_client = redis.Redis(
|
||||
host=redis_host,
|
||||
port=redis_port,
|
||||
db=redis_db,
|
||||
decode_responses=True
|
||||
)
|
||||
|
||||
# APIs initialisieren
|
||||
espocrm = EspoCRMAPI()
|
||||
advoware = AdvowareAPI(ctx)
|
||||
sync_utils = BeteiligteSync(espocrm, redis_client, ctx)
|
||||
mapper = BeteiligteMapper()
|
||||
|
||||
# NOTE: Kommunikation Sync Manager wird in zukünftiger Version hinzugefügt
|
||||
# wenn kommunikation_sync_utils.py migriert ist
|
||||
# advo_service = AdvowareService(ctx)
|
||||
# komm_sync = KommunikationSyncManager(advo_service, espocrm, ctx)
|
||||
|
||||
try:
|
||||
# 1. ACQUIRE LOCK (verhindert parallele Syncs)
|
||||
lock_acquired = await sync_utils.acquire_sync_lock(entity_id)
|
||||
|
||||
if not lock_acquired:
|
||||
ctx.logger.warn(f"⏸️ Sync bereits aktiv für {entity_id}, überspringe")
|
||||
return
|
||||
|
||||
# Lock erfolgreich acquired - MUSS im finally block released werden!
|
||||
try:
|
||||
# 2. FETCH ENTITY VON ESPOCRM
|
||||
try:
|
||||
espo_entity = await espocrm.get_entity('CBeteiligte', entity_id)
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ Fehler beim Laden von EspoCRM Entity: {e}")
|
||||
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
|
||||
return
|
||||
|
||||
ctx.logger.info(f"📋 Entity geladen: {espo_entity.get('name')} (betnr: {espo_entity.get('betnr')})")
|
||||
|
||||
betnr = espo_entity.get('betnr')
|
||||
sync_status = espo_entity.get('syncStatus', 'pending_sync')
|
||||
|
||||
# Check Retry-Backoff - überspringe wenn syncNextRetry noch nicht erreicht
|
||||
sync_next_retry = espo_entity.get('syncNextRetry')
|
||||
if sync_next_retry and sync_status == 'failed':
|
||||
import datetime
|
||||
import pytz
|
||||
|
||||
try:
|
||||
next_retry_ts = datetime.datetime.strptime(sync_next_retry, '%Y-%m-%d %H:%M:%S')
|
||||
next_retry_ts = pytz.UTC.localize(next_retry_ts)
|
||||
now_utc = datetime.datetime.now(pytz.UTC)
|
||||
|
||||
if now_utc < next_retry_ts:
|
||||
remaining_minutes = int((next_retry_ts - now_utc).total_seconds() / 60)
|
||||
ctx.logger.info(f"⏸️ Retry-Backoff aktiv: Nächster Versuch in {remaining_minutes} Minuten")
|
||||
await sync_utils.release_sync_lock(entity_id, sync_status)
|
||||
return
|
||||
except Exception as e:
|
||||
ctx.logger.warn(f"⚠️ Fehler beim Parsen von syncNextRetry: {e}")
|
||||
|
||||
# 3. BESTIMME SYNC-AKTION
|
||||
|
||||
# FALL A: Neu (kein betnr) → CREATE in Advoware
|
||||
if not betnr and action in ['create', 'sync_check']:
|
||||
ctx.logger.info(f"🆕 Neuer Beteiligter → CREATE in Advoware")
|
||||
await handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, ctx)
|
||||
|
||||
# FALL B: Existiert (hat betnr) → UPDATE oder CHECK
|
||||
elif betnr:
|
||||
ctx.logger.info(f"♻️ Existierender Beteiligter (betNr: {betnr}) → UPDATE/CHECK")
|
||||
await handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, ctx)
|
||||
|
||||
# FALL C: DELETE (TODO: Implementierung später)
|
||||
elif action == 'delete':
|
||||
ctx.logger.warn(f"🗑️ DELETE noch nicht implementiert für {entity_id}")
|
||||
await sync_utils.release_sync_lock(entity_id, 'failed', 'Delete-Operation nicht implementiert')
|
||||
|
||||
else:
|
||||
ctx.logger.warn(f"⚠️ Unbekannte Kombination: action={action}, betnr={betnr}")
|
||||
await sync_utils.release_sync_lock(entity_id, 'failed', f'Unbekannte Aktion: {action}')
|
||||
|
||||
except Exception as e:
|
||||
# Unerwarteter Fehler während Sync - GARANTIERE Lock-Release
|
||||
ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
|
||||
import traceback
|
||||
ctx.logger.error(traceback.format_exc())
|
||||
|
||||
try:
|
||||
await sync_utils.release_sync_lock(
|
||||
entity_id,
|
||||
'failed',
|
||||
f'Unerwarteter Fehler: {str(e)[:1900]}',
|
||||
increment_retry=True
|
||||
)
|
||||
except Exception as release_error:
|
||||
# Selbst Lock-Release failed - logge kritischen Fehler
|
||||
ctx.logger.critical(f"🚨 CRITICAL: Lock-Release failed für {entity_id}: {release_error}")
|
||||
# Force Redis lock release
|
||||
try:
|
||||
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
|
||||
redis_client.delete(lock_key)
|
||||
ctx.logger.info(f"✅ Redis lock manuell released: {lock_key}")
|
||||
except:
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
# Fehler VOR Lock-Acquire - kein Lock-Release nötig
|
||||
ctx.logger.error(f"❌ Fehler vor Lock-Acquire: {e}")
|
||||
import traceback
|
||||
ctx.logger.error(traceback.format_exc())
|
||||
|
||||
|
||||
async def handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, ctx):
|
||||
"""Erstellt neuen Beteiligten in Advoware"""
|
||||
try:
|
||||
ctx.logger.info(f"🔨 CREATE in Advoware...")
|
||||
|
||||
# Transform zu Advoware Format
|
||||
advo_data = mapper.map_cbeteiligte_to_advoware(espo_entity)
|
||||
|
||||
ctx.logger.info(f"📤 Sende an Advoware: {json.dumps(advo_data, ensure_ascii=False)[:200]}...")
|
||||
|
||||
# POST zu Advoware
|
||||
result = await advoware.api_call(
|
||||
'api/v1/advonet/Beteiligte',
|
||||
method='POST',
|
||||
json_data=advo_data
|
||||
)
|
||||
|
||||
# Extrahiere betNr aus Response (case-insensitive: betNr oder betnr)
|
||||
new_betnr = None
|
||||
if isinstance(result, dict):
|
||||
new_betnr = result.get('betNr') or result.get('betnr')
|
||||
|
||||
if not new_betnr:
|
||||
raise Exception(f"Keine betNr/betnr in Advoware Response: {result}")
|
||||
|
||||
ctx.logger.info(f"✅ In Advoware erstellt: betNr={new_betnr}")
|
||||
|
||||
# Lade Entity nach POST um rowId zu bekommen (WICHTIG für Change Detection!)
|
||||
created_entity = await advoware.api_call(
|
||||
f'api/v1/advonet/Beteiligte/{new_betnr}',
|
||||
method='GET'
|
||||
)
|
||||
|
||||
if isinstance(created_entity, list):
|
||||
new_rowid = created_entity[0].get('rowId') if created_entity else None
|
||||
else:
|
||||
new_rowid = created_entity.get('rowId')
|
||||
|
||||
if not new_rowid:
|
||||
ctx.logger.warn(f"⚠️ Keine rowId nach CREATE - Change Detection nicht möglich!")
|
||||
|
||||
# OPTIMIERT: Kombiniere release_lock + betnr + rowId update in 1 API call
|
||||
await sync_utils.release_sync_lock(
|
||||
entity_id,
|
||||
'clean',
|
||||
error_message=None,
|
||||
extra_fields={
|
||||
'betnr': new_betnr,
|
||||
'advowareRowId': new_rowid # WICHTIG für Change Detection!
|
||||
}
|
||||
)
|
||||
|
||||
ctx.logger.info(f"✅ CREATE erfolgreich: {entity_id} → betNr {new_betnr}, rowId {new_rowid[:20] if new_rowid else 'N/A'}...")
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ CREATE fehlgeschlagen: {e}")
|
||||
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
|
||||
|
||||
|
||||
async def handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, ctx):
|
||||
"""Synchronisiert existierenden Beteiligten"""
|
||||
try:
|
||||
ctx.logger.info(f"🔍 Fetch von Advoware betNr={betnr}...")
|
||||
|
||||
# Fetch von Advoware
|
||||
try:
|
||||
advo_result = await advoware.api_call(
|
||||
f'api/v1/advonet/Beteiligte/{betnr}',
|
||||
method='GET'
|
||||
)
|
||||
|
||||
# Advoware gibt manchmal Listen zurück
|
||||
if isinstance(advo_result, list):
|
||||
advo_entity = advo_result[0] if advo_result else None
|
||||
else:
|
||||
advo_entity = advo_result
|
||||
|
||||
if not advo_entity:
|
||||
raise Exception(f"Beteiligter betNr={betnr} nicht gefunden")
|
||||
|
||||
except Exception as e:
|
||||
# 404 oder anderer Fehler → Beteiligter wurde in Advoware gelöscht
|
||||
if '404' in str(e) or 'nicht gefunden' in str(e).lower():
|
||||
ctx.logger.warn(f"🗑️ Beteiligter in Advoware gelöscht: betNr={betnr}")
|
||||
await sync_utils.handle_advoware_deleted(entity_id, str(e))
|
||||
return
|
||||
else:
|
||||
raise
|
||||
|
||||
ctx.logger.info(f"📥 Von Advoware geladen: {advo_entity.get('name')}")
|
||||
|
||||
# ÄNDERUNGSERKENNUNG (Primary: rowId, Fallback: Timestamps)
|
||||
comparison = sync_utils.compare_entities(espo_entity, advo_entity)
|
||||
|
||||
ctx.logger.info(f"⏱️ Vergleich: {comparison}")
|
||||
|
||||
# KEIN STAMMDATEN-SYNC NÖTIG
|
||||
if comparison == 'no_change':
|
||||
ctx.logger.info(f"✅ Keine Stammdaten-Änderungen erkannt")
|
||||
|
||||
# NOTE: Kommunikation-Sync würde hier stattfinden
|
||||
# await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx)
|
||||
|
||||
await sync_utils.release_sync_lock(entity_id, 'clean')
|
||||
return
|
||||
|
||||
# ESPOCRM NEUER → Update Advoware
|
||||
if comparison == 'espocrm_newer':
|
||||
ctx.logger.info(f"📤 EspoCRM ist neuer → Update Advoware STAMMDATEN")
|
||||
|
||||
# OPTIMIERT: Use merge utility
|
||||
merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper)
|
||||
|
||||
put_result = await advoware.api_call(
|
||||
f'api/v1/advonet/Beteiligte/{betnr}',
|
||||
method='PUT',
|
||||
json_data=merged_data
|
||||
)
|
||||
|
||||
# Extrahiere neue rowId aus PUT Response (spart extra GET!)
|
||||
new_rowid = None
|
||||
if isinstance(put_result, list) and len(put_result) > 0:
|
||||
new_rowid = put_result[0].get('rowId')
|
||||
elif isinstance(put_result, dict):
|
||||
new_rowid = put_result.get('rowId')
|
||||
|
||||
ctx.logger.info(f"✅ Advoware STAMMDATEN aktualisiert, rowId: {new_rowid[:20] if new_rowid else 'N/A'}...")
|
||||
|
||||
# Validiere Sync-Ergebnis
|
||||
validation_success, validation_error = await sync_utils.validate_sync_result(
|
||||
entity_id, betnr, mapper, direction='to_advoware'
|
||||
)
|
||||
|
||||
if not validation_success:
|
||||
ctx.logger.error(f"❌ Sync-Validation fehlgeschlagen: {validation_error}")
|
||||
await sync_utils.release_sync_lock(
|
||||
entity_id,
|
||||
'failed',
|
||||
error_message=f"Validation failed: {validation_error}",
|
||||
increment_retry=True
|
||||
)
|
||||
return
|
||||
|
||||
# NOTE: Kommunikation-Sync würde hier stattfinden
|
||||
# await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx)
|
||||
|
||||
# Release Lock + Update rowId
|
||||
await sync_utils.release_sync_lock(
|
||||
entity_id,
|
||||
'clean',
|
||||
extra_fields={'advowareRowId': new_rowid}
|
||||
)
|
||||
|
||||
# ADVOWARE NEUER → Update EspoCRM
|
||||
elif comparison == 'advoware_newer':
|
||||
ctx.logger.info(f"📥 Advoware ist neuer → Update EspoCRM STAMMDATEN")
|
||||
|
||||
espo_data = mapper.map_advoware_to_cbeteiligte(advo_entity)
|
||||
await espocrm.update_entity('CBeteiligte', entity_id, espo_data)
|
||||
ctx.logger.info(f"✅ EspoCRM STAMMDATEN aktualisiert")
|
||||
|
||||
# Validiere Sync-Ergebnis
|
||||
validation_success, validation_error = await sync_utils.validate_sync_result(
|
||||
entity_id, betnr, mapper, direction='to_espocrm'
|
||||
)
|
||||
|
||||
if not validation_success:
|
||||
ctx.logger.error(f"❌ Sync-Validation fehlgeschlagen: {validation_error}")
|
||||
await sync_utils.release_sync_lock(
|
||||
entity_id,
|
||||
'failed',
|
||||
error_message=f"Validation failed: {validation_error}",
|
||||
increment_retry=True
|
||||
)
|
||||
return
|
||||
|
||||
# NOTE: Kommunikation-Sync würde hier stattfinden
|
||||
# await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx)
|
||||
|
||||
# Release Lock + Update rowId
|
||||
await sync_utils.release_sync_lock(
|
||||
entity_id,
|
||||
'clean',
|
||||
extra_fields={'advowareRowId': advo_entity.get('rowId')}
|
||||
)
|
||||
|
||||
# KONFLIKT → EspoCRM WINS
|
||||
elif comparison == 'conflict':
|
||||
ctx.logger.warn(f"⚠️ KONFLIKT erkannt → EspoCRM WINS (STAMMDATEN)")
|
||||
|
||||
# OPTIMIERT: Use merge utility
|
||||
merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper)
|
||||
|
||||
put_result = await advoware.api_call(
|
||||
f'api/v1/advonet/Beteiligte/{betnr}',
|
||||
method='PUT',
|
||||
json_data=merged_data
|
||||
)
|
||||
|
||||
# Extrahiere neue rowId aus PUT Response
|
||||
new_rowid = None
|
||||
if isinstance(put_result, list) and len(put_result) > 0:
|
||||
new_rowid = put_result[0].get('rowId')
|
||||
elif isinstance(put_result, dict):
|
||||
new_rowid = put_result.get('rowId')
|
||||
|
||||
conflict_msg = (
|
||||
f"EspoCRM: {espo_entity.get('modifiedAt')}, "
|
||||
f"Advoware: {advo_entity.get('geaendertAm')}. "
|
||||
f"EspoCRM hat gewonnen."
|
||||
)
|
||||
|
||||
ctx.logger.info(f"✅ Konflikt gelöst (EspoCRM won), neue rowId: {new_rowid[:20] if new_rowid else 'N/A'}...")
|
||||
|
||||
# Validiere Sync-Ergebnis
|
||||
validation_success, validation_error = await sync_utils.validate_sync_result(
|
||||
entity_id, betnr, mapper, direction='to_advoware'
|
||||
)
|
||||
|
||||
if not validation_success:
|
||||
ctx.logger.error(f"❌ Conflict resolution validation fehlgeschlagen: {validation_error}")
|
||||
await sync_utils.release_sync_lock(
|
||||
entity_id,
|
||||
'failed',
|
||||
error_message=f"Conflict resolution validation failed: {validation_error}",
|
||||
increment_retry=True
|
||||
)
|
||||
return
|
||||
|
||||
await sync_utils.resolve_conflict_espocrm_wins(
|
||||
entity_id,
|
||||
espo_entity,
|
||||
advo_entity,
|
||||
conflict_msg,
|
||||
extra_fields={'advowareRowId': new_rowid}
|
||||
)
|
||||
|
||||
# NOTE: Kommunikation-Sync (nur EspoCRM→Advoware) würde hier stattfinden
|
||||
# await run_kommunikation_sync(entity_id, betnr, komm_sync, ctx, direction='to_advoware', force_espo_wins=True)
|
||||
|
||||
await sync_utils.release_sync_lock(entity_id, 'clean')
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ UPDATE fehlgeschlagen: {e}")
|
||||
import traceback
|
||||
ctx.logger.error(traceback.format_exc())
|
||||
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
|
||||
Reference in New Issue
Block a user