Compare commits
2 Commits
cb0e170ee9
...
70265c9adf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
70265c9adf | ||
|
|
ee9aab049f |
229
docs/DOCUMENT_SYNC_XAI_STATUS.md
Normal file
229
docs/DOCUMENT_SYNC_XAI_STATUS.md
Normal file
@@ -0,0 +1,229 @@
|
||||
# Document Sync mit xAI Collections - Implementierungs-Status
|
||||
|
||||
## ✅ Implementiert
|
||||
|
||||
### 1. Webhook Endpunkte
|
||||
- **POST** `/vmh/webhook/document/create`
|
||||
- **POST** `/vmh/webhook/document/update`
|
||||
- **POST** `/vmh/webhook/document/delete`
|
||||
|
||||
### 2. Event Handler (`document_sync_event_step.py`)
|
||||
- Queue Topics: `vmh.document.{create|update|delete}`
|
||||
- Redis Distributed Locking
|
||||
- Vollständiges Document Loading von EspoCRM
|
||||
|
||||
### 3. Sync Utilities (`document_sync_utils.py`)
|
||||
- **✅ Datei-Status Prüfung**: "Neu", "Geändert" → xAI-Sync erforderlich
|
||||
- **✅ Hash-basierte Change Detection**: MD5/SHA Vergleich für Updates
|
||||
- **✅ Related Entities Discovery**: Many-to-Many Attachments durchsuchen
|
||||
- **✅ Collection Requirements**: Automatische Ermittlung welche Collections nötig sind
|
||||
|
||||
## ⏳ In Arbeit
|
||||
|
||||
### 4. Thumbnail-Generierung (`generate_thumbnail()`)
|
||||
|
||||
**Anforderungen:**
|
||||
- Erste Seite eines PDFs als Vorschaubild
|
||||
- DOCX/DOC → PDF → Image Konvertierung
|
||||
- Bild-Dateien: Resize auf Thumbnail-Größe
|
||||
- Fallback: Generic File-Icons basierend auf MIME-Type
|
||||
|
||||
**Benötigte Dependencies:**
|
||||
```bash
|
||||
# Python Packages
|
||||
pip install pdf2image python-docx Pillow docx2pdf
|
||||
|
||||
# System Dependencies (Ubuntu/Debian)
|
||||
apt-get install poppler-utils libreoffice
|
||||
```
|
||||
|
||||
**Implementierungs-Schritte:**
|
||||
|
||||
1. **PDF Handling** (Priorität 1):
|
||||
```python
|
||||
from pdf2image import convert_from_path
|
||||
from PIL import Image
|
||||
import io
|
||||
|
||||
def generate_pdf_thumbnail(pdf_path: str) -> bytes:
|
||||
# Konvertiere erste Seite zu Image
|
||||
images = convert_from_path(pdf_path, first_page=1, last_page=1, dpi=150)
|
||||
thumbnail = images[0]
|
||||
|
||||
# Resize auf Thumbnail-Größe (z.B. 200x280)
|
||||
thumbnail.thumbnail((200, 280), Image.Resampling.LANCZOS)
|
||||
|
||||
# Convert zu bytes
|
||||
buffer = io.BytesIO()
|
||||
thumbnail.save(buffer, format='PNG')
|
||||
return buffer.getvalue()
|
||||
```
|
||||
|
||||
2. **DOCX Handling** (Priorität 2):
|
||||
```python
|
||||
from docx2pdf import convert
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
def generate_docx_thumbnail(docx_path: str) -> bytes:
|
||||
# Temporäres PDF erstellen
|
||||
with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp:
|
||||
pdf_path = tmp.name
|
||||
|
||||
# DOCX → PDF Konvertierung (benötigt LibreOffice)
|
||||
convert(docx_path, pdf_path)
|
||||
|
||||
# PDF-Thumbnail generieren
|
||||
thumbnail = generate_pdf_thumbnail(pdf_path)
|
||||
|
||||
# Cleanup
|
||||
os.remove(pdf_path)
|
||||
|
||||
return thumbnail
|
||||
```
|
||||
|
||||
3. **Image Handling** (Priorität 3):
|
||||
```python
|
||||
from PIL import Image
|
||||
import io
|
||||
|
||||
def generate_image_thumbnail(image_path: str) -> bytes:
|
||||
img = Image.open(image_path)
|
||||
img.thumbnail((200, 280), Image.Resampling.LANCZOS)
|
||||
|
||||
buffer = io.BytesIO()
|
||||
img.save(buffer, format='PNG')
|
||||
return buffer.getvalue()
|
||||
```
|
||||
|
||||
4. **Thumbnail Upload zu EspoCRM**:
|
||||
```python
|
||||
# EspoCRM unterstützt Preview-Images via Attachment API
|
||||
async def upload_thumbnail_to_espocrm(
|
||||
document_id: str,
|
||||
thumbnail_bytes: bytes,
|
||||
espocrm_api
|
||||
):
|
||||
# Create Attachment
|
||||
attachment_data = {
|
||||
'name': 'preview.png',
|
||||
'type': 'image/png',
|
||||
'role': 'Inline Attachment',
|
||||
'parentType': 'Document',
|
||||
'parentId': document_id,
|
||||
'field': 'previewImage' # Custom field?
|
||||
}
|
||||
|
||||
# Upload via EspoCRM Attachment API
|
||||
# POST /api/v1/Attachment mit multipart/form-data
|
||||
# TODO: espocrm.py muss upload_attachment() Methode bekommen
|
||||
```
|
||||
|
||||
**Offene Fragen:**
|
||||
- Welches Feld in EspoCRM Document für Preview? `previewImage`? `thumbnail`?
|
||||
- Größe des Thumbnails? (empfohlen: 200x280 oder 300x400)
|
||||
- Format: PNG oder JPEG?
|
||||
|
||||
## ❌ Noch nicht implementiert
|
||||
|
||||
### 5. xAI Service (`xai_service.py`)
|
||||
|
||||
**Anforderungen:**
|
||||
- File Upload zu xAI (basierend auf `test_xai_collections_api.py`)
|
||||
- Add File zu Collections
|
||||
- Remove File von Collections
|
||||
- File Download von EspoCRM
|
||||
|
||||
**Referenz-Code vorhanden:**
|
||||
- `/opt/motia-iii/bitbylaw/test_xai_collections_api.py` (630 Zeilen, alle xAI Operations getestet)
|
||||
|
||||
**Implementierungs-Plan:**
|
||||
|
||||
```python
|
||||
class XAIService:
|
||||
def __init__(self, context=None):
|
||||
self.management_key = os.getenv('XAI_MANAGEMENT_KEY')
|
||||
self.api_key = os.getenv('XAI_API_KEY')
|
||||
self.context = context
|
||||
|
||||
async def upload_file(self, file_content: bytes, filename: str) -> str:
|
||||
"""Upload File zu xAI → returns file_id"""
|
||||
# Multipart/form-data upload
|
||||
# POST https://api.x.ai/v1/files
|
||||
pass
|
||||
|
||||
async def add_to_collection(self, collection_id: str, file_id: str):
|
||||
"""Add File zu Collection"""
|
||||
# POST https://management-api.x.ai/v1/collections/{collection_id}/documents/{file_id}
|
||||
pass
|
||||
|
||||
async def remove_from_collection(self, collection_id: str, file_id: str):
|
||||
"""Remove File von Collection"""
|
||||
# DELETE https://management-api.x.ai/v1/collections/{collection_id}/documents/{file_id}
|
||||
pass
|
||||
|
||||
async def download_from_espocrm(self, attachment_id: str) -> bytes:
|
||||
"""Download File von EspoCRM Attachment"""
|
||||
# GET https://crm.bitbylaw.com/api/v1/Attachment/file/{attachment_id}
|
||||
pass
|
||||
```
|
||||
|
||||
## 📋 Integration Checklist
|
||||
|
||||
### Vollständiger Upload-Flow:
|
||||
|
||||
1. ✅ Webhook empfangen → Event emittieren
|
||||
2. ✅ Event Handler: Lock acquire
|
||||
3. ✅ Document laden von EspoCRM
|
||||
4. ✅ Entscheidung: Sync nötig? (Datei-Status, Hash-Check, Collections)
|
||||
5. ⏳ Download File von EspoCRM
|
||||
6. ⏳ Hash berechnen (MD5/SHA)
|
||||
7. ⏳ Thumbnail generieren
|
||||
8. ❌ Upload zu xAI (falls neu oder Hash changed)
|
||||
9. ❌ Add zu Collections
|
||||
10. ⏳ Update EspoCRM Metadaten (xaiFileId, xaiCollections, xaiSyncedHash, thumbnail)
|
||||
11. ✅ Lock release
|
||||
|
||||
### Datei-Stati in EspoCRM:
|
||||
|
||||
- **"Neu"**: Komplett neue Datei → xAI Upload + Collection Add
|
||||
- **"Geändert"**: File-Inhalt geändert → xAI Re-Upload + Collection Update
|
||||
- **"Gesynct"**: Erfolgreich gesynct, keine Änderungen
|
||||
- **"Fehler"**: Sync fehlgeschlagen (mit Error-Message)
|
||||
|
||||
### EspoCRM Custom Fields:
|
||||
|
||||
**Erforderlich für Document Entity:**
|
||||
- `dateiStatus` (Enum): "Neu", "Geändert", "Gesynct", "Fehler"
|
||||
- `md5` (String): MD5 Hash des Files
|
||||
- `sha` (String): SHA Hash des Files
|
||||
- `xaiFileId` (String): xAI File ID
|
||||
- `xaiCollections` (Array): JSON Array von Collection IDs
|
||||
- `xaiSyncedHash` (String): Hash beim letzten erfolgreichen Sync
|
||||
- `xaiSyncStatus` (Enum): "syncing", "synced", "failed"
|
||||
- `xaiSyncError` (Text): Fehlermeldung bei Sync-Fehler
|
||||
- `previewImage` (Attachment?): Vorschaubild
|
||||
|
||||
## 🚀 Nächste Schritte
|
||||
|
||||
**Priorität 1: xAI Service**
|
||||
- Code aus `test_xai_collections_api.py` extrahieren
|
||||
- In `services/xai_service.py` übertragen
|
||||
- EspoCRM Download-Funktion implementieren
|
||||
|
||||
**Priorität 2: Thumbnail-Generator**
|
||||
- Dependencies installieren
|
||||
- PDF-Thumbnail implementieren
|
||||
- EspoCRM Upload-Methode erweitern
|
||||
|
||||
**Priorität 3: Integration testen**
|
||||
- Document in EspoCRM anlegen
|
||||
- Datei-Status auf "Neu" setzen
|
||||
- Webhook triggern
|
||||
- Logs analysieren
|
||||
|
||||
## 📚 Referenzen
|
||||
|
||||
- **xAI API Tests**: `/opt/motia-iii/bitbylaw/test_xai_collections_api.py`
|
||||
- **EspoCRM API**: `services/espocrm.py`
|
||||
- **Beteiligte Sync** (Referenz-Implementierung): `steps/vmh/beteiligte_sync_event_step.py`
|
||||
457
services/document_sync_utils.py
Normal file
457
services/document_sync_utils.py
Normal file
@@ -0,0 +1,457 @@
|
||||
"""
|
||||
Document Sync Utilities
|
||||
|
||||
Hilfsfunktionen für Document-Synchronisation mit xAI:
|
||||
- Distributed locking via Redis + syncStatus
|
||||
- Entscheidungslogik: Wann muss ein Document zu xAI?
|
||||
- Related Entities ermitteln (Many-to-Many Attachments)
|
||||
- xAI Collection Management
|
||||
"""
|
||||
|
||||
from typing import Dict, Any, Optional, List, Tuple
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
import redis
|
||||
import os
|
||||
|
||||
logger = logging.getLogger(__name__)

