feat: Add Akte Sync Event Handler for unified synchronization across backends
This commit is contained in:
435
src/steps/akte/akte_sync_event_step.py
Normal file
435
src/steps/akte/akte_sync_event_step.py
Normal file
@@ -0,0 +1,435 @@
|
||||
"""
|
||||
Akte Sync - Event Handler
|
||||
|
||||
Unified sync for one CAkten entity across all configured backends:
|
||||
- Advoware (3-way merge: Windows ↔ EspoCRM ↔ History)
|
||||
- xAI (Blake3 hash-based upload to Collection)
|
||||
|
||||
Both run in the same event to keep CDokumente perfectly in sync.
|
||||
|
||||
Trigger: akte.sync { akte_id, aktennummer }
|
||||
Lock: Redis per-Akte (30 min TTL, prevents double-sync of same Akte)
|
||||
Parallel: Different Akten sync simultaneously.
|
||||
|
||||
Enqueues:
|
||||
- document.generate_preview (after CREATE / UPDATE_ESPO)
|
||||
"""
|
||||
|
||||
from typing import Dict, Any
|
||||
from datetime import datetime
|
||||
from motia import FlowContext, queue
|
||||
|
||||
|
||||
config = {
|
||||
"name": "Akte Sync - Event Handler",
|
||||
"description": "Unified sync for one Akte: Advoware 3-way merge + xAI upload",
|
||||
"flows": ["akte-sync"],
|
||||
"triggers": [queue("akte.sync")],
|
||||
"enqueues": ["document.generate_preview"],
|
||||
}
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Entry point
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
|
||||
akte_id = event_data.get('akte_id')
|
||||
aktennummer = event_data.get('aktennummer')
|
||||
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("🔄 AKTE SYNC STARTED")
|
||||
ctx.logger.info(f" Aktennummer : {aktennummer}")
|
||||
ctx.logger.info(f" EspoCRM ID : {akte_id}")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
from services.redis_client import get_redis_client
|
||||
from services.espocrm import EspoCRMAPI
|
||||
|
||||
redis_client = get_redis_client(strict=False)
|
||||
if not redis_client:
|
||||
ctx.logger.error("❌ Redis unavailable")
|
||||
return
|
||||
|
||||
lock_key = f"akte_sync:{akte_id}"
|
||||
lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=1800)
|
||||
if not lock_acquired:
|
||||
ctx.logger.warn(f"⏸️ Lock busy for Akte {akte_id} – requeueing")
|
||||
raise RuntimeError(f"Lock busy for akte_id={akte_id}")
|
||||
|
||||
espocrm = EspoCRMAPI(ctx)
|
||||
|
||||
try:
|
||||
# ── Load Akte ──────────────────────────────────────────────────────
|
||||
akte = await espocrm.get_entity('CAkten', akte_id)
|
||||
if not akte:
|
||||
ctx.logger.error(f"❌ Akte {akte_id} not found in EspoCRM")
|
||||
return
|
||||
|
||||
# aktennummer can come from the event payload OR from the entity
|
||||
# (Akten without Advoware have no aktennummer)
|
||||
if not aktennummer:
|
||||
aktennummer = akte.get('aktennummer')
|
||||
|
||||
sync_schalter = akte.get('syncSchalter', False)
|
||||
aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower()
|
||||
ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower()
|
||||
|
||||
ctx.logger.info(f"📋 Akte '{akte.get('name')}'")
|
||||
ctx.logger.info(f" syncSchalter : {sync_schalter}")
|
||||
ctx.logger.info(f" aktivierungsstatus : {aktivierungsstatus}")
|
||||
ctx.logger.info(f" aiAktivierungsstatus : {ai_aktivierungsstatus}")
|
||||
|
||||
# Advoware sync requires an aktennummer (Akten without Advoware won't have one)
|
||||
advoware_enabled = bool(aktennummer) and sync_schalter and aktivierungsstatus in ('import', 'neu', 'new', 'aktiv', 'active')
|
||||
xai_enabled = ai_aktivierungsstatus in ('new', 'neu', 'aktiv', 'active')
|
||||
|
||||
ctx.logger.info(f" Advoware sync : {'✅ ON' if advoware_enabled else '⏭️ OFF'}")
|
||||
ctx.logger.info(f" xAI sync : {'✅ ON' if xai_enabled else '⏭️ OFF'}")
|
||||
|
||||
if not advoware_enabled and not xai_enabled:
|
||||
ctx.logger.info("⏭️ Both syncs disabled – nothing to do")
|
||||
return
|
||||
|
||||
# ── ADVOWARE SYNC ──────────────────────────────────────────────────
|
||||
advoware_results = None
|
||||
if advoware_enabled:
|
||||
advoware_results = await _run_advoware_sync(akte, aktennummer, akte_id, espocrm, ctx)
|
||||
|
||||
# ── xAI SYNC ──────────────────────────────────────────────────────
|
||||
if xai_enabled:
|
||||
await _run_xai_sync(akte, akte_id, espocrm, ctx)
|
||||
|
||||
# ── Final Status ───────────────────────────────────────────────────
|
||||
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
final_update: Dict[str, Any] = {'globalLastSync': now, 'globalSyncStatus': 'synced'}
|
||||
if advoware_enabled:
|
||||
final_update['syncStatus'] = 'synced'
|
||||
final_update['lastSync'] = now
|
||||
# 'import' = erster Sync → danach auf 'aktiv' setzen
|
||||
if aktivierungsstatus == 'import':
|
||||
final_update['aktivierungsstatus'] = 'aktiv'
|
||||
ctx.logger.info("🔄 aktivierungsstatus: import → aktiv")
|
||||
if xai_enabled:
|
||||
final_update['aiSyncStatus'] = 'synced'
|
||||
final_update['aiLastSync'] = now
|
||||
# 'new' = Collection wurde gerade erstmalig angelegt → auf 'aktiv' setzen
|
||||
if ai_aktivierungsstatus == 'new':
|
||||
final_update['aiAktivierungsstatus'] = 'aktiv'
|
||||
ctx.logger.info("🔄 aiAktivierungsstatus: new → aktiv")
|
||||
|
||||
await espocrm.update_entity('CAkten', akte_id, final_update)
|
||||
# Clean up processing sets (both queues may have triggered this sync)
|
||||
if aktennummer:
|
||||
redis_client.srem("advoware:processing_aktennummern", aktennummer)
|
||||
redis_client.srem("akte:processing_entity_ids", akte_id)
|
||||
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("✅ AKTE SYNC COMPLETE")
|
||||
if advoware_results:
|
||||
ctx.logger.info(f" Advoware: created={advoware_results['created']} updated={advoware_results['updated']} deleted={advoware_results['deleted']} errors={advoware_results['errors']}")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ Sync failed: {e}")
|
||||
import traceback
|
||||
ctx.logger.error(traceback.format_exc())
|
||||
|
||||
# Requeue for retry (into the appropriate queue(s))
|
||||
import time
|
||||
now_ts = time.time()
|
||||
if aktennummer:
|
||||
redis_client.zadd("advoware:pending_aktennummern", {aktennummer: now_ts})
|
||||
redis_client.zadd("akte:pending_entity_ids", {akte_id: now_ts})
|
||||
|
||||
try:
|
||||
await espocrm.update_entity('CAkten', akte_id, {
|
||||
'syncStatus': 'failed',
|
||||
'globalSyncStatus': 'failed',
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
raise
|
||||
|
||||
finally:
|
||||
if lock_acquired and redis_client:
|
||||
redis_client.delete(lock_key)
|
||||
ctx.logger.info(f"🔓 Lock released for Akte {aktennummer}")
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Advoware 3-way merge
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
async def _run_advoware_sync(
|
||||
akte: Dict[str, Any],
|
||||
aktennummer: str,
|
||||
akte_id: str,
|
||||
espocrm,
|
||||
ctx: FlowContext,
|
||||
) -> Dict[str, int]:
|
||||
from services.advoware_watcher_service import AdvowareWatcherService
|
||||
from services.advoware_history_service import AdvowareHistoryService
|
||||
from services.advoware_service import AdvowareService
|
||||
from services.advoware_document_sync_utils import AdvowareDocumentSyncUtils
|
||||
from services.blake3_utils import compute_blake3
|
||||
import mimetypes
|
||||
|
||||
watcher = AdvowareWatcherService(ctx)
|
||||
history_service = AdvowareHistoryService(ctx)
|
||||
advoware_service = AdvowareService(ctx)
|
||||
sync_utils = AdvowareDocumentSyncUtils(ctx)
|
||||
|
||||
results = {'created': 0, 'updated': 0, 'deleted': 0, 'skipped': 0, 'errors': 0}
|
||||
|
||||
ctx.logger.info("")
|
||||
ctx.logger.info("─" * 60)
|
||||
ctx.logger.info("📂 ADVOWARE SYNC")
|
||||
ctx.logger.info("─" * 60)
|
||||
|
||||
# ── Fetch from all 3 sources ───────────────────────────────────────
|
||||
espo_docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes')
|
||||
espo_docs = espo_docs_result.get('list', [])
|
||||
|
||||
try:
|
||||
windows_files = await watcher.get_akte_files(aktennummer)
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ Windows watcher failed: {e}")
|
||||
windows_files = []
|
||||
|
||||
try:
|
||||
advo_history = await history_service.get_akte_history(aktennummer)
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ Advoware history failed: {e}")
|
||||
advo_history = []
|
||||
|
||||
ctx.logger.info(f" EspoCRM docs : {len(espo_docs)}")
|
||||
ctx.logger.info(f" Windows files : {len(windows_files)}")
|
||||
ctx.logger.info(f" History entries: {len(advo_history)}")
|
||||
|
||||
# ── Cleanup Windows list (only files in History) ───────────────────
|
||||
windows_files = sync_utils.cleanup_file_list(windows_files, advo_history)
|
||||
|
||||
# ── Build indexes by HNR (stable identifier from Advoware) ────────
|
||||
espo_by_hnr = {}
|
||||
for doc in espo_docs:
|
||||
if doc.get('hnr'):
|
||||
espo_by_hnr[doc['hnr']] = doc
|
||||
|
||||
history_by_hnr = {}
|
||||
for entry in advo_history:
|
||||
if entry.get('hNr'):
|
||||
history_by_hnr[entry['hNr']] = entry
|
||||
|
||||
windows_by_path = {f.get('path', '').lower(): f for f in windows_files}
|
||||
|
||||
all_hnrs = set(espo_by_hnr.keys()) | set(history_by_hnr.keys())
|
||||
ctx.logger.info(f" Unique HNRs : {len(all_hnrs)}")
|
||||
|
||||
# ── 3-way merge per HNR ───────────────────────────────────────────
|
||||
for hnr in all_hnrs:
|
||||
espo_doc = espo_by_hnr.get(hnr)
|
||||
history_entry = history_by_hnr.get(hnr)
|
||||
|
||||
windows_file = None
|
||||
if history_entry and history_entry.get('datei'):
|
||||
windows_file = windows_by_path.get(history_entry['datei'].lower())
|
||||
|
||||
if history_entry and history_entry.get('datei'):
|
||||
filename = history_entry['datei'].split('\\')[-1]
|
||||
elif espo_doc:
|
||||
filename = espo_doc.get('name', f'hnr_{hnr}')
|
||||
else:
|
||||
filename = f'hnr_{hnr}'
|
||||
|
||||
try:
|
||||
action = sync_utils.merge_three_way(espo_doc, windows_file, history_entry)
|
||||
ctx.logger.info(f" [{action.action:12s}] {filename} (hnr={hnr}) – {action.reason}")
|
||||
|
||||
if action.action == 'SKIP':
|
||||
results['skipped'] += 1
|
||||
|
||||
elif action.action == 'CREATE':
|
||||
if not windows_file:
|
||||
ctx.logger.error(f" ❌ CREATE: no Windows file for hnr {hnr}")
|
||||
results['errors'] += 1
|
||||
continue
|
||||
|
||||
content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
|
||||
blake3_hash = compute_blake3(content)
|
||||
mime_type, _ = mimetypes.guess_type(filename)
|
||||
mime_type = mime_type or 'application/octet-stream'
|
||||
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
attachment = await espocrm.upload_attachment_for_file_field(
|
||||
file_content=content,
|
||||
filename=filename,
|
||||
related_type='CDokumente',
|
||||
field='dokument',
|
||||
mime_type=mime_type,
|
||||
)
|
||||
new_doc = await espocrm.create_entity('CDokumente', {
|
||||
'name': filename,
|
||||
'dokumentId': attachment.get('id'),
|
||||
'hnr': history_entry.get('hNr') if history_entry else None,
|
||||
'advowareArt': (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100] if history_entry else 'Schreiben',
|
||||
'advowareBemerkung': (history_entry.get('text', '') or '')[:255] if history_entry else '',
|
||||
'dateipfad': windows_file.get('path', ''),
|
||||
'blake3hash': blake3_hash,
|
||||
'syncedHash': blake3_hash,
|
||||
'usn': windows_file.get('usn', 0),
|
||||
'syncStatus': 'synced',
|
||||
'lastSyncTimestamp': now,
|
||||
'cAktenId': akte_id, # Direct FK to CAkten
|
||||
})
|
||||
doc_id = new_doc.get('id')
|
||||
|
||||
# Link to Akte
|
||||
await espocrm.link_entities('CAkten', akte_id, 'dokumentes', doc_id)
|
||||
results['created'] += 1
|
||||
|
||||
# Trigger preview
|
||||
try:
|
||||
await ctx.emit('document.generate_preview', {
|
||||
'entity_id': doc_id,
|
||||
'entity_type': 'CDokumente',
|
||||
})
|
||||
except Exception as e:
|
||||
ctx.logger.warn(f" ⚠️ Preview trigger failed: {e}")
|
||||
|
||||
elif action.action == 'UPDATE_ESPO':
|
||||
if not windows_file:
|
||||
ctx.logger.error(f" ❌ UPDATE_ESPO: no Windows file for hnr {hnr}")
|
||||
results['errors'] += 1
|
||||
continue
|
||||
|
||||
content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
|
||||
blake3_hash = compute_blake3(content)
|
||||
mime_type, _ = mimetypes.guess_type(filename)
|
||||
mime_type = mime_type or 'application/octet-stream'
|
||||
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
update_data: Dict[str, Any] = {
|
||||
'name': filename,
|
||||
'blake3hash': blake3_hash,
|
||||
'syncedHash': blake3_hash,
|
||||
'usn': windows_file.get('usn', 0),
|
||||
'dateipfad': windows_file.get('path', ''),
|
||||
'syncStatus': 'synced',
|
||||
'lastSyncTimestamp': now,
|
||||
}
|
||||
if history_entry:
|
||||
update_data['hnr'] = history_entry.get('hNr')
|
||||
update_data['advowareArt'] = (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100]
|
||||
update_data['advowareBemerkung'] = (history_entry.get('text', '') or '')[:255]
|
||||
|
||||
await espocrm.update_entity('CDokumente', espo_doc['id'], update_data)
|
||||
results['updated'] += 1
|
||||
|
||||
# Mark for re-sync to xAI (hash changed)
|
||||
if espo_doc.get('aiSyncStatus') == 'synced':
|
||||
await espocrm.update_entity('CDokumente', espo_doc['id'], {
|
||||
'aiSyncStatus': 'unclean',
|
||||
})
|
||||
|
||||
try:
|
||||
await ctx.emit('document.generate_preview', {
|
||||
'entity_id': espo_doc['id'],
|
||||
'entity_type': 'CDokumente',
|
||||
})
|
||||
except Exception as e:
|
||||
ctx.logger.warn(f" ⚠️ Preview trigger failed: {e}")
|
||||
|
||||
elif action.action == 'DELETE':
|
||||
if espo_doc:
|
||||
# Only delete if the HNR is genuinely absent from Advoware History
|
||||
# (not just absent from Windows – avoids deleting docs whose file
|
||||
# is temporarily unavailable on the Windows share)
|
||||
if hnr in history_by_hnr:
|
||||
ctx.logger.warn(f" ⚠️ SKIP DELETE hnr={hnr}: still in Advoware History, only missing from Windows")
|
||||
results['skipped'] += 1
|
||||
else:
|
||||
await espocrm.delete_entity('CDokumente', espo_doc['id'])
|
||||
results['deleted'] += 1
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error(f" ❌ Error for hnr {hnr} ({filename}): {e}")
|
||||
results['errors'] += 1
|
||||
|
||||
# ── Ablage check + Rubrum sync ─────────────────────────────────────
|
||||
try:
|
||||
akte_details = await advoware_service.get_akte(aktennummer)
|
||||
if akte_details:
|
||||
espo_update: Dict[str, Any] = {}
|
||||
|
||||
if akte_details.get('ablage') == 1:
|
||||
ctx.logger.info("📁 Akte marked as ablage → deactivating")
|
||||
espo_update['aktivierungsstatus'] = 'deaktiviert'
|
||||
|
||||
rubrum = akte_details.get('rubrum')
|
||||
if rubrum and rubrum != akte.get('rubrum'):
|
||||
espo_update['rubrum'] = rubrum
|
||||
ctx.logger.info(f"📝 Rubrum synced: {rubrum[:80]}")
|
||||
|
||||
if espo_update:
|
||||
await espocrm.update_entity('CAkten', akte_id, espo_update)
|
||||
except Exception as e:
|
||||
ctx.logger.warn(f"⚠️ Ablage/Rubrum check failed: {e}")
|
||||
|
||||
return results
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# xAI sync
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
async def _run_xai_sync(
|
||||
akte: Dict[str, Any],
|
||||
akte_id: str,
|
||||
espocrm,
|
||||
ctx: FlowContext,
|
||||
) -> None:
|
||||
from services.xai_service import XAIService
|
||||
from services.xai_upload_utils import XAIUploadUtils
|
||||
|
||||
xai = XAIService(ctx)
|
||||
upload_utils = XAIUploadUtils(ctx)
|
||||
|
||||
ctx.logger.info("")
|
||||
ctx.logger.info("─" * 60)
|
||||
ctx.logger.info("🤖 xAI SYNC")
|
||||
ctx.logger.info("─" * 60)
|
||||
|
||||
try:
|
||||
# ── Ensure collection exists ───────────────────────────────────
|
||||
collection_id = await upload_utils.ensure_collection(akte, xai, espocrm)
|
||||
if not collection_id:
|
||||
ctx.logger.error("❌ Could not obtain xAI collection – aborting xAI sync")
|
||||
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
|
||||
return
|
||||
|
||||
# ── Load all linked documents ──────────────────────────────────
|
||||
docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes')
|
||||
docs = docs_result.get('list', [])
|
||||
ctx.logger.info(f" Documents to check: {len(docs)}")
|
||||
|
||||
synced = 0
|
||||
skipped = 0
|
||||
failed = 0
|
||||
|
||||
for doc in docs:
|
||||
ok = await upload_utils.sync_document_to_xai(doc, collection_id, xai, espocrm)
|
||||
if ok:
|
||||
if doc.get('aiSyncStatus') == 'synced' and doc.get('aiSyncHash') == doc.get('blake3hash'):
|
||||
skipped += 1
|
||||
else:
|
||||
synced += 1
|
||||
else:
|
||||
failed += 1
|
||||
|
||||
ctx.logger.info(f" ✅ Synced : {synced}")
|
||||
ctx.logger.info(f" ⏭️ Skipped : {skipped}")
|
||||
ctx.logger.info(f" ❌ Failed : {failed}")
|
||||
|
||||
finally:
|
||||
await xai.close()
|
||||
Reference in New Issue
Block a user