feat: Implement Advoware Document Sync Handler

- Added advoware_document_sync_step.py to handle 3-way merge sync for documents.
- Introduced locking mechanism for per-Akte synchronization to allow parallel processing.
- Integrated data fetching from EspoCRM, Windows files, and Advoware history.
- Implemented 3-way merge logic for document synchronization and metadata updates.
- Triggered document preview generation for new/changed documents.

feat: Create Shared Steps Module

- Added shared/__init__.py for shared steps across multiple modules.
- Introduced generate_document_preview_step.py for generating document previews.
- Implemented logic to download documents, generate previews, and upload to EspoCRM.

feat: Add VMH Document xAI Sync Handler

- Created document_xai_sync_step.py to manage document synchronization with xAI collections.
- Handled create, update, and delete actions for documents in EspoCRM.
- Integrated logic for triggering preview generation and managing xAI collections.
- Implemented error handling and logging for synchronization processes.
This commit is contained in:
bsiggel
2026-03-26 01:00:49 +00:00
parent d78a4ee67e
commit 86ec4db9db
6 changed files with 279 additions and 106 deletions

View File

@@ -27,7 +27,7 @@ class SyncAction:
"""
action: Literal['CREATE', 'UPDATE_ESPO', 'UPLOAD_WINDOWS', 'DELETE', 'SKIP']
reason: str
source: Literal['Windows', 'EspoCRM', 'None']
source: Literal['Windows', 'EspoCRM', 'Both', 'None']
needs_upload: bool
needs_download: bool
@@ -149,15 +149,26 @@ class AdvowareDocumentSyncUtils:
needs_download=True
)
# Case 2: File only in EspoCRM → UPLOAD to Windows
# Case 2: File only in EspoCRM → DELETE (file was deleted from Windows/Advoware)
if espo_doc and not windows_file:
return SyncAction(
action='UPLOAD_WINDOWS',
reason='File exists in EspoCRM but not in Windows',
source='EspoCRM',
needs_upload=True,
needs_download=False
)
# Check if also not in History (means it was deleted in Advoware)
if not advo_history:
return SyncAction(
action='DELETE',
reason='File deleted from Windows and Advoware History',
source='Both',
needs_upload=False,
needs_download=False
)
else:
# Still in History but not in Windows - Upload not implemented
return SyncAction(
action='UPLOAD_WINDOWS',
reason='File exists in EspoCRM/History but not in Windows',
source='EspoCRM',
needs_upload=True,
needs_download=False
)
# Case 3: File in both → Compare hashes and USNs
if espo_doc and windows_file:

View File

@@ -3,6 +3,7 @@ Advoware Document Sync - Event Handler
Executes 3-way merge sync for one Akte.
PER-AKTE LOCK: Allows parallel syncs of different Akten.
Triggers preview generation for new/changed documents.
Flow:
1. Acquire per-Akte lock (key: advoware_document_sync:akte:{aktennr})
@@ -17,6 +18,9 @@ Flow:
PARALLEL EXECUTION: Multiple Akten can sync simultaneously.
LOCK SCOPE: Only prevents the same Akte from syncing twice at once.
Enqueues:
- document.generate_preview: Bei CREATE/UPDATE_ESPO
"""
from typing import Dict, Any
@@ -29,7 +33,7 @@ config = {
"description": "Execute 3-way merge sync for Akte",
"flows": ["advoware-document-sync"],
"triggers": [queue("advoware.document.sync")],
"enqueues": [],
"enqueues": ["document.generate_preview"],
}
@@ -154,45 +158,78 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
windows_files = sync_utils.cleanup_file_list(windows_files, advo_history)
ctx.logger.info(f"🧹 After cleanup: {len(windows_files)} Windows files with History")
# 5. Build file mapping for 3-way merge
# Create lookup dicts by full path (History uses full path, Windows also has full path)
espo_docs_by_name = {doc.get('name', '').lower(): doc for doc in espo_docs}
windows_files_by_path = {f.get('path', '').lower(): f for f in windows_files}
history_by_path = {}
# 5. Build file mapping for 3-way merge based on HNR (stable identifier)
# hnr (History Number) is the stable identifier in Advoware - files can change name/path but hnr stays same
# Index EspoCRM docs by hnr (stable identifier)
espo_docs_by_hnr = {}
espo_docs_by_path = {} # Fallback for docs without hnr
for doc in espo_docs:
hnr = doc.get('hnr')
if hnr:
espo_docs_by_hnr[hnr] = doc
dateipfad = doc.get('dateipfad', '')
if dateipfad:
espo_docs_by_path[dateipfad.lower()] = doc
# Index History by hnr
history_by_hnr = {}
history_by_path = {} # For path-based lookup
for entry in advo_history:
hnr = entry.get('hNr')
datei = entry.get('datei', '')
if hnr:
history_by_hnr[hnr] = entry
if datei:
history_by_path[datei.lower()] = entry
# Get all unique file paths (Windows files already filtered by cleanup)
all_paths = set(windows_files_by_path.keys())
# Index Windows files by path (they don't have hnr directly)
windows_files_by_path = {f.get('path', '').lower(): f for f in windows_files}
ctx.logger.info(f"📋 Total unique files: {len(all_paths)}")
# Get all unique hnrs to process
all_hnrs = set(espo_docs_by_hnr.keys()) | set(history_by_hnr.keys())
# 6. 3-Way merge per file
ctx.logger.info(f"📋 Total unique documents (by hnr): {len(all_hnrs)}")
ctx.logger.info(f" EspoCRM docs with hnr: {len(espo_docs_by_hnr)}")
ctx.logger.info(f" History entries: {len(history_by_hnr)}")
ctx.logger.info(f" Windows files: {len(windows_files_by_path)}")
# 6. 3-Way merge per hnr (stable identifier)
sync_results = {
'created': 0,
'uploaded': 0,
'updated': 0,
'deleted': 0,
'skipped': 0,
'errors': 0
}
for file_path in all_paths:
# Extract filename for display and EspoCRM lookup
filename = file_path.split('\\')[-1]
for hnr in all_hnrs:
# Get data for this hnr from all sources
espo_doc = espo_docs_by_hnr.get(hnr)
history_entry = history_by_hnr.get(hnr)
# Get Windows file through history path
windows_file = None
file_path = None
if history_entry:
file_path = history_entry.get('datei', '').lower()
windows_file = windows_files_by_path.get(file_path)
# Extract filename for display
if history_entry and history_entry.get('datei'):
filename = history_entry.get('datei').split('\\')[-1]
elif espo_doc:
filename = espo_doc.get('name', f'hnr_{hnr}')
else:
filename = f'hnr_{hnr}'
ctx.logger.info(f"\n{'='*80}")
ctx.logger.info(f"Processing: {filename}")
ctx.logger.info(f"Processing: {filename} (hnr: {hnr})")
ctx.logger.info(f"{'='*80}")
espo_doc = espo_docs_by_name.get(filename.lower())
windows_file = windows_files_by_path.get(file_path)
history_entry = history_by_path.get(file_path)
try:
# Perform 3-way merge
# Perform 3-way merge based on hnr
action = sync_utils.merge_three_way(espo_doc, windows_file, history_entry)
ctx.logger.info(f"📊 Merge decision:")
@@ -207,6 +244,11 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
elif action.action == 'CREATE':
# Download from Windows and create in EspoCRM
if not windows_file:
ctx.logger.error(f"❌ Cannot CREATE - no Windows file for hnr {hnr}")
sync_results['errors'] += 1
continue
ctx.logger.info(f"📥 Downloading {filename} from Windows...")
content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
@@ -271,15 +313,36 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
)
sync_results['created'] += 1
# Trigger preview generation
try:
await ctx.emit('document.generate_preview', {
'entity_id': doc_id,
'entity_type': 'CDokumente'
})
ctx.logger.info(f"✅ Preview generation triggered for {doc_id}")
except Exception as e:
ctx.logger.warn(f"⚠️ Failed to trigger preview generation: {e}")
elif action.action == 'UPDATE_ESPO':
# Download from Windows and update EspoCRM
if not windows_file:
ctx.logger.error(f"❌ Cannot UPDATE_ESPO - no Windows file for hnr {hnr}")
sync_results['errors'] += 1
continue
ctx.logger.info(f"📥 Downloading {filename} from Windows...")
content = await watcher.download_file(aktennummer, windows_file.get('relative_path', filename))
# Compute Blake3 hash
blake3_hash = compute_blake3(content)
# Determine MIME type
import mimetypes
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
mime_type = 'application/octet-stream'
# Extract full Windows path
full_path = windows_file.get('path', '')
@@ -288,10 +351,11 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
now_iso = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
update_data = {
'name': filename, # Update name if changed
'blake3hash': blake3_hash,
'syncedHash': blake3_hash,
'usn': windows_file.get('usn', 0),
'dateipfad': full_path,
'dateipfad': full_path, # Update path if changed
'syncStatus': 'synced',
'lastSyncTimestamp': now_iso
}
@@ -306,6 +370,16 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
ctx.logger.info(f"✅ Updated document: {espo_doc.get('id')}")
sync_results['updated'] += 1
# Trigger preview generation
try:
await ctx.emit('document.generate_preview', {
'entity_id': espo_doc.get('id'),
'entity_type': 'CDokumente'
})
ctx.logger.info(f"✅ Preview generation triggered for {espo_doc.get('id')}")
except Exception as e:
ctx.logger.warn(f"⚠️ Failed to trigger preview generation: {e}")
elif action.action == 'UPLOAD_WINDOWS':
# Upload to Windows from EspoCRM
@@ -316,6 +390,19 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
ctx.logger.warn(f"⚠️ Upload to Windows not yet implemented for {filename}")
sync_results['skipped'] += 1
elif action.action == 'DELETE':
# Delete from EspoCRM (file deleted in Windows/Advoware)
ctx.logger.info(f"🗑️ Deleting {filename} from EspoCRM...")
if espo_doc:
doc_id = espo_doc.get('id')
await espocrm.delete_entity('CDokumente', doc_id)
ctx.logger.info(f"✅ Deleted document: {doc_id}")
sync_results['deleted'] += 1
else:
ctx.logger.warn(f"⚠️ No EspoCRM document found for deletion")
sync_results['skipped'] += 1
except Exception as e:
ctx.logger.error(f"❌ Error processing {filename}: {e}")
sync_results['errors'] += 1
@@ -376,6 +463,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
ctx.logger.info(f"📊 Results:")
ctx.logger.info(f" - Created: {sync_results['created']}")
ctx.logger.info(f" - Updated: {sync_results['updated']}")
ctx.logger.info(f" - Deleted: {sync_results['deleted']}")
ctx.logger.info(f" - Uploaded: {sync_results['uploaded']}")
ctx.logger.info(f" - Skipped: {sync_results['skipped']}")
ctx.logger.info(f" - Errors: {sync_results['errors']}")

