feat(cron): Add RAGflow Graph Build Cron for periodic status updates and new builds

This commit is contained in:
bsiggel
2026-03-27 11:27:09 +00:00
parent a2181a25fc
commit 88c9df5995
2 changed files with 236 additions and 0 deletions

View File

@@ -426,6 +426,66 @@ class RAGFlowService:
self._log(f"📄 Document found: {result.get('name')} (run={result.get('run')})")
return result
async def trace_graphrag(self, dataset_id: str) -> Optional[Dict]:
    """Fetch the current status of a dataset's knowledge-graph build.

    Calls GET /api/v1/datasets/{dataset_id}/trace_graphrag.

    Args:
        dataset_id: RAGflow dataset id whose graph build to inspect.

    Returns:
        Dict with 'task_id', 'progress' (0.0-1.0), 'progress_msg',
        'begin_at' and 'update_date' — or None when no graph build has
        been started yet (server returns an empty 'data' field).

    Raises:
        RuntimeError: if the server responds with a non-200/201 status.
    """
    import aiohttp

    endpoint = f"{self.base_url.rstrip('/')}/api/v1/datasets/{dataset_id}/trace_graphrag"
    auth_header = {'Authorization': f'Bearer {self.api_key}'}
    async with aiohttp.ClientSession() as session:
        async with session.get(endpoint, headers=auth_header) as resp:
            if resp.status not in (200, 201):
                text = await resp.text()
                raise RuntimeError(
                    f"trace_graphrag HTTP {resp.status} fuer dataset {dataset_id}: {text}"
                )
            payload = await resp.json()
    task_info = payload.get('data')
    if not task_info:
        # No task entry means a build was never started for this dataset.
        return None
    return {
        'task_id': task_info.get('id', ''),
        'progress': float(task_info.get('progress', 0.0)),
        'progress_msg': task_info.get('progress_msg', ''),
        'begin_at': task_info.get('begin_at'),
        'update_date': task_info.get('update_date'),
    }
