Implement EspoCRM-based sync strategy for Beteiligte entities

- Add SYNC_STRATEGY_ESPOCRM_BASED.md detailing the sync flows and status management.
- Create utilities for sync operations in services/beteiligte_sync_utils.py, including locking, timestamp comparison, conflict resolution, and notification handling.
- Implement entity mapping between EspoCRM and Advoware in services/espocrm_mapper.py.
- Develop a cron job for periodic sync checks in steps/vmh/beteiligte_sync_cron_step.py, emitting events for entities needing synchronization.
This commit is contained in:
2026-02-07 15:21:16 +00:00
parent e6ab22d5f4
commit b5abe6cf00
7 changed files with 2430 additions and 33 deletions

View File

@@ -0,0 +1,117 @@
"""
Beteiligte Sync Cron Job
Läuft alle 15 Minuten und emittiert Sync-Events für Beteiligte die:
- Neu sind (pending_sync)
- Geändert wurden (dirty)
- Fehlgeschlagen sind (failed → Retry)
- Lange nicht gesynct wurden (clean aber > 24h alt)
"""
import asyncio
from services.espocrm import EspoCRMAPI
import datetime
config = {
'type': 'cron',
'name': 'VMH Beteiligte Sync Cron',
'description': 'Prüft alle 15 Minuten welche Beteiligte synchronisiert werden müssen',
'schedule': '*/15 * * * *', # Alle 15 Minuten
'flows': ['vmh'],
'emits': ['vmh.beteiligte.sync_check']
}
async def handler(context):
"""
Cron-Handler: Findet alle Beteiligte die Sync benötigen und emittiert Events
"""
context.logger.info("🕐 Beteiligte Sync Cron gestartet")
try:
espocrm = EspoCRMAPI()
# Berechne Threshold für "veraltete" Syncs (24 Stunden)
threshold = datetime.datetime.now() - datetime.timedelta(hours=24)
threshold_str = threshold.strftime('%Y-%m-%d %H:%M:%S')
context.logger.info(f"📅 Suche Entities mit Sync-Bedarf (älter als {threshold_str})")
# QUERY 1: Entities mit Status pending_sync, dirty oder failed
unclean_filter = {
'where': [
{
'type': 'or',
'value': [
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'pending_sync'},
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'dirty'},
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'failed'},
]
}
]
}
unclean_result = await espocrm.search_entities('CBeteiligte', unclean_filter, max_size=100)
unclean_entities = unclean_result.get('list', [])
context.logger.info(f"📊 Gefunden: {len(unclean_entities)} Entities mit Status pending/dirty/failed")
# QUERY 2: Clean Entities die > 24h nicht gesynct wurden
stale_filter = {
'where': [
{
'type': 'and',
'value': [
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'clean'},
{'type': 'isNotNull', 'attribute': 'betnr'},
{
'type': 'or',
'value': [
{'type': 'isNull', 'attribute': 'advowareLastSync'},
{'type': 'before', 'attribute': 'advowareLastSync', 'value': threshold_str}
]
}
]
}
]
}
stale_result = await espocrm.search_entities('CBeteiligte', stale_filter, max_size=50)
stale_entities = stale_result.get('list', [])
context.logger.info(f"📊 Gefunden: {len(stale_entities)} Entities mit veraltetem Sync (> 24h)")
# KOMBINIERE ALLE
all_entities = unclean_entities + stale_entities
entity_ids = list(set([e['id'] for e in all_entities])) # Dedupliziere
context.logger.info(f"🎯 Total: {len(entity_ids)} eindeutige Entities zum Sync")
if not entity_ids:
context.logger.info("✅ Keine Entities benötigen Sync")
return
# EMITTIERE EVENT FÜR JEDEN BETEILIGTEN
emitted_count = 0
for entity_id in entity_ids:
try:
await context.emit({
'topic': 'vmh.beteiligte.sync_check',
'data': {
'entity_id': entity_id,
'action': 'sync_check',
'source': 'cron',
'timestamp': datetime.datetime.now().isoformat()
}
})
emitted_count += 1
except Exception as e:
context.logger.error(f"❌ Fehler beim Emittieren für {entity_id}: {e}")
context.logger.info(f"✅ Cron fertig: {emitted_count}/{len(entity_ids)} Events emittiert")
except Exception as e:
context.logger.error(f"❌ Fehler im Sync Cron: {e}")
import traceback
context.logger.error(traceback.format_exc())

View File

