diff --git a/bitbylaw/steps/advoware_cal_sync/README.md b/bitbylaw/steps/advoware_cal_sync/README.md index c3056006..0943c687 100644 --- a/bitbylaw/steps/advoware_cal_sync/README.md +++ b/bitbylaw/steps/advoware_cal_sync/README.md @@ -168,7 +168,7 @@ Beide Systeme werden auf gemeinsames Format normalisiert (Berlin TZ): { "data": { "body": { - "employee_kuerzel": "SB" // Erforderlich, kein Default + // Kein employee_kuerzel erforderlich, syncronisiert alle Mitarbeiter automatisch } } } diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py index 7a39ff33..62ead861 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -383,170 +383,217 @@ async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, return await update_advoware_appointment(advoware, frnr, data, employee_kuerzel) -async def safe_delete_advoware_appointment(advoware, frnr, write_allowed): - """Safe wrapper for deleting Advoware appointments with write permission check.""" - if not write_allowed: - logger.warning("Cannot delete in Advoware, write not allowed") - return - await delete_advoware_appointment(advoware, frnr) +async def get_advoware_employees(advoware): + """Fetch list of employees from Advoware.""" + try: + result = await advoware.api_call('api/v1/advonet/Anwaelte', method='GET') + employees = result if isinstance(result, list) else [] + logger.info(f"Fetched {len(employees)} Advoware employees") + return employees + except Exception as e: + logger.error(f"Failed to fetch Advoware employees: {e}") + raise async def handler(event, context): """Main event handler for calendar sync.""" - employee_kuerzel = event.get('data', {}).get('body', {}).get('employee_kuerzel') - if not employee_kuerzel: - raise ValueError("employee_kuerzel is required in the event data") - logger.info(f"Starting calendar sync for {employee_kuerzel}") - - redis_client = redis.Redis( - host=Config.REDIS_HOST, - port=int(Config.REDIS_PORT), - db=int(Config.REDIS_DB_CALENDAR_SYNC), - socket_timeout=Config.REDIS_TIMEOUT_SECONDS - ) + logger.info("Starting calendar sync for all employees") try: - logger.debug("Initializing Google service") - service = await get_google_service() - logger.debug(f"Ensuring Google calendar for {employee_kuerzel}") - calendar_id = await ensure_google_calendar(service, employee_kuerzel) logger.debug("Initializing Advoware API") advoware = AdvowareAPI(context) - conn = await connect_db() - try: - # Fetch fresh data - logger.info("Fetching fresh data from APIs") - adv_appointments = await fetch_advoware_appointments(advoware, employee_kuerzel) - adv_map = {str(app['frNr']): app for app in adv_appointments if app.get('frNr')} - google_events = await fetch_google_events(service, calendar_id) - google_map = {evt['id']: evt for evt in google_events} + # Alle Mitarbeiter abrufen + logger.info("Rufe Advoware Mitarbeiter ab...") + employees = await get_advoware_employees(advoware) + if not employees: + logger.error("Keine Mitarbeiter gefunden. Sync abgebrochen.") + return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}} - # Fetch existing DB rows - rows = await conn.fetch( - """ - SELECT * FROM calendar_sync - WHERE employee_kuerzel = $1 AND deleted = FALSE - """, - employee_kuerzel + total_synced = 0 + for employee in employees: + kuerzel = employee.get('kuerzel') or employee.get('anwalt') + if not kuerzel: + logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}") + continue + + logger.info(f"Starting calendar sync for {kuerzel}") + + redis_client = redis.Redis( + host=Config.REDIS_HOST, + port=int(Config.REDIS_PORT), + db=int(Config.REDIS_DB_CALENDAR_SYNC), + socket_timeout=Config.REDIS_TIMEOUT_SECONDS ) - logger.info(f"Fetched {len(rows)} existing sync rows") - # Build indexes - db_adv_index = {str(row['advoware_frnr']): row for row in rows if row['advoware_frnr']} - db_google_index = {row['google_event_id']: row for row in rows if row['google_event_id']} + CALENDAR_SYNC_LOCK_KEY = f'calendar_sync_lock_{kuerzel}' - # Phase 1: New from Advoware => Google - logger.info("Phase 1: Processing new appointments from Advoware") - for frnr, app in adv_map.items(): - if frnr not in db_adv_index: - try: - event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware')) - async with conn.transaction(): - await conn.execute( - """ - INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed) - VALUES ($1, $2, $3, 'advoware', 'source_system_wins', 'synced', FALSE); - """, - employee_kuerzel, int(frnr), event_id - ) - logger.info(f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}") - except Exception as e: - logger.warning(f"Phase 1: Failed to process new Advoware {frnr}: {e}") + try: + logger.debug("Initializing Google service") + service = await get_google_service() + logger.debug(f"Ensuring Google calendar for {kuerzel}") + calendar_id = await ensure_google_calendar(service, kuerzel) - # Phase 2: New from Google => Advoware - logger.info("Phase 2: Processing new events from Google") - for event_id, evt in google_map.items(): - if event_id not in db_google_index: - try: - frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google'), employee_kuerzel, True) - if frnr and str(frnr) != 'None': - async with conn.transaction(): - await conn.execute( - """ - INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed) - VALUES ($1, $2, $3, 'google', 'source_system_wins', 'synced', TRUE); - """, - employee_kuerzel, int(frnr), event_id - ) - logger.info(f"Phase 2: Created new from Google: event_id {event_id}, frNr {frnr}") - else: - logger.warning(f"Phase 2: Skipped DB insert for Google event {event_id}, frNr is None") - except Exception as e: - logger.warning(f"Phase 2: Failed to process new Google {event_id}: {e}") + conn = await connect_db() + try: + # Fetch fresh data + logger.info("Fetching fresh data from APIs") + adv_appointments = await fetch_advoware_appointments(advoware, kuerzel) + adv_map = {str(app['frNr']): app for app in adv_appointments if app.get('frNr')} + google_events = await fetch_google_events(service, calendar_id) + google_map = {evt['id']: evt for evt in google_events} - # Phase 3: Identify deleted entries - logger.info("Phase 3: Processing deleted entries") - for row in rows: - frnr = row['advoware_frnr'] - event_id = row['google_event_id'] - adv_exists = str(frnr) in adv_map if frnr else False - google_exists = event_id in google_map if event_id else False + # Fetch existing DB rows + rows = await conn.fetch( + """ + SELECT * FROM calendar_sync + WHERE employee_kuerzel = $1 AND deleted = FALSE + """, + kuerzel + ) + logger.info(f"Fetched {len(rows)} existing sync rows") - if not adv_exists and not google_exists: - # Both missing - soft delete - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Soft deleted sync_id {row['sync_id']} (both missing)") - elif not adv_exists: - # Missing in Advoware - strategy = row['sync_strategy'] - if strategy == 'source_system_wins': - if row['source_system'] == 'advoware': - # Propagate delete to Google + # Build indexes + db_adv_index = {str(row['advoware_frnr']): row for row in rows if row['advoware_frnr']} + db_google_index = {row['google_event_id']: row for row in rows if row['google_event_id']} + + # Phase 1: New from Advoware => Google + logger.info("Phase 1: Processing new appointments from Advoware") + for frnr, app in adv_map.items(): + if frnr not in db_adv_index: try: - await delete_google_event(service, calendar_id, event_id) + event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware')) async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") + await conn.execute( + """ + INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed) + VALUES ($1, $2, $3, 'advoware', 'source_system_wins', 'synced', FALSE); + """, + kuerzel, int(frnr), event_id + ) + logger.info(f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}") except Exception as e: - logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - elif row['source_system'] == 'google' and row['advoware_write_allowed']: - # Recreate in Advoware + logger.warning(f"Phase 1: Failed to process new Advoware {frnr}: {e}") + + # Phase 2: New from Google => Advoware + logger.info("Phase 2: Processing new events from Google") + for event_id, evt in google_map.items(): + if event_id not in db_google_index: try: - new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(google_map[event_id], 'google'), employee_kuerzel, row['advoware_write_allowed']) - if new_frnr and str(new_frnr) != 'None': + frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google'), kuerzel, True) + if frnr and str(frnr) != 'None': async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 3: Recreated Advoware appointment {new_frnr} for sync_id {row['sync_id']}") + await conn.execute( + """ + INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed) + VALUES ($1, $2, $3, 'google', 'source_system_wins', 'synced', TRUE); + """, + kuerzel, int(frnr), event_id + ) + logger.info(f"Phase 2: Created new from Google: event_id {event_id}, frNr {frnr}") else: - logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}, frNr is None") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + logger.warning(f"Phase 2: Skipped DB insert for Google event {event_id}, frNr is None") except Exception as e: - logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - else: - # For other cases, propagate delete to Google - try: - await delete_google_event(service, calendar_id, event_id) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") - except Exception as e: - logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - else: - # Propagate delete to Google - try: - await delete_google_event(service, calendar_id, event_id) + logger.warning(f"Phase 2: Failed to process new Google {event_id}: {e}") + + # Phase 3: Identify deleted entries + logger.info("Phase 3: Processing deleted entries") + for row in rows: + frnr = row['advoware_frnr'] + event_id = row['google_event_id'] + adv_exists = str(frnr) in adv_map if frnr else False + google_exists = event_id in google_map if event_id else False + + if not adv_exists and not google_exists: + # Both missing - soft delete async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") - except Exception as e: - logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - elif not google_exists: - # Missing in Google - strategy = row['sync_strategy'] - if strategy == 'source_system_wins': - if row['source_system'] == 'google': - # Delete in Advoware - if row['advoware_write_allowed']: + logger.info(f"Phase 3: Soft deleted sync_id {row['sync_id']} (both missing)") + elif not adv_exists: + # Missing in Advoware + strategy = row['sync_strategy'] + if strategy == 'source_system_wins': + if row['source_system'] == 'advoware': + # Propagate delete to Google + try: + await delete_google_event(service, calendar_id, event_id) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") + except Exception as e: + logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + elif row['source_system'] == 'google' and row['advoware_write_allowed']: + # Recreate in Advoware + try: + new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(google_map[event_id], 'google'), kuerzel, row['advoware_write_allowed']) + if new_frnr and str(new_frnr) != 'None': + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + logger.info(f"Phase 3: Recreated Advoware appointment {new_frnr} for sync_id {row['sync_id']}") + else: + logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}, frNr is None") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + except Exception as e: + logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + else: + # For other cases, propagate delete to Google + try: + await delete_google_event(service, calendar_id, event_id) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") + except Exception as e: + logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + else: + # Propagate delete to Google + try: + await delete_google_event(service, calendar_id, event_id) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}") + except Exception as e: + logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + elif not google_exists: + # Missing in Google + strategy = row['sync_strategy'] + if strategy == 'source_system_wins': + if row['source_system'] == 'google': + # Delete in Advoware + if row['advoware_write_allowed']: + try: + await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed']) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) + logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}") + except Exception as e: + logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + else: + logger.warning(f"Phase 3: Cannot delete in Advoware for sync_id {row['sync_id']}, write not allowed") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + elif row['source_system'] == 'advoware': + # Recreate in Google + try: + new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(adv_map[str(frnr)], 'advoware')) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + logger.info(f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}") + except Exception as e: + logger.warning(f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}") + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + else: + # last_change_wins or other, propagate delete to Advoware try: await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed']) async with conn.transaction(): @@ -556,110 +603,90 @@ async def handler(event, context): logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}") async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - else: - logger.warning(f"Phase 3: Cannot delete in Advoware for sync_id {row['sync_id']}, write not allowed") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - elif row['source_system'] == 'advoware': - # Recreate in Google + + # Phase 4: Update existing entries if changed + logger.info("Phase 4: Processing updates for existing entries") + for row in rows: + frnr = row['advoware_frnr'] + event_id = row['google_event_id'] + adv_data = adv_map.get(str(frnr)) if frnr else None + google_data = google_map.get(event_id) if event_id else None + + if adv_data and google_data: + adv_std = standardize_appointment_data(adv_data, 'advoware') + google_std = standardize_appointment_data(google_data, 'google') + strategy = row['sync_strategy'] try: - new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(adv_map[str(frnr)], 'advoware')) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}") + if strategy == 'source_system_wins': + if row['source_system'] == 'advoware': + # Check for changes in source (Advoware) or unauthorized changes in target (Google) + adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) + google_ts_str = google_data.get('updated', '') + google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None + if adv_ts > row['last_sync']: + await update_google_event(service, calendar_id, event_id, adv_std) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + logger.info(f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}") + elif google_ts and google_ts > row['last_sync']: + logger.warning(f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}") + await update_google_event(service, calendar_id, event_id, adv_std) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + logger.info(f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}") + elif row['source_system'] == 'google' and row['advoware_write_allowed']: + # Check for changes in source (Google) or unauthorized changes in target (Advoware) + google_ts_str = google_data.get('updated', '') + google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None + adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) + logger.debug(f"Phase 4: Checking sync_id {row['sync_id']}: adv_ts={adv_ts}, google_ts={google_ts}, last_sync={row['last_sync']}") + if google_ts and google_ts > row['last_sync']: + await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + logger.info(f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}") + elif adv_ts > row['last_sync']: + logger.warning(f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}") + await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) + logger.info(f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}") + elif strategy == 'last_change_wins': + adv_ts = await get_advoware_timestamp(advoware, frnr) + google_ts_str = google_data.get('updated', '') + google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None + if adv_ts and google_ts: + if adv_ts > google_ts: + await update_google_event(service, calendar_id, event_id, adv_std) + elif row['advoware_write_allowed']: + await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) + async with conn.transaction(): + await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts)) + logger.info(f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}") except Exception as e: - logger.warning(f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}") + logger.warning(f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}") async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - else: - # last_change_wins or other, propagate delete to Advoware - try: - await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) - logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}") - except Exception as e: - logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) - # Phase 4: Update existing entries if changed - logger.info("Phase 4: Processing updates for existing entries") - for row in rows: - frnr = row['advoware_frnr'] - event_id = row['google_event_id'] - adv_data = adv_map.get(str(frnr)) if frnr else None - google_data = google_map.get(event_id) if event_id else None + # Update last_sync timestamps + logger.debug("Updated last_sync timestamps") - if adv_data and google_data: - adv_std = standardize_appointment_data(adv_data, 'advoware') - google_std = standardize_appointment_data(google_data, 'google') - strategy = row['sync_strategy'] - try: - if strategy == 'source_system_wins': - if row['source_system'] == 'advoware': - # Check for changes in source (Advoware) or unauthorized changes in target (Google) - adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) - google_ts_str = google_data.get('updated', '') - google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None - if adv_ts > row['last_sync']: - await update_google_event(service, calendar_id, event_id, adv_std) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}") - elif google_ts and google_ts > row['last_sync']: - logger.warning(f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}") - await update_google_event(service, calendar_id, event_id, adv_std) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}") - elif row['source_system'] == 'google' and row['advoware_write_allowed']: - # Check for changes in source (Google) or unauthorized changes in target (Advoware) - google_ts_str = google_data.get('updated', '') - google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None - adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) - logger.debug(f"Phase 4: Checking sync_id {row['sync_id']}: adv_ts={adv_ts}, google_ts={google_ts}, last_sync={row['last_sync']}") - if google_ts and google_ts > row['last_sync']: - await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}") - elif adv_ts > row['last_sync']: - logger.warning(f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}") - await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) - logger.info(f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}") - elif strategy == 'last_change_wins': - adv_ts = await get_advoware_timestamp(advoware, frnr) - google_ts_str = google_data.get('updated', '') - google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None - if adv_ts and google_ts: - if adv_ts > google_ts: - await update_google_event(service, calendar_id, event_id, adv_std) - elif row['advoware_write_allowed']: - await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts)) - logger.info(f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}") - except Exception as e: - logger.warning(f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}") - async with conn.transaction(): - await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + finally: + await conn.close() - # Update last_sync timestamps - logger.debug("Updated last_sync timestamps") + redis_client.delete(CALENDAR_SYNC_LOCK_KEY) + logger.info(f"Calendar sync completed for {kuerzel}") + total_synced += 1 - finally: - await conn.close() + except Exception as e: + logger.error(f"Sync failed for {kuerzel}: {e}") + redis_client.delete(CALENDAR_SYNC_LOCK_KEY) - redis_client.delete(CALENDAR_SYNC_LOCK_KEY) - logger.info(f"Calendar sync completed for {employee_kuerzel}") - return {'status': 200, 'body': {'status': 'completed'}} + logger.info(f"Calendar sync completed for all employees. Total synced: {total_synced}") + return {'status': 200, 'body': {'status': 'completed', 'total_synced': total_synced}} except Exception as e: - logger.error(f"Sync failed for {employee_kuerzel}: {e}") - redis_client.delete(CALENDAR_SYNC_LOCK_KEY) + logger.error(f"Sync failed: {e}") return {'status': 500, 'body': {'error': str(e)}} # Motia Step Configuration