docs: enhance error handling and locking strategies in document synchronization

This commit is contained in:
bsiggel
2026-03-08 20:58:58 +00:00
parent fd0196ec31
commit 2e449d2928


@@ -88,7 +88,12 @@ Rules:
Lock Acquisition:
├─ Try acquire lock
├─ Success → Process → Release in finally block
└─ Fail (already locked) → Raise exception → Motia retries later
Lock Configuration:
- TTL: 30 minutes (auto-release on timeout)
- Robust release: Always in finally block
- Deadlock prevention: TTL ensures eventual recovery
```
**Handler Pattern:**
@@ -98,11 +103,11 @@ async def handler(event_data, ctx):
    # 1. Try acquire lock
    lock_key = f"document_sync:{entity_type}:{entity_id}"
    lock_acquired = await acquire_lock(lock_key, ttl=1800)  # 30 minutes
    if not lock_acquired:
        # Raise exception → Motia will retry
        raise RuntimeError(f"⏸️ Lock busy for {lock_key}, retry later")
    try:
        # 2. Do work (only if lock acquired)
@@ -114,11 +119,11 @@ async def handler(event_data, ctx):
```
**Retry Behavior:**
- Lock fail → Handler raises exception
- Motia Queue catches exception → Schedules retry
- Retry after configured delay (exponential backoff)
- Eventually lock available → Handler processes successfully
Check: fileStatus="synced", xaiSyncStatus="clean"
→ Early return (nothing to do) ✅
Result: Second event processed but no duplicate work!
@@ -264,6 +269,7 @@ services/
- Easy relationship building (e.g., Advoware Akte ↔ xAI Collection via EspoCRM)
- Idempotent lookups (can verify both directions)
- Debugging: Always know where things came from
- **Single Source of Truth**: EspoCRM is authoritative for relationships
**Example: Collection Creation**
```python
@@ -285,6 +291,549 @@ await espocrm.update_entity("CBeteiligte", "abc123", {
--- ---
### Error Handling & Resilience
**Principle: "Retry Smart, Fail Visible, Recover When Possible"**
```
╔═══════════════════════════════════════════════════════╗
║ Error Handling Strategy (in priority order): ║
║ ║
║ 1⃣ Idempotent Check → Skip if already done ║
║ 2⃣ Step-internal retry with exponential backoff ║
║ 3⃣ Rollback/Compensation (when possible) ║
║ 4⃣ Log failure clearly → Let Motia retry ║
║ 5⃣ Manual intervention (for unrecoverable failures) ║
╚═══════════════════════════════════════════════════════╝
```
#### Idempotency First
**Always check if work already done before starting:**
```python
async def sync_document_handler(event_data, ctx):
entity_id = event_data['entity_id']
# 1. Load current state
doc = await espocrm.get_entity('CDokumente', entity_id)
# 2. Check if already synced (idempotent check)
if doc.get('xaiSyncStatus') == 'clean':
current_md5 = doc.get('md5sum')
synced_md5 = doc.get('xaiSyncedHash')
if current_md5 == synced_md5:
ctx.logger.info(f"✅ Already synced, skipping (hash match)")
return # Nothing to do
# 3. Check if file already uploaded to xAI
xai_file_id = doc.get('xaiId')
if xai_file_id:
# Verify file exists in xAI
file_exists = await xai.file_exists(xai_file_id)
if file_exists:
ctx.logger.info(f"✅ File already in xAI: {xai_file_id}")
# Only update collections, not re-upload
# 4. Proceed with sync...
```
#### Retry with Exponential Backoff
**For transient failures (network, rate limits):**
```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    ctx=None,
) -> T:
    """
    Retry an async operation with exponential backoff.

    Args:
        operation: Async function to retry
        max_attempts: Maximum retry attempts (not endless!)
        base_delay: Initial delay in seconds
        max_delay: Maximum delay cap
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception as e:
            if attempt == max_attempts:
                # Log and re-raise on final attempt
                if ctx:
                    ctx.logger.error(
                        f"❌ Operation failed after {max_attempts} attempts: {e}"
                    )
                raise
            # Calculate backoff delay
            delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
            if ctx:
                ctx.logger.warn(
                    f"⚠️ Attempt {attempt}/{max_attempts} failed: {e}. "
                    f"Retrying in {delay}s..."
                )
            await asyncio.sleep(delay)

# Usage
file_id = await retry_with_backoff(
    lambda: xai.upload_file(content, filename),
    max_attempts=3,
    ctx=ctx,
)
```
#### Rollback/Compensation
**When operations fail mid-way, clean up what was created:**
```python
async def sync_document_with_rollback(entity_id, ctx):
xai_file_id = None
collection_created = False
try:
# 1. Upload file to xAI
xai_file_id = await xai.upload_file(content, filename)
ctx.logger.info(f"✅ Uploaded to xAI: {xai_file_id}")
# 2. Create collection if needed
collection_id = await xai.create_collection(name, metadata)
collection_created = True
ctx.logger.info(f"✅ Created collection: {collection_id}")
# 3. Add file to collection
await xai.add_to_collection(collection_id, xai_file_id)
# 4. Update EspoCRM (critical - if this fails, we have orphans)
await espocrm.update_entity('CDokumente', entity_id, {
'xaiId': xai_file_id,
'xaiCollections': [collection_id],
'xaiSyncStatus': 'clean'
})
except Exception as e:
ctx.logger.error(f"❌ Sync failed, attempting rollback: {e}")
# ROLLBACK: Remove what we created
try:
if xai_file_id:
# Note: Only remove from collections, not delete file
# (file may be referenced elsewhere)
ctx.logger.info(f"⚠️ Leaving orphaned file in xAI: {xai_file_id}")
ctx.logger.info(f"⚠️ Manual cleanup may be required")
# Update EspoCRM to reflect failure
await espocrm.update_entity('CDokumente', entity_id, {
'xaiSyncStatus': 'failed',
'xaiSyncError': str(e)
})
except Exception as rollback_error:
ctx.logger.error(f"❌ ROLLBACK FAILED: {rollback_error}")
ctx.logger.error(f"⚠️ REQUIRES MANUAL INTERVENTION")
ctx.logger.error(f"⚠️ Entity: {entity_id}, xaiId: {xai_file_id}")
# Re-raise original exception for Motia retry
raise
```
**Reality Check:**
- **Rollback not always possible** (e.g., Advoware API, third-party services)
- **Idempotency >> Rollback** - Better to handle re-execution than undo
- **Log orphans clearly** - Enable manual cleanup when needed
#### Clear Failure Logging
**When rollback impossible, log enough details for manual intervention:**
```python
import json
from datetime import datetime

try:
await external_api.create_resource(data)
except Exception as e:
# Log all context needed for manual recovery
ctx.logger.error("=" * 80)
ctx.logger.error("❌ UNRECOVERABLE FAILURE - MANUAL INTERVENTION REQUIRED")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Entity Type: CDokumente")
ctx.logger.error(f"Entity ID: {entity_id}")
ctx.logger.error(f"Operation: Create Advoware Document")
ctx.logger.error(f"Error: {e}")
ctx.logger.error(f"Payload: {json.dumps(data, indent=2)}")
ctx.logger.error(f"Timestamp: {datetime.now().isoformat()}")
ctx.logger.error("=" * 80)
# Mark as failed in EspoCRM
await espocrm.update_entity('CDokumente', entity_id, {
'syncStatus': 'failed',
'syncError': str(e),
'syncFailedAt': datetime.now().isoformat()
})
# Re-raise for Motia (will retry later)
raise
```
#### Rate Limiting (xAI)
**xAI has rate limits - handle with backoff:**
```python
from aiohttp import ClientResponseError
async def upload_with_rate_limit_handling(xai, content, filename, ctx):
"""Upload file to xAI with rate limit handling."""
try:
return await retry_with_backoff(
lambda: xai.upload_file(content, filename),
max_attempts=5, # More attempts for rate limits
base_delay=2.0, # Longer initial delay
max_delay=60.0, # Up to 1 minute backoff
ctx=ctx
)
except ClientResponseError as e:
if e.status == 429: # Rate limit exceeded
ctx.logger.error(f"❌ xAI rate limit exceeded")
# Check Retry-After header
retry_after = e.headers.get('Retry-After', '60')
ctx.logger.error(f"⏰ Retry after {retry_after} seconds")
# Wait and try once more
await asyncio.sleep(int(retry_after))
return await xai.upload_file(content, filename)
raise
```
**Rate Limit Strategy:**
- **xAI**: Exponential backoff + respect `Retry-After` header
- **EspoCRM**: Internal system, no rate limits
- **Advoware**: No known rate limits (backoff on errors)
#### Input Validation
**Always validate webhook payloads and external data:**
```python
from typing import Dict, Any, List
def validate_webhook_payload(
payload: Any,
required_fields: List[str],
ctx
) -> Dict[str, Any]:
"""
Validate webhook payload structure.
Raises:
ValueError: If validation fails
"""
# Check payload is dict
if not isinstance(payload, dict):
raise ValueError(f"Payload must be dict, got {type(payload)}")
# Check required fields
missing = [f for f in required_fields if f not in payload]
if missing:
raise ValueError(f"Missing required fields: {missing}")
# Validate entity_id format (UUID)
entity_id = payload.get('entity_id', '')
if not entity_id or len(entity_id) < 10:
raise ValueError(f"Invalid entity_id: {entity_id}")
ctx.logger.debug(f"✅ Payload validated: {list(payload.keys())}")
return payload
# Usage in handler
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
try:
# Validate input
payload = validate_webhook_payload(
request.body,
required_fields=['entity_id', 'entity_type', 'action'],
ctx=ctx
)
# Process...
except ValueError as e:
ctx.logger.error(f"❌ Invalid payload: {e}")
return ApiResponse(status_code=400, body={'error': str(e)})
```
**Validation Checklist:**
- ✅ Type checking (dict, list, str)
- ✅ Required fields present
- ✅ Format validation (IDs, dates, enums)
- ✅ Length limits (prevent DoS)
- ✅ Enum validation (status values)
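The last two checklist items (length limits and enum validation) are not covered by `validate_webhook_payload` above. A minimal sketch of both for a single field, reusing the `EntityAction` values from `services/models.py` (the 1024-character cap is an illustrative assumption, not a documented limit):

```python
from enum import Enum

MAX_STRING_LENGTH = 1024  # illustrative cap: reject oversized fields (DoS guard)

class EntityAction(str, Enum):
    """Valid webhook actions (mirrors services/models.py)."""
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    LINK = "link"
    UNLINK = "unlink"

def validate_enum_field(value: str, field: str) -> EntityAction:
    """Length-limit plus enum validation for one string field."""
    if len(value) > MAX_STRING_LENGTH:
        raise ValueError(f"{field} exceeds {MAX_STRING_LENGTH} chars")
    try:
        return EntityAction(value)
    except ValueError:
        valid = [a.value for a in EntityAction]
        raise ValueError(f"Invalid {field} '{value}'. Valid: {valid}")
```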
#### Environment Configuration
**Always use environment variables, validate on startup:**
```python
import os
from typing import Optional
class ServiceConfig:
"""Validated configuration from environment."""
def __init__(self):
# Required
self.xai_api_key = self._require_env('XAI_API_KEY')
self.espocrm_url = self._require_env('ESPOCRM_API_BASE_URL')
# Optional with defaults
self.redis_host = os.getenv('REDIS_HOST', 'localhost')
self.redis_port = int(os.getenv('REDIS_PORT', '6379'))
self.lock_ttl = int(os.getenv('LOCK_TTL_SECONDS', '1800')) # 30min
def _require_env(self, key: str) -> str:
"""Get required env var or raise."""
value = os.getenv(key)
if not value:
raise ValueError(f"Required environment variable not set: {key}")
return value
# Usage
config = ServiceConfig() # Fails fast on startup if misconfigured
```
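The handlers above call `acquire_lock` without showing its implementation; in production this would typically be a Redis `SET NX EX` against the `REDIS_HOST`/`LOCK_TTL_SECONDS` configured above. A minimal in-process stand-in with the same interface (illustrative only — not safe across processes):

```python
import time

_locks: dict = {}  # lock_key -> expiry timestamp (monotonic seconds)

async def acquire_lock(lock_key: str, ttl: int = 1800) -> bool:
    """Return True if the lock was acquired, False if still held.

    The TTL guarantees eventual recovery if a holder crashes
    before releasing (same role as Redis key expiry).
    """
    now = time.monotonic()
    expiry = _locks.get(lock_key)
    if expiry is not None and expiry > now:
        return False  # held and not yet expired
    _locks[lock_key] = now + ttl
    return True

async def release_lock(lock_key: str) -> None:
    """Release the lock; safe to call even if it already expired."""
    _locks.pop(lock_key, None)
```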
**Notes:**
- Collections lifecycle (creation/deletion) managed by EspoCRM entity status
- Document deletion: Remove from xAI collections, keep file (may be referenced elsewhere)
- Lock TTL: 30 minutes (forced release on timeout)
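The deletion rule in the notes above (detach from collections, keep the file) could be sketched as follows; `handle_document_delete` is a hypothetical name, and the `xai.remove_from_collection` call assumes the client API referenced elsewhere in this document:

```python
async def handle_document_delete(doc: dict, xai, ctx) -> None:
    """On document deletion: detach from xAI collections, keep the file.

    The file itself is intentionally not deleted - it may still be
    referenced by other collections or entities.
    """
    file_id = doc.get('xaiId')
    if not file_id:
        return  # never synced to xAI, nothing to detach
    for collection_id in doc.get('xaiCollections', []):
        await xai.remove_from_collection(collection_id, file_id)
        ctx.logger.info(f"🗑️ Removed {file_id} from collection {collection_id}")
```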
---
### Type Safety & Explicit Contracts
**Principle: "Explicit over Implicit - Magic Strings Kill Refactoring"**
```
╔═══════════════════════════════════════════════════════╗
║ Prefer: ║
║ ✅ Enums over string literals ║
║ ✅ Type hints over duck typing ║
║ ✅ Named constants over magic numbers ║
║ ✅ Central definitions over scattered duplicates ║
╚═══════════════════════════════════════════════════════╝
```
#### Status Enums
**Define all valid status values centrally:**
```python
# services/models.py
from enum import Enum
class FileStatus(str, Enum):
"""Valid values for CDokumente.fileStatus field."""
NEW = "new"
CHANGED = "changed"
SYNCED = "synced"
def __str__(self) -> str:
return self.value
class XAISyncStatus(str, Enum):
"""Valid values for CDokumente.xaiSyncStatus field."""
NO_SYNC = "no_sync" # Entity has no xAI collections
PENDING_SYNC = "pending_sync" # Sync in progress (locked)
CLEAN = "clean" # Synced successfully
UNCLEAN = "unclean" # Needs re-sync (file changed)
FAILED = "failed" # Sync failed (see xaiSyncError)
def __str__(self) -> str:
return self.value
class EntityAction(str, Enum):
"""Valid webhook actions."""
CREATE = "create"
UPDATE = "update"
DELETE = "delete"
LINK = "link"
UNLINK = "unlink"
def __str__(self) -> str:
return self.value
class EntityType(str, Enum):
"""Supported EspoCRM entity types."""
DOKUMENTE = "CDokumente"
BETEILIGTE = "CBeteiligte"
RAEUMUNGSKLAGE = "Raeumungsklage"
ACCOUNT = "Account"
def __str__(self) -> str:
return self.value
```
#### Usage in Code
**Before (Magic Strings):**
```python
# ❌ BAD: Typos not caught at runtime
if doc['xaiSyncStatus'] == 'claen': # Typo!
pass
if doc['fileStatus'] == 'synced':
# No IDE autocomplete, no type safety
pass
# ❌ BAD: Scattered definitions
status = 'pending_sync' # What are ALL valid values?
```
**After (Type-Safe Enums):**
```python
# ✅ GOOD: Type-safe, autocomplete, refactorable
from services.models import FileStatus, XAISyncStatus
if doc['xaiSyncStatus'] == XAISyncStatus.CLEAN:
# IDE autocomplete works
# Typos caught by linter
# Easy to find all usages
pass
# Update status
await espocrm.update_entity('CDokumente', entity_id, {
'fileStatus': FileStatus.SYNCED,
'xaiSyncStatus': XAISyncStatus.CLEAN
})
# Enum validation
def validate_status(status: str) -> XAISyncStatus:
"""Validate and convert status string."""
try:
return XAISyncStatus(status)
except ValueError:
valid = [s.value for s in XAISyncStatus]
raise ValueError(f"Invalid status '{status}'. Valid: {valid}")
```
#### Benefits
**Why Enums Matter:**
1. **Prevent Typos**: `XAISyncStatus.CLAEN` → AttributeError at import time
2. **IDE Support**: Autocomplete shows all valid values
3. **Refactoring**: Rename in one place, affects entire codebase
4. **Documentation**: Single source of truth for valid values
5. **Type Safety**: `mypy` can catch invalid assignments
6. **Debugging**: Easy to find all usages with "Find References"
**Example - Easy Refactoring:**
```python
# Change enum value in one place:
class FileStatus(str, Enum):
NEW = "new"
CHANGED = "changed"
SYNCED = "synchronized" # Changed!
# All usages automatically updated (string value changes):
FileStatus.SYNCED # Returns "synchronized" everywhere
```
#### Type Hints
**Always use type hints for function signatures:**
```python
from typing import Any, Dict, List, Optional, Tuple
from services.models import FileStatus, XAISyncStatus
# ✅ GOOD: Clear contracts
async def sync_document(
entity_id: str,
ctx: FlowContext
) -> Tuple[bool, Optional[str]]:
"""
Sync document to xAI.
Args:
entity_id: EspoCRM document ID
ctx: Motia flow context
Returns:
(success: bool, xai_file_id: Optional[str])
"""
pass
async def should_sync_to_xai(
doc: Dict[str, Any]
) -> Tuple[bool, str]:
"""
Check if document needs xAI sync.
Returns:
(should_sync: bool, reason: str)
"""
status = XAISyncStatus(doc.get('xaiSyncStatus', 'no_sync'))
if status == XAISyncStatus.CLEAN:
return (False, "Already synced")
return (True, "Sync required")
```
#### Constants
**Define magic values as named constants:**
```python
# ✅ GOOD: Central configuration
# services/config.py
# Lock TTLs
LOCK_TTL_DOCUMENT_SYNC = 1800 # 30 minutes
LOCK_TTL_COLLECTION_CREATE = 300 # 5 minutes
# Retry configuration
RETRY_MAX_ATTEMPTS = 3
RETRY_BASE_DELAY = 1.0
RETRY_MAX_DELAY = 30.0
# xAI rate limits
XAI_RATE_LIMIT_RETRY_AFTER = 60 # Default retry delay
XAI_MAX_FILE_SIZE_MB = 512
# EspoCRM pagination
ESPOCRM_DEFAULT_PAGE_SIZE = 50
ESPOCRM_MAX_PAGE_SIZE = 200
# Usage
from services.config import LOCK_TTL_DOCUMENT_SYNC
lock_acquired = await acquire_lock(
lock_key,
ttl=LOCK_TTL_DOCUMENT_SYNC # Clear, refactorable
)
```
---
## Step Development Best Practices
### File Naming Convention
@@ -631,7 +1180,8 @@ await xai.remove_from_collection("collection_id", file_id)
    'xaiId': 'file-abc123',              # xAI file_id
    'xaiCollections': ['col1', 'col2'],  # Array of collection IDs
    'xaiSyncedHash': 'abc123def456',     # MD5 at last sync
    'fileStatus': 'synced',              # Status: new, changed, synced
    'xaiSyncStatus': 'clean',            # Sync state: no_sync, pending_sync, clean, unclean, failed
    'md5sum': 'abc123def456',            # Current file hash
    'sha256': 'def456...',               # SHA-256 (optional)
}