# TTL of the Redis sync lock in seconds; an expiring lock prevents a crashed
# worker from dead-locking a document forever.
LOCK_TTL_SECONDS = 900  # 15 minutes

# Maximum number of sync attempts before a document is marked permanently failed.
MAX_SYNC_RETRIES = 5

# Backoff schedule between retries, in minutes.
RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240]  # 1min, 5min, 15min, 1h, 4h
|
||||
|
||||
|
||||
class DocumentSync:
    """Utility class for synchronising EspoCRM Documents with xAI Collections.

    Provides distributed locking (Redis + ``xaiSyncStatus``), the decision
    logic for whether a document needs an xAI sync, discovery of related
    entities that carry xAI collections, and metadata updates after a sync.
    """

    def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None):
        self.espocrm = espocrm_api
        self.context = context
        # Prefer the flow-context logger when one is supplied; fall back to
        # the module-level logger otherwise.
        self.logger = context.logger if context else logger
        self.redis = redis_client or self._init_redis()

    def _init_redis(self) -> Optional[redis.Redis]:
        """Initialize a Redis client for distributed locking.

        Returns:
            A connected client, or ``None`` when Redis is unreachable
            (locking is then silently skipped by the callers).
        """
        try:
            redis_host = os.getenv('REDIS_HOST', 'localhost')
            redis_port = int(os.getenv('REDIS_PORT', '6379'))
            redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))

            client = redis.Redis(
                host=redis_host,
                port=redis_port,
                db=redis_db,
                decode_responses=True
            )
            client.ping()  # fail fast when the server is unreachable
            return client
        except Exception as e:
            self._log(f"Redis connection failed: {e}", level='error')
            return None

    def _log(self, message: str, level: str = 'info'):
        """Log *message* at *level* via the context logger when available."""
        if self.context and hasattr(self.context, 'logger'):
            getattr(self.context.logger, level)(message)
        else:
            getattr(logger, level)(message)

    def _lock_key(self, entity_id: str) -> str:
        """Redis key of the per-document sync lock."""
        return f"sync_lock:document:{entity_id}"

    def _release_redis_lock(self, entity_id: str) -> None:
        """Best-effort deletion of the Redis sync lock; never raises."""
        if not self.redis:
            return
        try:
            self.redis.delete(self._lock_key(entity_id))
        except Exception:
            # Lock will still expire via its TTL; nothing more to do here.
            pass

    async def acquire_sync_lock(self, entity_id: str) -> bool:
        """Acquire the atomic distributed lock (Redis) and mark the document as syncing.

        Args:
            entity_id: EspoCRM Document ID

        Returns:
            True when the lock was acquired, False when a sync is already running.
        """
        try:
            # STEP 1: atomic Redis lock (SET NX EX prevents races between workers)
            if self.redis:
                acquired = self.redis.set(self._lock_key(entity_id), "locked", nx=True, ex=LOCK_TTL_SECONDS)

                if not acquired:
                    self._log(f"Redis lock bereits aktiv für Document {entity_id}", level='warn')
                    return False

            # STEP 2: update syncStatus for UI visibility - the custom field
            # may not exist yet on the Document entity, so failures are tolerated.
            try:
                await self.espocrm.update_entity('Document', entity_id, {
                    'xaiSyncStatus': 'syncing'
                })
            except Exception as e:
                self._log(f"Konnte xaiSyncStatus nicht setzen (Feld existiert evtl. nicht): {e}", level='debug')

            self._log(f"Sync-Lock für Document {entity_id} erworben")
            return True

        except Exception as e:
            self._log(f"Fehler beim Acquire Lock: {e}", level='error')
            # Clean up the Redis lock so a later attempt is not blocked.
            self._release_redis_lock(entity_id)
            return False

    async def release_sync_lock(
        self,
        entity_id: str,
        success: bool = True,
        error_message: Optional[str] = None,
        extra_fields: Optional[Dict[str, Any]] = None
    ) -> None:
        """Release the sync lock and persist the final sync status.

        Args:
            entity_id: EspoCRM Document ID
            success: whether the sync succeeded
            error_message: optional error text (stored truncated to 2000 chars)
            extra_fields: optional extra fields to persist (e.g. xaiFileId, xaiCollections)
        """
        try:
            update_data = {
                'xaiSyncStatus': 'synced' if success else 'failed',
                # Store the (truncated) error on failure, clear stale errors on success.
                'xaiSyncError': error_message[:2000] if error_message else None,
            }

            # Merge extra fields (e.g. xaiFileId, xaiCollections).
            if extra_fields:
                update_data.update(extra_fields)

            if update_data:
                await self.espocrm.update_entity('Document', entity_id, update_data)

            self._log(f"Sync-Lock released: Document {entity_id} → {'success' if success else 'failed'}")

            self._release_redis_lock(entity_id)

        except Exception as e:
            self._log(f"Fehler beim Release Lock: {e}", level='error')
            # Ensure the Redis lock is released even when the status update failed.
            self._release_redis_lock(entity_id)

    async def should_sync_to_xai(self, document: Dict[str, Any]) -> Tuple[bool, List[str], str]:
        """Decide whether a document must be synchronised to xAI.

        Checks, in order:
        1. the file status field ("Neu", "Geändert")
        2. hash values for change detection
        3. related entities that carry xAI collections

        Args:
            document: full Document entity as returned by EspoCRM

        Returns:
            Tuple[bool, List[str], str]:
                - whether a sync is required
                - the collection IDs the document should belong to
                - a human-readable reason for the decision
        """
        doc_id = document.get('id')
        doc_name = document.get('name', 'Unbenannt')

        # xAI-related fields
        xai_file_id = document.get('xaiFileId')
        xai_collections = document.get('xaiCollections') or []

        # File status and hash fields (German and English field names supported)
        datei_status = document.get('dateiStatus') or document.get('fileStatus')
        file_md5 = document.get('md5') or document.get('fileMd5')
        file_sha = document.get('sha') or document.get('fileSha')
        xai_synced_hash = document.get('xaiSyncedHash')  # hash at the last successful xAI sync

        self._log(f"📋 Document Analysis: {doc_name} (ID: {doc_id})")
        self._log(f" xaiFileId: {xai_file_id or 'N/A'}")
        self._log(f" xaiCollections: {xai_collections}")
        self._log(f" Datei-Status: {datei_status or 'N/A'}")
        self._log(f" MD5: {file_md5[:16] if file_md5 else 'N/A'}...")
        self._log(f" SHA: {file_sha[:16] if file_sha else 'N/A'}...")
        self._log(f" xaiSyncedHash: {xai_synced_hash[:16] if xai_synced_hash else 'N/A'}...")

        # PRIORITY CHECK: file status "Neu" (new) or "Geändert" (changed)
        if datei_status in ['Neu', 'Geändert', 'neu', 'geändert', 'New', 'Changed']:
            self._log(f"🆕 Datei-Status: '{datei_status}' → xAI-Sync ERFORDERLICH")

            # Use existing collections when set, otherwise derive them from
            # the entities this document is attached to.
            if xai_collections:
                target_collections = xai_collections
            else:
                target_collections = await self._get_required_collections_from_relations(doc_id)

            if target_collections:
                return (True, target_collections, f"Datei-Status: {datei_status}")
            else:
                # File is new/changed but no target collections were found.
                self._log(f"⚠️ Datei-Status '{datei_status}' aber keine Collections gefunden - überspringe Sync")
                return (False, [], f"Datei-Status: {datei_status}, aber keine Collections")

        # CASE 1: document is already in xAI AND collections are set
        if xai_file_id and xai_collections:
            self._log(f"✅ Document bereits in xAI gesynct mit {len(xai_collections)} Collection(s)")

            # Detect file-content changes via hash comparison.
            current_hash = file_md5 or file_sha

            if current_hash and xai_synced_hash:
                if current_hash != xai_synced_hash:
                    self._log(f"🔄 Hash-Änderung erkannt! RESYNC erforderlich")
                    self._log(f" Alt: {xai_synced_hash[:16]}...")
                    self._log(f" Neu: {current_hash[:16]}...")
                    return (True, xai_collections, "File-Inhalt geändert (Hash-Mismatch)")
                else:
                    self._log(f"✅ Hash identisch - keine Änderung")
            else:
                self._log(f"⚠️ Keine Hash-Werte verfügbar für Vergleich")

            return (False, xai_collections, "Bereits gesynct, keine Änderung erkannt")

        # CASE 2: document has an xaiFileId but collections are empty/None -
        # fall through to CASE 3 and inspect related entities.
        if xai_file_id and not xai_collections:
            self._log(f"⚠️ Document hat xaiFileId aber keine Collections - prüfe Related Entities")

        # CASE 3: check related entities (attachments)
        required_collections = await self._get_required_collections_from_relations(doc_id)

        if required_collections:
            self._log(f"✅ Document ist mit {len(required_collections)} Entity/ies verknüpft die Collections haben")
            return (True, required_collections, f"Verknüpft mit Entities die Collections benötigen")

        # CASE 4: no collections found anywhere → no sync needed
        self._log(f"⏭️ Kein xAI-Sync nötig: Keine Related Entities mit Collections")
        return (False, [], "Keine verknüpften Entities mit xAI Collections")

    async def _get_required_collections_from_relations(self, document_id: str) -> List[str]:
        """Collect the xAI collection IDs of all entities linked to this document.

        EspoCRM many-to-many: a Document can be attached to arbitrary entities
        (CBeteiligte, Account, CVmhErstgespraech, ...).

        Args:
            document_id: Document ID

        Returns:
            Deduplicated list of xAI collection IDs.
        """
        collections = set()

        # Entity types that may carry an xAI collection.
        # NOTE: extend this list for additional entity types.
        entity_types_with_collections = [
            'CBeteiligte',
            'Account',
            'CVmhErstgespraech',
        ]

        self._log(f"🔍 Prüfe Attachments von Document {document_id}...")

        for entity_type in entity_types_with_collections:
            try:
                # Find all entities of this type that have this document attached
                # (EspoCRM API: search where documentsIds contains the document).
                result = await self.espocrm.list_entities(
                    entity_type,
                    where=[{
                        'type': 'arrayAnyOf',
                        'attribute': 'documentsIds',
                        'value': [document_id]
                    }],
                    select='id,name,xaiCollectionId',
                    max_size=100
                )

                entities = result.get('list', [])

                if entities:
                    self._log(f" ✅ {len(entities)} {entity_type}(s) gefunden")

                    for entity in entities:
                        collection_id = entity.get('xaiCollectionId')
                        if collection_id:
                            collections.add(collection_id)
                            self._log(f" → {entity.get('name')}: Collection {collection_id[:16]}...")

            except Exception as e:
                # A broken entity type must not abort discovery for the others.
                self._log(f" ⚠️ Fehler beim Prüfen von {entity_type}: {e}", level='warn')
                continue

        result = list(collections)
        self._log(f"📊 Gesamt: {len(result)} eindeutige Collection(s) gefunden")

        return result

    async def get_document_download_info(self, document_id: str) -> Optional[Dict[str, Any]]:
        """Fetch download information for a document's primary attachment.

        Returns:
            Dict with:
            - attachment_id: EspoCRM Attachment ID
            - download_url: relative URL for downloading the file
            - filename: file name
            - mime_type: MIME type
            - size: file size in bytes
            or ``None`` when the document has no attachment or loading failed.
        """
        try:
            # Load the full document.
            doc = await self.espocrm.get_entity('Document', document_id)

            # An EspoCRM Document references its files via attachmentsIds.
            attachment_ids = doc.get('attachmentsIds') or []

            if not attachment_ids:
                self._log(f"⚠️ Document {document_id} hat keine Attachments", level='warn')
                return None

            # Take the first attachment (documents normally carry a single file).
            attachment_id = attachment_ids[0]

            # Fetch the attachment details.
            attachment = await self.espocrm.get_entity('Attachment', attachment_id)

            return {
                'attachment_id': attachment_id,
                'download_url': f"/api/v1/Attachment/file/{attachment_id}",
                'filename': attachment.get('name', 'unknown'),
                'mime_type': attachment.get('type', 'application/octet-stream'),
                'size': attachment.get('size', 0)
            }

        except Exception as e:
            self._log(f"❌ Fehler beim Laden von Download-Info: {e}", level='error')
            return None

    async def generate_thumbnail(self, file_path: str, mime_type: str) -> Optional[bytes]:
        """Generate a preview image (thumbnail) for a document.

        Planned support:
        - PDF: first page rendered as an image
        - DOCX/DOC: convert to PDF, then render the first page
        - images: resize to thumbnail dimensions
        - anything else: generic placeholder icon based on the MIME type

        Args:
            file_path: path to the file (local path or download URL)
            mime_type: MIME type of the document

        Returns:
            Thumbnail as bytes (PNG/JPEG) or ``None`` on failure / while
            the feature is unimplemented.
        """
        self._log(f"🖼️ Thumbnail-Generierung für {mime_type}")

        # TODO: implementation pending.
        #
        # Required libraries:
        # - pdf2image (PDF → image), needs poppler-utils as system dependency
        # - python-docx + docx2pdf (DOCX → PDF → image)
        # - Pillow (PIL) for image processing
        #
        # Implementation sketch:
        # 1. PDF: convert_from_path(file_path, first_page=1, last_page=1),
        #    resize to ~200x280, serialize to bytes.
        # 2. DOCX: convert to a temporary PDF, then treat like a PDF.
        # 3. Image: PIL Image.open + .thumbnail((200, 280)), serialize.
        # 4. Fallback: generic file-type icon based on the MIME type.

        self._log(f"⚠️ Thumbnail-Generierung noch nicht implementiert", level='warn')
        return None

    async def update_sync_metadata(
        self,
        document_id: str,
        xai_file_id: str,
        collection_ids: List[str],
        file_hash: Optional[str] = None,
        thumbnail_data: Optional[bytes] = None
    ) -> None:
        """Update document metadata after a successful xAI sync.

        Args:
            document_id: EspoCRM Document ID
            xai_file_id: xAI file ID
            collection_ids: xAI collection IDs the file was added to
            file_hash: MD5/SHA hash of the synced file (for change detection)
            thumbnail_data: preview image as bytes (upload not yet implemented)

        Raises:
            Exception: re-raised when the EspoCRM update fails, so callers
                can mark the sync as failed.
        """
        try:
            update_data = {
                'xaiFileId': xai_file_id,
                'xaiCollections': collection_ids,
                'dateiStatus': 'Gesynct',  # reset the file status
            }

            # Persist the hash for future change detection.
            if file_hash:
                update_data['xaiSyncedHash'] = file_hash

            # Upload the thumbnail as an attachment (when provided).
            if thumbnail_data:
                # TODO: implement the thumbnail upload to EspoCRM.
                # EspoCRM supports preview images for documents; the thumbnail
                # must be uploaded as a separate attachment.
                self._log(f"⚠️ Thumbnail-Upload noch nicht implementiert", level='warn')

            await self.espocrm.update_entity('Document', document_id, update_data)
            self._log(f"✅ Sync-Metadaten aktualisiert für Document {document_id}")

        except Exception as e:
            self._log(f"❌ Fehler beim Update von Sync-Metadaten: {e}", level='error')
            raise
