diff --git a/bitbylaw/steps/advoware_cal_sync/README.md b/bitbylaw/steps/advoware_cal_sync/README.md index 4e343aaf..cfbd5e7b 100644 --- a/bitbylaw/steps/advoware_cal_sync/README.md +++ b/bitbylaw/steps/advoware_cal_sync/README.md @@ -256,6 +256,7 @@ Cron-Step für regelmäßige Ausführung. - Timestamps: Fehlende in Google können zu Fallback führen. - Performance: Bei vielen Terminen könnte Paginierung helfen. - **Single Events Expansion**: `singleEvents=true` in `fetch_google_events()` expandiert wiederkehrende Events in einzelne Instanzen, was zu Duplizierungsproblemen führt, wenn nicht korrekt behandelt. +- **Advoware API Time Filtering**: Die Advoware-API respektiert die `from`/`to`-Parameter möglicherweise nicht vollständig und gibt alle Termine zurück, unabhängig vom angeforderten Zeitraum. Das Audit-Script prüft dies und warnt bei Abweichungen. Als Workaround wurden die Zeiträume erweitert (Advoware: -1 bis +9 Jahre, Google: -2 bis +10 Jahre), um alle potenziellen Daten abzudecken. ## Kritischer Bugfix: Duplizierung wiederkehrender Termine diff --git a/bitbylaw/steps/advoware_cal_sync/audit_calendar_sync.py b/bitbylaw/steps/advoware_cal_sync/audit_calendar_sync.py new file mode 100644 index 00000000..4fe9e9b9 --- /dev/null +++ b/bitbylaw/steps/advoware_cal_sync/audit_calendar_sync.py @@ -0,0 +1,307 @@ +import asyncio +import logging +import sys +import os +from datetime import datetime, timedelta +import pytz +sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) +from config import Config +from services.advoware import AdvowareAPI +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError +from google.oauth2 import service_account +import asyncpg + +# Setup logging +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +handler = logging.StreamHandler() +logger.addHandler(handler) + +# Timezone and year +BERLIN_TZ = pytz.timezone('Europe/Berlin') +now = datetime.now(BERLIN_TZ) +current_year = 
now.year + +async def connect_db(): + """Connect to Postgres DB from Config.""" + try: + conn = await asyncpg.connect( + host=Config.POSTGRES_HOST or 'localhost', + user=Config.POSTGRES_USER, + password=Config.POSTGRES_PASSWORD, + database=Config.POSTGRES_DB_NAME, + timeout=10 + ) + return conn + except Exception as e: + logger.error(f"Failed to connect to DB: {e}") + raise + +async def get_google_service(): + """Initialize Google Calendar service.""" + try: + service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH + if not os.path.exists(service_account_path): + raise FileNotFoundError(f"Service account file not found: {service_account_path}") + creds = service_account.Credentials.from_service_account_file( + service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES + ) + service = build('calendar', 'v3', credentials=creds) + return service + except Exception as e: + logger.error(f"Failed to initialize Google service: {e}") + raise + +async def ensure_google_calendar(service, employee_kuerzel): + """Ensure Google Calendar exists for employee.""" + calendar_name = f"AW-{employee_kuerzel}" + try: + calendar_list = service.calendarList().list().execute() + for calendar in calendar_list.get('items', []): + if calendar['summary'] == calendar_name: + return calendar['id'] + return None # Calendar doesn't exist + except HttpError as e: + logger.error(f"Google API error for calendar {employee_kuerzel}: {e}") + raise + except Exception as e: + logger.error(f"Failed to check Google calendar for {employee_kuerzel}: {e}") + raise + +async def fetch_advoware_appointments(advoware, employee_kuerzel): + """Fetch Advoware appointments in range.""" + try: + # Use the same range as the sync script: previous year to 9 years ahead + from_date = f"{current_year - 1}-01-01T00:00:00" + to_date = f"{current_year + 9}-12-31T23:59:59" + params = { + 'kuerzel': employee_kuerzel, + 'from': from_date, + 'to': to_date + } + result = await advoware.api_call('api/v1/advonet/Termine', 
method='GET', params=params) + appointments = result if isinstance(result, list) else [] + + # Check if Advoware respects the time limit + from_dt = datetime.fromisoformat(from_date.replace('T', ' ')) + to_dt = datetime.fromisoformat(to_date.replace('T', ' ')) + out_of_range = [] + for app in appointments: + if 'datum' in app: + app_date_str = app['datum'] + if 'T' in app_date_str: + app_dt = datetime.fromisoformat(app_date_str.replace('Z', '')) + else: + app_dt = datetime.fromisoformat(app_date_str + 'T00:00:00') + if app_dt < from_dt or app_dt > to_dt: + out_of_range.append(app) + + if out_of_range: + logger.warning(f"Advoware returned {len(out_of_range)} appointments outside the requested range {from_date} to {to_date}") + for app in out_of_range[:5]: # Log first 5 + logger.warning(f"Out of range appointment: frNr {app.get('frNr')}, datum {app.get('datum')}") + + logger.info(f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel} (expected range: {from_date} to {to_date})") + return {str(app['frNr']): app for app in appointments if app.get('frNr')} + except Exception as e: + logger.error(f"Failed to fetch Advoware appointments: {e}") + raise + +async def fetch_google_events(service, calendar_id): + """Fetch Google events in range.""" + try: + # Use the same range as the sync script: 2 years back to 10 years forward + time_min = f"{current_year - 2}-01-01T00:00:00Z" + time_max = f"{current_year + 10}-12-31T23:59:59Z" + + all_events = [] + page_token = None + while True: + events_result = service.events().list( + calendarId=calendar_id, + timeMin=time_min, + timeMax=time_max, + singleEvents=True, + orderBy='startTime', + pageToken=page_token, + maxResults=2500 # Max per page + ).execute() + events_page = events_result.get('items', []) + all_events.extend(events_page) + page_token = events_result.get('nextPageToken') + if not page_token: + break + + events = [evt for evt in all_events if evt.get('status') != 'cancelled'] + logger.info(f"Fetched 
{len(all_events)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}") + return events, len(all_events) # Return filtered events and total count + except HttpError as e: + logger.error(f"Google API error fetching events: {e}") + raise + except Exception as e: + logger.error(f"Failed to fetch Google events: {e}") + raise + +async def audit_calendar_sync(employee_kuerzel, check_system, delete_orphaned_google=False): + """Audit calendar sync entries for a user.""" + if check_system not in ['google', 'advoware']: + raise ValueError("check_system must be 'google' or 'advoware'") + + logger.info(f"Starting audit for {employee_kuerzel}, checking {check_system}, delete_orphaned_google={delete_orphaned_google}") + + # Initialize APIs + advoware = AdvowareAPI({}) + service = await get_google_service() + calendar_id = await ensure_google_calendar(service, employee_kuerzel) + + if not calendar_id: + logger.error(f"Google calendar for {employee_kuerzel} does not exist") + return + + # Fetch API data + advoware_map = {} + google_events = [] + total_google_events = 0 + + if check_system == 'advoware': + advoware_map = await fetch_advoware_appointments(advoware, employee_kuerzel) + elif check_system == 'google': + google_events, total_google_events = await fetch_google_events(service, calendar_id) + google_map = {evt['id']: evt for evt in google_events} + + # Connect to DB + conn = await connect_db() + try: + # Fetch DB entries + rows = await conn.fetch( + """ + SELECT sync_id, employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, last_sync + FROM calendar_sync + WHERE employee_kuerzel = $1 AND deleted = FALSE + ORDER BY sync_id + """, + employee_kuerzel + ) + + logger.info(f"Found {len(rows)} active sync entries in DB for {employee_kuerzel}") + + # Build DB indexes + db_adv_index = {str(row['advoware_frnr']): row for row in rows if row['advoware_frnr']} + db_google_index = {} + for row in rows: + if 
row['google_event_id']: + db_google_index[row['google_event_id']] = row + + # Audit results + total_entries = len(rows) + existing_in_api = 0 + missing_in_api = 0 + missing_details = [] + + for row in rows: + sync_id = row['sync_id'] + advoware_frnr = row['advoware_frnr'] + google_event_id = row['google_event_id'] + + exists_in_api = False + + if check_system == 'advoware' and advoware_frnr: + exists_in_api = str(advoware_frnr) in advoware_map + elif check_system == 'google' and google_event_id: + exists_in_api = google_event_id in google_map + + if exists_in_api: + existing_in_api += 1 + else: + missing_in_api += 1 + missing_details.append({ + 'sync_id': sync_id, + 'advoware_frnr': advoware_frnr, + 'google_event_id': google_event_id, + 'source_system': row['source_system'], + 'sync_strategy': row['sync_strategy'], + 'sync_status': row['sync_status'], + 'last_sync': row['last_sync'] + }) + + # Check for orphaned Google events (events in Google not in DB) + orphaned_google_events = [] + if check_system == 'google': + for event_id, evt in google_map.items(): + if event_id not in db_google_index: + # Check if this is an instance of a recurring event whose master is synced + is_instance_of_synced_master = False + if '_' in event_id: + master_id = event_id.split('_')[0] + if master_id in db_google_index: + is_instance_of_synced_master = True + + if not is_instance_of_synced_master: + orphaned_google_events.append({ + 'event_id': event_id, + 'summary': evt.get('summary', ''), + 'start': evt.get('start', {}), + 'end': evt.get('end', {}) + }) + + # Print summary + print(f"\n=== Calendar Sync Audit for {employee_kuerzel} ===") + print(f"Checking system: {check_system}") + print(f"Total active DB entries: {total_entries}") + if check_system == 'google': + print(f"Total events in Google: {total_google_events}") + print(f"Orphaned events in Google (not in DB): {len(orphaned_google_events)}") + print(f"Existing in {check_system}: {existing_in_api}") + print(f"Missing in 
{check_system}: {missing_in_api}") + print(f"Coverage in {check_system}: {(existing_in_api / total_entries * 100):.1f}%" if total_entries else "Coverage: n/a") + + if missing_details: + print(f"\n=== Details of missing entries in {check_system} ===") + for detail in missing_details: + print(f"Sync ID: {detail['sync_id']}") + print(f" Advoware frNr: {detail['advoware_frnr']}") + print(f" Google Event ID: {detail['google_event_id']}") + print(f" Source System: {detail['source_system']}") + print(f" Sync Strategy: {detail['sync_strategy']}") + print(f" Sync Status: {detail['sync_status']}") + print(f" Last Sync: {detail['last_sync']}") + print(" ---") + else: + print(f"\nAll entries exist in {check_system}!") + + # Delete orphaned Google events if requested + if delete_orphaned_google and check_system == 'google' and orphaned_google_events: + print(f"\n=== Deleting orphaned Google events ===") + for orphaned in orphaned_google_events: + event_id = orphaned['event_id'] + try: + service.events().delete(calendarId=calendar_id, eventId=event_id).execute() + print(f"Deleted orphaned Google event: {event_id} - {orphaned['summary']}") + except HttpError as e: + print(f"Failed to delete Google event {event_id}: {e}") + except Exception as e: + print(f"Error deleting Google event {event_id}: {e}") + + finally: + await conn.close() + +async def main(): + if len(sys.argv) < 3 or len(sys.argv) > 4: + print("Usage: python audit_calendar_sync.py <employee_kuerzel> <check_system> [--delete-orphaned-google]") + print(" --delete-orphaned-google: Delete Google events that exist in Google but not in the DB") + print("Example: python audit_calendar_sync.py SB google --delete-orphaned-google") + sys.exit(1) + + employee_kuerzel = sys.argv[1].upper() + check_system = sys.argv[2].lower() + delete_orphaned_google = len(sys.argv) == 4 and sys.argv[3] == '--delete-orphaned-google' + + try: + await audit_calendar_sync(employee_kuerzel, check_system, delete_orphaned_google) + except Exception as e: + logger.error(f"Audit failed: {e}") + sys.exit(1) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff
--git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py index e569c239..5107a1dd 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -23,8 +23,10 @@ logger.addHandler(handler) BERLIN_TZ = pytz.timezone('Europe/Berlin') # Constants for ranges (optimize fetch efficiency) -FETCH_FROM = (datetime.datetime.now(BERLIN_TZ) - timedelta(days=365)).strftime('%Y-01-01T00:00:00') -FETCH_TO = (datetime.datetime.now(BERLIN_TZ) + timedelta(days=730)).strftime('%Y-12-31T23:59:59') +now = datetime.datetime.now(BERLIN_TZ) +current_year = now.year +FETCH_FROM = f"{current_year - 1}-01-01T00:00:00" # Start of previous year +FETCH_TO = f"{current_year + 9}-12-31T23:59:59" # End of 9 years ahead CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock' @@ -129,21 +131,29 @@ async def fetch_advoware_appointments(advoware, employee_kuerzel): async def fetch_google_events(service, calendar_id): """Fetch Google events in range.""" try: - now = datetime.datetime.now(pytz.utc) - from_date = now - timedelta(days=365) - to_date = now + timedelta(days=730) - time_min = from_date.strftime('%Y-%m-%dT%H:%M:%SZ') - time_max = to_date.strftime('%Y-%m-%dT%H:%M:%SZ') - events_result = service.events().list( - calendarId=calendar_id, - timeMin=time_min, - timeMax=time_max, - singleEvents=True, # Expand recurring - orderBy='startTime' - ).execute() - logger.debug(f"Raw Google API response: {events_result}") - events = [evt for evt in events_result.get('items', []) if evt.get('status') != 'cancelled'] - logger.info(f"Fetched {len(events)} Google events for calendar {calendar_id}") + time_min = f"{current_year - 2}-01-01T00:00:00Z" + time_max = f"{current_year + 10}-12-31T23:59:59Z" + + all_events = [] + page_token = None + while True: + events_result = service.events().list( + calendarId=calendar_id, + timeMin=time_min, + timeMax=time_max, + 
singleEvents=True, # Expand recurring + orderBy='startTime', + pageToken=page_token, + maxResults=2500 # Max per page + ).execute() + events_page = events_result.get('items', []) + all_events.extend(events_page) + page_token = events_result.get('nextPageToken') + if not page_token: + break + + events = [evt for evt in all_events if evt.get('status') != 'cancelled'] + logger.info(f"Fetched {len(all_events)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}") return events except HttpError as e: logger.error(f"Google API error fetching events: {e}") @@ -553,6 +563,20 @@ async def get_advoware_employees(advoware): logger.error(f"Failed to fetch Advoware employees: {e}") raise +async def get_advoware_timestamp(advoware, frnr): + """Fetch the last modified timestamp for an Advoware appointment.""" + try: + result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params={'frnr': frnr}) + if isinstance(result, list) and result: + appointment = result[0] # Assuming it returns a list with one item + timestamp_str = appointment.get('zuletztGeaendertAm') + if timestamp_str: + return BERLIN_TZ.localize(datetime.datetime.fromisoformat(timestamp_str)) + return None + except Exception as e: + logger.error(f"Failed to fetch timestamp for Advoware frNr {frnr}: {e}") + return None + async def handler(event, context): """Main event handler for calendar sync.""" logger.info("Starting calendar sync for all employees") @@ -576,7 +600,7 @@ async def handler(event, context): continue - # DEBUG: Nur für Nutzer AI syncen (für Test der Travel/Prep Zeit) - if kuerzel != 'AI': - logger.info(f"DEBUG: Überspringe {kuerzel}, nur AI wird gesynct") + # DEBUG: Nur für Nutzer SB syncen (für Test der Travel/Prep Zeit) + if kuerzel != 'SB': + logger.info(f"DEBUG: Überspringe {kuerzel}, nur SB wird gesynct") continue @@ -623,6 +647,32 @@ async def handler(event, context): # For regular events, use the event_id directly db_google_index[row['google_event_id']] = row + async def reload_db_indexes(): + """Reload database indexes after DB changes in phases.""" + nonlocal rows, db_adv_index,
db_google_index + rows = await conn.fetch( + """ + SELECT * FROM calendar_sync + WHERE employee_kuerzel = $1 AND deleted = FALSE + """, + kuerzel + ) + db_adv_index = {str(row['advoware_frnr']): row for row in rows if row['advoware_frnr']} + db_google_index = {} + for row in rows: + if row['google_event_id']: + db_google_index[row['google_event_id']] = row + logger.debug(f"Reloaded indexes: {len(rows)} rows, {len(db_adv_index)} adv, {len(db_google_index)} google") + + async def reload_api_maps(): + """Reload API maps after creating new events in phases.""" + nonlocal adv_appointments, adv_map, google_events, google_map + adv_appointments = await fetch_advoware_appointments(advoware, kuerzel) + adv_map = {str(app['frNr']): app for app in adv_appointments if app.get('frNr')} + google_events = await fetch_google_events(service, calendar_id) + google_map = {evt['id']: evt for evt in google_events} + logger.debug(f"Reloaded API maps: {len(adv_map)} adv, {len(google_map)} google") + # Phase 1: New from Advoware => Google logger.info("Phase 1: Processing new appointments from Advoware") for frnr, app in adv_map.items(): @@ -642,6 +692,11 @@ async def handler(event, context): except Exception as e: logger.warning(f"Phase 1: Failed to process new Advoware {frnr}: {e}") + # Reload indexes after Phase 1 changes + await reload_db_indexes() + # Reload API maps after Phase 1 changes + await reload_api_maps() + # Phase 2: New from Google => Advoware logger.info("Phase 2: Processing new events from Google") for event_id, evt in google_map.items(): @@ -668,6 +723,11 @@ async def handler(event, context): except Exception as e: logger.warning(f"Phase 2: Failed to process new Google {event_id}: {e}") + # Reload indexes after Phase 2 changes + await reload_db_indexes() + # Reload API maps after Phase 2 changes + await reload_api_maps() + # Phase 3: Identify deleted entries logger.info("Phase 3: Processing deleted entries") for row in rows: @@ -793,6 +853,11 @@ async def handler(event, 
context): async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) + # Reload indexes after Phase 3 changes + await reload_db_indexes() + # Reload API maps after Phase 3 changes + await reload_api_maps() + # Phase 4: Update existing entries if changed logger.info("Phase 4: Processing updates for existing entries") # Track which master events we've already processed to avoid duplicate updates