diff --git a/services/xai_service.py b/services/xai_service.py
index 49d56e9..b66e94f 100644
--- a/services/xai_service.py
+++ b/services/xai_service.py
@@ -112,12 +112,9 @@ class XAIService:
     async def add_to_collection(self, collection_id: str, file_id: str) -> None:
         """
-        FΓΌgt eine Datei einer xAI-Collection (Vector Store) hinzu.
+        FΓΌgt eine Datei einer xAI-Collection hinzu.
 
-        POST https://api.x.ai/v1/vector_stores/{vector_store_id}/files
-
-        Uses the OpenAI-compatible API pattern for adding files to vector stores.
-        This triggers proper indexing and processing.
+        POST https://management-api.x.ai/v1/collections/{collection_id}/documents/{file_id}
 
         Raises:
             RuntimeError: bei HTTP-Fehler
         """
@@ -125,16 +122,10 @@ class XAIService:
         self._log(f"πŸ“š Adding file {file_id} to collection {collection_id}")
         session = await self._get_session()
 
-        # Use the OpenAI-compatible endpoint (not management API)
-        url = f"{XAI_FILES_URL}/v1/vector_stores/{collection_id}/files"
-        headers = {
-            "Authorization": f"Bearer {self.api_key}",
-            "Content-Type": "application/json",
-        }
+        url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}"
+        headers = {"Authorization": f"Bearer {self.management_key}"}
 
