diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py
index f48857b2..9344c7d1 100644
--- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py
+++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py
@@ -33,13 +33,22 @@ CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
 
-# Global rate limiting for Google Calendar API (600 requests per minute sliding window)
+# Global rate limiting for Google Calendar API (sliding window; limit defined below)
 GOOGLE_API_RATE_LIMIT_KEY = 'google_calendar_api_calls'
-GOOGLE_API_RATE_LIMIT_PER_MINUTE = 600
+GOOGLE_API_RATE_LIMIT_PER_MINUTE = 50
 GOOGLE_API_WINDOW_SECONDS = 60
 
 async def enforce_global_rate_limit(context=None):
     """Enforce global rate limiting for Google Calendar API across all sync processes.
 
-    Uses Redis sorted set to track API calls in a sliding window of 60 seconds.
-    Limits to 600 requests per minute on average.
+    A Redis Lua script prunes, counts and records calls atomically, so
+    concurrent sync processes cannot race past the limit. At most
+    GOOGLE_API_RATE_LIMIT_PER_MINUTE calls are admitted per sliding window of
+    GOOGLE_API_WINDOW_SECONDS seconds; when the window is full this coroutine
+    sleeps until the oldest call expires and then retries.
+
+    Best effort: if rate limiting raises, a warning is logged and the caller
+    proceeds without throttling.
+
+    Args:
+        context: Optional step context forwarded to log_operation.
     """
     redis_client = redis.Redis(
@@ -49,38 +58,61 @@ async def enforce_global_rate_limit(context=None):
         socket_timeout=Config.REDIS_TIMEOUT_SECONDS
     )
 
+    # Lua script: prune, count and conditionally record in one atomic step
+    lua_script = """
+    local key = KEYS[1]
+    local current_time = tonumber(ARGV[1])
+    local window_seconds = tonumber(ARGV[2])
+    local limit = tonumber(ARGV[3])
+    local request_id = ARGV[4]
+
+    -- Remove old entries outside the sliding window
+    local window_start = current_time - window_seconds
+    redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)
+
+    -- Count current requests in window
+    local current_count = redis.call('ZCARD', key)
+
+    -- If over limit, calculate wait time until oldest entry falls out
+    if current_count >= limit then
+        local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
+        if #oldest > 0 then
+            local oldest_time = tonumber(oldest[2])
+            local wait_time = (oldest_time + window_seconds) - current_time + 0.001
+            if wait_time > 0 then
+                -- Redis truncates Lua numbers to integers in replies, so the
+                -- fractional wait is returned as a string
+                return {0, tostring(wait_time)}
+            end
+        end
+    end
+
+    -- Add new request to the window
+    redis.call('ZADD', key, current_time, request_id)
+    redis.call('EXPIRE', key, window_seconds * 2)
+    return {1, '0'}  -- Added successfully, no wait needed
+    """
+
     try:
         current_time = time.time()
-        window_start = current_time - GOOGLE_API_WINDOW_SECONDS
-
-        # Remove old entries outside the sliding window
-        redis_client.zremrangebyscore(GOOGLE_API_RATE_LIMIT_KEY, '-inf', window_start)
-
-        # Count current requests in window
-        current_count = redis_client.zcount(GOOGLE_API_RATE_LIMIT_KEY, window_start, '+inf')
-
-        if current_count >= GOOGLE_API_RATE_LIMIT_PER_MINUTE:
-            # Get the oldest timestamp in the current window
-            oldest_entries = redis_client.zrange(GOOGLE_API_RATE_LIMIT_KEY, 0, 0, withscores=True)
-            if oldest_entries:
-                oldest_time = oldest_entries[0][1]
-                # Calculate wait time until oldest entry falls out of window
-                wait_time = (oldest_time + GOOGLE_API_WINDOW_SECONDS) - current_time + 0.001  # + epsilon
-                if wait_time > 0:
-                    log_operation('info', f"Rate limit: {current_count}/{GOOGLE_API_RATE_LIMIT_PER_MINUTE} requests in window, waiting {wait_time:.2f}s", context=context)
-                    await asyncio.sleep(wait_time)
-                    # After waiting, recount (in case other processes also waited)
-                    current_time = time.time()
-                    window_start = current_time - GOOGLE_API_WINDOW_SECONDS
-                    redis_client.zremrangebyscore(GOOGLE_API_RATE_LIMIT_KEY, '-inf', window_start)
-                    current_count = redis_client.zcount(GOOGLE_API_RATE_LIMIT_KEY, window_start, '+inf')
-
-        # Add current request to the window
         request_id = f"{asyncio.current_task().get_name()}_{current_time}_{id(context) if context else 'no_context'}"
-        redis_client.zadd(GOOGLE_API_RATE_LIMIT_KEY, {request_id: current_time})
-        # Set expiry on the key to prevent unlimited growth (keep for 2 windows)
-        redis_client.expire(GOOGLE_API_RATE_LIMIT_KEY, GOOGLE_API_WINDOW_SECONDS * 2)
 
+        # register_script returns a callable that runs the script via EVALSHA
+        script = redis_client.register_script(lua_script)
+
+        # Retry until the script records this call: a rejected attempt adds
+        # nothing to the window, so proceeding after one sleep would bypass it
+        while True:
+            added, wait_time = script(
+                keys=[GOOGLE_API_RATE_LIMIT_KEY],
+                args=[time.time(), GOOGLE_API_WINDOW_SECONDS, GOOGLE_API_RATE_LIMIT_PER_MINUTE, request_id],
+            )
+            if added:
+                break
+
+            wait_time = float(wait_time)  # sent as a string to keep the fraction
+            log_operation('info', f"Rate limit: waiting {wait_time:.2f}s before next API call", context=context)
+            await asyncio.sleep(wait_time)
     except Exception as e:
         log_operation('warning', f"Rate limiting failed, proceeding without limit: {e}", context=context)
 