BitByLaw Motia III - Developer Guide

For AI Assistants: This document contains all critical patterns, conventions, and best practices. Read this first to understand the codebase structure and ensure consistency.

Project Status

Migration: 100% Complete (21/21 Steps migrated from Motia v0.17 → Motia III v1.0-RC)

New to the project? Start here:

  1. README.md - Project Overview & Quick Start
  2. MIGRATION_GUIDE.md - Complete migration patterns
  3. ARCHITECTURE.md - System design and architecture

Core Concepts

System Overview

Architecture:

  • Pure Python (Motia III)
  • Event-Driven with queue-based async processing
  • Redis-backed distributed locking and caching
  • REST APIs for webhooks and proxies

Key Components:

  1. Steps (steps/) - Business logic handlers (HTTP, Queue, Cron)
  2. Services (services/) - Shared API clients and utilities
  3. Configuration (iii-config.yaml) - System setup

Data Flow:

Webhook (HTTP) → Queue Event → Event Handler → External APIs
                    ↓                ↓
              Redis Lock      Service Layer
                              (EspoCRM, Advoware, xAI)

Design Principles

Event Storm over Lock Coordination

Core Philosophy: "Idempotent Chaos - Check Cheap, Sync Once"

╔═══════════════════════════════════════════════════════╗
║  Prefer:                                              ║
║  ✅ Multiple redundant events over coordination       ║
║  ✅ Idempotent handlers over distributed locks        ║
║  ✅ Eventually consistent over perfectly synced       ║
║  ✅ Simple duplication over complex orchestration     ║
╚═══════════════════════════════════════════════════════╝

Guidelines:

  1. Fire events liberally - 10 redundant events are cheaper than complex coordination
  2. Make handlers idempotent - Early returns when nothing to do
  3. Sequential per entity, parallel across entities - Lock prevents collisions, not updates
  4. Accept event storms - Handlers queue up, Motia retries automatically

Lock Strategy: Step + Input Values

Lock Key Format: {step_name}:{entity_type}:{entity_id}

Examples:
- document_sync:CDokumente:doc123
- beteiligte_sync:CBeteiligte:bet456
- collection_create:Account:acc789

Rules:
✅ Parallel: sync document:A + sync document:B (different IDs)
❌ Blocked: sync document:A + sync document:A (same ID)

Lock Acquisition:
├─ Try acquire lock
├─ Success → Process → Release in finally block
└─ Fail (already locked) → Raise exception → Motia retries later

Lock Configuration:
- TTL: 30 minutes (auto-release on timeout)
- Robust release: Always in finally block
- Dead lock prevention: TTL ensures eventual recovery
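The acquire_lock / release_lock helpers used throughout this section can be sketched as follows. This is a minimal sketch, not the project's actual implementation: the InMemoryRedis class stands in for a real redis.asyncio client so the example runs standalone, and TTL expiry is not simulated.

```python
import asyncio


class InMemoryRedis:
    """Tiny stand-in for a redis.asyncio client, so this sketch runs standalone."""
    def __init__(self):
        self._data = {}

    async def set(self, key, value, nx=False, px=None):
        # px (TTL in ms) is accepted but expiry is not simulated here
        if nx and key in self._data:
            return None  # SET NX: fail if key already exists
        self._data[key] = value
        return True

    async def delete(self, key):
        self._data.pop(key, None)


redis = InMemoryRedis()


async def acquire_lock(lock_key: str, ttl: int) -> bool:
    """Atomic SET NX with TTL; returns False if another worker holds the lock.

    Production code should store a unique token per acquisition and release
    via a Lua check-and-delete, so a worker never deletes a lock it lost
    to TTL expiry.
    """
    ok = await redis.set(lock_key, "locked", nx=True, px=ttl * 1000)
    return bool(ok)


async def release_lock(lock_key: str) -> None:
    """Release the lock (always called from a finally block)."""
    await redis.delete(lock_key)


async def demo():
    a = await acquire_lock("document_sync:CDokumente:doc123", ttl=1800)
    b = await acquire_lock("document_sync:CDokumente:doc123", ttl=1800)  # busy → False
    await release_lock("document_sync:CDokumente:doc123")
    c = await acquire_lock("document_sync:CDokumente:doc123", ttl=1800)  # free again
    await release_lock("document_sync:CDokumente:doc123")
    return (a, b, c)


print(asyncio.run(demo()))  # (True, False, True)
```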

Handler Pattern:

async def handler(event_data, ctx):
    entity_id = event_data['entity_id']
    entity_type = event_data['entity_type']
    
    # 1. Try acquire lock
    lock_key = f"document_sync:{entity_type}:{entity_id}"
    lock_acquired = await acquire_lock(lock_key, ttl=1800)  # 30 minutes
    
    if not lock_acquired:
        # Raise exception → Motia will retry
        raise RuntimeError(f"⏸️  Lock busy for {lock_key}, retry later")
    
    try:
        # 2. Do work (only if lock acquired)
        await sync_document(entity_id)
        
    finally:
        # 3. ALWAYS release lock (robust cleanup)
        await release_lock(lock_key)

Retry Behavior:

  • Lock fail → Handler raises exception
  • Motia Queue catches exception → Schedules retry
  • Retry after configured delay (exponential backoff)
  • Eventually lock available → Handler processes successfully

Idempotency on Redundant Events:

Check: fileStatus="synced", xaiSyncStatus="clean" → Early return (nothing to do)

Result: Second event processed but no duplicate work!


Why This Works:

  • Lock prevents chaos: No parallel file uploads for same entity
  • Queue enables updates: New changes processed sequentially
  • Idempotency prevents waste: Redundant events → cheap early returns
  • Parallel scaling: Different entities process simultaneously

Practical Example: Entity Link Event

User links Document ↔ Räumungsklage

Webhooks fire:
├─ POST /vmh/webhook/entity/link
└─ Emits: raeumungsklage.update, cdokumente.update

Handlers (parallel, different entities):
├─ Räumungsklage Handler
│  ├─ Lock: raeumungsklage:abc123
│  ├─ Creates xAI Collection (if missing)
│  └─ Fires: cdokumente.update (for all linked docs) ← Event Storm!
│
└─ Document Handler (may run 2-3x on same doc)
   ├─ Lock: document:doc456 (sequential processing)
   ├─ Run 1: Collections not ready → Skip (cheap return)
   ├─ Run 2: Collection ready → Upload & sync
   └─ Run 3: Already synced → Early return (idempotent!)


Steps vs. Utils: Parallel vs. Sequential

Separation Principle: "Events for Parallelism, Functions for Composition"

╔═══════════════════════════════════════════════════════╗
║  Choose Architecture by Execution Model:              ║
║                                                       ║
║  Separate Steps → When parallel possible              ║
║                 → Communicate via Events              ║
║                 → Own lock scope, independent retry   ║
║                                                       ║
║  Shared Utils → When sequential required              ║
║               → Call as function (need return values) ║
║               → Reusable across multiple steps        ║
╚═══════════════════════════════════════════════════════╝


Decision Matrix:

| Requirement | Architecture | Communication | Example |
|------------|--------------|---------------|---------|
| Can run in parallel | Separate Steps | Events | Document sync + Collection create |
| Needs return value | Utils function | Function call | get_or_create_collection() |
| Reusable logic | Utils function | Import + call | should_sync_to_xai() |
| One-time handler | Inline in Step | N/A | Event-specific parsing |

Examples:

# ✅ GOOD: Parallel → Separate Steps + Events
# steps/collection_manager_step.py
async def handle_entity_update(event_data, ctx):
    """Creates xAI Collection for entity"""
    collection_id = await create_collection(entity_type, entity_id)
    
    # Fire events for all linked documents (parallel processing!)
    for doc in linked_docs:
        await ctx.emit("cdokumente.update", {"entity_id": doc.id})

# steps/document_sync_step.py  
async def handle_document_update(event_data, ctx):
    """Syncs document to xAI"""
    # Runs in parallel with collection_manager_step!
    await sync_document_to_xai(entity_id)

# ✅ GOOD: Sequential + Reusable → Utils
# services/document_sync_utils.py
async def get_required_collections(entity_id):
    """Reusable function, needs return value"""
    return await _scan_entity_relations(entity_id)

# ❌ BAD: Sequential logic in Step (not reusable)
async def handle_document_update(event_data, ctx):
    # Inline function call - hard to test and reuse
    collection_id = await create_collection_inline(...)
    await upload_file(collection_id, ...)  # Needs collection_id from above!

Guidelines:

  1. Default to Steps + Events - Maximize parallelism
  2. Use Utils when:
    • Need immediate return value
    • Logic reused across 2+ steps
    • Complex computation (not I/O bound)
  3. Keep Steps thin - Mostly orchestration + event emission
  4. Keep Utils testable - Pure functions, no event emission
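Guideline 4 pays off immediately in tests: a pure util needs no Motia context, Redis, or mocks. A sketch with an illustrative body for should_sync_to_xai (not the project's actual implementation; field names mirror the CDokumente fields used in this guide):

```python
from typing import Any, Dict, Tuple


def should_sync_to_xai(doc: Dict[str, Any]) -> Tuple[bool, str]:
    """Pure decision function: takes a dict, returns a verdict. Illustrative body."""
    if doc.get('xaiSyncStatus') == 'clean' and doc.get('md5sum') == doc.get('xaiSyncedHash'):
        return (False, "Already synced (hash match)")
    return (True, "Sync required")


# Plain pytest-style tests - no mocks, no event loop:
def test_skips_when_hashes_match():
    doc = {'xaiSyncStatus': 'clean', 'md5sum': 'abc', 'xaiSyncedHash': 'abc'}
    assert should_sync_to_xai(doc) == (False, "Already synced (hash match)")


def test_syncs_when_file_changed():
    doc = {'xaiSyncStatus': 'clean', 'md5sum': 'new', 'xaiSyncedHash': 'old'}
    assert should_sync_to_xai(doc) == (True, "Sync required")


test_skips_when_hashes_match()
test_syncs_when_file_changed()
```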

Code Organization:

steps/
├─ document_sync_event_step.py      # Event handler (thin)
├─ collection_manager_step.py       # Event handler (thin)
└─ vmh/
   └─ entity_link_webhook_step.py   # Webhook handler (thin)

services/
├─ document_sync_utils.py           # Reusable functions
├─ xai_service.py                   # API client
└─ espocrm.py                       # CRM client

Bidirectional Reference Pattern

Principle: Every sync maintains references on both sides

EspoCRM as Central Hub:

┌─────────────┐         ┌──────────────┐         ┌─────────────┐
│   Advoware  │◄────────┤   EspoCRM    ├────────►│     xAI     │
│             │         │              │         │             │
│ Akte 12345  │         │ Entity       │         │ Collection  │
└─────────────┘         │ - advowareId │         └─────────────┘
                        │ - xaiColId   │
                        └──────────────┘

Implementation:

  • xAI Collection → stores entityType + entityId in metadata
  • EspoCRM Entity → stores xaiCollectionId field
  • EspoCRM Document → stores xaiFileId + xaiCollections[] fields
  • Advoware Integration → stores advowareAkteId in EspoCRM

Benefits:

  • Bidirectional navigation without complex queries
  • Easy relationship building (e.g., Advoware Akte ↔ xAI Collection via EspoCRM)
  • Idempotent lookups (can verify both directions)
  • Debugging: Always know where things came from
  • Single Source of Truth: EspoCRM is authoritative for relationships

Example: Collection Creation

# xAI Collection
collection = await xai.create_collection(
    name="CBeteiligte - Max Mustermann",
    metadata={
        "espocrm_entity_type": "CBeteiligte",
        "espocrm_entity_id": "abc123",
        "created_at": "2026-03-08T19:00:00Z"
    }
)

# EspoCRM Entity
await espocrm.update_entity("CBeteiligte", "abc123", {
    "xaiCollectionId": collection.id
})
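With references stored on both sides, either system can resolve the other without a search query. A minimal sketch with in-memory dicts standing in for the two APIs (the lookup helpers are illustrative; the field and metadata names follow the convention above):

```python
# The two dicts stand in for EspoCRM and xAI API responses.
espocrm_entities = {
    ("CBeteiligte", "abc123"): {"xaiCollectionId": "col-777"},
}
xai_collections = {
    "col-777": {"metadata": {"espocrm_entity_type": "CBeteiligte",
                             "espocrm_entity_id": "abc123"}},
}


def collection_for_entity(entity_type: str, entity_id: str) -> str:
    """EspoCRM → xAI: read the stored collection id directly off the entity."""
    return espocrm_entities[(entity_type, entity_id)]["xaiCollectionId"]


def entity_for_collection(collection_id: str) -> tuple:
    """xAI → EspoCRM: read entity coordinates from collection metadata."""
    md = xai_collections[collection_id]["metadata"]
    return (md["espocrm_entity_type"], md["espocrm_entity_id"])


# Navigation works in both directions, no search needed:
assert collection_for_entity("CBeteiligte", "abc123") == "col-777"
assert entity_for_collection("col-777") == ("CBeteiligte", "abc123")
```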

Error Handling & Resilience

Principle: "Retry Smart, Fail Visible, Recover When Possible"

╔═══════════════════════════════════════════════════════╗
║  Error Handling Strategy (in priority order):         ║
║                                                       ║
║  1⃣ Idempotent Check → Skip if already done          ║
║  2⃣ Step-internal retry with exponential backoff     ║
║  3⃣ Rollback/Compensation (when possible)            ║
║  4⃣ Log failure clearly → Let Motia retry            ║
║  5⃣ Manual intervention (for unrecoverable failures) ║
╚═══════════════════════════════════════════════════════╝

Idempotency First

Always check if work already done before starting:

async def sync_document_handler(event_data, ctx):
    entity_id = event_data['entity_id']
    
    # 1. Load current state
    doc = await espocrm.get_entity('CDokumente', entity_id)
    
    # 2. Check if already synced (idempotent check)
    if doc.get('xaiSyncStatus') == 'clean':
        current_md5 = doc.get('md5sum')
        synced_md5 = doc.get('xaiSyncedHash')
        
        if current_md5 == synced_md5:
            ctx.logger.info(f"✅ Already synced, skipping (hash match)")
            return  # Nothing to do
    
    # 3. Check if file already uploaded to xAI
    xai_file_id = doc.get('xaiId')
    if xai_file_id:
        # Verify file exists in xAI
        file_exists = await xai.file_exists(xai_file_id)
        if file_exists:
            ctx.logger.info(f"✅ File already in xAI: {xai_file_id}")
            # Only update collections, not re-upload
    
    # 4. Proceed with sync...

Retry with Exponential Backoff

For transient failures (network, rate limits):

import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    ctx = None
) -> T:
    """
    Retry operation with exponential backoff.
    
    Args:
        operation: Async function to retry
        max_attempts: Maximum retry attempts (not endless!)
        base_delay: Initial delay in seconds
        max_delay: Maximum delay cap
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
            
        except Exception as e:
            if attempt == max_attempts:
                # Log and re-raise on final attempt
                if ctx:
                    ctx.logger.error(
                        f"❌ Operation failed after {max_attempts} attempts: {e}"
                    )
                raise
            
            # Calculate backoff delay
            delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
            
            if ctx:
                ctx.logger.warn(
                    f"⚠️  Attempt {attempt}/{max_attempts} failed: {e}. "
                    f"Retrying in {delay}s..."
                )
            
            await asyncio.sleep(delay)

# Usage
file_id = await retry_with_backoff(
    lambda: xai.upload_file(content, filename),
    max_attempts=3,
    ctx=ctx
)

Rollback/Compensation

When operations fail mid-way, clean up what was created:

async def sync_document_with_rollback(entity_id, ctx):
    xai_file_id = None
    collection_created = False
    
    try:
        # 1. Upload file to xAI
        xai_file_id = await xai.upload_file(content, filename)
        ctx.logger.info(f"✅ Uploaded to xAI: {xai_file_id}")
        
        # 2. Create collection if needed
        collection_id = await xai.create_collection(name, metadata)
        collection_created = True
        ctx.logger.info(f"✅ Created collection: {collection_id}")
        
        # 3. Add file to collection
        await xai.add_to_collection(collection_id, xai_file_id)
        
        # 4. Update EspoCRM (critical - if this fails, we have orphans)
        await espocrm.update_entity('CDokumente', entity_id, {
            'xaiId': xai_file_id,
            'xaiCollections': [collection_id],
            'xaiSyncStatus': 'clean'
        })
        
    except Exception as e:
        ctx.logger.error(f"❌ Sync failed, attempting rollback: {e}")
        
        # ROLLBACK: Remove what we created
        try:
            if xai_file_id:
                # Note: Only remove from collections, not delete file
                # (file may be referenced elsewhere)
                ctx.logger.info(f"⚠️  Leaving orphaned file in xAI: {xai_file_id}")
                ctx.logger.info(f"⚠️  Manual cleanup may be required")
                
            # Update EspoCRM to reflect failure
            await espocrm.update_entity('CDokumente', entity_id, {
                'xaiSyncStatus': 'failed',
                'xaiSyncError': str(e)
            })
            
        except Exception as rollback_error:
            ctx.logger.error(f"❌ ROLLBACK FAILED: {rollback_error}")
            ctx.logger.error(f"⚠️  REQUIRES MANUAL INTERVENTION")
            ctx.logger.error(f"⚠️  Entity: {entity_id}, xaiId: {xai_file_id}")
        
        # Re-raise original exception for Motia retry
        raise

Reality Check:

  • Rollback not always possible (e.g., Advoware API, third-party services)
  • Idempotency >> Rollback - Better to handle re-execution than undo
  • Log orphans clearly - Enable manual cleanup when needed

Clear Failure Logging

When rollback impossible, log enough details for manual intervention:

import json
from datetime import datetime

try:
    await external_api.create_resource(data)
except Exception as e:
    # Log all context needed for manual recovery
    ctx.logger.error("=" * 80)
    ctx.logger.error("❌ UNRECOVERABLE FAILURE - MANUAL INTERVENTION REQUIRED")
    ctx.logger.error("=" * 80)
    ctx.logger.error(f"Entity Type: CDokumente")
    ctx.logger.error(f"Entity ID: {entity_id}")
    ctx.logger.error(f"Operation: Create Advoware Document")
    ctx.logger.error(f"Error: {e}")
    ctx.logger.error(f"Payload: {json.dumps(data, indent=2)}")
    ctx.logger.error(f"Timestamp: {datetime.now().isoformat()}")
    ctx.logger.error("=" * 80)
    
    # Mark as failed in EspoCRM
    await espocrm.update_entity('CDokumente', entity_id, {
        'syncStatus': 'failed',
        'syncError': str(e),
        'syncFailedAt': datetime.now().isoformat()
    })
    
    # Re-raise for Motia (will retry later)
    raise

Rate Limiting (xAI)

xAI has rate limits - handle with backoff:

import asyncio

from aiohttp import ClientResponseError

async def upload_with_rate_limit_handling(xai, content, filename, ctx):
    """Upload file to xAI with rate limit handling."""
    try:
        return await retry_with_backoff(
            lambda: xai.upload_file(content, filename),
            max_attempts=5,  # More attempts for rate limits
            base_delay=2.0,  # Longer initial delay
            max_delay=60.0,  # Up to 1 minute backoff
            ctx=ctx
        )
        
    except ClientResponseError as e:
        if e.status == 429:  # Rate limit exceeded
            ctx.logger.error(f"❌ xAI rate limit exceeded")
            
            # Check Retry-After header (may be absent; e.headers can be None)
            retry_after = (e.headers or {}).get('Retry-After', '60')
            ctx.logger.error(f"⏰ Retry after {retry_after} seconds")
            
            # Wait and try once more
            await asyncio.sleep(int(retry_after))
            return await xai.upload_file(content, filename)
        
        raise

Rate Limit Strategy:

  • xAI: Exponential backoff + respect Retry-After header
  • EspoCRM: Internal system, no rate limits
  • Advoware: No known rate limits (backoff on errors)
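Beyond reactive backoff, a small client-side limiter can space calls out proactively so 429s rarely occur at all. A sketch (the 5 calls/second figure is an illustrative assumption, not a documented xAI limit):

```python
import asyncio
import time


class RateLimiter:
    """Allow at most `rate` calls per `per` seconds by spacing call starts."""
    def __init__(self, rate: int, per: float):
        self._interval = per / rate
        self._lock = asyncio.Lock()
        self._next_free = 0.0

    async def wait(self):
        """Block until the next call slot is free."""
        async with self._lock:
            now = time.monotonic()
            delay = max(0.0, self._next_free - now)
            self._next_free = max(now, self._next_free) + self._interval
        if delay > 0:
            await asyncio.sleep(delay)


async def demo():
    limiter = RateLimiter(rate=5, per=1.0)  # ~5 calls/second (illustrative)
    start = time.monotonic()
    for _ in range(3):
        await limiter.wait()
        # the actual call, e.g. xai.upload_file(...), would go here
    return time.monotonic() - start


elapsed = asyncio.run(demo())
print(f"3 calls took {elapsed:.2f}s")  # calls after the first spaced ~0.2s apart
```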

Input Validation

Always validate webhook payloads and external data:

from typing import Dict, Any, List

def validate_webhook_payload(
    payload: Any,
    required_fields: List[str],
    ctx
) -> Dict[str, Any]:
    """
    Validate webhook payload structure.
    
    Raises:
        ValueError: If validation fails
    """
    # Check payload is dict
    if not isinstance(payload, dict):
        raise ValueError(f"Payload must be dict, got {type(payload)}")
    
    # Check required fields
    missing = [f for f in required_fields if f not in payload]
    if missing:
        raise ValueError(f"Missing required fields: {missing}")
    
    # Validate entity_id format (UUID)
    entity_id = payload.get('entity_id', '')
    if not entity_id or len(entity_id) < 10:
        raise ValueError(f"Invalid entity_id: {entity_id}")
    
    ctx.logger.debug(f"✅ Payload validated: {list(payload.keys())}")
    return payload

# Usage in handler
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    try:
        # Validate input
        payload = validate_webhook_payload(
            request.body,
            required_fields=['entity_id', 'entity_type', 'action'],
            ctx=ctx
        )
        
        # Process...
        
    except ValueError as e:
        ctx.logger.error(f"❌ Invalid payload: {e}")
        return ApiResponse(status_code=400, body={'error': str(e)})

Validation Checklist:

  • Type checking (dict, list, str)
  • Required fields present
  • Format validation (IDs, dates, enums)
  • Length limits (prevent DoS)
  • Enum validation (status values)
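The length-limit and enum items from the checklist can be sketched as small standalone helpers (the 255-char cap and the action set are illustrative values):

```python
from typing import Set

MAX_FIELD_LEN = 255
VALID_ACTIONS: Set[str] = {"create", "update", "delete", "link", "unlink"}


def check_length(value: str, field: str, max_len: int = MAX_FIELD_LEN) -> str:
    """Reject oversized values early (cheap DoS guard)."""
    if len(value) > max_len:
        raise ValueError(f"{field} exceeds {max_len} chars")
    return value


def check_action(action: str) -> str:
    """Reject unknown webhook actions before any processing starts."""
    if action not in VALID_ACTIONS:
        raise ValueError(f"Invalid action '{action}'. Valid: {sorted(VALID_ACTIONS)}")
    return action


assert check_length("abc123", "entity_id") == "abc123"
assert check_action("link") == "link"
try:
    check_action("destroy")
    raise AssertionError("should have raised")
except ValueError:
    pass
```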

Environment Configuration

Always use environment variables, validate on startup:

import os

class ServiceConfig:
    """Validated configuration from environment."""
    
    def __init__(self):
        # Required
        self.xai_api_key = self._require_env('XAI_API_KEY')
        self.espocrm_url = self._require_env('ESPOCRM_API_BASE_URL')
        
        # Optional with defaults
        self.redis_host = os.getenv('REDIS_HOST', 'localhost')
        self.redis_port = int(os.getenv('REDIS_PORT', '6379'))
        self.lock_ttl = int(os.getenv('LOCK_TTL_SECONDS', '1800'))  # 30min
        
    def _require_env(self, key: str) -> str:
        """Get required env var or raise."""
        value = os.getenv(key)
        if not value:
            raise ValueError(f"Required environment variable not set: {key}")
        return value

# Usage
config = ServiceConfig()  # Fails fast on startup if misconfigured

Notes:

  • Collections lifecycle (creation/deletion) managed by EspoCRM entity status
  • Document deletion: Remove from xAI collections, keep file (may be referenced elsewhere)
  • Lock TTL: 30 minutes (forced release on timeout)

Type Safety & Explicit Contracts

Principle: "Explicit over Implicit - Magic Strings Kill Refactoring"

╔═══════════════════════════════════════════════════════╗
║  Prefer:                                              ║
║  ✅ Enums over string literals                        ║
║  ✅ Type hints over duck typing                       ║
║  ✅ Named constants over magic numbers                ║
║  ✅ Central definitions over scattered duplicates     ║
╚═══════════════════════════════════════════════════════╝

Status Enums

Define all valid status values centrally:

# services/models.py
from enum import Enum

class FileStatus(str, Enum):
    """Valid values for CDokumente.fileStatus field."""
    NEW = "new"
    CHANGED = "changed"
    SYNCED = "synced"
    
    def __str__(self) -> str:
        return self.value

class XAISyncStatus(str, Enum):
    """Valid values for CDokumente.xaiSyncStatus field."""
    NO_SYNC = "no_sync"           # Entity has no xAI collections
    PENDING_SYNC = "pending_sync"  # Sync in progress (locked)
    CLEAN = "clean"                # Synced successfully
    UNCLEAN = "unclean"           # Needs re-sync (file changed)
    FAILED = "failed"              # Sync failed (see xaiSyncError)
    
    def __str__(self) -> str:
        return self.value

class EntityAction(str, Enum):
    """Valid webhook actions."""
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    LINK = "link"
    UNLINK = "unlink"
    
    def __str__(self) -> str:
        return self.value

class EntityType(str, Enum):
    """Supported EspoCRM entity types."""
    DOKUMENTE = "CDokumente"
    BETEILIGTE = "CBeteiligte"
    RAEUMUNGSKLAGE = "Raeumungsklage"
    ACCOUNT = "Account"
    
    def __str__(self) -> str:
        return self.value

Usage in Code

Before (Magic Strings):

# ❌ BAD: Typos not caught at runtime
if doc['xaiSyncStatus'] == 'claen':  # Typo!
    pass

if doc['fileStatus'] == 'synced':
    # No IDE autocomplete, no type safety
    pass

# ❌ BAD: Scattered definitions
status = 'pending_sync'  # What are ALL valid values?

After (Type-Safe Enums):

# ✅ GOOD: Type-safe, autocomplete, refactorable
from services.models import FileStatus, XAISyncStatus

if doc['xaiSyncStatus'] == XAISyncStatus.CLEAN:
    # IDE autocomplete works
    # Typos caught by linter
    # Easy to find all usages
    pass

# Update status
await espocrm.update_entity('CDokumente', entity_id, {
    'fileStatus': FileStatus.SYNCED,
    'xaiSyncStatus': XAISyncStatus.CLEAN
})

# Enum validation
def validate_status(status: str) -> XAISyncStatus:
    """Validate and convert status string."""
    try:
        return XAISyncStatus(status)
    except ValueError:
        valid = [s.value for s in XAISyncStatus]
        raise ValueError(f"Invalid status '{status}'. Valid: {valid}")

Benefits

Why Enums Matter:

  1. Prevent Typos: XAISyncStatus.CLAEN → AttributeError at import time
  2. IDE Support: Autocomplete shows all valid values
  3. Refactoring: Rename in one place, affects entire codebase
  4. Documentation: Single source of truth for valid values
  5. Type Safety: mypy can catch invalid assignments
  6. Debugging: Easy to find all usages with "Find References"

Example - Easy Refactoring:

# Change enum value in one place:
class FileStatus(str, Enum):
    NEW = "new"
    CHANGED = "changed"
    SYNCED = "synchronized"  # Changed!

# All usages automatically updated (string value changes):
FileStatus.SYNCED  # Returns "synchronized" everywhere

Type Hints

Always use type hints for function signatures:

from typing import Any, Dict, List, Optional, Tuple
from services.models import FileStatus, XAISyncStatus

# ✅ GOOD: Clear contracts
async def sync_document(
    entity_id: str,
    ctx: FlowContext
) -> Tuple[bool, Optional[str]]:
    """
    Sync document to xAI.
    
    Args:
        entity_id: EspoCRM document ID
        ctx: Motia flow context
        
    Returns:
        (success: bool, xai_file_id: Optional[str])
    """
    pass

async def should_sync_to_xai(
    doc: Dict[str, Any]
) -> Tuple[bool, str]:
    """
    Check if document needs xAI sync.
    
    Returns:
        (should_sync: bool, reason: str)
    """
    status = XAISyncStatus(doc.get('xaiSyncStatus', 'no_sync'))
    
    if status == XAISyncStatus.CLEAN:
        return (False, "Already synced")
    
    return (True, "Sync required")

Constants

Define magic values as named constants:

# ✅ GOOD: Central configuration
# services/config.py

# Lock TTLs
LOCK_TTL_DOCUMENT_SYNC = 1800      # 30 minutes
LOCK_TTL_COLLECTION_CREATE = 300   # 5 minutes

# Retry configuration
RETRY_MAX_ATTEMPTS = 3
RETRY_BASE_DELAY = 1.0
RETRY_MAX_DELAY = 30.0

# xAI rate limits
XAI_RATE_LIMIT_RETRY_AFTER = 60   # Default retry delay
XAI_MAX_FILE_SIZE_MB = 512

# EspoCRM pagination
ESPOCRM_DEFAULT_PAGE_SIZE = 50
ESPOCRM_MAX_PAGE_SIZE = 200

# Usage
from services.config import LOCK_TTL_DOCUMENT_SYNC

lock_acquired = await acquire_lock(
    lock_key,
    ttl=LOCK_TTL_DOCUMENT_SYNC  # Clear, refactorable
)

Step Development Best Practices

File Naming Convention

CRITICAL: Always use _step.py suffix!

✅ CORRECT:
steps/vmh/webhook/document_create_api_step.py
steps/vmh/document_sync_event_step.py
steps/vmh/beteiligte_sync_cron_step.py

❌ WRONG:
steps/vmh/document_handler.py        # Missing _step.py
steps/vmh/sync.py                    # Missing _step.py

Naming Pattern:

  • Webhooks: {entity}_{action}_api_step.py
    • Examples: beteiligte_create_api_step.py, document_update_api_step.py
  • Event Handlers: {entity}_sync_event_step.py
    • Examples: document_sync_event_step.py, beteiligte_sync_event_step.py
  • Cron Jobs: {entity}_sync_cron_step.py
    • Examples: beteiligte_sync_cron_step.py, calendar_sync_cron_step.py

Step Template

Complete step template with all required patterns:

"""Module-level docstring describing the step's purpose"""
from typing import Dict, Any
from motia import FlowContext, http, queue, cron, ApiRequest, ApiResponse


config = {
    "name": "Clear Human-Readable Name",
    "description": "Brief description of what this step does",
    "flows": ["flow-name"],  # Logical grouping
    "triggers": [
        # Pick ONE trigger type:
        http("POST", "/path/to/endpoint"),           # For webhooks
        # queue("topic.name"),                       # For event handlers
        # cron("0 */15 * * * *"),                    # For scheduled jobs
    ],
    "enqueues": ["next.topic"],  # Topics this step emits (optional)
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Handler docstring explaining:
    - What triggers this handler
    - What it does
    - What events it emits
    """
    try:
        # 1. Log entry
        ctx.logger.info("=" * 80)
        ctx.logger.info(f"🔄 STEP STARTED: {config['name']}")
        ctx.logger.info("=" * 80)
        
        # 2. Extract and validate input
        payload = request.body
        entity_id = payload.get('entity_id')
        
        # 3. Business logic
        # ...
        
        # 4. Enqueue events if needed
        await ctx.enqueue({
            'topic': 'next.step',
            'data': {
                'entity_id': entity_id,
                'action': 'create'
            }
        })
        
        # 5. Log success
        ctx.logger.info("✅ Step completed successfully")
        
        # 6. Return response
        return ApiResponse(
            status_code=200,
            body={'success': True}
        )
        
    except Exception as e:
        # Always log errors with context
        ctx.logger.error(f"❌ Error in {config['name']}: {e}")
        ctx.logger.error(f"Payload: {request.body}")
        
        return ApiResponse(
            status_code=500,
            body={'success': False, 'error': str(e)}
        )

Handler Signatures by Trigger Type

HTTP Trigger (Webhooks, APIs):

from motia import ApiRequest, ApiResponse, FlowContext

async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    # Access: request.body, request.query_params, request.path_params
    return ApiResponse(status_code=200, body={...})

Queue Trigger (Event Handlers):

from motia import FlowContext

async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
    # Process event_data
    # No return value
    pass

Cron Trigger (Scheduled Jobs):

from motia import FlowContext

async def handler(input_data: None, ctx: FlowContext) -> None:
    # Cron jobs receive no input
    # No return value
    pass

Logging Best Practices

ALWAYS use ctx.logger, NEVER use print() or module-level logger:

# ✅ CORRECT: Context-aware logging
ctx.logger.info("Processing started")
ctx.logger.debug(f"Data: {data}")
ctx.logger.warn("Skipping invalid entry")
ctx.logger.error(f"Failed: {e}")

# ❌ WRONG: Direct print or module logger
print("Processing started")              # Not visible in iii logs
logger.info("Processing started")        # Loses context

Log Levels:

  • info() - Normal operations (start, success, counts)
  • debug() - Detailed data dumps (payloads, responses)
  • warn() - Non-critical issues (skipped items, fallbacks)
  • error() - Failures requiring attention

Log Structure:

# Section headers with visual separators
ctx.logger.info("=" * 80)
ctx.logger.info("🔄 SYNC HANDLER STARTED")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Entity Type: {entity_type}")
ctx.logger.info(f"Action: {action.upper()}")
ctx.logger.info(f"Document ID: {entity_id}")
ctx.logger.info("=" * 80)

# Use emojis for visual scanning
ctx.logger.info("📥 Downloading file...")
ctx.logger.info("✅ Downloaded 1024 bytes")
ctx.logger.error("❌ Upload failed")
ctx.logger.warn("⚠️  No collections found")

Event Topics Naming

Pattern: {module}.{entity}.{action}

✅ Examples:
vmh.document.create
vmh.document.update
vmh.document.delete
vmh.beteiligte.create
calendar.sync.employee
advoware.proxy.response

❌ Avoid:
document-create          # Use dots, not dashes
DocumentCreate           # Use lowercase
create_document          # Wrong order
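The convention can be enforced at emit time with a small guard; this validate_topic helper is a hypothetical sketch, not part of Motia:

```python
import re

# {module}.{entity}.{action} - three lowercase dot-separated segments
TOPIC_PATTERN = re.compile(r'^[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*$')


def validate_topic(topic: str) -> str:
    """Raise early on malformed topics instead of failing silently downstream."""
    if not TOPIC_PATTERN.fullmatch(topic):
        raise ValueError(
            f"Topic '{topic}' must match module.entity.action (lowercase, dots)"
        )
    return topic


assert validate_topic("vmh.document.create") == "vmh.document.create"
assert validate_topic("calendar.sync.employee") == "calendar.sync.employee"
for bad in ("document-create", "DocumentCreate", "create_document"):
    try:
        validate_topic(bad)
        raise AssertionError("should have raised")
    except ValueError:
        pass
```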

Service Layer Patterns

Service Base Class Pattern

All services should follow this pattern:

"""Service docstring"""
import logging
import os

import aiohttp

from services.logging_utils import get_service_logger

logger = logging.getLogger(__name__)


class MyService:
    """Service for interacting with External API"""
    
    def __init__(self, context=None):
        """
        Initialize service.
        
        Args:
            context: Optional Motia FlowContext for logging
        """
        self.context = context
        self.logger = get_service_logger('my_service', context)
        self._session = None
        
        # Load config from env
        self.api_key = os.getenv('MY_API_KEY', '')
        if not self.api_key:
            raise ValueError("MY_API_KEY not configured")
        
        self.logger.info("MyService initialized")
    
    def _log(self, message: str, level: str = 'info') -> None:
        """Internal logging helper"""
        log_func = getattr(self.logger, level, self.logger.info)
        log_func(message)
    
    async def _get_session(self):
        """Lazy session initialization"""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session
    
    async def close(self) -> None:
        """Cleanup resources"""
        if self._session and not self._session.closed:
            await self._session.close()

Sync Utilities Pattern

For bidirectional sync operations, inherit from BaseSyncUtils:

from typing import Dict, Tuple

from services.sync_utils_base import BaseSyncUtils

class MyEntitySync(BaseSyncUtils):
    """Sync utilities for MyEntity"""
    
    def _get_lock_key(self, entity_id: str) -> str:
        """Required: Define lock key pattern"""
        return f"sync_lock:myentity:{entity_id}"
    
    async def should_sync(self, entity: Dict) -> Tuple[bool, str]:
        """
        Decide if sync is needed.
        
        Returns:
            (needs_sync: bool, reason: str)
        """
        # Implementation
        pass

Base class provides:

  • _log() - Context-aware logging
  • _acquire_redis_lock() - Distributed locking
  • _release_redis_lock() - Lock cleanup
  • self.espocrm - EspoCRM API client
  • self.redis - Redis client
  • self.context - Motia context
  • self.logger - Integration logger

External Integrations

xAI Collections Integration

Status: Fully Implemented

Service: services/xai_service.py

Environment Variables:

XAI_API_KEY=xai-...              # For file uploads (api.x.ai)
XAI_MANAGEMENT_KEY=xai-token-... # For collections (management-api.x.ai)

Usage Example:

from services.xai_service import XAIService

xai = XAIService(ctx)

# Upload file
file_id = await xai.upload_file(
    file_content=bytes_data,
    filename="document.pdf",
    mime_type="application/pdf"
)

# Add to collection
await xai.add_to_collection("collection_id", file_id)

# Add to multiple collections
added = await xai.add_to_collections(["col1", "col2"], file_id)

# Remove from collection
await xai.remove_from_collection("collection_id", file_id)

Architecture:

  • Files are uploaded ONCE to Files API (api.x.ai/v1/files)
  • Same file_id can be added to MULTIPLE collections
  • Removing from collection does NOT delete the file (may be used elsewhere)
  • Hash-based change detection prevents unnecessary reuploads

Document Sync Flow:

1. EspoCRM Webhook → vmh.document.{create|update|delete}
2. Document Sync Handler:
   a. Acquire distributed lock (prevents duplicate syncs)
   b. Load document from EspoCRM
   c. Check if sync needed:
      - dateiStatus = "Neu" or "Geändert" → SYNC
      - Hash changed → SYNC
      - Entity has xAI collections → SYNC
   d. Download file from EspoCRM
   e. Calculate MD5 hash
   f. Upload to xAI (or reuse existing file_id)
   g. Add to required collections
   h. Update EspoCRM metadata (xaiFileId, xaiCollections, xaiSyncedHash)
   i. Release lock
3. Delete: Remove from collections (keep file)
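The steps above can be sketched as a handler skeleton. Everything here is a stand-in — an in-memory lock set and plain dicts replace Redis, EspoCRM, and xAI, so only the flow (lock → load → hash check → upload-or-reuse → metadata update → release) is shown:

```python
import asyncio
import hashlib

# In-memory stand-in for the Redis lock; real code uses a distributed lock.
LOCKS: set[str] = set()


async def sync_document(doc_id: str, espocrm: dict, xai_files: dict) -> str:
    lock_key = f"sync_lock:document:{doc_id}"
    if lock_key in LOCKS:                      # a. lock prevents duplicate syncs
        return "skipped: locked"
    LOCKS.add(lock_key)
    try:
        doc = espocrm[doc_id]                  # b. load document
        file_bytes = doc["content"]            # d. download file
        current_hash = hashlib.md5(file_bytes).hexdigest()  # e. calculate MD5
        if doc.get("xaiSyncedHash") == current_hash:        # c. sync needed?
            return "skipped: unchanged"
        # f. upload to xAI, or reuse an existing file_id
        file_id = doc.get("xaiId") or f"file-{current_hash[:8]}"
        xai_files[file_id] = file_bytes
        # h. update metadata so the next event short-circuits at step c
        doc.update({"xaiId": file_id,
                    "xaiSyncedHash": current_hash,
                    "xaiSyncStatus": "clean"})
        return "synced"
    finally:
        LOCKS.discard(lock_key)                # i. always release the lock
```

Running it twice on the same document returns `"synced"` then `"skipped: unchanged"` — the metadata update is what makes the handler idempotent.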

EspoCRM Fields (CDokumente):

{
    'xaiId': 'file-abc123',                    # xAI file_id
    'xaiCollections': ['col1', 'col2'],        # Array of collection IDs
    'xaiSyncedHash': 'abc123def456',           # MD5 at last sync
    'fileStatus': 'synced',                    # Status: new, changed, synced
    'xaiSyncStatus': 'clean',                  # Sync state: no_sync, pending_sync, clean, unclean, failed
    'md5sum': 'abc123def456',                  # Current file hash
    'sha256': 'def456...',                     # SHA-256 (optional)
}

EspoCRM Integration

Service: services/espocrm.py

Environment Variables:

ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1
ESPOCRM_API_KEY=your-api-key
ESPOCRM_API_TIMEOUT_SECONDS=30

Usage:

from services.espocrm import EspoCRMAPI

espocrm = EspoCRMAPI(ctx)

# Get entity
entity = await espocrm.get_entity('CDokumente', entity_id)

# Update entity
await espocrm.update_entity('CDokumente', entity_id, {
    'xaiId': file_id,
    'fileStatus': 'synced'
})

# List entities
result = await espocrm.list_entities(
    'CDokumente',
    where=[{'type': 'equals', 'attribute': 'fileStatus', 'value': 'neu'}],
    select='id,name,fileStatus',
    max_size=50
)

# Download attachment
file_bytes = await espocrm.download_attachment(attachment_id)

Advoware Integration

Service: services/advoware_service.py

Authentication: HMAC-512 with token caching

Environment Variables:

ADVOWARE_API_BASE_URL=https://api.advoware.de
ADVOWARE_API_KEY=your-key
ADVOWARE_API_SECRET=your-secret
REDIS_HOST=localhost
REDIS_PORT=6379

Proxy Endpoints:

  • GET /advoware/proxy?endpoint={path} - Proxy GET requests
  • POST /advoware/proxy - Proxy POST requests
  • PUT /advoware/proxy - Proxy PUT requests
  • DELETE /advoware/proxy - Proxy DELETE requests
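HMAC-512 here means HMAC with SHA-512. The Advoware message format and header names are internal to the service, so the following is a generic request-signing sketch under assumed conventions (hypothetical `X-Timestamp`/`X-Signature` headers and a method+path+timestamp message), not the actual Advoware scheme:

```python
import hashlib
import hmac
import time


def sign_request(method: str, path: str, secret: str) -> dict:
    # Hypothetical signing scheme for illustration only; the real
    # Advoware message layout and header names may differ.
    timestamp = str(int(time.time()))
    message = f"{method.upper()}{path}{timestamp}".encode()
    signature = hmac.new(secret.encode(), message, hashlib.sha512).hexdigest()
    return {"X-Timestamp": timestamp, "X-Signature": signature}
```

Because signatures depend on a timestamp, caching the resulting auth token in Redis (as the service does) avoids re-authenticating on every proxied call.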

Testing and Debugging

Start System

Production (systemd):

# Restart services
sudo systemctl restart motia.service
sudo systemctl restart iii-console.service

# Check status
sudo systemctl status motia.service
sudo systemctl status iii-console.service

# View real-time logs
journalctl -u motia.service -f
journalctl -u iii-console.service -f

# Enable auto-start on boot
sudo systemctl enable motia.service
sudo systemctl enable iii-console.service

Manual (Development):

# Start iii Engine
cd /opt/motia-iii/bitbylaw
/opt/bin/iii -c iii-config.yaml

# Start iii Console (Web UI)
/opt/bin/iii-console --enable-flow --host 0.0.0.0 --port 3113 \
  --engine-host 192.168.67.233 --engine-port 3111 --ws-port 3114

Check Registered Steps

curl http://localhost:3111/_console/functions | python3 -m json.tool

Test HTTP Endpoint

# Test document webhook
curl -X POST "http://localhost:3111/vmh/webhook/document/create" \
  -H "Content-Type: application/json" \
  -d '[{"id": "test123", "entityType": "CDokumente"}]'

# Test advoware proxy
curl "http://localhost:3111/advoware/proxy?endpoint=employees"

Manually Trigger Cron

curl -X POST "http://localhost:3111/_console/cron/trigger" \
  -H "Content-Type: application/json" \
  -d '{"function_id": "steps::VMH Beteiligte Sync Cron::trigger::0"}'

View Logs

# Live logs via journalctl
journalctl -u motia.service -f

# Search for specific step
journalctl --since "today" | grep -i "document sync"

# Check for errors
tail -100 /opt/motia-iii/bitbylaw/iii_new.log | grep -i error

Common Issues

Step not showing up:

  1. Check file naming: Must end with _step.py
  2. Check for import errors: grep -i "importerror\|traceback" iii.log
  3. Verify config dict is present
  4. Restart iii engine

Redis connection failed:

  • Check REDIS_HOST and REDIS_PORT environment variables
  • Verify Redis is running: redis-cli ping
  • Service will work without Redis but with warnings

AttributeError '_log' not found:

  • Ensure service inherits from BaseSyncUtils OR
  • Implement _log() method manually

Key Patterns Summary

DO

  • Always use _step.py suffix for step files
  • Always use ctx.logger for logging (never print)
  • Always wrap handlers in try/except with error logging
  • Always use visual separators in logs ("=" * 80)
  • Always return ApiResponse from HTTP handlers
  • Always document what events a step emits
  • Always use distributed locks for sync operations
  • Always calculate hashes for change detection
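Hash-based change detection is only a few lines. A minimal sketch, assuming the MD5-at-last-sync value stored in `xaiSyncedHash` (per the CDokumente fields above); `needs_resync` is an illustrative helper name, not part of the codebase:

```python
import hashlib


def needs_resync(file_bytes: bytes, synced_hash: str) -> tuple[bool, str]:
    """Compare the current MD5 against the hash recorded at the last sync."""
    current = hashlib.md5(file_bytes).hexdigest()
    if not synced_hash:
        return True, "never synced"
    if current != synced_hash:
        return True, "content changed"
    return False, "up to date"
```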

DON'T

  • Don't use module-level logger in steps
  • Don't forget async on handler functions
  • Don't use blocking I/O (use aiohttp, not requests)
  • Don't return values from queue/cron handlers
  • Don't hardcode credentials (use environment variables)
  • Don't skip lock cleanup in finally blocks
  • Don't use print() for logging

Module Documentation

Steps

Advoware Proxy (Module README)

  • Universal HTTP proxy with HMAC-512 authentication
  • Endpoints: GET, POST, PUT, DELETE
  • Redis token caching

Calendar Sync (Module README)

  • Bidirectional Advoware ↔ Google Calendar sync
  • Cron: Every 15 minutes
  • API trigger: /advoware/calendar/sync

VMH Integration (Module README)

  • EspoCRM ↔ Advoware bidirectional sync
  • Webhooks: Beteiligte, Bankverbindungen, Documents
  • xAI Collections integration for documents

Services

| Service | Purpose | Config |
|---|---|---|
| xai_service.py | xAI file uploads & collections | XAI_API_KEY, XAI_MANAGEMENT_KEY |
| espocrm.py | EspoCRM REST API client | ESPOCRM_API_BASE_URL, ESPOCRM_API_KEY |
| advoware_service.py | Advoware API with HMAC auth | ADVOWARE_API_KEY, ADVOWARE_API_SECRET |
| document_sync_utils.py | Document sync logic | Redis connection |
| beteiligte_sync_utils.py | Beteiligte sync logic | Redis connection |
| sync_utils_base.py | Base class for sync utils | - |

Quick Reference

Environment Variables

Required:

# EspoCRM
ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1
ESPOCRM_API_KEY=your-key

# Advoware
ADVOWARE_API_BASE_URL=https://api.advoware.de
ADVOWARE_API_KEY=your-key
ADVOWARE_API_SECRET=your-secret

# xAI
XAI_API_KEY=xai-...
XAI_MANAGEMENT_KEY=xai-token-...

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_ADVOWARE_CACHE=1

Optional:

ESPOCRM_API_TIMEOUT_SECONDS=30
ESPOCRM_METADATA_TTL_SECONDS=300

File Structure

bitbylaw/
├── iii-config.yaml           # Motia III configuration
├── pyproject.toml            # Python dependencies (uv)
├── steps/                    # Business logic
│   ├── advoware_proxy/
│   ├── advoware_cal_sync/
│   └── vmh/
│       ├── webhook/          # HTTP webhook handlers
│       │   ├── *_create_api_step.py
│       │   ├── *_update_api_step.py
│       │   └── *_delete_api_step.py
│       ├── *_sync_event_step.py   # Queue event handlers
│       └── *_sync_cron_step.py    # Scheduled jobs
├── services/                 # Shared services
│   ├── xai_service.py
│   ├── espocrm.py
│   ├── advoware_service.py
│   ├── *_sync_utils.py
│   ├── sync_utils_base.py
│   ├── logging_utils.py
│   └── exceptions.py
├── docs/                     # Documentation
│   ├── INDEX.md             # This file
│   ├── ARCHITECTURE.md
│   └── DOCUMENT_SYNC_XAI_STATUS.md
└── tests/                    # Test scripts
    └── test_xai_collections_api.py

Motia III vs Old Motia

| Old Motia v0.17 | Motia III v1.0-RC |
|---|---|
| type: 'api' | triggers: [http()] |
| type: 'event' | triggers: [queue()] |
| type: 'cron' | triggers: [cron()] |
| context.emit() | ctx.enqueue() |
| emits: [...] | enqueues: [...] |
| subscribes: [...] | triggers: [queue('topic')] |
| 5-field cron | 6-field cron (seconds first) |
| context.logger | ctx.logger |
| Motia Workbench | iii Console |
| Node.js + Python | Pure Python |

Cron Syntax

6 fields (Motia III): second minute hour day month weekday

0 */15 * * * *     # Every 15 minutes
0 0 */6 * * *      # Every 6 hours
0 0 2 * * *        # Daily at 2 AM
0 30 9 * * 1-5     # Monday-Friday at 9:30 AM

Additional Resources

Documentation

External Resources

Support & Troubleshooting

| Issue | Solution |
|---|---|
| Step not registered | Check _step.py suffix, restart: sudo systemctl restart motia.service |
| Import errors | Check logs: journalctl -u motia.service \| grep -i importerror |
| Code changes not applied | Auto-reload should work; force restart: sudo systemctl restart motia.service |
| ApiResponse validation | Use status=200, not status_code=200 |
| Redis unavailable | Service works with warnings; check REDIS_HOST |
| '_log' not found | Inherit from BaseSyncUtils or implement _log() method |
| Webhook not triggering | Verify endpoint: curl http://localhost:3111/_console/functions |
| xAI upload fails | Set XAI_API_KEY and XAI_MANAGEMENT_KEY in /etc/systemd/system/motia.service |
| View logs | journalctl -u motia.service -f (follow) or --since "10 minutes ago" |

Last Updated: 2026-03-08
Migration Status: Complete
xAI Integration: Implemented