- Added logic to reset permanently_failed entities that have reached their auto-reset threshold in `beteiligte_sync_cron_step.py`. - Enhanced event handling in `beteiligte_sync_event_step.py` to skip retries if the next retry time has not been reached. - Introduced validation checks after sync operations to ensure data consistency and integrity. - Created detailed documentation outlining the fixes and their impacts on the sync process. - Added scripts for analyzing sync issues and comparing entities to facilitate debugging and validation.
161 lines
6.4 KiB
Python
161 lines
6.4 KiB
Python
"""
Beteiligte Sync Cron Job

Läuft alle 15 Minuten und emittiert Sync-Events für Beteiligte, die:
- Neu sind (pending_sync)
- Geändert wurden (dirty)
- Fehlgeschlagen sind (failed → Retry)
- Dauerhaft fehlgeschlagen sind und deren Auto-Reset fällig ist (permanently_failed → failed)
- Lange nicht gesynct wurden (clean, aber > 24h alt)
"""
|
|
|
|
import asyncio
|
|
from services.espocrm import EspoCRMAPI
|
|
import datetime
|
|
|
|
# Step configuration: a cron trigger in the 'vmh' flow that declares the
# 'vmh.beteiligte.sync_check' topic as the only event it emits.
config = dict(
    type='cron',
    name='VMH Beteiligte Sync Cron',
    description='Prüft alle 15 Minuten welche Beteiligte synchronisiert werden müssen',
    schedule='*/15 * * * *',  # every 15 minutes (5-field cron expression)
    flows=['vmh'],
    emits=['vmh.beteiligte.sync_check'],
)
|
|
|
|
async def handler(context):
    """Cron handler: find every CBeteiligte entity that needs a sync and emit one event each.

    Selection (each query is capped by ``max_size``; entities beyond the cap are
    not looked at in this run):

    1. ``syncStatus`` is ``pending_sync``, ``dirty`` or ``failed`` (up to 100).
    2. ``syncStatus`` is ``permanently_failed`` and ``syncAutoResetAt`` lies in
       the past (up to 50). These are first written back to ``failed`` with
       ``syncRetryCount`` 0 and ``syncAutoResetAt`` cleared; only entities whose
       update succeeded are emitted.
    3. ``syncStatus`` is ``clean``, ``betnr`` is set, and ``advowareLastSync`` is
       missing or older than 24 hours (up to 50).

    The collected IDs are de-duplicated (first occurrence wins, order preserved)
    and a ``vmh.beteiligte.sync_check`` event is emitted for each, concurrently.
    Emit failures are counted and the first five are logged with their entity ID.

    Any other exception is logged together with its traceback and swallowed, so
    the handler itself does not raise.

    Args:
        context: Step context; this handler uses ``context.logger`` and
            ``context.emit``.
    """
    context.logger.info("🕐 Beteiligte Sync Cron gestartet")

    try:
        espocrm = EspoCRMAPI()

        # NOTE(review): these are naive local timestamps. If EspoCRM compares
        # datetime fields in UTC, both cut-offs are shifted by the server's UTC
        # offset — confirm, and change together with whatever writes
        # syncAutoResetAt / advowareLastSync.
        now = datetime.datetime.now()
        now_str = now.strftime('%Y-%m-%d %H:%M:%S')
        threshold = now - datetime.timedelta(hours=24)
        threshold_str = threshold.strftime('%Y-%m-%d %H:%M:%S')

        context.logger.info(f"📅 Suche Entities mit Sync-Bedarf (älter als {threshold_str})")

        # QUERY 1: entities whose status says they still need work.
        unclean_filter = {
            'where': [
                {
                    'type': 'or',
                    'value': [
                        {'type': 'equals', 'attribute': 'syncStatus', 'value': 'pending_sync'},
                        {'type': 'equals', 'attribute': 'syncStatus', 'value': 'dirty'},
                        {'type': 'equals', 'attribute': 'syncStatus', 'value': 'failed'},
                    ]
                }
            ]
        }
        unclean_result = await espocrm.search_entities('CBeteiligte', unclean_filter, max_size=100)
        unclean_entities = unclean_result.get('list', [])
        context.logger.info(f"📊 Gefunden: {len(unclean_entities)} Entities mit Status pending/dirty/failed")

        # QUERY 1b: permanently_failed entities whose auto-reset is due.
        # NOTE(review): assumes syncAutoResetAt stores the time at which the reset
        # becomes due (not the failure time). The previous filter compared it with
        # now - 24h, which would postpone every reset by an extra day — confirm
        # against the code that writes syncAutoResetAt.
        permanently_failed_filter = {
            'where': [
                {
                    'type': 'and',
                    'value': [
                        {'type': 'equals', 'attribute': 'syncStatus', 'value': 'permanently_failed'},
                        {'type': 'isNotNull', 'attribute': 'syncAutoResetAt'},
                        {'type': 'before', 'attribute': 'syncAutoResetAt', 'value': now_str},
                    ]
                }
            ]
        }
        reset_result = await espocrm.search_entities('CBeteiligte', permanently_failed_filter, max_size=50)
        reset_entities = await _reset_permanently_failed(
            espocrm, reset_result.get('list', []), context.logger
        )
        context.logger.info(f"📊 Auto-Reset: {len(reset_entities)} permanently_failed Entities")

        # QUERY 2: clean entities with a betnr whose last Advoware sync is
        # missing or older than the 24h threshold.
        stale_filter = {
            'where': [
                {
                    'type': 'and',
                    'value': [
                        {'type': 'equals', 'attribute': 'syncStatus', 'value': 'clean'},
                        {'type': 'isNotNull', 'attribute': 'betnr'},
                        {
                            'type': 'or',
                            'value': [
                                {'type': 'isNull', 'attribute': 'advowareLastSync'},
                                {'type': 'before', 'attribute': 'advowareLastSync', 'value': threshold_str}
                            ]
                        }
                    ]
                }
            ]
        }
        stale_result = await espocrm.search_entities('CBeteiligte', stale_filter, max_size=50)
        stale_entities = stale_result.get('list', [])
        context.logger.info(f"📊 Gefunden: {len(stale_entities)} Entities mit veraltetem Sync (> 24h)")

        # Combine all three sources. dict.fromkeys de-duplicates while keeping
        # first-seen order, so emission order and error logs are deterministic.
        all_entities = unclean_entities + stale_entities + reset_entities
        entity_ids = list(dict.fromkeys(e['id'] for e in all_entities))

        context.logger.info(f"🎯 Total: {len(entity_ids)} eindeutige Entities zum Sync")

        if not entity_ids:
            context.logger.info("✅ Keine Entities benötigen Sync")
            return

        context.logger.info(f"🚀 Emittiere {len(entity_ids)} Events parallel...")

        emit_tasks = [
            context.emit({
                'topic': 'vmh.beteiligte.sync_check',
                'data': {
                    'entity_id': entity_id,
                    'action': 'sync_check',
                    'source': 'cron',
                    'timestamp': datetime.datetime.now().isoformat()
                }
            })
            for entity_id in entity_ids
        ]

        # return_exceptions=True: one failed emit must not cancel the others.
        results = await asyncio.gather(*emit_tasks, return_exceptions=True)

        # Pair every result with its entity ID so failures can be reported by ID.
        # BaseException, because gather may also hand back e.g. CancelledError,
        # which is not an Exception subclass but is still not a successful emit.
        failures = [
            (entity_id, result)
            for entity_id, result in zip(entity_ids, results)
            if isinstance(result, BaseException)
        ]
        emitted_count = len(entity_ids) - len(failures)

        if failures:
            context.logger.warn(f"⚠️ {len(failures)} Events konnten nicht emittiert werden")
            # Log the first five *failures* — not just failures among the first
            # five results, which silently hid any later error.
            for entity_id, error in failures[:5]:
                context.logger.error(f" Entity {entity_id}: {error}")

        context.logger.info(f"✅ Cron fertig: {emitted_count}/{len(entity_ids)} Events emittiert")

    except Exception as e:
        context.logger.error(f"❌ Fehler im Sync Cron: {e}")
        import traceback
        context.logger.error(traceback.format_exc())


