117 lines
4.5 KiB
Python
117 lines
4.5 KiB
Python
import logging
|
|
import asyncpg
|
|
import os
|
|
import redis
|
|
import time
|
|
from config import Config
|
|
from googleapiclient.discovery import build
|
|
from google.oauth2 import service_account
|
|
|
|
# Root logging setup: records go to the calendar sync log file at INFO level,
# each line carrying a timestamp, the logger name and the severity.
logging.basicConfig(
    level=logging.INFO,
    filename='/opt/motia-app/calendar_sync.log',
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Logger used by the helpers in this module, named after the module itself.
logger = logging.getLogger(__name__)
|
|
|
|
def log_operation(level, message, context=None, **context_vars):
    """Log *message* through the module logger and echo it to stdout.

    Args:
        level: Severity name: 'debug', 'info', 'warning', 'error' or
            'critical', matched case-insensitively. Any other name is
            written to the log at INFO level rather than being dropped.
        message: Text of the log entry.
        context: Accepted so call sites can pass it along; not used here.
        **context_vars: Extra values appended to the message as
            ``key=value`` pairs. Pairs whose value is None are left out.
    """
    context_str = ' '.join(
        f"{key}={value}"
        for key, value in context_vars.items()
        if value is not None
    )
    # strip() removes the trailing space left behind when there are no
    # context pairs to append.
    full_message = f"[{time.time()}] {message} {context_str}".strip()

    level_name = str(level)
    severity_by_name = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL,
    }
    # Fall back to INFO so a message with an unexpected level still reaches
    # the log instead of only appearing on the console.
    severity = severity_by_name.get(level_name.lower(), logging.INFO)
    logger.log(severity, full_message)

    # Also log to console for journalctl visibility
    print(f"[{level_name.upper()}] {full_message}")
|
|
|
|
async def connect_db(context=None):
    """Open an asyncpg connection using the Postgres settings on Config.

    The host falls back to 'localhost' when Config.POSTGRES_HOST is falsy,
    and ``timeout=10`` is passed to ``asyncpg.connect``.

    Args:
        context: Forwarded to log_operation when the connection fails.

    Returns:
        The connection object returned by ``asyncpg.connect``.

    Raises:
        Exception: Any error raised while reading the settings or
            connecting is logged and then re-raised unchanged.
    """
    try:
        settings = {
            'host': Config.POSTGRES_HOST or 'localhost',
            'user': Config.POSTGRES_USER,
            'password': Config.POSTGRES_PASSWORD,
            'database': Config.POSTGRES_DB_NAME,
            'timeout': 10,
        }
        connection = await asyncpg.connect(**settings)
    except Exception as exc:
        log_operation('error', f"Failed to connect to DB: {exc}", context=context)
        raise
    return connection
|
|
|
|
async def get_google_service(context=None):
    """Build a Google Calendar v3 service from the service-account file.

    The file path and the OAuth scopes are read from Config. The function
    is a coroutine but contains no ``await``, so the calls below run
    synchronously once it is awaited.

    Args:
        context: Forwarded to log_operation when initialization fails.

    Returns:
        The service object returned by ``build('calendar', 'v3', ...)``.

    Raises:
        FileNotFoundError: The configured service-account file is missing.
        Exception: Any other error is logged and then re-raised unchanged.
    """
    try:
        key_file = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
        if os.path.exists(key_file):
            credentials = service_account.Credentials.from_service_account_file(
                key_file, scopes=Config.GOOGLE_CALENDAR_SCOPES
            )
            return build('calendar', 'v3', credentials=credentials)
        raise FileNotFoundError(f"Service account file not found: {key_file}")
    except Exception as exc:
        log_operation('error', f"Failed to initialize Google service: {exc}", context=context)
        raise
