Refaktorierung Calendar Sync: Event-driven Design, Fixes für mehrtägige Termine, Logging und Locking

- Refaktorierung zu event-driven Ansatz ohne PostgreSQL Hub
- Fixes für mehrtägige Termine: korrekte Verwendung von datumBis, Entfernung 24h-Limit
- Per-Employee Locking mit Redis
- Logging via context.logger für Motia Workbench
- Neue Schritte: calendar_sync_all_step.py, calendar_sync_cron_step.py
- Aktualisiertes README.md mit aktueller Architektur
- Workbench-Gruppierung: advoware-calendar-sync
This commit is contained in:
root
2025-10-24 19:13:41 +00:00
parent f4490f21cb
commit 9d40f47e19
7 changed files with 258 additions and 361 deletions

View File

@@ -44,8 +44,8 @@ def log_operation(level, message, context=None, **context_vars):
context.logger.warning(full_message)
elif level == 'error':
context.logger.error(full_message)
elif level == 'debug':
context.logger.debug(full_message)
# elif level == 'debug':
# context.logger.debug(full_message)dddd
else:
if level == 'info':
logger.info(full_message)
@@ -211,12 +211,17 @@ def parse_times(data, source):
start_time = data.get('uhrzeitVon') or '09:00:00'
start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{start_str}T{start_time}"))
end_date_str = data.get('datum', '')
# Use datumBis for end date if available, otherwise datum
end_date_str = data.get('datumBis', data.get('datum', ''))
if 'T' in end_date_str:
base_end_date = end_date_str.split('T')[0]
else:
base_end_date = end_date_str
end_time = data.get('uhrzeitBis', '10:00:00')
# Special handling: if end_time is '00:00:00' and it's multi-day, interpret as end of day
start_date_str = data.get('datum', '').split('T')[0] if 'T' in data.get('datum', '') else data.get('datum', '')
if end_time == '00:00:00' and base_end_date != start_date_str:
end_time = '23:59:59'
try:
end_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{base_end_date}T{end_time}"))
except ValueError:
@@ -286,7 +291,7 @@ def standardize_appointment_data(data, source, context=None):
adjusted_start, adjusted_end, vorbereitung_td, hinfahrt_td, rueckfahrt_td = adjust_times(start_dt, end_dt, data)
if Config.CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS:
text = 'Advoware blocked'
text = f'Advoware (frNr: {data.get("frNr", "unknown")})'
ort = ''
original_notiz = ''
else:
@@ -311,15 +316,9 @@ def standardize_appointment_data(data, source, context=None):
return_end = adjusted_end
time_breakdown.append(f"{return_start.strftime('%H:%M')}-{return_end.strftime('%H:%M')} Rückfahrt")
notiz = build_notiz(original_notiz, time_breakdown, duration_capped)
notiz = build_notiz(original_notiz, time_breakdown, False) # No duration capping
start_dt, end_dt = adjusted_start, adjusted_end
duration = end_dt - start_dt
max_duration = timedelta(hours=24)
if duration > max_duration:
end_dt = start_dt + max_duration
duration_capped = True
recurrence = None
if data.get('dauertermin', 0) == 1:
turnus = data.get('turnus', 1)
@@ -850,7 +849,6 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts))
log_operation('info', f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}", context=context)
await asyncio.sleep(0.1) # Small delay to avoid rate limits
except Exception as e:
log_operation('warning', f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}", context=context)
async with conn.transaction():