@@ -1,52 +1,261 @@
from services.advoware import AdvowareAPI
from services.espocrm import EspoCRMAPI
from services.espocrm_mapper import BeteiligteMapper
from services.beteiligte_sync_utils import BeteiligteSync
import json
import redis
from config import Config
config = {
'type': 'event',
'name': 'VMH Beteiligte Sync',
'description': 'Synchronisiert Beteiligte Entities von Advoware nach Änderungen (Create/Update/Delete)',
'subscribes': ['vmh.beteiligte.create', 'vmh.beteiligte.update', 'vmh.beteiligte.delete'],
'name': 'VMH Beteiligte Sync Handler',
'description': 'Zentraler Sync-Handler für Beteiligte (Webhooks + Cron Events)',
'subscribes': [
'vmh.beteiligte.create',
'vmh.beteiligte.update',
'vmh.beteiligte.delete',
'vmh.beteiligte.sync_check' # Von Cron
],
'flows': ['vmh'],
'emits': []
}
async def handler(event_data, context):
"""
Zentraler Sync-Handler für Beteiligte
Verarbeitet:
- vmh.beteiligte.create: Neu in EspoCRM → Create in Advoware
- vmh.beteiligte.update: Geändert in EspoCRM → Update in Advoware
- vmh.beteiligte.delete: Gelöscht in EspoCRM → Delete in Advoware
- vmh.beteiligte.sync_check: Cron-Check → Sync wenn nötig
"""
entity_id = event_data.get('entity_id')
action = event_data.get('action', 'sync_check')
source = event_data.get('source', 'unknown')
if not entity_id:
context.logger.error("Keine entity_id im Event gefunden")
return
context.logger.info(f"🔄 Sync-Handler gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}")
# Redis für Queue-Management
redis_client = redis.Redis(
host=Config.REDIS_HOST,
port=int(Config.REDIS_PORT),
db=int(Config.REDIS_DB_ADVOWARE_CACHE),
decode_responses=True
)
# APIs initialisieren
espocrm = EspoCRMAPI()
advoware = AdvowareAPI(context)
sync_utils = BeteiligteSync(espocrm, context)
mapper = BeteiligteMapper()
try:
entity_id = event_data.get('entity_id')
action = event_data.get('action', 'unknown')
# 1. ACQUIRE LOCK (verhindert parallele Syncs)
lock_acquired = await sync_utils.acquire_sync_lock(entity_id)
if not entity_id:
context.logger.error("Keine entity_id im Event gefunden")
if not lock_acquired:
context.logger.warning(f"⏸️ Sync bereits aktiv für {entity_id}, überspringe")
return
context.logger.info(f"Starte {action.upper()} Sync für Beteiligte Entity: {entity_id}")
# Advoware API initialisieren (für später)
# advoware = AdvowareAPI(context)
# PLATZHALTER: Für jetzt nur loggen, keine API-Anfrage
context.logger.info(f"PLATZHALTER: {action.upper()} Sync für Entity {entity_id} würde hier Advoware API aufrufen")
context.logger.info(f"PLATZHALTER: Entity-Daten würden hier verarbeitet werden")
# TODO: Hier die Entity in das Zielsystem syncen (EspoCRM?)
# Für Create: Neu anlegen
# Für Update: Aktualisieren
# Für Delete: Löschen
# Entferne die ID aus der entsprechenden Pending-Queue
redis_client = redis.Redis(
host=Config.REDIS_HOST,
port=int(Config.REDIS_PORT),
db=int(Config.REDIS_DB_ADVOWARE_CACHE),
decode_responses=True
)
# 2. FETCH ENTITY VON ESPOCRM
try:
espo_entity = await espocrm.get_entity('CBeteiligte', entity_id)
except Exception as e:
context.logger.error(f"❌ Fehler beim Laden von EspoCRM Entity: {e}")
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
return
context.logger.info(f"📋 Entity geladen: {espo_entity.get('name')} (betnr: {espo_entity.get('betnr')})")
betnr = espo_entity.get('betnr')
sync_status = espo_entity.get('syncStatus', 'pending_sync')
# 3. BESTIMME SYNC-AKTION
# FALL A: Neu (kein betnr) → CREATE in Advoware
if not betnr and action in ['create', 'sync_check']:
context.logger.info(f"🆕 Neuer Beteiligter → CREATE in Advoware")
await handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, context)
# FALL B: Existiert (hat betnr) → UPDATE oder CHECK
elif betnr:
context.logger.info(f"♻️ Existierender Beteiligter (betNr: {betnr}) → UPDATE/CHECK")
await handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, context)
# FALL C: DELETE (TODO: Implementierung später)
elif action == 'delete':
context.logger.warning(f"🗑️ DELETE noch nicht implementiert für {entity_id}")
await sync_utils.release_sync_lock(entity_id, 'failed', 'Delete-Operation nicht implementiert')
else:
context.logger.warning(f"⚠️ Unbekannte Kombination: action={action}, betnr={betnr}")
await sync_utils.release_sync_lock(entity_id, 'failed', f'Unbekannte Aktion: {action}')
# Redis Queue Cleanup
pending_key = f'vmh:beteiligte:{action}_pending'
redis_client.srem(pending_key, entity_id)
context.logger.info(f"Entity {entity_id} aus {action.upper()}-Pending-Queue entfernt")
except Exception as e:
context.logger.error(f"Fehler beim {event_data.get('action', 'unknown').upper()} Sync von Beteiligte Entity: {e}")
context.logger.error(f"Event Data: {event_data}")
context.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
import traceback
context.logger.error(traceback.format_exc())
try:
await sync_utils.release_sync_lock(
entity_id,
'failed',
f'Unerwarteter Fehler: {str(e)[:1900]}',
increment_retry=True
)
except:
pass
async def handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, context):
"""Erstellt neuen Beteiligten in Advoware"""
try:
context.logger.info(f"🔨 CREATE in Advoware...")
# Transform zu Advoware Format
advo_data = mapper.map_cbeteiligte_to_advoware(espo_entity)
context.logger.info(f"📤 Sende an Advoware: {json.dumps(advo_data, ensure_ascii=False)[:200]}...")
# POST zu Advoware
result = await advoware.api_call(
'api/v1/advonet/Beteiligte',
method='POST',
data=advo_data
)
# Extrahiere betNr aus Response
new_betnr = result.get('betNr') if isinstance(result, dict) else None
if not new_betnr:
raise Exception(f"Keine betNr in Advoware Response: {result}")
context.logger.info(f"✅ In Advoware erstellt: betNr={new_betnr}")
# Update EspoCRM mit neuer betNr
await sync_utils.release_sync_lock(entity_id, 'clean', error_message=None)
await espocrm.update_entity('CBeteiligte', entity_id, {
'betnr': new_betnr
})
context.logger.info(f"✅ CREATE erfolgreich: {entity_id} → betNr {new_betnr}")
except Exception as e:
context.logger.error(f"❌ CREATE fehlgeschlagen: {e}")
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
async def handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, context):
"""Synchronisiert existierenden Beteiligten"""
try:
context.logger.info(f"🔍 Fetch von Advoware betNr={betnr}...")
# Fetch von Advoware
try:
advo_result = await advoware.api_call(
f'api/v1/advonet/Beteiligte/{betnr}',
method='GET'
)
# Advoware gibt manchmal Listen zurück
if isinstance(advo_result, list):
advo_entity = advo_result[0] if advo_result else None
else:
advo_entity = advo_result
if not advo_entity:
raise Exception(f"Beteiligter betNr={betnr} nicht gefunden")
except Exception as e:
# 404 oder anderer Fehler → Beteiligter wurde in Advoware gelöscht
if '404' in str(e) or 'nicht gefunden' in str(e).lower():
context.logger.warning(f"🗑️ Beteiligter in Advoware gelöscht: betNr={betnr}")
await sync_utils.handle_advoware_deleted(entity_id, str(e))
return
else:
raise
context.logger.info(f"📥 Von Advoware geladen: {advo_entity.get('name')}")
# TIMESTAMP-VERGLEICH
comparison = sync_utils.compare_timestamps(
espo_entity.get('modifiedAt'),
advo_entity.get('geaendertAm'),
espo_entity.get('advowareLastSync')
)
context.logger.info(f"⏱️ Timestamp-Vergleich: {comparison}")
# KEIN SYNC NÖTIG
if comparison == 'no_change':
context.logger.info(f"✅ Keine Änderungen, Sync übersprungen")
await sync_utils.release_sync_lock(entity_id, 'clean')
return
# ESPOCRM NEUER → Update Advoware
if comparison == 'espocrm_newer':
context.logger.info(f"📤 EspoCRM ist neuer → Update Advoware")
advo_data = mapper.map_cbeteiligte_to_advoware(espo_entity)
await advoware.api_call(
f'api/v1/advonet/Beteiligte/{betnr}',
method='PUT',
data=advo_data
)
await sync_utils.release_sync_lock(entity_id, 'clean')
context.logger.info(f"✅ Advoware aktualisiert")
# ADVOWARE NEUER → Update EspoCRM
elif comparison == 'advoware_newer':
context.logger.info(f"📥 Advoware ist neuer → Update EspoCRM")
espo_data = mapper.map_advoware_to_cbeteiligte(advo_entity)
await espocrm.update_entity('CBeteiligte', entity_id, espo_data)
await sync_utils.release_sync_lock(entity_id, 'clean')
context.logger.info(f"✅ EspoCRM aktualisiert")
# KONFLIKT → EspoCRM WINS
elif comparison == 'conflict':
context.logger.warning(f"⚠️ KONFLIKT erkannt → EspoCRM WINS")
# Überschreibe Advoware mit EspoCRM
advo_data = mapper.map_cbeteiligte_to_advoware(espo_entity)
await advoware.api_call(
f'api/v1/advonet/Beteiligte/{betnr}',
method='PUT',
data=advo_data
)
conflict_msg = (
f"EspoCRM: {espo_entity.get('modifiedAt')}, "
f"Advoware: {advo_entity.get('geaendertAm')}. "
f"EspoCRM hat gewonnen."
)
await sync_utils.resolve_conflict_espocrm_wins(
entity_id,
espo_entity,
advo_entity,
conflict_msg
)
context.logger.info(f"✅ Konflikt gelöst: EspoCRM → Advoware")
except Exception as e:
context.logger.error(f"❌ UPDATE fehlgeschlagen: {e}")
import traceback
context.logger.error(traceback.format_exc())
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)