Compare commits
3 Commits
c5600b42ec
...
96eabe3db6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
96eabe3db6 | ||
|
|
b18e770f12 | ||
|
|
e4bf21e676 |
@@ -7,6 +7,7 @@ import pytz
|
|||||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
|
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||||
from config import Config
|
from config import Config
|
||||||
from services.advoware import AdvowareAPI
|
from services.advoware import AdvowareAPI
|
||||||
|
from .calendar_sync_utils import connect_db, get_google_service
|
||||||
from googleapiclient.discovery import build
|
from googleapiclient.discovery import build
|
||||||
from googleapiclient.errors import HttpError
|
from googleapiclient.errors import HttpError
|
||||||
from google.oauth2 import service_account
|
from google.oauth2 import service_account
|
||||||
@@ -23,35 +24,7 @@ BERLIN_TZ = pytz.timezone('Europe/Berlin')
|
|||||||
now = datetime.now(BERLIN_TZ)
|
now = datetime.now(BERLIN_TZ)
|
||||||
current_year = now.year
|
current_year = now.year
|
||||||
|
|
||||||
async def connect_db():
|
|
||||||
"""Connect to Postgres DB from Config."""
|
|
||||||
try:
|
|
||||||
conn = await asyncpg.connect(
|
|
||||||
host=Config.POSTGRES_HOST or 'localhost',
|
|
||||||
user=Config.POSTGRES_USER,
|
|
||||||
password=Config.POSTGRES_PASSWORD,
|
|
||||||
database=Config.POSTGRES_DB_NAME,
|
|
||||||
timeout=10
|
|
||||||
)
|
|
||||||
return conn
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to connect to DB: {e}")
|
|
||||||
raise
|
|
||||||
|
|
||||||
async def get_google_service():
|
|
||||||
"""Initialize Google Calendar service."""
|
|
||||||
try:
|
|
||||||
service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
|
|
||||||
if not os.path.exists(service_account_path):
|
|
||||||
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
|
|
||||||
creds = service_account.Credentials.from_service_account_file(
|
|
||||||
service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES
|
|
||||||
)
|
|
||||||
service = build('calendar', 'v3', credentials=creds)
|
|
||||||
return service
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to initialize Google service: {e}")
|
|
||||||
raise
|
|
||||||
|
|
||||||
async def ensure_google_calendar(service, employee_kuerzel):
|
async def ensure_google_calendar(service, employee_kuerzel):
|
||||||
"""Ensure Google Calendar exists for employee."""
|
"""Ensure Google Calendar exists for employee."""
|
||||||
|
|||||||
@@ -1,64 +1,70 @@
|
|||||||
import json
|
import json
|
||||||
import redis
|
import redis
|
||||||
|
import math
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
from config import Config
|
from config import Config
|
||||||
from services.advoware import AdvowareAPI
|
from services.advoware import AdvowareAPI
|
||||||
|
from .calendar_sync_utils import get_redis_client, get_advoware_employees, set_employee_lock
|
||||||
|
|
||||||
config = {
|
config = {
|
||||||
'type': 'event',
|
'type': 'event',
|
||||||
'name': 'Calendar Sync All Step',
|
'name': 'Calendar Sync All Step',
|
||||||
'description': 'Nimmt sync-all Event auf und emittiert individuelle Events für jeden Mitarbeiter',
|
'description': 'Nimmt sync-all Event auf und emittiert individuelle Events für die ältesten Mitarbeiter',
|
||||||
'subscribes': ['calendar_sync_all'],
|
'subscribes': ['calendar_sync_all'],
|
||||||
'emits': ['calendar_sync_employee'],
|
'emits': ['calendar_sync_employee'],
|
||||||
'flows': ['advoware']
|
'flows': ['advoware']
|
||||||
}
|
}
|
||||||
|
|
||||||
async def get_advoware_employees(context, advoware):
|
|
||||||
"""Fetch list of employees from Advoware."""
|
|
||||||
try:
|
|
||||||
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
|
|
||||||
employees = result if isinstance(result, list) else []
|
|
||||||
context.logger.info(f"Fetched {len(employees)} Advoware employees")
|
|
||||||
return employees
|
|
||||||
except Exception as e:
|
|
||||||
context.logger.error(f"Failed to fetch Advoware employees: {e}")
|
|
||||||
raise
|
|
||||||
|
|
||||||
async def handler(event_data, context):
|
async def handler(event_data, context):
|
||||||
try:
|
try:
|
||||||
triggered_by = event_data.get('triggered_by', 'unknown')
|
triggered_by = event_data.get('triggered_by', 'unknown')
|
||||||
context.logger.info(f"Calendar Sync All: Starting to emit events for all employees, triggered by {triggered_by}")
|
context.logger.info(f"Calendar Sync All: Starting to emit events for oldest employees, triggered by {triggered_by}")
|
||||||
|
|
||||||
# Initialize Advoware API
|
# Initialize Advoware API
|
||||||
advoware = AdvowareAPI(context)
|
advoware = AdvowareAPI(context)
|
||||||
|
|
||||||
# Fetch employees
|
# Fetch employees
|
||||||
employees = await get_advoware_employees(context, advoware)
|
employees = await get_advoware_employees(advoware, context)
|
||||||
if not employees:
|
if not employees:
|
||||||
context.logger.error("Keine Mitarbeiter gefunden. All-Sync abgebrochen.")
|
context.logger.error("Keine Mitarbeiter gefunden. All-Sync abgebrochen.")
|
||||||
return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
|
return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
|
||||||
|
|
||||||
# Emit event for each employee
|
redis_client = get_redis_client(context)
|
||||||
|
|
||||||
|
# Collect last_synced timestamps
|
||||||
|
employee_timestamps = {}
|
||||||
for employee in employees:
|
for employee in employees:
|
||||||
kuerzel = employee.get('kuerzel')
|
kuerzel = employee.get('kuerzel')
|
||||||
if not kuerzel:
|
if not kuerzel:
|
||||||
context.logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}")
|
|
||||||
continue
|
continue
|
||||||
|
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
|
||||||
|
timestamp_str = redis_client.get(employee_last_synced_key)
|
||||||
|
timestamp = int(timestamp_str) if timestamp_str else 0 # 0 if no timestamp (very old)
|
||||||
|
employee_timestamps[kuerzel] = timestamp
|
||||||
|
|
||||||
# # DEBUG: Nur für konfigurierte Nutzer syncen
|
# Sort employees by last_synced (ascending, oldest first), then by kuerzel alphabetically
|
||||||
# if kuerzel not in Config.CALENDAR_SYNC_DEBUG_KUERZEL:
|
sorted_kuerzel = sorted(employee_timestamps.keys(), key=lambda k: (employee_timestamps[k], k))
|
||||||
# context.logger.info(f"DEBUG: Überspringe {kuerzel}, nur {Config.CALENDAR_SYNC_DEBUG_KUERZEL} werden gesynct")
|
|
||||||
# continue
|
|
||||||
|
|
||||||
|
# Log the sorted list with timestamps
|
||||||
|
def format_timestamp(ts):
|
||||||
|
if ts == 0:
|
||||||
|
return "never"
|
||||||
|
return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
|
||||||
|
|
||||||
|
sorted_list_str = ", ".join(f"{k} ({format_timestamp(employee_timestamps[k])})" for k in sorted_kuerzel)
|
||||||
|
context.logger.info(f"Calendar Sync All: Sorted employees by last synced: {sorted_list_str}")
|
||||||
|
|
||||||
|
# Calculate number to sync: ceil(N / 10)
|
||||||
|
num_to_sync = math.ceil(len(sorted_kuerzel) / 10)
|
||||||
|
context.logger.info(f"Calendar Sync All: Total employees {len(sorted_kuerzel)}, syncing {num_to_sync} oldest")
|
||||||
|
|
||||||
|
# Emit for the oldest num_to_sync employees, if not locked
|
||||||
|
emitted_count = 0
|
||||||
|
for kuerzel in sorted_kuerzel[:num_to_sync]:
|
||||||
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
|
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
|
||||||
|
|
||||||
redis_client = redis.Redis(
|
if not set_employee_lock(redis_client, kuerzel, triggered_by, context):
|
||||||
host=Config.REDIS_HOST,
|
|
||||||
port=int(Config.REDIS_PORT),
|
|
||||||
db=int(Config.REDIS_DB_CALENDAR_SYNC),
|
|
||||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
|
||||||
)
|
|
||||||
|
|
||||||
if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None:
|
|
||||||
context.logger.info(f"Calendar Sync All: Sync bereits aktiv für {kuerzel}, überspringe")
|
context.logger.info(f"Calendar Sync All: Sync bereits aktiv für {kuerzel}, überspringe")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@@ -70,12 +76,14 @@ async def handler(event_data, context):
|
|||||||
"triggered_by": triggered_by
|
"triggered_by": triggered_by
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
context.logger.info(f"Calendar Sync All: Emitted event for employee {kuerzel}")
|
context.logger.info(f"Calendar Sync All: Emitted event for employee {kuerzel} (last synced: {format_timestamp(employee_timestamps[kuerzel])})")
|
||||||
|
emitted_count += 1
|
||||||
|
|
||||||
context.logger.info("Calendar Sync All: Completed emitting events for employees")
|
context.logger.info(f"Calendar Sync All: Completed, emitted {emitted_count} events")
|
||||||
return {
|
return {
|
||||||
'status': 'completed',
|
'status': 'completed',
|
||||||
'triggered_by': triggered_by
|
'triggered_by': triggered_by,
|
||||||
|
'emitted_count': emitted_count
|
||||||
}
|
}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import json
|
import json
|
||||||
import redis
|
import redis
|
||||||
from config import Config
|
from config import Config
|
||||||
|
from .calendar_sync_utils import get_redis_client, set_employee_lock
|
||||||
|
|
||||||
config = {
|
config = {
|
||||||
'type': 'api',
|
'type': 'api',
|
||||||
@@ -50,14 +51,9 @@ async def handler(req, context):
|
|||||||
employee_lock_key = f'calendar_sync_lock_{kuerzel_upper}'
|
employee_lock_key = f'calendar_sync_lock_{kuerzel_upper}'
|
||||||
|
|
||||||
# Prüfe ob bereits ein Sync für diesen Mitarbeiter läuft
|
# Prüfe ob bereits ein Sync für diesen Mitarbeiter läuft
|
||||||
redis_client = redis.Redis(
|
redis_client = get_redis_client(context)
|
||||||
host=Config.REDIS_HOST,
|
|
||||||
port=int(Config.REDIS_PORT),
|
|
||||||
db=int(Config.REDIS_DB_CALENDAR_SYNC),
|
|
||||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
|
||||||
)
|
|
||||||
|
|
||||||
if redis_client.set(employee_lock_key, 'api', ex=1800, nx=True) is None:
|
if not set_employee_lock(redis_client, kuerzel_upper, 'api', context):
|
||||||
context.logger.info(f"Calendar Sync API: Sync bereits aktiv für {kuerzel_upper}, überspringe")
|
context.logger.info(f"Calendar Sync API: Sync bereits aktiv für {kuerzel_upper}, überspringe")
|
||||||
return {
|
return {
|
||||||
'status': 409,
|
'status': 409,
|
||||||
|
|||||||
@@ -6,8 +6,8 @@ from services.advoware import AdvowareAPI
|
|||||||
config = {
|
config = {
|
||||||
'type': 'cron',
|
'type': 'cron',
|
||||||
'name': 'Calendar Sync Cron Job',
|
'name': 'Calendar Sync Cron Job',
|
||||||
'description': 'Führt den Calendar Sync alle 5 Minuten automatisch aus',
|
'description': 'Führt den Calendar Sync alle 1 Minuten automatisch aus',
|
||||||
'cron': '*/5 * * * *', # Alle 5 Minuten
|
'cron': '*/1 * * * *', # Alle 1 Minute
|
||||||
'emits': ['calendar_sync_all'],
|
'emits': ['calendar_sync_all'],
|
||||||
'flows': ['advoware']
|
'flows': ['advoware']
|
||||||
}
|
}
|
||||||
@@ -17,12 +17,12 @@ async def handler(context):
|
|||||||
context.logger.info("Calendar Sync Cron: Starting to emit sync-all event")
|
context.logger.info("Calendar Sync Cron: Starting to emit sync-all event")
|
||||||
|
|
||||||
# # Emit sync-all event
|
# # Emit sync-all event
|
||||||
# await context.emit({
|
await context.emit({
|
||||||
# "topic": "calendar_sync_all",
|
"topic": "calendar_sync_all",
|
||||||
# "data": {
|
"data": {
|
||||||
# "triggered_by": "cron"
|
"triggered_by": "cron"
|
||||||
# }
|
}
|
||||||
# })
|
})
|
||||||
|
|
||||||
context.logger.info("Calendar Sync Cron: Emitted sync-all event")
|
context.logger.info("Calendar Sync Cron: Emitted sync-all event")
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ import time
|
|||||||
import random
|
import random
|
||||||
from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc.
|
from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc.
|
||||||
from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls
|
from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls
|
||||||
|
from .calendar_sync_utils import connect_db, get_google_service, log_operation, get_redis_client, get_advoware_employees, set_employee_lock, clear_employee_lock
|
||||||
|
|
||||||
# Setup logging
|
# Setup logging
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -30,13 +31,11 @@ current_year = now.year
|
|||||||
FETCH_FROM = f"{current_year - 1}-01-01T00:00:00" # Start of previous year
|
FETCH_FROM = f"{current_year - 1}-01-01T00:00:00" # Start of previous year
|
||||||
FETCH_TO = f"{current_year + 9}-12-31T23:59:59" # End of 9 years ahead
|
FETCH_TO = f"{current_year + 9}-12-31T23:59:59" # End of 9 years ahead
|
||||||
|
|
||||||
CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
|
|
||||||
|
|
||||||
# Constants: Für 600/min -> 600 Tokens, Refill 600/60=10 pro Sekunde -> 10/1000 pro ms
|
# Constants: Für 600/min -> 600 Tokens, Refill 600/60=10 pro Sekunde -> 10/1000 pro ms
|
||||||
RATE_LIMIT_KEY = 'google_calendar_api_tokens'
|
RATE_LIMIT_KEY = 'google_calendar_api_tokens'
|
||||||
MAX_TOKENS = 5
|
MAX_TOKENS = 7
|
||||||
REFILL_RATE_PER_MS = 2 / 1000 # Float nur hier; Ops mit Integers
|
REFILL_RATE_PER_MS = 7 / 1000 # Float nur hier; Ops mit Integers
|
||||||
MIN_WAIT = 0.2 # 200ms
|
MIN_WAIT = 0.1 # 100ms
|
||||||
JITTER_MAX = 0.1 # Optional: Zufalls-Delay 0-100ms für Glättung
|
JITTER_MAX = 0.1 # Optional: Zufalls-Delay 0-100ms für Glättung
|
||||||
|
|
||||||
async def enforce_global_rate_limit(context=None):
|
async def enforce_global_rate_limit(context=None):
|
||||||
@@ -104,61 +103,6 @@ async def enforce_global_rate_limit(context=None):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
log_operation('error', f"Rate limiting failed: {e}. Proceeding without limit.", context=context)
|
log_operation('error', f"Rate limiting failed: {e}. Proceeding without limit.", context=context)
|
||||||
|
|
||||||
def log_operation(level, message, context=None, **context_vars):
|
|
||||||
"""Centralized logging with context, supporting Motia workbench logging."""
|
|
||||||
context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
|
|
||||||
full_message = f"{message} {context_str}".strip()
|
|
||||||
if context:
|
|
||||||
if level == 'info':
|
|
||||||
context.logger.info(full_message)
|
|
||||||
elif level == 'warning':
|
|
||||||
if hasattr(context.logger, 'warn'):
|
|
||||||
context.logger.warn(full_message)
|
|
||||||
else:
|
|
||||||
context.logger.warning(full_message)
|
|
||||||
elif level == 'error':
|
|
||||||
context.logger.error(full_message)
|
|
||||||
# elif level == 'debug':
|
|
||||||
# context.logger.debug(full_message)dddd
|
|
||||||
else:
|
|
||||||
if level == 'info':
|
|
||||||
logger.info(full_message)
|
|
||||||
elif level == 'warning':
|
|
||||||
logger.warning(full_message)
|
|
||||||
elif level == 'error':
|
|
||||||
logger.error(full_message)
|
|
||||||
elif level == 'debug':
|
|
||||||
logger.debug(full_message)
|
|
||||||
|
|
||||||
async def connect_db(context=None):
|
|
||||||
"""Connect to Postgres DB from Config."""
|
|
||||||
try:
|
|
||||||
conn = await asyncpg.connect(
|
|
||||||
host=Config.POSTGRES_HOST or 'localhost',
|
|
||||||
user=Config.POSTGRES_USER,
|
|
||||||
password=Config.POSTGRES_PASSWORD,
|
|
||||||
database=Config.POSTGRES_DB_NAME,
|
|
||||||
timeout=10
|
|
||||||
)
|
|
||||||
return conn
|
|
||||||
except Exception as e:
|
|
||||||
log_operation('error', f"Failed to connect to DB: {e}", context=context)
|
|
||||||
raise
|
|
||||||
|
|
||||||
async def get_google_service(context=None):
|
|
||||||
"""Initialize Google Calendar service."""
|
|
||||||
try:
|
|
||||||
service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
|
|
||||||
if not os.path.exists(service_account_path):
|
|
||||||
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
|
|
||||||
creds = service_account.Credentials.from_service_account_file(
|
|
||||||
service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES
|
|
||||||
)
|
|
||||||
service = build('calendar', 'v3', credentials=creds)
|
|
||||||
return service
|
|
||||||
except Exception as e:
|
|
||||||
log_operation('error', f"Failed to initialize Google service: {e}", context=context)
|
|
||||||
raise
|
|
||||||
|
|
||||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||||
async def ensure_google_calendar(service, employee_kuerzel, context=None):
|
async def ensure_google_calendar(service, employee_kuerzel, context=None):
|
||||||
@@ -673,17 +617,6 @@ async def safe_advoware_operation(operation, write_allowed, context=None, *args,
|
|||||||
return None
|
return None
|
||||||
return await operation(*args, **kwargs)
|
return await operation(*args, **kwargs)
|
||||||
|
|
||||||
async def get_advoware_employees(advoware, context=None):
|
|
||||||
"""Fetch list of employees from Advoware."""
|
|
||||||
try:
|
|
||||||
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
|
|
||||||
employees = result if isinstance(result, list) else []
|
|
||||||
log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
|
|
||||||
return employees
|
|
||||||
except Exception as e:
|
|
||||||
log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context)
|
|
||||||
raise
|
|
||||||
|
|
||||||
async def get_advoware_timestamp(advoware, frnr, context=None):
|
async def get_advoware_timestamp(advoware, frnr, context=None):
|
||||||
"""Fetch the last modified timestamp for an Advoware appointment."""
|
"""Fetch the last modified timestamp for an Advoware appointment."""
|
||||||
try:
|
try:
|
||||||
@@ -983,7 +916,6 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
|
|||||||
log_operation('warning', f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}", context=context)
|
log_operation('warning', f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}", context=context)
|
||||||
async with conn.transaction():
|
async with conn.transaction():
|
||||||
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
|
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
|
||||||
CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
|
|
||||||
|
|
||||||
async def handler(event_data, context):
|
async def handler(event_data, context):
|
||||||
"""Main event handler for calendar sync."""
|
"""Main event handler for calendar sync."""
|
||||||
@@ -996,12 +928,7 @@ async def handler(event_data, context):
|
|||||||
|
|
||||||
log_operation('info', f"Starting calendar sync for employee {kuerzel}", context=context)
|
log_operation('info', f"Starting calendar sync for employee {kuerzel}", context=context)
|
||||||
|
|
||||||
redis_client = redis.Redis(
|
redis_client = get_redis_client(context)
|
||||||
host=Config.REDIS_HOST,
|
|
||||||
port=int(Config.REDIS_PORT),
|
|
||||||
db=int(Config.REDIS_DB_CALENDAR_SYNC),
|
|
||||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
||||||
@@ -1105,10 +1032,7 @@ async def handler(event_data, context):
|
|||||||
return {'status': 500, 'body': {'error': str(e)}}
|
return {'status': 500, 'body': {'error': str(e)}}
|
||||||
finally:
|
finally:
|
||||||
# Ensure lock is always released
|
# Ensure lock is always released
|
||||||
try:
|
clear_employee_lock(redis_client, kuerzel, context)
|
||||||
redis_client.delete(employee_lock_key)
|
|
||||||
except Exception:
|
|
||||||
pass # Ignore errors when deleting lock
|
|
||||||
|
|
||||||
# Motia Step Configuration
|
# Motia Step Configuration
|
||||||
config = {
|
config = {
|
||||||
|
|||||||
116
bitbylaw/steps/advoware_cal_sync/calendar_sync_utils.py
Normal file
116
bitbylaw/steps/advoware_cal_sync/calendar_sync_utils.py
Normal file
@@ -0,0 +1,116 @@
|
|||||||
|
import logging
|
||||||
|
import asyncpg
|
||||||
|
import os
|
||||||
|
import redis
|
||||||
|
from config import Config
|
||||||
|
from googleapiclient.discovery import build
|
||||||
|
from google.oauth2 import service_account
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
def log_operation(level, message, context=None, **context_vars):
|
||||||
|
"""Centralized logging with context, supporting Motia workbench logging."""
|
||||||
|
context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
|
||||||
|
full_message = f"{message} {context_str}".strip()
|
||||||
|
if context:
|
||||||
|
if level == 'info':
|
||||||
|
context.logger.info(full_message)
|
||||||
|
elif level == 'warning':
|
||||||
|
if hasattr(context.logger, 'warn'):
|
||||||
|
context.logger.warn(full_message)
|
||||||
|
else:
|
||||||
|
context.logger.warning(full_message)
|
||||||
|
elif level == 'error':
|
||||||
|
context.logger.error(full_message)
|
||||||
|
# elif level == 'debug':
|
||||||
|
# context.logger.debug(full_message)dddd
|
||||||
|
else:
|
||||||
|
if level == 'info':
|
||||||
|
logger.info(full_message)
|
||||||
|
elif level == 'warning':
|
||||||
|
logger.warning(full_message)
|
||||||
|
elif level == 'error':
|
||||||
|
logger.error(full_message)
|
||||||
|
elif level == 'debug':
|
||||||
|
logger.debug(full_message)
|
||||||
|
|
||||||
|
async def connect_db(context=None):
|
||||||
|
"""Connect to Postgres DB from Config."""
|
||||||
|
try:
|
||||||
|
conn = await asyncpg.connect(
|
||||||
|
host=Config.POSTGRES_HOST or 'localhost',
|
||||||
|
user=Config.POSTGRES_USER,
|
||||||
|
password=Config.POSTGRES_PASSWORD,
|
||||||
|
database=Config.POSTGRES_DB_NAME,
|
||||||
|
timeout=10
|
||||||
|
)
|
||||||
|
return conn
|
||||||
|
except Exception as e:
|
||||||
|
log_operation('error', f"Failed to connect to DB: {e}", context=context)
|
||||||
|
raise
|
||||||
|
|
||||||
|
async def get_google_service(context=None):
|
||||||
|
"""Initialize Google Calendar service."""
|
||||||
|
try:
|
||||||
|
service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
|
||||||
|
if not os.path.exists(service_account_path):
|
||||||
|
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
|
||||||
|
creds = service_account.Credentials.from_service_account_file(
|
||||||
|
service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES
|
||||||
|
)
|
||||||
|
service = build('calendar', 'v3', credentials=creds)
|
||||||
|
return service
|
||||||
|
except Exception as e:
|
||||||
|
log_operation('error', f"Failed to initialize Google service: {e}", context=context)
|
||||||
|
raise
|
||||||
|
|
||||||
|
def get_redis_client(context=None):
|
||||||
|
"""Initialize Redis client for calendar sync operations."""
|
||||||
|
try:
|
||||||
|
redis_client = redis.Redis(
|
||||||
|
host=Config.REDIS_HOST,
|
||||||
|
port=int(Config.REDIS_PORT),
|
||||||
|
db=int(Config.REDIS_DB_CALENDAR_SYNC),
|
||||||
|
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
||||||
|
)
|
||||||
|
return redis_client
|
||||||
|
except Exception as e:
|
||||||
|
log_operation('error', f"Failed to initialize Redis client: {e}", context=context)
|
||||||
|
raise
|
||||||
|
|
||||||
|
async def get_advoware_employees(advoware, context=None):
|
||||||
|
"""Fetch list of employees from Advoware."""
|
||||||
|
try:
|
||||||
|
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
|
||||||
|
employees = result if isinstance(result, list) else []
|
||||||
|
log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
|
||||||
|
return employees
|
||||||
|
except Exception as e:
|
||||||
|
log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context)
|
||||||
|
raise
|
||||||
|
|
||||||
|
def set_employee_lock(redis_client, kuerzel, triggered_by, context=None):
|
||||||
|
"""Set lock for employee sync operation."""
|
||||||
|
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
|
||||||
|
if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None:
|
||||||
|
log_operation('info', f"Sync already active for {kuerzel}, skipping", context=context)
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def clear_employee_lock(redis_client, kuerzel, context=None):
|
||||||
|
"""Clear lock for employee sync operation and update last-synced timestamp."""
|
||||||
|
try:
|
||||||
|
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
|
||||||
|
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
|
||||||
|
|
||||||
|
# Update last-synced timestamp (no TTL, persistent)
|
||||||
|
import time
|
||||||
|
current_time = int(time.time())
|
||||||
|
redis_client.set(employee_last_synced_key, current_time)
|
||||||
|
|
||||||
|
# Delete the lock
|
||||||
|
redis_client.delete(employee_lock_key)
|
||||||
|
|
||||||
|
log_operation('debug', f"Cleared lock and updated last-synced for {kuerzel} to {current_time}", context=context)
|
||||||
|
except Exception as e:
|
||||||
|
log_operation('warning', f"Failed to clear lock and update last-synced for {kuerzel}: {e}", context=context)
|
||||||
Reference in New Issue
Block a user