Compare commits
2 Commits
6bf2343a12
...
46c0bbf381
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
46c0bbf381 | ||
|
|
8f1533337c |
@@ -240,20 +240,18 @@ class AIKnowledgeSync(BaseSyncUtils):
|
||||
self,
|
||||
knowledge_id: str,
|
||||
collection_id: str,
|
||||
ctx,
|
||||
full_sync: bool = False
|
||||
ctx
|
||||
) -> None:
|
||||
"""
|
||||
Sync all documents of a knowledge base to XAI collection.
|
||||
|
||||
Uses efficient JunctionData endpoint to get all documents with junction data
|
||||
and blake3 hashes in a single API call.
|
||||
and blake3 hashes in a single API call. Hash comparison is always performed.
|
||||
|
||||
Args:
|
||||
knowledge_id: CAIKnowledge entity ID
|
||||
collection_id: XAI Collection ID
|
||||
ctx: Motia context
|
||||
full_sync: If True, force Blake3 hash comparison for all documents (nightly cron)
|
||||
"""
|
||||
from services.espocrm import EspoCRMAPI
|
||||
from services.xai_service import XAIService
|
||||
@@ -301,8 +299,8 @@ class AIKnowledgeSync(BaseSyncUtils):
|
||||
if junction_status in ['new', 'unclean', 'failed']:
|
||||
needs_sync = True
|
||||
reason = f"status={junction_status}"
|
||||
elif full_sync and blake3_hash and ai_document_id:
|
||||
# Full sync mode: verify Blake3 hash with XAI
|
||||
elif junction_status == 'synced' and blake3_hash and ai_document_id:
|
||||
# Verify Blake3 hash with XAI (always, since hash from JunctionData API is free)
|
||||
try:
|
||||
xai_doc_info = await xai.get_collection_document(collection_id, ai_document_id)
|
||||
if xai_doc_info:
|
||||
@@ -310,7 +308,7 @@ class AIKnowledgeSync(BaseSyncUtils):
|
||||
|
||||
if xai_blake3 != blake3_hash:
|
||||
needs_sync = True
|
||||
reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs Doc: {blake3_hash[:16]}...)"
|
||||
reason = f"blake3 mismatch (XAI: {xai_blake3[:16] if xai_blake3 else 'N/A'}... vs EspoCRM: {blake3_hash[:16]}...)"
|
||||
ctx.logger.info(f" 🔄 Blake3 mismatch detected!")
|
||||
else:
|
||||
ctx.logger.info(f" ✅ Blake3 hash matches")
|
||||
@@ -327,14 +325,33 @@ class AIKnowledgeSync(BaseSyncUtils):
|
||||
|
||||
ctx.logger.info(f" 🔄 Syncing: {reason}")
|
||||
|
||||
# Get complete document entity with attachment info
|
||||
doc_entity = await espocrm.get_entity('CDokumente', doc_id)
|
||||
attachment_id = doc_entity.get('dokumentId')
|
||||
|
||||
if not attachment_id:
|
||||
ctx.logger.error(f" ❌ No attachment ID found for document {doc_id}")
|
||||
failed += 1
|
||||
continue
|
||||
|
||||
# Get attachment details for MIME type
|
||||
try:
|
||||
attachment = await espocrm.get_entity('Attachment', attachment_id)
|
||||
mime_type = attachment.get('type', 'application/octet-stream')
|
||||
file_size = attachment.get('size', 0)
|
||||
except Exception as e:
|
||||
ctx.logger.warn(f" ⚠️ Failed to get attachment details: {e}, using defaults")
|
||||
mime_type = 'application/octet-stream'
|
||||
file_size = 0
|
||||
|
||||
ctx.logger.info(f" 📎 Attachment: {attachment_id} ({mime_type}, {file_size} bytes)")
|
||||
|
||||
# Download document
|
||||
attachment_id = doc.get('documentId') # TODO: Get correct attachment ID from CDokumente
|
||||
file_content = await espocrm.download_attachment(attachment_id)
|
||||
ctx.logger.info(f" 📥 Downloaded {len(file_content)} bytes")
|
||||
|
||||
# Upload to XAI
|
||||
filename = doc_name
|
||||
mime_type = 'application/octet-stream' # TODO: Get from attachment
|
||||
|
||||
xai_file_id = await xai.upload_file(file_content, filename, mime_type)
|
||||
ctx.logger.info(f" 📤 Uploaded to XAI: {xai_file_id}")
|
||||
|
||||
@@ -458,107 +458,6 @@ class XAIService:
|
||||
|
||||
self._log(f"✅ Metadata updated for {file_id}")
|
||||
|
||||
# ========== High-Level Operations ==========
|
||||
|
||||
async def upload_document_with_metadata(
|
||||
self,
|
||||
collection_id: str,
|
||||
file_content: bytes,
|
||||
filename: str,
|
||||
mime_type: str,
|
||||
metadata: Dict[str, str]
|
||||
) -> str:
|
||||
"""
|
||||
Upload file + add to collection with metadata in one operation.
|
||||
|
||||
Args:
|
||||
collection_id: XAI Collection ID
|
||||
file_content: File bytes
|
||||
filename: Filename
|
||||
mime_type: MIME type
|
||||
metadata: Metadata fields
|
||||
|
||||
Returns:
|
||||
XAI file_id
|
||||
|
||||
Raises:
|
||||
RuntimeError: bei Upload/Add-Fehler
|
||||
"""
|
||||
# Step 1: Upload file
|
||||
file_id = await self.upload_file(file_content, filename, mime_type)
|
||||
|
||||
try:
|
||||
# Step 2: Add to collection (XAI API automatically handles metadata)
|
||||
# Note: Metadata muss beim POST mit angegeben werden
|
||||
session = await self._get_session()
|
||||
url = f"{XAI_MANAGEMENT_URL}/v1/collections/{collection_id}/documents/{file_id}"
|
||||
headers = {
|
||||
"Authorization": f"Bearer {self.management_key}",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
body = {"fields": metadata}
|
||||
|
||||
async with session.post(url, json=body, headers=headers) as response:
|
||||
if response.status not in (200, 201):
|
||||
raw = await response.text()
|
||||
raise RuntimeError(
|
||||
f"Failed to add file to collection with metadata ({response.status}): {raw}"
|
||||
)
|
||||
|
||||
self._log(f"✅ File {file_id} added to collection {collection_id} with metadata")
|
||||
return file_id
|
||||
|
||||
except Exception as e:
|
||||
# Cleanup: File wurde hochgeladen aber nicht zur Collection hinzugefügt
|
||||
self._log(f"⚠️ Failed to add to collection, file {file_id} may be orphaned", level='warn')
|
||||
raise
|
||||
|
||||
async def verify_upload_integrity(
|
||||
self,
|
||||
collection_id: str,
|
||||
file_id: str,
|
||||
retry_attempts: int = 3
|
||||
) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Verifiziert Upload-Integrität via BLAKE3 Hash von XAI.
|
||||
|
||||
Args:
|
||||
collection_id: XAI Collection ID
|
||||
file_id: XAI file_id
|
||||
retry_attempts: Retry bei temporären Fehlern
|
||||
|
||||
Returns:
|
||||
(success: bool, blake3_hash: Optional[str])
|
||||
"""
|
||||
for attempt in range(1, retry_attempts + 1):
|
||||
try:
|
||||
doc_info = await self.get_collection_document(collection_id, file_id)
|
||||
|
||||
if not doc_info:
|
||||
self._log(f"⚠️ Document {file_id} not found in collection", level='warn')
|
||||
return (False, None)
|
||||
|
||||
blake3_hash = doc_info.get('hash')
|
||||
|
||||
if not blake3_hash:
|
||||
self._log(f"⚠️ No hash returned by XAI API", level='warn')
|
||||
return (False, None)
|
||||
|
||||
self._log(f"✅ Upload verified, BLAKE3: {blake3_hash[:32]}...")
|
||||
return (True, blake3_hash)
|
||||
|
||||
except Exception as e:
|
||||
if attempt < retry_attempts:
|
||||
delay = 2 ** attempt # Exponential backoff
|
||||
self._log(f"⚠️ Verification failed (attempt {attempt}), retry in {delay}s", level='warn')
|
||||
await asyncio.sleep(delay)
|
||||
else:
|
||||
self._log(f"❌ Verification failed after {retry_attempts} attempts: {e}", level='error')
|
||||
return (False, None)
|
||||
|
||||
return (False, None)
|
||||
|
||||
def is_mime_type_supported(self, mime_type: str) -> bool:
|
||||
"""
|
||||
Prüft, ob XAI diesen MIME-Type unterstützt.
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
"""AI Knowledge Full Sync - Daily Cron Job"""
|
||||
"""AI Knowledge Daily Sync - Cron Job"""
|
||||
from typing import Any
|
||||
from motia import FlowContext, cron
|
||||
|
||||
|
||||
config = {
|
||||
"name": "AI Knowledge Full Sync",
|
||||
"description": "Daily full sync of all CAIKnowledge entities (catches missed webhooks)",
|
||||
"name": "AI Knowledge Daily Sync",
|
||||
"description": "Daily sync of all CAIKnowledge entities (catches missed webhooks, Blake3 verification included)",
|
||||
"flows": ["aiknowledge-full-sync"],
|
||||
"triggers": [
|
||||
cron("0 0 2 * * *"), # Daily at 2:00 AM
|
||||
@@ -16,16 +16,17 @@ config = {
|
||||
|
||||
async def handler(input_data: None, ctx: FlowContext[Any]) -> None:
|
||||
"""
|
||||
Daily full sync handler.
|
||||
Daily sync handler - ensures all active knowledge bases are synchronized.
|
||||
|
||||
Loads all CAIKnowledge entities that need sync and emits events.
|
||||
Blake3 hash verification is always performed (hash available from JunctionData API).
|
||||
Runs every day at 02:00:00.
|
||||
"""
|
||||
from services.espocrm import EspoCRMAPI
|
||||
from services.models import AIKnowledgeActivationStatus, AIKnowledgeSyncStatus
|
||||
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("🌙 DAILY FULL SYNC STARTED")
|
||||
ctx.logger.info("🌙 DAILY AI KNOWLEDGE SYNC STARTED")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
espocrm = EspoCRMAPI(ctx)
|
||||
@@ -63,23 +64,22 @@ async def handler(input_data: None, ctx: FlowContext[Any]) -> None:
|
||||
ctx.logger.info("=" * 80)
|
||||
return
|
||||
|
||||
# Enqueue sync events for all
|
||||
# Enqueue sync events for all (Blake3 verification always enabled)
|
||||
for i, entity in enumerate(entities, 1):
|
||||
await ctx.enqueue({
|
||||
'topic': 'aiknowledge.sync',
|
||||
'data': {
|
||||
'knowledge_id': entity['id'],
|
||||
'source': 'daily_full_sync'
|
||||
'source': 'daily_cron'
|
||||
}
|
||||
})
|
||||
|
||||
ctx.logger.info(
|
||||
f"📤 [{i}/{total}] Enqueued: {entity['name']} "
|
||||
f"(syncStatus={entity.get('syncStatus')})"
|
||||
)
|
||||
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info(f"✅ Full sync complete: {total} events enqueued")
|
||||
ctx.logger.info(f"✅ Daily sync complete: {total} events enqueued")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -60,7 +60,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None:
|
||||
raise RuntimeError(f"Lock busy for {knowledge_id}") # Motia will retry
|
||||
|
||||
try:
|
||||
# Perform sync
|
||||
# Perform sync (Blake3 hash verification always enabled)
|
||||
await sync_utils.sync_knowledge_to_xai(knowledge_id, ctx)
|
||||
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
Reference in New Issue
Block a user