704 lines
32 KiB
Python
704 lines
32 KiB
Python
"""
|
||
Akte Sync - Event Handler
|
||
|
||
Unified sync for one CAkten entity across all configured backends:
|
||
- Advoware (3-way merge: Windows ↔ EspoCRM ↔ History)
|
||
- xAI (Blake3 hash-based upload to Collection)
|
||
|
||
Both run in the same event to keep CDokumente perfectly in sync.
|
||
|
||
Trigger: akte.sync { akte_id, aktennummer }
|
||
Lock: Redis per-Akte (10 min TTL, prevents double-sync of same Akte)
|
||
Parallel: Different Akten sync simultaneously.
|
||
|
||
Enqueues:
|
||
- document.generate_preview (after CREATE / UPDATE_ESPO)
|
||
"""
|
||
|
||
from typing import Dict, Any
|
||
from datetime import datetime
|
||
from motia import FlowContext, queue
|
||
|
||
|
||
# Motia step config: consumed by the framework to wire this handler into the
# 'akte-sync' flow, subscribe it to the 'akte.sync' queue, and declare which
# downstream topics it may enqueue.
config = {
    "name": "Akte Sync - Event Handler",
    "description": "Unified sync for one Akte: Advoware 3-way merge + xAI upload",
    "flows": ["akte-sync"],
    "triggers": [queue("akte.sync")],
    "enqueues": ["document.generate_preview"],
}

