Compare commits

..

3 Commits

Author SHA1 Message Date
root
96eabe3db6 Refactor calendar sync to prioritize oldest synced employees with human-readable timestamps 2025-10-26 08:58:48 +00:00
root
b18e770f12 refactor: extract common functions to utils
- Add get_redis_client() to calendar_sync_utils.py
- Add get_advoware_employees() to calendar_sync_utils.py
- Add set_employee_lock() and clear_employee_lock() to calendar_sync_utils.py
- Update all step files to use shared utility functions
- Remove duplicate code across calendar_sync_*.py files
2025-10-25 09:21:45 +00:00
root
e4bf21e676 Refactor: Extract common utilities to shared module
- Created calendar_sync_utils.py with shared functions:
  - log_operation: Centralized logging with context support
  - connect_db: Database connection wrapper
  - get_google_service: Google Calendar API initialization
- Updated imports in calendar_sync_event_step.py and audit_calendar_sync.py
- Removed duplicated function definitions
- Maintained function logic without changes
- Improved code maintainability and reduced duplications
2025-10-25 08:54:54 +00:00
6 changed files with 173 additions and 156 deletions

View File

@@ -7,6 +7,7 @@ import pytz
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from config import Config from config import Config
from services.advoware import AdvowareAPI from services.advoware import AdvowareAPI
from .calendar_sync_utils import connect_db, get_google_service
from googleapiclient.discovery import build from googleapiclient.discovery import build
from googleapiclient.errors import HttpError from googleapiclient.errors import HttpError
from google.oauth2 import service_account from google.oauth2 import service_account
@@ -23,35 +24,7 @@ BERLIN_TZ = pytz.timezone('Europe/Berlin')
now = datetime.now(BERLIN_TZ) now = datetime.now(BERLIN_TZ)
current_year = now.year current_year = now.year
async def connect_db():
"""Connect to Postgres DB from Config."""
try:
conn = await asyncpg.connect(
host=Config.POSTGRES_HOST or 'localhost',
user=Config.POSTGRES_USER,
password=Config.POSTGRES_PASSWORD,
database=Config.POSTGRES_DB_NAME,
timeout=10
)
return conn
except Exception as e:
logger.error(f"Failed to connect to DB: {e}")
raise
async def get_google_service():
"""Initialize Google Calendar service."""
try:
service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
if not os.path.exists(service_account_path):
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
creds = service_account.Credentials.from_service_account_file(
service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES
)
service = build('calendar', 'v3', credentials=creds)
return service
except Exception as e:
logger.error(f"Failed to initialize Google service: {e}")
raise
async def ensure_google_calendar(service, employee_kuerzel): async def ensure_google_calendar(service, employee_kuerzel):
"""Ensure Google Calendar exists for employee.""" """Ensure Google Calendar exists for employee."""

View File

