fix: move calendar_sync_utils to services/ and remove sys.path.insert hacks from all calendar sync steps

This commit is contained in:
bsiggel
2026-03-31 07:08:21 +00:00
parent cc70189513
commit e1695e530a
5 changed files with 136 additions and 16 deletions

View File

@@ -0,0 +1,133 @@
"""
Calendar Sync Utilities
Shared utility functions for calendar synchronization between Google Calendar and Advoware.
"""
import asyncpg
import os
import redis
import time
from typing import Optional, Any, List
from googleapiclient.discovery import build
from google.oauth2 import service_account
from services.logging_utils import get_service_logger
def get_logger(context=None):
"""Get logger for calendar sync operations"""
return get_service_logger('calendar_sync', context)
def log_operation(level: str, message: str, context=None, **extra):
"""
Log calendar sync operations with structured context.
Args:
level: Log level ('debug', 'info', 'warning', 'error')
message: Log message
context: FlowContext if available
**extra: Additional key-value pairs to log
"""
logger = get_logger(context)
log_func = getattr(logger, level.lower(), logger.info)
if extra:
extra_str = " | " + " | ".join(f"{k}={v}" for k, v in extra.items())
log_func(message + extra_str)
else:
log_func(message)
async def connect_db(context=None):
"""Connect to Postgres DB from environment variables."""
logger = get_logger(context)
try:
conn = await asyncpg.connect(
host=os.getenv('POSTGRES_HOST', 'localhost'),
user=os.getenv('POSTGRES_USER', 'calendar_sync_user'),
password=os.getenv('POSTGRES_PASSWORD', 'default_password'),
database=os.getenv('POSTGRES_DB_NAME', 'calendar_sync_db'),
timeout=10
)
return conn
except Exception as e:
logger.error(f"Failed to connect to DB: {e}")
raise
async def get_google_service(context=None):
"""Initialize Google Calendar service."""
logger = get_logger(context)
try:
service_account_path = os.getenv('GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH', 'service-account.json')
if not os.path.exists(service_account_path):
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
scopes = ['https://www.googleapis.com/auth/calendar']
creds = service_account.Credentials.from_service_account_file(
service_account_path, scopes=scopes
)
service = build('calendar', 'v3', credentials=creds)
return service
except Exception as e:
logger.error(f"Failed to initialize Google service: {e}")
raise
def get_redis_client(context=None) -> redis.Redis:
"""Initialize Redis client for calendar sync operations."""
logger = get_logger(context)
try:
redis_client = redis.Redis(
host=os.getenv('REDIS_HOST', 'localhost'),
port=int(os.getenv('REDIS_PORT', '6379')),
db=int(os.getenv('REDIS_DB_CALENDAR_SYNC', '2')),
socket_timeout=int(os.getenv('REDIS_TIMEOUT_SECONDS', '5')),
decode_responses=True
)
return redis_client
except Exception as e:
logger.error(f"Failed to initialize Redis client: {e}")
raise
async def get_advoware_employees(advoware, context=None) -> List[Any]:
"""Fetch list of employees from Advoware."""
logger = get_logger(context)
try:
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
employees = result if isinstance(result, list) else []
logger.info(f"Fetched {len(employees)} Advoware employees")
return employees
except Exception as e:
logger.error(f"Failed to fetch Advoware employees: {e}")
raise
def set_employee_lock(redis_client: redis.Redis, kuerzel: str, triggered_by: str, context=None) -> bool:
"""Set lock for employee sync operation."""
logger = get_logger(context)
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None:
logger.info(f"Sync already active for {kuerzel}, skipping")
return False
return True
def clear_employee_lock(redis_client: redis.Redis, kuerzel: str, context=None) -> None:
"""Clear lock for employee sync operation and update last-synced timestamp."""
logger = get_logger(context)
try:
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
# Update last-synced timestamp (no TTL, persistent)
current_time = int(time.time())
redis_client.set(employee_last_synced_key, current_time)
# Delete the lock
redis_client.delete(employee_lock_key)
logger.debug(f"Cleared lock and updated last-synced for {kuerzel} to {current_time}")
except Exception as e:
logger.warning(f"Failed to clear lock and update last-synced for {kuerzel}: {e}")

View File

@@ -4,10 +4,7 @@ Calendar Sync All Step
Handles calendar_sync_all event and emits individual sync events for oldest employees. Handles calendar_sync_all event and emits individual sync events for oldest employees.
Uses Redis to track last sync times and distribute work. Uses Redis to track last sync times and distribute work.
""" """
import sys from services.calendar_sync_utils import (
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from calendar_sync_utils import (
get_redis_client, get_redis_client,
get_advoware_employees, get_advoware_employees,
set_employee_lock, set_employee_lock,

View File

@@ -4,10 +4,7 @@ Calendar Sync API Step
HTTP API endpoint for manual calendar sync triggering. HTTP API endpoint for manual calendar sync triggering.
Supports syncing a single employee or all employees. Supports syncing a single employee or all employees.
""" """
import sys from services.calendar_sync_utils import get_redis_client, set_employee_lock
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from calendar_sync_utils import get_redis_client, set_employee_lock, get_logger
from motia import http, ApiRequest, ApiResponse, FlowContext from motia import http, ApiRequest, ApiResponse, FlowContext

View File

@@ -4,10 +4,6 @@ Calendar Sync Cron Step
Cron trigger for automatic calendar synchronization. Cron trigger for automatic calendar synchronization.
Emits calendar_sync_all event to start sync cascade. Emits calendar_sync_all event to start sync cascade.
""" """
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from calendar_sync_utils import log_operation
from typing import Dict, Any from typing import Dict, Any
from motia import cron, FlowContext from motia import cron, FlowContext

View File

@@ -24,10 +24,7 @@ from googleapiclient.errors import HttpError
from google.oauth2 import service_account from google.oauth2 import service_account
import redis import redis
import sys from services.calendar_sync_utils import (
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from calendar_sync_utils import (
connect_db, connect_db,
get_google_service, get_google_service,
log_operation, log_operation,