diff --git a/bitbylaw/steps/advoware_cal_sync/audit_calendar_sync.py b/bitbylaw/steps/advoware_cal_sync/audit_calendar_sync.py index 35e2910c..371e3425 100644 --- a/bitbylaw/steps/advoware_cal_sync/audit_calendar_sync.py +++ b/bitbylaw/steps/advoware_cal_sync/audit_calendar_sync.py @@ -7,6 +7,7 @@ import pytz sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) from config import Config from services.advoware import AdvowareAPI +from .calendar_sync_utils import connect_db, get_google_service from googleapiclient.discovery import build from googleapiclient.errors import HttpError from google.oauth2 import service_account @@ -23,35 +24,7 @@ BERLIN_TZ = pytz.timezone('Europe/Berlin') now = datetime.now(BERLIN_TZ) current_year = now.year -async def connect_db(): - """Connect to Postgres DB from Config.""" - try: - conn = await asyncpg.connect( - host=Config.POSTGRES_HOST or 'localhost', - user=Config.POSTGRES_USER, - password=Config.POSTGRES_PASSWORD, - database=Config.POSTGRES_DB_NAME, - timeout=10 - ) - return conn - except Exception as e: - logger.error(f"Failed to connect to DB: {e}") - raise -async def get_google_service(): - """Initialize Google Calendar service.""" - try: - service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH - if not os.path.exists(service_account_path): - raise FileNotFoundError(f"Service account file not found: {service_account_path}") - creds = service_account.Credentials.from_service_account_file( - service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES - ) - service = build('calendar', 'v3', credentials=creds) - return service - except Exception as e: - logger.error(f"Failed to initialize Google service: {e}") - raise async def ensure_google_calendar(service, employee_kuerzel): """Ensure Google Calendar exists for employee.""" diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py index aae071cb..187ff89c 100644 --- a/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -14,6 +14,7 @@ import time import random from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc. from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls +from .calendar_sync_utils import connect_db, get_google_service, log_operation # Setup logging logger = logging.getLogger(__name__) @@ -30,8 +31,6 @@ current_year = now.year FETCH_FROM = f"{current_year - 1}-01-01T00:00:00" # Start of previous year FETCH_TO = f"{current_year + 9}-12-31T23:59:59" # End of 9 years ahead -CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock' - # Constants: Für 600/min -> 600 Tokens, Refill 600/60=10 pro Sekunde -> 10/1000 pro ms RATE_LIMIT_KEY = 'google_calendar_api_tokens' MAX_TOKENS = 5 @@ -104,61 +103,6 @@ async def enforce_global_rate_limit(context=None): except Exception as e: log_operation('error', f"Rate limiting failed: {e}. Proceeding without limit.", context=context) -def log_operation(level, message, context=None, **context_vars): - """Centralized logging with context, supporting Motia workbench logging.""" - context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None) - full_message = f"{message} {context_str}".strip() - if context: - if level == 'info': - context.logger.info(full_message) - elif level == 'warning': - if hasattr(context.logger, 'warn'): - context.logger.warn(full_message) - else: - context.logger.warning(full_message) - elif level == 'error': - context.logger.error(full_message) - # elif level == 'debug': - # context.logger.debug(full_message)dddd - else: - if level == 'info': - logger.info(full_message) - elif level == 'warning': - logger.warning(full_message) - elif level == 'error': - logger.error(full_message) - elif level == 'debug': - logger.debug(full_message) - -async def connect_db(context=None): - """Connect to Postgres DB from Config.""" - try: - conn = await asyncpg.connect( - host=Config.POSTGRES_HOST or 'localhost', - user=Config.POSTGRES_USER, - password=Config.POSTGRES_PASSWORD, - database=Config.POSTGRES_DB_NAME, - timeout=10 - ) - return conn - except Exception as e: - log_operation('error', f"Failed to connect to DB: {e}", context=context) - raise - -async def get_google_service(context=None): - """Initialize Google Calendar service.""" - try: - service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH - if not os.path.exists(service_account_path): - raise FileNotFoundError(f"Service account file not found: {service_account_path}") - creds = service_account.Credentials.from_service_account_file( - service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES - ) - service = build('calendar', 'v3', credentials=creds) - return service - except Exception as e: - log_operation('error', f"Failed to initialize Google service: {e}", context=context) - raise @backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504]) async def ensure_google_calendar(service, employee_kuerzel, context=None): @@ -983,7 +927,6 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware, log_operation('warning', f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}", context=context) async with conn.transaction(): await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id']) -CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock' async def handler(event_data, context): """Main event handler for calendar sync.""" diff --git a/bitbylaw/steps/advoware_cal_sync/calendar_sync_utils.py b/bitbylaw/steps/advoware_cal_sync/calendar_sync_utils.py new file mode 100644 index 00000000..d7044dbd --- /dev/null +++ b/bitbylaw/steps/advoware_cal_sync/calendar_sync_utils.py @@ -0,0 +1,64 @@ +import logging +import asyncpg +import os +from config import Config +from googleapiclient.discovery import build +from google.oauth2 import service_account + +logger = logging.getLogger(__name__) + +def log_operation(level, message, context=None, **context_vars): + """Centralized logging with context, supporting Motia workbench logging.""" + context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None) + full_message = f"{message} {context_str}".strip() + if context: + if level == 'info': + context.logger.info(full_message) + elif level == 'warning': + if hasattr(context.logger, 'warn'): + context.logger.warn(full_message) + else: + context.logger.warning(full_message) + elif level == 'error': + context.logger.error(full_message) + # elif level == 'debug': + # context.logger.debug(full_message)dddd + else: + if level == 'info': + logger.info(full_message) + elif level == 'warning': + logger.warning(full_message) + elif level == 'error': + logger.error(full_message) + elif level == 'debug': + logger.debug(full_message) + +async def connect_db(context=None): + """Connect to Postgres DB from Config.""" + try: + conn = await asyncpg.connect( + host=Config.POSTGRES_HOST or 'localhost', + user=Config.POSTGRES_USER, + password=Config.POSTGRES_PASSWORD, + database=Config.POSTGRES_DB_NAME, + timeout=10 + ) + return conn + except Exception as e: + log_operation('error', f"Failed to connect to DB: {e}", context=context) + raise + +async def get_google_service(context=None): + """Initialize Google Calendar service.""" + try: + service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH + if not os.path.exists(service_account_path): + raise FileNotFoundError(f"Service account file not found: {service_account_path}") + creds = service_account.Credentials.from_service_account_file( + service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES + ) + service = build('calendar', 'v3', credentials=creds) + return service + except Exception as e: + log_operation('error', f"Failed to initialize Google service: {e}", context=context) + raise \ No newline at end of file