# Akte 'aktivierungsstatus' values for which the Advoware merge runs
# ('import' marks a first-ever sync; the handler promotes it to 'active' after success).
VALID_ADVOWARE_STATUSES = frozenset({'import', 'new', 'active'})
# Akte 'aiAktivierungsstatus' values for which the AI upload runs
# ('new' triggers creation of the xAI Collection / RAGflow Dataset).
VALID_AI_STATUSES = frozenset({'new', 'active'})
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Entry point
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
    """Sync one CAkten entity across all configured backends.

    Acquires a per-Akte Redis lock, then runs the Advoware 3-way merge
    and/or the AI upload (xAI or RAGflow) depending on the Akte's
    activation flags, and finally writes the sync status back to EspoCRM.

    Args:
        event_data: Event payload; expects 'akte_id' and optionally
            'aktennummer' (Akten without Advoware have none).
        ctx: Motia flow context (logger, enqueue).

    Raises:
        RuntimeError: When the per-Akte lock is busy (so the queue retries).
        Exception: Re-raised after a failed sync so the event is retried.
    """
    akte_id = event_data.get('akte_id')
    aktennummer = event_data.get('aktennummer')

    ctx.logger.info("=" * 80)
    ctx.logger.info("🔄 AKTE SYNC STARTED")
    ctx.logger.info(f" Aktennummer : {aktennummer}")
    ctx.logger.info(f" EspoCRM ID : {akte_id}")
    ctx.logger.info("=" * 80)

    # Function-scope imports keep module import cheap for the framework.
    from services.redis_client import get_redis_client
    from services.espocrm import EspoCRMAPI

    redis_client = get_redis_client(strict=False)
    if not redis_client:
        ctx.logger.error("❌ Redis unavailable")
        return

    # Per-Akte lock: NX means only one sync per Akte runs at a time;
    # EX=600s (10 min) ensures a crashed worker cannot hold it forever.
    lock_key = f"akte_sync:{akte_id}"
    lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=600)
    if not lock_acquired:
        # Raising (instead of returning) lets the queue runtime retry later.
        ctx.logger.warn(f"⏸️ Lock busy for Akte {akte_id} – requeueing")
        raise RuntimeError(f"Lock busy for akte_id={akte_id}")

    espocrm = EspoCRMAPI(ctx)

    try:
        # ── Load Akte ──────────────────────────────────────────────────────
        akte = await espocrm.get_entity('CAkten', akte_id)
        if not akte:
            ctx.logger.error(f"❌ Akte {akte_id} not found in EspoCRM")
            return

        # aktennummer can come from the event payload OR from the entity
        # (Akten without Advoware have no aktennummer)
        if not aktennummer:
            aktennummer = akte.get('aktennummer')

        sync_schalter = akte.get('syncSchalter', False)
        aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower()
        ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower()
        ai_provider = str(akte.get('aiProvider') or 'xAI')

        ctx.logger.info(f"📋 Akte '{akte.get('name')}'")
        ctx.logger.info(f" syncSchalter : {sync_schalter}")
        ctx.logger.info(f" aktivierungsstatus : {aktivierungsstatus}")
        ctx.logger.info(f" aiAktivierungsstatus : {ai_aktivierungsstatus}")
        ctx.logger.info(f" aiProvider : {ai_provider}")

        # Advoware sync requires an aktennummer (Akten without Advoware won't have one)
        advoware_enabled = bool(aktennummer) and sync_schalter and aktivierungsstatus in VALID_ADVOWARE_STATUSES
        ai_enabled = ai_aktivierungsstatus in VALID_AI_STATUSES

        ctx.logger.info(f" Advoware sync : {'✅ ON' if advoware_enabled else '⏭️ OFF'}")
        ctx.logger.info(f" AI sync ({ai_provider}) : {'✅ ON' if ai_enabled else '⏭️ OFF'}")

        if not advoware_enabled and not ai_enabled:
            ctx.logger.info("⏭️ Both syncs disabled – nothing to do")
            return

        # ── Load CDokumente once (shared by Advoware + xAI sync) ─────────────────
        espo_docs: list = []
        if advoware_enabled or ai_enabled:
            espo_docs = await espocrm.list_related_all('CAkten', akte_id, 'dokumentes')

        # ── ADVOWARE SYNC ────────────────────────────────────────────
        advoware_results = None
        if advoware_enabled:
            advoware_results = await _run_advoware_sync(akte, aktennummer, akte_id, espocrm, ctx, espo_docs)

        # ── AI SYNC (xAI or RAGflow) ─────────────────────────────────
        if ai_enabled:
            if ai_provider.lower() == 'ragflow':
                await _run_ragflow_sync(akte, akte_id, espocrm, ctx, espo_docs)
            else:
                await _run_xai_sync(akte, akte_id, espocrm, ctx, espo_docs)

        # ── Final Status ───────────────────────────────────────────────────
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        final_update: Dict[str, Any] = {'globalLastSync': now, 'globalSyncStatus': 'synced'}
        if advoware_enabled:
            final_update['syncStatus'] = 'synced'
            final_update['lastSync'] = now
            # 'import' = first-ever sync → promote to 'active' afterwards
            if aktivierungsstatus == 'import':
                final_update['aktivierungsstatus'] = 'active'
                ctx.logger.info("🔄 aktivierungsstatus: import → active")
        if ai_enabled:
            final_update['aiSyncStatus'] = 'synced'
            final_update['aiLastSync'] = now
            # 'new' = Dataset/Collection was just created → promote to 'active'
            if ai_aktivierungsstatus == 'new':
                final_update['aiAktivierungsstatus'] = 'active'
                ctx.logger.info("🔄 aiAktivierungsstatus: new → active")

        await espocrm.update_entity('CAkten', akte_id, final_update)
        # Clean up processing set (Advoware Watcher queue)
        if aktennummer:
            redis_client.srem("advoware:processing_aktennummern", aktennummer)

        ctx.logger.info("=" * 80)
        ctx.logger.info("✅ AKTE SYNC COMPLETE")
        if advoware_results:
            ctx.logger.info(f" Advoware: created={advoware_results['created']} updated={advoware_results['updated']} deleted={advoware_results['deleted']} errors={advoware_results['errors']}")
        ctx.logger.info("=" * 80)

    except Exception as e:
        ctx.logger.error(f"❌ Sync failed: {e}")
        import traceback
        ctx.logger.error(traceback.format_exc())

        # Requeue Advoware aktennummer for retry (Motia retries the akte.sync event itself)
        import time
        if aktennummer:
            redis_client.zadd("advoware:pending_aktennummern", {aktennummer: time.time()})

        # Best-effort status write; the re-raise below is what matters.
        try:
            await espocrm.update_entity('CAkten', akte_id, {
                'syncStatus': 'failed',
                'globalSyncStatus': 'failed',
            })
        except Exception:
            pass
        raise

    finally:
        # Release the lock even on early return / error so the Akte
        # can sync again without waiting for the TTL.
        if lock_acquired and redis_client:
            redis_client.delete(lock_key)
            ctx.logger.info(f"🔓 Lock released for Akte {akte_id}")
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Advoware 3-way merge
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
async def _run_advoware_sync(
    akte: Dict[str, Any],
    aktennummer: str,
    akte_id: str,
    espocrm,
    ctx: FlowContext,
    espo_docs: list,
) -> Dict[str, int]:
    """Run the Advoware 3-way merge (Windows share ↔ EspoCRM ↔ History) for one Akte.

    For every HNR seen in EspoCRM or the Advoware History, the merge decides
    one of SKIP / CREATE / UPDATE_ESPO / DELETE and applies it. Afterwards the
    Akte-level 'ablage' flag and rubrum are synced from Advoware.

    Args:
        akte: The CAkten entity as loaded from EspoCRM.
        aktennummer: Advoware file number (identifies the Windows folder).
        akte_id: EspoCRM id of the Akte.
        espocrm: EspoCRM API client.
        ctx: Motia flow context.
        espo_docs: Pre-loaded CDokumente related to this Akte.

    Returns:
        Counters: {'created', 'updated', 'deleted', 'skipped', 'errors'}.
    """
    from services.advoware_watcher_service import AdvowareWatcherService
    from services.advoware_history_service import AdvowareHistoryService
    from services.advoware_service import AdvowareService
    from services.advoware_document_sync_utils import AdvowareDocumentSyncUtils
    from services.blake3_utils import compute_blake3
    import mimetypes

    watcher = AdvowareWatcherService(ctx)
    history_service = AdvowareHistoryService(ctx)
    advoware_service = AdvowareService(ctx)
    sync_utils = AdvowareDocumentSyncUtils(ctx)

    results = {'created': 0, 'updated': 0, 'deleted': 0, 'skipped': 0, 'errors': 0}

    ctx.logger.info("")
    ctx.logger.info("─" * 60)
    ctx.logger.info("📂 ADVOWARE SYNC")
    ctx.logger.info("─" * 60)

    # ── Fetch Windows files + Advoware History ───────────────────────────
    # Either source may be unreachable; degrade to an empty list so the merge
    # can still act on the other side (DELETE is History-guarded below).
    try:
        windows_files = await watcher.get_akte_files(aktennummer)
    except Exception as e:
        ctx.logger.error(f"❌ Windows watcher failed: {e}")
        windows_files = []

    try:
        advo_history = await history_service.get_akte_history(aktennummer)
    except Exception as e:
        ctx.logger.error(f"❌ Advoware history failed: {e}")
        advo_history = []

    ctx.logger.info(f" EspoCRM docs : {len(espo_docs)}")
    ctx.logger.info(f" Windows files : {len(windows_files)}")
    ctx.logger.info(f" History entries: {len(advo_history)}")

    # ── Cleanup Windows list (only files in History) ───────────────────
    windows_files = sync_utils.cleanup_file_list(windows_files, advo_history)

    # ── Build indexes by HNR (stable identifier from Advoware) ────────
    espo_by_hnr = {}
    for doc in espo_docs:
        if doc.get('hnr'):
            espo_by_hnr[doc['hnr']] = doc

    history_by_hnr = {}
    for entry in advo_history:
        if entry.get('hNr'):
            history_by_hnr[entry['hNr']] = entry

    # Case-insensitive path index (Windows paths are case-insensitive).
    windows_by_path = {f.get('path', '').lower(): f for f in windows_files}

    all_hnrs = set(espo_by_hnr.keys()) | set(history_by_hnr.keys())
    ctx.logger.info(f" Unique HNRs : {len(all_hnrs)}")
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # ── 3-way merge per HNR ───────────────────────────────────────────
    for hnr in all_hnrs:
        espo_doc = espo_by_hnr.get(hnr)
        history_entry = history_by_hnr.get(hnr)

        # History carries the Windows path ('datei'); resolve both the
        # matching Windows file and a display filename from it.
        datei = history_entry.get('datei') if history_entry else None
        windows_file = windows_by_path.get(datei.lower()) if datei else None

        if datei:
            filename = datei.split('\\')[-1]
        elif espo_doc:
            filename = espo_doc.get('name', f'hnr_{hnr}')
        else:
            filename = f'hnr_{hnr}'

        try:
            action = sync_utils.merge_three_way(espo_doc, windows_file, history_entry)
            # FIX: log the actual filename (was a literal '(unknown)' placeholder).
            ctx.logger.info(f" [{action.action:12s}] {filename} (hnr={hnr}) – {action.reason}")

            if action.action == 'SKIP':
                results['skipped'] += 1

            elif action.action == 'CREATE':
                if not windows_file:
                    ctx.logger.error(f" ❌ CREATE: no Windows file for hnr {hnr}")
                    results['errors'] += 1
                    continue

                content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
                blake3_hash = compute_blake3(content)
                mime_type, _ = mimetypes.guess_type(filename)
                mime_type = mime_type or 'application/octet-stream'

                attachment = await espocrm.upload_attachment_for_file_field(
                    file_content=content,
                    filename=filename,
                    related_type='CDokumente',
                    field='dokument',
                    mime_type=mime_type,
                )
                # Field lengths capped to EspoCRM column sizes (100/255).
                new_doc = await espocrm.create_entity('CDokumente', {
                    'name': filename,
                    'dokumentId': attachment.get('id'),
                    'hnr': history_entry.get('hNr') if history_entry else None,
                    'advowareArt': (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100] if history_entry else 'Schreiben',
                    'advowareBemerkung': (history_entry.get('text', '') or '')[:255] if history_entry else '',
                    'dateipfad': windows_file.get('path', ''),
                    'blake3hash': blake3_hash,
                    'syncedHash': blake3_hash,
                    'usn': windows_file.get('usn', 0),
                    'syncStatus': 'synced',
                    'lastSyncTimestamp': now,
                    'cAktenId': akte_id,  # Direct FK to CAkten
                })
                doc_id = new_doc.get('id')

                # Link to Akte
                await espocrm.link_entities('CAkten', akte_id, 'dokumentes', doc_id)
                results['created'] += 1

                # Trigger preview (best-effort)
                try:
                    await ctx.enqueue({'topic': 'document.generate_preview', 'data': {
                        'entity_id': doc_id,
                        'entity_type': 'CDokumente',
                    }})
                except Exception as e:
                    ctx.logger.warn(f" ⚠️ Preview trigger failed: {e}")

            elif action.action == 'UPDATE_ESPO':
                # FIX: guard espo_doc (mirrors the windows_file guard) – the
                # branch dereferences espo_doc['id'] / .get() below.
                if not espo_doc:
                    ctx.logger.error(f" ❌ UPDATE_ESPO: no EspoCRM doc for hnr {hnr}")
                    results['errors'] += 1
                    continue
                if not windows_file:
                    ctx.logger.error(f" ❌ UPDATE_ESPO: no Windows file for hnr {hnr}")
                    results['errors'] += 1
                    continue

                content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
                blake3_hash = compute_blake3(content)
                mime_type, _ = mimetypes.guess_type(filename)
                mime_type = mime_type or 'application/octet-stream'

                update_data: Dict[str, Any] = {
                    'name': filename,
                    'blake3hash': blake3_hash,
                    'syncedHash': blake3_hash,
                    'usn': windows_file.get('usn', 0),
                    'dateipfad': windows_file.get('path', ''),
                    'syncStatus': 'synced',
                    'lastSyncTimestamp': now,
                }
                if history_entry:
                    update_data['hnr'] = history_entry.get('hNr')
                    update_data['advowareArt'] = (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100]
                    update_data['advowareBemerkung'] = (history_entry.get('text', '') or '')[:255]

                # Mark for re-sync to xAI only if file content actually changed
                # (USN can change without content change, e.g. metadata-only updates)
                content_changed = blake3_hash != espo_doc.get('syncedHash', '')
                if content_changed and espo_doc.get('aiSyncStatus') == 'synced':
                    update_data['aiSyncStatus'] = 'unclean'
                await espocrm.update_entity('CDokumente', espo_doc['id'], update_data)
                results['updated'] += 1

                # Trigger preview (best-effort)
                try:
                    await ctx.enqueue({'topic': 'document.generate_preview', 'data': {
                        'entity_id': espo_doc['id'],
                        'entity_type': 'CDokumente',
                    }})
                except Exception as e:
                    ctx.logger.warn(f" ⚠️ Preview trigger failed: {e}")

            elif action.action == 'DELETE':
                if espo_doc:
                    # Only delete if the HNR is genuinely absent from Advoware History
                    # (not just absent from Windows – avoids deleting docs whose file
                    # is temporarily unavailable on the Windows share)
                    if hnr in history_by_hnr:
                        ctx.logger.warn(f" ⚠️ SKIP DELETE hnr={hnr}: still in Advoware History, only missing from Windows")
                        results['skipped'] += 1
                    else:
                        await espocrm.delete_entity('CDokumente', espo_doc['id'])
                        results['deleted'] += 1

        except Exception as e:
            # FIX: include the filename (was a literal '(unknown)' placeholder).
            ctx.logger.error(f" ❌ Error for hnr {hnr} ({filename}): {e}")
            results['errors'] += 1

    # ── Ablage check + Rubrum sync ─────────────────────────────────────
    try:
        akte_details = await advoware_service.get_akte(aktennummer)
        if akte_details:
            espo_update: Dict[str, Any] = {}

            if akte_details.get('ablage') == 1:
                ctx.logger.info("📁 Akte marked as ablage → deactivating")
                espo_update['aktivierungsstatus'] = 'inactive'

            rubrum = akte_details.get('rubrum')
            if rubrum and rubrum != akte.get('rubrum'):
                espo_update['rubrum'] = rubrum
                ctx.logger.info(f"📝 Rubrum synced: {rubrum[:80]}")

            if espo_update:
                await espocrm.update_entity('CAkten', akte_id, espo_update)
    except Exception as e:
        # Non-fatal: the document merge above already succeeded.
        ctx.logger.warn(f"⚠️ Ablage/Rubrum check failed: {e}")

    return results
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# xAI sync
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
async def _run_xai_sync(
    akte: Dict[str, Any],
    akte_id: str,
    espocrm,
    ctx: FlowContext,
    docs: list,
) -> None:
    """Upload the Akte's CDokumente into its xAI Collection.

    Ensures the collection exists (creating it when aiAktivierungsstatus is
    'new'), removes orphaned xAI documents, then pushes each EspoCRM doc via
    XAIUploadUtils. On unrecoverable setup errors, marks the Akte's
    aiSyncStatus as 'failed' and returns early.

    Args:
        akte: The CAkten entity as loaded from EspoCRM.
        akte_id: EspoCRM id of the Akte.
        espocrm: EspoCRM API client.
        ctx: Motia flow context.
        docs: Pre-loaded CDokumente related to this Akte.
    """
    from services.xai_service import XAIService
    from services.xai_upload_utils import XAIUploadUtils

    xai = XAIService(ctx)
    upload_utils = XAIUploadUtils(ctx)

    ctx.logger.info("")
    ctx.logger.info("─" * 60)
    ctx.logger.info("🤖 xAI SYNC")
    ctx.logger.info("─" * 60)

    try:
        # ── Determine collection ID ────────────────────────────────────
        ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower()
        collection_id = akte.get('aiCollectionId')

        if not collection_id:
            if ai_aktivierungsstatus == 'new':
                # Status 'new' → create a fresh collection
                ctx.logger.info(" Status 'new' → Erstelle neue xAI Collection...")
                collection_id = await upload_utils.ensure_collection(akte, xai, espocrm)
                if not collection_id:
                    ctx.logger.error("❌ xAI Collection konnte nicht erstellt werden – Sync abgebrochen")
                    await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
                    return
                ctx.logger.info(f" ✅ Collection erstellt: {collection_id}")
                # aiAktivierungsstatus → 'active' is set by the handler's final_update
            else:
                # Active (or other) status but no collection ID → configuration error
                ctx.logger.error(
                    f"❌ aiAktivierungsstatus='{ai_aktivierungsstatus}' aber keine aiCollectionId vorhanden – "
                    f"xAI Sync abgebrochen. Bitte Collection-ID in EspoCRM eintragen."
                )
                await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
                return
        else:
            # Collection ID present → verify it still exists in xAI
            try:
                col = await xai.get_collection(collection_id)
                if not col:
                    ctx.logger.error(f"❌ Collection {collection_id} existiert nicht mehr in xAI – Sync abgebrochen")
                    await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
                    return
                ctx.logger.info(f" ✅ Collection verifiziert: {collection_id}")
            except Exception as e:
                ctx.logger.error(f"❌ Collection-Verifizierung fehlgeschlagen: {e} – Sync abgebrochen")
                await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
                return

        ctx.logger.info(f" Documents to check: {len(docs)}")

        # ── Orphan cleanup: delete xAI docs that have no EspoCRM counterpart ──
        known_xai_file_ids = {doc.get('aiFileId') for doc in docs if doc.get('aiFileId')}
        try:
            xai_docs = await xai.list_collection_documents(collection_id)
            orphans = [d for d in xai_docs if d.get('file_id') not in known_xai_file_ids]
            if orphans:
                ctx.logger.info(f" 🗑️ Orphan-Cleanup: {len(orphans)} Doc(s) in xAI ohne EspoCRM-Eintrag")
                for orphan in orphans:
                    try:
                        await xai.remove_from_collection(collection_id, orphan['file_id'])
                        ctx.logger.info(f" Gelöscht: {orphan.get('filename', orphan['file_id'])}")
                    except Exception as e:
                        ctx.logger.warn(f" Orphan-Delete fehlgeschlagen: {e}")
        except Exception as e:
            # Cleanup is best-effort: the uploads below still proceed.
            ctx.logger.warn(f" ⚠️ Orphan-Cleanup fehlgeschlagen (non-fatal): {e}")

        synced = 0
        skipped = 0
        failed = 0

        for doc in docs:
            # Determine skip condition based on pre-sync state (avoids stale-dict stats bug)
            will_skip = (
                doc.get('aiSyncStatus') == 'synced'
                and doc.get('aiSyncHash')
                and doc.get('blake3hash')
                and doc.get('aiSyncHash') == doc.get('blake3hash')
            )
            # sync_document_to_xai handles the actual upload/skip decision;
            # will_skip only classifies the result for the stats below.
            ok = await upload_utils.sync_document_to_xai(doc, collection_id, xai, espocrm)
            if ok:
                if will_skip:
                    skipped += 1
                else:
                    synced += 1
            else:
                failed += 1

        ctx.logger.info(f" ✅ Synced : {synced}")
        ctx.logger.info(f" ⏭️ Skipped : {skipped}")
        ctx.logger.info(f" ❌ Failed : {failed}")

    finally:
        # Always release the xAI HTTP client, even on early return / error.
        await xai.close()
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# RAGflow sync
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
async def _run_ragflow_sync(
    akte: Dict[str, Any],
    akte_id: str,
    espocrm,
    ctx: FlowContext,
    docs: list,
) -> None:
    """Sync the Akte's CDokumente into its RAGflow dataset.

    Ensures the dataset exists (creating it when aiAktivierungsstatus is
    'new'), deletes orphaned RAGflow docs, then per document: skips when
    unchanged, patches metadata-only changes, or re-uploads changed/new
    content. Per-document ai* tracking fields are written back to EspoCRM.

    Args:
        akte: The CAkten entity as loaded from EspoCRM.
        akte_id: EspoCRM id of the Akte.
        espocrm: EspoCRM API client.
        ctx: Motia flow context.
        docs: Pre-loaded CDokumente related to this Akte.
    """
    from services.ragflow_service import RAGFlowService
    from urllib.parse import unquote
    import mimetypes

    ragflow = RAGFlowService(ctx)

    ctx.logger.info("")
    ctx.logger.info("─" * 60)
    ctx.logger.info("🧠 RAGflow SYNC")
    ctx.logger.info("─" * 60)

    ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower()
    dataset_id = akte.get('aiCollectionId')

    # ── Ensure dataset exists ─────────────────────────────────────────────
    if not dataset_id:
        if ai_aktivierungsstatus == 'new':
            akte_name = akte.get('name') or f"Akte {akte.get('aktennummer', akte_id)}"
            ctx.logger.info(f" Status 'new' → Erstelle neues RAGflow Dataset für '{akte_name}'...")
            dataset_info = await ragflow.ensure_dataset(akte_name)
            if not dataset_info or not dataset_info.get('id'):
                ctx.logger.error("❌ RAGflow Dataset konnte nicht erstellt werden – Sync abgebrochen")
                await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
                return
            dataset_id = dataset_info['id']
            ctx.logger.info(f" ✅ Dataset erstellt: {dataset_id}")
            # Persist the new dataset id immediately so a later crash
            # does not create a second dataset.
            await espocrm.update_entity('CAkten', akte_id, {'aiCollectionId': dataset_id})
        else:
            # Active (or other) status but no dataset ID → configuration error
            ctx.logger.error(
                f"❌ aiAktivierungsstatus='{ai_aktivierungsstatus}' aber keine aiCollectionId – "
                f"RAGflow Sync abgebrochen. Bitte Dataset-ID in EspoCRM eintragen."
            )
            await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
            return

    ctx.logger.info(f" Dataset-ID : {dataset_id}")
    ctx.logger.info(f" EspoCRM docs: {len(docs)}")

    # ── Fetch RAGflow inventory (source of truth) ─────────────────────────
    # Lookup: espocrm_id → ragflow doc (only docs tagged with an espocrm_id)
    ragflow_by_espocrm_id: Dict[str, Any] = {}
    try:
        ragflow_docs = await ragflow.list_documents(dataset_id)
        ctx.logger.info(f" RAGflow docs: {len(ragflow_docs)}")
        for rd in ragflow_docs:
            eid = rd.get('espocrm_id')
            if eid:
                ragflow_by_espocrm_id[eid] = rd
    except Exception as e:
        ctx.logger.error(f"❌ RAGflow Dokumentenliste nicht abrufbar: {e}")
        await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
        return

    # ── Orphan cleanup: RAGflow docs that no longer have an EspoCRM counterpart ──
    espocrm_ids_set = {d['id'] for d in docs}
    for rd in ragflow_docs:
        eid = rd.get('espocrm_id')
        if eid and eid not in espocrm_ids_set:
            try:
                await ragflow.remove_document(dataset_id, rd['id'])
                ctx.logger.info(f" 🗑️ Orphan gelöscht: {rd.get('name', rd['id'])} (espocrm_id={eid})")
            except Exception as e:
                ctx.logger.warn(f" ⚠️ Orphan-Delete fehlgeschlagen: {e}")

    synced = 0
    skipped = 0
    failed = 0

    for doc in docs:
        doc_id = doc['id']
        doc_name = doc.get('name', doc_id)
        blake3_hash = doc.get('blake3hash') or ''

        # Current RAGflow state for this document (if any)
        ragflow_doc = ragflow_by_espocrm_id.get(doc_id)
        ragflow_doc_id = ragflow_doc['id'] if ragflow_doc else None
        ragflow_blake3 = ragflow_doc.get('blake3_hash', '') if ragflow_doc else ''
        ragflow_meta = ragflow_doc.get('meta_fields', {}) if ragflow_doc else {}

        # Current metadata from EspoCRM
        current_description = str(doc.get('beschreibung') or '')
        current_advo_art = str(doc.get('advowareArt') or '')
        current_advo_bemerk = str(doc.get('advowareBemerkung') or '')

        content_changed = blake3_hash != ragflow_blake3
        meta_changed = (
            ragflow_meta.get('description', '') != current_description or
            ragflow_meta.get('advoware_art', '') != current_advo_art or
            ragflow_meta.get('advoware_bemerkung', '') != current_advo_bemerk
        )

        ctx.logger.info(f" 📄 {doc_name}")
        ctx.logger.info(
            f" in_ragflow={bool(ragflow_doc_id)}, "
            f"content_changed={content_changed}, meta_changed={meta_changed}"
        )
        if ragflow_doc_id:
            ctx.logger.info(
                f" ragflow_blake3={ragflow_blake3[:12] if ragflow_blake3 else 'N/A'}..., "
                f"espo_blake3={blake3_hash[:12] if blake3_hash else 'N/A'}..."
            )

        if not ragflow_doc_id and not content_changed and not meta_changed and not blake3_hash:
            # No attachment hash and never uploaded to RAGflow → nothing to do.
            ctx.logger.info(f" ⏭️ Kein Blake3-Hash – übersprungen")
            skipped += 1
            continue

        attachment_id = doc.get('dokumentId')
        if not attachment_id:
            ctx.logger.warn(f" ⚠️ Kein Attachment (dokumentId fehlt) – unsupported")
            await espocrm.update_entity('CDokumente', doc_id, {
                'aiSyncStatus': 'unsupported',
                'aiLastSync': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            })
            skipped += 1
            continue

        filename = unquote(doc.get('dokumentName') or doc.get('name') or 'document.bin')
        mime_type, _ = mimetypes.guess_type(filename)
        if not mime_type:
            mime_type = 'application/octet-stream'

        try:
            if ragflow_doc_id and not content_changed and meta_changed:
                # ── Metadata-only update ──────────────────────────────────
                ctx.logger.info(f" 🔄 Metadata-Update für {ragflow_doc_id}…")
                await ragflow.update_document_meta(
                    dataset_id, ragflow_doc_id,
                    blake3_hash=blake3_hash,
                    description=current_description,
                    advoware_art=current_advo_art,
                    advoware_bemerkung=current_advo_bemerk,
                )
                new_ragflow_id = ragflow_doc_id

            elif ragflow_doc_id and not content_changed and not meta_changed:
                # ── Completely unchanged → skip re-upload ─────────────────
                ctx.logger.info(f" ✅ Unverändert – kein Re-Upload")
                # Keep the EspoCRM tracking fields up to date anyway
                await espocrm.update_entity('CDokumente', doc_id, {
                    'aiFileId': ragflow_doc_id,
                    'aiCollectionId': dataset_id,
                    'aiSyncHash': blake3_hash,
                    'aiSyncStatus': 'synced',
                })
                skipped += 1
                continue

            else:
                # ── Upload (new document or changed content) ──────────────
                if ragflow_doc_id and content_changed:
                    ctx.logger.info(f" 🗑️ Inhalt geändert – altes Dokument löschen: {ragflow_doc_id}")
                    try:
                        await ragflow.remove_document(dataset_id, ragflow_doc_id)
                    except Exception:
                        pass  # a stale copy gets removed by orphan cleanup next run

                # FIX: log the actual filename (was a literal '(unknown)' placeholder).
                ctx.logger.info(f" 📥 Downloading {filename} ({attachment_id})…")
                file_content = await espocrm.download_attachment(attachment_id)
                ctx.logger.info(f" Downloaded {len(file_content)} bytes")

                # FIX: log the actual filename (was a literal '(unknown)' placeholder).
                ctx.logger.info(f" 📤 Uploading '{filename}' ({mime_type})…")
                result = await ragflow.upload_document(
                    dataset_id=dataset_id,
                    file_content=file_content,
                    filename=filename,
                    mime_type=mime_type,
                    blake3_hash=blake3_hash,
                    espocrm_id=doc_id,
                    description=current_description,
                    advoware_art=current_advo_art,
                    advoware_bemerkung=current_advo_bemerk,
                )
                if not result or not result.get('id'):
                    raise RuntimeError("upload_document gab kein Ergebnis zurück")
                new_ragflow_id = result['id']

            ctx.logger.info(f" ✅ RAGflow-ID: {new_ragflow_id}")
            now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            await espocrm.update_entity('CDokumente', doc_id, {
                'aiFileId': new_ragflow_id,
                'aiCollectionId': dataset_id,
                'aiSyncHash': blake3_hash,
                'aiSyncStatus': 'synced',
                'aiLastSync': now_str,
            })
            synced += 1

        except Exception as e:
            ctx.logger.error(f" ❌ Fehlgeschlagen: {e}")
            # FIX: best-effort status write – a failing EspoCRM update here
            # must not abort the remaining documents of this Akte.
            try:
                await espocrm.update_entity('CDokumente', doc_id, {
                    'aiSyncStatus': 'failed',
                    'aiLastSync': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                })
            except Exception:
                pass
            failed += 1

    ctx.logger.info(f" ✅ Synced : {synced}")
    ctx.logger.info(f" ⏭️ Skipped : {skipped}")
    ctx.logger.info(f" ❌ Failed : {failed}")
|