732 lines
26 KiB
Python
732 lines
26 KiB
Python
"""EspoCRM API client for Motia III"""
|
|
import asyncio
import json
import logging
import os
import time
from typing import Optional, Dict, Any, List

import aiohttp

from services.config import ESPOCRM_CONFIG, API_CONFIG
from services.exceptions import (
    EspoCRMAPIError,
    EspoCRMAuthError,
    EspoCRMTimeoutError,
    RetryableError,
    ValidationError
)
from services.logging_utils import get_service_logger
from services.redis_client import get_redis_client
|
|
|
|
|
|
class EspoCRMAPI:
    """
    EspoCRM API Client for BitByLaw integration.

    Supports:
    - API Key authentication (X-Api-Key header)
    - Standard REST operations (GET, POST, PUT, DELETE)
    - Entity management (CBeteiligte, CVmhErstgespraech, etc.)
    - Attachment upload / download
    - Junction data access through the JunctionData gateway endpoints

    Environment variables:
    - ESPOCRM_API_KEY (required, Marvin API key)
    - ESPOCRM_API_BASE_URL (optional, default: https://crm.bitbylaw.com/api/v1)
    - ESPOCRM_API_TIMEOUT_SECONDS (optional, default: API_CONFIG.default_timeout_seconds)
    - ESPOCRM_METADATA_TTL_SECONDS (optional, default: 300)
    - ESPOCRM_GATEWAY_URL (optional, default: https://api.bitbylaw.com/vmh/crm)
    """

    # Fallback used by the JunctionData helpers when ESPOCRM_GATEWAY_URL is unset.
    _DEFAULT_GATEWAY_URL = 'https://api.bitbylaw.com/vmh/crm'

    def __init__(self, context=None):
        """
        Initialize EspoCRM API client.

        Args:
            context: Motia FlowContext for logging (optional)

        Raises:
            EspoCRMAuthError: ESPOCRM_API_KEY is missing or empty
        """
        self.context = context
        self.logger = get_service_logger('espocrm', context)
        self.logger.debug("EspoCRMAPI initializing")

        # Load configuration from environment
        self.api_base_url = os.getenv('ESPOCRM_API_BASE_URL', 'https://crm.bitbylaw.com/api/v1')
        self.api_key = os.getenv('ESPOCRM_API_KEY', '')
        self.api_timeout_seconds = int(os.getenv('ESPOCRM_API_TIMEOUT_SECONDS', str(API_CONFIG.default_timeout_seconds)))

        if not self.api_key:
            raise EspoCRMAuthError("ESPOCRM_API_KEY not configured in environment")

        self.logger.info(f"EspoCRM API initialized with base URL: {self.api_base_url}")

        # The HTTP session is created lazily by _get_session()
        self._session: Optional[aiohttp.ClientSession] = None
        self._entity_defs_cache: Dict[str, Dict[str, Any]] = {}
        self._entity_defs_cache_ttl_seconds = int(os.getenv('ESPOCRM_METADATA_TTL_SECONDS', '300'))

        # Metadata cache (complete metadata loaded once)
        self._metadata_cache: Optional[Dict[str, Any]] = None
        self._metadata_cache_ts: float = 0

        # Optional Redis for caching/rate limiting (centralized)
        self.redis_client = get_redis_client(strict=False)
        if self.redis_client:
            self.logger.info("Connected to Redis for EspoCRM operations")
        else:
            self.logger.warning("⚠️ Redis unavailable - caching disabled")

    def _log(self, message: str, level: str = 'info') -> None:
        """
        Log a message through the service logger at the given level.

        The level name is looked up as a method on the logger. Callers in this
        class use 'warn'; if the logger has no such method, 'warning' is tried
        so warnings are not silently downgraded. Any other unknown level falls
        back to info.
        """
        log_func = getattr(self.logger, level, None)
        if log_func is None and level == 'warn':
            log_func = getattr(self.logger, 'warning', None)
        if log_func is None:
            log_func = self.logger.info
        log_func(message)

    def _get_headers(self) -> Dict[str, str]:
        """Generate JSON request headers with API key"""
        return {
            'X-Api-Key': self.api_key,
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }

    def _get_gateway_url(self) -> str:
        """Return the JunctionData gateway base URL without a trailing slash."""
        return os.getenv('ESPOCRM_GATEWAY_URL', self._DEFAULT_GATEWAY_URL).rstrip('/')

    @staticmethod
    def _build_list_params(
        offset: int,
        max_size: int,
        where: Optional[Any] = None,
        select: Optional[str] = None,
        order_by: Optional[str] = None,
        order: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Build the query parameters shared by the list endpoints.

        Falsy optional values are omitted. A `where` that is already a string
        is passed through unchanged; anything else is JSON-encoded.
        """
        params: Dict[str, Any] = {
            'offset': offset,
            'maxSize': max_size
        }
        if where:
            # EspoCRM expects JSON-encoded where clause
            params['where'] = where if isinstance(where, str) else json.dumps(where)
        if select:
            params['select'] = select
        if order_by:
            params['orderBy'] = order_by
        if order:
            params['order'] = order
        return params

    async def _get_session(self) -> "aiohttp.ClientSession":
        """Return the shared HTTP session, creating one if missing or closed."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self) -> None:
        """Close the shared HTTP session if it is open."""
        if self._session and not self._session.closed:
            await self._session.close()

    async def get_metadata(self) -> Dict[str, Any]:
        """
        Get complete EspoCRM metadata (cached).

        Loads once and caches for TTL duration (ESPOCRM_METADATA_TTL_SECONDS).
        Much faster than individual entity def calls.

        Returns:
            Complete metadata dict with entityDefs, clientDefs, etc.
            An empty dict is returned if loading fails (the failure is logged).
        """
        now = time.monotonic()

        # Return cached if still valid
        cache_age = now - self._metadata_cache_ts
        if self._metadata_cache is not None and cache_age < self._entity_defs_cache_ttl_seconds:
            return self._metadata_cache

        # Load fresh metadata
        try:
            self._log("📥 Loading complete EspoCRM metadata...", level='debug')
            metadata = await self.api_call("/Metadata", method='GET')

            if not isinstance(metadata, dict):
                self._log("⚠️ Metadata response is not a dict, using empty", level='warn')
                metadata = {}

            # Cache it
            self._metadata_cache = metadata
            self._metadata_cache_ts = now

            entity_count = len(metadata.get('entityDefs', {}))
            self._log(f"✅ Metadata cached: {entity_count} entity definitions", level='debug')

            return metadata

        except Exception as e:
            # Deliberate best-effort: callers get an empty dict instead of an error
            self._log(f"❌ Failed to load metadata: {e}", level='error')
            return {}

    async def get_entity_def(self, entity_type: str) -> Dict[str, Any]:
        """
        Get entity definition for a specific entity type (cached via metadata).

        Args:
            entity_type: Entity type (e.g., 'Document', 'CDokumente', 'Account')

        Returns:
            Entity definition dict with fields, links, etc.
            Empty dict if the type is unknown or metadata could not be read.
        """
        try:
            metadata = await self.get_metadata()
            entity_defs = metadata.get('entityDefs', {})

            if not isinstance(entity_defs, dict):
                self._log(f"⚠️ entityDefs is not a dict for {entity_type}", level='warn')
                return {}

            entity_def = entity_defs.get(entity_type, {})

            if not entity_def:
                self._log(f"⚠️ No entity definition found for '{entity_type}'", level='debug')

            return entity_def

        except Exception as e:
            self._log(f"⚠️ Could not load entity def for {entity_type}: {e}", level='warn')
            return {}

    async def api_call(
        self,
        endpoint: str,
        method: str = 'GET',
        params: Optional[Dict] = None,
        json_data: Optional[Dict] = None,
        timeout_seconds: Optional[int] = None
    ) -> Any:
        """
        Make an API call to EspoCRM.

        Args:
            endpoint: API endpoint (e.g., '/CBeteiligte/123' or '/CVmhErstgespraech')
            method: HTTP method (GET, POST, PUT, DELETE)
            params: Query parameters
            json_data: JSON body for POST/PUT
            timeout_seconds: Request timeout (falls back to the client default
                when None or 0)

        Returns:
            Parsed JSON response, or None when the response is not JSON

        Raises:
            EspoCRMAuthError: Authentication failed (401)
            EspoCRMTimeoutError: Request timed out
            RetryableError: Server error (5xx)
            EspoCRMAPIError: Other API errors and client-side request failures
        """
        # Ensure endpoint starts with /
        if not endpoint.startswith('/'):
            endpoint = '/' + endpoint

        url = self.api_base_url.rstrip('/') + endpoint
        headers = self._get_headers()
        effective_timeout = aiohttp.ClientTimeout(
            total=timeout_seconds or self.api_timeout_seconds
        )

        session = await self._get_session()
        try:
            with self.logger.api_call(endpoint, method):
                async with session.request(
                    method,
                    url,
                    headers=headers,
                    params=params,
                    json=json_data,
                    timeout=effective_timeout
                ) as response:
                    status = response.status

                    # Handle errors
                    if status == 401:
                        raise EspoCRMAuthError(
                            "Authentication failed - check API key",
                            status_code=401
                        )
                    if status == 403:
                        raise EspoCRMAPIError(
                            "Access forbidden",
                            status_code=403
                        )
                    if status == 404:
                        raise EspoCRMAPIError(
                            f"Resource not found: {endpoint}",
                            status_code=404
                        )
                    if status >= 500:
                        error_text = await response.text()
                        raise RetryableError(
                            f"Server error {status}: {error_text}"
                        )
                    if status >= 400:
                        error_text = await response.text()
                        raise EspoCRMAPIError(
                            f"API error {status}: {error_text}",
                            status_code=status,
                            response_body=error_text
                        )

                    # Parse response
                    if response.content_type == 'application/json':
                        return await response.json()

                    # For DELETE or other non-JSON responses
                    return None

        except asyncio.TimeoutError as e:
            raise EspoCRMTimeoutError(
                f"Request timed out after {effective_timeout.total}s",
                status_code=408
            ) from e
        except aiohttp.ClientError as e:
            self.logger.error(f"API call failed: {e}")
            raise EspoCRMAPIError(f"Request failed: {str(e)}") from e

    async def get_entity(self, entity_type: str, entity_id: str) -> Dict[str, Any]:
        """
        Get a single entity by ID.

        Args:
            entity_type: Entity type (e.g., 'CBeteiligte', 'CVmhErstgespraech')
            entity_id: Entity ID

        Returns:
            Entity data as dict
        """
        self._log(f"Getting {entity_type} with ID: {entity_id}")
        return await self.api_call(f"/{entity_type}/{entity_id}", method='GET')

    async def list_entities(
        self,
        entity_type: str,
        where: Optional[List[Dict]] = None,
        select: Optional[str] = None,
        order_by: Optional[str] = None,
        offset: int = 0,
        max_size: int = 50
    ) -> Dict[str, Any]:
        """
        List entities with filtering and pagination.

        Args:
            entity_type: Entity type
            where: Filter conditions (EspoCRM format); a pre-encoded JSON
                string is also accepted
            select: Comma-separated field list
            order_by: Sort field
            offset: Pagination offset
            max_size: Max results per page

        Returns:
            Dict with 'list' and 'total' keys
        """
        params = self._build_list_params(
            offset, max_size, where=where, select=select, order_by=order_by
        )

        self._log(f"Listing {entity_type} entities")
        return await self.api_call(f"/{entity_type}", method='GET', params=params)

    async def list_related(
        self,
        entity_type: str,
        entity_id: str,
        link: str,
        where: Optional[List[Dict]] = None,
        select: Optional[str] = None,
        order_by: Optional[str] = None,
        order: Optional[str] = None,
        offset: int = 0,
        max_size: int = 50
    ) -> Dict[str, Any]:
        """
        List records related to an entity through a link.

        Issues GET /{entity_type}/{entity_id}/{link}.

        Args:
            entity_type: Parent entity type
            entity_id: Parent entity ID
            link: Link (relationship) name on the parent entity
            where: Filter conditions (EspoCRM format); a pre-encoded JSON
                string is also accepted
            select: Comma-separated field list
            order_by: Sort field
            order: Value sent as the 'order' query parameter
            offset: Pagination offset
            max_size: Max results per page

        Returns:
            Parsed response of the list call
        """
        params = self._build_list_params(
            offset, max_size, where=where, select=select, order_by=order_by, order=order
        )

        self._log(f"Listing related {entity_type}/{entity_id}/{link}")
        return await self.api_call(f"/{entity_type}/{entity_id}/{link}", method='GET', params=params)

    async def create_entity(
        self,
        entity_type: str,
        data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Create a new entity.

        Args:
            entity_type: Entity type
            data: Entity data

        Returns:
            Created entity with ID
        """
        self._log(f"Creating {entity_type} entity")
        return await self.api_call(f"/{entity_type}", method='POST', json_data=data)

    async def update_entity(
        self,
        entity_type: str,
        entity_id: str,
        data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Update an existing entity.

        Args:
            entity_type: Entity type
            entity_id: Entity ID
            data: Updated fields

        Returns:
            Updated entity
        """
        self._log(f"Updating {entity_type} with ID: {entity_id}")
        return await self.api_call(f"/{entity_type}/{entity_id}", method='PUT', json_data=data)

    async def delete_entity(self, entity_type: str, entity_id: str) -> bool:
        """
        Delete an entity.

        Args:
            entity_type: Entity type
            entity_id: Entity ID

        Returns:
            True if successful (failures surface as exceptions from api_call)
        """
        self._log(f"Deleting {entity_type} with ID: {entity_id}")
        await self.api_call(f"/{entity_type}/{entity_id}", method='DELETE')
        return True

    async def search_entities(
        self,
        entity_type: str,
        query: str,
        fields: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """
        Search entities by text query.

        Args:
            entity_type: Entity type
            query: Search query
            fields: Currently unused; accepted for interface compatibility.
                The search is sent as a single 'textFilter' condition.

        Returns:
            List of matching entities (first page only, default page size)
        """
        where = [{
            'type': 'textFilter',
            'value': query
        }]

        result = await self.list_entities(entity_type, where=where)
        # list_entities yields None for non-JSON responses; treat that as no hits
        return (result or {}).get('list', [])

    async def upload_attachment(
        self,
        file_content: bytes,
        filename: str,
        parent_type: str,
        parent_id: str,
        field: str,
        mime_type: str = 'application/octet-stream',
        role: str = 'Attachment'
    ) -> Dict[str, Any]:
        """
        Upload an attachment to EspoCRM.

        Args:
            file_content: File content as bytes
            filename: Name of the file
            parent_type: Parent entity type (e.g., 'Document')
            parent_id: Parent entity ID
            field: Field name for the attachment (e.g., 'preview')
            mime_type: MIME type of the file
            role: Attachment role (default: 'Attachment')

        Returns:
            Attachment entity data; for a non-JSON success response a dict
            with 'success' and the raw 'response' text

        Raises:
            EspoCRMAuthError: Authentication failed (401)
            EspoCRMAPIError: Any other HTTP error or client-side request failure
        """
        self._log(f"Uploading attachment: {filename} ({len(file_content)} bytes) to {parent_type}/{parent_id}/{field}")

        url = self.api_base_url.rstrip('/') + '/Attachment'
        headers = {
            'X-Api-Key': self.api_key,
            # Content-Type is set automatically by aiohttp for FormData
        }

        # Build the multipart form
        form_data = aiohttp.FormData()
        form_data.add_field('file', file_content, filename=filename, content_type=mime_type)
        form_data.add_field('parentType', parent_type)
        form_data.add_field('parentId', parent_id)
        form_data.add_field('field', field)
        form_data.add_field('role', role)
        form_data.add_field('name', filename)

        self._log(f"Upload params: parentType={parent_type}, parentId={parent_id}, field={field}, role={role}")

        effective_timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds)

        session = await self._get_session()
        try:
            async with session.post(url, headers=headers, data=form_data, timeout=effective_timeout) as response:
                status = response.status
                self._log(f"Upload response status: {status}")

                if status == 401:
                    raise EspoCRMAuthError("Authentication failed - check API key", status_code=401)
                if status == 403:
                    raise EspoCRMAPIError("Access forbidden", status_code=403)
                if status == 404:
                    raise EspoCRMAPIError("Attachment endpoint not found", status_code=404)
                if status >= 400:
                    error_text = await response.text()
                    self._log(f"❌ Upload failed with {status}. Response: {error_text}", level='error')
                    raise EspoCRMAPIError(
                        f"Upload error {status}: {error_text}",
                        status_code=status,
                        response_body=error_text
                    )

                # Parse response
                if response.content_type == 'application/json':
                    result = await response.json()
                    attachment_id = result.get('id')
                    self._log(f"✅ Attachment uploaded successfully: {attachment_id}")
                    return result

                response_text = await response.text()
                self._log(f"⚠️ Non-JSON response: {response_text[:200]}", level='warn')
                return {'success': True, 'response': response_text}

        except aiohttp.ClientError as e:
            self._log(f"Upload failed: {e}", level='error')
            raise EspoCRMAPIError(f"Upload request failed: {e}") from e

    async def download_attachment(self, attachment_id: str) -> bytes:
        """
        Download an attachment from EspoCRM.

        Args:
            attachment_id: Attachment ID

        Returns:
            File content as bytes

        Raises:
            EspoCRMAuthError: Authentication failed (401)
            EspoCRMAPIError: Any other HTTP error or client-side request failure
        """
        self._log(f"Downloading attachment: {attachment_id}")

        url = self.api_base_url.rstrip('/') + f'/Attachment/file/{attachment_id}'
        headers = {
            'X-Api-Key': self.api_key,
        }

        effective_timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds)

        session = await self._get_session()
        try:
            async with session.get(url, headers=headers, timeout=effective_timeout) as response:
                status = response.status

                if status == 401:
                    raise EspoCRMAuthError("Authentication failed - check API key", status_code=401)
                if status == 403:
                    raise EspoCRMAPIError("Access forbidden", status_code=403)
                if status == 404:
                    raise EspoCRMAPIError(f"Attachment not found: {attachment_id}", status_code=404)
                if status >= 400:
                    error_text = await response.text()
                    raise EspoCRMAPIError(
                        f"Download error {status}: {error_text}",
                        status_code=status,
                        response_body=error_text
                    )

                content = await response.read()
                self._log(f"✅ Downloaded {len(content)} bytes")
                return content

        except aiohttp.ClientError as e:
            self._log(f"Download failed: {e}", level='error')
            raise EspoCRMAPIError(f"Download request failed: {e}") from e

    # ========== Junction Table Operations ==========

    async def get_junction_entries(
        self,
        junction_entity: str,
        filter_field: str,
        filter_value: str,
        max_size: int = 1000
    ) -> List[Dict[str, Any]]:
        """
        Load junction table entries with filtering.

        Args:
            junction_entity: Junction entity name (e.g., 'CAIKnowledgeCDokumente')
            filter_field: Field to filter on (e.g., 'cAIKnowledgeId')
            filter_value: Value to match
            max_size: Maximum entries to return

        Returns:
            List of junction records as returned by the list endpoint

        Example:
            entries = await espocrm.get_junction_entries(
                'CAIKnowledgeCDokumente',
                'cAIKnowledgeId',
                'kb-123'
            )
        """
        self._log(f"Loading junction entries: {junction_entity} where {filter_field}={filter_value}")

        result = await self.list_entities(
            junction_entity,
            where=[{
                'type': 'equals',
                'attribute': filter_field,
                'value': filter_value
            }],
            max_size=max_size
        )

        # list_entities yields None for non-JSON responses; treat that as empty
        entries = (result or {}).get('list', [])
        self._log(f"✅ Loaded {len(entries)} junction entries")
        return entries

    async def update_junction_entry(
        self,
        junction_entity: str,
        junction_id: str,
        fields: Dict[str, Any]
    ) -> None:
        """
        Update junction table entry.

        Args:
            junction_entity: Junction entity name
            junction_id: Junction entry ID
            fields: Fields to update

        Example:
            await espocrm.update_junction_entry(
                'CAIKnowledgeCDokumente',
                'jct-123',
                {'syncstatus': 'synced', 'lastSync': '2026-03-11T20:00:00Z'}
            )
        """
        await self.update_entity(junction_entity, junction_id, fields)

    async def get_knowledge_documents_with_junction(
        self,
        knowledge_id: str
    ) -> List[Dict[str, Any]]:
        """
        Get all documents linked to a CAIKnowledge entry with junction data.

        Uses custom EspoCRM endpoint: GET /JunctionData/CAIKnowledge/{knowledge_id}/dokumentes

        Returns enriched list with:
        - junctionId: Junction table ID
        - cAIKnowledgeId, cDokumenteId: Junction keys
        - aiDocumentId: XAI document ID from junction
        - syncstatus: Sync status from junction (new, synced, failed, unclean)
        - lastSync: Last sync timestamp from junction
        - documentId, documentName: Document info
        - blake3hash: Blake3 hash from document entity
        - documentCreatedAt, documentModifiedAt: Document timestamps

        This consolidates multiple API calls into one efficient query.

        Args:
            knowledge_id: CAIKnowledge entity ID

        Returns:
            List of document dicts with junction data; empty list on 404

        Raises:
            EspoCRMTimeoutError: Request timed out
            EspoCRMAPIError: HTTP error (other than 404) or network failure

        Example:
            docs = await espocrm.get_knowledge_documents_with_junction('69b1b03582bb6e2da')
            for doc in docs:
                print(f"{doc['documentName']}: {doc['syncstatus']}")
        """
        # JunctionData uses API Gateway URL, not direct EspoCRM.
        # The gateway URL comes from ESPOCRM_GATEWAY_URL or a fixed default.
        gateway_url = self._get_gateway_url()
        url = f"{gateway_url}/JunctionData/CAIKnowledge/{knowledge_id}/dokumentes"

        self._log(f"GET {url}")

        try:
            session = await self._get_session()
            timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds)

            async with session.get(url, headers=self._get_headers(), timeout=timeout) as response:
                self._log(f"Response status: {response.status}")

                if response.status == 404:
                    # Knowledge base not found or no documents linked
                    return []

                if response.status >= 400:
                    error_text = await response.text()
                    raise EspoCRMAPIError(
                        f"JunctionData GET failed: {response.status} - {error_text}",
                        status_code=response.status
                    )

                result = await response.json()
                documents = result.get('list', [])

                self._log(f"✅ Loaded {len(documents)} document(s) with junction data")
                return documents

        except asyncio.TimeoutError as e:
            raise EspoCRMTimeoutError(f"Timeout getting junction data for knowledge {knowledge_id}") from e
        except aiohttp.ClientError as e:
            raise EspoCRMAPIError(f"Network error getting junction data: {e}") from e

    async def update_knowledge_document_junction(
        self,
        knowledge_id: str,
        document_id: str,
        fields: Dict[str, Any],
        update_last_sync: bool = True
    ) -> Dict[str, Any]:
        """
        Update junction columns for a specific document link.

        Uses custom EspoCRM endpoint:
        PUT /JunctionData/CAIKnowledge/{knowledge_id}/dokumentes/{document_id}

        Args:
            knowledge_id: CAIKnowledge entity ID
            document_id: CDokumente entity ID
            fields: Junction fields to update (aiDocumentId, syncstatus, etc.)
            update_last_sync: Whether to send 'updateLastSync': True with the
                payload (default: True)

        Returns:
            Updated junction data

        Raises:
            EspoCRMTimeoutError: Request timed out
            EspoCRMAPIError: HTTP error or network failure

        Example:
            await espocrm.update_knowledge_document_junction(
                '69b1b03582bb6e2da',
                '69a68b556a39771bf',
                {
                    'aiDocumentId': 'xai-file-abc123',
                    'syncstatus': 'synced'
                },
                update_last_sync=True
            )
        """
        # JunctionData uses API Gateway URL, not direct EspoCRM
        gateway_url = self._get_gateway_url()
        url = f"{gateway_url}/JunctionData/CAIKnowledge/{knowledge_id}/dokumentes/{document_id}"

        # Copy so the caller's dict is never mutated
        payload = {**fields}
        if update_last_sync:
            payload['updateLastSync'] = True

        self._log(f"PUT {url}")
        self._log(f" Payload: {payload}")

        try:
            session = await self._get_session()
            timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds)

            async with session.put(url, headers=self._get_headers(), json=payload, timeout=timeout) as response:
                self._log(f"Response status: {response.status}")

                if response.status >= 400:
                    error_text = await response.text()
                    raise EspoCRMAPIError(
                        f"JunctionData PUT failed: {response.status} - {error_text}",
                        status_code=response.status
                    )

                result = await response.json()
                self._log(f"✅ Junction updated: junctionId={result.get('junctionId')}")
                return result

        except asyncio.TimeoutError as e:
            raise EspoCRMTimeoutError("Timeout updating junction data") from e
        except aiohttp.ClientError as e:
            raise EspoCRMAPIError(f"Network error updating junction data: {e}") from e