Files
motia/bitbylaw/services/espocrm.py
bitbylaw e6ab22d5f4 feat: Add EspoCRM and Advoware integration for Beteiligte comparison
- Implemented `compare_beteiligte.py` script for comparing Beteiligte structures between EspoCRM and Advoware.
- Created `beteiligte_comparison_result.json` to store comparison results.
- Developed `EspoCRMAPI` service for handling API interactions with EspoCRM.
- Added comprehensive documentation for the EspoCRM API service.
- Included error handling and logging for API operations.
- Enhanced entity management with CRUD operations and search capabilities.
2026-02-07 14:42:58 +00:00

277 lines
9.0 KiB
Python

import aiohttp
import asyncio
import logging
import redis
from typing import Optional, Dict, Any, List
from config import Config
logger = logging.getLogger(__name__)
class EspoCRMError(Exception):
"""Base exception for EspoCRM API errors"""
pass
class EspoCRMAuthError(EspoCRMError):
"""Authentication error"""
pass
class EspoCRMAPI:
"""
EspoCRM API Client for bitbylaw integration.
Supports:
- API Key authentication (X-Api-Key header)
- Standard REST operations (GET, POST, PUT, DELETE)
- Entity management (Beteiligte, CVmhErstgespraech, etc.)
"""
def __init__(self, context=None):
self.context = context
self._log("EspoCRMAPI __init__ started", level='debug')
# Configuration
self.api_base_url = Config.ESPOCRM_API_BASE_URL
self.api_key = Config.ESPOCRM_API_KEY
if not self.api_key:
raise EspoCRMAuthError("ESPOCRM_MARVIN_API_KEY not configured in environment")
self._log(f"EspoCRM API initialized with base URL: {self.api_base_url}")
# Optional Redis for caching/rate limiting
try:
self.redis_client = redis.Redis(
host=Config.REDIS_HOST,
port=int(Config.REDIS_PORT),
db=int(Config.REDIS_DB_ADVOWARE_CACHE),
socket_timeout=Config.REDIS_TIMEOUT_SECONDS,
socket_connect_timeout=Config.REDIS_TIMEOUT_SECONDS,
decode_responses=True
)
self.redis_client.ping()
self._log("Connected to Redis for EspoCRM operations")
except Exception as e:
self._log(f"Could not connect to Redis: {e}. Continuing without caching.", level='warning')
self.redis_client = None
def _log(self, message: str, level: str = 'info'):
"""Log message via context.logger if available, otherwise use module logger"""
log_func = getattr(logger, level, logger.info)
if self.context and hasattr(self.context, 'logger'):
ctx_log_func = getattr(self.context.logger, level, self.context.logger.info)
ctx_log_func(f"[EspoCRM] {message}")
else:
log_func(f"[EspoCRM] {message}")
def _get_headers(self) -> Dict[str, str]:
"""Generate request headers with API key"""
return {
'X-Api-Key': self.api_key,
'Content-Type': 'application/json',
'Accept': 'application/json'
}
async def api_call(
self,
endpoint: str,
method: str = 'GET',
params: Optional[Dict] = None,
json_data: Optional[Dict] = None,
timeout_seconds: Optional[int] = None
) -> Any:
"""
Make an API call to EspoCRM.
Args:
endpoint: API endpoint (e.g., '/Beteiligte/123' or '/CVmhErstgespraech')
method: HTTP method (GET, POST, PUT, DELETE)
params: Query parameters
json_data: JSON body for POST/PUT
timeout_seconds: Request timeout
Returns:
Parsed JSON response or None
Raises:
EspoCRMError: On API errors
"""
# Ensure endpoint starts with /
if not endpoint.startswith('/'):
endpoint = '/' + endpoint
url = self.api_base_url.rstrip('/') + endpoint
headers = self._get_headers()
effective_timeout = aiohttp.ClientTimeout(
total=timeout_seconds or Config.ESPOCRM_API_TIMEOUT_SECONDS
)
self._log(f"API call: {method} {url}", level='debug')
if params:
self._log(f"Params: {params}", level='debug')
async with aiohttp.ClientSession(timeout=effective_timeout) as session:
try:
async with session.request(
method,
url,
headers=headers,
params=params,
json=json_data
) as response:
# Log response status
self._log(f"Response status: {response.status}", level='debug')
# Handle errors
if response.status == 401:
raise EspoCRMAuthError("Authentication failed - check API key")
elif response.status == 403:
raise EspoCRMError("Access forbidden")
elif response.status == 404:
raise EspoCRMError(f"Resource not found: {endpoint}")
elif response.status >= 400:
error_text = await response.text()
raise EspoCRMError(f"API error {response.status}: {error_text}")
# Parse response
if response.content_type == 'application/json':
result = await response.json()
self._log(f"Response received", level='debug')
return result
else:
# For DELETE or other non-JSON responses
return None
except aiohttp.ClientError as e:
self._log(f"API call failed: {e}", level='error')
raise EspoCRMError(f"Request failed: {e}") from e
async def get_entity(self, entity_type: str, entity_id: str) -> Dict[str, Any]:
"""
Get a single entity by ID.
Args:
entity_type: Entity type (e.g., 'Beteiligte', 'CVmhErstgespraech')
entity_id: Entity ID
Returns:
Entity data as dict
"""
self._log(f"Getting {entity_type} with ID: {entity_id}")
return await self.api_call(f"/{entity_type}/{entity_id}", method='GET')
async def list_entities(
self,
entity_type: str,
where: Optional[List[Dict]] = None,
select: Optional[str] = None,
order_by: Optional[str] = None,
offset: int = 0,
max_size: int = 50
) -> Dict[str, Any]:
"""
List entities with filtering and pagination.
Args:
entity_type: Entity type
where: Filter conditions (EspoCRM format)
select: Comma-separated field list
order_by: Sort field
offset: Pagination offset
max_size: Max results per page
Returns:
Dict with 'list' and 'total' keys
"""
params = {
'offset': offset,
'maxSize': max_size
}
if where:
params['where'] = where
if select:
params['select'] = select
if order_by:
params['orderBy'] = order_by
self._log(f"Listing {entity_type} entities")
return await self.api_call(f"/{entity_type}", method='GET', params=params)
async def create_entity(
self,
entity_type: str,
data: Dict[str, Any]
) -> Dict[str, Any]:
"""
Create a new entity.
Args:
entity_type: Entity type
data: Entity data
Returns:
Created entity with ID
"""
self._log(f"Creating {entity_type} entity")
return await self.api_call(f"/{entity_type}", method='POST', json_data=data)
async def update_entity(
self,
entity_type: str,
entity_id: str,
data: Dict[str, Any]
) -> Dict[str, Any]:
"""
Update an existing entity.
Args:
entity_type: Entity type
entity_id: Entity ID
data: Updated fields
Returns:
Updated entity
"""
self._log(f"Updating {entity_type} with ID: {entity_id}")
return await self.api_call(f"/{entity_type}/{entity_id}", method='PUT', json_data=data)
async def delete_entity(self, entity_type: str, entity_id: str) -> bool:
"""
Delete an entity.
Args:
entity_type: Entity type
entity_id: Entity ID
Returns:
True if successful
"""
self._log(f"Deleting {entity_type} with ID: {entity_id}")
await self.api_call(f"/{entity_type}/{entity_id}", method='DELETE')
return True
async def search_entities(
self,
entity_type: str,
query: str,
fields: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
"""
Search entities by text query.
Args:
entity_type: Entity type
query: Search query
fields: Fields to search in
Returns:
List of matching entities
"""
where = [{
'type': 'textFilter',
'value': query
}]
result = await self.list_entities(entity_type, where=where)
return result.get('list', [])