From 8e9dc87b2a67efcb59c890cb1c4cc47c6363666e Mon Sep 17 00:00:00 2001
From: root
Date: Fri, 24 Oct 2025 00:19:34 +0000
Subject: [PATCH] Refactor calendar sync for parallel processing: cron emits
 per-employee events, event-step processes one employee with per-employee
 locks

---
 .../calendar_sync_cron_step.py  |  69 +++---
 .../calendar_sync_event_step.py | 210 +++++++++---------
 2 files changed, 131 insertions(+), 148 deletions(-)

diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_cron_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_cron_step.py
index dcdb1188..bdb26f4e 100644
--- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_cron_step.py
+++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_cron_step.py
@@ -3,77 +3,70 @@ import redis
 from config import Config
 from services.advoware import AdvowareAPI
 
-async def get_advoware_employees(advoware, logger):
-    """Fetch list of employees from Advoware."""
-    try:
-        result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
-        employees = result if isinstance(result, list) else []
-        logger.info(f"Fetched {len(employees)} Advoware employees")
-        return employees
-    except Exception as e:
-        logger.error(f"Failed to fetch Advoware employees: {e}")
-        raise
+CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
+
 config = {
     'type': 'cron',
     'name': 'Calendar Sync Cron Job',
     'description': 'Führt den Calendar Sync alle 15 Minuten automatisch aus',
     'cron': '*/15 * * * *',  # Alle 15 Minuten
-    'emits': ['calendar.sync.triggered']
+    'emits': ['calendar.sync.employee']
 }
 
+async def get_advoware_employees(advoware, context):
+    """Fetch list of employees from Advoware."""
+    try:
+        result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
+        employees = result if isinstance(result, list) else []
+        context.logger.info(f"Fetched {len(employees)} Advoware employees")
+        return employees
+    except Exception as e:
+        context.logger.error(f"Failed to fetch Advoware employees: {e}")
+        raise
+
 async def handler(context):
     try:
-        context.logger.info("Calendar Sync Cron: Starting automatic synchronization every 15 minutes")
+        context.logger.info("Calendar Sync Cron: Starting to fetch employees and emit events")
 
         # Initialize Advoware API
         advoware = AdvowareAPI(context)
 
-        # Fetch all employees
-        employees = await get_advoware_employees(advoware, context.logger)
+        # Fetch employees (BUGFIX review: pass context through — helper uses context.logger)
+        employees = await get_advoware_employees(advoware, context)
 
         if not employees:
-            context.logger.error("Calendar Sync Cron: No employees found. Sync cancelled.")
-            return {
-                'status': 'error',
-                'reason': 'no_employees',
-                'triggered_by': 'cron'
-            }
+            context.logger.error("Keine Mitarbeiter gefunden. Cron abgebrochen.")
+            return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
 
-        total_emitted = 0
+        # Emit event for each employee (DEBUG: only for SB)
         for employee in employees:
             kuerzel = employee.get('kuerzel')
             if not kuerzel:
-                context.logger.warning(f"Calendar Sync Cron: Employee without kuerzel skipped: {employee}")
+                context.logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}")
                 continue
 
-            # DEBUG: Limit to SB for debugging
+            # DEBUG: Nur für Nutzer SB syncen
             if kuerzel != 'SB':
-                context.logger.info(f"Calendar Sync Cron: DEBUG: Skipping {kuerzel}, only SB synced")
+                context.logger.info(f"DEBUG: Überspringe {kuerzel}, nur SB wird gesynct")
                 continue
 
-            context.logger.info(f"Calendar Sync Cron: Emitting sync event for {kuerzel}")
-
-            # Emit Event for the Sync with kuerzel
+            # Emit event for this employee
             await context.emit({
-                "topic": "calendar.sync.triggered",
+                "topic": "calendar.sync.employee",
                 "data": {
-                    "body": {
-                        "kuerzel": kuerzel,
-                        "full_content": True,  # Cron uses always full details
-                        "triggered_by": "cron"
-                    }
+                    "kuerzel": kuerzel,
+                    "triggered_by": "cron"
                 }
             })
