Refactor calendar sync to prioritize oldest synced employees with human-readable timestamps

This commit is contained in:
root
2025-10-26 08:58:48 +00:00
parent b18e770f12
commit 96eabe3db6
4 changed files with 59 additions and 26 deletions

View File

@@ -1,5 +1,8 @@
import json import json
import redis import redis
import math
import time
from datetime import datetime
from config import Config from config import Config
from services.advoware import AdvowareAPI from services.advoware import AdvowareAPI
from .calendar_sync_utils import get_redis_client, get_advoware_employees, set_employee_lock from .calendar_sync_utils import get_redis_client, get_advoware_employees, set_employee_lock
@@ -7,7 +10,7 @@ from .calendar_sync_utils import get_redis_client, get_advoware_employees, set_e
config = { config = {
'type': 'event', 'type': 'event',
'name': 'Calendar Sync All Step', 'name': 'Calendar Sync All Step',
'description': 'Nimmt sync-all Event auf und emittiert individuelle Events für jeden Mitarbeiter', 'description': 'Nimmt sync-all Event auf und emittiert individuelle Events für die ältesten Mitarbeiter',
'subscribes': ['calendar_sync_all'], 'subscribes': ['calendar_sync_all'],
'emits': ['calendar_sync_employee'], 'emits': ['calendar_sync_employee'],
'flows': ['advoware'] 'flows': ['advoware']
@@ -16,7 +19,7 @@ config = {
async def handler(event_data, context): async def handler(event_data, context):
try: try:
triggered_by = event_data.get('triggered_by', 'unknown') triggered_by = event_data.get('triggered_by', 'unknown')
context.logger.info(f"Calendar Sync All: Starting to emit events for all employees, triggered by {triggered_by}") context.logger.info(f"Calendar Sync All: Starting to emit events for oldest employees, triggered by {triggered_by}")
# Initialize Advoware API # Initialize Advoware API
advoware = AdvowareAPI(context) advoware = AdvowareAPI(context)
@@ -27,22 +30,40 @@ async def handler(event_data, context):
context.logger.error("Keine Mitarbeiter gefunden. All-Sync abgebrochen.") context.logger.error("Keine Mitarbeiter gefunden. All-Sync abgebrochen.")
return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}} return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
# Emit event for each employee redis_client = get_redis_client(context)
# Collect last_synced timestamps
employee_timestamps = {}
for employee in employees: for employee in employees:
kuerzel = employee.get('kuerzel') kuerzel = employee.get('kuerzel')
if not kuerzel: if not kuerzel:
context.logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}")
continue continue
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
timestamp_str = redis_client.get(employee_last_synced_key)
timestamp = int(timestamp_str) if timestamp_str else 0 # 0 if no timestamp (very old)
employee_timestamps[kuerzel] = timestamp
# # DEBUG: Nur für konfigurierte Nutzer syncen # Sort employees by last_synced (ascending, oldest first), then by kuerzel alphabetically
# if kuerzel not in Config.CALENDAR_SYNC_DEBUG_KUERZEL: sorted_kuerzel = sorted(employee_timestamps.keys(), key=lambda k: (employee_timestamps[k], k))
# context.logger.info(f"DEBUG: Überspringe {kuerzel}, nur {Config.CALENDAR_SYNC_DEBUG_KUERZEL} werden gesynct")
# continue
# Log the sorted list with timestamps
def format_timestamp(ts):
if ts == 0:
return "never"
return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
sorted_list_str = ", ".join(f"{k} ({format_timestamp(employee_timestamps[k])})" for k in sorted_kuerzel)
context.logger.info(f"Calendar Sync All: Sorted employees by last synced: {sorted_list_str}")
# Calculate number to sync: ceil(N / 10)
num_to_sync = math.ceil(len(sorted_kuerzel) / 10)
context.logger.info(f"Calendar Sync All: Total employees {len(sorted_kuerzel)}, syncing {num_to_sync} oldest")
# Emit for the oldest num_to_sync employees, if not locked
emitted_count = 0
for kuerzel in sorted_kuerzel[:num_to_sync]:
employee_lock_key = f'calendar_sync_lock_{kuerzel}' employee_lock_key = f'calendar_sync_lock_{kuerzel}'
redis_client = get_redis_client(context)
if not set_employee_lock(redis_client, kuerzel, triggered_by, context): if not set_employee_lock(redis_client, kuerzel, triggered_by, context):
context.logger.info(f"Calendar Sync All: Sync bereits aktiv für {kuerzel}, überspringe") context.logger.info(f"Calendar Sync All: Sync bereits aktiv für {kuerzel}, überspringe")
continue continue
@@ -55,12 +76,14 @@ async def handler(event_data, context):
"triggered_by": triggered_by "triggered_by": triggered_by
} }
}) })
context.logger.info(f"Calendar Sync All: Emitted event for employee {kuerzel}") context.logger.info(f"Calendar Sync All: Emitted event for employee {kuerzel} (last synced: {format_timestamp(employee_timestamps[kuerzel])})")
emitted_count += 1
context.logger.info("Calendar Sync All: Completed emitting events for employees") context.logger.info(f"Calendar Sync All: Completed, emitted {emitted_count} events")
return { return {
'status': 'completed', 'status': 'completed',
'triggered_by': triggered_by 'triggered_by': triggered_by,
'emitted_count': emitted_count
} }
except Exception as e: except Exception as e:

