Rate Limiting: Atomisches Lua Script zur Vermeidung von Race Conditions
This commit is contained in:
@@ -33,13 +33,13 @@ CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
|
||||
|
||||
# Global rate limiting for Google Calendar API (600 requests per minute sliding window)
|
||||
GOOGLE_API_RATE_LIMIT_KEY = 'google_calendar_api_calls'
|
||||
GOOGLE_API_RATE_LIMIT_PER_MINUTE = 600
|
||||
GOOGLE_API_RATE_LIMIT_PER_MINUTE = 50
|
||||
GOOGLE_API_WINDOW_SECONDS = 60
|
||||
|
||||
async def enforce_global_rate_limit(context=None):
|
||||
"""Enforce global rate limiting for Google Calendar API across all sync processes.
|
||||
|
||||
Uses Redis sorted set to track API calls in a sliding window of 60 seconds.
|
||||
Uses Redis Lua script for atomic operations to prevent race conditions.
|
||||
Limits to 600 requests per minute on average.
|
||||
"""
|
||||
redis_client = redis.Redis(
|
||||
@@ -49,38 +49,55 @@ async def enforce_global_rate_limit(context=None):
|
||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
||||
)
|
||||
|
||||
# Lua script for atomic rate limiting
|
||||
lua_script = """
|
||||
local key = KEYS[1]
|
||||
local current_time = tonumber(ARGV[1])
|
||||
local window_seconds = tonumber(ARGV[2])
|
||||
local limit = tonumber(ARGV[3])
|
||||
local request_id = ARGV[4]
|
||||
|
||||
-- Remove old entries outside the sliding window
|
||||
local window_start = current_time - window_seconds
|
||||
redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)
|
||||
|
||||
-- Count current requests in window
|
||||
local current_count = redis.call('ZCARD', key)
|
||||
|
||||
-- If over limit, calculate wait time until oldest entry falls out
|
||||
if current_count >= limit then
|
||||
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
|
||||
if #oldest > 0 then
|
||||
local oldest_time = tonumber(oldest[2])
|
||||
local wait_time = (oldest_time + window_seconds) - current_time + 0.001
|
||||
if wait_time > 0 then
|
||||
return {0, wait_time} -- Don't add, return wait time
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
-- Add new request to the window
|
||||
redis.call('ZADD', key, current_time, request_id)
|
||||
redis.call('EXPIRE', key, window_seconds * 2)
|
||||
return {1, 0} -- Added successfully, no wait needed
|
||||
"""
|
||||
|
||||
try:
|
||||
current_time = time.time()
|
||||
window_start = current_time - GOOGLE_API_WINDOW_SECONDS
|
||||
|
||||
# Remove old entries outside the sliding window
|
||||
redis_client.zremrangebyscore(GOOGLE_API_RATE_LIMIT_KEY, '-inf', window_start)
|
||||
|
||||
# Count current requests in window
|
||||
current_count = redis_client.zcount(GOOGLE_API_RATE_LIMIT_KEY, window_start, '+inf')
|
||||
|
||||
if current_count >= GOOGLE_API_RATE_LIMIT_PER_MINUTE:
|
||||
# Get the oldest timestamp in the current window
|
||||
oldest_entries = redis_client.zrange(GOOGLE_API_RATE_LIMIT_KEY, 0, 0, withscores=True)
|
||||
if oldest_entries:
|
||||
oldest_time = oldest_entries[0][1]
|
||||
# Calculate wait time until oldest entry falls out of window
|
||||
wait_time = (oldest_time + GOOGLE_API_WINDOW_SECONDS) - current_time + 0.001 # + epsilon
|
||||
if wait_time > 0:
|
||||
log_operation('info', f"Rate limit: {current_count}/{GOOGLE_API_RATE_LIMIT_PER_MINUTE} requests in window, waiting {wait_time:.2f}s", context=context)
|
||||
await asyncio.sleep(wait_time)
|
||||
# After waiting, recount (in case other processes also waited)
|
||||
current_time = time.time()
|
||||
window_start = current_time - GOOGLE_API_WINDOW_SECONDS
|
||||
redis_client.zremrangebyscore(GOOGLE_API_RATE_LIMIT_KEY, '-inf', window_start)
|
||||
current_count = redis_client.zcount(GOOGLE_API_RATE_LIMIT_KEY, window_start, '+inf')
|
||||
|
||||
# Add current request to the window
|
||||
request_id = f"{asyncio.current_task().get_name()}_{current_time}_{id(context) if context else 'no_context'}"
|
||||
redis_client.zadd(GOOGLE_API_RATE_LIMIT_KEY, {request_id: current_time})
|
||||
|
||||
# Set expiry on the key to prevent unlimited growth (keep for 2 windows)
|
||||
redis_client.expire(GOOGLE_API_RATE_LIMIT_KEY, GOOGLE_API_WINDOW_SECONDS * 2)
|
||||
# Register and execute Lua script
|
||||
script = redis_client.register_script(lua_script)
|
||||
result = script(
|
||||
keys=[GOOGLE_API_RATE_LIMIT_KEY],
|
||||
args=[current_time, GOOGLE_API_WINDOW_SECONDS, GOOGLE_API_RATE_LIMIT_PER_MINUTE, request_id]
|
||||
)
|
||||
|
||||
added, wait_time = result[0], result[1]
|
||||
|
||||
if not added and wait_time > 0:
|
||||
log_operation('info', f"Rate limit: waiting {wait_time:.2f}s before next API call", context=context)
|
||||
await asyncio.sleep(wait_time)
|
||||
|
||||
except Exception as e:
|
||||
log_operation('warning', f"Rate limiting failed, proceeding without limit: {e}", context=context)
|
||||
|
||||
Reference in New Issue
Block a user