"""
Calendar Sync Event Step

Main calendar synchronization logic between Google Calendar and Advoware.
Handles bidirectional sync with 4-phase approach:
1. New appointments from Advoware → Google
2. New events from Google → Advoware
3. Deleted entries (propagate or soft-delete)
4. Updates to existing entries

Uses PostgreSQL as sync state hub and Redis for rate limiting.
"""

import asyncio
import os
import datetime
from datetime import timedelta
from typing import Dict, Any
import pytz
import backoff
import time
import random
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.oauth2 import service_account
import redis

import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from calendar_sync_utils import (
    connect_db,
    get_google_service,
    log_operation,
    get_redis_client,
    clear_employee_lock
)

from motia import queue, FlowContext
from services.advoware_service import AdvowareService

# Timezone for all operations
BERLIN_TZ = pytz.timezone('Europe/Berlin')

# Constants for date ranges.
# NOTE: these are evaluated once at import time, so a long-running process
# keeps the year it was started in.
now = datetime.datetime.now(BERLIN_TZ)
current_year = now.year
FETCH_FROM = f"{current_year - 1}-01-01T00:00:00"  # Start of previous year
FETCH_TO = f"{current_year + 9}-12-31T23:59:59"  # End of 9 years ahead

# Rate limiting constants (Google Calendar API: 600/min).
# The bucket below refills at 7 tokens per second, i.e. 420 requests/min.
RATE_LIMIT_KEY = 'google_calendar_api_tokens'
MAX_TOKENS = 7
REFILL_RATE_PER_MS = 7 / 1000  # 7 tokens per 1000ms
MIN_WAIT = 0.1  # 100ms minimum wait
JITTER_MAX = 0.1  # Random jitter 0-100ms

# Token-bucket script executed atomically inside Redis.
# Returns {1, 0} when a token was taken, otherwise {0, wait_ms}.
# The wait is returned in WHOLE MILLISECONDS on purpose: Redis converts Lua
# numbers to integer replies and drops the fractional part, so returning
# fractional seconds would always arrive as 0 on the Python side.
_RATE_LIMIT_LUA = """
local key = KEYS[1]
local current_time_ms = tonumber(ARGV[1])
local max_tokens = tonumber(ARGV[2])
local refill_rate_per_ms = tonumber(ARGV[3])
local min_wait_ms = tonumber(ARGV[4])

local data = redis.call('HMGET', key, 'tokens', 'last_refill_ms')
local tokens = tonumber(data[1]) or max_tokens
local last_refill_ms = tonumber(data[2]) or current_time_ms

-- Refill tokens based on elapsed time (never negative, clocks of
-- different clients may be slightly out of sync)
local elapsed_ms = math.max(0, current_time_ms - last_refill_ms)
local added_tokens = elapsed_ms * refill_rate_per_ms
local new_tokens = math.min(max_tokens, tokens + added_tokens)

local wait_ms = 0
if new_tokens < 1 then
    wait_ms = math.ceil((1 - new_tokens) / refill_rate_per_ms)
else
    new_tokens = new_tokens - 1
end

if wait_ms == 0 then
    redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill_ms', current_time_ms)
    redis.call('EXPIRE', key, 120)
    return {1, 0}
else
    return {0, math.max(min_wait_ms, wait_ms)}
end
"""


async def enforce_global_rate_limit(context=None):
    """Block until a token is available in the shared Redis token bucket.

    A fresh Redis connection is opened from the ``REDIS_HOST``,
    ``REDIS_PORT``, ``REDIS_DB_CALENDAR_SYNC`` and ``REDIS_TIMEOUT_SECONDS``
    environment variables and closed again before returning.

    The limiter is best-effort: any error (Redis unreachable, script failure)
    is logged and the caller proceeds without rate limiting.

    Args:
        context: Optional logging context forwarded to ``log_operation``.
    """
    # NOTE: the redis client used here is synchronous; each script call
    # blocks the event loop for at most the configured socket timeout.
    redis_client = redis.Redis(
        host=os.getenv('REDIS_HOST', 'localhost'),
        port=int(os.getenv('REDIS_PORT', '6379')),
        db=int(os.getenv('REDIS_DB_CALENDAR_SYNC', '2')),
        socket_timeout=int(os.getenv('REDIS_TIMEOUT_SECONDS', '5'))
    )

    try:
        script = redis_client.register_script(_RATE_LIMIT_LUA)

        while True:
            current_time_ms = int(time.time() * 1000)
            result = script(
                keys=[RATE_LIMIT_KEY],
                args=[current_time_ms, MAX_TOKENS, REFILL_RATE_PER_MS, int(MIN_WAIT * 1000)]
            )
            acquired, wait_ms = int(result[0]), int(result[1])

            if acquired:
                log_operation('debug', "Rate limit acquired successfully", context=context)
                return

            # Convert to seconds and add jitter so that concurrent workers
            # do not all retry at the same instant.
            wait_time = wait_ms / 1000.0 + random.uniform(0, JITTER_MAX)
            log_operation('debug', f"Rate limit: waiting {wait_time:.2f}s before retry", context=context)
            await asyncio.sleep(wait_time)

    except Exception as e:
        log_operation('error', f"Rate limiting failed: {e}. Proceeding without limit.", context=context)
    finally:
        # Always close Redis connection to prevent resource leaks
        try:
            redis_client.close()
        except Exception as close_error:
            log_operation('debug', f"Closing Redis rate limit connection failed: {close_error}", context=context)
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3,
                      giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def ensure_google_calendar(service, employee_kuerzel: str, context=None):
    """Ensure the employee's Google calendar exists and has the owner ACL.

    Looks for a calendar whose summary is ``AW-<employee_kuerzel>`` in the
    service account's calendar list, creates it (time zone Europe/Berlin) if
    it is missing, and makes sure the shared owner account has an ``owner``
    ACL rule on it.

    Args:
        service: Google Calendar API client.
        employee_kuerzel: Employee short code used to derive the calendar name.
        context: Optional logging context forwarded to ``log_operation``.

    Returns:
        The Google calendar ID.

    Raises:
        HttpError: On Google API errors (retried by the decorator for
            403/429/5xx, then re-raised).
        Exception: Any other failure is logged and re-raised.
    """
    calendar_name = f"AW-{employee_kuerzel}"
    owner_email = 'lehmannundpartner@gmail.com'

    try:
        # Walk the calendar list page by page; every request takes its own
        # rate-limit token, and we stop as soon as the calendar is found.
        calendar_id = None
        page_token = None
        while True:
            await enforce_global_rate_limit(context)
            calendar_list = service.calendarList().list(pageToken=page_token, maxResults=250).execute()
            calendar_id = next(
                (
                    calendar['id']
                    for calendar in calendar_list.get('items', [])
                    if calendar.get('summary') == calendar_name
                ),
                None,
            )
            page_token = calendar_list.get('nextPageToken')
            if calendar_id or not page_token:
                break

        if not calendar_id:
            # Create new calendar
            await enforce_global_rate_limit(context)
            calendar_body = {
                'summary': calendar_name,
                'timeZone': 'Europe/Berlin'
            }
            created = service.calendars().insert(body=calendar_body).execute()
            calendar_id = created['id']
            log_operation('info', f"Created new Google calendar {calendar_name} with ID {calendar_id}", context=context)

        # Check and add ACL if needed
        await enforce_global_rate_limit(context)
        acl_list = service.acl().list(calendarId=calendar_id).execute()
        acl_exists = any(
            rule.get('scope', {}).get('type') == 'user'
            and rule.get('scope', {}).get('value') == owner_email
            and rule.get('role') == 'owner'
            for rule in acl_list.get('items', [])
        )

        if not acl_exists:
            await enforce_global_rate_limit(context)
            acl_rule = {
                'scope': {'type': 'user', 'value': owner_email},
                'role': 'owner'
            }
            service.acl().insert(calendarId=calendar_id, body=acl_rule).execute()
            log_operation('info', f"Added ACL rule for calendar {calendar_name} (ID: {calendar_id})", context=context)

        return calendar_id
    except HttpError as e:
        log_operation('error', f"Google API error for calendar {employee_kuerzel}: {e}", context=context)
        raise
    except Exception as e:
        log_operation('error', f"Failed to ensure Google calendar for {employee_kuerzel}: {e}", context=context)
        raise
