diff --git a/bitbylaw/README.md b/bitbylaw/README.md
index 99968094..6685b748 100644
--- a/bitbylaw/README.md
+++ b/bitbylaw/README.md
@@ -252,6 +252,7 @@ Beispiel: `turnus=3, turnusArt=1` → `RRULE:FREQ=DAILY;INTERVAL=3;UNTIL=2025122
 
 ### Rate Limiting & Backoff
 
 - **Google Calendar API**: 403-Fehler bei Rate-Limits werden mit exponentiellem Backoff (max. 60s) wiederholt
+- **Global Rate Limiting**: Redis-basierte Koordination stellt sicher, dass maximal 600 API-Calls pro Minute über alle parallel laufenden Sync-Prozesse hinweg gemacht werden
 - **Delays**: 100ms zwischen API-Calls zur Vermeidung von Limits
 - **Retry-Logic**: Max. 4 Versuche mit base=4
diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py
index e1ebbef3..f48857b2 100644
--- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py
+++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py
@@ -10,6 +10,7 @@
 from googleapiclient.errors import HttpError
 from google.oauth2 import service_account
 import asyncpg
 import redis
+import time
 from config import Config  # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc.
 from services.advoware import AdvowareAPI  # Assuming this is the existing wrapper for Advoware API calls
@@ -30,6 +31,64 @@
 FETCH_TO = f"{current_year + 9}-12-31T23:59:59"  # End of 9 years ahead
 
 CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
+# Global rate limiting for Google Calendar API (600 requests per minute sliding window)
+GOOGLE_API_RATE_LIMIT_KEY = 'google_calendar_api_calls'
+GOOGLE_API_RATE_LIMIT_PER_MINUTE = 600
+GOOGLE_API_WINDOW_SECONDS = 60
+
+async def enforce_global_rate_limit(context=None):
+    """Enforce global rate limiting for Google Calendar API across all sync processes.
+
+    Uses Redis sorted set to track API calls in a sliding window of 60 seconds.
+    Limits to 600 requests per minute on average.
+    """
+    redis_client = redis.Redis(
+        host=Config.REDIS_HOST,
+        port=int(Config.REDIS_PORT),
+        db=int(Config.REDIS_DB_CALENDAR_SYNC),
+        socket_timeout=Config.REDIS_TIMEOUT_SECONDS
+    )
+
+    try:
+        current_time = time.time()
+        window_start = current_time - GOOGLE_API_WINDOW_SECONDS
+
+        # Remove old entries outside the sliding window
+        redis_client.zremrangebyscore(GOOGLE_API_RATE_LIMIT_KEY, '-inf', window_start)
+
+        # Count current requests in window
+        current_count = redis_client.zcount(GOOGLE_API_RATE_LIMIT_KEY, window_start, '+inf')
+
+        if current_count >= GOOGLE_API_RATE_LIMIT_PER_MINUTE:
+            # Get the oldest timestamp in the current window
+            oldest_entries = redis_client.zrange(GOOGLE_API_RATE_LIMIT_KEY, 0, 0, withscores=True)
+            if oldest_entries:
+                oldest_time = oldest_entries[0][1]
+                # Calculate wait time until oldest entry falls out of window
+                wait_time = (oldest_time + GOOGLE_API_WINDOW_SECONDS) - current_time + 0.001  # + epsilon
+                if wait_time > 0:
+                    log_operation('info', f"Rate limit: {current_count}/{GOOGLE_API_RATE_LIMIT_PER_MINUTE} requests in window, waiting {wait_time:.2f}s", context=context)
+                    await asyncio.sleep(wait_time)
+                    # After waiting, recount (in case other processes also waited)
+                    current_time = time.time()
+                    window_start = current_time - GOOGLE_API_WINDOW_SECONDS
+                    redis_client.zremrangebyscore(GOOGLE_API_RATE_LIMIT_KEY, '-inf', window_start)
+                    current_count = redis_client.zcount(GOOGLE_API_RATE_LIMIT_KEY, window_start, '+inf')
+
+        # Add current request to the window
+        request_id = f"{asyncio.current_task().get_name()}_{current_time}_{id(context) if context else 'no_context'}"
+        redis_client.zadd(GOOGLE_API_RATE_LIMIT_KEY, {request_id: current_time})
+
+        # Set expiry on the key to prevent unlimited growth (keep for 2 windows)
+        redis_client.expire(GOOGLE_API_RATE_LIMIT_KEY, GOOGLE_API_WINDOW_SECONDS * 2)
+
+    except Exception as e:
+        log_operation('warning', f"Rate limiting failed, proceeding without limit: {e}", context=context)
+        # Continue without rate limiting if Redis fails
+    finally:
+        # Release the per-call Redis client/connection pool (was previously leaked on every invocation)
+        redis_client.close()
+
 def log_operation(level, message, context=None, **context_vars):
     """Centralized logging with context, supporting Motia workbench logging."""
     context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
@@ -89,6 +148,9 @@ async def get_google_service(context=None):
 @backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
 async def ensure_google_calendar(service, employee_kuerzel, context=None):
     """Ensure Google Calendar exists for employee and has correct ACL."""
+    # Enforce global rate limiting for calendar operations
+    await enforce_global_rate_limit(context)
+
     calendar_name = f"AW-{employee_kuerzel}"
     try:
         calendar_list = service.calendarList().list().execute()
@@ -158,6 +220,9 @@ async def fetch_advoware_appointments(advoware, employee_kuerzel, context=None):
 @backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
 async def fetch_google_events(service, calendar_id, context=None):
     """Fetch Google events in range."""
+    # Enforce global rate limiting for events fetch (batch operation)
+    await enforce_global_rate_limit(context)
+
     try:
         time_min = f"{current_year - 2}-01-01T00:00:00Z"
         time_max = f"{current_year + 10}-12-31T23:59:59Z"
@@ -457,6 +522,9 @@ async def delete_advoware_appointment(advoware, frnr, context=None):
 @backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
 async def create_google_event(service, calendar_id, data, context=None):
     """Create Google event from standardized data."""
+    # Enforce global rate limiting
+    await enforce_global_rate_limit(context)
+
     start_dt = data['start'].astimezone(BERLIN_TZ)
     end_dt = data['end'].astimezone(BERLIN_TZ)
     all_day = data['dauertermin'] == 1 and start_dt.time() == datetime.time(0,0) and end_dt.time() == datetime.time(0,0)
@@ -492,6 +560,9 @@ async def create_google_event(service, calendar_id, data, context=None):
 @backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
 async def update_google_event(service, calendar_id, event_id, data, context=None):
     """Update Google event."""
+    # Enforce global rate limiting
+    await enforce_global_rate_limit(context)
+
     start_dt = data['start'].astimezone(BERLIN_TZ)
     end_dt = data['end'].astimezone(BERLIN_TZ)
     all_day = data['dauertermin'] == 1 and start_dt.time() == datetime.time(0,0) and end_dt.time() == datetime.time(0,0)
@@ -525,6 +596,9 @@ async def update_google_event(service, calendar_id, event_id, data, context=None
 @backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
 async def delete_google_event(service, calendar_id, event_id, context=None):
     """Delete Google event."""
+    # Enforce global rate limiting
+    await enforce_global_rate_limit(context)
+
     try:
         service.events().delete(calendarId=calendar_id, eventId=event_id).execute()
         log_operation('info', f"Deleted Google event ID: {event_id}", context=context)