|
|
|
|
def get_redis_client(context=None):
    """Create the Redis client used for calendar sync bookkeeping.

    Host, port, database index and socket timeout are read from Config;
    the port and the database index are converted with ``int()``.

    Args:
        context: Forwarded to log_operation when creation fails.

    Returns:
        The ``redis.Redis`` instance.

    Raises:
        Exception: Any error raised while reading the settings or
            constructing the client is logged and then re-raised unchanged.
    """
    try:
        options = {
            'host': Config.REDIS_HOST,
            'port': int(Config.REDIS_PORT),
            'db': int(Config.REDIS_DB_CALENDAR_SYNC),
            'socket_timeout': Config.REDIS_TIMEOUT_SECONDS,
        }
        return redis.Redis(**options)
    except Exception as exc:
        log_operation('error', f"Failed to initialize Redis client: {exc}", context=context)
        raise
|
|
|
|
async def get_advoware_employees(advoware, context=None):
    """Fetch the employee list from Advoware.

    Issues a GET to 'api/v1/advonet/Mitarbeiter' with ``aktiv=true``
    through ``advoware.api_call``.

    Args:
        advoware: Object providing an awaitable ``api_call`` method.
        context: Forwarded to log_operation.

    Returns:
        The API result when it is a list, otherwise an empty list.

    Raises:
        Exception: Any error from the API call is logged and then
            re-raised unchanged.
    """
    try:
        response = await advoware.api_call(
            'api/v1/advonet/Mitarbeiter',
            method='GET',
            params={'aktiv': 'true'},
        )
        # Anything that is not a list is treated as "no employees".
        if not isinstance(response, list):
            response = []
        log_operation('info', f"Fetched {len(response)} Advoware employees", context=context)
        return response
    except Exception as exc:
        log_operation('error', f"Failed to fetch Advoware employees: {exc}", context=context)
        raise
|
|
|
|
def set_employee_lock(redis_client, kuerzel, triggered_by, context=None, ttl_seconds=1800):
    """Try to take the per-employee calendar sync lock.

    The lock is the Redis key ``calendar_sync_lock_<kuerzel>``. It is
    written with ``nx=True`` and an expiry, so an existing lock is never
    overwritten.

    Args:
        redis_client: Client exposing ``set(key, value, ex=..., nx=...)``.
        kuerzel: Employee identifier used to build the lock key.
        triggered_by: Value stored under the lock key.
        context: Forwarded to log_operation when the lock is already held.
        ttl_seconds: Expiry passed as ``ex``. Defaults to 1800, the value
            that was previously hard-coded.

    Returns:
        True when the lock was set, False when the client reported that it
        was not (the sync is then treated as already active).
    """
    employee_lock_key = f'calendar_sync_lock_{kuerzel}'
    acquired = redis_client.set(employee_lock_key, triggered_by, ex=ttl_seconds, nx=True)
    # Treat any falsy reply (None or False) as "not acquired" so the check
    # does not depend on one specific client's return convention.
    if not acquired:
        log_operation('info', f"Sync already active for {kuerzel}, skipping", context=context)
        return False
    return True
|
|
|
|
def clear_employee_lock(redis_client, kuerzel, context=None):
    """Release the per-employee sync lock and record the last-synced time.

    Writes the current Unix time (whole seconds) to
    ``calendar_sync_last_synced_<kuerzel>`` without an expiry, then deletes
    ``calendar_sync_lock_<kuerzel>``. The operation is best-effort: failures
    are logged as warnings and never raised.

    Args:
        redis_client: Client exposing ``set(key, value)`` and ``delete(key)``.
        kuerzel: Employee identifier used to build both keys.
        context: Forwarded to log_operation.
    """
    employee_lock_key = f'calendar_sync_lock_{kuerzel}'
    employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
    current_time = int(time.time())

    try:
        try:
            # Update last-synced timestamp (no TTL, persistent)
            redis_client.set(employee_last_synced_key, current_time)
        finally:
            # Delete the lock even when the timestamp write failed;
            # otherwise the lock would stay in place until it expires.
            redis_client.delete(employee_lock_key)

        log_operation('debug', f"Cleared lock and updated last-synced for {kuerzel} to {current_time}", context=context)
    except Exception as e:
        log_operation('warning', f"Failed to clear lock and update last-synced for {kuerzel}: {e}", context=context)