diff --git a/steps/ai/chat_completions_api_step.py b/steps/ai/chat_completions_api_step.py index 15905b6..f98f5bd 100644 --- a/steps/ai/chat_completions_api_step.py +++ b/steps/ai/chat_completions_api_step.py @@ -1,20 +1,29 @@ """AI Chat Completions API -Universal OpenAI-compatible Chat Completions API with xAI/LangChain Backend. + +OpenAI-compatible Chat Completions endpoint with xAI/LangChain backend. + Features: -- File Search (RAG) via xAI Collections +- File Search (RAG) via xAI Collections - Web Search via xAI web_search tool - Aktenzeichen-based automatic collection lookup -- **Echtes Streaming** (async generator + proper SSE headers) - Multiple tools simultaneously +- Clean, reusable architecture for future LLM endpoints + +Note: Streaming is not supported (Motia limitation - returns clear error). + +Reusability: +- extract_request_params(): Parse requests for any LLM endpoint +- resolve_collection_id(): Auto-detect Aktenzeichen, lookup collection +- initialize_model_with_tools(): Bind tools to any LangChain model +- invoke_and_format_response(): Standard OpenAI response formatting """ -import json import time from typing import Any, Dict, List, Optional from motia import FlowContext, http, ApiRequest, ApiResponse config = { "name": "AI Chat Completions API", - "description": "Universal OpenAI-compatible Chat Completions API with xAI backend, RAG, and web search", + "description": "OpenAI-compatible Chat Completions API with xAI backend", "flows": ["ai-general"], "triggers": [ http("POST", "/ai/v1/chat/completions"), @@ -23,259 +32,343 @@ config = { } +# ============================================================================ +# MAIN HANDLER +# ============================================================================ + async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: """ - OpenAI-compatible Chat Completions endpoint mit **echtem** Streaming. + OpenAI-compatible Chat Completions endpoint. + + Returns: + ApiResponse with chat completion or error """ ctx.logger.info("=" * 80) - ctx.logger.info("🤖 AI CHAT COMPLETIONS API – OPTIMIZED") + ctx.logger.info("🤖 AI Chat Completions API") ctx.logger.info("=" * 80) - - # Log request (sicher) - ctx.logger.info("📥 REQUEST DETAILS:") - if request.headers: - ctx.logger.info(" Headers:") - for header_name, header_value in request.headers.items(): - if header_name.lower() == 'authorization': - ctx.logger.info(f" {header_name}: Bearer ***MASKED***") - else: - ctx.logger.info(f" {header_name}: {header_value}") - + try: - # Parse body - body = request.body or {} - if not isinstance(body, dict): - return ApiResponse(status=400, body={'error': 'Request body must be JSON object'}) - - # Parameter extrahieren - model_name = body.get('model', 'grok-4.20-beta-0309-reasoning') - messages = body.get('messages', []) - temperature = body.get('temperature', 0.7) - max_tokens = body.get('max_tokens') - stream = body.get('stream', False) - extra_body = body.get('extra_body', {}) - - enable_web_search = body.get('enable_web_search', extra_body.get('enable_web_search', False)) - web_search_config = body.get('web_search_config', extra_body.get('web_search_config', {})) - - ctx.logger.info(f"📋 Model: {model_name} | Stream: {stream} | Web Search: {enable_web_search}") - - # Messages loggen (kurz) - ctx.logger.info("📨 MESSAGES:") - for i, msg in enumerate(messages, 1): - preview = (msg.get('content', '')[:120] + "...") if len(msg.get('content', '')) > 120 else msg.get('content', '') - ctx.logger.info(f" [{i}] {msg.get('role')}: {preview}") - - # === Collection + Aktenzeichen Logic (unverändert) === - collection_id: Optional[str] = None - aktenzeichen: Optional[str] = None - - if 'collection_id' in body: - collection_id = body['collection_id'] - elif 'custom_collection_id' in body: - collection_id = body['custom_collection_id'] - elif 'collection_id' in extra_body: - collection_id = extra_body['collection_id'] - else: - for msg in messages: - if msg.get('role') == 'user': - content = msg.get('content', '') - from services.aktenzeichen_utils import extract_aktenzeichen, normalize_aktenzeichen, remove_aktenzeichen - aktenzeichen_raw = extract_aktenzeichen(content) - if aktenzeichen_raw: - aktenzeichen = normalize_aktenzeichen(aktenzeichen_raw) - collection_id = await lookup_collection_by_aktenzeichen(aktenzeichen, ctx) - if collection_id: - msg['content'] = remove_aktenzeichen(content) - break - - if not collection_id and not enable_web_search: + # 1. Parse and validate request + params = extract_request_params(request, ctx) + + # 2. Check streaming (not supported) + if params['stream']: + return ApiResponse( + status=501, + body={ + 'error': { + 'message': 'Streaming is not supported. Please set stream=false.', + 'type': 'not_implemented', + 'param': 'stream' + } + } + ) + + # 3. Resolve collection (explicit ID or Aktenzeichen lookup) + collection_id = await resolve_collection_id( + params['collection_id'], + params['messages'], + params['enable_web_search'], + ctx + ) + + # 4. Validate: collection or web_search required + if not collection_id and not params['enable_web_search']: return ApiResponse( status=400, - body={'error': 'collection_id or web_search required'} - ) - - # === Service initialisieren === - from services.langchain_xai_service import LangChainXAIService - langchain_service = LangChainXAIService(ctx) - - model = langchain_service.get_chat_model( - model=model_name, - temperature=temperature, - max_tokens=max_tokens - ) - - model_with_tools = langchain_service.bind_tools( - model=model, - collection_id=collection_id, - enable_web_search=enable_web_search, - web_search_config=web_search_config, - max_num_results=10 - ) - - completion_id = f"chatcmpl-{ctx.traceId[:12]}" if hasattr(ctx, 'traceId') else f"chatcmpl-{int(time.time())}" - created_ts = int(time.time()) - - # ====================== ECHTES STREAMING ====================== - if stream: - ctx.logger.info("🌊 Starting REAL SSE streaming (async generator)...") - - headers = { - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "X-Accel-Buffering": "no", # nginx / proxies - "Transfer-Encoding": "chunked", - } - - async def sse_generator(): - # Initial chunk (manche Clients brauchen das) - yield f'data: {json.dumps({"id": completion_id, "object": "chat.completion.chunk", "created": created_ts, "model": model_name, "choices": [{"index": 0, "delta": {}, "finish_reason": None}]}, ensure_ascii=False)}\n\n' - - chunk_count = 0 - async for chunk in langchain_service.astream_chat(model_with_tools, messages): - delta = "" - if hasattr(chunk, "content"): - content = chunk.content - if isinstance(content, str): - delta = content - elif isinstance(content, list): - text_parts = [item.get('text', '') for item in content if isinstance(item, dict) and item.get('type') == 'text'] - delta = ''.join(text_parts) - - if delta: - chunk_count += 1 - data = { - "id": completion_id, - "object": "chat.completion.chunk", - "created": created_ts, - "model": model_name, - "choices": [{ - "index": 0, - "delta": {"content": delta}, - "finish_reason": None - }] - } - yield f'data: {json.dumps(data, ensure_ascii=False)}\n\n' - - # Finish - finish = { - "id": completion_id, - "object": "chat.completion.chunk", - "created": created_ts, - "model": model_name, - "choices": [{ - "index": 0, - "delta": {}, - "finish_reason": "stop" - }] + body={ + 'error': { + 'message': 'Either collection_id or enable_web_search must be provided', + 'type': 'invalid_request_error' + } } - yield f'data: {json.dumps(finish, ensure_ascii=False)}\n\n' - yield "data: [DONE]\n\n" - - ctx.logger.info(f"✅ Streaming abgeschlossen – {chunk_count} Chunks gesendet") - - return ApiResponse( - status=200, - headers=headers, - body=sse_generator() # ← async generator = echtes Streaming! ) - - # ====================== NON-STREAMING (unverändert + optimiert) ====================== - else: - return await handle_non_streaming_response( - model_with_tools=model_with_tools, - messages=messages, - completion_id=completion_id, - created_ts=created_ts, - model_name=model_name, - langchain_service=langchain_service, - ctx=ctx - ) - + + # 5. Initialize LLM with tools + model_with_tools = await initialize_model_with_tools( + model_name=params['model'], + temperature=params['temperature'], + max_tokens=params['max_tokens'], + collection_id=collection_id, + enable_web_search=params['enable_web_search'], + web_search_config=params['web_search_config'], + ctx=ctx + ) + + # 6. Invoke LLM + completion_id = f"chatcmpl-{int(time.time())}" + response = await invoke_and_format_response( + model=model_with_tools, + messages=params['messages'], + completion_id=completion_id, + model_name=params['model'], + ctx=ctx + ) + + ctx.logger.info(f"✅ Completion successful – {len(response.body['choices'][0]['message']['content'])} chars") + return response + + except ValueError as e: + ctx.logger.error(f"❌ Validation error: {e}") + return ApiResponse( + status=400, + body={'error': {'message': str(e), 'type': 'invalid_request_error'}} + ) except Exception as e: - ctx.logger.error(f"❌ ERROR: {e}", exc_info=True) + ctx.logger.error(f"❌ Error: {e}") return ApiResponse( status=500, - body={'error': 'Internal server error', 'message': str(e)} + body={'error': {'message': 'Internal server error', 'type': 'server_error'}} ) -async def handle_non_streaming_response( - model_with_tools, +# ============================================================================ +# REUSABLE HELPER FUNCTIONS +# ============================================================================ + +def extract_request_params(request: ApiRequest, ctx: FlowContext) -> Dict[str, Any]: + """ + Extract and validate request parameters. + + Returns: + Dict with validated parameters + + Raises: + ValueError: If validation fails + """ + body = request.body or {} + + if not isinstance(body, dict): + raise ValueError("Request body must be JSON object") + + messages = body.get('messages', []) + if not messages or not isinstance(messages, list): + raise ValueError("messages must be non-empty array") + + # Extract parameters with defaults + params = { + 'model': body.get('model', 'grok-4-1-fast-reasoning'), + 'messages': messages, + 'temperature': body.get('temperature', 0.7), + 'max_tokens': body.get('max_tokens'), + 'stream': body.get('stream', False), + 'extra_body': body.get('extra_body', {}), + } + + # Handle enable_web_search (body or extra_body) + params['enable_web_search'] = body.get( + 'enable_web_search', + params['extra_body'].get('enable_web_search', False) + ) + + # Handle web_search_config + params['web_search_config'] = body.get( + 'web_search_config', + params['extra_body'].get('web_search_config', {}) + ) + + # Handle collection_id (multiple sources) + params['collection_id'] = ( + body.get('collection_id') or + body.get('custom_collection_id') or + params['extra_body'].get('collection_id') + ) + + # Log concisely + ctx.logger.info(f"📋 Model: {params['model']} | Stream: {params['stream']}") + ctx.logger.info(f"📋 Web Search: {params['enable_web_search']} | Collection: {params['collection_id'] or 'auto'}") + ctx.logger.info(f"📨 Messages: {len(messages)}") + + return params + + +async def resolve_collection_id( + explicit_collection_id: Optional[str], + messages: List[Dict[str, Any]], + enable_web_search: bool, + ctx: FlowContext +) -> Optional[str]: + """ + Resolve collection ID from explicit ID or Aktenzeichen auto-detection. + + Args: + explicit_collection_id: Explicitly provided collection ID + messages: Chat messages (for Aktenzeichen extraction) + enable_web_search: Whether web search is enabled + ctx: Motia context + + Returns: + Collection ID or None + """ + # Explicit collection ID takes precedence + if explicit_collection_id: + ctx.logger.info(f"🔍 Using explicit collection: {explicit_collection_id}") + return explicit_collection_id + + # Try Aktenzeichen auto-detection from first user message + from services.aktenzeichen_utils import ( + extract_aktenzeichen, + normalize_aktenzeichen, + remove_aktenzeichen + ) + + for msg in messages: + if msg.get('role') == 'user': + content = msg.get('content', '') + aktenzeichen_raw = extract_aktenzeichen(content) + + if aktenzeichen_raw: + aktenzeichen = normalize_aktenzeichen(aktenzeichen_raw) + ctx.logger.info(f"🔍 Aktenzeichen detected: {aktenzeichen}") + + collection_id = await lookup_collection_by_aktenzeichen(aktenzeichen, ctx) + + if collection_id: + # Clean Aktenzeichen from message + msg['content'] = remove_aktenzeichen(content) + ctx.logger.info(f"✅ Collection found: {collection_id}") + return collection_id + else: + ctx.logger.warning(f"⚠️ No collection for Aktenzeichen: {aktenzeichen}") + break # Only check first user message + + return None + + +async def initialize_model_with_tools( + model_name: str, + temperature: float, + max_tokens: Optional[int], + collection_id: Optional[str], + enable_web_search: bool, + web_search_config: Dict[str, Any], + ctx: FlowContext +) -> Any: + """ + Initialize LangChain model with tool bindings (file_search, web_search). + + Returns: + Model instance with tools bound + """ + from services.langchain_xai_service import LangChainXAIService + + service = LangChainXAIService(ctx) + + # Create base model + model = service.get_chat_model( + model=model_name, + temperature=temperature, + max_tokens=max_tokens + ) + + # Bind tools + model_with_tools = service.bind_tools( + model=model, + collection_id=collection_id, + enable_web_search=enable_web_search, + web_search_config=web_search_config, + max_num_results=10 + ) + + return model_with_tools + + +async def invoke_and_format_response( + model: Any, messages: List[Dict[str, Any]], completion_id: str, - created_ts: int, model_name: str, - langchain_service, ctx: FlowContext ) -> ApiResponse: - """Non-Streaming Handler (optimiert).""" - try: - result = await langchain_service.invoke_chat(model_with_tools, messages) - - # Content extrahieren (kompatibel mit xAI structured output) - if hasattr(result, 'content'): - raw = result.content - if isinstance(raw, list): - text_parts = [item.get('text', '') for item in raw if isinstance(item, dict) and item.get('type') == 'text'] - content = ''.join(text_parts) or str(raw) - else: - content = raw + """ + Invoke LLM and format response in OpenAI-compatible format. + + Returns: + ApiResponse with chat completion + """ + from services.langchain_xai_service import LangChainXAIService + + service = LangChainXAIService(ctx) + result = await service.invoke_chat(model, messages) + + # Extract content (handle structured responses) + if hasattr(result, 'content'): + raw = result.content + if isinstance(raw, list): + # Extract text parts from structured response + text_parts = [ + item.get('text', '') + for item in raw + if isinstance(item, dict) and item.get('type') == 'text' + ] + content = ''.join(text_parts) or str(raw) else: - content = str(result) - - # Usage (falls verfügbar) - usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} - if hasattr(result, 'usage_metadata'): - u = result.usage_metadata - usage = { - "prompt_tokens": getattr(u, 'input_tokens', 0), - "completion_tokens": getattr(u, 'output_tokens', 0), - "total_tokens": getattr(u, 'input_tokens', 0) + getattr(u, 'output_tokens', 0) - } - - response_body = { - 'id': completion_id, - 'object': 'chat.completion', - 'created': created_ts, - 'model': model_name, - 'choices': [{ - 'index': 0, - 'message': {'role': 'assistant', 'content': content}, - 'finish_reason': 'stop' - }], - 'usage': usage + content = raw + else: + content = str(result) + + # Extract usage metadata (if available) + usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} + if hasattr(result, 'usage_metadata'): + u = result.usage_metadata + usage = { + "prompt_tokens": getattr(u, 'input_tokens', 0), + "completion_tokens": getattr(u, 'output_tokens', 0), + "total_tokens": getattr(u, 'input_tokens', 0) + getattr(u, 'output_tokens', 0) } - - ctx.logger.info(f"✅ Non-streaming fertig – {len(content)} Zeichen") - return ApiResponse(status=200, body=response_body) - - except Exception as e: - ctx.logger.error(f"❌ Non-streaming failed: {e}") - raise + + # Format OpenAI-compatible response + response_body = { + 'id': completion_id, + 'object': 'chat.completion', + 'created': int(time.time()), + 'model': model_name, + 'choices': [{ + 'index': 0, + 'message': {'role': 'assistant', 'content': content}, + 'finish_reason': 'stop' + }], + 'usage': usage + } + + return ApiResponse(status=200, body=response_body) -async def lookup_collection_by_aktenzeichen(aktenzeichen: str, ctx: FlowContext) -> Optional[str]: - """Aktenzeichen → Collection Lookup (unverändert).""" +async def lookup_collection_by_aktenzeichen( + aktenzeichen: str, + ctx: FlowContext +) -> Optional[str]: + """ + Lookup xAI Collection ID by Aktenzeichen via EspoCRM. + + Args: + aktenzeichen: Normalized Aktenzeichen (e.g., "1234/56") + ctx: Motia context + + Returns: + Collection ID or None if not found + """ try: from services.espocrm import EspoCRMAPI + espocrm = EspoCRMAPI(ctx) - ctx.logger.info(f"🔍 Suche Räumungsklage für Aktenzeichen: {aktenzeichen}") - + search_result = await espocrm.search_entities( entity_type='Raeumungsklage', - where=[{'type': 'equals', 'attribute': 'advowareAkteBezeichner', 'value': aktenzeichen}], + where=[{ + 'type': 'equals', + 'attribute': 'advowareAkteBezeichner', + 'value': aktenzeichen + }], select=['id', 'xaiCollectionId'], maxSize=1 ) - + if search_result and len(search_result) > 0: - collection_id = search_result[0].get('xaiCollectionId') - if collection_id: - ctx.logger.info(f"✅ Collection gefunden: {collection_id}") - return collection_id + return search_result[0].get('xaiCollectionId') + return None + except Exception as e: - ctx.logger.error(f"❌ Lookup failed: {e}") + ctx.logger.error(f"❌ Collection lookup failed: {e}") return None \ No newline at end of file diff --git a/steps/ai/models_list_api_step.py b/steps/ai/models_list_api_step.py index 63b22f2..44e07bd 100644 --- a/steps/ai/models_list_api_step.py +++ b/steps/ai/models_list_api_step.py @@ -112,7 +112,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ) except Exception as e: - ctx.logger.error(f"❌ Error listing models: {e}", exc_info=True) + ctx.logger.error(f"❌ Error listing models: {e}") return ApiResponse( status=500, body={