Files
motia-iii/steps/advoware_cal_sync/calendar_sync_utils.py

134 lines
5.1 KiB
Python

"""
Calendar Sync Utilities
Shared utility functions for calendar synchronization between Google Calendar and Advoware.
"""
import logging
import asyncpg
import os
import redis
import time
from googleapiclient.discovery import build
from google.oauth2 import service_account
# Configure logging
logger = logging.getLogger(__name__)
def log_operation(level: str, message: str, context=None, **context_vars):
"""Centralized logging with context, supporting file and console logging."""
context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
full_message = f"{message} {context_str}".strip()
# Use ctx.logger if context is available (Motia III FlowContext)
if context and hasattr(context, 'logger'):
if level == 'info':
context.logger.info(full_message)
elif level == 'warning':
context.logger.warning(full_message)
elif level == 'error':
context.logger.error(full_message)
elif level == 'debug':
context.logger.debug(full_message)
else:
# Fallback to standard logger
if level == 'info':
logger.info(full_message)
elif level == 'warning':
logger.warning(full_message)
elif level == 'error':
logger.error(full_message)
elif level == 'debug':
logger.debug(full_message)
# Also log to console for journalctl visibility
print(f"[{level.upper()}] {full_message}")
async def connect_db(context=None):
"""Connect to Postgres DB from environment variables."""
try:
conn = await asyncpg.connect(
host=os.getenv('POSTGRES_HOST', 'localhost'),
user=os.getenv('POSTGRES_USER', 'calendar_sync_user'),
password=os.getenv('POSTGRES_PASSWORD', 'default_password'),
database=os.getenv('POSTGRES_DB_NAME', 'calendar_sync_db'),
timeout=10
)
return conn
except Exception as e:
log_operation('error', f"Failed to connect to DB: {e}", context=context)
raise
async def get_google_service(context=None):
"""Initialize Google Calendar service."""
try:
service_account_path = os.getenv('GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH', 'service-account.json')
if not os.path.exists(service_account_path):
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
scopes = ['https://www.googleapis.com/auth/calendar']
creds = service_account.Credentials.from_service_account_file(
service_account_path, scopes=scopes
)
service = build('calendar', 'v3', credentials=creds)
return service
except Exception as e:
log_operation('error', f"Failed to initialize Google service: {e}", context=context)
raise
def get_redis_client(context=None):
"""Initialize Redis client for calendar sync operations."""
try:
redis_client = redis.Redis(
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', '6379')),
db=int(os.getenv('REDIS_DB_CALENDAR_SYNC', '2')),
socket_timeout=int(os.getenv('REDIS_TIMEOUT_SECONDS', '5'))
)
return redis_client
except Exception as e:
log_operation('error', f"Failed to initialize Redis client: {e}", context=context)
raise
async def get_advoware_employees(advoware, context=None):
"""Fetch list of employees from Advoware."""
try:
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
employees = result if isinstance(result, list) else []
log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
return employees
except Exception as e:
log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context)
raise
def set_employee_lock(redis_client, kuerzel: str, triggered_by: str, context=None) -> bool:
"""Set lock for employee sync operation."""
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None:
log_operation('info', f"Sync already active for {kuerzel}, skipping", context=context)
return False
return True
def clear_employee_lock(redis_client, kuerzel: str, context=None):
"""Clear lock for employee sync operation and update last-synced timestamp."""
try:
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
# Update last-synced timestamp (no TTL, persistent)
current_time = int(time.time())
redis_client.set(employee_last_synced_key, current_time)
# Delete the lock
redis_client.delete(employee_lock_key)
log_operation('debug', f"Cleared lock and updated last-synced for {kuerzel} to {current_time}", context=context)
except Exception as e:
log_operation('warning', f"Failed to clear lock and update last-synced for {kuerzel}: {e}", context=context)