# BitByLaw Motia III - Developer Guide
> **For AI Assistants**: This document contains all critical patterns, conventions, and best practices. Read this first to understand the codebase structure and ensure consistency.
**Quick Navigation:**
- [Core Concepts](#core-concepts) - System architecture and patterns
- [Design Principles](#design-principles) - Event Storm & Bidirectional References
- [Step Development](#step-development-best-practices) - How to create new steps
- [Services](#service-layer-patterns) - Reusable business logic
- [Integrations](#external-integrations) - xAI, EspoCRM, Advoware
- [Testing & Debugging](#testing-and-debugging)
---
## Project Status
**Migration:** ✅ 100% Complete (21/21 Steps migrated from Motia v0.17 → Motia III v1.0-RC)
**New to the project?** Start here:
1. [README.md](../README.md) - Project Overview & Quick Start
2. [MIGRATION_GUIDE.md](../MIGRATION_GUIDE.md) - Complete migration patterns
3. [ARCHITECTURE.md](ARCHITECTURE.md) - System design and architecture
---
## Core Concepts
### System Overview
**Architecture:**
- **Pure Python** (Motia III)
- **Event-Driven** with queue-based async processing
- **Redis-backed** distributed locking and caching
- **REST APIs** for webhooks and proxies
**Key Components:**
1. **Steps** (`steps/`) - Business logic handlers (HTTP, Queue, Cron)
2. **Services** (`services/`) - Shared API clients and utilities
3. **Configuration** (`iii-config.yaml`) - System setup
**Data Flow:**
```
Webhook (HTTP) → Queue Event → Event Handler → External APIs
                      ↓              ↓
                 Redis Lock    Service Layer
                              (EspoCRM, Advoware, xAI)
```
---
## Design Principles
### Event Storm over Lock Coordination
**Core Philosophy: "Idempotent Chaos - Check Cheap, Sync Once"**
```
╔═══════════════════════════════════════════════════════╗
║ Prefer: ║
║ ✅ Multiple redundant events over coordination ║
║ ✅ Idempotent handlers over distributed locks ║
║ ✅ Eventually consistent over perfectly synced ║
║ ✅ Simple duplication over complex orchestration ║
╚═══════════════════════════════════════════════════════╝
```
**Guidelines:**
1. **Fire events liberally** - 10 redundant events are cheaper than complex coordination
2. **Make handlers idempotent** - Early returns when nothing to do
3. **Sequential per entity, parallel across entities** - Lock prevents collisions, not updates
4. **Accept event storms** - Handlers queue up, Motia retries automatically
**Lock Strategy: Step + Input Values**
```
Lock Key Format: {step_name}:{entity_type}:{entity_id}
Examples:
- document_sync:CDokumente:doc123
- beteiligte_sync:CBeteiligte:bet456
- collection_create:Account:acc789
Rules:
✅ Parallel: sync document:A + sync document:B (different IDs)
❌ Blocked: sync document:A + sync document:A (same ID)
Lock Acquisition:
├─ Try acquire lock
├─ Success → Process → Release in finally block
└─ Fail (already locked) → Raise exception → Motia retries later
Lock Configuration:
- TTL: 30 minutes (auto-release on timeout)
- Robust release: Always in finally block
- Dead lock prevention: TTL ensures eventual recovery
```
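The acquisition semantics can be sketched with a minimal in-memory store. This is an illustration only: the real implementation is async and uses Redis (`SET NX EX`) for atomic check-and-set; `_locks`, `acquire_lock`, and `release_lock` here are synchronous stand-ins.

```python
import time

# Illustrative in-memory lock table: lock_key -> expiry timestamp.
# Production uses Redis SET NX EX, which does the check-and-set atomically.
_locks = {}

def acquire_lock(lock_key: str, ttl: int = 1800) -> bool:
    """Try to acquire the lock; False means it is held and not yet expired."""
    now = time.monotonic()
    expiry = _locks.get(lock_key)
    if expiry is not None and expiry > now:
        return False  # Held by another handler → caller raises, Motia retries
    _locks[lock_key] = now + ttl  # TTL ensures eventual recovery from dead locks
    return True

def release_lock(lock_key: str) -> None:
    """Release the lock (always called from a finally block)."""
    _locks.pop(lock_key, None)

# Lock key format: {step_name}:{entity_type}:{entity_id}
key_a = "document_sync:CDokumente:doc123"
key_b = "document_sync:CDokumente:doc456"
```

Different entity IDs yield different keys, so documents A and B proceed in parallel while two events for the same document serialize.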
**Handler Pattern:**
```python
async def handler(event_data, ctx):
    entity_type = event_data['entity_type']
    entity_id = event_data['entity_id']

    # 1. Try acquire lock
    lock_key = f"document_sync:{entity_type}:{entity_id}"
    lock_acquired = await acquire_lock(lock_key, ttl=1800)  # 30 minutes

    if not lock_acquired:
        # Raise exception → Motia will retry
        raise RuntimeError(f"⏸️ Lock busy for {lock_key}, retry later")

    try:
        # 2. Do work (only if lock acquired)
        await sync_document(entity_id)
    finally:
        # 3. ALWAYS release lock (robust cleanup)
        await release_lock(lock_key)
```
**Retry Behavior:**
- Lock fail → Handler raises exception
- Motia Queue catches exception → Schedules retry
- Retry after configured delay (exponential backoff)
- Eventually lock available → Handler processes successfully

**Idempotency Example:**
```
Redundant second event arrives after sync completed:
Check: fileStatus="synced", xaiSyncStatus="clean"
→ Early return (nothing to do) ✅
Result: Second event processed but no duplicate work!
```
**Why This Works:**
- **Lock prevents chaos**: No parallel file uploads for same entity
- **Queue enables updates**: New changes processed sequentially
- **Idempotency prevents waste**: Redundant events → cheap early returns
- **Parallel scaling**: Different entities process simultaneously
**Practical Example: Entity Link Event**
```
User links Document ↔ Räumungsklage
Webhooks fire:
├─ POST /vmh/webhook/entity/link
└─ Emits: raeumungsklage.update, cdokumente.update
Handlers (parallel, different entities):
├─ Räumungsklage Handler
│ ├─ Lock: raeumungsklage:abc123
│ ├─ Creates xAI Collection (if missing)
│ └─ Fires: cdokumente.update (for all linked docs) ← Event Storm!
└─ Document Handler (may run 2-3x on same doc)
├─ Lock: document:doc456 (sequential processing)
├─ Run 1: Collections not ready → Skip (cheap return)
├─ Run 2: Collection ready → Upload & sync
└─ Run 3: Already synced → Early return (idempotent!)
```
---
### Steps vs. Utils: Parallel vs. Sequential
**Separation Principle: "Events for Parallelism, Functions for Composition"**
```
╔═══════════════════════════════════════════════════════╗
║ Choose Architecture by Execution Model: ║
║ ║
║ ✅ Separate Steps → When parallel possible ║
║ → Communicate via Events ║
║ → Own lock scope, independent retry ║
║ ║
║ ✅ Shared Utils → When sequential required ║
║ → Call as function (need return values) ║
║ → Reusable across multiple steps ║
╚═══════════════════════════════════════════════════════╝
```
**Decision Matrix:**
| Requirement | Architecture | Communication | Example |
|------------|--------------|---------------|---------|
| **Can run in parallel** | Separate Steps | Events | Document sync + Collection create |
| **Needs return value** | Utils function | Function call | `get_or_create_collection()` |
| **Reusable logic** | Utils function | Import + call | `should_sync_to_xai()` |
| **One-time handler** | Inline in Step | N/A | Event-specific parsing |
**Examples:**
```python
# ✅ GOOD: Parallel → Separate Steps + Events

# steps/collection_manager_step.py
async def handle_entity_update(event_data, ctx):
    """Creates xAI Collection for entity"""
    collection_id = await create_collection(entity_type, entity_id)
    # Fire events for all linked documents (parallel processing!)
    for doc in linked_docs:
        await ctx.emit("cdokumente.update", {"entity_id": doc.id})

# steps/document_sync_step.py
async def handle_document_update(event_data, ctx):
    """Syncs document to xAI"""
    # Runs in parallel with collection_manager_step!
    await sync_document_to_xai(entity_id)

# ✅ GOOD: Sequential + Reusable → Utils
# services/document_sync_utils.py
async def get_required_collections(entity_id):
    """Reusable function, needs return value"""
    return await _scan_entity_relations(entity_id)

# ❌ BAD: Sequential logic in Step (not reusable)
async def handle_document_update(event_data, ctx):
    # Inline function call - hard to test and reuse
    collection_id = await create_collection_inline(...)
    await upload_file(collection_id, ...)  # Needs collection_id from above!
```
**Guidelines:**
1. **Default to Steps + Events** - Maximize parallelism
2. **Use Utils when:**
- Need immediate return value
- Logic reused across 2+ steps
- Complex computation (not I/O bound)
3. **Keep Steps thin** - Mostly orchestration + event emission
4. **Keep Utils testable** - Pure functions, no event emission
**Code Organization:**
```
steps/
├─ document_sync_event_step.py # Event handler (thin)
├─ collection_manager_step.py # Event handler (thin)
└─ vmh/
└─ entity_link_webhook_step.py # Webhook handler (thin)
services/
├─ document_sync_utils.py # Reusable functions
├─ xai_service.py # API client
└─ espocrm.py # CRM client
```
---
### Bidirectional Reference Pattern
**Principle: Every sync maintains references on both sides**
**EspoCRM as Central Hub:**
```
┌─────────────┐         ┌──────────────┐         ┌─────────────┐
│  Advoware   │◄────────┤   EspoCRM    ├────────►│     xAI     │
│             │         │              │         │             │
│ Akte 12345  │         │ Entity       │         │ Collection  │
└─────────────┘         │ - advowareId │         └─────────────┘
                        │ - xaiColId   │
                        └──────────────┘
```
**Implementation:**
- **xAI Collection** → stores `entityType` + `entityId` in metadata
- **EspoCRM Entity** → stores `xaiCollectionId` field
- **EspoCRM Document** → stores `xaiFileId` + `xaiCollections[]` fields
- **Advoware Integration** → stores `advowareAkteId` in EspoCRM
**Benefits:**
- Bidirectional navigation without complex queries
- Easy relationship building (e.g., Advoware Akte ↔ xAI Collection via EspoCRM)
- Idempotent lookups (can verify both directions)
- Debugging: Always know where things came from
- **Single Source of Truth**: EspoCRM is authoritative for relationships
**Example: Collection Creation**
```python
# xAI Collection
collection = await xai.create_collection(
    name="CBeteiligte - Max Mustermann",
    metadata={
        "espocrm_entity_type": "CBeteiligte",
        "espocrm_entity_id": "abc123",
        "created_at": "2026-03-08T19:00:00Z"
    }
)

# EspoCRM Entity
await espocrm.update_entity("CBeteiligte", "abc123", {
    "xaiCollectionId": collection.id
})
```
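Because both sides store references, consistency can be verified from either direction. A hypothetical checker, assuming the field names shown above (`check_bidirectional_refs` itself is not an existing project helper):

```python
from typing import Any, Dict, List

def check_bidirectional_refs(entity: Dict[str, Any],
                             collection: Dict[str, Any]) -> List[str]:
    """Return a list of mismatches between an EspoCRM entity and its
    xAI collection metadata (empty list = references are consistent)."""
    problems = []
    # EspoCRM → xAI direction
    if entity.get('xaiCollectionId') != collection.get('id'):
        problems.append("entity.xaiCollectionId does not point at collection")
    # xAI → EspoCRM direction
    meta = collection.get('metadata', {})
    if meta.get('espocrm_entity_id') != entity.get('id'):
        problems.append("collection metadata does not point back at entity")
    return problems

entity = {'id': 'abc123', 'xaiCollectionId': 'col-1'}
collection = {'id': 'col-1',
              'metadata': {'espocrm_entity_type': 'CBeteiligte',
                           'espocrm_entity_id': 'abc123'}}
```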
---
### Error Handling & Resilience
**Principle: "Retry Smart, Fail Visible, Recover When Possible"**
```
╔═══════════════════════════════════════════════════════╗
║ Error Handling Strategy (in priority order): ║
║ ║
║ 1⃣ Idempotent Check → Skip if already done ║
║ 2⃣ Step-internal retry with exponential backoff ║
║ 3⃣ Rollback/Compensation (when possible) ║
║ 4⃣ Log failure clearly → Let Motia retry ║
║ 5⃣ Manual intervention (for unrecoverable failures) ║
╚═══════════════════════════════════════════════════════╝
```
#### Idempotency First
**Always check if work already done before starting:**
```python
async def sync_document_handler(event_data, ctx):
    entity_id = event_data['entity_id']

    # 1. Load current state
    doc = await espocrm.get_entity('CDokumente', entity_id)

    # 2. Check if already synced (idempotent check)
    if doc.get('xaiSyncStatus') == 'clean':
        current_md5 = doc.get('md5sum')
        synced_md5 = doc.get('xaiSyncedHash')
        if current_md5 == synced_md5:
            ctx.logger.info("✅ Already synced, skipping (hash match)")
            return  # Nothing to do

    # 3. Check if file already uploaded to xAI
    xai_file_id = doc.get('xaiId')
    if xai_file_id:
        # Verify file exists in xAI
        file_exists = await xai.file_exists(xai_file_id)
        if file_exists:
            ctx.logger.info(f"✅ File already in xAI: {xai_file_id}")
            # Only update collections, not re-upload

    # 4. Proceed with sync...
```
#### Retry with Exponential Backoff
**For transient failures (network, rate limits):**
```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    ctx=None
) -> T:
    """
    Retry an async operation with exponential backoff.

    Args:
        operation: Async function to retry
        max_attempts: Maximum retry attempts (not endless!)
        base_delay: Initial delay in seconds
        max_delay: Maximum delay cap
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception as e:
            if attempt == max_attempts:
                # Log and re-raise on final attempt
                if ctx:
                    ctx.logger.error(
                        f"❌ Operation failed after {max_attempts} attempts: {e}"
                    )
                raise
            # Calculate backoff delay
            delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
            if ctx:
                ctx.logger.warn(
                    f"⚠️ Attempt {attempt}/{max_attempts} failed: {e}. "
                    f"Retrying in {delay}s..."
                )
            await asyncio.sleep(delay)

# Usage
file_id = await retry_with_backoff(
    lambda: xai.upload_file(content, filename),
    max_attempts=3,
    ctx=ctx
)
```
#### Rollback/Compensation
**When operations fail mid-way, clean up what was created:**
```python
async def sync_document_with_rollback(entity_id, ctx):
    xai_file_id = None
    collection_created = False
    try:
        # 1. Upload file to xAI
        xai_file_id = await xai.upload_file(content, filename)
        ctx.logger.info(f"✅ Uploaded to xAI: {xai_file_id}")

        # 2. Create collection if needed
        collection_id = await xai.create_collection(name, metadata)
        collection_created = True
        ctx.logger.info(f"✅ Created collection: {collection_id}")

        # 3. Add file to collection
        await xai.add_to_collection(collection_id, xai_file_id)

        # 4. Update EspoCRM (critical - if this fails, we have orphans)
        await espocrm.update_entity('CDokumente', entity_id, {
            'xaiId': xai_file_id,
            'xaiCollections': [collection_id],
            'xaiSyncStatus': 'clean'
        })
    except Exception as e:
        ctx.logger.error(f"❌ Sync failed, attempting rollback: {e}")

        # ROLLBACK: Remove what we created
        try:
            if xai_file_id:
                # Note: Only remove from collections, not delete file
                # (file may be referenced elsewhere)
                ctx.logger.info(f"⚠️ Leaving orphaned file in xAI: {xai_file_id}")
                ctx.logger.info("⚠️ Manual cleanup may be required")

            # Update EspoCRM to reflect failure
            await espocrm.update_entity('CDokumente', entity_id, {
                'xaiSyncStatus': 'failed',
                'xaiSyncError': str(e)
            })
        except Exception as rollback_error:
            ctx.logger.error(f"❌ ROLLBACK FAILED: {rollback_error}")
            ctx.logger.error("⚠️ REQUIRES MANUAL INTERVENTION")
            ctx.logger.error(f"⚠️ Entity: {entity_id}, xaiId: {xai_file_id}")

        # Re-raise original exception for Motia retry
        raise
```
**Reality Check:**
- **Rollback not always possible** (e.g., Advoware API, third-party services)
- **Idempotency >> Rollback** - Better to handle re-execution than undo
- **Log orphans clearly** - Enable manual cleanup when needed
#### Clear Failure Logging
**When rollback impossible, log enough details for manual intervention:**
```python
try:
    await external_api.create_resource(data)
except Exception as e:
    # Log all context needed for manual recovery
    ctx.logger.error("=" * 80)
    ctx.logger.error("❌ UNRECOVERABLE FAILURE - MANUAL INTERVENTION REQUIRED")
    ctx.logger.error("=" * 80)
    ctx.logger.error("Entity Type: CDokumente")
    ctx.logger.error(f"Entity ID: {entity_id}")
    ctx.logger.error("Operation: Create Advoware Document")
    ctx.logger.error(f"Error: {e}")
    ctx.logger.error(f"Payload: {json.dumps(data, indent=2)}")
    ctx.logger.error(f"Timestamp: {datetime.now().isoformat()}")
    ctx.logger.error("=" * 80)

    # Mark as failed in EspoCRM
    await espocrm.update_entity('CDokumente', entity_id, {
        'syncStatus': 'failed',
        'syncError': str(e),
        'syncFailedAt': datetime.now().isoformat()
    })

    # Re-raise for Motia (will retry later)
    raise
```
#### Rate Limiting (xAI)
**xAI has rate limits - handle with backoff:**
```python
import asyncio

from aiohttp import ClientResponseError

async def upload_with_rate_limit_handling(xai, content, filename, ctx):
    """Upload file to xAI with rate limit handling."""
    try:
        return await retry_with_backoff(
            lambda: xai.upload_file(content, filename),
            max_attempts=5,   # More attempts for rate limits
            base_delay=2.0,   # Longer initial delay
            max_delay=60.0,   # Up to 1 minute backoff
            ctx=ctx
        )
    except ClientResponseError as e:
        if e.status == 429:  # Rate limit exceeded
            ctx.logger.error("❌ xAI rate limit exceeded")
            # Check Retry-After header
            retry_after = e.headers.get('Retry-After', '60')
            ctx.logger.error(f"⏰ Retry after {retry_after} seconds")
            # Wait and try once more
            await asyncio.sleep(int(retry_after))
            return await xai.upload_file(content, filename)
        raise
```
**Rate Limit Strategy:**
- **xAI**: Exponential backoff + respect `Retry-After` header
- **EspoCRM**: Internal system, no rate limits
- **Advoware**: No known rate limits (backoff on errors)
#### Input Validation
**Always validate webhook payloads and external data:**
```python
from typing import Any, Dict, List

def validate_webhook_payload(
    payload: Any,
    required_fields: List[str],
    ctx
) -> Dict[str, Any]:
    """
    Validate webhook payload structure.

    Raises:
        ValueError: If validation fails
    """
    # Check payload is dict
    if not isinstance(payload, dict):
        raise ValueError(f"Payload must be dict, got {type(payload)}")

    # Check required fields
    missing = [f for f in required_fields if f not in payload]
    if missing:
        raise ValueError(f"Missing required fields: {missing}")

    # Validate entity_id format (basic length sanity check)
    entity_id = payload.get('entity_id', '')
    if not entity_id or len(entity_id) < 10:
        raise ValueError(f"Invalid entity_id: {entity_id}")

    ctx.logger.debug(f"✅ Payload validated: {list(payload.keys())}")
    return payload

# Usage in handler
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    try:
        # Validate input
        payload = validate_webhook_payload(
            request.body,
            required_fields=['entity_id', 'entity_type', 'action'],
            ctx=ctx
        )
        # Process...
    except ValueError as e:
        ctx.logger.error(f"❌ Invalid payload: {e}")
        return ApiResponse(status_code=400, body={'error': str(e)})
```
**Validation Checklist:**
- ✅ Type checking (dict, list, str)
- ✅ Required fields present
- ✅ Format validation (IDs, dates, enums)
- ✅ Length limits (prevent DoS)
- ✅ Enum validation (status values)
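The last two checklist items (length limits and enum validation) are not covered by the snippet above. A self-contained sketch, where the `Action` enum and `MAX_FIELD_LENGTH` are illustrative stand-ins for the project's real enums and limits:

```python
from enum import Enum

class Action(str, Enum):
    # Illustrative subset of valid webhook actions
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"

MAX_FIELD_LENGTH = 1024  # Illustrative DoS guard, not a project constant

def validate_field(value: str, max_len: int = MAX_FIELD_LENGTH) -> str:
    """Reject oversized string fields (length limit / DoS guard)."""
    if len(value) > max_len:
        raise ValueError(f"Field too long: {len(value)} > {max_len}")
    return value

def validate_action(value: str) -> Action:
    """Reject unknown enum values with a helpful error message."""
    try:
        return Action(value)
    except ValueError:
        valid = [a.value for a in Action]
        raise ValueError(f"Invalid action '{value}'. Valid: {valid}")
```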
#### Environment Configuration
**Always use environment variables, validate on startup:**
```python
import os

class ServiceConfig:
    """Validated configuration from environment."""

    def __init__(self):
        # Required
        self.xai_api_key = self._require_env('XAI_API_KEY')
        self.espocrm_url = self._require_env('ESPOCRM_API_BASE_URL')

        # Optional with defaults
        self.redis_host = os.getenv('REDIS_HOST', 'localhost')
        self.redis_port = int(os.getenv('REDIS_PORT', '6379'))
        self.lock_ttl = int(os.getenv('LOCK_TTL_SECONDS', '1800'))  # 30min

    def _require_env(self, key: str) -> str:
        """Get required env var or raise."""
        value = os.getenv(key)
        if not value:
            raise ValueError(f"Required environment variable not set: {key}")
        return value

# Usage
config = ServiceConfig()  # Fails fast on startup if misconfigured
```
**Notes:**
- Collections lifecycle (creation/deletion) managed by EspoCRM entity status
- Document deletion: Remove from xAI collections, keep file (may be referenced elsewhere)
- Lock TTL: 30 minutes (forced release on timeout)
---
### Type Safety & Explicit Contracts
**Principle: "Explicit over Implicit - Magic Strings Kill Refactoring"**
```
╔═══════════════════════════════════════════════════════╗
║ Prefer: ║
║ ✅ Enums over string literals ║
║ ✅ Type hints over duck typing ║
║ ✅ Named constants over magic numbers ║
║ ✅ Central definitions over scattered duplicates ║
╚═══════════════════════════════════════════════════════╝
```
#### Status Enums
**Define all valid status values centrally:**
```python
# services/models.py
from enum import Enum

class FileStatus(str, Enum):
    """Valid values for CDokumente.fileStatus field."""
    NEW = "new"
    CHANGED = "changed"
    SYNCED = "synced"

    def __str__(self) -> str:
        return self.value

class XAISyncStatus(str, Enum):
    """Valid values for CDokumente.xaiSyncStatus field."""
    NO_SYNC = "no_sync"            # Entity has no xAI collections
    PENDING_SYNC = "pending_sync"  # Sync in progress (locked)
    CLEAN = "clean"                # Synced successfully
    UNCLEAN = "unclean"            # Needs re-sync (file changed)
    FAILED = "failed"              # Sync failed (see xaiSyncError)

    def __str__(self) -> str:
        return self.value

class EntityAction(str, Enum):
    """Valid webhook actions."""
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    LINK = "link"
    UNLINK = "unlink"

    def __str__(self) -> str:
        return self.value

class EntityType(str, Enum):
    """Supported EspoCRM entity types."""
    DOKUMENTE = "CDokumente"
    BETEILIGTE = "CBeteiligte"
    RAEUMUNGSKLAGE = "Raeumungsklage"
    ACCOUNT = "Account"

    def __str__(self) -> str:
        return self.value
```
#### Usage in Code
**Before (Magic Strings):**
```python
# ❌ BAD: Typos not caught at runtime
if doc['xaiSyncStatus'] == 'claen':  # Typo!
    pass

if doc['fileStatus'] == 'synced':
    # No IDE autocomplete, no type safety
    pass

# ❌ BAD: Scattered definitions
status = 'pending_sync'  # What are ALL valid values?
```
**After (Type-Safe Enums):**
```python
# ✅ GOOD: Type-safe, autocomplete, refactorable
from services.models import FileStatus, XAISyncStatus

if doc['xaiSyncStatus'] == XAISyncStatus.CLEAN:
    # IDE autocomplete works
    # Typos caught by linter
    # Easy to find all usages
    pass

# Update status
await espocrm.update_entity('CDokumente', entity_id, {
    'fileStatus': FileStatus.SYNCED,
    'xaiSyncStatus': XAISyncStatus.CLEAN
})

# Enum validation
def validate_status(status: str) -> XAISyncStatus:
    """Validate and convert status string."""
    try:
        return XAISyncStatus(status)
    except ValueError:
        valid = [s.value for s in XAISyncStatus]
        raise ValueError(f"Invalid status '{status}'. Valid: {valid}")
```
#### Benefits
**Why Enums Matter:**
1. **Prevent Typos**: `XAISyncStatus.CLAEN` → AttributeError at import time
2. **IDE Support**: Autocomplete shows all valid values
3. **Refactoring**: Rename in one place, affects entire codebase
4. **Documentation**: Single source of truth for valid values
5. **Type Safety**: `mypy` can catch invalid assignments
6. **Debugging**: Easy to find all usages with "Find References"
**Example - Easy Refactoring:**
```python
# Change enum value in one place:
class FileStatus(str, Enum):
    NEW = "new"
    CHANGED = "changed"
    SYNCED = "synchronized"  # Changed!

# All usages automatically updated (string value changes):
FileStatus.SYNCED  # Returns "synchronized" everywhere
```
#### Type Hints
**Always use type hints for function signatures:**
```python
from typing import Any, Dict, Optional, Tuple

from motia import FlowContext
from services.models import FileStatus, XAISyncStatus

# ✅ GOOD: Clear contracts
async def sync_document(
    entity_id: str,
    ctx: FlowContext
) -> Tuple[bool, Optional[str]]:
    """
    Sync document to xAI.

    Args:
        entity_id: EspoCRM document ID
        ctx: Motia flow context

    Returns:
        (success: bool, xai_file_id: Optional[str])
    """
    pass

async def should_sync_to_xai(
    doc: Dict[str, Any]
) -> Tuple[bool, str]:
    """
    Check if document needs xAI sync.

    Returns:
        (should_sync: bool, reason: str)
    """
    status = XAISyncStatus(doc.get('xaiSyncStatus', 'no_sync'))
    if status == XAISyncStatus.CLEAN:
        return (False, "Already synced")
    return (True, "Sync required")
```
#### Constants
**Define magic values as named constants:**
```python
# ✅ GOOD: Central configuration
# services/config.py

# Lock TTLs
LOCK_TTL_DOCUMENT_SYNC = 1800     # 30 minutes
LOCK_TTL_COLLECTION_CREATE = 300  # 5 minutes

# Retry configuration
RETRY_MAX_ATTEMPTS = 3
RETRY_BASE_DELAY = 1.0
RETRY_MAX_DELAY = 30.0

# xAI rate limits
XAI_RATE_LIMIT_RETRY_AFTER = 60  # Default retry delay
XAI_MAX_FILE_SIZE_MB = 512

# EspoCRM pagination
ESPOCRM_DEFAULT_PAGE_SIZE = 50
ESPOCRM_MAX_PAGE_SIZE = 200

# Usage
from services.config import LOCK_TTL_DOCUMENT_SYNC

lock_acquired = await acquire_lock(
    lock_key,
    ttl=LOCK_TTL_DOCUMENT_SYNC  # Clear, refactorable
)
```
---
## Step Development Best Practices
### File Naming Convention
**CRITICAL: Always use `_step.py` suffix!**
```
✅ CORRECT:
steps/vmh/webhook/document_create_api_step.py
steps/vmh/document_sync_event_step.py
steps/vmh/beteiligte_sync_cron_step.py
❌ WRONG:
steps/vmh/document_handler.py # Missing _step.py
steps/vmh/sync.py # Missing _step.py
```
**Naming Pattern:**
- **Webhooks**: `{entity}_{action}_api_step.py`
- Examples: `beteiligte_create_api_step.py`, `document_update_api_step.py`
- **Event Handlers**: `{entity}_sync_event_step.py`
- Examples: `document_sync_event_step.py`, `beteiligte_sync_event_step.py`
- **Cron Jobs**: `{entity}_sync_cron_step.py`
- Examples: `beteiligte_sync_cron_step.py`, `calendar_sync_cron_step.py`
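The convention can be checked mechanically. A hedged sketch (the regex mirrors the patterns above; `is_valid_step_filename` is not an existing project helper):

```python
import re

# Matches {entity}_{action}_api_step.py, {entity}_sync_event_step.py,
# {entity}_sync_cron_step.py - and any other snake_case *_step.py name.
STEP_FILE_RE = re.compile(r'^[a-z][a-z0-9]*(_[a-z0-9]+)*_step\.py$')

def is_valid_step_filename(filename: str) -> bool:
    """True if the file follows the mandatory `_step.py` naming rule."""
    return bool(STEP_FILE_RE.match(filename))
```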
### Step Template
**Complete step template with all required patterns:**
```python
"""Module-level docstring describing the step's purpose"""
from typing import Dict, Any
from motia import FlowContext, http, queue, cron, ApiRequest, ApiResponse
config = {
"name": "Clear Human-Readable Name",
"description": "Brief description of what this step does",
"flows": ["flow-name"], # Logical grouping
"triggers": [
# Pick ONE trigger type:
http("POST", "/path/to/endpoint"), # For webhooks
# queue("topic.name"), # For event handlers
# cron("0 */15 * * * *"), # For scheduled jobs
],
"enqueues": ["next.topic"], # Topics this step emits (optional)
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Handler docstring explaining:
- What triggers this handler
- What it does
- What events it emits
"""
try:
# 1. Log entry
ctx.logger.info("=")="=" * 80)
ctx.logger.info(f"🔄 STEP STARTED: {config['name']}")
ctx.logger.info("=" * 80)
# 2. Extract and validate input
payload = request.body
# 3. Business logic
# ...
# 4. Enqueue events if needed
await ctx.enqueue({
'topic': 'next.step',
'data': {
'entity_id': entity_id,
'action': 'create'
}
})
# 5. Log success
ctx.logger.info("✅ Step completed successfully")
# 6. Return response
return ApiResponse(
status_code=200,
body={'success': True}
)
except Exception as e:
# Always log errors with context
ctx.logger.error(f"❌ Error in {config['name']}: {e}")
ctx.logger.error(f"Payload: {request.body}")
return ApiResponse(
status_code=500,
body={'success': False, 'error': str(e)}
)
```
### Handler Signatures by Trigger Type
**HTTP Trigger (Webhooks, APIs):**
```python
from motia import ApiRequest, ApiResponse, FlowContext

async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    # Access: request.body, request.query_params, request.path_params
    return ApiResponse(status_code=200, body={...})
```
**Queue Trigger (Event Handlers):**
```python
from typing import Any, Dict
from motia import FlowContext

async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
    # Process event_data
    # No return value
    pass
```
**Cron Trigger (Scheduled Jobs):**
```python
from motia import FlowContext

async def handler(input_data: None, ctx: FlowContext) -> None:
    # Cron jobs receive no input
    # No return value
    pass
```
### Logging Best Practices
**ALWAYS use `ctx.logger`, NEVER use `print()` or module-level `logger`:**
```python
# ✅ CORRECT: Context-aware logging
ctx.logger.info("Processing started")
ctx.logger.debug(f"Data: {data}")
ctx.logger.warn("Skipping invalid entry")
ctx.logger.error(f"Failed: {e}")
# ❌ WRONG: Direct print or module logger
print("Processing started") # Not visible in iii logs
logger.info("Processing started") # Loses context
```
**Log Levels:**
- `info()` - Normal operations (start, success, counts)
- `debug()` - Detailed data dumps (payloads, responses)
- `warn()` - Non-critical issues (skipped items, fallbacks)
- `error()` - Failures requiring attention
**Log Structure:**
```python
# Section headers with visual separators
ctx.logger.info("=" * 80)
ctx.logger.info("🔄 SYNC HANDLER STARTED")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Entity Type: {entity_type}")
ctx.logger.info(f"Action: {action.upper()}")
ctx.logger.info(f"Document ID: {entity_id}")
ctx.logger.info("=" * 80)
# Use emojis for visual scanning
ctx.logger.info("📥 Downloading file...")
ctx.logger.info("✅ Downloaded 1024 bytes")
ctx.logger.error("❌ Upload failed")
ctx.logger.warn("⚠️ No collections found")
```
### Event Topics Naming
**Pattern:** `{module}.{entity}.{action}`
```
✅ Examples:
vmh.document.create
vmh.document.update
vmh.document.delete
vmh.beteiligte.create
calendar.sync.employee
advoware.proxy.response
❌ Avoid:
document-create # Use dots, not dashes
DocumentCreate # Use lowercase
create_document # Wrong order
```
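The convention can be enforced with a simple check (the `is_valid_topic` helper is illustrative, not part of the codebase):

```python
import re

# {module}.{entity}.{action}: three lowercase dot-separated segments
TOPIC_RE = re.compile(r'^[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*$')

def is_valid_topic(topic: str) -> bool:
    """True for lowercase three-part topics like 'vmh.document.create'."""
    return bool(TOPIC_RE.match(topic))
```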
---
## Service Layer Patterns
### Service Base Class Pattern
**All services should follow this pattern:**
```python
"""Service docstring"""
import logging
from typing import Optional
from services.logging_utils import get_service_logger
logger = logging.getLogger(__name__)
class MyService:
"""Service for interacting with External API"""
def __init__(self, context=None):
"""
Initialize service.
Args:
context: Optional Motia FlowContext for logging
"""
self.context = context
self.logger = get_service_logger('my_service', context)
self._session = None
# Load config from env
self.api_key = os.getenv('MY_API_KEY', '')
if not self.api_key:
raise ValueError("MY_API_KEY not configured")
self.logger.info("MyService initialized")
def _log(self, message: str, level: str = 'info') -> None:
"""Internal logging helper"""
log_func = getattr(self.logger, level, self.logger.info)
log_func(message)
async def _get_session(self):
"""Lazy session initialization"""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
return self._session
async def close(self) -> None:
"""Cleanup resources"""
if self._session and not self._session.closed:
await self._session.close()
```
### Sync Utilities Pattern
**For bidirectional sync operations, inherit from `BaseSyncUtils`:**
```python
from typing import Dict, Tuple

from services.sync_utils_base import BaseSyncUtils

class MyEntitySync(BaseSyncUtils):
    """Sync utilities for MyEntity"""

    def _get_lock_key(self, entity_id: str) -> str:
        """Required: Define lock key pattern"""
        return f"sync_lock:myentity:{entity_id}"

    async def should_sync(self, entity: Dict) -> Tuple[bool, str]:
        """
        Decide if sync is needed.

        Returns:
            (needs_sync: bool, reason: str)
        """
        # Implementation
        pass
```
**Base class provides:**
- `_log()` - Context-aware logging
- `_acquire_redis_lock()` - Distributed locking
- `_release_redis_lock()` - Lock cleanup
- `self.espocrm` - EspoCRM API client
- `self.redis` - Redis client
- `self.context` - Motia context
- `self.logger` - Integration logger
---
## External Integrations
### xAI Collections Integration
**Status:** ✅ Fully Implemented
**Service:** `services/xai_service.py`
**Environment Variables:**
```bash
XAI_API_KEY=xai-... # For file uploads (api.x.ai)
XAI_MANAGEMENT_KEY=xai-token-... # For collections (management-api.x.ai)
```
**Usage Example:**
```python
from services.xai_service import XAIService

xai = XAIService(ctx)

# Upload file
file_id = await xai.upload_file(
    file_content=bytes_data,
    filename="document.pdf",
    mime_type="application/pdf"
)

# Add to collection
await xai.add_to_collection("collection_id", file_id)

# Add to multiple collections
added = await xai.add_to_collections(["col1", "col2"], file_id)

# Remove from collection
await xai.remove_from_collection("collection_id", file_id)
```
**Architecture:**
- Files are uploaded ONCE to Files API (`api.x.ai/v1/files`)
- Same `file_id` can be added to MULTIPLE collections
- Removing from collection does NOT delete the file (may be used elsewhere)
- Hash-based change detection prevents unnecessary reuploads
**Document Sync Flow:**
```
1. EspoCRM Webhook → vmh.document.{create|update|delete}
2. Document Sync Handler:
   a. Acquire distributed lock (prevents duplicate syncs)
   b. Load document from EspoCRM
   c. Check if sync needed:
      - dateiStatus = "Neu" or "Geändert" → SYNC
      - Hash changed → SYNC
      - Entity has xAI collections → SYNC
   d. Download file from EspoCRM
   e. Calculate MD5 hash
   f. Upload to xAI (or reuse existing file_id)
   g. Add to required collections
   h. Update EspoCRM metadata (xaiFileId, xaiCollections, xaiSyncedHash)
   i. Release lock
3. Delete: Remove from collections (keep file)
```
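Steps 2e-2f hinge on hash comparison to avoid redundant uploads. The decision can be sketched with `hashlib`; the `needs_reupload` helper is illustrative, but the field name matches the CDokumente fields used in this flow:

```python
import hashlib
from typing import Any, Dict, Tuple

def needs_reupload(doc: Dict[str, Any], file_content: bytes) -> Tuple[bool, str]:
    """Compare the current file hash against the hash stored at last sync."""
    current_md5 = hashlib.md5(file_content).hexdigest()
    synced_md5 = doc.get('xaiSyncedHash')
    if synced_md5 == current_md5:
        return (False, "hash unchanged - reuse existing xaiId")
    return (True, f"hash changed: {synced_md5} -> {current_md5}")

# Document whose stored hash matches its current content
unchanged_doc = {'xaiSyncedHash': hashlib.md5(b"same bytes").hexdigest()}
```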
**EspoCRM Fields (CDokumente):**
```python
{
    'xaiId': 'file-abc123',              # xAI file_id
    'xaiCollections': ['col1', 'col2'],  # Array of collection IDs
    'xaiSyncedHash': 'abc123def456',     # MD5 at last sync
    'fileStatus': 'synced',              # Status: new, changed, synced
    'xaiSyncStatus': 'clean',            # no_sync, pending_sync, clean, unclean, failed
    'md5sum': 'abc123def456',            # Current file hash
    'sha256': 'def456...',               # SHA-256 (optional)
}
```
### EspoCRM Integration
**Service:** `services/espocrm.py`
**Environment Variables:**
```bash
ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1
ESPOCRM_API_KEY=your-api-key
ESPOCRM_API_TIMEOUT_SECONDS=30
```
**Usage:**
```python
from services.espocrm import EspoCRMAPI

espocrm = EspoCRMAPI(ctx)

# Get entity
entity = await espocrm.get_entity('CDokumente', entity_id)

# Update entity
await espocrm.update_entity('CDokumente', entity_id, {
    'xaiId': file_id,
    'fileStatus': 'synced'
})

# List entities
result = await espocrm.list_entities(
    'CDokumente',
    where=[{'type': 'equals', 'attribute': 'fileStatus', 'value': 'neu'}],
    select='id,name,fileStatus',
    max_size=50
)

# Download attachment
file_bytes = await espocrm.download_attachment(attachment_id)
```
### Advoware Integration
**Service:** `services/advoware_service.py`
**Authentication:** HMAC-512 with token caching
**Environment Variables:**
```bash
ADVOWARE_API_BASE_URL=https://api.advoware.de
ADVOWARE_API_KEY=your-key
ADVOWARE_API_SECRET=your-secret
REDIS_HOST=localhost
REDIS_PORT=6379
```
**Proxy Endpoints:**
- `GET /advoware/proxy?endpoint={path}` - Proxy GET requests
- `POST /advoware/proxy` - Proxy POST requests
- `PUT /advoware/proxy` - Proxy PUT requests
- `DELETE /advoware/proxy` - Proxy DELETE requests
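The exact request-signing scheme is encapsulated in `advoware_service.py`. As a rough illustration of HMAC-512, a secret-keyed SHA-512 digest over the request payload looks like this (the message layout here is hypothetical, not Advoware's actual format):

```python
import hashlib
import hmac

def sign_request(secret: str, message: str) -> str:
    """Hypothetical HMAC-SHA512 signature over a request message."""
    return hmac.new(secret.encode(), message.encode(), hashlib.sha512).hexdigest()
```

The resulting token is what the service caches in Redis, so repeated proxy calls do not re-authenticate on every request.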
---
## Testing and Debugging
### Start System
**Production (systemd):**
```bash
# Restart services
sudo systemctl restart motia.service
sudo systemctl restart iii-console.service
# Check status
sudo systemctl status motia.service
sudo systemctl status iii-console.service
# View real-time logs
journalctl -u motia.service -f
journalctl -u iii-console.service -f
# Enable auto-start on boot
sudo systemctl enable motia.service
sudo systemctl enable iii-console.service
```
**Manual (Development):**
```bash
# Start iii Engine
cd /opt/motia-iii/bitbylaw
/opt/bin/iii -c iii-config.yaml
# Start iii Console (Web UI)
/opt/bin/iii-console --enable-flow --host 0.0.0.0 --port 3113 \
--engine-host 192.168.67.233 --engine-port 3111 --ws-port 3114
```
### Check Registered Steps
```bash
curl http://localhost:3111/_console/functions | python3 -m json.tool
```
### Test HTTP Endpoint
```bash
# Test document webhook
curl -X POST "http://localhost:3111/vmh/webhook/document/create" \
-H "Content-Type: application/json" \
-d '[{"id": "test123", "entityType": "CDokumente"}]'
# Test advoware proxy
curl "http://localhost:3111/advoware/proxy?endpoint=employees"
```
### Manually Trigger Cron
```bash
curl -X POST "http://localhost:3111/_console/cron/trigger" \
-H "Content-Type: application/json" \
-d '{"function_id": "steps::VMH Beteiligte Sync Cron::trigger::0"}'
```
### View Logs
```bash
# Live logs via journalctl
journalctl -u motia.service -f
# Search for specific step
journalctl --since "today" | grep -i "document sync"
# Check for errors
tail -100 /opt/motia-iii/bitbylaw/iii_new.log | grep -i error
```
### Common Issues
**Step not showing up:**
1. Check file naming: Must end with `_step.py`
2. Check for import errors: `grep -i "importerror\|traceback" iii.log`
3. Verify `config` dict is present
4. Restart iii engine
**Redis connection failed:**
- Check `REDIS_HOST` and `REDIS_PORT` environment variables
- Verify Redis is running: `redis-cli ping`
- Service will work without Redis but with warnings
**AttributeError '_log' not found:**
- Ensure service inherits from `BaseSyncUtils` OR
- Implement `_log()` method manually
---
## Key Patterns Summary
### ✅ DO
- **Always** use `_step.py` suffix for step files
- **Always** use `ctx.logger` for logging (never `print`)
- **Always** wrap handlers in try/except with error logging
- **Always** use visual separators in logs (`"=" * 80`)
- **Always** return `ApiResponse` from HTTP handlers
- **Always** document what events a step emits
- **Always** use distributed locks for sync operations
- **Always** calculate hashes for change detection
### ❌ DON'T
- **Don't** use module-level `logger` in steps
- **Don't** forget `async` on handler functions
- **Don't** use blocking I/O (use `aiohttp`, not `requests`)
- **Don't** return values from queue/cron handlers
- **Don't** hardcode credentials (use environment variables)
- **Don't** skip lock cleanup in `finally` blocks
- **Don't** use `print()` for logging
---
## Module Documentation
### Steps
**Advoware Proxy** ([Module README](../steps/advoware_proxy/README.md))
- Universal HTTP proxy with HMAC-512 authentication
- Endpoints: GET, POST, PUT, DELETE
- Redis token caching
**Calendar Sync** ([Module README](../steps/advoware_cal_sync/README.md))
- Bidirectional Advoware ↔ Google Calendar sync
- Cron: Every 15 minutes
- API trigger: `/advoware/calendar/sync`
**VMH Integration** ([Module README](../steps/vmh/README.md))
- EspoCRM ↔ Advoware bidirectional sync
- Webhooks: Beteiligte, Bankverbindungen, Documents
- xAI Collections integration for documents
### Services
| Service | Purpose | Config |
|---------|---------|--------|
| `xai_service.py` | xAI file uploads & collections | `XAI_API_KEY`, `XAI_MANAGEMENT_KEY` |
| `espocrm.py` | EspoCRM REST API client | `ESPOCRM_API_BASE_URL`, `ESPOCRM_API_KEY` |
| `advoware_service.py` | Advoware API with HMAC auth | `ADVOWARE_API_KEY`, `ADVOWARE_API_SECRET` |
| `document_sync_utils.py` | Document sync logic | Redis connection |
| `beteiligte_sync_utils.py` | Beteiligte sync logic | Redis connection |
| `sync_utils_base.py` | Base class for sync utils | - |
---
## Quick Reference
### Environment Variables
**Required:**
```bash
# EspoCRM
ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1
ESPOCRM_API_KEY=your-key
# Advoware
ADVOWARE_API_BASE_URL=https://api.advoware.de
ADVOWARE_API_KEY=your-key
ADVOWARE_API_SECRET=your-secret
# xAI
XAI_API_KEY=xai-...
XAI_MANAGEMENT_KEY=xai-token-...
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_ADVOWARE_CACHE=1
```
**Optional:**
```bash
ESPOCRM_API_TIMEOUT_SECONDS=30
ESPOCRM_METADATA_TTL_SECONDS=300
```
### File Structure
```
bitbylaw/
├── iii-config.yaml # Motia III configuration
├── pyproject.toml # Python dependencies (uv)
├── steps/ # Business logic
│ ├── advoware_proxy/
│ ├── advoware_cal_sync/
│ └── vmh/
│ ├── webhook/ # HTTP webhook handlers
│ │ ├── *_create_api_step.py
│ │ ├── *_update_api_step.py
│ │ └── *_delete_api_step.py
│ ├── *_sync_event_step.py # Queue event handlers
│ └── *_sync_cron_step.py # Scheduled jobs
├── services/ # Shared services
│ ├── xai_service.py
│ ├── espocrm.py
│ ├── advoware_service.py
│ ├── *_sync_utils.py
│ ├── sync_utils_base.py
│ ├── logging_utils.py
│ └── exceptions.py
├── docs/ # Documentation
│ ├── INDEX.md # This file
│ ├── ARCHITECTURE.md
│ └── DOCUMENT_SYNC_XAI_STATUS.md
└── tests/ # Test scripts
└── test_xai_collections_api.py
```
### Motia III vs Old Motia
| Old Motia v0.17 | Motia III v1.0-RC |
|-----------------|-------------------|
| `type: 'api'` | `triggers: [http()]` |
| `type: 'event'` | `triggers: [queue()]` |
| `type: 'cron'` | `triggers: [cron()]` |
| `context.emit()` | `ctx.enqueue()` |
| `emits: [...]` | `enqueues: [...]` |
| `subscribes: [...]` | `triggers: [queue('topic')]` |
| 5-field cron | 6-field cron (seconds first) |
| `context.logger` | `ctx.logger` |
| Motia Workbench | iii Console |
| Node.js + Python | Pure Python |
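Applied to a concrete step, the mapping above changes the config shape roughly as follows (illustrative only; consult [MIGRATION_GUIDE.md](../MIGRATION_GUIDE.md) for the exact trigger signatures):

```python
# Old Motia v0.17:
#   config = {'type': 'cron', 'cron': '*/15 * * * *', 'emits': ['vmh.sync']}

# Motia III v1.0-RC:
#   config = {
#       'triggers': [cron('0 */15 * * * *')],   # 6 fields, seconds first
#       'enqueues': ['vmh.sync'],               # was 'emits'
#   }
#   ... and in the handler: await ctx.enqueue('vmh.sync', payload)
```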
### Cron Syntax
**6 fields (Motia III):** `second minute hour day month weekday`
```
0 */15 * * * * # Every 15 minutes
0 0 */6 * * * # Every 6 hours
0 0 2 * * * # Daily at 2 AM
0 30 9 * * 1-5 # Monday-Friday at 9:30 AM
```
---
## Additional Resources
### Documentation
- [MIGRATION_GUIDE.md](../MIGRATION_GUIDE.md) - v0.17 → v1.0 migration
- [MIGRATION_STATUS.md](../MIGRATION_STATUS.md) - Migration progress
- [ARCHITECTURE.md](ARCHITECTURE.md) - System design
- [DOCUMENT_SYNC_XAI_STATUS.md](DOCUMENT_SYNC_XAI_STATUS.md) - xAI integration details
### External Resources
- [Motia III Documentation](https://iii.dev)
- [Python SDK](https://pypi.org/project/motia/)
- [xAI API Docs](https://docs.x.ai/)
- [EspoCRM API](https://docs.espocrm.com/development/api/)
- [Redis Documentation](https://redis.io/documentation)
### Support & Troubleshooting
| Issue | Solution |
|-------|----------|
| Step not registered | Check `_step.py` suffix, restart: `sudo systemctl restart motia.service` |
| Import errors | Check logs: `journalctl -u motia.service \| grep -i importerror` |
| Code changes not applied | Auto-reload should work, force restart: `sudo systemctl restart motia.service` |
| ApiResponse validation | Use `status=200` not `status_code=200` |
| Redis unavailable | Service works with warnings, check `REDIS_HOST` |
| `'_log' not found` | Inherit from `BaseSyncUtils` or implement `_log()` method |
| Webhook not triggering | Verify endpoint: `curl http://localhost:3111/_console/functions` |
| xAI upload fails | Set `XAI_API_KEY` and `XAI_MANAGEMENT_KEY` in `/etc/systemd/system/motia.service` |
| View logs | `journalctl -u motia.service -f` (follow) or `--since "10 minutes ago"` |
---
**Last Updated:** 2026-03-08
**Migration Status:** ✅ Complete
**xAI Integration:** ✅ Implemented