diff --git a/docs/AI_KNOWLEDGE_SYNC.md b/docs/AI_KNOWLEDGE_SYNC.md new file mode 100644 index 0000000..30a55a0 --- /dev/null +++ b/docs/AI_KNOWLEDGE_SYNC.md @@ -0,0 +1,599 @@ +# AI Knowledge Collection Sync - Dokumentation + +**Version**: 1.0 +**Datum**: 11. März 2026 +**Status**: ✅ Implementiert + +--- + +## Überblick + +Synchronisiert EspoCRM `CAIKnowledge` Entities mit XAI Collections für semantische Dokumentensuche. Unterstützt vollständigen Collection-Lifecycle, BLAKE3-basierte Integritätsprüfung und robustes Hash-basiertes Change Detection. + +## Features + +✅ **Collection Lifecycle Management** +- NEW → Collection erstellen in XAI +- ACTIVE → Automatischer Sync der Dokumente +- PAUSED → Sync pausiert, Collection bleibt +- DEACTIVATED → Collection aus XAI löschen + +✅ **Dual-Hash Change Detection** +- EspoCRM Hash (MD5/SHA256) für lokale Änderungserkennung +- XAI BLAKE3 Hash für Remote-Integritätsverifikation +- Metadata-Hash für Beschreibungs-Änderungen + +✅ **Robustheit** +- BLAKE3 Verification nach jedem Upload +- Metadata-Only Updates via PATCH +- Orphan Detection & Cleanup +- Distributed Locking (Redis) +- Daily Full Sync (02:00 Uhr nachts) + +✅ **Fehlerbehandlung** +- Unsupported MIME Types → Status "unsupported" +- Transient Errors → Retry mit Exponential Backoff +- Partial Failures toleriert + +--- + +## Architektur + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ EspoCRM CAIKnowledge │ +│ ├─ activationStatus: new/active/paused/deactivated │ +│ ├─ syncStatus: unclean/pending_sync/synced/failed │ +│ └─ datenbankId: XAI Collection ID │ +└─────────────────────────────────────────────────────────────────┘ + ↓ Webhook +┌─────────────────────────────────────────────────────────────────┐ +│ Motia Webhook Handler │ +│ → POST /vmh/webhook/aiknowledge/update │ +└─────────────────────────────────────────────────────────────────┘ + ↓ Emit Event +┌─────────────────────────────────────────────────────────────────┐ +│ Queue: aiknowledge.sync │ +└─────────────────────────────────────────────────────────────────┘ + ↓ Lock: aiknowledge:{id} +┌─────────────────────────────────────────────────────────────────┐ +│ Sync Handler │ +│ ├─ Check activationStatus │ +│ ├─ Manage Collection Lifecycle │ +│ ├─ Sync Documents (with BLAKE3 verification) │ +│ └─ Update Statuses │ +└─────────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────────┐ +│ XAI Collections API │ +│ └─ Collections with embedded documents │ +└─────────────────────────────────────────────────────────────────┘ +``` + +--- + +## EspoCRM Konfiguration + +### 1. Entity: CAIKnowledge + +**Felder:** + +| Feld | Typ | Beschreibung | Werte | +|------|-----|--------------|-------| +| `name` | varchar(255) | Name der Knowledge Base | - | +| `datenbankId` | varchar(255) | XAI Collection ID | Automatisch gefüllt | +| `activationStatus` | enum | Lifecycle-Status | new, active, paused, deactivated | +| `syncStatus` | enum | Sync-Status | unclean, pending_sync, synced, failed | +| `lastSync` | datetime | Letzter erfolgreicher Sync | ISO 8601 | +| `syncError` | text | Fehlermeldung bei Failure | Max 2000 Zeichen | + +**Enum-Definitionen:** + +```json +{ + "activationStatus": { + "type": "enum", + "options": ["new", "active", "paused", "deactivated"], + "default": "new" + }, + "syncStatus": { + "type": "enum", + "options": ["unclean", "pending_sync", "synced", "failed"], + "default": "unclean" + } +} +``` + +### 2. Junction: CAIKnowledgeCDokumente + +**additionalColumns:** + +| Feld | Typ | Beschreibung | +|------|-----|--------------| +| `aiDocumentId` | varchar(255) | XAI file_id | +| `syncstatus` | enum | Per-Document Sync-Status | +| `syncedHash` | varchar(64) | MD5/SHA256 von EspoCRM | +| `xaiBlake3Hash` | varchar(128) | BLAKE3 Hash von XAI | +| `syncedMetadataHash` | varchar(64) | Hash der Metadaten | +| `lastSync` | datetime | Letzter Sync dieses Dokuments | + +**Enum-Definition:** + +```json +{ + "syncstatus": { + "type": "enum", + "options": ["new", "unclean", "synced", "failed", "unsupported"] + } +} +``` + +### 3. Webhooks + +**Webhook 1: CREATE** +```json +{ + "event": "CAIKnowledge.afterSave", + "url": "https://your-motia-domain.com/vmh/webhook/aiknowledge/update", + "method": "POST", + "payload": "{\"entity_id\": \"{$id}\", \"entity_type\": \"CAIKnowledge\", \"action\": \"create\"}", + "condition": "entity.isNew()" +} +``` + +**Webhook 2: UPDATE** +```json +{ + "event": "CAIKnowledge.afterSave", + "url": "https://your-motia-domain.com/vmh/webhook/aiknowledge/update", + "method": "POST", + "payload": "{\"entity_id\": \"{$id}\", \"entity_type\": \"CAIKnowledge\", \"action\": \"update\"}", + "condition": "!entity.isNew()" +} +``` + +**Webhook 3: DELETE (Optional)** +```json +{ + "event": "CAIKnowledge.afterRemove", + "url": "https://your-motia-domain.com/vmh/webhook/aiknowledge/delete", + "method": "POST", + "payload": "{\"entity_id\": \"{$id}\", \"entity_type\": \"CAIKnowledge\", \"action\": \"delete\"}" +} +``` + +**Empfehlung**: Nur CREATE + UPDATE verwenden. DELETE über `activationStatus="deactivated"` steuern. + +### 4. Hooks (EspoCRM Backend) + +**Hook 1: Document Link → syncStatus auf "unclean"** + +```php +// Hooks/Custom/CAIKnowledge/AfterRelateLinkMultiple.php +namespace Espo\Custom\Hooks\CAIKnowledge; + +class AfterRelateLinkMultiple extends \Espo\Core\Hooks\Base +{ + public function afterRelateLinkMultiple($entity, $options, $data) + { + if ($data['link'] === 'dokumentes') { + // Mark as unclean when documents linked + $entity->set('syncStatus', 'unclean'); + $this->getEntityManager()->saveEntity($entity); + } + } +} +``` + +**Hook 2: Document Change → Junction auf "unclean"** + +```php +// Hooks/Custom/CDokumente/AfterSave.php +namespace Espo\Custom\Hooks\CDokumente; + +class AfterSave extends \Espo\Core\Hooks\Base +{ + public function afterSave($entity, $options) + { + if ($entity->isAttributeChanged('description') || + $entity->isAttributeChanged('md5') || + $entity->isAttributeChanged('sha256')) { + + // Mark all junction entries as unclean + $this->updateJunctionStatuses($entity->id, 'unclean'); + + // Mark all related CAIKnowledge as unclean + $this->markRelatedKnowledgeUnclean($entity->id); + } + } +} +``` + +--- + +## Environment Variables + +```bash +# XAI API Keys (erforderlich) +XAI_API_KEY=your_xai_api_key_here +XAI_MANAGEMENT_KEY=your_xai_management_key_here + +# Redis (für Locking) +REDIS_HOST=localhost +REDIS_PORT=6379 + +# EspoCRM +ESPOCRM_API_BASE_URL=https://crm.bitbylaw.com/api/v1 +ESPOCRM_API_KEY=your_espocrm_api_key +``` + +--- + +## Workflows + +### Workflow 1: Neue Knowledge Base erstellen + +``` +1. User erstellt CAIKnowledge in EspoCRM + └─ activationStatus: "new" (default) + +2. Webhook CREATE gefeuert + └─ Event: aiknowledge.sync + +3. Sync Handler: + └─ activationStatus="new" → Collection erstellen in XAI + └─ Update EspoCRM: + ├─ datenbankId = collection_id + ├─ activationStatus = "active" + └─ syncStatus = "unclean" + +4. Nächster Webhook (UPDATE): + └─ activationStatus="active" → Dokumente syncen +``` + +### Workflow 2: Dokumente hinzufügen + +``` +1. User verknüpft Dokumente mit CAIKnowledge + └─ EspoCRM Hook setzt syncStatus = "unclean" + +2. Webhook UPDATE gefeuert + └─ Event: aiknowledge.sync + +3. Sync Handler: + └─ Für jedes Junction-Entry: + ├─ Check: MIME Type supported? + ├─ Check: Hash changed? + ├─ Download von EspoCRM + ├─ Upload zu XAI mit Metadata + ├─ Verify Upload (BLAKE3) + └─ Update Junction: syncstatus="synced" + +4. Update CAIKnowledge: + └─ syncStatus = "synced" + └─ lastSync = now() +``` + +### Workflow 3: Metadata-Änderung + +``` +1. User ändert Document.description in EspoCRM + └─ EspoCRM Hook setzt Junction syncstatus = "unclean" + └─ EspoCRM Hook setzt CAIKnowledge syncStatus = "unclean" + +2. Webhook UPDATE gefeuert + +3. Sync Handler: + └─ Berechne Metadata-Hash + └─ Hash unterschiedlich? → PATCH zu XAI + └─ Falls PATCH fehlschlägt → Fallback: Re-upload + └─ Update Junction: syncedMetadataHash +``` + +### Workflow 4: Knowledge Base deaktivieren + +``` +1. User setzt activationStatus = "deactivated" + +2. Webhook UPDATE gefeuert + +3. Sync Handler: + └─ Collection aus XAI löschen + └─ Alle Junction Entries zurücksetzen: + ├─ syncstatus = "new" + └─ aiDocumentId = NULL + └─ CAIKnowledge bleibt in EspoCRM (mit datenbankId) +``` + +### Workflow 5: Daily Full Sync + +``` +Cron: Täglich um 02:00 Uhr + +1. Lade alle CAIKnowledge mit: + └─ activationStatus = "active" + └─ syncStatus IN ("unclean", "failed") + +2. Für jedes: + └─ Emit: aiknowledge.sync Event + +3. Queue verarbeitet alle sequenziell + └─ Fängt verpasste Webhooks ab +``` + +--- + +## Monitoring & Troubleshooting + +### Logs prüfen + +```bash +# Motia Service Logs +sudo journalctl -u motia-iii -f | grep -i "ai knowledge" + +# Letzte 100 Sync-Events +sudo journalctl -u motia-iii -n 100 | grep "AI KNOWLEDGE SYNC" + +# Fehler der letzten 24 Stunden +sudo journalctl -u motia-iii --since "24 hours ago" | grep "❌" +``` + +### EspoCRM Status prüfen + +```sql +-- Alle Knowledge Bases mit Status +SELECT + id, + name, + activation_status, + sync_status, + last_sync, + sync_error +FROM c_ai_knowledge +WHERE activation_status = 'active'; + +-- Junction Entries mit Sync-Problemen +SELECT + j.id, + k.name AS knowledge_name, + d.name AS document_name, + j.syncstatus, + j.last_sync +FROM c_ai_knowledge_c_dokumente j +JOIN c_ai_knowledge k ON j.c_ai_knowledge_id = k.id +JOIN c_dokumente d ON j.c_dokumente_id = d.id +WHERE j.syncstatus IN ('failed', 'unsupported'); +``` + +### Häufige Probleme + +#### Problem: "Lock busy for aiknowledge:xyz" + +**Ursache**: Vorheriger Sync noch aktiv oder abgestürzt + +**Lösung**: +```bash +# Redis lock manuell freigeben +redis-cli +> DEL sync_lock:aiknowledge:xyz +``` + +#### Problem: "Unsupported MIME type" + +**Ursache**: Document hat MIME Type, den XAI nicht unterstützt + +**Lösung**: +- Dokument konvertieren (z.B. RTF → PDF) +- Oder: Akzeptieren (bleibt mit Status "unsupported") + +#### Problem: "Upload verification failed" + +**Ursache**: XAI liefert kein BLAKE3 Hash oder Hash-Mismatch + +**Lösung**: +1. Prüfe XAI API Dokumentation (Hash-Format geändert?) +2. Falls temporär: Retry läuft automatisch +3. Falls persistent: XAI Support kontaktieren + +#### Problem: "Collection not found" + +**Ursache**: Collection wurde manuell in XAI gelöscht + +**Lösung**: Automatisch gelöst - Sync erstellt neue Collection + +--- + +## API Endpoints + +### Webhook Endpoint + +```http +POST /vmh/webhook/aiknowledge/update +Content-Type: application/json + +{ + "entity_id": "kb-123", + "entity_type": "CAIKnowledge", + "action": "update" +} +``` + +**Response:** +```json +{ + "success": true, + "knowledge_id": "kb-123" +} +``` + +--- + +## Performance + +### Typische Sync-Zeiten + +| Szenario | Zeit | Notizen | +|----------|------|---------| +| Collection erstellen | < 1s | Nur API Call | +| 1 Dokument (1 MB) | 2-4s | Upload + Verify | +| 10 Dokumente (10 MB) | 20-40s | Sequenziell | +| 100 Dokumente (100 MB) | 3-6 min | Lock TTL: 30 min | +| Metadata-only Update | < 1s | Nur PATCH | +| Orphan Cleanup | 1-3s | Pro 10 Dokumente | + +### Lock TTLs + +- **AIKnowledge Sync**: 30 Minuten (1800 Sekunden) +- **Redis Lock**: Same as above +- **Auto-Release**: Bei Timeout (TTL expired) + +### Rate Limits + +**XAI API:** +- Files Upload: ~100 requests/minute +- Management API: ~1000 requests/minute + +**Strategie bei Rate Limit (429)**: +- Exponential Backoff: 2s, 4s, 8s, 16s, 32s +- Respect `Retry-After` Header +- Max 5 Retries + +--- + +## XAI Collections Metadata + +### Document Metadata Fields + +Werden für jedes Dokument in XAI gespeichert: + +```json +{ + "fields": { + "document_name": "Vertrag.pdf", + "description": "Mietvertrag Mustermann", + "created_at": "2024-01-01T00:00:00Z", + "modified_at": "2026-03-10T15:30:00Z", + "espocrm_id": "dok-123" + } +} +``` + +**inject_into_chunk**: `true` für `document_name` und `description` +→ Verbessert semantische Suche + +### Collection Metadata + +```json +{ + "metadata": { + "espocrm_entity_type": "CAIKnowledge", + "espocrm_entity_id": "kb-123", + "created_at": "2026-03-11T10:00:00Z" + } +} +``` + +--- + +## Testing + +### Manueller Test + +```bash +# 1. Erstelle CAIKnowledge in EspoCRM +# 2. Prüfe Logs +sudo journalctl -u motia-iii -f + +# 3. Prüfe Redis Lock +redis-cli +> KEYS sync_lock:aiknowledge:* + +# 4. Prüfe XAI Collection +curl -H "Authorization: Bearer $XAI_MANAGEMENT_KEY" \ + https://management-api.x.ai/v1/collections +``` + +### Integration Test + +```python +# tests/test_aiknowledge_sync.py + +async def test_full_sync_workflow(): + """Test complete sync workflow""" + + # 1. Create CAIKnowledge with status "new" + knowledge = await espocrm.create_entity('CAIKnowledge', { + 'name': 'Test KB', + 'activationStatus': 'new' + }) + + # 2. Trigger webhook + await trigger_webhook(knowledge['id']) + + # 3. Wait for sync + await asyncio.sleep(5) + + # 4. Check collection created + knowledge = await espocrm.get_entity('CAIKnowledge', knowledge['id']) + assert knowledge['datenbankId'] is not None + assert knowledge['activationStatus'] == 'active' + + # 5. Link document + await espocrm.link_entities('CAIKnowledge', knowledge['id'], 'CDokumente', doc_id) + + # 6. Trigger webhook again + await trigger_webhook(knowledge['id']) + await asyncio.sleep(10) + + # 7. Check junction synced + junction = await espocrm.get_junction_entries( + 'CAIKnowledgeCDokumente', + 'cAIKnowledgeId', + knowledge['id'] + ) + assert junction[0]['syncstatus'] == 'synced' + assert junction[0]['xaiBlake3Hash'] is not None +``` + +--- + +## Maintenance + +### Wöchentliche Checks + +- [ ] Prüfe failed Syncs in EspoCRM +- [ ] Prüfe Redis Memory Usage +- [ ] Prüfe XAI Storage Usage +- [ ] Review Logs für Patterns + +### Monatliche Tasks + +- [ ] Cleanup alte syncError Messages +- [ ] Verify XAI Collection Integrity +- [ ] Review Performance Metrics +- [ ] Update MIME Type Support List + +--- + +## Support + +**Bei Problemen:** + +1. **Logs prüfen**: `journalctl -u motia-iii -f` +2. **EspoCRM Status prüfen**: SQL Queries (siehe oben) +3. **Redis Locks prüfen**: `redis-cli KEYS sync_lock:*` +4. **XAI API Status**: https://status.x.ai + +**Kontakt:** +- Team: BitByLaw Development +- Motia Docs: `/opt/motia-iii/bitbylaw/docs/INDEX.md` + +--- + +**Version History:** + +- **1.0** (11.03.2026) - Initial Release + - Collection Lifecycle Management + - BLAKE3 Hash Verification + - Daily Full Sync + - Metadata Change Detection diff --git a/steps/advoware_cal_sync/calendar_sync_event_step.py b/steps/advoware_cal_sync/calendar_sync_event_step.py index 6c2e682..c2ff2ef 100644 --- a/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -1056,6 +1056,11 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None: return {'status': 200, 'body': {'status': 'completed', 'kuerzel': kuerzel}} + except Exception as e: + log_operation('error', f"Sync failed for {kuerzel}: {e}", context=ctx) + log_operation('info', f"Handler duration (failed): {time.time() - start_time}", context=ctx) + return {'status': 500, 'body': {'error': str(e)}} + finally: # Always close resources to prevent memory leaks if service is not None: @@ -1068,11 +1073,6 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None: redis_client.close() except Exception as e: log_operation('debug', f"Error closing Redis client: {e}", context=ctx) - - except Exception as e: - log_operation('error', f"Sync failed for {kuerzel}: {e}", context=ctx) - log_operation('info', f"Handler duration (failed): {time.time() - start_time}", context=ctx) - return {'status': 500, 'body': {'error': str(e)}} - finally: + # Ensure lock is always released clear_employee_lock(redis_client, kuerzel, ctx) diff --git a/steps/advoware_cal_sync/calendar_sync_utils.py b/steps/advoware_cal_sync/calendar_sync_utils.py index 3c379c4..59f03e7 100644 --- a/steps/advoware_cal_sync/calendar_sync_utils.py +++ b/steps/advoware_cal_sync/calendar_sync_utils.py @@ -18,6 +18,26 @@ def get_logger(context=None): return get_service_logger('calendar_sync', context) +def log_operation(level: str, message: str, context=None, **extra): + """ + Log calendar sync operations with structured context. + + Args: + level: Log level ('debug', 'info', 'warning', 'error') + message: Log message + context: FlowContext if available + **extra: Additional key-value pairs to log + """ + logger = get_logger(context) + log_func = getattr(logger, level.lower(), logger.info) + + if extra: + extra_str = " | " + " | ".join(f"{k}={v}" for k, v in extra.items()) + log_func(message + extra_str) + else: + log_func(message) + + async def connect_db(context=None): """Connect to Postgres DB from environment variables.""" logger = get_logger(context) diff --git a/steps/advoware_proxy/advoware_api_proxy_put_step.py b/steps/advoware_proxy/advoware_api_proxy_put_step.py index 90acfda..b57c733 100644 --- a/steps/advoware_proxy/advoware_api_proxy_put_step.py +++ b/steps/advoware_proxy/advoware_api_proxy_put_step.py @@ -69,4 +69,3 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: status=500, body={'error': 'Internal server error', 'details': str(e)} ) - ) diff --git a/steps/vmh/aiknowledge_sync_event_step.py b/steps/vmh/aiknowledge_sync_event_step.py index 7663b68..f3ca95b 100644 --- a/steps/vmh/aiknowledge_sync_event_step.py +++ b/steps/vmh/aiknowledge_sync_event_step.py @@ -26,7 +26,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: event_data: Event payload with knowledge_id ctx: Motia context """ - from services.config import get_redis_client + from services.redis_client import RedisClientFactory from services.aiknowledge_sync_utils import AIKnowledgeSync ctx.logger.info("=" * 80) @@ -46,7 +46,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: ctx.logger.info("=" * 80) # Get Redis for locking - redis_client: Redis = get_redis_client(strict=False) + redis_client = RedisClientFactory.get_client(strict=False) # Initialize sync utils sync_utils = AIKnowledgeSync(ctx, redis_client) diff --git a/steps/vmh/webhook/aiknowledge_update_api_step.py b/steps/vmh/webhook/aiknowledge_update_api_step.py index 76e258d..fd59e50 100644 --- a/steps/vmh/webhook/aiknowledge_update_api_step.py +++ b/steps/vmh/webhook/aiknowledge_update_api_step.py @@ -39,7 +39,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: if not knowledge_id: ctx.logger.error("❌ Missing entity_id in payload") return ApiResponse( - status_code=400, + status=400, body={'success': False, 'error': 'Missing entity_id'} ) @@ -61,13 +61,13 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.info("=" * 80) return ApiResponse( - status_code=200, + status=200, body={'success': True, 'knowledge_id': knowledge_id} ) except Exception as e: ctx.logger.error(f"❌ Webhook error: {e}") return ApiResponse( - status_code=500, + status=500, body={'success': False, 'error': str(e)} )