async def fetch_advoware_appointments(advoware, employee_kuerzel: str, context=None):
    """Fetch the employee's Advoware appointments between FETCH_FROM and FETCH_TO.

    Args:
        advoware: Advoware API client exposing an awaitable ``api_call``.
        employee_kuerzel: Employee short code passed as ``kuerzel``.
        context: Optional logging context forwarded to ``log_operation``.

    Returns:
        The list returned by the API, or an empty list when the response is
        not a list.

    Raises:
        Exception: Any API failure is logged and re-raised.
    """
    try:
        params = {
            'kuerzel': employee_kuerzel,
            'from': FETCH_FROM,
            'to': FETCH_TO
        }
        result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params=params)
        if isinstance(result, list):
            appointments = result
        else:
            appointments = []
            if result:
                # Make an unexpected payload visible instead of silently
                # treating it as "no appointments".
                log_operation(
                    'warning',
                    f"Unexpected Advoware response type {type(result).__name__} for {employee_kuerzel}, treating as empty",
                    context=context,
                )
        log_operation('info', f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}", context=context)
        return appointments
    except Exception as e:
        log_operation('error', f"Failed to fetch Advoware appointments: {e}", context=context)
        raise


# 403 is retried like in the other Google helpers of this module: the
# Calendar API reports rate-limit violations as 403 as well as 429.
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3,
                      giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def fetch_google_events(service, calendar_id: str, context=None):
    """Fetch all non-cancelled Google events of a calendar.

    Recurring events are expanded into single instances
    (``singleEvents=True``). The queried window runs from the start of
    ``current_year - 2`` to the end of ``current_year + 10``, i.e. one year
    wider on each side than the Advoware fetch window.

    Args:
        service: Google Calendar API client.
        calendar_id: ID of the calendar to read.
        context: Optional logging context forwarded to ``log_operation``.

    Returns:
        List of event resources whose ``status`` is not ``'cancelled'``.

    Raises:
        HttpError: On Google API errors (retried by the decorator for
            403/429/5xx, then re-raised).
        Exception: Any other failure is logged and re-raised.
    """
    try:
        time_min = f"{current_year - 2}-01-01T00:00:00Z"
        time_max = f"{current_year + 10}-12-31T23:59:59Z"

        all_events = []
        page_token = None
        while True:
            # One token per page request
            await enforce_global_rate_limit(context)
            events_result = service.events().list(
                calendarId=calendar_id,
                timeMin=time_min,
                timeMax=time_max,
                singleEvents=True,
                orderBy='startTime',
                pageToken=page_token,
                maxResults=2500
            ).execute()
            all_events.extend(events_result.get('items', []))
            page_token = events_result.get('nextPageToken')
            if not page_token:
                break

        events = [evt for evt in all_events if evt.get('status') != 'cancelled']
        log_operation('info', f"Fetched {len(all_events)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}", context=context)
        return events
    except HttpError as e:
        log_operation('error', f"Google API error fetching events: {e}", context=context)
        raise
    except Exception as e:
        log_operation('error', f"Failed to fetch Google events: {e}", context=context)
        raise
def generate_rrule(turnus, turnus_art, datum_bis, context=None):
    """Generate an RRULE string from Advoware ``turnus`` / ``turnusArt``.

    Args:
        turnus: Repeat interval. Values that are missing, non-numeric or
            smaller than 1 are emitted as ``INTERVAL=1`` because an RRULE
            interval must be a positive integer.
        turnus_art: Frequency code (1 daily, 2 weekly, 3 monthly, 4 yearly).
        datum_bis: ISO date or date-time string marking the end of the series.
        context: Optional logging context forwarded to ``log_operation``.

    Returns:
        ``"RRULE:FREQ=...;INTERVAL=...;UNTIL=YYYYMMDD"``, or ``None`` when the
        frequency code is unknown or ``datum_bis`` cannot be parsed. The end
        date is capped at 730 days from now.
    """
    freq_map = {
        1: 'DAILY',
        2: 'WEEKLY',
        3: 'MONTHLY',
        4: 'YEARLY'
    }
    if turnus_art not in freq_map:
        return None
    freq = freq_map[turnus_art]

    # Parse datum_bis; TypeError covers None / non-string input
    try:
        if 'T' in datum_bis:
            bis_dt = datetime.datetime.fromisoformat(datum_bis.replace('Z', ''))
        else:
            bis_dt = datetime.datetime.fromisoformat(datum_bis + 'T00:00:00')
    except (TypeError, ValueError):
        log_operation('warning', f"Invalid datum_bis: {datum_bis}, skipping recurrence", context=context)
        return None

    # Only the calendar date is used; dropping a UTC offset keeps the
    # comparison with the naive "now" below from raising.
    bis_dt = bis_dt.replace(tzinfo=None)

    # Limit to max 2 years from now
    max_until = datetime.datetime.now() + timedelta(days=730)
    if bis_dt > max_until:
        bis_dt = max_until
        log_operation('info', f"Limited recurrence until date to {bis_dt.date()}", context=context)
    until_date = bis_dt.strftime('%Y%m%d')

    try:
        interval = max(1, int(turnus))
    except (TypeError, ValueError):
        interval = 1

    return f"RRULE:FREQ={freq};INTERVAL={interval};UNTIL={until_date}"