|
||||
280
steps/vmh/document_sync_event_step.py
Normal file
280
steps/vmh/document_sync_event_step.py
Normal file
@@ -0,0 +1,280 @@
|
||||
"""
|
||||
VMH Document Sync Handler
|
||||
|
||||
Zentraler Sync-Handler für Documents mit xAI Collections
|
||||
|
||||
Verarbeitet:
|
||||
- vmh.document.create: Neu in EspoCRM → Prüfe ob xAI-Sync nötig
|
||||
- vmh.document.update: Geändert in EspoCRM → Prüfe ob xAI-Sync/Update nötig
|
||||
- vmh.document.delete: Gelöscht in EspoCRM → Remove from xAI Collections
|
||||
"""
|
||||
|
||||
from typing import Dict, Any
|
||||
from motia import FlowContext
|
||||
from services.espocrm import EspoCRMAPI
|
||||
from services.document_sync_utils import DocumentSync
|
||||
import json
|
||||
import redis
|
||||
import os
|
||||
|
||||
# Motia step configuration: one queue trigger per document lifecycle action.
config = {
    "name": "VMH Document Sync Handler",
    "description": "Zentraler Sync-Handler für Documents mit xAI Collections",
    "flows": ["vmh-documents"],
    "triggers": [
        {"type": "queue", "topic": f"vmh.document.{doc_action}"}
        for doc_action in ("create", "update", "delete")
    ],
    "enqueues": [],
}
|
||||
|
||||
|
||||
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
    """Central sync handler for Document create/update/delete events.

    Expects ``event_data`` with ``entity_id``, ``action`` and ``source``.
    Acquires a per-document distributed lock, loads the full entity from
    EspoCRM and dispatches to the action-specific handler. The lock is
    guaranteed to be released on every path after acquisition.
    """
    entity_id = event_data.get('entity_id')
    action = event_data.get('action')
    source = event_data.get('source')

    if not entity_id:
        ctx.logger.error("Keine entity_id im Event gefunden")
        return

    # BUGFIX: a missing action previously crashed at action.upper() below
    # with AttributeError; reject such events explicitly instead.
    if not action:
        ctx.logger.error("Keine action im Event gefunden")
        return

    ctx.logger.info("=" * 80)
    ctx.logger.info(f"🔄 DOCUMENT SYNC HANDLER GESTARTET")
    ctx.logger.info("=" * 80)
    ctx.logger.info(f"Action: {action.upper()}")
    ctx.logger.info(f"Document ID: {entity_id}")
    ctx.logger.info(f"Source: {source}")
    ctx.logger.info("=" * 80)

    # Shared Redis client for distributed locking.
    redis_host = os.getenv('REDIS_HOST', 'localhost')
    redis_port = int(os.getenv('REDIS_PORT', '6379'))
    redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))

    redis_client = redis.Redis(
        host=redis_host,
        port=redis_port,
        db=redis_db,
        decode_responses=True
    )

    # Initialise the APIs.
    espocrm = EspoCRMAPI()
    sync_utils = DocumentSync(espocrm, redis_client, ctx)

    # TODO: the xAI service will be added in a later step
    # from services.xai_service import XAIService
    # xai_service = XAIService(ctx)

    try:
        # 1. ACQUIRE LOCK (prevents parallel syncs of the same document)
        lock_acquired = await sync_utils.acquire_sync_lock(entity_id)

        if not lock_acquired:
            ctx.logger.warn(f"⏸️ Sync bereits aktiv für Document {entity_id}, überspringe")
            return

        # Lock acquired - it MUST be released on every path below!
        try:
            # 2. FETCH the full document from EspoCRM
            try:
                document = await espocrm.get_entity('Document', entity_id)
            except Exception as e:
                ctx.logger.error(f"❌ Fehler beim Laden von Document: {e}")
                await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e))
                return

            ctx.logger.info(f"📋 Document geladen:")
            ctx.logger.info(f" Name: {document.get('name', 'N/A')}")
            ctx.logger.info(f" Type: {document.get('type', 'N/A')}")
            ctx.logger.info(f" xaiFileId: {document.get('xaiFileId', 'N/A')}")
            ctx.logger.info(f" xaiCollections: {document.get('xaiCollections', [])}")

            # 3. DISPATCH based on the event action
            if action == 'delete':
                await handle_delete(entity_id, document, sync_utils, ctx)

            elif action in ['create', 'update']:
                await handle_create_or_update(entity_id, document, sync_utils, ctx)

            else:
                ctx.logger.warn(f"⚠️ Unbekannte Action: {action}")
                await sync_utils.release_sync_lock(entity_id, success=False, error_message=f"Unbekannte Action: {action}")

        except Exception as e:
            # Unexpected error during the sync - GUARANTEE the lock release.
            ctx.logger.error(f"❌ Unerwarteter Fehler im Sync-Handler: {e}")
            import traceback
            ctx.logger.error(traceback.format_exc())

            try:
                await sync_utils.release_sync_lock(
                    entity_id,
                    success=False,
                    error_message=str(e)[:2000]
                )
            except Exception as release_error:
                # Even the lock release failed - log a critical error and
                # force-delete the Redis key directly.
                ctx.logger.critical(f"🚨 CRITICAL: Lock-Release failed für Document {entity_id}: {release_error}")
                try:
                    lock_key = f"sync_lock:document:{entity_id}"
                    redis_client.delete(lock_key)
                    ctx.logger.info(f"✅ Redis lock manuell released: {lock_key}")
                except Exception:
                    # The lock will expire via its TTL; nothing more we can do.
                    pass

    except Exception as e:
        # Error BEFORE the lock was acquired - no lock release needed.
        ctx.logger.error(f"❌ Fehler vor Lock-Acquire: {e}")
        import traceback
        ctx.logger.error(traceback.format_exc())
