Files
motia/bitbylaw/steps/advoware_cal_sync/calendar_sync_utils.py
root b18e770f12 refactor: extract common functions to utils
- Add get_redis_client() to calendar_sync_utils.py
- Add get_advoware_employees() to calendar_sync_utils.py
- Add set_employee_lock() and clear_employee_lock() to calendar_sync_utils.py
- Update all step files to use shared utility functions
- Remove duplicate code across calendar_sync_*.py files
2025-10-25 09:21:45 +00:00

106 lines
4.1 KiB
Python

import logging
import asyncpg
import os
import redis
from config import Config
from googleapiclient.discovery import build
from google.oauth2 import service_account
# Module-level logger; log_operation() writes to it when no context is supplied.
logger = logging.getLogger(__name__)
def log_operation(level, message, context=None, **context_vars):
    """Log a message through the context logger or the module logger.

    Keyword arguments whose value is not ``None`` are appended to the
    message as space-separated ``key=value`` pairs. When ``context`` is
    truthy the entry is sent to ``context.logger``; otherwise it is sent to
    this module's ``logger``.

    Args:
        level: One of ``'info'``, ``'warning'``, ``'error'`` or ``'debug'``.
            Any other value is ignored and nothing is logged.
        message: Base text of the log entry.
        context: Optional object exposing a ``logger`` attribute.
        **context_vars: Extra values to append to the message; entries
            whose value is ``None`` are left out.
    """
    context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
    full_message = f"{message} {context_str}".strip()

    if context:
        if level == 'info':
            context.logger.info(full_message)
        elif level == 'warning':
            # Prefer ``warn`` when the context logger offers it, otherwise
            # use ``warning``.
            if hasattr(context.logger, 'warn'):
                context.logger.warn(full_message)
            else:
                context.logger.warning(full_message)
        elif level == 'error':
            context.logger.error(full_message)
        elif level == 'debug':
            # Debug entries must not vanish just because a context was
            # passed: use the context logger's ``debug`` when it has one,
            # and the module logger otherwise.
            context_debug = getattr(getattr(context, 'logger', None), 'debug', None)
            if callable(context_debug):
                context_debug(full_message)
            else:
                logger.debug(full_message)
    else:
        if level == 'info':
            logger.info(full_message)
        elif level == 'warning':
            logger.warning(full_message)
        elif level == 'error':
            logger.error(full_message)
        elif level == 'debug':
            logger.debug(full_message)
async def connect_db(context=None):
    """Open an asyncpg connection using the Postgres settings on ``Config``.

    The host falls back to ``'localhost'`` when ``Config.POSTGRES_HOST`` is
    falsy, and the connect call is made with ``timeout=10``.

    Args:
        context: Optional context forwarded to ``log_operation`` on failure.

    Returns:
        The connection object returned by ``asyncpg.connect``.

    Raises:
        Exception: Any failure is logged at error level and re-raised.
    """
    try:
        settings = {
            'host': Config.POSTGRES_HOST or 'localhost',
            'user': Config.POSTGRES_USER,
            'password': Config.POSTGRES_PASSWORD,
            'database': Config.POSTGRES_DB_NAME,
            'timeout': 10,
        }
        return await asyncpg.connect(**settings)
    except Exception as exc:
        log_operation('error', f"Failed to connect to DB: {exc}", context=context)
        raise
async def get_google_service(context=None):
    """Build a Google Calendar v3 service from a service-account file.

    The file path comes from ``Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH``
    and the scopes from ``Config.GOOGLE_CALENDAR_SCOPES``.

    Args:
        context: Optional context forwarded to ``log_operation`` on failure.

    Returns:
        The object returned by ``build('calendar', 'v3', ...)``.

    Raises:
        FileNotFoundError: If the service-account path does not exist.
        Exception: Any failure is logged at error level and re-raised.
    """
    try:
        key_file = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
        if os.path.exists(key_file):
            credentials = service_account.Credentials.from_service_account_file(
                key_file, scopes=Config.GOOGLE_CALENDAR_SCOPES
            )
            return build('calendar', 'v3', credentials=credentials)
        # Fail with an explicit message rather than whatever the credential
        # loader would raise for a missing path.
        raise FileNotFoundError(f"Service account file not found: {key_file}")
    except Exception as exc:
        log_operation('error', f"Failed to initialize Google service: {exc}", context=context)
        raise