def _parse_google_event_time(point):
    """Convert a Google ``start``/``end`` object to an aware Berlin datetime."""
    if 'dateTime' in point:
        raw = point['dateTime']
        if raw.endswith('Z'):
            # Keep the UTC marker in a form fromisoformat accepts on every
            # Python version; stripping it would make the value naive and
            # astimezone() would then assume the system's local time zone.
            raw = raw[:-1] + '+00:00'
        return datetime.datetime.fromisoformat(raw).astimezone(BERLIN_TZ)
    # All-day events only carry a date
    return BERLIN_TZ.localize(datetime.datetime.fromisoformat(point['date']))


def parse_times(data, source):
    """Parse start and end times from an Advoware appointment or Google event.

    Args:
        data: Raw appointment (Advoware) or event resource (Google).
        source: ``'advoware'`` or ``'google'``.

    Returns:
        Tuple ``(start_dt, end_dt)`` of timezone-aware datetimes in
        Europe/Berlin.

    Raises:
        ValueError: If ``source`` is unknown or the start cannot be parsed.
    """
    if source == 'advoware':
        start_str = data.get('datum') or ''
        if 'T' in start_str:
            start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(start_str.replace('Z', '')))
        else:
            start_time = data.get('uhrzeitVon') or '09:00:00'
            start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{start_str}T{start_time}"))

        # Check if recurring event
        is_recurring = (
            data.get('dauertermin', 0) == 1
            and ((data.get('turnus') or 0) > 0 or (data.get('turnusArt') or 0) > 0)
        )

        # For a recurring series datumBis marks the end of the series, so the
        # single occurrence ends on its start date.
        if is_recurring:
            end_date_str = start_str
        else:
            end_date_str = data.get('datumBis') or start_str

        base_end_date = end_date_str.split('T')[0]
        start_date_str = start_str.split('T')[0]

        end_time = data.get('uhrzeitBis', '10:00:00')
        # A midnight end on a later day is moved to the end of that day
        if end_time == '00:00:00' and base_end_date != start_date_str:
            end_time = '23:59:59'

        try:
            end_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{base_end_date}T{end_time}"))
        except ValueError:
            # Unparseable end: fall back to a one hour appointment
            end_dt = start_dt + timedelta(hours=1)
    elif source == 'google':
        start_dt = _parse_google_event_time(data.get('start', {}))
        end_dt = _parse_google_event_time(data.get('end', {}))
    else:
        raise ValueError(f"Unknown appointment source: {source!r}")

    return start_dt, end_dt
def _parse_hms_duration(value):
    """Parse an ``HH:MM:SS`` string into a timedelta; zero if malformed or missing."""
    try:
        hours, minutes, seconds = map(int, value.split(':'))
        return timedelta(hours=hours, minutes=minutes, seconds=seconds)
    except (AttributeError, TypeError, ValueError, OverflowError):
        return timedelta(0)


def adjust_times(start_dt, end_dt, data):
    """Extend an appointment by preparation and travel time.

    Reads ``vorbereitungsDauer`` and ``fahrzeit`` (``HH:MM:SS`` strings) and
    ``fahrtAnzeigen`` from ``data``. ``fahrtAnzeigen`` selects where the
    travel time is added: 1 before the appointment, 2 after it, 3 both.
    Preparation time is always placed before the appointment.

    Args:
        start_dt: Appointment start.
        end_dt: Appointment end.
        data: Raw Advoware appointment dict.

    Returns:
        Tuple ``(adjusted_start, adjusted_end, vorbereitung_td, hinfahrt_td,
        rueckfahrt_td)``.
    """
    vorbereitung_td = _parse_hms_duration(data.get('vorbereitungsDauer', '00:00:00'))
    fahrt_td = _parse_hms_duration(data.get('fahrzeit', '00:00:00'))
    fahrt_anzeigen = data.get('fahrtAnzeigen', 0)

    no_travel = timedelta(0)
    hinfahrt_td = fahrt_td if fahrt_anzeigen in (1, 3) else no_travel
    rueckfahrt_td = fahrt_td if fahrt_anzeigen in (2, 3) else no_travel

    adjusted_start = start_dt - vorbereitung_td - hinfahrt_td
    adjusted_end = end_dt + rueckfahrt_td

    return adjusted_start, adjusted_end, vorbereitung_td, hinfahrt_td, rueckfahrt_td


def build_notiz(original_notiz, time_breakdown, duration_capped):
    """Build the event description string.

    Args:
        original_notiz: Original note text; ``None`` or blank text is omitted.
        time_breakdown: Lines describing the time slices of the appointment.
        duration_capped: When true, a hint about the 24 hour cap is appended.

    Returns:
        The note, the heading ``"Zeitaufteilung:"``, the breakdown lines and
        the optional hint, joined by newlines.
    """
    notiz_parts = []
    stripped_notiz = (original_notiz or '').strip()
    if stripped_notiz:
        notiz_parts.append(stripped_notiz)
    notiz_parts.append("Zeitaufteilung:")
    notiz_parts.extend(time_breakdown)
    if duration_capped:
        notiz_parts.append("\nHinweis: Ereignisdauer wurde auf 24 Stunden begrenzt")
    return "\n".join(notiz_parts)