@@ -1,64 +1,70 @@
import json import json
import redis import redis
import math
import time
from datetime import datetime
from config import Config from config import Config
from services.advoware import AdvowareAPI from services.advoware import AdvowareAPI
from .calendar_sync_utils import get_redis_client, get_advoware_employees, set_employee_lock
config = { config = {
'type': 'event', 'type': 'event',
'name': 'Calendar Sync All Step', 'name': 'Calendar Sync All Step',
'description': 'Nimmt sync-all Event auf und emittiert individuelle Events für jeden Mitarbeiter', 'description': 'Nimmt sync-all Event auf und emittiert individuelle Events für die ältesten Mitarbeiter',
'subscribes': ['calendar_sync_all'], 'subscribes': ['calendar_sync_all'],
'emits': ['calendar_sync_employee'], 'emits': ['calendar_sync_employee'],
'flows': ['advoware'] 'flows': ['advoware']
} }
async def get_advoware_employees(context, advoware):
"""Fetch list of employees from Advoware."""
try:
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
employees = result if isinstance(result, list) else []
context.logger.info(f"Fetched {len(employees)} Advoware employees")
return employees
except Exception as e:
context.logger.error(f"Failed to fetch Advoware employees: {e}")
raise
async def handler(event_data, context): async def handler(event_data, context):
try: try:
triggered_by = event_data.get('triggered_by', 'unknown') triggered_by = event_data.get('triggered_by', 'unknown')
context.logger.info(f"Calendar Sync All: Starting to emit events for all employees, triggered by {triggered_by}") context.logger.info(f"Calendar Sync All: Starting to emit events for oldest employees, triggered by {triggered_by}")
# Initialize Advoware API # Initialize Advoware API
advoware = AdvowareAPI(context) advoware = AdvowareAPI(context)
# Fetch employees # Fetch employees
employees = await get_advoware_employees(context, advoware) employees = await get_advoware_employees(advoware, context)
if not employees: if not employees:
context.logger.error("Keine Mitarbeiter gefunden. All-Sync abgebrochen.") context.logger.error("Keine Mitarbeiter gefunden. All-Sync abgebrochen.")
return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}} return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
# Emit event for each employee redis_client = get_redis_client(context)
# Collect last_synced timestamps
employee_timestamps = {}
for employee in employees: for employee in employees:
kuerzel = employee.get('kuerzel') kuerzel = employee.get('kuerzel')
if not kuerzel: if not kuerzel:
context.logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}")
continue continue
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
timestamp_str = redis_client.get(employee_last_synced_key)
timestamp = int(timestamp_str) if timestamp_str else 0 # 0 if no timestamp (very old)
employee_timestamps[kuerzel] = timestamp
# # DEBUG: Nur für konfigurierte Nutzer syncen # Sort employees by last_synced (ascending, oldest first), then by kuerzel alphabetically
# if kuerzel not in Config.CALENDAR_SYNC_DEBUG_KUERZEL: sorted_kuerzel = sorted(employee_timestamps.keys(), key=lambda k: (employee_timestamps[k], k))
# context.logger.info(f"DEBUG: Überspringe {kuerzel}, nur {Config.CALENDAR_SYNC_DEBUG_KUERZEL} werden gesynct")
# continue
# Log the sorted list with timestamps
def format_timestamp(ts):
if ts == 0:
return "never"
return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
sorted_list_str = ", ".join(f"{k} ({format_timestamp(employee_timestamps[k])})" for k in sorted_kuerzel)
context.logger.info(f"Calendar Sync All: Sorted employees by last synced: {sorted_list_str}")
# Calculate number to sync: ceil(N / 10)
num_to_sync = math.ceil(len(sorted_kuerzel) / 10)
context.logger.info(f"Calendar Sync All: Total employees {len(sorted_kuerzel)}, syncing {num_to_sync} oldest")
# Emit for the oldest num_to_sync employees, if not locked
emitted_count = 0
for kuerzel in sorted_kuerzel[:num_to_sync]:
employee_lock_key = f'calendar_sync_lock_{kuerzel}' employee_lock_key = f'calendar_sync_lock_{kuerzel}'
redis_client = redis.Redis( if not set_employee_lock(redis_client, kuerzel, triggered_by, context):
host=Config.REDIS_HOST,
port=int(Config.REDIS_PORT),
db=int(Config.REDIS_DB_CALENDAR_SYNC),
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
)
if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None:
context.logger.info(f"Calendar Sync All: Sync bereits aktiv für {kuerzel}, überspringe") context.logger.info(f"Calendar Sync All: Sync bereits aktiv für {kuerzel}, überspringe")
continue continue
@@ -70,12 +76,14 @@ async def handler(event_data, context):
"triggered_by": triggered_by "triggered_by": triggered_by
} }
}) })
context.logger.info(f"Calendar Sync All: Emitted event for employee {kuerzel}") context.logger.info(f"Calendar Sync All: Emitted event for employee {kuerzel} (last synced: {format_timestamp(employee_timestamps[kuerzel])})")
emitted_count += 1
context.logger.info("Calendar Sync All: Completed emitting events for employees") context.logger.info(f"Calendar Sync All: Completed, emitted {emitted_count} events")
return { return {
'status': 'completed', 'status': 'completed',
'triggered_by': triggered_by 'triggered_by': triggered_by,
'emitted_count': emitted_count
} }
except Exception as e: except Exception as e:

View File

