Globales Rate Limiting: 600 Anfragen/Minute über gleitendes Fenster mit Redis
This commit is contained in:
@@ -252,6 +252,7 @@ Beispiel: `turnus=3, turnusArt=1` → `RRULE:FREQ=DAILY;INTERVAL=3;UNTIL=2025122
### Rate Limiting & Backoff

- **Google Calendar API**: 403-Fehler bei Rate-Limits werden mit exponentiellem Backoff (max. 60s) wiederholt
- **Global Rate Limiting**: Redis-basierte Koordination stellt sicher, dass maximal 600 API-Calls pro Minute über alle parallel laufenden Sync-Prozesse hinweg gemacht werden
- **Delays**: 100ms zwischen API-Calls zur Vermeidung von Limits
- **Retry-Logic**: Max. 4 Versuche mit base=3
@@ -10,6 +10,7 @@ from googleapiclient.errors import HttpError
|
||||
from google.oauth2 import service_account
|
||||
import asyncpg
|
||||
import redis
|
||||
import time
|
||||
from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc.
|
||||
from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls
|
||||
|
||||
@@ -30,6 +31,61 @@ FETCH_TO = f"{current_year + 9}-12-31T23:59:59" # End of 9 years ahead
|
||||
|
||||
# Redis key used to serialize calendar-sync runs across processes.
CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'

# Global rate limiting for Google Calendar API (600 requests per minute sliding window)
GOOGLE_API_RATE_LIMIT_KEY = 'google_calendar_api_calls'  # Redis sorted set: member = request id, score = Unix timestamp
GOOGLE_API_RATE_LIMIT_PER_MINUTE = 600  # maximum API calls allowed inside one window
GOOGLE_API_WINDOW_SECONDS = 60  # sliding-window length in seconds
async def enforce_global_rate_limit(context=None):
    """Throttle Google Calendar API usage across all sync processes.

    Each API call is recorded as a member of a Redis sorted set scored by
    its Unix timestamp.  Before a caller may proceed, entries older than
    the 60-second sliding window are pruned and the remainder counted; if
    the count has reached the 600-per-minute cap, the coroutine sleeps
    until the oldest entry ages out and then re-checks (other processes
    may have refilled the window while we slept).  If Redis is unreachable
    the function logs a warning and returns, so syncing degrades to
    "no rate limit" instead of failing outright.

    Args:
        context: Optional logging context forwarded to log_operation.
    """
    # NOTE(review): the prune/count followed by zadd is not atomic across
    # processes, so brief bursts slightly above the cap are possible; a Lua
    # script would be needed for a hard guarantee — confirm whether the
    # soft limit is acceptable.
    # NOTE(review): redis-py calls here are synchronous and block the event
    # loop for the duration of each round trip; acceptable for these small
    # sorted-set operations, but worth revisiting if latency matters.
    redis_client = redis.Redis(
        host=Config.REDIS_HOST,
        port=int(Config.REDIS_PORT),
        db=int(Config.REDIS_DB_CALENDAR_SYNC),
        socket_timeout=Config.REDIS_TIMEOUT_SECONDS,
    )
    try:
        while True:
            now = time.time()
            window_start = now - GOOGLE_API_WINDOW_SECONDS

            # Drop entries that have aged out of the sliding window, then
            # count what is still inside it.
            redis_client.zremrangebyscore(GOOGLE_API_RATE_LIMIT_KEY, '-inf', window_start)
            current_count = redis_client.zcount(GOOGLE_API_RATE_LIMIT_KEY, window_start, '+inf')

            if current_count < GOOGLE_API_RATE_LIMIT_PER_MINUTE:
                break  # capacity available

            oldest_entries = redis_client.zrange(GOOGLE_API_RATE_LIMIT_KEY, 0, 0, withscores=True)
            if not oldest_entries:
                break  # window was emptied concurrently; proceed

            # Sleep until the oldest entry falls out of the window (plus a
            # small epsilon), then loop to re-check rather than assuming
            # capacity freed up — other waiters may have claimed it.
            oldest_time = oldest_entries[0][1]
            wait_time = (oldest_time + GOOGLE_API_WINDOW_SECONDS) - now + 0.001
            if wait_time <= 0:
                continue
            log_operation(
                'info',
                f"Rate limit: {current_count}/{GOOGLE_API_RATE_LIMIT_PER_MINUTE} requests in window, waiting {wait_time:.2f}s",
                context=context,
            )
            await asyncio.sleep(wait_time)

        # Record this request in the window.  current_task() returns None
        # when called outside a running task, so guard the name lookup
        # instead of letting an AttributeError skip the registration.
        task = asyncio.current_task()
        task_name = task.get_name() if task is not None else 'no_task'
        now = time.time()
        request_id = f"{task_name}_{now}_{id(context) if context else 'no_context'}"
        redis_client.zadd(GOOGLE_API_RATE_LIMIT_KEY, {request_id: now})

        # Expire the key after two windows so an idle system does not keep
        # stale state around forever.
        redis_client.expire(GOOGLE_API_RATE_LIMIT_KEY, GOOGLE_API_WINDOW_SECONDS * 2)
    except Exception as e:
        # Fail open: rate limiting is best-effort; a Redis outage must not
        # block calendar syncing.
        log_operation('warning', f"Rate limiting failed, proceeding without limit: {e}", context=context)
    finally:
        # Close the per-call connection so repeated calls do not leak sockets.
        try:
            redis_client.close()
        except Exception:
            pass
||||
|
||||
def log_operation(level, message, context=None, **context_vars):
|
||||
"""Centralized logging with context, supporting Motia workbench logging."""
|
||||
context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
|
||||
@@ -89,6 +145,9 @@ async def get_google_service(context=None):
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||
async def ensure_google_calendar(service, employee_kuerzel, context=None):
|
||||
"""Ensure Google Calendar exists for employee and has correct ACL."""
|
||||
# Enforce global rate limiting for calendar operations
|
||||
await enforce_global_rate_limit(context)
|
||||
|
||||
calendar_name = f"AW-{employee_kuerzel}"
|
||||
try:
|
||||
calendar_list = service.calendarList().list().execute()
|
||||
@@ -158,6 +217,9 @@ async def fetch_advoware_appointments(advoware, employee_kuerzel, context=None):
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
|
||||
async def fetch_google_events(service, calendar_id, context=None):
|
||||
"""Fetch Google events in range."""
|
||||
# Enforce global rate limiting for events fetch (batch operation)
|
||||
await enforce_global_rate_limit(context)
|
||||
|
||||
try:
|
||||
time_min = f"{current_year - 2}-01-01T00:00:00Z"
|
||||
time_max = f"{current_year + 10}-12-31T23:59:59Z"
|
||||
@@ -457,6 +519,9 @@ async def delete_advoware_appointment(advoware, frnr, context=None):
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||
async def create_google_event(service, calendar_id, data, context=None):
|
||||
"""Create Google event from standardized data."""
|
||||
# Enforce global rate limiting
|
||||
await enforce_global_rate_limit(context)
|
||||
|
||||
start_dt = data['start'].astimezone(BERLIN_TZ)
|
||||
end_dt = data['end'].astimezone(BERLIN_TZ)
|
||||
all_day = data['dauertermin'] == 1 and start_dt.time() == datetime.time(0,0) and end_dt.time() == datetime.time(0,0)
|
||||
@@ -492,6 +557,9 @@ async def create_google_event(service, calendar_id, data, context=None):
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||
async def update_google_event(service, calendar_id, event_id, data, context=None):
|
||||
"""Update Google event."""
|
||||
# Enforce global rate limiting
|
||||
await enforce_global_rate_limit(context)
|
||||
|
||||
start_dt = data['start'].astimezone(BERLIN_TZ)
|
||||
end_dt = data['end'].astimezone(BERLIN_TZ)
|
||||
all_day = data['dauertermin'] == 1 and start_dt.time() == datetime.time(0,0) and end_dt.time() == datetime.time(0,0)
|
||||
@@ -525,6 +593,9 @@ async def update_google_event(service, calendar_id, event_id, data, context=None
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||
async def delete_google_event(service, calendar_id, event_id, context=None):
|
||||
"""Delete Google event."""
|
||||
# Enforce global rate limiting
|
||||
await enforce_global_rate_limit(context)
|
||||
|
||||
try:
|
||||
service.events().delete(calendarId=calendar_id, eventId=event_id).execute()
|
||||
log_operation('info', f"Deleted Google event ID: {event_id}", context=context)
|
||||
|
||||
Reference in New Issue
Block a user