fix: Simplify error logging in models list API handler

This commit is contained in:
bsiggel
2026-03-19 09:48:57 +00:00
parent 69f0c6a44d
commit 7fffdb2660
2 changed files with 318 additions and 225 deletions

View File

@@ -1,20 +1,29 @@
"""AI Chat Completions API """AI Chat Completions API
Universal OpenAI-compatible Chat Completions API with xAI/LangChain Backend.
OpenAI-compatible Chat Completions endpoint with xAI/LangChain backend.
Features: Features:
- File Search (RAG) via xAI Collections - File Search (RAG) via xAI Collections
- Web Search via xAI web_search tool - Web Search via xAI web_search tool
- Aktenzeichen-based automatic collection lookup - Aktenzeichen-based automatic collection lookup
- **Echtes Streaming** (async generator + proper SSE headers)
- Multiple tools simultaneously - Multiple tools simultaneously
- Clean, reusable architecture for future LLM endpoints
Note: Streaming is not supported (Motia limitation - returns clear error).
Reusability:
- extract_request_params(): Parse requests for any LLM endpoint
- resolve_collection_id(): Auto-detect Aktenzeichen, lookup collection
- initialize_model_with_tools(): Bind tools to any LangChain model
- invoke_and_format_response(): Standard OpenAI response formatting
""" """
import json
import time import time
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from motia import FlowContext, http, ApiRequest, ApiResponse from motia import FlowContext, http, ApiRequest, ApiResponse
config = { config = {
"name": "AI Chat Completions API", "name": "AI Chat Completions API",
"description": "Universal OpenAI-compatible Chat Completions API with xAI backend, RAG, and web search", "description": "OpenAI-compatible Chat Completions API with xAI backend",
"flows": ["ai-general"], "flows": ["ai-general"],
"triggers": [ "triggers": [
http("POST", "/ai/v1/chat/completions"), http("POST", "/ai/v1/chat/completions"),
@@ -23,89 +32,237 @@ config = {
} }
# ============================================================================
# MAIN HANDLER
# ============================================================================
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
""" """
OpenAI-compatible Chat Completions endpoint mit **echtem** Streaming. OpenAI-compatible Chat Completions endpoint.
Returns:
ApiResponse with chat completion or error
""" """
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
ctx.logger.info("🤖 AI CHAT COMPLETIONS API OPTIMIZED") ctx.logger.info("🤖 AI Chat Completions API")
ctx.logger.info("=" * 80) ctx.logger.info("=" * 80)
# Log request (sicher)
ctx.logger.info("📥 REQUEST DETAILS:")
if request.headers:
ctx.logger.info(" Headers:")
for header_name, header_value in request.headers.items():
if header_name.lower() == 'authorization':
ctx.logger.info(f" {header_name}: Bearer ***MASKED***")
else:
ctx.logger.info(f" {header_name}: {header_value}")
try: try:
# Parse body # 1. Parse and validate request
params = extract_request_params(request, ctx)
# 2. Check streaming (not supported)
if params['stream']:
return ApiResponse(
status=501,
body={
'error': {
'message': 'Streaming is not supported. Please set stream=false.',
'type': 'not_implemented',
'param': 'stream'
}
}
)
# 3. Resolve collection (explicit ID or Aktenzeichen lookup)
collection_id = await resolve_collection_id(
params['collection_id'],
params['messages'],
params['enable_web_search'],
ctx
)
# 4. Validate: collection or web_search required
if not collection_id and not params['enable_web_search']:
return ApiResponse(
status=400,
body={
'error': {
'message': 'Either collection_id or enable_web_search must be provided',
'type': 'invalid_request_error'
}
}
)
# 5. Initialize LLM with tools
model_with_tools = await initialize_model_with_tools(
model_name=params['model'],
temperature=params['temperature'],
max_tokens=params['max_tokens'],
collection_id=collection_id,
enable_web_search=params['enable_web_search'],
web_search_config=params['web_search_config'],
ctx=ctx
)
# 6. Invoke LLM
completion_id = f"chatcmpl-{int(time.time())}"
response = await invoke_and_format_response(
model=model_with_tools,
messages=params['messages'],
completion_id=completion_id,
model_name=params['model'],
ctx=ctx
)
ctx.logger.info(f"✅ Completion successful {len(response.body['choices'][0]['message']['content'])} chars")
return response
except ValueError as e:
ctx.logger.error(f"❌ Validation error: {e}")
return ApiResponse(
status=400,
body={'error': {'message': str(e), 'type': 'invalid_request_error'}}
)
except Exception as e:
ctx.logger.error(f"❌ Error: {e}")
return ApiResponse(
status=500,
body={'error': {'message': 'Internal server error', 'type': 'server_error'}}
)
# ============================================================================
# REUSABLE HELPER FUNCTIONS
# ============================================================================
def extract_request_params(request: ApiRequest, ctx: FlowContext) -> Dict[str, Any]:
"""
Extract and validate request parameters.
Returns:
Dict with validated parameters
Raises:
ValueError: If validation fails
"""
body = request.body or {} body = request.body or {}
if not isinstance(body, dict): if not isinstance(body, dict):
return ApiResponse(status=400, body={'error': 'Request body must be JSON object'}) raise ValueError("Request body must be JSON object")
# Parameter extrahieren
model_name = body.get('model', 'grok-4.20-beta-0309-reasoning')
messages = body.get('messages', []) messages = body.get('messages', [])
temperature = body.get('temperature', 0.7) if not messages or not isinstance(messages, list):
max_tokens = body.get('max_tokens') raise ValueError("messages must be non-empty array")
stream = body.get('stream', False)
extra_body = body.get('extra_body', {})
enable_web_search = body.get('enable_web_search', extra_body.get('enable_web_search', False)) # Extract parameters with defaults
web_search_config = body.get('web_search_config', extra_body.get('web_search_config', {})) params = {
'model': body.get('model', 'grok-4-1-fast-reasoning'),
'messages': messages,
'temperature': body.get('temperature', 0.7),
'max_tokens': body.get('max_tokens'),
'stream': body.get('stream', False),
'extra_body': body.get('extra_body', {}),
}
ctx.logger.info(f"📋 Model: {model_name} | Stream: {stream} | Web Search: {enable_web_search}") # Handle enable_web_search (body or extra_body)
params['enable_web_search'] = body.get(
'enable_web_search',
params['extra_body'].get('enable_web_search', False)
)
# Messages loggen (kurz) # Handle web_search_config
ctx.logger.info("📨 MESSAGES:") params['web_search_config'] = body.get(
for i, msg in enumerate(messages, 1): 'web_search_config',
preview = (msg.get('content', '')[:120] + "...") if len(msg.get('content', '')) > 120 else msg.get('content', '') params['extra_body'].get('web_search_config', {})
ctx.logger.info(f" [{i}] {msg.get('role')}: {preview}") )
# === Collection + Aktenzeichen Logic (unverändert) === # Handle collection_id (multiple sources)
collection_id: Optional[str] = None params['collection_id'] = (
aktenzeichen: Optional[str] = None body.get('collection_id') or
body.get('custom_collection_id') or
params['extra_body'].get('collection_id')
)
# Log concisely
ctx.logger.info(f"📋 Model: {params['model']} | Stream: {params['stream']}")
ctx.logger.info(f"📋 Web Search: {params['enable_web_search']} | Collection: {params['collection_id'] or 'auto'}")
ctx.logger.info(f"📨 Messages: {len(messages)}")
return params
async def resolve_collection_id(
explicit_collection_id: Optional[str],
messages: List[Dict[str, Any]],
enable_web_search: bool,
ctx: FlowContext
) -> Optional[str]:
"""
Resolve collection ID from explicit ID or Aktenzeichen auto-detection.
Args:
explicit_collection_id: Explicitly provided collection ID
messages: Chat messages (for Aktenzeichen extraction)
enable_web_search: Whether web search is enabled
ctx: Motia context
Returns:
Collection ID or None
"""
# Explicit collection ID takes precedence
if explicit_collection_id:
ctx.logger.info(f"🔍 Using explicit collection: {explicit_collection_id}")
return explicit_collection_id
# Try Aktenzeichen auto-detection from first user message
from services.aktenzeichen_utils import (
extract_aktenzeichen,
normalize_aktenzeichen,
remove_aktenzeichen
)
if 'collection_id' in body:
collection_id = body['collection_id']
elif 'custom_collection_id' in body:
collection_id = body['custom_collection_id']
elif 'collection_id' in extra_body:
collection_id = extra_body['collection_id']
else:
for msg in messages: for msg in messages:
if msg.get('role') == 'user': if msg.get('role') == 'user':
content = msg.get('content', '') content = msg.get('content', '')
from services.aktenzeichen_utils import extract_aktenzeichen, normalize_aktenzeichen, remove_aktenzeichen
aktenzeichen_raw = extract_aktenzeichen(content) aktenzeichen_raw = extract_aktenzeichen(content)
if aktenzeichen_raw: if aktenzeichen_raw:
aktenzeichen = normalize_aktenzeichen(aktenzeichen_raw) aktenzeichen = normalize_aktenzeichen(aktenzeichen_raw)
ctx.logger.info(f"🔍 Aktenzeichen detected: {aktenzeichen}")
collection_id = await lookup_collection_by_aktenzeichen(aktenzeichen, ctx) collection_id = await lookup_collection_by_aktenzeichen(aktenzeichen, ctx)
if collection_id: if collection_id:
# Clean Aktenzeichen from message
msg['content'] = remove_aktenzeichen(content) msg['content'] = remove_aktenzeichen(content)
break ctx.logger.info(f"✅ Collection found: {collection_id}")
return collection_id
else:
ctx.logger.warning(f"⚠️ No collection for Aktenzeichen: {aktenzeichen}")
break # Only check first user message
if not collection_id and not enable_web_search: return None
return ApiResponse(
status=400,
body={'error': 'collection_id or web_search required'}
)
# === Service initialisieren ===
async def initialize_model_with_tools(
model_name: str,
temperature: float,
max_tokens: Optional[int],
collection_id: Optional[str],
enable_web_search: bool,
web_search_config: Dict[str, Any],
ctx: FlowContext
) -> Any:
"""
Initialize LangChain model with tool bindings (file_search, web_search).
Returns:
Model instance with tools bound
"""
from services.langchain_xai_service import LangChainXAIService from services.langchain_xai_service import LangChainXAIService
langchain_service = LangChainXAIService(ctx)
model = langchain_service.get_chat_model( service = LangChainXAIService(ctx)
# Create base model
model = service.get_chat_model(
model=model_name, model=model_name,
temperature=temperature, temperature=temperature,
max_tokens=max_tokens max_tokens=max_tokens
) )
model_with_tools = langchain_service.bind_tools( # Bind tools
model_with_tools = service.bind_tools(
model=model, model=model,
collection_id=collection_id, collection_id=collection_id,
enable_web_search=enable_web_search, enable_web_search=enable_web_search,
@@ -113,119 +270,44 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
max_num_results=10 max_num_results=10
) )
completion_id = f"chatcmpl-{ctx.traceId[:12]}" if hasattr(ctx, 'traceId') else f"chatcmpl-{int(time.time())}" return model_with_tools
created_ts = int(time.time())
# ====================== ECHTES STREAMING ======================
if stream:
ctx.logger.info("🌊 Starting REAL SSE streaming (async generator)...")
headers = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # nginx / proxies
"Transfer-Encoding": "chunked",
}
async def sse_generator():
# Initial chunk (manche Clients brauchen das)
yield f'data: {json.dumps({"id": completion_id, "object": "chat.completion.chunk", "created": created_ts, "model": model_name, "choices": [{"index": 0, "delta": {}, "finish_reason": None}]}, ensure_ascii=False)}\n\n'
chunk_count = 0
async for chunk in langchain_service.astream_chat(model_with_tools, messages):
delta = ""
if hasattr(chunk, "content"):
content = chunk.content
if isinstance(content, str):
delta = content
elif isinstance(content, list):
text_parts = [item.get('text', '') for item in content if isinstance(item, dict) and item.get('type') == 'text']
delta = ''.join(text_parts)
if delta:
chunk_count += 1
data = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": created_ts,
"model": model_name,
"choices": [{
"index": 0,
"delta": {"content": delta},
"finish_reason": None
}]
}
yield f'data: {json.dumps(data, ensure_ascii=False)}\n\n'
# Finish
finish = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": created_ts,
"model": model_name,
"choices": [{
"index": 0,
"delta": {},
"finish_reason": "stop"
}]
}
yield f'data: {json.dumps(finish, ensure_ascii=False)}\n\n'
yield "data: [DONE]\n\n"
ctx.logger.info(f"✅ Streaming abgeschlossen {chunk_count} Chunks gesendet")
return ApiResponse(
status=200,
headers=headers,
body=sse_generator() # ← async generator = echtes Streaming!
)
# ====================== NON-STREAMING (unverändert + optimiert) ======================
else:
return await handle_non_streaming_response(
model_with_tools=model_with_tools,
messages=messages,
completion_id=completion_id,
created_ts=created_ts,
model_name=model_name,
langchain_service=langchain_service,
ctx=ctx
)
except Exception as e:
ctx.logger.error(f"❌ ERROR: {e}", exc_info=True)
return ApiResponse(
status=500,
body={'error': 'Internal server error', 'message': str(e)}
)
async def handle_non_streaming_response( async def invoke_and_format_response(
model_with_tools, model: Any,
messages: List[Dict[str, Any]], messages: List[Dict[str, Any]],
completion_id: str, completion_id: str,
created_ts: int,
model_name: str, model_name: str,
langchain_service,
ctx: FlowContext ctx: FlowContext
) -> ApiResponse: ) -> ApiResponse:
"""Non-Streaming Handler (optimiert).""" """
try: Invoke LLM and format response in OpenAI-compatible format.
result = await langchain_service.invoke_chat(model_with_tools, messages)
# Content extrahieren (kompatibel mit xAI structured output) Returns:
ApiResponse with chat completion
"""
from services.langchain_xai_service import LangChainXAIService
service = LangChainXAIService(ctx)
result = await service.invoke_chat(model, messages)
# Extract content (handle structured responses)
if hasattr(result, 'content'): if hasattr(result, 'content'):
raw = result.content raw = result.content
if isinstance(raw, list): if isinstance(raw, list):
text_parts = [item.get('text', '') for item in raw if isinstance(item, dict) and item.get('type') == 'text'] # Extract text parts from structured response
text_parts = [
item.get('text', '')
for item in raw
if isinstance(item, dict) and item.get('type') == 'text'
]
content = ''.join(text_parts) or str(raw) content = ''.join(text_parts) or str(raw)
else: else:
content = raw content = raw
else: else:
content = str(result) content = str(result)
# Usage (falls verfügbar) # Extract usage metadata (if available)
usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
if hasattr(result, 'usage_metadata'): if hasattr(result, 'usage_metadata'):
u = result.usage_metadata u = result.usage_metadata
@@ -235,10 +317,11 @@ async def handle_non_streaming_response(
"total_tokens": getattr(u, 'input_tokens', 0) + getattr(u, 'output_tokens', 0) "total_tokens": getattr(u, 'input_tokens', 0) + getattr(u, 'output_tokens', 0)
} }
# Format OpenAI-compatible response
response_body = { response_body = {
'id': completion_id, 'id': completion_id,
'object': 'chat.completion', 'object': 'chat.completion',
'created': created_ts, 'created': int(time.time()),
'model': model_name, 'model': model_name,
'choices': [{ 'choices': [{
'index': 0, 'index': 0,
@@ -248,34 +331,44 @@ async def handle_non_streaming_response(
'usage': usage 'usage': usage
} }
ctx.logger.info(f"✅ Non-streaming fertig {len(content)} Zeichen")
return ApiResponse(status=200, body=response_body) return ApiResponse(status=200, body=response_body)
except Exception as e:
ctx.logger.error(f"❌ Non-streaming failed: {e}")
raise
async def lookup_collection_by_aktenzeichen(
aktenzeichen: str,
ctx: FlowContext
) -> Optional[str]:
"""
Lookup xAI Collection ID by Aktenzeichen via EspoCRM.
async def lookup_collection_by_aktenzeichen(aktenzeichen: str, ctx: FlowContext) -> Optional[str]: Args:
"""Aktenzeichen → Collection Lookup (unverändert).""" aktenzeichen: Normalized Aktenzeichen (e.g., "1234/56")
ctx: Motia context
Returns:
Collection ID or None if not found
"""
try: try:
from services.espocrm import EspoCRMAPI from services.espocrm import EspoCRMAPI
espocrm = EspoCRMAPI(ctx) espocrm = EspoCRMAPI(ctx)
ctx.logger.info(f"🔍 Suche Räumungsklage für Aktenzeichen: {aktenzeichen}")
search_result = await espocrm.search_entities( search_result = await espocrm.search_entities(
entity_type='Raeumungsklage', entity_type='Raeumungsklage',
where=[{'type': 'equals', 'attribute': 'advowareAkteBezeichner', 'value': aktenzeichen}], where=[{
'type': 'equals',
'attribute': 'advowareAkteBezeichner',
'value': aktenzeichen
}],
select=['id', 'xaiCollectionId'], select=['id', 'xaiCollectionId'],
maxSize=1 maxSize=1
) )
if search_result and len(search_result) > 0: if search_result and len(search_result) > 0:
collection_id = search_result[0].get('xaiCollectionId') return search_result[0].get('xaiCollectionId')
if collection_id:
ctx.logger.info(f"✅ Collection gefunden: {collection_id}")
return collection_id
return None return None
except Exception as e: except Exception as e:
ctx.logger.error(f"Lookup failed: {e}") ctx.logger.error(f"Collection lookup failed: {e}")
return None return None

View File

@@ -112,7 +112,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
) )
except Exception as e: except Exception as e:
ctx.logger.error(f"❌ Error listing models: {e}", exc_info=True) ctx.logger.error(f"❌ Error listing models: {e}")
return ApiResponse( return ApiResponse(
status=500, status=500,
body={ body={