feat(sync): Implement RAGflow Parsing Status Poller for syncing document statuses with EspoCRM

This commit is contained in:
bsiggel
2026-03-27 10:12:52 +00:00
parent c20baeb21a
commit a2181a25fc

View File

@@ -0,0 +1,125 @@
"""
RAGflow Parsing Status Poller
Fragt alle 60 Sekunden EspoCRM nach CDokumente-Eintraegen ab,
deren RAGflow-Parsing noch nicht abgeschlossen ist (aiParsingStatus not in {complete, failed}).
Fuer jedes gefundene Dokument wird der aktuelle Parsing-Status von RAGflow
abgefragt und bei Aenderung zurueck nach EspoCRM geschrieben.
aiParsingStatus-Werte (EspoCRM):
unknown → RAGflow run=UNSTART (noch nicht gestartet)
parsing → RAGflow run=RUNNING
complete → RAGflow run=DONE
failed → RAGflow run=FAIL oder CANCEL
"""
from motia import FlowContext, cron
config = {
"name": "RAGflow Parsing Status Poller",
"description": "Polls RAGflow parsing status for uploaded documents and syncs back to EspoCRM",
"flows": ["akte-sync"],
"triggers": [cron("0 */1 * * * *")], # jede Minute
}
# RAGflow run → EspoCRM aiParsingStatus
RUN_STATUS_MAP = {
'UNSTART': 'unknown',
'RUNNING': 'parsing',
'DONE': 'complete',
'FAIL': 'failed',
'CANCEL': 'failed',
}
BATCH_SIZE = 200 # max CDokumente pro Poll-Tick
async def handler(input_data: None, ctx: FlowContext) -> None:
    """Poll RAGflow for parsing progress and sync status changes to EspoCRM.

    Runs once per minute (see ``config``). Loads up to ``BATCH_SIZE`` pending
    CDokumente records from EspoCRM, groups them by RAGflow dataset so each
    dataset is listed only once, maps the RAGflow ``run`` state through
    ``RUN_STATUS_MAP``, and writes any changed status back to EspoCRM.

    Args:
        input_data: Unused cron payload (always ``None``).
        ctx: Motia flow context providing the logger and service config.

    Returns:
        None. All outcomes are reported via the logger; errors are caught
        per document / per dataset so one failure never aborts the tick.
    """
    from services.espocrm import EspoCRMAPI
    from services.ragflow_service import RAGFlowService
    from collections import defaultdict

    ctx.logger.info("=" * 60)
    ctx.logger.info("⏰ RAGFLOW PARSING STATUS POLLER")
    espocrm = EspoCRMAPI(ctx)
    ragflow = RAGFlowService(ctx)

    # ── 1. Load CDokumente that have not been parsed successfully yet ─────────
    # NOTE(review): EspoCRM 'notEquals' typically does not match NULL values;
    # records with aiParsingStatus = NULL may be skipped — confirm against the
    # EspoCRM where-clause semantics if such records can exist.
    try:
        result = await espocrm.list_entities(
            'CDokumente',
            where=[
                {'type': 'isNotNull', 'attribute': 'aiFileId'},
                {'type': 'isNotNull', 'attribute': 'aiCollectionId'},
                {'type': 'notEquals', 'attribute': 'aiParsingStatus', 'value': 'complete'},
                {'type': 'notEquals', 'attribute': 'aiParsingStatus', 'value': 'failed'},
            ],
            select='id,aiFileId,aiCollectionId,aiParsingStatus',
            max_size=BATCH_SIZE,
        )
    except Exception as e:
        ctx.logger.error(f"❌ EspoCRM Abfrage fehlgeschlagen: {e}")
        ctx.logger.info("=" * 60)
        return

    docs = result.get('list', [])
    ctx.logger.info(f" Pending-Dokumente: {len(docs)}")
    if not docs:
        ctx.logger.info("✓ Keine ausstehenden Dokumente")
        ctx.logger.info("=" * 60)
        return

    # ── 2. Group by dataset ID (one RAGflow call per dataset) ────────────────
    by_dataset: dict[str, list] = defaultdict(list)
    for doc in docs:
        if doc.get('aiCollectionId'):
            by_dataset[doc['aiCollectionId']].append(doc)

    updated = 0
    failed = 0
    for dataset_id, dataset_docs in by_dataset.items():
        # Fetch the dataset's documents from RAGflow; on failure, count the
        # whole group as failed and move on to the next dataset.
        try:
            ragflow_docs = await ragflow.list_documents(dataset_id)
            ragflow_by_id = {rd['id']: rd for rd in ragflow_docs}
        except Exception as e:
            ctx.logger.error(f" ❌ RAGflow list_documents({dataset_id[:12]}…) fehlgeschlagen: {e}")
            failed += len(dataset_docs)
            continue

        for doc in dataset_docs:
            doc_id = doc['id']
            ai_file_id = doc.get('aiFileId', '')
            current_status = doc.get('aiParsingStatus') or 'unknown'
            ragflow_doc = ragflow_by_id.get(ai_file_id)
            if not ragflow_doc:
                # Document vanished on the RAGflow side (or ID mismatch):
                # leave the EspoCRM record untouched and just warn.
                ctx.logger.warn(
                    f" ⚠️ CDokumente {doc_id}: aiFileId {ai_file_id[:12]}… nicht in RAGflow gefunden"
                )
                continue

            run = (ragflow_doc.get('run') or 'UNSTART').upper()
            new_status = RUN_STATUS_MAP.get(run, 'unknown')
            if new_status == current_status:
                continue  # no change — skip the write

            # Fix: use `or 0` (not a .get default) so an explicit
            # `progress: null` from RAGflow cannot crash the `:.0%` format;
            # mirrors the `or`-guards used for 'run' and 'aiParsingStatus'.
            progress = ragflow_doc.get('progress') or 0
            ctx.logger.info(
                f" 📄 {doc_id}: {current_status} → {new_status} "
                f"(run={run}, progress={progress:.0%})"
            )
            try:
                await espocrm.update_entity('CDokumente', doc_id, {
                    'aiParsingStatus': new_status,
                })
                updated += 1
            except Exception as e:
                ctx.logger.error(f" ❌ Update CDokumente {doc_id} fehlgeschlagen: {e}")
                failed += 1

    ctx.logger.info(f" ✅ Aktualisiert: {updated} ❌ Fehler: {failed}")
    ctx.logger.info("=" * 60)