Compare commits
3 Commits
c5600b42ec
...
96eabe3db6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
96eabe3db6 | ||
|
|
b18e770f12 | ||
|
|
e4bf21e676 |
@@ -7,6 +7,7 @@ import pytz
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||
from config import Config
|
||||
from services.advoware import AdvowareAPI
|
||||
from .calendar_sync_utils import connect_db, get_google_service
|
||||
from googleapiclient.discovery import build
|
||||
from googleapiclient.errors import HttpError
|
||||
from google.oauth2 import service_account
|
||||
@@ -23,35 +24,7 @@ BERLIN_TZ = pytz.timezone('Europe/Berlin')
|
||||
now = datetime.now(BERLIN_TZ)
|
||||
current_year = now.year
|
||||
|
||||
async def connect_db():
|
||||
"""Connect to Postgres DB from Config."""
|
||||
try:
|
||||
conn = await asyncpg.connect(
|
||||
host=Config.POSTGRES_HOST or 'localhost',
|
||||
user=Config.POSTGRES_USER,
|
||||
password=Config.POSTGRES_PASSWORD,
|
||||
database=Config.POSTGRES_DB_NAME,
|
||||
timeout=10
|
||||
)
|
||||
return conn
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to DB: {e}")
|
||||
raise
|
||||
|
||||
async def get_google_service():
|
||||
"""Initialize Google Calendar service."""
|
||||
try:
|
||||
service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
|
||||
if not os.path.exists(service_account_path):
|
||||
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
|
||||
creds = service_account.Credentials.from_service_account_file(
|
||||
service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES
|
||||
)
|
||||
service = build('calendar', 'v3', credentials=creds)
|
||||
return service
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize Google service: {e}")
|
||||
raise
|
||||
|
||||
async def ensure_google_calendar(service, employee_kuerzel):
|
||||
"""Ensure Google Calendar exists for employee."""
|
||||
|
||||
@@ -1,64 +1,70 @@
|
||||
import json
|
||||
import redis
|
||||
import math
|
||||
import time
|
||||
from datetime import datetime
|
||||
from config import Config
|
||||
from services.advoware import AdvowareAPI
|
||||
from .calendar_sync_utils import get_redis_client, get_advoware_employees, set_employee_lock
|
||||
|
||||
config = {
|
||||
'type': 'event',
|
||||
'name': 'Calendar Sync All Step',
|
||||
'description': 'Nimmt sync-all Event auf und emittiert individuelle Events für jeden Mitarbeiter',
|
||||
'description': 'Nimmt sync-all Event auf und emittiert individuelle Events für die ältesten Mitarbeiter',
|
||||
'subscribes': ['calendar_sync_all'],
|
||||
'emits': ['calendar_sync_employee'],
|
||||
'flows': ['advoware']
|
||||
}
|
||||
|
||||
async def get_advoware_employees(context, advoware):
|
||||
"""Fetch list of employees from Advoware."""
|
||||
try:
|
||||
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
|
||||
employees = result if isinstance(result, list) else []
|
||||
context.logger.info(f"Fetched {len(employees)} Advoware employees")
|
||||
return employees
|
||||
except Exception as e:
|
||||
context.logger.error(f"Failed to fetch Advoware employees: {e}")
|
||||
raise
|
||||
|
||||
async def handler(event_data, context):
|
||||
try:
|
||||
triggered_by = event_data.get('triggered_by', 'unknown')
|
||||
context.logger.info(f"Calendar Sync All: Starting to emit events for all employees, triggered by {triggered_by}")
|
||||
context.logger.info(f"Calendar Sync All: Starting to emit events for oldest employees, triggered by {triggered_by}")
|
||||
|
||||
# Initialize Advoware API
|
||||
advoware = AdvowareAPI(context)
|
||||
|
||||
# Fetch employees
|
||||
employees = await get_advoware_employees(context, advoware)
|
||||
employees = await get_advoware_employees(advoware, context)
|
||||
if not employees:
|
||||
context.logger.error("Keine Mitarbeiter gefunden. All-Sync abgebrochen.")
|
||||
return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
|
||||
|
||||
# Emit event for each employee
|
||||
redis_client = get_redis_client(context)
|
||||
|
||||
# Collect last_synced timestamps
|
||||
employee_timestamps = {}
|
||||
for employee in employees:
|
||||
kuerzel = employee.get('kuerzel')
|
||||
if not kuerzel:
|
||||
context.logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}")
|
||||
continue
|
||||
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
|
||||
timestamp_str = redis_client.get(employee_last_synced_key)
|
||||
timestamp = int(timestamp_str) if timestamp_str else 0 # 0 if no timestamp (very old)
|
||||
employee_timestamps[kuerzel] = timestamp
|
||||
|
||||
# # DEBUG: Nur für konfigurierte Nutzer syncen
|
||||
# if kuerzel not in Config.CALENDAR_SYNC_DEBUG_KUERZEL:
|
||||
# context.logger.info(f"DEBUG: Überspringe {kuerzel}, nur {Config.CALENDAR_SYNC_DEBUG_KUERZEL} werden gesynct")
|
||||
# continue
|
||||
# Sort employees by last_synced (ascending, oldest first), then by kuerzel alphabetically
|
||||
sorted_kuerzel = sorted(employee_timestamps.keys(), key=lambda k: (employee_timestamps[k], k))
|
||||
|
||||
# Log the sorted list with timestamps
|
||||
def format_timestamp(ts):
|
||||
if ts == 0:
|
||||
return "never"
|
||||
return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
sorted_list_str = ", ".join(f"{k} ({format_timestamp(employee_timestamps[k])})" for k in sorted_kuerzel)
|
||||
context.logger.info(f"Calendar Sync All: Sorted employees by last synced: {sorted_list_str}")
|
||||
|
||||
# Calculate number to sync: ceil(N / 10)
|
||||
num_to_sync = math.ceil(len(sorted_kuerzel) / 10)
|
||||
context.logger.info(f"Calendar Sync All: Total employees {len(sorted_kuerzel)}, syncing {num_to_sync} oldest")
|
||||
|
||||
# Emit for the oldest num_to_sync employees, if not locked
|
||||
emitted_count = 0
|
||||
for kuerzel in sorted_kuerzel[:num_to_sync]:
|
||||
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
|
||||
|
||||
redis_client = redis.Redis(
|
||||
host=Config.REDIS_HOST,
|
||||
port=int(Config.REDIS_PORT),
|
||||
db=int(Config.REDIS_DB_CALENDAR_SYNC),
|
||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
||||
)
|
||||
|
||||
if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None:
|
||||
if not set_employee_lock(redis_client, kuerzel, triggered_by, context):
|
||||
context.logger.info(f"Calendar Sync All: Sync bereits aktiv für {kuerzel}, überspringe")
|
||||
continue
|
||||
|
||||
@@ -70,12 +76,14 @@ async def handler(event_data, context):
|
||||
"triggered_by": triggered_by
|
||||
}
|
||||
})
|
||||
context.logger.info(f"Calendar Sync All: Emitted event for employee {kuerzel}")
|
||||
context.logger.info(f"Calendar Sync All: Emitted event for employee {kuerzel} (last synced: {format_timestamp(employee_timestamps[kuerzel])})")
|
||||
emitted_count += 1
|
||||
|
||||
context.logger.info("Calendar Sync All: Completed emitting events for employees")
|
||||
context.logger.info(f"Calendar Sync All: Completed, emitted {emitted_count} events")
|
||||
return {
|
||||
'status': 'completed',
|
||||
'triggered_by': triggered_by
|
||||
'triggered_by': triggered_by,
|
||||
'emitted_count': emitted_count
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import json
|
||||
import redis
|
||||
from config import Config
|
||||
from .calendar_sync_utils import get_redis_client, set_employee_lock
|
||||
|
||||
config = {
|
||||
'type': 'api',
|
||||
@@ -50,14 +51,9 @@ async def handler(req, context):
|
||||
employee_lock_key = f'calendar_sync_lock_{kuerzel_upper}'
|
||||
|
||||
# Prüfe ob bereits ein Sync für diesen Mitarbeiter läuft
|
||||
redis_client = redis.Redis(
|
||||
host=Config.REDIS_HOST,
|
||||
port=int(Config.REDIS_PORT),
|
||||
db=int(Config.REDIS_DB_CALENDAR_SYNC),
|
||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
||||
)
|
||||
redis_client = get_redis_client(context)
|
||||
|
||||
if redis_client.set(employee_lock_key, 'api', ex=1800, nx=True) is None:
|
||||
if not set_employee_lock(redis_client, kuerzel_upper, 'api', context):
|
||||
context.logger.info(f"Calendar Sync API: Sync bereits aktiv für {kuerzel_upper}, überspringe")
|
||||
return {
|
||||
'status': 409,
|
||||
|
||||
@@ -6,8 +6,8 @@ from services.advoware import AdvowareAPI
|
||||
config = {
|
||||
'type': 'cron',
|
||||
'name': 'Calendar Sync Cron Job',
|
||||
'description': 'Führt den Calendar Sync alle 5 Minuten automatisch aus',
|
||||
'cron': '*/5 * * * *', # Alle 5 Minuten
|
||||
'description': 'Führt den Calendar Sync alle 1 Minuten automatisch aus',
|
||||
'cron': '*/1 * * * *', # Alle 1 Minute
|
||||
'emits': ['calendar_sync_all'],
|
||||
'flows': ['advoware']
|
||||
}
|
||||
@@ -17,12 +17,12 @@ async def handler(context):
|
||||
context.logger.info("Calendar Sync Cron: Starting to emit sync-all event")
|
||||
|
||||
# # Emit sync-all event
|
||||
# await context.emit({
|
||||
# "topic": "calendar_sync_all",
|
||||
# "data": {
|
||||
# "triggered_by": "cron"
|
||||
# }
|
||||
# })
|
||||
await context.emit({
|
||||
"topic": "calendar_sync_all",
|
||||
"data": {
|
||||
"triggered_by": "cron"
|
||||
}
|
||||
})
|
||||
|
||||
context.logger.info("Calendar Sync Cron: Emitted sync-all event")
|
||||
return {
|
||||
|
||||
@@ -14,6 +14,7 @@ import time
|
||||
import random
|
||||
from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc.
|
||||
from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls
|
||||
from .calendar_sync_utils import connect_db, get_google_service, log_operation, get_redis_client, get_advoware_employees, set_employee_lock, clear_employee_lock
|
||||
|
||||
# Setup logging
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -30,13 +31,11 @@ current_year = now.year
|
||||
FETCH_FROM = f"{current_year - 1}-01-01T00:00:00" # Start of previous year
|
||||
FETCH_TO = f"{current_year + 9}-12-31T23:59:59" # End of 9 years ahead
|
||||
|
||||
CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
|
||||
|
||||
# Constants: Für 600/min -> 600 Tokens, Refill 600/60=10 pro Sekunde -> 10/1000 pro ms
|
||||
RATE_LIMIT_KEY = 'google_calendar_api_tokens'
|
||||
MAX_TOKENS = 5
|
||||
REFILL_RATE_PER_MS = 2 / 1000 # Float nur hier; Ops mit Integers
|
||||
MIN_WAIT = 0.2 # 200ms
|
||||
MAX_TOKENS = 7
|
||||
REFILL_RATE_PER_MS = 7 / 1000 # Float nur hier; Ops mit Integers
|
||||
MIN_WAIT = 0.1 # 100ms
|
||||
JITTER_MAX = 0.1 # Optional: Zufalls-Delay 0-100ms für Glättung
|
||||
|
||||
async def enforce_global_rate_limit(context=None):
|
||||
@@ -104,61 +103,6 @@ async def enforce_global_rate_limit(context=None):
|
||||
except Exception as e:
|
||||
log_operation('error', f"Rate limiting failed: {e}. Proceeding without limit.", context=context)
|
||||
|
||||
def log_operation(level, message, context=None, **context_vars):
|
||||
"""Centralized logging with context, supporting Motia workbench logging."""
|
||||
context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
|
||||
full_message = f"{message} {context_str}".strip()
|
||||
if context:
|
||||
if level == 'info':
|
||||
context.logger.info(full_message)
|
||||
elif level == 'warning':
|
||||
if hasattr(context.logger, 'warn'):
|
||||
context.logger.warn(full_message)
|
||||
else:
|
||||
context.logger.warning(full_message)
|
||||
elif level == 'error':
|
||||
context.logger.error(full_message)
|
||||
# elif level == 'debug':
|
||||
# context.logger.debug(full_message)dddd
|
||||
else:
|
||||
if level == 'info':
|
||||
logger.info(full_message)
|
||||
elif level == 'warning':
|
||||
logger.warning(full_message)
|
||||
elif level == 'error':
|
||||
logger.error(full_message)
|
||||
elif level == 'debug':
|
||||
logger.debug(full_message)
|
||||
|
||||
async def connect_db(context=None):
|
||||
"""Connect to Postgres DB from Config."""
|
||||
try:
|
||||
conn = await asyncpg.connect(
|
||||
host=Config.POSTGRES_HOST or 'localhost',
|
||||
user=Config.POSTGRES_USER,
|
||||
password=Config.POSTGRES_PASSWORD,
|
||||
database=Config.POSTGRES_DB_NAME,
|
||||
timeout=10
|
||||
)
|
||||
return conn
|
||||
except Exception as e:
|
||||
log_operation('error', f"Failed to connect to DB: {e}", context=context)
|
||||
raise
|
||||
|
||||
async def get_google_service(context=None):
|
||||
"""Initialize Google Calendar service."""
|
||||
try:
|
||||
service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
|
||||
if not os.path.exists(service_account_path):
|
||||
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
|
||||
creds = service_account.Credentials.from_service_account_file(
|
||||
service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES
|
||||
)
|
||||
service = build('calendar', 'v3', credentials=creds)
|
||||
return service
|
||||
except Exception as e:
|
||||
log_operation('error', f"Failed to initialize Google service: {e}", context=context)
|
||||
raise
|
||||
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||
async def ensure_google_calendar(service, employee_kuerzel, context=None):
|
||||
@@ -673,17 +617,6 @@ async def safe_advoware_operation(operation, write_allowed, context=None, *args,
|
||||
return None
|
||||
return await operation(*args, **kwargs)
|
||||
|
||||
async def get_advoware_employees(advoware, context=None):
|
||||
"""Fetch list of employees from Advoware."""
|
||||
try:
|
||||
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
|
||||
employees = result if isinstance(result, list) else []
|
||||
log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
|
||||
return employees
|
||||
except Exception as e:
|
||||
log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context)
|
||||
raise
|
||||
|
||||
async def get_advoware_timestamp(advoware, frnr, context=None):
|
||||
"""Fetch the last modified timestamp for an Advoware appointment."""
|
||||
try:
|
||||
@@ -983,7 +916,6 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
|
||||
log_operation('warning', f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}", context=context)
|
||||
async with conn.transaction():
|
||||
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
|
||||
CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
|
||||
|
||||
async def handler(event_data, context):
|
||||
"""Main event handler for calendar sync."""
|
||||
@@ -996,12 +928,7 @@ async def handler(event_data, context):
|
||||
|
||||
log_operation('info', f"Starting calendar sync for employee {kuerzel}", context=context)
|
||||
|
||||
redis_client = redis.Redis(
|
||||
host=Config.REDIS_HOST,
|
||||
port=int(Config.REDIS_PORT),
|
||||
db=int(Config.REDIS_DB_CALENDAR_SYNC),
|
||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
||||
)
|
||||
redis_client = get_redis_client(context)
|
||||
|
||||
try:
|
||||
|
||||
@@ -1105,10 +1032,7 @@ async def handler(event_data, context):
|
||||
return {'status': 500, 'body': {'error': str(e)}}
|
||||
finally:
|
||||
# Ensure lock is always released
|
||||
try:
|
||||
redis_client.delete(employee_lock_key)
|
||||
except Exception:
|
||||
pass # Ignore errors when deleting lock
|
||||
clear_employee_lock(redis_client, kuerzel, context)
|
||||
|
||||
# Motia Step Configuration
|
||||
config = {
|
||||
|
||||
116
bitbylaw/steps/advoware_cal_sync/calendar_sync_utils.py
Normal file
116
bitbylaw/steps/advoware_cal_sync/calendar_sync_utils.py
Normal file
@@ -0,0 +1,116 @@
|
||||
import logging
|
||||
import asyncpg
|
||||
import os
|
||||
import redis
|
||||
from config import Config
|
||||
from googleapiclient.discovery import build
|
||||
from google.oauth2 import service_account
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def log_operation(level, message, context=None, **context_vars):
|
||||
"""Centralized logging with context, supporting Motia workbench logging."""
|
||||
context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
|
||||
full_message = f"{message} {context_str}".strip()
|
||||
if context:
|
||||
if level == 'info':
|
||||
context.logger.info(full_message)
|
||||
elif level == 'warning':
|
||||
if hasattr(context.logger, 'warn'):
|
||||
context.logger.warn(full_message)
|
||||
else:
|
||||
context.logger.warning(full_message)
|
||||
elif level == 'error':
|
||||
context.logger.error(full_message)
|
||||
# elif level == 'debug':
|
||||
# context.logger.debug(full_message)dddd
|
||||
else:
|
||||
if level == 'info':
|
||||
logger.info(full_message)
|
||||
elif level == 'warning':
|
||||
logger.warning(full_message)
|
||||
elif level == 'error':
|
||||
logger.error(full_message)
|
||||
elif level == 'debug':
|
||||
logger.debug(full_message)
|
||||
|
||||
async def connect_db(context=None):
|
||||
"""Connect to Postgres DB from Config."""
|
||||
try:
|
||||
conn = await asyncpg.connect(
|
||||
host=Config.POSTGRES_HOST or 'localhost',
|
||||
user=Config.POSTGRES_USER,
|
||||
password=Config.POSTGRES_PASSWORD,
|
||||
database=Config.POSTGRES_DB_NAME,
|
||||
timeout=10
|
||||
)
|
||||
return conn
|
||||
except Exception as e:
|
||||
log_operation('error', f"Failed to connect to DB: {e}", context=context)
|
||||
raise
|
||||
|
||||
async def get_google_service(context=None):
|
||||
"""Initialize Google Calendar service."""
|
||||
try:
|
||||
service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
|
||||
if not os.path.exists(service_account_path):
|
||||
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
|
||||
creds = service_account.Credentials.from_service_account_file(
|
||||
service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES
|
||||
)
|
||||
service = build('calendar', 'v3', credentials=creds)
|
||||
return service
|
||||
except Exception as e:
|
||||
log_operation('error', f"Failed to initialize Google service: {e}", context=context)
|
||||
raise
|
||||
|
||||
def get_redis_client(context=None):
|
||||
"""Initialize Redis client for calendar sync operations."""
|
||||
try:
|
||||
redis_client = redis.Redis(
|
||||
host=Config.REDIS_HOST,
|
||||
port=int(Config.REDIS_PORT),
|
||||
db=int(Config.REDIS_DB_CALENDAR_SYNC),
|
||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
||||
)
|
||||
return redis_client
|
||||
except Exception as e:
|
||||
log_operation('error', f"Failed to initialize Redis client: {e}", context=context)
|
||||
raise
|
||||
|
||||
async def get_advoware_employees(advoware, context=None):
|
||||
"""Fetch list of employees from Advoware."""
|
||||
try:
|
||||
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
|
||||
employees = result if isinstance(result, list) else []
|
||||
log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
|
||||
return employees
|
||||
except Exception as e:
|
||||
log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context)
|
||||
raise
|
||||
|
||||
def set_employee_lock(redis_client, kuerzel, triggered_by, context=None):
|
||||
"""Set lock for employee sync operation."""
|
||||
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
|
||||
if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None:
|
||||
log_operation('info', f"Sync already active for {kuerzel}, skipping", context=context)
|
||||
return False
|
||||
return True
|
||||
|
||||
def clear_employee_lock(redis_client, kuerzel, context=None):
|
||||
"""Clear lock for employee sync operation and update last-synced timestamp."""
|
||||
try:
|
||||
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
|
||||
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
|
||||
|
||||
# Update last-synced timestamp (no TTL, persistent)
|
||||
import time
|
||||
current_time = int(time.time())
|
||||
redis_client.set(employee_last_synced_key, current_time)
|
||||
|
||||
# Delete the lock
|
||||
redis_client.delete(employee_lock_key)
|
||||
|
||||
log_operation('debug', f"Cleared lock and updated last-synced for {kuerzel} to {current_time}", context=context)
|
||||
except Exception as e:
|
||||
log_operation('warning', f"Failed to clear lock and update last-synced for {kuerzel}: {e}", context=context)
|
||||
Reference in New Issue
Block a user