diff --git a/services/ragflow_service.py b/services/ragflow_service.py
index d1e3d25..8d4b2a7 100644
--- a/services/ragflow_service.py
+++ b/services/ragflow_service.py
@@ -426,6 +426,66 @@ class RAGFlowService:
         self._log(f"📄 Document found: {result.get('name')} (run={result.get('run')})")
         return result
 
+    async def trace_graphrag(self, dataset_id: str) -> Optional[Dict]:
+        """
+        Return the current status of the knowledge-graph build.
+        GET /api/v1/datasets/{dataset_id}/trace_graphrag
+
+        Returns:
+            Dict with 'progress' (0.0-1.0), 'task_id', 'progress_msg' etc.
+            None if no graph build has been started yet.
+        """
+        import aiohttp
+        url = f"{self.base_url.rstrip('/')}/api/v1/datasets/{dataset_id}/trace_graphrag"
+        headers = {'Authorization': f'Bearer {self.api_key}'}
+        async with aiohttp.ClientSession() as session:
+            async with session.get(url, headers=headers) as resp:
+                if resp.status not in (200, 201):
+                    text = await resp.text()
+                    raise RuntimeError(
+                        f"trace_graphrag HTTP {resp.status} fuer dataset {dataset_id}: {text}"
+                    )
+                data = await resp.json()
+        task = data.get('data')
+        if not task:
+            return None
+        return {
+            'task_id': task.get('id', ''),
+            'progress': float(task.get('progress', 0.0)),
+            'progress_msg': task.get('progress_msg', ''),
+            'begin_at': task.get('begin_at'),
+            'update_date': task.get('update_date'),
+        }
+
+    async def run_graphrag(self, dataset_id: str) -> str:
+        """
+        Starts or refreshes the knowledge graph of a dataset
+        via POST /api/v1/datasets/{id}/run_graphrag.
+
+        Returns:
+            graphrag_task_id (str) – empty if the server returns none.
+        """
+        import aiohttp
+        url = f"{self.base_url.rstrip('/')}/api/v1/datasets/{dataset_id}/run_graphrag"
+        headers = {
+            'Authorization': f'Bearer {self.api_key}',
+            'Content-Type': 'application/json',
+        }
+        async with aiohttp.ClientSession() as session:
+            async with session.post(url, headers=headers, json={}) as resp:
+                if resp.status not in (200, 201):
+                    text = await resp.text()
+                    raise RuntimeError(
+                        f"run_graphrag HTTP {resp.status} fuer dataset {dataset_id}: {text}"
+                    )
+                data = await resp.json()
+        task_id = (data.get('data') or {}).get('graphrag_task_id', '')
+        self._log(
+            f"🔗 run_graphrag angestossen fuer {dataset_id[:16]}…"
+            + (f" task_id={task_id}" if task_id else "")
+        )
+        return task_id
+
     async def wait_for_parsing(
         self,
         dataset_id: str,
diff --git a/src/steps/crm/akte/ragflow_graph_build_cron_step.py b/src/steps/crm/akte/ragflow_graph_build_cron_step.py
new file mode 100644
index 0000000..dd68f00
--- /dev/null
+++ b/src/steps/crm/akte/ragflow_graph_build_cron_step.py
@@ -0,0 +1,176 @@
+"""
+RAGflow Graph Build Cron
+
+Runs every 5 minutes and performs two tasks:
+
+Phase A – status update of running graphs:
+    Fetches all CAkten with graphParsingStatus='parsing', queries the current
+    progress via trace_graphrag and sets the status in EspoCRM to 'complete'
+    once progress == 1.0.
+
+Phase B – trigger new graph builds:
+    Fetches all CAkten with:
+      - aiParsingStatus in ['complete', 'complete_with_failures']
+      - graphParsingStatus in ['unclean', 'no_graph']
+      - aiCollectionId isNotNull
+    Makes sure no graph build is already running (trace_graphrag), then
+    triggers a new build via run_graphrag.
+    Sets graphParsingStatus → 'parsing'.
+
+graphParsingStatus values (EspoCRM):
+    no_graph → no graph built yet
+    parsing  → graph build in progress
+    complete → graph finished (progress == 1.0)
+    unclean  → graph stale (new documents were uploaded)
+"""
+
+from motia import FlowContext, cron
+
+config = {
+    "name": "RAGflow Graph Build Cron",
+    "description": "Polls and triggers Knowledge Graph builds in RAGflow for CAkten",
+    "flows": ["akte-sync"],
+    "triggers": [cron("0 */5 * * * *")],  # every 5 minutes
+}
+
+BATCH_SIZE = 50
+
+
+async def handler(input_data: None, ctx: FlowContext) -> None:
+    from services.espocrm import EspoCRMAPI
+    from services.ragflow_service import RAGFlowService
+
+    ctx.logger.info("=" * 60)
+    ctx.logger.info("⏰ RAGFLOW GRAPH BUILD CRON")
+
+    espocrm = EspoCRMAPI(ctx)
+    ragflow = RAGFlowService(ctx)
+
+    # ══════════════════════════════════════════════════════════════
+    # Phase A: update running builds
+    # ══════════════════════════════════════════════════════════════
+    ctx.logger.info("── Phase A: Laufende Builds pruefen ──")
+    try:
+        parsing_result = await espocrm.list_entities(
+            'CAkten',
+            where=[
+                {'type': 'isNotNull', 'attribute': 'aiCollectionId'},
+                {'type': 'equals', 'attribute': 'graphParsingStatus', 'value': 'parsing'},
+            ],
+            select='id,aiCollectionId,graphParsingStatus',
+            max_size=BATCH_SIZE,
+        )
+    except Exception as e:
+        ctx.logger.error(f"❌ EspoCRM Phase-A-Abfrage fehlgeschlagen: {e}")
+        parsing_result = {'list': []}
+
+    polling_done = 0
+    polling_error = 0
+    for akte in parsing_result.get('list', []):
+        akte_id = akte['id']
+        dataset_id = akte['aiCollectionId']
+        try:
+            task = await ragflow.trace_graphrag(dataset_id)
+            if task is None:
+                # no task present anymore – mark as unclean
+                ctx.logger.warn(
+                    f" ⚠️ Akte {akte_id}: kein Graph-Task gefunden → unclean"
+                )
+                await espocrm.update_entity('CAkten', akte_id, {'graphParsingStatus': 'unclean'})
+                polling_done += 1
+            elif task['progress'] >= 1.0:
+                ctx.logger.info(
+                    f" ✅ Akte {akte_id}: Graph fertig (progress=100%) → complete"
+                )
+                await espocrm.update_entity('CAkten', akte_id, {'graphParsingStatus': 'complete'})
+                polling_done += 1
+            else:
+                ctx.logger.info(
+                    f" ⏳ Akte {akte_id}: Graph laeuft noch "
+                    f"(progress={task['progress']:.0%})"
+                )
+        except Exception as e:
+            ctx.logger.error(f" ❌ Fehler bei Akte {akte_id}: {e}")
+            polling_error += 1
+
+    ctx.logger.info(
+        f" Phase A: {len(parsing_result.get('list', []))} laufend"
+        f" → {polling_done} aktualisiert {polling_error} Fehler"
+    )
+
+    # ══════════════════════════════════════════════════════════════
+    # Phase B: trigger new graph builds
+    # ══════════════════════════════════════════════════════════════
+    ctx.logger.info("── Phase B: Neue Builds anstossen ──")
+    try:
+        pending_result = await espocrm.list_entities(
+            'CAkten',
+            where=[
+                {'type': 'isNotNull', 'attribute': 'aiCollectionId'},
+                {'type': 'in', 'attribute': 'aiParsingStatus',
+                 'value': ['complete', 'complete_with_failures']},
+                {'type': 'in', 'attribute': 'graphParsingStatus',
+                 'value': ['unclean', 'no_graph']},
+            ],
+            select='id,aiCollectionId,aiParsingStatus,graphParsingStatus',
+            max_size=BATCH_SIZE,
+        )
+    except Exception as e:
+        ctx.logger.error(f"❌ EspoCRM Phase-B-Abfrage fehlgeschlagen: {e}")
+        pending_result = {'list': []}
+
+    triggered = 0
+    skipped = 0
+    trig_error = 0
+
+    for akte in pending_result.get('list', []):
+        akte_id = akte['id']
+        dataset_id = akte['aiCollectionId']
+        ai_status = akte.get('aiParsingStatus', '—')
+        graph_status = akte.get('graphParsingStatus', '—')
+
+        # make sure no build is already running
+        try:
+            task = await ragflow.trace_graphrag(dataset_id)
+        except Exception as e:
+            ctx.logger.error(
+                f" ❌ trace_graphrag Akte {akte_id} fehlgeschlagen: {e}"
+            )
+            trig_error += 1
+            continue
+
+        if task is not None and task['progress'] < 1.0:
+            ctx.logger.info(
+                f" ⏭️ Akte {akte_id}: Build laeuft noch "
+                f"(progress={task['progress']:.0%}) → setze parsing"
+            )
+            try:
+                await espocrm.update_entity('CAkten', akte_id, {'graphParsingStatus': 'parsing'})
+            except Exception as e:
+                ctx.logger.error(f" ❌ Status-Update fehlgeschlagen: {e}")
+            skipped += 1
+            continue
+
+        # trigger the build
+        ctx.logger.info(
+            f" 🔧 Akte {akte_id} "
+            f"ai={ai_status} graph={graph_status} "
+            f"dataset={dataset_id[:16]}…"
+        )
+        try:
+            task_id = await ragflow.run_graphrag(dataset_id)
+            ctx.logger.info(
+                f" ✅ Graph-Build angestossen"
+                + (f" task_id={task_id}" if task_id else "")
+            )
+            await espocrm.update_entity('CAkten', akte_id, {'graphParsingStatus': 'parsing'})
+            triggered += 1
+        except Exception as e:
+            ctx.logger.error(f" ❌ Fehler: {e}")
+            trig_error += 1
+
+    ctx.logger.info(
+        f" Phase B: {len(pending_result.get('list', []))} ausstehend"
+        f" → {triggered} angestossen {skipped} uebersprungen {trig_error} Fehler"
+    )
+    ctx.logger.info("=" * 60)