Files
motia-iii/services/aiknowledge_sync_utils.py
bsiggel 9b2fb5ae4a feat: Implement AI Knowledge Sync Utilities and RAGFlow Service
- Added `aiknowledge_sync_utils.py` for provider-agnostic synchronization logic for CAIKnowledge entities, supporting both xAI and RAGFlow.
- Introduced lifecycle management for CAIKnowledge entities including states: new, active, paused, and deactivated.
- Implemented change detection using Blake3 hash for efficient document synchronization.
- Created `ragflow_service.py` to handle dataset and document management with RAGFlow API.
- Added daily cron job in `aiknowledge_daily_cron_step.py` to synchronize active CAIKnowledge entities with unclean or failed statuses.
- Developed `aiknowledge_sync_event_step.py` to process synchronization events from webhooks and cron jobs.
2026-03-26 21:38:42 +00:00

558 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AI Knowledge Sync Utilities
Provider-agnostische Sync-Logik fuer CAIKnowledge Entities.
Unterstuetzt:
- aiProvider = "xai" → XAIService (Collections API)
- aiProvider = "ragflow" → RAGFlowService (Dataset API)
Lifecycle:
new → Dataset/Collection erstellen → active
active → Dokumente syncen
paused → kein Sync, kein Delete
deactivated → Dataset/Collection loeschen, Junction zuruecksetzen
Change Detection:
- Blake3-Hash in EspoCRM (CDokumente.blake3hash) als primaere Quelle
- Bei xAI: blake3hash == xaiBlake3Hash → kein Re-Upload
- Bei RAGFlow: blake3hash == junction.syncedHash → kein Re-Upload
metadata-hash → nur meta_fields update noetig
"""
from __future__ import annotations
import hashlib
import mimetypes
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import unquote
import pytz
from services.models import (
AIKnowledgeActivationStatus,
AIKnowledgeSyncStatus,
JunctionSyncStatus,
)
from services.sync_utils_base import BaseSyncUtils
from services.config import SYNC_CONFIG
def _compute_metadata_hash(doc: Dict[str, Any]) -> str:
"""Berechnet einen deterministischen Hash der Dokument-Metadaten."""
parts = [
doc.get('name', ''),
doc.get('beschreibung', '') or '',
doc.get('advowareArt', '') or '',
doc.get('advowareBemerkung', '') or '',
]
return hashlib.md5('|'.join(parts).encode()).hexdigest()
class AIKnowledgeSyncUtils(BaseSyncUtils):
    """
    Provider-agnostic sync utilities for CAIKnowledge entities.

    Supports:
        aiProvider = "xai"     -> XAIProviderAdapter (Collections API)
        aiProvider = "ragflow" -> RAGFlowService (Dataset API)

    Usage inside a step:
        sync = AIKnowledgeSyncUtils(espocrm, redis_client, ctx)
        await sync.run_sync(knowledge_id)
    """

    def _get_lock_key(self, entity_id: str) -> str:
        """Redis lock key guarding one CAIKnowledge entity's sync run."""
        return f"sync_lock:aiknowledge:{entity_id}"

    async def acquire_sync_lock(self, entity_id: str, **kwargs) -> bool:
        """Acquire the Redis lock and set syncStatus -> pending_sync.

        Returns:
            False when another sync already holds the lock, True otherwise.
        """
        lock_key = self._get_lock_key(entity_id)
        if not self._acquire_redis_lock(lock_key):
            self._log(f"⏸️ Lock aktiv fuer CAIKnowledge {entity_id}", level='warn')
            return False
        # Best effort: a failed status write must not abort the sync run.
        try:
            await self.espocrm.update_entity('CAIKnowledge', entity_id, {
                'syncStatus': AIKnowledgeSyncStatus.PENDING_SYNC.value
            })
        except Exception as e:
            self._log(f"syncStatus update fehlgeschlagen: {e}", level='debug')
        return True

    async def release_sync_lock(
        self,
        entity_id: str,
        success: bool = True,
        error_message: Optional[str] = None,
        extra_fields: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> None:
        """Release the Redis lock and write the final syncStatus.

        Args:
            entity_id: CAIKnowledge id.
            success: True -> synced, False -> failed.
            error_message: Stored in syncError (truncated to 2000 chars);
                cleared on success so stale errors do not linger.
            extra_fields: Extra entity fields written in the same update.
        """
        now = self._get_espocrm_datetime()
        update: Dict[str, Any] = {
            'syncStatus': (
                AIKnowledgeSyncStatus.SYNCED.value
                if success
                else AIKnowledgeSyncStatus.FAILED.value
            ),
            'lastSync': now,
        }
        if error_message:
            update['syncError'] = error_message[:2000]
        else:
            update['syncError'] = None
        if extra_fields:
            update.update(extra_fields)
        try:
            await self.espocrm.update_entity('CAIKnowledge', entity_id, update)
        except Exception as e:
            self._log(f"release_sync_lock update failed: {e}", level='error')
        finally:
            # The lock must be released even when the status write fails.
            self._release_redis_lock(self._get_lock_key(entity_id))

    # =========================================================
    # Main Entry Point
    # =========================================================
    async def run_sync(self, knowledge_id: str) -> None:
        """Full sync pass for one CAIKnowledge entity.

        1. Load entity + acquire lock
        2. Determine provider (xai / ragflow)
        3. Lifecycle action:
           - new         -> create dataset/collection, then sync as active
           - active      -> sync documents
           - paused      -> no-op (status set to synced)
           - deactivated -> delete dataset/collection, reset junctions
        """
        self._log("=" * 70)
        self._log(f"🔄 AI KNOWLEDGE SYNC START: {knowledge_id}")
        self._log("=" * 70)
        # 1. Load entity
        try:
            entity = await self.espocrm.get_entity('CAIKnowledge', knowledge_id)
        except Exception as e:
            self._log(f"❌ Entity laden fehlgeschlagen: {e}", level='error')
            return
        name = entity.get('name', knowledge_id)
        activation = entity.get('activationStatus', AIKnowledgeActivationStatus.NEW.value)
        sync_status = entity.get('syncStatus', AIKnowledgeSyncStatus.UNCLEAN.value)
        provider = entity.get('aiProvider', 'xai')
        dataset_id = entity.get('datenbankId')
        self._log(f"📋 Entity: {name}")
        self._log(f" activationStatus: {activation}")
        self._log(f" syncStatus : {sync_status}")
        self._log(f" aiProvider : {provider}")
        self._log(f" datenbankId : {dataset_id or 'N/A'}")
        # Paused -> do nothing (no sync, no delete). The status write is
        # best effort: a transient EspoCRM error must not escape run_sync.
        if activation == AIKnowledgeActivationStatus.PAUSED.value:
            self._log("⏸️ PAUSED kein Sync")
            try:
                await self.espocrm.update_entity('CAIKnowledge', knowledge_id, {
                    'syncStatus': AIKnowledgeSyncStatus.SYNCED.value
                })
            except Exception as e:
                self._log(f"syncStatus update fehlgeschlagen: {e}", level='debug')
            return
        # Acquire lock
        acquired = await self.acquire_sync_lock(knowledge_id)
        if not acquired:
            return
        try:
            # 2. Instantiate provider service
            ai = self._build_provider(provider)
            # 3. Lifecycle
            if activation == AIKnowledgeActivationStatus.NEW.value:
                dataset_id = await self._handle_new(knowledge_id, entity, ai)
                if not dataset_id:
                    await self.release_sync_lock(
                        knowledge_id, success=False,
                        error_message="Dataset/Collection erstellen fehlgeschlagen"
                    )
                    return
                # Entity switched to active -> fall through into document sync
                activation = AIKnowledgeActivationStatus.ACTIVE.value
            if activation == AIKnowledgeActivationStatus.ACTIVE.value:
                error = await self._handle_active(knowledge_id, entity, dataset_id, ai, provider)
                if error:
                    await self.release_sync_lock(knowledge_id, success=False, error_message=error)
                    return
            elif activation == AIKnowledgeActivationStatus.DEACTIVATED.value:
                await self._handle_deactivated(knowledge_id, entity, dataset_id, ai)
            await self.release_sync_lock(knowledge_id, success=True)
            self._log(f"✅ AI KNOWLEDGE SYNC DONE: {name}")
        except Exception as e:
            self._log(f"❌ Unerwarteter Fehler: {e}", level='error')
            await self.release_sync_lock(knowledge_id, success=False, error_message=str(e))

    # =========================================================
    # Lifecycle: NEW
    # =========================================================
    async def _handle_new(
        self,
        knowledge_id: str,
        entity: Dict[str, Any],
        ai,
    ) -> Optional[str]:
        """Create the dataset/collection and persist datenbankId + active status.

        Returns:
            The new dataset/collection id, or None on failure.
        """
        name = entity.get('name', knowledge_id)
        self._log(f"🆕 NEW → Dataset erstellen: {name}")
        try:
            result = await ai.ensure_dataset(name=name, description=f"CAIKnowledge: {name}")
            dataset_id = result['id']
            # syncStatus=unclean forces the follow-up active pass to sync docs.
            await self.espocrm.update_entity('CAIKnowledge', knowledge_id, {
                'datenbankId': dataset_id,
                'activationStatus': AIKnowledgeActivationStatus.ACTIVE.value,
                'syncStatus': AIKnowledgeSyncStatus.UNCLEAN.value,
            })
            self._log(f"✅ Dataset erstellt: {dataset_id} → activationStatus=active")
            return dataset_id
        except Exception as e:
            self._log(f"❌ Dataset erstellen fehlgeschlagen: {e}", level='error')
            return None

    # =========================================================
    # Lifecycle: ACTIVE
    # =========================================================
    async def _handle_active(
        self,
        knowledge_id: str,
        entity: Dict[str, Any],
        dataset_id: str,
        ai,
        provider: str,
    ) -> Optional[str]:
        """Sync all linked documents into the provider dataset.

        Returns:
            Error message (str) on (partial) failure, None on success.
        """
        if not dataset_id:
            return "datenbankId fehlt Dataset wurde noch nicht erstellt"
        # Load all linked documents (junction: CAIKnowledgeCDokumente)
        self._log(f"📋 Lade verknuepfte Dokumente fuer {knowledge_id}")
        try:
            junction_entries = await self.espocrm.list_related_all(
                'CAIKnowledge', knowledge_id, 'dokumentes'
            )
        except Exception as e:
            return f"Junction laden fehlgeschlagen: {e}"
        self._log(f" {len(junction_entries)} Dokument(e) verknuepft")
        # Fetch remote documents (for orphan detection). A failure here only
        # disables orphan cleanup; the per-document sync still proceeds.
        try:
            remote_docs = await ai.list_documents(dataset_id)
            remote_ids: set = {d['id'] for d in remote_docs if d.get('id')}
        except Exception:
            remote_ids = set()
        synced_remote_ids: set = set()
        has_error = False
        for junction in junction_entries:
            doc_id = junction.get('id')
            ok = await self._sync_single_document(
                doc_id, junction, knowledge_id, dataset_id, ai, provider, synced_remote_ids
            )
            if not ok:
                has_error = True
        # Orphan detection: remote documents without a junction -> delete
        orphans = remote_ids - synced_remote_ids
        if orphans:
            self._log(f"🧹 Orphan-Cleanup: {len(orphans)} Dokument(e) ohne Junction")
            for orphan_id in orphans:
                try:
                    await ai.remove_document(dataset_id, orphan_id)
                    self._log(f" 🗑️ Orphan geloescht: {orphan_id}")
                except Exception as e:
                    self._log(f" ⚠️ Orphan cleanup fehlgeschlagen ({orphan_id}): {e}", level='warn')
        if has_error:
            return "Einige Dokumente konnten nicht synchronisiert werden (Partial Failure)"
        return None

    async def _sync_single_document(
        self,
        doc_id: str,
        junction: Dict[str, Any],
        knowledge_id: str,
        dataset_id: str,
        ai,
        provider: str,
        synced_remote_ids: set,
    ) -> bool:
        """Sync a single document.

        Decision logic:
            - junction.syncstatus == 'synced' AND file hash unchanged AND
              metadata hash unchanged               -> skip
            - file unchanged, metadata changed      -> metadata-only update
            - junction.syncstatus in ('new', 'unclean', 'failed')
              or file changed                       -> upload/update
            - no attachment                         -> 'unsupported'

        Returns:
            True on success/skip, False on error.
        """
        doc_name = junction.get('name', doc_id)
        junction_status = junction.get('syncstatus', JunctionSyncStatus.NEW.value)
        ai_doc_id = junction.get('aiDocumentId')  # RAGFlow doc ID / xAI file ID
        synced_hash = junction.get('syncedHash')
        synced_meta_hash = junction.get('syncedMetadataHash')
        blake3_hash = junction.get('blake3hash')  # computed by EspoCRM
        self._log(f"\n 📄 {doc_name} (junction_status={junction_status})")
        # Compute current metadata hash for change detection
        current_meta_hash = _compute_metadata_hash(junction)
        # Skip check: already synced and hashes unchanged
        if junction_status == JunctionSyncStatus.SYNCED.value and ai_doc_id:
            file_unchanged = (synced_hash and blake3_hash and synced_hash == blake3_hash)
            meta_unchanged = (synced_meta_hash == current_meta_hash)
            if file_unchanged and meta_unchanged:
                self._log(f" ✅ Unveraendert Skip")
                synced_remote_ids.add(ai_doc_id)
                return True
            if file_unchanged and not meta_unchanged:
                # Only metadata changed (no re-upload of the file)
                return await self._update_metadata_only(
                    doc_id, junction, knowledge_id, ai_doc_id, dataset_id,
                    current_meta_hash, ai, provider, synced_remote_ids
                )
        # Upload (new or changed)
        return await self._upload_document(
            doc_id, junction, knowledge_id, dataset_id,
            current_meta_hash, blake3_hash, ai_doc_id, ai, provider, synced_remote_ids
        )

    async def _upload_document(
        self,
        doc_id: str,
        junction: Dict[str, Any],
        knowledge_id: str,
        dataset_id: str,
        current_meta_hash: str,
        blake3_hash: Optional[str],
        old_ai_doc_id: Optional[str],
        ai,
        provider: str,
        synced_remote_ids: set,
    ) -> bool:
        """Download a document from EspoCRM and upload it to the provider.

        Marks the junction 'unsupported' when there is no attachment or the
        MIME type is not accepted (both are non-errors), 'failed' on
        download/upload errors, and 'synced' (with hashes) on success.

        Returns:
            True on success or unsupported, False on download/upload failure.
        """
        doc_name = junction.get('name', doc_id)
        attachment_id = junction.get('dokumentId') or junction.get('attachmentId')
        if not attachment_id:
            self._log(f" ⚠️ Kein Attachment unsupported")
            await self._update_junction(doc_id, knowledge_id, {
                'syncstatus': JunctionSyncStatus.UNSUPPORTED.value,
                'lastSync': self._get_espocrm_datetime(),
            })
            return True  # not an error, just unsupported
        # Determine MIME type from the (URL-decoded) filename
        filename = unquote(junction.get('dokumentName') or junction.get('name') or 'document.bin')
        mime_type, _ = mimetypes.guess_type(filename)
        if not mime_type:
            mime_type = 'application/octet-stream'
        # Check MIME type support with the provider
        if not ai.is_mime_type_supported(mime_type):
            self._log(f" ⚠️ MIME-Type nicht unterstuetzt: {mime_type} unsupported")
            await self._update_junction(doc_id, knowledge_id, {
                'syncstatus': JunctionSyncStatus.UNSUPPORTED.value,
                'lastSync': self._get_espocrm_datetime(),
            })
            return True
        # Download the file from EspoCRM
        try:
            # Fix: previously logged the literal placeholder "(unknown)"
            # although the filename is already known at this point.
            self._log(f" 📥 Downloading {filename} ({attachment_id})…")
            file_content = await self.espocrm.download_attachment(attachment_id)
            self._log(f" Downloaded {len(file_content)} bytes")
        except Exception as e:
            self._log(f" ❌ Download fehlgeschlagen: {e}", level='error')
            await self._update_junction(doc_id, knowledge_id, {
                'syncstatus': JunctionSyncStatus.FAILED.value,
                'lastSync': self._get_espocrm_datetime(),
            })
            return False
        # Delete the old provider document first (update case)
        if old_ai_doc_id:
            try:
                await ai.remove_document(dataset_id, old_ai_doc_id)
                self._log(f" 🗑️ Altes Dokument geloescht: {old_ai_doc_id}")
            except Exception:
                pass  # non-fatal; orphan cleanup will catch leftovers
        # Upload + metadata
        try:
            self._log(f" 📤 Uploading zu {provider}")
            result = await ai.upload_document(
                dataset_id=dataset_id,
                file_content=file_content,
                filename=filename,
                mime_type=mime_type,
                blake3_hash=blake3_hash,
                espocrm_id=doc_id,
                description=junction.get('beschreibung') or junction.get('description') or '',
                advoware_art=junction.get('advowareArt') or '',
                advoware_bemerkung=junction.get('advowareBemerkung') or '',
            )
            new_ai_doc_id = result['id']
            self._log(f" ✅ Upload OK: {new_ai_doc_id}")
        except Exception as e:
            self._log(f" ❌ Upload fehlgeschlagen: {e}", level='error')
            await self._update_junction(doc_id, knowledge_id, {
                'syncstatus': JunctionSyncStatus.FAILED.value,
                'lastSync': self._get_espocrm_datetime(),
            })
            return False
        synced_remote_ids.add(new_ai_doc_id)
        # Persist the new state on the junction
        await self._update_junction(doc_id, knowledge_id, {
            'aiDocumentId': new_ai_doc_id,
            'syncstatus': JunctionSyncStatus.SYNCED.value,
            'syncedHash': blake3_hash or '',
            'syncedMetadataHash': current_meta_hash,
            'lastSync': self._get_espocrm_datetime(),
        })
        return True

    async def _update_metadata_only(
        self,
        doc_id: str,
        junction: Dict[str, Any],
        knowledge_id: str,
        ai_doc_id: str,
        dataset_id: str,
        current_meta_hash: str,
        ai,
        provider: str,
        synced_remote_ids: set,
    ) -> bool:
        """Update only the provider-side metadata (no file re-upload).

        Fix: this previously read a '__knowledge_id__' key from the junction
        dict that is never populated anywhere, so the junction update was
        issued with an empty knowledge id (PUT /CAIKnowledge//dokumentes/…).
        The caller now passes knowledge_id explicitly.
        """
        self._log(f" ✏️ Nur Metadaten aendern (kein Re-Upload)")
        synced_remote_ids.add(ai_doc_id)
        try:
            await ai.update_document_meta(
                dataset_id=dataset_id,
                doc_id=ai_doc_id,
                description=junction.get('beschreibung') or junction.get('description') or '',
                advoware_art=junction.get('advowareArt') or '',
                advoware_bemerkung=junction.get('advowareBemerkung') or '',
            )
            await self._update_junction(doc_id, knowledge_id, {
                'syncedMetadataHash': current_meta_hash,
                'lastSync': self._get_espocrm_datetime(),
            })
            self._log(f" ✅ Metadaten aktualisiert")
            return True
        except Exception as e:
            self._log(f" ⚠️ Metadaten-Update fehlgeschlagen: {e}", level='warn')
            # Non-fatal: the file is still synced; metadata retries next sync.
            return True

    # =========================================================
    # Lifecycle: DEACTIVATED
    # =========================================================
    async def _handle_deactivated(
        self,
        knowledge_id: str,
        entity: Dict[str, Any],
        dataset_id: Optional[str],
        ai,
    ) -> None:
        """Delete the dataset/collection and reset all junction entries."""
        self._log(f"🔴 DEACTIVATED → Dataset loeschen")
        if dataset_id:
            try:
                await ai.delete_dataset(dataset_id)
                self._log(f"✅ Dataset geloescht: {dataset_id}")
            except Exception as e:
                self._log(f"⚠️ Dataset loeschen fehlgeschlagen: {e}", level='warn')
            # Clear datenbankId on the entity
            await self.espocrm.update_entity('CAIKnowledge', knowledge_id, {
                'datenbankId': None,
            })
        # Reset all junction entries so a re-activation starts from scratch
        try:
            junctions = await self.espocrm.list_related_all(
                'CAIKnowledge', knowledge_id, 'dokumentes'
            )
            for j in junctions:
                await self._update_junction(j['id'], knowledge_id, {
                    'aiDocumentId': None,
                    'syncstatus': JunctionSyncStatus.NEW.value,
                    'syncedHash': None,
                    'syncedMetadataHash': None,
                    'lastSync': None,
                })
            self._log(f"{len(junctions)} Junction(s) zurueckgesetzt")
        except Exception as e:
            self._log(f"⚠️ Junction-Reset fehlgeschlagen: {e}", level='warn')

    # =========================================================
    # Provider Factory
    # =========================================================
    def _build_provider(self, provider: str):
        """Return the matching AI provider service.

        xai     -> XAIProviderAdapter (wraps XAIService onto the provider
                   interface)
        ragflow -> RAGFlowService (implements the interface natively)
        """
        # Local imports keep provider dependencies lazy (only one is needed).
        if provider == 'ragflow':
            from services.ragflow_service import RAGFlowService
            return RAGFlowService(ctx=self.context)
        else:
            # Default: xAI
            from services.xai_upload_utils import XAIProviderAdapter
            return XAIProviderAdapter(self.context)

    # =========================================================
    # EspoCRM Helpers
    # =========================================================
    async def _update_junction(
        self,
        doc_id: str,
        knowledge_id: str,
        fields: Dict[str, Any],
    ) -> None:
        """Update a CAIKnowledgeCDokumente junction entry.

        EspoCRM junction additional columns are updated via the relationship
        endpoint: PUT /CAIKnowledge/{id}/dokumentes/{docId}. Failures are
        logged but swallowed (best effort).
        """
        try:
            await self.espocrm.api_call(
                f"/CAIKnowledge/{knowledge_id}/dokumentes/{doc_id}",
                method='PUT',
                json_data=fields,
            )
        except Exception as e:
            self._log(f"⚠️ Junction-Update fehlgeschlagen ({doc_id}): {e}", level='warn')