diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_api_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_api_step.py index 3fa38491..420fcb8a 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_api_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_api_step.py @@ -15,40 +15,30 @@ config = { async def handler(req, context): try: - # Prüfe ob bereits ein Sync läuft - redis_client = redis.Redis( - host=Config.REDIS_HOST, - port=int(Config.REDIS_PORT), - db=int(Config.REDIS_DB_CALENDAR_SYNC), - socket_timeout=Config.REDIS_TIMEOUT_SECONDS - ) - - if redis_client.get(CALENDAR_SYNC_LOCK_KEY): - context.logger.info("Calendar Sync API: Sync bereits aktiv, überspringe") + # Konfiguration aus Request-Body + body = req.get('body', {}) + kuerzel = body.get('kuerzel') + full_content = body.get('full_content', True) + + if not kuerzel: + context.logger.error("Calendar Sync API: kuerzel is required") return { - 'status': 409, + 'status': 400, 'body': { - 'status': 'conflict', - 'message': 'Calendar sync bereits aktiv', + 'status': 'error', + 'message': 'kuerzel is required', 'triggered_by': 'api' } } - - # Konfiguration aus Request-Body - body = req.get('body', {}) - full_content = body.get('full_content', True) - context.logger.info(f"Calendar Sync API aufgerufen, full_content: {full_content}") - - # Setze Lock für 30 Minuten (Sync sollte max 30 Minuten dauern) - redis_client.set(CALENDAR_SYNC_LOCK_KEY, 'api', ex=1800) - context.logger.info("Calendar Sync API: Lock gesetzt") + context.logger.info(f"Calendar Sync API called for {kuerzel}, full_content: {full_content}") - # Emit Event für den Sync + # Emit Event für den Sync mit kuerzel await context.emit({ "topic": "calendar.sync.triggered", "data": { "body": { + "kuerzel": kuerzel, "full_content": full_content, "triggered_by": "api" } @@ -60,6 +50,7 @@ async def handler(req, context): 'body': { 'status': 'triggered', 'message': 'Calendar sync wurde ausgelöst', + 'kuerzel': kuerzel, 'full_content': full_content, 'triggered_by': 'api' } diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_cron_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_cron_step.py index 734c9cff..dcdb1188 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_cron_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_cron_step.py @@ -1,10 +1,18 @@ import json import redis from config import Config +from services.advoware import AdvowareAPI -CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock' - -config = { +async def get_advoware_employees(advoware, logger): + """Fetch list of employees from Advoware.""" + try: + result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'}) + employees = result if isinstance(result, list) else [] + logger.info(f"Fetched {len(employees)} Advoware employees") + return employees + except Exception as e: + logger.error(f"Failed to fetch Advoware employees: {e}") + raise 'type': 'cron', 'name': 'Calendar Sync Cron Job', 'description': 'Führt den Calendar Sync alle 15 Minuten automatisch aus', @@ -14,45 +22,58 @@ config = { async def handler(context): try: - # Prüfe ob bereits ein Sync läuft - redis_client = redis.Redis( - host=Config.REDIS_HOST, - port=int(Config.REDIS_PORT), - db=int(Config.REDIS_DB_CALENDAR_SYNC), - socket_timeout=Config.REDIS_TIMEOUT_SECONDS - ) - - if redis_client.get(CALENDAR_SYNC_LOCK_KEY): - context.logger.info("Calendar Sync Cron: Sync bereits aktiv, überspringe") + context.logger.info("Calendar Sync Cron: Starting automatic synchronization every 15 minutes") + + # Initialize Advoware API + advoware = AdvowareAPI(context) + + # Fetch all employees + employees = await get_advoware_employees(advoware, context.logger) + if not employees: + context.logger.error("Calendar Sync Cron: No employees found. Sync cancelled.") return { - 'status': 'skipped', - 'reason': 'sync_already_running', + 'status': 'error', + 'reason': 'no_employees', 'triggered_by': 'cron' } - - # Setze Lock für 30 Minuten (Sync sollte max 30 Minuten dauern) - redis_client.set(CALENDAR_SYNC_LOCK_KEY, 'cron', ex=1800) - context.logger.info("Calendar Sync Cron: Lock gesetzt, starte automatische Synchronisation alle 15 Minuten") - # Emit Event für den Sync - await context.emit({ - "topic": "calendar.sync.triggered", - "data": { - "body": { - "full_content": True, # Cron verwendet immer volle Details - "triggered_by": "cron" + total_emitted = 0 + for employee in employees: + kuerzel = employee.get('kuerzel') + if not kuerzel: + context.logger.warning(f"Calendar Sync Cron: Employee without kuerzel skipped: {employee}") + continue + + # DEBUG: Limit to SB for debugging + if kuerzel != 'SB': + context.logger.info(f"Calendar Sync Cron: DEBUG: Skipping {kuerzel}, only SB synced") + continue + + context.logger.info(f"Calendar Sync Cron: Emitting sync event for {kuerzel}") + + # Emit Event for the Sync with kuerzel + await context.emit({ + "topic": "calendar.sync.triggered", + "data": { + "body": { + "kuerzel": kuerzel, + "full_content": True, # Cron uses always full details + "triggered_by": "cron" + } } - } - }) + }) - context.logger.info("Calendar Sync Cron: Event wurde emittiert") + total_emitted += 1 + + context.logger.info(f"Calendar Sync Cron: Emitted {total_emitted} sync events") return { 'status': 'completed', + 'total_emitted': total_emitted, 'triggered_by': 'cron' } except Exception as e: - context.logger.error(f"Fehler beim Cron-Job: {e}") + context.logger.error(f"Calendar Sync Cron: Error in cron job: {e}") return { 'status': 'error', 'error': str(e) diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py index 2b15074a..bba096f4 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -573,382 +573,117 @@ def get_timestamps(adv_data, google_data): async def handler(event, context): """Main event handler for calendar sync.""" - logger.info("Starting calendar sync for all employees") + logger.info("Starting calendar sync event handler") try: - logger.debug("Initializing Advoware API") - advoware = AdvowareAPI(context) + # Get kuerzel from event + body = event.get('body', {}) + kuerzel = body.get('kuerzel') + if not kuerzel: + logger.error("Calendar Sync Event: kuerzel is required in event") + return {'status': 400, 'body': {'error': 'kuerzel is required'}} - # Alle Mitarbeiter abrufen - logger.info("Rufe Advoware Mitarbeiter ab...") - employees = await get_advoware_employees(advoware) - if not employees: - logger.error("Keine Mitarbeiter gefunden. Sync abgebrochen.") - return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}} + logger.info(f"Starting calendar sync for {kuerzel}") - total_synced = 0 - for employee in employees: - kuerzel = employee.get('kuerzel') - if not kuerzel: - logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}") - continue + # Per-employee lock + CALENDAR_SYNC_LOCK_KEY = f'calendar_sync_lock_{kuerzel}' + redis_client = redis.Redis( + host=Config.REDIS_HOST, + port=int(Config.REDIS_PORT), + db=int(Config.REDIS_DB_CALENDAR_SYNC), + socket_timeout=Config.REDIS_TIMEOUT_SECONDS + ) - # DEBUG: Nur für Nutzer AI syncen (für Test der Travel/Prep Zeit) - if kuerzel != 'SB': - logger.info(f"DEBUG: Überspringe {kuerzel}, nur AI wird gesynct") - continue + if redis_client.get(CALENDAR_SYNC_LOCK_KEY): + logger.info(f"Calendar Sync Event: Sync for {kuerzel} already active, skipping") + return {'status': 409, 'body': {'error': 'sync_already_running', 'kuerzel': kuerzel}} - logger.info(f"Starting calendar sync for {kuerzel}") + # Set lock for 30 minutes + redis_client.set(CALENDAR_SYNC_LOCK_KEY, 'event', ex=1800) + logger.info(f"Calendar Sync Event: Lock set for {kuerzel}") - redis_client = redis.Redis( - host=Config.REDIS_HOST, - port=int(Config.REDIS_PORT), - db=int(Config.REDIS_DB_CALENDAR_SYNC), - socket_timeout=Config.REDIS_TIMEOUT_SECONDS - ) + try: + logger.debug("Initializing Advoware API") + advoware = AdvowareAPI(context) + # Initialize Google service + logger.debug("Initializing Google service") + service = await get_google_service() + logger.debug(f"Ensuring Google calendar for {kuerzel}") + calendar_id = await ensure_google_calendar(service, kuerzel) + + conn = await connect_db() try: - logger.debug("Initializing Google service") - service = await get_google_service() - logger.debug(f"Ensuring Google calendar for {kuerzel}") - calendar_id = await ensure_google_calendar(service, kuerzel) + # Initialize state + state = { + 'rows': [], + 'db_adv_index': {}, + 'db_google_index': {}, + 'adv_appointments': [], + 'adv_map': {}, + 'google_events': [], + 'google_map': {} + } - conn = await connect_db() - try: - # Initialize state - state = { - 'rows': [], - 'db_adv_index': {}, - 'db_google_index': {}, - 'adv_appointments': [], - 'adv_map': {}, - 'google_events': [], - 'google_map': {} - } - - async def reload_db_indexes(): - """Reload database indexes after DB changes in phases.""" - state['rows'] = await conn.fetch( - """ - SELECT * FROM calendar_sync - WHERE employee_kuerzel = $1 AND deleted = FALSE - """, - kuerzel - ) - state['db_adv_index'] = {str(row['advoware_frnr']): row for row in state['rows'] if row['advoware_frnr']} - state['db_google_index'] = {} - for row in state['rows']: - if row['google_event_id']: - state['db_google_index'][row['google_event_id']] = row - log_operation('debug', "Reloaded indexes", rows=len(state['rows']), adv=len(state['db_adv_index']), google=len(state['db_google_index'])) - - async def reload_api_maps(): - """Reload API maps after creating new events in phases.""" - state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel) - state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')} - state['google_events'] = await fetch_google_events(service, calendar_id) - state['google_map'] = {evt['id']: evt for evt in state['google_events']} - log_operation('debug', "Reloaded API maps", adv=len(state['adv_map']), google=len(state['google_map'])) - - # Initial fetch - log_operation('info', "Fetching fresh data from APIs") - await reload_api_maps() - await reload_db_indexes() - log_operation('info', "Fetched existing sync rows", count=len(state['rows'])) - - # Phase 1: New from Advoware => Google - logger.info("Phase 1: Processing new appointments from Advoware") - for frnr, app in state['adv_map'].items(): - if frnr not in state['db_adv_index']: - try: - event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware')) - async with conn.transaction(): - await conn.execute( - """ - INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed) - VALUES ($1, $2, $3, 'advoware', 'source_system_wins', 'synced', FALSE); - """, - kuerzel, int(frnr), event_id - ) - logger.info(f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - except Exception as e: - logger.warning(f"Phase 1: Failed to process new Advoware {frnr}: {e}") - - # Reload indexes after Phase 1 changes - await reload_db_indexes() - # Reload API maps after Phase 1 changes - await reload_api_maps() - - # Phase 2: New from Google => Advoware - logger.info("Phase 2: Processing new events from Google") - for event_id, evt in state['google_map'].items(): - # For recurring events, check if the master event (recurringEventId) is already synced - # For regular events, check the event_id directly - recurring_master_id = evt.get('recurringEventId') - is_already_synced = event_id in state['db_google_index'] or (recurring_master_id and recurring_master_id in state['db_google_index']) - - if not is_already_synced: - try: - frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google'), kuerzel, True) - if frnr and str(frnr) != 'None': - async with conn.transaction(): - await conn.execute( - """ - INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed) - VALUES ($1, $2, $3, 'google', 'source_system_wins', 'synced', TRUE); - """, - kuerzel, int(frnr), event_id - ) - logger.info(f"Phase 2: Created new from Google: event_id {event_id}, frNr {frnr}") - else: - logger.warning(f"Phase 2: Skipped DB insert for Google event {event_id}, frNr is None") - except Exception as e: - logger.warning(f"Phase 2: Failed to process new Google {event_id}: {e}") - - # Reload indexes after Phase 2 changes - await reload_db_indexes() - # Reload API maps after Phase 2 changes - await reload_api_maps() - - # Phase 3: Identify deleted entries - logger.info("Phase 3: Processing deleted entries") + async def reload_db_indexes(): + """Reload database indexes after DB changes in phases.""" + state['rows'] = await conn.fetch( + """ + SELECT * FROM calendar_sync + WHERE employee_kuerzel = $1 AND deleted = FALSE + """, + kuerzel + ) + state['db_adv_index'] = {str(row['advoware_frnr']): row for row in state['rows'] if row['advoware_frnr']} + state['db_google_index'] = {} for row in state['rows']: - frnr = row['advoware_frnr'] - event_id = row['google_event_id'] - adv_exists = str(frnr) in state['adv_map'] if frnr else False - - # For Google events, check if the master event or any instance exists - google_exists = False - if event_id: - # Check if the stored event_id exists - if event_id in state['google_map']: - google_exists = True - else: - # Check if any event has this as recurringEventId (master event still exists) - for evt in state['google_map'].values(): - if evt.get('recurringEventId') == event_id: - google_exists = True - break + if row['google_event_id']: + state['db_google_index'][row['google_event_id']] = row + log_operation('debug', "Reloaded indexes", rows=len(state['rows']), adv=len(state['db_adv_index']), google=len(state['db_google_index'])) - if not adv_exists and not google_exists: - # Both missing - soft delete - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Soft deleted sync_id {row['sync_id']} (both missing)") - elif not adv_exists: - # Missing in Advoware - strategy = row['sync_strategy'] - if strategy == 'source_system_wins': - if row['source_system'] == 'advoware': - # Propagate delete to Google - try: - await delete_google_event(service, calendar_id, event_id) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - except Exception as e: - logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - elif row['source_system'] == 'google' and row['advoware_write_allowed']: - # Recreate in Advoware - try: - new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(state['google_map'][event_id], 'google'), kuerzel, row['advoware_write_allowed']) - if new_frnr and str(new_frnr) != 'None': - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 3: Recreated Advoware appointment {new_frnr} for sync_id {row['sync_id']}") - else: - logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}, frNr is None") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - except Exception as e: - logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - else: - # For other cases, propagate delete to Google - try: - await delete_google_event(service, calendar_id, event_id) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - except Exception as e: - logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - else: - # Propagate delete to Google - try: - await delete_google_event(service, calendar_id, event_id) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - except Exception as e: - logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - elif not google_exists: - # Missing in Google - strategy = row['sync_strategy'] - if strategy == 'source_system_wins': - if row['source_system'] == 'google': - # Delete in Advoware - if row['advoware_write_allowed']: - try: - await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}") - except Exception as e: - logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - else: - logger.warning(f"Phase 3: Cannot delete in Advoware for sync_id {row['sync_id']}, write not allowed") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - elif row['source_system'] == 'advoware': - # Recreate in Google - try: - new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware')) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - except Exception as e: - logger.warning(f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - else: - # last_change_wins or other, propagate delete to Advoware - try: - await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}") - except Exception as e: - logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + async def reload_api_maps(): + """Reload API maps after creating new events in phases.""" + state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel) + state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')} + state['google_events'] = await fetch_google_events(service, calendar_id) + state['google_map'] = {evt['id']: evt for evt in state['google_events']} + log_operation('debug', "Reloaded API maps", adv=len(state['adv_map']), google=len(state['google_map'])) - # Reload indexes after Phase 3 changes - await reload_db_indexes() - # Reload API maps after Phase 3 changes - await reload_api_maps() + # Initial fetch + log_operation('info', "Fetching fresh data from APIs") + await reload_api_maps() + await reload_db_indexes() + log_operation('info', "Fetched existing sync rows", count=len(state['rows'])) - # Phase 4: Update existing entries if changed - logger.info("Phase 4: Processing updates for existing entries") - # Track which master events we've already processed to avoid duplicate updates - processed_master_events = set() - - for row in state['rows']: - frnr = row['advoware_frnr'] - event_id = row['google_event_id'] - adv_data = state['adv_map'].get(str(frnr)) if frnr else None - - # For Google events, find the corresponding event (could be master or instance) - google_data = None - if event_id: - # First try to find the exact event_id - if event_id in state['google_map']: - google_data = state['google_map'][event_id] - else: - # Look for any event that has this as recurringEventId - for evt in state['google_map'].values(): - if evt.get('recurringEventId') == event_id: - google_data = evt - break - - # Skip if we don't have both sides or if we've already processed this master event - if not adv_data or not google_data: - continue - - # For recurring events, only process the master event once - master_event_id = google_data.get('recurringEventId') or event_id - if master_event_id in processed_master_events: - continue - processed_master_events.add(master_event_id) + # Execute phases + await phase_1(state, conn, service, calendar_id, advoware, kuerzel) + await reload_db_indexes() + await reload_api_maps() - if adv_data and google_data: - adv_std = standardize_appointment_data(adv_data, 'advoware') - google_std = standardize_appointment_data(google_data, 'google') - strategy = row['sync_strategy'] - try: - if strategy == 'source_system_wins': - if row['source_system'] == 'advoware': - # Check for changes in source (Advoware) or unauthorized changes in target (Google) - adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) - google_ts_str = google_data.get('updated', '') - google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None - if adv_ts > row['last_sync']: - await update_google_event(service, calendar_id, event_id, adv_std) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - elif google_ts and google_ts > row['last_sync']: - logger.warning(f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}") - await update_google_event(service, calendar_id, event_id, adv_std) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - elif row['source_system'] == 'google' and row['advoware_write_allowed']: - # Check for changes in source (Google) or unauthorized changes in target (Advoware) - google_ts_str = google_data.get('updated', '') - google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None - adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) - logger.debug(f"Phase 4: Checking sync_id {row['sync_id']}: adv_ts={adv_ts}, google_ts={google_ts}, last_sync={row['last_sync']}") - if google_ts and google_ts > row['last_sync']: - await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}") - elif adv_ts > row['last_sync']: - logger.warning(f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}") - await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}") - elif strategy == 'last_change_wins': - adv_ts = await get_advoware_timestamp(advoware, frnr) - google_ts_str = google_data.get('updated', '') - google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None - if adv_ts and google_ts: - if adv_ts > google_ts: - await update_google_event(service, calendar_id, event_id, adv_std) - elif row['advoware_write_allowed']: - await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts)) - logger.info(f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - except Exception as e: - logger.warning(f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + await phase_2(state, conn, service, calendar_id, advoware, kuerzel) + await reload_db_indexes() + await reload_api_maps() - # Update last_sync timestamps - logger.debug("Updated last_sync timestamps") + await phase_3(state, conn, service, calendar_id, advoware, kuerzel) + await reload_db_indexes() + await reload_api_maps() - finally: - await conn.close() + await phase_4(state, conn, service, calendar_id, advoware, kuerzel) - redis_client.delete(CALENDAR_SYNC_LOCK_KEY) - logger.info(f"Calendar sync completed for {kuerzel}") - total_synced += 1 + # Update last_sync timestamps + log_operation('debug', "Updated last_sync timestamps") - except Exception as e: - logger.error(f"Sync failed for {kuerzel}: {e}") - redis_client.delete(CALENDAR_SYNC_LOCK_KEY) + finally: + await conn.close() - logger.info(f"Calendar sync completed for all employees. Total synced: {total_synced}") - return {'status': 200, 'body': {'status': 'completed', 'total_synced': total_synced}} + redis_client.delete(CALENDAR_SYNC_LOCK_KEY) + logger.info(f"Calendar sync completed for {kuerzel}") + return {'status': 200, 'body': {'status': 'completed', 'kuerzel': kuerzel}} + + except Exception as e: + logger.error(f"Sync failed for {kuerzel}: {e}") + redis_client.delete(CALENDAR_SYNC_LOCK_KEY) + return {'status': 500, 'body': {'error': str(e), 'kuerzel': kuerzel}} except Exception as e: logger.error(f"Sync failed: {e}")