381 lines
9.4 KiB
Plaintext
381 lines
9.4 KiB
Plaintext
---
|
|
description: Real-time streaming
|
|
globs: steps/**/*.step.ts,steps/**/*.step.js,steps/**/*_step.py,steps/**/*.stream.ts,steps/**/*.stream.js,steps/**/*_stream.py
|
|
alwaysApply: false
|
|
---
|
|
# Real-time Streaming
|
|
|
|
Building event driven applications often requires some real-time streaming capabilities.
|
|
- Like integration with LLMs, they should be implemented in an asynchronous way and updates should come in real-time.
|
|
- Chat applications, real-time collaboration, etc.
|
|
- Long living processes like data processing, video processing, etc.
|
|
|
|
Motia has a built-in real-time streaming system that allows you to easily implement real-time streaming capabilities in your application.
|
|
|
|
It's called Streams.
|
|
|
|
## Stream Configuration
|
|
|
|
Creating a Stream means defining a data schema that will be stored and served to the clients who are subscribing.
|
|
|
|
### TypeScript Example
|
|
|
|
```typescript
|
|
// steps/streams/chat-messages.stream.ts
|
|
import { StreamConfig } from 'motia'
|
|
import { z } from 'zod'
|
|
|
|
export const chatMessageSchema = z.object({
|
|
id: z.string(),
|
|
userId: z.string(),
|
|
message: z.string(),
|
|
timestamp: z.string()
|
|
})
|
|
|
|
export type ChatMessage = z.infer<typeof chatMessageSchema>
|
|
|
|
export const config: StreamConfig = {
|
|
/**
|
|
* This is the stream name, it's extremely important to
|
|
* be used on the client side.
|
|
*/
|
|
name: 'chatMessage',
|
|
|
|
/**
|
|
* This is the schema of the data that will be stored in the stream.
|
|
*
|
|
* It helps Motia to create the types on the steps to enforce the
|
|
* streams objects are created correctly.
|
|
*/
|
|
schema: chatMessageSchema,
|
|
|
|
/**
|
|
* Let's not worry about base config for now, all streams
|
|
* have this storage type default
|
|
*/
|
|
baseConfig: { storageType: 'default' },
|
|
}
|
|
```
|
|
|
|
### Python Examples
|
|
|
|
#### With Pydantic (Optional)
|
|
|
|
```python
|
|
# steps/streams/chat_messages_stream.py
|
|
from pydantic import BaseModel
|
|
|
|
class ChatMessage(BaseModel):
|
|
id: str
|
|
user_id: str
|
|
message: str
|
|
timestamp: str
|
|
|
|
config = {
|
|
"name": "chatMessage",
|
|
"schema": ChatMessage.model_json_schema(),
|
|
"baseConfig": {"storageType": "default"}
|
|
}
|
|
```
|
|
|
|
#### Without Pydantic (Pure JSON Schema)
|
|
|
|
```python
|
|
# steps/streams/chat_messages_stream.py
|
|
|
|
config = {
|
|
"name": "chatMessage",
|
|
"schema": {
|
|
"type": "object",
|
|
"properties": {
|
|
"id": {"type": "string"},
|
|
"user_id": {"type": "string"},
|
|
"message": {"type": "string"},
|
|
"timestamp": {"type": "string"}
|
|
},
|
|
"required": ["id", "user_id", "message", "timestamp"]
|
|
},
|
|
"baseConfig": {"storageType": "default"}
|
|
}
|
|
```
|
|
|
|
## Using streams
|
|
|
|
Streams managers are automatically injected into the context of the steps.
|
|
|
|
The interface of each stream is this
|
|
|
|
```typescript
|
|
interface MotiaStream<TData> {
|
|
/**
|
|
* Retrieves a single item from the stream
|
|
*
|
|
* @param groupId - The group id of the stream
|
|
* @param id - The id of the item to get
|
|
* @returns The item or null if it doesn't exist
|
|
*/
|
|
get(groupId: string, id: string): Promise<BaseStreamItem<TData> | null>
|
|
|
|
/**
|
|
* Create or update a single item in the stream.
|
|
*
|
|
* If the item doesn't exist, it will be created.
|
|
* If the item exists, it will be updated.
|
|
*
|
|
* @param groupId - The group id of the stream
|
|
* @param id - The id of the item to set
|
|
* @param data - The data to set
|
|
* @returns The item
|
|
*/
|
|
set(groupId: string, id: string, data: TData): Promise<BaseStreamItem<TData>>
|
|
|
|
/**
|
|
* Deletes a single item from the stream
|
|
*
|
|
* @param groupId - The group id of the stream
|
|
* @param id - The id of the item to delete
|
|
* @returns The item or null if it doesn't exist
|
|
*/
|
|
delete(groupId: string, id: string): Promise<BaseStreamItem<TData> | null>
|
|
|
|
/**
|
|
* Retrieves a group of items from the stream based on the group id
|
|
*
|
|
* @param groupId - The group id of the stream
|
|
* @returns The items
|
|
*/
|
|
getGroup(groupId: string): Promise<BaseStreamItem<TData>[]>
|
|
|
|
/**
|
|
* This is used mostly for ephemeral events in streams.
|
|
* A chat message for example has a state, which is the message content, user id, etc.
|
|
*
|
|
* However, if you want to send an event to the subscribers like:
|
|
* - online status
|
|
* - reactions
|
|
* - typing indicators
|
|
* - etc.
|
|
*
|
|
* @param channel - The channel to send the event to
|
|
* @param event - The event to send
|
|
*/
|
|
send<T>(channel: StateStreamEventChannel, event: StateStreamEvent<T>): Promise<void>
|
|
}
|
|
```
|
|
|
|
## Sending ephemeral events
|
|
|
|
Streams hold state, which means when the client connects and subscribes to a GroupID and ItemID,
|
|
they will automatically sync with the state of the stream, however, there might be cases where
|
|
you want to send an ephemeral event to the subscribers, like:
|
|
|
|
- online status
|
|
- reactions
|
|
- typing indicators
|
|
- etc.
|
|
|
|
This is where the `send` method comes in.
|
|
|
|
```typescript
|
|
/**
|
|
* The channel to send the event to
|
|
*/
|
|
type StateStreamEventChannel = {
|
|
/**
|
|
* The group id of the stream
|
|
*/
|
|
groupId: string
|
|
|
|
/**
|
|
* The id of the item to send the event to
|
|
*
|
|
* Optional, when not provided, the event will be sent to the entire group.
|
|
* Subscribers to the group will receive the event.
|
|
*/
|
|
id?: string
|
|
}
|
|
|
|
export type StateStreamEvent<TData> = {
|
|
/**
|
|
* The type of the event, use as the name of the event
|
|
* to be handled in the subscribers.
|
|
*/
|
|
type: string
|
|
|
|
/**
|
|
* The data of the event, the data that will be sent to the subscribers.
|
|
*/
|
|
data: TData
|
|
}
|
|
```
|
|
|
|
## Using in handlers
|
|
|
|
### TypeScript Example
|
|
|
|
```typescript
|
|
import { ApiRouteConfig, Handlers } from 'motia'
|
|
import { z } from 'zod'
|
|
import { chatMessageSchema } from './streams/chat-messages.stream'
|
|
|
|
export const config: ApiRouteConfig = {
|
|
type: 'api',
|
|
name: 'CreateChatMessage',
|
|
method: 'POST',
|
|
path: '/chat-messages',
|
|
bodySchema: z.object({
|
|
channelId: z.string(),
|
|
message: z.string(),
|
|
}),
|
|
emits: [],
|
|
responseSchema: {
|
|
201: chatMessageSchema
|
|
}
|
|
}
|
|
|
|
export const handler = async (req, { streams }) => {
|
|
/**
|
|
* Say this is an API Step that a user sends a message to a channel.
|
|
*
|
|
* In your application logic you should have a channel ID defined somewhere
|
|
* so the client can send the message to the correct channel.
|
|
*/
|
|
const { channelId, message } = req.body
|
|
|
|
/**
|
|
* Define the message ID however you want, but should be a unique identifier underneath the channel ID.
|
|
*
|
|
* This is used to identify the message in the stream.
|
|
*/
|
|
const messageId = crypto.randomUUID()
|
|
|
|
/**
|
|
* In your application logic you should have a user ID defined somewhere.
|
|
* We recommend using middlewares to identify the user on the request.
|
|
*/
|
|
const userId = 'example-user-id'
|
|
|
|
/**
|
|
* Creates a message in the stream
|
|
*/
|
|
const message = await streams.chatMessage.set(channelId, messageId, {
|
|
id: messageId,
|
|
userId: userId,
|
|
message: message,
|
|
timestamp: new Date().toISOString()
|
|
})
|
|
|
|
/**
|
|
* Returning the stream result directly to the client helps Workbench to
|
|
* render the stream object and update it in real-time in the UI.
|
|
*/
|
|
return { status: 201, body: message }
|
|
}
|
|
```
|
|
|
|
### Python Examples
|
|
|
|
#### With Pydantic (Optional)
|
|
|
|
```python
|
|
import uuid
|
|
from datetime import datetime
|
|
from pydantic import BaseModel
|
|
|
|
class ChatMessageRequest(BaseModel):
|
|
channel_id: str
|
|
message: str
|
|
|
|
class ChatMessageResponse(BaseModel):
|
|
id: str
|
|
user_id: str
|
|
message: str
|
|
timestamp: str
|
|
|
|
config = {
|
|
"type": "api",
|
|
"name": "CreateChatMessage",
|
|
"method": "POST",
|
|
"path": "/chat-messages",
|
|
"bodySchema": ChatMessageRequest.model_json_schema(),
|
|
"emits": [],
|
|
"responseSchema": {
|
|
201: ChatMessageResponse.model_json_schema()
|
|
}
|
|
}
|
|
|
|
async def handler(req, context):
|
|
body = req.get("body", {})
|
|
|
|
# Optional: Validate with Pydantic
|
|
request_data = ChatMessageRequest(**body)
|
|
|
|
channel_id = request_data.channel_id
|
|
message_text = request_data.message
|
|
|
|
message_id = str(uuid.uuid4())
|
|
user_id = "example-user-id"
|
|
|
|
# Creates a message in the stream
|
|
chat_message = await context.streams.chatMessage.set(channel_id, message_id, {
|
|
"id": message_id,
|
|
"user_id": user_id,
|
|
"message": message_text,
|
|
"timestamp": datetime.now().isoformat()
|
|
})
|
|
|
|
return {"status": 201, "body": chat_message}
|
|
```
|
|
|
|
#### Without Pydantic (Pure JSON Schema)
|
|
|
|
```python
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
config = {
|
|
"type": "api",
|
|
"name": "CreateChatMessage",
|
|
"method": "POST",
|
|
"path": "/chat-messages",
|
|
"bodySchema": {
|
|
"type": "object",
|
|
"properties": {
|
|
"channel_id": {"type": "string"},
|
|
"message": {"type": "string"}
|
|
},
|
|
"required": ["channel_id", "message"]
|
|
},
|
|
"emits": [],
|
|
"responseSchema": {
|
|
201: {
|
|
"type": "object",
|
|
"properties": {
|
|
"id": {"type": "string"},
|
|
"user_id": {"type": "string"},
|
|
"message": {"type": "string"},
|
|
"timestamp": {"type": "string"}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async def handler(req, context):
|
|
body = req.get("body", {})
|
|
channel_id = body.get("channel_id")
|
|
message_text = body.get("message")
|
|
|
|
message_id = str(uuid.uuid4())
|
|
user_id = "example-user-id"
|
|
|
|
# Creates a message in the stream
|
|
chat_message = await context.streams.chatMessage.set(channel_id, message_id, {
|
|
"id": message_id,
|
|
"user_id": user_id,
|
|
"message": message_text,
|
|
"timestamp": datetime.now().isoformat()
|
|
})
|
|
|
|
return {"status": 201, "body": chat_message}
|
|
```
|