diff --git a/bitbylaw/config.py b/bitbylaw/config.py index e45e77a2..4eded45a 100644 --- a/bitbylaw/config.py +++ b/bitbylaw/config.py @@ -37,5 +37,5 @@ class Config: # Calendar Sync settings CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS = os.getenv('CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS', 'true').lower() == 'true' - CALENDAR_SYNC_DEBUG_KUERZEL = [k.strip().upper() for k in os.getenv('CALENDAR_SYNC_DEBUG_KUERZEL', 'SB,AI,RO').split(',')] + CALENDAR_SYNC_DEBUG_KUERZEL = [k.strip().upper() for k in os.getenv('CALENDAR_SYNC_DEBUG_KUERZEL', 'SB,AI,RO,OK,BI,ST,UR,PB,VB').split(',')] ADVOWARE_WRITE_PROTECTION = True \ No newline at end of file diff --git a/bitbylaw/motia-workbench.json b/bitbylaw/motia-workbench.json index bab792d2..e23eb9dd 100644 --- a/bitbylaw/motia-workbench.json +++ b/bitbylaw/motia-workbench.json @@ -74,20 +74,20 @@ "id": "advoware", "config": { "steps/advoware_proxy/advoware_api_proxy_put_step.py": { - "x": 168, - "y": -54 + "x": -7, + "y": 7 }, "steps/advoware_proxy/advoware_api_proxy_post_step.py": { "x": -340, "y": -2 }, "steps/advoware_proxy/advoware_api_proxy_get_step.py": { - "x": 12, - "y": 406 + "x": -334, + "y": 193 }, "steps/advoware_proxy/advoware_api_proxy_delete_step.py": { - "x": 600, - "y": 0 + "x": 18, + "y": 204 }, "steps/advoware_cal_sync/calendar_sync_event_step.py": { "x": 395, diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py index 98ee5f16..3b554a0f 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -38,7 +38,10 @@ def log_operation(level, message, context=None, **context_vars): if level == 'info': context.logger.info(full_message) elif level == 'warning': - context.logger.warning(full_message) + if hasattr(context.logger, 'warn'): + context.logger.warn(full_message) + else: + context.logger.warning(full_message) elif level == 'error': context.logger.error(full_message) elif level == 'debug': @@ -53,7 +56,7 @@ def log_operation(level, message, context=None, **context_vars): elif level == 'debug': logger.debug(full_message) -async def connect_db(): +async def connect_db(context=None): """Connect to Postgres DB from Config.""" try: conn = await asyncpg.connect( @@ -65,10 +68,10 @@ async def connect_db(): ) return conn except Exception as e: - logger.error(f"Failed to connect to DB: {e}") + log_operation('error', f"Failed to connect to DB: {e}", context=context) raise -async def get_google_service(): +async def get_google_service(context=None): """Initialize Google Calendar service.""" try: service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH @@ -80,11 +83,11 @@ async def get_google_service(): service = build('calendar', 'v3', credentials=creds) return service except Exception as e: - logger.error(f"Failed to initialize Google service: {e}") + log_operation('error', f"Failed to initialize Google service: {e}", context=context) raise -@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504]) -async def ensure_google_calendar(service, employee_kuerzel): +@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504]) +async def ensure_google_calendar(service, employee_kuerzel, context=None): """Ensure Google Calendar exists for employee.""" calendar_name = f"AW-{employee_kuerzel}" try: @@ -107,13 +110,13 @@ async def ensure_google_calendar(service, employee_kuerzel): service.acl().insert(calendarId=calendar_id, body=acl_rule).execute() return calendar_id except HttpError as e: - logger.error(f"Google API error for calendar {employee_kuerzel}: {e}") + log_operation('error', f"Google API error for calendar {employee_kuerzel}: {e}", context=context) raise except Exception as e: - logger.error(f"Failed to ensure Google calendar for {employee_kuerzel}: {e}") + log_operation('error', f"Failed to ensure Google calendar for {employee_kuerzel}: {e}", context=context) raise -async def fetch_advoware_appointments(advoware, employee_kuerzel): +async def fetch_advoware_appointments(advoware, employee_kuerzel, context=None): """Fetch Advoware appointments in range.""" try: params = { @@ -122,16 +125,16 @@ async def fetch_advoware_appointments(advoware, employee_kuerzel): 'to': FETCH_TO } result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params=params) - logger.debug(f"Raw Advoware API response: {result}") + log_operation('debug', f"Raw Advoware API response: {result}", context=context) appointments = result if isinstance(result, list) else [] - logger.info(f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}") + log_operation('info', f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}", context=context) return appointments except Exception as e: - logger.error(f"Failed to fetch Advoware appointments: {e}") + log_operation('error', f"Failed to fetch Advoware appointments: {e}", context=context) raise -@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504]) -async def fetch_google_events(service, calendar_id): +@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504]) +async def fetch_google_events(service, calendar_id, context=None): """Fetch Google events in range.""" try: time_min = f"{current_year - 2}-01-01T00:00:00Z" @@ -156,16 +159,16 @@ async def fetch_google_events(service, calendar_id): break events = [evt for evt in all_events if evt.get('status') != 'cancelled'] - logger.info(f"Fetched {len(all_events)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}") + log_operation('info', f"Fetched {len(all_events)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}", context=context) return events except HttpError as e: - logger.error(f"Google API error fetching events: {e}") + log_operation('error', f"Google API error fetching events: {e}", context=context) raise except Exception as e: - logger.error(f"Failed to fetch Google events: {e}") + log_operation('error', f"Failed to fetch Google events: {e}", context=context) raise -def generate_rrule(turnus, turnus_art, datum_bis): +def generate_rrule(turnus, turnus_art, datum_bis, context=None): """Generate RRULE string from Advoware turnus and turnusArt.""" freq_map = { 1: 'DAILY', @@ -188,11 +191,11 @@ def generate_rrule(turnus, turnus_art, datum_bis): max_until = datetime.datetime.now() + timedelta(days=730) # 2 years if bis_dt > max_until: bis_dt = max_until - logger.info(f"Limited recurrence until date to {bis_dt.date()} to avoid Google Calendar limits") + log_operation('info', f"Limited recurrence until date to {bis_dt.date()} to avoid Google Calendar limits", context=context) until_date = bis_dt.strftime('%Y%m%d') except ValueError: - logger.warning(f"Invalid datum_bis: {datum_bis}, skipping recurrence") + log_operation('warning', f"Invalid datum_bis: {datum_bis}, skipping recurrence", context=context) return None rrule = f"RRULE:FREQ={freq};INTERVAL={turnus};UNTIL={until_date}" @@ -274,7 +277,7 @@ def build_notiz(original_notiz, time_breakdown, duration_capped): notiz_parts.append("\nHinweis: Ereignisdauer wurde auf 24 Stunden begrenzt (Google Calendar Limit)") return "\n".join(notiz_parts) -def standardize_appointment_data(data, source): +def standardize_appointment_data(data, source, context=None): """Standardize data from Advoware or Google to comparable dict, with TZ handling.""" duration_capped = False start_dt, end_dt = parse_times(data, source) @@ -323,7 +326,7 @@ def standardize_appointment_data(data, source): turnus_art = data.get('turnusArt', 1) datum_bis = data.get('datumBis', '') if datum_bis: - recurrence = generate_rrule(turnus, turnus_art, datum_bis) + recurrence = generate_rrule(turnus, turnus_art, datum_bis, context) if recurrence: recurrence = [recurrence] @@ -360,7 +363,7 @@ def standardize_appointment_data(data, source): 'recurrence': recurrence } -async def create_advoware_appointment(advoware, data, employee_kuerzel): +async def create_advoware_appointment(advoware, data, employee_kuerzel, context=None): """Create Advoware appointment from standardized data.""" start_dt = data['start'].astimezone(BERLIN_TZ) end_dt = data['end'].astimezone(BERLIN_TZ) @@ -379,15 +382,15 @@ async def create_advoware_appointment(advoware, data, employee_kuerzel): } try: result = await advoware.api_call('api/v1/advonet/Termine', method='POST', json_data=appointment_data) - logger.debug(f"Raw Advoware POST response: {result}") + log_operation('debug', f"Raw Advoware POST response: {result}", context=context) frnr = str(result.get('frNr') or result.get('frnr')) - logger.info(f"Created Advoware appointment frNr: {frnr}") + log_operation('info', f"Created Advoware appointment frNr: {frnr}", context=context) return frnr except Exception as e: - logger.error(f"Failed to create Advoware appointment: {e}") + log_operation('error', f"Failed to create Advoware appointment: {e}", context=context) raise -async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel): +async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel, context=None): """Update Advoware appointment.""" start_dt = data['start'].astimezone(BERLIN_TZ) end_dt = data['end'].astimezone(BERLIN_TZ) @@ -407,22 +410,22 @@ async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel): } try: await advoware.api_call('api/v1/advonet/Termine', method='PUT', json_data=appointment_data) - logger.info(f"Updated Advoware appointment frNr: {frnr}") + log_operation('info', f"Updated Advoware appointment frNr: {frnr}", context=context) except Exception as e: - logger.error(f"Failed to update Advoware appointment {frnr}: {e}") + log_operation('error', f"Failed to update Advoware appointment {frnr}: {e}", context=context) raise -async def delete_advoware_appointment(advoware, frnr): +async def delete_advoware_appointment(advoware, frnr, context=None): """Delete Advoware appointment.""" try: await advoware.api_call('api/v1/advonet/Termine', method='DELETE', params={'frnr': frnr}) - logger.info(f"Deleted Advoware appointment frNr: {frnr}") + log_operation('info', f"Deleted Advoware appointment frNr: {frnr}", context=context) except Exception as e: - logger.error(f"Failed to delete Advoware appointment {frnr}: {e}") + log_operation('error', f"Failed to delete Advoware appointment {frnr}: {e}", context=context) raise -@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504]) -async def create_google_event(service, calendar_id, data): +@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504]) +async def create_google_event(service, calendar_id, data, context=None): """Create Google event from standardized data.""" start_dt = data['start'].astimezone(BERLIN_TZ) end_dt = data['end'].astimezone(BERLIN_TZ) @@ -446,17 +449,17 @@ async def create_google_event(service, calendar_id, data): try: created = service.events().insert(calendarId=calendar_id, body=event_body).execute() event_id = created['id'] - logger.info(f"Created Google event ID: {event_id}") + log_operation('info', f"Created Google event ID: {event_id}", context=context) return event_id except HttpError as e: - logger.error(f"Google API error creating event: {e}") + log_operation('error', f"Google API error creating event: {e}", context=context) raise except Exception as e: - logger.error(f"Failed to create Google event: {e}") + log_operation('error', f"Failed to create Google event: {e}", context=context) raise -@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504]) -async def update_google_event(service, calendar_id, event_id, data): +@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504]) +async def update_google_event(service, calendar_id, event_id, data, context=None): """Update Google event.""" start_dt = data['start'].astimezone(BERLIN_TZ) end_dt = data['end'].astimezone(BERLIN_TZ) @@ -479,79 +482,79 @@ async def update_google_event(service, calendar_id, event_id, data): } try: service.events().update(calendarId=calendar_id, eventId=event_id, body=event_body).execute() - logger.info(f"Updated Google event ID: {event_id}") + log_operation('info', f"Updated Google event ID: {event_id}", context=context) except HttpError as e: - logger.error(f"Google API error updating event {event_id}: {e}") + log_operation('error', f"Google API error updating event {event_id}: {e}", context=context) raise except Exception as e: - logger.error(f"Failed to update Google event {event_id}: {e}") + log_operation('error', f"Failed to update Google event {event_id}: {e}", context=context) raise -@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504]) -async def delete_google_event(service, calendar_id, event_id): +@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504]) +async def delete_google_event(service, calendar_id, event_id, context=None): """Delete Google event.""" try: service.events().delete(calendarId=calendar_id, eventId=event_id).execute() - logger.info(f"Deleted Google event ID: {event_id}") + log_operation('info', f"Deleted Google event ID: {event_id}", context=context) except HttpError as e: - logger.error(f"Google API error deleting event {event_id}: {e}") + log_operation('error', f"Google API error deleting event {event_id}: {e}", context=context) raise except Exception as e: - logger.error(f"Failed to delete Google event {event_id}: {e}") + log_operation('error', f"Failed to delete Google event {event_id}: {e}", context=context) raise -async def safe_create_advoware_appointment(advoware, data, employee_kuerzel, write_allowed): +async def safe_create_advoware_appointment(advoware, data, employee_kuerzel, write_allowed, context=None): """Safe wrapper for creating Advoware appointments with write permission check and global protection.""" if Config.ADVOWARE_WRITE_PROTECTION: - logger.warning("Global write protection active, skipping Advoware create") + log_operation('warning', "Global write protection active, skipping Advoware create", context=context) return None if not write_allowed: - logger.warning("Cannot create in Advoware, write not allowed") + log_operation('warning', "Cannot create in Advoware, write not allowed", context=context) return None - return await create_advoware_appointment(advoware, data, employee_kuerzel) + return await create_advoware_appointment(advoware, data, employee_kuerzel, context) -async def safe_delete_advoware_appointment(advoware, frnr, write_allowed): +async def safe_delete_advoware_appointment(advoware, frnr, write_allowed, context=None): """Safe wrapper for deleting Advoware appointments with write permission check and global protection.""" if Config.ADVOWARE_WRITE_PROTECTION: - logger.warning("Global write protection active, skipping Advoware delete") + log_operation('warning', "Global write protection active, skipping Advoware delete", context=context) return if not write_allowed: - logger.warning("Cannot delete in Advoware, write not allowed") + log_operation('warning', "Cannot delete in Advoware, write not allowed", context=context) return - await delete_advoware_appointment(advoware, frnr) + await delete_advoware_appointment(advoware, frnr, context) -async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, employee_kuerzel): +async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, employee_kuerzel, context=None): """Safe wrapper for updating Advoware appointments with write permission check and global protection.""" if Config.ADVOWARE_WRITE_PROTECTION: - logger.warning("Global write protection active, skipping Advoware update") + log_operation('warning', "Global write protection active, skipping Advoware update", context=context) return if not write_allowed: - logger.warning("Cannot update in Advoware, write not allowed") + log_operation('warning', "Cannot update in Advoware, write not allowed", context=context) return - await update_advoware_appointment(advoware, frnr, data, employee_kuerzel) + await update_advoware_appointment(advoware, frnr, data, employee_kuerzel, context) -async def safe_advoware_operation(operation, write_allowed, *args, **kwargs): +async def safe_advoware_operation(operation, write_allowed, context=None, *args, **kwargs): """Generic safe wrapper for Advoware operations with write permission check.""" if Config.ADVOWARE_WRITE_PROTECTION: - log_operation('warning', "Global write protection active, skipping Advoware operation") + log_operation('warning', "Global write protection active, skipping Advoware operation", context=context) return None if not write_allowed: - log_operation('warning', "Cannot perform operation in Advoware, write not allowed") + log_operation('warning', "Cannot perform operation in Advoware, write not allowed", context=context) return None return await operation(*args, **kwargs) -async def get_advoware_employees(advoware): +async def get_advoware_employees(advoware, context=None): """Fetch list of employees from Advoware.""" try: result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'}) employees = result if isinstance(result, list) else [] - logger.info(f"Fetched {len(employees)} Advoware employees") + log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context) return employees except Exception as e: - logger.error(f"Failed to fetch Advoware employees: {e}") + log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context) raise -async def get_advoware_timestamp(advoware, frnr): +async def get_advoware_timestamp(advoware, frnr, context=None): """Fetch the last modified timestamp for an Advoware appointment.""" try: result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params={'frnr': frnr}) @@ -562,7 +565,7 @@ async def get_advoware_timestamp(advoware, frnr): return BERLIN_TZ.localize(datetime.datetime.fromisoformat(timestamp_str)) return None except Exception as e: - logger.error(f"Failed to fetch timestamp for Advoware frNr {frnr}: {e}") + log_operation('error', f"Failed to fetch timestamp for Advoware frNr {frnr}: {e}", context=context) return None def get_timestamps(adv_data, google_data): @@ -587,7 +590,7 @@ async def process_new_from_advoware(state, conn, service, calendar_id, kuerzel, for frnr, app in state['adv_map'].items(): if frnr not in state['db_adv_index']: try: - event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware')) + event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware', context), context) async with conn.transaction(): await conn.execute( """ @@ -613,7 +616,7 @@ async def process_new_from_google(state, conn, service, calendar_id, kuerzel, ad if not is_already_synced: try: - frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google'), kuerzel, True) + frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google', context), kuerzel, True, context) if frnr and str(frnr) != 'None': async with conn.transaction(): await conn.execute( @@ -664,7 +667,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad if row['source_system'] == 'advoware': # Propagate delete to Google try: - await delete_google_event(service, calendar_id, event_id) + await delete_google_event(service, calendar_id, event_id, context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context) @@ -676,7 +679,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad elif row['source_system'] == 'google' and row['advoware_write_allowed']: # Recreate in Advoware try: - new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(state['google_map'][event_id], 'google'), kuerzel, row['advoware_write_allowed']) + new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(state['google_map'][event_id], 'google', context), kuerzel, row['advoware_write_allowed'], context) if new_frnr and str(new_frnr) != 'None': async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ)) @@ -693,7 +696,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad else: # For other cases, propagate delete to Google try: - await delete_google_event(service, calendar_id, event_id) + await delete_google_event(service, calendar_id, event_id, context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context) @@ -705,7 +708,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad else: # Propagate delete to Google try: - await delete_google_event(service, calendar_id, event_id) + await delete_google_event(service, calendar_id, event_id, context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context) @@ -722,7 +725,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad # Delete in Advoware if row['advoware_write_allowed']: try: - await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed']) + await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'], context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) log_operation('info', f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}", context=context) @@ -737,7 +740,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad elif row['source_system'] == 'advoware': # Recreate in Google try: - new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware')) + new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware', context), context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ)) log_operation('info', f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}", context=context) @@ -750,7 +753,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad else: # last_change_wins or other, propagate delete to Advoware try: - await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed']) + await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'], context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) log_operation('info', f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}", context=context) @@ -794,8 +797,8 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware, processed_master_events.add(master_event_id) if adv_data and google_data: - adv_std = standardize_appointment_data(adv_data, 'advoware') - google_std = standardize_appointment_data(google_data, 'google') + adv_std = standardize_appointment_data(adv_data, 'advoware', context) + google_std = standardize_appointment_data(google_data, 'google', context) strategy = row['sync_strategy'] try: if strategy == 'source_system_wins': @@ -805,7 +808,7 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware, google_ts_str = google_data.get('updated', '') google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None if adv_ts > row['last_sync']: - await update_google_event(service, calendar_id, event_id, adv_std) + await update_google_event(service, calendar_id, event_id, adv_std, context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) log_operation('info', f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}", context=context) @@ -813,7 +816,7 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware, await asyncio.sleep(0.1) # Small delay to avoid rate limits elif google_ts and google_ts > row['last_sync']: log_operation('warning', f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}", context=context) - await update_google_event(service, calendar_id, event_id, adv_std) + await update_google_event(service, calendar_id, event_id, adv_std, context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) log_operation('info', f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}", context=context) @@ -825,25 +828,25 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware, adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) log_operation('debug', f"Phase 4: Checking sync_id {row['sync_id']}: adv_ts={adv_ts}, google_ts={google_ts}, last_sync={row['last_sync']}", context=context) if google_ts and google_ts > row['last_sync']: - await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) + await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) log_operation('info', f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}", context=context) elif adv_ts > row['last_sync']: log_operation('warning', f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}", context=context) - await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) + await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) log_operation('info', f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}", context=context) elif strategy == 'last_change_wins': - adv_ts = await get_advoware_timestamp(advoware, frnr) + adv_ts = await get_advoware_timestamp(advoware, frnr, context) google_ts_str = google_data.get('updated', '') google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None if adv_ts and google_ts: if adv_ts > google_ts: - await update_google_event(service, calendar_id, event_id, adv_std) + await update_google_event(service, calendar_id, event_id, adv_std, context) elif row['advoware_write_allowed']: - await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel']) + await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts)) log_operation('info', f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}", context=context) @@ -878,11 +881,11 @@ async def handler(event_data, context): advoware = AdvowareAPI(context) log_operation('debug', "Initializing Google service", context=context) - service = await get_google_service() + service = await get_google_service(context) log_operation('debug', f"Ensuring Google calendar for {kuerzel}", context=context) - calendar_id = await ensure_google_calendar(service, kuerzel) + calendar_id = await ensure_google_calendar(service, kuerzel, context) - conn = await connect_db() + conn = await connect_db(context) try: # Initialize state state = { @@ -920,9 +923,9 @@ async def handler(event_data, context): async def reload_api_maps(): """Reload API maps after creating new events in phases.""" - state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel) + state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel, context) state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')} - state['google_events'] = await fetch_google_events(service, calendar_id) + state['google_events'] = await fetch_google_events(service, calendar_id, context) state['google_map'] = {evt['id']: evt for evt in state['google_events']} log_operation('debug', "Reloaded API maps", context=context, adv=len(state['adv_map']), google=len(state['google_map']))