BitByLaw Motia III - Developer Guide

For AI Assistants: This document contains all critical patterns, conventions, and best practices. Read this first to understand the codebase structure and ensure consistency.

Project Status

Migration: 100% Complete (21/21 Steps migrated from Motia v0.17 → Motia III v1.0-RC)

New to the project? Start here:

  1. README.md - Project Overview & Quick Start
  2. MIGRATION_GUIDE.md - Complete migration patterns
  3. ARCHITECTURE.md - System design and architecture

Core Concepts

System Overview

Architecture:

  • Pure Python (Motia III)
  • Event-Driven with queue-based async processing
  • Redis-backed distributed locking and caching
  • REST APIs for webhooks and proxies

Key Components:

  1. Steps (steps/) - Business logic handlers (HTTP, Queue, Cron)
  2. Services (services/) - Shared API clients and utilities
  3. Configuration (iii-config.yaml) - System setup

Data Flow:

Webhook (HTTP) → Queue Event → Event Handler → External APIs
                    ↓                ↓
              Redis Lock      Service Layer
                              (EspoCRM, Advoware, xAI)

Design Principles

Event Storm over Lock Coordination

Core Philosophy: "Idempotent Chaos - Check Cheap, Sync Once"

╔═══════════════════════════════════════════════════════╗
║  Prefer:                                              ║
║  ✅ Multiple redundant events over coordination       ║
║  ✅ Idempotent handlers over distributed locks        ║
║  ✅ Eventually consistent over perfectly synced       ║
║  ✅ Simple duplication over complex orchestration     ║
╚═══════════════════════════════════════════════════════╝

Guidelines:

  1. Fire events liberally - 10 redundant events are cheaper than complex coordination
  2. Make handlers idempotent - Early returns when nothing to do
  3. Sequential per entity, parallel across entities - Lock prevents collisions, not updates
  4. Accept event storms - Handlers queue up, process one by one

Lock Strategy: Sequential Processing per Entity

Event storm for Document A:
├─ Event 1: update → Handler starts → Lock acquired
├─ Event 2: update → Queued (waits for lock)
├─ Event 3: update → Queued (after Event 2)
└─ Event 4: update → Queued (after Event 3)

Document B (parallel):
└─ Event 1: update → Own lock, processes in parallel!

Result:
- Same entity: Sequential (prevents file upload collisions)
- Different entities: Parallel (independent locks)
- Lost events: Zero (all queued and processed)
- Duplicate work: Prevented by idempotency checks

Example Flow:

t=0: User changes Document A (fileStatus → "changed")
t=1: Event 1 fired → Lock acquired → Sync starts
t=2: User changes Document A again (fileStatus → "changed")
t=3: Event 2 fired → Queued (lock busy)
t=4: Event 1 completes → fileStatus="unchanged", xaiSyncStatus="clean"
t=5: Event 2 starts → Lock acquired
     Check: fileStatus="unchanged", xaiSyncStatus="clean"
     → Early return (nothing to do) ✅

Result: Second event processed but no duplicate work!

Why This Works:

  • Lock prevents chaos: No parallel file uploads for same entity
  • Queue enables updates: New changes processed sequentially
  • Idempotency prevents waste: Redundant events → cheap early returns
  • Parallel scaling: Different entities process simultaneously
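
The interplay of per-entity locking and idempotent early returns can be sketched in a few lines. This is a minimal illustration, not the project's handler: it assumes an in-memory `documents` dict in place of EspoCRM and an `asyncio.Lock` per entity in place of the Redis lock, and the names `handle_update` and `sync_runs` are hypothetical.

```python
import asyncio
from collections import defaultdict

# Hypothetical in-memory stand-ins for EspoCRM state and the Redis locks.
documents = {"docA": {"fileStatus": "changed"}}
locks = defaultdict(asyncio.Lock)   # one lock per entity id
sync_runs = []                      # records every real (expensive) sync


async def handle_update(entity_id: str) -> str:
    """Process one update event; events for the same entity run one by one."""
    async with locks[entity_id]:
        doc = documents[entity_id]
        # Cheap idempotency check: nothing to do if already in sync.
        if doc["fileStatus"] == "unchanged":
            return "skipped"
        await asyncio.sleep(0)          # placeholder for the upload/sync work
        doc["fileStatus"] = "unchanged"
        sync_runs.append(entity_id)
        return "synced"


async def main() -> list:
    # Two redundant events for the same document arrive almost together.
    return await asyncio.gather(handle_update("docA"), handle_update("docA"))


results = asyncio.run(main())
```

The second event waits for the lock, finds the document already in sync, and returns without repeating the work.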

Practical Example: Entity Link Event

User links Document ↔ Räumungsklage

Webhooks fire:
├─ POST /vmh/webhook/entity/link
└─ Emits: raeumungsklage.update, cdokumente.update

Handlers (parallel, different entities):
├─ Räumungsklage Handler
│  ├─ Lock: raeumungsklage:abc123
│  ├─ Creates xAI Collection (if missing)
│  └─ Fires: cdokumente.update (for all linked docs) ← Event Storm!
│
└─ Document Handler (may run 2-3x on same doc)
   ├─ Lock: document:doc456 (sequential processing)
   ├─ Run 1: Collections not ready → Skip (cheap return)
   ├─ Run 2: Collection ready → Upload & sync
   └─ Run 3: Already synced → Early return (idempotent!)

Bidirectional Reference Pattern

Principle: Every sync maintains references on both sides

EspoCRM as Central Hub:

┌─────────────┐         ┌──────────────┐         ┌─────────────┐
│   Advoware  │◄────────┤   EspoCRM    ├────────►│     xAI     │
│             │         │              │         │             │
│ Akte 12345  │         │ Entity       │         │ Collection  │
└─────────────┘         │ - advowareId │         └─────────────┘
                        │ - xaiColId   │
                        └──────────────┘

Implementation:

  • xAI Collection → stores entityType + entityId in metadata
  • EspoCRM Entity → stores xaiCollectionId field
  • EspoCRM Document → stores xaiFileId + xaiCollections[] fields
  • Advoware Integration → stores advowareAkteId in EspoCRM

Benefits:

  • Bidirectional navigation without complex queries
  • Easy relationship building (e.g., Advoware Akte ↔ xAI Collection via EspoCRM)
  • Idempotent lookups (can verify both directions)
  • Debugging: Always know where things came from

Example: Collection Creation

# xAI Collection
collection = await xai.create_collection(
    name="CBeteiligte - Max Mustermann",
    metadata={
        "espocrm_entity_type": "CBeteiligte",
        "espocrm_entity_id": "abc123",
        "created_at": "2026-03-08T19:00:00Z"
    }
)

# EspoCRM Entity
await espocrm.update_entity("CBeteiligte", "abc123", {
    "xaiCollectionId": collection.id
})
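
Because both sides store a reference, a sync can verify the link in either direction before doing any work. The check below is a sketch under assumptions: it operates on plain dicts, and the `entityType` key on the entity and the helper name `references_consistent` are hypothetical.

```python
def references_consistent(entity: dict, collection: dict) -> bool:
    """Return True if the EspoCRM entity and the xAI collection point at each other."""
    meta = collection.get("metadata", {})
    # Forward direction: EspoCRM entity -> xAI collection.
    forward = entity.get("xaiCollectionId") == collection.get("id")
    # Backward direction: xAI collection metadata -> EspoCRM entity.
    backward = (
        meta.get("espocrm_entity_type") == entity.get("entityType")
        and meta.get("espocrm_entity_id") == entity.get("id")
    )
    return forward and backward


entity = {"entityType": "CBeteiligte", "id": "abc123", "xaiCollectionId": "col-1"}
collection = {
    "id": "col-1",
    "metadata": {"espocrm_entity_type": "CBeteiligte", "espocrm_entity_id": "abc123"},
}
is_ok = references_consistent(entity, collection)
```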

Step Development Best Practices

File Naming Convention

CRITICAL: Always use _step.py suffix!

✅ CORRECT:
steps/vmh/webhook/document_create_api_step.py
steps/vmh/document_sync_event_step.py
steps/vmh/beteiligte_sync_cron_step.py

❌ WRONG:
steps/vmh/document_handler.py        # Missing _step.py
steps/vmh/sync.py                    # Missing _step.py

Naming Pattern:

  • Webhooks: {entity}_{action}_api_step.py
    • Examples: beteiligte_create_api_step.py, document_update_api_step.py
  • Event Handlers: {entity}_sync_event_step.py
    • Examples: document_sync_event_step.py, beteiligte_sync_event_step.py
  • Cron Jobs: {entity}_sync_cron_step.py
    • Examples: beteiligte_sync_cron_step.py, calendar_sync_cron_step.py
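
The naming rules above can be checked mechanically, for example in a pre-commit hook. The regular expression below is an assumption derived from the listed patterns (lowercase snake_case ending in `_api_step.py`, `_event_step.py`, or `_cron_step.py`); `step_kind` is a hypothetical helper name.

```python
import re

# Assumed pattern derived from the naming rules in this guide.
STEP_FILE_RE = re.compile(r"^[a-z0-9]+(?:_[a-z0-9]+)*_(api|event|cron)_step\.py$")


def step_kind(filename: str):
    """Return 'api', 'event', or 'cron' for a valid step filename, else None."""
    match = STEP_FILE_RE.match(filename)
    return match.group(1) if match else None


kinds = {
    name: step_kind(name)
    for name in (
        "document_create_api_step.py",
        "document_sync_event_step.py",
        "beteiligte_sync_cron_step.py",
        "document_handler.py",
    )
}
```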

Step Template

Complete step template with all required patterns:

"""Module-level docstring describing the step's purpose"""
from typing import Dict, Any
from motia import FlowContext, http, queue, cron, ApiRequest, ApiResponse


config = {
    "name": "Clear Human-Readable Name",
    "description": "Brief description of what this step does",
    "flows": ["flow-name"],  # Logical grouping
    "triggers": [
        # Pick ONE trigger type:
        http("POST", "/path/to/endpoint"),           # For webhooks
        # queue("topic.name"),                       # For event handlers
        # cron("0 */15 * * * *"),                    # For scheduled jobs
    ],
    "enqueues": ["next.topic"],  # Topics this step emits (optional)
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Handler docstring explaining:
    - What triggers this handler
    - What it does
    - What events it emits
    """
    try:
        # 1. Log entry
        ctx.logger.info("=" * 80)
        ctx.logger.info(f"🔄 STEP STARTED: {config['name']}")
        ctx.logger.info("=" * 80)
        
        # 2. Extract and validate input
        payload = request.body
        
        # 3. Business logic
        # ... (must define entity_id, e.g. from the validated payload)
        
        # 4. Enqueue events if needed
        await ctx.enqueue({
            'topic': 'next.step',
            'data': {
                'entity_id': entity_id,
                'action': 'create'
            }
        })
        
        # 5. Log success
        ctx.logger.info("✅ Step completed successfully")
        
        # 6. Return response
        return ApiResponse(
            status=200,
            body={'success': True}
        )
        
    except Exception as e:
        # Always log errors with context
        ctx.logger.error(f"❌ Error in {config['name']}: {e}")
        ctx.logger.error(f"Payload: {request.body}")
        
        return ApiResponse(
            status=500,
            body={'success': False, 'error': str(e)}
        )

Handler Signatures by Trigger Type

HTTP Trigger (Webhooks, APIs):

from motia import ApiRequest, ApiResponse, FlowContext

async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    # Access: request.body, request.query_params, request.path_params
    return ApiResponse(status=200, body={...})

Queue Trigger (Event Handlers):

from typing import Any, Dict

from motia import FlowContext

async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
    # Process event_data
    # No return value
    pass

Cron Trigger (Scheduled Jobs):

from motia import FlowContext

async def handler(input_data: None, ctx: FlowContext) -> None:
    # Cron jobs receive no input
    # No return value
    pass

Logging Best Practices

ALWAYS use ctx.logger, NEVER use print() or module-level logger:

# ✅ CORRECT: Context-aware logging
ctx.logger.info("Processing started")
ctx.logger.debug(f"Data: {data}")
ctx.logger.warn("Skipping invalid entry")
ctx.logger.error(f"Failed: {e}")

# ❌ WRONG: Direct print or module logger
print("Processing started")              # Not visible in iii logs
logger.info("Processing started")        # Loses context

Log Levels:

  • info() - Normal operations (start, success, counts)
  • debug() - Detailed data dumps (payloads, responses)
  • warn() - Non-critical issues (skipped items, fallbacks)
  • error() - Failures requiring attention

Log Structure:

# Section headers with visual separators
ctx.logger.info("=" * 80)
ctx.logger.info("🔄 SYNC HANDLER STARTED")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Entity Type: {entity_type}")
ctx.logger.info(f"Action: {action.upper()}")
ctx.logger.info(f"Document ID: {entity_id}")
ctx.logger.info("=" * 80)

# Use emojis for visual scanning
ctx.logger.info("📥 Downloading file...")
ctx.logger.info("✅ Downloaded 1024 bytes")
ctx.logger.error("❌ Upload failed")
ctx.logger.warn("⚠️  No collections found")
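
The repeated separator lines can be wrapped in a small helper so that every step prints the same header layout. This is a sketch: `log_banner` is a hypothetical helper, and `ListLogger` only stands in for `ctx.logger` so the example runs without Motia.

```python
from typing import Any


def log_banner(logger: Any, title: str, **fields: Any) -> None:
    """Write a separator-framed header followed by one line per field."""
    line = "=" * 80
    logger.info(line)
    logger.info(title)
    logger.info(line)
    for key, value in fields.items():
        logger.info(f"{key}: {value}")
    logger.info(line)


class ListLogger:
    """Hypothetical stand-in for ctx.logger that collects messages."""

    def __init__(self) -> None:
        self.lines = []

    def info(self, message: str) -> None:
        self.lines.append(message)


demo = ListLogger()
log_banner(demo, "🔄 SYNC HANDLER STARTED", **{"Entity Type": "CDokumente", "Action": "UPDATE"})
```

In a step, the call would pass `ctx.logger` as the first argument instead of the stand-in.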

Event Topics Naming

Pattern: {module}.{entity}.{action}

✅ Examples:
vmh.document.create
vmh.document.update
vmh.document.delete
vmh.beteiligte.create
calendar.sync.employee
advoware.proxy.response

❌ Avoid:
document-create          # Use dots, not dashes
DocumentCreate           # Use lowercase
create_document          # Wrong order

Service Layer Patterns

Service Base Class Pattern

All services should follow this pattern:

"""Service docstring"""
import logging
import os
from typing import Optional

import aiohttp

from services.logging_utils import get_service_logger

logger = logging.getLogger(__name__)


class MyService:
    """Service for interacting with External API"""
    
    def __init__(self, context=None):
        """
        Initialize service.
        
        Args:
            context: Optional Motia FlowContext for logging
        """
        self.context = context
        self.logger = get_service_logger('my_service', context)
        self._session = None
        
        # Load config from env
        self.api_key = os.getenv('MY_API_KEY', '')
        if not self.api_key:
            raise ValueError("MY_API_KEY not configured")
        
        self.logger.info("MyService initialized")
    
    def _log(self, message: str, level: str = 'info') -> None:
        """Internal logging helper"""
        log_func = getattr(self.logger, level, self.logger.info)
        log_func(message)
    
    async def _get_session(self):
        """Lazy session initialization"""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session
    
    async def close(self) -> None:
        """Cleanup resources"""
        if self._session and not self._session.closed:
            await self._session.close()

Sync Utilities Pattern

For bidirectional sync operations, inherit from BaseSyncUtils:

from typing import Dict, Tuple

from services.sync_utils_base import BaseSyncUtils

class MyEntitySync(BaseSyncUtils):
    """Sync utilities for MyEntity"""
    
    def _get_lock_key(self, entity_id: str) -> str:
        """Required: Define lock key pattern"""
        return f"sync_lock:myentity:{entity_id}"
    
    async def should_sync(self, entity: Dict) -> Tuple[bool, str]:
        """
        Decide if sync is needed.
        
        Returns:
            (needs_sync: bool, reason: str)
        """
        # Implementation
        pass

Base class provides:

  • _log() - Context-aware logging
  • _acquire_redis_lock() - Distributed locking
  • _release_redis_lock() - Lock cleanup
  • self.espocrm - EspoCRM API client
  • self.redis - Redis client
  • self.context - Motia context
  • self.logger - Integration logger
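
How `_acquire_redis_lock()` and `_release_redis_lock()` work internally is not shown in this guide. The sketch below illustrates one common approach, assuming a `SET key token NX EX ttl` style lock with release in a `finally` block. `FakeRedis` is an in-memory stand-in so the example runs without a server, and `with_entity_lock` is a hypothetical helper, not part of `BaseSyncUtils`.

```python
import asyncio
import uuid


class FakeRedis:
    """In-memory stand-in mimicking SET NX / GET / DEL (the TTL is ignored here)."""

    def __init__(self) -> None:
        self.data = {}

    async def set(self, key, value, nx=False, ex=None):
        if nx and key in self.data:
            return None
        self.data[key] = value
        return True

    async def get(self, key):
        return self.data.get(key)

    async def delete(self, key):
        return 1 if self.data.pop(key, None) is not None else 0


async def with_entity_lock(redis, lock_key: str, work, ttl: int = 300):
    """Run work() under the lock; return None if the lock is already held."""
    token = str(uuid.uuid4())
    if not await redis.set(lock_key, token, nx=True, ex=ttl):
        return None
    try:
        return await work()
    finally:
        # Release only our own lock, and always, even if work() raised.
        if await redis.get(lock_key) == token:
            await redis.delete(lock_key)


async def demo():
    redis = FakeRedis()
    key = "sync_lock:myentity:abc123"

    async def inner():
        return "inner ran"

    async def outer():
        # A nested attempt on the same key is refused while the lock is held.
        return ("outer ran", await with_entity_lock(redis, key, inner))

    nested = await with_entity_lock(redis, key, outer)
    after = await with_entity_lock(redis, key, inner)
    return nested, after, key in redis.data


demo_result = asyncio.run(demo())
```

Against a real Redis server the get-then-delete release is not atomic; a server-side script is the usual remedy. This sketch also refuses a busy lock rather than waiting for it.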

External Integrations

xAI Collections Integration

Status: Fully Implemented

Service: services/xai_service.py

Environment Variables:

XAI_API_KEY=xai-...              # For file uploads (api.x.ai)
XAI_MANAGEMENT_KEY=xai-token-... # For collections (management-api.x.ai)

Usage Example:

from services.xai_service import XAIService

xai = XAIService(ctx)

# Upload file
file_id = await xai.upload_file(
    file_content=bytes_data,
    filename="document.pdf",
    mime_type="application/pdf"
)

# Add to collection
await xai.add_to_collection("collection_id", file_id)

# Add to multiple collections
added = await xai.add_to_collections(["col1", "col2"], file_id)

# Remove from collection
await xai.remove_from_collection("collection_id", file_id)

Architecture:

  • Files are uploaded ONCE to Files API (api.x.ai/v1/files)
  • Same file_id can be added to MULTIPLE collections
  • Removing from collection does NOT delete the file (may be used elsewhere)
  • Hash-based change detection prevents unnecessary reuploads
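
Hash-based change detection amounts to comparing the MD5 of the current file bytes with the hash stored at the last sync. A minimal sketch using only the standard library follows; `md5_hex` and `needs_upload` are hypothetical helper names.

```python
import hashlib
from typing import Optional


def md5_hex(data: bytes) -> str:
    """Return the hexadecimal MD5 digest of data."""
    return hashlib.md5(data).hexdigest()


def needs_upload(data: bytes, synced_hash: Optional[str]) -> bool:
    """True if the file was never synced or its content changed since."""
    return synced_hash is None or md5_hex(data) != synced_hash


original = b"contract v1"
stored_hash = md5_hex(original)
unchanged = needs_upload(original, stored_hash)
changed = needs_upload(b"contract v2", stored_hash)
```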

Document Sync Flow:

1. EspoCRM Webhook → vmh.document.{create|update|delete}
2. Document Sync Handler:
   a. Acquire distributed lock (prevents duplicate syncs)
   b. Load document from EspoCRM
   c. Check if sync needed:
      - dateiStatus = "Neu" or "Geändert" → SYNC
      - Hash changed → SYNC
      - Entity has xAI collections → SYNC
   d. Download file from EspoCRM
   e. Calculate MD5 hash
   f. Upload to xAI (or reuse existing file_id)
   g. Add to required collections
   h. Update EspoCRM metadata (xaiFileId, xaiCollections, xaiSyncedHash)
   i. Release lock
3. Delete: Remove from collections (keep file)
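
Step 2c of the flow above can be expressed as a small decision function that returns both the verdict and a reason for the log. This is a sketch under assumptions: the status values are guesses that cover the German and English spellings used in this guide, and the field names follow the CDokumente fields listed in this section.

```python
from typing import Dict, Tuple

# Assumed status values; the guide uses both German and English spellings.
DIRTY_STATUSES = {"neu", "geändert", "new", "changed"}


def should_sync(doc: Dict) -> Tuple[bool, str]:
    """Decide whether a document needs to be synced to xAI, with a reason."""
    status = str(doc.get("fileStatus", "")).lower()
    if status in DIRTY_STATUSES:
        return True, f"status is {status!r}"
    if doc.get("md5sum") != doc.get("xaiSyncedHash"):
        return True, "hash changed since last sync"
    return False, "already in sync"


decision_new = should_sync({"fileStatus": "Neu"})
decision_hash = should_sync({"fileStatus": "synced", "md5sum": "a", "xaiSyncedHash": "b"})
decision_clean = should_sync({"fileStatus": "synced", "md5sum": "a", "xaiSyncedHash": "a"})
```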

EspoCRM Fields (CDokumente):

{
    'xaiId': 'file-abc123',                    # xAI file_id
    'xaiCollections': ['col1', 'col2'],        # Array of collection IDs
    'xaiSyncedHash': 'abc123def456',           # MD5 at last sync
    'fileStatus': 'synced',                    # Status: neu, geändert, synced
    'md5sum': 'abc123def456',                  # Current file hash
    'sha256': 'def456...',                     # SHA-256 (optional)
}

EspoCRM Integration

Service: services/espocrm.py

Environment Variables:

ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1
ESPOCRM_API_KEY=your-api-key
ESPOCRM_API_TIMEOUT_SECONDS=30

Usage:

from services.espocrm import EspoCRMAPI

espocrm = EspoCRMAPI(ctx)

# Get entity
entity = await espocrm.get_entity('CDokumente', entity_id)

# Update entity
await espocrm.update_entity('CDokumente', entity_id, {
    'xaiId': file_id,
    'fileStatus': 'synced'
})

# List entities
result = await espocrm.list_entities(
    'CDokumente',
    where=[{'type': 'equals', 'attribute': 'fileStatus', 'value': 'neu'}],
    select='id,name,fileStatus',
    max_size=50
)

# Download attachment
file_bytes = await espocrm.download_attachment(attachment_id)

Advoware Integration

Service: services/advoware_service.py

Authentication: HMAC-512 with token caching

Environment Variables:

ADVOWARE_API_BASE_URL=https://api.advoware.de
ADVOWARE_API_KEY=your-key
ADVOWARE_API_SECRET=your-secret
REDIS_HOST=localhost
REDIS_PORT=6379
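
The signing step behind the HMAC-512 authentication can be illustrated with the standard library. This sketch assumes "HMAC-512" means HMAC-SHA512 and that the signature is Base64-encoded; which request fields Advoware expects in the signed message, and how the secret is encoded, is not covered in this guide, so `message` is a placeholder.

```python
import base64
import hashlib
import hmac


def sign_message(secret: str, message: str) -> str:
    """Return a Base64-encoded HMAC-SHA512 signature of message."""
    digest = hmac.new(
        secret.encode("utf-8"), message.encode("utf-8"), hashlib.sha512
    ).digest()
    return base64.b64encode(digest).decode("ascii")


def verify_signature(secret: str, message: str, signature: str) -> bool:
    """Compare signatures in constant time."""
    return hmac.compare_digest(sign_message(secret, message), signature)


signature = sign_message("your-secret", "placeholder-message")
```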

Proxy Endpoints:

  • GET /advoware/proxy?endpoint={path} - Proxy GET requests
  • POST /advoware/proxy - Proxy POST requests
  • PUT /advoware/proxy - Proxy PUT requests
  • DELETE /advoware/proxy - Proxy DELETE requests

Testing and Debugging

Start System

Production (systemd):

# Restart services
sudo systemctl restart motia.service
sudo systemctl restart iii-console.service

# Check status
sudo systemctl status motia.service
sudo systemctl status iii-console.service

# View real-time logs
journalctl -u motia.service -f
journalctl -u iii-console.service -f

# Enable auto-start on boot
sudo systemctl enable motia.service
sudo systemctl enable iii-console.service

Manual (Development):

# Start iii Engine
cd /opt/motia-iii/bitbylaw
/opt/bin/iii -c iii-config.yaml

# Start iii Console (Web UI)
/opt/bin/iii-console --enable-flow --host 0.0.0.0 --port 3113 \
  --engine-host 192.168.67.233 --engine-port 3111 --ws-port 3114

Check Registered Steps

curl http://localhost:3111/_console/functions | python3 -m json.tool

Test HTTP Endpoint

# Test document webhook
curl -X POST "http://localhost:3111/vmh/webhook/document/create" \
  -H "Content-Type: application/json" \
  -d '[{"id": "test123", "entityType": "CDokumente"}]'

# Test advoware proxy
curl "http://localhost:3111/advoware/proxy?endpoint=employees"

Manually Trigger Cron

curl -X POST "http://localhost:3111/_console/cron/trigger" \
  -H "Content-Type: application/json" \
  -d '{"function_id": "steps::VMH Beteiligte Sync Cron::trigger::0"}'

View Logs

# Live logs via journalctl
journalctl -u motia.service -f

# Search for specific step
journalctl --since "today" | grep -i "document sync"

# Check for errors
tail -100 /opt/motia-iii/bitbylaw/iii_new.log | grep -i error

Common Issues

Step not showing up:

  1. Check file naming: Must end with _step.py
  2. Check for import errors: grep -i "importerror\|traceback" iii.log
  3. Verify config dict is present
  4. Restart iii engine

Redis connection failed:

  • Check REDIS_HOST and REDIS_PORT environment variables
  • Verify Redis is running: redis-cli ping
  • Service will work without Redis but with warnings

AttributeError '_log' not found:

  • Ensure service inherits from BaseSyncUtils OR
  • Implement _log() method manually

Key Patterns Summary

DO

  • Always use _step.py suffix for step files
  • Always use ctx.logger for logging (never print)
  • Always wrap handlers in try/except with error logging
  • Always use visual separators in logs ("=" * 80)
  • Always return ApiResponse from HTTP handlers
  • Always document what events a step emits
  • Always use distributed locks for sync operations
  • Always calculate hashes for change detection

DON'T

  • Don't use module-level logger in steps
  • Don't forget async on handler functions
  • Don't use blocking I/O (use aiohttp, not requests)
  • Don't return values from queue/cron handlers
  • Don't hardcode credentials (use environment variables)
  • Don't skip lock cleanup in finally blocks
  • Don't use print() for logging

Module Documentation

Steps

Advoware Proxy (Module README)

  • Universal HTTP proxy with HMAC-512 authentication
  • Endpoints: GET, POST, PUT, DELETE
  • Redis token caching

Calendar Sync (Module README)

  • Bidirectional Advoware ↔ Google Calendar sync
  • Cron: Every 15 minutes
  • API trigger: /advoware/calendar/sync

VMH Integration (Module README)

  • EspoCRM ↔ Advoware bidirectional sync
  • Webhooks: Beteiligte, Bankverbindungen, Documents
  • xAI Collections integration for documents

Services

Service                    Purpose                          Config
-------------------------  -------------------------------  -------------------------------------
xai_service.py             xAI file uploads & collections   XAI_API_KEY, XAI_MANAGEMENT_KEY
espocrm.py                 EspoCRM REST API client          ESPOCRM_API_BASE_URL, ESPOCRM_API_KEY
advoware_service.py        Advoware API with HMAC auth      ADVOWARE_API_KEY, ADVOWARE_API_SECRET
document_sync_utils.py     Document sync logic              Redis connection
beteiligte_sync_utils.py   Beteiligte sync logic            Redis connection
sync_utils_base.py         Base class for sync utils        -

Quick Reference

Environment Variables

Required:

# EspoCRM
ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1
ESPOCRM_API_KEY=your-key

# Advoware
ADVOWARE_API_BASE_URL=https://api.advoware.de
ADVOWARE_API_KEY=your-key
ADVOWARE_API_SECRET=your-secret

# xAI
XAI_API_KEY=xai-...
XAI_MANAGEMENT_KEY=xai-token-...

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_ADVOWARE_CACHE=1

Optional:

ESPOCRM_API_TIMEOUT_SECONDS=30
ESPOCRM_METADATA_TTL_SECONDS=300
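
A startup check can report every missing required variable at once instead of failing on the first one. The sketch below assumes the required list above is complete; `missing_env` is a hypothetical helper.

```python
import os
from typing import Iterable, List, Mapping

# Required variables as listed in this guide.
REQUIRED_VARS = [
    "ESPOCRM_API_BASE_URL", "ESPOCRM_API_KEY",
    "ADVOWARE_API_BASE_URL", "ADVOWARE_API_KEY", "ADVOWARE_API_SECRET",
    "XAI_API_KEY", "XAI_MANAGEMENT_KEY",
    "REDIS_HOST", "REDIS_PORT", "REDIS_DB_ADVOWARE_CACHE",
]


def missing_env(required: Iterable[str] = REQUIRED_VARS,
                environ: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in required if not environ.get(name)]


example_missing = missing_env(["REDIS_HOST", "REDIS_PORT"], {"REDIS_HOST": "localhost"})
```

Calling `missing_env()` without arguments checks the real process environment.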

File Structure

bitbylaw/
├── iii-config.yaml           # Motia III configuration
├── pyproject.toml            # Python dependencies (uv)
├── steps/                    # Business logic
│   ├── advoware_proxy/
│   ├── advoware_cal_sync/
│   └── vmh/
│       ├── webhook/          # HTTP webhook handlers
│       │   ├── *_create_api_step.py
│       │   ├── *_update_api_step.py
│       │   └── *_delete_api_step.py
│       ├── *_sync_event_step.py   # Queue event handlers
│       └── *_sync_cron_step.py    # Scheduled jobs
├── services/                 # Shared services
│   ├── xai_service.py
│   ├── espocrm.py
│   ├── advoware_service.py
│   ├── *_sync_utils.py
│   ├── sync_utils_base.py
│   ├── logging_utils.py
│   └── exceptions.py
├── docs/                     # Documentation
│   ├── INDEX.md             # This file
│   ├── ARCHITECTURE.md
│   └── DOCUMENT_SYNC_XAI_STATUS.md
└── tests/                    # Test scripts
    └── test_xai_collections_api.py

Motia III vs Old Motia

Old Motia v0.17       Motia III v1.0-RC
--------------------  ------------------------------
type: 'api'           triggers: [http()]
type: 'event'         triggers: [queue()]
type: 'cron'          triggers: [cron()]
context.emit()        ctx.enqueue()
emits: [...]          enqueues: [...]
subscribes: [...]     triggers: [queue('topic')]
5-field cron          6-field cron (seconds first)
context.logger        ctx.logger
Motia Workbench       iii Console
Node.js + Python      Pure Python

Cron Syntax

6 fields (Motia III): second minute hour day month weekday

0 */15 * * * *     # Every 15 minutes
0 0 */6 * * *      # Every 6 hours
0 0 2 * * *        # Daily at 2 AM
0 30 9 * * 1-5     # Monday-Friday at 9:30 AM
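
When porting schedules from the old 5-field syntax, the seconds field has to be prepended. The helper below is a sketch of that conversion and of naming the six fields; it only splits the expression and does not validate field values. `to_six_field` and `describe` are hypothetical names.

```python
CRON_FIELDS = ("second", "minute", "hour", "day", "month", "weekday")


def to_six_field(expr: str) -> str:
    """Convert a 5-field cron expression to 6 fields by prepending second 0."""
    parts = expr.split()
    if len(parts) == 6:
        return " ".join(parts)
    if len(parts) != 5:
        raise ValueError(f"expected 5 or 6 cron fields, got {len(parts)}")
    return " ".join(["0", *parts])


def describe(expr: str) -> dict:
    """Map each field of a cron expression to its name."""
    return dict(zip(CRON_FIELDS, to_six_field(expr).split()))


converted = to_six_field("*/15 * * * *")
weekday_job = describe("0 30 9 * * 1-5")
```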

Additional Resources

Support & Troubleshooting

Issue                      Solution
-------------------------  ---------------------------------------------------------------------------------
Step not registered        Check _step.py suffix, restart: sudo systemctl restart motia.service
Import errors              Check logs: journalctl -u motia.service | grep -i importerror
Code changes not applied   Auto-reload should work, force restart: sudo systemctl restart motia.service
ApiResponse validation     Use status=200 not status_code=200
Redis unavailable          Service works with warnings, check REDIS_HOST
'_log' not found           Inherit from BaseSyncUtils or implement _log() method
Webhook not triggering     Verify endpoint: curl http://localhost:3111/_console/functions
xAI upload fails           Set XAI_API_KEY and XAI_MANAGEMENT_KEY in /etc/systemd/system/motia.service
View logs                  journalctl -u motia.service -f (follow) or --since "10 minutes ago"

Last Updated: 2026-03-08
Migration Status: Complete
xAI Integration: Implemented