From a53051ea8e19dab62a42711fdb7c4bf81ca543eb Mon Sep 17 00:00:00 2001 From: bsiggel Date: Tue, 3 Mar 2026 17:24:35 +0000 Subject: [PATCH] feat(api-client): implement session management for AdvowareAPI and EspoCRMAPI --- services/advoware.py | 155 ++++++++++++++++-------------- services/espocrm.py | 222 +++++++++++++++++++++++-------------------- 2 files changed, 201 insertions(+), 176 deletions(-) diff --git a/services/advoware.py b/services/advoware.py index 0f10346..a1fc95c 100644 --- a/services/advoware.py +++ b/services/advoware.py @@ -73,6 +73,17 @@ class AdvowareAPI: self.logger.info("AdvowareAPI initialized") + self._session: Optional[aiohttp.ClientSession] = None + + async def _get_session(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession() + return self._session + + async def close(self) -> None: + if self._session and not self._session.closed: + await self._session.close() + def _generate_hmac(self, request_time_stamp: str, nonce: Optional[str] = None) -> str: """Generate HMAC-SHA512 signature for authentication""" if not nonce: @@ -260,77 +271,79 @@ class AdvowareAPI: # Use 'data' parameter if provided, otherwise 'json_data' json_payload = data if data is not None else json_data - async with aiohttp.ClientSession(timeout=effective_timeout) as session: - try: - with self.logger.api_call(endpoint, method): - async with session.request( - method, - url, - headers=effective_headers, - params=params, - json=json_payload - ) as response: - # Handle 401 - retry with fresh token - if response.status == 401: - self.logger.warning("401 Unauthorized, refreshing token") - token = self.get_access_token(force_refresh=True) - effective_headers['Authorization'] = f'Bearer {token}' - - async with session.request( - method, - url, - headers=effective_headers, - params=params, - json=json_payload - ) as retry_response: - if retry_response.status == 401: - raise AdvowareAuthError( - "Authentication failed even after token refresh", - status_code=401 - ) - - if retry_response.status >= 500: - error_text = await retry_response.text() - raise RetryableError( - f"Server error {retry_response.status}: {error_text}" - ) - - retry_response.raise_for_status() - return await self._parse_response(retry_response) - - # Handle other error codes - if response.status == 404: - error_text = await response.text() - raise AdvowareAPIError( - f"Resource not found: {endpoint}", - status_code=404, - response_body=error_text - ) - - if response.status >= 500: - error_text = await response.text() - raise RetryableError( - f"Server error {response.status}: {error_text}" - ) - - if response.status >= 400: - error_text = await response.text() - raise AdvowareAPIError( - f"API error {response.status}: {error_text}", - status_code=response.status, - response_body=error_text - ) - - return await self._parse_response(response) - - except asyncio.TimeoutError: - raise AdvowareTimeoutError( - f"Request timed out after {effective_timeout.total}s", - status_code=408 - ) - except aiohttp.ClientError as e: - self.logger.error(f"API call failed: {e}") - raise AdvowareAPIError(f"Request failed: {str(e)}") + session = await self._get_session() + try: + with self.logger.api_call(endpoint, method): + async with session.request( + method, + url, + headers=effective_headers, + params=params, + json=json_payload, + timeout=effective_timeout + ) as response: + # Handle 401 - retry with fresh token + if response.status == 401: + self.logger.warning("401 Unauthorized, refreshing token") + token = self.get_access_token(force_refresh=True) + effective_headers['Authorization'] = f'Bearer {token}' + + async with session.request( + method, + url, + headers=effective_headers, + params=params, + json=json_payload, + timeout=effective_timeout + ) as retry_response: + if retry_response.status == 401: + raise AdvowareAuthError( + "Authentication failed even after token refresh", + status_code=401 + ) + + if retry_response.status >= 500: + error_text = await retry_response.text() + raise RetryableError( + f"Server error {retry_response.status}: {error_text}" + ) + + retry_response.raise_for_status() + return await self._parse_response(retry_response) + + # Handle other error codes + if response.status == 404: + error_text = await response.text() + raise AdvowareAPIError( + f"Resource not found: {endpoint}", + status_code=404, + response_body=error_text + ) + + if response.status >= 500: + error_text = await response.text() + raise RetryableError( + f"Server error {response.status}: {error_text}" + ) + + if response.status >= 400: + error_text = await response.text() + raise AdvowareAPIError( + f"API error {response.status}: {error_text}", + status_code=response.status, + response_body=error_text + ) + + return await self._parse_response(response) + + except asyncio.TimeoutError: + raise AdvowareTimeoutError( + f"Request timed out after {effective_timeout.total}s", + status_code=408 + ) + except aiohttp.ClientError as e: + self.logger.error(f"API call failed: {e}") + raise AdvowareAPIError(f"Request failed: {str(e)}") async def _parse_response(self, response: aiohttp.ClientResponse) -> Any: """Parse API response""" diff --git a/services/espocrm.py b/services/espocrm.py index 87008f4..4794963 100644 --- a/services/espocrm.py +++ b/services/espocrm.py @@ -54,6 +54,8 @@ class EspoCRMAPI: raise EspoCRMAuthError("ESPOCRM_API_KEY not configured in environment") self.logger.info(f"EspoCRM API initialized with base URL: {self.api_base_url}") + + self._session: Optional[aiohttp.ClientSession] = None # Optional Redis for caching/rate limiting (centralized) self.redis_client = get_redis_client(strict=False) @@ -70,6 +72,15 @@ class EspoCRMAPI: 'Accept': 'application/json' } + async def _get_session(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession() + return self._session + + async def close(self) -> None: + if self._session and not self._session.closed: + await self._session.close() + async def api_call( self, endpoint: str, @@ -106,61 +117,62 @@ class EspoCRMAPI: total=timeout_seconds or self.api_timeout_seconds ) - async with aiohttp.ClientSession(timeout=effective_timeout) as session: - try: - with self.logger.api_call(endpoint, method): - async with session.request( - method, - url, - headers=headers, - params=params, - json=json_data - ) as response: - # Handle errors - if response.status == 401: - raise EspoCRMAuthError( - "Authentication failed - check API key", - status_code=401 - ) - elif response.status == 403: - raise EspoCRMAPIError( - "Access forbidden", - status_code=403 - ) - elif response.status == 404: - raise EspoCRMAPIError( - f"Resource not found: {endpoint}", - status_code=404 - ) - elif response.status >= 500: - error_text = await response.text() - raise RetryableError( - f"Server error {response.status}: {error_text}" - ) - elif response.status >= 400: - error_text = await response.text() - raise EspoCRMAPIError( - f"API error {response.status}: {error_text}", - status_code=response.status, - response_body=error_text - ) - - # Parse response - if response.content_type == 'application/json': - result = await response.json() - return result - else: - # For DELETE or other non-JSON responses - return None - - except asyncio.TimeoutError: - raise EspoCRMTimeoutError( - f"Request timed out after {effective_timeout.total}s", - status_code=408 - ) - except aiohttp.ClientError as e: - self.logger.error(f"API call failed: {e}") - raise EspoCRMAPIError(f"Request failed: {str(e)}") + session = await self._get_session() + try: + with self.logger.api_call(endpoint, method): + async with session.request( + method, + url, + headers=headers, + params=params, + json=json_data, + timeout=effective_timeout + ) as response: + # Handle errors + if response.status == 401: + raise EspoCRMAuthError( + "Authentication failed - check API key", + status_code=401 + ) + elif response.status == 403: + raise EspoCRMAPIError( + "Access forbidden", + status_code=403 + ) + elif response.status == 404: + raise EspoCRMAPIError( + f"Resource not found: {endpoint}", + status_code=404 + ) + elif response.status >= 500: + error_text = await response.text() + raise RetryableError( + f"Server error {response.status}: {error_text}" + ) + elif response.status >= 400: + error_text = await response.text() + raise EspoCRMAPIError( + f"API error {response.status}: {error_text}", + status_code=response.status, + response_body=error_text + ) + + # Parse response + if response.content_type == 'application/json': + result = await response.json() + return result + else: + # For DELETE or other non-JSON responses + return None + + except asyncio.TimeoutError: + raise EspoCRMTimeoutError( + f"Request timed out after {effective_timeout.total}s", + status_code=408 + ) + except aiohttp.ClientError as e: + self.logger.error(f"API call failed: {e}") + raise EspoCRMAPIError(f"Request failed: {str(e)}") async def get_entity(self, entity_type: str, entity_id: str) -> Dict[str, Any]: """ @@ -340,36 +352,36 @@ class EspoCRMAPI: effective_timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds) - async with aiohttp.ClientSession(timeout=effective_timeout) as session: - try: - async with session.post(url, headers=headers, data=form_data) as response: - self._log(f"Upload response status: {response.status}") - - if response.status == 401: - raise EspoCRMAuthError("Authentication failed - check API key") - elif response.status == 403: - raise EspoCRMError("Access forbidden") - elif response.status == 404: - raise EspoCRMError(f"Attachment endpoint not found") - elif response.status >= 400: - error_text = await response.text() - self._log(f"❌ Upload failed with {response.status}. Response: {error_text}", level='error') - raise EspoCRMError(f"Upload error {response.status}: {error_text}") - - # Parse response - if response.content_type == 'application/json': - result = await response.json() - attachment_id = result.get('id') - self._log(f"✅ Attachment uploaded successfully: {attachment_id}") - return result - else: - response_text = await response.text() - self._log(f"⚠️ Non-JSON response: {response_text[:200]}", level='warn') - return {'success': True, 'response': response_text} - - except aiohttp.ClientError as e: - self._log(f"Upload failed: {e}", level='error') - raise EspoCRMError(f"Upload request failed: {e}") from e + session = await self._get_session() + try: + async with session.post(url, headers=headers, data=form_data, timeout=effective_timeout) as response: + self._log(f"Upload response status: {response.status}") + + if response.status == 401: + raise EspoCRMAuthError("Authentication failed - check API key") + elif response.status == 403: + raise EspoCRMError("Access forbidden") + elif response.status == 404: + raise EspoCRMError(f"Attachment endpoint not found") + elif response.status >= 400: + error_text = await response.text() + self._log(f"❌ Upload failed with {response.status}. Response: {error_text}", level='error') + raise EspoCRMError(f"Upload error {response.status}: {error_text}") + + # Parse response + if response.content_type == 'application/json': + result = await response.json() + attachment_id = result.get('id') + self._log(f"✅ Attachment uploaded successfully: {attachment_id}") + return result + else: + response_text = await response.text() + self._log(f"⚠️ Non-JSON response: {response_text[:200]}", level='warn') + return {'success': True, 'response': response_text} + + except aiohttp.ClientError as e: + self._log(f"Upload failed: {e}", level='error') + raise EspoCRMError(f"Upload request failed: {e}") from e async def download_attachment(self, attachment_id: str) -> bytes: """ @@ -390,23 +402,23 @@ class EspoCRMAPI: effective_timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds) - async with aiohttp.ClientSession(timeout=effective_timeout) as session: - try: - async with session.get(url, headers=headers) as response: - if response.status == 401: - raise EspoCRMAuthError("Authentication failed - check API key") - elif response.status == 403: - raise EspoCRMError("Access forbidden") - elif response.status == 404: - raise EspoCRMError(f"Attachment not found: {attachment_id}") - elif response.status >= 400: - error_text = await response.text() - raise EspoCRMError(f"Download error {response.status}: {error_text}") - - content = await response.read() - self._log(f"✅ Downloaded {len(content)} bytes") - return content - - except aiohttp.ClientError as e: - self._log(f"Download failed: {e}", level='error') - raise EspoCRMError(f"Download request failed: {e}") from e + session = await self._get_session() + try: + async with session.get(url, headers=headers, timeout=effective_timeout) as response: + if response.status == 401: + raise EspoCRMAuthError("Authentication failed - check API key") + elif response.status == 403: + raise EspoCRMError("Access forbidden") + elif response.status == 404: + raise EspoCRMError(f"Attachment not found: {attachment_id}") + elif response.status >= 400: + error_text = await response.text() + raise EspoCRMError(f"Download error {response.status}: {error_text}") + + content = await response.read() + self._log(f"✅ Downloaded {len(content)} bytes") + return content + + except aiohttp.ClientError as e: + self._log(f"Download failed: {e}", level='error') + raise EspoCRMError(f"Download request failed: {e}") from e