diff --git a/services/calendar_sync_utils.py b/services/calendar_sync_utils.py new file mode 100644 index 0000000..59f03e7 --- /dev/null +++ b/services/calendar_sync_utils.py @@ -0,0 +1,133 @@ +""" +Calendar Sync Utilities + +Shared utility functions for calendar synchronization between Google Calendar and Advoware. +""" +import asyncpg +import os +import redis +import time +from typing import Optional, Any, List +from googleapiclient.discovery import build +from google.oauth2 import service_account +from services.logging_utils import get_service_logger + + +def get_logger(context=None): + """Get logger for calendar sync operations""" + return get_service_logger('calendar_sync', context) + + +def log_operation(level: str, message: str, context=None, **extra): + """ + Log calendar sync operations with structured context. + + Args: + level: Log level ('debug', 'info', 'warning', 'error') + message: Log message + context: FlowContext if available + **extra: Additional key-value pairs to log + """ + logger = get_logger(context) + log_func = getattr(logger, level.lower(), logger.info) + + if extra: + extra_str = " | " + " | ".join(f"{k}={v}" for k, v in extra.items()) + log_func(message + extra_str) + else: + log_func(message) + + +async def connect_db(context=None): + """Connect to Postgres DB from environment variables.""" + logger = get_logger(context) + try: + conn = await asyncpg.connect( + host=os.getenv('POSTGRES_HOST', 'localhost'), + user=os.getenv('POSTGRES_USER', 'calendar_sync_user'), + password=os.getenv('POSTGRES_PASSWORD', 'default_password'), + database=os.getenv('POSTGRES_DB_NAME', 'calendar_sync_db'), + timeout=10 + ) + return conn + except Exception as e: + logger.error(f"Failed to connect to DB: {e}") + raise + + +async def get_google_service(context=None): + """Initialize Google Calendar service.""" + logger = get_logger(context) + try: + service_account_path = os.getenv('GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH', 'service-account.json') + if not os.path.exists(service_account_path): + raise FileNotFoundError(f"Service account file not found: {service_account_path}") + + scopes = ['https://www.googleapis.com/auth/calendar'] + creds = service_account.Credentials.from_service_account_file( + service_account_path, scopes=scopes + ) + service = build('calendar', 'v3', credentials=creds) + return service + except Exception as e: + logger.error(f"Failed to initialize Google service: {e}") + raise + + +def get_redis_client(context=None) -> redis.Redis: + """Initialize Redis client for calendar sync operations.""" + logger = get_logger(context) + try: + redis_client = redis.Redis( + host=os.getenv('REDIS_HOST', 'localhost'), + port=int(os.getenv('REDIS_PORT', '6379')), + db=int(os.getenv('REDIS_DB_CALENDAR_SYNC', '2')), + socket_timeout=int(os.getenv('REDIS_TIMEOUT_SECONDS', '5')), + decode_responses=True + ) + return redis_client + except Exception as e: + logger.error(f"Failed to initialize Redis client: {e}") + raise + + +async def get_advoware_employees(advoware, context=None) -> List[Any]: + """Fetch list of employees from Advoware.""" + logger = get_logger(context) + try: + result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'}) + employees = result if isinstance(result, list) else [] + logger.info(f"Fetched {len(employees)} Advoware employees") + return employees + except Exception as e: + logger.error(f"Failed to fetch Advoware employees: {e}") + raise + + +def set_employee_lock(redis_client: redis.Redis, kuerzel: str, triggered_by: str, context=None) -> bool: + """Set lock for employee sync operation.""" + logger = get_logger(context) + employee_lock_key = f'calendar_sync_lock_{kuerzel}' + if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None: + logger.info(f"Sync already active for {kuerzel}, skipping") + return False + return True + + +def clear_employee_lock(redis_client: redis.Redis, kuerzel: str, context=None) -> None: + """Clear lock for employee sync operation and update last-synced timestamp.""" + logger = get_logger(context) + try: + employee_lock_key = f'calendar_sync_lock_{kuerzel}' + employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}' + + # Update last-synced timestamp (no TTL, persistent) + current_time = int(time.time()) + redis_client.set(employee_last_synced_key, current_time) + + # Delete the lock + redis_client.delete(employee_lock_key) + + logger.debug(f"Cleared lock and updated last-synced for {kuerzel} to {current_time}") + except Exception as e: + logger.warning(f"Failed to clear lock and update last-synced for {kuerzel}: {e}") diff --git a/src/steps/advoware_cal_sync/calendar_sync_all_step.py b/src/steps/advoware_cal_sync/calendar_sync_all_step.py index f6da55e..f743ae5 100644 --- a/src/steps/advoware_cal_sync/calendar_sync_all_step.py +++ b/src/steps/advoware_cal_sync/calendar_sync_all_step.py @@ -4,10 +4,7 @@ Calendar Sync All Step Handles calendar_sync_all event and emits individual sync events for oldest employees. Uses Redis to track last sync times and distribute work. """ -import sys -from pathlib import Path -sys.path.insert(0, str(Path(__file__).parent)) -from calendar_sync_utils import ( +from services.calendar_sync_utils import ( get_redis_client, get_advoware_employees, set_employee_lock, diff --git a/src/steps/advoware_cal_sync/calendar_sync_api_step.py b/src/steps/advoware_cal_sync/calendar_sync_api_step.py index b62c3b5..12e3d29 100644 --- a/src/steps/advoware_cal_sync/calendar_sync_api_step.py +++ b/src/steps/advoware_cal_sync/calendar_sync_api_step.py @@ -4,10 +4,7 @@ Calendar Sync API Step HTTP API endpoint for manual calendar sync triggering. Supports syncing a single employee or all employees. """ -import sys -from pathlib import Path -sys.path.insert(0, str(Path(__file__).parent)) -from calendar_sync_utils import get_redis_client, set_employee_lock, get_logger +from services.calendar_sync_utils import get_redis_client, set_employee_lock from motia import http, ApiRequest, ApiResponse, FlowContext diff --git a/src/steps/advoware_cal_sync/calendar_sync_cron_step.py b/src/steps/advoware_cal_sync/calendar_sync_cron_step.py index 91fc474..708da12 100644 --- a/src/steps/advoware_cal_sync/calendar_sync_cron_step.py +++ b/src/steps/advoware_cal_sync/calendar_sync_cron_step.py @@ -4,10 +4,6 @@ Calendar Sync Cron Step Cron trigger for automatic calendar synchronization. Emits calendar_sync_all event to start sync cascade. """ -import sys -from pathlib import Path -sys.path.insert(0, str(Path(__file__).parent)) -from calendar_sync_utils import log_operation from typing import Dict, Any from motia import cron, FlowContext diff --git a/src/steps/advoware_cal_sync/calendar_sync_event_step.py b/src/steps/advoware_cal_sync/calendar_sync_event_step.py index 2dd21a3..7368e19 100644 --- a/src/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/src/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -24,10 +24,7 @@ from googleapiclient.errors import HttpError from google.oauth2 import service_account import redis -import sys -from pathlib import Path -sys.path.insert(0, str(Path(__file__).parent)) -from calendar_sync_utils import ( +from services.calendar_sync_utils import ( connect_db, get_google_service, log_operation,