- iii-config.yaml mit Production-Settings (CORS, all interfaces) - Ticketing-System Steps (create, triage, escalate, notify, SLA monitoring) - Python dependencies via uv - Systemd services für Motia Engine und iii Console - README mit Deployment-Info
101 lines
3.8 KiB
Python
101 lines
3.8 KiB
Python
"""Triage Ticket Step - multi-trigger: auto-triage from queue, manual triage via API, sweep via cron.
|
|
|
|
Demonstrates a single step responding to three trigger types using ctx.match().
|
|
"""
|
|
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from motia import ApiRequest, ApiResponse, FlowContext, cron, http, queue
|
|
|
|
config = {
|
|
"name": "TriageTicket",
|
|
"description": "Multi-trigger: auto-triage from queue, manual triage via API, sweep via cron",
|
|
"flows": ["support-ticket-flow"],
|
|
"triggers": [
|
|
queue("ticket::created"),
|
|
http("POST", "/tickets/triage"),
|
|
cron("0 */5 * * * * *"),
|
|
],
|
|
"enqueues": ["ticket::triaged"],
|
|
}
|
|
|
|
|
|
async def _triage_ticket(
|
|
ticket_id: str,
|
|
existing: dict[str, Any] | None,
|
|
state_updates: dict[str, Any],
|
|
enqueue_data: dict[str, Any],
|
|
ctx: FlowContext[Any],
|
|
) -> None:
|
|
"""Updates ticket state with triage fields and emits the triaged event."""
|
|
if not existing:
|
|
return
|
|
updated = {**existing, "triagedAt": datetime.now(timezone.utc).isoformat(), **state_updates}
|
|
await ctx.state.set("tickets", ticket_id, updated)
|
|
await ctx.enqueue({"topic": "ticket::triaged", "data": {"ticketId": ticket_id, **enqueue_data}})
|
|
|
|
|
|
async def handler(input_data: Any, ctx: FlowContext[Any]) -> Any:
|
|
async def _queue_handler(data: Any) -> None:
|
|
ticket_id = data.get("ticketId")
|
|
title = data.get("title", "")
|
|
priority = data.get("priority", "medium")
|
|
|
|
ctx.logger.info("Auto-triaging ticket from queue", {"ticketId": ticket_id, "priority": priority})
|
|
|
|
assignee = "senior-support" if priority in ("critical", "high") else "support-pool"
|
|
existing = await ctx.state.get("tickets", ticket_id)
|
|
|
|
await _triage_ticket(
|
|
ticket_id, existing,
|
|
{"assignee": assignee, "triageMethod": "auto"},
|
|
{"assignee": assignee, "priority": priority, "title": title},
|
|
ctx,
|
|
)
|
|
ctx.logger.info("Ticket auto-triaged", {"ticketId": ticket_id, "assignee": assignee})
|
|
|
|
async def _http_handler(request: ApiRequest[Any]) -> ApiResponse[Any]:
|
|
body = request.body or {}
|
|
ticket_id = body.get("ticketId")
|
|
assignee = body.get("assignee")
|
|
priority = body.get("priority", "medium")
|
|
|
|
existing = await ctx.state.get("tickets", ticket_id)
|
|
if not existing:
|
|
return ApiResponse(status=404, body={"error": f"Ticket {ticket_id} not found"})
|
|
|
|
ctx.logger.info("Manual triage via API", {"ticketId": ticket_id, "assignee": assignee})
|
|
|
|
await _triage_ticket(
|
|
ticket_id, existing,
|
|
{"assignee": assignee, "priority": priority, "triageMethod": "manual"},
|
|
{"assignee": assignee, "priority": priority, "title": existing.get("title", "")},
|
|
ctx,
|
|
)
|
|
return ApiResponse(status=200, body={"ticketId": ticket_id, "assignee": assignee, "status": "triaged"})
|
|
|
|
async def _cron_handler() -> None:
|
|
ctx.logger.info("Running untriaged ticket sweep.")
|
|
tickets = await ctx.state.list("tickets")
|
|
swept = 0
|
|
|
|
for ticket in tickets:
|
|
if not ticket.get("assignee") and ticket.get("status") == "open":
|
|
ctx.logger.warn("Found untriaged ticket during sweep", {"ticketId": ticket["id"]})
|
|
await _triage_ticket(
|
|
ticket["id"], ticket,
|
|
{"assignee": "support-pool", "triageMethod": "auto-sweep"},
|
|
{"assignee": "support-pool", "priority": ticket.get("priority", "medium"), "title": ticket.get("title", "unknown")},
|
|
ctx,
|
|
)
|
|
swept += 1
|
|
|
|
ctx.logger.info("Sweep complete", {"sweptCount": swept})
|
|
|
|
return await ctx.match({
|
|
"queue": _queue_handler,
|
|
"http": _http_handler,
|
|
"cron": _cron_handler,
|
|
})
|