""" Beteiligte Sync Cron Job Läuft alle 15 Minuten und emittiert Sync-Events für Beteiligte die: - Neu sind (pending_sync) - Geändert wurden (dirty) - Fehlgeschlagen sind (failed → Retry) - Lange nicht gesynct wurden (clean aber > 24h alt) """ import asyncio from typing import Dict, Any from motia import FlowContext, cron from services.espocrm import EspoCRMAPI import datetime config = { "name": "VMH Beteiligte Sync Cron", "description": "Prüft alle 15 Minuten welche Beteiligte synchronisiert werden müssen", "flows": ["vmh-beteiligte"], "triggers": [ cron("0 */15 * * * *") # Alle 15 Minuten (6-field format!) ], "enqueues": ["vmh.beteiligte.sync_check"] } async def handler(input_data: Dict[str, Any], ctx: FlowContext): """ Cron-Handler: Findet alle Beteiligte die Sync benötigen und emittiert Events """ ctx.logger.info("🕐 Beteiligte Sync Cron gestartet") try: espocrm = EspoCRMAPI() # Berechne Threshold für "veraltete" Syncs (24 Stunden) threshold = datetime.datetime.now() - datetime.timedelta(hours=24) threshold_str = threshold.strftime('%Y-%m-%d %H:%M:%S') ctx.logger.info(f"📅 Suche Entities mit Sync-Bedarf (älter als {threshold_str})") # QUERY 1: Entities mit Status pending_sync, dirty oder failed unclean_filter = { 'where': [ { 'type': 'or', 'value': [ {'type': 'equals', 'attribute': 'syncStatus', 'value': 'pending_sync'}, {'type': 'equals', 'attribute': 'syncStatus', 'value': 'dirty'}, {'type': 'equals', 'attribute': 'syncStatus', 'value': 'failed'}, ] } ] } unclean_result = await espocrm.search_entities('CBeteiligte', unclean_filter, max_size=100) unclean_entities = unclean_result.get('list', []) ctx.logger.info(f"📊 Gefunden: {len(unclean_entities)} Entities mit Status pending/dirty/failed") # QUERY 1b: permanently_failed Entities die Auto-Reset erreicht haben permanently_failed_filter = { 'where': [ { 'type': 'and', 'value': [ {'type': 'equals', 'attribute': 'syncStatus', 'value': 'permanently_failed'}, {'type': 'isNotNull', 'attribute': 'syncAutoResetAt'}, {'type': 'before', 'attribute': 'syncAutoResetAt', 'value': threshold_str} ] } ] } reset_result = await espocrm.search_entities('CBeteiligte', permanently_failed_filter, max_size=50) reset_entities = reset_result.get('list', []) # Reset permanently_failed entities for entity in reset_entities: entity_id = entity['id'] ctx.logger.info(f"🔄 Auto-Reset für permanently_failed Entity {entity_id}") # Reset Status und Retry-Count await espocrm.update_entity('CBeteiligte', entity_id, { 'syncStatus': 'failed', # Zurück zu 'failed' für normalen Retry 'syncRetryCount': 0, 'syncAutoResetAt': None, 'syncErrorMessage': f"Auto-Reset nach 24h - vorheriger Fehler: {entity.get('syncErrorMessage', 'N/A')}" }) ctx.logger.info(f"📊 Auto-Reset: {len(reset_entities)} permanently_failed Entities") # QUERY 2: Clean Entities die > 24h nicht gesynct wurden stale_filter = { 'where': [ { 'type': 'and', 'value': [ {'type': 'equals', 'attribute': 'syncStatus', 'value': 'clean'}, {'type': 'isNotNull', 'attribute': 'betnr'}, { 'type': 'or', 'value': [ {'type': 'isNull', 'attribute': 'advowareLastSync'}, {'type': 'before', 'attribute': 'advowareLastSync', 'value': threshold_str} ] } ] } ] } stale_result = await espocrm.search_entities('CBeteiligte', stale_filter, max_size=50) stale_entities = stale_result.get('list', []) ctx.logger.info(f"📊 Gefunden: {len(stale_entities)} Entities mit veraltetem Sync (> 24h)") # KOMBINIERE ALLE (inkl. reset_entities) all_entities = unclean_entities + stale_entities + reset_entities entity_ids = list(set([e['id'] for e in all_entities])) # Dedupliziere ctx.logger.info(f"🎯 Total: {len(entity_ids)} eindeutige Entities zum Sync") if not entity_ids: ctx.logger.info("✅ Keine Entities benötigen Sync") return # Emittiere Events parallel ctx.logger.info(f"🚀 Emittiere {len(entity_ids)} Events parallel...") emit_tasks = [ ctx.enqueue({ 'topic': 'vmh.beteiligte.sync_check', 'data': { 'entity_id': entity_id, 'action': 'sync_check', 'source': 'cron', 'timestamp': datetime.datetime.now().isoformat() } }) for entity_id in entity_ids ] # Parallel emit mit error handling results = await asyncio.gather(*emit_tasks, return_exceptions=True) # Count successes and failures emitted_count = sum(1 for r in results if not isinstance(r, Exception)) failed_count = sum(1 for r in results if isinstance(r, Exception)) if failed_count > 0: ctx.logger.warn(f"⚠️ {failed_count} Events konnten nicht emittiert werden") # Log first few errors for i, result in enumerate(results[:5]): # Log max 5 errors if isinstance(result, Exception): ctx.logger.error(f" Entity {entity_ids[i]}: {result}") ctx.logger.info(f"✅ Cron fertig: {emitted_count}/{len(entity_ids)} Events emittiert") except Exception as e: ctx.logger.error(f"❌ Fehler im Sync Cron: {e}") import traceback ctx.logger.error(traceback.format_exc())