|
||||
|
||||
|
||||
async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any]):
    """
    Handle a document create or update event.

    Asks ``sync_utils.should_sync_to_xai`` whether the document has to be
    pushed to xAI. If not, the sync lock is released as a successful no-op.
    If yes, the xAI sync would run here — currently a placeholder, since the
    xAI service integration is not implemented yet; the lock is still
    released with ``success=True``. On any exception the lock is released
    with ``success=False`` and the error message.

    Args:
        entity_id: EspoCRM id of the document entity.
        document: Full document record loaded from EspoCRM.
        sync_utils: Sync helper providing decision logic and lock handling.
        ctx: Flow context carrying the logger.
    """
    log = ctx.logger
    banner = "=" * 80
    try:
        log.info("")
        log.info(banner)
        log.info("🔍 ANALYSE: Braucht dieses Document xAI-Sync?")
        log.info(banner)

        # Decision logic: does this document belong in xAI at all?
        sync_required, target_collections, decision_reason = await sync_utils.should_sync_to_xai(document)

        verdict = '✅ SYNC NÖTIG' if sync_required else '⏭️ KEIN SYNC NÖTIG'
        log.info(f"📊 Entscheidung: {verdict}")
        log.info(f" Grund: {decision_reason}")

        if target_collections:
            log.info(f" Collections: {target_collections}")

        if not sync_required:
            # Nothing to do — release the lock as a successful no-op.
            log.info("✅ Kein xAI-Sync erforderlich, Lock wird released")
            await sync_utils.release_sync_lock(entity_id, success=True)
            return

        # ---------------------------------------------------------------
        # Perform the xAI sync (placeholder — service not implemented yet)
        # ---------------------------------------------------------------
        log.info("")
        log.info(banner)
        log.info("🤖 xAI SYNC STARTEN")
        log.info(banner)

        log.warn("⚠️ xAI Service noch nicht implementiert!")
        log.info("")
        log.info("TODO: Folgende Schritte werden implementiert:")
        for planned_step in (
            "1. 📥 Download File von EspoCRM",
            "2. 📤 Upload zu xAI (falls noch kein xaiFileId)",
            "3. 📚 Add zu Collections",
            "4. 💾 Update EspoCRM: xaiFileId + xaiCollections",
        ):
            log.info(planned_step)
        log.info("")

        # Sketch of the real implementation (kept for the follow-up PR):
        #
        #   download_info = await sync_utils.get_document_download_info(entity_id)
        #   if not download_info:
        #       raise Exception("Konnte Download-Info nicht ermitteln")
        #
        #   xai_file_id = document.get('xaiFileId')
        #   if not xai_file_id:
        #       file_content = await download_file_from_espocrm(download_info)
        #       xai_file_id = await xai_service.upload_file(file_content, download_info['filename'])
        #
        #   for collection_id in target_collections:
        #       await xai_service.add_file_to_collection(collection_id, xai_file_id)
        #
        #   await sync_utils.release_sync_lock(
        #       entity_id,
        #       success=True,
        #       extra_fields={'xaiFileId': xai_file_id, 'xaiCollections': target_collections},
        #   )

        # For now: report success without performing the actual sync.
        await sync_utils.release_sync_lock(
            entity_id,
            success=True,
            extra_fields={
                # TODO: fill in real xAI data here, e.g.
                # 'xaiFileId': xai_file_id,
                # 'xaiCollections': target_collections
            }
        )

        log.info(banner)
        log.info("✅ DOCUMENT SYNC ABGESCHLOSSEN (PLACEHOLDER)")
        log.info(banner)

    except Exception as exc:
        log.error(f"❌ Fehler bei Create/Update: {exc}")
        import traceback
        log.error(traceback.format_exc())
        # Always release the lock so the document does not stay blocked.
        await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(exc))