def standardize_appointment_data(data, source, context=None):
    """Standardize an Advoware appointment or Google event to a comparable dict.

    For Advoware data the start/end are widened by preparation and travel
    time, and a time breakdown is written into the note. When the
    ``CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS`` environment variable is
    ``'true'`` (the default), text, location and note are replaced by a
    neutral ``Advoware (frNr: ...)`` title.

    Args:
        data: Raw appointment (Advoware) or event resource (Google).
        source: ``'advoware'`` or ``'google'``.
        context: Optional logging context forwarded to helpers.

    Returns:
        Dict with the keys ``start``, ``end``, ``text``, ``notiz``, ``ort``,
        ``dauertermin``, ``turnus``, ``turnusArt`` and ``recurrence``.
    """
    start_dt, end_dt = parse_times(data, source)

    if source == 'advoware':
        adjusted_start, adjusted_end, vorbereitung_td, hinfahrt_td, rueckfahrt_td = adjust_times(start_dt, end_dt, data)

        anonymize = os.getenv('CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS', 'true').lower() == 'true'
        if anonymize:
            text = f'Advoware (frNr: {data.get("frNr", "unknown")})'
            ort = ''
            original_notiz = ''
        else:
            # A key that is present with a null value would otherwise pass
            # None on to the note builder and into the Google event body.
            text = data.get('text') or ''
            ort = data.get('ort') or ''
            original_notiz = data.get('notiz') or ''

        def _slot(begin, finish, label):
            return f"{begin.strftime('%H:%M')}-{finish.strftime('%H:%M')} {label}"

        # Boundaries of the individual time slices inside the widened event
        prep_end = adjusted_start + vorbereitung_td
        appt_start = prep_end + hinfahrt_td
        appt_end = adjusted_end - rueckfahrt_td

        time_breakdown = []
        if vorbereitung_td.total_seconds() > 0:
            time_breakdown.append(_slot(adjusted_start, prep_end, 'Vorbereitung'))
        if hinfahrt_td.total_seconds() > 0:
            time_breakdown.append(_slot(prep_end, appt_start, 'Hinfahrt'))
        time_breakdown.append(_slot(appt_start, appt_end, 'Termin'))
        if rueckfahrt_td.total_seconds() > 0:
            time_breakdown.append(_slot(appt_end, adjusted_end, 'Rückfahrt'))

        notiz = build_notiz(original_notiz, time_breakdown, False)

        # A recurrence rule is only built for dauertermin entries that carry
        # an end date; generate_rrule returns None for unknown frequencies.
        recurrence = None
        if data.get('dauertermin', 0) == 1:
            turnus = data.get('turnus', 1)
            turnus_art = data.get('turnusArt', 1)
            datum_bis = data.get('datumBis', '')
            if datum_bis:
                rrule = generate_rrule(turnus, turnus_art, datum_bis, context)
                if rrule:
                    recurrence = [rrule]

        return {
            'start': adjusted_start,
            'end': adjusted_end,
            'text': text,
            'notiz': notiz,
            'ort': ort,
            'dauertermin': data.get('dauertermin', 0),
            'turnus': data.get('turnus', 0),
            'turnusArt': data.get('turnusArt', 0),
            'recurrence': recurrence
        }
    elif source == 'google':
        # All-day events (date-only start) and events spanning more than one
        # day are flagged as dauertermin.
        duration_days = (end_dt.date() - start_dt.date()).days
        dauertermin = 1 if data.get('start', {}).get('date') or duration_days > 1 else 0

        recurrence = data.get('recurrence')
        turnus = 1 if recurrence else 0
        turnus_art = 0

        return {
            'start': start_dt,
            'end': end_dt,
            'text': data.get('summary', ''),
            'notiz': data.get('description', ''),
            'ort': data.get('location', ''),
            'dauertermin': dauertermin,
            'turnus': turnus,
            'turnusArt': turnus_art,
            'recurrence': recurrence
        }
def _build_advoware_payload(data, employee_kuerzel: str):
    """Build the request body shared by Advoware create and update calls."""
    start_dt = data['start'].astimezone(BERLIN_TZ)
    end_dt = data['end'].astimezone(BERLIN_TZ)
    return {
        'text': data['text'],
        'notiz': data['notiz'],
        'ort': data['ort'],
        'datum': start_dt.strftime('%Y-%m-%dT%H:%M:%S'),
        'uhrzeitBis': end_dt.strftime('%H:%M:%S'),
        'datumBis': end_dt.strftime('%Y-%m-%dT%H:%M:%S'),
        'anwalt': employee_kuerzel,
        'vorbereitungsDauer': '00:00:00',
        'dauertermin': data['dauertermin'],
        'turnus': data['turnus'],
        'turnusArt': data['turnusArt']
    }


async def create_advoware_appointment(advoware, data, employee_kuerzel: str, context=None):
    """Create an Advoware appointment from standardized data.

    Args:
        advoware: Advoware API client exposing an awaitable ``api_call``.
        data: Standardized appointment dict (see
            ``standardize_appointment_data``).
        employee_kuerzel: Employee short code stored as ``anwalt``.
        context: Optional logging context forwarded to ``log_operation``.

    Returns:
        The new appointment's frNr as a string, or ``None`` when the response
        contains neither ``frNr`` nor ``frnr``.

    Raises:
        Exception: Any API failure is logged and re-raised.
    """
    appointment_data = _build_advoware_payload(data, employee_kuerzel)
    try:
        result = await advoware.api_call('api/v1/advonet/Termine', method='POST', json_data=appointment_data)
        raw_frnr = result.get('frNr') or result.get('frnr')
        if not raw_frnr:
            # Return a real None instead of the string 'None' so callers
            # cannot mistake it for an identifier.
            log_operation('warning', "Advoware create response did not contain a frNr", context=context)
            return None
        frnr = str(raw_frnr)
        log_operation('info', f"Created Advoware appointment frNr: {frnr}", context=context)
        return frnr
    except Exception as e:
        log_operation('error', f"Failed to create Advoware appointment: {e}", context=context)
        raise


async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel: str, context=None):
    """Update an existing Advoware appointment from standardized data.

    Args:
        advoware: Advoware API client exposing an awaitable ``api_call``.
        frnr: Identifier of the appointment; sent as integer ``frNr``.
        data: Standardized appointment dict.
        employee_kuerzel: Employee short code stored as ``anwalt``.
        context: Optional logging context forwarded to ``log_operation``.

    Raises:
        Exception: Any API failure is logged and re-raised.
    """
    appointment_data = {'frNr': int(frnr), **_build_advoware_payload(data, employee_kuerzel)}
    try:
        await advoware.api_call('api/v1/advonet/Termine', method='PUT', json_data=appointment_data)
        log_operation('info', f"Updated Advoware appointment frNr: {frnr}", context=context)
    except Exception as e:
        log_operation('error', f"Failed to update Advoware appointment {frnr}: {e}", context=context)
        raise
async def delete_advoware_appointment(advoware, frnr, context=None):
    """Remove the Advoware appointment identified by ``frnr``.

    Failures are logged and re-raised to the caller.
    """
    query = {'frnr': frnr}
    try:
        await advoware.api_call('api/v1/advonet/Termine', method='DELETE', params=query)
        log_operation('info', f"Deleted Advoware appointment frNr: {frnr}", context=context)
    except Exception as exc:
        log_operation('error', f"Failed to delete Advoware appointment {frnr}: {exc}", context=context)
        raise


