diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py index bba096f4..4316431c 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -655,6 +655,268 @@ async def handler(event, context): await reload_db_indexes() log_operation('info', "Fetched existing sync rows", count=len(state['rows'])) + async def phase_1(state, conn, service, calendar_id, advoware, kuerzel): + """Phase 1: New from Advoware => Google""" + log_operation('info', "Phase 1: Processing new appointments from Advoware") + for frnr, app in state['adv_map'].items(): + if frnr not in state['db_adv_index']: + try: + event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware')) + async with conn.transaction(): + await conn.execute( + """ + INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed) + VALUES ($1, $2, $3, 'advoware', 'source_system_wins', 'synced', FALSE); + """, + kuerzel, int(frnr), event_id + ) + log_operation('info', "Phase 1: Created new from Advoware", frnr=frnr, event_id=event_id) + await asyncio.sleep(0.1) # Small delay to avoid rate limits + except Exception as e: + log_operation('warning', "Phase 1: Failed to process new Advoware", frnr=frnr, error=str(e)) + + async def phase_2(state, conn, service, calendar_id, advoware, kuerzel): + """Phase 2: New from Google => Advoware""" + log_operation('info', "Phase 2: Processing new events from Google") + for event_id, evt in state['google_map'].items(): + # For recurring events, check if the master event (recurringEventId) is already synced + # For regular events, check the event_id directly + recurring_master_id = evt.get('recurringEventId') + is_already_synced = event_id in state['db_google_index'] or (recurring_master_id and recurring_master_id in state['db_google_index']) + + if not is_already_synced: + try: + frnr = await safe_advoware_operation(create_advoware_appointment, True, advoware, standardize_appointment_data(evt, 'google'), kuerzel) + if frnr and str(frnr) != 'None': + async with conn.transaction(): + await conn.execute( + """ + INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed) + VALUES ($1, $2, $3, 'google', 'source_system_wins', 'synced', TRUE); + """, + kuerzel, int(frnr), event_id + ) + log_operation('info', "Phase 2: Created new from Google", event_id=event_id, frnr=frnr) + else: + log_operation('warning', "Phase 2: Skipped DB insert for Google event", event_id=event_id) + except Exception as e: + log_operation('warning', "Phase 2: Failed to process new Google", event_id=event_id, error=str(e)) + + async def phase_3(state, conn, service, calendar_id, advoware, kuerzel): + """Phase 3: Identify deleted entries""" + log_operation('info', "Phase 3: Processing deleted entries") + for row in state['rows']: + frnr = row['advoware_frnr'] + event_id = row['google_event_id'] + adv_exists = str(frnr) in state['adv_map'] if frnr else False + + # For Google events, check if the master event or any instance exists + google_exists = False + if event_id: + # Check if the stored event_id exists + if event_id in state['google_map']: + google_exists = True + else: + # Check if any event has this as recurringEventId (master event still exists) + for evt in state['google_map'].values(): + if evt.get('recurringEventId') == event_id: + google_exists = True + break + + if not adv_exists and not google_exists: + # Both missing - soft delete + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + log_operation('info', "Phase 3: Soft deleted", sync_id=row['sync_id']) + elif not adv_exists: + # Missing in Advoware + strategy = row['sync_strategy'] + if strategy == 'source_system_wins': + if row['source_system'] == 'advoware': + # Propagate delete to Google + try: + await delete_google_event(service, calendar_id, event_id) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + log_operation('info', "Phase 3: Propagated delete to Google", sync_id=row['sync_id']) + await asyncio.sleep(0.1) # Small delay to avoid rate limits + except Exception as e: + log_operation('warning', "Phase 3: Failed to delete Google", sync_id=row['sync_id'], error=str(e)) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + elif row['source_system'] == 'google' and row['advoware_write_allowed']: + # Recreate in Advoware + try: + new_frnr = await safe_advoware_operation(create_advoware_appointment, row['advoware_write_allowed'], advoware, standardize_appointment_data(state['google_map'][event_id], 'google'), kuerzel) + if new_frnr and str(new_frnr) != 'None': + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + log_operation('info', "Phase 3: Recreated Advoware appointment", frnr=new_frnr, sync_id=row['sync_id']) + else: + log_operation('warning', "Phase 3: Failed to recreate Advoware", sync_id=row['sync_id']) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + except Exception as e: + log_operation('warning', "Phase 3: Failed to recreate Advoware", sync_id=row['sync_id'], error=str(e)) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + else: + # For other cases, propagate delete to Google + try: + await delete_google_event(service, calendar_id, event_id) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + log_operation('info', "Phase 3: Propagated delete to Google", sync_id=row['sync_id']) + await asyncio.sleep(0.1) # Small delay to avoid rate limits + except Exception as e: + log_operation('warning', "Phase 3: Failed to delete Google", sync_id=row['sync_id'], error=str(e)) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + else: + # Propagate delete to Google + try: + await delete_google_event(service, calendar_id, event_id) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + log_operation('info', "Phase 3: Propagated delete to Google", sync_id=row['sync_id']) + await asyncio.sleep(0.1) # Small delay to avoid rate limits + except Exception as e: + log_operation('warning', "Phase 3: Failed to delete Google", sync_id=row['sync_id'], error=str(e)) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + elif not google_exists: + # Missing in Google + strategy = row['sync_strategy'] + if strategy == 'source_system_wins': + if row['source_system'] == 'google': + # Delete in Advoware + if row['advoware_write_allowed']: + try: + await safe_advoware_operation(delete_advoware_appointment, row['advoware_write_allowed'], advoware, frnr) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + log_operation('info', "Phase 3: Propagated delete to Advoware", sync_id=row['sync_id']) + except Exception as e: + log_operation('warning', "Phase 3: Failed to delete Advoware", sync_id=row['sync_id'], error=str(e)) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + else: + log_operation('warning', "Phase 3: Cannot delete in Advoware, write not allowed", sync_id=row['sync_id']) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + elif row['source_system'] == 'advoware': + # Recreate in Google + try: + new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware')) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + log_operation('info', "Phase 3: Recreated Google event", event_id=new_event_id, sync_id=row['sync_id']) + await asyncio.sleep(0.1) # Small delay to avoid rate limits + except Exception as e: + log_operation('warning', "Phase 3: Failed to recreate Google", sync_id=row['sync_id'], error=str(e)) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + else: + # last_change_wins or other, propagate delete to Advoware + try: + await safe_advoware_operation(delete_advoware_appointment, row['advoware_write_allowed'], advoware, frnr) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + log_operation('info', "Phase 3: Propagated delete to Advoware", sync_id=row['sync_id']) + except Exception as e: + log_operation('warning', "Phase 3: Failed to delete Advoware", sync_id=row['sync_id'], error=str(e)) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + + async def phase_4(state, conn, service, calendar_id, advoware, kuerzel): + """Phase 4: Update existing entries if changed""" + log_operation('info', "Phase 4: Processing updates for existing entries") + # Track which master events we've already processed to avoid duplicate updates + processed_master_events = set() + + for row in state['rows']: + frnr = row['advoware_frnr'] + event_id = row['google_event_id'] + adv_data = state['adv_map'].get(str(frnr)) if frnr else None + + # For Google events, find the corresponding event (could be master or instance) + google_data = None + if event_id: + # First try to find the exact event_id + if event_id in state['google_map']: + google_data = state['google_map'][event_id] + else: + # Look for any event that has this as recurringEventId + for evt in state['google_map'].values(): + if evt.get('recurringEventId') == event_id: + google_data = evt + break + + # Skip if we don't have both sides or if we've already processed this master event + if not adv_data or not google_data: + continue + + # For recurring events, only process the master event once + master_event_id = google_data.get('recurringEventId') or event_id + if master_event_id in processed_master_events: + continue + processed_master_events.add(master_event_id) + + if adv_data and google_data: + adv_std = standardize_appointment_data(adv_data, 'advoware') + google_std = standardize_appointment_data(google_data, 'google') + strategy = row['sync_strategy'] + try: + if strategy == 'source_system_wins': + if row['source_system'] == 'advoware': + # Check for changes in source (Advoware) or unauthorized changes in target (Google) + adv_ts, google_ts = get_timestamps(adv_data, google_data) + if adv_ts and adv_ts > row['last_sync']: + await update_google_event(service, calendar_id, event_id, adv_std) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + log_operation('info', "Phase 4: Updated Google event from Advoware", event_id=event_id, frnr=frnr) + await asyncio.sleep(0.1) # Small delay to avoid rate limits + elif google_ts and google_ts > row['last_sync']: + log_operation('warning', "Phase 4: Unauthorized change in Google event, resetting to Advoware", event_id=event_id, frnr=frnr) + await update_google_event(service, calendar_id, event_id, adv_std) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + log_operation('info', "Phase 4: Reset Google event to Advoware", event_id=event_id, frnr=frnr) + await asyncio.sleep(0.1) # Small delay to avoid rate limits + elif row['source_system'] == 'google' and row['advoware_write_allowed']: + # Check for changes in source (Google) or unauthorized changes in target (Advoware) + adv_ts, google_ts = get_timestamps(adv_data, google_data) + log_operation('debug', "Phase 4: Checking sync", sync_id=row['sync_id'], adv_ts=adv_ts, google_ts=google_ts, last_sync=row['last_sync']) + if google_ts and google_ts > row['last_sync']: + await safe_advoware_operation(update_advoware_appointment, row['advoware_write_allowed'], advoware, frnr, google_std, kuerzel) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + log_operation('info', "Phase 4: Updated Advoware from Google event", frnr=frnr, event_id=event_id) + elif adv_ts and adv_ts > row['last_sync']: + log_operation('warning', "Phase 4: Unauthorized change in Advoware, resetting to Google", frnr=frnr, event_id=event_id) + await safe_advoware_operation(update_advoware_appointment, row['advoware_write_allowed'], advoware, frnr, google_std, kuerzel) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + log_operation('info', "Phase 4: Reset Advoware to Google event", frnr=frnr, event_id=event_id) + elif strategy == 'last_change_wins': + adv_ts = await get_advoware_timestamp(advoware, frnr) + google_ts_str = google_data.get('updated', '') + google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None + if adv_ts and google_ts: + if adv_ts > google_ts: + await update_google_event(service, calendar_id, event_id, adv_std) + elif row['advoware_write_allowed']: + await safe_advoware_operation(update_advoware_appointment, row['advoware_write_allowed'], advoware, frnr, google_std, kuerzel) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts)) + log_operation('info', "Phase 4: Updated based on last_change_wins", sync_id=row['sync_id']) + await asyncio.sleep(0.1) # Small delay to avoid rate limits + except Exception as e: + log_operation('warning', "Phase 4: Failed to update", sync_id=row['sync_id'], error=str(e)) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + # Execute phases await phase_1(state, conn, service, calendar_id, advoware, kuerzel) await reload_db_indexes()