"""
Calendar Sync Utilities

Shared utility functions for calendar synchronization between Google Calendar and Advoware.
"""

import logging
import os
import time

import asyncpg
import redis
from google.oauth2 import service_account
from googleapiclient.discovery import build

# Configure logging
logger = logging.getLogger(__name__)

# Levels that log_operation forwards to the logger method of the same name.
_LOG_LEVELS = ('debug', 'info', 'warning', 'error')

# Lifetime of a per-employee sync lock, so a crashed sync cannot block forever.
_EMPLOYEE_LOCK_TTL_SECONDS = 1800


def log_operation(level: str, message: str, context=None, **context_vars):
    """Log a message through the context logger or the module logger, and echo it to stdout.

    Args:
        level: One of 'debug', 'info', 'warning' or 'error' (case-insensitive).
            Any other value is logged at 'info' so the message is not lost.
        message: The text to log.
        context: Optional object exposing a ``logger`` attribute (Motia III
            FlowContext). When missing, or when its ``logger`` is None, the
            module-level logger is used instead.
        **context_vars: Extra ``key=value`` pairs appended to the message.
            Pairs whose value is None are left out.
    """
    context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
    full_message = f"{message} {context_str}".strip()

    normalized = level.lower()
    method_name = normalized if normalized in _LOG_LEVELS else 'info'

    # Use ctx.logger if context is available (Motia III FlowContext),
    # otherwise fall back to the standard module logger.
    target = getattr(context, 'logger', None)
    if target is None:
        target = logger
    getattr(target, method_name)(full_message)

    # Also log to console for journalctl visibility.
    # NOTE(review): the original indentation of this print was lost; it is kept
    # unconditional here (every call is echoed) - confirm this is intended.
    print(f"[{level.upper()}] {full_message}")


async def connect_db(context=None):
    """Open an asyncpg connection to Postgres using environment variables.

    Reads POSTGRES_HOST, POSTGRES_USER, POSTGRES_PASSWORD and POSTGRES_DB_NAME,
    each with a built-in default, and uses a 10 second connect timeout.

    Args:
        context: Optional logging context passed through to log_operation.

    Returns:
        The connection returned by ``asyncpg.connect``.

    Raises:
        Exception: Any connection error is logged and re-raised unchanged.
    """
    # NOTE(review): the user/password defaults are hard-coded fallbacks;
    # consider requiring the environment variables in production.
    try:
        return await asyncpg.connect(
            host=os.getenv('POSTGRES_HOST', 'localhost'),
            user=os.getenv('POSTGRES_USER', 'calendar_sync_user'),
            password=os.getenv('POSTGRES_PASSWORD', 'default_password'),
            database=os.getenv('POSTGRES_DB_NAME', 'calendar_sync_db'),
            timeout=10,
        )
    except Exception as e:
        log_operation('error', f"Failed to connect to DB: {e}", context=context)
        raise


async def get_google_service(context=None):
    """Build a Google Calendar v3 service from a service-account key file.

    The key file path comes from GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH and
    defaults to 'service-account.json'. The function is declared ``async`` but
    awaits nothing itself; callers must still await it.

    Args:
        context: Optional logging context passed through to log_operation.

    Returns:
        The service object returned by ``googleapiclient.discovery.build``.

    Raises:
        FileNotFoundError: If the service-account file does not exist.
        Exception: Any other initialization error is logged and re-raised.
    """
    try:
        service_account_path = os.getenv('GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH', 'service-account.json')
        # Checked up front so a missing file gives a clear, specific message.
        if not os.path.exists(service_account_path):
            raise FileNotFoundError(f"Service account file not found: {service_account_path}")

        scopes = ['https://www.googleapis.com/auth/calendar']
        creds = service_account.Credentials.from_service_account_file(
            service_account_path, scopes=scopes
        )
        return build('calendar', 'v3', credentials=creds)
    except Exception as e:
        log_operation('error', f"Failed to initialize Google service: {e}", context=context)
        raise


