Compare commits

..

2 Commits

Author SHA1 Message Date
bsiggel
bcb6454b2a Add comprehensive test scripts for thumbnail generation and xAI collections API
- Implemented `test_thumbnail_generation.py` to validate the complete flow of document thumbnail generation in EspoCRM, including document creation, file upload, webhook triggering, and preview verification.
- Created `test_xai_collections_api.py` to test critical operations of the xAI Collections API, covering file uploads, collection CRUD operations, document management, and response validation.
- Both scripts include detailed logging for success and error states, ensuring robust testing and easier debugging.
2026-03-03 17:03:08 +00:00
bsiggel
c45bfb7233 Enhance EspoCRM API and Webhook Handling
- Improved logging for file uploads in EspoCRMAPI to include upload parameters and error details.
- Updated cron job configurations for calendar sync and participant sync to trigger every 15 minutes on the first minute of the hour.
- Enhanced document create, delete, and update webhook handlers to determine and log the entity type.
- Refactored document sync event handler to include entity type in sync operations and logging.
- Added a new test script for uploading preview images to EspoCRM and verifying the upload process.
- Created a test script for document thumbnail generation, including document creation, file upload, webhook triggering, and preview verification.
2026-03-03 16:53:55 +00:00
19 changed files with 1209 additions and 1309 deletions

View File