View File

@@ -6,8 +6,8 @@ from services.advoware import AdvowareAPI
config = { config = {
'type': 'cron', 'type': 'cron',
'name': 'Calendar Sync Cron Job', 'name': 'Calendar Sync Cron Job',
'description': 'Führt den Calendar Sync alle 5 Minuten automatisch aus', 'description': 'Führt den Calendar Sync alle 1 Minuten automatisch aus',
'cron': '*/5 * * * *', # Alle 5 Minuten 'cron': '*/1 * * * *', # Alle 1 Minute
'emits': ['calendar_sync_all'], 'emits': ['calendar_sync_all'],
'flows': ['advoware'] 'flows': ['advoware']
} }
@@ -17,12 +17,12 @@ async def handler(context):
context.logger.info("Calendar Sync Cron: Starting to emit sync-all event") context.logger.info("Calendar Sync Cron: Starting to emit sync-all event")
# # Emit sync-all event # # Emit sync-all event
# await context.emit({ await context.emit({
# "topic": "calendar_sync_all", "topic": "calendar_sync_all",
# "data": { "data": {
# "triggered_by": "cron" "triggered_by": "cron"
# } }
# }) })
context.logger.info("Calendar Sync Cron: Emitted sync-all event") context.logger.info("Calendar Sync Cron: Emitted sync-all event")
return { return {

View File

@@ -33,9 +33,9 @@ FETCH_TO = f"{current_year + 9}-12-31T23:59:59" # End of 9 years ahead
# Constants: Für 600/min -> 600 Tokens, Refill 600/60=10 pro Sekunde -> 10/1000 pro ms # Constants: Für 600/min -> 600 Tokens, Refill 600/60=10 pro Sekunde -> 10/1000 pro ms
RATE_LIMIT_KEY = 'google_calendar_api_tokens' RATE_LIMIT_KEY = 'google_calendar_api_tokens'
MAX_TOKENS = 5 MAX_TOKENS = 7
REFILL_RATE_PER_MS = 2 / 1000 # Float nur hier; Ops mit Integers REFILL_RATE_PER_MS = 7 / 1000 # Float nur hier; Ops mit Integers
MIN_WAIT = 0.2 # 200ms MIN_WAIT = 0.1 # 100ms
JITTER_MAX = 0.1 # Optional: Zufalls-Delay 0-100ms für Glättung JITTER_MAX = 0.1 # Optional: Zufalls-Delay 0-100ms für Glättung
async def enforce_global_rate_limit(context=None): async def enforce_global_rate_limit(context=None):

View File

@@ -98,9 +98,19 @@ def set_employee_lock(redis_client, kuerzel, triggered_by, context=None):
return True return True
def clear_employee_lock(redis_client, kuerzel, context=None): def clear_employee_lock(redis_client, kuerzel, context=None):
"""Clear lock for employee sync operation.""" """Clear lock for employee sync operation and update last-synced timestamp."""
try: try:
employee_lock_key = f'calendar_sync_lock_{kuerzel}' employee_lock_key = f'calendar_sync_lock_{kuerzel}'
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
# Update last-synced timestamp (no TTL, persistent)
import time
current_time = int(time.time())
redis_client.set(employee_last_synced_key, current_time)
# Delete the lock
redis_client.delete(employee_lock_key) redis_client.delete(employee_lock_key)
log_operation('debug', f"Cleared lock and updated last-synced for {kuerzel} to {current_time}", context=context)
except Exception as e: except Exception as e:
log_operation('warning', f"Failed to clear lock for {kuerzel}: {e}", context=context) log_operation('warning', f"Failed to clear lock and update last-synced for {kuerzel}: {e}", context=context)