""" Beteiligte Sync Cron Job Läuft alle 15 Minuten und emittiert Sync-Events für Beteiligte die: - Neu sind (pending_sync) - Geändert wurden (dirty) - Fehlgeschlagen sind (failed → Retry) - Lange nicht gesynct wurden (clean aber > 24h alt) """ import asyncio from services.espocrm import EspoCRMAPI import datetime config = { 'type': 'cron', 'name': 'VMH Beteiligte Sync Cron', 'description': 'Prüft alle 15 Minuten welche Beteiligte synchronisiert werden müssen', 'schedule': '*/15 * * * *', # Alle 15 Minuten 'flows': ['vmh'], 'emits': ['vmh.beteiligte.sync_check'] } async def handler(context): """ Cron-Handler: Findet alle Beteiligte die Sync benötigen und emittiert Events """ context.logger.info("🕐 Beteiligte Sync Cron gestartet") try: espocrm = EspoCRMAPI() # Berechne Threshold für "veraltete" Syncs (24 Stunden) threshold = datetime.datetime.now() - datetime.timedelta(hours=24) threshold_str = threshold.strftime('%Y-%m-%d %H:%M:%S') context.logger.info(f"📅 Suche Entities mit Sync-Bedarf (älter als {threshold_str})") # QUERY 1: Entities mit Status pending_sync, dirty oder failed unclean_filter = { 'where': [ { 'type': 'or', 'value': [ {'type': 'equals', 'attribute': 'syncStatus', 'value': 'pending_sync'}, {'type': 'equals', 'attribute': 'syncStatus', 'value': 'dirty'}, {'type': 'equals', 'attribute': 'syncStatus', 'value': 'failed'}, ] } ] } unclean_result = await espocrm.search_entities('CBeteiligte', unclean_filter, max_size=100) unclean_entities = unclean_result.get('list', []) context.logger.info(f"📊 Gefunden: {len(unclean_entities)} Entities mit Status pending/dirty/failed") # QUERY 2: Clean Entities die > 24h nicht gesynct wurden stale_filter = { 'where': [ { 'type': 'and', 'value': [ {'type': 'equals', 'attribute': 'syncStatus', 'value': 'clean'}, {'type': 'isNotNull', 'attribute': 'betnr'}, { 'type': 'or', 'value': [ {'type': 'isNull', 'attribute': 'advowareLastSync'}, {'type': 'before', 'attribute': 'advowareLastSync', 'value': threshold_str} ] } ] } ] } stale_result = await espocrm.search_entities('CBeteiligte', stale_filter, max_size=50) stale_entities = stale_result.get('list', []) context.logger.info(f"📊 Gefunden: {len(stale_entities)} Entities mit veraltetem Sync (> 24h)") # KOMBINIERE ALLE all_entities = unclean_entities + stale_entities entity_ids = list(set([e['id'] for e in all_entities])) # Dedupliziere context.logger.info(f"🎯 Total: {len(entity_ids)} eindeutige Entities zum Sync") if not entity_ids: context.logger.info("✅ Keine Entities benötigen Sync") return # OPTIMIERT: Batch emit mit asyncio.gather für Parallelität context.logger.info(f"🚀 Emittiere {len(entity_ids)} Events parallel...") emit_tasks = [ context.emit({ 'topic': 'vmh.beteiligte.sync_check', 'data': { 'entity_id': entity_id, 'action': 'sync_check', 'source': 'cron', 'timestamp': datetime.datetime.now().isoformat() } }) for entity_id in entity_ids ] # Parallel emit mit error handling results = await asyncio.gather(*emit_tasks, return_exceptions=True) # Count successes and failures emitted_count = sum(1 for r in results if not isinstance(r, Exception)) failed_count = sum(1 for r in results if isinstance(r, Exception)) if failed_count > 0: context.logger.warn(f"⚠️ {failed_count} Events konnten nicht emittiert werden") # Log first few errors for i, result in enumerate(results[:5]): # Log max 5 errors if isinstance(result, Exception): context.logger.error(f" Entity {entity_ids[i]}: {result}") context.logger.info(f"✅ Cron fertig: {emitted_count}/{len(entity_ids)} Events emittiert") except Exception as e: context.logger.error(f"❌ Fehler im Sync Cron: {e}") import traceback context.logger.error(traceback.format_exc())