import aiohttp import asyncio import logging import redis from typing import Optional, Dict, Any, List from config import Config logger = logging.getLogger(__name__) class EspoCRMError(Exception): """Base exception for EspoCRM API errors""" pass class EspoCRMAuthError(EspoCRMError): """Authentication error""" pass class EspoCRMAPI: """ EspoCRM API Client for bitbylaw integration. Supports: - API Key authentication (X-Api-Key header) - Standard REST operations (GET, POST, PUT, DELETE) - Entity management (Beteiligte, CVmhErstgespraech, etc.) """ def __init__(self, context=None): self.context = context self._log("EspoCRMAPI __init__ started", level='debug') # Configuration self.api_base_url = Config.ESPOCRM_API_BASE_URL self.api_key = Config.ESPOCRM_API_KEY if not self.api_key: raise EspoCRMAuthError("ESPOCRM_MARVIN_API_KEY not configured in environment") self._log(f"EspoCRM API initialized with base URL: {self.api_base_url}") # Optional Redis for caching/rate limiting try: self.redis_client = redis.Redis( host=Config.REDIS_HOST, port=int(Config.REDIS_PORT), db=int(Config.REDIS_DB_ADVOWARE_CACHE), socket_timeout=Config.REDIS_TIMEOUT_SECONDS, socket_connect_timeout=Config.REDIS_TIMEOUT_SECONDS, decode_responses=True ) self.redis_client.ping() self._log("Connected to Redis for EspoCRM operations") except Exception as e: self._log(f"Could not connect to Redis: {e}. Continuing without caching.", level='warning') self.redis_client = None def _log(self, message: str, level: str = 'info'): """Log message via context.logger if available, otherwise use module logger""" log_func = getattr(logger, level, logger.info) if self.context and hasattr(self.context, 'logger'): ctx_log_func = getattr(self.context.logger, level, self.context.logger.info) ctx_log_func(f"[EspoCRM] {message}") else: log_func(f"[EspoCRM] {message}") def _get_headers(self) -> Dict[str, str]: """Generate request headers with API key""" return { 'X-Api-Key': self.api_key, 'Content-Type': 'application/json', 'Accept': 'application/json' } async def api_call( self, endpoint: str, method: str = 'GET', params: Optional[Dict] = None, json_data: Optional[Dict] = None, timeout_seconds: Optional[int] = None ) -> Any: """ Make an API call to EspoCRM. Args: endpoint: API endpoint (e.g., '/Beteiligte/123' or '/CVmhErstgespraech') method: HTTP method (GET, POST, PUT, DELETE) params: Query parameters json_data: JSON body for POST/PUT timeout_seconds: Request timeout Returns: Parsed JSON response or None Raises: EspoCRMError: On API errors """ # Ensure endpoint starts with / if not endpoint.startswith('/'): endpoint = '/' + endpoint url = self.api_base_url.rstrip('/') + endpoint headers = self._get_headers() effective_timeout = aiohttp.ClientTimeout( total=timeout_seconds or Config.ESPOCRM_API_TIMEOUT_SECONDS ) self._log(f"API call: {method} {url}", level='debug') if params: self._log(f"Params: {params}", level='debug') async with aiohttp.ClientSession(timeout=effective_timeout) as session: try: async with session.request( method, url, headers=headers, params=params, json=json_data ) as response: # Log response status self._log(f"Response status: {response.status}", level='debug') # Handle errors if response.status == 401: raise EspoCRMAuthError("Authentication failed - check API key") elif response.status == 403: raise EspoCRMError("Access forbidden") elif response.status == 404: raise EspoCRMError(f"Resource not found: {endpoint}") elif response.status >= 400: error_text = await response.text() raise EspoCRMError(f"API error {response.status}: {error_text}") # Parse response if response.content_type == 'application/json': result = await response.json() self._log(f"Response received", level='debug') return result else: # For DELETE or other non-JSON responses return None except aiohttp.ClientError as e: self._log(f"API call failed: {e}", level='error') raise EspoCRMError(f"Request failed: {e}") from e async def get_entity(self, entity_type: str, entity_id: str) -> Dict[str, Any]: """ Get a single entity by ID. Args: entity_type: Entity type (e.g., 'Beteiligte', 'CVmhErstgespraech') entity_id: Entity ID Returns: Entity data as dict """ self._log(f"Getting {entity_type} with ID: {entity_id}") return await self.api_call(f"/{entity_type}/{entity_id}", method='GET') async def list_entities( self, entity_type: str, where: Optional[List[Dict]] = None, select: Optional[str] = None, order_by: Optional[str] = None, offset: int = 0, max_size: int = 50 ) -> Dict[str, Any]: """ List entities with filtering and pagination. Args: entity_type: Entity type where: Filter conditions (EspoCRM format) select: Comma-separated field list order_by: Sort field offset: Pagination offset max_size: Max results per page Returns: Dict with 'list' and 'total' keys """ params = { 'offset': offset, 'maxSize': max_size } if where: params['where'] = where if select: params['select'] = select if order_by: params['orderBy'] = order_by self._log(f"Listing {entity_type} entities") return await self.api_call(f"/{entity_type}", method='GET', params=params) async def create_entity( self, entity_type: str, data: Dict[str, Any] ) -> Dict[str, Any]: """ Create a new entity. Args: entity_type: Entity type data: Entity data Returns: Created entity with ID """ self._log(f"Creating {entity_type} entity") return await self.api_call(f"/{entity_type}", method='POST', json_data=data) async def update_entity( self, entity_type: str, entity_id: str, data: Dict[str, Any] ) -> Dict[str, Any]: """ Update an existing entity. Args: entity_type: Entity type entity_id: Entity ID data: Updated fields Returns: Updated entity """ self._log(f"Updating {entity_type} with ID: {entity_id}") return await self.api_call(f"/{entity_type}/{entity_id}", method='PUT', json_data=data) async def delete_entity(self, entity_type: str, entity_id: str) -> bool: """ Delete an entity. Args: entity_type: Entity type entity_id: Entity ID Returns: True if successful """ self._log(f"Deleting {entity_type} with ID: {entity_id}") await self.api_call(f"/{entity_type}/{entity_id}", method='DELETE') return True async def search_entities( self, entity_type: str, query: str, fields: Optional[List[str]] = None ) -> List[Dict[str, Any]]: """ Search entities by text query. Args: entity_type: Entity type query: Search query fields: Fields to search in Returns: List of matching entities """ where = [{ 'type': 'textFilter', 'value': query }] result = await self.list_entities(entity_type, where=where) return result.get('list', [])