async def _reset_permanently_failed(espocrm, entities, logger):
    """Write permanently_failed entities back to 'failed' so the normal retry path picks them up.

    Each entity is updated independently: if one ``update_entity`` call raises,
    the error is logged and the remaining entities are still processed.

    Args:
        espocrm: API client exposing ``update_entity(entity_type, id, data)``.
        entities: Entity dicts as returned by ``search_entities``; each needs an
            ``'id'`` key and may carry ``'syncErrorMessage'``.
        logger: Logger used for progress and error messages.

    Returns:
        The entities whose update call completed without raising.
    """
    reset_done = []
    for entity in entities:
        entity_id = entity['id']
        logger.info(f"🔄 Auto-Reset für permanently_failed Entity {entity_id}")

        # `or` rather than a .get default: the key may be present with value None.
        previous_error = entity.get('syncErrorMessage') or 'N/A'
        try:
            await espocrm.update_entity('CBeteiligte', entity_id, {
                'syncStatus': 'failed',  # back to 'failed' so the regular retry handles it
                'syncRetryCount': 0,
                'syncAutoResetAt': None,
                'syncErrorMessage': f"Auto-Reset nach 24h - vorheriger Fehler: {previous_error}"
            })
        except Exception as e:
            # Best effort per entity: a failed reset must not abort the whole run.
            logger.error(f"❌ Auto-Reset fehlgeschlagen für Entity {entity_id}: {e}")
            continue
        reset_done.append(entity)
    return reset_done
|