+            context.logger.info(f"Calendar Sync Cron: Emitted event for employee {kuerzel}")
 
-            total_emitted += 1
-
-        context.logger.info(f"Calendar Sync Cron: Emitted {total_emitted} sync events")
+        context.logger.info("Calendar Sync Cron: Completed emitting events for employees")
 
         return {
             'status': 'completed',
-            'total_emitted': total_emitted,
             'triggered_by': 'cron'
         }
 
     except Exception as e:
-        context.logger.error(f"Calendar Sync Cron: Error in cron job: {e}")
+        context.logger.error(f"Fehler beim Cron-Job: {e}")
         return {
             'status': 'error',
             'error': str(e)
diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py
index 78926737..92254f75 100644
--- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py
+++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py
@@ -846,136 +846,126 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
                     log_operation('warning', f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}", context=context)
                     async with conn.transaction():
                         await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
 
+CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
+
+async def handler(context):
     """Main event handler for calendar sync."""
-    logger.info("Starting calendar sync for all employees")
+    kuerzel = context.input.data.get('kuerzel')
+    if not kuerzel:
+        log_operation('error', "No kuerzel provided in event", context=context)
+        return {'status': 400, 'body': {'error': 'No kuerzel provided'}}
+
+    employee_lock_key = f'calendar_sync_lock_{kuerzel}'
+
+    log_operation('info', f"Starting calendar sync for employee {kuerzel}", context=context)
+
+    redis_client = redis.Redis(
+        host=Config.REDIS_HOST,
+        port=int(Config.REDIS_PORT),
+        db=int(Config.REDIS_DB_CALENDAR_SYNC),
+        socket_timeout=Config.REDIS_TIMEOUT_SECONDS
+    )
 
     try:
-        logger.debug("Initializing Advoware API")
+        # BUGFIX review: acquire lock atomically (SET NX EX, 30-minute TTL)
+        # instead of racy GET-then-SET between concurrent events
+        if not redis_client.set(employee_lock_key, 'event', nx=True, ex=1800):
+            log_operation('info', f"Sync already running for {kuerzel}, skipping", context=context)
+            return {'status': 200, 'body': {'status': 'skipped', 'reason': 'sync_already_running', 'kuerzel': kuerzel}}
+
+        log_operation('info', f"Lock set for {kuerzel}, starting sync", context=context)
+
+        log_operation('debug', "Initializing Advoware API", context=context)
         advoware = AdvowareAPI(context)
 
-        # Alle Mitarbeiter abrufen
-        logger.info("Rufe Advoware Mitarbeiter ab...")
-        employees = await get_advoware_employees(advoware)
-        if not employees:
-            logger.error("Keine Mitarbeiter gefunden. Sync abgebrochen.")
-            return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
+        log_operation('debug', "Initializing Google service", context=context)
+        service = await get_google_service()
+        log_operation('debug', f"Ensuring Google calendar for {kuerzel}", context=context)
+        calendar_id = await ensure_google_calendar(service, kuerzel)
 
-        total_synced = 0
-        for employee in employees:
-            kuerzel = employee.get('kuerzel')
-            if not kuerzel:
-                logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}")
-                continue
+        conn = await connect_db()
+        try:
+            # Initialize state
+            state = {
+                'rows': [],
+                'db_adv_index': {},
+                'db_google_index': {},
+                'adv_appointments': [],
+                'adv_map': {},
+                'google_events': [],
+                'google_map': {}
+            }
 
