Files
motia/bitbylaw/steps/vmh/beteiligte_sync_event_step.py
bitbylaw e057f9fa00 Enhance KommunikationSyncManager and Sync Event Step
- Improved bidirectional synchronization logic in KommunikationSyncManager:
  - Added initial sync handling to prevent duplicates.
  - Optimized hash calculation to only write changes when necessary.
  - Enhanced conflict resolution with clearer logging and handling of various scenarios.
  - Refactored diff computation for better clarity and maintainability.

- Updated beteiligte_sync_event_step to ensure proper lock management:
  - Added error handling for entity fetching and retry logic.
  - Improved logging for better traceability of sync actions.
  - Ensured lock release in case of unexpected errors.
2026-02-08 22:21:08 +00:00

435 lines
18 KiB
Python

from typing import Dict, Any, Optional
from services.advoware import AdvowareAPI
from services.advoware_service import AdvowareService
from services.espocrm import EspoCRMAPI
from services.espocrm_mapper import BeteiligteMapper
from services.beteiligte_sync_utils import BeteiligteSync
from services.kommunikation_sync_utils import (
KommunikationSyncManager,
detect_kommunikation_changes
)
import json
import redis
from config import Config
config = {
'type': 'event',
'name': 'VMH Beteiligte Sync Handler',
'description': 'Zentraler Sync-Handler für Beteiligte (Webhooks + Cron Events)',
'subscribes': [
'vmh.beteiligte.create',
'vmh.beteiligte.update',
'vmh.beteiligte.delete',
'vmh.beteiligte.sync_check' # Von Cron
],
'flows': ['vmh'],
'emits': []
}
async def handler(event_data, context):
"""
Zentraler Sync-Handler für Beteiligte
Verarbeitet:
- vmh.beteiligte.create: Neu in EspoCRM → Create in Advoware
- vmh.beteiligte.update: Geändert in EspoCRM → Update in Advoware
- vmh.beteiligte.delete: Gelöscht in EspoCRM → Delete in Advoware
- vmh.beteiligte.sync_check: Cron-Check → Sync wenn nötig
"""
entity_id = event_data.get('entity_id')
action = event_data.get('action', 'sync_check')
source = event_data.get('source', 'unknown')
if not entity_id:
context.logger.error("Keine entity_id im Event gefunden")
return
context.logger.info(f"🔄 Sync-Handler gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}")
# Shared Redis client for distributed locking
redis_client = redis.Redis(
host=Config.REDIS_HOST,
port=int(Config.REDIS_PORT),
db=int(Config.REDIS_DB_ADVOWARE_CACHE),
decode_responses=True
)
# APIs initialisieren
espocrm = EspoCRMAPI()
advoware = AdvowareAPI(context)
sync_utils = BeteiligteSync(espocrm, redis_client, context)
mapper = BeteiligteMapper()
# Kommunikation Sync Manager
advo_service = AdvowareService(context)
komm_sync = KommunikationSyncManager(advo_service, espocrm, context)
try:
# 1. ACQUIRE LOCK (verhindert parallele Syncs)
lock_acquired = await sync_utils.acquire_sync_lock(entity_id)
if not lock_acquired:
context.logger.warn(f"⏸️ Sync bereits aktiv für {entity_id}, überspringe")
return
# Lock erfolgreich acquired - MUSS im finally block released werden!
try:
# 2. FETCH ENTITY VON ESPOCRM
try:
espo_entity = await espocrm.get_entity('CBeteiligte', entity_id)
except Exception as e:
context.logger.error(f"❌ Fehler beim Laden von EspoCRM Entity: {e}")
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
return
context.logger.info(f"📋 Entity geladen: {espo_entity.get('name')} (betnr: {espo_entity.get('betnr')})")
betnr = espo_entity.get('betnr')
sync_status = espo_entity.get('syncStatus', 'pending_sync')
# FIX #12: Check Retry-Backoff - überspringe wenn syncNextRetry noch nicht erreicht
sync_next_retry = espo_entity.get('syncNextRetry')
if sync_next_retry and sync_status == 'failed':
import datetime
import pytz
try:
next_retry_ts = datetime.datetime.strptime(sync_next_retry, '%Y-%m-%d %H:%M:%S')
next_retry_ts = pytz.UTC.localize(next_retry_ts)
now_utc = datetime.datetime.now(pytz.UTC)
if now_utc < next_retry_ts:
remaining_minutes = int((next_retry_ts - now_utc).total_seconds() / 60)
context.logger.info(f"⏸️ Retry-Backoff aktiv: Nächster Versuch in {remaining_minutes} Minuten")
await sync_utils.release_sync_lock(entity_id, sync_status)
return
except Exception as e:
context.logger.warn(f"⚠️ Fehler beim Parsen von syncNextRetry: {e}")
# 3. BESTIMME SYNC-AKTION
# FALL A: Neu (kein betnr) → CREATE in Advoware
if not betnr and action in ['create', 'sync_check']:
context.logger.info(f"🆕 Neuer Beteiligter → CREATE in Advoware")
await handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, context)
# FALL B: Existiert (hat betnr) → UPDATE oder CHECK
elif betnr:
context.logger.info(f"♻️ Existierender Beteiligter (betNr: {betnr}) → UPDATE/CHECK")
await handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, komm_sync, context)
# FALL C: DELETE (TODO: Implementierung später)
elif action == 'delete':
context.logger.warn(f"🗑️ DELETE noch nicht implementiert für {entity_id}")
await sync_utils.release_sync_lock(entity_id, 'failed', 'Delete-Operation nicht implementiert')
else:
context.logger.warn(f"⚠️ Unbekannte Kombination: action={action}, betnr={betnr}")
await sync_utils.release_sync_lock(entity_id, 'failed', f'Unbekannte Aktion: {action}')
except Exception as e:
# Unerwarteter Fehler während Sync - GARANTIERE Lock-Release
context.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
import traceback
context.logger.error(traceback.format_exc())
try:
await sync_utils.release_sync_lock(
entity_id,
'failed',
f'Unerwarteter Fehler: {str(e)[:1900]}',
increment_retry=True
)
except Exception as release_error:
# Selbst Lock-Release failed - logge kritischen Fehler
context.logger.critical(f"🚨 CRITICAL: Lock-Release failed für {entity_id}: {release_error}")
# Force Redis lock release
try:
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
redis_client.delete(lock_key)
context.logger.info(f"✅ Redis lock manuell released: {lock_key}")
except:
pass
except Exception as e:
# Fehler VOR Lock-Acquire - kein Lock-Release nötig
context.logger.error(f"❌ Fehler vor Lock-Acquire: {e}")
import traceback
context.logger.error(traceback.format_exc())
async def run_kommunikation_sync(entity_id: str, betnr: int, komm_sync, context, direction: str = 'both') -> Dict[str, Any]:
"""
Helper: Führt Kommunikation-Sync aus mit Error-Handling
Args:
direction: 'both' (bidirektional), 'to_advoware' (nur EspoCRM→Advoware), 'to_espocrm' (nur Advoware→EspoCRM)
Returns:
Sync-Ergebnis oder None bei Fehler
"""
context.logger.info(f"📞 Starte Kommunikation-Sync (direction={direction})...")
try:
komm_result = await komm_sync.sync_bidirectional(entity_id, betnr, direction=direction)
context.logger.info(f"✅ Kommunikation synced: {komm_result}")
return komm_result
except Exception as e:
context.logger.error(f"⚠️ Kommunikation-Sync fehlgeschlagen: {e}")
import traceback
context.logger.error(traceback.format_exc())
return None
async def handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, context):
"""Erstellt neuen Beteiligten in Advoware"""
try:
context.logger.info(f"🔨 CREATE in Advoware...")
# Transform zu Advoware Format
advo_data = mapper.map_cbeteiligte_to_advoware(espo_entity)
context.logger.info(f"📤 Sende an Advoware: {json.dumps(advo_data, ensure_ascii=False)[:200]}...")
# POST zu Advoware
result = await advoware.api_call(
'api/v1/advonet/Beteiligte',
method='POST',
data=advo_data
)
# Extrahiere betNr aus Response (case-insensitive: betNr oder betnr)
new_betnr = None
if isinstance(result, dict):
new_betnr = result.get('betNr') or result.get('betnr')
if not new_betnr:
raise Exception(f"Keine betNr/betnr in Advoware Response: {result}")
context.logger.info(f"✅ In Advoware erstellt: betNr={new_betnr}")
# Lade Entity nach POST um rowId zu bekommen (WICHTIG für Change Detection!)
created_entity = await advoware.api_call(
f'api/v1/advonet/Beteiligte/{new_betnr}',
method='GET'
)
new_rowid = created_entity.get('rowId') if isinstance(created_entity, dict) else created_entity[0].get('rowId')
if not new_rowid:
context.logger.warn(f"⚠️ Keine rowId nach CREATE - Change Detection nicht möglich!")
# OPTIMIERT: Kombiniere release_lock + betnr + rowId update in 1 API call
await sync_utils.release_sync_lock(
entity_id,
'clean',
error_message=None,
extra_fields={
'betnr': new_betnr,
'advowareRowId': new_rowid # WICHTIG für Change Detection!
}
)
context.logger.info(f"✅ CREATE erfolgreich: {entity_id} → betNr {new_betnr}, rowId {new_rowid[:20] if new_rowid else 'N/A'}...")
except Exception as e:
context.logger.error(f"❌ CREATE fehlgeschlagen: {e}")
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
async def handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, komm_sync, context):
"""Synchronisiert existierenden Beteiligten"""
try:
context.logger.info(f"🔍 Fetch von Advoware betNr={betnr}...")
# Fetch von Advoware
try:
advo_result = await advoware.api_call(
f'api/v1/advonet/Beteiligte/{betnr}',
method='GET'
)
# Advoware gibt manchmal Listen zurück
if isinstance(advo_result, list):
advo_entity = advo_result[0] if advo_result else None
else:
advo_entity = advo_result
if not advo_entity:
raise Exception(f"Beteiligter betNr={betnr} nicht gefunden")
except Exception as e:
# 404 oder anderer Fehler → Beteiligter wurde in Advoware gelöscht
if '404' in str(e) or 'nicht gefunden' in str(e).lower():
context.logger.warn(f"🗑️ Beteiligter in Advoware gelöscht: betNr={betnr}")
await sync_utils.handle_advoware_deleted(entity_id, str(e))
return
else:
raise
context.logger.info(f"📥 Von Advoware geladen: {advo_entity.get('name')}")
# ÄNDERUNGSERKENNUNG (Primary: rowId, Fallback: Timestamps)
comparison = sync_utils.compare_entities(espo_entity, advo_entity)
context.logger.info(f"⏱️ Vergleich: {comparison}")
# KOMMUNIKATION-ÄNDERUNGSERKENNUNG (zusätzlich zu Stammdaten)
# Speichere alte Version für späteren Vergleich
old_advo_entity = advo_entity.copy()
komm_changes_detected = False
# KEIN STAMMDATEN-SYNC NÖTIG (aber Kommunikation könnte geändert sein)
if comparison == 'no_change':
context.logger.info(f"✅ Keine Stammdaten-Änderungen erkannt")
# KOMMUNIKATION SYNC: Prüfe trotzdem Kommunikationen
await run_kommunikation_sync(entity_id, betnr, komm_sync, context)
await sync_utils.release_sync_lock(entity_id, 'clean')
return
# ESPOCRM NEUER → Update Advoware
if comparison == 'espocrm_newer':
context.logger.info(f"📤 EspoCRM ist neuer → Update Advoware STAMMDATEN")
# OPTIMIERT: Use merge utility
merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper)
put_result = await advoware.api_call(
f'api/v1/advonet/Beteiligte/{betnr}',
method='PUT',
data=merged_data
)
# Extrahiere neue rowId aus PUT Response (spart extra GET!)
new_rowid = None
if isinstance(put_result, list) and len(put_result) > 0:
new_rowid = put_result[0].get('rowId')
elif isinstance(put_result, dict):
new_rowid = put_result.get('rowId')
context.logger.info(f"✅ Advoware STAMMDATEN aktualisiert, rowId: {new_rowid[:20] if new_rowid else 'N/A'}...")
# FIX #13: Validiere Sync-Ergebnis
validation_success, validation_error = await sync_utils.validate_sync_result(
entity_id, betnr, mapper, direction='to_advoware'
)
if not validation_success:
context.logger.error(f"❌ Sync-Validation fehlgeschlagen: {validation_error}")
await sync_utils.release_sync_lock(
entity_id,
'failed',
error_message=f"Validation failed: {validation_error}",
increment_retry=True
)
return
# KOMMUNIKATION SYNC: Immer ausführen nach Stammdaten-Update
await run_kommunikation_sync(entity_id, betnr, komm_sync, context)
# Release Lock NACH Kommunikation-Sync + Update rowId
await sync_utils.release_sync_lock(
entity_id,
'clean',
extra_fields={'advowareRowId': new_rowid}
)
# ADVOWARE NEUER → Update EspoCRM
elif comparison == 'advoware_newer':
context.logger.info(f"📥 Advoware ist neuer → Update EspoCRM STAMMDATEN")
espo_data = mapper.map_advoware_to_cbeteiligte(advo_entity)
await espocrm.update_entity('CBeteiligte', entity_id, espo_data)
context.logger.info(f"✅ EspoCRM STAMMDATEN aktualisiert")
# FIX #13: Validiere Sync-Ergebnis
validation_success, validation_error = await sync_utils.validate_sync_result(
entity_id, betnr, mapper, direction='to_espocrm'
)
if not validation_success:
context.logger.error(f"❌ Sync-Validation fehlgeschlagen: {validation_error}")
await sync_utils.release_sync_lock(
entity_id,
'failed',
error_message=f"Validation failed: {validation_error}",
increment_retry=True
)
return
# KOMMUNIKATION SYNC: Immer ausführen nach Stammdaten-Update
await run_kommunikation_sync(entity_id, betnr, komm_sync, context)
# Release Lock NACH Kommunikation-Sync + Update rowId
await sync_utils.release_sync_lock(
entity_id,
'clean',
extra_fields={'advowareRowId': advo_entity.get('rowId')}
)
# KONFLIKT → EspoCRM WINS
elif comparison == 'conflict':
context.logger.warn(f"⚠️ KONFLIKT erkannt → EspoCRM WINS (STAMMDATEN)")
# OPTIMIERT: Use merge utility
merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper)
put_result = await advoware.api_call(
f'api/v1/advonet/Beteiligte/{betnr}',
method='PUT',
data=merged_data
)
# Extrahiere neue rowId aus PUT Response (spart extra GET!)
new_rowid = None
if isinstance(put_result, list) and len(put_result) > 0:
new_rowid = put_result[0].get('rowId')
elif isinstance(put_result, dict):
new_rowid = put_result.get('rowId')
conflict_msg = (
f"EspoCRM: {espo_entity.get('modifiedAt')}, "
f"Advoware: {advo_entity.get('geaendertAm')}. "
f"EspoCRM hat gewonnen."
)
context.logger.info(f"✅ Konflikt gelöst (EspoCRM won), neue rowId: {new_rowid[:20] if new_rowid else 'N/A'}...")
# FIX #13: Validiere Sync-Ergebnis
validation_success, validation_error = await sync_utils.validate_sync_result(
entity_id, betnr, mapper, direction='to_advoware'
)
if not validation_success:
context.logger.error(f"❌ Conflict resolution validation fehlgeschlagen: {validation_error}")
await sync_utils.release_sync_lock(
entity_id,
'failed',
error_message=f"Conflict resolution validation failed: {validation_error}",
increment_retry=True
)
return
await sync_utils.resolve_conflict_espocrm_wins(
entity_id,
espo_entity,
advo_entity,
conflict_msg,
extra_fields={'advowareRowId': new_rowid}
)
# KOMMUNIKATION SYNC: NUR EspoCRM→Advoware (EspoCRM wins!)
await run_kommunikation_sync(entity_id, betnr, komm_sync, context, direction='to_advoware')
# Release Lock NACH Kommunikation-Sync
await sync_utils.release_sync_lock(entity_id, 'clean')
except Exception as e:
context.logger.error(f"❌ UPDATE fehlgeschlagen: {e}")
import traceback
context.logger.error(traceback.format_exc())
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
# Alias für Tests/externe Aufrufe
handle = handler