@backoff.on_exception(
    backoff.expo,
    HttpError,
    max_tries=4,
    base=3,
    giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504],
)
async def create_google_event(service, calendar_id: str, data, context=None):
    """Insert a Google event built from a standardized appointment dict.

    A dauertermin whose start and end both fall on midnight (Berlin time) is
    written as a one-day all-day event; everything else becomes a timed event
    in Europe/Berlin. Returns the ID of the created event. Google API errors
    are retried by the decorator, then logged and re-raised.
    """
    await enforce_global_rate_limit(context)

    begins = data['start'].astimezone(BERLIN_TZ)
    ends = data['end'].astimezone(BERLIN_TZ)
    midnight = datetime.time(0, 0)

    if data['dauertermin'] == 1 and begins.time() == midnight and ends.time() == midnight:
        # All-day: Google expects an exclusive end date
        next_day = begins + timedelta(days=1)
        start_obj = {'date': begins.strftime('%Y-%m-%d')}
        end_obj = {'date': next_day.strftime('%Y-%m-%d')}
    else:
        zone_name = 'Europe/Berlin'
        start_obj = {'dateTime': begins.isoformat(), 'timeZone': zone_name}
        end_obj = {'dateTime': ends.isoformat(), 'timeZone': zone_name}

    event_body = {
        'summary': data['text'],
        'description': data['notiz'],
        'location': data['ort'],
        'start': start_obj,
        'end': end_obj,
        'recurrence': data['recurrence']
    }

    try:
        request = service.events().insert(calendarId=calendar_id, body=event_body)
        event_id = request.execute()['id']
        log_operation('info', f"Created Google event ID: {event_id}", context=context)
        return event_id
    except HttpError as api_error:
        log_operation('error', f"Google API error creating event: {api_error}", context=context)
        raise
    except Exception as exc:
        log_operation('error', f"Failed to create Google event: {exc}", context=context)
        raise
@backoff.on_exception(
    backoff.expo,
    HttpError,
    max_tries=4,
    base=3,
    giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504],
)
async def update_google_event(service, calendar_id: str, event_id: str, data, context=None):
    """Overwrite a Google event with the values of a standardized appointment.

    Uses the same all-day rule as event creation: a dauertermin starting and
    ending on midnight (Berlin time) becomes a one-day all-day event, anything
    else a timed event in Europe/Berlin. Google API errors are retried by the
    decorator, then logged and re-raised.
    """
    await enforce_global_rate_limit(context)

    begins = data['start'].astimezone(BERLIN_TZ)
    ends = data['end'].astimezone(BERLIN_TZ)
    midnight = datetime.time(0, 0)

    if data['dauertermin'] == 1 and begins.time() == midnight and ends.time() == midnight:
        # All-day: Google expects an exclusive end date
        next_day = begins + timedelta(days=1)
        start_obj = {'date': begins.strftime('%Y-%m-%d')}
        end_obj = {'date': next_day.strftime('%Y-%m-%d')}
    else:
        zone_name = 'Europe/Berlin'
        start_obj = {'dateTime': begins.isoformat(), 'timeZone': zone_name}
        end_obj = {'dateTime': ends.isoformat(), 'timeZone': zone_name}

    event_body = {
        'summary': data['text'],
        'description': data['notiz'],
        'location': data['ort'],
        'start': start_obj,
        'end': end_obj,
        'recurrence': data['recurrence']
    }

    try:
        request = service.events().update(calendarId=calendar_id, eventId=event_id, body=event_body)
        request.execute()
        log_operation('info', f"Updated Google event ID: {event_id}", context=context)
    except HttpError as api_error:
        log_operation('error', f"Google API error updating event {event_id}: {api_error}", context=context)
        raise
    except Exception as exc:
        log_operation('error', f"Failed to update Google event {event_id}: {exc}", context=context)
        raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3,
                      giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def delete_google_event(service, calendar_id: str, event_id: str, context=None):
    """Delete a Google event.

    A 404 or 410 response means the event is already gone; since that is the
    state the caller wants, it is logged and treated as success.

    Args:
        service: Google Calendar API client.
        calendar_id: Calendar containing the event.
        event_id: ID of the event to delete.
        context: Optional logging context forwarded to ``log_operation``.

    Raises:
        HttpError: On other Google API errors (retried by the decorator for
            403/429/5xx, then re-raised).
        Exception: Any other failure is logged and re-raised.
    """
    await enforce_global_rate_limit(context)

    try:
        service.events().delete(calendarId=calendar_id, eventId=event_id).execute()
        log_operation('info', f"Deleted Google event ID: {event_id}", context=context)
    except HttpError as e:
        if e.resp.status in (404, 410):
            log_operation('info', f"Google event {event_id} was already deleted (HTTP {e.resp.status})", context=context)
            return
        log_operation('error', f"Google API error deleting event {event_id}: {e}", context=context)
        raise
    except Exception as e:
        log_operation('error', f"Failed to delete Google event {event_id}: {e}", context=context)
        raise


def _advoware_write_blocked(action: str, write_allowed: bool, context=None) -> bool:
    """Return True (and log why) when an Advoware write must be skipped.

    Writes are blocked when the ``ADVOWARE_WRITE_PROTECTION`` environment
    variable is ``'true'`` (the default) or when ``write_allowed`` is false.
    """
    if os.getenv('ADVOWARE_WRITE_PROTECTION', 'true').lower() == 'true':
        log_operation('warning', f"Global write protection active, skipping Advoware {action}", context=context)
        return True
    if not write_allowed:
        log_operation('warning', f"Cannot {action} in Advoware, write not allowed", context=context)
        return True
    return False


async def safe_create_advoware_appointment(advoware, data, employee_kuerzel: str, write_allowed: bool, context=None):
    """Create an Advoware appointment unless writing is blocked.

    Returns:
        The value returned by ``create_advoware_appointment``, or ``None``
        when the write was skipped.
    """
    if _advoware_write_blocked('create', write_allowed, context):
        return None
    return await create_advoware_appointment(advoware, data, employee_kuerzel, context)


async def safe_delete_advoware_appointment(advoware, frnr, write_allowed: bool, context=None):
    """Delete an Advoware appointment unless writing is blocked.

    NOTE: a skipped delete returns silently, so callers cannot distinguish it
    from a successful one.
    """
    if _advoware_write_blocked('delete', write_allowed, context):
        return
    await delete_advoware_appointment(advoware, frnr, context)


async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed: bool, employee_kuerzel: str, context=None):
    """Update an Advoware appointment unless writing is blocked.

    NOTE: a skipped update returns silently, so callers cannot distinguish it
    from a successful one.
    """
    if _advoware_write_blocked('update', write_allowed, context):
        return
    await update_advoware_appointment(advoware, frnr, data, employee_kuerzel, context)