-            # DEBUG: Nur für Nutzer AI syncen (für Test der Travel/Prep Zeit)
-            if kuerzel != 'SB':
-                logger.info(f"DEBUG: Überspringe {kuerzel}, nur AI wird gesynct")
-                continue
+            async def reload_db_indexes():
+                """Reload database indexes after DB changes in phases."""
+                state['rows'] = await conn.fetch(
+                    """
+                    SELECT * FROM calendar_sync
+                    WHERE employee_kuerzel = $1 AND deleted = FALSE
+                    """,
+                    kuerzel
+                )
+                state['db_adv_index'] = {str(row['advoware_frnr']): row for row in state['rows'] if row['advoware_frnr']}
+                state['db_google_index'] = {}
+                for row in state['rows']:
+                    if row['google_event_id']:
+                        state['db_google_index'][row['google_event_id']] = row
+                log_operation('debug', "Reloaded indexes", context=context, rows=len(state['rows']), adv=len(state['db_adv_index']), google=len(state['db_google_index']))
 
-            logger.info(f"Starting calendar sync for {kuerzel}")
+            async def reload_api_maps():
+                """Reload API maps after creating new events in phases."""
+                state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel)
+                state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')}
+                state['google_events'] = await fetch_google_events(service, calendar_id)
+                state['google_map'] = {evt['id']: evt for evt in state['google_events']}
+                log_operation('debug', "Reloaded API maps", context=context, adv=len(state['adv_map']), google=len(state['google_map']))
 
-            redis_client = redis.Redis(
-                host=Config.REDIS_HOST,
-                port=int(Config.REDIS_PORT),
-                db=int(Config.REDIS_DB_CALENDAR_SYNC),
-                socket_timeout=Config.REDIS_TIMEOUT_SECONDS
-            )
+            # Initial fetch
+            log_operation('info', "Fetching fresh data from APIs", context=context)
+            await reload_api_maps()
+            await reload_db_indexes()
+            log_operation('info', "Fetched existing sync rows", context=context, count=len(state['rows']))
 
-            try:
-                logger.debug("Initializing Google service")
-                service = await get_google_service()
-                logger.debug(f"Ensuring Google calendar for {kuerzel}")
-                calendar_id = await ensure_google_calendar(service, kuerzel)
+            # Phase 1: New from Advoware => Google
+            await process_new_from_advoware(state, conn, service, calendar_id, kuerzel, advoware, context)
 
-                conn = await connect_db()
-                try:
-                    # Initialize state
-                    state = {
-                        'rows': [],
-                        'db_adv_index': {},
-                        'db_google_index': {},
-                        'adv_appointments': [],
-                        'adv_map': {},
-                        'google_events': [],
-                        'google_map': {}
-                    }
+            # Reload indexes after Phase 1 changes
+            await reload_db_indexes()
+            # Reload API maps after Phase 1 changes
+            await reload_api_maps()
 
-                    async def reload_db_indexes():
-                        """Reload database indexes after DB changes in phases."""
-                        state['rows'] = await conn.fetch(
-                            """
-                            SELECT * FROM calendar_sync
-                            WHERE employee_kuerzel = $1 AND deleted = FALSE
-                            """,
-                            kuerzel
-                        )
-                        state['db_adv_index'] = {str(row['advoware_frnr']): row for row in state['rows'] if row['advoware_frnr']}
-                        state['db_google_index'] = {}
-                        for row in state['rows']:
-                            if row['google_event_id']:
-                                state['db_google_index'][row['google_event_id']] = row
-                        log_operation('debug', "Reloaded indexes", context=context, rows=len(state['rows']), adv=len(state['db_adv_index']), google=len(state['db_google_index']))
+            # Phase 2: New from Google => Advoware
+            await process_new_from_google(state, conn, service, calendar_id, kuerzel, advoware, context)
 
-                    async def reload_api_maps():
-                        """Reload API maps after creating new events in phases."""
-                        state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel)
-                        state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')}
-                        state['google_events'] = await fetch_google_events(service, calendar_id)
-                        state['google_map'] = {evt['id']: evt for evt in state['google_events']}
-                        log_operation('debug', "Reloaded API maps", context=context, adv=len(state['adv_map']), google=len(state['google_map']))
+            # Reload indexes after Phase 2 changes
+            await reload_db_indexes()
+            # Reload API maps after Phase 2 changes
+            await reload_api_maps()
 
