From c91d3fc76dabae43405ff77b9b1da11ffc0d2fe7 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 24 Oct 2025 00:08:18 +0000 Subject: [PATCH] Fix 'adv_map' not defined error by correcting state dict references --- .../calendar_sync_event_step.py | 299 +++++++++--------- 1 file changed, 151 insertions(+), 148 deletions(-) diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py index 4316431c..2b15074a 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -573,91 +573,91 @@ def get_timestamps(adv_data, google_data): async def handler(event, context): """Main event handler for calendar sync.""" - logger.info("Starting calendar sync event handler") + logger.info("Starting calendar sync for all employees") try: - # Get kuerzel from event - body = event.get('body', {}) - kuerzel = body.get('kuerzel') - if not kuerzel: - logger.error("Calendar Sync Event: kuerzel is required in event") - return {'status': 400, 'body': {'error': 'kuerzel is required'}} + logger.debug("Initializing Advoware API") + advoware = AdvowareAPI(context) - logger.info(f"Starting calendar sync for {kuerzel}") + # Alle Mitarbeiter abrufen + logger.info("Rufe Advoware Mitarbeiter ab...") + employees = await get_advoware_employees(advoware) + if not employees: + logger.error("Keine Mitarbeiter gefunden. Sync abgebrochen.") + return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}} - # Per-employee lock - CALENDAR_SYNC_LOCK_KEY = f'calendar_sync_lock_{kuerzel}' - redis_client = redis.Redis( - host=Config.REDIS_HOST, - port=int(Config.REDIS_PORT), - db=int(Config.REDIS_DB_CALENDAR_SYNC), - socket_timeout=Config.REDIS_TIMEOUT_SECONDS - ) + total_synced = 0 + for employee in employees: + kuerzel = employee.get('kuerzel') + if not kuerzel: + logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}") + continue - if redis_client.get(CALENDAR_SYNC_LOCK_KEY): - logger.info(f"Calendar Sync Event: Sync for {kuerzel} already active, skipping") - return {'status': 409, 'body': {'error': 'sync_already_running', 'kuerzel': kuerzel}} + # DEBUG: Nur für Nutzer AI syncen (für Test der Travel/Prep Zeit) + if kuerzel != 'SB': + logger.info(f"DEBUG: Überspringe {kuerzel}, nur AI wird gesynct") + continue - # Set lock for 30 minutes - redis_client.set(CALENDAR_SYNC_LOCK_KEY, 'event', ex=1800) - logger.info(f"Calendar Sync Event: Lock set for {kuerzel}") + logger.info(f"Starting calendar sync for {kuerzel}") - try: - logger.debug("Initializing Advoware API") - advoware = AdvowareAPI(context) + redis_client = redis.Redis( + host=Config.REDIS_HOST, + port=int(Config.REDIS_PORT), + db=int(Config.REDIS_DB_CALENDAR_SYNC), + socket_timeout=Config.REDIS_TIMEOUT_SECONDS + ) - # Initialize Google service - logger.debug("Initializing Google service") - service = await get_google_service() - logger.debug(f"Ensuring Google calendar for {kuerzel}") - calendar_id = await ensure_google_calendar(service, kuerzel) - - conn = await connect_db() try: - # Initialize state - state = { - 'rows': [], - 'db_adv_index': {}, - 'db_google_index': {}, - 'adv_appointments': [], - 'adv_map': {}, - 'google_events': [], - 'google_map': {} - } + logger.debug("Initializing Google service") + service = await get_google_service() + logger.debug(f"Ensuring Google calendar for {kuerzel}") + calendar_id = await ensure_google_calendar(service, kuerzel) - async def reload_db_indexes(): - """Reload database indexes after DB changes in phases.""" - state['rows'] = await conn.fetch( - """ - SELECT * FROM calendar_sync - WHERE employee_kuerzel = $1 AND deleted = FALSE - """, - kuerzel - ) - state['db_adv_index'] = {str(row['advoware_frnr']): row for row in state['rows'] if row['advoware_frnr']} - state['db_google_index'] = {} - for row in state['rows']: - if row['google_event_id']: - state['db_google_index'][row['google_event_id']] = row - log_operation('debug', "Reloaded indexes", rows=len(state['rows']), adv=len(state['db_adv_index']), google=len(state['db_google_index'])) + conn = await connect_db() + try: + # Initialize state + state = { + 'rows': [], + 'db_adv_index': {}, + 'db_google_index': {}, + 'adv_appointments': [], + 'adv_map': {}, + 'google_events': [], + 'google_map': {} + } - async def reload_api_maps(): - """Reload API maps after creating new events in phases.""" - state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel) - state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')} - state['google_events'] = await fetch_google_events(service, calendar_id) - state['google_map'] = {evt['id']: evt for evt in state['google_events']} - log_operation('debug', "Reloaded API maps", adv=len(state['adv_map']), google=len(state['google_map'])) + async def reload_db_indexes(): + """Reload database indexes after DB changes in phases.""" + state['rows'] = await conn.fetch( + """ + SELECT * FROM calendar_sync + WHERE employee_kuerzel = $1 AND deleted = FALSE + """, + kuerzel + ) + state['db_adv_index'] = {str(row['advoware_frnr']): row for row in state['rows'] if row['advoware_frnr']} + state['db_google_index'] = {} + for row in state['rows']: + if row['google_event_id']: + state['db_google_index'][row['google_event_id']] = row + log_operation('debug', "Reloaded indexes", rows=len(state['rows']), adv=len(state['db_adv_index']), google=len(state['db_google_index'])) - # Initial fetch - log_operation('info', "Fetching fresh data from APIs") - await reload_api_maps() - await reload_db_indexes() - log_operation('info', "Fetched existing sync rows", count=len(state['rows'])) + async def reload_api_maps(): + """Reload API maps after creating new events in phases.""" + state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel) + state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')} + state['google_events'] = await fetch_google_events(service, calendar_id) + state['google_map'] = {evt['id']: evt for evt in state['google_events']} + log_operation('debug', "Reloaded API maps", adv=len(state['adv_map']), google=len(state['google_map'])) - async def phase_1(state, conn, service, calendar_id, advoware, kuerzel): - """Phase 1: New from Advoware => Google""" - log_operation('info', "Phase 1: Processing new appointments from Advoware") + # Initial fetch + log_operation('info', "Fetching fresh data from APIs") + await reload_api_maps() + await reload_db_indexes() + log_operation('info', "Fetched existing sync rows", count=len(state['rows'])) + + # Phase 1: New from Advoware => Google + logger.info("Phase 1: Processing new appointments from Advoware") for frnr, app in state['adv_map'].items(): if frnr not in state['db_adv_index']: try: @@ -670,14 +670,18 @@ async def handler(event, context): """, kuerzel, int(frnr), event_id ) - log_operation('info', "Phase 1: Created new from Advoware", frnr=frnr, event_id=event_id) + logger.info(f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}") await asyncio.sleep(0.1) # Small delay to avoid rate limits except Exception as e: - log_operation('warning', "Phase 1: Failed to process new Advoware", frnr=frnr, error=str(e)) + logger.warning(f"Phase 1: Failed to process new Advoware {frnr}: {e}") - async def phase_2(state, conn, service, calendar_id, advoware, kuerzel): - """Phase 2: New from Google => Advoware""" - log_operation('info', "Phase 2: Processing new events from Google") + # Reload indexes after Phase 1 changes + await reload_db_indexes() + # Reload API maps after Phase 1 changes + await reload_api_maps() + + # Phase 2: New from Google => Advoware + logger.info("Phase 2: Processing new events from Google") for event_id, evt in state['google_map'].items(): # For recurring events, check if the master event (recurringEventId) is already synced # For regular events, check the event_id directly @@ -686,7 +690,7 @@ async def handler(event, context): if not is_already_synced: try: - frnr = await safe_advoware_operation(create_advoware_appointment, True, advoware, standardize_appointment_data(evt, 'google'), kuerzel) + frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google'), kuerzel, True) if frnr and str(frnr) != 'None': async with conn.transaction(): await conn.execute( @@ -696,15 +700,19 @@ async def handler(event, context): """, kuerzel, int(frnr), event_id ) - log_operation('info', "Phase 2: Created new from Google", event_id=event_id, frnr=frnr) + logger.info(f"Phase 2: Created new from Google: event_id {event_id}, frNr {frnr}") else: - log_operation('warning', "Phase 2: Skipped DB insert for Google event", event_id=event_id) + logger.warning(f"Phase 2: Skipped DB insert for Google event {event_id}, frNr is None") except Exception as e: - log_operation('warning', "Phase 2: Failed to process new Google", event_id=event_id, error=str(e)) + logger.warning(f"Phase 2: Failed to process new Google {event_id}: {e}") - async def phase_3(state, conn, service, calendar_id, advoware, kuerzel): - """Phase 3: Identify deleted entries""" - log_operation('info', "Phase 3: Processing deleted entries") + # Reload indexes after Phase 2 changes + await reload_db_indexes() + # Reload API maps after Phase 2 changes + await reload_api_maps() + + # Phase 3: Identify deleted entries + logger.info("Phase 3: Processing deleted entries") for row in state['rows']: frnr = row['advoware_frnr'] event_id = row['google_event_id'] @@ -727,7 +735,7 @@ async def handler(event, context): # Both missing - soft delete async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - log_operation('info', "Phase 3: Soft deleted", sync_id=row['sync_id']) + logger.info(f"Phase 3: Soft deleted sync_id {row['sync_id']} (both missing)") elif not adv_exists: # Missing in Advoware strategy = row['sync_strategy'] @@ -738,26 +746,26 @@ async def handler(event, context): await delete_google_event(service, calendar_id, event_id) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - log_operation('info', "Phase 3: Propagated delete to Google", sync_id=row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") await asyncio.sleep(0.1) # Small delay to avoid rate limits except Exception as e: - log_operation('warning', "Phase 3: Failed to delete Google", sync_id=row['sync_id'], error=str(e)) + logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) elif row['source_system'] == 'google' and row['advoware_write_allowed']: # Recreate in Advoware try: - new_frnr = await safe_advoware_operation(create_advoware_appointment, row['advoware_write_allowed'], advoware, standardize_appointment_data(state['google_map'][event_id], 'google'), kuerzel) + new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(state['google_map'][event_id], 'google'), kuerzel, row['advoware_write_allowed']) if new_frnr and str(new_frnr) != 'None': async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - log_operation('info', "Phase 3: Recreated Advoware appointment", frnr=new_frnr, sync_id=row['sync_id']) + logger.info(f"Phase 3: Recreated Advoware appointment {new_frnr} for sync_id {row['sync_id']}") else: - log_operation('warning', "Phase 3: Failed to recreate Advoware", sync_id=row['sync_id']) + logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}, frNr is None") async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) except Exception as e: - log_operation('warning', "Phase 3: Failed to recreate Advoware", sync_id=row['sync_id'], error=str(e)) + logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}: {e}") async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) else: @@ -766,10 +774,10 @@ async def handler(event, context): await delete_google_event(service, calendar_id, event_id) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - log_operation('info', "Phase 3: Propagated delete to Google", sync_id=row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") await asyncio.sleep(0.1) # Small delay to avoid rate limits except Exception as e: - log_operation('warning', "Phase 3: Failed to delete Google", sync_id=row['sync_id'], error=str(e)) + logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) else: @@ -778,10 +786,10 @@ async def handler(event, context): await delete_google_event(service, calendar_id, event_id) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - log_operation('info', "Phase 3: Propagated delete to Google", sync_id=row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") await asyncio.sleep(0.1) # Small delay to avoid rate limits except Exception as e: - log_operation('warning', "Phase 3: Failed to delete Google", sync_id=row['sync_id'], error=str(e)) + logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) elif not google_exists: @@ -792,16 +800,16 @@ async def handler(event, context): # Delete in Advoware if row['advoware_write_allowed']: try: - await safe_advoware_operation(delete_advoware_appointment, row['advoware_write_allowed'], advoware, frnr) + await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed']) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - log_operation('info', "Phase 3: Propagated delete to Advoware", sync_id=row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}") except Exception as e: - log_operation('warning', "Phase 3: Failed to delete Advoware", sync_id=row['sync_id'], error=str(e)) + logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}") async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) else: - log_operation('warning', "Phase 3: Cannot delete in Advoware, write not allowed", sync_id=row['sync_id']) + logger.warning(f"Phase 3: Cannot delete in Advoware for sync_id {row['sync_id']}, write not allowed") async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) elif row['source_system'] == 'advoware': @@ -810,27 +818,31 @@ async def handler(event, context): new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware')) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - log_operation('info', "Phase 3: Recreated Google event", event_id=new_event_id, sync_id=row['sync_id']) + logger.info(f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}") await asyncio.sleep(0.1) # Small delay to avoid rate limits except Exception as e: - log_operation('warning', "Phase 3: Failed to recreate Google", sync_id=row['sync_id'], error=str(e)) + logger.warning(f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}") async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) else: # last_change_wins or other, propagate delete to Advoware try: - await safe_advoware_operation(delete_advoware_appointment, row['advoware_write_allowed'], advoware, frnr) + await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed']) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - log_operation('info', "Phase 3: Propagated delete to Advoware", sync_id=row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}") except Exception as e: - log_operation('warning', "Phase 3: Failed to delete Advoware", sync_id=row['sync_id'], error=str(e)) + logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}") async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - async def phase_4(state, conn, service, calendar_id, advoware, kuerzel): - """Phase 4: Update existing entries if changed""" - log_operation('info', "Phase 4: Processing updates for existing entries") + # Reload indexes after Phase 3 changes + await reload_db_indexes() + # Reload API maps after Phase 3 changes + await reload_api_maps() + + # Phase 4: Update existing entries if changed + logger.info("Phase 4: Processing updates for existing entries") # Track which master events we've already processed to avoid duplicate updates processed_master_events = set() @@ -870,35 +882,39 @@ async def handler(event, context): if strategy == 'source_system_wins': if row['source_system'] == 'advoware': # Check for changes in source (Advoware) or unauthorized changes in target (Google) - adv_ts, google_ts = get_timestamps(adv_data, google_data) - if adv_ts and adv_ts > row['last_sync']: + adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) + google_ts_str = google_data.get('updated', '') + google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None + if adv_ts > row['last_sync']: await update_google_event(service, calendar_id, event_id, adv_std) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - log_operation('info', "Phase 4: Updated Google event from Advoware", event_id=event_id, frnr=frnr) + logger.info(f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}") await asyncio.sleep(0.1) # Small delay to avoid rate limits elif google_ts and google_ts > row['last_sync']: - log_operation('warning', "Phase 4: Unauthorized change in Google event, resetting to Advoware", event_id=event_id, frnr=frnr) + logger.warning(f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}") await update_google_event(service, calendar_id, event_id, adv_std) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - log_operation('info', "Phase 4: Reset Google event to Advoware", event_id=event_id, frnr=frnr) + logger.info(f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}") await asyncio.sleep(0.1) # Small delay to avoid rate limits elif row['source_system'] == 'google' and row['advoware_write_allowed']: # Check for changes in source (Google) or unauthorized changes in target (Advoware) - adv_ts, google_ts = get_timestamps(adv_data, google_data) - log_operation('debug', "Phase 4: Checking sync", sync_id=row['sync_id'], adv_ts=adv_ts, google_ts=google_ts, last_sync=row['last_sync']) + google_ts_str = google_data.get('updated', '') + google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None + adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) + logger.debug(f"Phase 4: Checking sync_id {row['sync_id']}: adv_ts={adv_ts}, google_ts={google_ts}, last_sync={row['last_sync']}") if google_ts and google_ts > row['last_sync']: - await safe_advoware_operation(update_advoware_appointment, row['advoware_write_allowed'], advoware, frnr, google_std, kuerzel) + await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - log_operation('info', "Phase 4: Updated Advoware from Google event", frnr=frnr, event_id=event_id) - elif adv_ts and adv_ts > row['last_sync']: - log_operation('warning', "Phase 4: Unauthorized change in Advoware, resetting to Google", frnr=frnr, event_id=event_id) - await safe_advoware_operation(update_advoware_appointment, row['advoware_write_allowed'], advoware, frnr, google_std, kuerzel) + logger.info(f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}") + elif adv_ts > row['last_sync']: + logger.warning(f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}") + await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - log_operation('info', "Phase 4: Reset Advoware to Google event", frnr=frnr, event_id=event_id) + logger.info(f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}") elif strategy == 'last_change_wins': adv_ts = await get_advoware_timestamp(advoware, frnr) google_ts_str = google_data.get('updated', '') @@ -907,45 +923,32 @@ async def handler(event, context): if adv_ts > google_ts: await update_google_event(service, calendar_id, event_id, adv_std) elif row['advoware_write_allowed']: - await safe_advoware_operation(update_advoware_appointment, row['advoware_write_allowed'], advoware, frnr, google_std, kuerzel) + await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts)) - log_operation('info', "Phase 4: Updated based on last_change_wins", sync_id=row['sync_id']) + logger.info(f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}") await asyncio.sleep(0.1) # Small delay to avoid rate limits except Exception as e: - log_operation('warning', "Phase 4: Failed to update", sync_id=row['sync_id'], error=str(e)) + logger.warning(f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}") async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - # Execute phases - await phase_1(state, conn, service, calendar_id, advoware, kuerzel) - await reload_db_indexes() - await reload_api_maps() + # Update last_sync timestamps + logger.debug("Updated last_sync timestamps") - await phase_2(state, conn, service, calendar_id, advoware, kuerzel) - await reload_db_indexes() - await reload_api_maps() + finally: + await conn.close() - await phase_3(state, conn, service, calendar_id, advoware, kuerzel) - await reload_db_indexes() - await reload_api_maps() + redis_client.delete(CALENDAR_SYNC_LOCK_KEY) + logger.info(f"Calendar sync completed for {kuerzel}") + total_synced += 1 - await phase_4(state, conn, service, calendar_id, advoware, kuerzel) + except Exception as e: + logger.error(f"Sync failed for {kuerzel}: {e}") + redis_client.delete(CALENDAR_SYNC_LOCK_KEY) - # Update last_sync timestamps - log_operation('debug', "Updated last_sync timestamps") - - finally: - await conn.close() - - redis_client.delete(CALENDAR_SYNC_LOCK_KEY) - logger.info(f"Calendar sync completed for {kuerzel}") - return {'status': 200, 'body': {'status': 'completed', 'kuerzel': kuerzel}} - - except Exception as e: - logger.error(f"Sync failed for {kuerzel}: {e}") - redis_client.delete(CALENDAR_SYNC_LOCK_KEY) - return {'status': 500, 'body': {'error': str(e), 'kuerzel': kuerzel}} + logger.info(f"Calendar sync completed for all employees. Total synced: {total_synced}") + return {'status': 200, 'body': {'status': 'completed', 'total_synced': total_synced}} except Exception as e: logger.error(f"Sync failed: {e}")