async def get_advoware_timestamp(advoware, frnr, context=None):
    """Fetch the last modified timestamp for an Advoware appointment.

    Args:
        advoware: Advoware API client exposing an awaitable ``api_call``.
        frnr: Identifier of the appointment.
        context: Optional logging context forwarded to ``log_operation``.

    Returns:
        ``zuletztGeaendertAm`` of the first returned appointment, localized
        to Europe/Berlin, or ``None`` when the appointment or the field is
        missing or any error occurs (errors are logged, not raised).
    """
    try:
        result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params={'frnr': frnr})
        if isinstance(result, list) and result:
            timestamp_str = result[0].get('zuletztGeaendertAm')
            if timestamp_str:
                return BERLIN_TZ.localize(datetime.datetime.fromisoformat(timestamp_str))
        return None
    except Exception as e:
        log_operation('error', f"Failed to fetch timestamp for Advoware frNr {frnr}: {e}", context=context)
        return None


async def process_new_from_advoware(state, conn, service, calendar_id: str, kuerzel: str, advoware, context=None):
    """Phase 1: Process new appointments from Advoware to Google.

    Every appointment in ``state['adv_map']`` whose frNr is not in
    ``state['db_adv_index']`` is created in Google and recorded in
    ``calendar_sync``. Failures are logged per appointment and do not stop
    the loop. If the Google event was created but the sync row could not be
    written, the event is deleted again; otherwise the next run would not
    find a sync row and would create a duplicate event.

    Args:
        state: Sync state; reads ``adv_map`` and ``db_adv_index`` and
            increments ``stats['new_adv_to_google']``.
        conn: Database connection providing ``transaction()`` and ``execute()``.
        service: Google Calendar API client.
        calendar_id: Target Google calendar.
        kuerzel: Employee short code stored with the sync row.
        advoware: Unused here; kept for a uniform phase signature.
        context: Optional logging context forwarded to ``log_operation``.
    """
    log_operation('info', "Phase 1: Processing new appointments from Advoware", context=context)
    for frnr, app in state['adv_map'].items():
        if frnr in state['db_adv_index']:
            continue

        event_id = None
        persisted = False
        try:
            event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware', context), context)
            async with conn.transaction():
                await conn.execute(
                    """
                    INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed)
                    VALUES ($1, $2, $3, 'advoware', 'source_system_wins', 'synced', FALSE);
                    """,
                    kuerzel, int(frnr), event_id
                )
            persisted = True
            log_operation('info', f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}", context=context)
            state['stats']['new_adv_to_google'] += 1
        except Exception as e:
            log_operation('warning', f"Phase 1: Failed to process new Advoware {frnr}: {e}", context=context)
            if event_id is not None and not persisted:
                # Roll back the orphaned Google event (best effort)
                try:
                    await delete_google_event(service, calendar_id, event_id, context)
                    log_operation('info', f"Phase 1: Removed unrecorded Google event {event_id} for frNr {frnr}", context=context)
                except Exception as cleanup_error:
                    log_operation('warning', f"Phase 1: Could not remove unrecorded Google event {event_id}: {cleanup_error}", context=context)
async def process_new_from_google(state, conn, service, calendar_id: str, kuerzel: str, advoware, context=None):
    """Phase 2: Process new events from Google to Advoware.

    Every event in ``state['google_map']`` that is not yet known in
    ``state['db_google_index']`` (neither by its own ID nor by its
    ``recurringEventId``) is created in Advoware and recorded in
    ``calendar_sync``. Events whose summary contains both ``Advoware`` and
    ``frNr`` are skipped so anonymized Advoware entries are not synced back.
    Failures are logged per event and do not stop the loop.

    Args:
        state: Sync state; reads ``google_map`` and ``db_google_index`` and
            increments ``stats['new_google_to_adv']``.
        conn: Database connection providing ``transaction()`` and ``execute()``.
        service: Unused here; kept for a uniform phase signature.
        calendar_id: Unused here; kept for a uniform phase signature.
        kuerzel: Employee short code.
        advoware: Advoware API client.
        context: Optional logging context forwarded to ``log_operation``.
    """
    log_operation('info', "Phase 2: Processing new events from Google", context=context)
    db_google_index = state['db_google_index']

    # NOTE(review): every instance of a not-yet-synced recurring series is
    # handled as its own event here, and the instance ID (not the master ID)
    # is stored. Confirm that this is intended.
    for event_id, evt in state['google_map'].items():
        # Check if already synced (master or instance)
        recurring_master_id = evt.get('recurringEventId')
        if event_id in db_google_index or (recurring_master_id and recurring_master_id in db_google_index):
            continue

        # Skip events that appear to be from Advoware
        summary = evt.get('summary', '')
        if 'Advoware' in summary and 'frNr' in summary:
            log_operation('warning', f"Skipping sync back to Advoware for Google event {event_id} (summary: {summary})", context=context)
            continue

        try:
            frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google', context), kuerzel, True, context)
            if frnr and str(frnr) != 'None':
                async with conn.transaction():
                    await conn.execute(
                        """
                        INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed)
                        VALUES ($1, $2, $3, 'google', 'source_system_wins', 'synced', TRUE);
                        """,
                        kuerzel, int(frnr), event_id
                    )
                log_operation('info', f"Phase 2: Created new from Google: event_id {event_id}, frNr {frnr}", context=context)
                state['stats']['new_google_to_adv'] += 1
            else:
                log_operation('warning', f"Phase 2: Skipped DB insert for Google event {event_id}, frNr is None", context=context)
        except Exception as e:
            log_operation('warning', f"Phase 2: Failed to process new Google {event_id}: {e}", context=context)


async def _phase3_mark_deleted(conn, sync_id):
    """Soft-delete a sync row and mark it as synced."""
    async with conn.transaction():
        await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", sync_id)


async def _phase3_mark_failed(conn, sync_id):
    """Flag a sync row as failed."""
    async with conn.transaction():
        await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", sync_id)


async def _phase3_delete_in_google(conn, service, calendar_id, row, context):
    """Delete the row's Google event and soft-delete the row; mark failed on error."""
    sync_id = row['sync_id']
    try:
        await delete_google_event(service, calendar_id, row['google_event_id'], context)
        await _phase3_mark_deleted(conn, sync_id)
        log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {sync_id}", context=context)
    except Exception as e:
        log_operation('warning', f"Phase 3: Failed to delete Google for sync_id {sync_id}: {e}", context=context)
        await _phase3_mark_failed(conn, sync_id)


