Add calendar sync utilities and Beteiligte sync cron job
- Implemented calendar_sync_utils.py for shared utility functions including DB connection, Google Calendar service initialization, Redis client setup, and employee sync operations. - Created beteiligte_sync_cron_step.py to handle periodic sync of Beteiligte entities, checking for new, modified, failed, and stale records, and emitting sync events accordingly.
This commit is contained in:
5
steps/advoware_cal_sync/__init__.py
Normal file
5
steps/advoware_cal_sync/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""
|
||||
Advoware Calendar Sync Module
|
||||
|
||||
Bidirectional synchronization between Google Calendar and Advoware appointments.
|
||||
"""
|
||||
113
steps/advoware_cal_sync/calendar_sync_all_step.py
Normal file
113
steps/advoware_cal_sync/calendar_sync_all_step.py
Normal file
@@ -0,0 +1,113 @@
|
||||
"""
|
||||
Calendar Sync All Step
|
||||
|
||||
Handles calendar_sync_all event and emits individual sync events for oldest employees.
|
||||
Uses Redis to track last sync times and distribute work.
|
||||
"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from calendar_sync_utils import (
|
||||
get_redis_client,
|
||||
get_advoware_employees,
|
||||
set_employee_lock,
|
||||
log_operation
|
||||
)
|
||||
|
||||
import math
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
from motia import queue, FlowContext
|
||||
from pydantic import BaseModel, Field
|
||||
from services.advoware_service import AdvowareService
|
||||
|
||||
config = {
|
||||
'name': 'Calendar Sync All Step',
|
||||
'description': 'Receives sync-all event and emits individual events for oldest employees',
|
||||
'flows': ['advoware'],
|
||||
'triggers': [
|
||||
queue('calendar_sync_all')
|
||||
],
|
||||
'enqueues': ['calendar_sync_employee']
|
||||
}
|
||||
|
||||
|
||||
async def handler(input_data: dict, ctx: FlowContext):
|
||||
"""
|
||||
Handler that fetches all employees, sorts by last sync time,
|
||||
and emits calendar_sync_employee events for the oldest ones.
|
||||
"""
|
||||
try:
|
||||
triggered_by = input_data.get('triggered_by', 'unknown')
|
||||
log_operation('info', f"Calendar Sync All: Starting to emit events for oldest employees, triggered by {triggered_by}", context=ctx)
|
||||
|
||||
# Initialize Advoware service
|
||||
advoware = AdvowareService(ctx)
|
||||
|
||||
# Fetch employees
|
||||
employees = await get_advoware_employees(advoware, ctx)
|
||||
if not employees:
|
||||
log_operation('error', "Keine Mitarbeiter gefunden. All-Sync abgebrochen.", context=ctx)
|
||||
return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
|
||||
|
||||
redis_client = get_redis_client(ctx)
|
||||
|
||||
# Collect last_synced timestamps
|
||||
employee_timestamps = {}
|
||||
for employee in employees:
|
||||
kuerzel = employee.get('kuerzel')
|
||||
if not kuerzel:
|
||||
continue
|
||||
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
|
||||
timestamp_str = redis_client.get(employee_last_synced_key)
|
||||
timestamp = int(timestamp_str) if timestamp_str else 0 # 0 if no timestamp (very old)
|
||||
employee_timestamps[kuerzel] = timestamp
|
||||
|
||||
# Sort employees by last_synced (ascending, oldest first), then by kuerzel alphabetically
|
||||
sorted_kuerzel = sorted(employee_timestamps.keys(), key=lambda k: (employee_timestamps[k], k))
|
||||
|
||||
# Log the sorted list with timestamps
|
||||
def format_timestamp(ts):
|
||||
if ts == 0:
|
||||
return "never"
|
||||
return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
sorted_list_str = ", ".join(f"{k} ({format_timestamp(employee_timestamps[k])})" for k in sorted_kuerzel)
|
||||
log_operation('info', f"Calendar Sync All: Sorted employees by last synced: {sorted_list_str}", context=ctx)
|
||||
|
||||
# Calculate number to sync: ceil(N / 10)
|
||||
num_to_sync = math.ceil(len(sorted_kuerzel) / 1)
|
||||
log_operation('info', f"Calendar Sync All: Total employees {len(sorted_kuerzel)}, syncing {num_to_sync} oldest", context=ctx)
|
||||
|
||||
# Emit for the oldest num_to_sync employees, if not locked
|
||||
emitted_count = 0
|
||||
for kuerzel in sorted_kuerzel[:num_to_sync]:
|
||||
if not set_employee_lock(redis_client, kuerzel, triggered_by, ctx):
|
||||
log_operation('info', f"Calendar Sync All: Sync already active for {kuerzel}, skipping", context=ctx)
|
||||
continue
|
||||
|
||||
# Emit event for this employee
|
||||
await ctx.enqueue({
|
||||
"topic": "calendar_sync_employee",
|
||||
"data": {
|
||||
"kuerzel": kuerzel,
|
||||
"triggered_by": triggered_by
|
||||
}
|
||||
})
|
||||
log_operation('info', f"Calendar Sync All: Emitted event for employee {kuerzel} (last synced: {format_timestamp(employee_timestamps[kuerzel])})", context=ctx)
|
||||
emitted_count += 1
|
||||
|
||||
log_operation('info', f"Calendar Sync All: Completed, emitted {emitted_count} events", context=ctx)
|
||||
return {
|
||||
'status': 'completed',
|
||||
'triggered_by': triggered_by,
|
||||
'emitted_count': emitted_count
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
log_operation('error', f"Fehler beim All-Sync: {e}", context=ctx)
|
||||
return {
|
||||
'status': 'error',
|
||||
'error': str(e)
|
||||
}
|
||||
112
steps/advoware_cal_sync/calendar_sync_api_step.py
Normal file
112
steps/advoware_cal_sync/calendar_sync_api_step.py
Normal file
@@ -0,0 +1,112 @@
|
||||
"""
|
||||
Calendar Sync API Step
|
||||
|
||||
HTTP API endpoint for manual calendar sync triggering.
|
||||
Supports syncing a single employee or all employees.
|
||||
"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from calendar_sync_utils import get_redis_client, set_employee_lock, log_operation
|
||||
|
||||
from motia import http, ApiRequest, ApiResponse, FlowContext
|
||||
|
||||
|
||||
config = {
|
||||
'name': 'Calendar Sync API Trigger',
|
||||
'description': 'API endpoint for manual calendar sync triggering',
|
||||
'flows': ['advoware'],
|
||||
'triggers': [
|
||||
http('POST', '/advoware/calendar/sync')
|
||||
],
|
||||
'enqueues': ['calendar_sync_employee', 'calendar_sync_all']
|
||||
}
|
||||
|
||||
|
||||
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
|
||||
"""
|
||||
HTTP handler for manual calendar sync triggering.
|
||||
|
||||
Request body:
|
||||
{
|
||||
"kuerzel": "SB" // or "ALL" for all employees
|
||||
}
|
||||
"""
|
||||
try:
|
||||
# Get kuerzel from request body
|
||||
body = request.body
|
||||
kuerzel = body.get('kuerzel')
|
||||
if not kuerzel:
|
||||
return ApiResponse(
|
||||
status=400,
|
||||
body={
|
||||
'error': 'kuerzel required',
|
||||
'message': 'Bitte kuerzel im Body angeben'
|
||||
}
|
||||
)
|
||||
|
||||
kuerzel_upper = kuerzel.upper()
|
||||
|
||||
if kuerzel_upper == 'ALL':
|
||||
# Emit sync-all event
|
||||
log_operation('info', "Calendar Sync API: Emitting sync-all event", context=ctx)
|
||||
await ctx.enqueue({
|
||||
"topic": "calendar_sync_all",
|
||||
"data": {
|
||||
"triggered_by": "api"
|
||||
}
|
||||
})
|
||||
return ApiResponse(
|
||||
status=200,
|
||||
body={
|
||||
'status': 'triggered',
|
||||
'message': 'Calendar sync wurde für alle Mitarbeiter ausgelöst',
|
||||
'triggered_by': 'api'
|
||||
}
|
||||
)
|
||||
else:
|
||||
# Single employee sync
|
||||
redis_client = get_redis_client(ctx)
|
||||
|
||||
if not set_employee_lock(redis_client, kuerzel_upper, 'api', ctx):
|
||||
log_operation('info', f"Calendar Sync API: Sync already active for {kuerzel_upper}, skipping", context=ctx)
|
||||
return ApiResponse(
|
||||
status=409,
|
||||
body={
|
||||
'status': 'conflict',
|
||||
'message': f'Calendar sync already active for {kuerzel_upper}',
|
||||
'kuerzel': kuerzel_upper,
|
||||
'triggered_by': 'api'
|
||||
}
|
||||
)
|
||||
|
||||
log_operation('info', f"Calendar Sync API called for {kuerzel_upper}", context=ctx)
|
||||
|
||||
# Lock successfully set, now emit event
|
||||
await ctx.enqueue({
|
||||
"topic": "calendar_sync_employee",
|
||||
"data": {
|
||||
"kuerzel": kuerzel_upper,
|
||||
"triggered_by": "api"
|
||||
}
|
||||
})
|
||||
|
||||
return ApiResponse(
|
||||
status=200,
|
||||
body={
|
||||
'status': 'triggered',
|
||||
'message': f'Calendar sync was triggered for {kuerzel_upper}',
|
||||
'kuerzel': kuerzel_upper,
|
||||
'triggered_by': 'api'
|
||||
}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
log_operation('error', f"Error in API trigger: {e}", context=ctx)
|
||||
return ApiResponse(
|
||||
status=500,
|
||||
body={
|
||||
'error': 'Internal server error',
|
||||
'details': str(e)
|
||||
}
|
||||
)
|
||||
50
steps/advoware_cal_sync/calendar_sync_cron_step.py
Normal file
50
steps/advoware_cal_sync/calendar_sync_cron_step.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""
|
||||
Calendar Sync Cron Step
|
||||
|
||||
Cron trigger for automatic calendar synchronization.
|
||||
Emits calendar_sync_all event to start sync cascade.
|
||||
"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from calendar_sync_utils import log_operation
|
||||
|
||||
from motia import cron, FlowContext
|
||||
|
||||
|
||||
config = {
|
||||
'name': 'Calendar Sync Cron Job',
|
||||
'description': 'Runs calendar sync automatically every 15 minutes',
|
||||
'flows': ['advoware'],
|
||||
'triggers': [
|
||||
cron("0 */15 * * * *") # Every 15 minutes (6-field: sec min hour day month weekday)
|
||||
],
|
||||
'enqueues': ['calendar_sync_all']
|
||||
}
|
||||
|
||||
|
||||
async def handler(input_data: dict, ctx: FlowContext):
|
||||
"""Cron handler that triggers the calendar sync cascade."""
|
||||
try:
|
||||
log_operation('info', "Calendar Sync Cron: Starting to emit sync-all event", context=ctx)
|
||||
|
||||
# Enqueue sync-all event
|
||||
await ctx.enqueue({
|
||||
"topic": "calendar_sync_all",
|
||||
"data": {
|
||||
"triggered_by": "cron"
|
||||
}
|
||||
})
|
||||
|
||||
log_operation('info', "Calendar Sync Cron: Emitted sync-all event", context=ctx)
|
||||
return {
|
||||
'status': 'completed',
|
||||
'triggered_by': 'cron'
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
log_operation('error', f"Fehler beim Cron-Job: {e}", context=ctx)
|
||||
return {
|
||||
'status': 'error',
|
||||
'error': str(e)
|
||||
}
|
||||
1057
steps/advoware_cal_sync/calendar_sync_event_step.py
Normal file
1057
steps/advoware_cal_sync/calendar_sync_event_step.py
Normal file
File diff suppressed because it is too large
Load Diff
122
steps/advoware_cal_sync/calendar_sync_utils.py
Normal file
122
steps/advoware_cal_sync/calendar_sync_utils.py
Normal file
@@ -0,0 +1,122 @@
|
||||
"""
|
||||
Calendar Sync Utilities
|
||||
|
||||
Shared utility functions for calendar synchronization between Google Calendar and Advoware.
|
||||
"""
|
||||
import logging
|
||||
import asyncpg
|
||||
import os
|
||||
import redis
|
||||
import time
|
||||
from googleapiclient.discovery import build
|
||||
from google.oauth2 import service_account
|
||||
|
||||
# Configure logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def log_operation(level: str, message: str, context=None, **context_vars):
|
||||
"""Centralized logging with context, supporting file and console logging."""
|
||||
context_str = ' '.join(f"{k}={v}" for k, v in context_vars.items() if v is not None)
|
||||
full_message = f"[{time.time()}] {message} {context_str}".strip()
|
||||
|
||||
# Log via logger
|
||||
if level == 'info':
|
||||
logger.info(full_message)
|
||||
elif level == 'warning':
|
||||
logger.warning(full_message)
|
||||
elif level == 'error':
|
||||
logger.error(full_message)
|
||||
elif level == 'debug':
|
||||
logger.debug(full_message)
|
||||
|
||||
# Also log to console for journalctl visibility
|
||||
print(f"[{level.upper()}] {full_message}")
|
||||
|
||||
|
||||
async def connect_db(context=None):
|
||||
"""Connect to Postgres DB from environment variables."""
|
||||
try:
|
||||
conn = await asyncpg.connect(
|
||||
host=os.getenv('POSTGRES_HOST', 'localhost'),
|
||||
user=os.getenv('POSTGRES_USER', 'calendar_sync_user'),
|
||||
password=os.getenv('POSTGRES_PASSWORD', 'default_password'),
|
||||
database=os.getenv('POSTGRES_DB_NAME', 'calendar_sync_db'),
|
||||
timeout=10
|
||||
)
|
||||
return conn
|
||||
except Exception as e:
|
||||
log_operation('error', f"Failed to connect to DB: {e}", context=context)
|
||||
raise
|
||||
|
||||
|
||||
async def get_google_service(context=None):
|
||||
"""Initialize Google Calendar service."""
|
||||
try:
|
||||
service_account_path = os.getenv('GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH', 'service-account.json')
|
||||
if not os.path.exists(service_account_path):
|
||||
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
|
||||
|
||||
scopes = ['https://www.googleapis.com/auth/calendar']
|
||||
creds = service_account.Credentials.from_service_account_file(
|
||||
service_account_path, scopes=scopes
|
||||
)
|
||||
service = build('calendar', 'v3', credentials=creds)
|
||||
return service
|
||||
except Exception as e:
|
||||
log_operation('error', f"Failed to initialize Google service: {e}", context=context)
|
||||
raise
|
||||
|
||||
|
||||
def get_redis_client(context=None):
|
||||
"""Initialize Redis client for calendar sync operations."""
|
||||
try:
|
||||
redis_client = redis.Redis(
|
||||
host=os.getenv('REDIS_HOST', 'localhost'),
|
||||
port=int(os.getenv('REDIS_PORT', '6379')),
|
||||
db=int(os.getenv('REDIS_DB_CALENDAR_SYNC', '2')),
|
||||
socket_timeout=int(os.getenv('REDIS_TIMEOUT_SECONDS', '5'))
|
||||
)
|
||||
return redis_client
|
||||
except Exception as e:
|
||||
log_operation('error', f"Failed to initialize Redis client: {e}", context=context)
|
||||
raise
|
||||
|
||||
|
||||
async def get_advoware_employees(advoware, context=None):
|
||||
"""Fetch list of employees from Advoware."""
|
||||
try:
|
||||
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
|
||||
employees = result if isinstance(result, list) else []
|
||||
log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
|
||||
return employees
|
||||
except Exception as e:
|
||||
log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context)
|
||||
raise
|
||||
|
||||
|
||||
def set_employee_lock(redis_client, kuerzel: str, triggered_by: str, context=None) -> bool:
|
||||
"""Set lock for employee sync operation."""
|
||||
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
|
||||
if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None:
|
||||
log_operation('info', f"Sync already active for {kuerzel}, skipping", context=context)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def clear_employee_lock(redis_client, kuerzel: str, context=None):
|
||||
"""Clear lock for employee sync operation and update last-synced timestamp."""
|
||||
try:
|
||||
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
|
||||
employee_last_synced_key = f'calendar_sync_last_synced_{kuerzel}'
|
||||
|
||||
# Update last-synced timestamp (no TTL, persistent)
|
||||
current_time = int(time.time())
|
||||
redis_client.set(employee_last_synced_key, current_time)
|
||||
|
||||
# Delete the lock
|
||||
redis_client.delete(employee_lock_key)
|
||||
|
||||
log_operation('debug', f"Cleared lock and updated last-synced for {kuerzel} to {current_time}", context=context)
|
||||
except Exception as e:
|
||||
log_operation('warning', f"Failed to clear lock and update last-synced for {kuerzel}: {e}", context=context)
|
||||
@@ -1,61 +0,0 @@
|
||||
"""Create Ticket Step - accepts a new support ticket via API and enqueues it for triage."""
|
||||
|
||||
import random
|
||||
import string
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from motia import ApiRequest, ApiResponse, FlowContext, http
|
||||
|
||||
config = {
|
||||
"name": "CreateTicket",
|
||||
"description": "Accepts a new support ticket via API and enqueues it for triage",
|
||||
"flows": ["support-ticket-flow"],
|
||||
"triggers": [
|
||||
http("POST", "/tickets"),
|
||||
],
|
||||
"enqueues": ["ticket::created"],
|
||||
}
|
||||
|
||||
|
||||
async def handler(request: ApiRequest[dict[str, Any]], ctx: FlowContext[Any]) -> ApiResponse[Any]:
|
||||
body = request.body or {}
|
||||
title = body.get("title")
|
||||
description = body.get("description")
|
||||
priority = body.get("priority", "medium")
|
||||
customer_email = body.get("customerEmail")
|
||||
|
||||
if not title or not description:
|
||||
return ApiResponse(status=400, body={"error": "Title and description are required"})
|
||||
|
||||
random_suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=5))
|
||||
ticket_id = f"TKT-{int(datetime.now(timezone.utc).timestamp() * 1000)}-{random_suffix}"
|
||||
|
||||
ticket = {
|
||||
"id": ticket_id,
|
||||
"title": title,
|
||||
"description": description,
|
||||
"priority": priority,
|
||||
"customerEmail": customer_email,
|
||||
"status": "open",
|
||||
"createdAt": datetime.now(timezone.utc).isoformat(),
|
||||
}
|
||||
|
||||
await ctx.state.set("tickets", ticket_id, ticket)
|
||||
ctx.logger.info("Ticket created", {"ticketId": ticket_id, "priority": priority})
|
||||
|
||||
await ctx.enqueue({
|
||||
"topic": "ticket::created",
|
||||
"data": {
|
||||
"ticketId": ticket_id,
|
||||
"title": title,
|
||||
"priority": priority,
|
||||
"customerEmail": customer_email,
|
||||
},
|
||||
})
|
||||
|
||||
return ApiResponse(status=200, body={
|
||||
"ticketId": ticket_id,
|
||||
"status": "open",
|
||||
"message": "Ticket created and queued for triage",
|
||||
})
|
||||
@@ -1,90 +0,0 @@
|
||||
"""Escalate Ticket Step - multi-trigger: escalates tickets from SLA breach or manual request.
|
||||
|
||||
Uses ctx.match() to route logic per trigger type.
|
||||
"""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from motia import ApiRequest, ApiResponse, FlowContext, http, queue
|
||||
|
||||
config = {
|
||||
"name": "EscalateTicket",
|
||||
"description": "Multi-trigger: escalates tickets from SLA breach or manual request",
|
||||
"flows": ["support-ticket-flow"],
|
||||
"triggers": [
|
||||
queue("ticket::sla-breached"),
|
||||
http("POST", "/tickets/escalate"),
|
||||
],
|
||||
"enqueues": [],
|
||||
}
|
||||
|
||||
|
||||
async def _escalate_ticket(
|
||||
ticket_id: str,
|
||||
updates: dict[str, Any],
|
||||
ctx: FlowContext[Any],
|
||||
) -> dict[str, Any] | None:
|
||||
"""Fetches a ticket and applies escalation fields to state. Returns pre-update ticket or None."""
|
||||
existing = await ctx.state.get("tickets", ticket_id)
|
||||
if not existing:
|
||||
return None
|
||||
await ctx.state.set("tickets", ticket_id, {
|
||||
**existing,
|
||||
"escalatedTo": "engineering-lead",
|
||||
"escalatedAt": datetime.now(timezone.utc).isoformat(),
|
||||
**updates,
|
||||
})
|
||||
return existing
|
||||
|
||||
|
||||
async def handler(input_data: Any, ctx: FlowContext[Any]) -> Any:
|
||||
async def _queue_handler(breach: Any) -> None:
|
||||
ticket_id = breach.get("ticketId")
|
||||
age_minutes = breach.get("ageMinutes", 0)
|
||||
priority = breach.get("priority", "medium")
|
||||
|
||||
ctx.logger.info("Escalating ticket", {"ticketId": ticket_id, "triggerType": "queue"})
|
||||
ctx.logger.warn("Auto-escalation from SLA breach", {
|
||||
"ticketId": ticket_id,
|
||||
"ageMinutes": age_minutes,
|
||||
"priority": priority,
|
||||
})
|
||||
|
||||
escalated = await _escalate_ticket(
|
||||
ticket_id,
|
||||
{"escalationReason": f"SLA breach: {age_minutes} minutes without resolution", "escalationMethod": "auto"},
|
||||
ctx,
|
||||
)
|
||||
|
||||
if not escalated:
|
||||
ctx.logger.error("Ticket not found during SLA escalation", {"ticketId": ticket_id, "ageMinutes": age_minutes})
|
||||
|
||||
async def _http_handler(request: ApiRequest[Any]) -> ApiResponse[Any]:
|
||||
body = request.body or {}
|
||||
ticket_id = body.get("ticketId")
|
||||
reason = body.get("reason", "")
|
||||
|
||||
ctx.logger.info("Escalating ticket", {"ticketId": ticket_id, "triggerType": "http"})
|
||||
|
||||
existing = await _escalate_ticket(
|
||||
ticket_id,
|
||||
{"escalationReason": reason, "escalationMethod": "manual"},
|
||||
ctx,
|
||||
)
|
||||
|
||||
if not existing:
|
||||
return ApiResponse(status=404, body={"error": f"Ticket {ticket_id} not found"})
|
||||
|
||||
ctx.logger.info("Manual escalation via API", {"ticketId": ticket_id, "reason": reason})
|
||||
|
||||
return ApiResponse(status=200, body={
|
||||
"ticketId": ticket_id,
|
||||
"escalatedTo": "engineering-lead",
|
||||
"message": "Ticket escalated successfully",
|
||||
})
|
||||
|
||||
return await ctx.match({
|
||||
"queue": _queue_handler,
|
||||
"http": _http_handler,
|
||||
})
|
||||
@@ -1,24 +0,0 @@
|
||||
"""List Tickets Step - returns all tickets from state."""
|
||||
|
||||
from typing import Any
|
||||
|
||||
from motia import ApiRequest, ApiResponse, FlowContext, http
|
||||
|
||||
config = {
|
||||
"name": "ListTickets",
|
||||
"description": "Returns all tickets from state",
|
||||
"flows": ["support-ticket-flow"],
|
||||
"triggers": [
|
||||
http("GET", "/tickets"),
|
||||
],
|
||||
"enqueues": [],
|
||||
}
|
||||
|
||||
|
||||
async def handler(request: ApiRequest[Any], ctx: FlowContext[Any]) -> ApiResponse[Any]:
|
||||
_ = request
|
||||
tickets = await ctx.state.list("tickets")
|
||||
|
||||
ctx.logger.info("Listing tickets", {"count": len(tickets)})
|
||||
|
||||
return ApiResponse(status=200, body={"tickets": tickets, "count": len(tickets)})
|
||||
@@ -1,37 +0,0 @@
|
||||
"""Notify Customer Step - sends a notification when a ticket has been triaged."""
|
||||
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
from motia import FlowContext, queue
|
||||
|
||||
config = {
|
||||
"name": "NotifyCustomer",
|
||||
"description": "Sends a notification when a ticket has been triaged",
|
||||
"flows": ["support-ticket-flow"],
|
||||
"triggers": [
|
||||
queue("ticket::triaged"),
|
||||
],
|
||||
"enqueues": [],
|
||||
}
|
||||
|
||||
|
||||
async def handler(input_data: Any, ctx: FlowContext[Any]) -> None:
|
||||
ticket_id = input_data.get("ticketId")
|
||||
assignee = input_data.get("assignee")
|
||||
priority = input_data.get("priority")
|
||||
title = input_data.get("title")
|
||||
|
||||
ctx.logger.info("Sending customer notification", {"ticketId": ticket_id, "assignee": assignee})
|
||||
|
||||
ticket = await ctx.state.get("tickets", ticket_id)
|
||||
customer_email = ticket.get("customerEmail", "") if ticket else ""
|
||||
redacted_email = re.sub(r"(?<=.{2}).(?=.*@)", "*", customer_email) if customer_email else "unknown"
|
||||
|
||||
ctx.logger.info("Notification sent", {
|
||||
"ticketId": ticket_id,
|
||||
"assignee": assignee,
|
||||
"priority": priority,
|
||||
"title": title,
|
||||
"email": redacted_email,
|
||||
})
|
||||
@@ -1,67 +0,0 @@
|
||||
"""SLA Monitor Step - cron job that checks for SLA breaches on open tickets."""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from motia import FlowContext, cron
|
||||
|
||||
SLA_THRESHOLDS_MS = {
|
||||
"critical": 15 * 60 * 1000, # 15 minutes
|
||||
"high": 60 * 60 * 1000, # 1 hour
|
||||
"medium": 4 * 60 * 60 * 1000, # 4 hours
|
||||
"low": 24 * 60 * 60 * 1000, # 24 hours
|
||||
}
|
||||
|
||||
config = {
|
||||
"name": "SlaMonitor",
|
||||
"description": "Cron job that checks for SLA breaches on open tickets",
|
||||
"flows": ["support-ticket-flow"],
|
||||
"triggers": [
|
||||
cron("0/30 * * * * *"),
|
||||
],
|
||||
"enqueues": ["ticket::sla-breached"],
|
||||
}
|
||||
|
||||
|
||||
async def handler(input_data: None, ctx: FlowContext[Any]) -> None:
|
||||
_ = input_data
|
||||
ctx.logger.info("Running SLA compliance check")
|
||||
|
||||
tickets = await ctx.state.list("tickets")
|
||||
now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
|
||||
breaches = 0
|
||||
|
||||
for ticket in tickets:
|
||||
if ticket.get("status") != "open" or not ticket.get("createdAt"):
|
||||
continue
|
||||
|
||||
try:
|
||||
created_dt = datetime.fromisoformat(ticket["createdAt"])
|
||||
created_ms = int(created_dt.timestamp() * 1000)
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
|
||||
age_ms = now_ms - created_ms
|
||||
threshold = SLA_THRESHOLDS_MS.get(ticket.get("priority", "medium"), SLA_THRESHOLDS_MS["medium"])
|
||||
|
||||
if age_ms > threshold:
|
||||
breaches += 1
|
||||
age_minutes = round(age_ms / 60_000)
|
||||
|
||||
ctx.logger.warn("SLA breach detected!", {
|
||||
"ticketId": ticket["id"],
|
||||
"priority": ticket.get("priority"),
|
||||
"ageMinutes": age_minutes,
|
||||
})
|
||||
|
||||
await ctx.enqueue({
|
||||
"topic": "ticket::sla-breached",
|
||||
"data": {
|
||||
"ticketId": ticket["id"],
|
||||
"priority": ticket.get("priority", "medium"),
|
||||
"title": ticket.get("title", ""),
|
||||
"ageMinutes": age_minutes,
|
||||
},
|
||||
})
|
||||
|
||||
ctx.logger.info("SLA check complete", {"totalTickets": len(tickets), "breaches": breaches})
|
||||
@@ -1,100 +0,0 @@
|
||||
"""Triage Ticket Step - multi-trigger: auto-triage from queue, manual triage via API, sweep via cron.
|
||||
|
||||
Demonstrates a single step responding to three trigger types using ctx.match().
|
||||
"""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from motia import ApiRequest, ApiResponse, FlowContext, cron, http, queue
|
||||
|
||||
config = {
|
||||
"name": "TriageTicket",
|
||||
"description": "Multi-trigger: auto-triage from queue, manual triage via API, sweep via cron",
|
||||
"flows": ["support-ticket-flow"],
|
||||
"triggers": [
|
||||
queue("ticket::created"),
|
||||
http("POST", "/tickets/triage"),
|
||||
cron("0 */5 * * * * *"),
|
||||
],
|
||||
"enqueues": ["ticket::triaged"],
|
||||
}
|
||||
|
||||
|
||||
async def _triage_ticket(
|
||||
ticket_id: str,
|
||||
existing: dict[str, Any] | None,
|
||||
state_updates: dict[str, Any],
|
||||
enqueue_data: dict[str, Any],
|
||||
ctx: FlowContext[Any],
|
||||
) -> None:
|
||||
"""Updates ticket state with triage fields and emits the triaged event."""
|
||||
if not existing:
|
||||
return
|
||||
updated = {**existing, "triagedAt": datetime.now(timezone.utc).isoformat(), **state_updates}
|
||||
await ctx.state.set("tickets", ticket_id, updated)
|
||||
await ctx.enqueue({"topic": "ticket::triaged", "data": {"ticketId": ticket_id, **enqueue_data}})
|
||||
|
||||
|
||||
async def handler(input_data: Any, ctx: FlowContext[Any]) -> Any:
|
||||
async def _queue_handler(data: Any) -> None:
|
||||
ticket_id = data.get("ticketId")
|
||||
title = data.get("title", "")
|
||||
priority = data.get("priority", "medium")
|
||||
|
||||
ctx.logger.info("Auto-triaging ticket from queue", {"ticketId": ticket_id, "priority": priority})
|
||||
|
||||
assignee = "senior-support" if priority in ("critical", "high") else "support-pool"
|
||||
existing = await ctx.state.get("tickets", ticket_id)
|
||||
|
||||
await _triage_ticket(
|
||||
ticket_id, existing,
|
||||
{"assignee": assignee, "triageMethod": "auto"},
|
||||
{"assignee": assignee, "priority": priority, "title": title},
|
||||
ctx,
|
||||
)
|
||||
ctx.logger.info("Ticket auto-triaged", {"ticketId": ticket_id, "assignee": assignee})
|
||||
|
||||
async def _http_handler(request: ApiRequest[Any]) -> ApiResponse[Any]:
|
||||
body = request.body or {}
|
||||
ticket_id = body.get("ticketId")
|
||||
assignee = body.get("assignee")
|
||||
priority = body.get("priority", "medium")
|
||||
|
||||
existing = await ctx.state.get("tickets", ticket_id)
|
||||
if not existing:
|
||||
return ApiResponse(status=404, body={"error": f"Ticket {ticket_id} not found"})
|
||||
|
||||
ctx.logger.info("Manual triage via API", {"ticketId": ticket_id, "assignee": assignee})
|
||||
|
||||
await _triage_ticket(
|
||||
ticket_id, existing,
|
||||
{"assignee": assignee, "priority": priority, "triageMethod": "manual"},
|
||||
{"assignee": assignee, "priority": priority, "title": existing.get("title", "")},
|
||||
ctx,
|
||||
)
|
||||
return ApiResponse(status=200, body={"ticketId": ticket_id, "assignee": assignee, "status": "triaged"})
|
||||
|
||||
async def _cron_handler() -> None:
|
||||
ctx.logger.info("Running untriaged ticket sweep.")
|
||||
tickets = await ctx.state.list("tickets")
|
||||
swept = 0
|
||||
|
||||
for ticket in tickets:
|
||||
if not ticket.get("assignee") and ticket.get("status") == "open":
|
||||
ctx.logger.warn("Found untriaged ticket during sweep", {"ticketId": ticket["id"]})
|
||||
await _triage_ticket(
|
||||
ticket["id"], ticket,
|
||||
{"assignee": "support-pool", "triageMethod": "auto-sweep"},
|
||||
{"assignee": "support-pool", "priority": ticket.get("priority", "medium"), "title": ticket.get("title", "unknown")},
|
||||
ctx,
|
||||
)
|
||||
swept += 1
|
||||
|
||||
ctx.logger.info("Sweep complete", {"sweptCount": swept})
|
||||
|
||||
return await ctx.match({
|
||||
"queue": _queue_handler,
|
||||
"http": _http_handler,
|
||||
"cron": _cron_handler,
|
||||
})
|
||||
164
steps/vmh/beteiligte_sync_cron_step.py
Normal file
164
steps/vmh/beteiligte_sync_cron_step.py
Normal file
@@ -0,0 +1,164 @@
|
||||
"""
|
||||
Beteiligte Sync Cron Job
|
||||
|
||||
Läuft alle 15 Minuten und emittiert Sync-Events für Beteiligte die:
|
||||
- Neu sind (pending_sync)
|
||||
- Geändert wurden (dirty)
|
||||
- Fehlgeschlagen sind (failed → Retry)
|
||||
- Lange nicht gesynct wurden (clean aber > 24h alt)
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from typing import Dict, Any
|
||||
from motia import FlowContext, cron
|
||||
from services.espocrm import EspoCRMAPI
|
||||
import datetime
|
||||
|
||||
config = {
|
||||
"name": "VMH Beteiligte Sync Cron",
|
||||
"description": "Prüft alle 15 Minuten welche Beteiligte synchronisiert werden müssen",
|
||||
"flows": ["vmh"],
|
||||
"triggers": [
|
||||
cron("0 */15 * * * *") # Alle 15 Minuten (6-field format!)
|
||||
],
|
||||
"enqueues": ["vmh.beteiligte.sync_check"]
|
||||
}
|
||||
|
||||
|
||||
async def handler(input_data: Dict[str, Any], ctx: FlowContext):
|
||||
"""
|
||||
Cron-Handler: Findet alle Beteiligte die Sync benötigen und emittiert Events
|
||||
"""
|
||||
ctx.logger.info("🕐 Beteiligte Sync Cron gestartet")
|
||||
|
||||
try:
|
||||
espocrm = EspoCRMAPI()
|
||||
|
||||
# Berechne Threshold für "veraltete" Syncs (24 Stunden)
|
||||
threshold = datetime.datetime.now() - datetime.timedelta(hours=24)
|
||||
threshold_str = threshold.strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
ctx.logger.info(f"📅 Suche Entities mit Sync-Bedarf (älter als {threshold_str})")
|
||||
|
||||
# QUERY 1: Entities mit Status pending_sync, dirty oder failed
|
||||
unclean_filter = {
|
||||
'where': [
|
||||
{
|
||||
'type': 'or',
|
||||
'value': [
|
||||
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'pending_sync'},
|
||||
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'dirty'},
|
||||
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'failed'},
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
unclean_result = await espocrm.search_entities('CBeteiligte', unclean_filter, max_size=100)
|
||||
unclean_entities = unclean_result.get('list', [])
|
||||
|
||||
ctx.logger.info(f"📊 Gefunden: {len(unclean_entities)} Entities mit Status pending/dirty/failed")
|
||||
|
||||
# QUERY 1b: permanently_failed Entities die Auto-Reset erreicht haben
|
||||
permanently_failed_filter = {
|
||||
'where': [
|
||||
{
|
||||
'type': 'and',
|
||||
'value': [
|
||||
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'permanently_failed'},
|
||||
{'type': 'isNotNull', 'attribute': 'syncAutoResetAt'},
|
||||
{'type': 'before', 'attribute': 'syncAutoResetAt', 'value': threshold_str}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
reset_result = await espocrm.search_entities('CBeteiligte', permanently_failed_filter, max_size=50)
|
||||
reset_entities = reset_result.get('list', [])
|
||||
|
||||
# Reset permanently_failed entities
|
||||
for entity in reset_entities:
|
||||
entity_id = entity['id']
|
||||
ctx.logger.info(f"🔄 Auto-Reset für permanently_failed Entity {entity_id}")
|
||||
|
||||
# Reset Status und Retry-Count
|
||||
await espocrm.update_entity('CBeteiligte', entity_id, {
|
||||
'syncStatus': 'failed', # Zurück zu 'failed' für normalen Retry
|
||||
'syncRetryCount': 0,
|
||||
'syncAutoResetAt': None,
|
||||
'syncErrorMessage': f"Auto-Reset nach 24h - vorheriger Fehler: {entity.get('syncErrorMessage', 'N/A')}"
|
||||
})
|
||||
|
||||
ctx.logger.info(f"📊 Auto-Reset: {len(reset_entities)} permanently_failed Entities")
|
||||
|
||||
# QUERY 2: Clean Entities die > 24h nicht gesynct wurden
|
||||
stale_filter = {
|
||||
'where': [
|
||||
{
|
||||
'type': 'and',
|
||||
'value': [
|
||||
{'type': 'equals', 'attribute': 'syncStatus', 'value': 'clean'},
|
||||
{'type': 'isNotNull', 'attribute': 'betnr'},
|
||||
{
|
||||
'type': 'or',
|
||||
'value': [
|
||||
{'type': 'isNull', 'attribute': 'advowareLastSync'},
|
||||
{'type': 'before', 'attribute': 'advowareLastSync', 'value': threshold_str}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
stale_result = await espocrm.search_entities('CBeteiligte', stale_filter, max_size=50)
|
||||
stale_entities = stale_result.get('list', [])
|
||||
|
||||
ctx.logger.info(f"📊 Gefunden: {len(stale_entities)} Entities mit veraltetem Sync (> 24h)")
|
||||
|
||||
# KOMBINIERE ALLE (inkl. reset_entities)
|
||||
all_entities = unclean_entities + stale_entities + reset_entities
|
||||
entity_ids = list(set([e['id'] for e in all_entities])) # Dedupliziere
|
||||
|
||||
ctx.logger.info(f"🎯 Total: {len(entity_ids)} eindeutige Entities zum Sync")
|
||||
|
||||
if not entity_ids:
|
||||
ctx.logger.info("✅ Keine Entities benötigen Sync")
|
||||
return
|
||||
|
||||
# Emittiere Events parallel
|
||||
ctx.logger.info(f"🚀 Emittiere {len(entity_ids)} Events parallel...")
|
||||
|
||||
emit_tasks = [
|
||||
ctx.enqueue({
|
||||
'topic': 'vmh.beteiligte.sync_check',
|
||||
'data': {
|
||||
'entity_id': entity_id,
|
||||
'action': 'sync_check',
|
||||
'source': 'cron',
|
||||
'timestamp': datetime.datetime.now().isoformat()
|
||||
}
|
||||
})
|
||||
for entity_id in entity_ids
|
||||
]
|
||||
|
||||
# Parallel emit mit error handling
|
||||
results = await asyncio.gather(*emit_tasks, return_exceptions=True)
|
||||
|
||||
# Count successes and failures
|
||||
emitted_count = sum(1 for r in results if not isinstance(r, Exception))
|
||||
failed_count = sum(1 for r in results if isinstance(r, Exception))
|
||||
|
||||
if failed_count > 0:
|
||||
ctx.logger.warn(f"⚠️ {failed_count} Events konnten nicht emittiert werden")
|
||||
# Log first few errors
|
||||
for i, result in enumerate(results[:5]): # Log max 5 errors
|
||||
if isinstance(result, Exception):
|
||||
ctx.logger.error(f" Entity {entity_ids[i]}: {result}")
|
||||
|
||||
ctx.logger.info(f"✅ Cron fertig: {emitted_count}/{len(entity_ids)} Events emittiert")
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ Fehler im Sync Cron: {e}")
|
||||
import traceback
|
||||
ctx.logger.error(traceback.format_exc())
|
||||
Reference in New Issue
Block a user