-        payload = {"file_id": file_id}
-
-        async with session.post(url, json=payload, headers=headers) as response:
+        async with session.post(url, headers=headers) as response:
             if response.status not in (200, 201):
                 raw = await response.text()
                 raise RuntimeError(
diff --git a/src/steps/akte/akte_sync_event_step.py b/src/steps/akte/akte_sync_event_step.py
new file mode 100644
index 0000000..1f1bef7
--- /dev/null
+++ b/src/steps/akte/akte_sync_event_step.py
@@ -0,0 +1,438 @@
+"""
+Akte Sync - Event Handler
+
+Unified sync for one CAkten entity across all configured backends:
+  - Advoware (3-way merge: Windows ↔ EspoCRM ↔ History)
+  - xAI (Blake3 hash-based upload to Collection)
+
+Both run in the same event to keep CDokumente perfectly in sync.
+
+Trigger: akte.sync { akte_id, aktennummer }
+Lock: Redis per-Akte (30 min TTL, prevents double-sync of same Akte)
+Parallel: Different Akten sync simultaneously.
+
+Enqueues:
+  - document.generate_preview (after CREATE / UPDATE_ESPO)
+"""
+
+from typing import Dict, Any
+from datetime import datetime
+from motia import FlowContext, queue
+
+
+config = {
+    "name": "Akte Sync - Event Handler",
+    "description": "Unified sync for one Akte: Advoware 3-way merge + xAI upload",
+    "flows": ["akte-sync"],
+    "triggers": [queue("akte.sync")],
+    "enqueues": ["document.generate_preview"],
+}
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Entry point
+# ─────────────────────────────────────────────────────────────────────────────
+
+async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
+    """Sync one Akte (Advoware 3-way merge and/or xAI upload), guarded by a per-Akte Redis lock."""
+    akte_id = event_data.get('akte_id')
+    aktennummer = event_data.get('aktennummer')
+
+    ctx.logger.info("=" * 80)
+    ctx.logger.info("πŸ”„ AKTE SYNC STARTED")
+    ctx.logger.info(f"   Aktennummer : {aktennummer}")
+    ctx.logger.info(f"   EspoCRM ID  : {akte_id}")
+    ctx.logger.info("=" * 80)
+
+    from services.redis_client import get_redis_client
+    from services.espocrm import EspoCRMAPI
+
+    redis_client = get_redis_client(strict=False)
+    if not redis_client:
+        ctx.logger.error("❌ Redis unavailable")
+        return
+
+    lock_key = f"akte_sync:{akte_id}"
+    lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=1800)
+    if not lock_acquired:
+        ctx.logger.warn(f"⏸️ Lock busy for Akte {akte_id} – requeueing")
+        raise RuntimeError(f"Lock busy for akte_id={akte_id}")
+
+    espocrm = EspoCRMAPI(ctx)
+
+    try:
+        # ── Load Akte ──────────────────────────────────────────────────────
+        akte = await espocrm.get_entity('CAkten', akte_id)
+        if not akte:
+            ctx.logger.error(f"❌ Akte {akte_id} not found in EspoCRM")
+            return
+
+        # aktennummer can come from the event payload OR from the entity
+        # (Akten without Advoware have no aktennummer)
+        if not aktennummer:
+            aktennummer = akte.get('aktennummer')
+
+        sync_schalter = akte.get('syncSchalter', False)
+        aktivierungsstatus = str(akte.get('aktivierungsstatus') or '').lower()
+        ai_aktivierungsstatus = str(akte.get('aiAktivierungsstatus') or '').lower()
+
+        ctx.logger.info(f"πŸ“‹ Akte '{akte.get('name')}'")
+        ctx.logger.info(f"   syncSchalter         : {sync_schalter}")
+        ctx.logger.info(f"   aktivierungsstatus   : {aktivierungsstatus}")
+        ctx.logger.info(f"   aiAktivierungsstatus : {ai_aktivierungsstatus}")
+
+        # Advoware sync requires an aktennummer (Akten without Advoware won't have one)
+        advoware_enabled = bool(aktennummer) and sync_schalter and aktivierungsstatus in ('import', 'new', 'active')
+        xai_enabled = ai_aktivierungsstatus in ('new', 'active')
+
+        ctx.logger.info(f"   Advoware sync : {'βœ… ON' if advoware_enabled else '⏭️ OFF'}")
+        ctx.logger.info(f"   xAI sync      : {'βœ… ON' if xai_enabled else '⏭️ OFF'}")
+
+        if not advoware_enabled and not xai_enabled:
+            ctx.logger.info("⏭️ Both syncs disabled – nothing to do")
+            return
+
+        # ── ADVOWARE SYNC ──────────────────────────────────────────────────
+        advoware_results = None
+        if advoware_enabled:
+            advoware_results = await _run_advoware_sync(akte, aktennummer, akte_id, espocrm, ctx)
+
+        # ── xAI SYNC ──────────────────────────────────────────────────────
+        if xai_enabled:
+            await _run_xai_sync(akte, akte_id, espocrm, ctx)
+
+        # ── Final Status ───────────────────────────────────────────────────
+        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+        final_update: Dict[str, Any] = {'globalLastSync': now, 'globalSyncStatus': 'synced'}
+        if advoware_enabled:
+            final_update['syncStatus'] = 'synced'
+            final_update['lastSync'] = now
+            # 'import' = first sync β†’ switch to 'active' afterwards
+            if aktivierungsstatus == 'import':
+                final_update['aktivierungsstatus'] = 'active'
+                ctx.logger.info("πŸ”„ aktivierungsstatus: import β†’ active")
+        if xai_enabled:
+            final_update['aiSyncStatus'] = 'synced'
+            final_update['aiLastSync'] = now
+            # 'new' = collection was just created for the first time β†’ switch to 'active'
+            if ai_aktivierungsstatus == 'new':
+                final_update['aiAktivierungsstatus'] = 'active'
+                ctx.logger.info("πŸ”„ aiAktivierungsstatus: new β†’ active")
+
+        await espocrm.update_entity('CAkten', akte_id, final_update)
+        # Clean up processing sets (both queues may have triggered this sync)
+        if aktennummer:
+            redis_client.srem("advoware:processing_aktennummern", aktennummer)
+        redis_client.srem("akte:processing_entity_ids", akte_id)
+
+        ctx.logger.info("=" * 80)
+        ctx.logger.info("βœ… AKTE SYNC COMPLETE")
+        if advoware_results:
+            ctx.logger.info(f"   Advoware: created={advoware_results['created']} updated={advoware_results['updated']} deleted={advoware_results['deleted']} errors={advoware_results['errors']}")
+        ctx.logger.info("=" * 80)
+
+    except Exception as e:
+        ctx.logger.error(f"❌ Sync failed: {e}")
+        import traceback
+        ctx.logger.error(traceback.format_exc())
+
+        # Requeue for retry (into the appropriate queue(s))
+        import time
+        now_ts = time.time()
+        if aktennummer:
+            redis_client.zadd("advoware:pending_aktennummern", {aktennummer: now_ts})
+        redis_client.zadd("akte:pending_entity_ids", {akte_id: now_ts})
+
+        try:
+            await espocrm.update_entity('CAkten', akte_id, {
+                'syncStatus': 'failed',
+                'globalSyncStatus': 'failed',
+            })
+        except Exception:
+            pass
+        raise
+
+    finally:
+        if lock_acquired and redis_client:
+            redis_client.delete(lock_key)
+            ctx.logger.info(f"πŸ”“ Lock released for Akte {aktennummer}")
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Advoware 3-way merge
+# ─────────────────────────────────────────────────────────────────────────────
+
+async def _run_advoware_sync(
+    akte: Dict[str, Any],
+    aktennummer: str,
+    akte_id: str,
+    espocrm,
+    ctx: FlowContext,
+) -> Dict[str, int]:
+    """3-way merge (EspoCRM ↔ Windows ↔ Advoware history) for one Akte; returns result counters."""
+    from services.advoware_watcher_service import AdvowareWatcherService
+    from services.advoware_history_service import AdvowareHistoryService
+    from services.advoware_service import AdvowareService
+    from services.advoware_document_sync_utils import AdvowareDocumentSyncUtils
+    from services.blake3_utils import compute_blake3
+    import mimetypes
+
+    watcher = AdvowareWatcherService(ctx)
+    history_service = AdvowareHistoryService(ctx)
+    advoware_service = AdvowareService(ctx)
+    sync_utils = AdvowareDocumentSyncUtils(ctx)
+
+    results = {'created': 0, 'updated': 0, 'deleted': 0, 'skipped': 0, 'errors': 0}
+
+    ctx.logger.info("")
+    ctx.logger.info("─" * 60)
+    ctx.logger.info("πŸ“‚ ADVOWARE SYNC")
+    ctx.logger.info("─" * 60)
+
+    # ── Fetch from all 3 sources ───────────────────────────────────────
+    espo_docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes')
+    espo_docs = espo_docs_result.get('list', [])
+
+    try:
+        windows_files = await watcher.get_akte_files(aktennummer)
+    except Exception as e:
+        ctx.logger.error(f"❌ Windows watcher failed: {e}")
+        windows_files = []
+
+    try:
+        advo_history = await history_service.get_akte_history(aktennummer)
+    except Exception as e:
+        ctx.logger.error(f"❌ Advoware history failed: {e}")
+        advo_history = []
+
+    ctx.logger.info(f"   EspoCRM docs   : {len(espo_docs)}")
+    ctx.logger.info(f"   Windows files  : {len(windows_files)}")
+    ctx.logger.info(f"   History entries: {len(advo_history)}")
+
+    # ── Cleanup Windows list (only files in History) ───────────────────
+    windows_files = sync_utils.cleanup_file_list(windows_files, advo_history)
+
+    # ── Build indexes by HNR (stable identifier from Advoware) ────────
+    espo_by_hnr = {}
+    for doc in espo_docs:
+        if doc.get('hnr'):
+            espo_by_hnr[doc['hnr']] = doc
+
+    history_by_hnr = {}
+    for entry in advo_history:
+        if entry.get('hNr'):
+            history_by_hnr[entry['hNr']] = entry
+
+    windows_by_path = {f.get('path', '').lower(): f for f in windows_files}
+
+    all_hnrs = set(espo_by_hnr.keys()) | set(history_by_hnr.keys())
+    ctx.logger.info(f"   Unique HNRs    : {len(all_hnrs)}")
+
+    # ── 3-way merge per HNR ───────────────────────────────────────────
+    for hnr in all_hnrs:
+        espo_doc = espo_by_hnr.get(hnr)
+        history_entry = history_by_hnr.get(hnr)
+
+        windows_file = None
+        if history_entry and history_entry.get('datei'):
+            windows_file = windows_by_path.get(history_entry['datei'].lower())
+
+        if history_entry and history_entry.get('datei'):
+            filename = history_entry['datei'].split('\\')[-1]
+        elif espo_doc:
+            filename = espo_doc.get('name', f'hnr_{hnr}')
+        else:
+            filename = f'hnr_{hnr}'
+
+        try:
+            action = sync_utils.merge_three_way(espo_doc, windows_file, history_entry)
+            ctx.logger.info(f"   [{action.action:12s}] {filename} (hnr={hnr}) – {action.reason}")
+
+            if action.action == 'SKIP':
+                results['skipped'] += 1
+
+            elif action.action == 'CREATE':
+                if not windows_file:
+                    ctx.logger.error(f"   ❌ CREATE: no Windows file for hnr {hnr}")
+                    results['errors'] += 1
+                    continue
+
+                content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
+                blake3_hash = compute_blake3(content)
+                mime_type, _ = mimetypes.guess_type(filename)
+                mime_type = mime_type or 'application/octet-stream'
+                now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+
+                attachment = await espocrm.upload_attachment_for_file_field(
+                    file_content=content,
+                    filename=filename,
+                    related_type='CDokumente',
+                    field='dokument',
+                    mime_type=mime_type,
+                )
+                new_doc = await espocrm.create_entity('CDokumente', {
+                    'name': filename,
+                    'dokumentId': attachment.get('id'),
+                    'hnr': history_entry.get('hNr') if history_entry else None,
+                    'advowareArt': (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100] if history_entry else 'Schreiben',
+                    'advowareBemerkung': (history_entry.get('text', '') or '')[:255] if history_entry else '',
+                    'dateipfad': windows_file.get('path', ''),
+                    'blake3hash': blake3_hash,
+                    'syncedHash': blake3_hash,
+                    'usn': windows_file.get('usn', 0),
+                    'syncStatus': 'synced',
+                    'lastSyncTimestamp': now,
+                    'cAktenId': akte_id,  # Direct FK to CAkten
+                })
+                doc_id = new_doc.get('id')
+
+                # Link to Akte
+                await espocrm.link_entities('CAkten', akte_id, 'dokumentes', doc_id)
+                results['created'] += 1
+
+                # Trigger preview
+                try:
+                    await ctx.emit('document.generate_preview', {
+                        'entity_id': doc_id,
+                        'entity_type': 'CDokumente',
+                    })
+                except Exception as e:
+                    ctx.logger.warn(f"   ⚠️ Preview trigger failed: {e}")
+
+            elif action.action == 'UPDATE_ESPO':
+                if not windows_file:
+                    ctx.logger.error(f"   ❌ UPDATE_ESPO: no Windows file for hnr {hnr}")
+                    results['errors'] += 1
+                    continue
+
+                content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
+                blake3_hash = compute_blake3(content)
+                mime_type, _ = mimetypes.guess_type(filename)
+                mime_type = mime_type or 'application/octet-stream'
+                now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+
+                update_data: Dict[str, Any] = {
+                    'name': filename,
+                    'blake3hash': blake3_hash,
+                    'syncedHash': blake3_hash,
+                    'usn': windows_file.get('usn', 0),
+                    'dateipfad': windows_file.get('path', ''),
+                    'syncStatus': 'synced',
+                    'lastSyncTimestamp': now,
+                }
+                if history_entry:
+                    update_data['hnr'] = history_entry.get('hNr')
+                    update_data['advowareArt'] = (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100]
+                    update_data['advowareBemerkung'] = (history_entry.get('text', '') or '')[:255]
+
+                await espocrm.update_entity('CDokumente', espo_doc['id'], update_data)
+                results['updated'] += 1
+
+                # Mark for re-sync to xAI (hash changed)
+                if espo_doc.get('aiSyncStatus') == 'synced':
+                    await espocrm.update_entity('CDokumente', espo_doc['id'], {
+                        'aiSyncStatus': 'unclean',
+                    })
+
+                try:
+                    await ctx.emit('document.generate_preview', {
+                        'entity_id': espo_doc['id'],
+                        'entity_type': 'CDokumente',
+                    })
+                except Exception as e:
+                    ctx.logger.warn(f"   ⚠️ Preview trigger failed: {e}")
+
+            elif action.action == 'DELETE':
+                if espo_doc:
+                    # Only delete if the HNR is genuinely absent from Advoware History
+                    # (not just absent from Windows – avoids deleting docs whose file
+                    # is temporarily unavailable on the Windows share)
+                    if hnr in history_by_hnr:
+                        ctx.logger.warn(f"   ⚠️ SKIP DELETE hnr={hnr}: still in Advoware History, only missing from Windows")
+                        results['skipped'] += 1
+                    else:
+                        await espocrm.delete_entity('CDokumente', espo_doc['id'])
+                        results['deleted'] += 1
+
+        except Exception as e:
+            ctx.logger.error(f"   ❌ Error for hnr {hnr} ({filename}): {e}")
+            results['errors'] += 1
+
+    # ── Ablage check + Rubrum sync ─────────────────────────────────────
+    try:
+        akte_details = await advoware_service.get_akte(aktennummer)
+        if akte_details:
+            espo_update: Dict[str, Any] = {}
+
+            if akte_details.get('ablage') == 1:
+                ctx.logger.info("πŸ“ Akte marked as ablage β†’ deactivating")
+                espo_update['aktivierungsstatus'] = 'inactive'
+
+            rubrum = akte_details.get('rubrum')
+            if rubrum and rubrum != akte.get('rubrum'):
+                espo_update['rubrum'] = rubrum
+                ctx.logger.info(f"πŸ“ Rubrum synced: {rubrum[:80]}")
+
+            if espo_update:
+                await espocrm.update_entity('CAkten', akte_id, espo_update)
+    except Exception as e:
+        ctx.logger.warn(f"⚠️ Ablage/Rubrum check failed: {e}")
+
+    return results
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# xAI sync
+# ─────────────────────────────────────────────────────────────────────────────
+
+async def _run_xai_sync(
+    akte: Dict[str, Any],
+    akte_id: str,
+    espocrm,
+    ctx: FlowContext,
+) -> None:
+    """Upload this Akte's documents to its xAI collection (hash-based skip is handled by the upload utils)."""
+    from services.xai_service import XAIService
+    from services.xai_upload_utils import XAIUploadUtils
+
+    xai = XAIService(ctx)
+    upload_utils = XAIUploadUtils(ctx)
+
+    ctx.logger.info("")
+    ctx.logger.info("─" * 60)
+    ctx.logger.info("πŸ€– xAI SYNC")
+    ctx.logger.info("─" * 60)
+
+    try:
+        # ── Ensure collection exists ─────────────────────────────────
+        collection_id = await upload_utils.ensure_collection(akte, xai, espocrm)
+        if not collection_id:
+            ctx.logger.error("❌ Could not obtain xAI collection – aborting xAI sync")
+            await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
+            return
+
+        # ── Load all linked documents ────────────────────────────────
+        docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes')
+        docs = docs_result.get('list', [])
+        ctx.logger.info(f"   Documents to check: {len(docs)}")
+
+        synced = 0
+        skipped = 0
+        failed = 0
+
+        for doc in docs:
+            ok = await upload_utils.sync_document_to_xai(doc, collection_id, xai, espocrm)
+            if ok:
+                if doc.get('aiSyncStatus') == 'synced' and doc.get('aiSyncHash') == doc.get('blake3hash'):
+                    skipped += 1
+                else:
+                    synced += 1
+            else:
+                failed += 1
+
+        ctx.logger.info(f"   βœ… Synced : {synced}")
+        ctx.logger.info(f"   ⏭️ Skipped : {skipped}")
+        ctx.logger.info(f"   ❌ Failed : {failed}")
+
+    finally:
+        await xai.close()