Compare commits: 6bf2343a12...main (27 commits)

Commits: 71f583481a, 48d440a860, c02a5d8823, edae5f6081, 8ce843415e, 46085bd8dd, 2ac83df1e0, 7fffdb2660, 69f0c6a44d, 949a5fd69c, 8e53fd6345, 59fdd7d9ec, eaab14ae57, 331d43390a, 18f2ff775e, c032e24d7a, 4a5065aea4, bb13d59ddb, b0fceef4e2, e727582584, 2292fd4762, 9ada48d8c8, 9a3e01d447, e945333c1a, 6f7f847939, 46c0bbf381, 8f1533337c

docs/INDEX.md (461 lines changed)
@@ -3,6 +3,7 @@

> **For AI Assistants**: This document contains all critical patterns, conventions, and best practices. Read this first to understand the codebase structure and ensure consistency.

**Quick Navigation:**
- [iii Platform & Development Workflow](#iii-platform--development-workflow) - Platform evolution and CLI tools
- [Core Concepts](#core-concepts) - System architecture and patterns
- [Design Principles](#design-principles) - Event Storm & Bidirectional References
- [Step Development](#step-development-best-practices) - How to create new steps
@@ -23,6 +24,244 @@

---

## iii Platform & Development Workflow

### Platform Evolution (v0.8 → v0.9+)

**Status:** March 2026 - iii v0.9+ production-ready

iii has evolved from an all-in-one development tool into a **modular, production-grade event engine** with a clear separation between development and deployment workflows.

#### Structural Changes Overview

| Component | Before (v0.2-v0.7) | Now (v0.9+) | Impact |
|-----------|--------------------|-------------|--------|
| **Console/Dashboard** | Integrated in engine process (port 3111) | Separate process (`iii-cli console` or `dev`) | More flexibility, less resource overhead, better scaling |
| **CLI Tool** | Minimal or non-existent | `iii-cli` is the central dev tool | Terminal-based dev workflow, scriptable, faster iteration |
| **Project Structure** | Steps anywhere in project | **Recommended:** `src/` + `src/steps/` | Cleaner structure, reliable hot-reload |
| **Hot-Reload/Watcher** | Integrated in engine | Separate `shell::ExecModule` with `watch` paths | Only Python/TS files watched (configurable) |
| **Start & Services** | Single `iii` process | Engine (`iii` or `iii-cli start`) + separate console | Better split between production (engine) and dev (console) |
| **Config Handling** | YAML + ENV | YAML + ENV + CLI flags (flags take priority) | More control via CLI flags |
| **Observability** | Basic | Enhanced (OTel, rollups, alerts, traces) | Production-ready telemetry |
| **Streams & State** | KV store (file/memory) | More adapters, `file_based` default | Better persistence handling |

**Key Takeaway:** iii is now a **modular, production-ready engine** in which development (CLI + separate console) is clearly separated from production deployment.

---
### Development Workflow with iii-cli

**`iii-cli` is your primary tool for local development, debugging, and testing.**

#### Essential Commands

| Command | Purpose | When to Use | Example |
|---------|---------|-------------|---------|
| `iii-cli dev` | Start dev server with hot-reload + integrated console | Local development, immediate feedback on code changes | `iii-cli dev` |
| `iii-cli console` | Start dashboard only (separate port) | When you only need the console (no dev reload) | `iii-cli console --host 0.0.0.0 --port 3113` |
| `iii-cli start` | Start engine standalone (like `motia.service`) | Testing the engine in isolation | `iii-cli start -c iii-config.yaml` |
| `iii-cli logs` | Live logs of all flows/workers/triggers | Debugging, error investigation | `iii-cli logs --level debug` |
| `iii-cli trace <id>` | Show detailed trace information (OTel) | Debugging a specific request/flow | `iii-cli trace abc123` |
| `iii-cli state ls` | List states (KV storage) | Verify state persistence | `iii-cli state ls` |
| `iii-cli state get` | Get a specific state value | Inspect state content | `iii-cli state get key` |
| `iii-cli stream ls` | List all streams + groups | Inspect stream/websocket connections | `iii-cli stream ls` |
| `iii-cli flow list` | Show all registered flows/triggers | Overview of active steps & endpoints | `iii-cli flow list` |
| `iii-cli worker logs` | Worker logs (Python/TS execution) | Debug issues in step handlers | `iii-cli worker logs` |
#### Typical Development Workflow

```bash
# 1. Navigate to project
cd /opt/motia-iii/bitbylaw

# 2. Start dev mode (hot-reload + console on port 3113)
iii-cli dev --host 0.0.0.0 --port 3113 --engine-port 3111

# Alternative: separate engine + console
# Terminal 1:
iii-cli start -c iii-config.yaml

# Terminal 2:
iii-cli console --host 0.0.0.0 --port 3113 \
    --engine-host 192.168.1.62 --engine-port 3111

# 3. Watch logs live (separate terminal)
iii-cli logs -f

# 4. Debug a specific trace
iii-cli trace <trace-id-from-logs>

# 5. Inspect state
iii-cli state ls
iii-cli state get document:sync:status

# 6. Verify flows registered
iii-cli flow list
```
#### Development vs. Production

**Development:**
- Use `iii-cli dev` for hot-reload
- Console accessible on `localhost:3113`
- Logs visible in the terminal
- Immediate feedback on code changes

**Production:**
- `systemd` service runs `iii-cli start`
- Console runs separately (if needed)
- Logs via `journalctl -u motia.service -f`
- No hot-reload (restart the service for changes)
**Example Production Service:**
```ini
[Unit]
Description=Motia III Engine
After=network.target redis.service

[Service]
Type=simple
User=motia
WorkingDirectory=/opt/motia-iii/bitbylaw
ExecStart=/usr/local/bin/iii-cli start -c /opt/motia-iii/bitbylaw/iii-config.yaml
Restart=always
RestartSec=10
Environment="PATH=/usr/local/bin:/usr/bin"

[Install]
WantedBy=multi-user.target
```
#### Project Structure Best Practices

**Recommended Structure (v0.9+):**
```
bitbylaw/
├── iii-config.yaml            # Main configuration
├── src/                       # Source code root
│   └── steps/                 # All steps here (hot-reload reliable)
│       ├── __init__.py
│       ├── vmh/
│       │   ├── __init__.py
│       │   ├── document_sync_event_step.py
│       │   └── webhook/
│       │       ├── __init__.py
│       │       └── document_create_api_step.py
│       └── advoware_proxy/
│           └── ...
├── services/                  # Shared business logic
│   ├── __init__.py
│   ├── xai_service.py
│   ├── espocrm.py
│   └── ...
└── tests/                     # Test files
```
**Why `src/steps/` is recommended:**
- **Hot-reload works reliably** - the watcher detects changes correctly
- **Cleaner project** - source code isolated from config/docs
- **IDE support** - better navigation and refactoring
- **Deployment** - easier to package

**Note:** The old structure (steps in the project root) still works, but hot-reload may be less reliable.
#### Hot-Reload Configuration

**Hot-reload is configured via `shell::ExecModule` in `iii-config.yaml`:**

```yaml
modules:
  - type: shell::ExecModule
    config:
      watch:
        - "src/**/*.py"        # Watch Python files in src/
        - "services/**/*.py"   # Watch service files
        # Add more patterns as needed
      ignore:
        - "**/__pycache__/**"
        - "**/*.pyc"
        - "**/tests/**"
```
**Behavior:**
- Only files matching `watch` patterns trigger a reload
- Files matching `ignore` patterns are skipped
- Reload is automatic in `iii-cli dev` mode
- Production mode (`iii-cli start`) does NOT watch files

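Assuming standard glob semantics for the `watch`/`ignore` patterns above, the filtering can be sketched in Python. This is an illustration using `fnmatch` (whose `*` also crosses `/`, approximating `**`), not iii's actual matcher, and `should_reload` is a hypothetical helper name:

```python
from fnmatch import fnmatch

# Patterns as configured above; fnmatch only approximates '**' glob semantics.
WATCH = ["src/**/*.py", "services/**/*.py"]
IGNORE = ["**/__pycache__/**", "**/*.pyc", "**/tests/**"]

def should_reload(path: str) -> bool:
    """Return True if a change to `path` should trigger a reload."""
    # Ignore patterns win over watch patterns.
    if any(fnmatch(path, pat) for pat in IGNORE):
        return False
    return any(fnmatch(path, pat) for pat in WATCH)

print(should_reload("src/steps/vmh/document_sync_event_step.py"))  # True
print(should_reload("src/steps/__pycache__/x.pyc"))                # False
```

The key design point mirrored here is that `ignore` is checked first, so a `.pyc` file inside a watched directory never triggers a reload.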
---
### Observability & Debugging

#### OpenTelemetry Integration

**iii v0.9+ has built-in OpenTelemetry support:**

```python
# Traces are automatically created for:
# - HTTP requests
# - Queue processing
# - Cron execution
# - Service calls (if instrumented)

# Access the trace ID in a handler:
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    trace_id = ctx.trace_id  # Use for debugging
    ctx.logger.info(f"Trace ID: {trace_id}")
```

**View traces:**
```bash
# Get trace details
iii-cli trace <trace-id>

# Filter logs by trace
iii-cli logs --trace <trace-id>
```
#### Debugging Workflow

**1. Live Logs:**
```bash
# All logs
iii-cli logs -f

# Specific level
iii-cli logs --level error

# With grep
iii-cli logs -f | grep "document_sync"
```

**2. State Inspection:**
```bash
# List all state keys
iii-cli state ls

# Get a specific state
iii-cli state get sync:document:last_run
```

**3. Flow Verification:**
```bash
# List all registered flows
iii-cli flow list

# Verify an endpoint exists
iii-cli flow list | grep "/vmh/webhook"
```

**4. Worker Issues:**
```bash
# Worker-specific logs
iii-cli worker logs

# Check worker health
iii-cli worker status
```

---

## Core Concepts

### System Overview
@@ -1271,24 +1510,41 @@ sudo systemctl enable motia.service

```bash
sudo systemctl enable iii-console.service
```

**Development (iii-cli):**
```bash
# Option 1: Dev mode with integrated console and hot-reload
cd /opt/motia-iii/bitbylaw
iii-cli dev --host 0.0.0.0 --port 3113 --engine-port 3111

# Option 2: Separate engine and console
# Terminal 1: Start engine
iii-cli start -c iii-config.yaml

# Terminal 2: Start console
iii-cli console --host 0.0.0.0 --port 3113 \
    --engine-host 192.168.1.62 --engine-port 3111

# Option 3: Manual (legacy)
/opt/bin/iii -c iii-config.yaml
```
### Check Registered Steps

**Using iii-cli (recommended):**
```bash
# List all flows and triggers
iii-cli flow list

# Filter for a specific step
iii-cli flow list | grep document_sync
```

**Using curl (legacy):**
```bash
curl http://localhost:3111/_console/functions | python3 -m json.tool
```
### Test HTTP Endpoints

@@ -1298,6 +1554,11 @@

```bash
# Test document webhook
curl -X POST "http://localhost:3111/vmh/webhook/document/create" \

# Test advoware proxy
curl "http://localhost:3111/advoware/proxy?endpoint=employees"

# Test beteiligte sync
curl -X POST "http://localhost:3111/vmh/webhook/beteiligte/create" \
  -H "Content-Type: application/json" \
  -d '{"entity_type": "CBeteiligte", "entity_id": "abc123", "action": "create"}'
```
### Manually Trigger Cron

@@ -1308,36 +1569,208 @@

```bash
curl -X POST "http://localhost:3111/_console/cron/trigger" \
  -d '{"function_id": "steps::VMH Beteiligte Sync Cron::trigger::0"}'
```
### View and Debug Logs

**Using iii-cli (recommended):**
```bash
# Live logs (all)
iii-cli logs -f

# Live logs with a specific level
iii-cli logs -f --level error
iii-cli logs -f --level debug

# Filter by component
iii-cli logs -f | grep "document_sync"

# Worker-specific logs
iii-cli worker logs

# Get a specific trace
iii-cli trace <trace-id>

# Filter logs by trace ID
iii-cli logs --trace <trace-id>
```

**Using journalctl (production):**
```bash
# Live logs
journalctl -u motia.service -f

# Search for a specific step
journalctl -u motia.service --since "today" | grep -i "document sync"

# Show errors only
journalctl -u motia.service -p err -f

# Last 100 lines
journalctl -u motia.service -n 100

# Specific time range
journalctl -u motia.service --since "2026-03-19 10:00" --until "2026-03-19 11:00"
```

**Using log files (legacy):**
```bash
# Check for errors
tail -100 /opt/motia-iii/bitbylaw/iii_new.log | grep -i error

# Follow the log file
tail -f /opt/motia-iii/bitbylaw/iii_new.log
```
### Inspect State and Streams

**State Management:**
```bash
# List all state keys
iii-cli state ls

# Get a specific state value
iii-cli state get document:sync:last_run

# Set state (if needed for testing)
iii-cli state set test:key "test value"

# Delete state
iii-cli state delete test:key
```

**Stream Management:**
```bash
# List all active streams
iii-cli stream ls

# Inspect a specific stream
iii-cli stream info <stream-id>

# List consumer groups
iii-cli stream groups <stream-name>
```
### Debugging Workflow

**1. Identify the Issue:**
```bash
# Check if the step is registered
iii-cli flow list | grep my_step

# View recent errors
iii-cli logs --level error -n 50

# Check service status
sudo systemctl status motia.service
```

**2. Get Detailed Information:**
```bash
# Live-tail logs for a specific step
iii-cli logs -f | grep "document_sync"

# Check worker processes
iii-cli worker logs

# Inspect state
iii-cli state ls
```

**3. Test Specific Functionality:**
```bash
# Trigger the webhook manually
curl -X POST http://localhost:3111/vmh/webhook/...

# Check the response and logs
iii-cli logs -f | grep "webhook"

# Verify the state changed
iii-cli state get entity:sync:status
```

**4. Trace a Specific Request:**
```bash
# Make the request, note the trace ID from the logs
curl -X POST http://localhost:3111/vmh/webhook/document/create ...

# Get the full trace
iii-cli trace <trace-id>

# View all logs for this trace
iii-cli logs --trace <trace-id>
```
### Performance Monitoring

**Check System Resources:**
```bash
# CPU and memory usage
htop

# Process-specific
ps aux | grep iii

# Redis memory
redis-cli info memory

# File descriptors
lsof -p $(pgrep -f "iii-cli start")
```

**Check Processing Metrics:**
```bash
# Queue lengths (if using Redis streams)
redis-cli XINFO STREAM vmh:document:sync

# Pending messages
redis-cli XPENDING vmh:document:sync group1

# Lock status (SCAN instead of KEYS to avoid blocking Redis)
redis-cli --scan --pattern "lock:*"
```
### Common Issues

**Step not showing up:**
1. Check file naming: must end with `_step.py`
2. Check for syntax errors: `iii-cli logs --level error`
3. Check for import errors: `iii-cli logs | grep -i "importerror\|traceback"`
4. Verify the `config` dict is present
5. Restart: `sudo systemctl restart motia.service` or restart `iii-cli dev`
6. Verify hot-reload is working: check the terminal output of `iii-cli dev`

**Redis connection failed:**
- Check the `REDIS_HOST` and `REDIS_PORT` environment variables
- Verify Redis is running: `redis-cli ping`
- Check the Redis logs: `journalctl -u redis -f`
- The service will work without Redis, but with warnings

**Hot-reload not working:**
- Verify you are using `iii-cli dev` (not `iii-cli start`)
- Check the `watch` patterns in `iii-config.yaml`
- Ensure files are in watched directories (`src/**/*.py`)
- Look for watcher errors: `iii-cli logs | grep -i "watch"`

**Handler not triggered:**
- Verify the endpoint is registered: `iii-cli flow list`
- Check that the HTTP method matches (GET, POST, etc.)
- Test with curl to isolate the issue
- Check the trigger configuration in the step's `config` dict

**AttributeError: '_log' not found:**
- Ensure the service inherits from `BaseSyncUtils`, OR
- Implement a `_log()` method manually

**Trace not found:**
- Ensure OpenTelemetry is enabled in the config
- Check that the trace ID has a valid format
- Use `iii-cli logs` with filters instead

**Console not accessible:**
- Check whether the console service is running: `systemctl status iii-console.service`
- Verify the port is not blocked by the firewall: `sudo ufw status`
- Check the console logs: `journalctl -u iii-console.service -f`
- Try accessing via `localhost:3113` instead of the public IP
---

## Key Patterns Summary
```diff
@@ -78,6 +78,6 @@ modules:
   - class: modules::shell::ExecModule
     config:
       watch:
-        - steps/**/*.py
+        - src/steps/**/*.py
       exec:
-        - /opt/bin/uv run python -m motia.cli run --dir steps
+        - /usr/local/bin/uv run python -m motia.cli run --dir src/steps
```
```diff
@@ -18,5 +18,8 @@ dependencies = [
     "google-api-python-client>=2.100.0",  # Google Calendar API
     "google-auth>=2.23.0",                # Google OAuth2
     "backoff>=2.2.1",                     # Retry/backoff decorator
+    "langchain>=0.3.0",                   # LangChain framework
+    "langchain-xai>=0.2.0",               # xAI integration for LangChain
+    "langchain-core>=0.3.0",              # LangChain core
 ]
```
```diff
@@ -12,6 +12,7 @@ import hashlib
 import json
 from typing import Dict, Any, Optional, List, Tuple
 from datetime import datetime
+from urllib.parse import unquote

 from services.sync_utils_base import BaseSyncUtils
 from services.models import (
```
```diff
@@ -120,12 +121,12 @@ class AIKnowledgeSync(BaseSyncUtils):
         # 1. Load knowledge entity
         knowledge = await espocrm.get_entity('CAIKnowledge', knowledge_id)

-        activation_status = knowledge.get('activationStatus')
+        activation_status = knowledge.get('aktivierungsstatus')
         collection_id = knowledge.get('datenbankId')

         ctx.logger.info("=" * 80)
         ctx.logger.info(f"📋 Processing: {knowledge['name']}")
-        ctx.logger.info(f"   activationStatus: {activation_status}")
+        ctx.logger.info(f"   aktivierungsstatus: {activation_status}")
         ctx.logger.info(f"   datenbankId: {collection_id or 'NONE'}")
         ctx.logger.info("=" * 80)
```
```diff
@@ -144,18 +145,21 @@ class AIKnowledgeSync(BaseSyncUtils):
                 }
             )

-            collection_id = collection['id']
+            # XAI API returns 'collection_id' not 'id'
+            collection_id = collection.get('collection_id') or collection.get('id')

             # Update EspoCRM: Set datenbankId + change status to 'active'
             await espocrm.update_entity('CAIKnowledge', knowledge_id, {
                 'datenbankId': collection_id,
-                'activationStatus': AIKnowledgeActivationStatus.ACTIVE.value,
+                'aktivierungsstatus': AIKnowledgeActivationStatus.ACTIVE.value,
                 'syncStatus': AIKnowledgeSyncStatus.UNCLEAN.value
             })

             ctx.logger.info(f"✅ Collection created: {collection_id}")
-            ctx.logger.info("   Status changed to 'active', next webhook will sync documents")
-            return
+            ctx.logger.info("   Status changed to 'active', now syncing documents...")
+
+            # Continue to document sync immediately (don't return)
+            # Fall through to sync logic below

         # ═══════════════════════════════════════════════════════════
         # CASE 2: DEACTIVATED → Delete Collection from XAI
```
```diff
@@ -201,13 +205,14 @@ class AIKnowledgeSync(BaseSyncUtils):
             return

         # ═══════════════════════════════════════════════════════════
-        # CASE 4: ACTIVE → Normal Sync
+        # CASE 4: ACTIVE → Normal Sync (or just created from NEW)
         # ═══════════════════════════════════════════════════════════
-        elif activation_status == AIKnowledgeActivationStatus.ACTIVE.value:
+        if activation_status in (AIKnowledgeActivationStatus.ACTIVE.value, AIKnowledgeActivationStatus.NEW.value):
             if not collection_id:
                 ctx.logger.error("❌ Status 'active' but no datenbankId!")
                 raise RuntimeError("Active knowledge without collection ID")

-            ctx.logger.info(f"🔄 Status 'active' → Syncing documents to {collection_id}")
+            if activation_status == AIKnowledgeActivationStatus.ACTIVE.value:
+                ctx.logger.info(f"🔄 Status 'active' → Syncing documents to {collection_id}")

             # Verify collection exists
```
```diff
@@ -226,12 +231,12 @@ class AIKnowledgeSync(BaseSyncUtils):
                     'datenbankId': collection_id
                 })

-            # Sync documents
+            # Sync documents (both for ACTIVE status and after NEW → ACTIVE transition)
             await self._sync_knowledge_documents(knowledge_id, collection_id, ctx)

-        else:
-            ctx.logger.error(f"❌ Unknown activationStatus: {activation_status}")
-            raise ValueError(f"Invalid activationStatus: {activation_status}")
+        elif activation_status not in (AIKnowledgeActivationStatus.DEACTIVATED.value, AIKnowledgeActivationStatus.PAUSED.value):
+            ctx.logger.error(f"❌ Unknown aktivierungsstatus: {activation_status}")
+            raise ValueError(f"Invalid aktivierungsstatus: {activation_status}")

         finally:
             await xai.close()
```
```diff
@@ -240,20 +245,18 @@ class AIKnowledgeSync(BaseSyncUtils):
         self,
         knowledge_id: str,
         collection_id: str,
-        ctx,
-        full_sync: bool = False
+        ctx
     ) -> None:
         """
         Sync all documents of a knowledge base to XAI collection.

         Uses efficient JunctionData endpoint to get all documents with junction data
-        and blake3 hashes in a single API call.
+        and blake3 hashes in a single API call. Hash comparison is always performed.

         Args:
             knowledge_id: CAIKnowledge entity ID
             collection_id: XAI Collection ID
             ctx: Motia context
-            full_sync: If True, force Blake3 hash comparison for all documents (nightly cron)
         """
         from services.espocrm import EspoCRMAPI
         from services.xai_service import XAIService
```
```diff
@@ -280,7 +283,8 @@ class AIKnowledgeSync(BaseSyncUtils):
         successful = 0
         failed = 0
         skipped = 0

+        # Track aiDocumentIds for orphan detection (collected during sync)
+        synced_file_ids: set = set()
         for doc in documents:
             doc_id = doc['documentId']
             doc_name = doc.get('documentName', 'Unknown')
```
```diff
@@ -301,8 +305,18 @@ class AIKnowledgeSync(BaseSyncUtils):
             if junction_status in ['new', 'unclean', 'failed']:
                 needs_sync = True
                 reason = f"status={junction_status}"
-            elif full_sync and blake3_hash and ai_document_id:
-                # Full sync mode: verify Blake3 hash with XAI
+            elif junction_status == 'synced':
+                # Synced status should have both blake3_hash and ai_document_id
+                if not blake3_hash:
+                    needs_sync = True
+                    reason = "inconsistency: synced but no blake3 hash"
+                    ctx.logger.warn(f"   ⚠️ Synced document missing blake3 hash!")
+                elif not ai_document_id:
+                    needs_sync = True
+                    reason = "inconsistency: synced but no aiDocumentId"
+                    ctx.logger.warn(f"   ⚠️ Synced document missing aiDocumentId!")
+                else:
+                    # Verify Blake3 hash with XAI (always, since hash from JunctionData API is free)
                 try:
                     xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id)
                     if xai_doc_info:
```
```diff
@@ -310,31 +324,61 @@ class AIKnowledgeSync(BaseSyncUtils):

                         if xai_blake3 != blake3_hash:
                             needs_sync = True
-                            reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs Doc: {blake3_hash[:16]}...)"
+                            reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs EspoCRM: {blake3_hash[:16]}...)"
                             ctx.logger.info(f"   🔄 Blake3 mismatch detected!")
                         else:
                             ctx.logger.info(f"   ✅ Blake3 hash matches")
                     else:
                         needs_sync = True
                         reason = "file not found in XAI collection"
                         ctx.logger.warn(f"   ⚠️ Document marked synced but not in XAI!")
                 except Exception as e:
-                    ctx.logger.warn(f"   ⚠️ Failed to verify Blake3: {e}")
                     needs_sync = True
                     reason = f"verification failed: {e}"
+                    ctx.logger.warn(f"   ⚠️ Failed to verify Blake3, will re-sync: {e}")

             if not needs_sync:
-                ctx.logger.info(f"   ⏭️ Skipped (no sync needed)")
+                # Document is already synced, track its aiDocumentId
+                if ai_document_id:
+                    synced_file_ids.add(ai_document_id)
                 skipped += 1
                 continue

             ctx.logger.info(f"   🔄 Syncing: {reason}")

+            # Get complete document entity with attachment info
+            doc_entity = await espocrm.get_entity('CDokumente', doc_id)
+            attachment_id = doc_entity.get('dokumentId')
+
+            if not attachment_id:
+                ctx.logger.error(f"   ❌ No attachment ID found for document {doc_id}")
+                failed += 1
+                continue
+
+            # Get attachment details for MIME type and original filename
+            try:
+                attachment = await espocrm.get_entity('Attachment', attachment_id)
+                mime_type = attachment.get('type', 'application/octet-stream')
+                file_size = attachment.get('size', 0)
+                original_filename = attachment.get('name', doc_name)  # Original filename with extension
+                # URL-decode filename (fixes special chars like §, ä, ö, ü, etc.)
+                original_filename = unquote(original_filename)
+            except Exception as e:
+                ctx.logger.warn(f"   ⚠️ Failed to get attachment details: {e}, using defaults")
+                mime_type = 'application/octet-stream'
+                file_size = 0
+                original_filename = unquote(doc_name)  # Also decode fallback name
+
+            ctx.logger.info(f"   📎 Attachment: {attachment_id} ({mime_type}, {file_size} bytes)")
+            ctx.logger.info(f"   📄 Original filename: {original_filename}")

             # Download document
-            attachment_id = doc.get('documentId')  # TODO: Get correct attachment ID from CDokumente
             file_content = await espocrm.download_attachment(attachment_id)
             ctx.logger.info(f"   📥 Downloaded {len(file_content)} bytes")

-            # Upload to XAI
-            filename = doc_name
-            mime_type = 'application/octet-stream'  # TODO: Get from attachment
+            # Upload to XAI with original filename (includes extension)
+            filename = original_filename

             xai_file_id = await xai.upload_file(file_content, filename, mime_type)
             ctx.logger.info(f"   📤 Uploaded to XAI: {xai_file_id}")
```
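The resulting decision logic of the two hunks above (junction status check plus Blake3 verification) can be condensed into a small synchronous sketch; `needs_resync` is a hypothetical helper name, and the status values and fields come from the diff:

```python
from typing import Optional

def needs_resync(junction_status: str,
                 blake3_hash: Optional[str],
                 ai_document_id: Optional[str],
                 xai_blake3: Optional[str]) -> bool:
    """Return True if the document must be (re-)uploaded to the collection."""
    if junction_status in ('new', 'unclean', 'failed'):
        return True                       # never synced, or explicitly dirty
    if junction_status == 'synced':
        if not blake3_hash or not ai_document_id:
            return True                   # inconsistent junction record
        return xai_blake3 != blake3_hash  # hash drift between EspoCRM and XAI
    return False

print(needs_resync('synced', 'abc', 'doc1', 'xyz'))  # True (hash mismatch)
```

The design change captured here is that hash verification no longer depends on a `full_sync` flag: every `'synced'` record is verified on every run.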
```diff
@@ -355,6 +399,9 @@ class AIKnowledgeSync(BaseSyncUtils):
             )
             ctx.logger.info(f"   ✅ Junction updated")

+            # Track the new aiDocumentId for orphan detection
+            synced_file_ids.add(xai_file_id)
+
             successful += 1

         except Exception as e:
```
```diff
@@ -382,11 +429,12 @@ class AIKnowledgeSync(BaseSyncUtils):
         xai_documents = await xai.list_collection_documents(collection_id)
         xai_file_ids = {doc.get('file_id') for doc in xai_documents if doc.get('file_id')}

-        # Get all ai_document_ids from junction
-        junction_file_ids = {doc.get('aiDocumentId') for doc in documents if doc.get('aiDocumentId')}
+        # Use synced_file_ids (collected during this sync) for orphan detection
+        # This includes both pre-existing synced docs and newly uploaded ones
+        ctx.logger.info(f"   XAI has {len(xai_file_ids)} files, we have {len(synced_file_ids)} synced")

-        # Find orphans (in XAI but not in junction)
-        orphans = xai_file_ids - junction_file_ids
+        # Find orphans (in XAI but not in our current sync)
+        orphans = xai_file_ids - synced_file_ids

         if orphans:
             ctx.logger.info(f"   Found {len(orphans)} orphaned file(s)")
```
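The reworked orphan detection reduces to a set difference between the file IDs XAI reports and the `aiDocumentId`s collected during the sync run; a standalone sketch (`find_orphans` is a hypothetical name):

```python
def find_orphans(xai_file_ids: set, synced_file_ids: set) -> set:
    """Files present in the XAI collection but not seen during this sync run."""
    return xai_file_ids - synced_file_ids

# f1..f3 exist in XAI; only f1 and f3 were tracked this run, so f2 is orphaned.
print(find_orphans({"f1", "f2", "f3"}, {"f1", "f3"}))  # {'f2'}
```

Collecting the IDs during the sync loop (rather than reading them back from the junction table afterwards) means a document that failed mid-run is not counted as synced and so cannot mask a real orphan.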
```diff
@@ -411,7 +459,7 @@ class AIKnowledgeSync(BaseSyncUtils):
         ctx.logger.info(f"   ✅ Synced: {successful}")
         ctx.logger.info(f"   ⏭️ Skipped: {skipped}")
         ctx.logger.info(f"   ❌ Failed: {failed}")
-        ctx.logger.info(f"   Mode: {'FULL SYNC (Blake3 verification)' if full_sync else 'INCREMENTAL'}")
+        ctx.logger.info(f"   Mode: Blake3 hash verification enabled")
         ctx.logger.info("=" * 80)

     def _calculate_metadata_hash(self, document: Dict) -> str:
```
services/aktenzeichen_utils.py (new file, 110 lines)

@@ -0,0 +1,110 @@
"""Aktenzeichen-Erkennung und Validation
|
||||
|
||||
Utility functions für das Erkennen, Validieren und Normalisieren von
|
||||
Aktenzeichen im Format '1234/56' oder 'ABC/23'.
|
||||
"""
|
||||
import re
|
||||
from typing import Optional
|
||||
|
||||
|
||||
# Regex für Aktenzeichen: 1-4 Zeichen (alphanumerisch) + "/" + 2 Ziffern
|
||||
AKTENZEICHEN_REGEX = re.compile(r'^([A-Za-z0-9]{1,4}/\d{2})\s*', re.IGNORECASE)
|
||||
|
||||
|
||||
def extract_aktenzeichen(text: str) -> Optional[str]:
|
||||
"""
|
||||
Extrahiert Aktenzeichen vom Anfang des Textes.
|
||||
|
||||
Pattern: ^[A-Za-z0-9]{1,4}/\d{2}
|
||||
|
||||
Examples:
|
||||
>>> extract_aktenzeichen("1234/56 Was ist der Stand?")
|
||||
"1234/56"
|
||||
>>> extract_aktenzeichen("ABC/23 Frage zum Vertrag")
|
||||
"ABC/23"
|
||||
>>> extract_aktenzeichen("Kein Aktenzeichen hier")
|
||||
None
|
||||
|
||||
Args:
|
||||
text: Eingabetext (z.B. erste Message)
|
||||
|
||||
Returns:
|
||||
Aktenzeichen als String, oder None wenn nicht gefunden
|
||||
"""
|
||||
if not text or not isinstance(text, str):
|
||||
return None
|
||||
|
||||
match = AKTENZEICHEN_REGEX.match(text.strip())
|
||||
return match.group(1) if match else None
|
||||
|
||||
|
||||
def remove_aktenzeichen(text: str) -> str:
|
||||
"""
|
||||
Entfernt Aktenzeichen vom Anfang des Textes.
|
||||
|
||||
Examples:
|
||||
>>> remove_aktenzeichen("1234/56 Was ist der Stand?")
|
||||
"Was ist der Stand?"
|
||||
>>> remove_aktenzeichen("Kein Aktenzeichen")
|
||||
"Kein Aktenzeichen"
|
||||
|
||||
Args:
|
||||
text: Eingabetext mit Aktenzeichen
|
||||
|
||||
Returns:
|
||||
Text ohne Aktenzeichen (whitespace getrimmt)
|
||||
"""
|
||||
if not text or not isinstance(text, str):
|
||||
return text
|
||||
|
||||
return AKTENZEICHEN_REGEX.sub('', text, count=1).strip()
|
||||
|
||||
|
||||
def validate_aktenzeichen(az: str) -> bool:
|
||||
"""
|
||||
Validiert Aktenzeichen-Format.
|
||||
|
||||
Pattern: ^[A-Za-z0-9]{1,4}/\d{2}$
|
||||
|
||||
Examples:
|
||||
>>> validate_aktenzeichen("1234/56")
|
||||
True
|
||||
>>> validate_aktenzeichen("ABC/23")
|
||||
True
|
||||
>>> validate_aktenzeichen("12345/567") # Zu lang
|
||||
False
|
||||
>>> validate_aktenzeichen("1234-56") # Falsches Trennzeichen
|
||||
False
|
||||
|
||||
Args:
|
||||
az: Aktenzeichen zum Validieren
|
||||
|
||||
Returns:
|
||||
True wenn valide, False sonst
|
||||
"""
|
||||
if not az or not isinstance(az, str):
|
||||
return False
|
||||
|
||||
return bool(re.match(r'^[A-Za-z0-9]{1,4}/\d{2}$', az, re.IGNORECASE))
|
||||
|
||||
|
||||
def normalize_aktenzeichen(az: str) -> str:
|
||||
"""
|
||||
Normalisiert Aktenzeichen (uppercase, trim whitespace).
|
||||
|
||||
Examples:
|
||||
>>> normalize_aktenzeichen("abc/23")
|
||||
"ABC/23"
|
||||
>>> normalize_aktenzeichen(" 1234/56 ")
|
||||
"1234/56"
|
||||
|
||||
Args:
|
||||
az: Aktenzeichen zum Normalisieren
|
||||
|
||||
Returns:
|
||||
Normalisiertes Aktenzeichen (uppercase, getrimmt)
|
||||
"""
|
||||
if not az or not isinstance(az, str):
|
||||
return az
|
||||
|
||||
return az.strip().upper()
|
||||
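The new helpers compose into a simple pipeline: extract the Aktenzeichen, normalize it, and strip it from the message body. The sketch below inlines the module's regex and two of its functions so it runs standalone (the `services.aktenzeichen_utils` import path from the diff is assumed, not available here):

```python
import re

# Same pattern as services/aktenzeichen_utils.py
AKTENZEICHEN_REGEX = re.compile(r'^([A-Za-z0-9]{1,4}/\d{2})\s*', re.IGNORECASE)

def extract_aktenzeichen(text):
    # Match only at the start of the (stripped) text
    if not text or not isinstance(text, str):
        return None
    match = AKTENZEICHEN_REGEX.match(text.strip())
    return match.group(1) if match else None

def remove_aktenzeichen(text):
    # Remove the first match plus its trailing whitespace
    if not text or not isinstance(text, str):
        return text
    return AKTENZEICHEN_REGEX.sub('', text, count=1).strip()

msg = "abc/23 Frage zum Vertrag"
az = extract_aktenzeichen(msg)
print(az.upper())                # ABC/23 (normalized)
print(remove_aktenzeichen(msg))  # Frage zum Vertrag
```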
@@ -10,6 +10,7 @@ Utility functions for document synchronization with xAI:

 from typing import Dict, Any, Optional, List, Tuple
 from datetime import datetime, timedelta
+from urllib.parse import unquote

 from services.sync_utils_base import BaseSyncUtils
 from services.models import FileStatus, XAISyncStatus
@@ -282,7 +283,7 @@ class DocumentSync(BaseSyncUtils):

         try:
             knowledge = await self.espocrm.get_entity('CAIKnowledge', knowledge_id)
-            activation_status = knowledge.get('activationStatus')
+            activation_status = knowledge.get('aktivierungsstatus')
             collection_id = knowledge.get('datenbankId')

             if activation_status == 'active' and collection_id:
@@ -365,6 +366,10 @@ class DocumentSync(BaseSyncUtils):
         # Filename: use dokumentName/fileName if present, otherwise take it from the attachment
         final_filename = filename or attachment.get('name', 'unknown')

+        # URL-decode filename (fixes special chars like §, ä, ö, ü, etc.)
+        # EspoCRM stores filenames URL-encoded: %C2%A7 → §
+        final_filename = unquote(final_filename)
+
         return {
             'attachment_id': attachment_id,
             'download_url': f"/api/v1/Attachment/file/{attachment_id}",
@@ -633,9 +633,10 @@ class EspoCRMAPI:
             for doc in docs:
                 print(f"{doc['documentName']}: {doc['syncstatus']}")
         """
-        # JunctionData endpoint is at root level, not under /api/v1
-        base_url = self.api_base_url.rstrip('/').replace('/api/v1', '')
-        url = f"{base_url}/JunctionData/CAIKnowledge/{knowledge_id}/dokumentes"
+        # JunctionData uses API Gateway URL, not direct EspoCRM
+        # Use gateway URL from env or construct from ESPOCRM_API_BASE_URL
+        gateway_url = os.getenv('ESPOCRM_GATEWAY_URL', 'https://api.bitbylaw.com/vmh/crm')
+        url = f"{gateway_url}/JunctionData/CAIKnowledge/{knowledge_id}/dokumentes"

         self._log(f"GET {url}")
@@ -698,9 +699,9 @@ class EspoCRMAPI:
             update_last_sync=True
         )
         """
-        # JunctionData endpoint is at root level, not under /api/v1
-        base_url = self.api_base_url.rstrip('/').replace('/api/v1', '')
-        url = f"{base_url}/JunctionData/CAIKnowledge/{knowledge_id}/dokumentes/{document_id}"
+        # JunctionData uses API Gateway URL, not direct EspoCRM
+        gateway_url = os.getenv('ESPOCRM_GATEWAY_URL', 'https://api.bitbylaw.com/vmh/crm')
+        url = f"{gateway_url}/JunctionData/CAIKnowledge/{knowledge_id}/dokumentes/{document_id}"

         payload = {**fields}
         if update_last_sync:
services/langchain_xai_service.py (new file, 218 lines)
@@ -0,0 +1,218 @@
"""LangChain xAI Integration Service

Service for LangChain ChatXAI integration with file-search binding.
Analogous to xai_service.py for the xAI Files API.
"""
import os
from typing import Dict, List, Any, Optional, AsyncIterator
from services.logging_utils import get_service_logger


class LangChainXAIService:
    """
    Wrapper for LangChain ChatXAI with Motia integration.

    Required environment variables:
    - XAI_API_KEY: API key for xAI (for the ChatXAI model)

    Usage:
        service = LangChainXAIService(ctx)
        model = service.get_chat_model(model="grok-4-1-fast-reasoning")
        model_with_tools = service.bind_file_search(model, collection_id)
        result = await service.invoke_chat(model_with_tools, messages)
    """

    def __init__(self, ctx=None):
        """
        Initialize LangChain xAI Service.

        Args:
            ctx: Optional Motia context for logging

        Raises:
            ValueError: If XAI_API_KEY not configured
        """
        self.api_key = os.getenv('XAI_API_KEY', '')
        self.ctx = ctx
        self.logger = get_service_logger('langchain_xai', ctx)

        if not self.api_key:
            raise ValueError("XAI_API_KEY not configured in environment")

    def _log(self, msg: str, level: str = 'info') -> None:
        """Delegate logging to service logger"""
        log_func = getattr(self.logger, level, self.logger.info)
        log_func(msg)

    def get_chat_model(
        self,
        model: str = "grok-4-1-fast-reasoning",
        temperature: float = 0.7,
        max_tokens: Optional[int] = None
    ):
        """
        Initializes a ChatXAI model.

        Args:
            model: Model name (default: grok-4-1-fast-reasoning)
            temperature: Sampling temperature 0.0-1.0
            max_tokens: Optional max tokens for response

        Returns:
            ChatXAI model instance

        Raises:
            ImportError: If langchain_xai not installed
        """
        try:
            from langchain_xai import ChatXAI
        except ImportError:
            raise ImportError(
                "langchain_xai not installed. "
                "Run: pip install langchain-xai>=0.2.0"
            )

        self._log(f"🤖 Initializing ChatXAI: model={model}, temp={temperature}")

        kwargs = {
            "model": model,
            "api_key": self.api_key,
            "temperature": temperature
        }
        if max_tokens:
            kwargs["max_tokens"] = max_tokens

        return ChatXAI(**kwargs)

    def bind_tools(
        self,
        model,
        collection_id: Optional[str] = None,
        enable_web_search: bool = False,
        web_search_config: Optional[Dict[str, Any]] = None,
        max_num_results: int = 10
    ):
        """
        Binds xAI tools (file_search and/or web_search) to the model.

        Args:
            model: ChatXAI model instance
            collection_id: Optional xAI collection ID for file_search
            enable_web_search: Enable web search tool (default: False)
            web_search_config: Optional web search configuration:
                {
                    'allowed_domains': ['example.com'],   # Max 5 domains
                    'excluded_domains': ['spam.com'],     # Max 5 domains
                    'enable_image_understanding': True
                }
            max_num_results: Max results from file search (default: 10)

        Returns:
            Model with requested tools bound (file_search and/or web_search)
        """
        tools = []

        # Add file_search tool if collection_id provided
        if collection_id:
            self._log(f"🔍 Binding file_search: collection={collection_id}")
            tools.append({
                "type": "file_search",
                "vector_store_ids": [collection_id],
                "max_num_results": max_num_results
            })

        # Add web_search tool if enabled
        if enable_web_search:
            self._log("🌐 Binding web_search")
            web_search_tool = {"type": "web_search"}

            # Add optional web search filters
            if web_search_config:
                if 'allowed_domains' in web_search_config:
                    domains = web_search_config['allowed_domains'][:5]  # Max 5
                    web_search_tool['filters'] = {'allowed_domains': domains}
                    self._log(f"   Allowed domains: {domains}")
                elif 'excluded_domains' in web_search_config:
                    domains = web_search_config['excluded_domains'][:5]  # Max 5
                    web_search_tool['filters'] = {'excluded_domains': domains}
                    self._log(f"   Excluded domains: {domains}")

                if web_search_config.get('enable_image_understanding'):
                    web_search_tool['enable_image_understanding'] = True
                    self._log("   Image understanding: enabled")

            tools.append(web_search_tool)

        if not tools:
            self._log("⚠️ No tools to bind (no collection_id and web_search disabled)", level='warn')
            return model

        self._log(f"🔧 Binding {len(tools)} tool(s) to model")
        return model.bind_tools(tools)

    def bind_file_search(
        self,
        model,
        collection_id: str,
        max_num_results: int = 10
    ):
        """
        Legacy method: binds only the file_search tool to the model.

        Use bind_tools() for more flexibility.
        """
        return self.bind_tools(
            model=model,
            collection_id=collection_id,
            max_num_results=max_num_results
        )

    async def invoke_chat(
        self,
        model,
        messages: List[Dict[str, Any]]
    ) -> Any:
        """
        Non-streaming chat completion.

        Args:
            model: ChatXAI model (with or without tools)
            messages: List of message dicts [{"role": "user", "content": "..."}]

        Returns:
            LangChain AIMessage with response

        Raises:
            Exception: If API call fails
        """
        self._log(f"💬 Invoking chat: {len(messages)} messages", level='debug')

        result = await model.ainvoke(messages)

        self._log(f"✅ Response received: {len(result.content)} chars", level='debug')
        return result

    async def astream_chat(
        self,
        model,
        messages: List[Dict[str, Any]]
    ) -> AsyncIterator:
        """
        Streaming chat completion.

        Args:
            model: ChatXAI model (with or without tools)
            messages: List of message dicts

        Yields:
            Chunks from the streaming response

        Example:
            async for chunk in service.astream_chat(model, messages):
                delta = chunk.content if hasattr(chunk, "content") else ""
                # Process delta...
        """
        self._log(f"💬 Streaming chat: {len(messages)} messages", level='debug')

        async for chunk in model.astream(messages):
            yield chunk
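The tool payloads that `bind_tools` assembles can be previewed without a live model or the `langchain_xai` package. This standalone sketch reproduces only the list-building logic from the method above (`build_xai_tools` is a name introduced here for illustration):

```python
from typing import Any, Dict, List, Optional

def build_xai_tools(collection_id: Optional[str] = None,
                    enable_web_search: bool = False,
                    max_num_results: int = 10) -> List[Dict[str, Any]]:
    # Mirrors LangChainXAIService.bind_tools, minus logging and model binding
    tools: List[Dict[str, Any]] = []
    if collection_id:
        tools.append({
            "type": "file_search",
            "vector_store_ids": [collection_id],
            "max_num_results": max_num_results,
        })
    if enable_web_search:
        tools.append({"type": "web_search"})
    return tools

tools = build_xai_tools(collection_id="vs_123", enable_web_search=True)
print([t["type"] for t in tools])  # ['file_search', 'web_search']
```

An empty result signals the "no tools to bind" branch, where the service returns the unmodified model.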
@@ -63,14 +63,31 @@ class XAIService:
         Raises:
             RuntimeError: on HTTP error or missing file_id in the response
         """
-        self._log(f"📤 Uploading {len(file_content)} bytes to xAI: (unknown)")
+        # Normalize MIME type: xAI needs correct Content-Type for proper processing
+        # If generic octet-stream but file is clearly a PDF, fix it
+        if mime_type == 'application/octet-stream' and filename.lower().endswith('.pdf'):
+            mime_type = 'application/pdf'
+            self._log(f"⚠️ Corrected MIME type to application/pdf for (unknown)")
+
+        self._log(f"📤 Uploading {len(file_content)} bytes to xAI: (unknown) ({mime_type})")

         session = await self._get_session()
         url = f"{XAI_FILES_URL}/v1/files"
         headers = {"Authorization": f"Bearer {self.api_key}"}

-        form = aiohttp.FormData()
-        form.add_field('file', file_content, filename=filename, content_type=mime_type)
+        # Create multipart form with explicit UTF-8 filename encoding
+        # aiohttp automatically URL-encodes filenames with special chars,
+        # but xAI expects raw UTF-8 in the filename parameter
+        form = aiohttp.FormData(quote_fields=False)
+        form.add_field(
+            'file',
+            file_content,
+            filename=filename,
+            content_type=mime_type
+        )
+        # CRITICAL: purpose="file_search" enables proper PDF processing
+        # Without this, xAI throws "internal error" on complex PDFs
+        form.add_field('purpose', 'file_search')

         async with session.post(url, data=form, headers=headers) as response:
             try:
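The MIME-type normalization added above is a pure function of filename and declared type, so it can be tested in isolation. A minimal sketch (the helper name `normalize_mime_type` is introduced here; the diff performs this inline):

```python
def normalize_mime_type(filename: str, mime_type: str) -> str:
    # A generic octet-stream that is clearly a PDF gets re-labelled,
    # so xAI's Content-Type-driven processing treats it as a PDF
    if mime_type == 'application/octet-stream' and filename.lower().endswith('.pdf'):
        return 'application/pdf'
    return mime_type

print(normalize_mime_type("Vertrag §5.PDF", "application/octet-stream"))  # application/pdf
print(normalize_mime_type("notes.txt", "text/plain"))                     # text/plain
```

Note that a correctly declared type is never overwritten; only the generic `application/octet-stream` case is corrected.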
@@ -95,9 +112,12 @@ class XAIService:

     async def add_to_collection(self, collection_id: str, file_id: str) -> None:
         """
-        Adds a file to an xAI collection.
+        Adds a file to an xAI collection (vector store).

-        POST https://management-api.x.ai/v1/collections/{collection_id}/documents/{file_id}
+        POST https://api.x.ai/v1/vector_stores/{vector_store_id}/files
+
+        Uses the OpenAI-compatible API pattern for adding files to vector stores.
+        This triggers proper indexing and processing.

         Raises:
             RuntimeError: on HTTP error
@@ -105,13 +125,16 @@ class XAIService:
         self._log(f"📚 Adding file {file_id} to collection {collection_id}")

         session = await self._get_session()
-        url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}"
+        # Use the OpenAI-compatible endpoint (not management API)
+        url = f"{XAI_FILES_URL}/v1/vector_stores/{collection_id}/files"
         headers = {
-            "Authorization": f"Bearer {self.management_key}",
+            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
         }

-        async with session.post(url, headers=headers) as response:
+        payload = {"file_id": file_id}
+
+        async with session.post(url, json=payload, headers=headers) as response:
             if response.status not in (200, 201):
                 raw = await response.text()
                 raise RuntimeError(
@@ -458,107 +481,6 @@ class XAIService:

         self._log(f"✅ Metadata updated for {file_id}")

-    # ========== High-Level Operations ==========
-
-    async def upload_document_with_metadata(
-        self,
-        collection_id: str,
-        file_content: bytes,
-        filename: str,
-        mime_type: str,
-        metadata: Dict[str, str]
-    ) -> str:
-        """
-        Upload file + add to collection with metadata in one operation.
-
-        Args:
-            collection_id: XAI Collection ID
-            file_content: File bytes
-            filename: Filename
-            mime_type: MIME type
-            metadata: Metadata fields
-
-        Returns:
-            XAI file_id
-
-        Raises:
-            RuntimeError: on upload/add error
-        """
-        # Step 1: Upload file
-        file_id = await self.upload_file(file_content, filename, mime_type)
-
-        try:
-            # Step 2: Add to collection (XAI API automatically handles metadata)
-            # Note: metadata must be supplied with the POST
-            session = await self._get_session()
-            url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}"
-            headers = {
-                "Authorization": f"Bearer {self.management_key}",
-                "Content-Type": "application/json"
-            }
-
-            body = {"fields": metadata}
-
-            async with session.post(url, json=body, headers=headers) as response:
-                if response.status not in (200, 201):
-                    raw = await response.text()
-                    raise RuntimeError(
-                        f"Failed to add file to collection with metadata ({response.status}): {raw}"
-                    )
-
-            self._log(f"✅ File {file_id} added to collection {collection_id} with metadata")
-            return file_id
-
-        except Exception as e:
-            # Cleanup: file was uploaded but not added to the collection
-            self._log(f"⚠️ Failed to add to collection, file {file_id} may be orphaned", level='warn')
-            raise
-
-    async def verify_upload_integrity(
-        self,
-        collection_id: str,
-        file_id: str,
-        retry_attempts: int = 3
-    ) -> Tuple[bool, Optional[str]]:
-        """
-        Verifies upload integrity via the BLAKE3 hash from XAI.
-
-        Args:
-            collection_id: XAI Collection ID
-            file_id: XAI file_id
-            retry_attempts: Retries on transient errors
-
-        Returns:
-            (success: bool, blake3_hash: Optional[str])
-        """
-        for attempt in range(1, retry_attempts + 1):
-            try:
-                doc_info = await self.get_collection_document(collection_id, file_id)
-
-                if not doc_info:
-                    self._log(f"⚠️ Document {file_id} not found in collection", level='warn')
-                    return (False, None)
-
-                blake3_hash = doc_info.get('hash')
-
-                if not blake3_hash:
-                    self._log(f"⚠️ No hash returned by XAI API", level='warn')
-                    return (False, None)
-
-                self._log(f"✅ Upload verified, BLAKE3: {blake3_hash[:32]}...")
-                return (True, blake3_hash)
-
-            except Exception as e:
-                if attempt < retry_attempts:
-                    delay = 2 ** attempt  # Exponential backoff
-                    self._log(f"⚠️ Verification failed (attempt {attempt}), retry in {delay}s", level='warn')
-                    await asyncio.sleep(delay)
-                else:
-                    self._log(f"❌ Verification failed after {retry_attempts} attempts: {e}", level='error')
-                    return (False, None)
-
-        return (False, None)
-
     def is_mime_type_supported(self, mime_type: str) -> bool:
         """
         Checks whether XAI supports this MIME type.
@@ -1,11 +1,11 @@
-"""AI Knowledge Full Sync - Daily Cron Job"""
+"""AI Knowledge Daily Sync - Cron Job"""
 from typing import Any
 from motia import FlowContext, cron


 config = {
-    "name": "AI Knowledge Full Sync",
-    "description": "Daily full sync of all CAIKnowledge entities (catches missed webhooks)",
+    "name": "AI Knowledge Daily Sync",
+    "description": "Daily sync of all CAIKnowledge entities (catches missed webhooks, Blake3 verification included)",
     "flows": ["aiknowledge-full-sync"],
     "triggers": [
         cron("0 0 2 * * *"),  # Daily at 2:00 AM
@@ -16,16 +16,17 @@ config = {

 async def handler(input_data: None, ctx: FlowContext[Any]) -> None:
     """
-    Daily full sync handler.
+    Daily sync handler - ensures all active knowledge bases are synchronized.

     Loads all CAIKnowledge entities that need sync and emits events.
+    Blake3 hash verification is always performed (hash available from JunctionData API).
     Runs every day at 02:00:00.
     """
     from services.espocrm import EspoCRMAPI
     from services.models import AIKnowledgeActivationStatus, AIKnowledgeSyncStatus

     ctx.logger.info("=" * 80)
-    ctx.logger.info("🌙 DAILY FULL SYNC STARTED")
+    ctx.logger.info("🌙 DAILY AI KNOWLEDGE SYNC STARTED")
     ctx.logger.info("=" * 80)

     espocrm = EspoCRMAPI(ctx)
@@ -37,7 +38,7 @@ async def handler(input_data: None, ctx: FlowContext[Any]) -> None:
             where=[
                 {
                     'type': 'equals',
-                    'attribute': 'activationStatus',
+                    'attribute': 'aktivierungsstatus',
                     'value': AIKnowledgeActivationStatus.ACTIVE.value
                 },
                 {
@@ -63,23 +64,22 @@ async def handler(input_data: None, ctx: FlowContext[Any]) -> None:
         ctx.logger.info("=" * 80)
         return

-    # Enqueue sync events for all
+    # Enqueue sync events for all (Blake3 verification always enabled)
     for i, entity in enumerate(entities, 1):
         await ctx.enqueue({
             'topic': 'aiknowledge.sync',
             'data': {
                 'knowledge_id': entity['id'],
-                'source': 'daily_full_sync'
+                'source': 'daily_cron'
             }
         })

         ctx.logger.info(
             f"📤 [{i}/{total}] Enqueued: {entity['name']} "
             f"(syncStatus={entity.get('syncStatus')})"
         )

     ctx.logger.info("=" * 80)
-    ctx.logger.info(f"✅ Full sync complete: {total} events enqueued")
+    ctx.logger.info(f"✅ Daily sync complete: {total} events enqueued")
     ctx.logger.info("=" * 80)

     except Exception as e:
@@ -55,12 +55,12 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
     lock_acquired = await sync_utils.acquire_sync_lock(knowledge_id)

     if not lock_acquired:
-        ctx.logger.warning(f"⏸️ Lock already held for {knowledge_id}, skipping")
+        ctx.logger.warn(f"⏸️ Lock already held for {knowledge_id}, skipping")
         ctx.logger.info("   (Will be retried by Motia queue)")
         raise RuntimeError(f"Lock busy for {knowledge_id}")  # Motia will retry

     try:
-        # Perform sync
+        # Perform sync (Blake3 hash verification always enabled)
         await sync_utils.sync_knowledge_to_xai(knowledge_id, ctx)

         ctx.logger.info("=" * 80)
@@ -74,7 +74,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
         ctx.logger.error("=" * 80)
         ctx.logger.error("❌ AI KNOWLEDGE SYNC FAILED")
         ctx.logger.error("=" * 80)
-        ctx.logger.error(f"Error: {e}", exc_info=True)
+        ctx.logger.error(f"Error: {e}")
         ctx.logger.error(f"Knowledge ID: {knowledge_id}")
         ctx.logger.error("=" * 80)
@@ -31,6 +31,24 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
     # Extract payload
     payload = request.body

+    # Handle case where payload is a list (e.g., from array-based webhook)
+    if isinstance(payload, list):
+        if not payload:
+            ctx.logger.error("❌ Empty payload list")
+            return ApiResponse(
+                status=400,
+                body={'success': False, 'error': 'Empty payload'}
+            )
+        payload = payload[0]  # Take first item
+
+    # Ensure payload is a dict
+    if not isinstance(payload, dict):
+        ctx.logger.error(f"❌ Invalid payload type: {type(payload)}")
+        return ApiResponse(
+            status=400,
+            body={'success': False, 'error': f'Invalid payload type: {type(payload).__name__}'}
+        )
+
     # Validate required fields
     knowledge_id = payload.get('entity_id') or payload.get('id')
     entity_type = payload.get('entity_type', 'CAIKnowledge')
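The new guards reduce to a small normalization rule: unwrap single-item list payloads, reject everything that is not a dict. A standalone sketch of that rule (the helper name `normalize_webhook_payload` is introduced here; the handler performs the checks inline and returns HTTP 400 where this returns None):

```python
from typing import Any, Optional

def normalize_webhook_payload(payload: Any) -> Optional[dict]:
    # Array-based webhooks wrap the entity in a list: take the first item
    if isinstance(payload, list):
        if not payload:
            return None  # handler: 400 "Empty payload"
        payload = payload[0]
    # Anything that is not a dict at this point is rejected
    if not isinstance(payload, dict):
        return None  # handler: 400 "Invalid payload type"
    return payload

print(normalize_webhook_payload([{'entity_id': 'k1'}]))  # {'entity_id': 'k1'}
print(normalize_webhook_payload("oops"))                 # None
```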