feat(graphiti): Implement Graphiti client and related steps for episode ingestion and querying

- Added `graphiti_client.py` to manage the Graphiti client as a singleton.
- Created `ingest_episode_event_step.py` to handle episode ingestion from HTTP webhook events.
- Implemented `ingest_episode_step.py` for validating and enqueuing episode payloads via a POST request.
- Developed `query_graph_step.py` for performing semantic searches in the Graphiti Knowledge-Graph.
- Introduced an `__init__.py` file for the graphiti steps module.
This commit is contained in:
bsiggel
2026-03-30 08:25:49 +00:00
parent 1271e38f2d
commit e255ae1263
8 changed files with 567 additions and 1 deletions

View File

View File

@@ -0,0 +1,98 @@
"""Graphiti - Episode verarbeiten (Queue: graphiti.ingest_episode)
Empfängt das Event vom HTTP-Webhook und schreibt die Episode asynchron
in den Graphiti Knowledge-Graph (Neo4j + xAI LLM + OpenAI Embeddings).
"""
import json
from datetime import datetime, timezone
from typing import Any
from motia import FlowContext, queue
from services.graphiti_client import get_graphiti, GraphitiError
from graphiti_core.nodes import EpisodeType
# Motia step configuration: this worker consumes messages from the
# "graphiti.ingest_episode" queue and is terminal (enqueues nothing further).
config = {
    "name": "Graphiti Ingest Episode Worker",
    "description": "Verarbeitet eine Episode und schreibt sie in den Knowledge-Graph.",
    "flows": ["graphiti-knowledge-graph"],
    "triggers": [
        queue("graphiti.ingest_episode")
    ],
    "enqueues": [],
}
async def handler(event_data: dict[str, Any], ctx: FlowContext[Any]) -> None:
    """Persist a single episode into the Graphiti knowledge graph.

    Expected keys in ``event_data``:
        rag_akten_id    (str)        group_id in the graph
        rag_dokument_id (str)        episode name
        chunk_ids       (list[str])  chunk references, embedded in the description
        content         (str)        episode text
        source          (str)        e.g. "Zeugenvernehmung"
        valid_at        (str | None) ISO string; None -> now

    Raises:
        GraphitiError / Exception: re-raised after logging so Motia retries
        the queue message.
    """
    ctx.logger.info("=" * 80)
    ctx.logger.info("GRAPHITI INGEST WORKER: Starte Episode-Verarbeitung")

    akten_id: str = event_data.get("rag_akten_id", "")
    dokument_id: str = event_data.get("rag_dokument_id", "")
    chunk_refs: list[str] = event_data.get("chunk_ids") or []
    episode_text: str = event_data.get("content", "")
    episode_source: str = event_data.get("source", "")
    valid_at_raw: str | None = event_data.get("valid_at")

    ctx.logger.info(f"Akte: {akten_id}")
    ctx.logger.info(f"Dokument: {dokument_id}")
    ctx.logger.info(f"Source: {episode_source}")

    # Resolve the temporal anchor: use valid_at when parseable, else "now".
    reference_time = datetime.now(timezone.utc)
    if not valid_at_raw:
        ctx.logger.info("Kein valid_at verwende jetzt als reference_time")
    else:
        try:
            parsed = datetime.fromisoformat(valid_at_raw)
            # Naive timestamps are interpreted as UTC.
            reference_time = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
        except (ValueError, TypeError):
            ctx.logger.info(f"valid_at '{valid_at_raw}' ungültig verwende jetzt")
    ctx.logger.info(f"reference_time: {reference_time.isoformat()}")

    # Pack provenance (document id + chunk ids) into the source description.
    description = " | ".join([
        episode_source,
        f"rag_dokument_id:{dokument_id}",
        f"chunk_ids:{json.dumps(chunk_refs, ensure_ascii=False)}",
    ])

    try:
        client = await get_graphiti(ctx)
        outcome = await client.add_episode(
            name=dokument_id,
            episode_body=episode_text,
            source_description=description,
            reference_time=reference_time,
            source=EpisodeType.text,
            group_id=akten_id,
        )
        ctx.logger.info(f"Episode erfolgreich gespeichert: {outcome.episode.uuid}")
        ctx.logger.info("=" * 80)
    except GraphitiError as err:
        ctx.logger.error(f"GraphitiError: {err}")
        ctx.logger.error("=" * 80)
        raise  # Motia retries
    except Exception as err:
        ctx.logger.error(f"Unerwarteter Fehler: {type(err).__name__}: {err}")
        ctx.logger.error("=" * 80)
        raise

