Compare commits

...

19 Commits

Author SHA1 Message Date
bsiggel
71f583481a fix: Remove deprecated AI Chat Completions and Models List API implementations 2026-03-19 23:10:00 +00:00
bsiggel
48d440a860 fix: Remove deprecated VMH xAI Chat Completions API implementation 2026-03-19 21:42:43 +00:00
bsiggel
c02a5d8823 fix: Update ExecModule exec path to use correct binary location 2026-03-19 21:23:42 +00:00
bsiggel
edae5f6081 fix: Update ExecModule configuration to use correct source directory for step scripts 2026-03-19 21:20:31 +00:00
bsiggel
8ce843415e feat: Enhance developer guide with updated platform evolution and workflow details 2026-03-19 20:56:32 +00:00
bsiggel
46085bd8dd update to iii 0.90 and change directory structure 2026-03-19 20:33:49 +00:00
bsiggel
2ac83df1e0 fix: Update default chat model to grok-4-1-fast-reasoning and enhance logging for LLM responses 2026-03-19 09:50:31 +00:00
bsiggel
7fffdb2660 fix: Simplify error logging in models list API handler 2026-03-19 09:48:57 +00:00
bsiggel
69f0c6a44d feat: Implement AI Chat Completions API with streaming support and models list endpoint
- Enhanced the AI Chat Completions API to support true streaming using async generators and proper SSE headers.
- Updated endpoint paths to align with OpenAI's API versioning.
- Improved logging for request details and error handling.
- Added a new AI Models List API to return available models compatible with chat completions.
- Refactored code for better readability and maintainability, including the extraction of common functionalities.
- Introduced a VMH-specific Chat Completions API with similar features and structure.
2026-03-18 21:30:59 +00:00
bsiggel
949a5fd69c feat: Implement AI Chat Completions API with support for file search, web search, and Aktenzeichen-based collection lookup 2026-03-18 18:22:04 +00:00
bsiggel
8e53fd6345 fix: Enhance tool binding in LangChainXAIService to support web search and update API handler for new parameters 2026-03-15 16:37:57 +00:00
bsiggel
59fdd7d9ec fix: Normalize MIME type for PDF uploads and update collection management endpoint to use vector store API 2026-03-15 16:34:13 +00:00
bsiggel
eaab14ae57 fix: Adjust multipart form to use raw UTF-8 encoding for filenames in file uploads 2026-03-14 23:00:49 +00:00
bsiggel
331d43390a fix: Import unquote for URL decoding in AI Knowledge synchronization utilities 2026-03-14 22:50:59 +00:00
bsiggel
18f2ff775e fix: URL-decode filenames in document synchronization to handle special characters 2026-03-14 22:49:07 +00:00
bsiggel
c032e24d7a fix: Update default model name to 'grok-4-1-fast-reasoning' in xAI Chat Completions API 2026-03-14 08:39:50 +00:00
bsiggel
4a5065aea4 feat: Add Aktenzeichen utility functions and LangChain xAI service integration
- Implemented utility functions for extracting, validating, and normalizing Aktenzeichen in 'aktenzeichen_utils.py'.
- Created LangChainXAIService for integrating LangChain ChatXAI with file search capabilities in 'langchain_xai_service.py'.
- Developed VMH xAI Chat Completions API to handle OpenAI-compatible requests with support for Aktenzeichen detection and file search in 'xai_chat_completion_api_step.py'.
2026-03-13 10:10:33 +00:00
bsiggel
bb13d59ddb fix: Improve orphan detection and Blake3 hash verification in document synchronization 2026-03-13 08:40:20 +00:00
bsiggel
b0fceef4e2 fix: Update sync mode logging to clarify Blake3 hash verification status 2026-03-12 23:09:21 +00:00
41 changed files with 1902 additions and 50 deletions

View File

@@ -3,6 +3,7 @@
> **For AI Assistants**: This document contains all critical patterns, conventions, and best practices. Read this first to understand the codebase structure and ensure consistency.
**Quick Navigation:**
- [iii Platform & Development Workflow](#iii-platform--development-workflow) - Platform evolution and CLI tools
- [Core Concepts](#core-concepts) - System architecture and patterns
- [Design Principles](#design-principles) - Event Storm & Bidirectional References
- [Step Development](#step-development-best-practices) - How to create new steps
@@ -23,6 +24,244 @@
---
## iii Platform & Development Workflow
### Platform Evolution (v0.8 → v0.9+)
**Status:** March 2026 - iii v0.9+ production-ready
iii has evolved from an all-in-one development tool to a **modular, production-grade event engine** with clear separation between development and deployment workflows.
#### Structural Changes Overview
| Component | Before (v0.2-v0.7) | Now (v0.9+) | Impact |
|-----------|-------------------|-------------|--------|
| **Console/Dashboard** | Integrated in engine process (port 3111) | Separate process (`iii-cli console` or `dev`) | More flexibility, less resource overhead, better scaling |
| **CLI Tool** | Minimal or non-existent | `iii-cli` is the central dev tool | Terminal-based dev workflow, scriptable, faster iteration |
| **Project Structure** | Steps anywhere in project | **Recommended:** `src/` + `src/steps/` | Cleaner structure, reliable hot-reload |
| **Hot-Reload/Watcher** | Integrated in engine | Separate `shell::ExecModule` with `watch` paths | Only Python/TS files watched (configurable) |
| **Start & Services** | Single `iii` process | Engine (`iii` or `iii-cli start`) + Console separate | Better for production (engine) vs dev (console) |
| **Config Handling** | YAML + ENV | YAML + ENV + CLI flags prioritized | More control via CLI flags |
| **Observability** | Basic | Enhanced (OTel, Rollups, Alerts, Traces) | Production-ready telemetry |
| **Streams & State** | KV-Store (file/memory) | More adapters + file_based default | Better persistence handling |
**Key Takeaway:** iii is now a **modular, production-ready engine** where development (CLI + separate console) is clearly separated from production deployment.
---
### Development Workflow with iii-cli
**`iii-cli` is your primary tool for local development, debugging, and testing.**
#### Essential Commands
| Command | Purpose | When to Use | Example |
|---------|---------|------------|---------|
| `iii-cli dev` | Start dev server with hot-reload + integrated console | Local development, immediate feedback on code changes | `iii-cli dev` |
| `iii-cli console` | Start dashboard only (separate port) | When you only need the console (no dev reload) | `iii-cli console --host 0.0.0.0 --port 3113` |
| `iii-cli start` | Start engine standalone (like `motia.service`) | Testing engine in isolation | `iii-cli start -c iii-config.yaml` |
| `iii-cli logs` | Live logs of all flows/workers/triggers | Debugging, error investigation | `iii-cli logs --level debug` |
| `iii-cli trace <id>` | Show detailed trace information (OTel) | Debug specific request/flow | `iii-cli trace abc123` |
| `iii-cli state ls` | List states (KV storage) | Verify state persistence | `iii-cli state ls` |
| `iii-cli state get` | Get specific state value | Inspect state content | `iii-cli state get key` |
| `iii-cli stream ls` | List all streams + groups | Inspect stream/websocket connections | `iii-cli stream ls` |
| `iii-cli flow list` | Show all registered flows/triggers | Overview of active steps & endpoints | `iii-cli flow list` |
| `iii-cli worker logs` | Worker logs (Python/TS execution) | Debug issues in step handlers | `iii-cli worker logs` |
#### Typical Development Workflow
```bash
# 1. Navigate to project
cd /opt/motia-iii/bitbylaw
# 2. Start dev mode (hot-reload + console on port 3113)
iii-cli dev --host 0.0.0.0 --port 3113 --engine-port 3111
# Alternative: Separate engine + console
# Terminal 1:
iii-cli start -c iii-config.yaml
# Terminal 2:
iii-cli console --host 0.0.0.0 --port 3113 \
--engine-host 192.168.1.62 --engine-port 3111
# 3. Watch logs live (separate terminal)
iii-cli logs -f
# 4. Debug specific trace
iii-cli trace <trace-id-from-logs>
# 5. Inspect state
iii-cli state ls
iii-cli state get document:sync:status
# 6. Verify flows registered
iii-cli flow list
```
#### Development vs. Production
**Development:**
- Use `iii-cli dev` for hot-reload
- Console accessible on localhost:3113
- Logs visible in terminal
- Immediate feedback on code changes
**Production:**
- `systemd` service runs `iii-cli start`
- Console runs separately (if needed)
- Logs via `journalctl -u motia.service -f`
- No hot-reload (restart service for changes)
**Example Production Service:**
```ini
[Unit]
Description=Motia III Engine
After=network.target redis.service
[Service]
Type=simple
User=motia
WorkingDirectory=/opt/motia-iii/bitbylaw
ExecStart=/usr/local/bin/iii-cli start -c /opt/motia-iii/bitbylaw/iii-config.yaml
Restart=always
RestartSec=10
Environment="PATH=/usr/local/bin:/usr/bin"
[Install]
WantedBy=multi-user.target
```
#### Project Structure Best Practices
**Recommended Structure (v0.9+):**
```
bitbylaw/
├── iii-config.yaml # Main configuration
├── src/ # Source code root
│ └── steps/ # All steps here (hot-reload reliable)
│ ├── __init__.py
│ ├── vmh/
│ │ ├── __init__.py
│ │ ├── document_sync_event_step.py
│ │ └── webhook/
│ │ ├── __init__.py
│ │ └── document_create_api_step.py
│ └── advoware_proxy/
│ └── ...
├── services/ # Shared business logic
│ ├── __init__.py
│ ├── xai_service.py
│ ├── espocrm.py
│ └── ...
└── tests/ # Test files
```
**Why `src/steps/` is recommended:**
- **Hot-reload works reliably** - Watcher detects changes correctly
- **Cleaner project** - Source code isolated from config/docs
- **IDE support** - Better navigation and refactoring
- **Deployment** - Easier to package
**Note:** Old structure (steps in root) still works, but hot-reload may be less reliable.
#### Hot-Reload Configuration
**Hot-reload is configured via `shell::ExecModule` in `iii-config.yaml`:**
```yaml
modules:
- type: shell::ExecModule
config:
watch:
- "src/**/*.py" # Watch Python files in src/
- "services/**/*.py" # Watch service files
# Add more patterns as needed
ignore:
- "**/__pycache__/**"
- "**/*.pyc"
- "**/tests/**"
```
**Behavior:**
- Only files matching `watch` patterns trigger reload
- Changes in `ignore` patterns are ignored
- Reload is automatic in `iii-cli dev` mode
- Production mode (`iii-cli start`) does NOT watch files
---
### Observability & Debugging
#### OpenTelemetry Integration
**iii v0.9+ has built-in OpenTelemetry support:**
```python
# Traces are automatically created for:
# - HTTP requests
# - Queue processing
# - Cron execution
# - Service calls (if instrumented)
# Access trace ID in handler:
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
trace_id = ctx.trace_id # Use for debugging
ctx.logger.info(f"Trace ID: {trace_id}")
```
**View traces:**
```bash
# Get trace details
iii-cli trace <trace-id>
# Filter logs by trace
iii-cli logs --trace <trace-id>
```
#### Debugging Workflow
**1. Live Logs:**
```bash
# All logs
iii-cli logs -f
# Specific level
iii-cli logs --level error
# With grep
iii-cli logs -f | grep "document_sync"
```
**2. State Inspection:**
```bash
# List all state keys
iii-cli state ls
# Get specific state
iii-cli state get sync:document:last_run
```
**3. Flow Verification:**
```bash
# List all registered flows
iii-cli flow list
# Verify endpoint exists
iii-cli flow list | grep "/vmh/webhook"
```
**4. Worker Issues:**
```bash
# Worker-specific logs
iii-cli worker logs
# Check worker health
iii-cli worker status
```
---
## Core Concepts
### System Overview
@@ -1271,24 +1510,41 @@ sudo systemctl enable motia.service
sudo systemctl enable iii-console.service
```
**Development (iii-cli):**
```bash
# Option 1: Dev mode with integrated console and hot-reload
cd /opt/motia-iii/bitbylaw
iii-cli dev --host 0.0.0.0 --port 3113 --engine-port 3111
# Option 2: Separate engine and console
# Terminal 1: Start engine
iii-cli start -c iii-config.yaml
# Terminal 2: Start console
iii-cli console --host 0.0.0.0 --port 3113 \
  --engine-host 192.168.1.62 --engine-port 3111
# Option 3: Manual (legacy)
/opt/bin/iii -c iii-config.yaml
```
### Check Registered Steps
**Using iii-cli (recommended):**
```bash
# List all flows and triggers
iii-cli flow list
# Filter for specific step
iii-cli flow list | grep document_sync
```
**Using curl (legacy):**
```bash
curl http://localhost:3111/_console/functions | python3 -m json.tool
```
### Test HTTP Endpoints
```bash
# Test document webhook
@@ -1298,6 +1554,11 @@ curl -X POST "http://localhost:3111/vmh/webhook/document/create" \
# Test advoware proxy
curl "http://localhost:3111/advoware/proxy?endpoint=employees"
# Test beteiligte sync
curl -X POST "http://localhost:3111/vmh/webhook/beteiligte/create" \
-H "Content-Type: application/json" \
-d '{"entity_type": "CBeteiligte", "entity_id": "abc123", "action": "create"}'
```
### Manually Trigger Cron
@@ -1308,36 +1569,208 @@ curl -X POST "http://localhost:3111/_console/cron/trigger" \
-d '{"function_id": "steps::VMH Beteiligte Sync Cron::trigger::0"}'
```
### View and Debug Logs
**Using iii-cli (recommended):**
```bash
# Live logs (all)
iii-cli logs -f
# Live logs with specific level
iii-cli logs -f --level error
iii-cli logs -f --level debug
# Filter by component
iii-cli logs -f | grep "document_sync"
# Worker-specific logs
iii-cli worker logs
# Get specific trace
iii-cli trace <trace-id>
# Filter logs by trace ID
iii-cli logs --trace <trace-id>
```
**Using journalctl (production):**
```bash
# Live logs
journalctl -u motia.service -f
# Search for specific step
journalctl -u motia.service --since "today" | grep -i "document sync"
# Show errors only
journalctl -u motia.service -p err -f
# Last 100 lines
journalctl -u motia.service -n 100
# Specific time range
journalctl -u motia.service --since "2026-03-19 10:00" --until "2026-03-19 11:00"
```
**Using log files (legacy):**
```bash
# Check for errors
tail -100 /opt/motia-iii/bitbylaw/iii_new.log | grep -i error
# Follow log file
tail -f /opt/motia-iii/bitbylaw/iii_new.log
```
### Inspect State and Streams
**State Management:**
```bash
# List all state keys
iii-cli state ls
# Get specific state value
iii-cli state get document:sync:last_run
# Set state (if needed for testing)
iii-cli state set test:key "test value"
# Delete state
iii-cli state delete test:key
```
**Stream Management:**
```bash
# List all active streams
iii-cli stream ls
# Inspect specific stream
iii-cli stream info <stream-id>
# List consumer groups
iii-cli stream groups <stream-name>
```
### Debugging Workflow
**1. Identify the Issue:**
```bash
# Check if step is registered
iii-cli flow list | grep my_step
# View recent errors
iii-cli logs --level error -n 50
# Check service status
sudo systemctl status motia.service
```
**2. Get Detailed Information:**
```bash
# Live tail logs for specific step
iii-cli logs -f | grep "document_sync"
# Check worker processes
iii-cli worker logs
# Inspect state
iii-cli state ls
```
**3. Test Specific Functionality:**
```bash
# Trigger webhook manually
curl -X POST http://localhost:3111/vmh/webhook/...
# Check response and logs
iii-cli logs -f | grep "webhook"
# Verify state changed
iii-cli state get entity:sync:status
```
**4. Trace Specific Request:**
```bash
# Make request, note trace ID from logs
curl -X POST http://localhost:3111/vmh/webhook/document/create ...
# Get full trace
iii-cli trace <trace-id>
# View all logs for this trace
iii-cli logs --trace <trace-id>
```
### Performance Monitoring
**Check System Resources:**
```bash
# CPU and memory usage
htop
# Process-specific
ps aux | grep iii
# Redis memory
redis-cli info memory
# File descriptors
lsof -p $(pgrep -f "iii-cli start")
```
**Check Processing Metrics:**
```bash
# Queue lengths (if using Redis streams)
redis-cli XINFO STREAM vmh:document:sync
# Pending messages
redis-cli XPENDING vmh:document:sync group1
# Lock status
redis-cli KEYS "lock:*"
```
### Common Issues
**Step not showing up:**
1. Check file naming: must end with `_step.py`
2. Check for syntax errors: `iii-cli logs --level error`
3. Check for import errors: `iii-cli logs | grep -i "importerror\|traceback"`
4. Verify `config` dict is present
5. Restart: `sudo systemctl restart motia.service` or restart `iii-cli dev`
6. Verify hot-reload is working: check terminal output in `iii-cli dev`
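For reference, a minimal step skeleton illustrating the `config` dict and handler pair; the config keys and endpoint path here are illustrative assumptions, so copy the exact schema from an existing step under `src/steps/`:

```python
# minimal_example_step.py (file name must end with "_step.py")
# NOTE: config keys below are illustrative; check an existing step
# under src/steps/ for the exact schema used in this project.
config = {
    "type": "api",                   # trigger type (assumed)
    "path": "/vmh/webhook/example",  # hypothetical endpoint
    "method": "POST",
}

async def handler(request, ctx):
    # ctx provides logger and trace_id (see Observability section)
    ctx.logger.info(f"Trace ID: {ctx.trace_id}")
    return {"status": "ok"}
```

If the step still does not appear after a restart, the `config` dict at module level is the first thing the registry looks for.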
**Redis connection failed:**
- Check `REDIS_HOST` and `REDIS_PORT` environment variables
- Verify Redis is running: `redis-cli ping`
- Check Redis logs: `journalctl -u redis -f`
- Service will work without Redis but with warnings
**Hot-reload not working:**
- Verify using `iii-cli dev` (not `iii-cli start`)
- Check `watch` patterns in `iii-config.yaml`
- Ensure files are in watched directories (`src/**/*.py`)
- Look for watcher errors: `iii-cli logs | grep -i "watch"`
**Handler not triggered:**
- Verify endpoint registered: `iii-cli flow list`
- Check HTTP method matches (GET, POST, etc.)
- Test with curl to isolate issue
- Check trigger configuration in step's `config` dict
**AttributeError '_log' not found:**
- Ensure service inherits from `BaseSyncUtils` OR
- Implement `_log()` method manually
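A minimal `_log()` fallback, mirroring the pattern used in `LangChainXAIService` below (the class and logger names here are made up):

```python
import logging

class MyService:  # hypothetical service not inheriting BaseSyncUtils
    def __init__(self):
        self.logger = logging.getLogger("my_service")

    def _log(self, msg: str, level: str = "info") -> None:
        # Unknown levels fall back to info() instead of raising
        log_func = getattr(self.logger, level, self.logger.info)
        log_func(msg)
```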
**Trace not found:**
- Ensure OpenTelemetry enabled in config
- Check if trace ID is valid format
- Use `iii-cli logs` with filters instead
**Console not accessible:**
- Check if console service running: `systemctl status iii-console.service`
- Verify port not blocked by firewall: `sudo ufw status`
- Check console logs: `journalctl -u iii-console.service -f`
- Try accessing via `localhost:3113` instead of public IP
---
## Key Patterns Summary

View File

@@ -78,6 +78,6 @@ modules:
- class: modules::shell::ExecModule
  config:
    watch:
      - src/steps/**/*.py
    exec:
      - /usr/local/bin/uv run python -m motia.cli run --dir src/steps

View File

@@ -18,5 +18,8 @@ dependencies = [
"google-api-python-client>=2.100.0", # Google Calendar API
"google-auth>=2.23.0", # Google OAuth2
"backoff>=2.2.1", # Retry/backoff decorator
"langchain>=0.3.0", # LangChain framework
"langchain-xai>=0.2.0", # xAI integration for LangChain
"langchain-core>=0.3.0", # LangChain core
]

View File

@@ -12,6 +12,7 @@ import hashlib
import json
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime
from urllib.parse import unquote
from services.sync_utils_base import BaseSyncUtils
from services.models import (
@@ -282,7 +283,8 @@ class AIKnowledgeSync(BaseSyncUtils):
successful = 0
failed = 0
skipped = 0
# Track aiDocumentIds for orphan detection (collected during sync)
synced_file_ids: set = set()
for doc in documents:
doc_id = doc['documentId']
doc_name = doc.get('documentName', 'Unknown')
@@ -303,7 +305,17 @@ class AIKnowledgeSync(BaseSyncUtils):
if junction_status in ['new', 'unclean', 'failed']:
needs_sync = True
reason = f"status={junction_status}"
elif junction_status == 'synced':
# Synced status should have both blake3_hash and ai_document_id
if not blake3_hash:
needs_sync = True
reason = "inconsistency: synced but no blake3 hash"
ctx.logger.warn(f" ⚠️ Synced document missing blake3 hash!")
elif not ai_document_id:
needs_sync = True
reason = "inconsistency: synced but no aiDocumentId"
ctx.logger.warn(f" ⚠️ Synced document missing aiDocumentId!")
else:
# Verify Blake3 hash with XAI (always, since hash from JunctionData API is free)
try:
xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id)
@@ -319,11 +331,17 @@ class AIKnowledgeSync(BaseSyncUtils):
else:
needs_sync = True
reason = "file not found in XAI collection"
ctx.logger.warn(f" ⚠️ Document marked synced but not in XAI!")
except Exception as e:
needs_sync = True
reason = f"verification failed: {e}"
ctx.logger.warn(f" ⚠️ Failed to verify Blake3, will re-sync: {e}")
if not needs_sync:
ctx.logger.info(f" ⏭️ Skipped (no sync needed)")
# Document is already synced, track its aiDocumentId
if ai_document_id:
synced_file_ids.add(ai_document_id)
skipped += 1
continue
@@ -338,24 +356,29 @@ class AIKnowledgeSync(BaseSyncUtils):
failed += 1
continue
# Get attachment details for MIME type and original filename
try:
attachment = await espocrm.get_entity('Attachment', attachment_id)
mime_type = attachment.get('type', 'application/octet-stream')
file_size = attachment.get('size', 0)
original_filename = attachment.get('name', doc_name) # Original filename with extension
# URL-decode filename (fixes special chars like §, ä, ö, ü, etc.)
original_filename = unquote(original_filename)
except Exception as e:
ctx.logger.warn(f" ⚠️ Failed to get attachment details: {e}, using defaults")
mime_type = 'application/octet-stream'
file_size = 0
original_filename = unquote(doc_name) # Also decode fallback name
ctx.logger.info(f" 📎 Attachment: {attachment_id} ({mime_type}, {file_size} bytes)")
ctx.logger.info(f" 📄 Original filename: {original_filename}")
# Download document
file_content = await espocrm.download_attachment(attachment_id)
ctx.logger.info(f" 📥 Downloaded {len(file_content)} bytes")
# Upload to XAI with original filename (includes extension)
filename = original_filename
xai_file_id = await xai.upload_file(file_content, filename, mime_type)
ctx.logger.info(f" 📤 Uploaded to XAI: {xai_file_id}")
@@ -376,6 +399,9 @@ class AIKnowledgeSync(BaseSyncUtils):
)
ctx.logger.info(f" ✅ Junction updated")
# Track the new aiDocumentId for orphan detection
synced_file_ids.add(xai_file_id)
successful += 1
except Exception as e:
@@ -403,11 +429,12 @@ class AIKnowledgeSync(BaseSyncUtils):
xai_documents = await xai.list_collection_documents(collection_id)
xai_file_ids = {doc.get('file_id') for doc in xai_documents if doc.get('file_id')}
# Use synced_file_ids (collected during this sync) for orphan detection
# This includes both pre-existing synced docs and newly uploaded ones
ctx.logger.info(f" XAI has {len(xai_file_ids)} files, we have {len(synced_file_ids)} synced")
# Find orphans (in XAI but not in our current sync)
orphans = xai_file_ids - synced_file_ids
if orphans:
ctx.logger.info(f" Found {len(orphans)} orphaned file(s)")
@@ -432,7 +459,7 @@ class AIKnowledgeSync(BaseSyncUtils):
ctx.logger.info(f" ✅ Synced: {successful}")
ctx.logger.info(f" ⏭️ Skipped: {skipped}")
ctx.logger.info(f" ❌ Failed: {failed}")
ctx.logger.info(f" Mode: Blake3 hash verification enabled")
ctx.logger.info("=" * 80)
def _calculate_metadata_hash(self, document: Dict) -> str:

View File

@@ -0,0 +1,110 @@
"""Aktenzeichen detection and validation

Utility functions for detecting, validating, and normalizing
Aktenzeichen (case reference numbers) in the format '1234/56' or 'ABC/23'.
"""
import re
from typing import Optional

# Regex for Aktenzeichen: 1-4 alphanumeric characters + "/" + 2 digits
AKTENZEICHEN_REGEX = re.compile(r'^([A-Za-z0-9]{1,4}/\d{2})\s*', re.IGNORECASE)


def extract_aktenzeichen(text: str) -> Optional[str]:
    """
    Extract an Aktenzeichen from the beginning of the text.

    Pattern: ^[A-Za-z0-9]{1,4}/\d{2}

    Examples:
        >>> extract_aktenzeichen("1234/56 Was ist der Stand?")
        "1234/56"
        >>> extract_aktenzeichen("ABC/23 Frage zum Vertrag")
        "ABC/23"
        >>> extract_aktenzeichen("Kein Aktenzeichen hier")
        None

    Args:
        text: Input text (e.g. the first message)

    Returns:
        The Aktenzeichen as a string, or None if not found
    """
    if not text or not isinstance(text, str):
        return None
    match = AKTENZEICHEN_REGEX.match(text.strip())
    return match.group(1) if match else None


def remove_aktenzeichen(text: str) -> str:
    """
    Remove the Aktenzeichen from the beginning of the text.

    Examples:
        >>> remove_aktenzeichen("1234/56 Was ist der Stand?")
        "Was ist der Stand?"
        >>> remove_aktenzeichen("Kein Aktenzeichen")
        "Kein Aktenzeichen"

    Args:
        text: Input text with an Aktenzeichen

    Returns:
        Text without the Aktenzeichen (whitespace trimmed)
    """
    if not text or not isinstance(text, str):
        return text
    return AKTENZEICHEN_REGEX.sub('', text, count=1).strip()


def validate_aktenzeichen(az: str) -> bool:
    """
    Validate the Aktenzeichen format.

    Pattern: ^[A-Za-z0-9]{1,4}/\d{2}$

    Examples:
        >>> validate_aktenzeichen("1234/56")
        True
        >>> validate_aktenzeichen("ABC/23")
        True
        >>> validate_aktenzeichen("12345/567")  # Too long
        False
        >>> validate_aktenzeichen("1234-56")  # Wrong separator
        False

    Args:
        az: Aktenzeichen to validate

    Returns:
        True if valid, False otherwise
    """
    if not az or not isinstance(az, str):
        return False
    return bool(re.match(r'^[A-Za-z0-9]{1,4}/\d{2}$', az, re.IGNORECASE))


def normalize_aktenzeichen(az: str) -> str:
    """
    Normalize an Aktenzeichen (uppercase, trim whitespace).

    Examples:
        >>> normalize_aktenzeichen("abc/23")
        "ABC/23"
        >>> normalize_aktenzeichen("  1234/56  ")
        "1234/56"

    Args:
        az: Aktenzeichen to normalize

    Returns:
        Normalized Aktenzeichen (uppercase, trimmed)
    """
    if not az or not isinstance(az, str):
        return az
    return az.strip().upper()
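Taken together, a caller might detect, normalize, and strip the Aktenzeichen in one pass. A minimal sketch (the combined helper name is made up; the pattern is the one defined above):

```python
import re
from typing import Optional, Tuple

# Same pattern as AKTENZEICHEN_REGEX above
AZ_PATTERN = re.compile(r'^([A-Za-z0-9]{1,4}/\d{2})\s*')

def detect_and_strip(text: str) -> Tuple[Optional[str], str]:
    """Return (normalized Aktenzeichen or None, remaining text)."""
    stripped = text.strip()
    match = AZ_PATTERN.match(stripped)
    if not match:
        return None, stripped
    # Normalize (uppercase) and drop the matched prefix
    return match.group(1).upper(), stripped[match.end():].strip()
```

For example, `detect_and_strip("abc/23 Frage zum Vertrag")` yields `("ABC/23", "Frage zum Vertrag")`.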

View File

@@ -10,6 +10,7 @@ Utility functions for document synchronization with xAI:
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime, timedelta
from urllib.parse import unquote
from services.sync_utils_base import BaseSyncUtils
from services.models import FileStatus, XAISyncStatus
@@ -365,6 +366,10 @@ class DocumentSync(BaseSyncUtils):
# Filename: use dokumentName/fileName if present, otherwise take it from the Attachment
final_filename = filename or attachment.get('name', 'unknown')
# URL-decode filename (fixes special chars like §, ä, ö, ü, etc.)
# EspoCRM stores filenames URL-encoded: %C2%A7 → §
final_filename = unquote(final_filename)
return {
'attachment_id': attachment_id,
'download_url': f"/api/v1/Attachment/file/{attachment_id}",
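A quick sanity check of the decoding this hunk relies on: `unquote` turns `%C2%A7` (the UTF-8 percent-encoding of `§`) back into the literal character, and names without `%` escapes pass through unchanged (the filename below is made up):

```python
from urllib.parse import unquote

encoded = "Vertrag_%C2%A7_5_%C3%BCberarbeitet.pdf"  # made-up EspoCRM-style name
decoded = unquote(encoded)
print(decoded)  # Vertrag_§_5_überarbeitet.pdf

# Plain names are unaffected, so decoding already-decoded input is safe
assert unquote(decoded) == decoded
```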

View File

@@ -0,0 +1,218 @@
"""LangChain xAI Integration Service
Service für LangChain ChatXAI Integration mit File Search Binding.
Analog zu xai_service.py für xAI Files API.
"""
import os
from typing import Dict, List, Any, Optional, AsyncIterator
from services.logging_utils import get_service_logger
class LangChainXAIService:
"""
Wrapper für LangChain ChatXAI mit Motia-Integration.
Benötigte Umgebungsvariablen:
- XAI_API_KEY: API Key für xAI (für ChatXAI model)
Usage:
service = LangChainXAIService(ctx)
model = service.get_chat_model(model="grok-4-1-fast-reasoning")
model_with_tools = service.bind_file_search(model, collection_id)
result = await service.invoke_chat(model_with_tools, messages)
"""
def __init__(self, ctx=None):
"""
Initialize LangChain xAI Service.
Args:
ctx: Optional Motia context for logging
Raises:
ValueError: If XAI_API_KEY not configured
"""
self.api_key = os.getenv('XAI_API_KEY', '')
self.ctx = ctx
self.logger = get_service_logger('langchain_xai', ctx)
if not self.api_key:
raise ValueError("XAI_API_KEY not configured in environment")
def _log(self, msg: str, level: str = 'info') -> None:
"""Delegate logging to service logger"""
log_func = getattr(self.logger, level, self.logger.info)
log_func(msg)
def get_chat_model(
self,
model: str = "grok-4-1-fast-reasoning",
temperature: float = 0.7,
max_tokens: Optional[int] = None
):
"""
Initialisiert ChatXAI Model.
Args:
model: Model name (default: grok-4-1-fast-reasoning)
temperature: Sampling temperature 0.0-1.0
max_tokens: Optional max tokens for response
Returns:
ChatXAI model instance
Raises:
ImportError: If langchain_xai not installed
"""
try:
from langchain_xai import ChatXAI
except ImportError:
raise ImportError(
"langchain_xai not installed. "
"Run: pip install langchain-xai>=0.2.0"
)
self._log(f"🤖 Initializing ChatXAI: model={model}, temp={temperature}")
kwargs = {
"model": model,
"api_key": self.api_key,
"temperature": temperature
}
if max_tokens:
kwargs["max_tokens"] = max_tokens
return ChatXAI(**kwargs)
def bind_tools(
self,
model,
collection_id: Optional[str] = None,
enable_web_search: bool = False,
web_search_config: Optional[Dict[str, Any]] = None,
max_num_results: int = 10
):
"""
Binds xAI tools (file_search and/or web_search) to the model.
Args:
model: ChatXAI model instance
collection_id: Optional xAI collection ID for file_search
enable_web_search: Enable web search tool (default: False)
web_search_config: Optional web search configuration:
{
'allowed_domains': ['example.com'], # Max 5 domains
'excluded_domains': ['spam.com'], # Max 5 domains
'enable_image_understanding': True
}
max_num_results: Max results from file search (default: 10)
Returns:
Model with requested tools bound (file_search and/or web_search)
"""
tools = []
# Add file_search tool if collection_id provided
if collection_id:
self._log(f"🔍 Binding file_search: collection={collection_id}")
tools.append({
"type": "file_search",
"vector_store_ids": [collection_id],
"max_num_results": max_num_results
})
# Add web_search tool if enabled
if enable_web_search:
self._log("🌐 Binding web_search")
web_search_tool = {"type": "web_search"}
# Add optional web search filters
if web_search_config:
if 'allowed_domains' in web_search_config:
domains = web_search_config['allowed_domains'][:5] # Max 5
web_search_tool['filters'] = {'allowed_domains': domains}
self._log(f" Allowed domains: {domains}")
elif 'excluded_domains' in web_search_config:
domains = web_search_config['excluded_domains'][:5] # Max 5
web_search_tool['filters'] = {'excluded_domains': domains}
self._log(f" Excluded domains: {domains}")
if web_search_config.get('enable_image_understanding'):
web_search_tool['enable_image_understanding'] = True
self._log(" Image understanding: enabled")
tools.append(web_search_tool)
if not tools:
self._log("⚠️ No tools to bind (no collection_id and web_search disabled)", level='warn')
return model
self._log(f"🔧 Binding {len(tools)} tool(s) to model")
return model.bind_tools(tools)
def bind_file_search(
self,
model,
collection_id: str,
max_num_results: int = 10
):
"""
Legacy method: binds only the file_search tool to the model.
Use bind_tools() for more flexibility.
"""
return self.bind_tools(
model=model,
collection_id=collection_id,
max_num_results=max_num_results
)
async def invoke_chat(
self,
model,
messages: List[Dict[str, Any]]
) -> Any:
"""
Non-streaming Chat Completion.
Args:
model: ChatXAI model (with or without tools)
messages: List of message dicts [{"role": "user", "content": "..."}]
Returns:
LangChain AIMessage with response
Raises:
Exception: If API call fails
"""
self._log(f"💬 Invoking chat: {len(messages)} messages", level='debug')
result = await model.ainvoke(messages)
self._log(f"✅ Response received: {len(result.content)} chars", level='debug')
return result
async def astream_chat(
self,
model,
messages: List[Dict[str, Any]]
) -> AsyncIterator:
"""
Streaming Chat Completion.
Args:
model: ChatXAI model (with or without tools)
messages: List of message dicts
Yields:
Chunks from streaming response
Example:
async for chunk in service.astream_chat(model, messages):
delta = chunk.content if hasattr(chunk, "content") else ""
# Process delta...
"""
self._log(f"💬 Streaming chat: {len(messages)} messages", level='debug')
async for chunk in model.astream(messages):
yield chunk

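For reference, the tool payloads that `bind_tools` assembles can be sketched standalone, without a ChatXAI instance or API key. This is a minimal mirror of the dict-building logic above (the helper name `build_tool_specs` and the collection ID `col_123` are ours, purely for illustration):

```python
from typing import Any, Dict, List, Optional

def build_tool_specs(
    collection_id: Optional[str] = None,
    enable_web_search: bool = False,
    web_search_config: Optional[Dict[str, Any]] = None,
    max_num_results: int = 10,
) -> List[Dict[str, Any]]:
    """Mirror of the tool-spec assembly in bind_tools(), minus the model binding."""
    tools: List[Dict[str, Any]] = []
    if collection_id:
        # file_search points at an xAI collection (vector store)
        tools.append({
            "type": "file_search",
            "vector_store_ids": [collection_id],
            "max_num_results": max_num_results,
        })
    if enable_web_search:
        web_search_tool: Dict[str, Any] = {"type": "web_search"}
        cfg = web_search_config or {}
        # allowed_domains and excluded_domains are mutually exclusive; max 5 each
        if 'allowed_domains' in cfg:
            web_search_tool['filters'] = {'allowed_domains': cfg['allowed_domains'][:5]}
        elif 'excluded_domains' in cfg:
            web_search_tool['filters'] = {'excluded_domains': cfg['excluded_domains'][:5]}
        if cfg.get('enable_image_understanding'):
            web_search_tool['enable_image_understanding'] = True
        tools.append(web_search_tool)
    return tools

specs = build_tool_specs(
    collection_id="col_123",
    enable_web_search=True,
    web_search_config={"allowed_domains": ["example.com"]},
)
print(len(specs))  # 2
```

With both a collection ID and web search enabled, the model receives two tool specs; with neither, `bind_tools` returns the model unchanged, matching the empty-list warning path above.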

@@ -63,14 +63,31 @@ class XAIService:
     Raises:
         RuntimeError: on HTTP error or missing file_id in the response
     """
-    self._log(f"📤 Uploading {len(file_content)} bytes to xAI: {filename}")
+    # Normalize MIME type: xAI needs correct Content-Type for proper processing
+    # If generic octet-stream but file is clearly a PDF, fix it
+    if mime_type == 'application/octet-stream' and filename.lower().endswith('.pdf'):
+        mime_type = 'application/pdf'
+        self._log(f"⚠️ Corrected MIME type to application/pdf for {filename}")
+    self._log(f"📤 Uploading {len(file_content)} bytes to xAI: {filename} ({mime_type})")
     session = await self._get_session()
     url = f"{XAI_FILES_URL}/v1/files"
     headers = {"Authorization": f"Bearer {self.api_key}"}
-    form = aiohttp.FormData()
-    form.add_field('file', file_content, filename=filename, content_type=mime_type)
+    # Create multipart form with explicit UTF-8 filename encoding
+    # aiohttp automatically URL-encodes filenames with special chars,
+    # but xAI expects raw UTF-8 in the filename parameter
+    form = aiohttp.FormData(quote_fields=False)
+    form.add_field(
+        'file',
+        file_content,
+        filename=filename,
+        content_type=mime_type
+    )
+    # CRITICAL: purpose="file_search" enables proper PDF processing
+    # Without this, xAI throws "internal error" on complex PDFs
+    form.add_field('purpose', 'file_search')
     async with session.post(url, data=form, headers=headers) as response:
         try:
@@ -95,9 +112,12 @@ class XAIService:
 async def add_to_collection(self, collection_id: str, file_id: str) -> None:
     """
-    Adds a file to an xAI collection.
+    Adds a file to an xAI collection (vector store).
-    POST https://management-api.x.ai/v1/collections/{collection_id}/documents/{file_id}
+    POST https://api.x.ai/v1/vector_stores/{vector_store_id}/files
+    Uses the OpenAI-compatible API pattern for adding files to vector stores.
+    This triggers proper indexing and processing.
     Raises:
         RuntimeError: on HTTP error
@@ -105,13 +125,16 @@ class XAIService:
     self._log(f"📚 Adding file {file_id} to collection {collection_id}")
     session = await self._get_session()
-    url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}"
+    # Use the OpenAI-compatible endpoint (not management API)
+    url = f"{XAI_FILES_URL}/v1/vector_stores/{collection_id}/files"
     headers = {
-        "Authorization": f"Bearer {self.management_key}",
+        "Authorization": f"Bearer {self.api_key}",
         "Content-Type": "application/json",
     }
-    async with session.post(url, headers=headers) as response:
+    payload = {"file_id": file_id}
+    async with session.post(url, json=payload, headers=headers) as response:
         if response.status not in (200, 201):
             raw = await response.text()
             raise RuntimeError(
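The MIME normalization introduced in the upload hunk can be exercised in isolation. A minimal sketch (the helper name `normalize_mime` is ours, not part of `XAIService`; it only reproduces the octet-stream-to-PDF fixup shown in the diff):

```python
def normalize_mime(filename: str, mime_type: str) -> str:
    # Browsers and CRMs often upload PDFs as generic octet-stream;
    # xAI's file processing keys off the declared Content-Type,
    # so a misdeclared PDF would be indexed incorrectly.
    if mime_type == 'application/octet-stream' and filename.lower().endswith('.pdf'):
        return 'application/pdf'
    return mime_type

print(normalize_mime("Akte §12.pdf", "application/octet-stream"))  # application/pdf
print(normalize_mime("notes.txt", "application/octet-stream"))     # application/octet-stream
```

Note the `.lower()` call: it makes the extension check case-insensitive, so `SCAN.PDF` is normalized as well, while non-PDF octet-stream uploads pass through untouched.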

uv.lock (generated, 1033 lines)

File diff suppressed because it is too large.