diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py index 1d8b6bdc..f91598f6 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -30,8 +30,18 @@ FETCH_TO = f"{current_year + 9}-12-31T23:59:59" # End of 9 years ahead CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock' -# Relevant fields for data comparison (simple diff) -COMPARISON_FIELDS = ['text', 'notiz', 'ort', 'datum', 'uhrzeitBis', 'datumBis', 'dauertermin', 'turnus', 'turnusArt', 'recurrence'] +def log_operation(level, message, **context): + """Centralized logging with context.""" + context_str = ' '.join(f"{k}={v}" for k, v in context.items() if v is not None) + full_message = f"{message} {context_str}".strip() + if level == 'info': + logger.info(full_message) + elif level == 'warning': + logger.warning(full_message) + elif level == 'error': + logger.error(full_message) + elif level == 'debug': + logger.debug(full_message) async def connect_db(): """Connect to Postgres DB from Config.""" @@ -93,23 +103,6 @@ async def ensure_google_calendar(service, employee_kuerzel): logger.error(f"Failed to ensure Google calendar for {employee_kuerzel}: {e}") raise -async def share_google_calendar(service, calendar_id, email): - """Share Google Calendar with specific email.""" - try: - acl_rule = { - 'scope': {'type': 'user', 'value': email}, - 'role': 'owner' - } - result = service.acl().insert(calendarId=calendar_id, body=acl_rule).execute() - logger.info(f"Shared calendar {calendar_id} with {email}") - return result - except HttpError as e: - logger.error(f"Google API error sharing calendar {calendar_id} with {email}: {e}") - raise - except Exception as e: - logger.error(f"Failed to share Google calendar {calendar_id} with {email}: {e}") - raise - async def fetch_advoware_appointments(advoware, employee_kuerzel): """Fetch Advoware appointments in range.""" try: @@ -195,24 +188,16 @@ def generate_rrule(turnus, turnus_art, datum_bis): rrule = f"RRULE:FREQ={freq};INTERVAL={turnus};UNTIL={until_date}" return rrule -def standardize_appointment_data(data, source): - """Standardize data from Advoware or Google to comparable dict, with TZ handling.""" - duration_capped = False # Initialize flag for duration capping +def parse_times(data, source): + """Parse start and end times from data.""" if source == 'advoware': start_str = data.get('datum', '') - # Improved parsing: if datum contains 'T', it's datetime; else combine with uhrzeitVon if 'T' in start_str: - try: - start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(start_str.replace('Z', ''))) - except ValueError: - logger.warning(f"Invalid start datetime in Advoware: {start_str}") - start_dt = BERLIN_TZ.localize(datetime.datetime.now()) + start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(start_str.replace('Z', ''))) else: start_time = data.get('uhrzeitVon') or '09:00:00' start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{start_str}T{start_time}")) - # For end: Use date from datum (not datumBis for recurring events!), time from uhrzeitBis - # datumBis is only for recurrence end date, not individual event end date end_date_str = data.get('datum', '') if 'T' in end_date_str: base_end_date = end_date_str.split('T')[0] @@ -222,100 +207,106 @@ def standardize_appointment_data(data, source): try: end_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{base_end_date}T{end_time}")) except ValueError: - logger.warning(f"Invalid end datetime in Advoware: {base_end_date}T{end_time}") end_dt = start_dt + timedelta(hours=1) + elif source == 'google': + start_obj = data.get('start', {}) + end_obj = data.get('end', {}) + if 'dateTime' in start_obj: + start_dt = datetime.datetime.fromisoformat(start_obj['dateTime'].rstrip('Z')).astimezone(BERLIN_TZ) + else: + start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(start_obj['date'])) + if 'dateTime' in end_obj: + end_dt = datetime.datetime.fromisoformat(end_obj['dateTime'].rstrip('Z')).astimezone(BERLIN_TZ) + else: + end_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(end_obj['date'])) + return start_dt, end_dt + +def adjust_times(start_dt, end_dt, data): + """Adjust times for preparation, travel, etc.""" + vorbereitungs_dauer = data.get('vorbereitungsDauer', '00:00:00') + fahrzeit = data.get('fahrzeit', '00:00:00') + fahrt_anzeigen = data.get('fahrtAnzeigen', 0) + + try: + vorb_h, vorb_m, vorb_s = map(int, vorbereitungs_dauer.split(':')) + vorbereitung_td = timedelta(hours=vorb_h, minutes=vorb_m, seconds=vorb_s) + except: + vorbereitung_td = timedelta(0) + + try: + fahrt_h, fahrt_m, fahrt_s = map(int, fahrzeit.split(':')) + fahrt_td = timedelta(hours=fahrt_h, minutes=fahrt_m, seconds=fahrt_s) + except: + fahrt_td = timedelta(0) + + hinfahrt_td = timedelta(0) + rueckfahrt_td = timedelta(0) + if fahrt_anzeigen == 1: + hinfahrt_td = fahrt_td + elif fahrt_anzeigen == 2: + rueckfahrt_td = fahrt_td + elif fahrt_anzeigen == 3: + hinfahrt_td = fahrt_td + rueckfahrt_td = fahrt_td + + adjusted_start = start_dt - vorbereitung_td - hinfahrt_td + adjusted_end = end_dt + rueckfahrt_td + return adjusted_start, adjusted_end, vorbereitung_td, hinfahrt_td, rueckfahrt_td + +def build_notiz(original_notiz, time_breakdown, duration_capped): + """Build the description string.""" + notiz_parts = [] + if original_notiz.strip(): + notiz_parts.append(original_notiz.strip()) + notiz_parts.append("Zeitaufteilung:") + notiz_parts.extend(time_breakdown) + if duration_capped: + notiz_parts.append("\nHinweis: Ereignisdauer wurde auf 24 Stunden begrenzt (Google Calendar Limit)") + return "\n".join(notiz_parts) + +def standardize_appointment_data(data, source): + """Standardize data from Advoware or Google to comparable dict, with TZ handling.""" + duration_capped = False + start_dt, end_dt = parse_times(data, source) + + if source == 'advoware': + adjusted_start, adjusted_end, vorbereitung_td, hinfahrt_td, rueckfahrt_td = adjust_times(start_dt, end_dt, data) - # Anonymization for Google events if Config.CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS: text = 'Advoware blocked' ort = '' - original_notiz = '' # Bei Anonymisierung keine Original-Notiz anzeigen + original_notiz = '' else: text = data.get('text', '') ort = data.get('ort', '') original_notiz = data.get('notiz', '') - - # Process preparation time and travel time (immer, auch bei Anonymisierung) - vorbereitungs_dauer = data.get('vorbereitungsDauer', '00:00:00') - fahrzeit = data.get('fahrzeit', '00:00:00') - fahrt_anzeigen = data.get('fahrtAnzeigen', 0) - # Parse times (format: HH:MM:SS) - try: - vorb_h, vorb_m, vorb_s = map(int, vorbereitungs_dauer.split(':')) - vorbereitung_td = timedelta(hours=vorb_h, minutes=vorb_m, seconds=vorb_s) - except: - vorbereitung_td = timedelta(0) - - try: - fahrt_h, fahrt_m, fahrt_s = map(int, fahrzeit.split(':')) - fahrt_td = timedelta(hours=fahrt_h, minutes=fahrt_m, seconds=fahrt_s) - except: - fahrt_td = timedelta(0) - - # Calculate travel times based on fahrtAnzeigen - hinfahrt_td = timedelta(0) - rueckfahrt_td = timedelta(0) - - if fahrt_anzeigen == 1: # Nur Hinfahrt - hinfahrt_td = fahrt_td - elif fahrt_anzeigen == 2: # Nur Rückfahrt - rueckfahrt_td = fahrt_td - elif fahrt_anzeigen == 3: # Beides (volle Fahrtzeit für Hin- und Rückfahrt) - hinfahrt_td = fahrt_td - rueckfahrt_td = fahrt_td - - # Adjust start and end times - adjusted_start = start_dt - vorbereitung_td - hinfahrt_td - adjusted_end = end_dt + rueckfahrt_td - - # Create detailed description with time breakdown time_breakdown = [] - if vorbereitung_td.total_seconds() > 0: vorb_start = adjusted_start vorb_end = adjusted_start + vorbereitung_td time_breakdown.append(f"{vorb_start.strftime('%H:%M')}-{vorb_end.strftime('%H:%M')} Vorbereitung") - if hinfahrt_td.total_seconds() > 0: outbound_start = adjusted_start + vorbereitung_td outbound_end = adjusted_start + vorbereitung_td + hinfahrt_td time_breakdown.append(f"{outbound_start.strftime('%H:%M')}-{outbound_end.strftime('%H:%M')} Hinfahrt") - - # Actual appointment time appt_start = adjusted_start + vorbereitung_td + hinfahrt_td appt_end = adjusted_end - rueckfahrt_td time_breakdown.append(f"{appt_start.strftime('%H:%M')}-{appt_end.strftime('%H:%M')} Termin") - if rueckfahrt_td.total_seconds() > 0: return_start = appt_end return_end = adjusted_end time_breakdown.append(f"{return_start.strftime('%H:%M')}-{return_end.strftime('%H:%M')} Rückfahrt") - # Combine description - notiz_parts = [] - if original_notiz.strip(): - notiz_parts.append(original_notiz.strip()) - notiz_parts.append("Zeitaufteilung:") - notiz_parts.extend(time_breakdown) - if duration_capped: - notiz_parts.append("\nHinweis: Ereignisdauer wurde auf 24 Stunden begrenzt (Google Calendar Limit)") - notiz = "\n".join(notiz_parts) + notiz = build_notiz(original_notiz, time_breakdown, duration_capped) + start_dt, end_dt = adjusted_start, adjusted_end - # Update start and end times - start_dt = adjusted_start - end_dt = adjusted_end - - # Check for Google Calendar duration limit (24 hours max) duration = end_dt - start_dt max_duration = timedelta(hours=24) - duration_capped = False if duration > max_duration: - logger.warning(f"Event duration {duration} exceeds Google Calendar limit of {max_duration}. Capping at 24 hours.") end_dt = start_dt + max_duration duration_capped = True - # Generate recurrence if dauertermin recurrence = None if data.get('dauertermin', 0) == 1: turnus = data.get('turnus', 1) @@ -324,7 +315,7 @@ def standardize_appointment_data(data, source): if datum_bis: recurrence = generate_rrule(turnus, turnus_art, datum_bis) if recurrence: - recurrence = [recurrence] # Google expects list of strings + recurrence = [recurrence] return { 'start': start_dt, @@ -338,26 +329,12 @@ def standardize_appointment_data(data, source): 'recurrence': recurrence } elif source == 'google': - start_obj = data.get('start', {}) - end_obj = data.get('end', {}) - if 'dateTime' in start_obj: - start_dt = datetime.datetime.fromisoformat(start_obj['dateTime'].rstrip('Z')).astimezone(BERLIN_TZ) - all_day = False - else: - start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(start_obj['date'])) - all_day = True - if 'dateTime' in end_obj: - end_dt = datetime.datetime.fromisoformat(end_obj['dateTime'].rstrip('Z')).astimezone(BERLIN_TZ) - else: - end_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(end_obj['date'])) - # Improved dauertermin: set to 1 if all-day or duration >1 day duration_days = (end_dt.date() - start_dt.date()).days - dauertermin = 1 if all_day or duration_days > 1 else 0 + dauertermin = 1 if data.get('start', {}).get('date') or duration_days > 1 else 0 recurrence = data.get('recurrence') if recurrence: - # Simple mapping: if recurrence exists, set turnus=1, turnusArt based on RRULE (simplified) turnus = 1 - turnus_art = 0 # Default, could parse RRULE for better mapping + turnus_art = 0 else: turnus = 0 turnus_art = 0 @@ -373,15 +350,6 @@ def standardize_appointment_data(data, source): 'recurrence': recurrence } -def data_diff(data1, data2): - """Simple diff on standardized data.""" - if not data1 or not data2: - return True - for field in COMPARISON_FIELDS: - if data1.get(field) != data2.get(field): - return True - return False - async def create_advoware_appointment(advoware, data, employee_kuerzel): """Create Advoware appointment from standardized data.""" start_dt = data['start'].astimezone(BERLIN_TZ) @@ -552,6 +520,16 @@ async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, return await update_advoware_appointment(advoware, frnr, data, employee_kuerzel) +async def safe_advoware_operation(operation, write_allowed, *args, **kwargs): + """Generic safe wrapper for Advoware operations with write permission check.""" + if Config.ADVOWARE_WRITE_PROTECTION: + log_operation('warning', "Global write protection active, skipping Advoware operation") + return None + if not write_allowed: + log_operation('warning', "Cannot perform operation in Advoware, write not allowed") + return None + return await operation(*args, **kwargs) + async def get_advoware_employees(advoware): """Fetch list of employees from Advoware.""" try: @@ -577,6 +555,22 @@ async def get_advoware_timestamp(advoware, frnr): logger.error(f"Failed to fetch timestamp for Advoware frNr {frnr}: {e}") return None +def get_timestamps(adv_data, google_data): + """Extract and parse timestamps from Advoware and Google data.""" + adv_ts = None + if adv_data: + ts_str = adv_data.get('zuletztGeaendertAm') + if ts_str: + adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(ts_str)) + + google_ts = None + if google_data: + ts_str = google_data.get('updated') + if ts_str: + google_ts = datetime.datetime.fromisoformat(ts_str.rstrip('Z')).astimezone(BERLIN_TZ) + + return adv_ts, google_ts + async def handler(event, context): """Main event handler for calendar sync.""" logger.info("Starting calendar sync for all employees") @@ -621,57 +615,46 @@ async def handler(event, context): conn = await connect_db() try: - # Fetch fresh data - logger.info("Fetching fresh data from APIs") - adv_appointments = await fetch_advoware_appointments(advoware, kuerzel) - adv_map = {str(app['frNr']): app for app in adv_appointments if app.get('frNr')} - google_events = await fetch_google_events(service, calendar_id) - google_map = {evt['id']: evt for evt in google_events} - - # Fetch existing DB rows - rows = await conn.fetch( - """ - SELECT * FROM calendar_sync - WHERE employee_kuerzel = $1 AND deleted = FALSE - """, - kuerzel - ) - logger.info(f"Fetched {len(rows)} existing sync rows") - - # Build indexes - use recurringEventId for recurring events to avoid duplicates - db_adv_index = {str(row['advoware_frnr']): row for row in rows if row['advoware_frnr']} - db_google_index = {} - for row in rows: - if row['google_event_id']: - # For recurring events, use the master event ID (recurringEventId) - # For regular events, use the event_id directly - db_google_index[row['google_event_id']] = row + # Initialize state + state = { + 'rows': [], + 'db_adv_index': {}, + 'db_google_index': {}, + 'adv_appointments': [], + 'adv_map': {}, + 'google_events': [], + 'google_map': {} + } async def reload_db_indexes(): """Reload database indexes after DB changes in phases.""" - nonlocal rows, db_adv_index, db_google_index - rows = await conn.fetch( + state['rows'] = await conn.fetch( """ SELECT * FROM calendar_sync WHERE employee_kuerzel = $1 AND deleted = FALSE """, kuerzel ) - db_adv_index = {str(row['advoware_frnr']): row for row in rows if row['advoware_frnr']} - db_google_index = {} - for row in rows: + state['db_adv_index'] = {str(row['advoware_frnr']): row for row in state['rows'] if row['advoware_frnr']} + state['db_google_index'] = {} + for row in state['rows']: if row['google_event_id']: - db_google_index[row['google_event_id']] = row - logger.debug(f"Reloaded indexes: {len(rows)} rows, {len(db_adv_index)} adv, {len(db_google_index)} google") + state['db_google_index'][row['google_event_id']] = row + log_operation('debug', "Reloaded indexes", rows=len(state['rows']), adv=len(state['db_adv_index']), google=len(state['db_google_index'])) async def reload_api_maps(): """Reload API maps after creating new events in phases.""" - nonlocal adv_appointments, adv_map, google_events, google_map - adv_appointments = await fetch_advoware_appointments(advoware, kuerzel) - adv_map = {str(app['frNr']): app for app in adv_appointments if app.get('frNr')} - google_events = await fetch_google_events(service, calendar_id) - google_map = {evt['id']: evt for evt in google_events} - logger.debug(f"Reloaded API maps: {len(adv_map)} adv, {len(google_map)} google") + state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel) + state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')} + state['google_events'] = await fetch_google_events(service, calendar_id) + state['google_map'] = {evt['id']: evt for evt in state['google_events']} + log_operation('debug', "Reloaded API maps", adv=len(state['adv_map']), google=len(state['google_map'])) + + # Initial fetch + log_operation('info', "Fetching fresh data from APIs") + await reload_api_maps() + await reload_db_indexes() + log_operation('info', "Fetched existing sync rows", count=len(state['rows'])) # Phase 1: New from Advoware => Google logger.info("Phase 1: Processing new appointments from Advoware")