async def _phase3_delete_in_advoware(conn, advoware, row, context):
    """Delete the row's Advoware appointment and soft-delete the row; mark failed on error."""
    sync_id = row['sync_id']
    try:
        await safe_delete_advoware_appointment(advoware, row['advoware_frnr'], row['advoware_write_allowed'], context)
        await _phase3_mark_deleted(conn, sync_id)
        log_operation('info', f"Phase 3: Propagated delete to Advoware for sync_id {sync_id}", context=context)
    except Exception as e:
        log_operation('warning', f"Phase 3: Failed to delete Advoware for sync_id {sync_id}: {e}", context=context)
        await _phase3_mark_failed(conn, sync_id)


async def _phase3_recreate_in_advoware(state, conn, advoware, kuerzel, row, google_data, context):
    """Recreate the missing Advoware appointment from the Google event."""
    sync_id = row['sync_id']
    try:
        new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(google_data, 'google', context), kuerzel, row['advoware_write_allowed'], context)
        if new_frnr and str(new_frnr) != 'None':
            async with conn.transaction():
                await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), sync_id, datetime.datetime.now(BERLIN_TZ))
            log_operation('info', f"Phase 3: Recreated Advoware appointment {new_frnr} for sync_id {sync_id}", context=context)
            state['stats']['recreated'] += 1
        else:
            log_operation('warning', f"Phase 3: Failed to recreate Advoware for sync_id {sync_id}, frNr is None", context=context)
            await _phase3_mark_failed(conn, sync_id)
    except Exception as e:
        log_operation('warning', f"Phase 3: Failed to recreate Advoware for sync_id {sync_id}: {e}", context=context)
        await _phase3_mark_failed(conn, sync_id)


async def _phase3_recreate_in_google(state, conn, service, calendar_id, row, adv_data, context):
    """Recreate the missing Google event from the Advoware appointment."""
    sync_id = row['sync_id']
    try:
        new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(adv_data, 'advoware', context), context)
        async with conn.transaction():
            await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, sync_id, datetime.datetime.now(BERLIN_TZ))
        log_operation('info', f"Phase 3: Recreated Google event {new_event_id} for sync_id {sync_id}", context=context)
        state['stats']['recreated'] += 1
    except Exception as e:
        log_operation('warning', f"Phase 3: Failed to recreate Google for sync_id {sync_id}: {e}", context=context)
        await _phase3_mark_failed(conn, sync_id)


async def process_deleted_entries(state, conn, service, calendar_id: str, kuerzel: str, advoware, context=None):
    """Phase 3: Process deleted entries.

    For every sync row in ``state['rows']`` the function checks whether the
    Advoware appointment and the Google event still exist (a Google event
    also counts as existing when any fetched event names it as
    ``recurringEventId``):

    * both missing: the row is soft-deleted;
    * missing in Advoware: the appointment is recreated when the row is
      Google-sourced, uses ``source_system_wins`` and allows Advoware writes;
      otherwise the Google event is deleted;
    * missing in Google: with ``source_system_wins`` a Google-sourced row
      deletes the Advoware appointment (or is marked failed when writes are
      not allowed) and an Advoware-sourced row recreates the Google event;
      with any other strategy the Advoware appointment is deleted.

    Per-row failures are logged and the row is marked ``failed``.

    Args:
        state: Sync state; reads ``rows``, ``adv_map`` and ``google_map`` and
            increments ``stats['deleted']`` / ``stats['recreated']``.
        conn: Database connection providing ``transaction()`` and ``execute()``.
        service: Google Calendar API client.
        calendar_id: Google calendar of the employee.
        kuerzel: Employee short code.
        advoware: Advoware API client.
        context: Optional logging context forwarded to ``log_operation``.
    """
    log_operation('info', "Phase 3: Processing deleted entries", context=context)
    adv_map = state['adv_map']
    google_map = state['google_map']

    # Index the first fetched instance of every recurring master once, so
    # rows do not each have to scan all events again.
    first_instance_by_master = {}
    for evt in google_map.values():
        master_id = evt.get('recurringEventId')
        if master_id:
            first_instance_by_master.setdefault(master_id, evt)

    for row in state['rows']:
        frnr = row['advoware_frnr']
        event_id = row['google_event_id']
        sync_id = row['sync_id']

        adv_exists = str(frnr) in adv_map if frnr else False

        # Check if Google event exists (master or instance)
        google_exists = False
        google_data = None
        if event_id:
            if event_id in google_map:
                google_exists, google_data = True, google_map[event_id]
            elif event_id in first_instance_by_master:
                # Only instances of the stored master were fetched; use the
                # first one as the representative event.
                google_exists, google_data = True, first_instance_by_master[event_id]

        if adv_exists and google_exists:
            continue

        if not adv_exists and not google_exists:
            # Both missing - soft delete
            await _phase3_mark_deleted(conn, sync_id)
            log_operation('info', f"Phase 3: Soft deleted sync_id {sync_id} (both missing)", context=context)
            state['stats']['deleted'] += 1
            continue

        source_wins = row['sync_strategy'] == 'source_system_wins'

        if not adv_exists:
            # Missing in Advoware
            if source_wins and row['source_system'] == 'google' and row['advoware_write_allowed']:
                await _phase3_recreate_in_advoware(state, conn, advoware, kuerzel, row, google_data, context)
            else:
                await _phase3_delete_in_google(conn, service, calendar_id, row, context)
            continue

        # Missing in Google
        # NOTE(review): the original formatting of this branch was lost; the
        # final "propagate delete to Advoware" fallback is assumed to belong
        # to the strategy check, not to the source-system check.
        if not source_wins:
            await _phase3_delete_in_advoware(conn, advoware, row, context)
        elif row['source_system'] == 'google':
            if row['advoware_write_allowed']:
                await _phase3_delete_in_advoware(conn, advoware, row, context)
            else:
                log_operation('warning', f"Phase 3: Cannot delete in Advoware for sync_id {sync_id}, write not allowed", context=context)
                await _phase3_mark_failed(conn, sync_id)
        elif row['source_system'] == 'advoware':
            await _phase3_recreate_in_google(state, conn, service, calendar_id, row, adv_map[str(frnr)], context)
