Files
motia/bitbylaw/.cursor/rules/motia/realtime-streaming.mdc
2025-10-19 14:57:07 +00:00

381 lines
9.4 KiB
Plaintext

---
description: Real-time streaming
globs: steps/**/*.step.ts,steps/**/*.step.js,steps/**/*_step.py,steps/**/*.stream.ts,steps/**/*.stream.js,steps/**/*_stream.py
alwaysApply: false
---
# Real-time Streaming
Building event-driven applications often requires real-time streaming capabilities, for example:
- Integrations with LLMs, which should run asynchronously and deliver updates in real time.
- Chat applications, real-time collaboration, etc.
- Long-running processes such as data processing, video processing, etc.
Motia has a built-in real-time streaming system that allows you to easily implement real-time streaming capabilities in your application.
It's called Streams.
## Stream Configuration
Creating a stream means defining the schema of the data that the stream stores and serves to subscribed clients.
### TypeScript Example
```typescript
// steps/streams/chat-messages.stream.ts
import { StreamConfig } from 'motia'
import { z } from 'zod'

/**
 * Schema of a single chat message held in the stream.
 *
 * Motia uses it to generate the stream types available in steps, so
 * every object written to the stream is checked against this shape.
 */
export const chatMessageSchema = z.object({
  id: z.string(),
  userId: z.string(),
  message: z.string(),
  timestamp: z.string(),
})

/** TypeScript type inferred from `chatMessageSchema`. */
export type ChatMessage = z.infer<typeof chatMessageSchema>

export const config: StreamConfig = {
  // Stream name: clients subscribe using this exact name, so keep it stable.
  name: 'chatMessage',
  // Shape of the data stored in (and served from) the stream.
  schema: chatMessageSchema,
  // Every stream uses the default storage type; no need to change it for now.
  baseConfig: { storageType: 'default' },
}
```
### Python Examples
#### With Pydantic (Optional)
```python
# steps/streams/chat_messages_stream.py
from pydantic import BaseModel


# One chat message stored in the "chatMessage" stream.
# (Kept as comments rather than a class docstring, because Pydantic would
# otherwise copy the docstring into the generated JSON schema.)
class ChatMessage(BaseModel):
    id: str
    user_id: str
    message: str
    timestamp: str


# The stream's JSON schema is derived from the Pydantic model, so the two cannot drift apart.
config = dict(
    name="chatMessage",
    schema=ChatMessage.model_json_schema(),
    baseConfig={"storageType": "default"},
)
```
#### Without Pydantic (Pure JSON Schema)
```python
# steps/streams/chat_messages_stream.py

# Plain JSON Schema for one chat message: four string fields, all required.
_chat_message_schema = {
    "type": "object",
    "properties": {
        "id": {"type": "string"},
        "user_id": {"type": "string"},
        "message": {"type": "string"},
        "timestamp": {"type": "string"},
    },
    "required": ["id", "user_id", "message", "timestamp"],
}

config = {
    "name": "chatMessage",
    "schema": _chat_message_schema,
    "baseConfig": {"storageType": "default"},
}
```
## Using streams
Stream managers are automatically injected into each step's context.
Every stream exposes the following interface:
```typescript
interface MotiaStream<TData> {
/**
* Retrieves a single item from the stream.
*
* @param groupId - The id of the group the item belongs to
* @param id - The id of the item within that group
* @returns The item, or null if it doesn't exist
*/
get(groupId: string, id: string): Promise<BaseStreamItem<TData> | null>
/**
* Creates or updates a single item in the stream (an upsert).
*
* If no item exists for `groupId` + `id`, it is created.
* If one already exists, it is updated with `data`.
*
* @param groupId - The id of the group the item belongs to
* @param id - The id of the item to set
* @param data - The item's data, typed by the stream's schema (`TData`)
* @returns The stored item
*/
set(groupId: string, id: string, data: TData): Promise<BaseStreamItem<TData>>
/**
* Deletes a single item from the stream.
*
* @param groupId - The id of the group the item belongs to
* @param id - The id of the item to delete
* @returns The item, or null if it doesn't exist
*/
delete(groupId: string, id: string): Promise<BaseStreamItem<TData> | null>
/**
* Retrieves every item stored under a group.
*
* @param groupId - The id of the group to read
* @returns The items in the group
*/
getGroup(groupId: string): Promise<BaseStreamItem<TData>[]>
/**
* Sends an ephemeral event to subscribers.
*
* Use this for signals that are not part of an item's stored state.
* A chat message, for example, has state (its content, user id, etc.),
* whereas these are transient events you may want to broadcast:
* - online status
* - reactions
* - typing indicators
* - etc.
*
* @param channel - Which subscribers receive the event: a whole group,
*   or a single item when `channel.id` is set
* @param event - The event to send (a `type` name plus its `data`)
*/
send<T>(channel: StateStreamEventChannel, event: StateStreamEvent<T>): Promise<void>
}
```
## Sending ephemeral events
Streams hold state: when a client connects and subscribes to a group ID and item ID,
it automatically syncs with the current state of the stream. However, there are cases where
you want to send an ephemeral event to subscribers instead, such as:
- online status
- reactions
- typing indicators
- etc.
This is where the `send` method comes in.
```typescript
/**
 * Addresses the subscribers that should receive an ephemeral event.
 *
 * Exported (like `StateStreamEvent`) so callers can type the channel
 * argument they pass to a stream's `send` method.
 */
export type StateStreamEventChannel = {
  /**
   * The group id of the stream.
   */
  groupId: string
  /**
   * The id of the item to send the event to.
   *
   * Optional: when omitted, the event is sent to the entire group and
   * every subscriber of that group receives it.
   */
  id?: string
}

/**
 * An ephemeral event delivered to stream subscribers.
 */
export type StateStreamEvent<TData> = {
  /**
   * Name of the event; subscribers use it to decide how to handle the event.
   */
  type: string
  /**
   * Payload delivered to the subscribers.
   */
  data: TData
}
```
## Using in handlers
### TypeScript Example
```typescript
import { randomUUID } from 'node:crypto'
import { ApiRouteConfig, Handlers } from 'motia'
import { z } from 'zod'
import { chatMessageSchema } from './streams/chat-messages.stream'

export const config: ApiRouteConfig = {
  type: 'api',
  name: 'CreateChatMessage',
  method: 'POST',
  path: '/chat-messages',
  bodySchema: z.object({
    channelId: z.string(),
    message: z.string(),
  }),
  emits: [],
  responseSchema: {
    201: chatMessageSchema,
  },
}

/**
 * API step: stores a new chat message in the `chatMessage` stream, grouped
 * under the channel it was sent to, and returns the stored stream item.
 *
 * Typing the handler as `Handlers['CreateChatMessage']` gives `req.body` and
 * `streams` the types Motia generates from `config` and the stream schema.
 */
export const handler: Handlers['CreateChatMessage'] = async (req, { streams }) => {
  /**
   * Suppose this API step is where a user sends a message to a channel.
   *
   * In your application logic the channel ID must be defined somewhere,
   * so the client can send the message to the correct channel.
   */
  const { channelId, message } = req.body
  /**
   * Define the message ID however you want, as long as it is unique within
   * the channel ID. It identifies the message in the stream.
   */
  const messageId = randomUUID()
  /**
   * In your application logic the user ID must be defined somewhere.
   * We recommend using middleware to identify the user on the request.
   */
  const userId = 'example-user-id'
  /**
   * Creates the message in the stream. The result gets its own name, because
   * reusing `message` would redeclare the body field destructured above.
   */
  const chatMessage = await streams.chatMessage.set(channelId, messageId, {
    id: messageId,
    userId,
    message,
    timestamp: new Date().toISOString(),
  })
  /**
   * Returning the stream item directly to the client lets Workbench
   * render the stream object and update it in real time in the UI.
   */
  return { status: 201, body: chatMessage }
}
```
### Python Examples
#### With Pydantic (Optional)
```python
import uuid
from datetime import datetime, timezone
from pydantic import BaseModel


# Request body accepted by the endpoint.
# (Documented with comments: a class docstring would leak into the JSON schema.)
class ChatMessageRequest(BaseModel):
    channel_id: str
    message: str


# Shape of the chat message returned with status 201.
class ChatMessageResponse(BaseModel):
    id: str
    user_id: str
    message: str
    timestamp: str


config = {
    "type": "api",
    "name": "CreateChatMessage",
    "method": "POST",
    "path": "/chat-messages",
    "bodySchema": ChatMessageRequest.model_json_schema(),
    "emits": [],
    "responseSchema": {
        201: ChatMessageResponse.model_json_schema()
    }
}


async def handler(req, context):
    """Store a new chat message in the ``chatMessage`` stream and return it.

    Args:
        req: The incoming request dict; its ``body`` should match ``ChatMessageRequest``.
        context: The step context; ``context.streams.chatMessage`` is the stream manager.

    Returns:
        A response dict with status 201 whose body is the stored stream item.

    Raises:
        pydantic.ValidationError: If the body does not match ``ChatMessageRequest``.
    """
    # ``or {}`` also covers a body key that is present but set to None.
    body = req.get("body") or {}

    # Optional: Validate with Pydantic
    request_data = ChatMessageRequest(**body)

    message_id = str(uuid.uuid4())
    # In your application, identify the user from the request (e.g. via middleware).
    user_id = "example-user-id"

    # Creates a message in the stream. The timestamp is timezone-aware UTC,
    # matching JavaScript's ``new Date().toISOString()`` rather than naive local time.
    chat_message = await context.streams.chatMessage.set(request_data.channel_id, message_id, {
        "id": message_id,
        "user_id": user_id,
        "message": request_data.message,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    })

    return {"status": 201, "body": chat_message}
```
#### Without Pydantic (Pure JSON Schema)
```python
import uuid
from datetime import datetime, timezone

config = {
    "type": "api",
    "name": "CreateChatMessage",
    "method": "POST",
    "path": "/chat-messages",
    "bodySchema": {
        "type": "object",
        "properties": {
            "channel_id": {"type": "string"},
            "message": {"type": "string"}
        },
        "required": ["channel_id", "message"]
    },
    "emits": [],
    "responseSchema": {
        201: {
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                "user_id": {"type": "string"},
                "message": {"type": "string"},
                "timestamp": {"type": "string"}
            }
        },
        400: {
            "type": "object",
            "properties": {
                "error": {"type": "string"}
            }
        }
    }
}


async def handler(req, context):
    """Store a new chat message in the ``chatMessage`` stream and return it.

    Args:
        req: The incoming request dict; its ``body`` should contain
            ``channel_id`` and ``message``.
        context: The step context; ``context.streams.chatMessage`` is the stream manager.

    Returns:
        A response dict: status 201 with the stored stream item, or status 400
        when ``channel_id`` or ``message`` is missing from the body.
    """
    # ``or {}`` also covers a body key that is present but set to None.
    body = req.get("body") or {}
    channel_id = body.get("channel_id")
    message_text = body.get("message")

    # Without Pydantic nothing validates the body here, so guard the required
    # fields instead of writing a None group id / message into the stream.
    if channel_id is None or message_text is None:
        return {"status": 400, "body": {"error": "channel_id and message are required"}}

    message_id = str(uuid.uuid4())
    # In your application, identify the user from the request (e.g. via middleware).
    user_id = "example-user-id"

    # Creates a message in the stream. The timestamp is timezone-aware UTC,
    # matching JavaScript's ``new Date().toISOString()`` rather than naive local time.
    chat_message = await context.streams.chatMessage.set(channel_id, message_id, {
        "id": message_id,
        "user_id": user_id,
        "message": message_text,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    })

    return {"status": 201, "body": chat_message}
```