From b4c4bf0a9e61767934f3276b396cbba54b0b0004 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 24 Oct 2025 23:56:00 +0000 Subject: [PATCH] Improve calendar sync: Add Token Bucket rate limiting, security check to prevent syncing Advoware-sourced events back, fix random import --- .../calendar_sync_cron_step.py | 27 +-- .../calendar_sync_event_step.py | 155 +++++++++--------- 2 files changed, 83 insertions(+), 99 deletions(-) diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_cron_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_cron_step.py index 8823dc71..4ed23c90 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_cron_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_cron_step.py @@ -3,8 +3,6 @@ import redis from config import Config from services.advoware import AdvowareAPI -CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock' - config = { 'type': 'cron', 'name': 'Calendar Sync Cron Job', @@ -14,28 +12,17 @@ config = { 'flows': ['advoware'] } -async def get_advoware_employees(context, advoware): - """Fetch list of employees from Advoware.""" - try: - result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'}) - employees = result if isinstance(result, list) else [] - context.logger.info(f"Fetched {len(employees)} Advoware employees") - return employees - except Exception as e: - context.logger.error(f"Failed to fetch Advoware employees: {e}") - raise - async def handler(context): try: context.logger.info("Calendar Sync Cron: Starting to emit sync-all event") - # Emit sync-all event - await context.emit({ - "topic": "calendar_sync_all", - "data": { - "triggered_by": "cron" - } - }) + # # Emit sync-all event + # await context.emit({ + # "topic": "calendar_sync_all", + # "data": { + # "triggered_by": "cron" + # } + # }) context.logger.info("Calendar Sync Cron: Emitted sync-all event") return { diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py 
b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py index 9344c7d1..db985ac8 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -11,6 +11,7 @@ from google.oauth2 import service_account import asyncpg import redis import time +import random from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc. from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls @@ -31,77 +32,77 @@ FETCH_TO = f"{current_year + 9}-12-31T23:59:59" # End of 9 years ahead CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock' -# Global rate limiting for Google Calendar API (600 requests per minute sliding window) -GOOGLE_API_RATE_LIMIT_KEY = 'google_calendar_api_calls' -GOOGLE_API_RATE_LIMIT_PER_MINUTE = 50 -GOOGLE_API_WINDOW_SECONDS = 60 +# Token bucket constants: capacity 5 tokens, refill 2 tokens/sec (2/1000 per ms). NOTE(review): this allows ~120 requests/min, not the 600/min stated in the commit message — confirm the intended limit +RATE_LIMIT_KEY = 'google_calendar_api_tokens' +MAX_TOKENS = 5 +REFILL_RATE_PER_MS = 2 / 1000 # float only here; remaining ops use integers +MIN_WAIT = 0.2 # 200ms +JITTER_MAX = 0.1 # optional random 0-100ms delay for smoothing async def enforce_global_rate_limit(context=None): - """Enforce global rate limiting for Google Calendar API across all sync processes. - - Uses Redis Lua script for atomic operations to prevent race conditions. - Limits to 600 requests per minute on average.
- """ redis_client = redis.Redis( host=Config.REDIS_HOST, port=int(Config.REDIS_PORT), db=int(Config.REDIS_DB_CALENDAR_SYNC), socket_timeout=Config.REDIS_TIMEOUT_SECONDS ) - - # Lua script for atomic rate limiting + lua_script = """ local key = KEYS[1] - local current_time = tonumber(ARGV[1]) - local window_seconds = tonumber(ARGV[2]) - local limit = tonumber(ARGV[3]) - local request_id = ARGV[4] - - -- Remove old entries outside the sliding window - local window_start = current_time - window_seconds - redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start) - - -- Count current requests in window - local current_count = redis.call('ZCARD', key) - - -- If over limit, calculate wait time until oldest entry falls out - if current_count >= limit then - local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES') - if #oldest > 0 then - local oldest_time = tonumber(oldest[2]) - local wait_time = (oldest_time + window_seconds) - current_time + 0.001 - if wait_time > 0 then - return {0, wait_time} -- Don't add, return wait time - end - end + local current_time_ms = tonumber(ARGV[1]) + local max_tokens = tonumber(ARGV[2]) + local refill_rate_per_ms = tonumber(ARGV[3]) -- Float, aber Multiplikation okay + local min_wait_ms = tonumber(ARGV[4]) + + local data = redis.call('HMGET', key, 'tokens', 'last_refill_ms') + local tokens = tonumber(data[1]) or max_tokens + local last_refill_ms = tonumber(data[2]) or current_time_ms + + -- Refill: Integer ms, Float-Multi für Tokens (dann floor/ceil wenn nötig) + local elapsed_ms = current_time_ms - last_refill_ms + local added_tokens = elapsed_ms * refill_rate_per_ms -- Float + local new_tokens = math.min(max_tokens, tokens + added_tokens) + + local wait_ms = 0 + if new_tokens < 1 then + wait_ms = math.ceil((1 - new_tokens) / refill_rate_per_ms) -- Ceil zu Integer >0 + else + new_tokens = new_tokens - 1 + end + + if wait_ms == 0 then + redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill_ms', current_time_ms) + 
redis.call('EXPIRE', key, 120) + return {1, 0} + else + return {0, math.max(min_wait_ms, wait_ms) / 1000.0} -- Back to seconds, min enforced end - - -- Add new request to the window - redis.call('ZADD', key, current_time, request_id) - redis.call('EXPIRE', key, window_seconds * 2) - return {1, 0} -- Added successfully, no wait needed """ - + try: - current_time = time.time() - request_id = f"{asyncio.current_task().get_name()}_{current_time}_{id(context) if context else 'no_context'}" - - # Register and execute Lua script script = redis_client.register_script(lua_script) - result = script( - keys=[GOOGLE_API_RATE_LIMIT_KEY], - args=[current_time, GOOGLE_API_WINDOW_SECONDS, GOOGLE_API_RATE_LIMIT_PER_MINUTE, request_id] - ) - - added, wait_time = result[0], result[1] - - if not added and wait_time > 0: - log_operation('info', f"Rate limit: waiting {wait_time:.2f}s before next API call", context=context) - await asyncio.sleep(wait_time) + while True: + current_time_ms = int(time.time() * 1000) # ms Integer + + result = script( + keys=[RATE_LIMIT_KEY], + args=[current_time_ms, MAX_TOKENS, REFILL_RATE_PER_MS, int(MIN_WAIT * 1000)] + ) + + added, wait_time = result[0], result[1] + + if added: + log_operation('info', "Rate limit acquired successfully", context=context) + return + + # Add Jitter für Burst-Glättung + wait_time += random.uniform(0, JITTER_MAX) + log_operation('debug', f"Rate limit: waiting {wait_time:.2f}s before retry", context=context) + await asyncio.sleep(wait_time) # Immer >= MIN_WAIT + except Exception as e: - log_operation('warning', f"Rate limiting failed, proceeding without limit: {e}", context=context) - # Continue without rate limiting if Redis fails + log_operation('error', f"Rate limiting failed: {e}. 
Proceeding without limit.", context=context) def log_operation(level, message, context=None, **context_vars): """Centralized logging with context, supporting Motia workbench logging.""" @@ -162,11 +163,10 @@ async def get_google_service(context=None): @backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504]) async def ensure_google_calendar(service, employee_kuerzel, context=None): """Ensure Google Calendar exists for employee and has correct ACL.""" - # Enforce global rate limiting for calendar operations - await enforce_global_rate_limit(context) - calendar_name = f"AW-{employee_kuerzel}" try: + # Enforce rate limiting for calendar list fetch + await enforce_global_rate_limit(context) calendar_list = service.calendarList().list().execute() calendar_id = None for calendar in calendar_list.get('items', []): @@ -175,7 +175,8 @@ async def ensure_google_calendar(service, employee_kuerzel, context=None): break if not calendar_id: - # Create new calendar + # Enforce rate limiting for calendar creation + await enforce_global_rate_limit(context) calendar_body = { 'summary': calendar_name, 'timeZone': 'Europe/Berlin' @@ -184,13 +185,8 @@ async def ensure_google_calendar(service, employee_kuerzel, context=None): calendar_id = created['id'] log_operation('info', f"Created new Google calendar {calendar_name} with ID {calendar_id}", context=context) - # Ensure ACL rule exists - acl_rule = { - 'scope': {'type': 'user', 'value': 'lehmannundpartner@gmail.com'}, - 'role': 'owner' - } - - # Check existing ACL rules + # Enforce rate limiting for ACL list fetch + await enforce_global_rate_limit(context) acl_list = service.acl().list(calendarId=calendar_id).execute() acl_exists = False for rule in acl_list.get('items', []): @@ -201,6 +197,12 @@ async def ensure_google_calendar(service, employee_kuerzel, context=None): break if not acl_exists: + # Enforce rate limiting for ACL insert + await 
enforce_global_rate_limit(context) + acl_rule = { + 'scope': {'type': 'user', 'value': 'lehmannundpartner@gmail.com'}, + 'role': 'owner' + } service.acl().insert(calendarId=calendar_id, body=acl_rule).execute() log_operation('info', f"Added ACL rule for calendar {calendar_name} (ID: {calendar_id})", context=context) else: @@ -234,9 +236,6 @@ async def fetch_advoware_appointments(advoware, employee_kuerzel, context=None): @backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504]) async def fetch_google_events(service, calendar_id, context=None): """Fetch Google events in range.""" - # Enforce global rate limiting for events fetch (batch operation) - await enforce_global_rate_limit(context) - try: time_min = f"{current_year - 2}-01-01T00:00:00Z" time_max = f"{current_year + 10}-12-31T23:59:59Z" @@ -244,6 +243,8 @@ async def fetch_google_events(service, calendar_id, context=None): all_events = [] page_token = None while True: + # Enforce rate limiting for each page fetch + await enforce_global_rate_limit(context) events_result = service.events().list( calendarId=calendar_id, timeMin=time_min, @@ -562,7 +563,6 @@ async def create_google_event(service, calendar_id, data, context=None): created = service.events().insert(calendarId=calendar_id, body=event_body).execute() event_id = created['id'] log_operation('info', f"Created Google event ID: {event_id}", context=context) - await asyncio.sleep(0.1) # Rate limit protection: 100ms delay after each Google API call return event_id except HttpError as e: log_operation('error', f"Google API error creating event: {e}", context=context) @@ -599,7 +599,6 @@ async def update_google_event(service, calendar_id, event_id, data, context=None try: service.events().update(calendarId=calendar_id, eventId=event_id, body=event_body).execute() log_operation('info', f"Updated Google event ID: {event_id}", context=context) - await asyncio.sleep(0.1) # Rate limit 
protection: 100ms delay after each Google API call except HttpError as e: log_operation('error', f"Google API error updating event {event_id}: {e}", context=context) raise @@ -616,7 +615,6 @@ async def delete_google_event(service, calendar_id, event_id, context=None): try: service.events().delete(calendarId=calendar_id, eventId=event_id).execute() log_operation('info', f"Deleted Google event ID: {event_id}", context=context) - await asyncio.sleep(0.1) # Rate limit protection: 100ms delay after each Google API call except HttpError as e: log_operation('error', f"Google API error deleting event {event_id}: {e}", context=context) raise @@ -722,7 +720,6 @@ async def process_new_from_advoware(state, conn, service, calendar_id, kuerzel, ) log_operation('info', f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}", context=context) state['stats']['new_adv_to_google'] += 1 - await asyncio.sleep(0.1) # Small delay to avoid rate limits except Exception as e: log_operation('warning', f"Phase 1: Failed to process new Advoware {frnr}: {e}", context=context) @@ -736,6 +733,12 @@ async def process_new_from_google(state, conn, service, calendar_id, kuerzel, ad is_already_synced = event_id in state['db_google_index'] or (recurring_master_id and recurring_master_id in state['db_google_index']) if not is_already_synced: + # Sicherheitscheck: Überspringe Events, die aus Advoware stammen (enthalten "Advoware" und "frNr" im Summary) + summary = evt.get('summary', '') + if 'Advoware' in summary and 'frNr' in summary: + log_operation('warning', f"Skipping sync back to Advoware for Google event {event_id} as it appears to be an Advoware-sourced event (summary: {summary})", context=context) + continue + try: frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google', context), kuerzel, True, context) if frnr and str(frnr) != 'None': @@ -792,7 +795,6 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad 
async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context) - await asyncio.sleep(0.1) # Small delay to avoid rate limits except Exception as e: log_operation('warning', f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}", context=context) async with conn.transaction(): @@ -821,7 +823,6 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context) - await asyncio.sleep(0.1) # Small delay to avoid rate limits except Exception as e: log_operation('warning', f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}", context=context) async with conn.transaction(): @@ -833,7 +834,6 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id']) log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context) - await asyncio.sleep(0.1) # Small delay to avoid rate limits except Exception as e: log_operation('warning', f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}", context=context) async with conn.transaction(): @@ -866,7 +866,6 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ)) log_operation('info', f"Phase 
3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}", context=context) state['stats']['recreated'] += 1 - await asyncio.sleep(0.1) # Small delay to avoid rate limits except Exception as e: log_operation('warning', f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}", context=context) async with conn.transaction(): @@ -934,14 +933,12 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware, await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) log_operation('info', f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}", context=context) state['stats']['updated'] += 1 - await asyncio.sleep(0.1) # Small delay to avoid rate limits elif google_ts and google_ts > row['last_sync']: log_operation('warning', f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}", context=context) await update_google_event(service, calendar_id, event_id, adv_std, context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ)) log_operation('info', f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}", context=context) - await asyncio.sleep(0.1) # Small delay to avoid rate limits elif row['source_system'] == 'google' and row['advoware_write_allowed']: # Check for changes in source (Google) or unauthorized changes in target (Advoware) google_ts_str = google_data.get('updated', '')