View File

@@ -0,0 +1,87 @@
"""Graphiti - Episode ingestieren (POST /ingest_episode)
Dünner HTTP-Webhook: validiert den Payload und enqueued ihn sofort.
Die eigentliche Graphiti-Arbeit passiert in ingest_episode_event_step.py.
"""
from datetime import datetime, timezone
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
# Motia step configuration: thin HTTP webhook (POST /ingest_episode) that
# hands validated payloads to the "graphiti.ingest_episode" queue worker.
config = {
    "name": "Graphiti Ingest Episode",
    "description": "Nimmt Episode-Payload entgegen, validiert und enqueued ihn.",
    "flows": ["graphiti-knowledge-graph"],
    "triggers": [
        http("POST", "/ingest_episode")
    ],
    "enqueues": ["graphiti.ingest_episode"],
}
def _parse_valid_at(raw: Any) -> str | None:
"""ISO-String validieren; gibt None zurück wenn ungültig/fehlend."""
if not raw or not isinstance(raw, str):
return None
try:
dt = datetime.fromisoformat(raw)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.isoformat()
except (ValueError, TypeError):
return None
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """Validate the ingest payload and enqueue a graphiti.ingest_episode event.

    Payload (JSON):
        rag_akten_id    (str, required)
        rag_dokument_id (str, required)
        chunk_ids       (list[str], optional)
        content         (str, required)
        source          (str, required)
        valid_at        (ISO string | null, optional)

    Returns:
        202 { status: "accepted", group_id, rag_dokument_id }
        400 when a required field is missing
    """
    ctx.logger.info("=" * 80)
    ctx.logger.info("GRAPHITI INGEST WEBHOOK: Payload empfangen")

    payload: dict = request.body or {}
    ctx.logger.info(f"Payload-Keys: {list(payload.keys())}")

    # All required fields must be present and truthy.
    required = ("rag_akten_id", "rag_dokument_id", "content", "source")
    missing = [field for field in required if not payload.get(field)]
    if missing:
        ctx.logger.error(f"Fehlende Pflichtfelder: {missing}")
        return ApiResponse(status=400, body={"error": f"Fehlende Pflichtfelder: {missing}"})

    akten_id = payload["rag_akten_id"]
    dokument_id = payload["rag_dokument_id"]

    event_data = {
        "rag_akten_id": akten_id,
        "rag_dokument_id": dokument_id,
        "chunk_ids": payload.get("chunk_ids") or [],
        "content": payload["content"],
        "source": payload["source"],
        # None makes the queue worker fall back to "now" as reference_time.
        "valid_at": _parse_valid_at(payload.get("valid_at")),
    }
    await ctx.enqueue({"topic": "graphiti.ingest_episode", "data": event_data})

    ctx.logger.info(f"Event 'graphiti.ingest_episode' enqueued für {akten_id}/{dokument_id}")
    ctx.logger.info("=" * 80)
    return ApiResponse(
        status=202,
        body={
            "status": "accepted",
            "group_id": akten_id,
            "rag_dokument_id": dokument_id,
        },
    )

View File

