feat: enhance document update webhook to trigger akte.sync and improve logging
This commit is contained in:
@@ -241,7 +241,10 @@ async def _run_advoware_sync(
|
||||
windows_by_path = {f.get('path', '').lower(): f for f in windows_files}
|
||||
|
||||
all_hnrs = set(espo_by_hnr.keys()) | set(history_by_hnr.keys())
|
||||
# Count unclean docs for logging
|
||||
unclean_docs = [d for d in espo_docs if d.get('syncStatus') not in ('synced', None, '')]
|
||||
ctx.logger.info(f" Unique HNRs : {len(all_hnrs)}")
|
||||
ctx.logger.info(f" Unclean docs : {len(unclean_docs)}" + (f" {[d.get('name','?') for d in unclean_docs[:5]]}" if unclean_docs else ""))
|
||||
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
# ── 3-way merge per HNR ───────────────────────────────────────────
|
||||
for hnr in all_hnrs:
|
||||
@@ -403,6 +406,12 @@ async def _run_advoware_sync(
|
||||
ctx.logger.error(f" ❌ Error for hnr {hnr} ({filename}): {e}")
|
||||
results['errors'] += 1
|
||||
|
||||
# Log post-sync unclean count to verify cleanup
|
||||
still_unclean = sum(1 for d in espo_docs if d.get('syncStatus') not in ('synced', None, '') and d.get('hnr') in all_hnrs)
|
||||
ctx.logger.info(f" → After sync: {results['updated']} updated, {results['skipped']} skipped, {results['errors']} errors")
|
||||
if unclean_docs:
|
||||
ctx.logger.info(f" → Unclean before: {len(unclean_docs)}, processed in this run: {min(len(unclean_docs), len(all_hnrs))}")
|
||||
|
||||
# ── Ablage check + Rubrum sync ─────────────────────────────────────
|
||||
try:
|
||||
akte_details = await advoware_service.get_akte(aktennummer)
|
||||
|
||||
@@ -49,12 +49,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
|
||||
ctx.logger.error("❌ Missing entity_id in event data")
|
||||
return
|
||||
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info(f"🖼️ GENERATE DOCUMENT PREVIEW")
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info(f"Entity Type: {entity_type}")
|
||||
ctx.logger.info(f"Document ID: {entity_id}")
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.debug(f"🖼️ PREVIEW: {entity_type} {entity_id}")
|
||||
|
||||
# Initialize sync utils (EspoCRMAPI must be passed as first arg, not ctx)
|
||||
from services.espocrm import EspoCRMAPI
|
||||
@@ -62,71 +57,35 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
|
||||
sync_utils = DocumentSync(espocrm, context=ctx)
|
||||
|
||||
try:
|
||||
# Step 1: Get download info from EspoCRM
|
||||
ctx.logger.info("📥 Step 1: Getting download info from EspoCRM...")
|
||||
download_info = await sync_utils.get_document_download_info(entity_id, entity_type)
|
||||
|
||||
if not download_info:
|
||||
ctx.logger.warn("⚠️ No download info available - skipping preview generation")
|
||||
ctx.logger.debug(f" ⏭️ No attachment – skipping preview for {entity_id}")
|
||||
return
|
||||
|
||||
attachment_id = download_info['attachment_id']
|
||||
filename = download_info['filename']
|
||||
mime_type = download_info['mime_type']
|
||||
|
||||
ctx.logger.info(f" Filename: {filename}")
|
||||
ctx.logger.info(f" MIME Type: {mime_type}")
|
||||
ctx.logger.info(f" Attachment ID: {attachment_id}")
|
||||
|
||||
# Step 2: Download file from EspoCRM
|
||||
ctx.logger.info("📥 Step 2: Downloading file from EspoCRM...")
|
||||
file_content = await sync_utils.espocrm.download_attachment(attachment_id)
|
||||
ctx.logger.info(f" Downloaded: {len(file_content)} bytes")
|
||||
|
||||
# Step 3: Save to temporary file for preview generation
|
||||
ctx.logger.info("💾 Step 3: Saving to temporary file...")
|
||||
with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix=os.path.splitext(filename)[1]) as tmp_file:
|
||||
tmp_file.write(file_content)
|
||||
tmp_path = tmp_file.name
|
||||
|
||||
try:
|
||||
# Step 4: Generate preview (600x800 WebP)
|
||||
ctx.logger.info(f"🖼️ Step 4: Generating preview (600x800 WebP)...")
|
||||
preview_data = await sync_utils.generate_thumbnail(
|
||||
tmp_path,
|
||||
mime_type,
|
||||
max_width=600,
|
||||
max_height=800
|
||||
)
|
||||
preview_data = await sync_utils.generate_thumbnail(tmp_path, mime_type, max_width=600, max_height=800)
|
||||
|
||||
if preview_data:
|
||||
ctx.logger.info(f"✅ Preview generated: {len(preview_data)} bytes WebP")
|
||||
|
||||
# Step 5: Upload preview to EspoCRM
|
||||
ctx.logger.info(f"📤 Step 5: Uploading preview to EspoCRM...")
|
||||
await sync_utils._upload_preview_to_espocrm(entity_id, preview_data, entity_type)
|
||||
ctx.logger.info(f"✅ Preview uploaded successfully")
|
||||
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("✅ PREVIEW GENERATION COMPLETE")
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info(f"🖼️ Preview OK: {filename} ({len(preview_data)} bytes)")
|
||||
else:
|
||||
ctx.logger.warn("⚠️ Preview generation returned no data")
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("⚠️ PREVIEW GENERATION FAILED")
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.warn(f"🖼️ Preview failed (no data): {filename}")
|
||||
|
||||
finally:
|
||||
# Cleanup temporary file
|
||||
if os.path.exists(tmp_path):
|
||||
os.remove(tmp_path)
|
||||
ctx.logger.debug(f"🗑️ Removed temporary file: {tmp_path}")
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ Preview generation failed: {e}")
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("❌ PREVIEW GENERATION ERROR")
|
||||
ctx.logger.info("=" * 80)
|
||||
import traceback
|
||||
ctx.logger.debug(traceback.format_exc())
|
||||
ctx.logger.warn(f"🖼️ Preview error for {entity_id}: {e}")
|
||||
# Don't raise - preview generation is optional
|
||||
|
||||
@@ -7,12 +7,12 @@ from motia import FlowContext, http, ApiRequest, ApiResponse
|
||||
|
||||
config = {
|
||||
"name": "VMH Webhook Document Update",
|
||||
"description": "Empfängt Update-Webhooks von EspoCRM für Documents und triggert Preview-Generierung",
|
||||
"description": "Empfängt Update-Webhooks von EspoCRM für Documents, triggert Preview-Generierung und Akte-Sync",
|
||||
"flows": ["vmh-documents"],
|
||||
"triggers": [
|
||||
http("POST", "/crm/document/webhook/update")
|
||||
],
|
||||
"enqueues": ["document.generate_preview"],
|
||||
"enqueues": ["document.generate_preview", "akte.sync"],
|
||||
}
|
||||
|
||||
|
||||
@@ -48,7 +48,32 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
|
||||
ctx.logger.info(f"{len(entity_ids)} document IDs found for update sync")
|
||||
|
||||
# Trigger preview generation for each updated document
|
||||
# Collect cAktenId per document – from payload first, fallback to EspoCRM API
|
||||
from services.espocrm import EspoCRMAPI
|
||||
espocrm = EspoCRMAPI(ctx)
|
||||
|
||||
# Build map: entity_id → cAktenId (from payload if available)
|
||||
payload_list = payload if isinstance(payload, list) else ([payload] if isinstance(payload, dict) else [])
|
||||
akte_id_by_doc: dict = {}
|
||||
for entity in payload_list:
|
||||
if isinstance(entity, dict) and 'id' in entity:
|
||||
if entity.get('cAktenId'):
|
||||
akte_id_by_doc[entity['id']] = entity['cAktenId']
|
||||
|
||||
# For documents where cAktenId was not in payload, fetch from EspoCRM
|
||||
missing = [eid for eid in entity_ids if eid not in akte_id_by_doc]
|
||||
for eid in missing:
|
||||
try:
|
||||
doc = await espocrm.get_entity('CDokumente', eid)
|
||||
if doc and doc.get('cAktenId'):
|
||||
akte_id_by_doc[eid] = doc['cAktenId']
|
||||
except Exception as e:
|
||||
ctx.logger.warn(f" ⚠️ Could not fetch cAktenId for {eid}: {e}")
|
||||
|
||||
# Collect unique Akte IDs to trigger
|
||||
akte_ids = set(akte_id_by_doc.values())
|
||||
|
||||
# Trigger preview generation and akte.sync for each updated document
|
||||
for entity_id in entity_ids:
|
||||
await ctx.enqueue({
|
||||
'topic': 'document.generate_preview',
|
||||
@@ -58,8 +83,15 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
}
|
||||
})
|
||||
|
||||
for akte_id in akte_ids:
|
||||
await ctx.enqueue({
|
||||
'topic': 'akte.sync',
|
||||
'data': {'akte_id': akte_id, 'aktennummer': None},
|
||||
})
|
||||
ctx.logger.info(f" 🔄 akte.sync enqueued for CAkten {akte_id}")
|
||||
|
||||
ctx.logger.info("✅ Document Update Webhook processed: "
|
||||
f"{len(entity_ids)} preview(s) enqueued")
|
||||
f"{len(entity_ids)} preview(s), {len(akte_ids)} akte.sync(s) enqueued")
|
||||
|
||||
return ApiResponse(
|
||||
status=200,
|
||||
|
||||
Reference in New Issue
Block a user