def get_redis_client(context=None):
    """Create the Redis client used for calendar sync locks and timestamps.

    Reads REDIS_HOST, REDIS_PORT, REDIS_DB_CALENDAR_SYNC and
    REDIS_TIMEOUT_SECONDS from the environment, each with a default.

    Args:
        context: Optional logging context passed through to log_operation.

    Returns:
        A ``redis.Redis`` instance.

    Raises:
        Exception: Any error (for example a non-numeric port) is logged and
            re-raised unchanged.
    """
    try:
        return redis.Redis(
            host=os.getenv('REDIS_HOST', 'localhost'),
            port=int(os.getenv('REDIS_PORT', '6379')),
            db=int(os.getenv('REDIS_DB_CALENDAR_SYNC', '2')),
            socket_timeout=int(os.getenv('REDIS_TIMEOUT_SECONDS', '5')),
        )
    except Exception as e:
        log_operation('error', f"Failed to initialize Redis client: {e}", context=context)
        raise


async def get_advoware_employees(advoware, context=None):
    """Fetch the list of active employees from Advoware.

    Args:
        advoware: Client object exposing an awaitable ``api_call`` method.
        context: Optional logging context passed through to log_operation.

    Returns:
        The API result if it is a list, otherwise an empty list.

    Raises:
        Exception: Any API error is logged and re-raised unchanged.
    """
    try:
        result = await advoware.api_call(
            'api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'}
        )
        # NOTE(review): a non-list response is treated as "no employees"
        # without any warning - confirm this is the intended behavior.
        employees = result if isinstance(result, list) else []
        log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
        return employees
    except Exception as e:
        log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context)
        raise


def set_employee_lock(redis_client, kuerzel: str, triggered_by: str, context=None) -> bool:
    """Try to acquire the sync lock for one employee.

    The lock is a Redis key written only if it does not exist yet (``nx=True``)
    and it expires after 1800 seconds.

    Args:
        redis_client: Redis client used to store the lock.
        kuerzel: Employee identifier used in the lock key.
        triggered_by: Value stored under the lock key.
        context: Optional logging context passed through to log_operation.

    Returns:
        True if the lock was acquired, False if a sync is already active.
    """
    employee_lock_key = f'calendar_sync_lock_{kuerzel}'
    acquired = redis_client.set(
        employee_lock_key, triggered_by, ex=_EMPLOYEE_LOCK_TTL_SECONDS, nx=True
    )
    # Any falsy reply (None or False) means the key already existed.
    if not acquired:
        log_operation('info', f"Sync already active for {kuerzel}, skipping", context=context)
        return False
    return True


def clear_employee_lock(redis_client, kuerzel: str, context=None):
    """Release the sync lock for one employee and record the last-synced time.

    Both Redis operations are best-effort: failures are logged as a warning
    and never raised. The lock delete is attempted even when the timestamp
    write fails, so a failed write cannot leave the employee locked until the
    lock expires.

    Args:
        redis_client: Redis client holding the lock and timestamp keys.
        kuerzel: Employee identifier used in both key names.
        context: Optional logging context passed through to log_operation.
    """
    employee_lock_key = f'calendar_sync_lock_{kuerzel}'
    employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
    current_time = int(time.time())
    failures = []

    # Update last-synced timestamp (no TTL, persistent)
    try:
        redis_client.set(employee_last_synced_key, current_time)
    except Exception as e:
        failures.append(e)

    # Delete the lock
    try:
        redis_client.delete(employee_lock_key)
    except Exception as e:
        failures.append(e)

    if failures:
        details = '; '.join(str(e) for e in failures)
        log_operation('warning', f"Failed to clear lock and update last-synced for {kuerzel}: {details}", context=context)
        return

    log_operation('debug', f"Cleared lock and updated last-synced for {kuerzel} to {current_time}", context=context)