feat: Implement AI Chat Completions API with support for file search, web search, and Aktenzeichen-based collection lookup
This commit is contained in:
0
steps/ai/__init__.py
Normal file
0
steps/ai/__init__.py
Normal file
@@ -1,7 +1,13 @@
|
||||
"""VMH xAI Chat Completions API
|
||||
"""AI Chat Completions API
|
||||
|
||||
OpenAI-kompatible Chat Completions API mit xAI/LangChain Backend.
|
||||
Unterstützt file_search über xAI Collections (RAG).
|
||||
Universal OpenAI-compatible Chat Completions API with xAI/LangChain Backend.
|
||||
|
||||
Features:
|
||||
- File Search (RAG) via xAI Collections
|
||||
- Web Search via xAI web_search tool
|
||||
- Aktenzeichen-based automatic collection lookup
|
||||
- Streaming & Non-Streaming support
|
||||
- Multiple tools simultaneously (file_search + web_search)
|
||||
"""
|
||||
import json
|
||||
import time
|
||||
@@ -10,11 +16,11 @@ from motia import FlowContext, http, ApiRequest, ApiResponse
|
||||
|
||||
|
||||
config = {
|
||||
"name": "VMH xAI Chat Completions API",
|
||||
"description": "OpenAI-compatible Chat Completions API with xAI LangChain backend",
|
||||
"flows": ["vmh-chat"],
|
||||
"name": "AI Chat Completions API",
|
||||
"description": "Universal OpenAI-compatible Chat Completions API with xAI backend, RAG, and web search",
|
||||
"flows": ["ai-general"],
|
||||
"triggers": [
|
||||
http("POST", "/vmh/v1/chat/completions")
|
||||
http("POST", "/ai/chat/completions")
|
||||
],
|
||||
}
|
||||
|
||||
@@ -25,7 +31,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
|
||||
Request Body (OpenAI format):
|
||||
{
|
||||
"model": "grok-2-latest",
|
||||
"model": "grok-4.20-beta-0309-reasoning",
|
||||
"messages": [
|
||||
{"role": "system", "content": "You are helpful"},
|
||||
{"role": "user", "content": "1234/56 Was ist der Stand?"}
|
||||
@@ -47,7 +53,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
Aktenzeichen-Erkennung (Priority):
|
||||
1. extra_body.collection_id (explicit override)
|
||||
2. First user message starts with Aktenzeichen (e.g., "1234/56 ...")
|
||||
3. Error 400 if no collection_id found (strict mode)
|
||||
3. Web-only mode if no collection_id is found (enable_web_search must be set)
|
||||
|
||||
Response (OpenAI format):
|
||||
Non-Streaming:
|
||||
@@ -55,7 +61,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
"id": "chatcmpl-...",
|
||||
"object": "chat.completion",
|
||||
"created": 1234567890,
|
||||
"model": "grok-2-latest",
|
||||
"model": "grok-4.20-beta-0309-reasoning",
|
||||
"choices": [{
|
||||
"index": 0,
|
||||
"message": {"role": "assistant", "content": "..."},
|
||||
@@ -75,7 +81,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
from services.espocrm import EspoCRMAPI
|
||||
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("💬 VMH CHAT COMPLETIONS API")
|
||||
ctx.logger.info("🤖 AI CHAT COMPLETIONS API")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
try:
|
||||
@@ -90,7 +96,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
)
|
||||
|
||||
# Extract parameters
|
||||
model_name = body.get('model', 'grok-4-1-fast-reasoning')
|
||||
model_name = body.get('model', 'grok-4.20-beta-0309-reasoning')
|
||||
messages = body.get('messages', [])
|
||||
temperature = body.get('temperature', 0.7)
|
||||
max_tokens = body.get('max_tokens')
|
||||
@@ -107,7 +113,16 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
ctx.logger.info(f"📋 Web Search: {'enabled' if enable_web_search else 'disabled'}")
|
||||
if enable_web_search and web_search_config:
|
||||
ctx.logger.debug(f"Web Search Config: {json.dumps(web_search_config, indent=2)}")
|
||||
ctx.logger.debug(f"Messages: {json.dumps(messages, indent=2, ensure_ascii=False)}")
|
||||
|
||||
# Log full conversation messages
|
||||
ctx.logger.info("-" * 80)
|
||||
ctx.logger.info("📨 REQUEST MESSAGES:")
|
||||
for i, msg in enumerate(messages, 1):
|
||||
role = msg.get('role', 'unknown')
|
||||
content = msg.get('content', '')
|
||||
preview = content[:150] + "..." if len(content) > 150 else content
|
||||
ctx.logger.info(f" [{i}] {role}: {preview}")
|
||||
ctx.logger.info("-" * 80)
|
||||
|
||||
# Validate messages
|
||||
if not messages or not isinstance(messages, list):
|
||||
@@ -117,7 +132,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
body={'error': 'messages must be non-empty array'}
|
||||
)
|
||||
|
||||
# Determine collection_id (Priority: extra_body > Aktenzeichen > error)
|
||||
# Determine collection_id (Priority: extra_body > Aktenzeichen > optional for web-only)
|
||||
collection_id: Optional[str] = None
|
||||
aktenzeichen: Optional[str] = None
|
||||
|
||||
@@ -222,7 +237,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error("=" * 80)
|
||||
ctx.logger.error("❌ ERROR: CHAT COMPLETIONS API")
|
||||
ctx.logger.error("❌ ERROR: AI CHAT COMPLETIONS API")
|
||||
ctx.logger.error("=" * 80)
|
||||
ctx.logger.error(f"Error: {e}", exc_info=True)
|
||||
ctx.logger.error(f"Request body: {json.dumps(request.body, indent=2, ensure_ascii=False)}")
|
||||
@@ -256,8 +271,23 @@ async def handle_non_streaming_response(
|
||||
# Invoke model
|
||||
result = await langchain_service.invoke_chat(model_with_tools, messages)
|
||||
|
||||
# Extract content
|
||||
content = result.content if hasattr(result, 'content') else str(result)
|
||||
# Extract content - handle both string and structured responses
|
||||
if hasattr(result, 'content'):
|
||||
raw_content = result.content
|
||||
|
||||
# If content is a list (tool calls + text message), extract text
|
||||
if isinstance(raw_content, list):
|
||||
# Collect the text of every item with type='text'; the first one is used as the reply below
|
||||
text_messages = [
|
||||
item.get('text', '')
|
||||
for item in raw_content
|
||||
if isinstance(item, dict) and item.get('type') == 'text'
|
||||
]
|
||||
content = text_messages[0] if text_messages else str(raw_content)
|
||||
else:
|
||||
content = raw_content
|
||||
else:
|
||||
content = str(result)
|
||||
|
||||
# Build OpenAI-compatible response
|
||||
response_body = {
|
||||
@@ -292,8 +322,32 @@ async def handle_non_streaming_response(
|
||||
}
|
||||
ctx.logger.info(f"📊 Token Usage: prompt={prompt_tokens}, completion={completion_tokens}")
|
||||
|
||||
# Log citations if available (from tool response annotations)
|
||||
if hasattr(result, 'content') and isinstance(result.content, list):
|
||||
# Extract citations from structured response
|
||||
for item in result.content:
|
||||
if isinstance(item, dict) and item.get('type') == 'text':
|
||||
annotations = item.get('annotations', [])
|
||||
if annotations:
|
||||
ctx.logger.info(f"🔗 Citations: {len(annotations)}")
|
||||
for i, citation in enumerate(annotations[:10], 1): # Log first 10
|
||||
url = citation.get('url', 'N/A')
|
||||
title = citation.get('title', '')
|
||||
if url.startswith('collections://'):
|
||||
# Internal collection reference
|
||||
ctx.logger.debug(f" [{i}] Collection Document: {title}")
|
||||
else:
|
||||
# External URL
|
||||
ctx.logger.debug(f" [{i}] {url}")
|
||||
|
||||
# Log complete response content
|
||||
ctx.logger.info(f"✅ Chat completion: {len(content)} chars")
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("📝 COMPLETE RESPONSE:")
|
||||
ctx.logger.info("-" * 80)
|
||||
ctx.logger.info(content)
|
||||
ctx.logger.info("-" * 80)
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
return ApiResponse(
|
||||
status=200,
|
||||
@@ -337,8 +391,23 @@ async def handle_streaming_response(
|
||||
total_content = ""
|
||||
|
||||
async for chunk in langchain_service.astream_chat(model_with_tools, messages):
|
||||
# Extract delta content
|
||||
delta = chunk.content if hasattr(chunk, "content") else ""
|
||||
# Extract delta content - handle structured chunks
|
||||
if hasattr(chunk, "content"):
|
||||
chunk_content = chunk.content
|
||||
|
||||
# If chunk content is a list (tool calls), extract text parts
|
||||
if isinstance(chunk_content, list):
|
||||
# Accumulate only text deltas
|
||||
text_parts = [
|
||||
item.get('text', '')
|
||||
for item in chunk_content
|
||||
if isinstance(item, dict) and item.get('type') == 'text'
|
||||
]
|
||||
delta = ''.join(text_parts)
|
||||
else:
|
||||
delta = chunk_content
|
||||
else:
|
||||
delta = ""
|
||||
|
||||
if delta:
|
||||
total_content += delta
|
||||
@@ -380,8 +449,14 @@ async def handle_streaming_response(
|
||||
# Close stream
|
||||
await ctx.response.close()
|
||||
|
||||
# Log complete streamed response
|
||||
ctx.logger.info(f"✅ Streaming completed: {chunk_count} chunks, {len(total_content)} chars")
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("📝 COMPLETE STREAMED RESPONSE:")
|
||||
ctx.logger.info("-" * 80)
|
||||
ctx.logger.info(total_content)
|
||||
ctx.logger.info("-" * 80)
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ Streaming failed: {e}", exc_info=True)
|
||||
Reference in New Issue
Block a user