-                    # Initial fetch
-                    log_operation('info', "Fetching fresh data from APIs", context=context)
-                    await reload_api_maps()
-                    await reload_db_indexes()
-                    log_operation('info', "Fetched existing sync rows", context=context, count=len(state['rows']))
+            # Phase 3: Identify deleted entries
+            await process_deleted_entries(state, conn, service, calendar_id, kuerzel, advoware, context)
 
-                    # Phase 1: New from Advoware => Google
-                    await process_new_from_advoware(state, conn, service, calendar_id, kuerzel, advoware, context)
+            # Reload indexes after Phase 3 changes
+            await reload_db_indexes()
+            # Reload API maps after Phase 3 changes
+            await reload_api_maps()
 
-                    # Reload indexes after Phase 1 changes
-                    await reload_db_indexes()
-                    # Reload API maps after Phase 1 changes
-                    await reload_api_maps()
+            # Phase 4: Update existing entries if changed
+            await process_updates(state, conn, service, calendar_id, kuerzel, advoware, context)
 
-                    # Phase 2: New from Google => Advoware
-                    await process_new_from_google(state, conn, service, calendar_id, kuerzel, advoware, context)
+            # Update last_sync timestamps
+            log_operation('debug', "Updated last_sync timestamps", context=context)
 
-                    # Reload indexes after Phase 2 changes
-                    await reload_db_indexes()
-                    # Reload API maps after Phase 2 changes
-                    await reload_api_maps()
+        finally:
+            await conn.close()
 
-                    # Phase 3: Identify deleted entries
-                    await process_deleted_entries(state, conn, service, calendar_id, kuerzel, advoware, context)
-
-                    # Reload indexes after Phase 3 changes
-                    await reload_db_indexes()
-                    # Reload API maps after Phase 3 changes
-                    await reload_api_maps()
-
-                    # Phase 4: Update existing entries if changed
-                    await process_updates(state, conn, service, calendar_id, kuerzel, advoware, context)
-
-                    # Update last_sync timestamps
-                    logger.debug("Updated last_sync timestamps")
-
-                finally:
-                    await conn.close()
-
-                redis_client.delete(CALENDAR_SYNC_LOCK_KEY)
-                logger.info(f"Calendar sync completed for {kuerzel}")
-                total_synced += 1
-
-            except Exception as e:
-                logger.error(f"Sync failed for {kuerzel}: {e}")
-                redis_client.delete(CALENDAR_SYNC_LOCK_KEY)
-
-        logger.info(f"Calendar sync completed for all employees. Total synced: {total_synced}")
-        return {'status': 200, 'body': {'status': 'completed', 'total_synced': total_synced}}
+        redis_client.delete(employee_lock_key)
+        log_operation('info', f"Calendar sync completed for {kuerzel}", context=context)
+        return {'status': 200, 'body': {'status': 'completed', 'kuerzel': kuerzel}}
 
     except Exception as e:
-        logger.error(f"Sync failed: {e}")
+        log_operation('error', f"Sync failed for {kuerzel}: {e}", context=context)
+        redis_client.delete(employee_lock_key)
         return {'status': 500, 'body': {'error': str(e)}}
 
 # Motia Step Configuration
@@ -983,7 +973,7 @@ config = {
     "type": "event",
     "name": "Calendar Sync Event Step",
     "description": "Handles bidirectional calendar sync between Advoware and Google Calendar using Postgres as hub",
-    "subscribes": ["calendar.sync.triggered"],
+    "subscribes": ["calendar.sync.employee"],
     "emits": [],
     "flows": ["advoware"]
 }
\ No newline at end of file