Files
motia/bitbylaw/steps/advoware_cal_sync/calendar_sync_utils.py
root e4bf21e676 Refactor: Extract common utilities to shared module
- Created calendar_sync_utils.py with shared functions:
  - log_operation: Centralized logging with context support
  - connect_db: Database connection wrapper
  - get_google_service: Google Calendar API initialization
- Updated imports in calendar_sync_event_step.py and audit_calendar_sync.py
- Removed duplicated function definitions
- Maintained function logic without changes
- Improved code maintainability and reduced duplications
2025-10-25 08:54:54 +00:00

64 lines
2.4 KiB
Python

import logging
import asyncpg
import os
from config import Config
from googleapiclient.discovery import build
from google.oauth2 import service_account
logger = logging.getLogger(__name__)
def log_operation(level, message, context=None, **context_vars):
"""Centralized logging with context, supporting Motia workbench logging."""
context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
full_message = f"{message} {context_str}".strip()
if context:
if level == 'info':
context.logger.info(full_message)
elif level == 'warning':
if hasattr(context.logger, 'warn'):
context.logger.warn(full_message)
else:
context.logger.warning(full_message)
elif level == 'error':
context.logger.error(full_message)
# elif level == 'debug':
# context.logger.debug(full_message)dddd
else:
if level == 'info':
logger.info(full_message)
elif level == 'warning':
logger.warning(full_message)
elif level == 'error':
logger.error(full_message)
elif level == 'debug':
logger.debug(full_message)
async def connect_db(context=None):
"""Connect to Postgres DB from Config."""
try:
conn = await asyncpg.connect(
host=Config.POSTGRES_HOST or 'localhost',
user=Config.POSTGRES_USER,
password=Config.POSTGRES_PASSWORD,
database=Config.POSTGRES_DB_NAME,
timeout=10
)
return conn
except Exception as e:
log_operation('error', f"Failed to connect to DB: {e}", context=context)
raise
async def get_google_service(context=None):
"""Initialize Google Calendar service."""
try:
service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
if not os.path.exists(service_account_path):
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
creds = service_account.Credentials.from_service_account_file(
service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES
)
service = build('calendar', 'v3', credentials=creds)
return service
except Exception as e:
log_operation('error', f"Failed to initialize Google service: {e}", context=context)
raise