"""
RAGflow Parsing Status Poller

Polls EspoCRM every 60 seconds for CDokumente records whose RAGflow
parsing has not finished yet (aiParsingStatus not in {complete, failed}).
For every document found, the current parsing status is fetched from
RAGflow and — if it changed — written back to EspoCRM.

aiParsingStatus values (EspoCRM):
    unknown  → RAGflow run=UNSTART (not started yet)
    parsing  → RAGflow run=RUNNING
    complete → RAGflow run=DONE
    failed   → RAGflow run=FAIL or CANCEL
"""

from motia import FlowContext, cron

config = {
    "name": "RAGflow Parsing Status Poller",
    "description": "Polls RAGflow parsing status for uploaded documents and syncs back to EspoCRM",
    "flows": ["akte-sync"],
    "triggers": [cron("0 */1 * * * *")],  # every minute
}

# RAGflow run → EspoCRM aiParsingStatus
RUN_STATUS_MAP = {
    'UNSTART': 'unknown',
    'RUNNING': 'parsing',
    'DONE': 'complete',
    'FAIL': 'failed',
    'CANCEL': 'failed',
}

BATCH_SIZE = 200  # max CDokumente per poll tick


async def handler(input_data: None, ctx: FlowContext) -> None:
    """Sync RAGflow parsing status of pending CDokumente back to EspoCRM.

    Steps:
      1. Query EspoCRM for CDokumente that have a RAGflow file/collection id
         but whose aiParsingStatus is neither 'complete' nor 'failed'.
      2. Group them by dataset (aiCollectionId) so RAGflow is queried once
         per dataset instead of once per document.
      3. Map each document's RAGflow ``run`` state via RUN_STATUS_MAP and
         write the new aiParsingStatus to EspoCRM only when it changed.

    All errors are logged and counted; the poller never raises, so a single
    failing dataset or document does not abort the cron tick.
    """
    # Imports are local so the step module can be loaded without the
    # service dependencies being importable at registration time.
    from services.espocrm import EspoCRMAPI
    from services.ragflow_service import RAGFlowService
    from collections import defaultdict

    ctx.logger.info("=" * 60)
    ctx.logger.info("⏰ RAGFLOW PARSING STATUS POLLER")

    espocrm = EspoCRMAPI(ctx)
    ragflow = RAGFlowService(ctx)

    # ── 1. Load CDokumente that have not finished parsing yet ──────────────
    try:
        result = await espocrm.list_entities(
            'CDokumente',
            where=[
                {'type': 'isNotNull', 'attribute': 'aiFileId'},
                {'type': 'isNotNull', 'attribute': 'aiCollectionId'},
                {'type': 'notEquals', 'attribute': 'aiParsingStatus', 'value': 'complete'},
                {'type': 'notEquals', 'attribute': 'aiParsingStatus', 'value': 'failed'},
            ],
            select='id,aiFileId,aiCollectionId,aiParsingStatus',
            max_size=BATCH_SIZE,
        )
    except Exception as e:
        ctx.logger.error(f"❌ EspoCRM Abfrage fehlgeschlagen: {e}")
        ctx.logger.info("=" * 60)
        return

    docs = result.get('list', [])
    ctx.logger.info(f" Pending-Dokumente: {len(docs)}")

    if not docs:
        ctx.logger.info("✓ Keine ausstehenden Dokumente")
        ctx.logger.info("=" * 60)
        return

    # ── 2. Group by dataset id (1 RAGflow call per dataset) ────────────────
    by_dataset: dict[str, list] = defaultdict(list)
    for doc in docs:
        if doc.get('aiCollectionId'):
            by_dataset[doc['aiCollectionId']].append(doc)

    updated = 0
    failed = 0

    for dataset_id, dataset_docs in by_dataset.items():
        # Load the RAGflow documents of this dataset
        try:
            ragflow_docs = await ragflow.list_documents(dataset_id)
            ragflow_by_id = {rd['id']: rd for rd in ragflow_docs}
        except Exception as e:
            ctx.logger.error(f" ❌ RAGflow list_documents({dataset_id[:12]}…) fehlgeschlagen: {e}")
            # Count the whole dataset as failed; retried on the next tick.
            failed += len(dataset_docs)
            continue

        for doc in dataset_docs:
            doc_id = doc['id']
            ai_file_id = doc.get('aiFileId', '')
            current_status = doc.get('aiParsingStatus') or 'unknown'

            ragflow_doc = ragflow_by_id.get(ai_file_id)
            if not ragflow_doc:
                # Document vanished from RAGflow (deleted?) — leave status
                # untouched so it keeps showing up until resolved manually.
                # NOTE(review): logger.warn — presumably the motia logger
                # supports this alias; confirm against its API.
                ctx.logger.warn(
                    f" ⚠️ CDokumente {doc_id}: aiFileId {ai_file_id[:12]}… nicht in RAGflow gefunden"
                )
                continue

            run = (ragflow_doc.get('run') or 'UNSTART').upper()
            new_status = RUN_STATUS_MAP.get(run, 'unknown')

            if new_status == current_status:
                continue  # no change

            # `progress` may be present but None; `dict.get(key, 0)` only
            # covers a *missing* key, so guard with `or 0` to avoid a
            # TypeError in the `.0%` format spec (same guard pattern as
            # `run` above).
            ctx.logger.info(
                f" 📄 {doc_id}: {current_status} → {new_status} "
                f"(run={run}, progress={(ragflow_doc.get('progress') or 0):.0%})"
            )
            try:
                await espocrm.update_entity('CDokumente', doc_id, {
                    'aiParsingStatus': new_status,
                })
                updated += 1
            except Exception as e:
                ctx.logger.error(f" ❌ Update CDokumente {doc_id} fehlgeschlagen: {e}")
                failed += 1

    ctx.logger.info(f" ✅ Aktualisiert: {updated} ❌ Fehler: {failed}")
    ctx.logger.info("=" * 60)