@@ -0,0 +1,149 @@
"""Graphiti - Graph abfragen (POST /query_graph)
Führt eine semantische Suche im Graphiti Knowledge-Graph durch,
gefiltert nach group_id (= rag_akten_id). Optionaler time_point
schränkt das Ergebnis auf zum Zeitpunkt gültige Kanten ein.
"""
from datetime import datetime, timezone
from typing import Any
from motia import FlowContext, http, ApiRequest, ApiResponse
from graphiti_core.search.search_filters import DateFilter, ComparisonOperator, SearchFilters
from services.graphiti_client import get_graphiti, GraphitiError
# Motia step configuration: synchronous HTTP endpoint (POST /query_graph)
# that searches the graph directly; nothing is enqueued.
config = {
    "name": "Graphiti Query Graph",
    "description": "Führt eine semantische Suche im Graphiti Knowledge-Graph durch.",
    "flows": ["graphiti-knowledge-graph"],
    "triggers": [
        http("POST", "/query_graph")
    ],
    "enqueues": [],
}
def _parse_dt(raw: Any) -> datetime | None:
"""ISO-String → timezone-aware datetime, oder None bei Fehler."""
if not raw or not isinstance(raw, str):
return None
try:
dt = datetime.fromisoformat(raw)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except (ValueError, TypeError):
return None
def _serialize_edge(edge: Any) -> dict:
"""Serialisiert eine EntityEdge in ein dict für die JSON-Antwort."""
return {
"uuid": edge.uuid,
"name": edge.name,
"fact": edge.fact,
"valid_at": edge.valid_at.isoformat() if edge.valid_at else None,
"invalid_at": edge.invalid_at.isoformat() if edge.invalid_at else None,
"created_at": edge.created_at.isoformat() if edge.created_at else None,
"source_node_uuid": edge.source_node_uuid,
"target_node_uuid": edge.target_node_uuid,
}
# ---------------------------------------------------------------------------
# HTTP-Handler
# ---------------------------------------------------------------------------
async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
    """Search the Graphiti knowledge graph for relevant facts.

    Payload (JSON):
        rag_akten_id (str, required)  filters on group_id
        query        (str, required)  search query
        time_point   (str, optional)  ISO date; restricts the result to
                                      edges valid at that moment

    Returns:
        200 { status, group_id, query, time_point, num_results, results[] }
        400 when a required field is missing
        500 on internal errors
    """
    ctx.logger.info("=" * 80)
    ctx.logger.info("GRAPHITI QUERY: Starte Graph-Suche")

    payload: dict = request.body or {}
    ctx.logger.info(f"Payload-Keys: {list(payload.keys())}")

    # Both fields must be present and truthy.
    missing = [field for field in ("rag_akten_id", "query") if not payload.get(field)]
    if missing:
        ctx.logger.error(f"Fehlende Pflichtfelder: {missing}")
        return ApiResponse(status=400, body={"error": f"Fehlende Pflichtfelder: {missing}"})

    group_id: str = payload["rag_akten_id"]
    search_query: str = payload["query"]
    raw_time_point: Any = payload.get("time_point")
    time_point = _parse_dt(raw_time_point)
    if raw_time_point and time_point is None:
        ctx.logger.warning(
            f"time_point konnte nicht geparst werden ({raw_time_point!r}), wird ignoriert"
        )

    ctx.logger.info(f"group_id (rag_akten_id): {group_id}")
    ctx.logger.info(f"query: {search_query!r}")
    ctx.logger.info(f"time_point: {time_point.isoformat() if time_point else 'keiner'}")

    try:
        graphiti = await get_graphiti(ctx)

        # Temporal filter: keep edges that were already valid at time_point
        # and not yet invalidated, i.e.
        #   valid_at <= time_point AND (invalid_at IS NULL OR invalid_at > time_point)
        search_filter: SearchFilters | None = None
        if time_point is not None:
            already_valid = [
                [DateFilter(
                    date=time_point,
                    comparison_operator=ComparisonOperator.less_than_equal,
                )],
            ]
            not_yet_invalid = [
                [DateFilter(
                    comparison_operator=ComparisonOperator.is_null,
                )],
                [DateFilter(
                    date=time_point,
                    comparison_operator=ComparisonOperator.greater_than,
                )],
            ]
            search_filter = SearchFilters(valid_at=already_valid, invalid_at=not_yet_invalid)

        edges = await graphiti.search(
            query=search_query,
            group_ids=[group_id],
            num_results=20,
            search_filter=search_filter,
        )
        ctx.logger.info(f"Gefundene Kanten: {len(edges)}")
        ctx.logger.info("=" * 80)
        return ApiResponse(
            status=200,
            body={
                "status": "success",
                "group_id": group_id,
                "query": search_query,
                "time_point": time_point.isoformat() if time_point else None,
                "num_results": len(edges),
                "results": [_serialize_edge(edge) for edge in edges],
            },
        )
    except GraphitiError as exc:
        ctx.logger.error(f"GraphitiError: {exc}")
        ctx.logger.error("=" * 80)
        return ApiResponse(status=500, body={"error": str(exc)})
    except Exception as exc:
        ctx.logger.error(f"Fehler bei der Graph-Suche: {type(exc).__name__}: {exc}")
        ctx.logger.error("=" * 80)
        return ApiResponse(status=500, body={"error": "Interner Fehler", "details": str(exc)})