@@ -1,6 +1,7 @@
import json import json
import redis import redis
from config import Config from config import Config
from .calendar_sync_utils import get_redis_client, set_employee_lock
config = { config = {
'type': 'api', 'type': 'api',
@@ -50,14 +51,9 @@ async def handler(req, context):
employee_lock_key = f'calendar_sync_lock_{kuerzel_upper}' employee_lock_key = f'calendar_sync_lock_{kuerzel_upper}'
# Prüfe ob bereits ein Sync für diesen Mitarbeiter läuft # Prüfe ob bereits ein Sync für diesen Mitarbeiter läuft
redis_client = redis.Redis( redis_client = get_redis_client(context)
host=Config.REDIS_HOST,
port=int(Config.REDIS_PORT),
db=int(Config.REDIS_DB_CALENDAR_SYNC),
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
)
if redis_client.set(employee_lock_key, 'api', ex=1800, nx=True) is None: if not set_employee_lock(redis_client, kuerzel_upper, 'api', context):
context.logger.info(f"Calendar Sync API: Sync bereits aktiv für {kuerzel_upper}, überspringe") context.logger.info(f"Calendar Sync API: Sync bereits aktiv für {kuerzel_upper}, überspringe")
return { return {
'status': 409, 'status': 409,

View File

@@ -6,8 +6,8 @@ from services.advoware import AdvowareAPI
config = { config = {
'type': 'cron', 'type': 'cron',
'name': 'Calendar Sync Cron Job', 'name': 'Calendar Sync Cron Job',
'description': 'Führt den Calendar Sync alle 5 Minuten automatisch aus', 'description': 'Führt den Calendar Sync alle 1 Minuten automatisch aus',
'cron': '*/5 * * * *', # Alle 5 Minuten 'cron': '*/1 * * * *', # Alle 1 Minute
'emits': ['calendar_sync_all'], 'emits': ['calendar_sync_all'],
'flows': ['advoware'] 'flows': ['advoware']
} }
@@ -17,12 +17,12 @@ async def handler(context):
context.logger.info("Calendar Sync Cron: Starting to emit sync-all event") context.logger.info("Calendar Sync Cron: Starting to emit sync-all event")
# # Emit sync-all event # # Emit sync-all event
# await context.emit({ await context.emit({
# "topic": "calendar_sync_all", "topic": "calendar_sync_all",
# "data": { "data": {
# "triggered_by": "cron" "triggered_by": "cron"
# } }
# }) })
context.logger.info("Calendar Sync Cron: Emitted sync-all event") context.logger.info("Calendar Sync Cron: Emitted sync-all event")
return { return {

View File

@@ -14,6 +14,7 @@ import time
import random import random
from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc. from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc.
from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls
from .calendar_sync_utils import connect_db, get_google_service, log_operation, get_redis_client, get_advoware_employees, set_employee_lock, clear_employee_lock
# Setup logging # Setup logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -30,13 +31,11 @@ current_year = now.year
FETCH_FROM = f"{current_year - 1}-01-01T00:00:00" # Start of previous year FETCH_FROM = f"{current_year - 1}-01-01T00:00:00" # Start of previous year
FETCH_TO = f"{current_year + 9}-12-31T23:59:59" # End of 9 years ahead FETCH_TO = f"{current_year + 9}-12-31T23:59:59" # End of 9 years ahead
CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
# Constants: Für 600/min -> 600 Tokens, Refill 600/60=10 pro Sekunde -> 10/1000 pro ms # Constants: Für 600/min -> 600 Tokens, Refill 600/60=10 pro Sekunde -> 10/1000 pro ms
RATE_LIMIT_KEY = 'google_calendar_api_tokens' RATE_LIMIT_KEY = 'google_calendar_api_tokens'
MAX_TOKENS = 5 MAX_TOKENS = 7
REFILL_RATE_PER_MS = 2 / 1000 # Float nur hier; Ops mit Integers REFILL_RATE_PER_MS = 7 / 1000 # Float nur hier; Ops mit Integers
MIN_WAIT = 0.2 # 200ms MIN_WAIT = 0.1 # 100ms
JITTER_MAX = 0.1 # Optional: Zufalls-Delay 0-100ms für Glättung JITTER_MAX = 0.1 # Optional: Zufalls-Delay 0-100ms für Glättung
async def enforce_global_rate_limit(context=None): async def enforce_global_rate_limit(context=None):
@@ -104,61 +103,6 @@ async def enforce_global_rate_limit(context=None):
except Exception as e: except Exception as e:
log_operation('error', f"Rate limiting failed: {e}. Proceeding without limit.", context=context) log_operation('error', f"Rate limiting failed: {e}. Proceeding without limit.", context=context)
def log_operation(level, message, context=None, **context_vars):
"""Centralized logging with context, supporting Motia workbench logging."""
context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
full_message = f"{message} {context_str}".strip()
if context:
if level == 'info':
context.logger.info(full_message)
elif level == 'warning':
if hasattr(context.logger, 'warn'):
context.logger.warn(full_message)
else:
context.logger.warning(full_message)
elif level == 'error':
context.logger.error(full_message)
# elif level == 'debug':
# context.logger.debug(full_message)dddd
else:
if level == 'info':
logger.info(full_message)
elif level == 'warning':
logger.warning(full_message)
elif level == 'error':
logger.error(full_message)
elif level == 'debug':
logger.debug(full_message)
async def connect_db(context=None):
"""Connect to Postgres DB from Config."""
try:
conn = await asyncpg.connect(
host=Config.POSTGRES_HOST or 'localhost',
user=Config.POSTGRES_USER,
password=Config.POSTGRES_PASSWORD,
database=Config.POSTGRES_DB_NAME,
timeout=10
)
return conn
except Exception as e:
log_operation('error', f"Failed to connect to DB: {e}", context=context)
raise
async def get_google_service(context=None):
"""Initialize Google Calendar service."""
try:
service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
if not os.path.exists(service_account_path):
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
creds = service_account.Credentials.from_service_account_file(
service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES
)
service = build('calendar', 'v3', credentials=creds)
return service
except Exception as e:
log_operation('error', f"Failed to initialize Google service: {e}", context=context)
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504]) @backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def ensure_google_calendar(service, employee_kuerzel, context=None): async def ensure_google_calendar(service, employee_kuerzel, context=None):
@@ -673,17 +617,6 @@ async def safe_advoware_operation(operation, write_allowed, context=None, *args,
return None return None
return await operation(*args, **kwargs) return await operation(*args, **kwargs)
async def get_advoware_employees(advoware, context=None):
"""Fetch list of employees from Advoware."""
try:
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
employees = result if isinstance(result, list) else []
log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
return employees
except Exception as e:
log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context)
raise
async def get_advoware_timestamp(advoware, frnr, context=None): async def get_advoware_timestamp(advoware, frnr, context=None):
"""Fetch the last modified timestamp for an Advoware appointment.""" """Fetch the last modified timestamp for an Advoware appointment."""
try: try:
@@ -983,7 +916,6 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
log_operation('warning', f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}", context=context) log_operation('warning', f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}", context=context)
async with conn.transaction(): async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
async def handler(event_data, context): async def handler(event_data, context):
"""Main event handler for calendar sync.""" """Main event handler for calendar sync."""
@@ -996,12 +928,7 @@ async def handler(event_data, context):
log_operation('info', f"Starting calendar sync for employee {kuerzel}", context=context) log_operation('info', f"Starting calendar sync for employee {kuerzel}", context=context)
redis_client = redis.Redis( redis_client = get_redis_client(context)
host=Config.REDIS_HOST,
port=int(Config.REDIS_PORT),
db=int(Config.REDIS_DB_CALENDAR_SYNC),
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
)
try: try:
@@ -1105,10 +1032,7 @@ async def handler(event_data, context):
return {'status': 500, 'body': {'error': str(e)}} return {'status': 500, 'body': {'error': str(e)}}
finally: finally:
# Ensure lock is always released # Ensure lock is always released
try: clear_employee_lock(redis_client, kuerzel, context)
redis_client.delete(employee_lock_key)
except Exception:
pass # Ignore errors when deleting lock
# Motia Step Configuration # Motia Step Configuration
config = { config = {

View File

@@ -0,0 +1,116 @@
import logging
import os
import time

import asyncpg
import redis

from config import Config
from google.oauth2 import service_account
from googleapiclient.discovery import build
logger = logging.getLogger(__name__)
def log_operation(level, message, context=None, **context_vars):
    """Centralized logging with optional Motia workbench context.

    Args:
        level: One of 'info', 'warning', 'error', 'debug'.
        message: The log message text.
        context: Optional Motia context; when given, ``context.logger``
            is used instead of the module-level logger.
        **context_vars: Extra ``key=value`` pairs appended to the message;
            pairs whose value is None are dropped.
    """
    # Append "key=value" context pairs, skipping None values.
    context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
    full_message = f"{message} {context_str}".strip()
    if context:
        if level == 'info':
            context.logger.info(full_message)
        elif level == 'warning':
            # Motia loggers may expose 'warn' instead of 'warning'.
            if hasattr(context.logger, 'warn'):
                context.logger.warn(full_message)
            else:
                context.logger.warning(full_message)
        elif level == 'error':
            context.logger.error(full_message)
        elif level == 'debug':
            # Fix: debug messages were silently dropped on the context path
            # (e.g. clear_employee_lock logs at debug level with a context).
            # Guarded with hasattr in case the workbench logger lacks debug.
            if hasattr(context.logger, 'debug'):
                context.logger.debug(full_message)
    else:
        if level == 'info':
            logger.info(full_message)
        elif level == 'warning':
            logger.warning(full_message)
        elif level == 'error':
            logger.error(full_message)
        elif level == 'debug':
            logger.debug(full_message)
async def connect_db(context=None):
    """Open an asyncpg connection using the credentials from Config.

    Falls back to 'localhost' when POSTGRES_HOST is unset. Logs and
    re-raises any connection failure (optionally via the Motia context).
    """
    try:
        return await asyncpg.connect(
            host=Config.POSTGRES_HOST or 'localhost',
            user=Config.POSTGRES_USER,
            password=Config.POSTGRES_PASSWORD,
            database=Config.POSTGRES_DB_NAME,
            timeout=10
        )
    except Exception as e:
        log_operation('error', f"Failed to connect to DB: {e}", context=context)
        raise
async def get_google_service(context=None):
    """Build an authenticated Google Calendar v3 service.

    Loads the service-account credentials file configured in Config and
    raises FileNotFoundError if the file is missing. Any failure is
    logged (optionally via the Motia context) and re-raised.
    """
    try:
        sa_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
        if not os.path.exists(sa_path):
            raise FileNotFoundError(f"Service account file not found: {sa_path}")
        credentials = service_account.Credentials.from_service_account_file(
            sa_path,
            scopes=Config.GOOGLE_CALENDAR_SCOPES,
        )
        return build('calendar', 'v3', credentials=credentials)
    except Exception as e:
        log_operation('error', f"Failed to initialize Google service: {e}", context=context)
        raise
def get_redis_client(context=None):
    """Return a Redis client bound to the calendar-sync database.

    Connection parameters come from Config; any initialization failure
    is logged (optionally via the Motia context) and re-raised.
    """
    try:
        return redis.Redis(
            host=Config.REDIS_HOST,
            port=int(Config.REDIS_PORT),
            db=int(Config.REDIS_DB_CALENDAR_SYNC),
            socket_timeout=Config.REDIS_TIMEOUT_SECONDS,
        )
    except Exception as e:
        log_operation('error', f"Failed to initialize Redis client: {e}", context=context)
        raise
async def get_advoware_employees(advoware, context=None):
    """Fetch the list of active employees from Advoware.

    Returns an empty list when the API response is not a list;
    logs and re-raises any API failure.
    """
    try:
        response = await advoware.api_call(
            'api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'}
        )
        # Defensive: treat any non-list payload as "no employees".
        employees = response if isinstance(response, list) else []
        log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
        return employees
    except Exception as e:
        log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context)
        raise
def set_employee_lock(redis_client, kuerzel, triggered_by, context=None):
    """Try to acquire the per-employee sync lock.

    Uses an NX set with a 30-minute TTL so a crashed worker cannot hold
    the lock forever. Returns True when acquired, False when a sync for
    this employee is already running.
    """
    lock_key = f'calendar_sync_lock_{kuerzel}'
    acquired = redis_client.set(lock_key, triggered_by, ex=1800, nx=True)
    if acquired is None:
        log_operation('info', f"Sync already active for {kuerzel}, skipping", context=context)
        return False
    return True
def clear_employee_lock(redis_client, kuerzel, context=None):
    """Release the per-employee sync lock and record the sync time.

    Best-effort cleanup intended for finally-blocks: failures are logged
    as warnings and never raised, so they cannot mask the caller's
    original error.
    """
    lock_key = f'calendar_sync_lock_{kuerzel}'
    last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
    current_time = int(time.time())
    try:
        # Persist the last-synced timestamp with no TTL: the "oldest
        # first" scheduler orders employees by this value, so it must
        # survive restarts.
        redis_client.set(last_synced_key, current_time)
    except Exception as e:
        log_operation('warning', f"Failed to clear lock and update last-synced for {kuerzel}: {e}", context=context)
    try:
        # Release the lock in its own try-block: previously one block
        # covered both operations, so a failed timestamp write skipped
        # the delete and left the employee locked for up to the full
        # 30-minute TTL.
        redis_client.delete(lock_key)
        log_operation('debug', f"Cleared lock and updated last-synced for {kuerzel} to {current_time}", context=context)
    except Exception as e:
        log_operation('warning', f"Failed to clear lock and update last-synced for {kuerzel}: {e}", context=context)