Files
motia-iii/steps/triage_ticket_step.py
bsiggel b3bdb56753 Initial commit: Motia III Backend Setup
- iii-config.yaml mit Production-Settings (CORS, all interfaces)
- Ticketing-System Steps (create, triage, escalate, notify, SLA monitoring)
- Python dependencies via uv
- Systemd services für Motia Engine und iii Console
- README mit Deployment-Info
2026-03-01 21:38:07 +00:00

101 lines
3.8 KiB
Python

"""Triage Ticket Step - multi-trigger: auto-triage from queue, manual triage via API, sweep via cron.
Demonstrates a single step responding to three trigger types using ctx.match().
"""
from datetime import datetime, timezone
from typing import Any
from motia import ApiRequest, ApiResponse, FlowContext, cron, http, queue
# Step configuration for the "TriageTicket" step.
# A single step definition is bound to three triggers (queue, HTTP, cron) and
# declares the one topic it enqueues.
config = {
    "name": "TriageTicket",
    "description": "Multi-trigger: auto-triage from queue, manual triage via API, sweep via cron",
    "flows": ["support-ticket-flow"],
    "triggers": [
        # Queue trigger on the "ticket::created" topic.
        queue("ticket::created"),
        # HTTP trigger: POST /tickets/triage.
        http("POST", "/tickets/triage"),
        # NOTE(review): seven-field cron expression; presumably "every 5 minutes
        # at second 0" — confirm against the engine's cron syntax.
        cron("0 */5 * * * * *"),
    ],
    # Topic this step publishes via ctx.enqueue().
    "enqueues": ["ticket::triaged"],
}
async def _triage_ticket(
    ticket_id: str,
    existing: dict[str, Any] | None,
    state_updates: dict[str, Any],
    enqueue_data: dict[str, Any],
    ctx: FlowContext[Any],
) -> None:
    """Store triage fields on a ticket and publish a ``ticket::triaged`` message.

    Args:
        ticket_id: Key of the ticket inside the ``"tickets"`` state group.
        existing: Current ticket record. When falsy (``None`` or empty) the
            call is a no-op: nothing is written and nothing is enqueued.
        state_updates: Fields layered on top of the stored record; they take
            precedence over both the existing fields and ``triagedAt``.
        enqueue_data: Extra fields merged into the outgoing message data next
            to ``ticketId`` (they win on a key clash).
        ctx: Flow context; ``ctx.state.set`` and ``ctx.enqueue`` are awaited.
    """
    if not existing:
        return

    # Work on a copy so the caller's record is never mutated.
    record = dict(existing)
    record["triagedAt"] = datetime.now(timezone.utc).isoformat()
    record.update(state_updates)
    await ctx.state.set("tickets", ticket_id, record)

    payload = {"ticketId": ticket_id}
    payload.update(enqueue_data)
    await ctx.enqueue({"topic": "ticket::triaged", "data": payload})
async def handler(input_data: Any, ctx: FlowContext[Any]) -> Any:
    """Triage tickets in response to a queue, HTTP, or cron trigger.

    Three nested coroutines are handed to ``ctx.match()`` keyed by trigger type
    (``"queue"``, ``"http"``, ``"cron"``) and the awaited result of that call is
    returned unchanged.

    Args:
        input_data: Trigger payload. It is not read directly here; each nested
            handler takes its own argument (presumably supplied by
            ``ctx.match()`` — confirm against the framework).
        ctx: Flow context providing ``state``, ``logger``, ``enqueue`` and
            ``match``.

    Returns:
        The awaited result of ``ctx.match()``. The HTTP handler produces an
        ``ApiResponse``; the queue and cron handlers produce ``None``.
    """

    async def _queue_handler(data: Any) -> None:
        """Auto-assign a newly created ticket based on its priority."""
        ticket_id = data.get("ticketId")
        title = data.get("title", "")
        priority = data.get("priority", "medium")

        if not ticket_id:
            # Without an id there is nothing to look up or update.
            ctx.logger.warn("Skipping auto-triage: event has no ticketId")
            return

        ctx.logger.info("Auto-triaging ticket from queue", {"ticketId": ticket_id, "priority": priority})
        existing = await ctx.state.get("tickets", ticket_id)
        if not existing:
            # _triage_ticket would silently do nothing for a missing record, so
            # report it here instead of logging a misleading success below.
            ctx.logger.warn("Skipping auto-triage: ticket not found", {"ticketId": ticket_id})
            return

        # Urgent tickets go straight to senior staff; everything else is pooled.
        assignee = "senior-support" if priority in ("critical", "high") else "support-pool"
        await _triage_ticket(
            ticket_id,
            existing,
            {"assignee": assignee, "triageMethod": "auto"},
            {"assignee": assignee, "priority": priority, "title": title},
            ctx,
        )
        ctx.logger.info("Ticket auto-triaged", {"ticketId": ticket_id, "assignee": assignee})

    async def _http_handler(request: ApiRequest[Any]) -> ApiResponse[Any]:
        """Manually assign a ticket; responds 400, 404, or 200."""
        body = request.body or {}
        ticket_id = body.get("ticketId")
        assignee = body.get("assignee")

        # Reject incomplete requests up front: otherwise a missing id surfaces
        # as "Ticket None not found" and a missing assignee would be stored as
        # None while the ticket is reported as triaged.
        if not ticket_id or not assignee:
            return ApiResponse(status=400, body={"error": "ticketId and assignee are required"})

        existing = await ctx.state.get("tickets", ticket_id)
        if not existing:
            return ApiResponse(status=404, body={"error": f"Ticket {ticket_id} not found"})

        # An omitted priority keeps the ticket's stored priority rather than
        # resetting it to "medium".
        priority = body.get("priority") or existing.get("priority", "medium")

        ctx.logger.info("Manual triage via API", {"ticketId": ticket_id, "assignee": assignee})
        await _triage_ticket(
            ticket_id,
            existing,
            {"assignee": assignee, "priority": priority, "triageMethod": "manual"},
            {"assignee": assignee, "priority": priority, "title": existing.get("title", "")},
            ctx,
        )
        return ApiResponse(status=200, body={"ticketId": ticket_id, "assignee": assignee, "status": "triaged"})

    async def _cron_handler() -> None:
        """Assign every open ticket that still has no assignee to the support pool."""
        ctx.logger.info("Running untriaged ticket sweep.")
        tickets = await ctx.state.list("tickets")
        swept = 0
        for ticket in tickets:
            if ticket.get("assignee") or ticket.get("status") != "open":
                continue

            ticket_id = ticket.get("id")
            if not ticket_id:
                # A record without an id cannot be written back; skip it so one
                # malformed entry does not abort the whole sweep.
                ctx.logger.warn("Skipping untriaged ticket without id during sweep")
                continue

            ctx.logger.warn("Found untriaged ticket during sweep", {"ticketId": ticket_id})
            await _triage_ticket(
                ticket_id,
                ticket,
                {"assignee": "support-pool", "triageMethod": "auto-sweep"},
                {
                    "assignee": "support-pool",
                    "priority": ticket.get("priority", "medium"),
                    "title": ticket.get("title", "unknown"),
                },
                ctx,
            )
            swept += 1
        ctx.logger.info("Sweep complete", {"sweptCount": swept})

    return await ctx.match({
        "queue": _queue_handler,
        "http": _http_handler,
        "cron": _cron_handler,
    })