Implement atomic locking in API and cron steps for per-employee parallel sync
This commit is contained in:
@@ -12,8 +12,7 @@ config = {
|
|||||||
'flows': ['advoware']
|
'flows': ['advoware']
|
||||||
}
|
}
|
||||||
|
|
||||||
async def handler(req):
|
async def handler(req, context):
|
||||||
context = req.context
|
|
||||||
try:
|
try:
|
||||||
# Konfiguration aus Request-Body
|
# Konfiguration aus Request-Body
|
||||||
body = req.get('body', {})
|
body = req.get('body', {})
|
||||||
@@ -37,7 +36,7 @@ async def handler(req):
|
|||||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
||||||
)
|
)
|
||||||
|
|
||||||
if redis_client.get(employee_lock_key):
|
if redis_client.set(employee_lock_key, 'api', ex=1800, nx=True) is None:
|
||||||
context.logger.info(f"Calendar Sync API: Sync bereits aktiv für {kuerzel}, überspringe")
|
context.logger.info(f"Calendar Sync API: Sync bereits aktiv für {kuerzel}, überspringe")
|
||||||
return {
|
return {
|
||||||
'status': 409,
|
'status': 409,
|
||||||
@@ -51,9 +50,7 @@ async def handler(req):
|
|||||||
|
|
||||||
context.logger.info(f"Calendar Sync API aufgerufen für {kuerzel}")
|
context.logger.info(f"Calendar Sync API aufgerufen für {kuerzel}")
|
||||||
|
|
||||||
# Setze Lock für 30 Minuten
|
# Lock erfolgreich gesetzt, jetzt emittieren
|
||||||
redis_client.set(employee_lock_key, 'api', ex=1800)
|
|
||||||
context.logger.info(f"Calendar Sync API: Lock gesetzt für {kuerzel}")
|
|
||||||
|
|
||||||
# Emit Event für den Sync
|
# Emit Event für den Sync
|
||||||
await context.emit({
|
await context.emit({
|
||||||
|
|||||||
@@ -50,6 +50,19 @@ async def handler(context):
|
|||||||
context.logger.info(f"DEBUG: Überspringe {kuerzel}, nur SB wird gesynct")
|
context.logger.info(f"DEBUG: Überspringe {kuerzel}, nur SB wird gesynct")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
|
||||||
|
|
||||||
|
redis_client = redis.Redis(
|
||||||
|
host=Config.REDIS_HOST,
|
||||||
|
port=int(Config.REDIS_PORT),
|
||||||
|
db=int(Config.REDIS_DB_CALENDAR_SYNC),
|
||||||
|
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
||||||
|
)
|
||||||
|
|
||||||
|
if redis_client.set(employee_lock_key, 'cron', ex=1800, nx=True) is None:
|
||||||
|
context.logger.info(f"Calendar Sync Cron: Sync bereits aktiv für {kuerzel}, überspringe")
|
||||||
|
continue
|
||||||
|
|
||||||
# Emit event for this employee
|
# Emit event for this employee
|
||||||
await context.emit({
|
await context.emit({
|
||||||
"topic": "calendar_sync_employee",
|
"topic": "calendar_sync_employee",
|
||||||
|
|||||||
4
bitbylaw/types.d.ts
vendored
4
bitbylaw/types.d.ts
vendored
@@ -25,7 +25,7 @@ declare module 'motia' {
|
|||||||
'Advoware Proxy GET': ApiRouteHandler<Record<string, unknown>, unknown, never>
|
'Advoware Proxy GET': ApiRouteHandler<Record<string, unknown>, unknown, never>
|
||||||
'Advoware Proxy DELETE': ApiRouteHandler<Record<string, unknown>, unknown, never>
|
'Advoware Proxy DELETE': ApiRouteHandler<Record<string, unknown>, unknown, never>
|
||||||
'Calendar Sync Event Step': EventHandler<never, never>
|
'Calendar Sync Event Step': EventHandler<never, never>
|
||||||
'Calendar Sync Cron Job': CronHandler<{ topic: 'calendar.sync.triggered'; data: never }>
|
'Calendar Sync Cron Job': CronHandler<{ topic: 'calendar_sync_employee'; data: never }>
|
||||||
'Calendar Sync API Trigger': ApiRouteHandler<Record<string, unknown>, unknown, { topic: 'calendar.sync.triggered'; data: never }>
|
'Calendar Sync API Trigger': ApiRouteHandler<Record<string, unknown>, unknown, { topic: 'calendar_sync_employee'; data: never }>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user