feat: Add Aktenzeichen utility functions and LangChain xAI service integration
- Implemented utility functions for extracting, validating, and normalizing Aktenzeichen in 'aktenzeichen_utils.py'. - Created LangChainXAIService for integrating LangChain ChatXAI with file search capabilities in 'langchain_xai_service.py'. - Developed VMH xAI Chat Completions API to handle OpenAI-compatible requests with support for Aktenzeichen detection and file search in 'xai_chat_completion_api_step.py'.
This commit is contained in:
@@ -18,5 +18,8 @@ dependencies = [
|
||||
"google-api-python-client>=2.100.0", # Google Calendar API
|
||||
"google-auth>=2.23.0", # Google OAuth2
|
||||
"backoff>=2.2.1", # Retry/backoff decorator
|
||||
"langchain>=0.3.0", # LangChain framework
|
||||
"langchain-xai>=0.2.0", # xAI integration for LangChain
|
||||
"langchain-core>=0.3.0", # LangChain core
|
||||
]
|
||||
|
||||
|
||||
110
services/aktenzeichen_utils.py
Normal file
110
services/aktenzeichen_utils.py
Normal file
@@ -0,0 +1,110 @@
|
||||
"""Aktenzeichen-Erkennung und Validation
|
||||
|
||||
Utility functions für das Erkennen, Validieren und Normalisieren von
|
||||
Aktenzeichen im Format '1234/56' oder 'ABC/23'.
|
||||
"""
|
||||
import re
|
||||
from typing import Optional
|
||||
|
||||
|
||||
# Regex für Aktenzeichen: 1-4 Zeichen (alphanumerisch) + "/" + 2 Ziffern
|
||||
AKTENZEICHEN_REGEX = re.compile(r'^([A-Za-z0-9]{1,4}/\d{2})\s*', re.IGNORECASE)
|
||||
|
||||
|
||||
def extract_aktenzeichen(text: str) -> Optional[str]:
|
||||
"""
|
||||
Extrahiert Aktenzeichen vom Anfang des Textes.
|
||||
|
||||
Pattern: ^[A-Za-z0-9]{1,4}/\d{2}
|
||||
|
||||
Examples:
|
||||
>>> extract_aktenzeichen("1234/56 Was ist der Stand?")
|
||||
"1234/56"
|
||||
>>> extract_aktenzeichen("ABC/23 Frage zum Vertrag")
|
||||
"ABC/23"
|
||||
>>> extract_aktenzeichen("Kein Aktenzeichen hier")
|
||||
None
|
||||
|
||||
Args:
|
||||
text: Eingabetext (z.B. erste Message)
|
||||
|
||||
Returns:
|
||||
Aktenzeichen als String, oder None wenn nicht gefunden
|
||||
"""
|
||||
if not text or not isinstance(text, str):
|
||||
return None
|
||||
|
||||
match = AKTENZEICHEN_REGEX.match(text.strip())
|
||||
return match.group(1) if match else None
|
||||
|
||||
|
||||
def remove_aktenzeichen(text: str) -> str:
|
||||
"""
|
||||
Entfernt Aktenzeichen vom Anfang des Textes.
|
||||
|
||||
Examples:
|
||||
>>> remove_aktenzeichen("1234/56 Was ist der Stand?")
|
||||
"Was ist der Stand?"
|
||||
>>> remove_aktenzeichen("Kein Aktenzeichen")
|
||||
"Kein Aktenzeichen"
|
||||
|
||||
Args:
|
||||
text: Eingabetext mit Aktenzeichen
|
||||
|
||||
Returns:
|
||||
Text ohne Aktenzeichen (whitespace getrimmt)
|
||||
"""
|
||||
if not text or not isinstance(text, str):
|
||||
return text
|
||||
|
||||
return AKTENZEICHEN_REGEX.sub('', text, count=1).strip()
|
||||
|
||||
|
||||
def validate_aktenzeichen(az: str) -> bool:
|
||||
"""
|
||||
Validiert Aktenzeichen-Format.
|
||||
|
||||
Pattern: ^[A-Za-z0-9]{1,4}/\d{2}$
|
||||
|
||||
Examples:
|
||||
>>> validate_aktenzeichen("1234/56")
|
||||
True
|
||||
>>> validate_aktenzeichen("ABC/23")
|
||||
True
|
||||
>>> validate_aktenzeichen("12345/567") # Zu lang
|
||||
False
|
||||
>>> validate_aktenzeichen("1234-56") # Falsches Trennzeichen
|
||||
False
|
||||
|
||||
Args:
|
||||
az: Aktenzeichen zum Validieren
|
||||
|
||||
Returns:
|
||||
True wenn valide, False sonst
|
||||
"""
|
||||
if not az or not isinstance(az, str):
|
||||
return False
|
||||
|
||||
return bool(re.match(r'^[A-Za-z0-9]{1,4}/\d{2}$', az, re.IGNORECASE))
|
||||
|
||||
|
||||
def normalize_aktenzeichen(az: str) -> str:
|
||||
"""
|
||||
Normalisiert Aktenzeichen (uppercase, trim whitespace).
|
||||
|
||||
Examples:
|
||||
>>> normalize_aktenzeichen("abc/23")
|
||||
"ABC/23"
|
||||
>>> normalize_aktenzeichen(" 1234/56 ")
|
||||
"1234/56"
|
||||
|
||||
Args:
|
||||
az: Aktenzeichen zum Normalisieren
|
||||
|
||||
Returns:
|
||||
Normalisiertes Aktenzeichen (uppercase, getrimmt)
|
||||
"""
|
||||
if not az or not isinstance(az, str):
|
||||
return az
|
||||
|
||||
return az.strip().upper()
|
||||
162
services/langchain_xai_service.py
Normal file
162
services/langchain_xai_service.py
Normal file
@@ -0,0 +1,162 @@
|
||||
"""LangChain xAI Integration Service
|
||||
|
||||
Service für LangChain ChatXAI Integration mit File Search Binding.
|
||||
Analog zu xai_service.py für xAI Files API.
|
||||
"""
|
||||
import os
|
||||
from typing import Dict, List, Any, Optional, AsyncIterator
|
||||
from services.logging_utils import get_service_logger
|
||||
|
||||
|
||||
class LangChainXAIService:
|
||||
"""
|
||||
Wrapper für LangChain ChatXAI mit Motia-Integration.
|
||||
|
||||
Benötigte Umgebungsvariablen:
|
||||
- XAI_API_KEY: API Key für xAI (für ChatXAI model)
|
||||
|
||||
Usage:
|
||||
service = LangChainXAIService(ctx)
|
||||
model = service.get_chat_model(model="grok-2-latest")
|
||||
model_with_tools = service.bind_file_search(model, collection_id)
|
||||
result = await service.invoke_chat(model_with_tools, messages)
|
||||
"""
|
||||
|
||||
def __init__(self, ctx=None):
|
||||
"""
|
||||
Initialize LangChain xAI Service.
|
||||
|
||||
Args:
|
||||
ctx: Optional Motia context for logging
|
||||
|
||||
Raises:
|
||||
ValueError: If XAI_API_KEY not configured
|
||||
"""
|
||||
self.api_key = os.getenv('XAI_API_KEY', '')
|
||||
self.ctx = ctx
|
||||
self.logger = get_service_logger('langchain_xai', ctx)
|
||||
|
||||
if not self.api_key:
|
||||
raise ValueError("XAI_API_KEY not configured in environment")
|
||||
|
||||
def _log(self, msg: str, level: str = 'info') -> None:
|
||||
"""Delegate logging to service logger"""
|
||||
log_func = getattr(self.logger, level, self.logger.info)
|
||||
log_func(msg)
|
||||
|
||||
def get_chat_model(
|
||||
self,
|
||||
model: str = "grok-2-latest",
|
||||
temperature: float = 0.7,
|
||||
max_tokens: Optional[int] = None
|
||||
):
|
||||
"""
|
||||
Initialisiert ChatXAI Model.
|
||||
|
||||
Args:
|
||||
model: Model name (default: grok-2-latest)
|
||||
temperature: Sampling temperature 0.0-1.0
|
||||
max_tokens: Optional max tokens for response
|
||||
|
||||
Returns:
|
||||
ChatXAI model instance
|
||||
|
||||
Raises:
|
||||
ImportError: If langchain_xai not installed
|
||||
"""
|
||||
try:
|
||||
from langchain_xai import ChatXAI
|
||||
except ImportError:
|
||||
raise ImportError(
|
||||
"langchain_xai not installed. "
|
||||
"Run: pip install langchain-xai>=0.2.0"
|
||||
)
|
||||
|
||||
self._log(f"🤖 Initializing ChatXAI: model={model}, temp={temperature}")
|
||||
|
||||
kwargs = {
|
||||
"model": model,
|
||||
"api_key": self.api_key,
|
||||
"temperature": temperature
|
||||
}
|
||||
if max_tokens:
|
||||
kwargs["max_tokens"] = max_tokens
|
||||
|
||||
return ChatXAI(**kwargs)
|
||||
|
||||
def bind_file_search(
|
||||
self,
|
||||
model,
|
||||
collection_id: str,
|
||||
max_num_results: int = 10
|
||||
):
|
||||
"""
|
||||
Bindet xAI file_search Tool an Model.
|
||||
|
||||
Args:
|
||||
model: ChatXAI model instance
|
||||
collection_id: xAI Collection ID (vector store)
|
||||
max_num_results: Max results from file search (default: 10)
|
||||
|
||||
Returns:
|
||||
Model with bound file_search tool
|
||||
"""
|
||||
self._log(f"🔍 Binding file_search: collection={collection_id}, max_results={max_num_results}")
|
||||
|
||||
tools = [{
|
||||
"type": "file_search",
|
||||
"vector_store_ids": [collection_id],
|
||||
"max_num_results": max_num_results
|
||||
}]
|
||||
|
||||
return model.bind_tools(tools)
|
||||
|
||||
async def invoke_chat(
|
||||
self,
|
||||
model,
|
||||
messages: List[Dict[str, Any]]
|
||||
) -> Any:
|
||||
"""
|
||||
Non-streaming Chat Completion.
|
||||
|
||||
Args:
|
||||
model: ChatXAI model (with or without tools)
|
||||
messages: List of message dicts [{"role": "user", "content": "..."}]
|
||||
|
||||
Returns:
|
||||
LangChain AIMessage with response
|
||||
|
||||
Raises:
|
||||
Exception: If API call fails
|
||||
"""
|
||||
self._log(f"💬 Invoking chat: {len(messages)} messages", level='debug')
|
||||
|
||||
result = await model.ainvoke(messages)
|
||||
|
||||
self._log(f"✅ Response received: {len(result.content)} chars", level='debug')
|
||||
return result
|
||||
|
||||
async def astream_chat(
|
||||
self,
|
||||
model,
|
||||
messages: List[Dict[str, Any]]
|
||||
) -> AsyncIterator:
|
||||
"""
|
||||
Streaming Chat Completion.
|
||||
|
||||
Args:
|
||||
model: ChatXAI model (with or without tools)
|
||||
messages: List of message dicts
|
||||
|
||||
Yields:
|
||||
Chunks from streaming response
|
||||
|
||||
Example:
|
||||
async for chunk in service.astream_chat(model, messages):
|
||||
delta = chunk.content if hasattr(chunk, "content") else ""
|
||||
# Process delta...
|
||||
"""
|
||||
self._log(f"💬 Streaming chat: {len(messages)} messages", level='debug')
|
||||
|
||||
async for chunk in model.astream(messages):
|
||||
yield chunk
|
||||
439
steps/vmh/xai_chat_completion_api_step.py
Normal file
439
steps/vmh/xai_chat_completion_api_step.py
Normal file
@@ -0,0 +1,439 @@
|
||||
"""VMH xAI Chat Completions API
|
||||
|
||||
OpenAI-kompatible Chat Completions API mit xAI/LangChain Backend.
|
||||
Unterstützt file_search über xAI Collections (RAG).
|
||||
"""
|
||||
import json
|
||||
import time
|
||||
from typing import Any, Dict, List, Optional
|
||||
from motia import FlowContext, http, ApiRequest, ApiResponse
|
||||
|
||||
|
||||
config = {
|
||||
"name": "VMH xAI Chat Completions API",
|
||||
"description": "OpenAI-compatible Chat Completions API with xAI LangChain backend",
|
||||
"flows": ["vmh-chat"],
|
||||
"triggers": [
|
||||
http("POST", "/vmh/v1/chat/completions")
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
||||
"""
|
||||
OpenAI-compatible Chat Completions endpoint.
|
||||
|
||||
Request Body (OpenAI format):
|
||||
{
|
||||
"model": "grok-2-latest",
|
||||
"messages": [
|
||||
{"role": "system", "content": "You are helpful"},
|
||||
{"role": "user", "content": "1234/56 Was ist der Stand?"}
|
||||
],
|
||||
"temperature": 0.7,
|
||||
"max_tokens": 2000,
|
||||
"stream": false,
|
||||
"extra_body": {
|
||||
"collection_id": "col_abc123" // Optional: override auto-detection
|
||||
}
|
||||
}
|
||||
|
||||
Aktenzeichen-Erkennung (Priority):
|
||||
1. extra_body.collection_id (explicit override)
|
||||
2. First user message starts with Aktenzeichen (e.g., "1234/56 ...")
|
||||
3. Error 400 if no collection_id found (strict mode)
|
||||
|
||||
Response (OpenAI format):
|
||||
Non-Streaming:
|
||||
{
|
||||
"id": "chatcmpl-...",
|
||||
"object": "chat.completion",
|
||||
"created": 1234567890,
|
||||
"model": "grok-2-latest",
|
||||
"choices": [{
|
||||
"index": 0,
|
||||
"message": {"role": "assistant", "content": "..."},
|
||||
"finish_reason": "stop"
|
||||
}],
|
||||
"usage": {"prompt_tokens": X, "completion_tokens": Y, "total_tokens": Z}
|
||||
}
|
||||
|
||||
Streaming (SSE):
|
||||
data: {"id":"chatcmpl-...","choices":[{"delta":{"content":"Hello"},...}]}
|
||||
data: {"id":"chatcmpl-...","choices":[{"delta":{"content":" world"},...}]}
|
||||
data: {"choices":[{"delta":{},"finish_reason":"stop"}]}
|
||||
data: [DONE]
|
||||
"""
|
||||
from services.langchain_xai_service import LangChainXAIService
|
||||
from services.aktenzeichen_utils import extract_aktenzeichen, normalize_aktenzeichen
|
||||
from services.espocrm import EspoCRMAPI
|
||||
|
||||
ctx.logger.info("=" * 80)
|
||||
ctx.logger.info("💬 VMH CHAT COMPLETIONS API")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
try:
|
||||
# Parse request body
|
||||
body = request.body or {}
|
||||
|
||||
if not isinstance(body, dict):
|
||||
ctx.logger.error(f"❌ Invalid request body type: {type(body)}")
|
||||
return ApiResponse(
|
||||
status=400,
|
||||
body={'error': 'Request body must be JSON object'}
|
||||
)
|
||||
|
||||
# Extract parameters
|
||||
model_name = body.get('model', 'grok-2-latest')
|
||||
messages = body.get('messages', [])
|
||||
temperature = body.get('temperature', 0.7)
|
||||
max_tokens = body.get('max_tokens')
|
||||
stream = body.get('stream', False)
|
||||
extra_body = body.get('extra_body', {})
|
||||
|
||||
ctx.logger.info(f"📋 Model: {model_name}")
|
||||
ctx.logger.info(f"📋 Messages: {len(messages)}")
|
||||
ctx.logger.info(f"📋 Stream: {stream}")
|
||||
ctx.logger.debug(f"Messages: {json.dumps(messages, indent=2, ensure_ascii=False)}")
|
||||
|
||||
# Validate messages
|
||||
if not messages or not isinstance(messages, list):
|
||||
ctx.logger.error("❌ Missing or invalid messages array")
|
||||
return ApiResponse(
|
||||
status=400,
|
||||
body={'error': 'messages must be non-empty array'}
|
||||
)
|
||||
|
||||
# Determine collection_id (Priority: extra_body > Aktenzeichen > error)
|
||||
collection_id: Optional[str] = None
|
||||
aktenzeichen: Optional[str] = None
|
||||
|
||||
# Priority 1: Explicit collection_id in extra_body
|
||||
if 'collection_id' in extra_body:
|
||||
collection_id = extra_body['collection_id']
|
||||
ctx.logger.info(f"🔍 Collection ID from extra_body: {collection_id}")
|
||||
|
||||
# Priority 2: Extract Aktenzeichen from first user message
|
||||
else:
|
||||
for msg in messages:
|
||||
if msg.get('role') == 'user':
|
||||
content = msg.get('content', '')
|
||||
aktenzeichen_raw = extract_aktenzeichen(content)
|
||||
|
||||
if aktenzeichen_raw:
|
||||
aktenzeichen = normalize_aktenzeichen(aktenzeichen_raw)
|
||||
ctx.logger.info(f"🔍 Aktenzeichen detected: {aktenzeichen}")
|
||||
|
||||
# Lookup collection_id via EspoCRM
|
||||
collection_id = await lookup_collection_by_aktenzeichen(
|
||||
aktenzeichen, ctx
|
||||
)
|
||||
|
||||
if collection_id:
|
||||
ctx.logger.info(f"✅ Collection found: {collection_id}")
|
||||
|
||||
# Remove Aktenzeichen from message (clean prompt)
|
||||
from services.aktenzeichen_utils import remove_aktenzeichen
|
||||
msg['content'] = remove_aktenzeichen(content)
|
||||
ctx.logger.debug(f"Cleaned message: {msg['content']}")
|
||||
else:
|
||||
ctx.logger.warn(f"⚠️ No collection found for {aktenzeichen}")
|
||||
|
||||
break # Only check first user message
|
||||
|
||||
# Priority 3: Error if no collection_id (strict mode)
|
||||
if not collection_id:
|
||||
ctx.logger.error("❌ No collection_id found (neither extra_body nor Aktenzeichen)")
|
||||
ctx.logger.error(" Provide collection_id in extra_body or start message with Aktenzeichen")
|
||||
return ApiResponse(
|
||||
status=400,
|
||||
body={
|
||||
'error': 'collection_id required',
|
||||
'message': 'Provide collection_id in extra_body or start message with Aktenzeichen (e.g., "1234/56 question")'
|
||||
}
|
||||
)
|
||||
|
||||
# Initialize LangChain xAI Service
|
||||
try:
|
||||
langchain_service = LangChainXAIService(ctx)
|
||||
except ValueError as e:
|
||||
ctx.logger.error(f"❌ Service initialization failed: {e}")
|
||||
return ApiResponse(
|
||||
status=500,
|
||||
body={'error': 'Service configuration error', 'details': str(e)}
|
||||
)
|
||||
|
||||
# Create ChatXAI model
|
||||
model = langchain_service.get_chat_model(
|
||||
model=model_name,
|
||||
temperature=temperature,
|
||||
max_tokens=max_tokens
|
||||
)
|
||||
|
||||
# Bind file_search tool
|
||||
model_with_tools = langchain_service.bind_file_search(
|
||||
model=model,
|
||||
collection_id=collection_id,
|
||||
max_num_results=10
|
||||
)
|
||||
|
||||
# Generate completion_id
|
||||
completion_id = f"chatcmpl-{ctx.traceId[:12]}" if hasattr(ctx, 'traceId') else f"chatcmpl-{int(time.time())}"
|
||||
created_ts = int(time.time())
|
||||
|
||||
# Branch: Streaming vs Non-Streaming
|
||||
if stream:
|
||||
ctx.logger.info("🌊 Starting streaming response...")
|
||||
return await handle_streaming_response(
|
||||
model_with_tools=model_with_tools,
|
||||
messages=messages,
|
||||
completion_id=completion_id,
|
||||
created_ts=created_ts,
|
||||
model_name=model_name,
|
||||
langchain_service=langchain_service,
|
||||
ctx=ctx
|
||||
)
|
||||
else:
|
||||
ctx.logger.info("📦 Starting non-streaming response...")
|
||||
return await handle_non_streaming_response(
|
||||
model_with_tools=model_with_tools,
|
||||
messages=messages,
|
||||
completion_id=completion_id,
|
||||
created_ts=created_ts,
|
||||
model_name=model_name,
|
||||
langchain_service=langchain_service,
|
||||
ctx=ctx
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error("=" * 80)
|
||||
ctx.logger.error("❌ ERROR: CHAT COMPLETIONS API")
|
||||
ctx.logger.error("=" * 80)
|
||||
ctx.logger.error(f"Error: {e}", exc_info=True)
|
||||
ctx.logger.error(f"Request body: {json.dumps(request.body, indent=2, ensure_ascii=False)}")
|
||||
ctx.logger.error("=" * 80)
|
||||
|
||||
return ApiResponse(
|
||||
status=500,
|
||||
body={
|
||||
'error': 'Internal server error',
|
||||
'message': str(e)
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
async def handle_non_streaming_response(
|
||||
model_with_tools,
|
||||
messages: List[Dict[str, Any]],
|
||||
completion_id: str,
|
||||
created_ts: int,
|
||||
model_name: str,
|
||||
langchain_service,
|
||||
ctx: FlowContext
|
||||
) -> ApiResponse:
|
||||
"""
|
||||
Handle non-streaming chat completion.
|
||||
|
||||
Returns:
|
||||
ApiResponse with OpenAI-format JSON body
|
||||
"""
|
||||
try:
|
||||
# Invoke model
|
||||
result = await langchain_service.invoke_chat(model_with_tools, messages)
|
||||
|
||||
# Extract content
|
||||
content = result.content if hasattr(result, 'content') else str(result)
|
||||
|
||||
# Build OpenAI-compatible response
|
||||
response_body = {
|
||||
'id': completion_id,
|
||||
'object': 'chat.completion',
|
||||
'created': created_ts,
|
||||
'model': model_name,
|
||||
'choices': [{
|
||||
'index': 0,
|
||||
'message': {
|
||||
'role': 'assistant',
|
||||
'content': content
|
||||
},
|
||||
'finish_reason': 'stop'
|
||||
}],
|
||||
'usage': {
|
||||
'prompt_tokens': 0, # LangChain doesn't expose token counts easily
|
||||
'completion_tokens': 0,
|
||||
'total_tokens': 0
|
||||
}
|
||||
}
|
||||
|
||||
# Log token usage (if available)
|
||||
if hasattr(result, 'usage_metadata'):
|
||||
usage = result.usage_metadata
|
||||
prompt_tokens = getattr(usage, 'input_tokens', 0)
|
||||
completion_tokens = getattr(usage, 'output_tokens', 0)
|
||||
response_body['usage'] = {
|
||||
'prompt_tokens': prompt_tokens,
|
||||
'completion_tokens': completion_tokens,
|
||||
'total_tokens': prompt_tokens + completion_tokens
|
||||
}
|
||||
ctx.logger.info(f"📊 Token Usage: prompt={prompt_tokens}, completion={completion_tokens}")
|
||||
|
||||
ctx.logger.info(f"✅ Chat completion: {len(content)} chars")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
return ApiResponse(
|
||||
status=200,
|
||||
body=response_body
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ Non-streaming completion failed: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
|
||||
async def handle_streaming_response(
|
||||
model_with_tools,
|
||||
messages: List[Dict[str, Any]],
|
||||
completion_id: str,
|
||||
created_ts: int,
|
||||
model_name: str,
|
||||
langchain_service,
|
||||
ctx: FlowContext
|
||||
):
|
||||
"""
|
||||
Handle streaming chat completion via SSE.
|
||||
|
||||
Returns:
|
||||
Streaming response generator
|
||||
"""
|
||||
async def stream_generator():
|
||||
try:
|
||||
# Set SSE headers
|
||||
await ctx.response.status(200)
|
||||
await ctx.response.headers({
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive"
|
||||
})
|
||||
|
||||
ctx.logger.info("🌊 Streaming started")
|
||||
|
||||
# Stream chunks
|
||||
chunk_count = 0
|
||||
total_content = ""
|
||||
|
||||
async for chunk in langchain_service.astream_chat(model_with_tools, messages):
|
||||
# Extract delta content
|
||||
delta = chunk.content if hasattr(chunk, "content") else ""
|
||||
|
||||
if delta:
|
||||
total_content += delta
|
||||
chunk_count += 1
|
||||
|
||||
# Build SSE data
|
||||
data = {
|
||||
"id": completion_id,
|
||||
"object": "chat.completion.chunk",
|
||||
"created": created_ts,
|
||||
"model": model_name,
|
||||
"choices": [{
|
||||
"index": 0,
|
||||
"delta": {"content": delta},
|
||||
"finish_reason": None
|
||||
}]
|
||||
}
|
||||
|
||||
# Send SSE event
|
||||
await ctx.response.stream(f"data: {json.dumps(data, ensure_ascii=False)}\n\n")
|
||||
|
||||
# Send finish event
|
||||
finish_data = {
|
||||
"id": completion_id,
|
||||
"object": "chat.completion.chunk",
|
||||
"created": created_ts,
|
||||
"model": model_name,
|
||||
"choices": [{
|
||||
"index": 0,
|
||||
"delta": {},
|
||||
"finish_reason": "stop"
|
||||
}]
|
||||
}
|
||||
await ctx.response.stream(f"data: {json.dumps(finish_data)}\n\n")
|
||||
|
||||
# Send [DONE]
|
||||
await ctx.response.stream("data: [DONE]\n\n")
|
||||
|
||||
# Close stream
|
||||
await ctx.response.close()
|
||||
|
||||
ctx.logger.info(f"✅ Streaming completed: {chunk_count} chunks, {len(total_content)} chars")
|
||||
ctx.logger.info("=" * 80)
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ Streaming failed: {e}", exc_info=True)
|
||||
|
||||
# Send error event
|
||||
error_data = {
|
||||
"error": {
|
||||
"message": str(e),
|
||||
"type": "server_error"
|
||||
}
|
||||
}
|
||||
await ctx.response.stream(f"data: {json.dumps(error_data)}\n\n")
|
||||
await ctx.response.close()
|
||||
|
||||
return stream_generator()
|
||||
|
||||
|
||||
async def lookup_collection_by_aktenzeichen(
|
||||
aktenzeichen: str,
|
||||
ctx: FlowContext
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Lookup xAI Collection ID for Aktenzeichen via EspoCRM.
|
||||
|
||||
Search strategy:
|
||||
1. Search for Raeumungsklage with matching advowareAkteBezeichner
|
||||
2. Return xaiCollectionId if found
|
||||
|
||||
Args:
|
||||
aktenzeichen: Normalized Aktenzeichen (e.g., "1234/56")
|
||||
ctx: Motia context
|
||||
|
||||
Returns:
|
||||
Collection ID or None if not found
|
||||
"""
|
||||
try:
|
||||
# Initialize EspoCRM API
|
||||
espocrm = EspoCRMAPI(ctx)
|
||||
|
||||
# Search Räumungsklage by advowareAkteBezeichner
|
||||
ctx.logger.info(f"🔍 Searching Räumungsklage for Aktenzeichen: {aktenzeichen}")
|
||||
|
||||
search_result = await espocrm.search_entities(
|
||||
entity_type='Raeumungsklage',
|
||||
where=[{
|
||||
'type': 'equals',
|
||||
'attribute': 'advowareAkteBezeichner',
|
||||
'value': aktenzeichen
|
||||
}],
|
||||
select=['id', 'xaiCollectionId', 'advowareAkteBezeichner'],
|
||||
maxSize=1
|
||||
)
|
||||
|
||||
if search_result and len(search_result) > 0:
|
||||
entity = search_result[0]
|
||||
collection_id = entity.get('xaiCollectionId')
|
||||
|
||||
if collection_id:
|
||||
ctx.logger.info(f"✅ Found Räumungsklage: {entity.get('id')}")
|
||||
return collection_id
|
||||
else:
|
||||
ctx.logger.warn(f"⚠️ Räumungsklage found but no xaiCollectionId: {entity.get('id')}")
|
||||
else:
|
||||
ctx.logger.warn(f"⚠️ No Räumungsklage found for {aktenzeichen}")
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ Collection lookup failed: {e}", exc_info=True)
|
||||
return None
|
||||
Reference in New Issue
Block a user