BitByLaw Motia III - Developer Guide
For AI Assistants: This document contains all critical patterns, conventions, and best practices. Read this first to understand the codebase structure and ensure consistency.
Quick Navigation:
- Core Concepts - System architecture and patterns
- Design Principles - Event Storm & Bidirectional References
- Step Development - How to create new steps
- Services - Reusable business logic
- Integrations - xAI, EspoCRM, Advoware
- Testing & Debugging
Project Status
Migration: ✅ 100% Complete (21/21 Steps migrated from Motia v0.17 → Motia III v1.0-RC)
New to the project? Start here:
- README.md - Project Overview & Quick Start
- MIGRATION_GUIDE.md - Complete migration patterns
- ARCHITECTURE.md - System design and architecture
Core Concepts
System Overview
Architecture:
- Pure Python (Motia III)
- Event-Driven with queue-based async processing
- Redis-backed distributed locking and caching
- REST APIs for webhooks and proxies
Key Components:
- Steps (`steps/`) - Business logic handlers (HTTP, Queue, Cron)
- Services (`services/`) - Shared API clients and utilities
- Configuration (`iii-config.yaml`) - System setup
Data Flow:
Webhook (HTTP) → Queue Event → Event Handler → External APIs
                      ↓               ↓
                 Redis Lock     Service Layer
                                (EspoCRM, Advoware, xAI)
Design Principles
Event Storm over Lock Coordination
Core Philosophy: "Idempotent Chaos - Check Cheap, Sync Once"
╔═══════════════════════════════════════════════════════╗
║ Prefer: ║
║ ✅ Multiple redundant events over coordination ║
║ ✅ Idempotent handlers over distributed locks ║
║ ✅ Eventually consistent over perfectly synced ║
║ ✅ Simple duplication over complex orchestration ║
╚═══════════════════════════════════════════════════════╝
Guidelines:
- Fire events liberally - 10 redundant events are cheaper than complex coordination
- Make handlers idempotent - Early returns when nothing to do
- Sequential per entity, parallel across entities - Lock prevents collisions, not updates
- Accept event storms - Handlers queue up, Motia retries automatically
Lock Strategy: Step + Input Values
Lock Key Format: {step_name}:{entity_type}:{entity_id}
Examples:
- document_sync:CDokumente:doc123
- beteiligte_sync:CBeteiligte:bet456
- collection_create:Account:acc789
Rules:
✅ Parallel: sync document:A + sync document:B (different IDs)
❌ Blocked: sync document:A + sync document:A (same ID)
Lock Acquisition:
├─ Try acquire lock
├─ Success → Process → Release in finally block
└─ Fail (already locked) → Return early → Motia retries later
Handler Pattern:
async def handler(event_data, ctx):
    entity_id = event_data['entity_id']
    entity_type = event_data['entity_type']

    # 1. Try acquire lock
    lock_key = f"document_sync:{entity_type}:{entity_id}"
    lock_acquired = await acquire_lock(lock_key)
    if not lock_acquired:
        ctx.logger.info("⏸️ Lock busy, skipping (Motia will retry)")
        return  # Early return - no error, just skip

    try:
        # 2. Do work (only if lock acquired)
        await sync_document(entity_id)
    finally:
        # 3. ALWAYS release lock (robust cleanup)
        await release_lock(lock_key)
Retry Behavior:
- Lock fail → Handler returns without error
- Motia Queue sees: No error, no completion marker
- Motia automatically retries after configured delay
- Eventually lock available → Handler processes
- Idempotency check: fileStatus="unchanged", xaiSyncStatus="clean" → Early return (nothing to do) ✅
- Result: Second event processed but no duplicate work!
**Why This Works:**
- **Lock prevents chaos**: No parallel file uploads for same entity
- **Queue enables updates**: New changes processed sequentially
- **Idempotency prevents waste**: Redundant events → cheap early returns
- **Parallel scaling**: Different entities process simultaneously
**Practical Example: Entity Link Event**
User links Document ↔ Räumungsklage
Webhooks fire:
├─ POST /vmh/webhook/entity/link
└─ Emits: raeumungsklage.update, cdokumente.update

Handlers (parallel, different entities):
├─ Räumungsklage Handler
│  ├─ Lock: raeumungsklage:abc123
│  ├─ Creates xAI Collection (if missing)
│  └─ Fires: cdokumente.update (for all linked docs) ← Event Storm!
│
└─ Document Handler (may run 2-3x on same doc)
   ├─ Lock: document:doc456 (sequential processing)
   ├─ Run 1: Collections not ready → Skip (cheap return)
   ├─ Run 2: Collection ready → Upload & sync
   └─ Run 3: Already synced → Early return (idempotent!)
---
### Steps vs. Utils: Parallel vs. Sequential
**Separation Principle: "Events for Parallelism, Functions for Composition"**
╔═══════════════════════════════════════════════════════╗
║ Choose Architecture by Execution Model:               ║
║                                                       ║
║ ✅ Separate Steps → When parallel possible            ║
║    → Communicate via Events                           ║
║    → Own lock scope, independent retry                ║
║                                                       ║
║ ✅ Shared Utils → When sequential required            ║
║    → Call as function (need return values)            ║
║    → Reusable across multiple steps                   ║
╚═══════════════════════════════════════════════════════╝
**Decision Matrix:**
| Requirement | Architecture | Communication | Example |
|------------|--------------|---------------|---------|
| **Can run in parallel** | Separate Steps | Events | Document sync + Collection create |
| **Needs return value** | Utils function | Function call | `get_or_create_collection()` |
| **Reusable logic** | Utils function | Import + call | `should_sync_to_xai()` |
| **One-time handler** | Inline in Step | N/A | Event-specific parsing |
**Examples:**
```python
# ✅ GOOD: Parallel → Separate Steps + Events

# steps/collection_manager_step.py
async def handle_entity_update(event_data, ctx):
    """Creates xAI Collection for entity"""
    collection_id = await create_collection(entity_type, entity_id)
    # Fire events for all linked documents (parallel processing!)
    for doc in linked_docs:
        await ctx.enqueue({"topic": "cdokumente.update", "data": {"entity_id": doc.id}})

# steps/document_sync_step.py
async def handle_document_update(event_data, ctx):
    """Syncs document to xAI"""
    # Runs in parallel with collection_manager_step!
    await sync_document_to_xai(entity_id)

# ✅ GOOD: Sequential + Reusable → Utils

# services/document_sync_utils.py
async def get_required_collections(entity_id):
    """Reusable function, needs return value"""
    return await _scan_entity_relations(entity_id)

# ❌ BAD: Sequential logic in Step (not reusable)
async def handle_document_update(event_data, ctx):
    # Inline function call - hard to test and reuse
    collection_id = await create_collection_inline(...)
    await upload_file(collection_id, ...)  # Needs collection_id from above!
```
Guidelines:
- Default to Steps + Events - Maximize parallelism
- Use Utils when:
  - You need an immediate return value
  - Logic is reused across 2+ steps
  - Complex computation (not I/O bound)
- Keep Steps thin - Mostly orchestration + event emission
- Keep Utils testable - Pure functions, no event emission
Code Organization:
steps/
├─ document_sync_event_step.py # Event handler (thin)
├─ collection_manager_step.py # Event handler (thin)
└─ vmh/
└─ entity_link_webhook_step.py # Webhook handler (thin)
services/
├─ document_sync_utils.py # Reusable functions
├─ xai_service.py # API client
└─ espocrm.py # CRM client
Bidirectional Reference Pattern
Principle: Every sync maintains references on both sides
EspoCRM as Central Hub:
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Advoware │◄────────┤ EspoCRM ├────────►│ xAI │
│ │ │ │ │ │
│ Akte 12345 │ │ Entity │ │ Collection │
└─────────────┘ │ - advowareId │ └─────────────┘
│ - xaiColId │
└──────────────┘
Implementation:
- xAI Collection → stores `entityType` + `entityId` in metadata
- EspoCRM Entity → stores `xaiCollectionId` field
- EspoCRM Document → stores `xaiFileId` + `xaiCollections[]` fields
- Advoware Integration → stores `advowareAkteId` in EspoCRM
Benefits:
- Bidirectional navigation without complex queries
- Easy relationship building (e.g., Advoware Akte ↔ xAI Collection via EspoCRM)
- Idempotent lookups (can verify both directions)
- Debugging: Always know where things came from
Example: Collection Creation
# xAI Collection
collection = await xai.create_collection(
    name="CBeteiligte - Max Mustermann",
    metadata={
        "espocrm_entity_type": "CBeteiligte",
        "espocrm_entity_id": "abc123",
        "created_at": "2026-03-08T19:00:00Z"
    }
)

# EspoCRM Entity
await espocrm.update_entity("CBeteiligte", "abc123", {
    "xaiCollectionId": collection.id
})
Step Development Best Practices
File Naming Convention
CRITICAL: Always use _step.py suffix!
✅ CORRECT:
steps/vmh/webhook/document_create_api_step.py
steps/vmh/document_sync_event_step.py
steps/vmh/beteiligte_sync_cron_step.py
❌ WRONG:
steps/vmh/document_handler.py # Missing _step.py
steps/vmh/sync.py # Missing _step.py
Naming Pattern:
- Webhooks: `{entity}_{action}_api_step.py`
  - Examples: `beteiligte_create_api_step.py`, `document_update_api_step.py`
- Event Handlers: `{entity}_sync_event_step.py`
  - Examples: `document_sync_event_step.py`, `beteiligte_sync_event_step.py`
- Cron Jobs: `{entity}_sync_cron_step.py`
  - Examples: `beteiligte_sync_cron_step.py`, `calendar_sync_cron_step.py`
Step Template
Complete step template with all required patterns:
"""Module-level docstring describing the step's purpose"""
from typing import Dict, Any
from motia import FlowContext, http, queue, cron, ApiRequest, ApiResponse
config = {
    "name": "Clear Human-Readable Name",
    "description": "Brief description of what this step does",
    "flows": ["flow-name"],  # Logical grouping
    "triggers": [
        # Pick ONE trigger type:
        http("POST", "/path/to/endpoint"),  # For webhooks
        # queue("topic.name"),              # For event handlers
        # cron("0 */15 * * * *"),           # For scheduled jobs
    ],
    "enqueues": ["next.topic"],  # Topics this step emits (optional)
}

async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Handler docstring explaining:
    - What triggers this handler
    - What it does
    - What events it emits
    """
    try:
        # 1. Log entry
        ctx.logger.info("=" * 80)
        ctx.logger.info(f"🔄 STEP STARTED: {config['name']}")
        ctx.logger.info("=" * 80)

        # 2. Extract and validate input
        payload = request.body
        entity_id = payload.get('id')

        # 3. Business logic
        # ...

        # 4. Enqueue events if needed
        await ctx.enqueue({
            'topic': 'next.step',
            'data': {
                'entity_id': entity_id,
                'action': 'create'
            }
        })

        # 5. Log success
        ctx.logger.info("✅ Step completed successfully")

        # 6. Return response
        return ApiResponse(
            status=200,
            body={'success': True}
        )
    except Exception as e:
        # Always log errors with context
        ctx.logger.error(f"❌ Error in {config['name']}: {e}")
        ctx.logger.error(f"Payload: {request.body}")
        return ApiResponse(
            status=500,
            body={'success': False, 'error': str(e)}
        )
Handler Signatures by Trigger Type
HTTP Trigger (Webhooks, APIs):
from motia import ApiRequest, ApiResponse, FlowContext
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    # Access: request.body, request.query_params, request.path_params
    return ApiResponse(status=200, body={...})
Queue Trigger (Event Handlers):
from typing import Any, Dict

from motia import FlowContext

async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
    # Process event_data
    # No return value
    pass
Cron Trigger (Scheduled Jobs):
from motia import FlowContext
async def handler(input_data: None, ctx: FlowContext) -> None:
    # Cron jobs receive no input
    # No return value
    pass
Logging Best Practices
ALWAYS use ctx.logger, NEVER use print() or module-level logger:
# ✅ CORRECT: Context-aware logging
ctx.logger.info("Processing started")
ctx.logger.debug(f"Data: {data}")
ctx.logger.warn("Skipping invalid entry")
ctx.logger.error(f"Failed: {e}")
# ❌ WRONG: Direct print or module logger
print("Processing started") # Not visible in iii logs
logger.info("Processing started") # Loses context
Log Levels:
- `info()` - Normal operations (start, success, counts)
- `debug()` - Detailed data dumps (payloads, responses)
- `warn()` - Non-critical issues (skipped items, fallbacks)
- `error()` - Failures requiring attention
Log Structure:
# Section headers with visual separators
ctx.logger.info("=" * 80)
ctx.logger.info("🔄 SYNC HANDLER STARTED")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Entity Type: {entity_type}")
ctx.logger.info(f"Action: {action.upper()}")
ctx.logger.info(f"Document ID: {entity_id}")
ctx.logger.info("=" * 80)
# Use emojis for visual scanning
ctx.logger.info("📥 Downloading file...")
ctx.logger.info("✅ Downloaded 1024 bytes")
ctx.logger.error("❌ Upload failed")
ctx.logger.warn("⚠️ No collections found")
Event Topics Naming
Pattern: {module}.{entity}.{action}
✅ Examples:
vmh.document.create
vmh.document.update
vmh.document.delete
vmh.beteiligte.create
calendar.sync.employee
advoware.proxy.response
❌ Avoid:
document-create # Use dots, not dashes
DocumentCreate # Use lowercase
create_document # Wrong order
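The `{module}.{entity}.{action}` convention can be enforced with a small check before enqueueing. A hedged sketch (the project may not ship such a helper; the exact segment charset is an assumption):

```python
import re

# Three lowercase dot-separated segments: {module}.{entity}.{action}
TOPIC_PATTERN = re.compile(r"^[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*$")

def is_valid_topic(topic: str) -> bool:
    """True for names like 'vmh.document.create'; rejects dashes, CamelCase, and missing segments."""
    return bool(TOPIC_PATTERN.match(topic))
```

Calling this in a shared emit wrapper catches all three "Avoid" cases above at development time rather than as a silently unmatched queue topic.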
Service Layer Patterns
Service Base Class Pattern
All services should follow this pattern:
"""Service docstring"""
import logging
from typing import Optional
from services.logging_utils import get_service_logger
logger = logging.getLogger(__name__)
class MyService:
"""Service for interacting with External API"""
def __init__(self, context=None):
"""
Initialize service.
Args:
context: Optional Motia FlowContext for logging
"""
self.context = context
self.logger = get_service_logger('my_service', context)
self._session = None
# Load config from env
self.api_key = os.getenv('MY_API_KEY', '')
if not self.api_key:
raise ValueError("MY_API_KEY not configured")
self.logger.info("MyService initialized")
def _log(self, message: str, level: str = 'info') -> None:
"""Internal logging helper"""
log_func = getattr(self.logger, level, self.logger.info)
log_func(message)
async def _get_session(self):
"""Lazy session initialization"""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
return self._session
async def close(self) -> None:
"""Cleanup resources"""
if self._session and not self._session.closed:
await self._session.close()
Sync Utilities Pattern
For bidirectional sync operations, inherit from BaseSyncUtils:
from typing import Dict, Tuple

from services.sync_utils_base import BaseSyncUtils

class MyEntitySync(BaseSyncUtils):
    """Sync utilities for MyEntity"""

    def _get_lock_key(self, entity_id: str) -> str:
        """Required: Define lock key pattern"""
        return f"sync_lock:myentity:{entity_id}"

    async def should_sync(self, entity: Dict) -> Tuple[bool, str]:
        """
        Decide if sync is needed.

        Returns:
            (needs_sync: bool, reason: str)
        """
        # Implementation
        pass
Base class provides:
- `_log()` - Context-aware logging
- `_acquire_redis_lock()` - Distributed locking
- `_release_redis_lock()` - Lock cleanup
- `self.espocrm` - EspoCRM API client
- `self.redis` - Redis client
- `self.context` - Motia context
- `self.logger` - Integration logger
External Integrations
xAI Collections Integration
Status: ✅ Fully Implemented
Service: services/xai_service.py
Environment Variables:
XAI_API_KEY=xai-... # For file uploads (api.x.ai)
XAI_MANAGEMENT_KEY=xai-token-... # For collections (management-api.x.ai)
Usage Example:
from services.xai_service import XAIService
xai = XAIService(ctx)
# Upload file
file_id = await xai.upload_file(
file_content=bytes_data,
filename="document.pdf",
mime_type="application/pdf"
)
# Add to collection
await xai.add_to_collection("collection_id", file_id)
# Add to multiple collections
added = await xai.add_to_collections(["col1", "col2"], file_id)
# Remove from collection
await xai.remove_from_collection("collection_id", file_id)
Architecture:
- Files are uploaded ONCE to the Files API (`api.x.ai/v1/files`)
- The same `file_id` can be added to MULTIPLE collections
- Removing from a collection does NOT delete the file (it may be used elsewhere)
- Hash-based change detection prevents unnecessary reuploads
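The hash-based change detection mentioned above amounts to a few lines: compare the MD5 of the freshly downloaded bytes against the hash stored at last sync (`xaiSyncedHash` on the document) and skip the upload when they match. A minimal sketch:

```python
import hashlib
from typing import Optional

def needs_reupload(file_content: bytes, synced_hash: Optional[str]) -> bool:
    """Reupload only when the current MD5 differs from the hash at last sync.

    A document never synced before (synced_hash is None) always uploads.
    """
    current = hashlib.md5(file_content).hexdigest()
    return current != synced_hash
```

On a successful upload the handler then writes the new hash back to `xaiSyncedHash`, so redundant events from an event storm hit the cheap early-return path.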
Document Sync Flow:
1. EspoCRM Webhook → vmh.document.{create|update|delete}
2. Document Sync Handler:
a. Acquire distributed lock (prevents duplicate syncs)
b. Load document from EspoCRM
c. Check if sync needed:
- dateiStatus = "Neu" or "Geändert" → SYNC
- Hash changed → SYNC
- Entity has xAI collections → SYNC
d. Download file from EspoCRM
e. Calculate MD5 hash
f. Upload to xAI (or reuse existing file_id)
g. Add to required collections
h. Update EspoCRM metadata (xaiFileId, xaiCollections, xaiSyncedHash)
i. Release lock
3. Delete: Remove from collections (keep file)
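Step 2c is a pure decision and therefore fits the Utils pattern. A hedged sketch of one reading of the three checks (field names follow the CDokumente fields documented in this guide; the "entity has collections" trigger is narrowed here to "collections exist but the file was never uploaded" to keep repeated events idempotent):

```python
from typing import Dict, Tuple

def should_sync(doc: Dict, has_collections: bool) -> Tuple[bool, str]:
    """Sketch of step 2c: decide whether a document needs an xAI sync."""
    # New or changed in EspoCRM
    if doc.get("fileStatus") in ("neu", "geändert"):
        return True, f"fileStatus={doc['fileStatus']}"
    # Content changed since last successful sync
    if doc.get("md5sum") != doc.get("xaiSyncedHash"):
        return True, "hash changed since last sync"
    # Collections exist but the file was never uploaded
    if has_collections and not doc.get("xaiId"):
        return True, "entity has collections but file not uploaded"
    return False, "nothing to do"
```

Returning the reason string alongside the boolean keeps log lines self-explanatory (`⏭️ Skipping: nothing to do`).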
EspoCRM Fields (CDokumente):
{
    'xaiId': 'file-abc123',              # xAI file_id
    'xaiCollections': ['col1', 'col2'],  # Array of collection IDs
    'xaiSyncedHash': 'abc123def456',     # MD5 at last sync
    'fileStatus': 'synced',              # Status: neu, geändert, synced
    'md5sum': 'abc123def456',            # Current file hash
    'sha256': 'def456...',               # SHA-256 (optional)
}
EspoCRM Integration
Service: services/espocrm.py
Environment Variables:
ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1
ESPOCRM_API_KEY=your-api-key
ESPOCRM_API_TIMEOUT_SECONDS=30
Usage:
from services.espocrm import EspoCRMAPI
espocrm = EspoCRMAPI(ctx)
# Get entity
entity = await espocrm.get_entity('CDokumente', entity_id)
# Update entity
await espocrm.update_entity('CDokumente', entity_id, {
'xaiId': file_id,
'fileStatus': 'synced'
})
# List entities
result = await espocrm.list_entities(
'CDokumente',
where=[{'type': 'equals', 'attribute': 'fileStatus', 'value': 'neu'}],
select='id,name,fileStatus',
max_size=50
)
# Download attachment
file_bytes = await espocrm.download_attachment(attachment_id)
Advoware Integration
Service: services/advoware_service.py
Authentication: HMAC-512 with token caching
Environment Variables:
ADVOWARE_API_BASE_URL=https://api.advoware.de
ADVOWARE_API_KEY=your-key
ADVOWARE_API_SECRET=your-secret
REDIS_HOST=localhost
REDIS_PORT=6379
Proxy Endpoints:
- `GET /advoware/proxy?endpoint={path}` - Proxy GET requests
- `POST /advoware/proxy` - Proxy POST requests
- `PUT /advoware/proxy` - Proxy PUT requests
- `DELETE /advoware/proxy` - Proxy DELETE requests
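"HMAC-512" here means HMAC with SHA-512. The exact string Advoware expects to be signed is defined by its API, so the message layout below is purely illustrative; only the hashing mechanics carry over:

```python
import hashlib
import hmac

def sign_request(secret: str, message: str) -> str:
    """HMAC-SHA512 over the request message, hex-encoded.

    `message` is a placeholder: the real Advoware signing string
    (method, path, timestamp, body, ...) comes from their API docs.
    """
    return hmac.new(secret.encode(), message.encode(), hashlib.sha512).hexdigest()
```

Since the signature is deterministic for a given secret and message, the resulting auth token can be cached in Redis until it expires, which is what the token caching above refers to.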
Testing and Debugging
Start System
Production (systemd):
# Restart services
sudo systemctl restart motia.service
sudo systemctl restart iii-console.service
# Check status
sudo systemctl status motia.service
sudo systemctl status iii-console.service
# View real-time logs
journalctl -u motia.service -f
journalctl -u iii-console.service -f
# Enable auto-start on boot
sudo systemctl enable motia.service
sudo systemctl enable iii-console.service
Manual (Development):
# Start iii Engine
cd /opt/motia-iii/bitbylaw
/opt/bin/iii -c iii-config.yaml
# Start iii Console (Web UI)
/opt/bin/iii-console --enable-flow --host 0.0.0.0 --port 3113 \
--engine-host 192.168.67.233 --engine-port 3111 --ws-port 3114
Check Registered Steps
curl http://localhost:3111/_console/functions | python3 -m json.tool
Test HTTP Endpoint
# Test document webhook
curl -X POST "http://localhost:3111/vmh/webhook/document/create" \
-H "Content-Type: application/json" \
-d '[{"id": "test123", "entityType": "CDokumente"}]'
# Test advoware proxy
curl "http://localhost:3111/advoware/proxy?endpoint=employees"
Manually Trigger Cron
curl -X POST "http://localhost:3111/_console/cron/trigger" \
-H "Content-Type: application/json" \
-d '{"function_id": "steps::VMH Beteiligte Sync Cron::trigger::0"}'
View Logs
# Live logs via journalctl
journalctl -u motia-iii -f
# Search for specific step
journalctl --since "today" | grep -i "document sync"
# Check for errors
tail -100 /opt/motia-iii/bitbylaw/iii_new.log | grep -i error
Common Issues
Step not showing up:
- Check file naming: must end with `_step.py`
- Check for import errors: `grep -i "importerror\|traceback" iii.log`
- Verify the `config` dict is present
- Restart the iii engine
Redis connection failed:
- Check the `REDIS_HOST` and `REDIS_PORT` environment variables
- Verify Redis is running: `redis-cli ping`
- The service works without Redis, but with warnings
AttributeError '_log' not found:
- Ensure the service inherits from `BaseSyncUtils`, OR
- Implement the `_log()` method manually
Key Patterns Summary
✅ DO
- Always use the `_step.py` suffix for step files
- Always use `ctx.logger` for logging (never `print`)
- Always wrap handlers in try/except with error logging
- Always use visual separators in logs (`"=" * 80`)
- Always return `ApiResponse` from HTTP handlers
- Always document what events a step emits
- Always use distributed locks for sync operations
- Always calculate hashes for change detection
❌ DON'T
- Don't use a module-level `logger` in steps
- Don't forget `async` on handler functions
- Don't use blocking I/O (use `aiohttp`, not `requests`)
- Don't return values from queue/cron handlers
- Don't hardcode credentials (use environment variables)
- Don't skip lock cleanup in `finally` blocks
- Don't use `print()` for logging
Module Documentation
Steps
Advoware Proxy (Module README)
- Universal HTTP proxy with HMAC-512 authentication
- Endpoints: GET, POST, PUT, DELETE
- Redis token caching
Calendar Sync (Module README)
- Bidirectional Advoware ↔ Google Calendar sync
- Cron: Every 15 minutes
- API trigger: `/advoware/calendar/sync`
VMH Integration (Module README)
- EspoCRM ↔ Advoware bidirectional sync
- Webhooks: Beteiligte, Bankverbindungen, Documents
- xAI Collections integration for documents
Services
| Service | Purpose | Config |
|---|---|---|
| `xai_service.py` | xAI file uploads & collections | `XAI_API_KEY`, `XAI_MANAGEMENT_KEY` |
| `espocrm.py` | EspoCRM REST API client | `ESPOCRM_API_BASE_URL`, `ESPOCRM_API_KEY` |
| `advoware_service.py` | Advoware API with HMAC auth | `ADVOWARE_API_KEY`, `ADVOWARE_API_SECRET` |
| `document_sync_utils.py` | Document sync logic | Redis connection |
| `beteiligte_sync_utils.py` | Beteiligte sync logic | Redis connection |
| `sync_utils_base.py` | Base class for sync utils | - |
Quick Reference
Environment Variables
Required:
# EspoCRM
ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1
ESPOCRM_API_KEY=your-key
# Advoware
ADVOWARE_API_BASE_URL=https://api.advoware.de
ADVOWARE_API_KEY=your-key
ADVOWARE_API_SECRET=your-secret
# xAI
XAI_API_KEY=xai-...
XAI_MANAGEMENT_KEY=xai-token-...
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_ADVOWARE_CACHE=1
Optional:
ESPOCRM_API_TIMEOUT_SECONDS=30
ESPOCRM_METADATA_TTL_SECONDS=300
File Structure
bitbylaw/
├── iii-config.yaml # Motia III configuration
├── pyproject.toml # Python dependencies (uv)
├── steps/ # Business logic
│ ├── advoware_proxy/
│ ├── advoware_cal_sync/
│ └── vmh/
│ ├── webhook/ # HTTP webhook handlers
│ │ ├── *_create_api_step.py
│ │ ├── *_update_api_step.py
│ │ └── *_delete_api_step.py
│ ├── *_sync_event_step.py # Queue event handlers
│ └── *_sync_cron_step.py # Scheduled jobs
├── services/ # Shared services
│ ├── xai_service.py
│ ├── espocrm.py
│ ├── advoware_service.py
│ ├── *_sync_utils.py
│ ├── sync_utils_base.py
│ ├── logging_utils.py
│ └── exceptions.py
├── docs/ # Documentation
│ ├── INDEX.md # This file
│ ├── ARCHITECTURE.md
│ └── DOCUMENT_SYNC_XAI_STATUS.md
└── tests/ # Test scripts
└── test_xai_collections_api.py
Motia III vs Old Motia
| Old Motia v0.17 | Motia III v1.0-RC |
|---|---|
| `type: 'api'` | `triggers: [http()]` |
| `type: 'event'` | `triggers: [queue()]` |
| `type: 'cron'` | `triggers: [cron()]` |
| `context.emit()` | `ctx.enqueue()` |
| `emits: [...]` | `enqueues: [...]` |
| `subscribes: [...]` | `triggers: [queue('topic')]` |
| 5-field cron | 6-field cron (seconds first) |
| `context.logger` | `ctx.logger` |
| Motia Workbench | iii Console |
| Node.js + Python | Pure Python |
Cron Syntax
6 fields (Motia III): second minute hour day month weekday
0 */15 * * * * # Every 15 minutes
0 0 */6 * * * # Every 6 hours
0 0 2 * * * # Daily at 2 AM
0 30 9 * * 1-5 # Monday-Friday at 9:30 AM
Additional Resources
Documentation
- MIGRATION_GUIDE.md - v0.17 → v1.0 migration
- MIGRATION_STATUS.md - Migration progress
- ARCHITECTURE.md - System design
- DOCUMENT_SYNC_XAI_STATUS.md - xAI integration details
Support & Troubleshooting
| Issue | Solution |
|---|---|
| Step not registered | Check _step.py suffix, restart: sudo systemctl restart motia.service |
| Import errors | Check logs: journalctl -u motia.service \| grep -i importerror |
| Code changes not applied | Auto-reload should work, force restart: sudo systemctl restart motia.service |
| ApiResponse validation | Use status=200 not status_code=200 |
| Redis unavailable | Service works with warnings, check REDIS_HOST |
| `'_log'` not found | Inherit from `BaseSyncUtils` or implement the `_log()` method |
| Webhook not triggering | Verify endpoint: curl http://localhost:3111/_console/functions |
| xAI upload fails | Set XAI_API_KEY and XAI_MANAGEMENT_KEY in /etc/systemd/system/motia.service |
| View logs | journalctl -u motia.service -f (follow) or --since "10 minutes ago" |
Last Updated: 2026-03-08
Migration Status: ✅ Complete
xAI Integration: ✅ Implemented