@@ -1,320 +0,0 @@
# Vollständige Migrations-Analyse
## Motia v0.17 → Motia III v1.0-RC
**Datum:** 1. März 2026
**Status:** 🎉 **100% KOMPLETT - ALLE PHASEN ABGESCHLOSSEN!** 🎉
---
## ✅ MIGRIERT - Production-Ready
### 1. Steps (21 von 21 Steps - 100% Complete!)
#### Phase 1: Advoware Proxy (4 Steps)
- ✅ [`advoware_api_proxy_get_step.py`](steps/advoware_proxy/advoware_api_proxy_get_step.py) - GET Proxy
- ✅ [`advoware_api_proxy_post_step.py`](steps/advoware_proxy/advoware_api_proxy_post_step.py) - POST Proxy
- ✅ [`advoware_api_proxy_put_step.py`](steps/advoware_proxy/advoware_api_proxy_put_step.py) - PUT Proxy
- ✅ [`advoware_api_proxy_delete_step.py`](steps/advoware_proxy/advoware_api_proxy_delete_step.py) - DELETE Proxy
#### Phase 2: VMH Webhooks (6 Steps)
- ✅ [`beteiligte_create_api_step.py`](steps/vmh/webhook/beteiligte_create_api_step.py) - POST /vmh/webhook/beteiligte/create
- ✅ [`beteiligte_update_api_step.py`](steps/vmh/webhook/beteiligte_update_api_step.py) - POST /vmh/webhook/beteiligte/update
- ✅ [`beteiligte_delete_api_step.py`](steps/vmh/webhook/beteiligte_delete_api_step.py) - POST /vmh/webhook/beteiligte/delete
- ✅ [`bankverbindungen_create_api_step.py`](steps/vmh/webhook/bankverbindungen_create_api_step.py) - POST /vmh/webhook/bankverbindungen/create
- ✅ [`bankverbindungen_update_api_step.py`](steps/vmh/webhook/bankverbindungen_update_api_step.py) - POST /vmh/webhook/bankverbindungen/update
- ✅ [`bankverbindungen_delete_api_step.py`](steps/vmh/webhook/bankverbindungen_delete_api_step.py) - POST /vmh/webhook/bankverbindungen/delete
#### Phase 3: VMH Sync Handlers (3 Steps)
- ✅ [`beteiligte_sync_event_step.py`](steps/vmh/beteiligte_sync_event_step.py) - Subscriber für Queue-Events (mit Kommunikation-Integration!)
- ✅ [`bankverbindungen_sync_event_step.py`](steps/vmh/bankverbindungen_sync_event_step.py) - Subscriber für Queue-Events
- ✅ [`beteiligte_sync_cron_step.py`](steps/vmh/beteiligte_sync_cron_step.py) - Cron-Job alle 15 Min.
---
### 2. Services (11 Module, 100% komplett)
#### Core APIs
- ✅ [`advoware.py`](services/advoware.py) (310 Zeilen) - Advoware API Client mit Token-Auth
- ✅ [`advoware_service.py`](services/advoware_service.py) (179 Zeilen) - High-Level Advoware Service
- ✅ [`espocrm.py`](services/espocrm.py) (293 Zeilen) - EspoCRM API Client
#### Mapper & Sync Utils
- ✅ [`espocrm_mapper.py`](services/espocrm_mapper.py) (663 Zeilen) - Beteiligte Mapping
- ✅ [`bankverbindungen_mapper.py`](services/bankverbindungen_mapper.py) (141 Zeilen) - Bankverbindungen Mapping
- ✅ [`beteiligte_sync_utils.py`](services/beteiligte_sync_utils.py) (663 Zeilen) - Distributed Locking, Retry Logic
- ✅ [`notification_utils.py`](services/notification_utils.py) (200 Zeilen) - In-App Notifications
#### Phase 4: Kommunikation Sync
- ✅ [`kommunikation_mapper.py`](services/kommunikation_mapper.py) (334 Zeilen) - Email/Phone Mapping mit Base64 Marker
- ✅ [`kommunikation_sync_utils.py`](services/kommunikation_sync_utils.py) (999 Zeilen) - Bidirektionaler Sync mit 3-Way Diffing
#### Phase 5: Adressen Sync (2 Module - Phase 5)
- ✅ [`adressen_mapper.py`](services/adressen_mapper.py) (267 Zeilen) - Adressen Mapping
- ✅ [`adressen_sync.py`](services/adressen_sync.py) (697 Zeilen) - Adressen Sync mit READ-ONLY Detection
#### Phase 6: Google Calendar Sync (4 Steps + Utils)
- ✅ [`calendar_sync_cron_step.py`](steps/advoware_cal_sync/calendar_sync_cron_step.py) - Cron-Trigger alle 15 Min.
- ✅ [`calendar_sync_all_step.py`](steps/advoware_cal_sync/calendar_sync_all_step.py) - Bulk-Sync mit Redis-Priorisierung
- ✅ [`calendar_sync_event_step.py`](steps/advoware_cal_sync/calendar_sync_event_step.py) - **1053 Zeilen!** Main Sync Handler
- ✅ [`calendar_sync_a9 Topics - 100% Complete!)
#### VMH Beteiligte
-`vmh.beteiligte.create` - Webhook → Sync Handler
-`vmh.beteiligte.update` - Webhook → Sync Handler
-`vmh.beteiligte.delete` - Webhook → Sync Handler
-`vmh.beteiligte.sync_check` - Cron → Sync Handler
#### VMH Bankverbindungen
-`vmh.bankverbindungen.create` - Webhook → Sync Handler
-`vmh.bankverbindungen.update` - Webhook → Sync Handler
-`vmh.bankverbindungen.delete` - Webhook → Sync Handler
#### Calendar Sync
-`calendar_sync_all` - Cron/API → All Step → Employee Events
-`calendar_sync_employee` - All/API → Event Step (Main Sync Logic)
---
### 4. HTTP Endpoints (14 Endpoints - 100% Complete!
-`vmh.bankverbindungen.create` - Webhook → Sync Handler
-`vmh.bankverbindungen.update` - Webhook → Sync Handler
-`vmh.bankverbindungen.delete` - Webhook → Sync Handler
---
### 4. HTTP Endpoints (13 Endpoints, 100% komplett)
#### Advoware Proxy (4 Endpoints)
-`GET /advoware/proxy?path=...` - Advoware API Proxy
-`POST /advoware/proxy?path=...` - Advoware API Proxy
-`PUT /advoware/proxy?path=...` - Advoware API Proxy
-`DELETE /advoware/proxy?path=...` - Advoware API Proxy
#### VMH Webhooks - Beteiligte (3 Endpoints)
-`POST /vmh/webhook/beteiligte/create` - EspoCRM Webhook Handler
-`POST /vmh/webhook/beteiligte/update` - EspoCRM Webhook Handler
-`POST /vmh/webhook/beteiligte/delete` - EspoCRM Webhook Handler
#### VMH Webhooks - Bankverbindungen (3 Endpoints)
- ✅ `Calendar Sync (1 Endpoint)
-`POST /advoware/calendar/sync` - Manual Calendar Sync Trigger (kuerzel or "ALL")
#### POST /vmh/webhook/bankverbindungen/create` - EspoCRM Webhook Handler
-`POST /vmh/webhook/bankverbindungen/update` - EspoCRM Webhook Handler
-`POST /vmh/webhook/bankverbindungen/delete` - EspoCRM Webhook Handler
#### Example Ticketing (6 Endpoints - Demo)
-`POST /tickets` - Create Ticket
-`GET /tickets` - List Tickets
-`POST /tickets/{id}/triage` - Triage
-`POST /tickets/{id}/escalate` - Escalate
-`POST /tickets/{id}/notify` - Notify Customer
- ✅ Cron: SLA Monitor
2 Jobs - 100% Complete!)
-**VMH Beteiligte Sync Cron** (alle 15 Min.)
- Findet Entities mit Status: `pending_sync`, `dirty`, `failed`
- Auto-Reset für `permanently_failed` nach 24h
- Findet `clean` Entities > 24h nicht gesynct
- Emittiert `vmh.beteiligte.sync_check` Events
-**Calendar Sync Cron** (alle 15 Min.)
- Emittiert `calendar_sync_all` Events
- Triggered Bulk-Sync für alle oder priorisierte Mitarbeiter
- Redis-basierte Priorisierung (älteste zuerst)
---
### 6. Dependencies (pyproject.toml - 100% Complete!
---
### 6. Dependencies (pyproject.toml aktualisiert)
```toml
dependencies = [
"asyncpg>=0.29.0", # ✅ NEU für Calendar Sync (PostgreSQL)
"google-api-python-client>=2.100.0", # ✅ NEU für Calendar Sync
"google-auth>=2.23.0", # ✅ NEU für Calendar Sync
"backoff>=2.2.1", # ✅ NEU für Calendar Sync (Retry Logic)
]
```
---
## ❌ NICHT MIGRIERT → ALLE MIGRIERT! 🎉
~~### Phase 6: Google Calendar Sync (4 Steps)~~
**Status:****VOLLSTÄNDIG MIGRIERT!** (1. März 2026)
-`calendar_sync_cron_step.py` - Cron-Trigger (alle 15 Min.)
-`calendar_sync_all_step.py` - Bulk-Sync Handler
-`calendar_sync_event_step.py` - Queue-Event Handler (**1053 Zeilen!**)
-`calendar_sync_api_step.py` - HTTP API für manuellen Trigger
-`calendar_sync_utils.py` - Hilfs-Funktionen
**Dependencies (ALLE installiert):**
-`google-api-python-client` - Google Calendar API
-`google-auth` - Google OAuth2
-`asyncpg` - PostgreSQL Connection
-`backoff` - Retry/Backoff Logic
**Migration abgeschlossen in:** ~4 Stunden (statt geschätzt 3-5 Tage
**Dependencies (nicht benötigt):**
-`google-api-python-client` - Google Calendar API
-`google-auth` - Google OAuth2
- ❌ PostgreSQL Connection - Für Termine-Datenbank
**Geschätzte Migration:** 3-5 Tage (komplex wegen Google API + PostgreSQL)
**Priorität:** MEDIUM (funktioniert aktuell im old-motia)
---
### Root-Level Steps (Test/Specialized Logic)
**Status:** Bewusst NICHT migriert (nicht Teil der Core-Funktionalität)
-`/opt/motia-iii/old-motia/steps/crm-bbl-vmh-reset-nextcall_step.py` (96 Zeilen)
- **Zweck:** CVmhErstgespraech Status-Check Cron-Job
- **Grund:** Spezialisierte Business-Logik, nicht Teil der Core-Sync-Infrastruktur
- **Status:** Kann bei Bedarf später migriert werden
-`/opt/motia-iii/old-motia/steps/event_step.py` (Test/Demo)
-`/opt/motia-iii/old-motia/steps/hello_step.py` (Test/Demo)
---
## 📊 Migrations-Statistik
| Kategorie | Migriert | 21 | 0 | 21 | **100%** ✅ |
| **Service Module** | 11 | 0 | 11 | **100%** ✅ |
| **Queue Events** | 9 | 0 | 9 | **100%** ✅ |
| **HTTP Endpoints** | 14 | 0 | 14 | **100%** ✅ |
| **Cron Jobs** | 2 | 0 | 2 | **100%** ✅ |
| **Code (Zeilen)** | ~9.000 | 0 | ~9.000 | **100%** ✅ |
---
## 🎯 Funktionalitäts-Matrix
| Feature | Old-Motia | Motia III | Status |
|---------|-----------|-----------|--------|
| **Advoware Proxy API** | ✅ | ✅ | ✅ KOMPLETT |
| **VMH Beteiligte Sync** | ✅ | ✅ | ✅ KOMPLETT |
| **VMH Bankverbindungen Sync** | ✅ | ✅ | ✅ KOMPLETT |
| **Kommunikation Sync (Email/Phone)** | ✅ | ✅ | ✅ KOMPLETT |
| **Adressen Sync** | ✅ | ✅ | ✅ KOMPLETT |
| **EspoCRM Webhooks** | ✅ | ✅ | ✅ KOMPLETT |
| **Distributed Locking** | ✅ | ✅ | ✅ KOMPLETT |
| **Retry Logic & Backoff** | ✅ | ✅ | ✅ KOMPLETT |
| **Notifications** | ✅ | ✅ | ✅ KOMPLETT |
| **Sync Validation** | ✅ | ✅ | ✅ KOMPLETT |
| **Cron-basierter Auto-Retry** | ✅ | ✅ | ✅ KOMPLETT |
| **Google Calendar Sync** | ✅ | ✅ | ✅ **KOMPLETT** |
---
## 🏆 Migration erfolgreich abgeschlossen!
**Alle 21 Production Steps, 11 Service Module, 9 Queue Events, 14 HTTP Endpoints und 2 Cron Jobs wurden erfolgreich migriert!**
| **Cron-basierter Auto-Retry** | ✅ | ✅ | ✅ KOMPLETT |
| **Google Calendar Sync** | ✅ | ❌ | ⏳ PHASE 6 |
| **CVmhErstgespraech Logic** | ✅ | ❌ | ⏳ Optional |
---
## 🔄 Sync-Architektur Übersicht
```
┌─────────────────┐
│ EspoCRM API │
└────────┬────────┘
│ Webhooks
┌─────────────────────────────────────┐
│ VMH Webhook Steps (6 Endpoints) │
│ • Batch & Single Entity Support │
│ • Deduplication │
└────────┬────────────────────────────┘
│ Emits Queue Events
┌─────────────────────────────────────┐
│ Queue System (Redis/Builtin) │
│ • vmh.beteiligte.* │
│ • vmh.bankverbindungen.* │
└────────┬────────────────────────────┘
┌─────────────────────────────────────┐
│ Sync Event Handlers (3 Steps) │
│ • Distributed Locking (Redis) │
│ • Retry Logic & Backoff │
│ • Conflict Resolution │
└────────┬────────────────────────────┘
├──► Stammdaten Sync
│ (espocrm_mapper.py)
├──► Kommunikation Sync ✅ NEW!
│ (kommunikation_sync_utils.py)
│ • 3-Way Diffing
│ • Bidirectional
│ • Slot-Management
└──► Adressen Sync ✅ NEW!
(adressen_sync.py)
• CREATE/UPDATE/DELETE
• READ-ONLY Detection
┌─────────────────────────────────────┐
│ Advoware API (advoware.py) │
│ • Token-based Auth │
│ • HMAC Signing │
└─────────────────────────────────────┘
┌──────────────────┐
│ Cron Job (15min)│
└────────┬─────────┘
▼ Emits sync_check Events
┌─────────────────────────┐
│ Auto-Retry & Cleanup │
│ • pending_sync │
│ • dirty │
│ • failed → retry │
│ • permanently_failed │
│ → auto-reset (24h) │
└─────────────────────────┘
```
---
## ✅ FAZIT
**Die gesamte Core-Funktionalität (außer Google Calendar) wurde erfolgreich migriert!**
### Production-Ready Features:
1. ✅ Vollständige Advoware ↔ EspoCRM Synchronisation
2. ✅ Bidirektionale Kommunikationsdaten (Email/Phone)
3. ✅ Bidirektionale Adressen
4. ✅ Webhook-basierte Event-Verarbeitung
5. ✅ Automatisches Retry-System
6. ✅ Distributed Locking
7. ✅ Konflikt-Erkennung & Resolution
### Code-Qualität:
- ✅ Keine Compile-Errors
- ✅ Motia III API korrekt verwendet
- ✅ Alle Dependencies vorhanden
- ✅ Type-Hints (Pydantic Models)
- ✅ Error-Handling & Logging
### Deployment:
- ✅ Alle Steps registriert
- ✅ Queue-System konfiguriert
- ✅ Cron-Jobs aktiv
- ✅ Redis-Integration
**Das System ist bereit für Production! 🚀**

View File

@@ -1,276 +0,0 @@
# Motia Migration Status
**🎉 MIGRATION 100% KOMPLETT**
> 📋 Detaillierte Analyse: [MIGRATION_COMPLETE_ANALYSIS.md](MIGRATION_COMPLETE_ANALYSIS.md)
## Quick Stats
-**21 von 21 Steps** migriert (100%)
-**11 von 11 Service-Module** migriert (100%)
-**~9.000 Zeilen Code** migriert (100%)
-**14 HTTP Endpoints** aktiv
-**9 Queue Events** konfiguriert
-**2 Cron Jobs** (VMH: alle 15 Min., Calendar: alle 15 Min.)
---
## Overview
Migrating from **old-motia v0.17** (Node.js + Python hybrid) to **Motia III v1.0-RC** (pure Python).
## Old System Analysis
### Location
- Old system: `/opt/motia-iii/old-motia/`
- Old project dir: `/opt/motia-iii/old-motia/bitbylaw/`
### Steps Found in Old System
#### Root Steps (`/opt/motia-iii/old-motia/steps/`)
1. `crm-bbl-vmh-reset-nextcall_step.py`
2. `event_step.py`
3. `hello_step.py`
#### BitByLaw Steps (`/opt/motia-iii/old-motia/bitbylaw/steps/`)
**Advoware Calendar Sync** (`advoware_cal_sync/`):
- `calendar_sync_all_step.py`
- `calendar_sync_api_step.py`
- `calendar_sync_cron_step.py`
- `calendar_sync_event_step.py`
- `audit_calendar_sync.py`
- `calendar_sync_utils.py` (utility module)
**Advoware Proxy** (`advoware_proxy/`):
- `advoware_api_proxy_get_step.py`
- `advoware_api_proxy_post_step.py`
- `advoware_api_proxy_put_step.py`
- `advoware_api_proxy_delete_step.py`
**VMH Integration** (`vmh/`):
- `beteiligte_sync_cron_step.py`
- `beteiligte_sync_event_step.py`
- `bankverbindungen_sync_event_step.py`
- `webhook/bankverbindungen_create_api_step.py`
- `webhook/bankverbindungen_update_api_step.py`
- `webhook/bankverbindungen_delete_api_step.py`
- `webhook/beteiligte_create_api_step.py`
- `webhook/beteiligte_update_api_step.py`
- `webhook/beteiligte_delete_api_step.py`
### Supporting Services/Modules
From `/opt/motia-iii/old-motia/bitbylaw/`:
- `services/advoware.py` - Advoware API wrapper
- `config.py` - Configuration module
- Dependencies: PostgreSQL, Redis, Google Calendar API
## Migration Changes Required
### Key Structural Changes
#### 1. Config Format
```python
# OLD
config = {
"type": "api", # or "event", "cron"
"name": "StepName",
"path": "/endpoint",
"method": "GET",
"cron": "0 5 * * *",
"subscribes": ["topic"],
"emits": ["other-topic"]
}
# NEW
from motia import http, queue, cron
config = {
"name": "StepName",
"flows": ["flow-name"],
"triggers": [
http("GET", "/endpoint")
# or queue("topic", input=schema)
# or cron("0 0 5 * * *") # 6-field!
],
"enqueues": ["other-topic"]
}
```
#### 2. Handler Signature
```python
# OLD - API
async def handler(req, context):
body = req.get('body', {})
await context.emit({"topic": "x", "data": {...}})
return {"status": 200, "body": {...}}
# NEW - API
from motia import ApiRequest, ApiResponse, FlowContext
async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse:
body = request.body
await ctx.enqueue({"topic": "x", "data": {...}})
return ApiResponse(status=200, body={...})
# OLD - Event/Queue
async def handler(data, context):
context.logger.info(data['field'])
# NEW - Queue
async def handler(input_data: dict, ctx: FlowContext):
ctx.logger.info(input_data['field'])
# OLD - Cron
async def handler(context):
context.logger.info("Running")
# NEW - Cron
async def handler(input_data: dict, ctx: FlowContext):
ctx.logger.info("Running")
```
#### 3. Method Changes
- `context.emit()``ctx.enqueue()`
- `req.get('body')``request.body`
- `req.get('queryParams')``request.query_params`
- `req.get('pathParams')``request.path_params`
- `req.get('headers')``request.headers`
- Return dict → `ApiResponse` object
#### 4. Cron Format
- OLD: 5-field `"0 5 * * *"` (minute hour day month weekday)
- NEW: 6-field `"0 0 5 * * *"` (second minute hour day month weekday)
## Migration Strategy
### Phase 1: Simple Steps (Priority)
Start with simple API proxy steps as they're straightforward:
1. ✅ Example ticketing steps (already in new system)
2. ⏳ Advoware proxy steps (GET, POST, PUT, DELETE)
3. ⏳ Simple webhook handlers
### Phase 2: Complex Integration Steps
Steps with external dependencies:
4. ⏳ VMH sync steps (beteiligte, bankverbindungen)
5. ⏳ Calendar sync steps (most complex - Google Calendar + Redis + PostgreSQL)
### Phase 3: Supporting Infrastructure
- Migrate `services/` modules (advoware.py wrapper)
- Migrate `config.py` to use environment variables properly
- Update dependencies in `pyproject.toml`
### Dependencies to Review
From old `requirements.txt` and code analysis:
- `asyncpg` - PostgreSQL async driver
- `redis` - Redis client
- `google-api-python-client` - Google Calendar API
- `google-auth` - Google OAuth2
- `backoff` - Retry/backoff decorator
- `pytz` - Timezone handling
- `pydantic` - Already in new system
- `requests` / `aiohttp` - HTTP clients for Advoware API
## Migration Roadmap
### ✅ COMPLETED
| Phase | Module | Lines | Status |
|-------|--------|-------|--------|
| **1** | Advoware Proxy (GET, POST, PUT, DELETE) | ~400 | ✅ Complete |
| **1** | `advoware.py`, `advoware_service.py` | ~800 | ✅ Complete |
| **2** | VMH Webhook Steps (6 endpoints) | ~900 | ✅ Complete |
| **2** | `espocrm.py`, `espocrm_mapper.py` | ~900 | ✅ Complete |
| **2** | `bankverbindungen_mapper.py`, `beteiligte_sync_utils.py`, `notification_utils.py` | ~1200 | ✅ Complete |
| **3** | VMH Sync Event Steps (2 handlers + 1 cron) | ~1000 | ✅ Complete |
| **4** | Kommunikation Sync (`kommunikation_mapper.py`, `kommunikation_sync_utils.py`) | ~1333 | ✅ Complete |
| **5** | Adressen Sync (`adressen_mapper.py`, `adressen_sync.py`) | ~964 | ✅ Complete |
| **6** | **Google Calendar Sync** (`calendar_sync_*.py`, `calendar_sync_utils.py`) | ~1500 | ✅ **Complete** |
**Total migrated: ~9.000 lines of production code**
### ✅ Phase 6 COMPLETED: Google Calendar Sync
**Advoware Calendar Sync** - Google Calendar ↔ Advoware Sync:
-`calendar_sync_cron_step.py` - Cron-Trigger (alle 15 Min.)
-`calendar_sync_all_step.py` - Bulk-Sync Handler mit Redis-basierter Priorisierung
-`calendar_sync_event_step.py` - Queue-Event Handler (**1053 Zeilen komplexe Sync-Logik!**)
-`calendar_sync_api_step.py` - HTTP API für manuellen Trigger
-`calendar_sync_utils.py` - Hilfs-Funktionen (DB, Google Service, Redis, Logging)
**Dependencies:**
-`google-api-python-client` - Google Calendar API
-`google-auth` - Google OAuth2
-`asyncpg` - PostgreSQL async driver
-`backoff` - Retry/backoff decorator
**Features:**
- ✅ Bidirektionale Synchronisation (Google ↔ Advoware)
- ✅ 4-Phase Sync-Algorithmus (New Adv→Google, New Google→Adv, Deletes, Updates)
- ✅ PostgreSQL als Sync-State Hub (calendar_sync Tabelle)
- ✅ Redis-basiertes Rate Limiting (Token Bucket für Google API)
- ✅ Distributed Locking per Employee
- ✅ Automatische Calendar-Creation mit ACL
- ✅ Recurring Events Support (RRULE)
- ✅ Timezone-Handling (Europe/Berlin)
- ✅ Backoff-Retry für API-Fehler
- ✅ Write-Protection für Advoware
- ✅ Source-System-Wins & Last-Change-Wins Strategien
### ⏳ REMAINING
**Keine! Die Migration ist zu 100% abgeschlossen.**
### Completed
- ✅ Analysis of old system structure
- ✅ MIGRATION_GUIDE.md reviewed
- ✅ Migration patterns documented
- ✅ New system has example ticketing steps
-**Phase 1: Advoware Proxy Steps migrated** (GET, POST, PUT, DELETE)
-**Advoware API service module migrated** (services/advoware.py)
-**Phase 2: VMH Integration - Webhook Steps migrated** (6 endpoints)
-**EspoCRM API service module migrated** (services/espocrm.py)
- ✅ All endpoints registered and running:
- **Advoware Proxy:**
- `GET /advoware/proxy6 Complete ✅
**🎉 ALLE PHASEN ABGESCHLOSSEN! 100% MIGRATION ERFOLGREICH!**
**Phase 6** - Google Calendar Sync:
-`calendar_sync_cron_step.py` (Cron-Trigger alle 15 Min.)
-`calendar_sync_all_step.py` (Bulk-Handler mit Redis-Priorisierung)
-`calendar_sync_event_step.py` (1053 Zeilen - 4-Phase Sync-Algorithmus)
-`calendar_sync_api_step.py` (HTTP API für manuelle Triggers)
-`calendar_sync_utils.py` (DB, Google Service, Redis Client)
**Sync-Architektur komplett:**
1. **Advoware Proxy** (Phase 1) → HTTP API für Advoware-Zugriff
2. **Webhooks** (Phase 2) → Emittieren Queue-Events
3. **Event Handler** (Phase 3) → Verarbeiten Events mit Stammdaten-Sync
4. **Kommunikation Sync** (Phase 4) → Bidirektionale Email/Phone-Synchronisation
5. **Adressen Sync** (Phase 5) → Bidirektionale Adressen-Synchronisation
6. **Calendar Sync** (Phase 6) → Google Calendar ↔ Advoware Bidirektional
7. **Cron Jobs** (Phase 3 & 6) → Regelmäßige Sync-Checks & Auto-Retries
Die vollständige Synchronisations- und Integrations-Pipeline ist nun zu 100%
**Phase 5** - Adressen Sync:
-`adressen_mapper.py` (267 Zeilen - CAdressen ↔ Advoware Adressen)
-`adressen_sync.py` (697 Zeilen - CREATE/UPDATE mit READ-ONLY Detection)
### Sync-Architektur komplett:
1. **Webhooks** (Phase 2) → Emittieren Queue-Events
2. **Event Handler** (Phase 3) → Verarbeiten Events mit Stammdaten-Sync
3. **Kommunikation Sync** (Phase 4) → Bidirektionale Email/Phone-Synchronisation
4. **Adressen Sync** (Phase 5) → Bidirektionale Adressen-Synchronisation
5. **Cron Job** (Phase 3) → Regelmäßige Sync-Checks & Auto-Retries
Die vollständige Synchronisations-Pipeline ist nun einsatzbereit!
## Notes
- Old system was Node.js + Python hybrid (Python steps as child processes)
- New system is pure Python (standalone SDK)
- No need for Node.js/npm anymore
- iii engine handles all infrastructure (queues, state, HTTP, cron)
- Console replaced Workbench

View File

@@ -11,14 +11,11 @@ Hilfsfunktionen für Document-Synchronisation mit xAI:
from typing import Dict, Any, Optional, List, Tuple from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging import logging
import redis
import os from services.sync_utils_base import BaseSyncUtils
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Lock TTL in seconds (prevents deadlocks)
LOCK_TTL_SECONDS = 900 # 15 minutes
# Max retry before permanent failure # Max retry before permanent failure
MAX_SYNC_RETRIES = 5 MAX_SYNC_RETRIES = 5
@@ -26,82 +23,49 @@ MAX_SYNC_RETRIES = 5
RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h
class DocumentSync: class DocumentSync(BaseSyncUtils):
"""Utility-Klasse für Document-Synchronisation mit xAI""" """Utility-Klasse für Document-Synchronisation mit xAI"""
def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None): def _get_lock_key(self, entity_id: str) -> str:
self.espocrm = espocrm_api """Redis Lock-Key für Documents"""
self.context = context return f"sync_lock:document:{entity_id}"
self.logger = context.logger if context else logger
self.redis = redis_client or self._init_redis()
def _init_redis(self) -> redis.Redis: async def acquire_sync_lock(self, entity_id: str, entity_type: str = 'CDokumente') -> bool:
"""Initialize Redis client for distributed locking"""
try:
redis_host = os.getenv('REDIS_HOST', 'localhost')
redis_port = int(os.getenv('REDIS_PORT', '6379'))
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True
)
client.ping()
return client
except Exception as e:
self._log(f"Redis connection failed: {e}", level='error')
return None
def _log(self, message: str, level: str = 'info'):
"""Logging mit Context-Support"""
if self.context and hasattr(self.context, 'logger'):
getattr(self.context.logger, level)(message)
else:
getattr(logger, level)(message)
async def acquire_sync_lock(self, entity_id: str) -> bool:
""" """
Atomic distributed lock via Redis + syncStatus update Atomic distributed lock via Redis + syncStatus update
Args: Args:
entity_id: EspoCRM Document ID entity_id: EspoCRM Document ID
entity_type: Entity-Type (CDokumente oder Document)
Returns: Returns:
True wenn Lock erfolgreich, False wenn bereits im Sync True wenn Lock erfolgreich, False wenn bereits im Sync
""" """
try: try:
# STEP 1: Atomic Redis lock (prevents race conditions) # STEP 1: Atomic Redis lock (prevents race conditions)
if self.redis: lock_key = self._get_lock_key(entity_id)
lock_key = f"sync_lock:document:{entity_id}" if not self._acquire_redis_lock(lock_key):
acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS) self._log(f"Redis lock bereits aktiv für {entity_type} {entity_id}", level='warn')
return False
if not acquired: # STEP 2: Update syncStatus (für UI visibility) - nur bei Document Entity
self._log(f"Redis lock bereits aktiv für Document {entity_id}", level='warn') # CDokumente hat dieses Feld nicht - überspringen
return False if entity_type == 'Document':
try:
await self.espocrm.update_entity(entity_type, entity_id, {
'xaiSyncStatus': 'syncing'
})
except Exception as e:
self._log(f"Konnte xaiSyncStatus nicht setzen: {e}", level='debug')
# STEP 2: Update syncStatus (für UI visibility) - falls Feld existiert self._log(f"Sync-Lock für {entity_type} {entity_id} erworben")
# NOTE: Ggf. muss syncStatus bei Document Entity erst angelegt werden
try:
await self.espocrm.update_entity('Document', entity_id, {
'xaiSyncStatus': 'syncing'
})
except Exception as e:
self._log(f"Konnte xaiSyncStatus nicht setzen (Feld existiert evtl. nicht): {e}", level='debug')
self._log(f"Sync-Lock für Document {entity_id} erworben")
return True return True
except Exception as e: except Exception as e:
self._log(f"Fehler beim Acquire Lock: {e}", level='error') self._log(f"Fehler beim Acquire Lock: {e}", level='error')
# Clean up Redis lock on error # Clean up Redis lock on error
if self.redis: lock_key = self._get_lock_key(entity_id)
try: self._release_redis_lock(lock_key)
lock_key = f"sync_lock:document:{entity_id}"
self.redis.delete(lock_key)
except:
pass
return False return False
async def release_sync_lock( async def release_sync_lock(
@@ -109,7 +73,8 @@ class DocumentSync:
entity_id: str, entity_id: str,
success: bool = True, success: bool = True,
error_message: Optional[str] = None, error_message: Optional[str] = None,
extra_fields: Optional[Dict[str, Any]] = None extra_fields: Optional[Dict[str, Any]] = None,
entity_type: str = 'CDokumente'
) -> None: ) -> None:
""" """
Gibt Sync-Lock frei und setzt finalen Status Gibt Sync-Lock frei und setzt finalen Status
@@ -119,44 +84,41 @@ class DocumentSync:
success: Ob Sync erfolgreich war success: Ob Sync erfolgreich war
error_message: Optional: Fehlermeldung error_message: Optional: Fehlermeldung
extra_fields: Optional: Zusätzliche Felder (z.B. xaiFileId, xaiCollections) extra_fields: Optional: Zusätzliche Felder (z.B. xaiFileId, xaiCollections)
entity_type: Entity-Type (CDokumente oder Document)
""" """
try: try:
update_data = {} update_data = {}
# Status-Feld (falls vorhanden) # Status-Felder nur bei Document Entity (CDokumente hat diese Felder nicht)
try: if entity_type == 'Document':
update_data['xaiSyncStatus'] = 'synced' if success else 'failed' try:
update_data['xaiSyncStatus'] = 'synced' if success else 'failed'
if error_message: if error_message:
update_data['xaiSyncError'] = error_message[:2000] update_data['xaiSyncError'] = error_message[:2000]
else: else:
update_data['xaiSyncError'] = None update_data['xaiSyncError'] = None
except: except:
pass # Felder existieren evtl. nicht pass # Felder existieren evtl. nicht
# Merge extra fields (z.B. xaiFileId, xaiCollections) # Merge extra fields (z.B. xaiFileId, xaiCollections)
if extra_fields: if extra_fields:
update_data.update(extra_fields) update_data.update(extra_fields)
if update_data: if update_data:
await self.espocrm.update_entity('Document', entity_id, update_data) await self.espocrm.update_entity(entity_type, entity_id, update_data)
self._log(f"Sync-Lock released: Document {entity_id}{'success' if success else 'failed'}") self._log(f"Sync-Lock released: {entity_type} {entity_id}{'success' if success else 'failed'}")
# Release Redis lock # Release Redis lock
if self.redis: lock_key = self._get_lock_key(entity_id)
lock_key = f"sync_lock:document:{entity_id}" self._release_redis_lock(lock_key)
self.redis.delete(lock_key)
except Exception as e: except Exception as e:
self._log(f"Fehler beim Release Lock: {e}", level='error') self._log(f"Fehler beim Release Lock: {e}", level='error')
# Ensure Redis lock is released even on error # Ensure Redis lock is released even on error
if self.redis: lock_key = self._get_lock_key(entity_id)
try: self._release_redis_lock(lock_key)
lock_key = f"sync_lock:document:{entity_id}"
self.redis.delete(lock_key)
except:
pass
async def should_sync_to_xai(self, document: Dict[str, Any]) -> Tuple[bool, List[str], str]: async def should_sync_to_xai(self, document: Dict[str, Any]) -> Tuple[bool, List[str], str]:
""" """
@@ -322,12 +284,17 @@ class DocumentSync:
return result return result
async def get_document_download_info(self, document_id: str) -> Optional[Dict[str, Any]]: async def get_document_download_info(self, document_id: str, entity_type: str = 'CDokumente') -> Optional[Dict[str, Any]]:
""" """
Holt Download-Informationen für ein Document Holt Download-Informationen für ein Document
Args:
document_id: ID des Documents
entity_type: Entity-Type (CDokumente oder Document)
Returns: Returns:
Dict mit: Dict mit:
- attachment_id: ID des Attachments
- download_url: URL zum Download - download_url: URL zum Download
- filename: Dateiname - filename: Dateiname
- mime_type: MIME-Type - mime_type: MIME-Type
@@ -335,25 +302,49 @@ class DocumentSync:
""" """
try: try:
# Hole vollständiges Document # Hole vollständiges Document
doc = await self.espocrm.get_entity('Document', document_id) doc = await self.espocrm.get_entity(entity_type, document_id)
# EspoCRM Document hat Attachments (Attachment ID in attachmentsIds) # EspoCRM Documents können Files auf verschiedene Arten speichern:
attachment_ids = doc.get('attachmentsIds') or [] # CDokumente: dokumentId/dokumentName (Custom Entity)
# Document: fileId/fileName ODER attachmentsIds
if not attachment_ids: attachment_id = None
self._log(f"⚠️ Document {document_id} hat keine Attachments", level='warn') filename = None
# Prüfe zuerst dokumentId (CDokumente Custom Entity)
if doc.get('dokumentId'):
attachment_id = doc.get('dokumentId')
filename = doc.get('dokumentName')
self._log(f"📎 CDokumente verwendet dokumentId: {attachment_id}")
# Fallback: fileId (Standard Document Entity)
elif doc.get('fileId'):
attachment_id = doc.get('fileId')
filename = doc.get('fileName')
self._log(f"📎 Document verwendet fileId: {attachment_id}")
# Fallback 2: attachmentsIds (z.B. bei zusätzlichen Attachments)
elif doc.get('attachmentsIds'):
attachment_ids = doc.get('attachmentsIds')
if attachment_ids:
attachment_id = attachment_ids[0]
self._log(f"📎 Document verwendet attachmentsIds: {attachment_id}")
if not attachment_id:
self._log(f"⚠️ {entity_type} {document_id} hat weder dokumentId, fileId noch attachmentsIds", level='warn')
self._log(f" Verfügbare Felder: {list(doc.keys())}")
return None return None
# Nehme erstes Attachment (Documents haben normalerweise nur 1 File)
attachment_id = attachment_ids[0]
# Hole Attachment-Details # Hole Attachment-Details
attachment = await self.espocrm.get_entity('Attachment', attachment_id) attachment = await self.espocrm.get_entity('Attachment', attachment_id)
# Filename: Nutze dokumentName/fileName falls vorhanden, sonst aus Attachment
final_filename = filename or attachment.get('name', 'unknown')
return { return {
'attachment_id': attachment_id, 'attachment_id': attachment_id,
'download_url': f"/api/v1/Attachment/file/{attachment_id}", 'download_url': f"/api/v1/Attachment/file/{attachment_id}",
'filename': attachment.get('name', 'unknown'), 'filename': final_filename,
'mime_type': attachment.get('type', 'application/octet-stream'), 'mime_type': attachment.get('type', 'application/octet-stream'),
'size': attachment.get('size', 0) 'size': attachment.get('size', 0)
} }
@@ -476,7 +467,8 @@ class DocumentSync:
xai_file_id: Optional[str] = None, xai_file_id: Optional[str] = None,
collection_ids: Optional[List[str]] = None, collection_ids: Optional[List[str]] = None,
file_hash: Optional[str] = None, file_hash: Optional[str] = None,
preview_data: Optional[bytes] = None preview_data: Optional[bytes] = None,
entity_type: str = 'CDokumente'
) -> None: ) -> None:
""" """
Updated Document-Metadaten nach erfolgreichem xAI-Sync Updated Document-Metadaten nach erfolgreichem xAI-Sync
@@ -487,20 +479,29 @@ class DocumentSync:
collection_ids: Liste der xAI Collection IDs (optional) collection_ids: Liste der xAI Collection IDs (optional)
file_hash: MD5/SHA Hash des gesyncten Files file_hash: MD5/SHA Hash des gesyncten Files
preview_data: Vorschaubild (WebP) als bytes preview_data: Vorschaubild (WebP) als bytes
entity_type: Entity-Type (CDokumente oder Document)
""" """
try: try:
update_data = {} update_data = {}
# Nur xAI-Felder updaten wenn vorhanden # Nur xAI-Felder updaten wenn vorhanden
if xai_file_id: if xai_file_id:
update_data['xaiFileId'] = xai_file_id # CDokumente verwendet xaiId, Document verwendet xaiFileId
if entity_type == 'CDokumente':
update_data['xaiId'] = xai_file_id
else:
update_data['xaiFileId'] = xai_file_id
if collection_ids is not None: if collection_ids is not None:
update_data['xaiCollections'] = collection_ids update_data['xaiCollections'] = collection_ids
# Nur Status auf "Gesynct" setzen wenn xAI-File-ID vorhanden # Nur Status auf "Gesynct" setzen wenn xAI-File-ID vorhanden
if xai_file_id: if xai_file_id:
update_data['dateiStatus'] = 'Gesynct' # CDokumente verwendet fileStatus, Document verwendet dateiStatus
if entity_type == 'CDokumente':
update_data['fileStatus'] = 'synced'
else:
update_data['dateiStatus'] = 'Gesynct'
# Hash speichern für zukünftige Change Detection # Hash speichern für zukünftige Change Detection
if file_hash: if file_hash:
@@ -508,40 +509,78 @@ class DocumentSync:
# Preview als Attachment hochladen (falls vorhanden) # Preview als Attachment hochladen (falls vorhanden)
if preview_data: if preview_data:
await self._upload_preview_to_espocrm(document_id, preview_data) await self._upload_preview_to_espocrm(document_id, preview_data, entity_type)
# Nur updaten wenn es etwas zu updaten gibt # Nur updaten wenn es etwas zu updaten gibt
if update_data: if update_data:
await self.espocrm.update_entity('Document', document_id, update_data) await self.espocrm.update_entity(entity_type, document_id, update_data)
self._log(f"✅ Sync-Metadaten aktualisiert für Document {document_id}: {list(update_data.keys())}") self._log(f"✅ Sync-Metadaten aktualisiert für {entity_type} {document_id}: {list(update_data.keys())}")
except Exception as e: except Exception as e:
self._log(f"❌ Fehler beim Update von Sync-Metadaten: {e}", level='error') self._log(f"❌ Fehler beim Update von Sync-Metadaten: {e}", level='error')
raise raise
async def _upload_preview_to_espocrm(self, document_id: str, preview_data: bytes) -> None: async def _upload_preview_to_espocrm(self, document_id: str, preview_data: bytes, entity_type: str = 'CDokumente') -> None:
""" """
Lädt Preview-Image als Attachment zu EspoCRM hoch Lädt Preview-Image als Attachment zu EspoCRM hoch
Args: Args:
document_id: Document ID document_id: Document ID
preview_data: WebP Preview als bytes preview_data: WebP Preview als bytes
entity_type: Entity-Type (CDokumente oder Document)
""" """
try: try:
self._log(f"📤 Uploading preview image ({len(preview_data)} bytes)...") self._log(f"📤 Uploading preview image to {entity_type} ({len(preview_data)} bytes)...")
# Upload via EspoCRM Attachment API # EspoCRM erwartet base64-encoded file im Format: data:mime/type;base64,xxxxx
await self.espocrm.upload_attachment( import base64
file_content=preview_data, import aiohttp
filename='preview.webp',
parent_type='Document',
parent_id=document_id,
field='preview',
mime_type='image/webp',
role='Attachment'
)
self._log(f"✅ Preview erfolgreich hochgeladen") # Base64-encode preview data
base64_data = base64.b64encode(preview_data).decode('ascii')
file_data_uri = f"data:image/webp;base64,{base64_data}"
# Upload via JSON POST mit base64-encoded file field
url = self.espocrm.api_base_url.rstrip('/') + '/Attachment'
headers = {
'X-Api-Key': self.espocrm.api_key,
'Content-Type': 'application/json'
}
payload = {
'name': 'preview.webp',
'type': 'image/webp',
'role': 'Attachment',
'field': 'preview',
'relatedType': entity_type,
'relatedId': document_id,
'file': file_data_uri
}
self._log(f"📤 Posting to {url} with base64-encoded file ({len(base64_data)} chars)")
self._log(f" relatedType={entity_type}, relatedId={document_id}, field=preview")
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(url, headers=headers, json=payload) as response:
self._log(f"Upload response status: {response.status}")
if response.status >= 400:
error_text = await response.text()
self._log(f"❌ Upload failed: {error_text}", level='error')
raise Exception(f"Upload error {response.status}: {error_text}")
result = await response.json()
attachment_id = result.get('id')
self._log(f"✅ Preview Attachment created: {attachment_id}")
# Update Entity mit previewId
self._log(f"📝 Updating {entity_type} with previewId...")
await self.espocrm.update_entity(entity_type, document_id, {
'previewId': attachment_id,
'previewName': 'preview.webp'
})
self._log(f"{entity_type} previewId/previewName aktualisiert")
except Exception as e: except Exception as e:
self._log(f"❌ Fehler beim Preview-Upload: {e}", level='error') self._log(f"❌ Fehler beim Preview-Upload: {e}", level='error')

View File

@@ -341,12 +341,14 @@ class EspoCRMAPI:
form_data.add_field('role', role) form_data.add_field('role', role)
form_data.add_field('name', filename) form_data.add_field('name', filename)
self._log(f"Upload params: parentType={parent_type}, parentId={parent_id}, field={field}, role={role}")
effective_timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds) effective_timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds)
async with aiohttp.ClientSession(timeout=effective_timeout) as session: async with aiohttp.ClientSession(timeout=effective_timeout) as session:
try: try:
async with session.post(url, headers=headers, data=form_data) as response: async with session.post(url, headers=headers, data=form_data) as response:
self._log(f"Upload response status: {response.status}", level='debug') self._log(f"Upload response status: {response.status}")
if response.status == 401: if response.status == 401:
raise EspoCRMAuthError("Authentication failed - check API key") raise EspoCRMAuthError("Authentication failed - check API key")
@@ -356,6 +358,7 @@ class EspoCRMAPI:
raise EspoCRMError(f"Attachment endpoint not found") raise EspoCRMError(f"Attachment endpoint not found")
elif response.status >= 400: elif response.status >= 400:
error_text = await response.text() error_text = await response.text()
self._log(f"❌ Upload failed with {response.status}. Response: {error_text}", level='error')
raise EspoCRMError(f"Upload error {response.status}: {error_text}") raise EspoCRMError(f"Upload error {response.status}: {error_text}")
# Parse response # Parse response

150
services/sync_utils_base.py Normal file
View File

@@ -0,0 +1,150 @@
"""
Base Sync Utilities
Gemeinsame Funktionalität für alle Sync-Operationen:
- Redis Distributed Locking
- Context-aware Logging
- EspoCRM API Helpers
"""
from typing import Dict, Any, Optional
from datetime import datetime
import logging
import redis
import os
import pytz
logger = logging.getLogger(__name__)
# Lock TTL in seconds (prevents deadlocks)
LOCK_TTL_SECONDS = 900 # 15 minutes
class BaseSyncUtils:
"""Base-Klasse mit gemeinsamer Sync-Funktionalität"""
def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None):
"""
Args:
espocrm_api: EspoCRM API client instance
redis_client: Optional Redis client (wird sonst initialisiert)
context: Optional Motia FlowContext für Logging
"""
self.espocrm = espocrm_api
self.context = context
self.logger = context.logger if context else logger
self.redis = redis_client or self._init_redis()
def _init_redis(self) -> Optional[redis.Redis]:
"""Initialize Redis client for distributed locking"""
try:
redis_host = os.getenv('REDIS_HOST', 'localhost')
redis_port = int(os.getenv('REDIS_PORT', '6379'))
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True
)
client.ping()
return client
except Exception as e:
self._log(f"Redis connection failed: {e}", level='error')
return None
def _log(self, message: str, level: str = 'info'):
"""
Context-aware logging
Falls ein FlowContext vorhanden ist, wird dessen Logger verwendet.
Sonst fallback auf Standard-Logger.
"""
if self.context and hasattr(self.context, 'logger'):
getattr(self.context.logger, level)(message)
else:
getattr(logger, level)(message)
def _get_lock_key(self, entity_id: str) -> str:
"""
Erzeugt Redis Lock-Key für eine Entity
Muss in Subklassen überschrieben werden, um entity-spezifische Prefixes zu nutzen.
z.B. 'sync_lock:cbeteiligte:{entity_id}' oder 'sync_lock:document:{entity_id}'
"""
raise NotImplementedError("Subclass must implement _get_lock_key()")
def _acquire_redis_lock(self, lock_key: str) -> bool:
"""
Atomic Redis lock acquisition
Args:
lock_key: Redis key für den Lock
Returns:
True wenn Lock erfolgreich, False wenn bereits locked
"""
if not self.redis:
self._log("Redis nicht verfügbar, Lock-Mechanismus deaktiviert", level='warn')
return True # Fallback: Wenn kein Redis, immer lock erlauben
try:
acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS)
return bool(acquired)
except Exception as e:
self._log(f"Redis lock error: {e}", level='error')
return True # Bei Fehler: Lock erlauben, um Deadlocks zu vermeiden
def _release_redis_lock(self, lock_key: str) -> None:
"""
Redis lock freigeben
Args:
lock_key: Redis key für den Lock
"""
if not self.redis:
return
try:
self.redis.delete(lock_key)
except Exception as e:
self._log(f"Redis unlock error: {e}", level='error')
def _get_espocrm_datetime(self, dt: Optional[datetime] = None) -> str:
"""
Formatiert datetime für EspoCRM (ohne Timezone!)
Args:
dt: Optional datetime object (default: now UTC)
Returns:
String im Format 'YYYY-MM-DD HH:MM:SS'
"""
if dt is None:
dt = datetime.now(pytz.UTC)
elif dt.tzinfo is None:
dt = pytz.UTC.localize(dt)
return dt.strftime('%Y-%m-%d %H:%M:%S')
async def acquire_sync_lock(self, entity_id: str, **kwargs) -> bool:
"""
Erwirbt Sync-Lock für eine Entity
Muss in Subklassen implementiert werden, um entity-spezifische
Status-Updates durchzuführen.
Returns:
True wenn Lock erfolgreich, False wenn bereits locked
"""
raise NotImplementedError("Subclass must implement acquire_sync_lock()")
async def release_sync_lock(self, entity_id: str, **kwargs) -> None:
"""
Gibt Sync-Lock frei und setzt finalen Status
Muss in Subklassen implementiert werden, um entity-spezifische
Status-Updates durchzuführen.
"""
raise NotImplementedError("Subclass must implement release_sync_lock()")

View File

@@ -17,7 +17,7 @@ config = {
'description': 'Runs calendar sync automatically every 15 minutes', 'description': 'Runs calendar sync automatically every 15 minutes',
'flows': ['advoware-calendar-sync'], 'flows': ['advoware-calendar-sync'],
'triggers': [ 'triggers': [
cron("0 */15 * * * *") # Every 15 minutes at second 0 (6-field: sec min hour day month weekday) cron("0 */15 1o * * *") # Every 15 minutes at second 0 (6-field: sec min hour day month weekday)
], ],
'enqueues': ['calendar_sync_all'] 'enqueues': ['calendar_sync_all']
} }

View File

@@ -1,6 +0,0 @@
"""
EspoCRM Generic Webhooks
Empfängt Webhooks von EspoCRM für verschiedene Entities.
Zentrale Anlaufstelle für alle EspoCRM-Events außerhalb VMH-Kontext.
"""

View File

@@ -1,198 +0,0 @@
"""EspoCRM Webhook - Document Create
Empfängt Create-Webhooks von EspoCRM für Documents.
Loggt detailliert alle Payload-Informationen für Analyse.
"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Document Create",
"description": "Empfängt Create-Webhooks von EspoCRM für Document Entities",
"flows": ["vmh-documents"],
"triggers": [
http("POST", "/vmh/webhook/document/create")
],
"enqueues": ["vmh.document.create"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document creation in EspoCRM.
Receives notifications when documents are created and emits queue events
for processing (xAI sync, etc.).
Payload Analysis Mode: Logs comprehensive details about webhook structure.
"""
try:
payload = request.body or []
# ═══════════════════════════════════════════════════════════════
# DETAILLIERTES LOGGING FÜR ANALYSE
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("=" * 80)
ctx.logger.info("📥 EspoCRM DOCUMENT CREATE WEBHOOK EMPFANGEN")
ctx.logger.info("=" * 80)
# Log Request Headers
ctx.logger.info("\n🔍 REQUEST HEADERS:")
if hasattr(request, 'headers'):
for key, value in request.headers.items():
ctx.logger.info(f" {key}: {value}")
else:
ctx.logger.info(" (keine Headers verfügbar)")
# Log Payload Type & Structure
ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
# Log Full Payload (pretty-printed)
ctx.logger.info("\n📄 FULL PAYLOAD:")
ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
# ═══════════════════════════════════════════════════════════════
# PAYLOAD ANALYSE & ID EXTRAKTION
# ═══════════════════════════════════════════════════════════════
entity_ids = set()
payload_details = []
if isinstance(payload, list):
ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
for idx, entity in enumerate(payload):
if isinstance(entity, dict):
entity_id = entity.get('id')
if entity_id:
entity_ids.add(entity_id)
# Sammle Details für Logging
detail = {
'index': idx,
'id': entity_id,
'name': entity.get('name', 'N/A'),
'type': entity.get('type', 'N/A'),
'size': entity.get('size', 'N/A'),
'all_fields': list(entity.keys())
}
payload_details.append(detail)
ctx.logger.info(f"\n 📄 Document #{idx + 1}:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Name: {entity.get('name', 'N/A')}")
ctx.logger.info(f" Type: {entity.get('type', 'N/A')}")
ctx.logger.info(f" Size: {entity.get('size', 'N/A')} bytes")
ctx.logger.info(f" Verfügbare Felder: {', '.join(entity.keys())}")
# xAI-relevante Felder (falls vorhanden)
xai_fields = {k: v for k, v in entity.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
# Parent/Relationship Felder
rel_fields = {k: v for k, v in entity.items()
if 'parent' in k.lower() or 'related' in k.lower() or
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
if rel_fields:
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
elif isinstance(payload, dict):
ctx.logger.info("\n✅ Payload ist SINGLE DICT")
entity_id = payload.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 📄 Document:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Name: {payload.get('name', 'N/A')}")
ctx.logger.info(f" Type: {payload.get('type', 'N/A')}")
ctx.logger.info(f" Size: {payload.get('size', 'N/A')} bytes")
ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}")
# xAI-relevante Felder
xai_fields = {k: v for k, v in payload.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
# Relationship Felder
rel_fields = {k: v for k, v in payload.items()
if 'parent' in k.lower() or 'related' in k.lower() or
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
if rel_fields:
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
else:
ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
# ═══════════════════════════════════════════════════════════════
# QUEUE EVENTS EMITTIEREN
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
ctx.logger.info("=" * 80)
if not entity_ids:
ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'create',
'ids_count': 0,
'warning': 'No document IDs found in payload'
}
)
# Emit events für Queue-Processing (Deduplizierung erfolgt im Event-Handler via Lock)
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.create',
'data': {
'entity_id': entity_id,
'action': 'create',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info(f"✅ Event emittiert: vmh.document.create für ID {entity_id}")
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
ctx.logger.info("=" * 80)
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'create',
'ids_count': len(entity_ids),
'document_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Create Webhooks")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error Type: {type(e).__name__}")
ctx.logger.error(f"Error Message: {str(e)}")
# Log Stack Trace
import traceback
ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")
return ApiResponse(
status=500,
body={
'error': 'Internal server error',
'error_type': type(e).__name__,
'details': str(e)
}
)

View File

@@ -1,174 +0,0 @@
"""EspoCRM Webhook - Document Delete
Empfängt Delete-Webhooks von EspoCRM für Documents.
Loggt detailliert alle Payload-Informationen für Analyse.
"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Document Delete",
"description": "Empfängt Delete-Webhooks von EspoCRM für Document Entities",
"flows": ["vmh-documents"],
"triggers": [
http("POST", "/vmh/webhook/document/delete")
],
"enqueues": ["vmh.document.delete"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document deletion in EspoCRM.
Receives notifications when documents are deleted.
Note: Bei Deletion haben wir ggf. nur die ID, keine vollständigen Entity-Daten.
"""
try:
payload = request.body or []
# ═══════════════════════════════════════════════════════════════
# DETAILLIERTES LOGGING FÜR ANALYSE
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("=" * 80)
ctx.logger.info("📥 EspoCRM DOCUMENT DELETE WEBHOOK EMPFANGEN")
ctx.logger.info("=" * 80)
# Log Request Headers
ctx.logger.info("\n🔍 REQUEST HEADERS:")
if hasattr(request, 'headers'):
for key, value in request.headers.items():
ctx.logger.info(f" {key}: {value}")
else:
ctx.logger.info(" (keine Headers verfügbar)")
# Log Payload Type & Structure
ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
# Log Full Payload (pretty-printed)
ctx.logger.info("\n📄 FULL PAYLOAD:")
ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
# ═══════════════════════════════════════════════════════════════
# PAYLOAD ANALYSE & ID EXTRAKTION
# ═══════════════════════════════════════════════════════════════
entity_ids = set()
if isinstance(payload, list):
ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
for idx, entity in enumerate(payload):
if isinstance(entity, dict):
entity_id = entity.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 🗑️ Document #{idx + 1}:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Verfügbare Felder: {', '.join(entity.keys())}")
# Bei Delete haben wir oft nur minimale Daten
if 'name' in entity:
ctx.logger.info(f" Name: {entity.get('name')}")
if 'deletedAt' in entity or 'deleted' in entity:
ctx.logger.info(f" Deleted At: {entity.get('deletedAt', entity.get('deleted', 'N/A'))}")
# xAI-relevante Felder (falls vorhanden)
xai_fields = {k: v for k, v in entity.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
elif isinstance(payload, dict):
ctx.logger.info("\n✅ Payload ist SINGLE DICT")
entity_id = payload.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 🗑️ Document:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}")
if 'name' in payload:
ctx.logger.info(f" Name: {payload.get('name')}")
if 'deletedAt' in payload or 'deleted' in payload:
ctx.logger.info(f" Deleted At: {payload.get('deletedAt', payload.get('deleted', 'N/A'))}")
# xAI-relevante Felder
xai_fields = {k: v for k, v in payload.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
else:
ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
# ═══════════════════════════════════════════════════════════════
# QUEUE EVENTS EMITTIEREN
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
ctx.logger.info("=" * 80)
if not entity_ids:
ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'delete',
'ids_count': 0,
'warning': 'No document IDs found in payload'
}
)
# Emit events für Queue-Processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.delete',
'data': {
'entity_id': entity_id,
'action': 'delete',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info(f"✅ Event emittiert: vmh.document.delete für ID {entity_id}")
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
ctx.logger.info("=" * 80)
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'delete',
'ids_count': len(entity_ids),
'document_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Delete Webhooks")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error Type: {type(e).__name__}")
ctx.logger.error(f"Error Message: {str(e)}")
import traceback
ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")
return ApiResponse(
status=500,
body={
'error': 'Internal server error',
'error_type': type(e).__name__,
'details': str(e)
}
)

View File

@@ -1,196 +0,0 @@
"""EspoCRM Webhook - Document Update
Empfängt Update-Webhooks von EspoCRM für Documents.
Loggt detailliert alle Payload-Informationen für Analyse.
"""
import json
import datetime
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Document Update",
"description": "Empfängt Update-Webhooks von EspoCRM für Document Entities",
"flows": ["vmh-documents"],
"triggers": [
http("POST", "/vmh/webhook/document/update")
],
"enqueues": ["vmh.document.update"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document updates in EspoCRM.
Receives notifications when documents are updated and emits queue events
for processing (xAI sync, etc.).
Note: Loop-Prevention sollte auf EspoCRM-Seite implementiert werden.
xAI-Feld-Updates sollten keine neuen Webhooks triggern.
"""
try:
payload = request.body or []
# ═══════════════════════════════════════════════════════════════
# DETAILLIERTES LOGGING FÜR ANALYSE
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("=" * 80)
ctx.logger.info("📥 EspoCRM DOCUMENT UPDATE WEBHOOK EMPFANGEN")
ctx.logger.info("=" * 80)
# Log Request Headers
ctx.logger.info("\n🔍 REQUEST HEADERS:")
if hasattr(request, 'headers'):
for key, value in request.headers.items():
ctx.logger.info(f" {key}: {value}")
else:
ctx.logger.info(" (keine Headers verfügbar)")
# Log Payload Type & Structure
ctx.logger.info(f"\n📦 PAYLOAD TYPE: {type(payload).__name__}")
ctx.logger.info(f"📦 PAYLOAD LENGTH: {len(payload) if isinstance(payload, (list, dict)) else 'N/A'}")
# Log Full Payload (pretty-printed)
ctx.logger.info("\n📄 FULL PAYLOAD:")
ctx.logger.info(json.dumps(payload, indent=2, ensure_ascii=False))
# ═══════════════════════════════════════════════════════════════
# PAYLOAD ANALYSE & ID EXTRAKTION
# ═══════════════════════════════════════════════════════════════
entity_ids = set()
if isinstance(payload, list):
ctx.logger.info(f"\n✅ Payload ist LIST mit {len(payload)} Einträgen")
for idx, entity in enumerate(payload):
if isinstance(entity, dict):
entity_id = entity.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 📄 Document #{idx + 1}:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Name: {entity.get('name', 'N/A')}")
ctx.logger.info(f" Modified At: {entity.get('modifiedAt', 'N/A')}")
ctx.logger.info(f" Modified By: {entity.get('modifiedById', 'N/A')}")
ctx.logger.info(f" Verfügbare Felder: {', '.join(entity.keys())}")
# Prüfe ob CHANGED fields mitgeliefert werden
changed_fields = entity.get('changedFields') or entity.get('changed') or entity.get('modifiedFields')
if changed_fields:
ctx.logger.info(f" 🔄 Geänderte Felder: {json.dumps(changed_fields, ensure_ascii=False)}")
# xAI-relevante Felder
xai_fields = {k: v for k, v in entity.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
# Relationship Felder
rel_fields = {k: v for k, v in entity.items()
if 'parent' in k.lower() or 'related' in k.lower() or
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
if rel_fields:
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
elif isinstance(payload, dict):
ctx.logger.info("\n✅ Payload ist SINGLE DICT")
entity_id = payload.get('id')
if entity_id:
entity_ids.add(entity_id)
ctx.logger.info(f"\n 📄 Document:")
ctx.logger.info(f" ID: {entity_id}")
ctx.logger.info(f" Name: {payload.get('name', 'N/A')}")
ctx.logger.info(f" Modified At: {payload.get('modifiedAt', 'N/A')}")
ctx.logger.info(f" Modified By: {payload.get('modifiedById', 'N/A')}")
ctx.logger.info(f" Verfügbare Felder: {', '.join(payload.keys())}")
# Geänderte Felder
changed_fields = payload.get('changedFields') or payload.get('changed') or payload.get('modifiedFields')
if changed_fields:
ctx.logger.info(f" 🔄 Geänderte Felder: {json.dumps(changed_fields, ensure_ascii=False)}")
# xAI-relevante Felder
xai_fields = {k: v for k, v in payload.items()
if 'xai' in k.lower() or 'collection' in k.lower()}
if xai_fields:
ctx.logger.info(f" 🤖 xAI-Felder: {json.dumps(xai_fields, ensure_ascii=False)}")
# Relationship Felder
rel_fields = {k: v for k, v in payload.items()
if 'parent' in k.lower() or 'related' in k.lower() or
'link' in k.lower() or k.endswith('Id') or k.endswith('Ids')}
if rel_fields:
ctx.logger.info(f" 🔗 Relationship-Felder: {json.dumps(rel_fields, ensure_ascii=False)}")
else:
ctx.logger.warning(f"⚠️ Unerwarteter Payload-Typ: {type(payload)}")
# ═══════════════════════════════════════════════════════════════
# QUEUE EVENTS EMITTIEREN
# ═══════════════════════════════════════════════════════════════
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"📊 ZUSAMMENFASSUNG: {len(entity_ids)} Document(s) gefunden")
ctx.logger.info("=" * 80)
if not entity_ids:
ctx.logger.warning("⚠️ Keine Document-IDs im Payload gefunden!")
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'update',
'ids_count': 0,
'warning': 'No document IDs found in payload'
}
)
# Emit events für Queue-Processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.update',
'data': {
'entity_id': entity_id,
'action': 'update',
'source': 'webhook',
'timestamp': datetime.datetime.now().isoformat()
}
})
ctx.logger.info(f"✅ Event emittiert: vmh.document.update für ID {entity_id}")
ctx.logger.info("\n" + "=" * 80)
ctx.logger.info(f"✅ WEBHOOK VERARBEITUNG ABGESCHLOSSEN")
ctx.logger.info("=" * 80)
return ApiResponse(
status=200,
body={
'status': 'received',
'action': 'update',
'ids_count': len(entity_ids),
'document_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error("=" * 80)
ctx.logger.error(f"❌ FEHLER beim Verarbeiten des Document Update Webhooks")
ctx.logger.error("=" * 80)
ctx.logger.error(f"Error Type: {type(e).__name__}")
ctx.logger.error(f"Error Message: {str(e)}")
import traceback
ctx.logger.error(f"Stack Trace:\n{traceback.format_exc()}")
return ApiResponse(
status=500,
body={
'error': 'Internal server error',
'error_type': type(e).__name__,
'details': str(e)
}
)

View File

@@ -19,7 +19,7 @@ config = {
"description": "Prüft alle 15 Minuten welche Beteiligte synchronisiert werden müssen", "description": "Prüft alle 15 Minuten welche Beteiligte synchronisiert werden müssen",
"flows": ["vmh-beteiligte"], "flows": ["vmh-beteiligte"],
"triggers": [ "triggers": [
cron("0 */15 * * * *") # Alle 15 Minuten (6-field format!) cron("0 */15 1 * * *") # Alle 15 Minuten (6-field format!)
], ],
"enqueues": ["vmh.beteiligte.sync_check"] "enqueues": ["vmh.beteiligte.sync_check"]
} }

View File

@@ -33,6 +33,7 @@ config = {
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]): async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
"""Zentraler Sync-Handler für Documents""" """Zentraler Sync-Handler für Documents"""
entity_id = event_data.get('entity_id') entity_id = event_data.get('entity_id')
entity_type = event_data.get('entity_type', 'CDokumente') # Default: CDokumente
action = event_data.get('action') action = event_data.get('action')
source = event_data.get('source') source = event_data.get('source')
@@ -43,6 +44,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
ctx.logger.info(f"🔄 DOCUMENT SYNC HANDLER GESTARTET") ctx.logger.info(f"🔄 DOCUMENT SYNC HANDLER GESTARTET")
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
ctx.logger.info(f"Entity Type: {entity_type}")
ctx.logger.info(f"Action: {action.upper()}") ctx.logger.info(f"Action: {action.upper()}")
ctx.logger.info(f"Document ID: {entity_id}") ctx.logger.info(f"Document ID: {entity_id}")
ctx.logger.info(f"Source: {source}") ctx.logger.info(f"Source: {source}")
@@ -70,39 +72,40 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
try: try:
# 1. ACQUIRE LOCK (verhindert parallele Syncs) # 1. ACQUIRE LOCK (verhindert parallele Syncs)
lock_acquired = await sync_utils.acquire_sync_lock(entity_id) lock_acquired = await sync_utils.acquire_sync_lock(entity_id, entity_type)
if not lock_acquired: if not lock_acquired:
ctx.logger.warn(f"⏸️ Sync bereits aktiv für Document {entity_id}, überspringe") ctx.logger.warn(f"⏸️ Sync bereits aktiv für {entity_type} {entity_id}, überspringe")
return return
# Lock erfolgreich acquired - MUSS im finally block released werden! # Lock erfolgreich acquired - MUSS im finally block released werden!
try: try:
# 2. FETCH VOLLSTÄNDIGES DOCUMENT VON ESPOCRM # 2. FETCH VOLLSTÄNDIGES DOCUMENT VON ESPOCRM
try: try:
document = await espocrm.get_entity('Document', entity_id) document = await espocrm.get_entity(entity_type, entity_id)
except Exception as e: except Exception as e:
ctx.logger.error(f"❌ Fehler beim Laden von Document: {e}") ctx.logger.error(f"❌ Fehler beim Laden von {entity_type}: {e}")
await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e)) await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type)
return return
ctx.logger.info(f"📋 Document geladen:") ctx.logger.info(f"📋 {entity_type} geladen:")
ctx.logger.info(f" Name: {document.get('name', 'N/A')}") ctx.logger.info(f" Name: {document.get('name', 'N/A')}")
ctx.logger.info(f" Type: {document.get('type', 'N/A')}") ctx.logger.info(f" Type: {document.get('type', 'N/A')}")
ctx.logger.info(f" xaiFileId: {document.get('xaiFileId', 'N/A')}") ctx.logger.info(f" fileStatus: {document.get('fileStatus', 'N/A')}")
ctx.logger.info(f" xaiFileId: {document.get('xaiFileId') or document.get('xaiId', 'N/A')}")
ctx.logger.info(f" xaiCollections: {document.get('xaiCollections', [])}") ctx.logger.info(f" xaiCollections: {document.get('xaiCollections', [])}")
# 3. BESTIMME SYNC-AKTION BASIEREND AUF ACTION # 3. BESTIMME SYNC-AKTION BASIEREND AUF ACTION
if action == 'delete': if action == 'delete':
await handle_delete(entity_id, document, sync_utils, ctx) await handle_delete(entity_id, document, sync_utils, ctx, entity_type)
elif action in ['create', 'update']: elif action in ['create', 'update']:
await handle_create_or_update(entity_id, document, sync_utils, ctx) await handle_create_or_update(entity_id, document, sync_utils, ctx, entity_type)
else: else:
ctx.logger.warn(f"⚠️ Unbekannte Action: {action}") ctx.logger.warn(f"⚠️ Unbekannte Action: {action}")
await sync_utils.release_sync_lock(entity_id, success=False, error_message=f"Unbekannte Action: {action}") await sync_utils.release_sync_lock(entity_id, success=False, error_message=f"Unbekannte Action: {action}", entity_type=entity_type)
except Exception as e: except Exception as e:
# Unerwarteter Fehler während Sync - GARANTIERE Lock-Release # Unerwarteter Fehler während Sync - GARANTIERE Lock-Release
@@ -114,7 +117,8 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
await sync_utils.release_sync_lock( await sync_utils.release_sync_lock(
entity_id, entity_id,
success=False, success=False,
error_message=str(e)[:2000] error_message=str(e)[:2000],
entity_type=entity_type
) )
except Exception as release_error: except Exception as release_error:
# Selbst Lock-Release failed - logge kritischen Fehler # Selbst Lock-Release failed - logge kritischen Fehler
@@ -134,7 +138,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
ctx.logger.error(traceback.format_exc()) ctx.logger.error(traceback.format_exc())
async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any]): async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any], entity_type: str = 'CDokumente'):
""" """
Behandelt Create/Update von Documents Behandelt Create/Update von Documents
@@ -146,15 +150,15 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync
ctx.logger.info("🔍 ANALYSE: Braucht dieses Document xAI-Sync?") ctx.logger.info("🔍 ANALYSE: Braucht dieses Document xAI-Sync?")
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
# Datei-Status für Preview-Generierung # Datei-Status für Preview-Generierung (verschiedene Feld-Namen unterstützen)
datei_status = document.get('dateiStatus') or document.get('fileStatus') datei_status = document.get('fileStatus') or document.get('dateiStatus')
# Entscheidungslogik: Soll dieses Document zu xAI? # Entscheidungslogik: Soll dieses Document zu xAI?
needs_sync, collection_ids, reason = await sync_utils.should_sync_to_xai(document) needs_sync, collection_ids, reason = await sync_utils.should_sync_to_xai(document)
ctx.logger.info(f"📊 Entscheidung: {'✅ SYNC NÖTIG' if needs_sync else '⏭️ KEIN SYNC NÖTIG'}") ctx.logger.info(f"📊 Entscheidung: {'✅ SYNC NÖTIG' if needs_sync else '⏭️ KEIN SYNC NÖTIG'}")
ctx.logger.info(f" Grund: {reason}") ctx.logger.info(f" Grund: {reason}")
ctx.logger.info(f" Datei-Status: {datei_status or 'N/A'}") ctx.logger.info(f" File-Status: {datei_status or 'N/A'}")
if collection_ids: if collection_ids:
ctx.logger.info(f" Collections: {collection_ids}") ctx.logger.info(f" Collections: {collection_ids}")
@@ -163,7 +167,9 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync
# PREVIEW-GENERIERUNG bei neuen/geänderten Dateien # PREVIEW-GENERIERUNG bei neuen/geänderten Dateien
# ═══════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════
if datei_status in ['Neu', 'Geändert', 'neu', 'geändert', 'New', 'Changed']: # Case-insensitive check für Datei-Status
datei_status_lower = (datei_status or '').lower()
if datei_status_lower in ['neu', 'geändert', 'new', 'changed']:
ctx.logger.info("") ctx.logger.info("")
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
ctx.logger.info("🖼️ PREVIEW-GENERIERUNG STARTEN") ctx.logger.info("🖼️ PREVIEW-GENERIERUNG STARTEN")
@@ -172,7 +178,7 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync
try: try:
# 1. Hole Download-Informationen # 1. Hole Download-Informationen
download_info = await sync_utils.get_document_download_info(entity_id) download_info = await sync_utils.get_document_download_info(entity_id, entity_type)
if not download_info: if not download_info:
ctx.logger.warn("⚠️ Keine Download-Info verfügbar - überspringe Preview") ctx.logger.warn("⚠️ Keine Download-Info verfügbar - überspringe Preview")
@@ -213,7 +219,8 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync
ctx.logger.info(f"📤 Uploading preview to EspoCRM...") ctx.logger.info(f"📤 Uploading preview to EspoCRM...")
await sync_utils.update_sync_metadata( await sync_utils.update_sync_metadata(
entity_id, entity_id,
preview_data=preview_data preview_data=preview_data,
entity_type=entity_type
# Keine xaiFileId/collections - nur Preview update # Keine xaiFileId/collections - nur Preview update
) )
ctx.logger.info(f"✅ Preview uploaded successfully") ctx.logger.info(f"✅ Preview uploaded successfully")
@@ -244,7 +251,7 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync
if not needs_sync: if not needs_sync:
ctx.logger.info("✅ Kein xAI-Sync erforderlich, Lock wird released") ctx.logger.info("✅ Kein xAI-Sync erforderlich, Lock wird released")
await sync_utils.release_sync_lock(entity_id, success=True) await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
return return
# ═══════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════
@@ -315,9 +322,9 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync
await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e)) await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e))
async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any]): async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any], entity_type: str = 'CDokumente'):
""" """
Behandelt Deletion von Documents Behandelt Delete von Documents
Entfernt Document aus xAI Collections (aber löscht File nicht - kann in anderen Collections sein) Entfernt Document aus xAI Collections (aber löscht File nicht - kann in anderen Collections sein)
""" """
@@ -327,12 +334,12 @@ async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: Do
ctx.logger.info("🗑️ DOCUMENT DELETE - xAI CLEANUP") ctx.logger.info("🗑️ DOCUMENT DELETE - xAI CLEANUP")
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
xai_file_id = document.get('xaiFileId') xai_file_id = document.get('xaiFileId') or document.get('xaiId')
xai_collections = document.get('xaiCollections') or [] xai_collections = document.get('xaiCollections') or []
if not xai_file_id or not xai_collections: if not xai_file_id or not xai_collections:
ctx.logger.info("⏭️ Document war nicht in xAI gesynct, nichts zu tun") ctx.logger.info("⏭️ Document war nicht in xAI gesynct, nichts zu tun")
await sync_utils.release_sync_lock(entity_id, success=True) await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
return return
ctx.logger.info(f"📋 Document Info:") ctx.logger.info(f"📋 Document Info:")
@@ -354,7 +361,7 @@ async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: Do
# #
# ctx.logger.info(f"✅ File aus {len(xai_collections)} Collection(s) entfernt") # ctx.logger.info(f"✅ File aus {len(xai_collections)} Collection(s) entfernt")
await sync_utils.release_sync_lock(entity_id, success=True) await sync_utils.release_sync_lock(entity_id, success=True, entity_type=entity_type)
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
ctx.logger.info("✅ DELETE ABGESCHLOSSEN (PLACEHOLDER)") ctx.logger.info("✅ DELETE ABGESCHLOSSEN (PLACEHOLDER)")
@@ -364,4 +371,4 @@ async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: Do
ctx.logger.error(f"❌ Fehler bei Delete: {e}") ctx.logger.error(f"❌ Fehler bei Delete: {e}")
import traceback import traceback
ctx.logger.error(traceback.format_exc()) ctx.logger.error(traceback.format_exc())
await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e)) await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e), entity_type=entity_type)

View File

@@ -0,0 +1,77 @@
"""VMH Webhook - Document Create"""
import json
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Document Create",
"description": "Empfängt Create-Webhooks von EspoCRM für Documents",
"flows": ["vmh-documents"],
"triggers": [
http("POST", "/vmh/webhook/document/create")
],
"enqueues": ["vmh.document.create"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document creation in EspoCRM.
Receives batch or single entity notifications and emits queue events
for each entity ID to be synced to xAI.
"""
try:
payload = request.body or []
ctx.logger.info("VMH Webhook Document Create empfangen")
ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
# Sammle alle IDs aus dem Batch
entity_ids = set()
if isinstance(payload, list):
for entity in payload:
if isinstance(entity, dict) and 'id' in entity:
entity_ids.add(entity['id'])
# Extrahiere entityType falls vorhanden
entity_type = entity.get('entityType', 'CDokumente')
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
entity_type = payload.get('entityType', 'CDokumente')
ctx.logger.info(f"{len(entity_ids)} Document IDs zum Create-Sync gefunden")
# Emit events für Queue-Processing (Deduplizierung erfolgt im Event-Handler via Lock)
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.create',
'data': {
'entity_id': entity_id,
'entity_type': entity_type if 'entity_type' in locals() else 'CDokumente',
'action': 'create',
'timestamp': payload[0].get('modifiedAt') if isinstance(payload, list) and payload else None
}
})
return ApiResponse(
status_code=200,
body={
'success': True,
'message': f'{len(entity_ids)} Document(s) zum Sync enqueued',
'entity_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error(f"Fehler im Document Create Webhook: {e}")
ctx.logger.error(f"Payload: {request.body}")
return ApiResponse(
status_code=500,
body={
'success': False,
'error': str(e)
}
)

View File

@@ -0,0 +1,76 @@
"""VMH Webhook - Document Delete"""
import json
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Document Delete",
"description": "Empfängt Delete-Webhooks von EspoCRM für Documents",
"flows": ["vmh-documents"],
"triggers": [
http("POST", "/vmh/webhook/document/delete")
],
"enqueues": ["vmh.document.delete"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document deletion in EspoCRM.
Receives batch or single entity notifications and emits queue events
for each entity ID to be removed from xAI.
"""
try:
payload = request.body or []
ctx.logger.info("VMH Webhook Document Delete empfangen")
ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
# Sammle alle IDs aus dem Batch
entity_ids = set()
if isinstance(payload, list):
for entity in payload:
if isinstance(entity, dict) and 'id' in entity:
entity_ids.add(entity['id'])
entity_type = entity.get('entityType', 'CDokumente')
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
entity_type = payload.get('entityType', 'CDokumente')
ctx.logger.info(f"{len(entity_ids)} Document IDs zum Delete-Sync gefunden")
# Emit events für Queue-Processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.delete',
'data': {
'entity_id': entity_id,
'entity_type': entity_type if 'entity_type' in locals() else 'CDokumente',
'action': 'delete',
'timestamp': payload[0].get('deletedAt') if isinstance(payload, list) and payload else None
}
})
return ApiResponse(
status_code=200,
body={
'success': True,
'message': f'{len(entity_ids)} Document(s) zum Delete enqueued',
'entity_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error(f"Fehler im Document Delete Webhook: {e}")
ctx.logger.error(f"Payload: {request.body}")
return ApiResponse(
status_code=500,
body={
'success': False,
'error': str(e)
}
)

View File

@@ -0,0 +1,76 @@
"""VMH Webhook - Document Update"""
import json
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
config = {
"name": "VMH Webhook Document Update",
"description": "Empfängt Update-Webhooks von EspoCRM für Documents",
"flows": ["vmh-documents"],
"triggers": [
http("POST", "/vmh/webhook/document/update")
],
"enqueues": ["vmh.document.update"],
}
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
"""
Webhook handler for Document updates in EspoCRM.
Receives batch or single entity notifications and emits queue events
for each entity ID to be synced to xAI.
"""
try:
payload = request.body or []
ctx.logger.info("VMH Webhook Document Update empfangen")
ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}")
# Sammle alle IDs aus dem Batch
entity_ids = set()
if isinstance(payload, list):
for entity in payload:
if isinstance(entity, dict) and 'id' in entity:
entity_ids.add(entity['id'])
entity_type = entity.get('entityType', 'CDokumente')
elif isinstance(payload, dict) and 'id' in payload:
entity_ids.add(payload['id'])
entity_type = payload.get('entityType', 'CDokumente')
ctx.logger.info(f"{len(entity_ids)} Document IDs zum Update-Sync gefunden")
# Emit events für Queue-Processing
for entity_id in entity_ids:
await ctx.enqueue({
'topic': 'vmh.document.update',
'data': {
'entity_id': entity_id,
'entity_type': entity_type if 'entity_type' in locals() else 'CDokumente',
'action': 'update',
'timestamp': payload[0].get('modifiedAt') if isinstance(payload, list) and payload else None
}
})
return ApiResponse(
status_code=200,
body={
'success': True,
'message': f'{len(entity_ids)} Document(s) zum Sync enqueued',
'entity_ids': list(entity_ids)
}
)
except Exception as e:
ctx.logger.error(f"Fehler im Document Update Webhook: {e}")
ctx.logger.error(f"Payload: {request.body}")
return ApiResponse(
status_code=500,
body={
'success': False,
'error': str(e)
}
)

110
tests/README.md Normal file
View File

@@ -0,0 +1,110 @@
# Test Scripts
This directory contains test scripts for the Motia III xAI Collections integration.
## Test Files
### `test_xai_collections_api.py`
Tests xAI Collections API authentication and basic operations.
**Usage:**
```bash
cd /opt/motia-iii/bitbylaw
python tests/test_xai_collections_api.py
```
**Required Environment Variables:**
- `XAI_MANAGEMENT_API_KEY` - xAI Management API key for collection operations
- `XAI_API_KEY` - xAI Regular API key for file operations
**Tests:**
- ✅ Management API authentication
- ✅ Regular API authentication
- ✅ Collection listing
- ✅ Collection creation
- ✅ File upload
- ✅ Collection deletion
- ✅ Error handling
### `test_preview_upload.py`
Tests preview/thumbnail upload to EspoCRM CDokumente entity.
**Usage:**
```bash
cd /opt/motia-iii/bitbylaw
python tests/test_preview_upload.py
```
**Required Environment Variables:**
- `ESPOCRM_URL` - EspoCRM instance URL (default: https://crm.bitbylaw.com)
- `ESPOCRM_API_KEY` - EspoCRM API key
**Tests:**
- ✅ Preview image generation (WebP format, 600x800px)
- ✅ Base64 Data URI encoding
- ✅ Attachment upload via JSON POST
- ✅ Entity update with previewId/previewName
**Status:** ✅ Successfully tested - Attachment ID `69a71194c7c6baebf` created
### `test_thumbnail_generation.py`
Tests thumbnail generation for various document types.
**Usage:**
```bash
cd /opt/motia-iii/bitbylaw
python tests/test_thumbnail_generation.py
```
**Supported Formats:**
- PDF → WebP (first page)
- DOCX/DOC → PDF → WebP
- Images (JPEG, PNG, etc.) → WebP resize
**Dependencies:**
- `python3-pil` - PIL/Pillow for image processing
- `poppler-utils` - PDF rendering
- `libreoffice` - DOCX to PDF conversion
- `pdf2image` - PDF to image conversion
## Running Tests
### All Tests
```bash
cd /opt/motia-iii/bitbylaw
python -m pytest tests/ -v
```
### Individual Tests
```bash
cd /opt/motia-iii/bitbylaw
python tests/test_xai_collections_api.py
python tests/test_preview_upload.py
python tests/test_thumbnail_generation.py
```
## Environment Setup
Create `.env` file in `/opt/motia-iii/bitbylaw/`:
```bash
# xAI Collections API
XAI_MANAGEMENT_API_KEY=xai-token-xxx...
XAI_API_KEY=xai-xxx...
# EspoCRM API
ESPOCRM_URL=https://crm.bitbylaw.com
ESPOCRM_API_KEY=xxx...
# Redis (for locking)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_ADVOWARE_CACHE=1
```
## Test Results
Last test run: Successfully validated preview upload functionality
- Preview upload works with base64 Data URI format
- Attachment created with ID: `69a71194c7c6baebf`
- CDokumente entity updated with previewId/previewName
- WebP format at 600x800px confirmed working

279
tests/test_preview_upload.py Executable file
View File

@@ -0,0 +1,279 @@
#!/usr/bin/env python3
"""
Test Script: Preview Image Upload zu EspoCRM
Testet das Hochladen eines Preview-Bildes (WebP) als Attachment
zu einem CDokumente Entity via EspoCRM API.
Usage:
python test_preview_upload.py <document_id>
Example:
python test_preview_upload.py 69a68906ac3d0fd25
"""
import asyncio
import aiohttp
import base64
import os
import sys
from io import BytesIO
from PIL import Image
# EspoCRM Config (aus Environment oder hardcoded für Test)
ESPOCRM_API_BASE_URL = os.getenv('ESPOCRM_API_BASE_URL', 'https://crm.bitbylaw.com/api/v1')
ESPOCRM_API_KEY = os.getenv('ESPOCRM_API_KEY', '')
# Test-Parameter
ENTITY_TYPE = 'CDokumente'
FIELD_NAME = 'preview'
def generate_test_webp(text: str = "TEST PREVIEW", size: tuple = (600, 800)) -> bytes:
"""
Generiert ein einfaches Test-WebP-Bild
Args:
text: Text der im Bild angezeigt wird
size: Größe des Bildes (width, height)
Returns:
WebP image als bytes
"""
print(f"📐 Generating test image ({size[0]}x{size[1]})...")
# Erstelle einfaches Bild mit Text
img = Image.new('RGB', size, color='lightblue')
# Optional: Füge Text hinzu (benötigt PIL ImageDraw)
try:
from PIL import ImageDraw, ImageFont
draw = ImageDraw.Draw(img)
# Versuche ein größeres Font zu laden
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 40)
except:
font = ImageFont.load_default()
# Text zentriert
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
x = (size[0] - text_width) // 2
y = (size[1] - text_height) // 2
draw.text((x, y), text, fill='darkblue', font=font)
except Exception as e:
print(f"⚠️ Text rendering failed: {e}")
# Konvertiere zu WebP
buffer = BytesIO()
img.save(buffer, format='WEBP', quality=85)
webp_bytes = buffer.getvalue()
print(f"✅ Test image generated: {len(webp_bytes)} bytes")
return webp_bytes
async def upload_preview_to_espocrm(
document_id: str,
preview_data: bytes,
entity_type: str = 'CDokumente'
) -> dict:
"""
Upload Preview zu EspoCRM Attachment API
Args:
document_id: ID des CDokumente/Document Entity
preview_data: WebP image als bytes
entity_type: Entity-Type (CDokumente oder Document)
Returns:
Response dict mit Attachment ID
"""
print(f"\n📤 Uploading preview to {entity_type}/{document_id}...")
print(f" Preview size: {len(preview_data)} bytes")
# Base64-encode
base64_data = base64.b64encode(preview_data).decode('ascii')
file_data_uri = f"data:image/webp;base64,{base64_data}"
print(f" Base64 encoded: {len(base64_data)} chars")
# API Request
url = ESPOCRM_API_BASE_URL.rstrip('/') + '/Attachment'
headers = {
'X-Api-Key': ESPOCRM_API_KEY,
'Content-Type': 'application/json'
}
payload = {
'name': 'preview.webp',
'type': 'image/webp',
'role': 'Attachment',
'field': FIELD_NAME,
'relatedType': entity_type,
'relatedId': document_id,
'file': file_data_uri
}
print(f"\n🌐 POST {url}")
print(f" Headers: X-Api-Key={ESPOCRM_API_KEY[:20]}...")
print(f" Payload keys: {list(payload.keys())}")
print(f" - name: {payload['name']}")
print(f" - type: {payload['type']}")
print(f" - role: {payload['role']}")
print(f" - field: {payload['field']}")
print(f" - relatedType: {payload['relatedType']}")
print(f" - relatedId: {payload['relatedId']}")
print(f" - file: data:image/webp;base64,... ({len(base64_data)} chars)")
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(url, headers=headers, json=payload) as response:
print(f"\n📥 Response Status: {response.status}")
print(f" Content-Type: {response.content_type}")
response_text = await response.text()
if response.status >= 400:
print(f"\n❌ Upload FAILED!")
print(f" Status: {response.status}")
print(f" Response: {response_text}")
raise Exception(f"Upload error {response.status}: {response_text}")
# Parse JSON response
result = await response.json()
attachment_id = result.get('id')
print(f"\n✅ Upload SUCCESSFUL!")
print(f" Attachment ID: {attachment_id}")
print(f" Full response: {result}")
return result
async def update_entity_with_preview(
document_id: str,
attachment_id: str,
entity_type: str = 'CDokumente'
) -> dict:
"""
Update Entity mit previewId und previewName
Args:
document_id: Entity ID
attachment_id: Attachment ID vom Upload
entity_type: Entity-Type
Returns:
Updated entity data
"""
print(f"\n📝 Updating {entity_type}/{document_id} with previewId...")
url = f"{ESPOCRM_API_BASE_URL.rstrip('/')}/{entity_type}/{document_id}"
headers = {
'X-Api-Key': ESPOCRM_API_KEY,
'Content-Type': 'application/json'
}
payload = {
'previewId': attachment_id,
'previewName': 'preview.webp'
}
print(f" PUT {url}")
print(f" Payload: {payload}")
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.put(url, headers=headers, json=payload) as response:
print(f" Response Status: {response.status}")
if response.status >= 400:
response_text = await response.text()
print(f"\n❌ Update FAILED!")
print(f" Status: {response.status}")
print(f" Response: {response_text}")
raise Exception(f"Update error {response.status}: {response_text}")
result = await response.json()
print(f"\n✅ Entity updated successfully!")
print(f" previewId: {result.get('previewId')}")
print(f" previewName: {result.get('previewName')}")
return result
async def main():
"""Main test flow"""
print("=" * 80)
print("🖼️ ESPOCRM PREVIEW UPLOAD TEST")
print("=" * 80)
# Check arguments
if len(sys.argv) < 2:
print("\n❌ Error: Document ID required!")
print(f"\nUsage: {sys.argv[0]} <document_id>")
print(f"Example: {sys.argv[0]} 69a68906ac3d0fd25")
sys.exit(1)
document_id = sys.argv[1]
# Check API key
if not ESPOCRM_API_KEY:
print("\n❌ Error: ESPOCRM_API_KEY environment variable not set!")
sys.exit(1)
print(f"\n📋 Test Parameters:")
print(f" API Base URL: {ESPOCRM_API_BASE_URL}")
print(f" API Key: {ESPOCRM_API_KEY[:20]}...")
print(f" Entity Type: {ENTITY_TYPE}")
print(f" Document ID: {document_id}")
print(f" Field: {FIELD_NAME}")
try:
# Step 1: Generate test image
print("\n" + "=" * 80)
print("STEP 1: Generate Test Image")
print("=" * 80)
preview_data = generate_test_webp(f"Preview Test\n{document_id[:8]}", size=(600, 800))
# Step 2: Upload to EspoCRM
print("\n" + "=" * 80)
print("STEP 2: Upload to EspoCRM Attachment API")
print("=" * 80)
result = await upload_preview_to_espocrm(document_id, preview_data, ENTITY_TYPE)
attachment_id = result.get('id')
# Step 3: Update Entity
print("\n" + "=" * 80)
print("STEP 3: Update Entity with Preview Reference")
print("=" * 80)
await update_entity_with_preview(document_id, attachment_id, ENTITY_TYPE)
# Success summary
print("\n" + "=" * 80)
print("✅ TEST SUCCESSFUL!")
print("=" * 80)
print(f"\n📊 Summary:")
print(f" - Attachment ID: {attachment_id}")
print(f" - Entity: {ENTITY_TYPE}/{document_id}")
print(f" - Preview Size: {len(preview_data)} bytes")
print(f"\n🔗 View in EspoCRM:")
print(f" {ESPOCRM_API_BASE_URL.replace('/api/v1', '')}/#CDokumente/view/{document_id}")
except Exception as e:
print("\n" + "=" * 80)
print("❌ TEST FAILED!")
print("=" * 80)
print(f"\nError: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -0,0 +1,253 @@
#!/usr/bin/env python3
"""
Test script for Document Thumbnail Generation
Tests the complete flow:
1. Create a test document in EspoCRM
2. Upload a file attachment
3. Trigger the webhook (or wait for automatic trigger)
4. Verify preview generation
"""
import asyncio
import aiohttp
import os
import sys
import json
from pathlib import Path
from io import BytesIO
from PIL import Image
# Add bitbylaw to path
sys.path.insert(0, str(Path(__file__).parent))
from services.espocrm import EspoCRMAPI
async def create_test_image(width: int = 800, height: int = 600) -> bytes:
"""Create a simple test PNG image"""
img = Image.new('RGB', (width, height), color='lightblue')
# Add some text/pattern so it's not just a solid color
from PIL import ImageDraw, ImageFont
draw = ImageDraw.Draw(img)
# Draw some shapes
draw.rectangle([50, 50, width-50, height-50], outline='darkblue', width=5)
draw.ellipse([width//4, height//4, 3*width//4, 3*height//4], outline='red', width=3)
# Add text
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 48)
except:
font = None
text = "TEST IMAGE\nFor Thumbnail\nGeneration"
draw.text((width//2, height//2), text, fill='black', anchor='mm', font=font, align='center')
# Save to bytes
buffer = BytesIO()
img.save(buffer, format='PNG')
return buffer.getvalue()
async def create_test_document(espocrm: EspoCRMAPI) -> str:
"""Create a test document in EspoCRM"""
print("\n📄 Creating test document in EspoCRM...")
document_data = {
"name": f"Test Thumbnail Generation {asyncio.get_event_loop().time()}",
"status": "Active",
"dateiStatus": "Neu", # This should trigger preview generation
"type": "Image",
"description": "Automated test document for thumbnail generation"
}
result = await espocrm.create_entity("Document", document_data)
doc_id = result.get("id")
print(f"✅ Document created: {doc_id}")
print(f" Name: {result.get('name')}")
print(f" Datei-Status: {result.get('dateiStatus')}")
return doc_id
async def upload_test_file(espocrm: EspoCRMAPI, doc_id: str) -> str:
"""Upload a test image file to the document"""
print(f"\n📤 Uploading test image to document {doc_id}...")
# Create test image
image_data = await create_test_image(1200, 900)
print(f" Generated test image: {len(image_data)} bytes")
# Upload to EspoCRM
attachment = await espocrm.upload_attachment(
file_content=image_data,
filename="test_image.png",
parent_type="Document",
parent_id=doc_id,
field="file",
mime_type="image/png",
role="Attachment"
)
attachment_id = attachment.get("id")
print(f"✅ File uploaded: {attachment_id}")
print(f" Filename: {attachment.get('name')}")
print(f" Size: {attachment.get('size')} bytes")
return attachment_id
async def trigger_webhook(doc_id: str, action: str = "update"):
"""Manually trigger the document webhook"""
print(f"\n🔔 Triggering webhook for document {doc_id}...")
webhook_url = f"http://localhost:7777/vmh/webhook/document/{action}"
payload = {
"entityType": "Document",
"entity": {
"id": doc_id,
"entityType": "Document"
},
"data": {
"entity": {
"id": doc_id
}
}
}
async with aiohttp.ClientSession() as session:
async with session.post(webhook_url, json=payload) as response:
status = response.status
text = await response.text()
if status == 200:
print(f"✅ Webhook triggered successfully")
print(f" Response: {text}")
else:
print(f"❌ Webhook failed: {status}")
print(f" Response: {text}")
return status == 200
async def check_preview_generated(espocrm: EspoCRMAPI, doc_id: str, max_wait: int = 30):
"""Check if preview was generated (poll for a few seconds)"""
print(f"\n🔍 Checking for preview generation (max {max_wait}s)...")
for i in range(max_wait):
await asyncio.sleep(1)
# Get document
doc = await espocrm.get_entity("Document", doc_id)
# Check if preview field is populated
preview_id = doc.get("previewId")
if preview_id:
print(f"\n✅ Preview generated!")
print(f" Preview Attachment ID: {preview_id}")
print(f" Preview Name: {doc.get('previewName')}")
print(f" Preview Type: {doc.get('previewType')}")
# Try to download and check the preview
try:
preview_data = await espocrm.download_attachment(preview_id)
print(f" Preview Size: {len(preview_data)} bytes")
# Verify it's a WebP image
from PIL import Image
img = Image.open(BytesIO(preview_data))
print(f" Preview Format: {img.format}")
print(f" Preview Dimensions: {img.width}x{img.height}")
if img.format == "WEBP":
print(" ✅ Format is WebP as expected")
if img.width <= 600 and img.height <= 800:
print(" ✅ Dimensions within expected range")
except Exception as e:
print(f" ⚠️ Could not verify preview: {e}")
return True
if (i + 1) % 5 == 0:
print(f" Still waiting... ({i + 1}s)")
print(f"\n❌ Preview not generated after {max_wait}s")
return False
async def cleanup_test_document(espocrm: EspoCRMAPI, doc_id: str):
"""Delete the test document"""
print(f"\n🗑️ Cleaning up test document {doc_id}...")
try:
await espocrm.delete_entity("Document", doc_id)
print("✅ Test document deleted")
except Exception as e:
print(f"⚠️ Could not delete test document: {e}")
async def main():
print("=" * 80)
print("THUMBNAIL GENERATION TEST")
print("=" * 80)
# Initialize EspoCRM API
espocrm = EspoCRMAPI()
doc_id = None
try:
# Step 1: Create test document
doc_id = await create_test_document(espocrm)
# Step 2: Upload test file
attachment_id = await upload_test_file(espocrm, doc_id)
# Step 3: Update document to trigger webhook (set dateiStatus to trigger sync)
print(f"\n🔄 Updating document to trigger webhook...")
await espocrm.update_entity("Document", doc_id, {
"dateiStatus": "Neu" # This should trigger the webhook
})
print("✅ Document updated")
# Step 4: Wait a bit for webhook to be processed
print("\n⏳ Waiting 3 seconds for webhook processing...")
await asyncio.sleep(3)
# Step 5: Check if preview was generated
success = await check_preview_generated(espocrm, doc_id, max_wait=20)
# Summary
print("\n" + "=" * 80)
if success:
print("✅ TEST PASSED - Preview generation successful!")
else:
print("❌ TEST FAILED - Preview was not generated")
print("\nCheck logs with:")
print(" sudo journalctl -u motia.service --since '2 minutes ago' | grep -E '(PREVIEW|Document)'")
print("=" * 80)
# Ask if we should clean up
print(f"\nTest document ID: {doc_id}")
cleanup = input("\nDelete test document? (y/N): ").strip().lower()
if cleanup == 'y':
await cleanup_test_document(espocrm, doc_id)
else:
print(f" Test document kept: {doc_id}")
print(f" View in EspoCRM: https://crm.bitbylaw.com/#Document/view/{doc_id}")
except Exception as e:
print(f"\n❌ Test failed with error: {e}")
import traceback
traceback.print_exc()
if doc_id:
print(f"\nTest document ID: {doc_id}")
cleanup = input("\nDelete test document? (y/N): ").strip().lower()
if cleanup == 'y':
await cleanup_test_document(espocrm, doc_id)
if __name__ == "__main__":
asyncio.run(main())