Fix logging to appear in Motia workbench

- Updated log_operation to use context.logger.warn for warnings
- Added context parameter to all functions with logging
- Replaced all direct logger calls with log_operation calls
- Ensured all logging goes through context.logger for workbench visibility
- Adjusted backoff base from 4 to 3 for faster retries
- Added debug kuerzel list support in cron step
This commit is contained in:
root
2025-10-24 07:04:57 +00:00
parent 72ee01b74b
commit f4490f21cb
3 changed files with 101 additions and 98 deletions

View File

@@ -37,5 +37,5 @@ class Config:
# Calendar Sync settings
CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS = os.getenv('CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS', 'true').lower() == 'true'
CALENDAR_SYNC_DEBUG_KUERZEL = [k.strip().upper() for k in os.getenv('CALENDAR_SYNC_DEBUG_KUERZEL', 'SB,AI,RO').split(',')]
CALENDAR_SYNC_DEBUG_KUERZEL = [k.strip().upper() for k in os.getenv('CALENDAR_SYNC_DEBUG_KUERZEL', 'SB,AI,RO,OK,BI,ST,UR,PB,VB').split(',')]
ADVOWARE_WRITE_PROTECTION = True

View File

@@ -74,20 +74,20 @@
"id": "advoware",
"config": {
"steps/advoware_proxy/advoware_api_proxy_put_step.py": {
"x": 168,
"y": -54
"x": -7,
"y": 7
},
"steps/advoware_proxy/advoware_api_proxy_post_step.py": {
"x": -340,
"y": -2
},
"steps/advoware_proxy/advoware_api_proxy_get_step.py": {
"x": 12,
"y": 406
"x": -334,
"y": 193
},
"steps/advoware_proxy/advoware_api_proxy_delete_step.py": {
"x": 600,
"y": 0
"x": 18,
"y": 204
},
"steps/advoware_cal_sync/calendar_sync_event_step.py": {
"x": 395,

View File

@@ -38,7 +38,10 @@ def log_operation(level, message, context=None, **context_vars):
if level == 'info':
context.logger.info(full_message)
elif level == 'warning':
context.logger.warning(full_message)
if hasattr(context.logger, 'warn'):
context.logger.warn(full_message)
else:
context.logger.warning(full_message)
elif level == 'error':
context.logger.error(full_message)
elif level == 'debug':
@@ -53,7 +56,7 @@ def log_operation(level, message, context=None, **context_vars):
elif level == 'debug':
logger.debug(full_message)
async def connect_db():
async def connect_db(context=None):
"""Connect to Postgres DB from Config."""
try:
conn = await asyncpg.connect(
@@ -65,10 +68,10 @@ async def connect_db():
)
return conn
except Exception as e:
logger.error(f"Failed to connect to DB: {e}")
log_operation('error', f"Failed to connect to DB: {e}", context=context)
raise
async def get_google_service():
async def get_google_service(context=None):
"""Initialize Google Calendar service."""
try:
service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
@@ -80,11 +83,11 @@ async def get_google_service():
service = build('calendar', 'v3', credentials=creds)
return service
except Exception as e:
logger.error(f"Failed to initialize Google service: {e}")
log_operation('error', f"Failed to initialize Google service: {e}", context=context)
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def ensure_google_calendar(service, employee_kuerzel):
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def ensure_google_calendar(service, employee_kuerzel, context=None):
"""Ensure Google Calendar exists for employee."""
calendar_name = f"AW-{employee_kuerzel}"
try:
@@ -107,13 +110,13 @@ async def ensure_google_calendar(service, employee_kuerzel):
service.acl().insert(calendarId=calendar_id, body=acl_rule).execute()
return calendar_id
except HttpError as e:
logger.error(f"Google API error for calendar {employee_kuerzel}: {e}")
log_operation('error', f"Google API error for calendar {employee_kuerzel}: {e}", context=context)
raise
except Exception as e:
logger.error(f"Failed to ensure Google calendar for {employee_kuerzel}: {e}")
log_operation('error', f"Failed to ensure Google calendar for {employee_kuerzel}: {e}", context=context)
raise
async def fetch_advoware_appointments(advoware, employee_kuerzel):
async def fetch_advoware_appointments(advoware, employee_kuerzel, context=None):
"""Fetch Advoware appointments in range."""
try:
params = {
@@ -122,16 +125,16 @@ async def fetch_advoware_appointments(advoware, employee_kuerzel):
'to': FETCH_TO
}
result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params=params)
logger.debug(f"Raw Advoware API response: {result}")
log_operation('debug', f"Raw Advoware API response: {result}", context=context)
appointments = result if isinstance(result, list) else []
logger.info(f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}")
log_operation('info', f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}", context=context)
return appointments
except Exception as e:
logger.error(f"Failed to fetch Advoware appointments: {e}")
log_operation('error', f"Failed to fetch Advoware appointments: {e}", context=context)
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
async def fetch_google_events(service, calendar_id):
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
async def fetch_google_events(service, calendar_id, context=None):
"""Fetch Google events in range."""
try:
time_min = f"{current_year - 2}-01-01T00:00:00Z"
@@ -156,16 +159,16 @@ async def fetch_google_events(service, calendar_id):
break
events = [evt for evt in all_events if evt.get('status') != 'cancelled']
logger.info(f"Fetched {len(all_events)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}")
log_operation('info', f"Fetched {len(all_events)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}", context=context)
return events
except HttpError as e:
logger.error(f"Google API error fetching events: {e}")
log_operation('error', f"Google API error fetching events: {e}", context=context)
raise
except Exception as e:
logger.error(f"Failed to fetch Google events: {e}")
log_operation('error', f"Failed to fetch Google events: {e}", context=context)
raise
def generate_rrule(turnus, turnus_art, datum_bis):
def generate_rrule(turnus, turnus_art, datum_bis, context=None):
"""Generate RRULE string from Advoware turnus and turnusArt."""
freq_map = {
1: 'DAILY',
@@ -188,11 +191,11 @@ def generate_rrule(turnus, turnus_art, datum_bis):
max_until = datetime.datetime.now() + timedelta(days=730) # 2 years
if bis_dt > max_until:
bis_dt = max_until
logger.info(f"Limited recurrence until date to {bis_dt.date()} to avoid Google Calendar limits")
log_operation('info', f"Limited recurrence until date to {bis_dt.date()} to avoid Google Calendar limits", context=context)
until_date = bis_dt.strftime('%Y%m%d')
except ValueError:
logger.warning(f"Invalid datum_bis: {datum_bis}, skipping recurrence")
log_operation('warning', f"Invalid datum_bis: {datum_bis}, skipping recurrence", context=context)
return None
rrule = f"RRULE:FREQ={freq};INTERVAL={turnus};UNTIL={until_date}"
@@ -274,7 +277,7 @@ def build_notiz(original_notiz, time_breakdown, duration_capped):
notiz_parts.append("\nHinweis: Ereignisdauer wurde auf 24 Stunden begrenzt (Google Calendar Limit)")
return "\n".join(notiz_parts)
def standardize_appointment_data(data, source):
def standardize_appointment_data(data, source, context=None):
"""Standardize data from Advoware or Google to comparable dict, with TZ handling."""
duration_capped = False
start_dt, end_dt = parse_times(data, source)
@@ -323,7 +326,7 @@ def standardize_appointment_data(data, source):
turnus_art = data.get('turnusArt', 1)
datum_bis = data.get('datumBis', '')
if datum_bis:
recurrence = generate_rrule(turnus, turnus_art, datum_bis)
recurrence = generate_rrule(turnus, turnus_art, datum_bis, context)
if recurrence:
recurrence = [recurrence]
@@ -360,7 +363,7 @@ def standardize_appointment_data(data, source):
'recurrence': recurrence
}
async def create_advoware_appointment(advoware, data, employee_kuerzel):
async def create_advoware_appointment(advoware, data, employee_kuerzel, context=None):
"""Create Advoware appointment from standardized data."""
start_dt = data['start'].astimezone(BERLIN_TZ)
end_dt = data['end'].astimezone(BERLIN_TZ)
@@ -379,15 +382,15 @@ async def create_advoware_appointment(advoware, data, employee_kuerzel):
}
try:
result = await advoware.api_call('api/v1/advonet/Termine', method='POST', json_data=appointment_data)
logger.debug(f"Raw Advoware POST response: {result}")
log_operation('debug', f"Raw Advoware POST response: {result}", context=context)
frnr = str(result.get('frNr') or result.get('frnr'))
logger.info(f"Created Advoware appointment frNr: {frnr}")
log_operation('info', f"Created Advoware appointment frNr: {frnr}", context=context)
return frnr
except Exception as e:
logger.error(f"Failed to create Advoware appointment: {e}")
log_operation('error', f"Failed to create Advoware appointment: {e}", context=context)
raise
async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel):
async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel, context=None):
"""Update Advoware appointment."""
start_dt = data['start'].astimezone(BERLIN_TZ)
end_dt = data['end'].astimezone(BERLIN_TZ)
@@ -407,22 +410,22 @@ async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel):
}
try:
await advoware.api_call('api/v1/advonet/Termine', method='PUT', json_data=appointment_data)
logger.info(f"Updated Advoware appointment frNr: {frnr}")
log_operation('info', f"Updated Advoware appointment frNr: {frnr}", context=context)
except Exception as e:
logger.error(f"Failed to update Advoware appointment {frnr}: {e}")
log_operation('error', f"Failed to update Advoware appointment {frnr}: {e}", context=context)
raise
async def delete_advoware_appointment(advoware, frnr):
async def delete_advoware_appointment(advoware, frnr, context=None):
"""Delete Advoware appointment."""
try:
await advoware.api_call('api/v1/advonet/Termine', method='DELETE', params={'frnr': frnr})
logger.info(f"Deleted Advoware appointment frNr: {frnr}")
log_operation('info', f"Deleted Advoware appointment frNr: {frnr}", context=context)
except Exception as e:
logger.error(f"Failed to delete Advoware appointment {frnr}: {e}")
log_operation('error', f"Failed to delete Advoware appointment {frnr}: {e}", context=context)
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def create_google_event(service, calendar_id, data):
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def create_google_event(service, calendar_id, data, context=None):
"""Create Google event from standardized data."""
start_dt = data['start'].astimezone(BERLIN_TZ)
end_dt = data['end'].astimezone(BERLIN_TZ)
@@ -446,17 +449,17 @@ async def create_google_event(service, calendar_id, data):
try:
created = service.events().insert(calendarId=calendar_id, body=event_body).execute()
event_id = created['id']
logger.info(f"Created Google event ID: {event_id}")
log_operation('info', f"Created Google event ID: {event_id}", context=context)
return event_id
except HttpError as e:
logger.error(f"Google API error creating event: {e}")
log_operation('error', f"Google API error creating event: {e}", context=context)
raise
except Exception as e:
logger.error(f"Failed to create Google event: {e}")
log_operation('error', f"Failed to create Google event: {e}", context=context)
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def update_google_event(service, calendar_id, event_id, data):
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def update_google_event(service, calendar_id, event_id, data, context=None):
"""Update Google event."""
start_dt = data['start'].astimezone(BERLIN_TZ)
end_dt = data['end'].astimezone(BERLIN_TZ)
@@ -479,79 +482,79 @@ async def update_google_event(service, calendar_id, event_id, data):
}
try:
service.events().update(calendarId=calendar_id, eventId=event_id, body=event_body).execute()
logger.info(f"Updated Google event ID: {event_id}")
log_operation('info', f"Updated Google event ID: {event_id}", context=context)
except HttpError as e:
logger.error(f"Google API error updating event {event_id}: {e}")
log_operation('error', f"Google API error updating event {event_id}: {e}", context=context)
raise
except Exception as e:
logger.error(f"Failed to update Google event {event_id}: {e}")
log_operation('error', f"Failed to update Google event {event_id}: {e}", context=context)
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def delete_google_event(service, calendar_id, event_id):
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def delete_google_event(service, calendar_id, event_id, context=None):
"""Delete Google event."""
try:
service.events().delete(calendarId=calendar_id, eventId=event_id).execute()
logger.info(f"Deleted Google event ID: {event_id}")
log_operation('info', f"Deleted Google event ID: {event_id}", context=context)
except HttpError as e:
logger.error(f"Google API error deleting event {event_id}: {e}")
log_operation('error', f"Google API error deleting event {event_id}: {e}", context=context)
raise
except Exception as e:
logger.error(f"Failed to delete Google event {event_id}: {e}")
log_operation('error', f"Failed to delete Google event {event_id}: {e}", context=context)
raise
async def safe_create_advoware_appointment(advoware, data, employee_kuerzel, write_allowed):
async def safe_create_advoware_appointment(advoware, data, employee_kuerzel, write_allowed, context=None):
"""Safe wrapper for creating Advoware appointments with write permission check and global protection."""
if Config.ADVOWARE_WRITE_PROTECTION:
logger.warning("Global write protection active, skipping Advoware create")
log_operation('warning', "Global write protection active, skipping Advoware create", context=context)
return None
if not write_allowed:
logger.warning("Cannot create in Advoware, write not allowed")
log_operation('warning', "Cannot create in Advoware, write not allowed", context=context)
return None
return await create_advoware_appointment(advoware, data, employee_kuerzel)
return await create_advoware_appointment(advoware, data, employee_kuerzel, context)
async def safe_delete_advoware_appointment(advoware, frnr, write_allowed):
async def safe_delete_advoware_appointment(advoware, frnr, write_allowed, context=None):
"""Safe wrapper for deleting Advoware appointments with write permission check and global protection."""
if Config.ADVOWARE_WRITE_PROTECTION:
logger.warning("Global write protection active, skipping Advoware delete")
log_operation('warning', "Global write protection active, skipping Advoware delete", context=context)
return
if not write_allowed:
logger.warning("Cannot delete in Advoware, write not allowed")
log_operation('warning', "Cannot delete in Advoware, write not allowed", context=context)
return
await delete_advoware_appointment(advoware, frnr)
await delete_advoware_appointment(advoware, frnr, context)
async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, employee_kuerzel):
async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, employee_kuerzel, context=None):
"""Safe wrapper for updating Advoware appointments with write permission check and global protection."""
if Config.ADVOWARE_WRITE_PROTECTION:
logger.warning("Global write protection active, skipping Advoware update")
log_operation('warning', "Global write protection active, skipping Advoware update", context=context)
return
if not write_allowed:
logger.warning("Cannot update in Advoware, write not allowed")
log_operation('warning', "Cannot update in Advoware, write not allowed", context=context)
return
await update_advoware_appointment(advoware, frnr, data, employee_kuerzel)
await update_advoware_appointment(advoware, frnr, data, employee_kuerzel, context)
async def safe_advoware_operation(operation, write_allowed, *args, **kwargs):
async def safe_advoware_operation(operation, write_allowed, context=None, *args, **kwargs):
"""Generic safe wrapper for Advoware operations with write permission check."""
if Config.ADVOWARE_WRITE_PROTECTION:
log_operation('warning', "Global write protection active, skipping Advoware operation")
log_operation('warning', "Global write protection active, skipping Advoware operation", context=context)
return None
if not write_allowed:
log_operation('warning', "Cannot perform operation in Advoware, write not allowed")
log_operation('warning', "Cannot perform operation in Advoware, write not allowed", context=context)
return None
return await operation(*args, **kwargs)
async def get_advoware_employees(advoware):
async def get_advoware_employees(advoware, context=None):
"""Fetch list of employees from Advoware."""
try:
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
employees = result if isinstance(result, list) else []
logger.info(f"Fetched {len(employees)} Advoware employees")
log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
return employees
except Exception as e:
logger.error(f"Failed to fetch Advoware employees: {e}")
log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context)
raise
async def get_advoware_timestamp(advoware, frnr):
async def get_advoware_timestamp(advoware, frnr, context=None):
"""Fetch the last modified timestamp for an Advoware appointment."""
try:
result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params={'frnr': frnr})
@@ -562,7 +565,7 @@ async def get_advoware_timestamp(advoware, frnr):
return BERLIN_TZ.localize(datetime.datetime.fromisoformat(timestamp_str))
return None
except Exception as e:
logger.error(f"Failed to fetch timestamp for Advoware frNr {frnr}: {e}")
log_operation('error', f"Failed to fetch timestamp for Advoware frNr {frnr}: {e}", context=context)
return None
def get_timestamps(adv_data, google_data):
@@ -587,7 +590,7 @@ async def process_new_from_advoware(state, conn, service, calendar_id, kuerzel,
for frnr, app in state['adv_map'].items():
if frnr not in state['db_adv_index']:
try:
event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware'))
event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware', context), context)
async with conn.transaction():
await conn.execute(
"""
@@ -613,7 +616,7 @@ async def process_new_from_google(state, conn, service, calendar_id, kuerzel, ad
if not is_already_synced:
try:
frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google'), kuerzel, True)
frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google', context), kuerzel, True, context)
if frnr and str(frnr) != 'None':
async with conn.transaction():
await conn.execute(
@@ -664,7 +667,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
if row['source_system'] == 'advoware':
# Propagate delete to Google
try:
await delete_google_event(service, calendar_id, event_id)
await delete_google_event(service, calendar_id, event_id, context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context)
@@ -676,7 +679,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
elif row['source_system'] == 'google' and row['advoware_write_allowed']:
# Recreate in Advoware
try:
new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(state['google_map'][event_id], 'google'), kuerzel, row['advoware_write_allowed'])
new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(state['google_map'][event_id], 'google', context), kuerzel, row['advoware_write_allowed'], context)
if new_frnr and str(new_frnr) != 'None':
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ))
@@ -693,7 +696,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
else:
# For other cases, propagate delete to Google
try:
await delete_google_event(service, calendar_id, event_id)
await delete_google_event(service, calendar_id, event_id, context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context)
@@ -705,7 +708,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
else:
# Propagate delete to Google
try:
await delete_google_event(service, calendar_id, event_id)
await delete_google_event(service, calendar_id, event_id, context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context)
@@ -722,7 +725,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
# Delete in Advoware
if row['advoware_write_allowed']:
try:
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'])
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'], context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}", context=context)
@@ -737,7 +740,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
elif row['source_system'] == 'advoware':
# Recreate in Google
try:
new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware'))
new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware', context), context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}", context=context)
@@ -750,7 +753,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
else:
# last_change_wins or other, propagate delete to Advoware
try:
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'])
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'], context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}", context=context)
@@ -794,8 +797,8 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
processed_master_events.add(master_event_id)
if adv_data and google_data:
adv_std = standardize_appointment_data(adv_data, 'advoware')
google_std = standardize_appointment_data(google_data, 'google')
adv_std = standardize_appointment_data(adv_data, 'advoware', context)
google_std = standardize_appointment_data(google_data, 'google', context)
strategy = row['sync_strategy']
try:
if strategy == 'source_system_wins':
@@ -805,7 +808,7 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
google_ts_str = google_data.get('updated', '')
google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None
if adv_ts > row['last_sync']:
await update_google_event(service, calendar_id, event_id, adv_std)
await update_google_event(service, calendar_id, event_id, adv_std, context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}", context=context)
@@ -813,7 +816,7 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
await asyncio.sleep(0.1) # Small delay to avoid rate limits
elif google_ts and google_ts > row['last_sync']:
log_operation('warning', f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}", context=context)
await update_google_event(service, calendar_id, event_id, adv_std)
await update_google_event(service, calendar_id, event_id, adv_std, context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}", context=context)
@@ -825,25 +828,25 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm']))
log_operation('debug', f"Phase 4: Checking sync_id {row['sync_id']}: adv_ts={adv_ts}, google_ts={google_ts}, last_sync={row['last_sync']}", context=context)
if google_ts and google_ts > row['last_sync']:
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'])
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}", context=context)
elif adv_ts > row['last_sync']:
log_operation('warning', f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}", context=context)
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'])
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}", context=context)
elif strategy == 'last_change_wins':
adv_ts = await get_advoware_timestamp(advoware, frnr)
adv_ts = await get_advoware_timestamp(advoware, frnr, context)
google_ts_str = google_data.get('updated', '')
google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None
if adv_ts and google_ts:
if adv_ts > google_ts:
await update_google_event(service, calendar_id, event_id, adv_std)
await update_google_event(service, calendar_id, event_id, adv_std, context)
elif row['advoware_write_allowed']:
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'])
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts))
log_operation('info', f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}", context=context)
@@ -878,11 +881,11 @@ async def handler(event_data, context):
advoware = AdvowareAPI(context)
log_operation('debug', "Initializing Google service", context=context)
service = await get_google_service()
service = await get_google_service(context)
log_operation('debug', f"Ensuring Google calendar for {kuerzel}", context=context)
calendar_id = await ensure_google_calendar(service, kuerzel)
calendar_id = await ensure_google_calendar(service, kuerzel, context)
conn = await connect_db()
conn = await connect_db(context)
try:
# Initialize state
state = {
@@ -920,9 +923,9 @@ async def handler(event_data, context):
async def reload_api_maps():
"""Reload API maps after creating new events in phases."""
state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel)
state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel, context)
state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')}
state['google_events'] = await fetch_google_events(service, calendar_id)
state['google_events'] = await fetch_google_events(service, calendar_id, context)
state['google_map'] = {evt['id']: evt for evt in state['google_events']}
log_operation('debug', "Reloaded API maps", context=context, adv=len(state['adv_map']), google=len(state['google_map']))