View File

@@ -149,9 +149,10 @@ async def handler(input_data: None, ctx: FlowContext) -> None:
ctx.logger.info(f" ├─ Aktivierungsstatus RAW: '{aktivierungsstatus}' (type: {type(aktivierungsstatus).__name__})")
ctx.logger.info(f" └─ All akte fields: {list(akte.keys())[:10]}...") # Debug: Zeige Feldnamen
# Valid statuses: import, neu, aktiv (case-insensitive)
# EspoCRM liefert kleingeschriebene Werte!
valid_statuses = ['import', 'neu', 'aktiv']
# Valid statuses: Both German and English variants accepted
# German: import, neu, aktiv
# English: import, new, active
valid_statuses = ['import', 'neu', 'aktiv', 'new', 'active']
aktivierungsstatus_lower = str(aktivierungsstatus).lower().strip()
ctx.logger.info(f"🔍 Status validation:")

View File

@@ -0,0 +1 @@
# Shared steps used across multiple modules

View File

@@ -0,0 +1,130 @@
"""
Generate Document Preview Step
Universal step for generating document previews.
Can be triggered by any document sync flow.
Flow:
1. Load document from EspoCRM
2. Download file attachment
3. Generate preview (PDF, DOCX, Images → WebP)
4. Upload preview to EspoCRM
5. Update document metadata
Event: document.generate_preview
Input: entity_id, entity_type (default: 'CDokumente')
"""
from typing import Dict, Any
from motia import FlowContext, queue
import tempfile
import os
config = {
"name": "Generate Document Preview",
"description": "Generates preview image for documents",
"flows": ["document-preview"],
"triggers": [queue("document.generate_preview")],
"enqueues": [],
}
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
"""
Generate preview for a document.
Args:
event_data: {
'entity_id': str, # Required: Document ID
'entity_type': str, # Optional: 'CDokumente' (default) or 'Document'
}
"""
from services.document_sync_utils import DocumentSync
entity_id = event_data.get('entity_id')
entity_type = event_data.get('entity_type', 'CDokumente')
if not entity_id:
ctx.logger.error("❌ Missing entity_id in event data")
return
ctx.logger.info("=" * 80)
ctx.logger.info(f"🖼️ GENERATE DOCUMENT PREVIEW")
ctx.logger.info("=" * 80)
ctx.logger.info(f"Entity Type: {entity_type}")
ctx.logger.info(f"Document ID: {entity_id}")
ctx.logger.info("=" * 80)
# Initialize sync utils
sync_utils = DocumentSync(ctx)
try:
# Step 1: Get download info from EspoCRM
ctx.logger.info("📥 Step 1: Getting download info from EspoCRM...")
download_info = await sync_utils.get_document_download_info(entity_id, entity_type)
if not download_info:
ctx.logger.warn("⚠️ No download info available - skipping preview generation")
return
attachment_id = download_info['attachment_id']
filename = download_info['filename']
mime_type = download_info['mime_type']
ctx.logger.info(f" Filename: {filename}")
ctx.logger.info(f" MIME Type: {mime_type}")
ctx.logger.info(f" Attachment ID: {attachment_id}")
# Step 2: Download file from EspoCRM
ctx.logger.info("📥 Step 2: Downloading file from EspoCRM...")
file_content = await sync_utils.espocrm.download_attachment(attachment_id)
ctx.logger.info(f" Downloaded: {len(file_content)} bytes")
# Step 3: Save to temporary file for preview generation
ctx.logger.info("💾 Step 3: Saving to temporary file...")
with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix=os.path.splitext(filename)[1]) as tmp_file:
tmp_file.write(file_content)
tmp_path = tmp_file.name
try:
# Step 4: Generate preview (600x800 WebP)
ctx.logger.info(f"🖼️ Step 4: Generating preview (600x800 WebP)...")
preview_data = await sync_utils.generate_thumbnail(
tmp_path,
mime_type,
max_width=600,
max_height=800
)
if preview_data:
ctx.logger.info(f"✅ Preview generated: {len(preview_data)} bytes WebP")
# Step 5: Upload preview to EspoCRM
ctx.logger.info(f"📤 Step 5: Uploading preview to EspoCRM...")
await sync_utils._upload_preview_to_espocrm(entity_id, preview_data, entity_type)
ctx.logger.info(f"✅ Preview uploaded successfully")
ctx.logger.info("=" * 80)
ctx.logger.info("✅ PREVIEW GENERATION COMPLETE")
ctx.logger.info("=" * 80)
else:
ctx.logger.warn("⚠️ Preview generation returned no data")
ctx.logger.info("=" * 80)
ctx.logger.info("⚠️ PREVIEW GENERATION FAILED")
ctx.logger.info("=" * 80)
finally:
# Cleanup temporary file
if os.path.exists(tmp_path):
os.remove(tmp_path)
ctx.logger.debug(f"🗑️ Removed temporary file: {tmp_path}")
except Exception as e:
ctx.logger.error(f"❌ Preview generation failed: {e}")
ctx.logger.info("=" * 80)
ctx.logger.info("❌ PREVIEW GENERATION ERROR")
ctx.logger.info("=" * 80)
import traceback
ctx.logger.debug(traceback.format_exc())
# Don't raise - preview generation is optional

