fix: replace log_operation() with ctx.logger/context.logger in calendar_sync_event_step – comply with INDEX.md logging standard

This commit is contained in:
bsiggel
2026-03-31 07:02:41 +00:00
parent c5299b2889
commit 1f302e0de8

View File

@@ -111,16 +111,16 @@ async def enforce_global_rate_limit(context=None):
added, wait_time = result[0], result[1] added, wait_time = result[0], result[1]
if added: if added:
log_operation('debug', "Rate limit acquired successfully", context=context) context.logger.debug("Rate limit acquired successfully")
return return
# Add jitter for smoothing # Add jitter for smoothing
wait_time += random.uniform(0, JITTER_MAX) wait_time += random.uniform(0, JITTER_MAX)
log_operation('debug', f"Rate limit: waiting {wait_time:.2f}s before retry", context=context) context.logger.debug(f"Rate limit: waiting {wait_time:.2f}s before retry")
await asyncio.sleep(wait_time) await asyncio.sleep(wait_time)
except Exception as e: except Exception as e:
log_operation('error', f"Rate limiting failed: {e}. Proceeding without limit.", context=context) context.logger.error(f"Rate limiting failed: {e}. Proceeding without limit.")
finally: finally:
# Always close Redis connection to prevent resource leaks # Always close Redis connection to prevent resource leaks
try: try:
@@ -164,7 +164,7 @@ async def ensure_google_calendar(service, employee_kuerzel: str, context=None):
} }
created = service.calendars().insert(body=calendar_body).execute() created = service.calendars().insert(body=calendar_body).execute()
calendar_id = created['id'] calendar_id = created['id']
log_operation('info', f"Created new Google calendar {calendar_name} with ID {calendar_id}", context=context) context.logger.info(f"Created new Google calendar {calendar_name} with ID {calendar_id}")
# Check and add ACL if needed # Check and add ACL if needed
await enforce_global_rate_limit(context) await enforce_global_rate_limit(context)
@@ -184,14 +184,14 @@ async def ensure_google_calendar(service, employee_kuerzel: str, context=None):
'role': 'owner' 'role': 'owner'
} }
service.acl().insert(calendarId=calendar_id, body=acl_rule).execute() service.acl().insert(calendarId=calendar_id, body=acl_rule).execute()
log_operation('info', f"Added ACL rule for calendar {calendar_name} (ID: {calendar_id})", context=context) context.logger.info(f"Added ACL rule for calendar {calendar_name} (ID: {calendar_id})")
return calendar_id return calendar_id
except HttpError as e: except HttpError as e:
log_operation('error', f"Google API error for calendar {employee_kuerzel}: {e}", context=context) context.logger.error(f"Google API error for calendar {employee_kuerzel}: {e}")
raise raise
except Exception as e: except Exception as e:
log_operation('error', f"Failed to ensure Google calendar for {employee_kuerzel}: {e}", context=context) context.logger.error(f"Failed to ensure Google calendar for {employee_kuerzel}: {e}")
raise raise
@@ -205,10 +205,10 @@ async def fetch_advoware_appointments(advoware, employee_kuerzel: str, context=N
} }
result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params=params) result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params=params)
appointments = result if isinstance(result, list) else [] appointments = result if isinstance(result, list) else []
log_operation('info', f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}", context=context) context.logger.info(f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}")
return appointments return appointments
except Exception as e: except Exception as e:
log_operation('error', f"Failed to fetch Advoware appointments: {e}", context=context) context.logger.error(f"Failed to fetch Advoware appointments: {e}")
raise raise
@@ -240,13 +240,13 @@ async def fetch_google_events(service, calendar_id: str, context=None):
break break
events = [evt for evt in all_events if evt.get('status') != 'cancelled'] events = [evt for evt in all_events if evt.get('status') != 'cancelled']
log_operation('info', f"Fetched {len(all_events)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}", context=context) context.logger.info(f"Fetched {len(all_events)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}")
return events return events
except HttpError as e: except HttpError as e:
log_operation('error', f"Google API error fetching events: {e}", context=context) context.logger.error(f"Google API error fetching events: {e}")
raise raise
except Exception as e: except Exception as e:
log_operation('error', f"Failed to fetch Google events: {e}", context=context) context.logger.error(f"Failed to fetch Google events: {e}")
raise raise
@@ -273,11 +273,11 @@ def generate_rrule(turnus, turnus_art, datum_bis, context=None):
max_until = datetime.datetime.now() + timedelta(days=730) max_until = datetime.datetime.now() + timedelta(days=730)
if bis_dt > max_until: if bis_dt > max_until:
bis_dt = max_until bis_dt = max_until
log_operation('info', f"Limited recurrence until date to {bis_dt.date()}", context=context) context.logger.info(f"Limited recurrence until date to {bis_dt.date()}")
until_date = bis_dt.strftime('%Y%m%d') until_date = bis_dt.strftime('%Y%m%d')
except ValueError: except ValueError:
log_operation('warning', f"Invalid datum_bis: {datum_bis}, skipping recurrence", context=context) context.logger.warn(f"Invalid datum_bis: {datum_bis}, skipping recurrence")
return None return None
rrule = f"RRULE:FREQ={freq};INTERVAL={turnus};UNTIL={until_date}" rrule = f"RRULE:FREQ={freq};INTERVAL={turnus};UNTIL={until_date}"
@@ -481,10 +481,10 @@ async def create_advoware_appointment(advoware, data, employee_kuerzel: str, con
try: try:
result = await advoware.api_call('api/v1/advonet/Termine', method='POST', json_data=appointment_data) result = await advoware.api_call('api/v1/advonet/Termine', method='POST', json_data=appointment_data)
frnr = str(result.get('frNr') or result.get('frnr')) frnr = str(result.get('frNr') or result.get('frnr'))
log_operation('info', f"Created Advoware appointment frNr: {frnr}", context=context) context.logger.info(f"Created Advoware appointment frNr: {frnr}")
return frnr return frnr
except Exception as e: except Exception as e:
log_operation('error', f"Failed to create Advoware appointment: {e}", context=context) context.logger.error(f"Failed to create Advoware appointment: {e}")
raise raise
@@ -508,9 +508,9 @@ async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel: st
} }
try: try:
await advoware.api_call('api/v1/advonet/Termine', method='PUT', json_data=appointment_data) await advoware.api_call('api/v1/advonet/Termine', method='PUT', json_data=appointment_data)
log_operation('info', f"Updated Advoware appointment frNr: {frnr}", context=context) context.logger.info(f"Updated Advoware appointment frNr: {frnr}")
except Exception as e: except Exception as e:
log_operation('error', f"Failed to update Advoware appointment {frnr}: {e}", context=context) context.logger.error(f"Failed to update Advoware appointment {frnr}: {e}")
raise raise
@@ -518,9 +518,9 @@ async def delete_advoware_appointment(advoware, frnr, context=None):
"""Delete Advoware appointment.""" """Delete Advoware appointment."""
try: try:
await advoware.api_call('api/v1/advonet/Termine', method='DELETE', params={'frnr': frnr}) await advoware.api_call('api/v1/advonet/Termine', method='DELETE', params={'frnr': frnr})
log_operation('info', f"Deleted Advoware appointment frNr: {frnr}", context=context) context.logger.info(f"Deleted Advoware appointment frNr: {frnr}")
except Exception as e: except Exception as e:
log_operation('error', f"Failed to delete Advoware appointment {frnr}: {e}", context=context) context.logger.error(f"Failed to delete Advoware appointment {frnr}: {e}")
raise raise
@@ -554,13 +554,13 @@ async def create_google_event(service, calendar_id: str, data, context=None):
try: try:
created = service.events().insert(calendarId=calendar_id, body=event_body).execute() created = service.events().insert(calendarId=calendar_id, body=event_body).execute()
event_id = created['id'] event_id = created['id']
log_operation('info', f"Created Google event ID: {event_id}", context=context) context.logger.info(f"Created Google event ID: {event_id}")
return event_id return event_id
except HttpError as e: except HttpError as e:
log_operation('error', f"Google API error creating event: {e}", context=context) context.logger.error(f"Google API error creating event: {e}")
raise raise
except Exception as e: except Exception as e:
log_operation('error', f"Failed to create Google event: {e}", context=context) context.logger.error(f"Failed to create Google event: {e}")
raise raise
@@ -593,12 +593,12 @@ async def update_google_event(service, calendar_id: str, event_id: str, data, co
try: try:
service.events().update(calendarId=calendar_id, eventId=event_id, body=event_body).execute() service.events().update(calendarId=calendar_id, eventId=event_id, body=event_body).execute()
log_operation('info', f"Updated Google event ID: {event_id}", context=context) context.logger.info(f"Updated Google event ID: {event_id}")
except HttpError as e: except HttpError as e:
log_operation('error', f"Google API error updating event {event_id}: {e}", context=context) context.logger.error(f"Google API error updating event {event_id}: {e}")
raise raise
except Exception as e: except Exception as e:
log_operation('error', f"Failed to update Google event {event_id}: {e}", context=context) context.logger.error(f"Failed to update Google event {event_id}: {e}")
raise raise
@@ -610,12 +610,12 @@ async def delete_google_event(service, calendar_id: str, event_id: str, context=
try: try:
service.events().delete(calendarId=calendar_id, eventId=event_id).execute() service.events().delete(calendarId=calendar_id, eventId=event_id).execute()
log_operation('info', f"Deleted Google event ID: {event_id}", context=context) context.logger.info(f"Deleted Google event ID: {event_id}")
except HttpError as e: except HttpError as e:
log_operation('error', f"Google API error deleting event {event_id}: {e}", context=context) context.logger.error(f"Google API error deleting event {event_id}: {e}")
raise raise
except Exception as e: except Exception as e:
log_operation('error', f"Failed to delete Google event {event_id}: {e}", context=context) context.logger.error(f"Failed to delete Google event {event_id}: {e}")
raise raise
@@ -623,10 +623,10 @@ async def safe_create_advoware_appointment(advoware, data, employee_kuerzel: str
"""Safe wrapper for creating Advoware appointments with write permission check.""" """Safe wrapper for creating Advoware appointments with write permission check."""
write_protection = os.getenv('ADVOWARE_WRITE_PROTECTION', 'true').lower() == 'true' write_protection = os.getenv('ADVOWARE_WRITE_PROTECTION', 'true').lower() == 'true'
if write_protection: if write_protection:
log_operation('warning', "Global write protection active, skipping Advoware create", context=context) context.logger.warn("Global write protection active, skipping Advoware create")
return None return None
if not write_allowed: if not write_allowed:
log_operation('warning', "Cannot create in Advoware, write not allowed", context=context) context.logger.warn("Cannot create in Advoware, write not allowed")
return None return None
return await create_advoware_appointment(advoware, data, employee_kuerzel, context) return await create_advoware_appointment(advoware, data, employee_kuerzel, context)
@@ -635,10 +635,10 @@ async def safe_delete_advoware_appointment(advoware, frnr, write_allowed: bool,
"""Safe wrapper for deleting Advoware appointments with write permission check.""" """Safe wrapper for deleting Advoware appointments with write permission check."""
write_protection = os.getenv('ADVOWARE_WRITE_PROTECTION', 'true').lower() == 'true' write_protection = os.getenv('ADVOWARE_WRITE_PROTECTION', 'true').lower() == 'true'
if write_protection: if write_protection:
log_operation('warning', "Global write protection active, skipping Advoware delete", context=context) context.logger.warn("Global write protection active, skipping Advoware delete")
return return
if not write_allowed: if not write_allowed:
log_operation('warning', "Cannot delete in Advoware, write not allowed", context=context) context.logger.warn("Cannot delete in Advoware, write not allowed")
return return
await delete_advoware_appointment(advoware, frnr, context) await delete_advoware_appointment(advoware, frnr, context)
@@ -647,10 +647,10 @@ async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed:
"""Safe wrapper for updating Advoware appointments with write permission check.""" """Safe wrapper for updating Advoware appointments with write permission check."""
write_protection = os.getenv('ADVOWARE_WRITE_PROTECTION', 'true').lower() == 'true' write_protection = os.getenv('ADVOWARE_WRITE_PROTECTION', 'true').lower() == 'true'
if write_protection: if write_protection:
log_operation('warning', "Global write protection active, skipping Advoware update", context=context) context.logger.warn("Global write protection active, skipping Advoware update")
return return
if not write_allowed: if not write_allowed:
log_operation('warning', "Cannot update in Advoware, write not allowed", context=context) context.logger.warn("Cannot update in Advoware, write not allowed")
return return
await update_advoware_appointment(advoware, frnr, data, employee_kuerzel, context) await update_advoware_appointment(advoware, frnr, data, employee_kuerzel, context)
@@ -666,13 +666,13 @@ async def get_advoware_timestamp(advoware, frnr, context=None):
return BERLIN_TZ.localize(datetime.datetime.fromisoformat(timestamp_str)) return BERLIN_TZ.localize(datetime.datetime.fromisoformat(timestamp_str))
return None return None
except Exception as e: except Exception as e:
log_operation('error', f"Failed to fetch timestamp for Advoware frNr {frnr}: {e}", context=context) context.logger.error(f"Failed to fetch timestamp for Advoware frNr {frnr}: {e}")
return None return None
async def process_new_from_advoware(state, conn, service, calendar_id: str, kuerzel: str, advoware, context=None): async def process_new_from_advoware(state, conn, service, calendar_id: str, kuerzel: str, advoware, context=None):
"""Phase 1: Process new appointments from Advoware to Google.""" """Phase 1: Process new appointments from Advoware to Google."""
log_operation('info', "Phase 1: Processing new appointments from Advoware", context=context) context.logger.info("Phase 1: Processing new appointments from Advoware")
for frnr, app in state['adv_map'].items(): for frnr, app in state['adv_map'].items():
if frnr not in state['db_adv_index']: if frnr not in state['db_adv_index']:
try: try:
@@ -685,15 +685,15 @@ async def process_new_from_advoware(state, conn, service, calendar_id: str, kuer
""", """,
kuerzel, int(frnr), event_id kuerzel, int(frnr), event_id
) )
log_operation('info', f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}", context=context) context.logger.info(f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}")
state['stats']['new_adv_to_google'] += 1 state['stats']['new_adv_to_google'] += 1
except Exception as e: except Exception as e:
log_operation('warning', f"Phase 1: Failed to process new Advoware {frnr}: {e}", context=context) context.logger.warn(f"Phase 1: Failed to process new Advoware {frnr}: {e}")
async def process_new_from_google(state, conn, service, calendar_id: str, kuerzel: str, advoware, context=None): async def process_new_from_google(state, conn, service, calendar_id: str, kuerzel: str, advoware, context=None):
"""Phase 2: Process new events from Google to Advoware.""" """Phase 2: Process new events from Google to Advoware."""
log_operation('info', "Phase 2: Processing new events from Google", context=context) context.logger.info("Phase 2: Processing new events from Google")
for event_id, evt in state['google_map'].items(): for event_id, evt in state['google_map'].items():
# Check if already synced (master or instance) # Check if already synced (master or instance)
recurring_master_id = evt.get('recurringEventId') recurring_master_id = evt.get('recurringEventId')
@@ -703,7 +703,7 @@ async def process_new_from_google(state, conn, service, calendar_id: str, kuerze
# Skip events that appear to be from Advoware # Skip events that appear to be from Advoware
summary = evt.get('summary', '') summary = evt.get('summary', '')
if 'Advoware' in summary and 'frNr' in summary: if 'Advoware' in summary and 'frNr' in summary:
log_operation('warning', f"Skipping sync back to Advoware for Google event {event_id} (summary: {summary})", context=context) context.logger.warn(f"Skipping sync back to Advoware for Google event {event_id} (summary: {summary})")
continue continue
try: try:
@@ -717,17 +717,17 @@ async def process_new_from_google(state, conn, service, calendar_id: str, kuerze
""", """,
kuerzel, int(frnr), event_id kuerzel, int(frnr), event_id
) )
log_operation('info', f"Phase 2: Created new from Google: event_id {event_id}, frNr {frnr}", context=context) context.logger.info(f"Phase 2: Created new from Google: event_id {event_id}, frNr {frnr}")
state['stats']['new_google_to_adv'] += 1 state['stats']['new_google_to_adv'] += 1
else: else:
log_operation('warning', f"Phase 2: Skipped DB insert for Google event {event_id}, frNr is None", context=context) context.logger.warn(f"Phase 2: Skipped DB insert for Google event {event_id}, frNr is None")
except Exception as e: except Exception as e:
log_operation('warning', f"Phase 2: Failed to process new Google {event_id}: {e}", context=context) context.logger.warn(f"Phase 2: Failed to process new Google {event_id}: {e}")
async def process_deleted_entries(state, conn, service, calendar_id: str, kuerzel: str, advoware, context=None): async def process_deleted_entries(state, conn, service, calendar_id: str, kuerzel: str, advoware, context=None):
"""Phase 3: Process deleted entries.""" """Phase 3: Process deleted entries."""
log_operation('info', "Phase 3: Processing deleted entries", context=context) context.logger.info("Phase 3: Processing deleted entries")
for row in state['rows']: for row in state['rows']:
frnr = row['advoware_frnr'] frnr = row['advoware_frnr']
event_id = row['google_event_id'] event_id = row['google_event_id']
@@ -749,7 +749,7 @@ async def process_deleted_entries(state, conn, service, calendar_id: str, kuerze
# Both missing - soft delete # Both missing - soft delete
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Soft deleted sync_id {row['sync_id']} (both missing)", context=context) context.logger.info(f"Phase 3: Soft deleted sync_id {row['sync_id']} (both missing)")
state['stats']['deleted'] += 1 state['stats']['deleted'] += 1
elif not adv_exists: elif not adv_exists:
# Missing in Advoware - handle based on strategy # Missing in Advoware - handle based on strategy
@@ -761,9 +761,9 @@ async def process_deleted_entries(state, conn, service, calendar_id: str, kuerze
await delete_google_event(service, calendar_id, event_id, context) await delete_google_event(service, calendar_id, event_id, context)
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context) context.logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}")
except Exception as e: except Exception as e:
log_operation('warning', f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}", context=context) context.logger.warn(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}")
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
elif row['source_system'] == 'google' and row['advoware_write_allowed']: elif row['source_system'] == 'google' and row['advoware_write_allowed']:
@@ -773,14 +773,14 @@ async def process_deleted_entries(state, conn, service, calendar_id: str, kuerze
if new_frnr and str(new_frnr) != 'None': if new_frnr and str(new_frnr) != 'None':
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ)) await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 3: Recreated Advoware appointment {new_frnr} for sync_id {row['sync_id']}", context=context) context.logger.info(f"Phase 3: Recreated Advoware appointment {new_frnr} for sync_id {row['sync_id']}")
state['stats']['recreated'] += 1 state['stats']['recreated'] += 1
else: else:
log_operation('warning', f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}, frNr is None", context=context) context.logger.warn(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}, frNr is None")
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
except Exception as e: except Exception as e:
log_operation('warning', f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}: {e}", context=context) context.logger.warn(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}: {e}")
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else: else:
@@ -789,9 +789,9 @@ async def process_deleted_entries(state, conn, service, calendar_id: str, kuerze
await delete_google_event(service, calendar_id, event_id, context) await delete_google_event(service, calendar_id, event_id, context)
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context) context.logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}")
except Exception as e: except Exception as e:
log_operation('warning', f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}", context=context) context.logger.warn(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}")
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else: else:
@@ -800,9 +800,9 @@ async def process_deleted_entries(state, conn, service, calendar_id: str, kuerze
await delete_google_event(service, calendar_id, event_id, context) await delete_google_event(service, calendar_id, event_id, context)
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context) context.logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}")
except Exception as e: except Exception as e:
log_operation('warning', f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}", context=context) context.logger.warn(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}")
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
elif not google_exists: elif not google_exists:
@@ -816,13 +816,13 @@ async def process_deleted_entries(state, conn, service, calendar_id: str, kuerze
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'], context) await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'], context)
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}", context=context) context.logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}")
except Exception as e: except Exception as e:
log_operation('warning', f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}", context=context) context.logger.warn(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}")
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else: else:
log_operation('warning', f"Phase 3: Cannot delete in Advoware for sync_id {row['sync_id']}, write not allowed", context=context) context.logger.warn(f"Phase 3: Cannot delete in Advoware for sync_id {row['sync_id']}, write not allowed")
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
elif row['source_system'] == 'advoware': elif row['source_system'] == 'advoware':
@@ -831,10 +831,10 @@ async def process_deleted_entries(state, conn, service, calendar_id: str, kuerze
new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware', context), context) new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware', context), context)
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ)) await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}", context=context) context.logger.info(f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}")
state['stats']['recreated'] += 1 state['stats']['recreated'] += 1
except Exception as e: except Exception as e:
log_operation('warning', f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}", context=context) context.logger.warn(f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}")
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else: else:
@@ -843,16 +843,16 @@ async def process_deleted_entries(state, conn, service, calendar_id: str, kuerze
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'], context) await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'], context)
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}", context=context) context.logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}")
except Exception as e: except Exception as e:
log_operation('warning', f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}", context=context) context.logger.warn(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}")
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
async def process_updates(state, conn, service, calendar_id: str, kuerzel: str, advoware, context=None): async def process_updates(state, conn, service, calendar_id: str, kuerzel: str, advoware, context=None):
"""Phase 4: Process updates for existing entries.""" """Phase 4: Process updates for existing entries."""
log_operation('info', "Phase 4: Processing updates for existing entries", context=context) context.logger.info("Phase 4: Processing updates for existing entries")
# Track which master events we've already processed # Track which master events we've already processed
processed_master_events = set() processed_master_events = set()
@@ -898,14 +898,14 @@ async def process_updates(state, conn, service, calendar_id: str, kuerzel: str,
await update_google_event(service, calendar_id, event_id, adv_std, context) await update_google_event(service, calendar_id, event_id, adv_std, context)
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}", context=context) context.logger.info(f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}")
state['stats']['updated'] += 1 state['stats']['updated'] += 1
elif google_ts and google_ts > row['last_sync']: elif google_ts and google_ts > row['last_sync']:
log_operation('warning', f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}", context=context) context.logger.warn(f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}")
await update_google_event(service, calendar_id, event_id, adv_std, context) await update_google_event(service, calendar_id, event_id, adv_std, context)
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}", context=context) context.logger.info(f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}")
elif row['source_system'] == 'google' and row['advoware_write_allowed']: elif row['source_system'] == 'google' and row['advoware_write_allowed']:
# Check for changes in source (Google) # Check for changes in source (Google)
google_ts_str = google_data.get('updated', '') google_ts_str = google_data.get('updated', '')
@@ -915,13 +915,13 @@ async def process_updates(state, conn, service, calendar_id: str, kuerzel: str,
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context) await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context)
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}", context=context) context.logger.info(f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}")
elif adv_ts > row['last_sync']: elif adv_ts > row['last_sync']:
log_operation('warning', f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}", context=context) context.logger.warn(f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}")
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context) await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context)
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}", context=context) context.logger.info(f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}")
elif strategy == 'last_change_wins': elif strategy == 'last_change_wins':
adv_ts = await get_advoware_timestamp(advoware, frnr, context) adv_ts = await get_advoware_timestamp(advoware, frnr, context)
google_ts_str = google_data.get('updated', '') google_ts_str = google_data.get('updated', '')
@@ -933,9 +933,9 @@ async def process_updates(state, conn, service, calendar_id: str, kuerzel: str,
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context) await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context)
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts)) await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts))
log_operation('info', f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}", context=context) context.logger.info(f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}")
except Exception as e: except Exception as e:
log_operation('warning', f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}", context=context) context.logger.warn(f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}")
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
@@ -958,22 +958,22 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None:
kuerzel = input_data.get('kuerzel') kuerzel = input_data.get('kuerzel')
if not kuerzel: if not kuerzel:
log_operation('error', "No kuerzel provided in event", context=ctx) ctx.logger.error("No kuerzel provided in event")
return return
log_operation('info', f"Starting calendar sync for employee {kuerzel}", context=ctx) ctx.logger.info(f"Starting calendar sync for employee {kuerzel}")
redis_client = get_redis_client(ctx) redis_client = get_redis_client(ctx)
service = None service = None
try: try:
log_operation('debug', "Initializing Advoware service", context=ctx) ctx.logger.debug("Initializing Advoware service")
advoware = AdvowareService(ctx) advoware = AdvowareService(ctx)
log_operation('debug', "Initializing Google service", context=ctx) ctx.logger.debug("Initializing Google service")
service = await get_google_service(ctx) service = await get_google_service(ctx)
log_operation('debug', f"Ensuring Google calendar for {kuerzel}", context=ctx) ctx.logger.debug(f"Ensuring Google calendar for {kuerzel}")
calendar_id = await ensure_google_calendar(service, kuerzel, ctx) calendar_id = await ensure_google_calendar(service, kuerzel, ctx)
conn = await connect_db(ctx) conn = await connect_db(ctx)
@@ -1010,7 +1010,7 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None:
for row in state['rows']: for row in state['rows']:
if row['google_event_id']: if row['google_event_id']:
state['db_google_index'][row['google_event_id']] = row state['db_google_index'][row['google_event_id']] = row
log_operation('debug', "Reloaded indexes", context=ctx, rows=len(state['rows']), adv=len(state['db_adv_index']), google=len(state['db_google_index'])) ctx.logger.debug(f"Reloaded indexes (rows={len(state['rows'])}, adv={len(state['db_adv_index'])}, google={len(state['db_google_index'])})")
async def reload_api_maps(): async def reload_api_maps():
"""Reload API maps after creating new events.""" """Reload API maps after creating new events."""
@@ -1018,13 +1018,13 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None:
state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')} state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')}
state['google_events'] = await fetch_google_events(service, calendar_id, ctx) state['google_events'] = await fetch_google_events(service, calendar_id, ctx)
state['google_map'] = {evt['id']: evt for evt in state['google_events']} state['google_map'] = {evt['id']: evt for evt in state['google_events']}
log_operation('debug', "Reloaded API maps", context=ctx, adv=len(state['adv_map']), google=len(state['google_map'])) ctx.logger.debug(f"Reloaded API maps (adv={len(state['adv_map'])}, google={len(state['google_map'])})")
# Initial fetch # Initial fetch
log_operation('info', "Fetching fresh data from APIs", context=ctx) ctx.logger.info("Fetching fresh data from APIs")
await reload_api_maps() await reload_api_maps()
await reload_db_indexes() await reload_db_indexes()
log_operation('info', "Fetched existing sync rows", context=ctx, count=len(state['rows'])) ctx.logger.info(f"Fetched existing sync rows (count={len(state['rows'])})")
# Phase 1: New from Advoware => Google # Phase 1: New from Advoware => Google
await process_new_from_advoware(state, conn, service, calendar_id, kuerzel, advoware, ctx) await process_new_from_advoware(state, conn, service, calendar_id, kuerzel, advoware, ctx)
@@ -1049,16 +1049,16 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None:
# Log final statistics # Log final statistics
stats = state['stats'] stats = state['stats']
log_operation('info', f"Sync statistics for {kuerzel}: New Adv->Google: {stats['new_adv_to_google']}, New Google->Adv: {stats['new_google_to_adv']}, Deleted: {stats['deleted']}, Updated: {stats['updated']}, Recreated: {stats['recreated']}", context=ctx) ctx.logger.info(f"Sync statistics for {kuerzel}: New Adv->Google: {stats['new_adv_to_google']}, New Google->Adv: {stats['new_google_to_adv']}, Deleted: {stats['deleted']}, Updated: {stats['updated']}, Recreated: {stats['recreated']}")
log_operation('info', f"Calendar sync completed for {kuerzel}", context=ctx) ctx.logger.info(f"Calendar sync completed for {kuerzel}")
log_operation('info', f"Handler duration: {time.time() - start_time}", context=ctx) ctx.logger.info(f"Handler duration: {time.time() - start_time}")
return {'status': 200, 'body': {'status': 'completed', 'kuerzel': kuerzel}} return {'status': 200, 'body': {'status': 'completed', 'kuerzel': kuerzel}}
except Exception as e: except Exception as e:
log_operation('error', f"Sync failed for {kuerzel}: {e}", context=ctx) ctx.logger.error(f"Sync failed for {kuerzel}: {e}")
log_operation('info', f"Handler duration (failed): {time.time() - start_time}", context=ctx) ctx.logger.info(f"Handler duration (failed): {time.time() - start_time}")
return {'status': 500, 'body': {'error': str(e)}} return {'status': 500, 'body': {'error': str(e)}}
finally: finally:
@@ -1067,12 +1067,12 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None:
try: try:
service.close() service.close()
except Exception as e: except Exception as e:
log_operation('debug', f"Error closing Google service: {e}", context=ctx) ctx.logger.debug(f"Error closing Google service: {e}")
try: try:
redis_client.close() redis_client.close()
except Exception as e: except Exception as e:
log_operation('debug', f"Error closing Redis client: {e}", context=ctx) ctx.logger.debug(f"Error closing Redis client: {e}")
# Ensure lock is always released # Ensure lock is always released
clear_employee_lock(redis_client, kuerzel, ctx) clear_employee_lock(redis_client, kuerzel, ctx)