# BitByLaw Motia III - Developer Guide

> **For AI Assistants**: This document contains all critical patterns, conventions, and best practices. Read this first to understand the codebase structure and ensure consistency.

**Quick Navigation:**
- [iii Platform & Development Workflow](#iii-platform--development-workflow) - Platform evolution and CLI tools
- [Core Concepts](#core-concepts) - System architecture and patterns
- [Design Principles](#design-principles) - Event Storm & Bidirectional References
- [Step Development](#step-development-best-practices) - How to create new steps
- [Services](#service-layer-patterns) - Reusable business logic
- [Integrations](#external-integrations) - xAI, EspoCRM, Advoware
- [Testing & Debugging](#testing-and-debugging)

---

## Project Status

**Migration:** ✅ 100% Complete (21/21 Steps migrated from Motia v0.17 → Motia III v1.0-RC)

**New to the project?** Start here:
1. [README.md](../README.md) - Project overview & quick start
2. [MIGRATION_GUIDE.md](../MIGRATION_GUIDE.md) - Complete migration patterns
3. [ARCHITECTURE.md](ARCHITECTURE.md) - System design and architecture

---

## iii Platform & Development Workflow

### Platform Evolution (v0.8 → v0.9+)

**Status:** March 2026 - iii v0.9+ production-ready

iii has evolved from an all-in-one development tool to a **modular, production-grade event engine** with a clear separation between development and deployment workflows.

#### Structural Changes Overview

| Component | Before (v0.2-v0.7) | Now (v0.9+) | Impact |
|-----------|-------------------|-------------|--------|
| **Console/Dashboard** | Integrated in engine process (port 3111) | Separate process (`iii-cli console` or `dev`) | More flexibility, less resource overhead, better scaling |
| **CLI Tool** | Minimal or non-existent | `iii-cli` is the central dev tool | Terminal-based dev workflow, scriptable, faster iteration |
| **Project Structure** | Steps anywhere in project | **Recommended:** `src/` + `src/steps/` | Cleaner structure, reliable hot-reload |
| **Hot-Reload/Watcher** | Integrated in engine | Separate `shell::ExecModule` with `watch` paths | Only Python/TS files watched (configurable) |
| **Start & Services** | Single `iii` process | Engine (`iii` or `iii-cli start`) + separate console | Better for production (engine) vs. dev (console) |
| **Config Handling** | YAML + ENV | YAML + ENV + CLI flags (flags take priority) | More control via CLI flags |
| **Observability** | Basic | Enhanced (OTel, rollups, alerts, traces) | Production-ready telemetry |
| **Streams & State** | KV store (file/memory) | More adapters, `file_based` default | Better persistence handling |

**Key Takeaway:** iii is now a **modular, production-ready engine** where development (CLI + separate console) is clearly separated from production deployment.

---

### Development Workflow with iii-cli

**`iii-cli` is your primary tool for local development, debugging, and testing.**

#### Essential Commands

| Command | Purpose | When to Use | Example |
|---------|---------|------------|---------|
| `iii-cli dev` | Start the dev server with hot-reload + integrated console | Local development, immediate feedback on code changes | `iii-cli dev` |
| `iii-cli console` | Start the dashboard only (separate port) | When you only need the console (no dev reload) | `iii-cli console --host 0.0.0.0 --port 3113` |
| `iii-cli start` | Start the engine standalone (like `motia.service`) | Testing the engine in isolation | `iii-cli start -c iii-config.yaml` |
| `iii-cli logs` | Live logs of all flows/workers/triggers | Debugging, error investigation | `iii-cli logs --level debug` |
| `iii-cli trace <id>` | Show detailed trace information (OTel) | Debug a specific request/flow | `iii-cli trace abc123` |
| `iii-cli state ls` | List states (KV storage) | Verify state persistence | `iii-cli state ls` |
| `iii-cli state get` | Get a specific state value | Inspect state content | `iii-cli state get key` |
| `iii-cli stream ls` | List all streams + groups | Inspect stream/websocket connections | `iii-cli stream ls` |
| `iii-cli flow list` | Show all registered flows/triggers | Overview of active steps & endpoints | `iii-cli flow list` |
| `iii-cli worker logs` | Worker logs (Python/TS execution) | Debug issues in step handlers | `iii-cli worker logs` |

#### Typical Development Workflow

```bash
# 1. Navigate to the project
cd /opt/motia-iii/bitbylaw

# 2. Start dev mode (hot-reload + console on port 3113)
iii-cli dev --host 0.0.0.0 --port 3113 --engine-port 3111

# Alternative: Separate engine + console
# Terminal 1:
iii-cli start -c iii-config.yaml

# Terminal 2:
iii-cli console --host 0.0.0.0 --port 3113 \
  --engine-host 192.168.1.62 --engine-port 3111

# 3. Watch logs live (separate terminal)
iii-cli logs -f

# 4. Debug a specific trace
iii-cli trace <trace-id-from-logs>

# 5. Inspect state
iii-cli state ls
iii-cli state get document:sync:status

# 6. Verify flows are registered
iii-cli flow list
```

#### Development vs. Production

**Development:**
- Use `iii-cli dev` for hot-reload
- Console accessible on localhost:3113
- Logs visible in the terminal
- Immediate feedback on code changes

**Production:**
- A `systemd` service runs `iii-cli start`
- Console runs separately (if needed)
- Logs via `journalctl -u motia.service -f`
- No hot-reload (restart the service to pick up changes)

**Example Production Service:**
```ini
[Unit]
Description=Motia III Engine
After=network.target redis.service

[Service]
Type=simple
User=motia
WorkingDirectory=/opt/motia-iii/bitbylaw
ExecStart=/usr/local/bin/iii-cli start -c /opt/motia-iii/bitbylaw/iii-config.yaml
Restart=always
RestartSec=10
Environment="PATH=/usr/local/bin:/usr/bin"

[Install]
WantedBy=multi-user.target
```

#### Project Structure Best Practices

**Recommended Structure (v0.9+):**
```
bitbylaw/
├── iii-config.yaml            # Main configuration
├── src/                       # Source code root
│   └── steps/                 # All steps here (hot-reload reliable)
│       ├── __init__.py
│       ├── vmh/
│       │   ├── __init__.py
│       │   ├── document_sync_event_step.py
│       │   └── webhook/
│       │       ├── __init__.py
│       │       └── document_create_api_step.py
│       └── advoware_proxy/
│           └── ...
├── services/                  # Shared business logic
│   ├── __init__.py
│   ├── xai_service.py
│   ├── espocrm.py
│   └── ...
└── tests/                     # Test files
```

**Why `src/steps/` is recommended:**
- **Hot-reload works reliably** - The watcher detects changes correctly
- **Cleaner project** - Source code isolated from config/docs
- **IDE support** - Better navigation and refactoring
- **Deployment** - Easier to package

**Note:** The old structure (steps in the project root) still works, but hot-reload may be less reliable.

#### Hot-Reload Configuration

**Hot-reload is configured via `shell::ExecModule` in `iii-config.yaml`:**

```yaml
modules:
  - type: shell::ExecModule
    config:
      watch:
        - "src/**/*.py"        # Watch Python files in src/
        - "services/**/*.py"   # Watch service files
        # Add more patterns as needed
      ignore:
        - "**/__pycache__/**"
        - "**/*.pyc"
        - "**/tests/**"
```

**Behavior:**
- Only files matching `watch` patterns trigger a reload
- Files matching `ignore` patterns are ignored
- Reload is automatic in `iii-cli dev` mode
- Production mode (`iii-cli start`) does NOT watch files
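
The watch/ignore semantics can be approximated in plain Python with `fnmatch` — this is an illustration of the matching logic, not iii's actual watcher implementation (note that `fnmatch` only approximates `**`, since its `*` also crosses path separators):

```python
from fnmatch import fnmatch

# Patterns mirroring the iii-config.yaml example above
WATCH = ["src/**/*.py", "services/**/*.py"]
IGNORE = ["**/__pycache__/**", "**/*.pyc", "**/tests/**"]

def triggers_reload(path: str) -> bool:
    """Approximate watch/ignore matching: a change triggers a reload
    only if it matches a watch pattern and no ignore pattern."""
    if any(fnmatch(path, pat) for pat in IGNORE):
        return False
    return any(fnmatch(path, pat) for pat in WATCH)
```

For example, `src/steps/vmh/sync.py` triggers a reload, while anything under `__pycache__` is ignored even though it sits inside `src/`.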

---

### Observability & Debugging

#### OpenTelemetry Integration

**iii v0.9+ has built-in OpenTelemetry support:**

```python
# Traces are created automatically for:
# - HTTP requests
# - Queue processing
# - Cron execution
# - Service calls (if instrumented)

# Access the trace ID in a handler:
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    trace_id = ctx.trace_id  # Use for debugging
    ctx.logger.info(f"Trace ID: {trace_id}")
```

**View traces:**
```bash
# Get trace details
iii-cli trace <trace-id>

# Filter logs by trace
iii-cli logs --trace <trace-id>
```

#### Debugging Workflow

**1. Live Logs:**
```bash
# All logs
iii-cli logs -f

# Specific level
iii-cli logs --level error

# With grep
iii-cli logs -f | grep "document_sync"
```

**2. State Inspection:**
```bash
# List all state keys
iii-cli state ls

# Get a specific state
iii-cli state get sync:document:last_run
```

**3. Flow Verification:**
```bash
# List all registered flows
iii-cli flow list

# Verify an endpoint exists
iii-cli flow list | grep "/vmh/webhook"
```

**4. Worker Issues:**
```bash
# Worker-specific logs
iii-cli worker logs

# Check worker health
iii-cli worker status
```

---

## Core Concepts

### System Overview

**Architecture:**
- **Pure Python** (Motia III)
- **Event-driven** with queue-based async processing
- **Redis-backed** distributed locking and caching
- **REST APIs** for webhooks and proxies

**Key Components:**
1. **Steps** (`steps/`) - Business logic handlers (HTTP, Queue, Cron)
2. **Services** (`services/`) - Shared API clients and utilities
3. **Configuration** (`iii-config.yaml`) - System setup

**Data Flow:**
```
Webhook (HTTP) → Queue Event → Event Handler → External APIs
                                    ↓                ↓
                               Redis Lock      Service Layer
                                          (EspoCRM, Advoware, xAI)
```
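
The flow above can be sketched as a self-contained toy in plain Python. All names here (`queue`, `webhook`, `event_worker`) are illustrative stand-ins, not the Motia III API — the real system provides the queue, locking, and service clients:

```python
import asyncio

# Toy model of the data flow: webhook → queue event → event handler.
queue: asyncio.Queue = asyncio.Queue()

async def webhook(payload: dict) -> dict:
    """HTTP entry point: accept the payload and enqueue an event."""
    await queue.put({"topic": "cdokumente.update", "data": payload})
    return {"status": 202}  # Accepted - processing happens async

async def event_worker(results: list) -> None:
    """Consumes queue events and dispatches to the matching handler."""
    while not queue.empty():
        event = await queue.get()
        if event["topic"] == "cdokumente.update":
            # Real handler would lock, call services, update EspoCRM
            results.append(f"synced:{event['data']['entity_id']}")

async def main() -> list:
    results: list = []
    await webhook({"entity_id": "doc123"})
    await event_worker(results)
    return results
```

The point of the split: the webhook returns immediately, and the actual work (locking, external API calls) happens in the queue consumer.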

---

## Design Principles

### Event Storm over Lock Coordination

**Core Philosophy: "Idempotent Chaos - Check Cheap, Sync Once"**

```
╔═══════════════════════════════════════════════════════╗
║ Prefer:                                               ║
║ ✅ Multiple redundant events over coordination        ║
║ ✅ Idempotent handlers over distributed locks         ║
║ ✅ Eventually consistent over perfectly synced        ║
║ ✅ Simple duplication over complex orchestration      ║
╚═══════════════════════════════════════════════════════╝
```

**Guidelines:**

1. **Fire events liberally** - 10 redundant events are cheaper than complex coordination
2. **Make handlers idempotent** - Early returns when there is nothing to do
3. **Sequential per entity, parallel across entities** - The lock prevents collisions, not updates
4. **Accept event storms** - Handlers queue up; Motia retries automatically

**Lock Strategy: Step + Input Values**

```
Lock Key Format: {step_name}:{entity_type}:{entity_id}

Examples:
- document_sync:CDokumente:doc123
- beteiligte_sync:CBeteiligte:bet456
- collection_create:Account:acc789

Rules:
✅ Parallel: sync document:A + sync document:B (different IDs)
❌ Blocked:  sync document:A + sync document:A (same ID)

Lock Acquisition:
├─ Try to acquire the lock
├─ Success → Process → Release in finally block
└─ Fail (already locked) → Raise exception → Motia retries later

Lock Configuration:
- TTL: 30 minutes (auto-release on timeout)
- Robust release: Always in a finally block
- Deadlock prevention: TTL ensures eventual recovery
```

**Handler Pattern:**
```python
async def handler(event_data, ctx):
    entity_id = event_data['entity_id']
    entity_type = event_data['entity_type']

    # 1. Try to acquire the lock
    lock_key = f"document_sync:{entity_type}:{entity_id}"
    lock_acquired = await acquire_lock(lock_key, ttl=1800)  # 30 minutes

    if not lock_acquired:
        # Raise an exception → Motia will retry
        raise RuntimeError(f"⏸️ Lock busy for {lock_key}, retry later")

    try:
        # 2. Do the work (only if the lock was acquired)
        await sync_document(entity_id)
    finally:
        # 3. ALWAYS release the lock (robust cleanup)
        await release_lock(lock_key)
```
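
`acquire_lock`/`release_lock` are used above without being shown. A minimal in-process sketch of their contract (non-blocking acquire, TTL-based expiry) might look like this — in production these would be backed by Redis (e.g. `SET key value NX EX ttl`), not a local dict:

```python
import time

# Hypothetical in-process stand-in for the Redis-backed lock helpers.
_locks: dict[str, float] = {}  # lock_key -> expiry timestamp

async def acquire_lock(lock_key: str, ttl: int = 1800) -> bool:
    """Non-blocking acquire: returns False if the key is held and unexpired."""
    now = time.monotonic()
    expiry = _locks.get(lock_key)
    if expiry is not None and expiry > now:
        return False  # Held elsewhere → caller raises, Motia retries
    _locks[lock_key] = now + ttl  # TTL guards against dead holders
    return True

async def release_lock(lock_key: str) -> None:
    """Robust release: safe to call even if the TTL already expired."""
    _locks.pop(lock_key, None)
```

The TTL is what makes the scheme deadlock-safe: even if a worker dies inside the `try` block, the key expires and a later retry can acquire it.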

**Retry Behavior:**
- Lock fail → Handler raises an exception
- Motia Queue catches the exception → Schedules a retry
- Retry after the configured delay (exponential backoff)
- Eventually the lock is available → Handler processes successfully

```
Second event arrives after the sync already completed:
Check: fileStatus="synced", xaiSyncStatus="clean"
→ Early return (nothing to do) ✅

Result: Second event processed, but no duplicate work!
```

**Why This Works:**
- **Lock prevents chaos**: No parallel file uploads for the same entity
- **Queue enables updates**: New changes are processed sequentially
- **Idempotency prevents waste**: Redundant events → cheap early returns
- **Parallel scaling**: Different entities are processed simultaneously

**Practical Example: Entity Link Event**
```
User links Document ↔ Räumungsklage

Webhooks fire:
├─ POST /vmh/webhook/entity/link
└─ Emits: raeumungsklage.update, cdokumente.update

Handlers (parallel, different entities):
├─ Räumungsklage Handler
│  ├─ Lock: raeumungsklage:abc123
│  ├─ Creates xAI Collection (if missing)
│  └─ Fires: cdokumente.update (for all linked docs) ← Event Storm!
│
└─ Document Handler (may run 2-3x on the same doc)
   ├─ Lock: document:doc456 (sequential processing)
   ├─ Run 1: Collections not ready → Skip (cheap return)
   ├─ Run 2: Collection ready → Upload & sync
   └─ Run 3: Already synced → Early return (idempotent!)
```

---

### Steps vs. Utils: Parallel vs. Sequential

**Separation Principle: "Events for Parallelism, Functions for Composition"**

```
╔═══════════════════════════════════════════════════════╗
║ Choose Architecture by Execution Model:               ║
║                                                       ║
║ ✅ Separate Steps → When parallel execution possible  ║
║    → Communicate via events                           ║
║    → Own lock scope, independent retry                ║
║                                                       ║
║ ✅ Shared Utils  → When sequential execution required ║
║    → Call as a function (need return values)          ║
║    → Reusable across multiple steps                   ║
╚═══════════════════════════════════════════════════════╝
```

**Decision Matrix:**

| Requirement | Architecture | Communication | Example |
|------------|--------------|---------------|---------|
| **Can run in parallel** | Separate Steps | Events | Document sync + collection create |
| **Needs a return value** | Utils function | Function call | `get_or_create_collection()` |
| **Reusable logic** | Utils function | Import + call | `should_sync_to_xai()` |
| **One-time handler** | Inline in Step | N/A | Event-specific parsing |

**Examples:**

```python
# ✅ GOOD: Parallel → Separate Steps + Events
# steps/collection_manager_step.py
async def handle_entity_update(event_data, ctx):
    """Creates an xAI Collection for an entity."""
    collection_id = await create_collection(entity_type, entity_id)

    # Fire events for all linked documents (parallel processing!)
    for doc in linked_docs:
        await ctx.emit("cdokumente.update", {"entity_id": doc.id})

# steps/document_sync_step.py
async def handle_document_update(event_data, ctx):
    """Syncs a document to xAI."""
    # Runs in parallel with collection_manager_step!
    await sync_document_to_xai(entity_id)

# ✅ GOOD: Sequential + Reusable → Utils
# services/document_sync_utils.py
async def get_required_collections(entity_id):
    """Reusable function; needs a return value."""
    return await _scan_entity_relations(entity_id)

# ❌ BAD: Sequential logic inline in a Step (not reusable)
async def handle_document_update(event_data, ctx):
    # Inline logic - hard to test and reuse
    collection_id = await create_collection_inline(...)
    await upload_file(collection_id, ...)  # Needs collection_id from above!
```

**Guidelines:**

1. **Default to Steps + Events** - Maximize parallelism
2. **Use Utils when:**
   - You need an immediate return value
   - Logic is reused across 2+ steps
   - Complex computation (not I/O bound)
3. **Keep Steps thin** - Mostly orchestration + event emission
4. **Keep Utils testable** - Pure functions, no event emission
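
As an illustration of guideline 4, a util like this stays a pure function (no `ctx`, no event emission), so it can be unit-tested without Motia running. `needs_resync` is a hypothetical example, not an existing helper in `services/`:

```python
from typing import Tuple

def needs_resync(doc: dict) -> Tuple[bool, str]:
    """Hypothetical pure util: decide from plain field values whether
    a document must be re-synced. No I/O, no ctx, no events."""
    if doc.get('xaiSyncStatus') != 'clean':
        return (True, "not yet synced")
    if doc.get('md5sum') != doc.get('xaiSyncedHash'):
        return (True, "file changed since last sync")
    return (False, "already synced")
```

Because it only reads plain dict fields, a test can feed it fixtures directly, with no mocks for EspoCRM or the event queue.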

**Code Organization:**
```
steps/
├─ document_sync_event_step.py     # Event handler (thin)
├─ collection_manager_step.py      # Event handler (thin)
└─ vmh/
   └─ entity_link_webhook_step.py  # Webhook handler (thin)

services/
├─ document_sync_utils.py          # Reusable functions
├─ xai_service.py                  # API client
└─ espocrm.py                      # CRM client
```

---

### Bidirectional Reference Pattern

**Principle: Every sync maintains references on both sides**

**EspoCRM as Central Hub:**
```
┌─────────────┐         ┌──────────────┐         ┌─────────────┐
│  Advoware   │◄────────┤   EspoCRM    ├────────►│     xAI     │
│             │         │              │         │             │
│ Akte 12345  │         │ Entity       │         │ Collection  │
└─────────────┘         │ - advowareId │         └─────────────┘
                        │ - xaiColId   │
                        └──────────────┘
```

**Implementation:**
- **xAI Collection** → stores `entityType` + `entityId` in metadata
- **EspoCRM Entity** → stores the `xaiCollectionId` field
- **EspoCRM Document** → stores `xaiFileId` + `xaiCollections[]` fields
- **Advoware Integration** → stores `advowareAkteId` in EspoCRM

**Benefits:**
- Bidirectional navigation without complex queries
- Easy relationship building (e.g., Advoware Akte ↔ xAI Collection via EspoCRM)
- Idempotent lookups (both directions can be verified)
- Debugging: You always know where things came from
- **Single Source of Truth**: EspoCRM is authoritative for relationships

**Example: Collection Creation**
```python
# xAI Collection
collection = await xai.create_collection(
    name="CBeteiligte - Max Mustermann",
    metadata={
        "espocrm_entity_type": "CBeteiligte",
        "espocrm_entity_id": "abc123",
        "created_at": "2026-03-08T19:00:00Z"
    }
)

# EspoCRM Entity
await espocrm.update_entity("CBeteiligte", "abc123", {
    "xaiCollectionId": collection.id
})
```
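
Because both sides carry references, navigation works in either direction. A sketch of the reverse lookup (collection → owning entity), assuming the metadata keys from the example above — `entity_from_collection` is an illustrative helper, not an existing function:

```python
from typing import Optional, Tuple

def entity_from_collection(metadata: dict) -> Optional[Tuple[str, str]]:
    """Resolve the owning EspoCRM entity from xAI collection metadata.
    Returns (entity_type, entity_id), or None for untracked collections."""
    entity_type = metadata.get("espocrm_entity_type")
    entity_id = metadata.get("espocrm_entity_id")
    if not entity_type or not entity_id:
        return None
    return (entity_type, entity_id)
```

This is the debugging payoff of the pattern: given only a collection, you can always get back to the EspoCRM record that owns it.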

---

### Error Handling & Resilience

**Principle: "Retry Smart, Fail Visible, Recover When Possible"**

```
╔═══════════════════════════════════════════════════════╗
║ Error Handling Strategy (in priority order):          ║
║                                                       ║
║ 1️⃣ Idempotent check → Skip if already done            ║
║ 2️⃣ Step-internal retry with exponential backoff       ║
║ 3️⃣ Rollback/compensation (when possible)              ║
║ 4️⃣ Log the failure clearly → Let Motia retry          ║
║ 5️⃣ Manual intervention (for unrecoverable failures)   ║
╚═══════════════════════════════════════════════════════╝
```

#### Idempotency First

**Always check whether the work is already done before starting:**

```python
async def sync_document_handler(event_data, ctx):
    entity_id = event_data['entity_id']

    # 1. Load the current state
    doc = await espocrm.get_entity('CDokumente', entity_id)

    # 2. Check if already synced (idempotent check)
    if doc.get('xaiSyncStatus') == 'clean':
        current_md5 = doc.get('md5sum')
        synced_md5 = doc.get('xaiSyncedHash')

        if current_md5 == synced_md5:
            ctx.logger.info("✅ Already synced, skipping (hash match)")
            return  # Nothing to do

    # 3. Check if the file was already uploaded to xAI
    xai_file_id = doc.get('xaiId')
    if xai_file_id:
        # Verify the file exists in xAI
        file_exists = await xai.file_exists(xai_file_id)
        if file_exists:
            ctx.logger.info(f"✅ File already in xAI: {xai_file_id}")
            # Only update collections, do not re-upload

    # 4. Proceed with the sync...
```

#### Retry with Exponential Backoff

**For transient failures (network errors, rate limits):**

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    ctx=None
) -> T:
    """
    Retry an operation with exponential backoff.

    Args:
        operation: Async function to retry
        max_attempts: Maximum number of attempts (not endless!)
        base_delay: Initial delay in seconds
        max_delay: Maximum delay cap
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()

        except Exception as e:
            if attempt == max_attempts:
                # Log and re-raise on the final attempt
                if ctx:
                    ctx.logger.error(
                        f"❌ Operation failed after {max_attempts} attempts: {e}"
                    )
                raise

            # Calculate the backoff delay
            delay = min(base_delay * (2 ** (attempt - 1)), max_delay)

            if ctx:
                ctx.logger.warn(
                    f"⚠️ Attempt {attempt}/{max_attempts} failed: {e}. "
                    f"Retrying in {delay}s..."
                )

            await asyncio.sleep(delay)

# Usage
file_id = await retry_with_backoff(
    lambda: xai.upload_file(content, filename),
    max_attempts=3,
    ctx=ctx
)
```

#### Rollback/Compensation

**When an operation fails mid-way, clean up what was created:**

```python
async def sync_document_with_rollback(entity_id, ctx):
    xai_file_id = None
    collection_created = False

    try:
        # 1. Upload the file to xAI
        xai_file_id = await xai.upload_file(content, filename)
        ctx.logger.info(f"✅ Uploaded to xAI: {xai_file_id}")

        # 2. Create a collection if needed
        collection_id = await xai.create_collection(name, metadata)
        collection_created = True
        ctx.logger.info(f"✅ Created collection: {collection_id}")

        # 3. Add the file to the collection
        await xai.add_to_collection(collection_id, xai_file_id)

        # 4. Update EspoCRM (critical - if this fails, we have orphans)
        await espocrm.update_entity('CDokumente', entity_id, {
            'xaiId': xai_file_id,
            'xaiCollections': [collection_id],
            'xaiSyncStatus': 'clean'
        })

    except Exception as e:
        ctx.logger.error(f"❌ Sync failed, attempting rollback: {e}")

        # ROLLBACK: Undo what we created
        try:
            if xai_file_id:
                # Note: Only remove from collections, do not delete the file
                # (it may be referenced elsewhere)
                ctx.logger.info(f"⚠️ Leaving orphaned file in xAI: {xai_file_id}")
                ctx.logger.info("⚠️ Manual cleanup may be required")

            # Update EspoCRM to reflect the failure
            await espocrm.update_entity('CDokumente', entity_id, {
                'xaiSyncStatus': 'failed',
                'xaiSyncError': str(e)
            })

        except Exception as rollback_error:
            ctx.logger.error(f"❌ ROLLBACK FAILED: {rollback_error}")
            ctx.logger.error("⚠️ REQUIRES MANUAL INTERVENTION")
            ctx.logger.error(f"⚠️ Entity: {entity_id}, xaiId: {xai_file_id}")

        # Re-raise the original exception for the Motia retry
        raise
```

**Reality Check:**
- **Rollback is not always possible** (e.g., Advoware API, third-party services)
- **Idempotency >> Rollback** - Better to handle re-execution than to undo
- **Log orphans clearly** - Enable manual cleanup when needed

#### Clear Failure Logging

**When rollback is impossible, log enough detail for manual intervention:**

```python
import json
from datetime import datetime

try:
    await external_api.create_resource(data)
except Exception as e:
    # Log all context needed for manual recovery
    ctx.logger.error("=" * 80)
    ctx.logger.error("❌ UNRECOVERABLE FAILURE - MANUAL INTERVENTION REQUIRED")
    ctx.logger.error("=" * 80)
    ctx.logger.error("Entity Type: CDokumente")
    ctx.logger.error(f"Entity ID: {entity_id}")
    ctx.logger.error("Operation: Create Advoware Document")
    ctx.logger.error(f"Error: {e}")
    ctx.logger.error(f"Payload: {json.dumps(data, indent=2)}")
    ctx.logger.error(f"Timestamp: {datetime.now().isoformat()}")
    ctx.logger.error("=" * 80)

    # Mark as failed in EspoCRM
    await espocrm.update_entity('CDokumente', entity_id, {
        'syncStatus': 'failed',
        'syncError': str(e),
        'syncFailedAt': datetime.now().isoformat()
    })

    # Re-raise for Motia (it will retry later)
    raise
```

#### Rate Limiting (xAI)

**xAI enforces rate limits - handle them with backoff:**

```python
import asyncio

from aiohttp import ClientResponseError

async def upload_with_rate_limit_handling(xai, content, filename, ctx):
    """Upload a file to xAI with rate-limit handling."""
    try:
        return await retry_with_backoff(
            lambda: xai.upload_file(content, filename),
            max_attempts=5,   # More attempts for rate limits
            base_delay=2.0,   # Longer initial delay
            max_delay=60.0,   # Up to 1 minute of backoff
            ctx=ctx
        )

    except ClientResponseError as e:
        if e.status == 429:  # Rate limit exceeded
            ctx.logger.error("❌ xAI rate limit exceeded")

            # Check the Retry-After header
            retry_after = e.headers.get('Retry-After', '60')
            ctx.logger.error(f"⏰ Retry after {retry_after} seconds")

            # Wait and try once more
            await asyncio.sleep(int(retry_after))
            return await xai.upload_file(content, filename)

        raise
```

**Rate Limit Strategy:**
- **xAI**: Exponential backoff + respect the `Retry-After` header
- **EspoCRM**: Internal system, no rate limits
- **Advoware**: No known rate limits (back off on errors)

#### Input Validation

**Always validate webhook payloads and external data:**

```python
from typing import Any, Dict, List

def validate_webhook_payload(
    payload: Any,
    required_fields: List[str],
    ctx
) -> Dict[str, Any]:
    """
    Validate the webhook payload structure.

    Raises:
        ValueError: If validation fails
    """
    # Check that the payload is a dict
    if not isinstance(payload, dict):
        raise ValueError(f"Payload must be a dict, got {type(payload)}")

    # Check required fields
    missing = [f for f in required_fields if f not in payload]
    if missing:
        raise ValueError(f"Missing required fields: {missing}")

    # Sanity-check the entity_id format
    entity_id = payload.get('entity_id', '')
    if not entity_id or len(entity_id) < 10:
        raise ValueError(f"Invalid entity_id: {entity_id}")

    ctx.logger.debug(f"✅ Payload validated: {list(payload.keys())}")
    return payload

# Usage in a handler
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    try:
        # Validate the input
        payload = validate_webhook_payload(
            request.body,
            required_fields=['entity_id', 'entity_type', 'action'],
            ctx=ctx
        )

        # Process...

    except ValueError as e:
        ctx.logger.error(f"❌ Invalid payload: {e}")
        return ApiResponse(status_code=400, body={'error': str(e)})
```

**Validation Checklist:**
- ✅ Type checking (dict, list, str)
- ✅ Required fields present
- ✅ Format validation (IDs, dates, enums)
- ✅ Length limits (prevent DoS)
- ✅ Enum validation (status values)
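
The last two checklist items are not covered by `validate_webhook_payload` above. A hedged sketch of how they could be added — the 64 KiB cap and the action whitelist are illustrative choices, not project constants:

```python
import json

VALID_ACTIONS = {"create", "update", "delete", "link", "unlink"}
MAX_PAYLOAD_BYTES = 64 * 1024  # Illustrative DoS cap, not a project constant

def validate_action_and_size(payload: dict) -> dict:
    """Complement the structural checks: enum validation + length limit."""
    action = payload.get("action")
    if action not in VALID_ACTIONS:
        raise ValueError(f"Invalid action '{action}'. Valid: {sorted(VALID_ACTIONS)}")

    # Cheap size bound on the serialized payload
    if len(json.dumps(payload)) > MAX_PAYLOAD_BYTES:
        raise ValueError("Payload too large")

    return payload
```

In practice the action whitelist would come from the central `EntityAction` enum rather than a literal set, so both checks share one source of truth.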

#### Environment Configuration

**Always use environment variables and validate them on startup:**

```python
import os

class ServiceConfig:
    """Validated configuration from the environment."""

    def __init__(self):
        # Required
        self.xai_api_key = self._require_env('XAI_API_KEY')
        self.espocrm_url = self._require_env('ESPOCRM_API_BASE_URL')

        # Optional with defaults
        self.redis_host = os.getenv('REDIS_HOST', 'localhost')
        self.redis_port = int(os.getenv('REDIS_PORT', '6379'))
        self.lock_ttl = int(os.getenv('LOCK_TTL_SECONDS', '1800'))  # 30 min

    def _require_env(self, key: str) -> str:
        """Get a required env var or raise."""
        value = os.getenv(key)
        if not value:
            raise ValueError(f"Required environment variable not set: {key}")
        return value

# Usage
config = ServiceConfig()  # Fails fast on startup if misconfigured
```

**Notes:**
- The collection lifecycle (creation/deletion) is managed via EspoCRM entity status
- Document deletion: Remove the file from xAI collections, but keep the file (it may be referenced elsewhere)
- Lock TTL: 30 minutes (forced release on timeout)

---

### Type Safety & Explicit Contracts

**Principle: "Explicit over Implicit - Magic Strings Kill Refactoring"**

```
╔═══════════════════════════════════════════════════════╗
║ Prefer:                                               ║
║ ✅ Enums over string literals                         ║
║ ✅ Type hints over duck typing                        ║
║ ✅ Named constants over magic numbers                 ║
║ ✅ Central definitions over scattered duplicates      ║
╚═══════════════════════════════════════════════════════╝
```

#### Status Enums

**Define all valid status values centrally:**

```python
# services/models.py
from enum import Enum

class FileStatus(str, Enum):
    """Valid values for the CDokumente.fileStatus field."""
    NEW = "new"
    CHANGED = "changed"
    SYNCED = "synced"

    def __str__(self) -> str:
        return self.value

class XAISyncStatus(str, Enum):
    """Valid values for the CDokumente.xaiSyncStatus field."""
    NO_SYNC = "no_sync"            # Entity has no xAI collections
    PENDING_SYNC = "pending_sync"  # Sync in progress (locked)
    CLEAN = "clean"                # Synced successfully
    UNCLEAN = "unclean"            # Needs re-sync (file changed)
    FAILED = "failed"              # Sync failed (see xaiSyncError)

    def __str__(self) -> str:
        return self.value

class EntityAction(str, Enum):
    """Valid webhook actions."""
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    LINK = "link"
    UNLINK = "unlink"

    def __str__(self) -> str:
        return self.value

class EntityType(str, Enum):
    """Supported EspoCRM entity types."""
    DOKUMENTE = "CDokumente"
    BETEILIGTE = "CBeteiligte"
    RAEUMUNGSKLAGE = "Raeumungsklage"
    ACCOUNT = "Account"

    def __str__(self) -> str:
        return self.value
```

#### Usage in Code

**Before (Magic Strings):**

```python
# ❌ BAD: Typos are not caught at runtime
if doc['xaiSyncStatus'] == 'claen':  # Typo!
    pass

if doc['fileStatus'] == 'synced':
    # No IDE autocomplete, no type safety
    pass

# ❌ BAD: Scattered definitions
status = 'pending_sync'  # What are ALL the valid values?
```

**After (Type-Safe Enums):**

```python
# ✅ GOOD: Type-safe, autocompletes, refactorable
from services.models import FileStatus, XAISyncStatus

if doc['xaiSyncStatus'] == XAISyncStatus.CLEAN:
    # IDE autocomplete works
    # Typos are caught by the linter
    # Easy to find all usages
    pass

# Update status
await espocrm.update_entity('CDokumente', entity_id, {
    'fileStatus': FileStatus.SYNCED,
    'xaiSyncStatus': XAISyncStatus.CLEAN
})

# Enum validation
def validate_status(status: str) -> XAISyncStatus:
    """Validate and convert a status string."""
    try:
        return XAISyncStatus(status)
    except ValueError:
        valid = [s.value for s in XAISyncStatus]
        raise ValueError(f"Invalid status '{status}'. Valid: {valid}")
```

#### Benefits

**Why Enums Matter:**

1. **Prevent Typos**: `XAISyncStatus.CLAEN` → `AttributeError` as soon as the line runs (at import time for module-level code)
2. **IDE Support**: Autocomplete shows all valid values
3. **Refactoring**: Rename in one place, affects entire codebase
4. **Documentation**: Single source of truth for valid values
5. **Type Safety**: `mypy` can catch invalid assignments
6. **Debugging**: Easy to find all usages with "Find References"

**Example - Easy Refactoring:**

```python
# Change enum value in one place:
class FileStatus(str, Enum):
    NEW = "new"
    CHANGED = "changed"
    SYNCED = "synchronized"  # Changed!

# All usages automatically updated (string value changes):
FileStatus.SYNCED  # Returns "synchronized" everywhere
```

#### Type Hints

**Always use type hints for function signatures:**

```python
from typing import Any, Dict, List, Optional, Tuple
from services.models import FileStatus, XAISyncStatus

# ✅ GOOD: Clear contracts
async def sync_document(
    entity_id: str,
    ctx: FlowContext
) -> Tuple[bool, Optional[str]]:
    """
    Sync document to xAI.

    Args:
        entity_id: EspoCRM document ID
        ctx: Motia flow context

    Returns:
        (success: bool, xai_file_id: Optional[str])
    """
    pass

async def should_sync_to_xai(
    doc: Dict[str, Any]
) -> Tuple[bool, str]:
    """
    Check if document needs xAI sync.

    Returns:
        (should_sync: bool, reason: str)
    """
    status = XAISyncStatus(doc.get('xaiSyncStatus', 'no_sync'))

    if status == XAISyncStatus.CLEAN:
        return (False, "Already synced")

    return (True, "Sync required")
```

#### Constants

**Define magic values as named constants:**

```python
# ✅ GOOD: Central configuration
# services/config.py

# Lock TTLs
LOCK_TTL_DOCUMENT_SYNC = 1800     # 30 minutes
LOCK_TTL_COLLECTION_CREATE = 300  # 5 minutes

# Retry configuration
RETRY_MAX_ATTEMPTS = 3
RETRY_BASE_DELAY = 1.0
RETRY_MAX_DELAY = 30.0

# xAI rate limits
XAI_RATE_LIMIT_RETRY_AFTER = 60  # Default retry delay
XAI_MAX_FILE_SIZE_MB = 512

# EspoCRM pagination
ESPOCRM_DEFAULT_PAGE_SIZE = 50
ESPOCRM_MAX_PAGE_SIZE = 200

# Usage
from services.config import LOCK_TTL_DOCUMENT_SYNC

lock_acquired = await acquire_lock(
    lock_key,
    ttl=LOCK_TTL_DOCUMENT_SYNC  # Clear, refactorable
)
```
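
As an illustration of how the retry constants combine, here is a small sketch of an exponential backoff schedule. `backoff_delays` is a hypothetical helper (not part of `services/config.py`); the constants are repeated locally so the snippet runs standalone.

```python
# Mirrors the retry constants from services/config.py (local copies for the demo).
RETRY_MAX_ATTEMPTS = 3
RETRY_BASE_DELAY = 1.0
RETRY_MAX_DELAY = 30.0

def backoff_delays(attempts: int = RETRY_MAX_ATTEMPTS,
                   base: float = RETRY_BASE_DELAY,
                   cap: float = RETRY_MAX_DELAY) -> list:
    """Delay before each retry attempt: base * 2^n, capped at `cap`."""
    return [min(base * (2 ** n), cap) for n in range(attempts)]

print(backoff_delays())   # [1.0, 2.0, 4.0]
print(backoff_delays(6))  # later entries capped at RETRY_MAX_DELAY
```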

---

## Step Development Best Practices

### File Naming Convention

**CRITICAL: Always use `_step.py` suffix!**

```
✅ CORRECT:
steps/vmh/webhook/document_create_api_step.py
steps/vmh/document_sync_event_step.py
steps/vmh/beteiligte_sync_cron_step.py

❌ WRONG:
steps/vmh/document_handler.py   # Missing _step.py
steps/vmh/sync.py               # Missing _step.py
```

**Naming Pattern:**
- **Webhooks**: `{entity}_{action}_api_step.py`
  - Examples: `beteiligte_create_api_step.py`, `document_update_api_step.py`
- **Event Handlers**: `{entity}_sync_event_step.py`
  - Examples: `document_sync_event_step.py`, `beteiligte_sync_event_step.py`
- **Cron Jobs**: `{entity}_sync_cron_step.py`
  - Examples: `beteiligte_sync_cron_step.py`, `calendar_sync_cron_step.py`
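
The suffix rule is easy to check mechanically. A small sketch (`is_step_file` is a hypothetical helper; how iii itself discovers step files is an implementation detail not shown here):

```python
import tempfile
from pathlib import Path

# Hypothetical check mirroring the naming rule above: only files ending
# in `_step.py` count as steps.
def is_step_file(path: Path) -> bool:
    return path.name.endswith('_step.py')

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ['document_create_api_step.py', 'document_handler.py', 'sync.py']:
        (root / name).touch()
    # Only the correctly named file survives the filter.
    steps = sorted(p.name for p in root.glob('**/*.py') if is_step_file(p))
    print(steps)  # ['document_create_api_step.py']
```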

### Step Template

**Complete step template with all required patterns:**

```python
"""Module-level docstring describing the step's purpose"""
from typing import Dict, Any
from motia import FlowContext, http, queue, cron, ApiRequest, ApiResponse


config = {
    "name": "Clear Human-Readable Name",
    "description": "Brief description of what this step does",
    "flows": ["flow-name"],  # Logical grouping
    "triggers": [
        # Pick ONE trigger type:
        http("POST", "/path/to/endpoint"),  # For webhooks
        # queue("topic.name"),              # For event handlers
        # cron("0 */15 * * * *"),           # For scheduled jobs
    ],
    "enqueues": ["next.topic"],  # Topics this step emits (optional)
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Handler docstring explaining:
    - What triggers this handler
    - What it does
    - What events it emits
    """
    try:
        # 1. Log entry
        ctx.logger.info("=" * 80)
        ctx.logger.info(f"🔄 STEP STARTED: {config['name']}")
        ctx.logger.info("=" * 80)

        # 2. Extract and validate input
        payload = request.body

        # 3. Business logic
        # ...

        # 4. Enqueue events if needed
        await ctx.enqueue({
            'topic': 'next.step',
            'data': {
                'entity_id': entity_id,
                'action': 'create'
            }
        })

        # 5. Log success
        ctx.logger.info("✅ Step completed successfully")

        # 6. Return response
        return ApiResponse(
            status_code=200,
            body={'success': True}
        )

    except Exception as e:
        # Always log errors with context
        ctx.logger.error(f"❌ Error in {config['name']}: {e}")
        ctx.logger.error(f"Payload: {request.body}")

        return ApiResponse(
            status_code=500,
            body={'success': False, 'error': str(e)}
        )
```

### Handler Signatures by Trigger Type

**HTTP Trigger (Webhooks, APIs):**
```python
from motia import ApiRequest, ApiResponse, FlowContext

async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    # Access: request.body, request.query_params, request.path_params
    return ApiResponse(status_code=200, body={...})
```

**Queue Trigger (Event Handlers):**
```python
from typing import Any, Dict

from motia import FlowContext

async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
    # Process event_data
    # No return value
    pass
```

**Cron Trigger (Scheduled Jobs):**
```python
from motia import FlowContext

async def handler(input_data: None, ctx: FlowContext) -> None:
    # Cron jobs receive no input
    # No return value
    pass
```

### Logging Best Practices

**ALWAYS use `ctx.logger`, NEVER use `print()` or a module-level `logger`:**

```python
# ✅ CORRECT: Context-aware logging
ctx.logger.info("Processing started")
ctx.logger.debug(f"Data: {data}")
ctx.logger.warn("Skipping invalid entry")
ctx.logger.error(f"Failed: {e}")

# ❌ WRONG: Direct print or module logger
print("Processing started")        # Not visible in iii logs
logger.info("Processing started")  # Loses context
```

**Log Levels:**
- `info()` - Normal operations (start, success, counts)
- `debug()` - Detailed data dumps (payloads, responses)
- `warn()` - Non-critical issues (skipped items, fallbacks)
- `error()` - Failures requiring attention

**Log Structure:**
```python
# Section headers with visual separators
ctx.logger.info("=" * 80)
ctx.logger.info("🔄 SYNC HANDLER STARTED")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Entity Type: {entity_type}")
ctx.logger.info(f"Action: {action.upper()}")
ctx.logger.info(f"Document ID: {entity_id}")
ctx.logger.info("=" * 80)

# Use emojis for visual scanning
ctx.logger.info("📥 Downloading file...")
ctx.logger.info("✅ Downloaded 1024 bytes")
ctx.logger.error("❌ Upload failed")
ctx.logger.warn("⚠️ No collections found")
```

### Event Topics Naming

**Pattern:** `{module}.{entity}.{action}`

```
✅ Examples:
vmh.document.create
vmh.document.update
vmh.document.delete
vmh.beteiligte.create
calendar.sync.employee
advoware.proxy.response

❌ Avoid:
document-create   # Use dots, not dashes
DocumentCreate    # Use lowercase
create_document   # Wrong order
```
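
The convention can be enforced before enqueueing. A hedged sketch; `is_valid_topic` and its regex are hypothetical helpers, not an existing motia API:

```python
import re

# Hypothetical guard for the `{module}.{entity}.{action}` convention above:
# three or more lowercase, dot-separated segments.
TOPIC_RE = re.compile(r'^[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*){2,}$')

def is_valid_topic(topic: str) -> bool:
    return bool(TOPIC_RE.match(topic))

print(is_valid_topic('vmh.document.create'))  # True
print(is_valid_topic('document-create'))      # False (dashes, no dots)
print(is_valid_topic('DocumentCreate'))       # False (uppercase, no dots)
```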

---

## Service Layer Patterns

### Service Base Class Pattern

**All services should follow this pattern:**

```python
"""Service docstring"""
import logging
import os
from typing import Optional

import aiohttp

from services.logging_utils import get_service_logger

logger = logging.getLogger(__name__)


class MyService:
    """Service for interacting with External API"""

    def __init__(self, context=None):
        """
        Initialize service.

        Args:
            context: Optional Motia FlowContext for logging
        """
        self.context = context
        self.logger = get_service_logger('my_service', context)
        self._session = None

        # Load config from env
        self.api_key = os.getenv('MY_API_KEY', '')
        if not self.api_key:
            raise ValueError("MY_API_KEY not configured")

        self.logger.info("MyService initialized")

    def _log(self, message: str, level: str = 'info') -> None:
        """Internal logging helper"""
        log_func = getattr(self.logger, level, self.logger.info)
        log_func(message)

    async def _get_session(self):
        """Lazy session initialization"""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self) -> None:
        """Cleanup resources"""
        if self._session and not self._session.closed:
            await self._session.close()
```

### Sync Utilities Pattern

**For bidirectional sync operations, inherit from `BaseSyncUtils`:**

```python
from typing import Dict, Tuple

from services.sync_utils_base import BaseSyncUtils


class MyEntitySync(BaseSyncUtils):
    """Sync utilities for MyEntity"""

    def _get_lock_key(self, entity_id: str) -> str:
        """Required: Define lock key pattern"""
        return f"sync_lock:myentity:{entity_id}"

    async def should_sync(self, entity: Dict) -> Tuple[bool, str]:
        """
        Decide if sync is needed.

        Returns:
            (needs_sync: bool, reason: str)
        """
        # Implementation
        pass
```

**Base class provides:**
- `_log()` - Context-aware logging
- `_acquire_redis_lock()` - Distributed locking
- `_release_redis_lock()` - Lock cleanup
- `self.espocrm` - EspoCRM API client
- `self.redis` - Redis client
- `self.context` - Motia context
- `self.logger` - Integration logger

---

## External Integrations

### xAI Collections Integration

**Status:** ✅ Fully Implemented

**Service:** `services/xai_service.py`

**Environment Variables:**
```bash
XAI_API_KEY=xai-...              # For file uploads (api.x.ai)
XAI_MANAGEMENT_KEY=xai-token-... # For collections (management-api.x.ai)
```

**Usage Example:**
```python
from services.xai_service import XAIService

xai = XAIService(ctx)

# Upload file
file_id = await xai.upload_file(
    file_content=bytes_data,
    filename="document.pdf",
    mime_type="application/pdf"
)

# Add to collection
await xai.add_to_collection("collection_id", file_id)

# Add to multiple collections
added = await xai.add_to_collections(["col1", "col2"], file_id)

# Remove from collection
await xai.remove_from_collection("collection_id", file_id)
```

**Architecture:**
- Files are uploaded ONCE to Files API (`api.x.ai/v1/files`)
- Same `file_id` can be added to MULTIPLE collections
- Removing from a collection does NOT delete the file (it may be used elsewhere)
- Hash-based change detection prevents unnecessary re-uploads

**Document Sync Flow:**
```
1. EspoCRM Webhook → vmh.document.{create|update|delete}
2. Document Sync Handler:
   a. Acquire distributed lock (prevents duplicate syncs)
   b. Load document from EspoCRM
   c. Check if sync needed:
      - dateiStatus = "Neu" or "Geändert" → SYNC
      - Hash changed → SYNC
      - Entity has xAI collections → SYNC
   d. Download file from EspoCRM
   e. Calculate MD5 hash
   f. Upload to xAI (or reuse existing file_id)
   g. Add to required collections
   h. Update EspoCRM metadata (xaiId, xaiCollections, xaiSyncedHash)
   i. Release lock
3. Delete: Remove from collections (keep file)
```
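
Steps (c) and (e) hinge on comparing the current MD5 against the hash stored at the last sync. A minimal sketch; `needs_reupload` is a hypothetical helper, though the `xaiSyncedHash` field it models is the one documented for CDokumente:

```python
import hashlib
from typing import Optional

# Hypothetical helper: compare the current file hash against the hash
# stored at the last successful sync (xaiSyncedHash).
def needs_reupload(file_content: bytes, synced_hash: Optional[str]) -> bool:
    current = hashlib.md5(file_content).hexdigest()
    return current != synced_hash

data = b"%PDF-1.4 ..."
first_hash = hashlib.md5(data).hexdigest()

print(needs_reupload(data, None))               # True: never synced
print(needs_reupload(data, first_hash))         # False: unchanged since last sync
print(needs_reupload(data + b"x", first_hash))  # True: file changed
```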

**EspoCRM Fields (CDokumente):**
```python
{
    'xaiId': 'file-abc123',              # xAI file_id
    'xaiCollections': ['col1', 'col2'],  # Array of collection IDs
    'xaiSyncedHash': 'abc123def456',     # MD5 at last sync
    'fileStatus': 'synced',              # Status: new, changed, synced
    'xaiSyncStatus': 'clean',            # Sync state: no_sync, pending_sync, clean, unclean, failed
    'md5sum': 'abc123def456',            # Current file hash
    'sha256': 'def456...',               # SHA-256 (optional)
}
```

### EspoCRM Integration

**Service:** `services/espocrm.py`

**Environment Variables:**
```bash
ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1
ESPOCRM_API_KEY=your-api-key
ESPOCRM_API_TIMEOUT_SECONDS=30
```

**Usage:**
```python
from services.espocrm import EspoCRMAPI

espocrm = EspoCRMAPI(ctx)

# Get entity
entity = await espocrm.get_entity('CDokumente', entity_id)

# Update entity
await espocrm.update_entity('CDokumente', entity_id, {
    'xaiId': file_id,
    'fileStatus': 'synced'
})

# List entities
result = await espocrm.list_entities(
    'CDokumente',
    where=[{'type': 'equals', 'attribute': 'fileStatus', 'value': 'neu'}],
    select='id,name,fileStatus',
    max_size=50
)

# Download attachment
file_bytes = await espocrm.download_attachment(attachment_id)
```
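
For result sets larger than one page, `list_entities` calls can be chained. A sketch under two assumptions: the wrapper accepts an `offset` argument, and it returns the standard EspoCRM response shape (`{'total': ..., 'list': [...]}`); a fake client stands in so the snippet runs offline:

```python
import asyncio

PAGE_SIZE = 2  # small value for the demo; see ESPOCRM_DEFAULT_PAGE_SIZE

class FakeEspoCRM:
    """Stand-in for EspoCRMAPI, backed by a static record list."""
    records = [{'id': str(i)} for i in range(5)]

    async def list_entities(self, entity_type, max_size, offset=0, **kw):
        return {
            'total': len(self.records),
            'list': self.records[offset:offset + max_size],
        }

async def fetch_all(client, entity_type: str) -> list:
    """Page through list_entities until `total` records are collected."""
    items, offset = [], 0
    while True:
        page = await client.list_entities(entity_type,
                                          max_size=PAGE_SIZE,
                                          offset=offset)
        items.extend(page['list'])
        offset += PAGE_SIZE
        if offset >= page['total']:
            break
    return items

all_docs = asyncio.run(fetch_all(FakeEspoCRM(), 'CDokumente'))
print(len(all_docs))  # 5
```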

### Advoware Integration

**Service:** `services/advoware_service.py`

**Authentication:** HMAC-512 with token caching

**Environment Variables:**
```bash
ADVOWARE_API_BASE_URL=https://api.advoware.de
ADVOWARE_API_KEY=your-key
ADVOWARE_API_SECRET=your-secret
REDIS_HOST=localhost
REDIS_PORT=6379
```

**Proxy Endpoints:**
- `GET /advoware/proxy?endpoint={path}` - Proxy GET requests
- `POST /advoware/proxy` - Proxy POST requests
- `PUT /advoware/proxy` - Proxy PUT requests
- `DELETE /advoware/proxy` - Proxy DELETE requests
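
For GET requests, the Advoware path travels inside the `endpoint` query parameter and therefore needs URL-encoding. A small sketch; `proxy_url` is a hypothetical helper, not part of the service:

```python
from urllib.parse import urlencode

# Hypothetical helper: build the proxy URL for a GET request, encoding the
# Advoware path into the `endpoint` query parameter as documented above.
def proxy_url(base: str, endpoint: str) -> str:
    return f"{base}/advoware/proxy?{urlencode({'endpoint': endpoint})}"

url = proxy_url("http://localhost:3111", "employees?active=true")
print(url)
# http://localhost:3111/advoware/proxy?endpoint=employees%3Factive%3Dtrue
```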

---

## Testing and Debugging

### Start System

**Production (systemd):**
```bash
# Restart services
sudo systemctl restart motia.service
sudo systemctl restart iii-console.service

# Check status
sudo systemctl status motia.service
sudo systemctl status iii-console.service

# View real-time logs
journalctl -u motia.service -f
journalctl -u iii-console.service -f

# Enable auto-start on boot
sudo systemctl enable motia.service
sudo systemctl enable iii-console.service
```

**Development (iii-cli):**
```bash
# Option 1: Dev mode with integrated console and hot-reload
cd /opt/motia-iii/bitbylaw
iii-cli dev --host 0.0.0.0 --port 3113 --engine-port 3111

# Option 2: Separate engine and console
# Terminal 1: Start engine
iii-cli start -c iii-config.yaml

# Terminal 2: Start console
iii-cli console --host 0.0.0.0 --port 3113 \
  --engine-host 192.168.1.62 --engine-port 3111

# Option 3: Manual (legacy)
/opt/bin/iii -c iii-config.yaml
```

### Check Registered Steps

**Using iii-cli (recommended):**
```bash
# List all flows and triggers
iii-cli flow list

# Filter for specific step
iii-cli flow list | grep document_sync
```

**Using curl (legacy):**
```bash
curl http://localhost:3111/_console/functions | python3 -m json.tool
```

### Test HTTP Endpoints

```bash
# Test document webhook
curl -X POST "http://localhost:3111/vmh/webhook/document/create" \
  -H "Content-Type: application/json" \
  -d '[{"id": "test123", "entityType": "CDokumente"}]'

# Test advoware proxy
curl "http://localhost:3111/advoware/proxy?endpoint=employees"

# Test beteiligte sync
curl -X POST "http://localhost:3111/vmh/webhook/beteiligte/create" \
  -H "Content-Type: application/json" \
  -d '{"entity_type": "CBeteiligte", "entity_id": "abc123", "action": "create"}'
```

### Manually Trigger Cron

```bash
curl -X POST "http://localhost:3111/_console/cron/trigger" \
  -H "Content-Type: application/json" \
  -d '{"function_id": "steps::VMH Beteiligte Sync Cron::trigger::0"}'
```

### View and Debug Logs

**Using iii-cli (recommended):**
```bash
# Live logs (all)
iii-cli logs -f

# Live logs with specific level
iii-cli logs -f --level error
iii-cli logs -f --level debug

# Filter by component
iii-cli logs -f | grep "document_sync"

# Worker-specific logs
iii-cli worker logs

# Get specific trace
iii-cli trace <trace-id>

# Filter logs by trace ID
iii-cli logs --trace <trace-id>
```

**Using journalctl (production):**
```bash
# Live logs
journalctl -u motia.service -f

# Search for specific step
journalctl -u motia.service --since "today" | grep -i "document sync"

# Show errors only
journalctl -u motia.service -p err -f

# Last 100 lines
journalctl -u motia.service -n 100

# Specific time range
journalctl -u motia.service --since "2026-03-19 10:00" --until "2026-03-19 11:00"
```

**Using log files (legacy):**
```bash
# Check for errors
tail -100 /opt/motia-iii/bitbylaw/iii_new.log | grep -i error

# Follow log file
tail -f /opt/motia-iii/bitbylaw/iii_new.log
```

### Inspect State and Streams

**State Management:**
```bash
# List all state keys
iii-cli state ls

# Get specific state value
iii-cli state get document:sync:last_run

# Set state (if needed for testing)
iii-cli state set test:key "test value"

# Delete state
iii-cli state delete test:key
```

**Stream Management:**
```bash
# List all active streams
iii-cli stream ls

# Inspect specific stream
iii-cli stream info <stream-id>

# List consumer groups
iii-cli stream groups <stream-name>
```

### Debugging Workflow

**1. Identify the Issue:**
```bash
# Check if step is registered
iii-cli flow list | grep my_step

# View recent errors
iii-cli logs --level error -n 50

# Check service status
sudo systemctl status motia.service
```

**2. Get Detailed Information:**
```bash
# Live tail logs for specific step
iii-cli logs -f | grep "document_sync"

# Check worker processes
iii-cli worker logs

# Inspect state
iii-cli state ls
```

**3. Test Specific Functionality:**
```bash
# Trigger webhook manually
curl -X POST http://localhost:3111/vmh/webhook/...

# Check response and logs
iii-cli logs -f | grep "webhook"

# Verify state changed
iii-cli state get entity:sync:status
```

**4. Trace Specific Request:**
```bash
# Make request, note trace ID from logs
curl -X POST http://localhost:3111/vmh/webhook/document/create ...

# Get full trace
iii-cli trace <trace-id>

# View all logs for this trace
iii-cli logs --trace <trace-id>
```

### Performance Monitoring

**Check System Resources:**
```bash
# CPU and memory usage
htop

# Process-specific
ps aux | grep iii

# Redis memory
redis-cli info memory

# File descriptors
lsof -p $(pgrep -f "iii-cli start")
```

**Check Processing Metrics:**
```bash
# Queue lengths (if using Redis streams)
redis-cli XINFO STREAM vmh:document:sync

# Pending messages
redis-cli XPENDING vmh:document:sync group1

# Lock status
redis-cli KEYS "lock:*"
```

### Common Issues

**Step not showing up:**
1. Check file naming: must end with `_step.py`
2. Check for syntax errors: `iii-cli logs --level error`
3. Check for import errors: `iii-cli logs | grep -i "importerror\|traceback"`
4. Verify the `config` dict is present
5. Restart: `sudo systemctl restart motia.service` or restart `iii-cli dev`
6. Verify hot-reload is working: check terminal output in `iii-cli dev`

**Redis connection failed:**
- Check `REDIS_HOST` and `REDIS_PORT` environment variables
- Verify Redis is running: `redis-cli ping`
- Check Redis logs: `journalctl -u redis -f`
- The service will work without Redis, but with warnings

**Hot-reload not working:**
- Verify you are using `iii-cli dev` (not `iii-cli start`)
- Check `watch` patterns in `iii-config.yaml`
- Ensure files are in watched directories (`src/**/*.py`)
- Look for watcher errors: `iii-cli logs | grep -i "watch"`

**Handler not triggered:**
- Verify the endpoint is registered: `iii-cli flow list`
- Check the HTTP method matches (GET, POST, etc.)
- Test with curl to isolate the issue
- Check the trigger configuration in the step's `config` dict

**AttributeError: '_log' not found:**
- Ensure the service inherits from `BaseSyncUtils`, OR
- Implement a `_log()` method manually

**Trace not found:**
- Ensure OpenTelemetry is enabled in the config
- Check that the trace ID has a valid format
- Use `iii-cli logs` with filters instead

**Console not accessible:**
- Check if the console service is running: `systemctl status iii-console.service`
- Verify the port is not blocked by the firewall: `sudo ufw status`
- Check console logs: `journalctl -u iii-console.service -f`
- Try accessing via `localhost:3113` instead of the public IP

---

## Key Patterns Summary

### ✅ DO

- **Always** use `_step.py` suffix for step files
- **Always** use `ctx.logger` for logging (never `print`)
- **Always** wrap handlers in try/except with error logging
- **Always** use visual separators in logs (`"=" * 80`)
- **Always** return `ApiResponse` from HTTP handlers
- **Always** document what events a step emits
- **Always** use distributed locks for sync operations
- **Always** calculate hashes for change detection

### ❌ DON'T

- **Don't** use module-level `logger` in steps
- **Don't** forget `async` on handler functions
- **Don't** use blocking I/O (use `aiohttp`, not `requests`)
- **Don't** return values from queue/cron handlers
- **Don't** hardcode credentials (use environment variables)
- **Don't** skip lock cleanup in `finally` blocks
- **Don't** use `print()` for logging

---

## Module Documentation

### Steps

**Advoware Proxy** ([Module README](../steps/advoware_proxy/README.md))
- Universal HTTP proxy with HMAC-512 authentication
- Endpoints: GET, POST, PUT, DELETE
- Redis token caching

**Calendar Sync** ([Module README](../steps/advoware_cal_sync/README.md))
- Bidirectional Advoware ↔ Google Calendar sync
- Cron: Every 15 minutes
- API trigger: `/advoware/calendar/sync`

**VMH Integration** ([Module README](../steps/vmh/README.md))
- EspoCRM ↔ Advoware bidirectional sync
- Webhooks: Beteiligte, Bankverbindungen, Documents
- xAI Collections integration for documents

### Services

| Service | Purpose | Config |
|---------|---------|--------|
| `xai_service.py` | xAI file uploads & collections | `XAI_API_KEY`, `XAI_MANAGEMENT_KEY` |
| `espocrm.py` | EspoCRM REST API client | `ESPOCRM_API_BASE_URL`, `ESPOCRM_API_KEY` |
| `advoware_service.py` | Advoware API with HMAC auth | `ADVOWARE_API_KEY`, `ADVOWARE_API_SECRET` |
| `document_sync_utils.py` | Document sync logic | Redis connection |
| `beteiligte_sync_utils.py` | Beteiligte sync logic | Redis connection |
| `sync_utils_base.py` | Base class for sync utils | - |

---

## Quick Reference

### Environment Variables

**Required:**
```bash
# EspoCRM
ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1
ESPOCRM_API_KEY=your-key

# Advoware
ADVOWARE_API_BASE_URL=https://api.advoware.de
ADVOWARE_API_KEY=your-key
ADVOWARE_API_SECRET=your-secret

# xAI
XAI_API_KEY=xai-...
XAI_MANAGEMENT_KEY=xai-token-...

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_ADVOWARE_CACHE=1
```

**Optional:**
```bash
ESPOCRM_API_TIMEOUT_SECONDS=30
ESPOCRM_METADATA_TTL_SECONDS=300
```

### File Structure

```
bitbylaw/
├── iii-config.yaml               # Motia III configuration
├── pyproject.toml                # Python dependencies (uv)
├── steps/                        # Business logic
│   ├── advoware_proxy/
│   ├── advoware_cal_sync/
│   └── vmh/
│       ├── webhook/              # HTTP webhook handlers
│       │   ├── *_create_api_step.py
│       │   ├── *_update_api_step.py
│       │   └── *_delete_api_step.py
│       ├── *_sync_event_step.py  # Queue event handlers
│       └── *_sync_cron_step.py   # Scheduled jobs
├── services/                     # Shared services
│   ├── xai_service.py
│   ├── espocrm.py
│   ├── advoware_service.py
│   ├── *_sync_utils.py
│   ├── sync_utils_base.py
│   ├── logging_utils.py
│   └── exceptions.py
├── docs/                         # Documentation
│   ├── INDEX.md                  # This file
│   ├── ARCHITECTURE.md
│   └── DOCUMENT_SYNC_XAI_STATUS.md
└── tests/                        # Test scripts
    └── test_xai_collections_api.py
```

### Motia III vs Old Motia

| Old Motia v0.17 | Motia III v1.0-RC |
|-----------------|-------------------|
| `type: 'api'` | `triggers: [http()]` |
| `type: 'event'` | `triggers: [queue()]` |
| `type: 'cron'` | `triggers: [cron()]` |
| `context.emit()` | `ctx.enqueue()` |
| `emits: [...]` | `enqueues: [...]` |
| `subscribes: [...]` | `triggers: [queue('topic')]` |
| 5-field cron | 6-field cron (seconds first) |
| `context.logger` | `ctx.logger` |
| Motia Workbench | iii Console |
| Node.js + Python | Pure Python |

### Cron Syntax

**6 fields (Motia III):** `second minute hour day month weekday`

```
0 */15 * * * *   # Every 15 minutes
0 0 */6 * * *    # Every 6 hours
0 0 2 * * *      # Daily at 2 AM
0 30 9 * * 1-5   # Monday-Friday at 9:30 AM
```
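
Pasting an old 5-field crontab expression is a common migration slip. A tiny guard (hypothetical helper, not part of motia):

```python
# Hypothetical check: Motia III expects 6 cron fields, seconds first.
def assert_six_fields(expr: str) -> list:
    fields = expr.split()
    if len(fields) != 6:
        raise ValueError(
            f"Expected 6 cron fields (seconds first), got {len(fields)}: {expr!r}"
        )
    return fields

print(assert_six_fields("0 */15 * * * *"))  # ['0', '*/15', '*', '*', '*', '*']
```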

---

## Additional Resources

### Documentation
- [MIGRATION_GUIDE.md](../MIGRATION_GUIDE.md) - v0.17 → v1.0 migration
- [MIGRATION_STATUS.md](../MIGRATION_STATUS.md) - Migration progress
- [ARCHITECTURE.md](ARCHITECTURE.md) - System design
- [DOCUMENT_SYNC_XAI_STATUS.md](DOCUMENT_SYNC_XAI_STATUS.md) - xAI integration details

### External Resources
- [Motia III Documentation](https://iii.dev)
- [Python SDK](https://pypi.org/project/motia/)
- [xAI API Docs](https://docs.x.ai/)
- [EspoCRM API](https://docs.espocrm.com/development/api/)
- [Redis Documentation](https://redis.io/documentation)

### Support & Troubleshooting

| Issue | Solution |
|-------|----------|
| Step not registered | Check `_step.py` suffix, restart: `sudo systemctl restart motia.service` |
| Import errors | Check logs: `journalctl -u motia.service \| grep -i importerror` |
| Code changes not applied | Auto-reload should work; force restart: `sudo systemctl restart motia.service` |
| ApiResponse validation | Use `status=200` not `status_code=200` |
| Redis unavailable | Service works with warnings, check `REDIS_HOST` |
| `'_log' not found` | Inherit from `BaseSyncUtils` or implement `_log()` method |
| Webhook not triggering | Verify endpoint: `curl http://localhost:3111/_console/functions` |
| xAI upload fails | Set `XAI_API_KEY` and `XAI_MANAGEMENT_KEY` in `/etc/systemd/system/motia.service` |
| View logs | `journalctl -u motia.service -f` (follow) or `--since "10 minutes ago"` |

---

**Last Updated:** 2026-03-08
**Migration Status:** ✅ Complete
**xAI Integration:** ✅ Implemented