feat(sync): Update RAGFlow dataset creation to use stable EspoCRM-ID and improve logging

This commit is contained in:
bsiggel
2026-03-27 00:52:48 +00:00
parent 9bd62fc5ab
commit 61113d8f3d
2 changed files with 42 additions and 26 deletions

View File

@@ -7,9 +7,31 @@ from services.logging_utils import get_service_logger
RAGFLOW_DEFAULT_BASE_URL = "http://192.168.1.64:9380"
# Defaults fuer Dokument-Analyse
RAGFLOW_AUTO_KEYWORDS = 14
RAGFLOW_AUTO_QUESTIONS = 7
# Knowledge-Graph Dataset Konfiguration
# Hinweis: llm_id kann nur über die RAGflow Web-UI gesetzt werden (API erlaubt es nicht)
RAGFLOW_KG_ENTITY_TYPES = [
'Partei',
'Anspruch',
'Anspruchsgrundlage',
'unstreitiger Sachverhalt',
'streitiger Sachverhalt',
'streitige Rechtsfrage',
'Beweismittel',
'Beweisangebot',
'Norm',
'Gerichtsentscheidung',
'Forderung',
'Beweisergebnis',
]
RAGFLOW_KG_PARSER_CONFIG = {
'raptor': {'use_raptor': False},
'graphrag': {
'use_graphrag': True,
'method': 'general',
'resolution': True,
'entity_types': RAGFLOW_KG_ENTITY_TYPES,
},
}
def _base_to_dict(obj: Any) -> Any:
@@ -101,22 +123,23 @@ class RAGFlowService:
chunk_method: str = 'laws',
embedding_model: Optional[str] = None,
description: Optional[str] = None,
auto_keywords: int = RAGFLOW_AUTO_KEYWORDS,
auto_questions: int = RAGFLOW_AUTO_QUESTIONS,
) -> Dict:
"""
Erstellt ein neues RAGFlow Dataset (entspricht xAI Collection).
Erstellt ein neues RAGFlow Dataset mit Knowledge-Graph Konfiguration.
Verwendet standardmaessig chunk_method='laws' (optimiert fuer Rechtsdokumente).
Setzt nach der Erstellung auto_keywords=14 und auto_questions=7.
Ablauf:
1. create_dataset(chunk_method='laws') via SDK
2. dataset.update(parser_config={graphrag, raptor}) via SDK
(graphrag: use_graphrag=True, method=general, resolution=True,
entity_types=deutsche Rechtsbegriffe, raptor=False)
Hinweis: llm_id fuer die KG-Extraktion muss in der RAGflow Web-UI
gesetzt werden die API erlaubt es nicht.
Returns:
dict mit 'id', 'name', 'chunk_method', 'parser_config', etc.
"""
self._log(
f"📚 Creating dataset: {name} "
f"(chunk_method={chunk_method}, keywords={auto_keywords}, questions={auto_questions})"
)
self._log(f"📚 Creating dataset: {name} (chunk_method={chunk_method}, graphrag=True)")
def _create():
rag = self._get_client()
@@ -126,14 +149,9 @@ class RAGFlowService:
if description:
kwargs['description'] = description
dataset = rag.create_dataset(**kwargs)
# parser_config kann erst nach create via update() gesetzt werden
dataset.update({
'parser_config': {
'auto_keywords': auto_keywords,
'auto_questions': auto_questions,
}
})
# graphrag + raptor werden via update() gesetzt
# llm_id kann nur über die RAGflow Web-UI konfiguriert werden
dataset.update({'parser_config': RAGFLOW_KG_PARSER_CONFIG})
return self._dataset_to_dict(dataset)
result = await self._run(_create)
@@ -164,8 +182,6 @@ class RAGFlowService:
chunk_method: str = 'laws',
embedding_model: Optional[str] = None,
description: Optional[str] = None,
auto_keywords: int = RAGFLOW_AUTO_KEYWORDS,
auto_questions: int = RAGFLOW_AUTO_QUESTIONS,
) -> Dict:
"""
Gibt bestehendes Dataset zurueck oder erstellt ein neues (get-or-create).
@@ -183,8 +199,6 @@ class RAGFlowService:
chunk_method=chunk_method,
embedding_model=embedding_model,
description=description,
auto_keywords=auto_keywords,
auto_questions=auto_questions,
)
async def delete_dataset(self, dataset_id: str) -> None:

View File

@@ -531,8 +531,10 @@ async def _run_ragflow_sync(
if not dataset_id:
if ai_aktivierungsstatus == 'new':
akte_name = akte.get('name') or f"Akte {akte.get('aktennummer', akte_id)}"
ctx.logger.info(f" Status 'new' → Erstelle neues RAGflow Dataset für '{akte_name}'...")
dataset_info = await ragflow.ensure_dataset(akte_name)
# Name = EspoCRM-ID (stabil, eindeutig, kein Sonderzeichen-Problem)
dataset_name = akte_id
ctx.logger.info(f" Status 'new' → Erstelle neues RAGflow Dataset '{dataset_name}' für '{akte_name}'...")
dataset_info = await ragflow.ensure_dataset(dataset_name)
if not dataset_info or not dataset_info.get('id'):
ctx.logger.error("❌ RAGflow Dataset konnte nicht erstellt werden Sync abgebrochen")
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})