diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_all_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_all_step.py index 0b91fbf1..38e8a507 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_all_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_all_step.py @@ -2,6 +2,7 @@ import json import redis from config import Config from services.advoware import AdvowareAPI +from .calendar_sync_utils import get_redis_client, get_advoware_employees, set_employee_lock config = { 'type': 'event', @@ -12,17 +13,6 @@ config = { 'flows': ['advoware'] } -async def get_advoware_employees(context, advoware): - """Fetch list of employees from Advoware.""" - try: - result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'}) - employees = result if isinstance(result, list) else [] - context.logger.info(f"Fetched {len(employees)} Advoware employees") - return employees - except Exception as e: - context.logger.error(f"Failed to fetch Advoware employees: {e}") - raise - async def handler(event_data, context): try: triggered_by = event_data.get('triggered_by', 'unknown') @@ -32,7 +22,7 @@ async def handler(event_data, context): advoware = AdvowareAPI(context) # Fetch employees - employees = await get_advoware_employees(context, advoware) + employees = await get_advoware_employees(advoware, context) if not employees: context.logger.error("Keine Mitarbeiter gefunden. All-Sync abgebrochen.") return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}} @@ -51,14 +41,9 @@ async def handler(event_data, context): employee_lock_key = f'calendar_sync_lock_{kuerzel}' - redis_client = redis.Redis( - host=Config.REDIS_HOST, - port=int(Config.REDIS_PORT), - db=int(Config.REDIS_DB_CALENDAR_SYNC), - socket_timeout=Config.REDIS_TIMEOUT_SECONDS - ) + redis_client = get_redis_client(context) - if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None: + if not set_employee_lock(redis_client, kuerzel, triggered_by, context): context.logger.info(f"Calendar Sync All: Sync bereits aktiv für {kuerzel}, überspringe") continue diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_api_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_api_step.py index 7f87615f..94442cc6 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_api_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_api_step.py @@ -1,6 +1,7 @@ import json import redis from config import Config +from .calendar_sync_utils import get_redis_client, set_employee_lock config = { 'type': 'api', @@ -50,14 +51,9 @@ async def handler(req, context): employee_lock_key = f'calendar_sync_lock_{kuerzel_upper}' # Prüfe ob bereits ein Sync für diesen Mitarbeiter läuft - redis_client = redis.Redis( - host=Config.REDIS_HOST, - port=int(Config.REDIS_PORT), - db=int(Config.REDIS_DB_CALENDAR_SYNC), - socket_timeout=Config.REDIS_TIMEOUT_SECONDS - ) + redis_client = get_redis_client(context) - if redis_client.set(employee_lock_key, 'api', ex=1800, nx=True) is None: + if not set_employee_lock(redis_client, kuerzel_upper, 'api', context): context.logger.info(f"Calendar Sync API: Sync bereits aktiv für {kuerzel_upper}, überspringe") return { 'status': 409, diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py index 187ff89c..bd4e67e0 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -14,7 +14,7 @@ import time import random from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc. from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls -from .calendar_sync_utils import connect_db, get_google_service, log_operation +from .calendar_sync_utils import connect_db, get_google_service, log_operation, get_redis_client, get_advoware_employees, set_employee_lock, clear_employee_lock # Setup logging logger = logging.getLogger(__name__) @@ -617,17 +617,6 @@ async def safe_advoware_operation(operation, write_allowed, context=None, *args, return None return await operation(*args, **kwargs) -async def get_advoware_employees(advoware, context=None): - """Fetch list of employees from Advoware.""" - try: - result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'}) - employees = result if isinstance(result, list) else [] - log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context) - return employees - except Exception as e: - log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context) - raise - async def get_advoware_timestamp(advoware, frnr, context=None): """Fetch the last modified timestamp for an Advoware appointment.""" try: @@ -939,12 +928,7 @@ async def handler(event_data, context): log_operation('info', f"Starting calendar sync for employee {kuerzel}", context=context) - redis_client = redis.Redis( - host=Config.REDIS_HOST, - port=int(Config.REDIS_PORT), - db=int(Config.REDIS_DB_CALENDAR_SYNC), - socket_timeout=Config.REDIS_TIMEOUT_SECONDS - ) + redis_client = get_redis_client(context) try: @@ -1048,10 +1032,7 @@ async def handler(event_data, context): return {'status': 500, 'body': {'error': str(e)}} finally: # Ensure lock is always released - try: - redis_client.delete(employee_lock_key) - except Exception: - pass # Ignore errors when deleting lock + clear_employee_lock(redis_client, kuerzel, context) # Motia Step Configuration config = { diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_utils.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_utils.py index d7044dbd..07548a95 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_utils.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_utils.py @@ -1,6 +1,7 @@ import logging import asyncpg import os +import redis from config import Config from googleapiclient.discovery import build from google.oauth2 import service_account @@ -61,4 +62,45 @@ async def get_google_service(context=None): return service except Exception as e: log_operation('error', f"Failed to initialize Google service: {e}", context=context) - raise \ No newline at end of file + raise + +def get_redis_client(context=None): + """Initialize Redis client for calendar sync operations.""" + try: + redis_client = redis.Redis( + host=Config.REDIS_HOST, + port=int(Config.REDIS_PORT), + db=int(Config.REDIS_DB_CALENDAR_SYNC), + socket_timeout=Config.REDIS_TIMEOUT_SECONDS + ) + return redis_client + except Exception as e: + log_operation('error', f"Failed to initialize Redis client: {e}", context=context) + raise + +async def get_advoware_employees(advoware, context=None): + """Fetch list of employees from Advoware.""" + try: + result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'}) + employees = result if isinstance(result, list) else [] + log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context) + return employees + except Exception as e: + log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context) + raise + +def set_employee_lock(redis_client, kuerzel, triggered_by, context=None): + """Set lock for employee sync operation.""" + employee_lock_key = f'calendar_sync_lock_{kuerzel}' + if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None: + log_operation('info', f"Sync already active for {kuerzel}, skipping", context=context) + return False + return True + +def clear_employee_lock(redis_client, kuerzel, context=None): + """Clear lock for employee sync operation.""" + try: + employee_lock_key = f'calendar_sync_lock_{kuerzel}' + redis_client.delete(employee_lock_key) + except Exception as e: + log_operation('warning', f"Failed to clear lock for {kuerzel}: {e}", context=context) \ No newline at end of file