context=context) state['stats']['recreated'] += 1 except Exception as e: log_operation('warning', f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}", context=context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) else: # Propagate delete to Advoware try: await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'], context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) log_operation('info', f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}", context=context) except Exception as e: log_operation('warning', f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}", context=context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) async def process_updates(state, conn, service, calendar_id: str, kuerzel: str, advoware, context=None): """Phase 4: Process updates for existing entries.""" log_operation('info', "Phase 4: Processing updates for existing entries", context=context) # Track which master events we've already processed processed_master_events = set() for row in state['rows']: frnr = row['advoware_frnr'] event_id = row['google_event_id'] adv_data = state['adv_map'].get(str(frnr)) if frnr else None # Find corresponding Google event (master or instance) google_data = None if event_id: if event_id in state['google_map']: google_data = state['google_map'][event_id] else: # Look for any event with this recurringEventId for evt in state['google_map'].values(): if evt.get('recurringEventId') == event_id: google_data = evt break # Skip if missing data or already processed master event if not adv_data or not google_data: continue # For recurring events, only process master event once master_event_id = 
google_data.get('recurringEventId') or event_id if master_event_id in processed_master_events: continue processed_master_events.add(master_event_id) if adv_data and google_data: adv_std = standardize_appointment_data(adv_data, 'advoware', context) google_std = standardize_appointment_data(google_data, 'google', context) strategy = row['sync_strategy'] try: if strategy == 'source_system_wins': if row['source_system'] == 'advoware': # Check for changes in source (Advoware) adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) google_ts_str = google_data.get('updated', '') google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None if adv_ts > row['last_sync']: await update_google_event(service, calendar_id, event_id, adv_std, context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) log_operation('info', f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}", context=context) state['stats']['updated'] += 1 elif google_ts and google_ts > row['last_sync']: log_operation('warning', f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}", context=context) await update_google_event(service, calendar_id, event_id, adv_std, context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) log_operation('info', f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}", context=context) elif row['source_system'] == 'google' and row['advoware_write_allowed']: # Check for changes in source (Google) google_ts_str = google_data.get('updated', '') google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else 
None adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm'])) if google_ts and google_ts > row['last_sync']: await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) log_operation('info', f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}", context=context) elif adv_ts > row['last_sync']: log_operation('warning', f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}", context=context) await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) log_operation('info', f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}", context=context) elif strategy == 'last_change_wins': adv_ts = await get_advoware_timestamp(advoware, frnr, context) google_ts_str = google_data.get('updated', '') google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None if adv_ts and google_ts: if adv_ts > google_ts: await update_google_event(service, calendar_id, event_id, adv_std, context) elif row['advoware_write_allowed']: await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts)) log_operation('info', f"Phase 4: Updated based on last_change_wins for sync_id 
{row['sync_id']}", context=context) except Exception as e: log_operation('warning', f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}", context=context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) # Motia Step Configuration config = { "name": "Calendar Sync Event Step", "description": "Handles bidirectional calendar sync between Advoware and Google Calendar using Postgres as hub", "flows": ["advoware-calendar-sync"], "triggers": [ queue("calendar_sync_employee") ], "enqueues": [] } async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None: """Main event handler for calendar sync.""" start_time = time.time() kuerzel = input_data.get('kuerzel') if not kuerzel: log_operation('error', "No kuerzel provided in event", context=ctx) return log_operation('info', f"Starting calendar sync for employee {kuerzel}", context=ctx) redis_client = get_redis_client(ctx) service = None try: log_operation('debug', "Initializing Advoware service", context=ctx) advoware = AdvowareService(ctx) log_operation('debug', "Initializing Google service", context=ctx) service = await get_google_service(ctx) log_operation('debug', f"Ensuring Google calendar for {kuerzel}", context=ctx) calendar_id = await ensure_google_calendar(service, kuerzel, ctx) conn = await connect_db(ctx) try: # Initialize state state = { 'rows': [], 'db_adv_index': {}, 'db_google_index': {}, 'adv_appointments': [], 'adv_map': {}, 'google_events': [], 'google_map': {}, 'stats': { 'new_adv_to_google': 0, 'new_google_to_adv': 0, 'deleted': 0, 'updated': 0, 'recreated': 0 } } async def reload_db_indexes(): """Reload database indexes after DB changes.""" state['rows'] = await conn.fetch( """ SELECT * FROM calendar_sync WHERE employee_kuerzel = $1 AND deleted = FALSE """, kuerzel ) state['db_adv_index'] = {str(row['advoware_frnr']): row for row in state['rows'] if row['advoware_frnr']} state['db_google_index'] = {} 
for row in state['rows']: if row['google_event_id']: state['db_google_index'][row['google_event_id']] = row log_operation('debug', "Reloaded indexes", context=ctx, rows=len(state['rows']), adv=len(state['db_adv_index']), google=len(state['db_google_index'])) async def reload_api_maps(): """Reload API maps after creating new events.""" state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel, ctx) state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')} state['google_events'] = await fetch_google_events(service, calendar_id, ctx) state['google_map'] = {evt['id']: evt for evt in state['google_events']} log_operation('debug', "Reloaded API maps", context=ctx, adv=len(state['adv_map']), google=len(state['google_map'])) # Initial fetch log_operation('info', "Fetching fresh data from APIs", context=ctx) await reload_api_maps() await reload_db_indexes() log_operation('info', "Fetched existing sync rows", context=ctx, count=len(state['rows'])) # Phase 1: New from Advoware => Google await process_new_from_advoware(state, conn, service, calendar_id, kuerzel, advoware, ctx) await reload_db_indexes() await reload_api_maps() # Phase 2: New from Google => Advoware await process_new_from_google(state, conn, service, calendar_id, kuerzel, advoware, ctx) await reload_db_indexes() await reload_api_maps() # Phase 3: Process deleted entries await process_deleted_entries(state, conn, service, calendar_id, kuerzel, advoware, ctx) await reload_db_indexes() await reload_api_maps() # Phase 4: Update existing entries await process_updates(state, conn, service, calendar_id, kuerzel, advoware, ctx) finally: await conn.close() # Log final statistics stats = state['stats'] log_operation('info', f"Sync statistics for {kuerzel}: New Adv->Google: {stats['new_adv_to_google']}, New Google->Adv: {stats['new_google_to_adv']}, Deleted: {stats['deleted']}, Updated: {stats['updated']}, Recreated: {stats['recreated']}", context=ctx) 
log_operation('info', f"Calendar sync completed for {kuerzel}", context=ctx) log_operation('info', f"Handler duration: {time.time() - start_time}", context=ctx) return {'status': 200, 'body': {'status': 'completed', 'kuerzel': kuerzel}} finally: # Always close resources to prevent memory leaks if service is not None: try: service.close() except Exception as e: log_operation('debug', f"Error closing Google service: {e}", context=ctx) try: redis_client.close() except Exception as e: log_operation('debug', f"Error closing Redis client: {e}", context=ctx) except Exception as e: log_operation('error', f"Sync failed for {kuerzel}: {e}", context=ctx) log_operation('info', f"Handler duration (failed): {time.time() - start_time}", context=ctx) return {'status': 500, 'body': {'error': str(e)}} finally: # Ensure lock is always released clear_employee_lock(redis_client, kuerzel, ctx)