diff --git a/src/steps/crm/akte/akte_sync_event_step.py b/src/steps/crm/akte/akte_sync_event_step.py index 99c78e7..0295b40 100644 --- a/src/steps/crm/akte/akte_sync_event_step.py +++ b/src/steps/crm/akte/akte_sync_event_step.py @@ -4,7 +4,9 @@ Akte Sync - Event Handler Unified sync for one CAkten entity across all configured backends: - Advoware (3-way merge: Windows ↔ EspoCRM ↔ History) - xAI (Blake3 hash-based upload to Collection) + - RAGflow (Dataset-based upload with laws chunk_method) +AI provider is selected via CAkten.aiProvider ('xai' or 'ragflow'). Both run in the same event to keep CDokumente perfectly in sync. Trigger: akte.sync { akte_id, aktennummer } @@ -15,6 +17,8 @@ Enqueues: - document.generate_preview (after CREATE / UPDATE_ESPO) """ +import traceback +import time from typing import Dict, Any from datetime import datetime from motia import FlowContext, queue @@ -22,7 +26,7 @@ from motia import FlowContext, queue config = { "name": "Akte Sync - Event Handler", - "description": "Unified sync for one Akte: Advoware 3-way merge + xAI upload", + "description": "Unified sync for one Akte: Advoware 3-way merge + AI upload (xAI or RAGflow)", "flows": ["akte-sync"], "triggers": [queue("akte.sync")], "enqueues": ["document.generate_preview"], @@ -54,7 +58,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: return lock_key = f"akte_sync:{akte_id}" - lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=600) + lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=1800) # 30 min if not lock_acquired: ctx.logger.warn(f"⏸️ Lock busy for Akte {akte_id} – requeueing") raise RuntimeError(f"Lock busy for akte_id={akte_id}") @@ -104,13 +108,21 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: advoware_results = None if advoware_enabled: advoware_results = await _run_advoware_sync(akte, aktennummer, akte_id, espocrm, ctx, espo_docs) + # Re-fetch docs after Advoware sync – newly created docs must be visible to AI sync + if ai_enabled and advoware_results and advoware_results.get('created', 0) > 0: + ctx.logger.info( + f" 🔄 Re-fetching docs after Advoware sync " + f"({advoware_results['created']} new doc(s) created)" + ) + espo_docs = await espocrm.list_related_all('CAkten', akte_id, 'dokumentes') # ── AI SYNC (xAI or RAGflow) ───────────────────────────────── + ai_had_failures = False if ai_enabled: if ai_provider.lower() == 'ragflow': - await _run_ragflow_sync(akte, akte_id, espocrm, ctx, espo_docs) + ai_had_failures = await _run_ragflow_sync(akte, akte_id, espocrm, ctx, espo_docs) else: - await _run_xai_sync(akte, akte_id, espocrm, ctx, espo_docs) + ai_had_failures = await _run_xai_sync(akte, akte_id, espocrm, ctx, espo_docs) # ── Final Status ─────────────────────────────────────────────────── now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') @@ -123,7 +135,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: final_update['aktivierungsstatus'] = 'active' ctx.logger.info("🔄 aktivierungsstatus: import → active") if ai_enabled: - final_update['aiSyncStatus'] = 'synced' + final_update['aiSyncStatus'] = 'failed' if ai_had_failures else 'synced' final_update['aiLastSync'] = now # 'new' = Dataset/Collection erstmalig angelegt → auf 'aktiv' setzen if ai_aktivierungsstatus == 'new': @@ -143,11 +155,9 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None: except Exception as e: ctx.logger.error(f"❌ Sync failed: {e}") - import traceback ctx.logger.error(traceback.format_exc()) # Requeue Advoware aktennummer for retry (Motia retries the akte.sync event itself) - import time if aktennummer: redis_client.zadd("advoware:pending_aktennummern", {aktennummer: time.time()}) @@ -393,7 +403,7 @@ async def _run_xai_sync( espocrm, ctx: FlowContext, docs: list, -) -> None: +) -> bool: from services.xai_service import XAIService from services.xai_upload_utils import XAIUploadUtils @@ -418,7 +428,7 @@ async def _run_xai_sync( if not collection_id: ctx.logger.error("❌ xAI Collection konnte nicht erstellt werden – Sync abgebrochen") await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'}) - return + return True # had failures ctx.logger.info(f" ✅ Collection erstellt: {collection_id}") # aiAktivierungsstatus → 'aktiv' wird in handler final_update gesetzt else: @@ -428,7 +438,7 @@ async def _run_xai_sync( f"xAI Sync abgebrochen. Bitte Collection-ID in EspoCRM eintragen." ) await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'}) - return + return True # had failures else: # Collection-ID vorhanden → verifizieren ob sie noch in xAI existiert try: @@ -436,12 +446,12 @@ async def _run_xai_sync( if not col: ctx.logger.error(f"❌ Collection {collection_id} existiert nicht mehr in xAI – Sync abgebrochen") await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'}) - return + return True # had failures ctx.logger.info(f" ✅ Collection verifiziert: {collection_id}") except Exception as e: ctx.logger.error(f"❌ Collection-Verifizierung fehlgeschlagen: {e} – Sync abgebrochen") await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'}) - return + return True # had failures ctx.logger.info(f" Documents to check: {len(docs)}") @@ -485,6 +495,7 @@ async def _run_xai_sync( ctx.logger.info(f" ✅ Synced : {synced}") ctx.logger.info(f" ⏭️ Skipped : {skipped}") ctx.logger.info(f" ❌ Failed : {failed}") + return failed > 0 finally: await xai.close() @@ -500,7 +511,7 @@ async def _run_ragflow_sync( espocrm, ctx: FlowContext, docs: list, -) -> None: +) -> bool: from services.ragflow_service import RAGFlowService from urllib.parse import unquote import mimetypes @@ -512,192 +523,200 @@ async def _run_ragflow_sync( ctx.logger.info("🧠 RAGflow SYNC") ctx.logger.info("─" * 60) - ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower() - dataset_id = akte.get('aiCollectionId') - - # ── Ensure dataset exists ───────────────────────────────────────────── - if not dataset_id: - if ai_aktivierungsstatus == 'new': - akte_name = akte.get('name') or f"Akte {akte.get('aktennummer', akte_id)}" - ctx.logger.info(f" Status 'new' → Erstelle neues RAGflow Dataset für '{akte_name}'...") - dataset_info = await ragflow.ensure_dataset(akte_name) - if not dataset_info or not dataset_info.get('id'): - ctx.logger.error("❌ RAGflow Dataset konnte nicht erstellt werden – Sync abgebrochen") - await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'}) - return - dataset_id = dataset_info['id'] - ctx.logger.info(f" ✅ Dataset erstellt: {dataset_id}") - await espocrm.update_entity('CAkten', akte_id, {'aiCollectionId': dataset_id}) - else: - ctx.logger.error( - f"❌ aiAktivierungsstatus='{ai_aktivierungsstatus}' aber keine aiCollectionId – " - f"RAGflow Sync abgebrochen. Bitte Dataset-ID in EspoCRM eintragen." - ) - await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'}) - return - - ctx.logger.info(f" Dataset-ID : {dataset_id}") - ctx.logger.info(f" EspoCRM docs: {len(docs)}") - - # ── RAGflow-Bestand abrufen (source of truth) ───────────────────────── - # Lookup: espocrm_id → ragflow_doc (nur Docs die mit espocrm_id getaggt sind) - ragflow_by_espocrm_id: Dict[str, Any] = {} try: - ragflow_docs = await ragflow.list_documents(dataset_id) - ctx.logger.info(f" RAGflow docs: {len(ragflow_docs)}") + ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower() + dataset_id = akte.get('aiCollectionId') + + # ── Ensure dataset exists ───────────────────────────────────────────── + if not dataset_id: + if ai_aktivierungsstatus == 'new': + akte_name = akte.get('name') or f"Akte {akte.get('aktennummer', akte_id)}" + ctx.logger.info(f" Status 'new' → Erstelle neues RAGflow Dataset für '{akte_name}'...") + dataset_info = await ragflow.ensure_dataset(akte_name) + if not dataset_info or not dataset_info.get('id'): + ctx.logger.error("❌ RAGflow Dataset konnte nicht erstellt werden – Sync abgebrochen") + await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'}) + return True # had failures + dataset_id = dataset_info['id'] + ctx.logger.info(f" ✅ Dataset erstellt: {dataset_id}") + await espocrm.update_entity('CAkten', akte_id, {'aiCollectionId': dataset_id}) + else: + ctx.logger.error( + f"❌ aiAktivierungsstatus='{ai_aktivierungsstatus}' aber keine aiCollectionId – " + f"RAGflow Sync abgebrochen. Bitte Dataset-ID in EspoCRM eintragen." + ) + await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'}) + return True # had failures + + ctx.logger.info(f" Dataset-ID : {dataset_id}") + ctx.logger.info(f" EspoCRM docs: {len(docs)}") + + # ── RAGflow-Bestand abrufen (source of truth) ───────────────────────── + ragflow_by_espocrm_id: Dict[str, Any] = {} + try: + ragflow_docs = await ragflow.list_documents(dataset_id) + ctx.logger.info(f" RAGflow docs: {len(ragflow_docs)}") + for rd in ragflow_docs: + eid = rd.get('espocrm_id') + if eid: + ragflow_by_espocrm_id[eid] = rd + except Exception as e: + ctx.logger.error(f"❌ RAGflow Dokumentenliste nicht abrufbar: {e}") + await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'}) + return True # had failures + + # ── Orphan-Cleanup: RAGflow-Docs die kein EspoCRM-Äquivalent mehr haben ── + espocrm_ids_set = {d['id'] for d in docs} for rd in ragflow_docs: eid = rd.get('espocrm_id') - if eid: - ragflow_by_espocrm_id[eid] = rd - except Exception as e: - ctx.logger.error(f"❌ RAGflow Dokumentenliste nicht abrufbar: {e}") - await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'}) - return + if eid and eid not in espocrm_ids_set: + try: + await ragflow.remove_document(dataset_id, rd['id']) + ctx.logger.info(f" 🗑️ Orphan gelöscht: {rd.get('name', rd['id'])} (espocrm_id={eid})") + except Exception as e: + ctx.logger.warn(f" ⚠️ Orphan-Delete fehlgeschlagen: {e}") - # ── Orphan-Cleanup: RAGflow-Docs die kein EspoCRM-Äquivalent mehr haben ── - espocrm_ids_set = {d['id'] for d in docs} - for rd in ragflow_docs: - eid = rd.get('espocrm_id') - if eid and eid not in espocrm_ids_set: - try: - await ragflow.remove_document(dataset_id, rd['id']) - ctx.logger.info(f" 🗑️ Orphan gelöscht: {rd.get('name', rd['id'])} (espocrm_id={eid})") - except Exception as e: - ctx.logger.warn(f" ⚠️ Orphan-Delete fehlgeschlagen: {e}") + synced = 0 + skipped = 0 + failed = 0 - synced = 0 - skipped = 0 - failed = 0 + for doc in docs: + doc_id = doc['id'] + doc_name = doc.get('name', doc_id) + blake3_hash = doc.get('blake3hash') or '' - for doc in docs: - doc_id = doc['id'] - doc_name = doc.get('name', doc_id) - blake3_hash = doc.get('blake3hash') or '' + # Was ist aktuell in RAGflow für dieses Dokument? + ragflow_doc = ragflow_by_espocrm_id.get(doc_id) + ragflow_doc_id = ragflow_doc['id'] if ragflow_doc else None + ragflow_blake3 = ragflow_doc.get('blake3_hash', '') if ragflow_doc else '' + ragflow_meta = ragflow_doc.get('meta_fields', {}) if ragflow_doc else {} - # Was ist aktuell in RAGflow für dieses Dokument? - ragflow_doc = ragflow_by_espocrm_id.get(doc_id) - ragflow_doc_id = ragflow_doc['id'] if ragflow_doc else None - ragflow_blake3 = ragflow_doc.get('blake3_hash', '') if ragflow_doc else '' - ragflow_meta = ragflow_doc.get('meta_fields', {}) if ragflow_doc else {} + # Aktuelle Metadaten aus EspoCRM + current_description = str(doc.get('beschreibung') or '') + current_advo_art = str(doc.get('advowareArt') or '') + current_advo_bemerk = str(doc.get('advowareBemerkung') or '') - # Aktuelle Metadaten aus EspoCRM - current_description = str(doc.get('beschreibung') or '') - current_advo_art = str(doc.get('advowareArt') or '') - current_advo_bemerk = str(doc.get('advowareBemerkung') or '') - - content_changed = blake3_hash != ragflow_blake3 - meta_changed = ( - ragflow_meta.get('description', '') != current_description or - ragflow_meta.get('advoware_art', '') != current_advo_art or - ragflow_meta.get('advoware_bemerkung', '') != current_advo_bemerk - ) - - ctx.logger.info(f" 📄 {doc_name}") - ctx.logger.info( - f" in_ragflow={bool(ragflow_doc_id)}, " - f"content_changed={content_changed}, meta_changed={meta_changed}" - ) - if ragflow_doc_id: - ctx.logger.info( - f" ragflow_blake3={ragflow_blake3[:12] if ragflow_blake3 else 'N/A'}..., " - f"espo_blake3={blake3_hash[:12] if blake3_hash else 'N/A'}..." + content_changed = blake3_hash != ragflow_blake3 + meta_changed = ( + ragflow_meta.get('description', '') != current_description or + ragflow_meta.get('advoware_art', '') != current_advo_art or + ragflow_meta.get('advoware_bemerkung', '') != current_advo_bemerk ) - if not ragflow_doc_id and not content_changed and not meta_changed and not blake3_hash: - # Kein Attachment-Hash vorhanden und noch nie in RAGflow → unsupported - ctx.logger.info(f" ⏭️ Kein Blake3-Hash – übersprungen") - skipped += 1 - continue - - attachment_id = doc.get('dokumentId') - if not attachment_id: - ctx.logger.warn(f" ⚠️ Kein Attachment (dokumentId fehlt) – unsupported") - await espocrm.update_entity('CDokumente', doc_id, { - 'aiSyncStatus': 'unsupported', - 'aiLastSync': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - }) - skipped += 1 - continue - - filename = unquote(doc.get('dokumentName') or doc.get('name') or 'document.bin') - mime_type, _ = mimetypes.guess_type(filename) - if not mime_type: - mime_type = 'application/octet-stream' - - try: - if ragflow_doc_id and not content_changed and meta_changed: - # ── Nur Metadaten aktualisieren ─────────────────────────── - ctx.logger.info(f" 🔄 Metadata-Update für {ragflow_doc_id}…") - await ragflow.update_document_meta( - dataset_id, ragflow_doc_id, - blake3_hash=blake3_hash, - description=current_description, - advoware_art=current_advo_art, - advoware_bemerkung=current_advo_bemerk, + ctx.logger.info(f" 📄 {doc_name}") + ctx.logger.info( + f" in_ragflow={bool(ragflow_doc_id)}, " + f"content_changed={content_changed}, meta_changed={meta_changed}" + ) + if ragflow_doc_id: + ctx.logger.info( + f" ragflow_blake3={ragflow_blake3[:12] if ragflow_blake3 else 'N/A'}..., " + f"espo_blake3={blake3_hash[:12] if blake3_hash else 'N/A'}..." ) - new_ragflow_id = ragflow_doc_id - elif ragflow_doc_id and not content_changed and not meta_changed: - # ── Vollständig unverändert → Skip ──────────────────────── - ctx.logger.info(f" ✅ Unverändert – kein Re-Upload") - # Tracking-Felder in EspoCRM aktuell halten + if not ragflow_doc_id and not blake3_hash: + ctx.logger.info(f" ⏭️ Kein Blake3-Hash – übersprungen") + skipped += 1 + continue + + attachment_id = doc.get('dokumentId') + if not attachment_id: + ctx.logger.warn(f" ⚠️ Kein Attachment (dokumentId fehlt) – unsupported") await espocrm.update_entity('CDokumente', doc_id, { - 'aiFileId': ragflow_doc_id, - 'aiCollectionId': dataset_id, - 'aiSyncHash': blake3_hash, - 'aiSyncStatus': 'synced', + 'aiSyncStatus': 'unsupported', + 'aiLastSync': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), }) skipped += 1 continue - else: - # ── Upload (neu oder Inhalt geändert) ───────────────────── - if ragflow_doc_id and content_changed: - ctx.logger.info(f" 🗑️ Inhalt geändert – altes Dokument löschen: {ragflow_doc_id}") - try: - await ragflow.remove_document(dataset_id, ragflow_doc_id) - except Exception: - pass + filename = unquote(doc.get('dokumentName') or doc.get('name') or 'document.bin') + mime_type, _ = mimetypes.guess_type(filename) + if not mime_type: + mime_type = 'application/octet-stream' - ctx.logger.info(f" 📥 Downloading {filename} ({attachment_id})…") - file_content = await espocrm.download_attachment(attachment_id) - ctx.logger.info(f" Downloaded {len(file_content)} bytes") + try: + if ragflow_doc_id and not content_changed and meta_changed: + # ── Nur Metadaten aktualisieren ─────────────────────────── + ctx.logger.info(f" 🔄 Metadata-Update für {ragflow_doc_id}…") + await ragflow.update_document_meta( + dataset_id, ragflow_doc_id, + blake3_hash=blake3_hash, + description=current_description, + advoware_art=current_advo_art, + advoware_bemerkung=current_advo_bemerk, + ) + new_ragflow_id = ragflow_doc_id - ctx.logger.info(f" 📤 Uploading '{filename}' ({mime_type})…") - result = await ragflow.upload_document( - dataset_id=dataset_id, - file_content=file_content, - filename=filename, - mime_type=mime_type, - blake3_hash=blake3_hash, - espocrm_id=doc_id, - description=current_description, - advoware_art=current_advo_art, - advoware_bemerkung=current_advo_bemerk, - ) - if not result or not result.get('id'): - raise RuntimeError("upload_document gab kein Ergebnis zurück") - new_ragflow_id = result['id'] + elif ragflow_doc_id and not content_changed and not meta_changed: + # ── Vollständig unverändert → Skip ──────────────────────── + ctx.logger.info(f" ✅ Unverändert – kein Re-Upload") + await espocrm.update_entity('CDokumente', doc_id, { + 'aiFileId': ragflow_doc_id, + 'aiCollectionId': dataset_id, + 'aiSyncHash': blake3_hash, + 'aiSyncStatus': 'synced', + }) + skipped += 1 + continue - ctx.logger.info(f" ✅ RAGflow-ID: {new_ragflow_id}") - now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - await espocrm.update_entity('CDokumente', doc_id, { - 'aiFileId': new_ragflow_id, - 'aiCollectionId': dataset_id, - 'aiSyncHash': blake3_hash, - 'aiSyncStatus': 'synced', - 'aiLastSync': now_str, - }) - synced += 1 + else: + # ── Upload (neu oder Inhalt geändert) ───────────────────── + if ragflow_doc_id and content_changed: + ctx.logger.info(f" 🗑️ Inhalt geändert – altes Dokument löschen: {ragflow_doc_id}") + try: + await ragflow.remove_document(dataset_id, ragflow_doc_id) + except Exception: + pass - except Exception as e: - ctx.logger.error(f" ❌ Fehlgeschlagen: {e}") - await espocrm.update_entity('CDokumente', doc_id, { - 'aiSyncStatus': 'failed', - 'aiLastSync': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - }) - failed += 1 + ctx.logger.info(f" 📥 Downloading {filename} ({attachment_id})…") + file_content = await espocrm.download_attachment(attachment_id) + ctx.logger.info(f" Downloaded {len(file_content)} bytes") - ctx.logger.info(f" ✅ Synced : {synced}") - ctx.logger.info(f" ⏭️ Skipped : {skipped}") - ctx.logger.info(f" ❌ Failed : {failed}") + ctx.logger.info(f" 📤 Uploading '{filename}' ({mime_type})…") + result = await ragflow.upload_document( + dataset_id=dataset_id, + file_content=file_content, + filename=filename, + mime_type=mime_type, + blake3_hash=blake3_hash, + espocrm_id=doc_id, + description=current_description, + advoware_art=current_advo_art, + advoware_bemerkung=current_advo_bemerk, + ) + if not result or not result.get('id'): + raise RuntimeError("upload_document gab kein Ergebnis zurück") + new_ragflow_id = result['id'] + + ctx.logger.info(f" ✅ RAGflow-ID: {new_ragflow_id}") + now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + await espocrm.update_entity('CDokumente', doc_id, { + 'aiFileId': new_ragflow_id, + 'aiCollectionId': dataset_id, + 'aiSyncHash': blake3_hash, + 'aiSyncStatus': 'synced', + 'aiLastSync': now_str, + }) + synced += 1 + + except Exception as e: + ctx.logger.error(f" ❌ Fehlgeschlagen: {e}") + await espocrm.update_entity('CDokumente', doc_id, { + 'aiSyncStatus': 'failed', + 'aiLastSync': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + }) + failed += 1 + + ctx.logger.info(f" ✅ Synced : {synced}") + ctx.logger.info(f" ⏭️ Skipped : {skipped}") + ctx.logger.info(f" ❌ Failed : {failed}") + return failed > 0 + + except Exception as e: + ctx.logger.error(f"❌ RAGflow Sync unerwarteter Fehler: {e}") + ctx.logger.error(traceback.format_exc()) + try: + await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'}) + except Exception: + pass + return True # had failures