def get_redis_client(context=None):
    """Create the Redis client used by the calendar sync steps.

    Host, port, database index and socket timeout are read from ``Config``;
    the port and database index are converted with ``int``.

    Args:
        context: Optional context forwarded to ``log_operation`` on failure.

    Returns:
        A ``redis.Redis`` instance.

    Raises:
        Exception: Any failure is logged at error level and re-raised.
    """
    try:
        options = dict(
            host=Config.REDIS_HOST,
            port=int(Config.REDIS_PORT),
            db=int(Config.REDIS_DB_CALENDAR_SYNC),
            socket_timeout=Config.REDIS_TIMEOUT_SECONDS,
        )
        return redis.Redis(**options)
    except Exception as exc:
        log_operation('error', f"Failed to initialize Redis client: {exc}", context=context)
        raise
async def get_advoware_employees(advoware, context=None):
    """Fetch employees from Advoware with the ``aktiv=true`` filter.

    Args:
        advoware: Object providing an awaitable ``api_call`` method.
        context: Optional context forwarded to ``log_operation``.

    Returns:
        The API response when it is a list, otherwise an empty list.

    Raises:
        Exception: Any failure is logged at error level and re-raised.
    """
    try:
        response = await advoware.api_call(
            'api/v1/advonet/Mitarbeiter',
            method='GET',
            params={'aktiv': 'true'},
        )
        # Any response that is not a list is treated as "no employees".
        if isinstance(response, list):
            employees = response
        else:
            employees = []
        log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
        return employees
    except Exception as exc:
        log_operation('error', f"Failed to fetch Advoware employees: {exc}", context=context)
        raise
def set_employee_lock(redis_client, kuerzel, triggered_by, context=None, *, ttl_seconds=1800):
    """Try to acquire the per-employee calendar sync lock.

    The lock is the Redis key ``calendar_sync_lock_<kuerzel>``, written with
    ``nx=True`` so an existing lock is never overwritten, and with an expiry
    so that it cannot be held forever.

    Args:
        redis_client: Client exposing a redis-py style ``set`` method.
        kuerzel: Employee identifier used to build the lock key.
        triggered_by: Value stored under the lock key.
        context: Optional context forwarded to ``log_operation``.
        ttl_seconds: Expiry passed to ``set`` as ``ex``. Defaults to 1800.

    Returns:
        ``True`` if the lock was acquired, ``False`` if the key already
        existed and the sync should be skipped.
    """
    employee_lock_key = f'calendar_sync_lock_{kuerzel}'
    acquired = redis_client.set(employee_lock_key, triggered_by, ex=ttl_seconds, nx=True)
    # Treat any falsy reply (None or False) as "not acquired" so a client
    # that reports failure with False is not mistaken for success.
    if not acquired:
        log_operation('info', f"Sync already active for {kuerzel}, skipping", context=context)
        return False
    return True
def clear_employee_lock(redis_client, kuerzel, context=None):
    """Delete the per-employee calendar sync lock key.

    This is best-effort: a failure is logged as a warning and not raised.

    Args:
        redis_client: Client exposing a ``delete`` method.
        kuerzel: Employee identifier used to build the lock key.
        context: Optional context forwarded to ``log_operation`` on failure.
    """
    try:
        redis_client.delete(f'calendar_sync_lock_{kuerzel}')
    except Exception as exc:
        log_operation('warning', f"Failed to clear lock for {kuerzel}: {exc}", context=context)