Files
motia-iii/services/ragflow_service.py

526 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""RAGFlow Dataset & Document Service"""
import os
import asyncio
from functools import partial
from typing import Optional, List, Dict, Any
from services.logging_utils import get_service_logger
# Endpoint used when the RAGFLOW_BASE_URL environment variable is unset/empty.
RAGFLOW_DEFAULT_BASE_URL = "http://192.168.1.64:9380"

# Knowledge-graph dataset configuration.
# Note: llm_id can only be set through the RAGFlow web UI (the API does not
# allow it).
RAGFLOW_KG_ENTITY_TYPES = [
    "Partei",
    "Anspruch",
    "Anspruchsgrundlage",
    "unstreitiger Sachverhalt",
    "streitiger Sachverhalt",
    "streitige Rechtsfrage",
    "Beweismittel",
    "Beweisangebot",
    "Norm",
    "Gerichtsentscheidung",
    "Forderung",
    "Beweisergebnis",
]

# parser_config payload: RAPTOR disabled, GraphRAG enabled with the entity
# types listed above (the list object itself is referenced, not copied).
RAGFLOW_KG_PARSER_CONFIG = dict(
    raptor=dict(use_raptor=False),
    graphrag=dict(
        use_graphrag=True,
        method="general",
        resolution=True,
        entity_types=RAGFLOW_KG_ENTITY_TYPES,
    ),
)
def _base_to_dict(obj: Any) -> Any:
"""
Konvertiert ragflow_sdk.modules.base.Base rekursiv zu einem plain dict.
Filtert den internen 'rag'-Client-Key heraus.
"""
try:
from ragflow_sdk.modules.base import Base
if isinstance(obj, Base):
return {k: _base_to_dict(v) for k, v in vars(obj).items() if k != 'rag'}
except ImportError:
pass
if isinstance(obj, dict):
return {k: _base_to_dict(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_base_to_dict(i) for i in obj]
return obj
class RAGFlowService:
    """
    Client for the RAGFlow API via ragflow-sdk (Python SDK).

    Wraps the synchronous SDK in ``loop.run_in_executor`` so it can be used
    from async Motia steps.

    Data flow on upload:
        upload_document() →
            1. upload_documents([{blob}])     # upload the file
            2. doc.update({meta_fields})      # set blake3 + advoware fields
            3. async_parse_documents([id])    # start parsing

    Required environment variables:
        - RAGFLOW_API_KEY   API key
        - RAGFLOW_BASE_URL  Optional URL override
                            (default: RAGFLOW_DEFAULT_BASE_URL)
    """

    SUPPORTED_MIME_TYPES = {
        'application/pdf',
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'application/vnd.ms-excel',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        'application/vnd.oasis.opendocument.text',
        'application/epub+zip',
        'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        'text/plain',
        'text/html',
        'text/markdown',
        'text/csv',
        'text/xml',
        'application/json',
        'application/xml',
    }

    # Page size used whenever a listing has to be walked to the end.
    _PAGE_SIZE = 100

    def __init__(self, ctx=None):
        """
        Read the configuration from the environment and set up logging.

        Args:
            ctx: Optional context object, passed on to get_service_logger.

        Raises:
            ValueError: If RAGFLOW_API_KEY is unset or empty.
        """
        self.api_key = os.getenv('RAGFLOW_API_KEY', '')
        base_url_env = os.getenv('RAGFLOW_BASE_URL', '')
        self.base_url = base_url_env or RAGFLOW_DEFAULT_BASE_URL
        self.ctx = ctx
        self.logger = get_service_logger('ragflow', ctx)
        self._rag = None  # SDK client, created lazily by _get_client()
        if not self.api_key:
            raise ValueError("RAGFLOW_API_KEY not configured in environment")

    def _log(self, msg: str, level: str = 'info') -> None:
        """Log ``msg`` at ``level``; unknown levels fall back to ``info``."""
        log_func = getattr(self.logger, level, self.logger.info)
        log_func(msg)

    def _get_client(self):
        """Return the RAGFlow SDK client (lazy init, synchronous)."""
        if self._rag is None:
            from ragflow_sdk import RAGFlow
            self._rag = RAGFlow(api_key=self.api_key, base_url=self.base_url)
        return self._rag

    async def _run(self, func, *args, **kwargs):
        """Run a synchronous SDK function in the loop's default executor."""
        # We are always inside a coroutine here, so the running loop is the
        # right one to use.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, partial(func, *args, **kwargs))

    @staticmethod
    def _iter_pages(fetch_page, page_size: int = 100):
        """
        Yield every item of a page-based listing call.

        ``fetch_page`` is called as ``fetch_page(page=<n>, page_size=<size>)``
        starting at page 1. Iteration stops at the first empty page or the
        first page holding fewer than ``page_size`` items.
        """
        page = 1
        while True:
            batch = fetch_page(page=page, page_size=page_size)
            if not batch:
                return
            yield from batch
            if len(batch) < page_size:
                return
            page += 1

    @staticmethod
    def _require_dataset(rag, dataset_id: str):
        """
        Return the dataset with ``dataset_id``.

        Raises:
            RuntimeError: If the lookup returns no dataset.
        """
        datasets = rag.list_datasets(id=dataset_id)
        if not datasets:
            raise RuntimeError(f"Dataset not found: {dataset_id}")
        return datasets[0]

    # ========== Dataset Management ==========

    async def create_dataset(
        self,
        name: str,
        chunk_method: str = 'laws',
        embedding_model: Optional[str] = None,
        description: Optional[str] = None,
    ) -> Dict:
        """
        Create a new RAGFlow dataset with the knowledge-graph configuration.

        Steps:
            1. create_dataset(name, chunk_method, ...) via the SDK
            2. dataset.update(parser_config=RAGFLOW_KG_PARSER_CONFIG)

        Note: the llm_id for KG extraction has to be set in the RAGFlow web
        UI - the API does not allow it.

        Args:
            name: Dataset name.
            chunk_method: Chunking method passed to the SDK.
            embedding_model: Only forwarded when truthy.
            description: Only forwarded when truthy.

        Returns:
            dict as produced by _dataset_to_dict ('id', 'name',
            'chunk_method', 'parser_config', ...).
        """
        self._log(f"📚 Creating dataset: {name} (chunk_method={chunk_method}, graphrag=True)")

        def _create():
            rag = self._get_client()
            kwargs = dict(name=name, chunk_method=chunk_method)
            if embedding_model:
                kwargs['embedding_model'] = embedding_model
            if description:
                kwargs['description'] = description
            dataset = rag.create_dataset(**kwargs)
            # graphrag + raptor are applied through update() after creation.
            dataset.update({'parser_config': RAGFLOW_KG_PARSER_CONFIG})
            return self._dataset_to_dict(dataset)

        result = await self._run(_create)
        self._log(f"✅ Dataset created: {result.get('id')} ({name})")
        return result

    async def get_dataset_by_name(self, name: str) -> Optional[Dict]:
        """
        Look up a dataset by exact name.

        Returns:
            The dataset as a dict, or None when no dataset has that name.
        """
        def _find():
            rag = self._get_client()
            # list_datasets(name=...) has permission bugs - filter locally.
            # All pages are walked so datasets beyond the first page are
            # found as well.
            for ds in self._iter_pages(rag.list_datasets, self._PAGE_SIZE):
                if getattr(ds, 'name', None) == name:
                    return self._dataset_to_dict(ds)
            return None

        result = await self._run(_find)
        if result:
            self._log(f"🔍 Dataset found: {result.get('id')} ({name})")
        return result

    async def ensure_dataset(
        self,
        name: str,
        chunk_method: str = 'laws',
        embedding_model: Optional[str] = None,
        description: Optional[str] = None,
    ) -> Dict:
        """
        Return the existing dataset called ``name`` or create it
        (get-or-create).

        The remaining arguments are only used when a dataset is created.

        Returns:
            dict with 'id', 'name', etc.
        """
        existing = await self.get_dataset_by_name(name)
        if existing:
            self._log(f"✅ Dataset exists: {existing.get('id')} ({name})")
            return existing
        return await self.create_dataset(
            name=name,
            chunk_method=chunk_method,
            embedding_model=embedding_model,
            description=description,
        )

    async def delete_dataset(self, dataset_id: str) -> None:
        """Delete the dataset with ``dataset_id`` via the SDK."""
        self._log(f"🗑️ Deleting dataset: {dataset_id}")

        def _delete():
            rag = self._get_client()
            rag.delete_datasets(ids=[dataset_id])

        await self._run(_delete)
        self._log(f"✅ Dataset deleted: {dataset_id}")

    async def list_datasets(self) -> List[Dict]:
        """List all datasets (every page), each converted to a dict."""
        def _list():
            rag = self._get_client()
            return [
                self._dataset_to_dict(d)
                for d in self._iter_pages(rag.list_datasets, self._PAGE_SIZE)
            ]

        result = await self._run(_list)
        self._log(f"📋 Listed {len(result)} datasets")
        return result

    # ========== Document Management ==========

    async def upload_document(
        self,
        dataset_id: str,
        file_content: bytes,
        filename: str,
        mime_type: str = 'application/octet-stream',
        blake3_hash: Optional[str] = None,
        espocrm_id: Optional[str] = None,
        description: Optional[str] = None,
        advoware_art: Optional[str] = None,
        advoware_bemerkung: Optional[str] = None,
    ) -> Dict:
        """
        Upload a document into a dataset.

        Steps:
            1. upload_documents()       upload the file
            2. doc.update(meta_fields)  set metadata (only if any is given)
            3. async_parse_documents()  start parsing

        Meta fields written when the matching argument is truthy:
            - blake3_hash
            - espocrm_id
            - description
            - advoware_art
            - advoware_bemerkung

        ``mime_type`` is only used for the log line; a generic
        'application/octet-stream' is reported as 'application/pdf' when the
        filename ends in '.pdf'.

        Returns:
            dict as produced by _document_to_dict ('id', 'name', 'run',
            'meta_fields', ...).

        Raises:
            RuntimeError: If the dataset does not exist or the uploaded
                document cannot be identified afterwards.
        """
        if mime_type == 'application/octet-stream' and filename.lower().endswith('.pdf'):
            mime_type = 'application/pdf'
        self._log(
            f"📤 Uploading {len(file_content)} bytes to dataset {dataset_id}: "
            f"{filename} ({mime_type})"
        )

        def _upload_and_tag():
            rag = self._get_client()
            dataset = self._require_dataset(rag, dataset_id)

            # Step 1: upload. The SDK reference documents upload_documents()
            # as returning the created Document objects; using them avoids
            # guessing which document was just created.
            uploaded = dataset.upload_documents([{
                'display_name': filename,
                'blob': file_content,
            }])
            doc = uploaded[0] if isinstance(uploaded, (list, tuple)) and uploaded else None

            if doc is None:
                # Fallback if upload_documents() returned nothing usable:
                # search by name as before.
                base_name = filename.split('/')[-1]
                docs = dataset.list_documents(keywords=base_name, page_size=10)
                doc = next(
                    (d for d in docs if d.name == filename or d.name == base_name),
                    None,
                )
                if doc is None and docs:
                    # Last resort: first keyword hit. This may not be the
                    # uploaded file, so leave a trace in the log.
                    doc = docs[0]
                    self._log(
                        f"No exact name match after upload of {filename}; "
                        f"using first search hit {getattr(doc, 'id', None)}",
                        level='warning',
                    )
            if doc is None:
                raise RuntimeError(f"Document not found after upload: {filename}")

            # Step 2: set meta fields (only truthy values are written).
            candidates = (
                ('blake3_hash', blake3_hash),
                ('espocrm_id', espocrm_id),
                ('description', description),
                ('advoware_art', advoware_art),
                ('advoware_bemerkung', advoware_bemerkung),
            )
            meta: Dict[str, str] = {key: value for key, value in candidates if value}
            if meta:
                doc.update({'meta_fields': meta})

            # Step 3: start parsing.
            dataset.async_parse_documents([doc.id])
            return self._document_to_dict(doc)

        result = await self._run(_upload_and_tag)
        self._log(
            f"✅ Document uploaded & parsing started: {result.get('id')} ({filename})"
        )
        return result

    async def update_document_meta(
        self,
        dataset_id: str,
        doc_id: str,
        blake3_hash: Optional[str] = None,
        description: Optional[str] = None,
        advoware_art: Optional[str] = None,
        advoware_bemerkung: Optional[str] = None,
    ) -> None:
        """
        Update only the metadata of a document (no re-upload).

        Arguments left at None keep their stored value; everything else is
        merged into the existing meta_fields. Parsing is restarted
        afterwards so that chunks pick up the updated metadata.

        Raises:
            RuntimeError: If the dataset or the document is not found.
        """
        self._log(f"✏️ Updating metadata for document {doc_id}")

        def _update():
            rag = self._get_client()
            dataset = self._require_dataset(rag, dataset_id)
            docs = dataset.list_documents(id=doc_id)
            if not docs:
                raise RuntimeError(f"Document not found: {doc_id}")
            doc = docs[0]

            # Read the existing meta_fields and merge the new values in.
            existing_meta = _base_to_dict(getattr(doc, 'meta_fields', None)) or {}
            changes = (
                ('blake3_hash', blake3_hash),
                ('description', description),
                ('advoware_art', advoware_art),
                ('advoware_bemerkung', advoware_bemerkung),
            )
            for key, value in changes:
                if value is not None:
                    existing_meta[key] = value
            doc.update({'meta_fields': existing_meta})

            # Re-parse so the chunks contain the updated metadata.
            dataset.async_parse_documents([doc.id])

        await self._run(_update)
        self._log(f"✅ Metadata updated and re-parsing started: {doc_id}")

    async def remove_document(self, dataset_id: str, doc_id: str) -> None:
        """
        Delete a document from a dataset.

        Raises:
            RuntimeError: If the dataset is not found.
        """
        self._log(f"🗑️ Removing document {doc_id} from dataset {dataset_id}")

        def _delete():
            rag = self._get_client()
            self._require_dataset(rag, dataset_id).delete_documents(ids=[doc_id])

        await self._run(_delete)
        self._log(f"✅ Document removed: {doc_id}")

    async def list_documents(self, dataset_id: str) -> List[Dict]:
        """
        List all documents of a dataset (walks every page).

        Raises:
            RuntimeError: If the dataset is not found.
        """
        self._log(f"📋 Listing documents in dataset {dataset_id}")

        def _list():
            rag = self._get_client()
            dataset = self._require_dataset(rag, dataset_id)
            return [
                self._document_to_dict(d)
                for d in self._iter_pages(dataset.list_documents, self._PAGE_SIZE)
            ]

        result = await self._run(_list)
        self._log(f"✅ Listed {len(result)} documents")
        return result

    async def get_document(self, dataset_id: str, doc_id: str) -> Optional[Dict]:
        """Fetch a single document by ID; None if dataset or document is missing."""
        def _get():
            rag = self._get_client()
            datasets = rag.list_datasets(id=dataset_id)
            if not datasets:
                return None
            docs = datasets[0].list_documents(id=doc_id)
            if not docs:
                return None
            return self._document_to_dict(docs[0])

        result = await self._run(_get)
        if result:
            self._log(f"📄 Document found: {result.get('name')} (run={result.get('run')})")
        return result

    async def wait_for_parsing(
        self,
        dataset_id: str,
        doc_id: str,
        timeout_seconds: int = 120,
        poll_interval: float = 3.0,
    ) -> Dict:
        """
        Poll a document until its parsing run has finished.

        The document is polled at least once. The timeout is measured on the
        event loop's monotonic clock, so time spent inside the polling calls
        counts towards it.

        Returns:
            Current document state as a dict once 'run' is 'DONE'.

        Raises:
            TimeoutError: If parsing is not done within timeout_seconds.
            RuntimeError: If the document vanishes or 'run' becomes
                'FAIL' / 'CANCEL'.
        """
        self._log(f"⏳ Waiting for parsing: {doc_id} (timeout={timeout_seconds}s)")
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout_seconds
        while True:
            doc = await self.get_document(dataset_id, doc_id)
            if doc is None:
                raise RuntimeError(f"Document disappeared during parsing: {doc_id}")
            run_status = doc.get('run', 'UNSTART')
            if run_status == 'DONE':
                self._log(
                    f"✅ Parsing done: {doc_id} "
                    f"(chunks={doc.get('chunk_count')}, tokens={doc.get('token_count')})"
                )
                return doc
            if run_status in ('FAIL', 'CANCEL'):
                raise RuntimeError(
                    f"Parsing failed for {doc_id}: status={run_status}, "
                    f"msg={doc.get('progress_msg', '')}"
                )
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            # Never sleep past the deadline; a non-positive interval just
            # yields to the event loop.
            await asyncio.sleep(min(max(poll_interval, 0), remaining))
        raise TimeoutError(
            f"Parsing timeout after {timeout_seconds}s for document {doc_id}"
        )

    # ========== MIME Type Support ==========

    def is_mime_type_supported(self, mime_type: str) -> bool:
        """
        Check whether ``mime_type`` is in SUPPORTED_MIME_TYPES.

        Comparison is case-insensitive, ignores surrounding whitespace and
        any parameters after ';' (e.g. 'text/plain; charset=utf-8').
        Empty or non-string input yields False.
        """
        if not mime_type or not isinstance(mime_type, str):
            return False
        essence = mime_type.split(';', 1)[0].strip().lower()
        return essence in self.SUPPORTED_MIME_TYPES

    # ========== Internal Helpers ==========

    def _dataset_to_dict(self, dataset) -> Dict:
        """Convert a RAGFlow DataSet object to a dict (parser_config unwrapped)."""
        return {
            'id': getattr(dataset, 'id', None),
            'name': getattr(dataset, 'name', None),
            'chunk_method': getattr(dataset, 'chunk_method', None),
            'embedding_model': getattr(dataset, 'embedding_model', None),
            'description': getattr(dataset, 'description', None),
            'chunk_count': getattr(dataset, 'chunk_count', 0),
            'document_count': getattr(dataset, 'document_count', 0),
            'parser_config': _base_to_dict(getattr(dataset, 'parser_config', {})),
        }

    def _document_to_dict(self, doc) -> Dict:
        """
        Convert a RAGFlow Document object to a dict.

        meta_fields is unwrapped to a plain dict via _base_to_dict();
        'blake3_hash' and 'espocrm_id' are additionally surfaced as
        top-level keys (None when not set).
        """
        raw_meta = getattr(doc, 'meta_fields', None)
        meta_dict = _base_to_dict(raw_meta) if raw_meta is not None else {}
        return {
            'id': getattr(doc, 'id', None),
            'name': getattr(doc, 'name', None),
            'dataset_id': getattr(doc, 'dataset_id', None),
            'chunk_method': getattr(doc, 'chunk_method', None),
            'size': getattr(doc, 'size', 0),
            'token_count': getattr(doc, 'token_count', 0),
            'chunk_count': getattr(doc, 'chunk_count', 0),
            'run': getattr(doc, 'run', 'UNSTART'),
            'progress': getattr(doc, 'progress', 0.0),
            'progress_msg': getattr(doc, 'progress_msg', ''),
            'source_type': getattr(doc, 'source_type', 'local'),
            'created_by': getattr(doc, 'created_by', ''),
            'process_duration': getattr(doc, 'process_duration', 0.0),
            # Metadata (blake3_hash lives in here when set).
            'meta_fields': meta_dict,
            'blake3_hash': meta_dict.get('blake3_hash'),
            'espocrm_id': meta_dict.get('espocrm_id'),
            'parser_config': _base_to_dict(getattr(doc, 'parser_config', None)),
        }