View File

@@ -1,12 +1,16 @@
"""
VMH Document Sync Handler
VMH Document xAI Sync Handler
Zentraler Sync-Handler für Documents mit xAI Collections
Zentraler Sync-Handler für Documents mit xAI Collections.
Triggers preview generation for new/changed files.
Verarbeitet:
- vmh.document.create: Neu in EspoCRM Prüfe ob xAI-Sync nötig
- vmh.document.update: Geändert in EspoCRM Prüfe ob xAI-Sync/Update nötig
- vmh.document.delete: Gelöscht in EspoCRM Remove from xAI Collections
Enqueues:
- document.generate_preview: Bei new/changed Status
"""
from typing import Dict, Any
@@ -19,7 +23,7 @@ import hashlib
import json
config = {
"name": "VMH Document Sync Handler",
"name": "VMH Document xAI Sync Handler",
"description": "Zentraler Sync-Handler für Documents mit xAI Collections",
"flows": ["vmh-documents"],
"triggers": [
@@ -27,7 +31,7 @@ config = {
queue("vmh.document.update"),
queue("vmh.document.delete")
],
"enqueues": []
"enqueues": ["document.generate_preview"]
}
@@ -197,83 +201,21 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync
if datei_status_lower in ['neu', 'geändert', 'new', 'changed']:
ctx.logger.info("")
ctx.logger.info("=" * 80)
ctx.logger.info("🖼️ PREVIEW-GENERIERUNG STARTEN")
ctx.logger.info("🖼️ TRIGGER PREVIEW-GENERIERUNG")
ctx.logger.info(f" Datei-Status: {datei_status}")
ctx.logger.info("=" * 80)
try:
# 1. Hole Download-Informationen
download_info = await sync_utils.get_document_download_info(entity_id, entity_type)
if not download_info:
ctx.logger.warn("⚠️ Keine Download-Info verfügbar - überspringe Preview")
else:
ctx.logger.info(f"📥 Datei-Info:")
ctx.logger.info(f" Filename: {download_info['filename']}")
ctx.logger.info(f" MIME-Type: {download_info['mime_type']}")
ctx.logger.info(f" Size: {download_info['size']} bytes")
# 2. Download File von EspoCRM
ctx.logger.info(f"📥 Downloading file...")
espocrm = sync_utils.espocrm
file_content = await espocrm.download_attachment(download_info['attachment_id'])
ctx.logger.info(f"✅ Downloaded {len(file_content)} bytes")
# 3. Speichere temporär für Preview-Generierung
import tempfile
import os
with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{download_info['filename']}") as tmp_file:
tmp_file.write(file_content)
tmp_path = tmp_file.name
try:
# 4. Generiere Preview
ctx.logger.info(f"🖼️ Generating preview (600x800 WebP)...")
preview_data = await sync_utils.generate_thumbnail(
tmp_path,
download_info['mime_type'],
max_width=600,
max_height=800
)
if preview_data:
ctx.logger.info(f"✅ Preview generated: {len(preview_data)} bytes WebP")
# 5. Upload Preview zu EspoCRM und reset file status
ctx.logger.info(f"📤 Uploading preview to EspoCRM...")
await sync_utils.update_sync_metadata(
entity_id,
preview_data=preview_data,
reset_file_status=True, # Reset status nach Preview-Generierung
entity_type=entity_type
)
ctx.logger.info(f"✅ Preview uploaded successfully")
else:
ctx.logger.warn("⚠️ Preview-Generierung lieferte keine Daten")
# Auch bei fehlgeschlagener Preview-Generierung Status zurücksetzen
await sync_utils.update_sync_metadata(
entity_id,
reset_file_status=True,
entity_type=entity_type
)
finally:
# Cleanup temp file
try:
os.remove(tmp_path)
except:
pass
# Enqueue preview generation event
await ctx.emit('document.generate_preview', {
'entity_id': entity_id,
'entity_type': entity_type
})
ctx.logger.info(f"✅ Preview generation event emitted for {entity_id}")
except Exception as e:
ctx.logger.error(f"❌ Fehler bei Preview-Generierung: {e}")
import traceback
ctx.logger.error(traceback.format_exc())
ctx.logger.error(f"❌ Fehler beim Triggern der Preview-Generierung: {e}")
# Continue - Preview ist optional
ctx.logger.info("")
ctx.logger.info("=" * 80)
ctx.logger.info("✅ PREVIEW-VERARBEITUNG ABGESCHLOSSEN")
ctx.logger.info("=" * 80)
# ═══════════════════════════════════════════════════════════════