BitByLaw Motia III - Developer Guide
For AI Assistants: This document contains all critical patterns, conventions, and best practices. Read this first to understand the codebase structure and ensure consistency.
Quick Navigation:
- Core Concepts - System architecture and patterns
- Design Principles - Event Storm & Bidirectional References
- Step Development - How to create new steps
- Services - Reusable business logic
- Integrations - xAI, EspoCRM, Advoware
- Testing & Debugging
Project Status
Migration: ✅ 100% Complete (21/21 Steps migrated from Motia v0.17 → Motia III v1.0-RC)
New to the project? Start here:
- README.md - Project Overview & Quick Start
- MIGRATION_GUIDE.md - Complete migration patterns
- ARCHITECTURE.md - System design and architecture
Core Concepts
System Overview
Architecture:
- Pure Python (Motia III)
- Event-Driven with queue-based async processing
- Redis-backed distributed locking and caching
- REST APIs for webhooks and proxies
Key Components:
- Steps (`steps/`) - Business logic handlers (HTTP, Queue, Cron)
- Services (`services/`) - Shared API clients and utilities
- Configuration (`iii-config.yaml`) - System setup
Data Flow:
Webhook (HTTP) → Queue Event → Event Handler → External APIs
↓ ↓
Redis Lock Service Layer
(EspoCRM, Advoware, xAI)
Design Principles
Event Storm over Lock Coordination
Core Philosophy: "Idempotent Chaos - Check Cheap, Sync Once"
╔═══════════════════════════════════════════════════════╗
║ Prefer: ║
║ ✅ Multiple redundant events over coordination ║
║ ✅ Idempotent handlers over distributed locks ║
║ ✅ Eventually consistent over perfectly synced ║
║ ✅ Simple duplication over complex orchestration ║
╚═══════════════════════════════════════════════════════╝
Guidelines:
- Fire events liberally - 10 redundant events are cheaper than complex coordination
- Make handlers idempotent - Early returns when nothing to do
- Sequential per entity, parallel across entities - Lock prevents collisions, not updates
- Accept event storms - Handlers queue up, Motia retries automatically
Lock Strategy: Step + Input Values
Lock Key Format: {step_name}:{entity_type}:{entity_id}
Examples:
- document_sync:CDokumente:doc123
- beteiligte_sync:CBeteiligte:bet456
- collection_create:Account:acc789
Rules:
✅ Parallel: sync document:A + sync document:B (different IDs)
❌ Blocked: sync document:A + sync document:A (same ID)
Lock Acquisition:
├─ Try acquire lock
├─ Success → Process → Release in finally block
└─ Fail (already locked) → Raise exception → Motia retries later
Lock Configuration:
- TTL: 30 minutes (auto-release on timeout)
- Robust release: Always in finally block
- Dead lock prevention: TTL ensures eventual recovery
Handler Pattern:
async def handler(event_data, ctx):
    entity_id = event_data['entity_id']
    entity_type = event_data['entity_type']
    # 1. Try acquire lock
    lock_key = f"document_sync:{entity_type}:{entity_id}"
    lock_acquired = await acquire_lock(lock_key, ttl=1800)  # 30 minutes
    if not lock_acquired:
        # Raise exception → Motia will retry
        raise RuntimeError(f"⏸️ Lock busy for {lock_key}, retry later")
    try:
        # 2. Do work (only if lock acquired)
        await sync_document(entity_id)
    finally:
        # 3. ALWAYS release lock (robust cleanup)
        await release_lock(lock_key)
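The `acquire_lock`/`release_lock` helpers above come from the Redis lock service. As an illustration of their semantics only — the production implementation uses Redis (`SET key NX EX ttl`), not this — here is a minimal in-memory stand-in showing try-acquire, TTL-based auto-expiry, and release:

```python
# In-memory stand-in for the Redis lock helpers (illustrative only).
# Production code uses Redis SET NX EX; this sketch just demonstrates
# the acquire/release semantics and TTL-based dead-lock recovery.
import time
from typing import Dict

_locks: Dict[str, float] = {}  # lock_key -> expiry timestamp (monotonic)

async def acquire_lock(lock_key: str, ttl: int = 1800) -> bool:
    """Try to acquire the lock; False if it is held and not yet expired."""
    now = time.monotonic()
    expiry = _locks.get(lock_key)
    if expiry is not None and expiry > now:
        return False  # already locked → caller raises, Motia retries
    _locks[lock_key] = now + ttl  # TTL guards against dead locks
    return True

async def release_lock(lock_key: str) -> None:
    """Release the lock (always call from a finally block)."""
    _locks.pop(lock_key, None)
```

The TTL is what makes "Dead lock prevention" work: even if a handler crashes before its `finally` block runs, the key expires and a later retry can acquire the lock.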
Retry Behavior:
- Lock fail → Handler raises exception
- Motia Queue catches exception → Schedules retry
- Retry after configured delay (exponential backoff)
- Eventually lock available → Handler processes successfully
Idempotency check: fileStatus="synced", xaiSyncStatus="clean" → Early return (nothing to do) ✅
Result: Second event processed but no duplicate work!
**Why This Works:**
- **Lock prevents chaos**: No parallel file uploads for same entity
- **Queue enables updates**: New changes processed sequentially
- **Idempotency prevents waste**: Redundant events → cheap early returns
- **Parallel scaling**: Different entities process simultaneously
**Practical Example: Entity Link Event**

User links Document ↔ Räumungsklage

Webhooks fire:
├─ POST /vmh/webhook/entity/link
└─ Emits: raeumungsklage.update, cdokumente.update

Handlers (parallel, different entities):
├─ Räumungsklage Handler
│  ├─ Lock: raeumungsklage:abc123
│  ├─ Creates xAI Collection (if missing)
│  └─ Fires: cdokumente.update (for all linked docs) ← Event Storm!
│
└─ Document Handler (may run 2-3x on same doc)
   ├─ Lock: document:doc456 (sequential processing)
   ├─ Run 1: Collections not ready → Skip (cheap return)
   ├─ Run 2: Collection ready → Upload & sync
   └─ Run 3: Already synced → Early return (idempotent!)
---
### Steps vs. Utils: Parallel vs. Sequential
**Separation Principle: "Events for Parallelism, Functions for Composition"**
╔═══════════════════════════════════════════════════════╗
║ Choose Architecture by Execution Model:               ║
║                                                       ║
║ ✅ Separate Steps → When parallel possible            ║
║    → Communicate via Events                           ║
║    → Own lock scope, independent retry                ║
║                                                       ║
║ ✅ Shared Utils → When sequential required            ║
║    → Call as function (need return values)            ║
║    → Reusable across multiple steps                   ║
╚═══════════════════════════════════════════════════════╝
**Decision Matrix:**
| Requirement | Architecture | Communication | Example |
|------------|--------------|---------------|---------|
| **Can run in parallel** | Separate Steps | Events | Document sync + Collection create |
| **Needs return value** | Utils function | Function call | `get_or_create_collection()` |
| **Reusable logic** | Utils function | Import + call | `should_sync_to_xai()` |
| **One-time handler** | Inline in Step | N/A | Event-specific parsing |
**Examples:**
```python
# ✅ GOOD: Parallel → Separate Steps + Events

# steps/collection_manager_step.py
async def handle_entity_update(event_data, ctx):
    """Creates xAI Collection for entity"""
    collection_id = await create_collection(entity_type, entity_id)
    # Fire events for all linked documents (parallel processing!)
    for doc in linked_docs:
        await ctx.emit("cdokumente.update", {"entity_id": doc.id})

# steps/document_sync_step.py
async def handle_document_update(event_data, ctx):
    """Syncs document to xAI"""
    # Runs in parallel with collection_manager_step!
    await sync_document_to_xai(entity_id)

# ✅ GOOD: Sequential + Reusable → Utils
# services/document_sync_utils.py
async def get_required_collections(entity_id):
    """Reusable function, needs return value"""
    return await _scan_entity_relations(entity_id)

# ❌ BAD: Sequential logic in Step (not reusable)
async def handle_document_update(event_data, ctx):
    # Inline function call - hard to test and reuse
    collection_id = await create_collection_inline(...)
    await upload_file(collection_id, ...)  # Needs collection_id from above!
```
Guidelines:
- Default to Steps + Events - Maximize parallelism
- Use Utils when:
- Need immediate return value
- Logic reused across 2+ steps
- Complex computation (not I/O bound)
- Keep Steps thin - Mostly orchestration + event emission
- Keep Utils testable - Pure functions, no event emission
Code Organization:
steps/
├─ document_sync_event_step.py # Event handler (thin)
├─ collection_manager_step.py # Event handler (thin)
└─ vmh/
└─ entity_link_webhook_step.py # Webhook handler (thin)
services/
├─ document_sync_utils.py # Reusable functions
├─ xai_service.py # API client
└─ espocrm.py # CRM client
Bidirectional Reference Pattern
Principle: Every sync maintains references on both sides
EspoCRM as Central Hub:
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Advoware │◄────────┤ EspoCRM ├────────►│ xAI │
│ │ │ │ │ │
│ Akte 12345 │ │ Entity │ │ Collection │
└─────────────┘ │ - advowareId │ └─────────────┘
│ - xaiColId │
└──────────────┘
Implementation:
- xAI Collection → stores `entityType` + `entityId` in metadata
- EspoCRM Entity → stores `xaiCollectionId` field
- EspoCRM Document → stores `xaiFileId` + `xaiCollections[]` fields
- Advoware Integration → stores `advowareAkteId` in EspoCRM
Benefits:
- Bidirectional navigation without complex queries
- Easy relationship building (e.g., Advoware Akte ↔ xAI Collection via EspoCRM)
- Idempotent lookups (can verify both directions)
- Debugging: Always know where things came from
- Single Source of Truth: EspoCRM is authoritative for relationships
Example: Collection Creation
# xAI Collection
collection = await xai.create_collection(
    name="CBeteiligte - Max Mustermann",
    metadata={
        "espocrm_entity_type": "CBeteiligte",
        "espocrm_entity_id": "abc123",
        "created_at": "2026-03-08T19:00:00Z"
    }
)

# EspoCRM Entity
await espocrm.update_entity("CBeteiligte", "abc123", {
    "xaiCollectionId": collection.id
})
Error Handling & Resilience
Principle: "Retry Smart, Fail Visible, Recover When Possible"
╔═══════════════════════════════════════════════════════╗
║ Error Handling Strategy (in priority order): ║
║ ║
║ 1️⃣ Idempotent Check → Skip if already done ║
║ 2️⃣ Step-internal retry with exponential backoff ║
║ 3️⃣ Rollback/Compensation (when possible) ║
║ 4️⃣ Log failure clearly → Let Motia retry ║
║ 5️⃣ Manual intervention (for unrecoverable failures) ║
╚═══════════════════════════════════════════════════════╝
Idempotency First
Always check if work already done before starting:
async def sync_document_handler(event_data, ctx):
    entity_id = event_data['entity_id']
    # 1. Load current state
    doc = await espocrm.get_entity('CDokumente', entity_id)
    # 2. Check if already synced (idempotent check)
    if doc.get('xaiSyncStatus') == 'clean':
        current_md5 = doc.get('md5sum')
        synced_md5 = doc.get('xaiSyncedHash')
        if current_md5 == synced_md5:
            ctx.logger.info("✅ Already synced, skipping (hash match)")
            return  # Nothing to do
    # 3. Check if file already uploaded to xAI
    xai_file_id = doc.get('xaiId')
    if xai_file_id:
        # Verify file exists in xAI
        file_exists = await xai.file_exists(xai_file_id)
        if file_exists:
            ctx.logger.info(f"✅ File already in xAI: {xai_file_id}")
            # Only update collections, not re-upload
    # 4. Proceed with sync...
Retry with Exponential Backoff
For transient failures (network, rate limits):
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    ctx=None
) -> T:
    """
    Retry operation with exponential backoff.

    Args:
        operation: Async function to retry
        max_attempts: Maximum retry attempts (not endless!)
        base_delay: Initial delay in seconds
        max_delay: Maximum delay cap
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception as e:
            if attempt == max_attempts:
                # Log and re-raise on final attempt
                if ctx:
                    ctx.logger.error(
                        f"❌ Operation failed after {max_attempts} attempts: {e}"
                    )
                raise
            # Calculate backoff delay
            delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
            if ctx:
                ctx.logger.warn(
                    f"⚠️ Attempt {attempt}/{max_attempts} failed: {e}. "
                    f"Retrying in {delay}s..."
                )
            await asyncio.sleep(delay)

# Usage
file_id = await retry_with_backoff(
    lambda: xai.upload_file(content, filename),
    max_attempts=3,
    ctx=ctx
)
Rollback/Compensation
When operations fail mid-way, clean up what was created:
async def sync_document_with_rollback(entity_id, ctx):
    xai_file_id = None
    collection_created = False
    try:
        # 1. Upload file to xAI
        xai_file_id = await xai.upload_file(content, filename)
        ctx.logger.info(f"✅ Uploaded to xAI: {xai_file_id}")
        # 2. Create collection if needed
        collection_id = await xai.create_collection(name, metadata)
        collection_created = True
        ctx.logger.info(f"✅ Created collection: {collection_id}")
        # 3. Add file to collection
        await xai.add_to_collection(collection_id, xai_file_id)
        # 4. Update EspoCRM (critical - if this fails, we have orphans)
        await espocrm.update_entity('CDokumente', entity_id, {
            'xaiId': xai_file_id,
            'xaiCollections': [collection_id],
            'xaiSyncStatus': 'clean'
        })
    except Exception as e:
        ctx.logger.error(f"❌ Sync failed, attempting rollback: {e}")
        # ROLLBACK: Remove what we created
        try:
            if xai_file_id:
                # Note: Only remove from collections, not delete file
                # (file may be referenced elsewhere)
                ctx.logger.info(f"⚠️ Leaving orphaned file in xAI: {xai_file_id}")
                ctx.logger.info("⚠️ Manual cleanup may be required")
            # Update EspoCRM to reflect failure
            await espocrm.update_entity('CDokumente', entity_id, {
                'xaiSyncStatus': 'failed',
                'xaiSyncError': str(e)
            })
        except Exception as rollback_error:
            ctx.logger.error(f"❌ ROLLBACK FAILED: {rollback_error}")
            ctx.logger.error("⚠️ REQUIRES MANUAL INTERVENTION")
            ctx.logger.error(f"⚠️ Entity: {entity_id}, xaiId: {xai_file_id}")
        # Re-raise original exception for Motia retry
        raise
Reality Check:
- Rollback not always possible (e.g., Advoware API, third-party services)
- Idempotency >> Rollback - Better to handle re-execution than undo
- Log orphans clearly - Enable manual cleanup when needed
Clear Failure Logging
When rollback impossible, log enough details for manual intervention:
try:
    await external_api.create_resource(data)
except Exception as e:
    # Log all context needed for manual recovery
    ctx.logger.error("=" * 80)
    ctx.logger.error("❌ UNRECOVERABLE FAILURE - MANUAL INTERVENTION REQUIRED")
    ctx.logger.error("=" * 80)
    ctx.logger.error("Entity Type: CDokumente")
    ctx.logger.error(f"Entity ID: {entity_id}")
    ctx.logger.error("Operation: Create Advoware Document")
    ctx.logger.error(f"Error: {e}")
    ctx.logger.error(f"Payload: {json.dumps(data, indent=2)}")
    ctx.logger.error(f"Timestamp: {datetime.now().isoformat()}")
    ctx.logger.error("=" * 80)
    # Mark as failed in EspoCRM
    await espocrm.update_entity('CDokumente', entity_id, {
        'syncStatus': 'failed',
        'syncError': str(e),
        'syncFailedAt': datetime.now().isoformat()
    })
    # Re-raise for Motia (will retry later)
    raise
Rate Limiting (xAI)
xAI has rate limits - handle with backoff:
from aiohttp import ClientResponseError

async def upload_with_rate_limit_handling(xai, content, filename, ctx):
    """Upload file to xAI with rate limit handling."""
    try:
        return await retry_with_backoff(
            lambda: xai.upload_file(content, filename),
            max_attempts=5,   # More attempts for rate limits
            base_delay=2.0,   # Longer initial delay
            max_delay=60.0,   # Up to 1 minute backoff
            ctx=ctx
        )
    except ClientResponseError as e:
        if e.status == 429:  # Rate limit exceeded
            ctx.logger.error("❌ xAI rate limit exceeded")
            # Check Retry-After header
            retry_after = e.headers.get('Retry-After', '60')
            ctx.logger.error(f"⏰ Retry after {retry_after} seconds")
            # Wait and try once more
            await asyncio.sleep(int(retry_after))
            return await xai.upload_file(content, filename)
        raise
Rate Limit Strategy:
- xAI: Exponential backoff + respect `Retry-After` header
- EspoCRM: Internal system, no rate limits
- Advoware: No known rate limits (backoff on errors)
Input Validation
Always validate webhook payloads and external data:
from typing import Any, Dict, List

def validate_webhook_payload(
    payload: Any,
    required_fields: List[str],
    ctx
) -> Dict[str, Any]:
    """
    Validate webhook payload structure.

    Raises:
        ValueError: If validation fails
    """
    # Check payload is dict
    if not isinstance(payload, dict):
        raise ValueError(f"Payload must be dict, got {type(payload)}")
    # Check required fields
    missing = [f for f in required_fields if f not in payload]
    if missing:
        raise ValueError(f"Missing required fields: {missing}")
    # Validate entity_id format (UUID)
    entity_id = payload.get('entity_id', '')
    if not entity_id or len(entity_id) < 10:
        raise ValueError(f"Invalid entity_id: {entity_id}")
    ctx.logger.debug(f"✅ Payload validated: {list(payload.keys())}")
    return payload

# Usage in handler
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    try:
        # Validate input
        payload = validate_webhook_payload(
            request.body,
            required_fields=['entity_id', 'entity_type', 'action'],
            ctx=ctx
        )
        # Process...
    except ValueError as e:
        ctx.logger.error(f"❌ Invalid payload: {e}")
        return ApiResponse(status_code=400, body={'error': str(e)})
Validation Checklist:
- ✅ Type checking (dict, list, str)
- ✅ Required fields present
- ✅ Format validation (IDs, dates, enums)
- ✅ Length limits (prevent DoS)
- ✅ Enum validation (status values)
Environment Configuration
Always use environment variables, validate on startup:
import os

class ServiceConfig:
    """Validated configuration from environment."""

    def __init__(self):
        # Required
        self.xai_api_key = self._require_env('XAI_API_KEY')
        self.espocrm_url = self._require_env('ESPOCRM_API_BASE_URL')
        # Optional with defaults
        self.redis_host = os.getenv('REDIS_HOST', 'localhost')
        self.redis_port = int(os.getenv('REDIS_PORT', '6379'))
        self.lock_ttl = int(os.getenv('LOCK_TTL_SECONDS', '1800'))  # 30min

    def _require_env(self, key: str) -> str:
        """Get required env var or raise."""
        value = os.getenv(key)
        if not value:
            raise ValueError(f"Required environment variable not set: {key}")
        return value

# Usage
config = ServiceConfig()  # Fails fast on startup if misconfigured
Notes:
- Collections lifecycle (creation/deletion) managed by EspoCRM entity status
- Document deletion: Remove from xAI collections, keep file (may be referenced elsewhere)
- Lock TTL: 30 minutes (forced release on timeout)
Type Safety & Explicit Contracts
Principle: "Explicit over Implicit - Magic Strings Kill Refactoring"
╔═══════════════════════════════════════════════════════╗
║ Prefer: ║
║ ✅ Enums over string literals ║
║ ✅ Type hints over duck typing ║
║ ✅ Named constants over magic numbers ║
║ ✅ Central definitions over scattered duplicates ║
╚═══════════════════════════════════════════════════════╝
Status Enums
Define all valid status values centrally:
# services/models.py
from enum import Enum

class FileStatus(str, Enum):
    """Valid values for CDokumente.fileStatus field."""
    NEW = "new"
    CHANGED = "changed"
    SYNCED = "synced"

    def __str__(self) -> str:
        return self.value

class XAISyncStatus(str, Enum):
    """Valid values for CDokumente.xaiSyncStatus field."""
    NO_SYNC = "no_sync"            # Entity has no xAI collections
    PENDING_SYNC = "pending_sync"  # Sync in progress (locked)
    CLEAN = "clean"                # Synced successfully
    UNCLEAN = "unclean"            # Needs re-sync (file changed)
    FAILED = "failed"              # Sync failed (see xaiSyncError)

    def __str__(self) -> str:
        return self.value

class EntityAction(str, Enum):
    """Valid webhook actions."""
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    LINK = "link"
    UNLINK = "unlink"

    def __str__(self) -> str:
        return self.value

class EntityType(str, Enum):
    """Supported EspoCRM entity types."""
    DOKUMENTE = "CDokumente"
    BETEILIGTE = "CBeteiligte"
    RAEUMUNGSKLAGE = "Raeumungsklage"
    ACCOUNT = "Account"

    def __str__(self) -> str:
        return self.value
Usage in Code
Before (Magic Strings):
# ❌ BAD: Typos not caught at runtime
if doc['xaiSyncStatus'] == 'claen':  # Typo!
    pass

if doc['fileStatus'] == 'synced':
    # No IDE autocomplete, no type safety
    pass

# ❌ BAD: Scattered definitions
status = 'pending_sync'  # What are ALL valid values?
After (Type-Safe Enums):
# ✅ GOOD: Type-safe, autocomplete, refactorable
from services.models import FileStatus, XAISyncStatus

if doc['xaiSyncStatus'] == XAISyncStatus.CLEAN:
    # IDE autocomplete works
    # Typos caught by linter
    # Easy to find all usages
    pass

# Update status
await espocrm.update_entity('CDokumente', entity_id, {
    'fileStatus': FileStatus.SYNCED,
    'xaiSyncStatus': XAISyncStatus.CLEAN
})

# Enum validation
def validate_status(status: str) -> XAISyncStatus:
    """Validate and convert status string."""
    try:
        return XAISyncStatus(status)
    except ValueError:
        valid = [s.value for s in XAISyncStatus]
        raise ValueError(f"Invalid status '{status}'. Valid: {valid}")
Benefits
Why Enums Matter:
- Prevent Typos: `XAISyncStatus.CLAEN` → AttributeError at import time
- IDE Support: Autocomplete shows all valid values
- Refactoring: Rename in one place, affects entire codebase
- Documentation: Single source of truth for valid values
- Type Safety: `mypy` can catch invalid assignments
- Debugging: Easy to find all usages with "Find References"
Example - Easy Refactoring:
# Change enum value in one place:
class FileStatus(str, Enum):
    NEW = "new"
    CHANGED = "changed"
    SYNCED = "synchronized"  # Changed!

# All usages automatically updated (string value changes):
FileStatus.SYNCED  # Returns "synchronized" everywhere
Type Hints
Always use type hints for function signatures:
from typing import Any, Dict, List, Optional, Tuple
from services.models import FileStatus, XAISyncStatus

# ✅ GOOD: Clear contracts
async def sync_document(
    entity_id: str,
    ctx: FlowContext
) -> Tuple[bool, Optional[str]]:
    """
    Sync document to xAI.

    Args:
        entity_id: EspoCRM document ID
        ctx: Motia flow context

    Returns:
        (success: bool, xai_file_id: Optional[str])
    """
    pass

async def should_sync_to_xai(
    doc: Dict[str, Any]
) -> Tuple[bool, str]:
    """
    Check if document needs xAI sync.

    Returns:
        (should_sync: bool, reason: str)
    """
    status = XAISyncStatus(doc.get('xaiSyncStatus', 'no_sync'))
    if status == XAISyncStatus.CLEAN:
        return (False, "Already synced")
    return (True, "Sync required")
Constants
Define magic values as named constants:
# ✅ GOOD: Central configuration
# services/config.py
# Lock TTLs
LOCK_TTL_DOCUMENT_SYNC = 1800 # 30 minutes
LOCK_TTL_COLLECTION_CREATE = 300 # 5 minutes
# Retry configuration
RETRY_MAX_ATTEMPTS = 3
RETRY_BASE_DELAY = 1.0
RETRY_MAX_DELAY = 30.0
# xAI rate limits
XAI_RATE_LIMIT_RETRY_AFTER = 60 # Default retry delay
XAI_MAX_FILE_SIZE_MB = 512
# EspoCRM pagination
ESPOCRM_DEFAULT_PAGE_SIZE = 50
ESPOCRM_MAX_PAGE_SIZE = 200
# Usage
from services.config import LOCK_TTL_DOCUMENT_SYNC
lock_acquired = await acquire_lock(
    lock_key,
    ttl=LOCK_TTL_DOCUMENT_SYNC  # Clear, refactorable
)
Step Development Best Practices
File Naming Convention
CRITICAL: Always use _step.py suffix!
✅ CORRECT:
steps/vmh/webhook/document_create_api_step.py
steps/vmh/document_sync_event_step.py
steps/vmh/beteiligte_sync_cron_step.py
❌ WRONG:
steps/vmh/document_handler.py # Missing _step.py
steps/vmh/sync.py # Missing _step.py
Naming Pattern:
- Webhooks: `{entity}_{action}_api_step.py`
  - Examples: `beteiligte_create_api_step.py`, `document_update_api_step.py`
- Event Handlers: `{entity}_sync_event_step.py`
  - Examples: `document_sync_event_step.py`, `beteiligte_sync_event_step.py`
- Cron Jobs: `{entity}_sync_cron_step.py`
  - Examples: `beteiligte_sync_cron_step.py`, `calendar_sync_cron_step.py`
Step Template
Complete step template with all required patterns:
"""Module-level docstring describing the step's purpose"""
from typing import Dict, Any
from motia import FlowContext, http, queue, cron, ApiRequest, ApiResponse
config = {
"name": "Clear Human-Readable Name",
"description": "Brief description of what this step does",
"flows": ["flow-name"], # Logical grouping
"triggers": [
# Pick ONE trigger type:
http("POST", "/path/to/endpoint"), # For webhooks
# queue("topic.name"), # For event handlers
# cron("0 */15 * * * *"), # For scheduled jobs
],
"enqueues": ["next.topic"], # Topics this step emits (optional)
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Handler docstring explaining:
- What triggers this handler
- What it does
- What events it emits
"""
try:
# 1. Log entry
ctx.logger.info("=")="=" * 80)
ctx.logger.info(f"🔄 STEP STARTED: {config['name']}")
ctx.logger.info("=" * 80)
# 2. Extract and validate input
payload = request.body
# 3. Business logic
# ...
# 4. Enqueue events if needed
await ctx.enqueue({
'topic': 'next.step',
'data': {
'entity_id': entity_id,
'action': 'create'
}
})
# 5. Log success
ctx.logger.info("✅ Step completed successfully")
# 6. Return response
return ApiResponse(
status_code=200,
body={'success': True}
)
except Exception as e:
# Always log errors with context
ctx.logger.error(f"❌ Error in {config['name']}: {e}")
ctx.logger.error(f"Payload: {request.body}")
return ApiResponse(
status_code=500,
body={'success': False, 'error': str(e)}
)
Handler Signatures by Trigger Type
HTTP Trigger (Webhooks, APIs):
from motia import ApiRequest, ApiResponse, FlowContext

async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    # Access: request.body, request.query_params, request.path_params
    return ApiResponse(status_code=200, body={...})
Queue Trigger (Event Handlers):
from typing import Any, Dict
from motia import FlowContext

async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
    # Process event_data
    # No return value
    pass
Cron Trigger (Scheduled Jobs):
from motia import FlowContext

async def handler(input_data: None, ctx: FlowContext) -> None:
    # Cron jobs receive no input
    # No return value
    pass
Logging Best Practices
ALWAYS use ctx.logger, NEVER use print() or module-level logger:
# ✅ CORRECT: Context-aware logging
ctx.logger.info("Processing started")
ctx.logger.debug(f"Data: {data}")
ctx.logger.warn("Skipping invalid entry")
ctx.logger.error(f"Failed: {e}")
# ❌ WRONG: Direct print or module logger
print("Processing started") # Not visible in iii logs
logger.info("Processing started") # Loses context
Log Levels:
- `info()` - Normal operations (start, success, counts)
- `debug()` - Detailed data dumps (payloads, responses)
- `warn()` - Non-critical issues (skipped items, fallbacks)
- `error()` - Failures requiring attention
Log Structure:
# Section headers with visual separators
ctx.logger.info("=" * 80)
ctx.logger.info("🔄 SYNC HANDLER STARTED")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Entity Type: {entity_type}")
ctx.logger.info(f"Action: {action.upper()}")
ctx.logger.info(f"Document ID: {entity_id}")
ctx.logger.info("=" * 80)
# Use emojis for visual scanning
ctx.logger.info("📥 Downloading file...")
ctx.logger.info("✅ Downloaded 1024 bytes")
ctx.logger.error("❌ Upload failed")
ctx.logger.warn("⚠️ No collections found")
Event Topics Naming
Pattern: {module}.{entity}.{action}
✅ Examples:
vmh.document.create
vmh.document.update
vmh.document.delete
vmh.beteiligte.create
calendar.sync.employee
advoware.proxy.response
❌ Avoid:
document-create # Use dots, not dashes
DocumentCreate # Use lowercase
create_document # Wrong order
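To make the `{module}.{entity}.{action}` rule precise, here is a hypothetical validator (the regex and function name are illustrative, not part of the codebase):

```python
# Illustrative validator for event topic names.
# Rule: exactly three lowercase dot-separated segments, {module}.{entity}.{action}.
import re

_TOPIC_RE = re.compile(r"^[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*){2}$")

def is_valid_topic(topic: str) -> bool:
    """True if topic follows the {module}.{entity}.{action} convention."""
    return bool(_TOPIC_RE.match(topic))
```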
Service Layer Patterns
Service Base Class Pattern
All services should follow this pattern:
"""Service docstring"""
import logging
from typing import Optional
from services.logging_utils import get_service_logger
logger = logging.getLogger(__name__)
class MyService:
"""Service for interacting with External API"""
def __init__(self, context=None):
"""
Initialize service.
Args:
context: Optional Motia FlowContext for logging
"""
self.context = context
self.logger = get_service_logger('my_service', context)
self._session = None
# Load config from env
self.api_key = os.getenv('MY_API_KEY', '')
if not self.api_key:
raise ValueError("MY_API_KEY not configured")
self.logger.info("MyService initialized")
def _log(self, message: str, level: str = 'info') -> None:
"""Internal logging helper"""
log_func = getattr(self.logger, level, self.logger.info)
log_func(message)
async def _get_session(self):
"""Lazy session initialization"""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
return self._session
async def close(self) -> None:
"""Cleanup resources"""
if self._session and not self._session.closed:
await self._session.close()
Sync Utilities Pattern
For bidirectional sync operations, inherit from BaseSyncUtils:
from typing import Dict, Tuple

from services.sync_utils_base import BaseSyncUtils

class MyEntitySync(BaseSyncUtils):
    """Sync utilities for MyEntity"""

    def _get_lock_key(self, entity_id: str) -> str:
        """Required: Define lock key pattern"""
        return f"sync_lock:myentity:{entity_id}"

    async def should_sync(self, entity: Dict) -> Tuple[bool, str]:
        """
        Decide if sync is needed.

        Returns:
            (needs_sync: bool, reason: str)
        """
        # Implementation
        pass
Base class provides:
- `_log()` - Context-aware logging
- `_acquire_redis_lock()` - Distributed locking
- `_release_redis_lock()` - Lock cleanup
- `self.espocrm` - EspoCRM API client
- `self.redis` - Redis client
- `self.context` - Motia context
- `self.logger` - Integration logger
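For orientation, here is a minimal skeleton of the shape such a base class takes. It is an illustration only — the real `BaseSyncUtils` in `services/sync_utils_base.py` additionally wires up the EspoCRM and Redis clients listed above:

```python
# Illustrative skeleton of a sync-utils base class (not the real BaseSyncUtils).
from abc import ABC, abstractmethod

class SyncUtilsSkeleton(ABC):
    """Sketch only; see services/sync_utils_base.py for the real class."""

    def __init__(self, context=None, logger=None):
        self.context = context  # Motia FlowContext (optional)
        self.logger = logger    # Integration logger (optional)

    @abstractmethod
    def _get_lock_key(self, entity_id: str) -> str:
        """Subclasses must define their lock key pattern."""

    def _log(self, message: str, level: str = 'info') -> None:
        """Context-aware logging helper; silently no-ops without a logger."""
        if self.logger:
            getattr(self.logger, level, self.logger.info)(message)
```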
External Integrations
xAI Collections Integration
Status: ✅ Fully Implemented
Service: services/xai_service.py
Environment Variables:
XAI_API_KEY=xai-... # For file uploads (api.x.ai)
XAI_MANAGEMENT_KEY=xai-token-... # For collections (management-api.x.ai)
Usage Example:
from services.xai_service import XAIService
xai = XAIService(ctx)
# Upload file
file_id = await xai.upload_file(
    file_content=bytes_data,
    filename="document.pdf",
    mime_type="application/pdf"
)
# Add to collection
await xai.add_to_collection("collection_id", file_id)
# Add to multiple collections
added = await xai.add_to_collections(["col1", "col2"], file_id)
# Remove from collection
await xai.remove_from_collection("collection_id", file_id)
Architecture:
- Files are uploaded ONCE to Files API (`api.x.ai/v1/files`)
- Same `file_id` can be added to MULTIPLE collections
- Removing from collection does NOT delete the file (may be used elsewhere)
- Hash-based change detection prevents unnecessary reuploads
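The hash check behind "Hash-based change detection" can be sketched as a pure function. Field names match the CDokumente fields documented in this section; the helper name is illustrative:

```python
# Hash-based change detection: only re-upload when the file content changed
# since the last successful sync. Illustrative helper, not the production code.
import hashlib
from typing import Any, Dict

def needs_reupload(doc: Dict[str, Any], file_content: bytes) -> bool:
    """Compare the current MD5 against xaiSyncedHash (MD5 at last sync)."""
    current_md5 = hashlib.md5(file_content).hexdigest()
    return current_md5 != doc.get('xaiSyncedHash')
```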
Document Sync Flow:
1. EspoCRM Webhook → vmh.document.{create|update|delete}
2. Document Sync Handler:
   a. Acquire distributed lock (prevents duplicate syncs)
   b. Load document from EspoCRM
   c. Check if sync needed:
      - dateiStatus = "Neu" or "Geändert" → SYNC
      - Hash changed → SYNC
      - Entity has xAI collections → SYNC
   d. Download file from EspoCRM
   e. Calculate MD5 hash
   f. Upload to xAI (or reuse existing file_id)
   g. Add to required collections
   h. Update EspoCRM metadata (xaiFileId, xaiCollections, xaiSyncedHash)
   i. Release lock
3. Delete: Remove from collections (keep file)
EspoCRM Fields (CDokumente):
{
    'xaiId': 'file-abc123',              # xAI file_id
    'xaiCollections': ['col1', 'col2'],  # Array of collection IDs
    'xaiSyncedHash': 'abc123def456',     # MD5 at last sync
    'fileStatus': 'synced',              # Status: new, changed, synced
    'xaiSyncStatus': 'clean',            # Sync state: no_sync, pending_sync, clean, unclean, failed
    'md5sum': 'abc123def456',            # Current file hash
    'sha256': 'def456...',               # SHA-256 (optional)
}
EspoCRM Integration
Service: services/espocrm.py
Environment Variables:
ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1
ESPOCRM_API_KEY=your-api-key
ESPOCRM_API_TIMEOUT_SECONDS=30
Usage:
from services.espocrm import EspoCRMAPI
espocrm = EspoCRMAPI(ctx)
# Get entity
entity = await espocrm.get_entity('CDokumente', entity_id)
# Update entity
await espocrm.update_entity('CDokumente', entity_id, {
    'xaiId': file_id,
    'fileStatus': 'synced'
})
# List entities
result = await espocrm.list_entities(
    'CDokumente',
    where=[{'type': 'equals', 'attribute': 'fileStatus', 'value': 'neu'}],
    select='id,name,fileStatus',
    max_size=50
)
# Download attachment
file_bytes = await espocrm.download_attachment(attachment_id)
Advoware Integration
Service: services/advoware_service.py
Authentication: HMAC-512 with token caching
Environment Variables:
ADVOWARE_API_BASE_URL=https://api.advoware.de
ADVOWARE_API_KEY=your-key
ADVOWARE_API_SECRET=your-secret
REDIS_HOST=localhost
REDIS_PORT=6379
Proxy Endpoints:
- `GET /advoware/proxy?endpoint={path}` - Proxy GET requests
- `POST /advoware/proxy` - Proxy POST requests
- `PUT /advoware/proxy` - Proxy PUT requests
- `DELETE /advoware/proxy` - Proxy DELETE requests
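The core primitive behind the HMAC-512 authentication is a plain HMAC-SHA512 signature. The sketch below shows only that primitive — the actual Advoware scheme (what exactly is signed, which headers carry the signature, and the Redis token caching) is an assumption left to `services/advoware_service.py` and is not reproduced here:

```python
# Generic HMAC-SHA512 signing helper (illustrative; the real Advoware
# canonicalization and header layout live in services/advoware_service.py).
import hashlib
import hmac

def sign_request(secret: str, payload: bytes) -> str:
    """Return the hex HMAC-SHA512 of payload under the shared secret."""
    return hmac.new(secret.encode('utf-8'), payload, hashlib.sha512).hexdigest()
```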
Testing and Debugging
Start System
Production (systemd):
# Restart services
sudo systemctl restart motia.service
sudo systemctl restart iii-console.service
# Check status
sudo systemctl status motia.service
sudo systemctl status iii-console.service
# View real-time logs
journalctl -u motia.service -f
journalctl -u iii-console.service -f
# Enable auto-start on boot
sudo systemctl enable motia.service
sudo systemctl enable iii-console.service
Manual (Development):
# Start iii Engine
cd /opt/motia-iii/bitbylaw
/opt/bin/iii -c iii-config.yaml
# Start iii Console (Web UI)
/opt/bin/iii-console --enable-flow --host 0.0.0.0 --port 3113 \
--engine-host 192.168.67.233 --engine-port 3111 --ws-port 3114
Check Registered Steps
curl http://localhost:3111/_console/functions | python3 -m json.tool
Test HTTP Endpoint
# Test document webhook
curl -X POST "http://localhost:3111/vmh/webhook/document/create" \
-H "Content-Type: application/json" \
-d '[{"id": "test123", "entityType": "CDokumente"}]'
# Test advoware proxy
curl "http://localhost:3111/advoware/proxy?endpoint=employees"
Manually Trigger Cron
curl -X POST "http://localhost:3111/_console/cron/trigger" \
-H "Content-Type: application/json" \
-d '{"function_id": "steps::VMH Beteiligte Sync Cron::trigger::0"}'
View Logs
# Live logs via journalctl
journalctl -u motia-iii -f
# Search for specific step
journalctl --since "today" | grep -i "document sync"
# Check for errors
tail -100 /opt/motia-iii/bitbylaw/iii_new.log | grep -i error
Common Issues
Step not showing up:
- Check file naming: must end with `_step.py`
- Check for import errors: `grep -i "importerror\|traceback" iii.log`
- Verify the `config` dict is present
- Restart the iii engine
Redis connection failed:
- Check the `REDIS_HOST` and `REDIS_PORT` environment variables
- Verify Redis is running: `redis-cli ping`
- The service still works without Redis, but logs warnings
AttributeError '_log' not found:
- Ensure the service inherits from `BaseSyncUtils`, OR
- Implement a `_log()` method manually
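As a hedged sketch of the second option (the real `BaseSyncUtils` internals may differ), a standalone sync helper can provide its own `_log()` that mirrors the `ctx.logger` convention and falls back to the stdlib logger:

```python
import logging

class StandaloneSyncUtils:
    """Sync helper that does NOT inherit from BaseSyncUtils.

    It therefore needs its own _log(); this fallback assumes a
    logger with info/warning/error/debug methods (as ctx.logger
    provides) and degrades to the stdlib logger when none is given.
    """

    def __init__(self, logger=None):
        self._logger = logger or logging.getLogger(__name__)

    def _log(self, level: str, message: str) -> None:
        # Dispatch to info/warning/error/debug by name;
        # unknown levels fall back to info.
        getattr(self._logger, level, self._logger.info)(message)
```

With this in place, code paths that call `self._log("error", ...)` no longer raise `AttributeError: '_log' not found`.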
Key Patterns Summary
✅ DO
- Always use `_step.py` suffix for step files
- Always use `ctx.logger` for logging (never `print`)
- Always wrap handlers in try/except with error logging
- Always use visual separators in logs (`"=" * 80`)
- Always return `ApiResponse` from HTTP handlers
- Always document what events a step emits
- Always use distributed locks for sync operations
- Always calculate hashes for change detection
❌ DON'T
- Don't use a module-level `logger` in steps
- Don't forget `async` on handler functions
- Don't use blocking I/O (use `aiohttp`, not `requests`)
- Don't return values from queue/cron handlers
- Don't hardcode credentials (use environment variables)
- Don't skip lock cleanup in `finally` blocks
- Don't use `print()` for logging
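The lock and hash-based change-detection patterns above can be sketched as follows. This is a minimal illustration: plain dicts stand in for Redis (a real implementation would use `SET NX EX` for the lock and `GET`/`SET` for the hash cache), and all names here are hypothetical:

```python
import hashlib
import json

def payload_hash(record: dict) -> str:
    """Stable hash of a record: keys are sorted so that field
    order never produces a spurious 'change'."""
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

def needs_sync(cache: dict, entity_id: str, record: dict) -> bool:
    """Idempotent check: sync only when the payload hash changed.
    `cache` stands in for Redis keys like 'sync:hash:<id>'."""
    new_hash = payload_hash(record)
    if cache.get(entity_id) == new_hash:
        return False  # nothing to do -- early return keeps handlers cheap
    cache[entity_id] = new_hash
    return True

def run_locked(locks: dict, entity_id: str, fn):
    """Per-entity lock sketch: `locks` stands in for Redis SET NX.
    Cleanup always happens in `finally`, as the guidelines require."""
    if entity_id in locks:
        return None  # another worker is syncing this entity
    locks[entity_id] = True
    try:
        return fn()
    finally:
        locks.pop(entity_id, None)
```

Together these are what make the "event storm" approach safe: redundant events hit `needs_sync`, early-return, and cost almost nothing, while the per-entity lock keeps updates sequential per entity but parallel across entities.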
Module Documentation
Steps
Advoware Proxy (Module README)
- Universal HTTP proxy with HMAC-SHA512 authentication
- Endpoints: GET, POST, PUT, DELETE
- Redis token caching
Calendar Sync (Module README)
- Bidirectional Advoware ↔ Google Calendar sync
- Cron: Every 15 minutes
- API trigger: `/advoware/calendar/sync`
VMH Integration (Module README)
- EspoCRM ↔ Advoware bidirectional sync
- Webhooks: Beteiligte, Bankverbindungen, Documents
- xAI Collections integration for documents
Services
| Service | Purpose | Config |
|---|---|---|
| `xai_service.py` | xAI file uploads & collections | `XAI_API_KEY`, `XAI_MANAGEMENT_KEY` |
| `espocrm.py` | EspoCRM REST API client | `ESPOCRM_API_BASE_URL`, `ESPOCRM_API_KEY` |
| `advoware_service.py` | Advoware API with HMAC auth | `ADVOWARE_API_KEY`, `ADVOWARE_API_SECRET` |
| `document_sync_utils.py` | Document sync logic | Redis connection |
| `beteiligte_sync_utils.py` | Beteiligte sync logic | Redis connection |
| `sync_utils_base.py` | Base class for sync utils | - |
Quick Reference
Environment Variables
Required:
# EspoCRM
ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1
ESPOCRM_API_KEY=your-key
# Advoware
ADVOWARE_API_BASE_URL=https://api.advoware.de
ADVOWARE_API_KEY=your-key
ADVOWARE_API_SECRET=your-secret
# xAI
XAI_API_KEY=xai-...
XAI_MANAGEMENT_KEY=xai-token-...
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_ADVOWARE_CACHE=1
Optional:
ESPOCRM_API_TIMEOUT_SECONDS=30
ESPOCRM_METADATA_TTL_SECONDS=300
File Structure
bitbylaw/
├── iii-config.yaml # Motia III configuration
├── pyproject.toml # Python dependencies (uv)
├── steps/ # Business logic
│ ├── advoware_proxy/
│ ├── advoware_cal_sync/
│ └── vmh/
│ ├── webhook/ # HTTP webhook handlers
│ │ ├── *_create_api_step.py
│ │ ├── *_update_api_step.py
│ │ └── *_delete_api_step.py
│ ├── *_sync_event_step.py # Queue event handlers
│ └── *_sync_cron_step.py # Scheduled jobs
├── services/ # Shared services
│ ├── xai_service.py
│ ├── espocrm.py
│ ├── advoware_service.py
│ ├── *_sync_utils.py
│ ├── sync_utils_base.py
│ ├── logging_utils.py
│ └── exceptions.py
├── docs/ # Documentation
│ ├── INDEX.md # This file
│ ├── ARCHITECTURE.md
│ └── DOCUMENT_SYNC_XAI_STATUS.md
└── tests/ # Test scripts
└── test_xai_collections_api.py
Motia III vs Old Motia
| Old Motia v0.17 | Motia III v1.0-RC |
|---|---|
| `type: 'api'` | `triggers: [http()]` |
| `type: 'event'` | `triggers: [queue()]` |
| `type: 'cron'` | `triggers: [cron()]` |
| `context.emit()` | `ctx.enqueue()` |
| `emits: [...]` | `enqueues: [...]` |
| `subscribes: [...]` | `triggers: [queue('topic')]` |
| 5-field cron | 6-field cron (seconds first) |
| `context.logger` | `ctx.logger` |
| Motia Workbench | iii Console |
| Node.js + Python | Pure Python |
Cron Syntax
6 fields (Motia III): second minute hour day month weekday
0 */15 * * * * # Every 15 minutes
0 0 */6 * * * # Every 6 hours
0 0 2 * * * # Daily at 2 AM
0 30 9 * * 1-5 # Monday-Friday at 9:30 AM
Additional Resources
Documentation
- MIGRATION_GUIDE.md - v0.17 → v1.0 migration
- MIGRATION_STATUS.md - Migration progress
- ARCHITECTURE.md - System design
- DOCUMENT_SYNC_XAI_STATUS.md - xAI integration details
External Resources
Support & Troubleshooting
| Issue | Solution |
|---|---|
| Step not registered | Check `_step.py` suffix, restart: `sudo systemctl restart motia.service` |
| Import errors | Check logs: `journalctl -u motia.service \| grep -i importerror` |
| Code changes not applied | Auto-reload should work; force restart: `sudo systemctl restart motia.service` |
| ApiResponse validation | Use `status=200`, not `status_code=200` |
| Redis unavailable | Service works with warnings; check `REDIS_HOST` |
| `'_log'` not found | Inherit from `BaseSyncUtils` or implement a `_log()` method |
| Webhook not triggering | Verify endpoint: `curl http://localhost:3111/_console/functions` |
| xAI upload fails | Set `XAI_API_KEY` and `XAI_MANAGEMENT_KEY` in `/etc/systemd/system/motia.service` |
| View logs | `journalctl -u motia.service -f` (follow) or `--since "10 minutes ago"` |
Last Updated: 2026-03-08
Migration Status: ✅ Complete
xAI Integration: ✅ Implemented