diff --git a/docs/INDEX.md b/docs/INDEX.md index 1861ffe..80d85ca 100644 --- a/docs/INDEX.md +++ b/docs/INDEX.md @@ -88,7 +88,12 @@ Rules: Lock Acquisition: ├─ Try acquire lock ├─ Success → Process → Release in finally block -└─ Fail (already locked) → Return early → Motia retries later +└─ Fail (already locked) → Raise exception → Motia retries later + + +Lock Configuration: +- TTL: 30 minutes (auto-release on timeout) +- Robust release: Always in finally block +- Deadlock prevention: TTL ensures eventual recovery ``` **Handler Pattern:** ```python async def handler(event_data, ctx): # 1. Try acquire lock lock_key = f"document_sync:{entity_type}:{entity_id}" - lock_acquired = await acquire_lock(lock_key) + lock_acquired = await acquire_lock(lock_key, ttl=1800) # 30 minutes if not lock_acquired: - ctx.logger.info(f"⏸️ Lock busy, skipping (Motia will retry)") - return # Early return - no error, just skip + # Raise exception → Motia will retry + raise RuntimeError(f"⏸️ Lock busy for {lock_key}, retry later") try: # 2. Do work (only if lock acquired) @@ -114,11 +119,11 @@ ``` **Retry Behavior:** -- Lock fail → Handler returns without error -- Motia Queue sees: No error, no completion marker -- Motia automatically retries after configured delay -- Eventually lock available → Handler processes - Check: fileStatus="unchanged", xaiSyncStatus="clean" +- Lock fail → Handler raises exception +- Motia Queue catches exception → Schedules retry +- Retry after configured delay (exponential backoff) +- Eventually lock available → Handler processes successfully + Check: fileStatus="synced", xaiSyncStatus="clean" → Early return (nothing to do) ✅ Result: Second event processed but no duplicate work! 
@@ -264,6 +269,7 @@ services/ - Easy relationship building (e.g., Advoware Akte ↔ xAI Collection via EspoCRM) - Idempotent lookups (can verify both directions) - Debugging: Always know where things came from +- **Single Source of Truth**: EspoCRM is authoritative for relationships **Example: Collection Creation** ```python @@ -285,6 +291,549 @@ await espocrm.update_entity("CBeteiligte", "abc123", { --- +### Error Handling & Resilience + +**Principle: "Retry Smart, Fail Visible, Recover When Possible"** + +``` +╔═══════════════════════════════════════════════════════╗ +║ Error Handling Strategy (in priority order): ║ +║ ║ +║ 1️⃣ Idempotent Check → Skip if already done ║ +║ 2️⃣ Step-internal retry with exponential backoff ║ +║ 3️⃣ Rollback/Compensation (when possible) ║ +║ 4️⃣ Log failure clearly → Let Motia retry ║ +║ 5️⃣ Manual intervention (for unrecoverable failures) ║ +╚═══════════════════════════════════════════════════════╝ +``` + +#### Idempotency First + +**Always check if work already done before starting:** + +```python +async def sync_document_handler(event_data, ctx): + entity_id = event_data['entity_id'] + + # 1. Load current state + doc = await espocrm.get_entity('CDokumente', entity_id) + + # 2. Check if already synced (idempotent check) + if doc.get('xaiSyncStatus') == 'clean': + current_md5 = doc.get('md5sum') + synced_md5 = doc.get('xaiSyncedHash') + + if current_md5 == synced_md5: + ctx.logger.info(f"✅ Already synced, skipping (hash match)") + return # Nothing to do + + # 3. Check if file already uploaded to xAI + xai_file_id = doc.get('xaiId') + if xai_file_id: + # Verify file exists in xAI + file_exists = await xai.file_exists(xai_file_id) + if file_exists: + ctx.logger.info(f"✅ File already in xAI: {xai_file_id}") + # Only update collections, not re-upload + + # 4. Proceed with sync... 
+``` + +#### Retry with Exponential Backoff + +**For transient failures (network, rate limits):** + +```python +import asyncio +from typing import Awaitable, Callable, TypeVar + +T = TypeVar('T') + +async def retry_with_backoff( + operation: Callable[[], Awaitable[T]], + max_attempts: int = 3, + base_delay: float = 1.0, + max_delay: float = 30.0, + ctx = None +) -> T: + """ + Retry operation with exponential backoff. + + Args: + operation: Async function to retry + max_attempts: Maximum retry attempts (not endless!) + base_delay: Initial delay in seconds + max_delay: Maximum delay cap + """ + for attempt in range(1, max_attempts + 1): + try: + return await operation() + + except Exception as e: + if attempt == max_attempts: + # Log and re-raise on final attempt + if ctx: + ctx.logger.error( + f"❌ Operation failed after {max_attempts} attempts: {e}" + ) + raise + + # Calculate backoff delay + delay = min(base_delay * (2 ** (attempt - 1)), max_delay) + + if ctx: + ctx.logger.warn( + f"⚠️ Attempt {attempt}/{max_attempts} failed: {e}. " + f"Retrying in {delay}s..." + ) + + await asyncio.sleep(delay) + +# Usage +file_id = await retry_with_backoff( + lambda: xai.upload_file(content, filename), + max_attempts=3, + ctx=ctx +) +``` + +#### Rollback/Compensation + +**When operations fail mid-way, clean up what was created:** + +```python +async def sync_document_with_rollback(entity_id, ctx): + xai_file_id = None + collection_created = False + + try: + # 1. Upload file to xAI + xai_file_id = await xai.upload_file(content, filename) + ctx.logger.info(f"✅ Uploaded to xAI: {xai_file_id}") + + # 2. Create collection if needed + collection_id = await xai.create_collection(name, metadata) + collection_created = True + ctx.logger.info(f"✅ Created collection: {collection_id}") + + # 3. Add file to collection + await xai.add_to_collection(collection_id, xai_file_id) + + # 4. 
Update EspoCRM (critical - if this fails, we have orphans) + await espocrm.update_entity('CDokumente', entity_id, { + 'xaiId': xai_file_id, + 'xaiCollections': [collection_id], + 'xaiSyncStatus': 'clean' + }) + + except Exception as e: + ctx.logger.error(f"❌ Sync failed, attempting rollback: {e}") + + # ROLLBACK: Remove what we created + try: + if xai_file_id: + # Note: Only remove from collections, not delete file + # (file may be referenced elsewhere) + ctx.logger.info(f"⚠️ Leaving orphaned file in xAI: {xai_file_id}") + ctx.logger.info(f"⚠️ Manual cleanup may be required") + + # Update EspoCRM to reflect failure + await espocrm.update_entity('CDokumente', entity_id, { + 'xaiSyncStatus': 'failed', + 'xaiSyncError': str(e) + }) + + except Exception as rollback_error: + ctx.logger.error(f"❌ ROLLBACK FAILED: {rollback_error}") + ctx.logger.error(f"⚠️ REQUIRES MANUAL INTERVENTION") + ctx.logger.error(f"⚠️ Entity: {entity_id}, xaiId: {xai_file_id}") + + # Re-raise original exception for Motia retry + raise +``` + +**Reality Check:** +- **Rollback not always possible** (e.g., Advoware API, third-party services) +- **Idempotency >> Rollback** - Better to handle re-execution than undo +- **Log orphans clearly** - Enable manual cleanup when needed + +#### Clear Failure Logging + +**When rollback impossible, log enough details for manual intervention:** + +```python +try: + await external_api.create_resource(data) +except Exception as e: + # Log all context needed for manual recovery + ctx.logger.error("=" * 80) + ctx.logger.error("❌ UNRECOVERABLE FAILURE - MANUAL INTERVENTION REQUIRED") + ctx.logger.error("=" * 80) + ctx.logger.error(f"Entity Type: CDokumente") + ctx.logger.error(f"Entity ID: {entity_id}") + ctx.logger.error(f"Operation: Create Advoware Document") + ctx.logger.error(f"Error: {e}") + ctx.logger.error(f"Payload: {json.dumps(data, indent=2)}") + ctx.logger.error(f"Timestamp: {datetime.now().isoformat()}") + ctx.logger.error("=" * 80) + + # Mark as failed in 
EspoCRM + await espocrm.update_entity('CDokumente', entity_id, { + 'syncStatus': 'failed', + 'syncError': str(e), + 'syncFailedAt': datetime.now().isoformat() + }) + + # Re-raise for Motia (will retry later) + raise +``` + +#### Rate Limiting (xAI) + +**xAI has rate limits - handle with backoff:** + +```python +from aiohttp import ClientResponseError + +async def upload_with_rate_limit_handling(xai, content, filename, ctx): + """Upload file to xAI with rate limit handling.""" + try: + return await retry_with_backoff( + lambda: xai.upload_file(content, filename), + max_attempts=5, # More attempts for rate limits + base_delay=2.0, # Longer initial delay + max_delay=60.0, # Up to 1 minute backoff + ctx=ctx + ) + + except ClientResponseError as e: + if e.status == 429: # Rate limit exceeded + ctx.logger.error(f"❌ xAI rate limit exceeded") + + # Check Retry-After header + retry_after = e.headers.get('Retry-After', '60') + ctx.logger.error(f"⏰ Retry after {retry_after} seconds") + + # Wait and try once more + await asyncio.sleep(int(retry_after)) + return await xai.upload_file(content, filename) + + raise +``` + +**Rate Limit Strategy:** +- **xAI**: Exponential backoff + respect `Retry-After` header +- **EspoCRM**: Internal system, no rate limits +- **Advoware**: No known rate limits (backoff on errors) + +#### Input Validation + +**Always validate webhook payloads and external data:** + +```python +from typing import Dict, Any, List + +def validate_webhook_payload( + payload: Any, + required_fields: List[str], + ctx +) -> Dict[str, Any]: + """ + Validate webhook payload structure. 
+ + Raises: + ValueError: If validation fails + """ + # Check payload is dict + if not isinstance(payload, dict): + raise ValueError(f"Payload must be dict, got {type(payload)}") + + # Check required fields + missing = [f for f in required_fields if f not in payload] + if missing: + raise ValueError(f"Missing required fields: {missing}") + + # Validate entity_id format (UUID) + entity_id = payload.get('entity_id', '') + if not entity_id or len(entity_id) < 10: + raise ValueError(f"Invalid entity_id: {entity_id}") + + ctx.logger.debug(f"✅ Payload validated: {list(payload.keys())}") + return payload + +# Usage in handler +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + try: + # Validate input + payload = validate_webhook_payload( + request.body, + required_fields=['entity_id', 'entity_type', 'action'], + ctx=ctx + ) + + # Process... + + except ValueError as e: + ctx.logger.error(f"❌ Invalid payload: {e}") + return ApiResponse(status_code=400, body={'error': str(e)}) +``` + +**Validation Checklist:** +- ✅ Type checking (dict, list, str) +- ✅ Required fields present +- ✅ Format validation (IDs, dates, enums) +- ✅ Length limits (prevent DoS) +- ✅ Enum validation (status values) + +#### Environment Configuration + +**Always use environment variables, validate on startup:** + +```python +import os +from typing import Optional + +class ServiceConfig: + """Validated configuration from environment.""" + + def __init__(self): + # Required + self.xai_api_key = self._require_env('XAI_API_KEY') + self.espocrm_url = self._require_env('ESPOCRM_API_BASE_URL') + + # Optional with defaults + self.redis_host = os.getenv('REDIS_HOST', 'localhost') + self.redis_port = int(os.getenv('REDIS_PORT', '6379')) + self.lock_ttl = int(os.getenv('LOCK_TTL_SECONDS', '1800')) # 30min + + def _require_env(self, key: str) -> str: + """Get required env var or raise.""" + value = os.getenv(key) + if not value: + raise ValueError(f"Required environment variable not set: 
{key}") + return value + +# Usage +config = ServiceConfig() # Fails fast on startup if misconfigured +``` + +**Notes:** +- Collections lifecycle (creation/deletion) managed by EspoCRM entity status +- Document deletion: Remove from xAI collections, keep file (may be referenced elsewhere) +- Lock TTL: 30 minutes (forced release on timeout) + +--- + +### Type Safety & Explicit Contracts + +**Principle: "Explicit over Implicit - Magic Strings Kill Refactoring"** + +``` +╔═══════════════════════════════════════════════════════╗ +║ Prefer: ║ +║ ✅ Enums over string literals ║ +║ ✅ Type hints over duck typing ║ +║ ✅ Named constants over magic numbers ║ +║ ✅ Central definitions over scattered duplicates ║ +╚═══════════════════════════════════════════════════════╝ +``` + +#### Status Enums + +**Define all valid status values centrally:** + +```python +# services/models.py +from enum import Enum + +class FileStatus(str, Enum): + """Valid values for CDokumente.fileStatus field.""" + NEW = "new" + CHANGED = "changed" + SYNCED = "synced" + + def __str__(self) -> str: + return self.value + +class XAISyncStatus(str, Enum): + """Valid values for CDokumente.xaiSyncStatus field.""" + NO_SYNC = "no_sync" # Entity has no xAI collections + PENDING_SYNC = "pending_sync" # Sync in progress (locked) + CLEAN = "clean" # Synced successfully + UNCLEAN = "unclean" # Needs re-sync (file changed) + FAILED = "failed" # Sync failed (see xaiSyncError) + + def __str__(self) -> str: + return self.value + +class EntityAction(str, Enum): + """Valid webhook actions.""" + CREATE = "create" + UPDATE = "update" + DELETE = "delete" + LINK = "link" + UNLINK = "unlink" + + def __str__(self) -> str: + return self.value + +class EntityType(str, Enum): + """Supported EspoCRM entity types.""" + DOKUMENTE = "CDokumente" + BETEILIGTE = "CBeteiligte" + RAEUMUNGSKLAGE = "Raeumungsklage" + ACCOUNT = "Account" + + def __str__(self) -> str: + return self.value +``` + +#### Usage in Code + +**Before (Magic Strings):** + 
+```python +# ❌ BAD: Typos not caught at runtime +if doc['xaiSyncStatus'] == 'claen': # Typo! + pass + +if doc['fileStatus'] == 'synced': + # No IDE autocomplete, no type safety + pass + +# ❌ BAD: Scattered definitions +status = 'pending_sync' # What are ALL valid values? +``` + +**After (Type-Safe Enums):** + +```python +# ✅ GOOD: Type-safe, autocomplete, refactorable +from services.models import FileStatus, XAISyncStatus + +if doc['xaiSyncStatus'] == XAISyncStatus.CLEAN: + # IDE autocomplete works + # Typos caught by linter + # Easy to find all usages + pass + +# Update status +await espocrm.update_entity('CDokumente', entity_id, { + 'fileStatus': FileStatus.SYNCED, + 'xaiSyncStatus': XAISyncStatus.CLEAN +}) + +# Enum validation +def validate_status(status: str) -> XAISyncStatus: + """Validate and convert status string.""" + try: + return XAISyncStatus(status) + except ValueError: + valid = [s.value for s in XAISyncStatus] + raise ValueError(f"Invalid status '{status}'. Valid: {valid}") +``` + +#### Benefits + +**Why Enums Matter:** + +1. **Prevent Typos**: `XAISyncStatus.CLAEN` → AttributeError at import time +2. **IDE Support**: Autocomplete shows all valid values +3. **Refactoring**: Rename in one place, affects entire codebase +4. **Documentation**: Single source of truth for valid values +5. **Type Safety**: `mypy` can catch invalid assignments +6. **Debugging**: Easy to find all usages with "Find References" + +**Example - Easy Refactoring:** +```python +# Change enum value in one place: +class FileStatus(str, Enum): + NEW = "new" + CHANGED = "changed" + SYNCED = "synchronized" # Changed! 
+ +# All usages automatically updated (string value changes): +FileStatus.SYNCED # Returns "synchronized" everywhere +``` + +#### Type Hints + +**Always use type hints for function signatures:** + +```python +from typing import Dict, List, Optional, Tuple +from services.models import FileStatus, XAISyncStatus + +# ✅ GOOD: Clear contracts +async def sync_document( + entity_id: str, + ctx: FlowContext +) -> Tuple[bool, Optional[str]]: + """ + Sync document to xAI. + + Args: + entity_id: EspoCRM document ID + ctx: Motia flow context + + Returns: + (success: bool, xai_file_id: Optional[str]) + """ + pass + +async def should_sync_to_xai( + doc: Dict[str, Any] +) -> Tuple[bool, str]: + """ + Check if document needs xAI sync. + + Returns: + (should_sync: bool, reason: str) + """ + status = XAISyncStatus(doc.get('xaiSyncStatus', 'no_sync')) + + if status == XAISyncStatus.CLEAN: + return (False, "Already synced") + + return (True, "Sync required") +``` + +#### Constants + +**Define magic values as named constants:** + +```python +# ✅ GOOD: Central configuration +# services/config.py + +# Lock TTLs +LOCK_TTL_DOCUMENT_SYNC = 1800 # 30 minutes +LOCK_TTL_COLLECTION_CREATE = 300 # 5 minutes + +# Retry configuration +RETRY_MAX_ATTEMPTS = 3 +RETRY_BASE_DELAY = 1.0 +RETRY_MAX_DELAY = 30.0 + +# xAI rate limits +XAI_RATE_LIMIT_RETRY_AFTER = 60 # Default retry delay +XAI_MAX_FILE_SIZE_MB = 512 + +# EspoCRM pagination +ESPOCRM_DEFAULT_PAGE_SIZE = 50 +ESPOCRM_MAX_PAGE_SIZE = 200 + +# Usage +from services.config import LOCK_TTL_DOCUMENT_SYNC + +lock_acquired = await acquire_lock( + lock_key, + ttl=LOCK_TTL_DOCUMENT_SYNC # Clear, refactorable +) +``` + +--- + ## Step Development Best Practices ### File Naming Convention @@ -631,7 +1180,8 @@ await xai.remove_from_collection("collection_id", file_id) 'xaiId': 'file-abc123', # xAI file_id 'xaiCollections': ['col1', 'col2'], # Array of collection IDs 'xaiSyncedHash': 'abc123def456', # MD5 at last sync - 'fileStatus': 'synced', # Status: neu, 
geändert, synced + 'fileStatus': 'synced', # Status: new, changed, synced + 'xaiSyncStatus': 'clean', # Sync state: no_sync, pending_sync, clean, unclean, failed 'md5sum': 'abc123def456', # Current file hash 'sha256': 'def456...', # SHA-256 (optional) }