||||
|
||||
async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, ctx: FlowContext[Any]):
    """
    Handle a document deletion event.

    Removes the document's file from all xAI collections it was synced to.
    The file itself is deliberately NOT deleted from xAI, because it may
    still be referenced by other collections. Currently a placeholder —
    the actual xAI calls are not implemented yet. The sync lock is always
    released: with ``success=True`` on the happy/no-op path, with
    ``success=False`` and an error message on failure.

    Args:
        entity_id: EspoCRM id of the deleted document entity.
        document: Full document record (as last loaded from EspoCRM).
        sync_utils: Sync helper providing lock handling.
        ctx: Flow context carrying the logger.
    """
    log = ctx.logger
    banner = "=" * 80
    try:
        log.info("")
        log.info(banner)
        log.info("🗑️ DOCUMENT DELETE - xAI CLEANUP")
        log.info(banner)

        file_id = document.get('xaiFileId')
        collections = document.get('xaiCollections') or []

        # Documents that were never synced to xAI need no cleanup.
        if not file_id or not collections:
            log.info("⏭️ Document war nicht in xAI gesynct, nichts zu tun")
            await sync_utils.release_sync_lock(entity_id, success=True)
            return

        log.info(f"📋 Document Info:")
        log.info(f" xaiFileId: {file_id}")
        log.info(f" Collections: {collections}")

        # xAI service integration is still pending; log the planned steps.
        log.warn("⚠️ xAI Delete-Operation noch nicht implementiert!")
        log.info("")
        log.info("TODO: Folgende Schritte werden implementiert:")
        log.info("1. 🗑️ Remove File aus allen Collections")
        log.info("2. ⚠️ File NICHT von xAI löschen (kann in anderen Collections sein)")
        log.info("")

        # Sketch of the real implementation (kept for the follow-up PR):
        #
        #   for collection_id in collections:
        #       await xai_service.remove_file_from_collection(collection_id, file_id)
        #
        #   log.info(f"✅ File aus {len(collections)} Collection(s) entfernt")

        await sync_utils.release_sync_lock(entity_id, success=True)

        log.info(banner)
        log.info("✅ DELETE ABGESCHLOSSEN (PLACEHOLDER)")
        log.info(banner)

    except Exception as exc:
        log.error(f"❌ Fehler bei Delete: {exc}")
        import traceback
        log.error(traceback.format_exc())
        # Always release the lock so the document does not stay blocked.
        await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(exc))
|
||||
Reference in New Issue
Block a user