feat(sync): Enhance Akte Sync with RAGflow support and improve error handling

This commit is contained in:
bsiggel
2026-03-26 23:09:42 +00:00
parent 1cd8de8574
commit 9bd62fc5ab

View File

@@ -4,7 +4,9 @@ Akte Sync - Event Handler
Unified sync for one CAkten entity across all configured backends:
- Advoware (3-way merge: Windows ↔ EspoCRM ↔ History)
- xAI (Blake3 hash-based upload to Collection)
- RAGflow (Dataset-based upload with laws chunk_method)
AI provider is selected via CAkten.aiProvider ('xai' or 'ragflow').
Both run in the same event to keep CDokumente perfectly in sync.
Trigger: akte.sync { akte_id, aktennummer }
@@ -15,6 +17,8 @@ Enqueues:
- document.generate_preview (after CREATE / UPDATE_ESPO)
"""
import traceback
import time
from typing import Dict, Any
from datetime import datetime
from motia import FlowContext, queue
@@ -22,7 +26,7 @@ from motia import FlowContext, queue
config = {
"name": "Akte Sync - Event Handler",
"description": "Unified sync for one Akte: Advoware 3-way merge + xAI upload",
"description": "Unified sync for one Akte: Advoware 3-way merge + AI upload (xAI or RAGflow)",
"flows": ["akte-sync"],
"triggers": [queue("akte.sync")],
"enqueues": ["document.generate_preview"],
@@ -54,7 +58,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
return
lock_key = f"akte_sync:{akte_id}"
lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=600)
lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=1800) # 30 min
if not lock_acquired:
ctx.logger.warn(f"⏸️ Lock busy for Akte {akte_id} requeueing")
raise RuntimeError(f"Lock busy for akte_id={akte_id}")
@@ -104,13 +108,21 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
advoware_results = None
if advoware_enabled:
advoware_results = await _run_advoware_sync(akte, aktennummer, akte_id, espocrm, ctx, espo_docs)
# Re-fetch docs after Advoware sync newly created docs must be visible to AI sync
if ai_enabled and advoware_results and advoware_results.get('created', 0) > 0:
ctx.logger.info(
f" 🔄 Re-fetching docs after Advoware sync "
f"({advoware_results['created']} new doc(s) created)"
)
espo_docs = await espocrm.list_related_all('CAkten', akte_id, 'dokumentes')
# ── AI SYNC (xAI or RAGflow) ─────────────────────────────────
ai_had_failures = False
if ai_enabled:
if ai_provider.lower() == 'ragflow':
await _run_ragflow_sync(akte, akte_id, espocrm, ctx, espo_docs)
ai_had_failures = await _run_ragflow_sync(akte, akte_id, espocrm, ctx, espo_docs)
else:
await _run_xai_sync(akte, akte_id, espocrm, ctx, espo_docs)
ai_had_failures = await _run_xai_sync(akte, akte_id, espocrm, ctx, espo_docs)
# ── Final Status ───────────────────────────────────────────────────
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
@@ -123,7 +135,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
final_update['aktivierungsstatus'] = 'active'
ctx.logger.info("🔄 aktivierungsstatus: import → active")
if ai_enabled:
final_update['aiSyncStatus'] = 'synced'
final_update['aiSyncStatus'] = 'failed' if ai_had_failures else 'synced'
final_update['aiLastSync'] = now
# 'new' = Dataset/Collection erstmalig angelegt → auf 'aktiv' setzen
if ai_aktivierungsstatus == 'new':
@@ -143,11 +155,9 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
except Exception as e:
ctx.logger.error(f"❌ Sync failed: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
# Requeue Advoware aktennummer for retry (Motia retries the akte.sync event itself)
import time
if aktennummer:
redis_client.zadd("advoware:pending_aktennummern", {aktennummer: time.time()})
@@ -393,7 +403,7 @@ async def _run_xai_sync(
espocrm,
ctx: FlowContext,
docs: list,
) -> None:
) -> bool:
from services.xai_service import XAIService
from services.xai_upload_utils import XAIUploadUtils
@@ -418,7 +428,7 @@ async def _run_xai_sync(
if not collection_id:
ctx.logger.error("❌ xAI Collection konnte nicht erstellt werden Sync abgebrochen")
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
return
return True # had failures
ctx.logger.info(f" ✅ Collection erstellt: {collection_id}")
# aiAktivierungsstatus → 'aktiv' wird in handler final_update gesetzt
else:
@@ -428,7 +438,7 @@ async def _run_xai_sync(
f"xAI Sync abgebrochen. Bitte Collection-ID in EspoCRM eintragen."
)
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
return
return True # had failures
else:
# Collection-ID vorhanden → verifizieren ob sie noch in xAI existiert
try:
@@ -436,12 +446,12 @@ async def _run_xai_sync(
if not col:
ctx.logger.error(f"❌ Collection {collection_id} existiert nicht mehr in xAI Sync abgebrochen")
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
return
return True # had failures
ctx.logger.info(f" ✅ Collection verifiziert: {collection_id}")
except Exception as e:
ctx.logger.error(f"❌ Collection-Verifizierung fehlgeschlagen: {e} Sync abgebrochen")
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
return
return True # had failures
ctx.logger.info(f" Documents to check: {len(docs)}")
@@ -485,6 +495,7 @@ async def _run_xai_sync(
ctx.logger.info(f" ✅ Synced : {synced}")
ctx.logger.info(f" ⏭️ Skipped : {skipped}")
ctx.logger.info(f" ❌ Failed : {failed}")
return failed > 0
finally:
await xai.close()
@@ -500,7 +511,7 @@ async def _run_ragflow_sync(
espocrm,
ctx: FlowContext,
docs: list,
) -> None:
) -> bool:
from services.ragflow_service import RAGFlowService
from urllib.parse import unquote
import mimetypes
@@ -512,192 +523,200 @@ async def _run_ragflow_sync(
ctx.logger.info("🧠 RAGflow SYNC")
ctx.logger.info("" * 60)
ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower()
dataset_id = akte.get('aiCollectionId')
# ── Ensure dataset exists ─────────────────────────────────────────────
if not dataset_id:
if ai_aktivierungsstatus == 'new':
akte_name = akte.get('name') or f"Akte {akte.get('aktennummer', akte_id)}"
ctx.logger.info(f" Status 'new' → Erstelle neues RAGflow Dataset für '{akte_name}'...")
dataset_info = await ragflow.ensure_dataset(akte_name)
if not dataset_info or not dataset_info.get('id'):
ctx.logger.error("❌ RAGflow Dataset konnte nicht erstellt werden Sync abgebrochen")
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
return
dataset_id = dataset_info['id']
ctx.logger.info(f" ✅ Dataset erstellt: {dataset_id}")
await espocrm.update_entity('CAkten', akte_id, {'aiCollectionId': dataset_id})
else:
ctx.logger.error(
f"❌ aiAktivierungsstatus='{ai_aktivierungsstatus}' aber keine aiCollectionId "
f"RAGflow Sync abgebrochen. Bitte Dataset-ID in EspoCRM eintragen."
)
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
return
ctx.logger.info(f" Dataset-ID : {dataset_id}")
ctx.logger.info(f" EspoCRM docs: {len(docs)}")
# ── RAGflow-Bestand abrufen (source of truth) ─────────────────────────
# Lookup: espocrm_id → ragflow_doc (nur Docs die mit espocrm_id getaggt sind)
ragflow_by_espocrm_id: Dict[str, Any] = {}
try:
ragflow_docs = await ragflow.list_documents(dataset_id)
ctx.logger.info(f" RAGflow docs: {len(ragflow_docs)}")
ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower()
dataset_id = akte.get('aiCollectionId')
# ── Ensure dataset exists ─────────────────────────────────────────────
if not dataset_id:
if ai_aktivierungsstatus == 'new':
akte_name = akte.get('name') or f"Akte {akte.get('aktennummer', akte_id)}"
ctx.logger.info(f" Status 'new' → Erstelle neues RAGflow Dataset für '{akte_name}'...")
dataset_info = await ragflow.ensure_dataset(akte_name)
if not dataset_info or not dataset_info.get('id'):
ctx.logger.error("❌ RAGflow Dataset konnte nicht erstellt werden Sync abgebrochen")
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
return True # had failures
dataset_id = dataset_info['id']
ctx.logger.info(f" ✅ Dataset erstellt: {dataset_id}")
await espocrm.update_entity('CAkten', akte_id, {'aiCollectionId': dataset_id})
else:
ctx.logger.error(
f"❌ aiAktivierungsstatus='{ai_aktivierungsstatus}' aber keine aiCollectionId "
f"RAGflow Sync abgebrochen. Bitte Dataset-ID in EspoCRM eintragen."
)
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
return True # had failures
ctx.logger.info(f" Dataset-ID : {dataset_id}")
ctx.logger.info(f" EspoCRM docs: {len(docs)}")
# ── RAGflow-Bestand abrufen (source of truth) ─────────────────────────
ragflow_by_espocrm_id: Dict[str, Any] = {}
try:
ragflow_docs = await ragflow.list_documents(dataset_id)
ctx.logger.info(f" RAGflow docs: {len(ragflow_docs)}")
for rd in ragflow_docs:
eid = rd.get('espocrm_id')
if eid:
ragflow_by_espocrm_id[eid] = rd
except Exception as e:
ctx.logger.error(f"❌ RAGflow Dokumentenliste nicht abrufbar: {e}")
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
return True # had failures
# ── Orphan-Cleanup: RAGflow-Docs die kein EspoCRM-Äquivalent mehr haben ──
espocrm_ids_set = {d['id'] for d in docs}
for rd in ragflow_docs:
eid = rd.get('espocrm_id')
if eid:
ragflow_by_espocrm_id[eid] = rd
except Exception as e:
ctx.logger.error(f"❌ RAGflow Dokumentenliste nicht abrufbar: {e}")
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
return
if eid and eid not in espocrm_ids_set:
try:
await ragflow.remove_document(dataset_id, rd['id'])
ctx.logger.info(f" 🗑️ Orphan gelöscht: {rd.get('name', rd['id'])} (espocrm_id={eid})")
except Exception as e:
ctx.logger.warn(f" ⚠️ Orphan-Delete fehlgeschlagen: {e}")
# ── Orphan-Cleanup: RAGflow-Docs die kein EspoCRM-Äquivalent mehr haben ──
espocrm_ids_set = {d['id'] for d in docs}
for rd in ragflow_docs:
eid = rd.get('espocrm_id')
if eid and eid not in espocrm_ids_set:
try:
await ragflow.remove_document(dataset_id, rd['id'])
ctx.logger.info(f" 🗑️ Orphan gelöscht: {rd.get('name', rd['id'])} (espocrm_id={eid})")
except Exception as e:
ctx.logger.warn(f" ⚠️ Orphan-Delete fehlgeschlagen: {e}")
synced = 0
skipped = 0
failed = 0
synced = 0
skipped = 0
failed = 0
for doc in docs:
doc_id = doc['id']
doc_name = doc.get('name', doc_id)
blake3_hash = doc.get('blake3hash') or ''
for doc in docs:
doc_id = doc['id']
doc_name = doc.get('name', doc_id)
blake3_hash = doc.get('blake3hash') or ''
# Was ist aktuell in RAGflow für dieses Dokument?
ragflow_doc = ragflow_by_espocrm_id.get(doc_id)
ragflow_doc_id = ragflow_doc['id'] if ragflow_doc else None
ragflow_blake3 = ragflow_doc.get('blake3_hash', '') if ragflow_doc else ''
ragflow_meta = ragflow_doc.get('meta_fields', {}) if ragflow_doc else {}
# Was ist aktuell in RAGflow für dieses Dokument?
ragflow_doc = ragflow_by_espocrm_id.get(doc_id)
ragflow_doc_id = ragflow_doc['id'] if ragflow_doc else None
ragflow_blake3 = ragflow_doc.get('blake3_hash', '') if ragflow_doc else ''
ragflow_meta = ragflow_doc.get('meta_fields', {}) if ragflow_doc else {}
# Aktuelle Metadaten aus EspoCRM
current_description = str(doc.get('beschreibung') or '')
current_advo_art = str(doc.get('advowareArt') or '')
current_advo_bemerk = str(doc.get('advowareBemerkung') or '')
# Aktuelle Metadaten aus EspoCRM
current_description = str(doc.get('beschreibung') or '')
current_advo_art = str(doc.get('advowareArt') or '')
current_advo_bemerk = str(doc.get('advowareBemerkung') or '')
content_changed = blake3_hash != ragflow_blake3
meta_changed = (
ragflow_meta.get('description', '') != current_description or
ragflow_meta.get('advoware_art', '') != current_advo_art or
ragflow_meta.get('advoware_bemerkung', '') != current_advo_bemerk
)
ctx.logger.info(f" 📄 {doc_name}")
ctx.logger.info(
f" in_ragflow={bool(ragflow_doc_id)}, "
f"content_changed={content_changed}, meta_changed={meta_changed}"
)
if ragflow_doc_id:
ctx.logger.info(
f" ragflow_blake3={ragflow_blake3[:12] if ragflow_blake3 else 'N/A'}..., "
f"espo_blake3={blake3_hash[:12] if blake3_hash else 'N/A'}..."
content_changed = blake3_hash != ragflow_blake3
meta_changed = (
ragflow_meta.get('description', '') != current_description or
ragflow_meta.get('advoware_art', '') != current_advo_art or
ragflow_meta.get('advoware_bemerkung', '') != current_advo_bemerk
)
if not ragflow_doc_id and not content_changed and not meta_changed and not blake3_hash:
# Kein Attachment-Hash vorhanden und noch nie in RAGflow → unsupported
ctx.logger.info(f" ⏭️ Kein Blake3-Hash übersprungen")
skipped += 1
continue
attachment_id = doc.get('dokumentId')
if not attachment_id:
ctx.logger.warn(f" ⚠️ Kein Attachment (dokumentId fehlt) unsupported")
await espocrm.update_entity('CDokumente', doc_id, {
'aiSyncStatus': 'unsupported',
'aiLastSync': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
})
skipped += 1
continue
filename = unquote(doc.get('dokumentName') or doc.get('name') or 'document.bin')
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
mime_type = 'application/octet-stream'
try:
if ragflow_doc_id and not content_changed and meta_changed:
# ── Nur Metadaten aktualisieren ───────────────────────────
ctx.logger.info(f" 🔄 Metadata-Update für {ragflow_doc_id}")
await ragflow.update_document_meta(
dataset_id, ragflow_doc_id,
blake3_hash=blake3_hash,
description=current_description,
advoware_art=current_advo_art,
advoware_bemerkung=current_advo_bemerk,
ctx.logger.info(f" 📄 {doc_name}")
ctx.logger.info(
f" in_ragflow={bool(ragflow_doc_id)}, "
f"content_changed={content_changed}, meta_changed={meta_changed}"
)
if ragflow_doc_id:
ctx.logger.info(
f" ragflow_blake3={ragflow_blake3[:12] if ragflow_blake3 else 'N/A'}..., "
f"espo_blake3={blake3_hash[:12] if blake3_hash else 'N/A'}..."
)
new_ragflow_id = ragflow_doc_id
elif ragflow_doc_id and not content_changed and not meta_changed:
# ── Vollständig unverändert → Skip ────────────────────────
ctx.logger.info(f" ✅ Unverändert kein Re-Upload")
# Tracking-Felder in EspoCRM aktuell halten
if not ragflow_doc_id and not blake3_hash:
ctx.logger.info(f" ⏭️ Kein Blake3-Hash übersprungen")
skipped += 1
continue
attachment_id = doc.get('dokumentId')
if not attachment_id:
ctx.logger.warn(f" ⚠️ Kein Attachment (dokumentId fehlt) unsupported")
await espocrm.update_entity('CDokumente', doc_id, {
'aiFileId': ragflow_doc_id,
'aiCollectionId': dataset_id,
'aiSyncHash': blake3_hash,
'aiSyncStatus': 'synced',
'aiSyncStatus': 'unsupported',
'aiLastSync': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
})
skipped += 1
continue
else:
# ── Upload (neu oder Inhalt geändert) ─────────────────────
if ragflow_doc_id and content_changed:
ctx.logger.info(f" 🗑️ Inhalt geändert altes Dokument löschen: {ragflow_doc_id}")
try:
await ragflow.remove_document(dataset_id, ragflow_doc_id)
except Exception:
pass
filename = unquote(doc.get('dokumentName') or doc.get('name') or 'document.bin')
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
mime_type = 'application/octet-stream'
ctx.logger.info(f" 📥 Downloading {filename} ({attachment_id})…")
file_content = await espocrm.download_attachment(attachment_id)
ctx.logger.info(f" Downloaded {len(file_content)} bytes")
try:
if ragflow_doc_id and not content_changed and meta_changed:
# ── Nur Metadaten aktualisieren ───────────────────────────
ctx.logger.info(f" 🔄 Metadata-Update für {ragflow_doc_id}")
await ragflow.update_document_meta(
dataset_id, ragflow_doc_id,
blake3_hash=blake3_hash,
description=current_description,
advoware_art=current_advo_art,
advoware_bemerkung=current_advo_bemerk,
)
new_ragflow_id = ragflow_doc_id
ctx.logger.info(f" 📤 Uploading '{filename}' ({mime_type})…")
result = await ragflow.upload_document(
dataset_id=dataset_id,
file_content=file_content,
filename=filename,
mime_type=mime_type,
blake3_hash=blake3_hash,
espocrm_id=doc_id,
description=current_description,
advoware_art=current_advo_art,
advoware_bemerkung=current_advo_bemerk,
)
if not result or not result.get('id'):
raise RuntimeError("upload_document gab kein Ergebnis zurück")
new_ragflow_id = result['id']
elif ragflow_doc_id and not content_changed and not meta_changed:
# ── Vollständig unverändert → Skip ────────────────────────
ctx.logger.info(f" ✅ Unverändert kein Re-Upload")
await espocrm.update_entity('CDokumente', doc_id, {
'aiFileId': ragflow_doc_id,
'aiCollectionId': dataset_id,
'aiSyncHash': blake3_hash,
'aiSyncStatus': 'synced',
})
skipped += 1
continue
ctx.logger.info(f" ✅ RAGflow-ID: {new_ragflow_id}")
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
await espocrm.update_entity('CDokumente', doc_id, {
'aiFileId': new_ragflow_id,
'aiCollectionId': dataset_id,
'aiSyncHash': blake3_hash,
'aiSyncStatus': 'synced',
'aiLastSync': now_str,
})
synced += 1
else:
# ── Upload (neu oder Inhalt geändert) ─────────────────────
if ragflow_doc_id and content_changed:
ctx.logger.info(f" 🗑️ Inhalt geändert altes Dokument löschen: {ragflow_doc_id}")
try:
await ragflow.remove_document(dataset_id, ragflow_doc_id)
except Exception:
pass
except Exception as e:
ctx.logger.error(f" ❌ Fehlgeschlagen: {e}")
await espocrm.update_entity('CDokumente', doc_id, {
'aiSyncStatus': 'failed',
'aiLastSync': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
})
failed += 1
ctx.logger.info(f" 📥 Downloading {filename} ({attachment_id})…")
file_content = await espocrm.download_attachment(attachment_id)
ctx.logger.info(f" Downloaded {len(file_content)} bytes")
ctx.logger.info(f" ✅ Synced : {synced}")
ctx.logger.info(f" ⏭️ Skipped : {skipped}")
ctx.logger.info(f" ❌ Failed : {failed}")
ctx.logger.info(f" 📤 Uploading '{filename}' ({mime_type})…")
result = await ragflow.upload_document(
dataset_id=dataset_id,
file_content=file_content,
filename=filename,
mime_type=mime_type,
blake3_hash=blake3_hash,
espocrm_id=doc_id,
description=current_description,
advoware_art=current_advo_art,
advoware_bemerkung=current_advo_bemerk,
)
if not result or not result.get('id'):
raise RuntimeError("upload_document gab kein Ergebnis zurück")
new_ragflow_id = result['id']
ctx.logger.info(f" ✅ RAGflow-ID: {new_ragflow_id}")
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
await espocrm.update_entity('CDokumente', doc_id, {
'aiFileId': new_ragflow_id,
'aiCollectionId': dataset_id,
'aiSyncHash': blake3_hash,
'aiSyncStatus': 'synced',
'aiLastSync': now_str,
})
synced += 1
except Exception as e:
ctx.logger.error(f" ❌ Fehlgeschlagen: {e}")
await espocrm.update_entity('CDokumente', doc_id, {
'aiSyncStatus': 'failed',
'aiLastSync': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
})
failed += 1
ctx.logger.info(f" ✅ Synced : {synced}")
ctx.logger.info(f" ⏭️ Skipped : {skipped}")
ctx.logger.info(f" ❌ Failed : {failed}")
return failed > 0
except Exception as e:
ctx.logger.error(f"❌ RAGflow Sync unerwarteter Fehler: {e}")
ctx.logger.error(traceback.format_exc())
try:
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
except Exception:
pass
return True # had failures