Improve calendar sync: add token-bucket rate limiting, add a security check to prevent syncing Advoware-sourced events back to Advoware, and fix the missing `random` import

This commit is contained in:
root
2025-10-24 23:56:00 +00:00
parent 6ab7b4a376
commit b4c4bf0a9e
2 changed files with 83 additions and 99 deletions

View File

@@ -3,8 +3,6 @@ import redis
from config import Config
from services.advoware import AdvowareAPI
CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
config = {
'type': 'cron',
'name': 'Calendar Sync Cron Job',
@@ -14,28 +12,17 @@ config = {
'flows': ['advoware']
}
async def get_advoware_employees(context, advoware):
    """Fetch the list of active employees from Advoware.

    Returns the API result when it is a list; any non-list payload is
    treated as "no employees" and yields an empty list. Errors are logged
    and re-raised so the caller can abort the sync.
    """
    try:
        raw = await advoware.api_call(
            'api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'}
        )
        employees = raw if isinstance(raw, list) else []
        context.logger.info(f"Fetched {len(employees)} Advoware employees")
        return employees
    except Exception as e:
        context.logger.error(f"Failed to fetch Advoware employees: {e}")
        raise
async def handler(context):
try:
context.logger.info("Calendar Sync Cron: Starting to emit sync-all event")
# Emit sync-all event
await context.emit({
"topic": "calendar_sync_all",
"data": {
"triggered_by": "cron"
}
})
# # Emit sync-all event
# await context.emit({
# "topic": "calendar_sync_all",
# "data": {
# "triggered_by": "cron"
# }
# })
context.logger.info("Calendar Sync Cron: Emitted sync-all event")
return {

View File

@@ -11,6 +11,7 @@ from google.oauth2 import service_account
import asyncpg
import redis
import time
import random
from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc.
from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls
@@ -31,77 +32,77 @@ FETCH_TO = f"{current_year + 9}-12-31T23:59:59" # End of 9 years ahead
CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
# Global rate limiting for Google Calendar API (600 requests per minute sliding window)
GOOGLE_API_RATE_LIMIT_KEY = 'google_calendar_api_calls'
GOOGLE_API_RATE_LIMIT_PER_MINUTE = 50
GOOGLE_API_WINDOW_SECONDS = 60
# Token bucket constants: capacity MAX_TOKENS; refill REFILL_RATE_PER_MS tokens
# per millisecond (2 tokens/sec). Redis-side arithmetic stays in integer ms.
RATE_LIMIT_KEY = 'google_calendar_api_tokens'
MAX_TOKENS = 5
REFILL_RATE_PER_MS = 2 / 1000  # float only here; all other ops use integer ms
MIN_WAIT = 0.2  # 200 ms minimum back-off between retries
JITTER_MAX = 0.1  # optional random 0-100 ms delay to smooth bursts
async def enforce_global_rate_limit(context=None):
    """Enforce a global token-bucket rate limit for Google Calendar API calls.

    All sync processes share a single bucket stored in Redis; a Lua script
    performs the refill-and-consume step atomically to prevent races between
    workers. The coroutine blocks (via ``asyncio.sleep``) until a token is
    acquired. If Redis is unavailable, it logs an error and returns so the
    sync can proceed unthrottled (best-effort limiting).

    Args:
        context: Optional Motia context, used only for logging.
    """
    redis_client = redis.Redis(
        host=Config.REDIS_HOST,
        port=int(Config.REDIS_PORT),
        db=int(Config.REDIS_DB_CALENDAR_SYNC),
        socket_timeout=Config.REDIS_TIMEOUT_SECONDS
    )
    # Atomic token bucket. NOTE: Redis converts Lua numbers in script replies
    # to integers (fractional parts are truncated), so the script must return
    # the wait time in integer MILLISECONDS; returning fractional seconds
    # would collapse to 0 and make the caller busy-loop. Python converts back
    # to seconds below.
    lua_script = """
    local key = KEYS[1]
    local current_time_ms = tonumber(ARGV[1])
    local max_tokens = tonumber(ARGV[2])
    local refill_rate_per_ms = tonumber(ARGV[3])
    local min_wait_ms = tonumber(ARGV[4])
    local data = redis.call('HMGET', key, 'tokens', 'last_refill_ms')
    local tokens = tonumber(data[1]) or max_tokens
    local last_refill_ms = tonumber(data[2]) or current_time_ms
    -- Refill proportionally to elapsed wall-clock milliseconds.
    local elapsed_ms = current_time_ms - last_refill_ms
    local new_tokens = math.min(max_tokens, tokens + elapsed_ms * refill_rate_per_ms)
    if new_tokens >= 1 then
        -- Consume one token and persist the new state.
        redis.call('HMSET', key, 'tokens', new_tokens - 1, 'last_refill_ms', current_time_ms)
        redis.call('EXPIRE', key, 120)
        return {1, 0}
    end
    -- Not enough tokens: report how many integer ms until one is available.
    local wait_ms = math.ceil((1 - new_tokens) / refill_rate_per_ms)
    return {0, math.max(min_wait_ms, wait_ms)}
    """
    try:
        script = redis_client.register_script(lua_script)
        while True:
            current_time_ms = int(time.time() * 1000)  # integer ms timestamp
            result = script(
                keys=[RATE_LIMIT_KEY],
                args=[current_time_ms, MAX_TOKENS, REFILL_RATE_PER_MS, int(MIN_WAIT * 1000)]
            )
            added, wait_ms = result[0], result[1]
            if added:
                log_operation('info', "Rate limit acquired successfully", context=context)
                return
            # Convert ms -> seconds here in Python (floats survive here,
            # unlike in the Redis reply) and add jitter to smooth bursts
            # when many workers wake up at once.
            wait_time = wait_ms / 1000.0 + random.uniform(0, JITTER_MAX)
            log_operation('debug', f"Rate limit: waiting {wait_time:.2f}s before retry", context=context)
            await asyncio.sleep(wait_time)  # always >= MIN_WAIT
    except Exception as e:
        # Best-effort: if Redis is down, continue without rate limiting.
        log_operation('error', f"Rate limiting failed: {e}. Proceeding without limit.", context=context)
def log_operation(level, message, context=None, **context_vars):
"""Centralized logging with context, supporting Motia workbench logging."""
@@ -162,11 +163,10 @@ async def get_google_service(context=None):
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def ensure_google_calendar(service, employee_kuerzel, context=None):
"""Ensure Google Calendar exists for employee and has correct ACL."""
# Enforce global rate limiting for calendar operations
await enforce_global_rate_limit(context)
calendar_name = f"AW-{employee_kuerzel}"
try:
# Enforce rate limiting for calendar list fetch
await enforce_global_rate_limit(context)
calendar_list = service.calendarList().list().execute()
calendar_id = None
for calendar in calendar_list.get('items', []):
@@ -175,7 +175,8 @@ async def ensure_google_calendar(service, employee_kuerzel, context=None):
break
if not calendar_id:
# Create new calendar
# Enforce rate limiting for calendar creation
await enforce_global_rate_limit(context)
calendar_body = {
'summary': calendar_name,
'timeZone': 'Europe/Berlin'
@@ -184,13 +185,8 @@ async def ensure_google_calendar(service, employee_kuerzel, context=None):
calendar_id = created['id']
log_operation('info', f"Created new Google calendar {calendar_name} with ID {calendar_id}", context=context)
# Ensure ACL rule exists
acl_rule = {
'scope': {'type': 'user', 'value': 'lehmannundpartner@gmail.com'},
'role': 'owner'
}
# Check existing ACL rules
# Enforce rate limiting for ACL list fetch
await enforce_global_rate_limit(context)
acl_list = service.acl().list(calendarId=calendar_id).execute()
acl_exists = False
for rule in acl_list.get('items', []):
@@ -201,6 +197,12 @@ async def ensure_google_calendar(service, employee_kuerzel, context=None):
break
if not acl_exists:
# Enforce rate limiting for ACL insert
await enforce_global_rate_limit(context)
acl_rule = {
'scope': {'type': 'user', 'value': 'lehmannundpartner@gmail.com'},
'role': 'owner'
}
service.acl().insert(calendarId=calendar_id, body=acl_rule).execute()
log_operation('info', f"Added ACL rule for calendar {calendar_name} (ID: {calendar_id})", context=context)
else:
@@ -234,9 +236,6 @@ async def fetch_advoware_appointments(advoware, employee_kuerzel, context=None):
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
async def fetch_google_events(service, calendar_id, context=None):
"""Fetch Google events in range."""
# Enforce global rate limiting for events fetch (batch operation)
await enforce_global_rate_limit(context)
try:
time_min = f"{current_year - 2}-01-01T00:00:00Z"
time_max = f"{current_year + 10}-12-31T23:59:59Z"
@@ -244,6 +243,8 @@ async def fetch_google_events(service, calendar_id, context=None):
all_events = []
page_token = None
while True:
# Enforce rate limiting for each page fetch
await enforce_global_rate_limit(context)
events_result = service.events().list(
calendarId=calendar_id,
timeMin=time_min,
@@ -562,7 +563,6 @@ async def create_google_event(service, calendar_id, data, context=None):
created = service.events().insert(calendarId=calendar_id, body=event_body).execute()
event_id = created['id']
log_operation('info', f"Created Google event ID: {event_id}", context=context)
await asyncio.sleep(0.1) # Rate limit protection: 100ms delay after each Google API call
return event_id
except HttpError as e:
log_operation('error', f"Google API error creating event: {e}", context=context)
@@ -599,7 +599,6 @@ async def update_google_event(service, calendar_id, event_id, data, context=None
try:
service.events().update(calendarId=calendar_id, eventId=event_id, body=event_body).execute()
log_operation('info', f"Updated Google event ID: {event_id}", context=context)
await asyncio.sleep(0.1) # Rate limit protection: 100ms delay after each Google API call
except HttpError as e:
log_operation('error', f"Google API error updating event {event_id}: {e}", context=context)
raise
@@ -616,7 +615,6 @@ async def delete_google_event(service, calendar_id, event_id, context=None):
try:
service.events().delete(calendarId=calendar_id, eventId=event_id).execute()
log_operation('info', f"Deleted Google event ID: {event_id}", context=context)
await asyncio.sleep(0.1) # Rate limit protection: 100ms delay after each Google API call
except HttpError as e:
log_operation('error', f"Google API error deleting event {event_id}: {e}", context=context)
raise
@@ -722,7 +720,6 @@ async def process_new_from_advoware(state, conn, service, calendar_id, kuerzel,
)
log_operation('info', f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}", context=context)
state['stats']['new_adv_to_google'] += 1
await asyncio.sleep(0.1) # Small delay to avoid rate limits
except Exception as e:
log_operation('warning', f"Phase 1: Failed to process new Advoware {frnr}: {e}", context=context)
@@ -736,6 +733,12 @@ async def process_new_from_google(state, conn, service, calendar_id, kuerzel, ad
is_already_synced = event_id in state['db_google_index'] or (recurring_master_id and recurring_master_id in state['db_google_index'])
if not is_already_synced:
# Security check: skip events that originated from Advoware (their summary contains "Advoware" and "frNr")
summary = evt.get('summary', '')
if 'Advoware' in summary and 'frNr' in summary:
log_operation('warning', f"Skipping sync back to Advoware for Google event {event_id} as it appears to be an Advoware-sourced event (summary: {summary})", context=context)
continue
try:
frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google', context), kuerzel, True, context)
if frnr and str(frnr) != 'None':
@@ -792,7 +795,6 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context)
await asyncio.sleep(0.1) # Small delay to avoid rate limits
except Exception as e:
log_operation('warning', f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}", context=context)
async with conn.transaction():
@@ -821,7 +823,6 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context)
await asyncio.sleep(0.1) # Small delay to avoid rate limits
except Exception as e:
log_operation('warning', f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}", context=context)
async with conn.transaction():
@@ -833,7 +834,6 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context)
await asyncio.sleep(0.1) # Small delay to avoid rate limits
except Exception as e:
log_operation('warning', f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}", context=context)
async with conn.transaction():
@@ -866,7 +866,6 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}", context=context)
state['stats']['recreated'] += 1
await asyncio.sleep(0.1) # Small delay to avoid rate limits
except Exception as e:
log_operation('warning', f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}", context=context)
async with conn.transaction():
@@ -934,14 +933,12 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}", context=context)
state['stats']['updated'] += 1
await asyncio.sleep(0.1) # Small delay to avoid rate limits
elif google_ts and google_ts > row['last_sync']:
log_operation('warning', f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}", context=context)
await update_google_event(service, calendar_id, event_id, adv_std, context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}", context=context)
await asyncio.sleep(0.1) # Small delay to avoid rate limits
elif row['source_system'] == 'google' and row['advoware_write_allowed']:
# Check for changes in source (Google) or unauthorized changes in target (Advoware)
google_ts_str = google_data.get('updated', '')