async def run_graphrag(self, dataset_id: str) -> str:
    """Start (or refresh) the knowledge-graph build of a dataset.

    Calls POST /api/v1/datasets/{id}/run_graphrag with an empty JSON body.

    Args:
        dataset_id: RAGflow dataset id to build the graph for.

    Returns:
        The graphrag task id — empty string when the server returns none.

    Raises:
        RuntimeError: if the server responds with a non-200/201 status.
    """
    import aiohttp

    endpoint = f"{self.base_url.rstrip('/')}/api/v1/datasets/{dataset_id}/run_graphrag"
    request_headers = {
        'Authorization': f'Bearer {self.api_key}',
        'Content-Type': 'application/json',
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(endpoint, headers=request_headers, json={}) as resp:
            if resp.status not in (200, 201):
                text = await resp.text()
                raise RuntimeError(
                    f"run_graphrag HTTP {resp.status} fuer dataset {dataset_id}: {text}"
                )
            data = await resp.json()
    # 'data' may be null in the response body — guard before the key lookup.
    task_id = (data.get('data') or {}).get('graphrag_task_id', '')
    suffix = f" task_id={task_id}" if task_id else ""
    self._log(f"🔗 run_graphrag angestossen fuer {dataset_id[:16]}" + suffix)
    return task_id
async def wait_for_parsing(
self,
dataset_id: str,

View File

@@ -0,0 +1,176 @@
"""
RAGflow Graph Build Cron
Laeuft alle 5 Minuten und erledigt zwei Aufgaben:
Phase A Status-Update laufender Graphs:
Holt alle CAkten mit graphParsingStatus='parsing', fragt per trace_graphrag
den aktuellen Fortschritt ab und setzt den Status in EspoCRM auf 'complete'
sobald progress == 1.0.
Phase B Neue Graph-Builds anstossen:
Holt alle CAkten mit:
- aiParsingStatus in ['complete', 'complete_with_failures']
- graphParsingStatus in ['unclean', 'no_graph']
- aiCollectionId isNotNull
Stellt sicher, dass kein Graph-Build laeuft (trace_graphrag), und
stoesst per run_graphrag einen neuen Build an.
Setzt graphParsingStatus → 'parsing'.
graphParsingStatus-Werte (EspoCRM):
no_graph → noch kein Graph gebaut
parsing → Graph-Build laeuft
complete → Graph fertig (progress == 1.0)
unclean → Graph veraltet (neue Dokumente hochgeladen)
"""
from motia import FlowContext, cron
config = {
"name": "RAGflow Graph Build Cron",
"description": "Polls and triggers Knowledge Graph builds in RAGflow for CAkten",
"flows": ["akte-sync"],
"triggers": [cron("0 */5 * * * *")], # alle 5 Minuten
}
BATCH_SIZE = 50
async def handler(input_data: None, ctx: FlowContext) -> None:
    """Cron entry point: sync RAGflow graph-build state with EspoCRM.

    Phase A polls CAkten currently marked 'parsing' and promotes them to
    'complete' (or demotes to 'unclean' when RAGflow has no task).
    Phase B triggers new builds for CAkten whose documents finished
    parsing but whose graph is 'unclean' or 'no_graph'.

    Args:
        input_data: Unused — cron triggers pass no payload.
        ctx: Motia flow context (logger, service wiring).
    """
    # Function-scope imports: services are resolved per invocation.
    from services.espocrm import EspoCRMAPI
    from services.ragflow_service import RAGFlowService
    ctx.logger.info("=" * 60)
    ctx.logger.info("⏰ RAGFLOW GRAPH BUILD CRON")
    espocrm = EspoCRMAPI(ctx)
    ragflow = RAGFlowService(ctx)
    # ══════════════════════════════════════════════════════════════
    # Phase A: refresh the status of builds currently running
    # ══════════════════════════════════════════════════════════════
    ctx.logger.info("── Phase A: Laufende Builds pruefen ──")
    try:
        parsing_result = await espocrm.list_entities(
            'CAkten',
            where=[
                {'type': 'isNotNull', 'attribute': 'aiCollectionId'},
                {'type': 'equals', 'attribute': 'graphParsingStatus', 'value': 'parsing'},
            ],
            select='id,aiCollectionId,graphParsingStatus',
            max_size=BATCH_SIZE,
        )
    except Exception as e:
        ctx.logger.error(f"❌ EspoCRM Phase-A-Abfrage fehlgeschlagen: {e}")
        # Fall back to an empty page so Phase B still runs.
        parsing_result = {'list': []}
    polling_done = 0
    polling_error = 0
    for akte in parsing_result.get('list', []):
        akte_id = akte['id']
        dataset_id = akte['aiCollectionId']
        try:
            task = await ragflow.trace_graphrag(dataset_id)
            if task is None:
                # No task left on the RAGflow side → mark record as 'unclean'.
                # NOTE(review): 'warn' — confirm the motia logger exposes
                # warn (stdlib logging prefers 'warning').
                ctx.logger.warn(
                    f" ⚠️ Akte {akte_id}: kein Graph-Task gefunden → unclean"
                )
                await espocrm.update_entity('CAkten', akte_id, {'graphParsingStatus': 'unclean'})
                polling_done += 1
            elif task['progress'] >= 1.0:
                ctx.logger.info(
                    f" ✅ Akte {akte_id}: Graph fertig (progress=100%) → complete"
                )
                await espocrm.update_entity('CAkten', akte_id, {'graphParsingStatus': 'complete'})
                polling_done += 1
            else:
                # Build still running — leave status for the next cron run.
                ctx.logger.info(
                    f" ⏳ Akte {akte_id}: Graph laeuft noch "
                    f"(progress={task['progress']:.0%})"
                )
        except Exception as e:
            ctx.logger.error(f" ❌ Fehler bei Akte {akte_id}: {e}")
            polling_error += 1
    # NOTE(review): the two adjacent f-strings concatenate with no separator
    # between "laufend" and the count — confirm the rendered message is intended.
    ctx.logger.info(
        f" Phase A: {len(parsing_result.get('list', []))} laufend"
        f"{polling_done} aktualisiert {polling_error} Fehler"
    )
    # ══════════════════════════════════════════════════════════════
    # Phase B: trigger new graph builds
    # ══════════════════════════════════════════════════════════════
    ctx.logger.info("── Phase B: Neue Builds anstossen ──")
    try:
        pending_result = await espocrm.list_entities(
            'CAkten',
            where=[
                {'type': 'isNotNull', 'attribute': 'aiCollectionId'},
                {'type': 'in', 'attribute': 'aiParsingStatus',
                 'value': ['complete', 'complete_with_failures']},
                {'type': 'in', 'attribute': 'graphParsingStatus',
                 'value': ['unclean', 'no_graph']},
            ],
            select='id,aiCollectionId,aiParsingStatus,graphParsingStatus',
            max_size=BATCH_SIZE,
        )
    except Exception as e:
        ctx.logger.error(f"❌ EspoCRM Phase-B-Abfrage fehlgeschlagen: {e}")
        pending_result = {'list': []}
    triggered = 0
    skipped = 0
    trig_error = 0
    for akte in pending_result.get('list', []):
        akte_id = akte['id']
        dataset_id = akte['aiCollectionId']
        ai_status = akte.get('aiParsingStatus', '')
        graph_status = akte.get('graphParsingStatus', '')
        # Make sure no build is already running before triggering a new one.
        try:
            task = await ragflow.trace_graphrag(dataset_id)
        except Exception as e:
            ctx.logger.error(
                f" ❌ trace_graphrag Akte {akte_id} fehlgeschlagen: {e}"
            )
            trig_error += 1
            continue
        if task is not None and task['progress'] < 1.0:
            # RAGflow already has a running build → just record 'parsing'
            # in EspoCRM and skip triggering.
            ctx.logger.info(
                f" ⏭️ Akte {akte_id}: Build laeuft noch "
                f"(progress={task['progress']:.0%}) → setze parsing"
            )
            try:
                await espocrm.update_entity('CAkten', akte_id, {'graphParsingStatus': 'parsing'})
            except Exception as e:
                # Best-effort status write — still counted as skipped below.
                ctx.logger.error(f" ❌ Status-Update fehlgeschlagen: {e}")
            skipped += 1
            continue
        # Trigger the build
        ctx.logger.info(
            f" 🔧 Akte {akte_id} "
            f"ai={ai_status} graph={graph_status} "
            f"dataset={dataset_id[:16]}"
        )
        try:
            task_id = await ragflow.run_graphrag(dataset_id)
            ctx.logger.info(
                f" ✅ Graph-Build angestossen"
                + (f" task_id={task_id}" if task_id else "")
            )
            await espocrm.update_entity('CAkten', akte_id, {'graphParsingStatus': 'parsing'})
            triggered += 1
        except Exception as e:
            ctx.logger.error(f" ❌ Fehler: {e}")
            trig_error += 1
    # NOTE(review): same adjacent-f-string concatenation as the Phase A
    # summary — confirm a separator was not lost.
    ctx.logger.info(
        f" Phase B: {len(pending_result.get('list', []))} ausstehend"
        f"{triggered} angestossen {skipped} uebersprungen {trig_error} Fehler"
    )
    ctx.logger.info("=" * 60)