Enhance KommunikationSyncManager and Sync Event Step
- Improved bidirectional synchronization logic in KommunikationSyncManager: - Added initial sync handling to prevent duplicates. - Optimized hash calculation to only write changes when necessary. - Enhanced conflict resolution with clearer logging and handling of various scenarios. - Refactored diff computation for better clarity and maintainability. - Updated beteiligte_sync_event_step to ensure proper lock management: - Added error handling for entity fetching and retry logic. - Improved logging for better traceability of sync actions. - Ensured lock release in case of unexpected errors.
This commit is contained in:
@@ -72,73 +72,90 @@ async def handler(event_data, context):
|
||||
context.logger.warn(f"⏸️ Sync bereits aktiv für {entity_id}, überspringe")
|
||||
return
|
||||
|
||||
# 2. FETCH ENTITY VON ESPOCRM
|
||||
# Lock erfolgreich acquired - MUSS im finally block released werden!
|
||||
try:
|
||||
espo_entity = await espocrm.get_entity('CBeteiligte', entity_id)
|
||||
# 2. FETCH ENTITY VON ESPOCRM
|
||||
try:
|
||||
espo_entity = await espocrm.get_entity('CBeteiligte', entity_id)
|
||||
except Exception as e:
|
||||
context.logger.error(f"❌ Fehler beim Laden von EspoCRM Entity: {e}")
|
||||
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
|
||||
return
|
||||
|
||||
context.logger.info(f"📋 Entity geladen: {espo_entity.get('name')} (betnr: {espo_entity.get('betnr')})")
|
||||
|
||||
betnr = espo_entity.get('betnr')
|
||||
sync_status = espo_entity.get('syncStatus', 'pending_sync')
|
||||
|
||||
# FIX #12: Check Retry-Backoff - überspringe wenn syncNextRetry noch nicht erreicht
|
||||
sync_next_retry = espo_entity.get('syncNextRetry')
|
||||
if sync_next_retry and sync_status == 'failed':
|
||||
import datetime
|
||||
import pytz
|
||||
|
||||
try:
|
||||
next_retry_ts = datetime.datetime.strptime(sync_next_retry, '%Y-%m-%d %H:%M:%S')
|
||||
next_retry_ts = pytz.UTC.localize(next_retry_ts)
|
||||
now_utc = datetime.datetime.now(pytz.UTC)
|
||||
|
||||
if now_utc < next_retry_ts:
|
||||
remaining_minutes = int((next_retry_ts - now_utc).total_seconds() / 60)
|
||||
context.logger.info(f"⏸️ Retry-Backoff aktiv: Nächster Versuch in {remaining_minutes} Minuten")
|
||||
await sync_utils.release_sync_lock(entity_id, sync_status)
|
||||
return
|
||||
except Exception as e:
|
||||
context.logger.warn(f"⚠️ Fehler beim Parsen von syncNextRetry: {e}")
|
||||
|
||||
# 3. BESTIMME SYNC-AKTION
|
||||
|
||||
# FALL A: Neu (kein betnr) → CREATE in Advoware
|
||||
if not betnr and action in ['create', 'sync_check']:
|
||||
context.logger.info(f"🆕 Neuer Beteiligter → CREATE in Advoware")
|
||||
await handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, context)
|
||||
|
||||
# FALL B: Existiert (hat betnr) → UPDATE oder CHECK
|
||||
elif betnr:
|
||||
context.logger.info(f"♻️ Existierender Beteiligter (betNr: {betnr}) → UPDATE/CHECK")
|
||||
await handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, komm_sync, context)
|
||||
|
||||
# FALL C: DELETE (TODO: Implementierung später)
|
||||
elif action == 'delete':
|
||||
context.logger.warn(f"🗑️ DELETE noch nicht implementiert für {entity_id}")
|
||||
await sync_utils.release_sync_lock(entity_id, 'failed', 'Delete-Operation nicht implementiert')
|
||||
|
||||
else:
|
||||
context.logger.warn(f"⚠️ Unbekannte Kombination: action={action}, betnr={betnr}")
|
||||
await sync_utils.release_sync_lock(entity_id, 'failed', f'Unbekannte Aktion: {action}')
|
||||
|
||||
except Exception as e:
|
||||
context.logger.error(f"❌ Fehler beim Laden von EspoCRM Entity: {e}")
|
||||
await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
|
||||
return
|
||||
|
||||
context.logger.info(f"📋 Entity geladen: {espo_entity.get('name')} (betnr: {espo_entity.get('betnr')})")
|
||||
|
||||
betnr = espo_entity.get('betnr')
|
||||
sync_status = espo_entity.get('syncStatus', 'pending_sync')
|
||||
|
||||
# FIX #12: Check Retry-Backoff - überspringe wenn syncNextRetry noch nicht erreicht
|
||||
sync_next_retry = espo_entity.get('syncNextRetry')
|
||||
if sync_next_retry and sync_status == 'failed':
|
||||
import datetime
|
||||
import pytz
|
||||
# Unerwarteter Fehler während Sync - GARANTIERE Lock-Release
|
||||
context.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
|
||||
import traceback
|
||||
context.logger.error(traceback.format_exc())
|
||||
|
||||
try:
|
||||
next_retry_ts = datetime.datetime.strptime(sync_next_retry, '%Y-%m-%d %H:%M:%S')
|
||||
next_retry_ts = pytz.UTC.localize(next_retry_ts)
|
||||
now_utc = datetime.datetime.now(pytz.UTC)
|
||||
|
||||
if now_utc < next_retry_ts:
|
||||
remaining_minutes = int((next_retry_ts - now_utc).total_seconds() / 60)
|
||||
context.logger.info(f"⏸️ Retry-Backoff aktiv: Nächster Versuch in {remaining_minutes} Minuten")
|
||||
await sync_utils.release_sync_lock(entity_id, sync_status)
|
||||
return
|
||||
except Exception as e:
|
||||
context.logger.warn(f"⚠️ Fehler beim Parsen von syncNextRetry: {e}")
|
||||
|
||||
# 3. BESTIMME SYNC-AKTION
|
||||
|
||||
# FALL A: Neu (kein betnr) → CREATE in Advoware
|
||||
if not betnr and action in ['create', 'sync_check']:
|
||||
context.logger.info(f"🆕 Neuer Beteiligter → CREATE in Advoware")
|
||||
await handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, context)
|
||||
|
||||
# FALL B: Existiert (hat betnr) → UPDATE oder CHECK
|
||||
elif betnr:
|
||||
context.logger.info(f"♻️ Existierender Beteiligter (betNr: {betnr}) → UPDATE/CHECK")
|
||||
await handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, komm_sync, context)
|
||||
|
||||
# FALL C: DELETE (TODO: Implementierung später)
|
||||
elif action == 'delete':
|
||||
context.logger.warn(f"🗑️ DELETE noch nicht implementiert für {entity_id}")
|
||||
await sync_utils.release_sync_lock(entity_id, 'failed', 'Delete-Operation nicht implementiert')
|
||||
|
||||
else:
|
||||
context.logger.warn(f"⚠️ Unbekannte Kombination: action={action}, betnr={betnr}")
|
||||
await sync_utils.release_sync_lock(entity_id, 'failed', f'Unbekannte Aktion: {action}')
|
||||
await sync_utils.release_sync_lock(
|
||||
entity_id,
|
||||
'failed',
|
||||
f'Unerwarteter Fehler: {str(e)[:1900]}',
|
||||
increment_retry=True
|
||||
)
|
||||
except Exception as release_error:
|
||||
# Selbst Lock-Release failed - logge kritischen Fehler
|
||||
context.logger.critical(f"🚨 CRITICAL: Lock-Release failed für {entity_id}: {release_error}")
|
||||
# Force Redis lock release
|
||||
try:
|
||||
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
|
||||
redis_client.delete(lock_key)
|
||||
context.logger.info(f"✅ Redis lock manuell released: {lock_key}")
|
||||
except:
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
context.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
|
||||
# Fehler VOR Lock-Acquire - kein Lock-Release nötig
|
||||
context.logger.error(f"❌ Fehler vor Lock-Acquire: {e}")
|
||||
import traceback
|
||||
context.logger.error(traceback.format_exc())
|
||||
|
||||
try:
|
||||
await sync_utils.release_sync_lock(
|
||||
entity_id,
|
||||
'failed',
|
||||
f'Unerwarteter Fehler: {str(e)[:1900]}',
|
||||
increment_retry=True
|
||||
)
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
async def run_kommunikation_sync(entity_id: str, betnr: int, komm_sync, context, direction: str = 'both') -> Dict[str, Any]:
|
||||
|
||||
Reference in New Issue
Block a user