Files
motia/bitbylaw/services/advoware.py

176 lines
8.4 KiB
Python

import aiohttp
import asyncio
import time
import uuid
import hmac
import hashlib
import base64
import requests
import datetime
import redis
import logging
import json
from typing import Optional, Dict, Any
from config import Config  # Project configuration, e.g. values from environment variables
# Module-level logger; AdvowareAPI._log falls back to it when no context is set.
logger = logging.getLogger(__name__)
class AdvowareTokenError(Exception):
    """Raised when the Advoware token endpoint response contains no access token."""
class AdvowareAPI:
    """Client for the Advoware API using HMAC-signed token authentication.

    An access token is requested from ``AUTH_URL`` and, when Redis is
    reachable, cached there together with the time it was fetched.
    ``api_call`` attaches the token as a Bearer header and retries once with
    a freshly fetched token when the API answers with 401.
    """

    AUTH_URL = "https://security.advo-net.net/api/v1/Token"
    TOKEN_CACHE_KEY = 'advoware_access_token'
    TOKEN_TIMESTAMP_CACHE_KEY = 'advoware_token_timestamp'

    def __init__(self, context=None):
        """Load settings from ``Config`` and try to connect to Redis.

        Args:
            context: Optional object exposing a ``logger`` attribute; when
                given, all log output goes to ``context.logger`` instead of
                the module logger (used for workbench logging).
        """
        self.context = context  # Used for workbench logging
        self._log("AdvowareAPI __init__ started", level='debug')
        self.API_BASE_URL = Config.ADVOWARE_API_BASE_URL
        try:
            self.redis_client = redis.Redis(
                host=Config.REDIS_HOST,
                port=int(Config.REDIS_PORT),
                db=int(Config.REDIS_DB_ADVOWARE_CACHE),
                socket_timeout=Config.REDIS_TIMEOUT_SECONDS,
                socket_connect_timeout=Config.REDIS_TIMEOUT_SECONDS,
            )
            self.redis_client.ping()
            self._log("Connected to Redis for token caching.")
        except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
            # A ping timeout is not a ConnectionError in redis-py; treat both
            # as "Redis unavailable" and continue without caching.
            self._log(f"Could not connect to Redis: {e}. No token caching.")
            self.redis_client = None
        # Load configuration
        self.product_id = Config.ADVOWARE_PRODUCT_ID
        self.app_id = Config.ADVOWARE_APP_ID
        self.api_key = Config.ADVOWARE_API_KEY
        # Never log any part of the key itself; the length is enough to
        # confirm that a value was loaded.
        key_length = len(self.api_key) if self.api_key else 0
        self._log(f"API Key loaded (length: {key_length})")
        self.kanzlei = Config.ADVOWARE_KANZLEI
        self.database = Config.ADVOWARE_DATABASE
        self.user = Config.ADVOWARE_USER
        self.role = Config.ADVOWARE_ROLE
        self.password = Config.ADVOWARE_PASSWORD
        self.token_lifetime_minutes = getattr(Config, "ADVOWARE_TOKEN_LIFETIME_MINUTES", 55)
        self._log("AdvowareAPI __init__ completed")

    def _generate_hmac(self, request_time_stamp: str, nonce: Optional[str] = None) -> str:
        """Return the base64-encoded HMAC-SHA512 signature for a token request.

        The signed message is ``product_id:app_id:nonce:request_time_stamp``.

        Args:
            request_time_stamp: Timestamp string that is also sent in the
                token request.
            nonce: Nonce to sign; a random UUID4 string is generated when it
                is empty or ``None``.
        """
        if not nonce:
            nonce = str(uuid.uuid4())
        message = f"{self.product_id}:{self.app_id}:{nonce}:{request_time_stamp}".encode('utf-8')
        try:
            api_key_bytes = base64.b64decode(self.api_key)
            self._log("API Key successfully decoded from base64", level='debug')
        except (ValueError, TypeError) as e:
            self._log(f"API Key not base64, assuming raw key and encoding to base64 for HMAC: {e}")
            # If not base64, use the key as raw bytes (assuming it is the secret itself)
            api_key_bytes = self.api_key.encode('utf-8') if isinstance(self.api_key, str) else self.api_key
        signature = hmac.new(api_key_bytes, message, hashlib.sha512)
        return base64.b64encode(signature.digest()).decode('utf-8')

    def _fetch_new_access_token(self) -> str:
        """Request a new access token from ``AUTH_URL`` and cache it if possible.

        This is a blocking call (``requests.post``).

        Returns:
            The access token string from the response.

        Raises:
            requests.HTTPError: If the token endpoint returns an error status.
            AdvowareTokenError: If the response has no ``access_token`` field.
        """
        self._log("Fetching new access token from Advoware")
        nonce = str(uuid.uuid4())
        # Same "YYYY-MM-DDTHH:MM:SSZ" string as the deprecated utcnow() produced.
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        request_time_stamp = now_utc.replace(microsecond=0, tzinfo=None).isoformat() + "Z"
        hmac_signature = self._generate_hmac(request_time_stamp, nonce)
        headers = {'Content-Type': 'application/json'}
        data = {
            "AppID": self.app_id, "Kanzlei": self.kanzlei, "Database": self.database,
            "User": self.user, "Role": self.role, "Product": self.product_id,
            "Password": self.password, "Nonce": nonce,
            "HMAC512Signature": hmac_signature, "RequestTimeStamp": request_time_stamp,
        }
        self._log(f"Token request data: AppID={self.app_id}, User={self.user}, Nonce={nonce[:8]}...")
        response = requests.post(
            self.AUTH_URL, json=data, headers=headers,
            timeout=Config.ADVOWARE_API_TIMEOUT_SECONDS,
        )
        self._log(f"Token response status: {response.status_code}")
        response.raise_for_status()
        result = response.json()
        access_token = result.get("access_token")
        if not access_token:
            self._log("No access_token in response")
            raise AdvowareTokenError("No access_token received")
        self._log("Access token fetched successfully")
        if self.redis_client:
            # Expire the cache entries two minutes before the configured
            # token lifetime, but never with a TTL below one second.
            effective_ttl = max(1, (self.token_lifetime_minutes - 2) * 60)
            try:
                self.redis_client.set(self.TOKEN_CACHE_KEY, access_token, ex=effective_ttl)
                self.redis_client.set(self.TOKEN_TIMESTAMP_CACHE_KEY, str(time.time()), ex=effective_ttl)
            except redis.exceptions.RedisError as e:
                # Caching is best-effort: a valid token must not be lost
                # because Redis became unavailable after start-up.
                self._log(f"Could not cache access token in Redis: {e}")
        return access_token

    def get_access_token(self, force_refresh: bool = False) -> str:
        """Return an access token, preferring the Redis cache.

        Args:
            force_refresh: When true, skip the cache and fetch a new token.

        Returns:
            The cached token if both token and timestamp are present and the
            token is younger than ``token_lifetime_minutes - 1`` minutes;
            otherwise a newly fetched token.
        """
        self._log("Getting access token")
        if not self.redis_client:
            self._log("No Redis, fetching new token")
            return self._fetch_new_access_token()
        if force_refresh:
            self._log("Force refresh, fetching new token")
            return self._fetch_new_access_token()
        try:
            cached_token = self.redis_client.get(self.TOKEN_CACHE_KEY)
            token_timestamp = self.redis_client.get(self.TOKEN_TIMESTAMP_CACHE_KEY)
        except redis.exceptions.RedisError as e:
            # A failing cache read must not prevent authentication.
            self._log(f"Could not read token from Redis: {e}")
            return self._fetch_new_access_token()
        if cached_token and token_timestamp:
            try:
                timestamp = float(token_timestamp.decode('utf-8'))
                if time.time() - timestamp < (self.token_lifetime_minutes - 1) * 60:
                    self._log("Using cached token")
                    return cached_token.decode('utf-8')
            except ValueError:
                # Unparseable timestamp or undecodable bytes: treat as a cache miss.
                pass
        self._log("Cached token expired or invalid, fetching new")
        return self._fetch_new_access_token()

    async def _parse_json_response(self, response) -> Any:
        """Return the decoded JSON body, or ``None`` if there is no usable JSON."""
        if response.content_type != 'application/json':
            return None
        try:
            return await response.json()
        except (aiohttp.ClientError, ValueError) as e:
            self._log(f"JSON parse error: {e}")
            # For methods like DELETE that may return 200 with no body, return None
            return None

    async def api_call(self, endpoint: str, method: str = 'GET', headers: Optional[Dict] = None,
                       params: Optional[Dict] = None, json_data: Optional[Dict] = None,
                       files: Optional[Any] = None, data: Optional[Any] = None,
                       timeout_seconds: Optional[int] = None) -> Any:
        """Send an authenticated request to the Advoware API.

        Args:
            endpoint: Path relative to ``API_BASE_URL``; leading slashes are
                stripped.
            method: HTTP method name.
            headers: Extra request headers. ``Authorization`` is always
                overwritten; ``Content-Type`` defaults to ``application/json``.
            params: Query-string parameters.
            json_data: JSON request body.
            files: Accepted for interface compatibility; currently not used.
            data: If not ``None``, sent as the JSON body instead of
                ``json_data`` (kept for backward compatibility).
            timeout_seconds: Total request timeout; falls back to
                ``Config.ADVOWARE_API_TIMEOUT_SECONDS`` when falsy.

        Returns:
            The decoded JSON body when the response content type is
            ``application/json`` and the body parses; otherwise ``None``.

        Raises:
            aiohttp.ClientError: On transport errors or non-2xx responses
                (a 401 is first retried once with a fresh token).
        """
        # Avoid duplicate slashes between base URL and endpoint
        endpoint = endpoint.lstrip('/')
        url = self.API_BASE_URL.rstrip('/') + '/' + endpoint
        effective_timeout = aiohttp.ClientTimeout(total=timeout_seconds or Config.ADVOWARE_API_TIMEOUT_SECONDS)
        # NOTE(review): synchronous call (requests + redis) inside a coroutine;
        # it blocks the event loop while a token is being fetched.
        token = self.get_access_token()
        effective_headers = headers.copy() if headers else {}
        effective_headers['Authorization'] = f'Bearer {token}'
        effective_headers.setdefault('Content-Type', 'application/json')
        # Prefer 'data' parameter over 'json_data' if provided (for backward compatibility)
        json_payload = data if data is not None else json_data
        async with aiohttp.ClientSession(timeout=effective_timeout) as session:
            try:
                self._log(f"Making API call: {method} {url}", level='debug')
                async with session.request(method, url, headers=effective_headers,
                                           params=params, json=json_payload) as response:
                    # Check for 401 BEFORE raise_for_status(); otherwise the
                    # 401 raises immediately and the token refresh never runs.
                    if response.status != 401:
                        response.raise_for_status()
                        return await self._parse_json_response(response)
                self._log("401 Unauthorized, refreshing token")
                token = self.get_access_token(force_refresh=True)
                effective_headers['Authorization'] = f'Bearer {token}'
                async with session.request(method, url, headers=effective_headers,
                                           params=params, json=json_payload) as retry_response:
                    # Only one retry: a second 401 is raised to the caller.
                    retry_response.raise_for_status()
                    return await self._parse_json_response(retry_response)
            except aiohttp.ClientError as e:
                self._log(f"API call failed: {e}")
                raise

    def _log(self, message, level='info'):
        """Log ``message`` via ``context.logger`` if a context is set, else the module logger.

        Only ``'debug'`` is treated specially; any other ``level`` logs at info.
        """
        target = self.context.logger if self.context else logger
        if level == 'debug':
            target.debug(message)
        else:
            target.info(message)