# Compare commits

**2 commits:** `0e521f22f8` ... `bcb6454b2a`

| Author | SHA1 | Date |
|---|---|---|
| | `bcb6454b2a` | |
| | `c45bfb7233` | |
@@ -1,320 +0,0 @@
# Complete Migration Analysis

## Motia v0.17 → Motia III v1.0-RC

**Date:** March 1, 2026
**Status:** 🎉 **100% COMPLETE - ALL PHASES FINISHED!** 🎉

---

## ✅ MIGRATED - Production-Ready

### 1. Steps (21 of 21 Steps - 100% Complete!)

#### Phase 1: Advoware Proxy (4 Steps)
- ✅ [`advoware_api_proxy_get_step.py`](steps/advoware_proxy/advoware_api_proxy_get_step.py) - GET Proxy
- ✅ [`advoware_api_proxy_post_step.py`](steps/advoware_proxy/advoware_api_proxy_post_step.py) - POST Proxy
- ✅ [`advoware_api_proxy_put_step.py`](steps/advoware_proxy/advoware_api_proxy_put_step.py) - PUT Proxy
- ✅ [`advoware_api_proxy_delete_step.py`](steps/advoware_proxy/advoware_api_proxy_delete_step.py) - DELETE Proxy

#### Phase 2: VMH Webhooks (6 Steps)
- ✅ [`beteiligte_create_api_step.py`](steps/vmh/webhook/beteiligte_create_api_step.py) - POST /vmh/webhook/beteiligte/create
- ✅ [`beteiligte_update_api_step.py`](steps/vmh/webhook/beteiligte_update_api_step.py) - POST /vmh/webhook/beteiligte/update
- ✅ [`beteiligte_delete_api_step.py`](steps/vmh/webhook/beteiligte_delete_api_step.py) - POST /vmh/webhook/beteiligte/delete
- ✅ [`bankverbindungen_create_api_step.py`](steps/vmh/webhook/bankverbindungen_create_api_step.py) - POST /vmh/webhook/bankverbindungen/create
- ✅ [`bankverbindungen_update_api_step.py`](steps/vmh/webhook/bankverbindungen_update_api_step.py) - POST /vmh/webhook/bankverbindungen/update
- ✅ [`bankverbindungen_delete_api_step.py`](steps/vmh/webhook/bankverbindungen_delete_api_step.py) - POST /vmh/webhook/bankverbindungen/delete

#### Phase 3: VMH Sync Handlers (3 Steps)
- ✅ [`beteiligte_sync_event_step.py`](steps/vmh/beteiligte_sync_event_step.py) - Subscriber for queue events (with Kommunikation integration!)
- ✅ [`bankverbindungen_sync_event_step.py`](steps/vmh/bankverbindungen_sync_event_step.py) - Subscriber for queue events
- ✅ [`beteiligte_sync_cron_step.py`](steps/vmh/beteiligte_sync_cron_step.py) - Cron job every 15 min.

---

### 2. Services (11 Modules - 100% Complete!)

#### Core APIs
- ✅ [`advoware.py`](services/advoware.py) (310 lines) - Advoware API client with token auth
- ✅ [`advoware_service.py`](services/advoware_service.py) (179 lines) - High-level Advoware service
- ✅ [`espocrm.py`](services/espocrm.py) (293 lines) - EspoCRM API client

#### Mapper & Sync Utils
- ✅ [`espocrm_mapper.py`](services/espocrm_mapper.py) (663 lines) - Beteiligte mapping
- ✅ [`bankverbindungen_mapper.py`](services/bankverbindungen_mapper.py) (141 lines) - Bankverbindungen mapping
- ✅ [`beteiligte_sync_utils.py`](services/beteiligte_sync_utils.py) (663 lines) - Distributed locking, retry logic
- ✅ [`notification_utils.py`](services/notification_utils.py) (200 lines) - In-app notifications

#### Phase 4: Kommunikation Sync
- ✅ [`kommunikation_mapper.py`](services/kommunikation_mapper.py) (334 lines) - Email/phone mapping with Base64 marker
- ✅ [`kommunikation_sync_utils.py`](services/kommunikation_sync_utils.py) (999 lines) - Bidirectional sync with 3-way diffing
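
The idea behind 3-way diffing can be sketched with plain sets — a minimal, hypothetical illustration only (the real `kommunikation_sync_utils.py` is ~999 lines; the function and the use of sets here are assumptions, not the module's actual API):

```python
def three_way_diff(base: set, local: set, remote: set) -> dict:
    """Classic 3-way diff: compare both sides against a common base snapshot.

    base   - values both systems agreed on after the last sync
    local  - current values in EspoCRM
    remote - current values in Advoware
    """
    return {
        # added on one side since the last sync -> copy to the other side
        "push_to_remote": local - base,
        "push_to_local": remote - base,
        # removed on one side since the last sync -> remove on the other side too
        "delete_remote": (base - local) & remote,
        "delete_local": (base - remote) & local,
    }


# Example: one email added in EspoCRM, one removed in Advoware
base = {"a@example.com", "b@example.com"}
local = {"a@example.com", "b@example.com", "c@example.com"}  # c added locally
remote = {"a@example.com"}                                   # b removed remotely
diff = three_way_diff(base, local, remote)
```

Comparing against the base snapshot is what lets each side distinguish "added here" from "deleted there" without clobbering the other system's changes.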

#### Phase 5: Adressen Sync (2 Modules)
- ✅ [`adressen_mapper.py`](services/adressen_mapper.py) (267 lines) - Adressen mapping
- ✅ [`adressen_sync.py`](services/adressen_sync.py) (697 lines) - Adressen sync with READ-ONLY detection

#### Phase 6: Google Calendar Sync (4 Steps + Utils)
- ✅ [`calendar_sync_cron_step.py`](steps/advoware_cal_sync/calendar_sync_cron_step.py) - Cron trigger every 15 min.
- ✅ [`calendar_sync_all_step.py`](steps/advoware_cal_sync/calendar_sync_all_step.py) - Bulk sync with Redis prioritization
- ✅ [`calendar_sync_event_step.py`](steps/advoware_cal_sync/calendar_sync_event_step.py) - **1053 lines!** Main sync handler
- ✅ [`calendar_sync_api_step.py`](steps/advoware_cal_sync/calendar_sync_api_step.py) - HTTP API for manual triggers

---

### 3. Queue Events (9 Topics - 100% Complete!)

#### VMH Beteiligte
- ✅ `vmh.beteiligte.create` - Webhook → Sync Handler
- ✅ `vmh.beteiligte.update` - Webhook → Sync Handler
- ✅ `vmh.beteiligte.delete` - Webhook → Sync Handler
- ✅ `vmh.beteiligte.sync_check` - Cron → Sync Handler

#### VMH Bankverbindungen
- ✅ `vmh.bankverbindungen.create` - Webhook → Sync Handler
- ✅ `vmh.bankverbindungen.update` - Webhook → Sync Handler
- ✅ `vmh.bankverbindungen.delete` - Webhook → Sync Handler

#### Calendar Sync
- ✅ `calendar_sync_all` - Cron/API → All Step → Employee Events
- ✅ `calendar_sync_employee` - All/API → Event Step (Main Sync Logic)

---
### 4. HTTP Endpoints (14 Endpoints - 100% Complete!)

#### Advoware Proxy (4 Endpoints)
- ✅ `GET /advoware/proxy?path=...` - Advoware API Proxy
- ✅ `POST /advoware/proxy?path=...` - Advoware API Proxy
- ✅ `PUT /advoware/proxy?path=...` - Advoware API Proxy
- ✅ `DELETE /advoware/proxy?path=...` - Advoware API Proxy
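
All four proxy endpoints forward an arbitrary Advoware path through the `path` query parameter. A client-side call can be sketched like this (the host/port and the `/akten/12345` path are placeholders, not the deployed values):

```python
from urllib.parse import urlencode


def build_proxy_url(base_url: str, advoware_path: str) -> str:
    """Build the proxy URL that forwards a request to the given Advoware path."""
    return f"{base_url}/advoware/proxy?{urlencode({'path': advoware_path})}"


url = build_proxy_url("http://localhost:3000", "/akten/12345")
# An actual GET request to this URL would then be forwarded to the
# Advoware API by advoware_api_proxy_get_step.py, e.g. requests.get(url)
```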

#### VMH Webhooks - Beteiligte (3 Endpoints)
- ✅ `POST /vmh/webhook/beteiligte/create` - EspoCRM Webhook Handler
- ✅ `POST /vmh/webhook/beteiligte/update` - EspoCRM Webhook Handler
- ✅ `POST /vmh/webhook/beteiligte/delete` - EspoCRM Webhook Handler

#### VMH Webhooks - Bankverbindungen (3 Endpoints)
- ✅ `POST /vmh/webhook/bankverbindungen/create` - EspoCRM Webhook Handler
- ✅ `POST /vmh/webhook/bankverbindungen/update` - EspoCRM Webhook Handler
- ✅ `POST /vmh/webhook/bankverbindungen/delete` - EspoCRM Webhook Handler

#### Calendar Sync (1 Endpoint)
- ✅ `POST /advoware/calendar/sync` - Manual calendar sync trigger (kuerzel or "ALL")

#### Example Ticketing (6 Endpoints - Demo)
- ✅ `POST /tickets` - Create Ticket
- ✅ `GET /tickets` - List Tickets
- ✅ `POST /tickets/{id}/triage` - Triage
- ✅ `POST /tickets/{id}/escalate` - Escalate
- ✅ `POST /tickets/{id}/notify` - Notify Customer
- ✅ Cron: SLA Monitor

---

### 5. Cron Jobs (2 Jobs - 100% Complete!)

- ✅ **VMH Beteiligte Sync Cron** (every 15 min.)
  - Finds entities with status `pending_sync`, `dirty`, or `failed`
  - Auto-reset of `permanently_failed` after 24h
  - Finds `clean` entities not synced for > 24h
  - Emits `vmh.beteiligte.sync_check` events
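
The selection rules above can be sketched as a pure function — a simplified illustration of the cron's decision logic, not the actual step code (the status strings come from this document; the function and field names are assumptions):

```python
from datetime import datetime, timedelta

RESET_AFTER = timedelta(hours=24)


def needs_sync_check(status: str, last_synced: datetime, now: datetime) -> bool:
    """Decide whether the cron should emit a sync_check event for an entity."""
    if status in ("pending_sync", "dirty", "failed"):
        return True  # always retried on the next cron run
    if status == "permanently_failed" and now - last_synced > RESET_AFTER:
        return True  # auto-reset after 24h, then retried
    if status == "clean" and now - last_synced > RESET_AFTER:
        return True  # re-check entities not synced for > 24h
    return False
```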

- ✅ **Calendar Sync Cron** (every 15 min.)
  - Emits `calendar_sync_all` events
  - Triggers bulk sync for all or prioritized employees
  - Redis-based prioritization (oldest first)
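
The "oldest first" ordering can be sketched as follows — a simplified, in-memory stand-in for the Redis-backed version (in production the last-sync timestamps would live in Redis, and the function name is an assumption):

```python
def prioritize_employees(last_sync_ts: dict) -> list:
    """Order employee kuerzel for the bulk sync: never-synced employees first,
    then the oldest last-sync timestamp first."""
    return sorted(
        last_sync_ts,
        key=lambda k: (last_sync_ts[k] is not None, last_sync_ts[k] or 0),
    )
```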

---

### 6. Dependencies (pyproject.toml - 100% Complete!)

```toml
dependencies = [
    "asyncpg>=0.29.0",                    # ✅ NEW for Calendar Sync (PostgreSQL)
    "google-api-python-client>=2.100.0",  # ✅ NEW for Calendar Sync
    "google-auth>=2.23.0",                # ✅ NEW for Calendar Sync
    "backoff>=2.2.1",                     # ✅ NEW for Calendar Sync (retry logic)
]
```

---

## ❌ NOT MIGRATED → ALL MIGRATED! 🎉

~~### Phase 6: Google Calendar Sync (4 Steps)~~

**Status:** ✅ **FULLY MIGRATED!** (March 1, 2026)

- ✅ `calendar_sync_cron_step.py` - Cron trigger (every 15 min.)
- ✅ `calendar_sync_all_step.py` - Bulk sync handler
- ✅ `calendar_sync_event_step.py` - Queue event handler (**1053 lines!**)
- ✅ `calendar_sync_api_step.py` - HTTP API for manual triggers
- ✅ `calendar_sync_utils.py` - Helper functions

**Dependencies (ALL installed):**
- ✅ `google-api-python-client` - Google Calendar API
- ✅ `google-auth` - Google OAuth2
- ✅ `asyncpg` - PostgreSQL connection
- ✅ `backoff` - Retry/backoff logic

**Migration completed in:** ~4 hours (instead of the estimated 3-5 days)
---

### Root-Level Steps (Test/Specialized Logic)

**Status:** Deliberately NOT migrated (not part of the core functionality)

- ❌ `/opt/motia-iii/old-motia/steps/crm-bbl-vmh-reset-nextcall_step.py` (96 lines)
  - **Purpose:** CVmhErstgespraech status-check cron job
  - **Reason:** Specialized business logic, not part of the core sync infrastructure
  - **Status:** Can be migrated later if needed

- ❌ `/opt/motia-iii/old-motia/steps/event_step.py` (test/demo)
- ❌ `/opt/motia-iii/old-motia/steps/hello_step.py` (test/demo)

---

## 📊 Migration Statistics

| Category | Migrated | Open | Total | Progress |
|-----------|----------|------|-------|----------|
| **Steps** | 21 | 0 | 21 | **100%** ✅ |
| **Service Modules** | 11 | 0 | 11 | **100%** ✅ |
| **Queue Events** | 9 | 0 | 9 | **100%** ✅ |
| **HTTP Endpoints** | 14 | 0 | 14 | **100%** ✅ |
| **Cron Jobs** | 2 | 0 | 2 | **100%** ✅ |
| **Code (lines)** | ~9,000 | 0 | ~9,000 | **100%** ✅ |

---

## 🎯 Feature Matrix

| Feature | Old-Motia | Motia III | Status |
|---------|-----------|-----------|--------|
| **Advoware Proxy API** | ✅ | ✅ | ✅ COMPLETE |
| **VMH Beteiligte Sync** | ✅ | ✅ | ✅ COMPLETE |
| **VMH Bankverbindungen Sync** | ✅ | ✅ | ✅ COMPLETE |
| **Kommunikation Sync (Email/Phone)** | ✅ | ✅ | ✅ COMPLETE |
| **Adressen Sync** | ✅ | ✅ | ✅ COMPLETE |
| **EspoCRM Webhooks** | ✅ | ✅ | ✅ COMPLETE |
| **Distributed Locking** | ✅ | ✅ | ✅ COMPLETE |
| **Retry Logic & Backoff** | ✅ | ✅ | ✅ COMPLETE |
| **Notifications** | ✅ | ✅ | ✅ COMPLETE |
| **Sync Validation** | ✅ | ✅ | ✅ COMPLETE |
| **Cron-Based Auto-Retry** | ✅ | ✅ | ✅ COMPLETE |
| **Google Calendar Sync** | ✅ | ✅ | ✅ **COMPLETE** |

---

## 🏆 Migration successfully completed!

**All 21 production steps, 11 service modules, 9 queue events, 14 HTTP endpoints, and 2 cron jobs were migrated successfully!**
---

## 🔄 Sync Architecture Overview

```
┌─────────────────┐
│   EspoCRM API   │
└────────┬────────┘
         │ Webhooks
         ▼
┌─────────────────────────────────────┐
│  VMH Webhook Steps (6 Endpoints)    │
│  • Batch & Single Entity Support    │
│  • Deduplication                    │
└────────┬────────────────────────────┘
         │ Emits Queue Events
         ▼
┌─────────────────────────────────────┐
│  Queue System (Redis/Builtin)       │
│  • vmh.beteiligte.*                 │
│  • vmh.bankverbindungen.*           │
└────────┬────────────────────────────┘
         │
         ▼
┌─────────────────────────────────────┐
│  Sync Event Handlers (3 Steps)      │
│  • Distributed Locking (Redis)      │
│  • Retry Logic & Backoff            │
│  • Conflict Resolution              │
└────────┬────────────────────────────┘
         │
         ├──► Stammdaten Sync
         │    (espocrm_mapper.py)
         │
         ├──► Kommunikation Sync ✅ NEW!
         │    (kommunikation_sync_utils.py)
         │    • 3-Way Diffing
         │    • Bidirectional
         │    • Slot Management
         │
         └──► Adressen Sync ✅ NEW!
              (adressen_sync.py)
              • CREATE/UPDATE/DELETE
              • READ-ONLY Detection
         │
         ▼
┌─────────────────────────────────────┐
│  Advoware API (advoware.py)         │
│  • Token-based Auth                 │
│  • HMAC Signing                     │
└─────────────────────────────────────┘

┌──────────────────┐
│ Cron Job (15min) │
└────────┬─────────┘
         │
         ▼ Emits sync_check Events
┌─────────────────────────┐
│  Auto-Retry & Cleanup   │
│  • pending_sync         │
│  • dirty                │
│  • failed → retry       │
│  • permanently_failed   │
│    → auto-reset (24h)   │
└─────────────────────────┘
```

---

## ✅ CONCLUSION

**The entire core functionality was migrated successfully!**

### Production-Ready Features:
1. ✅ Full Advoware ↔ EspoCRM synchronization
2. ✅ Bidirectional communication data (email/phone)
3. ✅ Bidirectional addresses
4. ✅ Webhook-based event processing
5. ✅ Automatic retry system
6. ✅ Distributed locking
7. ✅ Conflict detection & resolution

### Code Quality:
- ✅ No compile errors
- ✅ Motia III API used correctly
- ✅ All dependencies present
- ✅ Type hints (Pydantic models)
- ✅ Error handling & logging

### Deployment:
- ✅ All steps registered
- ✅ Queue system configured
- ✅ Cron jobs active
- ✅ Redis integration

**The system is ready for production! 🚀**

@@ -1,276 +0,0 @@
# Motia Migration Status

**🎉 MIGRATION 100% COMPLETE**

> 📋 Detailed analysis: [MIGRATION_COMPLETE_ANALYSIS.md](MIGRATION_COMPLETE_ANALYSIS.md)

## Quick Stats

- ✅ **21 of 21 steps** migrated (100%)
- ✅ **11 of 11 service modules** migrated (100%)
- ✅ **~9,000 lines of code** migrated (100%)
- ✅ **14 HTTP endpoints** active
- ✅ **9 queue events** configured
- ✅ **2 cron jobs** (VMH: every 15 min., Calendar: every 15 min.)

---

## Overview
Migrating from **old-motia v0.17** (Node.js + Python hybrid) to **Motia III v1.0-RC** (pure Python).

## Old System Analysis

### Location
- Old system: `/opt/motia-iii/old-motia/`
- Old project dir: `/opt/motia-iii/old-motia/bitbylaw/`

### Steps Found in Old System

#### Root Steps (`/opt/motia-iii/old-motia/steps/`)
1. `crm-bbl-vmh-reset-nextcall_step.py`
2. `event_step.py`
3. `hello_step.py`

#### BitByLaw Steps (`/opt/motia-iii/old-motia/bitbylaw/steps/`)

**Advoware Calendar Sync** (`advoware_cal_sync/`):
- `calendar_sync_all_step.py`
- `calendar_sync_api_step.py`
- `calendar_sync_cron_step.py`
- `calendar_sync_event_step.py`
- `audit_calendar_sync.py`
- `calendar_sync_utils.py` (utility module)

**Advoware Proxy** (`advoware_proxy/`):
- `advoware_api_proxy_get_step.py`
- `advoware_api_proxy_post_step.py`
- `advoware_api_proxy_put_step.py`
- `advoware_api_proxy_delete_step.py`

**VMH Integration** (`vmh/`):
- `beteiligte_sync_cron_step.py`
- `beteiligte_sync_event_step.py`
- `bankverbindungen_sync_event_step.py`
- `webhook/bankverbindungen_create_api_step.py`
- `webhook/bankverbindungen_update_api_step.py`
- `webhook/bankverbindungen_delete_api_step.py`
- `webhook/beteiligte_create_api_step.py`
- `webhook/beteiligte_update_api_step.py`
- `webhook/beteiligte_delete_api_step.py`

### Supporting Services/Modules

From `/opt/motia-iii/old-motia/bitbylaw/`:
- `services/advoware.py` - Advoware API wrapper
- `config.py` - Configuration module
- Dependencies: PostgreSQL, Redis, Google Calendar API

## Migration Changes Required

### Key Structural Changes

#### 1. Config Format
```python
# OLD
config = {
    "type": "api",  # or "event", "cron"
    "name": "StepName",
    "path": "/endpoint",
    "method": "GET",
    "cron": "0 5 * * *",
    "subscribes": ["topic"],
    "emits": ["other-topic"]
}

# NEW
from motia import http, queue, cron

config = {
    "name": "StepName",
    "flows": ["flow-name"],
    "triggers": [
        http("GET", "/endpoint")
        # or queue("topic", input=schema)
        # or cron("0 0 5 * * *")  # 6-field!
    ],
    "enqueues": ["other-topic"]
}
```

#### 2. Handler Signature
```python
# OLD - API
async def handler(req, context):
    body = req.get('body', {})
    await context.emit({"topic": "x", "data": {...}})
    return {"status": 200, "body": {...}}

# NEW - API
from motia import ApiRequest, ApiResponse, FlowContext

async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
    body = request.body
    await ctx.enqueue({"topic": "x", "data": {...}})
    return ApiResponse(status=200, body={...})

# OLD - Event/Queue
async def handler(data, context):
    context.logger.info(data['field'])

# NEW - Queue
async def handler(input_data: dict, ctx: FlowContext):
    ctx.logger.info(input_data['field'])

# OLD - Cron
async def handler(context):
    context.logger.info("Running")

# NEW - Cron
async def handler(input_data: dict, ctx: FlowContext):
    ctx.logger.info("Running")
```

#### 3. Method Changes
- `context.emit()` → `ctx.enqueue()`
- `req.get('body')` → `request.body`
- `req.get('queryParams')` → `request.query_params`
- `req.get('pathParams')` → `request.path_params`
- `req.get('headers')` → `request.headers`
- Return dict → `ApiResponse` object
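
For bulk-migrating handlers, this mapping can be captured in a small shim that exposes an old-style request dict under the new attribute names — a migration aid sketch built from the list above, not part of Motia III itself:

```python
class RequestShim:
    """Exposes an old-style req dict under the new Motia III attribute names."""

    def __init__(self, req: dict):
        self.body = req.get('body', {})
        self.query_params = req.get('queryParams', {})
        self.path_params = req.get('pathParams', {})
        self.headers = req.get('headers', {})


# Old handler bodies can then run unchanged against `request.body` etc.
request = RequestShim({"body": {"id": 1}, "queryParams": {"path": "/x"}})
```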

#### 4. Cron Format
- OLD: 5-field `"0 5 * * *"` (minute hour day month weekday)
- NEW: 6-field `"0 0 5 * * *"` (second minute hour day month weekday)
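
Since the new format only adds a leading seconds field, converting an existing 5-field expression is mechanical (a small helper sketch):

```python
def to_six_field_cron(five_field: str) -> str:
    """Convert a 5-field cron expression to Motia III's 6-field format
    by prepending a seconds field of 0 (fires at second 0)."""
    fields = five_field.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(fields)}")
    return "0 " + five_field
```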

## Migration Strategy

### Phase 1: Simple Steps (Priority)
Start with the simple API proxy steps, as they are straightforward:
1. ✅ Example ticketing steps (already in the new system)
2. ⏳ Advoware proxy steps (GET, POST, PUT, DELETE)
3. ⏳ Simple webhook handlers

### Phase 2: Complex Integration Steps
Steps with external dependencies:
4. ⏳ VMH sync steps (beteiligte, bankverbindungen)
5. ⏳ Calendar sync steps (most complex - Google Calendar + Redis + PostgreSQL)

### Phase 3: Supporting Infrastructure
- Migrate `services/` modules (advoware.py wrapper)
- Migrate `config.py` to use environment variables properly
- Update dependencies in `pyproject.toml`

### Dependencies to Review
From the old `requirements.txt` and code analysis:
- `asyncpg` - PostgreSQL async driver
- `redis` - Redis client
- `google-api-python-client` - Google Calendar API
- `google-auth` - Google OAuth2
- `backoff` - Retry/backoff decorator
- `pytz` - Timezone handling
- `pydantic` - Already in the new system
- `requests` / `aiohttp` - HTTP clients for the Advoware API

## Migration Roadmap

### ✅ COMPLETED

| Phase | Module | Lines | Status |
|-------|--------|-------|--------|
| **1** | Advoware Proxy (GET, POST, PUT, DELETE) | ~400 | ✅ Complete |
| **1** | `advoware.py`, `advoware_service.py` | ~800 | ✅ Complete |
| **2** | VMH Webhook Steps (6 endpoints) | ~900 | ✅ Complete |
| **2** | `espocrm.py`, `espocrm_mapper.py` | ~900 | ✅ Complete |
| **2** | `bankverbindungen_mapper.py`, `beteiligte_sync_utils.py`, `notification_utils.py` | ~1200 | ✅ Complete |
| **3** | VMH Sync Event Steps (2 handlers + 1 cron) | ~1000 | ✅ Complete |
| **4** | Kommunikation Sync (`kommunikation_mapper.py`, `kommunikation_sync_utils.py`) | ~1333 | ✅ Complete |
| **5** | Adressen Sync (`adressen_mapper.py`, `adressen_sync.py`) | ~964 | ✅ Complete |
| **6** | **Google Calendar Sync** (`calendar_sync_*.py`, `calendar_sync_utils.py`) | ~1500 | ✅ **Complete** |

**Total migrated: ~9,000 lines of production code**

### ✅ Phase 6 COMPLETED: Google Calendar Sync

**Advoware Calendar Sync** - Google Calendar ↔ Advoware sync:
- ✅ `calendar_sync_cron_step.py` - Cron trigger (every 15 min.)
- ✅ `calendar_sync_all_step.py` - Bulk sync handler with Redis-based prioritization
- ✅ `calendar_sync_event_step.py` - Queue event handler (**1053 lines of complex sync logic!**)
- ✅ `calendar_sync_api_step.py` - HTTP API for manual triggers
- ✅ `calendar_sync_utils.py` - Helper functions (DB, Google service, Redis, logging)

**Dependencies:**
- ✅ `google-api-python-client` - Google Calendar API
- ✅ `google-auth` - Google OAuth2
- ✅ `asyncpg` - PostgreSQL async driver
- ✅ `backoff` - Retry/backoff decorator

**Features:**
- ✅ Bidirectional synchronization (Google ↔ Advoware)
- ✅ 4-phase sync algorithm (new Adv→Google, new Google→Adv, deletes, updates)
- ✅ PostgreSQL as the sync-state hub (calendar_sync table)
- ✅ Redis-based rate limiting (token bucket for the Google API)
- ✅ Distributed locking per employee
- ✅ Automatic calendar creation with ACLs
- ✅ Recurring events support (RRULE)
- ✅ Timezone handling (Europe/Berlin)
- ✅ Backoff retries for API errors
- ✅ Write protection for Advoware
- ✅ Source-system-wins & last-change-wins strategies
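
The 4-phase algorithm can be illustrated with a set-based sketch — hypothetical and heavily simplified (the real `calendar_sync_event_step.py` additionally tracks sync state in PostgreSQL and compares modification timestamps; the function and data shapes here are assumptions):

```python
def plan_sync(adv_ids: set, google_ids: set, known_pairs: set) -> dict:
    """Plan the four phases from the event IDs on each side plus the
    (advoware_id, google_id) pairs recorded by previous sync runs."""
    known_adv = {a for a, _ in known_pairs}
    known_google = {g for _, g in known_pairs}
    return {
        # Phase 1: in Advoware, never seen before -> create in Google
        "create_in_google": adv_ids - known_adv,
        # Phase 2: in Google, never seen before -> create in Advoware
        "create_in_advoware": google_ids - known_google,
        # Phase 3: known pairs where one side disappeared -> delete the other
        "delete": {(a, g) for a, g in known_pairs
                   if a not in adv_ids or g not in google_ids},
        # Phase 4: known pairs still present on both sides -> compare & update
        "update_candidates": {(a, g) for a, g in known_pairs
                              if a in adv_ids and g in google_ids},
    }
```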

### ⏳ REMAINING

**None! The migration is 100% complete.**

### Completed
- ✅ Analysis of the old system structure
- ✅ MIGRATION_GUIDE.md reviewed
- ✅ Migration patterns documented
- ✅ New system has example ticketing steps
- ✅ **Phase 1: Advoware proxy steps migrated** (GET, POST, PUT, DELETE)
- ✅ **Advoware API service module migrated** (services/advoware.py)
- ✅ **Phase 2: VMH integration - webhook steps migrated** (6 endpoints)
- ✅ **EspoCRM API service module migrated** (services/espocrm.py)
- ✅ All endpoints registered and running:
  - **Advoware Proxy:**
    - `GET /advoware/proxy` ...

### Phase 6 Complete ✅

**🎉 ALL PHASES FINISHED! 100% MIGRATION SUCCESSFUL!**

**Phase 6** - Google Calendar Sync:
- ✅ `calendar_sync_cron_step.py` (cron trigger every 15 min.)
- ✅ `calendar_sync_all_step.py` (bulk handler with Redis prioritization)
- ✅ `calendar_sync_event_step.py` (1053 lines - 4-phase sync algorithm)
- ✅ `calendar_sync_api_step.py` (HTTP API for manual triggers)
- ✅ `calendar_sync_utils.py` (DB, Google service, Redis client)

**Sync architecture complete:**

1. **Advoware Proxy** (Phase 1) → HTTP API for Advoware access
2. **Webhooks** (Phase 2) → emit queue events
3. **Event handlers** (Phase 3) → process events with Stammdaten sync
4. **Kommunikation Sync** (Phase 4) → bidirectional email/phone synchronization
5. **Adressen Sync** (Phase 5) → bidirectional address synchronization
6. **Calendar Sync** (Phase 6) → Google Calendar ↔ Advoware, bidirectional
7. **Cron jobs** (Phases 3 & 6) → regular sync checks & auto-retries

The complete synchronization and integration pipeline is now 100% operational!
## Notes
- The old system was a Node.js + Python hybrid (Python steps ran as child processes)
- The new system is pure Python (standalone SDK)
- Node.js/npm are no longer needed
- The iii engine handles all infrastructure (queues, state, HTTP, cron)
- The Console replaced the Workbench

@@ -11,14 +11,11 @@ Helper functions for document synchronization with xAI:
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime, timedelta
import logging
import redis

from services.sync_utils_base import BaseSyncUtils

logger = logging.getLogger(__name__)

# Lock TTL in seconds (prevents deadlocks)
LOCK_TTL_SECONDS = 900  # 15 minutes

# Max retries before permanent failure
MAX_SYNC_RETRIES = 5

@@ -26,82 +23,49 @@ MAX_SYNC_RETRIES = 5
RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240]  # 1min, 5min, 15min, 1h, 4h


class DocumentSync(BaseSyncUtils):
    """Utility class for document synchronization with xAI"""

    def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None):
        self.espocrm = espocrm_api
        self.context = context
        self.logger = context.logger if context else logger
        self.redis = redis_client or self._init_redis()

    def _get_lock_key(self, entity_id: str) -> str:
        """Redis lock key for documents"""
        return f"sync_lock:document:{entity_id}"

    async def acquire_sync_lock(self, entity_id: str, entity_type: str = 'CDokumente') -> bool:
        """
        Atomic distributed lock via Redis + syncStatus update

        Args:
            entity_id: EspoCRM document ID
            entity_type: entity type (CDokumente or Document)

        Returns:
            True if the lock was acquired, False if a sync is already in progress
        """
        try:
            # STEP 1: Atomic Redis lock (prevents race conditions)
            lock_key = self._get_lock_key(entity_id)
            if not self._acquire_redis_lock(lock_key):
                self._log(f"Redis lock already active for {entity_type} {entity_id}", level='warn')
                return False

            # STEP 2: Update syncStatus (for UI visibility) - only for the Document entity
            # CDokumente does not have this field - skip it
            if entity_type == 'Document':
                try:
                    await self.espocrm.update_entity(entity_type, entity_id, {
                        'xaiSyncStatus': 'syncing'
                    })
                except Exception as e:
                    self._log(f"Could not set xaiSyncStatus: {e}", level='debug')

            self._log(f"Sync lock acquired for {entity_type} {entity_id}")
            return True

        except Exception as e:
            self._log(f"Error acquiring lock: {e}", level='error')
            # Clean up the Redis lock on error
            lock_key = self._get_lock_key(entity_id)
            self._release_redis_lock(lock_key)
            return False

    async def release_sync_lock(
@@ -109,7 +73,8 @@ class DocumentSync:
        entity_id: str,
        success: bool = True,
        error_message: Optional[str] = None,
        extra_fields: Optional[Dict[str, Any]] = None,
        entity_type: str = 'CDokumente'
    ) -> None:
        """
        Releases the sync lock and sets the final status
@@ -119,44 +84,41 @@ class DocumentSync:
            success: whether the sync succeeded
            error_message: optional error message
            extra_fields: optional extra fields (e.g. xaiFileId, xaiCollections)
            entity_type: entity type (CDokumente or Document)
        """
        try:
            update_data = {}

            # Status fields only exist on the Document entity (CDokumente does not have them)
            if entity_type == 'Document':
                try:
                    update_data['xaiSyncStatus'] = 'synced' if success else 'failed'

                    if error_message:
                        update_data['xaiSyncError'] = error_message[:2000]
                    else:
                        update_data['xaiSyncError'] = None
                except:
                    pass  # fields may not exist

            # Merge extra fields (e.g. xaiFileId, xaiCollections)
            if extra_fields:
                update_data.update(extra_fields)

            if update_data:
                await self.espocrm.update_entity(entity_type, entity_id, update_data)

            self._log(f"Sync lock released: {entity_type} {entity_id} → {'success' if success else 'failed'}")

            # Release the Redis lock
            lock_key = self._get_lock_key(entity_id)
            self._release_redis_lock(lock_key)

        except Exception as e:
            self._log(f"Error releasing lock: {e}", level='error')
            # Ensure the Redis lock is released even on error
            lock_key = self._get_lock_key(entity_id)
            self._release_redis_lock(lock_key)

    async def should_sync_to_xai(self, document: Dict[str, Any]) -> Tuple[bool, List[str], str]:
        """
@@ -322,12 +284,17 @@ class DocumentSync:
 
         return result
 
-    async def get_document_download_info(self, document_id: str) -> Optional[Dict[str, Any]]:
+    async def get_document_download_info(self, document_id: str, entity_type: str = 'CDokumente') -> Optional[Dict[str, Any]]:
         """
         Fetches download information for a document
 
         Args:
             document_id: ID of the document
+            entity_type: Entity type ('CDokumente' or 'Document')
 
         Returns:
             Dict with:
             - attachment_id: ID of the attachment
             - download_url: URL for the download
             - filename: file name
             - mime_type: MIME type
@@ -335,25 +302,49 @@ class DocumentSync:
         """
         try:
             # Fetch the full document
-            doc = await self.espocrm.get_entity('Document', document_id)
+            doc = await self.espocrm.get_entity(entity_type, document_id)
 
-            # An EspoCRM Document has attachments (attachment ID in attachmentsIds)
-            attachment_ids = doc.get('attachmentsIds') or []
+            # EspoCRM documents can store files in different ways:
+            # CDokumente: dokumentId/dokumentName (custom entity)
+            # Document: fileId/fileName OR attachmentsIds
 
-            if not attachment_ids:
-                self._log(f"⚠️ Document {document_id} hat keine Attachments", level='warn')
+            attachment_id = None
+            filename = None
+
+            # Check dokumentId first (CDokumente custom entity)
+            if doc.get('dokumentId'):
+                attachment_id = doc.get('dokumentId')
+                filename = doc.get('dokumentName')
+                self._log(f"📎 CDokumente verwendet dokumentId: {attachment_id}")
+
+            # Fallback: fileId (standard Document entity)
+            elif doc.get('fileId'):
+                attachment_id = doc.get('fileId')
+                filename = doc.get('fileName')
+                self._log(f"📎 Document verwendet fileId: {attachment_id}")
+
+            # Fallback 2: attachmentsIds (e.g. for additional attachments)
+            elif doc.get('attachmentsIds'):
+                attachment_ids = doc.get('attachmentsIds')
+                if attachment_ids:
+                    attachment_id = attachment_ids[0]
+                    self._log(f"📎 Document verwendet attachmentsIds: {attachment_id}")
+
+            if not attachment_id:
+                self._log(f"⚠️ {entity_type} {document_id} hat weder dokumentId, fileId noch attachmentsIds", level='warn')
+                self._log(f"   Verfügbare Felder: {list(doc.keys())}")
                 return None
 
-            # Take the first attachment (documents normally have only one file)
-            attachment_id = attachment_ids[0]
-
             # Fetch attachment details
             attachment = await self.espocrm.get_entity('Attachment', attachment_id)
 
+            # Filename: use dokumentName/fileName if present, otherwise take it from the attachment
+            final_filename = filename or attachment.get('name', 'unknown')
+
             return {
                 'attachment_id': attachment_id,
                 'download_url': f"/api/v1/Attachment/file/{attachment_id}",
-                'filename': attachment.get('name', 'unknown'),
+                'filename': final_filename,
                 'mime_type': attachment.get('type', 'application/octet-stream'),
                 'size': attachment.get('size', 0)
             }
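The attachment-resolution fallback chain introduced in this hunk (dokumentId → fileId → attachmentsIds[0]) can be sketched as a standalone pure function. This is an illustrative helper, not part of the codebase; field names are taken from the diff above:

```python
from typing import Any, Dict, Optional, Tuple

def resolve_attachment(doc: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
    """Resolve (attachment_id, filename) using the same fallback chain:
    dokumentId/dokumentName -> fileId/fileName -> attachmentsIds[0]."""
    if doc.get('dokumentId'):                      # CDokumente custom entity
        return doc['dokumentId'], doc.get('dokumentName')
    if doc.get('fileId'):                          # standard Document entity
        return doc['fileId'], doc.get('fileName')
    attachment_ids = doc.get('attachmentsIds') or []
    if attachment_ids:                             # extra attachments fallback
        return attachment_ids[0], None
    return None, None                              # nothing to download

# A CDokumente record wins over fileId if both happen to be present
print(resolve_attachment({'dokumentId': 'a1', 'dokumentName': 'x.pdf', 'fileId': 'b2'}))  # ('a1', 'x.pdf')
```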
@@ -476,7 +467,8 @@ class DocumentSync:
         xai_file_id: Optional[str] = None,
         collection_ids: Optional[List[str]] = None,
         file_hash: Optional[str] = None,
-        preview_data: Optional[bytes] = None
+        preview_data: Optional[bytes] = None,
+        entity_type: str = 'CDokumente'
     ) -> None:
         """
         Updates document metadata after a successful xAI sync
@@ -487,20 +479,29 @@ class DocumentSync:
         collection_ids: List of xAI collection IDs (optional)
         file_hash: MD5/SHA hash of the synced file
         preview_data: Preview image (WebP) as bytes
+        entity_type: Entity type ('CDokumente' or 'Document')
         """
         try:
             update_data = {}
 
             # Only update xAI fields if present
             if xai_file_id:
-                update_data['xaiFileId'] = xai_file_id
+                # CDokumente uses xaiId, Document uses xaiFileId
+                if entity_type == 'CDokumente':
+                    update_data['xaiId'] = xai_file_id
+                else:
+                    update_data['xaiFileId'] = xai_file_id
 
             if collection_ids is not None:
                 update_data['xaiCollections'] = collection_ids
 
             # Only set the status to "synced" if an xAI file ID is present
             if xai_file_id:
-                update_data['dateiStatus'] = 'Gesynct'
+                # CDokumente uses fileStatus, Document uses dateiStatus
+                if entity_type == 'CDokumente':
+                    update_data['fileStatus'] = 'synced'
+                else:
+                    update_data['dateiStatus'] = 'Gesynct'
 
             # Store the hash for future change detection
             if file_hash:
@@ -508,40 +509,78 @@ class DocumentSync:
 
             # Upload preview as attachment (if present)
             if preview_data:
-                await self._upload_preview_to_espocrm(document_id, preview_data)
+                await self._upload_preview_to_espocrm(document_id, preview_data, entity_type)
 
             # Only update if there is something to update
             if update_data:
-                await self.espocrm.update_entity('Document', document_id, update_data)
-                self._log(f"✅ Sync-Metadaten aktualisiert für Document {document_id}: {list(update_data.keys())}")
+                await self.espocrm.update_entity(entity_type, document_id, update_data)
+                self._log(f"✅ Sync-Metadaten aktualisiert für {entity_type} {document_id}: {list(update_data.keys())}")
 
         except Exception as e:
             self._log(f"❌ Fehler beim Update von Sync-Metadaten: {e}", level='error')
             raise
 
-    async def _upload_preview_to_espocrm(self, document_id: str, preview_data: bytes) -> None:
+    async def _upload_preview_to_espocrm(self, document_id: str, preview_data: bytes, entity_type: str = 'CDokumente') -> None:
         """
         Uploads the preview image to EspoCRM as an attachment
 
         Args:
             document_id: Document ID
             preview_data: WebP preview as bytes
+            entity_type: Entity type ('CDokumente' or 'Document')
         """
         try:
-            self._log(f"📤 Uploading preview image ({len(preview_data)} bytes)...")
+            self._log(f"📤 Uploading preview image to {entity_type} ({len(preview_data)} bytes)...")
 
-            # Upload via the EspoCRM attachment API
-            await self.espocrm.upload_attachment(
-                file_content=preview_data,
-                filename='preview.webp',
-                parent_type='Document',
-                parent_id=document_id,
-                field='preview',
-                mime_type='image/webp',
-                role='Attachment'
-            )
-
-            self._log(f"✅ Preview erfolgreich hochgeladen")
+            # EspoCRM expects a base64-encoded file in the form: data:mime/type;base64,xxxxx
+            import base64
+            import aiohttp
+
+            # Base64-encode preview data
+            base64_data = base64.b64encode(preview_data).decode('ascii')
+            file_data_uri = f"data:image/webp;base64,{base64_data}"
+
+            # Upload via JSON POST with a base64-encoded file field
+            url = self.espocrm.api_base_url.rstrip('/') + '/Attachment'
+            headers = {
+                'X-Api-Key': self.espocrm.api_key,
+                'Content-Type': 'application/json'
+            }
+
+            payload = {
+                'name': 'preview.webp',
+                'type': 'image/webp',
+                'role': 'Attachment',
+                'field': 'preview',
+                'relatedType': entity_type,
+                'relatedId': document_id,
+                'file': file_data_uri
+            }
+
+            self._log(f"📤 Posting to {url} with base64-encoded file ({len(base64_data)} chars)")
+            self._log(f"   relatedType={entity_type}, relatedId={document_id}, field=preview")
+
+            timeout = aiohttp.ClientTimeout(total=30)
+            async with aiohttp.ClientSession(timeout=timeout) as session:
+                async with session.post(url, headers=headers, json=payload) as response:
+                    self._log(f"Upload response status: {response.status}")
+
+                    if response.status >= 400:
+                        error_text = await response.text()
+                        self._log(f"❌ Upload failed: {error_text}", level='error')
+                        raise Exception(f"Upload error {response.status}: {error_text}")
+
+                    result = await response.json()
+                    attachment_id = result.get('id')
+                    self._log(f"✅ Preview Attachment created: {attachment_id}")
+
+                    # Update the entity with previewId
+                    self._log(f"📝 Updating {entity_type} with previewId...")
+                    await self.espocrm.update_entity(entity_type, document_id, {
+                        'previewId': attachment_id,
+                        'previewName': 'preview.webp'
+                    })
+                    self._log(f"✅ {entity_type} previewId/previewName aktualisiert")
 
         except Exception as e:
             self._log(f"❌ Fehler beim Preview-Upload: {e}", level='error')
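The `data:mime/type;base64,…` encoding used for the `file` field above can be shown in isolation. A minimal sketch with only the standard library; the mime type and payload bytes are illustrative, not real WebP data:

```python
import base64

def to_data_uri(data: bytes, mime_type: str = 'image/webp') -> str:
    """Build a data:<mime>;base64,<payload> URI as sent in the 'file' field."""
    b64 = base64.b64encode(data).decode('ascii')
    return f"data:{mime_type};base64,{b64}"

def from_data_uri(uri: str) -> bytes:
    """Round-trip helper: recover the raw bytes from a data URI."""
    _, b64 = uri.split(';base64,', 1)
    return base64.b64decode(b64)

uri = to_data_uri(b'RIFF....WEBP')
print(uri)  # data:image/webp;base64,UklGRi4uLi5XRUJQ
```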
@@ -341,12 +341,14 @@ class EspoCRMAPI:
             form_data.add_field('role', role)
             form_data.add_field('name', filename)
 
             self._log(f"Upload params: parentType={parent_type}, parentId={parent_id}, field={field}, role={role}")
 
+            effective_timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds)
+
             async with aiohttp.ClientSession(timeout=effective_timeout) as session:
                 try:
                     async with session.post(url, headers=headers, data=form_data) as response:
-                        self._log(f"Upload response status: {response.status}", level='debug')
+                        self._log(f"Upload response status: {response.status}")
 
                         if response.status == 401:
                             raise EspoCRMAuthError("Authentication failed - check API key")
@@ -356,6 +358,7 @@
                             raise EspoCRMError(f"Attachment endpoint not found")
                         elif response.status >= 400:
                             error_text = await response.text()
+                            self._log(f"❌ Upload failed with {response.status}. Response: {error_text}", level='error')
                             raise EspoCRMError(f"Upload error {response.status}: {error_text}")
 
                         # Parse response
services/sync_utils_base.py (new file, 150 lines)
@@ -0,0 +1,150 @@
"""
|
||||
Base Sync Utilities
|
||||
|
||||
Gemeinsame Funktionalität für alle Sync-Operationen:
|
||||
- Redis Distributed Locking
|
||||
- Context-aware Logging
|
||||
- EspoCRM API Helpers
|
||||
"""
|
||||
|
||||
from typing import Dict, Any, Optional
|
||||
from datetime import datetime
|
||||
import logging
|
||||
import redis
|
||||
import os
|
||||
import pytz
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Lock TTL in seconds (prevents deadlocks)
|
||||
LOCK_TTL_SECONDS = 900 # 15 minutes
|
||||
|
||||
|
||||
class BaseSyncUtils:
|
||||
"""Base-Klasse mit gemeinsamer Sync-Funktionalität"""
|
||||
|
||||
def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None):
|
||||
"""
|
||||
Args:
|
||||
espocrm_api: EspoCRM API client instance
|
||||
redis_client: Optional Redis client (wird sonst initialisiert)
|
||||
context: Optional Motia FlowContext für Logging
|
||||
"""
|
||||
self.espocrm = espocrm_api
|
||||
self.context = context
|
||||
self.logger = context.logger if context else logger
|
||||
self.redis = redis_client or self._init_redis()
|
||||
|
||||
def _init_redis(self) -> Optional[redis.Redis]:
|
||||
"""Initialize Redis client for distributed locking"""
|
||||
try:
|
||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
||||
|
||||
client = redis.Redis(
|
||||
host=redis_host,
|
||||
port=redis_port,
|
||||
db=redis_db,
|
||||
decode_responses=True
|
||||
)
|
||||
client.ping()
|
||||
return client
|
||||
except Exception as e:
|
||||
self._log(f"Redis connection failed: {e}", level='error')
|
||||
return None
|
||||
|
||||
def _log(self, message: str, level: str = 'info'):
|
||||
"""
|
||||
Context-aware logging
|
||||
|
||||
Falls ein FlowContext vorhanden ist, wird dessen Logger verwendet.
|
||||
Sonst fallback auf Standard-Logger.
|
||||
"""
|
||||
if self.context and hasattr(self.context, 'logger'):
|
||||
getattr(self.context.logger, level)(message)
|
||||
else:
|
||||
getattr(logger, level)(message)
|
||||
|
||||
def _get_lock_key(self, entity_id: str) -> str:
|
||||
"""
|
||||
Erzeugt Redis Lock-Key für eine Entity
|
||||
|
||||
Muss in Subklassen überschrieben werden, um entity-spezifische Prefixes zu nutzen.
|
||||
z.B. 'sync_lock:cbeteiligte:{entity_id}' oder 'sync_lock:document:{entity_id}'
|
||||
"""
|
||||
raise NotImplementedError("Subclass must implement _get_lock_key()")
|
||||
|
||||
def _acquire_redis_lock(self, lock_key: str) -> bool:
|
||||
"""
|
||||
Atomic Redis lock acquisition
|
||||
|
||||
Args:
|
||||
lock_key: Redis key für den Lock
|
||||
|
||||
Returns:
|
||||
True wenn Lock erfolgreich, False wenn bereits locked
|
||||
"""
|
||||
if not self.redis:
|
||||
self._log("Redis nicht verfügbar, Lock-Mechanismus deaktiviert", level='warn')
|
||||
return True # Fallback: Wenn kein Redis, immer lock erlauben
|
||||
|
||||
try:
|
||||
acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS)
|
||||
return bool(acquired)
|
||||
except Exception as e:
|
||||
self._log(f"Redis lock error: {e}", level='error')
|
||||
return True # Bei Fehler: Lock erlauben, um Deadlocks zu vermeiden
|
||||
|
||||
def _release_redis_lock(self, lock_key: str) -> None:
|
||||
"""
|
||||
Redis lock freigeben
|
||||
|
||||
Args:
|
||||
lock_key: Redis key für den Lock
|
||||
"""
|
||||
if not self.redis:
|
||||
return
|
||||
|
||||
try:
|
||||
self.redis.delete(lock_key)
|
||||
except Exception as e:
|
||||
self._log(f"Redis unlock error: {e}", level='error')
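The lock above relies on Redis `SET key value NX EX ttl` being atomic: the lock is acquired only if the key does not already exist, and the TTL guarantees it eventually expires. Since a live Redis server is not assumed here, the semantics can be illustrated with a tiny in-memory stand-in (illustrative only, mirroring the two calls the class makes):

```python
import time
from typing import Dict, Optional, Tuple

class FakeRedis:
    """Tiny in-memory stand-in for the two Redis calls the lock uses
    (SET key value NX EX ttl, and DEL). For illustration only."""
    def __init__(self) -> None:
        self._store: Dict[str, Tuple[str, float]] = {}  # key -> (value, expiry)

    def set(self, key: str, value: str, nx: bool = False, ex: Optional[int] = None) -> Optional[bool]:
        now = time.monotonic()
        current = self._store.get(key)
        if current and current[1] > now and nx:
            return None  # NX: key already exists -> no-op, like redis-py
        self._store[key] = (value, now + (ex if ex is not None else float('inf')))
        return True

    def delete(self, key: str) -> None:
        self._store.pop(key, None)

r = FakeRedis()
lock_key = "sync_lock:document:abc123"
print(bool(r.set(lock_key, "locked", nx=True, ex=900)))  # True  -> lock acquired
print(bool(r.set(lock_key, "locked", nx=True, ex=900)))  # False -> already locked
r.delete(lock_key)
print(bool(r.set(lock_key, "locked", nx=True, ex=900)))  # True  -> free again
```

Note that a plain `DEL`, as in `_release_redis_lock`, can release a lock another worker acquired after the TTL expired; a token check before deletion would make the release safer.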

    def _get_espocrm_datetime(self, dt: Optional[datetime] = None) -> str:
        """
        Formats a datetime for EspoCRM (without timezone!)

        Args:
            dt: Optional datetime object (default: now, UTC)

        Returns:
            String in the format 'YYYY-MM-DD HH:MM:SS'
        """
        if dt is None:
            dt = datetime.now(pytz.UTC)
        elif dt.tzinfo is None:
            dt = pytz.UTC.localize(dt)

        return dt.strftime('%Y-%m-%d %H:%M:%S')
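The formatting step above boils down to `strftime('%Y-%m-%d %H:%M:%S')` on a UTC datetime, dropping the timezone. A minimal sketch with a fixed timestamp (no pytz needed for the illustration):

```python
from datetime import datetime

# EspoCRM expects naive 'YYYY-MM-DD HH:MM:SS' strings; this mirrors the
# formatting in _get_espocrm_datetime for a fixed example timestamp.
dt = datetime(2026, 3, 1, 9, 5, 0)
formatted = dt.strftime('%Y-%m-%d %H:%M:%S')
print(formatted)  # 2026-03-01 09:05:00
```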

    async def acquire_sync_lock(self, entity_id: str, **kwargs) -> bool:
        """
        Acquires the sync lock for an entity

        Must be implemented in subclasses to perform
        entity-specific status updates.

        Returns:
            True if the lock was acquired, False if it is already held
        """
        raise NotImplementedError("Subclass must implement acquire_sync_lock()")

    async def release_sync_lock(self, entity_id: str, **kwargs) -> None:
        """
        Releases the sync lock and sets the final status

        Must be implemented in subclasses to perform
        entity-specific status updates.
        """
        raise NotImplementedError("Subclass must implement release_sync_lock()")
@@ -17,7 +17,7 @@ config = {
     'description': 'Runs calendar sync automatically every 15 minutes',
     'flows': ['advoware-calendar-sync'],
     'triggers': [
-        cron("0 */15 * * * *")  # Every 15 minutes at second 0 (6-field: sec min hour day month weekday)
+        cron("0 */15 1o * * *")  # Every 15 minutes at second 0 (6-field: sec min hour day month weekday)
     ],
     'enqueues': ['calendar_sync_all']
 }
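The 6-field layout named in the comment (sec min hour day month weekday) can be made explicit with a small parsing sketch; plain string handling, no scheduler library assumed. It also makes clear why the new `1o` in the hour field of the hunk above would be rejected by any cron parser:

```python
FIELDS = ('second', 'minute', 'hour', 'day', 'month', 'weekday')

def parse_cron6(expr: str) -> dict:
    """Split a 6-field cron expression into named fields."""
    parts = expr.split()
    if len(parts) != 6:
        raise ValueError(f"expected 6 fields, got {len(parts)}")
    return dict(zip(FIELDS, parts))

print(parse_cron6("0 */15 * * * *"))
# {'second': '0', 'minute': '*/15', 'hour': '*', 'day': '*', 'month': '*', 'weekday': '*'}
```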
@@ -1,6 +0,0 @@
-"""
-EspoCRM Generic Webhooks
-
-Receives webhooks from EspoCRM for various entities.
-Central entry point for all EspoCRM events outside the VMH context.
-"""
@@ -1,198 +0,0 @@
"""EspoCRM Webhook - Document Create

Receives create webhooks from EspoCRM for documents.
Logs all payload details for analysis.
"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse


config = {
    "name": "VMH Webhook Document Create",
    "description": "Empfängt Create-Webhooks von EspoCRM für Document Entities",
    "flows": ["vmh-documents"],
    "triggers": [
        http("POST", "/vmh/webhook/document/create")
    ],
    "enqueues": ["vmh.document.create"],
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Webhook handler for Document creation in EspoCRM.

    Receives notifications when documents are created and emits queue events
    for processing (xAI sync, etc.).

    Payload Analysis Mode: Logs comprehensive details about webhook structure.
    """
    try:
        payload = request.body or []

        # ═══════════════════════════════════════════════════════════════
        # DETAILED LOGGING FOR ANALYSIS
        # ═══════════════════════════════════════════════════════════════

        ctx.logger.info("=" * 80)
        ctx.logger.info("📥 EspoCRM DOCUMENT CREATE WEBHOOK EMPFANGEN")
        ctx.logger.info("=" * 80)

        # Log request headers
        ctx.logger.info("\n🔍 REQUEST HEADERS:")
        if hasattr(request, 'headers'):
            for key, value in request.headers.items():
                ctx.logger.info(f"  {key}: {value}")
        else:
            ctx.logger.info("  (keine Headers verfügbar)")

        # Log payload type & structure
        ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
        ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")

        # Log full payload (pretty-printed)
        ctx.logger.info("\n📄 FULL PAYLOAD:")
        ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))

        # ═══════════════════════════════════════════════════════════════
        # PAYLOAD ANALYSIS & ID EXTRACTION
        # ═══════════════════════════════════════════════════════════════

        entity_ids = set()
        payload_details = []

        if isinstance(payload, list):
            ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
            for idx, entity in enumerate(payload):
                if isinstance(entity, dict):
                    entity_id = entity.get('id')
                    if entity_id:
                        entity_ids.add(entity_id)

                    # Collect details for logging
                    detail = {
                        'index': idx,
                        'id': entity_id,
                        'name': entity.get('name', 'N/A'),
                        'type': entity.get('type', 'N/A'),
                        'size': entity.get('size', 'N/A'),
                        'all_fields': list(entity.keys())
                    }
                    payload_details.append(detail)

                    ctx.logger.info(f"\n  📄 Document #{idx + 1}:")
                    ctx.logger.info(f"     ID: {entity_id}")
                    ctx.logger.info(f"     Name: {entity.get('name', 'N/A')}")
                    ctx.logger.info(f"     Type: {entity.get('type', 'N/A')}")
                    ctx.logger.info(f"     Size: {entity.get('size', 'N/A')} bytes")
                    ctx.logger.info(f"     Verfügbare Felder: {', '.join(entity.keys())}")

                    # xAI-related fields (if present)
                    xai_fields = {k: v for k, v in entity.items()
                                  if 'xai' in k.lower() or 'collection' in k.lower()}
                    if xai_fields:
                        ctx.logger.info(f"     🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")

                    # Parent/relationship fields
                    rel_fields = {k: v for k, v in entity.items()
                                  if 'parent' in k.lower() or 'related' in k.lower() or
                                  'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
                    if rel_fields:
                        ctx.logger.info(f"     🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")

        elif isinstance(payload, dict):
            ctx.logger.info("\n✅ Payload ist SINGLE DICT")
            entity_id = payload.get('id')
            if entity_id:
                entity_ids.add(entity_id)

            ctx.logger.info(f"\n  📄 Document:")
            ctx.logger.info(f"     ID: {entity_id}")
            ctx.logger.info(f"     Name: {payload.get('name', 'N/A')}")
            ctx.logger.info(f"     Type: {payload.get('type', 'N/A')}")
            ctx.logger.info(f"     Size: {payload.get('size', 'N/A')} bytes")
            ctx.logger.info(f"     Verfügbare Felder: {', '.join(payload.keys())}")

            # xAI-related fields
            xai_fields = {k: v for k, v in payload.items()
                          if 'xai' in k.lower() or 'collection' in k.lower()}
            if xai_fields:
                ctx.logger.info(f"     🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")

            # Relationship fields
            rel_fields = {k: v for k, v in payload.items()
                          if 'parent' in k.lower() or 'related' in k.lower() or
                          'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
            if rel_fields:
                ctx.logger.info(f"     🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
        else:
            ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")

        # ═══════════════════════════════════════════════════════════════
        # EMIT QUEUE EVENTS
        # ═══════════════════════════════════════════════════════════════

        ctx.logger.info("\n" + "=" * 80)
        ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
        ctx.logger.info("=" * 80)

        if not entity_ids:
            ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
            return ApiResponse(
                status=200,
                body={
                    'status': 'received',
                    'action': 'create',
                    'ids_count': 0,
                    'warning': 'No document IDs found in payload'
                }
            )

        # Emit events for queue processing (deduplication happens in the event handler via lock)
        for entity_id in entity_ids:
            await ctx.enqueue({
                'topic': 'vmh.document.create',
                'data': {
                    'entity_id': entity_id,
                    'action': 'create',
                    'source': 'webhook',
                    'timestamp': datetime.datetime.now().isoformat()
                }
            })
            ctx.logger.info(f"✅ Event emittiert: vmh.document.create für ID {entity_id}")

        ctx.logger.info("\n" + "=" * 80)
        ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
        ctx.logger.info("=" * 80)

        return ApiResponse(
            status=200,
            body={
                'status': 'received',
                'action': 'create',
                'ids_count': len(entity_ids),
                'document_ids': list(entity_ids)
            }
        )

    except Exception as e:
        ctx.logger.error("=" * 80)
        ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Create Webhooks")
        ctx.logger.error("=" * 80)
        ctx.logger.error(f"Error Type: {type(e).__name__}")
        ctx.logger.error(f"Error Message: {str(e)}")

        # Log stack trace
        import traceback
        ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")

        return ApiResponse(
            status=500,
            body={
                'error': 'Internal server error',
                'error_type': type(e).__name__,
                'details': str(e)
            }
        )
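The ID-extraction logic shared by these webhook handlers (accept a list of entities, a single entity dict, or anything else) can be factored into a small pure function. This is a sketch of the pattern, not code from the repository:

```python
from typing import Any, Set

def extract_ids(payload: Any) -> Set[str]:
    """Collect entity IDs from an EspoCRM webhook payload, which may be a
    list of entity dicts, a single entity dict, or something unexpected."""
    ids: Set[str] = set()
    if isinstance(payload, list):
        for entity in payload:
            if isinstance(entity, dict) and entity.get('id'):
                ids.add(entity['id'])
    elif isinstance(payload, dict):
        if payload.get('id'):
            ids.add(payload['id'])
    return ids

print(sorted(extract_ids([{'id': 'a'}, {'id': 'b'}, {'name': 'no-id'}])))  # ['a', 'b']
print(extract_ids({'id': 'c'}))  # {'c'}
```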
@@ -1,174 +0,0 @@
"""EspoCRM Webhook - Document Delete

Receives delete webhooks from EspoCRM for documents.
Logs all payload details for analysis.
"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse


config = {
    "name": "VMH Webhook Document Delete",
    "description": "Empfängt Delete-Webhooks von EspoCRM für Document Entities",
    "flows": ["vmh-documents"],
    "triggers": [
        http("POST", "/vmh/webhook/document/delete")
    ],
    "enqueues": ["vmh.document.delete"],
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Webhook handler for Document deletion in EspoCRM.

    Receives notifications when documents are deleted.
    Note: on deletion we may only receive the ID, not the full entity data.
    """
    try:
        payload = request.body or []

        # ═══════════════════════════════════════════════════════════════
        # DETAILED LOGGING FOR ANALYSIS
        # ═══════════════════════════════════════════════════════════════

        ctx.logger.info("=" * 80)
        ctx.logger.info("📥 EspoCRM DOCUMENT DELETE WEBHOOK EMPFANGEN")
        ctx.logger.info("=" * 80)

        # Log request headers
        ctx.logger.info("\n🔍 REQUEST HEADERS:")
        if hasattr(request, 'headers'):
            for key, value in request.headers.items():
                ctx.logger.info(f"  {key}: {value}")
        else:
            ctx.logger.info("  (keine Headers verfügbar)")

        # Log payload type & structure
        ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
        ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")

        # Log full payload (pretty-printed)
        ctx.logger.info("\n📄 FULL PAYLOAD:")
        ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))

        # ═══════════════════════════════════════════════════════════════
        # PAYLOAD ANALYSIS & ID EXTRACTION
        # ═══════════════════════════════════════════════════════════════

        entity_ids = set()

        if isinstance(payload, list):
            ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
            for idx, entity in enumerate(payload):
                if isinstance(entity, dict):
                    entity_id = entity.get('id')
                    if entity_id:
                        entity_ids.add(entity_id)

                    ctx.logger.info(f"\n  🗑️ Document #{idx + 1}:")
                    ctx.logger.info(f"     ID: {entity_id}")
                    ctx.logger.info(f"     Verfügbare Felder: {', '.join(entity.keys())}")

                    # On delete we often only get minimal data
                    if 'name' in entity:
                        ctx.logger.info(f"     Name: {entity.get('name')}")
                    if 'deletedAt' in entity or 'deleted' in entity:
                        ctx.logger.info(f"     Deleted At: {entity.get('deletedAt', entity.get('deleted', 'N/A'))}")

                    # xAI-related fields (if present)
                    xai_fields = {k: v for k, v in entity.items()
                                  if 'xai' in k.lower() or 'collection' in k.lower()}
                    if xai_fields:
                        ctx.logger.info(f"     🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")

        elif isinstance(payload, dict):
            ctx.logger.info("\n✅ Payload ist SINGLE DICT")
            entity_id = payload.get('id')
            if entity_id:
                entity_ids.add(entity_id)

            ctx.logger.info(f"\n  🗑️ Document:")
            ctx.logger.info(f"     ID: {entity_id}")
            ctx.logger.info(f"     Verfügbare Felder: {', '.join(payload.keys())}")

            if 'name' in payload:
                ctx.logger.info(f"     Name: {payload.get('name')}")
            if 'deletedAt' in payload or 'deleted' in payload:
                ctx.logger.info(f"     Deleted At: {payload.get('deletedAt', payload.get('deleted', 'N/A'))}")

            # xAI-related fields
            xai_fields = {k: v for k, v in payload.items()
                          if 'xai' in k.lower() or 'collection' in k.lower()}
            if xai_fields:
                ctx.logger.info(f"     🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
        else:
            ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")

        # ═══════════════════════════════════════════════════════════════
        # EMIT QUEUE EVENTS
        # ═══════════════════════════════════════════════════════════════

        ctx.logger.info("\n" + "=" * 80)
        ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
        ctx.logger.info("=" * 80)

        if not entity_ids:
            ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
            return ApiResponse(
                status=200,
                body={
                    'status': 'received',
                    'action': 'delete',
                    'ids_count': 0,
                    'warning': 'No document IDs found in payload'
                }
            )

        # Emit events for queue processing
        for entity_id in entity_ids:
            await ctx.enqueue({
                'topic': 'vmh.document.delete',
                'data': {
                    'entity_id': entity_id,
                    'action': 'delete',
                    'source': 'webhook',
                    'timestamp': datetime.datetime.now().isoformat()
                }
            })
            ctx.logger.info(f"✅ Event emittiert: vmh.document.delete für ID {entity_id}")

        ctx.logger.info("\n" + "=" * 80)
        ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
        ctx.logger.info("=" * 80)

        return ApiResponse(
            status=200,
            body={
                'status': 'received',
                'action': 'delete',
                'ids_count': len(entity_ids),
                'document_ids': list(entity_ids)
            }
        )

    except Exception as e:
        ctx.logger.error("=" * 80)
        ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Delete Webhooks")
        ctx.logger.error("=" * 80)
        ctx.logger.error(f"Error Type: {type(e).__name__}")
        ctx.logger.error(f"Error Message: {str(e)}")

        import traceback
        ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")

        return ApiResponse(
            status=500,
            body={
                'error': 'Internal server error',
                'error_type': type(e).__name__,
                'details': str(e)
            }
        )
@@ -1,196 +0,0 @@
"""EspoCRM Webhook - Document Update

Receives update webhooks from EspoCRM for documents.
Logs all payload details for analysis.
"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse


config = {
    "name": "VMH Webhook Document Update",
    "description": "Empfängt Update-Webhooks von EspoCRM für Document Entities",
    "flows": ["vmh-documents"],
    "triggers": [
        http("POST", "/vmh/webhook/document/update")
    ],
    "enqueues": ["vmh.document.update"],
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Webhook handler for Document updates in EspoCRM.

    Receives notifications when documents are updated and emits queue events
    for processing (xAI sync, etc.).

    Note: loop prevention should be implemented on the EspoCRM side;
    updates to xAI fields should not trigger new webhooks.
    """
    try:
        payload = request.body or []

        # ═══════════════════════════════════════════════════════════════
        # DETAILED LOGGING FOR ANALYSIS
        # ═══════════════════════════════════════════════════════════════

        ctx.logger.info("=" * 80)
        ctx.logger.info("📥 EspoCRM DOCUMENT UPDATE WEBHOOK EMPFANGEN")
        ctx.logger.info("=" * 80)

        # Log request headers
        ctx.logger.info("\n🔍 REQUEST HEADERS:")
        if hasattr(request, 'headers'):
            for key, value in request.headers.items():
                ctx.logger.info(f"  {key}: {value}")
        else:
            ctx.logger.info("  (keine Headers verfügbar)")

        # Log payload type & structure
        ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
        ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")

        # Log full payload (pretty-printed)
        ctx.logger.info("\n📄 FULL PAYLOAD:")
        ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))

        # ═══════════════════════════════════════════════════════════════
        # PAYLOAD ANALYSIS & ID EXTRACTION
        # ═══════════════════════════════════════════════════════════════

        entity_ids = set()

        if isinstance(payload, list):
            ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
            for idx, entity in enumerate(payload):
                if isinstance(entity, dict):
                    entity_id = entity.get('id')
                    if entity_id:
                        entity_ids.add(entity_id)

                    ctx.logger.info(f"\n  📄 Document #{idx + 1}:")
                    ctx.logger.info(f"     ID: {entity_id}")
                    ctx.logger.info(f"     Name: {entity.get('name', 'N/A')}")
                    ctx.logger.info(f"     Modified At: {entity.get('modifiedAt', 'N/A')}")
                    ctx.logger.info(f"     Modified By: {entity.get('modifiedById', 'N/A')}")
                    ctx.logger.info(f"     Verfügbare Felder: {', '.join(entity.keys())}")

                    # Check whether CHANGED fields are included
                    changed_fields = entity.get('changedFields') or entity.get('changed') or entity.get('modifiedFields')
                    if changed_fields:
                        ctx.logger.info(f"     🔄 Geänderte Felder: {json.dumps(changed_fields, ensure_ascii=False)}")

                    # xAI-related fields
                    xai_fields = {k: v for k, v in entity.items()
                                  if 'xai' in k.lower() or 'collection' in k.lower()}
                    if xai_fields:
                        ctx.logger.info(f"     🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")

                    # Relationship fields
                    rel_fields = {k: v for k, v in entity.items()
                                  if 'parent' in k.lower() or 'related' in k.lower() or
                                  'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
                    if rel_fields:
                        ctx.logger.info(f"     🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")

        elif isinstance(payload, dict):
            ctx.logger.info("\n✅ Payload ist SINGLE DICT")
            entity_id = payload.get('id')
|
||||
if entity_id:
|
||||
entity_ids.add(entity_id)
|
||||
|
||||
ctx.logger.info(f"\n 📄 Document:")
|
||||
ctx.logger.info(f" ID: {entity_id}")
|
||||
ctx.logger.info(f" Name: {payload.get('name', 'N/A')}")
|
||||
ctx.logger.info(f" Modified At: {payload.get('modifiedAt', 'N/A')}")
|
||||
ctx.logger.info(f" Modified By: {payload.get('modifiedById', 'N/A')}")
|
||||
ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}")
|
||||
|
||||
# Geänderte Felder
|
||||
changed_fields = payload.get('changedFields') or payload.get('changed') or payload.get('modifiedFields')
|
||||
if changed_fields:
|
||||
ctx.logger.info(f" 🔄 Geänderte Felder: {json.dumps(changed_fields, ensure_ascii=False)}")
|
||||
|
||||
# xAI-relevante Felder
|
||||
xai_fields = {k: v for k, v in payload.items()
|
||||
if 'xai' in k.lower() or 'collection' in k.lower()}
|
||||
if xai_fields:
|
||||
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
|
||||
|
||||
# Relationship Felder
|
||||
rel_fields = {k: v for k, v in payload.items()
|
||||
if 'parent' in k.lower() or 'related' in k.lower() or
|
||||
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
|
||||
if rel_fields:
|
||||
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
|
||||
else:
|
||||
ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# QUEUE EVENTS EMITTIEREN
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
|
||||
ctx.logger.info("\n" + "=" * 80)
|
||||
ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
if not entity_ids:
|
||||
ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
|
||||
return ApiResponse(
|
||||
status=200,
|
||||
body={
|
||||
'status': 'received',
|
||||
'action': 'update',
|
||||
'ids_count': 0,
|
||||
'warning': 'No document IDs found in payload'
|
||||
}
|
||||
)
|
||||
|
||||
# Emit events für Queue-Processing
|
||||
for entity_id in entity_ids:
|
||||
await ctx.enqueue({
|
||||
'topic': 'vmh.document.update',
|
||||
'data': {
|
||||
'entity_id': entity_id,
|
||||
'action': 'update',
|
||||
'source': 'webhook',
|
||||
'timestamp': datetime.datetime.now().isoformat()
|
||||
}
|
||||
})
|
||||
ctx.logger.info(f"✅ Event emittiert: vmh.document.update für ID {entity_id}")
|
||||
|
||||
ctx.logger.info("\n" + "=" * 80)
|
||||
ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
return ApiResponse(
|
||||
status=200,
|
||||
body={
|
||||
'status': 'received',
|
||||
'action': 'update',
|
||||
'ids_count': len(entity_ids),
|
||||
'document_ids': list(entity_ids)
|
||||
}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error("=" * 80)
|
||||
ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Update Webhooks")
|
||||
ctx.logger.error("=" * 80)
|
||||
ctx.logger.error(f"Error Type: {type(e).__name__}")
|
||||
ctx.logger.error(f"Error Message: {str(e)}")
|
||||
|
||||
import traceback
|
||||
ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")
|
||||
|
||||
return ApiResponse(
|
||||
status=500,
|
||||
body={
|
||||
'error': 'Internal server error',
|
||||
'error_type': type(e).__name__,
|
||||
'details': str(e)
|
||||
}
|
||||
)
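
Both branches of the handler above reduce EspoCRM's batch-or-single payload to a set of entity IDs before enqueueing. That normalization can be sketched standalone (a hypothetical helper mirroring the handler logic, not part of the step files):

```python
def extract_entity_ids(payload):
    """Normalize an EspoCRM webhook payload (single dict or batch list)
    into a set of entity IDs, mirroring the handler logic above."""
    entity_ids = set()
    if isinstance(payload, list):
        # Batch notification: one dict per changed entity
        for entity in payload:
            if isinstance(entity, dict) and entity.get('id'):
                entity_ids.add(entity['id'])
    elif isinstance(payload, dict) and payload.get('id'):
        # Single-entity notification
        entity_ids.add(payload['id'])
    return entity_ids
```

Using a set deduplicates IDs when EspoCRM delivers the same entity twice in one batch.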
@@ -19,7 +19,7 @@ config = {
     "description": "Prüft alle 15 Minuten welche Beteiligte synchronisiert werden müssen",
     "flows": ["vmh-beteiligte"],
     "triggers": [
-        cron("0 */15 * * * *")  # Alle 15 Minuten (6-field format!)
+        cron("0 */15 1 * * *")  # Alle 15 Minuten während Stunde 1 (6-field format!)
     ],
     "enqueues": ["vmh.beteiligte.sync_check"]
 }

@@ -33,6 +33,7 @@ config = {
 async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
     """Zentraler Sync-Handler für Documents"""
     entity_id = event_data.get('entity_id')
+    entity_type = event_data.get('entity_type', 'CDokumente')  # Default: CDokumente
     action = event_data.get('action')
     source = event_data.get('source')

@@ -43,6 +44,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
     ctx.logger.info("=" * 80)
     ctx.logger.info(f"🔄 DOCUMENT SYNC HANDLER GESTARTET")
     ctx.logger.info("=" * 80)
+    ctx.logger.info(f"Entity Type: {entity_type}")
     ctx.logger.info(f"Action: {action.upper()}")
     ctx.logger.info(f"Document ID: {entity_id}")
     ctx.logger.info(f"Source: {source}")

@@ -70,39 +72,40 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):

     try:
         # 1. ACQUIRE LOCK (verhindert parallele Syncs)
-        lock_acquired = await sync_utils.acquire_sync_lock(entity_id)
+        lock_acquired = await sync_utils.acquire_sync_lock(entity_id, entity_type)

         if not lock_acquired:
-            ctx.logger.warn(f"⏸️ Sync bereits aktiv für Document {entity_id}, überspringe")
+            ctx.logger.warn(f"⏸️ Sync bereits aktiv für {entity_type} {entity_id}, überspringe")
             return

         # Lock erfolgreich acquired - MUSS im finally block released werden!
         try:
             # 2. FETCH VOLLSTÄNDIGES DOCUMENT VON ESPOCRM
             try:
-                document = await espocrm.get_entity('Document', entity_id)
+                document = await espocrm.get_entity(entity_type, entity_id)
             except Exception as e:
-                ctx.logger.error(f"❌ Fehler beim Laden von Document: {e}")
-                await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e))
+                ctx.logger.error(f"❌ Fehler beim Laden von {entity_type}: {e}")
+                await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type)
                 return

-            ctx.logger.info(f"📋 Document geladen:")
+            ctx.logger.info(f"📋 {entity_type} geladen:")
             ctx.logger.info(f"   Name: {document.get('name', 'N/A')}")
             ctx.logger.info(f"   Type: {document.get('type', 'N/A')}")
-            ctx.logger.info(f"   xaiFileId: {document.get('xaiFileId', 'N/A')}")
-            ctx.logger.info(f"   fileStatus: {document.get('fileStatus', 'N/A')}")
+            ctx.logger.info(f"   xaiFileId: {document.get('xaiFileId') or document.get('xaiId', 'N/A')}")
             ctx.logger.info(f"   xaiCollections: {document.get('xaiCollections', [])}")

             # 3. BESTIMME SYNC-AKTION BASIEREND AUF ACTION

             if action == 'delete':
-                await handle_delete(entity_id, document, sync_utils, ctx)
+                await handle_delete(entity_id, document, sync_utils, ctx, entity_type)

             elif action in ['create', 'update']:
-                await handle_create_or_update(entity_id, document, sync_utils, ctx)
+                await handle_create_or_update(entity_id, document, sync_utils, ctx, entity_type)

             else:
                 ctx.logger.warn(f"⚠️ Unbekannte Action: {action}")
-                await sync_utils.release_sync_lock(entity_id, success=False, error_message=f"Unbekannte Action: {action}")
+                await sync_utils.release_sync_lock(entity_id, success=False, error_message=f"Unbekannte Action: {action}", entity_type=entity_type)

     except Exception as e:
         # Unerwarteter Fehler während Sync - GARANTIERE Lock-Release

@@ -114,7 +117,8 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
                 await sync_utils.release_sync_lock(
                     entity_id,
                     success=False,
-                    error_message=str(e)[:2000]
+                    error_message=str(e)[:2000],
+                    entity_type=entity_type
                 )
             except Exception as release_error:
                 # Selbst Lock-Release failed - logge kritischen Fehler

@@ -134,7 +138,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
         ctx.logger.error(traceback.format_exc())


-async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any]):
+async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any], entity_type: str = 'CDokumente'):
     """
     Behandelt Create/Update von Documents

@@ -146,15 +150,15 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync
     ctx.logger.info("🔍 ANALYSE: Braucht dieses Document xAI-Sync?")
     ctx.logger.info("=" * 80)

-    # Datei-Status für Preview-Generierung
-    datei_status = document.get('dateiStatus') or document.get('fileStatus')
+    # Datei-Status für Preview-Generierung (verschiedene Feld-Namen unterstützen)
+    datei_status = document.get('fileStatus') or document.get('dateiStatus')

     # Entscheidungslogik: Soll dieses Document zu xAI?
     needs_sync, collection_ids, reason = await sync_utils.should_sync_to_xai(document)

     ctx.logger.info(f"📊 Entscheidung: {'✅ SYNC NÖTIG' if needs_sync else '⏭️ KEIN SYNC NÖTIG'}")
     ctx.logger.info(f"   Grund: {reason}")
-    ctx.logger.info(f"   Datei-Status: {datei_status or 'N/A'}")
+    ctx.logger.info(f"   File-Status: {datei_status or 'N/A'}")

     if collection_ids:
         ctx.logger.info(f"   Collections: {collection_ids}")

@@ -163,7 +167,9 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync
     # PREVIEW-GENERIERUNG bei neuen/geänderten Dateien
     # ═══════════════════════════════════════════════════════════════

-    if datei_status in ['Neu', 'Geändert', 'neu', 'geändert', 'New', 'Changed']:
+    # Case-insensitive check für Datei-Status
+    datei_status_lower = (datei_status or '').lower()
+    if datei_status_lower in ['neu', 'geändert', 'new', 'changed']:
         ctx.logger.info("")
         ctx.logger.info("=" * 80)
         ctx.logger.info("🖼️ PREVIEW-GENERIERUNG STARTEN")

@@ -172,7 +178,7 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync

         try:
             # 1. Hole Download-Informationen
-            download_info = await sync_utils.get_document_download_info(entity_id)
+            download_info = await sync_utils.get_document_download_info(entity_id, entity_type)

             if not download_info:
                 ctx.logger.warn("⚠️ Keine Download-Info verfügbar - überspringe Preview")

@@ -213,7 +219,8 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync
             ctx.logger.info(f"📤 Uploading preview to EspoCRM...")
             await sync_utils.update_sync_metadata(
                 entity_id,
-                preview_data=preview_data
+                preview_data=preview_data,
+                entity_type=entity_type
                 # Keine xaiFileId/collections - nur Preview update
             )
             ctx.logger.info(f"✅ Preview uploaded successfully")

@@ -244,7 +251,7 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync

     if not needs_sync:
         ctx.logger.info("✅ Kein xAI-Sync erforderlich, Lock wird released")
-        await sync_utils.release_sync_lock(entity_id, success=True)
+        await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
         return

     # ═══════════════════════════════════════════════════════════════

@@ -315,9 +322,9 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync
         await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e))


-async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any]):
+async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any], entity_type: str = 'CDokumente'):
     """
-    Behandelt Deletion von Documents
+    Behandelt Delete von Documents

     Entfernt Document aus xAI Collections (aber löscht File nicht - kann in anderen Collections sein)
     """

@@ -327,12 +334,12 @@ async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: Do
     ctx.logger.info("🗑️ DOCUMENT DELETE - xAI CLEANUP")
     ctx.logger.info("=" * 80)

-    xai_file_id = document.get('xaiFileId')
+    xai_file_id = document.get('xaiFileId') or document.get('xaiId')
     xai_collections = document.get('xaiCollections') or []

     if not xai_file_id or not xai_collections:
         ctx.logger.info("⏭️ Document war nicht in xAI gesynct, nichts zu tun")
-        await sync_utils.release_sync_lock(entity_id, success=True)
+        await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
         return

     ctx.logger.info(f"📋 Document Info:")

@@ -354,7 +361,7 @@ async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: Do
     #
     # ctx.logger.info(f"✅ File aus {len(xai_collections)} Collection(s) entfernt")

-    await sync_utils.release_sync_lock(entity_id, success=True)
+    await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)

     ctx.logger.info("=" * 80)
     ctx.logger.info("✅ DELETE ABGESCHLOSSEN (PLACEHOLDER)")

@@ -364,4 +371,4 @@ async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: Do
     ctx.logger.error(f"❌ Fehler bei Delete: {e}")
     import traceback
     ctx.logger.error(traceback.format_exc())
-    await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e))
+    await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type)
steps/vmh/webhook/document_create_api_step.py (new file, 77 lines)
@@ -0,0 +1,77 @@
"""VMH Webhook - Document Create"""
import json
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse


config = {
    "name": "VMH Webhook Document Create",
    "description": "Empfängt Create-Webhooks von EspoCRM für Documents",
    "flows": ["vmh-documents"],
    "triggers": [
        http("POST", "/vmh/webhook/document/create")
    ],
    "enqueues": ["vmh.document.create"],
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Webhook handler for Document creation in EspoCRM.

    Receives batch or single entity notifications and emits queue events
    for each entity ID to be synced to xAI.
    """
    try:
        payload = request.body or []

        ctx.logger.info("VMH Webhook Document Create empfangen")
        ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")

        # Collect all IDs from the batch
        entity_ids = set()
        entity_type = 'CDokumente'  # default if the payload carries no entityType

        if isinstance(payload, list):
            for entity in payload:
                if isinstance(entity, dict) and 'id' in entity:
                    entity_ids.add(entity['id'])
                    # Extract entityType if present (last one wins for the batch)
                    entity_type = entity.get('entityType', entity_type)
        elif isinstance(payload, dict) and 'id' in payload:
            entity_ids.add(payload['id'])
            entity_type = payload.get('entityType', entity_type)

        ctx.logger.info(f"{len(entity_ids)} Document IDs zum Create-Sync gefunden")

        # Emit events for queue processing (deduplication happens in the event handler via lock)
        for entity_id in entity_ids:
            await ctx.enqueue({
                'topic': 'vmh.document.create',
                'data': {
                    'entity_id': entity_id,
                    'entity_type': entity_type,
                    'action': 'create',
                    'timestamp': payload[0].get('modifiedAt') if isinstance(payload, list) and payload else None
                }
            })

        return ApiResponse(
            status_code=200,
            body={
                'success': True,
                'message': f'{len(entity_ids)} Document(s) zum Sync enqueued',
                'entity_ids': list(entity_ids)
            }
        )

    except Exception as e:
        ctx.logger.error(f"Fehler im Document Create Webhook: {e}")
        ctx.logger.error(f"Payload: {request.body}")

        return ApiResponse(
            status_code=500,
            body={
                'success': False,
                'error': str(e)
            }
        )
steps/vmh/webhook/document_delete_api_step.py (new file, 76 lines)
@@ -0,0 +1,76 @@
"""VMH Webhook - Document Delete"""
import json
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse


config = {
    "name": "VMH Webhook Document Delete",
    "description": "Empfängt Delete-Webhooks von EspoCRM für Documents",
    "flows": ["vmh-documents"],
    "triggers": [
        http("POST", "/vmh/webhook/document/delete")
    ],
    "enqueues": ["vmh.document.delete"],
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Webhook handler for Document deletion in EspoCRM.

    Receives batch or single entity notifications and emits queue events
    for each entity ID to be removed from xAI.
    """
    try:
        payload = request.body or []

        ctx.logger.info("VMH Webhook Document Delete empfangen")
        ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")

        # Collect all IDs from the batch
        entity_ids = set()
        entity_type = 'CDokumente'  # default if the payload carries no entityType

        if isinstance(payload, list):
            for entity in payload:
                if isinstance(entity, dict) and 'id' in entity:
                    entity_ids.add(entity['id'])
                    entity_type = entity.get('entityType', entity_type)
        elif isinstance(payload, dict) and 'id' in payload:
            entity_ids.add(payload['id'])
            entity_type = payload.get('entityType', entity_type)

        ctx.logger.info(f"{len(entity_ids)} Document IDs zum Delete-Sync gefunden")

        # Emit events for queue processing
        for entity_id in entity_ids:
            await ctx.enqueue({
                'topic': 'vmh.document.delete',
                'data': {
                    'entity_id': entity_id,
                    'entity_type': entity_type,
                    'action': 'delete',
                    'timestamp': payload[0].get('deletedAt') if isinstance(payload, list) and payload else None
                }
            })

        return ApiResponse(
            status_code=200,
            body={
                'success': True,
                'message': f'{len(entity_ids)} Document(s) zum Delete enqueued',
                'entity_ids': list(entity_ids)
            }
        )

    except Exception as e:
        ctx.logger.error(f"Fehler im Document Delete Webhook: {e}")
        ctx.logger.error(f"Payload: {request.body}")

        return ApiResponse(
            status_code=500,
            body={
                'success': False,
                'error': str(e)
            }
        )
steps/vmh/webhook/document_update_api_step.py (new file, 76 lines)
@@ -0,0 +1,76 @@
"""VMH Webhook - Document Update"""
import json
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse


config = {
    "name": "VMH Webhook Document Update",
    "description": "Empfängt Update-Webhooks von EspoCRM für Documents",
    "flows": ["vmh-documents"],
    "triggers": [
        http("POST", "/vmh/webhook/document/update")
    ],
    "enqueues": ["vmh.document.update"],
}


async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """
    Webhook handler for Document updates in EspoCRM.

    Receives batch or single entity notifications and emits queue events
    for each entity ID to be synced to xAI.
    """
    try:
        payload = request.body or []

        ctx.logger.info("VMH Webhook Document Update empfangen")
        ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")

        # Collect all IDs from the batch
        entity_ids = set()
        entity_type = 'CDokumente'  # default if the payload carries no entityType

        if isinstance(payload, list):
            for entity in payload:
                if isinstance(entity, dict) and 'id' in entity:
                    entity_ids.add(entity['id'])
                    entity_type = entity.get('entityType', entity_type)
        elif isinstance(payload, dict) and 'id' in payload:
            entity_ids.add(payload['id'])
            entity_type = payload.get('entityType', entity_type)

        ctx.logger.info(f"{len(entity_ids)} Document IDs zum Update-Sync gefunden")

        # Emit events for queue processing
        for entity_id in entity_ids:
            await ctx.enqueue({
                'topic': 'vmh.document.update',
                'data': {
                    'entity_id': entity_id,
                    'entity_type': entity_type,
                    'action': 'update',
                    'timestamp': payload[0].get('modifiedAt') if isinstance(payload, list) and payload else None
                }
            })

        return ApiResponse(
            status_code=200,
            body={
                'success': True,
                'message': f'{len(entity_ids)} Document(s) zum Sync enqueued',
                'entity_ids': list(entity_ids)
            }
        )

    except Exception as e:
        ctx.logger.error(f"Fehler im Document Update Webhook: {e}")
        ctx.logger.error(f"Payload: {request.body}")

        return ApiResponse(
            status_code=500,
            body={
                'success': False,
                'error': str(e)
            }
        )
tests/README.md (new file, 110 lines)
@@ -0,0 +1,110 @@
# Test Scripts

This directory contains test scripts for the Motia III xAI Collections integration.

## Test Files

### `test_xai_collections_api.py`
Tests xAI Collections API authentication and basic operations.

**Usage:**
```bash
cd /opt/motia-iii/bitbylaw
python tests/test_xai_collections_api.py
```

**Required Environment Variables:**
- `XAI_MANAGEMENT_API_KEY` - xAI Management API key for collection operations
- `XAI_API_KEY` - xAI Regular API key for file operations

**Tests:**
- ✅ Management API authentication
- ✅ Regular API authentication
- ✅ Collection listing
- ✅ Collection creation
- ✅ File upload
- ✅ Collection deletion
- ✅ Error handling

### `test_preview_upload.py`
Tests preview/thumbnail upload to the EspoCRM CDokumente entity.

**Usage:**
```bash
cd /opt/motia-iii/bitbylaw
python tests/test_preview_upload.py <document_id>
```

**Required Environment Variables:**
- `ESPOCRM_API_BASE_URL` - EspoCRM API base URL (default: https://crm.bitbylaw.com/api/v1)
- `ESPOCRM_API_KEY` - EspoCRM API key

**Tests:**
- ✅ Preview image generation (WebP format, 600x800px)
- ✅ Base64 Data URI encoding
- ✅ Attachment upload via JSON POST
- ✅ Entity update with previewId/previewName

**Status:** ✅ Successfully tested - Attachment ID `69a71194c7c6baebf` created

### `test_thumbnail_generation.py`
Tests thumbnail generation for various document types.

**Usage:**
```bash
cd /opt/motia-iii/bitbylaw
python tests/test_thumbnail_generation.py
```

**Supported Formats:**
- PDF → WebP (first page)
- DOCX/DOC → PDF → WebP
- Images (JPEG, PNG, etc.) → WebP resize

**Dependencies:**
- `python3-pil` - PIL/Pillow for image processing
- `poppler-utils` - PDF rendering
- `libreoffice` - DOCX to PDF conversion
- `pdf2image` - PDF to image conversion
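
The image branch of the pipeline above (fit to the 600x800 preview box, re-encode as WebP) can be sketched with Pillow; the helper name and parameters here are illustrative, not the repo's actual implementation, and the PDF branch via `pdf2image` is only noted in a comment:

```python
from io import BytesIO
from PIL import Image

def to_webp_thumbnail(image_bytes: bytes, box=(600, 800), quality=85) -> bytes:
    """Fit an image into the preview box and re-encode it as WebP.

    For PDFs, pdf2image.convert_from_bytes(data, first_page=1, last_page=1)[0]
    would supply the first page as a PIL Image instead of Image.open().
    """
    img = Image.open(BytesIO(image_bytes)).convert('RGB')
    img.thumbnail(box)  # in-place resize, preserves aspect ratio
    out = BytesIO()
    img.save(out, format='WEBP', quality=quality)
    return out.getvalue()
```

`Image.thumbnail` never upscales, so small inputs pass through at their original size.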

## Running Tests

### All Tests
```bash
cd /opt/motia-iii/bitbylaw
python -m pytest tests/ -v
```

### Individual Tests
```bash
cd /opt/motia-iii/bitbylaw
python tests/test_xai_collections_api.py
python tests/test_preview_upload.py
python tests/test_thumbnail_generation.py
```

## Environment Setup

Create a `.env` file in `/opt/motia-iii/bitbylaw/`:
```bash
# xAI Collections API
XAI_MANAGEMENT_API_KEY=xai-token-xxx...
XAI_API_KEY=xai-xxx...

# EspoCRM API
ESPOCRM_URL=https://crm.bitbylaw.com
ESPOCRM_API_KEY=xxx...

# Redis (for locking)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_ADVOWARE_CACHE=1
```
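
The `.env` file is plain `KEY=value` text; when `python-dotenv` is not installed, a minimal loader (a sketch, not part of the repo) is enough for the test scripts:

```python
import os

def load_env(path: str = '.env') -> dict:
    """Parse simple KEY=value lines, skipping blanks and # comments,
    and export them via os.environ without overriding existing values."""
    values = {}
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, _, value = line.partition('=')  # split on the first '=' only
            values[key.strip()] = value.strip()
            os.environ.setdefault(key.strip(), value.strip())
    return values
```

Using `setdefault` lets real environment variables (e.g. from systemd or CI) take precedence over the file.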

## Test Results

Last test run: successfully validated preview upload functionality.
- Preview upload works with base64 Data URI format
- Attachment created with ID: `69a71194c7c6baebf`
- CDokumente entity updated with previewId/previewName
- WebP format at 600x800px confirmed working
tests/test_preview_upload.py (new executable file, 279 lines)
@@ -0,0 +1,279 @@
#!/usr/bin/env python3
"""
Test script: preview image upload to EspoCRM.

Tests uploading a preview image (WebP) as an attachment
to a CDokumente entity via the EspoCRM API.

Usage:
    python test_preview_upload.py <document_id>

Example:
    python test_preview_upload.py 69a68906ac3d0fd25
"""

import asyncio
import aiohttp
import base64
import os
import sys
from io import BytesIO
from PIL import Image


# EspoCRM config (from environment, with test defaults)
ESPOCRM_API_BASE_URL = os.getenv('ESPOCRM_API_BASE_URL', 'https://crm.bitbylaw.com/api/v1')
ESPOCRM_API_KEY = os.getenv('ESPOCRM_API_KEY', '')

# Test parameters
ENTITY_TYPE = 'CDokumente'
FIELD_NAME = 'preview'


def generate_test_webp(text: str = "TEST PREVIEW", size: tuple = (600, 800)) -> bytes:
    """
    Generate a simple WebP test image.

    Args:
        text: Text rendered onto the image
        size: Image size (width, height)

    Returns:
        WebP image as bytes
    """
    print(f"📐 Generating test image ({size[0]}x{size[1]})...")

    # Create a plain background image
    img = Image.new('RGB', size, color='lightblue')

    # Optional: draw centered text (requires PIL ImageDraw)
    try:
        from PIL import ImageDraw, ImageFont
        draw = ImageDraw.Draw(img)

        # Try to load a larger font, fall back to the default
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 40)
        except OSError:
            font = ImageFont.load_default()

        # Center the text
        bbox = draw.textbbox((0, 0), text, font=font)
        text_width = bbox[2] - bbox[0]
        text_height = bbox[3] - bbox[1]
        x = (size[0] - text_width) // 2
        y = (size[1] - text_height) // 2

        draw.text((x, y), text, fill='darkblue', font=font)
    except Exception as e:
        print(f"⚠️ Text rendering failed: {e}")

    # Convert to WebP
    buffer = BytesIO()
    img.save(buffer, format='WEBP', quality=85)
    webp_bytes = buffer.getvalue()

    print(f"✅ Test image generated: {len(webp_bytes)} bytes")
    return webp_bytes


async def upload_preview_to_espocrm(
    document_id: str,
    preview_data: bytes,
    entity_type: str = 'CDokumente'
) -> dict:
    """
    Upload a preview to the EspoCRM Attachment API.

    Args:
        document_id: ID of the CDokumente/Document entity
        preview_data: WebP image as bytes
        entity_type: Entity type (CDokumente or Document)

    Returns:
        Response dict with the attachment ID
    """
    print(f"\n📤 Uploading preview to {entity_type}/{document_id}...")
    print(f"   Preview size: {len(preview_data)} bytes")

    # Base64-encode as a Data URI
    base64_data = base64.b64encode(preview_data).decode('ascii')
    file_data_uri = f"data:image/webp;base64,{base64_data}"

    print(f"   Base64 encoded: {len(base64_data)} chars")

    # API request
    url = ESPOCRM_API_BASE_URL.rstrip('/') + '/Attachment'
    headers = {
        'X-Api-Key': ESPOCRM_API_KEY,
        'Content-Type': 'application/json'
    }

    payload = {
        'name': 'preview.webp',
        'type': 'image/webp',
        'role': 'Attachment',
        'field': FIELD_NAME,
        'relatedType': entity_type,
        'relatedId': document_id,
        'file': file_data_uri
    }

    print(f"\n🌐 POST {url}")
    print(f"   Headers: X-Api-Key={ESPOCRM_API_KEY[:20]}...")
    print(f"   Payload keys: {list(payload.keys())}")
    print(f"     - name: {payload['name']}")
    print(f"     - type: {payload['type']}")
    print(f"     - role: {payload['role']}")
    print(f"     - field: {payload['field']}")
    print(f"     - relatedType: {payload['relatedType']}")
    print(f"     - relatedId: {payload['relatedId']}")
    print(f"     - file: data:image/webp;base64,... ({len(base64_data)} chars)")

    timeout = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.post(url, headers=headers, json=payload) as response:
            print(f"\n📥 Response Status: {response.status}")
            print(f"   Content-Type: {response.content_type}")

            response_text = await response.text()

            if response.status >= 400:
                print("\n❌ Upload FAILED!")
                print(f"   Status: {response.status}")
                print(f"   Response: {response_text}")
                raise Exception(f"Upload error {response.status}: {response_text}")

            # Parse JSON response
            result = await response.json()
            attachment_id = result.get('id')

            print("\n✅ Upload SUCCESSFUL!")
            print(f"   Attachment ID: {attachment_id}")
            print(f"   Full response: {result}")

            return result


async def update_entity_with_preview(
    document_id: str,
    attachment_id: str,
    entity_type: str = 'CDokumente'
) -> dict:
    """
    Update the entity with previewId and previewName.

    Args:
        document_id: Entity ID
        attachment_id: Attachment ID from the upload
        entity_type: Entity type

    Returns:
        Updated entity data
    """
    print(f"\n📝 Updating {entity_type}/{document_id} with previewId...")

    url = f"{ESPOCRM_API_BASE_URL.rstrip('/')}/{entity_type}/{document_id}"
    headers = {
        'X-Api-Key': ESPOCRM_API_KEY,
        'Content-Type': 'application/json'
    }

    payload = {
        'previewId': attachment_id,
        'previewName': 'preview.webp'
    }

    print(f"   PUT {url}")
    print(f"   Payload: {payload}")

    timeout = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.put(url, headers=headers, json=payload) as response:
            print(f"   Response Status: {response.status}")

            if response.status >= 400:
                response_text = await response.text()
                print("\n❌ Update FAILED!")
                print(f"   Status: {response.status}")
                print(f"   Response: {response_text}")
                raise Exception(f"Update error {response.status}: {response_text}")

            result = await response.json()
            print("\n✅ Entity updated successfully!")
            print(f"   previewId: {result.get('previewId')}")
            print(f"   previewName: {result.get('previewName')}")

            return result


async def main():
    """Main test flow"""
print("=" * 80)
|
||||
print("🖼️ ESPOCRM PREVIEW UPLOAD TEST")
|
||||
print("=" * 80)
|
||||
|
||||
# Check arguments
|
||||
if len(sys.argv) < 2:
|
||||
print("\n❌ Error: Document ID required!")
|
||||
print(f"\nUsage: {sys.argv[0]} <document_id>")
|
||||
print(f"Example: {sys.argv[0]} 69a68906ac3d0fd25")
|
||||
sys.exit(1)
|
||||
|
||||
document_id = sys.argv[1]
|
||||
|
||||
# Check API key
|
||||
if not ESPOCRM_API_KEY:
|
||||
print("\n❌ Error: ESPOCRM_API_KEY environment variable not set!")
|
||||
sys.exit(1)
|
||||
|
||||
print(f"\n📋 Test Parameters:")
|
||||
print(f" API Base URL: {ESPOCRM_API_BASE_URL}")
|
||||
print(f" API Key: {ESPOCRM_API_KEY[:20]}...")
|
||||
print(f" Entity Type: {ENTITY_TYPE}")
|
||||
print(f" Document ID: {document_id}")
|
||||
print(f" Field: {FIELD_NAME}")
|
||||
|
||||
try:
|
||||
# Step 1: Generate test image
|
||||
print("\n" + "=" * 80)
|
||||
print("STEP 1: Generate Test Image")
|
||||
print("=" * 80)
|
||||
preview_data = generate_test_webp(f"Preview Test\n{document_id[:8]}", size=(600, 800))
|
||||
|
||||
# Step 2: Upload to EspoCRM
|
||||
print("\n" + "=" * 80)
|
||||
print("STEP 2: Upload to EspoCRM Attachment API")
|
||||
print("=" * 80)
|
||||
result = await upload_preview_to_espocrm(document_id, preview_data, ENTITY_TYPE)
|
||||
attachment_id = result.get('id')
|
||||
|
||||
# Step 3: Update Entity
|
||||
print("\n" + "=" * 80)
|
||||
print("STEP 3: Update Entity with Preview Reference")
|
||||
print("=" * 80)
|
||||
await update_entity_with_preview(document_id, attachment_id, ENTITY_TYPE)
|
||||
|
||||
# Success summary
|
||||
print("\n" + "=" * 80)
|
||||
print("✅ TEST SUCCESSFUL!")
|
||||
print("=" * 80)
|
||||
print(f"\n📊 Summary:")
|
||||
print(f" - Attachment ID: {attachment_id}")
|
||||
print(f" - Entity: {ENTITY_TYPE}/{document_id}")
|
||||
print(f" - Preview Size: {len(preview_data)} bytes")
|
||||
print(f"\n🔗 View in EspoCRM:")
|
||||
print(f" {ESPOCRM_API_BASE_URL.replace('/api/v1', '')}/#CDokumente/view/{document_id}")
|
||||
|
||||
except Exception as e:
|
||||
print("\n" + "=" * 80)
|
||||
print("❌ TEST FAILED!")
|
||||
print("=" * 80)
|
||||
print(f"\nError: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
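The upload step above relies on EspoCRM's Attachment API accepting the file inline as a base64 data URI inside the JSON body. As a standalone sketch of just that encoding step (no network call; the field name `'preview'` stands in for the script's `FIELD_NAME` constant, which is defined elsewhere):

```python
# Sketch of the payload construction used by upload_preview_to_espocrm.
# The field name 'preview' is illustrative; the script reads it from FIELD_NAME.
import base64

def build_attachment_payload(preview_data: bytes, entity_type: str,
                             document_id: str, field_name: str) -> dict:
    """Build the JSON body for POST /Attachment with an inline data-URI file."""
    base64_data = base64.b64encode(preview_data).decode('ascii')
    return {
        'name': 'preview.webp',
        'type': 'image/webp',
        'role': 'Attachment',
        'field': field_name,
        'relatedType': entity_type,
        'relatedId': document_id,
        'file': f"data:image/webp;base64,{base64_data}",
    }

payload = build_attachment_payload(b'RIFF....WEBP', 'CDokumente',
                                   '69a68906ac3d0fd25', 'preview')
print(payload['file'][:30])  # data URI prefix followed by base64 payload
```

The attachment record carries the relation (`relatedType`/`relatedId`) itself, which is why the script still needs the follow-up PUT to set `previewId` on the entity.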
253 tests/test_thumbnail_generation.py Normal file
@@ -0,0 +1,253 @@
#!/usr/bin/env python3
"""
Test script for Document Thumbnail Generation

Tests the complete flow:
1. Create a test document in EspoCRM
2. Upload a file attachment
3. Trigger the webhook (or wait for the automatic trigger)
4. Verify preview generation
"""

import asyncio
import aiohttp
import os
import sys
import json
from pathlib import Path
from io import BytesIO
from PIL import Image

# Add bitbylaw to path
sys.path.insert(0, str(Path(__file__).parent))

from services.espocrm import EspoCRMAPI


async def create_test_image(width: int = 800, height: int = 600) -> bytes:
    """Create a simple test PNG image"""
    img = Image.new('RGB', (width, height), color='lightblue')

    # Add some text/pattern so it's not just a solid color
    from PIL import ImageDraw, ImageFont
    draw = ImageDraw.Draw(img)

    # Draw some shapes
    draw.rectangle([50, 50, width-50, height-50], outline='darkblue', width=5)
    draw.ellipse([width//4, height//4, 3*width//4, 3*height//4], outline='red', width=3)

    # Add text
    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 48)
    except OSError:
        font = None

    text = "TEST IMAGE\nFor Thumbnail\nGeneration"
    draw.text((width//2, height//2), text, fill='black', anchor='mm', font=font, align='center')

    # Save to bytes
    buffer = BytesIO()
    img.save(buffer, format='PNG')
    return buffer.getvalue()


async def create_test_document(espocrm: EspoCRMAPI) -> str:
    """Create a test document in EspoCRM"""
    print("\n📄 Creating test document in EspoCRM...")

    document_data = {
        "name": f"Test Thumbnail Generation {asyncio.get_event_loop().time()}",
        "status": "Active",
        "dateiStatus": "Neu",  # This should trigger preview generation
        "type": "Image",
        "description": "Automated test document for thumbnail generation"
    }

    result = await espocrm.create_entity("Document", document_data)
    doc_id = result.get("id")

    print(f"✅ Document created: {doc_id}")
    print(f" Name: {result.get('name')}")
    print(f" Datei-Status: {result.get('dateiStatus')}")

    return doc_id


async def upload_test_file(espocrm: EspoCRMAPI, doc_id: str) -> str:
    """Upload a test image file to the document"""
    print(f"\n📤 Uploading test image to document {doc_id}...")

    # Create test image
    image_data = await create_test_image(1200, 900)
    print(f" Generated test image: {len(image_data)} bytes")

    # Upload to EspoCRM
    attachment = await espocrm.upload_attachment(
        file_content=image_data,
        filename="test_image.png",
        parent_type="Document",
        parent_id=doc_id,
        field="file",
        mime_type="image/png",
        role="Attachment"
    )

    attachment_id = attachment.get("id")
    print(f"✅ File uploaded: {attachment_id}")
    print(f" Filename: {attachment.get('name')}")
    print(f" Size: {attachment.get('size')} bytes")

    return attachment_id


async def trigger_webhook(doc_id: str, action: str = "update"):
    """Manually trigger the document webhook"""
    print(f"\n🔔 Triggering webhook for document {doc_id}...")

    webhook_url = f"http://localhost:7777/vmh/webhook/document/{action}"
    payload = {
        "entityType": "Document",
        "entity": {
            "id": doc_id,
            "entityType": "Document"
        },
        "data": {
            "entity": {
                "id": doc_id
            }
        }
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(webhook_url, json=payload) as response:
            status = response.status
            text = await response.text()

            if status == 200:
                print(f"✅ Webhook triggered successfully")
                print(f" Response: {text}")
            else:
                print(f"❌ Webhook failed: {status}")
                print(f" Response: {text}")

    return status == 200


async def check_preview_generated(espocrm: EspoCRMAPI, doc_id: str, max_wait: int = 30):
    """Check if a preview was generated (poll once per second)"""
    print(f"\n🔍 Checking for preview generation (max {max_wait}s)...")

    for i in range(max_wait):
        await asyncio.sleep(1)

        # Get document
        doc = await espocrm.get_entity("Document", doc_id)

        # Check if the preview field is populated
        preview_id = doc.get("previewId")
        if preview_id:
            print(f"\n✅ Preview generated!")
            print(f" Preview Attachment ID: {preview_id}")
            print(f" Preview Name: {doc.get('previewName')}")
            print(f" Preview Type: {doc.get('previewType')}")

            # Try to download and check the preview
            try:
                preview_data = await espocrm.download_attachment(preview_id)
                print(f" Preview Size: {len(preview_data)} bytes")

                # Verify it's a WebP image
                img = Image.open(BytesIO(preview_data))
                print(f" Preview Format: {img.format}")
                print(f" Preview Dimensions: {img.width}x{img.height}")

                if img.format == "WEBP":
                    print(" ✅ Format is WebP as expected")
                if img.width <= 600 and img.height <= 800:
                    print(" ✅ Dimensions within expected range")

            except Exception as e:
                print(f" ⚠️ Could not verify preview: {e}")

            return True

        if (i + 1) % 5 == 0:
            print(f" Still waiting... ({i + 1}s)")

    print(f"\n❌ Preview not generated after {max_wait}s")
    return False


async def cleanup_test_document(espocrm: EspoCRMAPI, doc_id: str):
    """Delete the test document"""
    print(f"\n🗑️ Cleaning up test document {doc_id}...")
    try:
        await espocrm.delete_entity("Document", doc_id)
        print("✅ Test document deleted")
    except Exception as e:
        print(f"⚠️ Could not delete test document: {e}")


async def main():
    print("=" * 80)
    print("THUMBNAIL GENERATION TEST")
    print("=" * 80)

    # Initialize the EspoCRM API client
    espocrm = EspoCRMAPI()

    doc_id = None
    try:
        # Step 1: Create test document
        doc_id = await create_test_document(espocrm)

        # Step 2: Upload test file
        attachment_id = await upload_test_file(espocrm, doc_id)

        # Step 3: Update document to trigger webhook (set dateiStatus to trigger sync)
        print(f"\n🔄 Updating document to trigger webhook...")
        await espocrm.update_entity("Document", doc_id, {
            "dateiStatus": "Neu"  # This should trigger the webhook
        })
        print("✅ Document updated")

        # Step 4: Wait a bit for the webhook to be processed
        print("\n⏳ Waiting 3 seconds for webhook processing...")
        await asyncio.sleep(3)

        # Step 5: Check if the preview was generated
        success = await check_preview_generated(espocrm, doc_id, max_wait=20)

        # Summary
        print("\n" + "=" * 80)
        if success:
            print("✅ TEST PASSED - Preview generation successful!")
        else:
            print("❌ TEST FAILED - Preview was not generated")
            print("\nCheck logs with:")
            print(" sudo journalctl -u motia.service --since '2 minutes ago' | grep -E '(PREVIEW|Document)'")
        print("=" * 80)

        # Ask whether to clean up
        print(f"\nTest document ID: {doc_id}")
        cleanup = input("\nDelete test document? (y/N): ").strip().lower()
        if cleanup == 'y':
            await cleanup_test_document(espocrm, doc_id)
        else:
            print(f"ℹ️ Test document kept: {doc_id}")
            print(f" View in EspoCRM: https://crm.bitbylaw.com/#Document/view/{doc_id}")

    except Exception as e:
        print(f"\n❌ Test failed with error: {e}")
        import traceback
        traceback.print_exc()

        if doc_id:
            print(f"\nTest document ID: {doc_id}")
            cleanup = input("\nDelete test document? (y/N): ").strip().lower()
            if cleanup == 'y':
                await cleanup_test_document(espocrm, doc_id)


if __name__ == "__main__":
    asyncio.run(main())
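The `check_preview_generated` loop above is a plain poll-until-ready pattern: fetch the entity once per second until `previewId` appears or a deadline passes. A generic, self-contained sketch of that pattern (hypothetical `poll_until` helper; the test script inlines the loop instead, and its sleep precedes the fetch):

```python
# Generic poll-until-ready helper in the spirit of check_preview_generated:
# call fetch() repeatedly until it yields a truthy result or max_wait expires.
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar('T')

async def poll_until(fetch: Callable[[], Awaitable[Optional[T]]],
                     max_wait: float = 30.0, interval: float = 1.0) -> Optional[T]:
    """Return the first truthy result of fetch(), or None after max_wait seconds."""
    elapsed = 0.0
    while elapsed < max_wait:
        result = await fetch()
        if result:
            return result
        await asyncio.sleep(interval)
        elapsed += interval
    return None

# Demo: a fake entity whose previewId appears on the third poll.
calls = {'n': 0}

async def fake_fetch():
    calls['n'] += 1
    return 'abc123' if calls['n'] >= 3 else None

result = asyncio.run(poll_until(fake_fetch, max_wait=5, interval=0.01))
print(result)  # → abc123
```

Keeping the timeout explicit (rather than polling forever) is what lets the test report "Preview not generated after Ns" instead of hanging when the webhook pipeline is down.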