diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py index 2b15074a..e1978b2e 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -571,7 +571,271 @@ def get_timestamps(adv_data, google_data): return adv_ts, google_ts -async def handler(event, context): +async def process_new_from_advoware(state, conn, service, calendar_id, kuerzel, advoware): + """Phase 1: Process new appointments from Advoware to Google.""" + logger.info("Phase 1: Processing new appointments from Advoware") + for frnr, app in state['adv_map'].items(): + if frnr not in state['db_adv_index']: + try: + event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware')) + async with conn.transaction(): + await conn.execute( + """ + INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed) + VALUES ($1, $2, $3, 'advoware', 'source_system_wins', 'synced', FALSE); + """, + kuerzel, int(frnr), event_id + ) + logger.info(f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}") + await asyncio.sleep(0.1) # Small delay to avoid rate limits + except Exception as e: + logger.warning(f"Phase 1: Failed to process new Advoware {frnr}: {e}") + +async def process_new_from_google(state, conn, service, calendar_id, kuerzel, advoware): + """Phase 2: Process new events from Google to Advoware.""" + logger.info("Phase 2: Processing new events from Google") + for event_id, evt in state['google_map'].items(): + # For recurring events, check if the master event (recurringEventId) is already synced + # For regular events, check the event_id directly + recurring_master_id = evt.get('recurringEventId') + is_already_synced = event_id in state['db_google_index'] or (recurring_master_id and recurring_master_id in state['db_google_index']) + + if not is_already_synced: + try: + frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google'), kuerzel, True) + if frnr and str(frnr) != 'None': + async with conn.transaction(): + await conn.execute( + """ + INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed) + VALUES ($1, $2, $3, 'google', 'source_system_wins', 'synced', TRUE); + """, + kuerzel, int(frnr), event_id + ) + logger.info(f"Phase 2: Created new from Google: event_id {event_id}, frNr {frnr}") + else: + logger.warning(f"Phase 2: Skipped DB insert for Google event {event_id}, frNr is None") + except Exception as e: + logger.warning(f"Phase 2: Failed to process new Google {event_id}: {e}") + +async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, advoware): + """Phase 3: Process deleted entries.""" + logger.info("Phase 3: Processing deleted entries") + for row in state['rows']: + frnr = row['advoware_frnr'] + event_id = row['google_event_id'] + adv_exists = str(frnr) in state['adv_map'] if frnr else False + + # For Google events, check if the master event or any instance exists + google_exists = False + if event_id: + # Check if the stored event_id exists + if event_id in state['google_map']: + google_exists = True + else: + # Check if any event has this as recurringEventId (master event still exists) + for evt in state['google_map'].values(): + if evt.get('recurringEventId') == event_id: + google_exists = True + break + + if not adv_exists and not google_exists: + # Both missing - soft delete + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + logger.info(f"Phase 3: Soft deleted sync_id {row['sync_id']} (both missing)") + elif not adv_exists: + # Missing in Advoware + strategy = row['sync_strategy'] + if strategy == 'source_system_wins': + if row['source_system'] == 'advoware': + # Propagate delete to Google + try: + await delete_google_event(service, calendar_id, event_id) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") + await asyncio.sleep(0.1) # Small delay to avoid rate limits + except Exception as e: + logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + elif row['source_system'] == 'google' and row['advoware_write_allowed']: + # Recreate in Advoware + try: + new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(state['google_map'][event_id], 'google'), kuerzel, row['advoware_write_allowed']) + if new_frnr and str(new_frnr) != 'None': + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + logger.info(f"Phase 3: Recreated Advoware appointment {new_frnr} for sync_id {row['sync_id']}") + else: + logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}, frNr is None") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + except Exception as e: + logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + else: + # For other cases, propagate delete to Google + try: + await delete_google_event(service, calendar_id, event_id) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") + await asyncio.sleep(0.1) # Small delay to avoid rate limits + except Exception as e: + logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + else: + # Propagate delete to Google + try: + await delete_google_event(service, calendar_id, event_id) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") + await asyncio.sleep(0.1) # Small delay to avoid rate limits + except Exception as e: + logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + elif not google_exists: + # Missing in Google + strategy = row['sync_strategy'] + if strategy == 'source_system_wins': + if row['source_system'] == 'google': + # Delete in Advoware + if row['advoware_write_allowed']: + try: + await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed']) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}") + except Exception as e: + logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + else: + logger.warning(f"Phase 3: Cannot delete in Advoware for sync_id {row['sync_id']}, write not allowed") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + elif row['source_system'] == 'advoware': + # Recreate in Google + try: + new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware')) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + logger.info(f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}") + await asyncio.sleep(0.1) # Small delay to avoid rate limits + except Exception as e: + logger.warning(f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + else: + # last_change_wins or other, propagate delete to Advoware + try: + await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed']) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}") + except Exception as e: + logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + +async def process_updates(state, conn, service, calendar_id, kuerzel, advoware): + """Phase 4: Process updates for existing entries.""" + logger.info("Phase 4: Processing updates for existing entries") + # Track which master events we've already processed to avoid duplicate updates + processed_master_events = set() + + for row in state['rows']: + frnr = row['advoware_frnr'] + event_id = row['google_event_id'] + adv_data = state['adv_map'].get(str(frnr)) if frnr else None + + # For Google events, find the corresponding event (could be master or instance) + google_data = None + if event_id: + # First try to find the exact event_id + if event_id in state['google_map']: + google_data = state['google_map'][event_id] + else: + # Look for any event that has this as recurringEventId + for evt in state['google_map'].values(): + if evt.get('recurringEventId') == event_id: + google_data = evt + break + + # Skip if we don't have both sides or if we've already processed this master event + if not adv_data or not google_data: + continue + + # For recurring events, only process the master event once + master_event_id = google_data.get('recurringEventId') or event_id + if master_event_id in processed_master_events: + continue + processed_master_events.add(master_event_id) + + if adv_data and google_data: + adv_std = standardize_appointment_data(adv_data, 'advoware') + google_std = standardize_appointment_data(google_data, 'google') + strategy = row['sync_strategy'] + try: + if strategy == 'source_system_wins': + if row['source_system'] == 'advoware': + # Check for changes in source (Advoware) or unauthorized changes in target (Google) + adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) + google_ts_str = google_data.get('updated', '') + google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None + if adv_ts > row['last_sync']: + await update_google_event(service, calendar_id, event_id, adv_std) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + logger.info(f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}") + await asyncio.sleep(0.1) # Small delay to avoid rate limits + elif google_ts and google_ts > row['last_sync']: + logger.warning(f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}") + await update_google_event(service, calendar_id, event_id, adv_std) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + logger.info(f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}") + await asyncio.sleep(0.1) # Small delay to avoid rate limits + elif row['source_system'] == 'google' and row['advoware_write_allowed']: + # Check for changes in source (Google) or unauthorized changes in target (Advoware) + google_ts_str = google_data.get('updated', '') + google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None + adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) + logger.debug(f"Phase 4: Checking sync_id {row['sync_id']}: adv_ts={adv_ts}, google_ts={google_ts}, last_sync={row['last_sync']}") + if google_ts and google_ts > row['last_sync']: + await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + logger.info(f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}") + elif adv_ts > row['last_sync']: + logger.warning(f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}") + await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + logger.info(f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}") + elif strategy == 'last_change_wins': + adv_ts = await get_advoware_timestamp(advoware, frnr) + google_ts_str = google_data.get('updated', '') + google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None + if adv_ts and google_ts: + if adv_ts > google_ts: + await update_google_event(service, calendar_id, event_id, adv_std) + elif row['advoware_write_allowed']: + await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts)) + logger.info(f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}") + await asyncio.sleep(0.1) # Small delay to avoid rate limits + except Exception as e: + logger.warning(f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) """Main event handler for calendar sync.""" logger.info("Starting calendar sync for all employees") @@ -657,23 +921,7 @@ async def handler(event, context): log_operation('info', "Fetched existing sync rows", count=len(state['rows'])) # Phase 1: New from Advoware => Google - logger.info("Phase 1: Processing new appointments from Advoware") - for frnr, app in state['adv_map'].items(): - if frnr not in state['db_adv_index']: - try: - event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware')) - async with conn.transaction(): - await conn.execute( - """ - INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed) - VALUES ($1, $2, $3, 'advoware', 'source_system_wins', 'synced', FALSE); - """, - kuerzel, int(frnr), event_id - ) - logger.info(f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - except Exception as e: - logger.warning(f"Phase 1: Failed to process new Advoware {frnr}: {e}") + await process_new_from_advoware(state, conn, service, calendar_id, kuerzel, advoware) # Reload indexes after Phase 1 changes await reload_db_indexes() @@ -681,30 +929,7 @@ async def handler(event, context): await reload_api_maps() # Phase 2: New from Google => Advoware - logger.info("Phase 2: Processing new events from Google") - for event_id, evt in state['google_map'].items(): - # For recurring events, check if the master event (recurringEventId) is already synced - # For regular events, check the event_id directly - recurring_master_id = evt.get('recurringEventId') - is_already_synced = event_id in state['db_google_index'] or (recurring_master_id and recurring_master_id in state['db_google_index']) - - if not is_already_synced: - try: - frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google'), kuerzel, True) - if frnr and str(frnr) != 'None': - async with conn.transaction(): - await conn.execute( - """ - INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed) - VALUES ($1, $2, $3, 'google', 'source_system_wins', 'synced', TRUE); - """, - kuerzel, int(frnr), event_id - ) - logger.info(f"Phase 2: Created new from Google: event_id {event_id}, frNr {frnr}") - else: - logger.warning(f"Phase 2: Skipped DB insert for Google event {event_id}, frNr is None") - except Exception as e: - logger.warning(f"Phase 2: Failed to process new Google {event_id}: {e}") + await process_new_from_google(state, conn, service, calendar_id, kuerzel, advoware) # Reload indexes after Phase 2 changes await reload_db_indexes() @@ -712,129 +937,7 @@ async def handler(event, context): await reload_api_maps() # Phase 3: Identify deleted entries - logger.info("Phase 3: Processing deleted entries") - for row in state['rows']: - frnr = row['advoware_frnr'] - event_id = row['google_event_id'] - adv_exists = str(frnr) in state['adv_map'] if frnr else False - - # For Google events, check if the master event or any instance exists - google_exists = False - if event_id: - # Check if the stored event_id exists - if event_id in state['google_map']: - google_exists = True - else: - # Check if any event has this as recurringEventId (master event still exists) - for evt in state['google_map'].values(): - if evt.get('recurringEventId') == event_id: - google_exists = True - break - - if not adv_exists and not google_exists: - # Both missing - soft delete - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Soft deleted sync_id {row['sync_id']} (both missing)") - elif not adv_exists: - # Missing in Advoware - strategy = row['sync_strategy'] - if strategy == 'source_system_wins': - if row['source_system'] == 'advoware': - # Propagate delete to Google - try: - await delete_google_event(service, calendar_id, event_id) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - except Exception as e: - logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - elif row['source_system'] == 'google' and row['advoware_write_allowed']: - # Recreate in Advoware - try: - new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(state['google_map'][event_id], 'google'), kuerzel, row['advoware_write_allowed']) - if new_frnr and str(new_frnr) != 'None': - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 3: Recreated Advoware appointment {new_frnr} for sync_id {row['sync_id']}") - else: - logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}, frNr is None") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - except Exception as e: - logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - else: - # For other cases, propagate delete to Google - try: - await delete_google_event(service, calendar_id, event_id) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - except Exception as e: - logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - else: - # Propagate delete to Google - try: - await delete_google_event(service, calendar_id, event_id) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - except Exception as e: - logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - elif not google_exists: - # Missing in Google - strategy = row['sync_strategy'] - if strategy == 'source_system_wins': - if row['source_system'] == 'google': - # Delete in Advoware - if row['advoware_write_allowed']: - try: - await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}") - except Exception as e: - logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - else: - logger.warning(f"Phase 3: Cannot delete in Advoware for sync_id {row['sync_id']}, write not allowed") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - elif row['source_system'] == 'advoware': - # Recreate in Google - try: - new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware')) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - except Exception as e: - logger.warning(f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - else: - # last_change_wins or other, propagate delete to Advoware - try: - await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}") - except Exception as e: - logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + await process_deleted_entries(state, conn, service, calendar_id, kuerzel, advoware) # Reload indexes after Phase 3 changes await reload_db_indexes() @@ -842,96 +945,7 @@ async def handler(event, context): await reload_api_maps() # Phase 4: Update existing entries if changed - logger.info("Phase 4: Processing updates for existing entries") - # Track which master events we've already processed to avoid duplicate updates - processed_master_events = set() - - for row in state['rows']: - frnr = row['advoware_frnr'] - event_id = row['google_event_id'] - adv_data = state['adv_map'].get(str(frnr)) if frnr else None - - # For Google events, find the corresponding event (could be master or instance) - google_data = None - if event_id: - # First try to find the exact event_id - if event_id in state['google_map']: - google_data = state['google_map'][event_id] - else: - # Look for any event that has this as recurringEventId - for evt in state['google_map'].values(): - if evt.get('recurringEventId') == event_id: - google_data = evt - break - - # Skip if we don't have both sides or if we've already processed this master event - if not adv_data or not google_data: - continue - - # For recurring events, only process the master event once - master_event_id = google_data.get('recurringEventId') or event_id - if master_event_id in processed_master_events: - continue - processed_master_events.add(master_event_id) - - if adv_data and google_data: - adv_std = standardize_appointment_data(adv_data, 'advoware') - google_std = standardize_appointment_data(google_data, 'google') - strategy = row['sync_strategy'] - try: - if strategy == 'source_system_wins': - if row['source_system'] == 'advoware': - # Check for changes in source (Advoware) or unauthorized changes in target (Google) - adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) - google_ts_str = google_data.get('updated', '') - google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None - if adv_ts > row['last_sync']: - await update_google_event(service, calendar_id, event_id, adv_std) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - elif google_ts and google_ts > row['last_sync']: - logger.warning(f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}") - await update_google_event(service, calendar_id, event_id, adv_std) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - elif row['source_system'] == 'google' and row['advoware_write_allowed']: - # Check for changes in source (Google) or unauthorized changes in target (Advoware) - google_ts_str = google_data.get('updated', '') - google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None - adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) - logger.debug(f"Phase 4: Checking sync_id {row['sync_id']}: adv_ts={adv_ts}, google_ts={google_ts}, last_sync={row['last_sync']}") - if google_ts and google_ts > row['last_sync']: - await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}") - elif adv_ts > row['last_sync']: - logger.warning(f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}") - await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}") - elif strategy == 'last_change_wins': - adv_ts = await get_advoware_timestamp(advoware, frnr) - google_ts_str = google_data.get('updated', '') - google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None - if adv_ts and google_ts: - if adv_ts > google_ts: - await update_google_event(service, calendar_id, event_id, adv_std) - elif row['advoware_write_allowed']: - await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts)) - logger.info(f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}") - await asyncio.sleep(0.1) # Small delay to avoid rate limits - except Exception as e: - logger.warning(f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + await process_updates(state, conn, service, calendar_id, kuerzel, advoware) # Update last_sync timestamps logger.debug("Updated last_sync timestamps")