218 lines
5.4 KiB
Plaintext
218 lines
5.4 KiB
Plaintext
---
|
|
description: How to create background tasks in Motia
|
|
globs: steps/**/*.step.ts,steps/**/*.step.js,steps/**/*_step.py
|
|
alwaysApply: false
|
|
---
|
|
# Event Steps Guide
|
|
|
|
Event Steps are used to handle asynchronous events. These steps cannot be
|
|
invoked by a client or user. In order to ultimately trigger an Event Step,
|
|
you need to connect it to an API Step or a CRON Step.
|
|
|
|
Examples of event steps are:
|
|
- LLM Calls
|
|
- Processing big files, like images, videos, audio, etc.
|
|
- Sending emails
|
|
|
|
Event Steps are also a good fit for tasks that are likely to fail, for example:
|
|
- Webhook call to external systems
|
|
|
|
## Creating Event Steps
|
|
|
|
Steps must be created in the `steps` folder; they can also be placed in its subfolders.
|
|
|
|
- Steps in TS and JS should end with `.step.ts` and `.step.js` respectively.
|
|
- Steps in Python should end with `_step.py`.
|
|
|
|
## Definition
|
|
|
|
An Event Step is defined by two elements: a Configuration and a Handler.
|
|
|
|
### Schema Definition
|
|
|
|
- **TypeScript/JavaScript**: Motia uses Zod schemas for automatic validation of input data
|
|
- **Python**: Motia uses JSON Schema format. You can optionally use Pydantic models to generate the JSON Schema and to validate input manually in your handlers (Motia does not validate Python input automatically)
|
|
|
|
### Configuration
|
|
|
|
**TypeScript/JavaScript**: You need to export a config constant via `export const config` of type `EventConfig`.
|
|
|
|
**Python**: You need to define a `config` dictionary with the same properties as the TypeScript `EventConfig`.
|
|
|
|
```typescript
|
|
export type Emit = string | {
|
|
/**
|
|
* The topic name to emit to.
|
|
*/
|
|
topic: string;
|
|
|
|
/**
|
|
* Optional label for the emission, could be used for documentation or UI.
|
|
*/
|
|
label?: string;
|
|
|
|
/**
|
|
* This is purely for documentation purposes,
|
|
* it doesn't affect the execution of the step.
|
|
*
|
|
* In Workbench, it will render differently based on this value.
|
|
*/
|
|
conditional?: boolean;
|
|
}
|
|
|
|
export type EventConfig = {
|
|
/**
|
|
* Should always be event
|
|
*/
|
|
type: 'event'
|
|
|
|
/**
|
|
* A unique name for this event step, used internally and for linking handlers.
|
|
*/
|
|
name: string
|
|
|
|
/**
|
|
* Optional human-readable description.
|
|
*/
|
|
description?: string
|
|
|
|
/**
|
|
* An array of topic names this step listens to.
|
|
*/
|
|
subscribes: string[]
|
|
|
|
/**
|
|
* An array of topics this step can emit events to.
|
|
*/
|
|
emits: Emit[]
|
|
|
|
/**
|
|
* Optional: Topics that are virtually emitted, perhaps for documentation or lineage, but not strictly required for execution.
|
|
*/
|
|
virtualEmits?: Emit[]
|
|
|
|
/**
|
|
* The Zod schema of the input data of events this step processes.
|
|
*
|
|
* This is used by Motia to create the correct types for whoever emits the event
|
|
* to this step.
|
|
*
|
|
* Avoid adding too much data to the input schema, only add the data that
|
|
* is necessary for the Event Step to process. If the data is too big it's
|
|
* recommended to store it in the state and fetch it from the state on the
|
|
* Event Step handler.
|
|
*/
|
|
input: ZodInput
|
|
|
|
/**
|
|
* Optional: An array of flow names this step belongs to.
|
|
*/
|
|
flows?: string[]
|
|
/**
|
|
* Files to include in the step bundle.
|
|
* Needs to be relative to the step file.
|
|
*/
|
|
includeFiles?: string[]
|
|
}
|
|
```
|
|
|
|
### Handler
|
|
|
|
The handler is a function exported via `export const handler` and typed as an `EventHandler`. The examples below type it as `Handlers['SendEmail']`, where the key matches the step's `name`.
|
|
|
|
### Handler definition
|
|
|
|
**TypeScript/JavaScript:**
|
|
```typescript
|
|
/**
|
|
* Input is inferred from the Event Step config['input']
|
|
* Context is the FlowContext
|
|
*/
|
|
export const handler: Handlers['SendEmail'] = async (input, { emit, logger, state, streams }) => {
|
|
// Implementation
|
|
}
|
|
```
|
|
|
|
**Python:**
|
|
```python
|
|
async def handler(input_data, context):
|
|
# input_data: dictionary with the event data (matches the input schema)
|
|
# context: object containing emit, logger, state, streams, trace_id
|
|
pass
|
|
```
|
|
|
|
### Examples
|
|
|
|
#### TypeScript Example
|
|
|
|
```typescript
|
|
import { EventConfig, Handlers } from 'motia';
|
|
import { z } from 'zod';
|
|
|
|
const inputSchema = z.object({
|
|
email: z.string(),
|
|
templateId: z.string(),
|
|
templateData: z.record(z.string(), z.any()),
|
|
})
|
|
|
|
export const config: EventConfig = {
|
|
type: 'event',
|
|
name: 'SendEmail',
|
|
description: 'Sends email notification to the user',
|
|
subscribes: ['send-email'],
|
|
emits: [],
|
|
input: inputSchema,
|
|
flows: ['resource-management']
|
|
};
|
|
|
|
export const handler: Handlers['SendEmail'] = async (input, { emit, logger }) => {
|
|
const { email, templateId, templateData } = input;
|
|
|
|
// Process email sending logic here
|
|
await emailService.send({
|
|
to: email,
|
|
templateId,
|
|
data: templateData
|
|
});
|
|
|
|
logger.info('Email sent successfully', { email, templateId });
|
|
};
|
|
```
|
|
|
|
#### Python Example
|
|
|
|
```python
|
|
from pydantic import BaseModel
|
|
from typing import Dict, Any
|
|
|
|
class EmailData(BaseModel):
|
|
email: str
|
|
template_id: str
|
|
template_data: Dict[str, Any]
|
|
|
|
config = {
|
|
"type": "event",
|
|
"name": "SendEmail",
|
|
"description": "Sends email notification to the user",
|
|
"subscribes": ["send-email"],
|
|
"emits": [],
|
|
"input": EmailData.model_json_schema(),
|
|
"flows": ["resource-management"]
|
|
}
|
|
|
|
async def handler(input_data, context):
|
|
# Optional: Validate input manually using Pydantic (Motia doesn't do this automatically)
|
|
email_data = EmailData(**input_data)
|
|
|
|
# Process email sending logic here
|
|
await email_service.send({
|
|
"to": email_data.email,
|
|
"template_id": email_data.template_id,
|
|
"data": email_data.template_data
|
|
})
|
|
|
|
context.logger.info("Email sent successfully", {
|
|
"email": email_data.email,
|
|
"template_id": email_data.template_id
|
|
})
|
|
```