feat(graphiti): Integrate Langfuse Tracer for enhanced monitoring and tracing of Graphiti operations

feat(graphiti): Define entity and edge types in new graphiti_schema for structured data extraction
feat(graphiti): Enhance ingest_episode_event_step with schema integration for improved episode processing
This commit is contained in:
bsiggel
2026-03-30 22:00:53 +00:00
parent e255ae1263
commit b4ff370823
4 changed files with 1238 additions and 0 deletions

View File

@@ -12,6 +12,8 @@ from graphiti_core.llm_client import LLMConfig
from graphiti_core.llm_client.openai_generic_client import OpenAIGenericClient
from graphiti_core.embedder import OpenAIEmbedder, OpenAIEmbedderConfig
from services.graphiti_tracer import build_langfuse_tracer
class GraphitiError(Exception):
"""Fehler beim Zugriff auf den Graphiti-Client."""
@@ -89,12 +91,15 @@ async def _build_graphiti() -> Graphiti:
)
)
tracer = build_langfuse_tracer(span_prefix="graphiti", ctx=None)
client = Graphiti(
uri=neo4j_uri,
user=neo4j_user,
password=neo4j_password,
llm_client=llm_client,
embedder=embedder,
tracer=tracer,
)
await client.build_indices_and_constraints()
return client

1096
services/graphiti_schema.py Normal file

File diff suppressed because it is too large Load Diff

132
services/graphiti_tracer.py Normal file
View File

@@ -0,0 +1,132 @@
"""Langfuse-Tracer für Graphiti Knowledge-Graph.
Implementiert das graphiti_core.tracer.Tracer-Interface und leitet
alle Graphiti-Spans an Langfuse weiter.
Konfiguration (Umgebungsvariablen):
LANGFUSE_PUBLIC_KEY Langfuse Public Key (Pflicht)
LANGFUSE_SECRET_KEY Langfuse Secret Key (Pflicht)
LANGFUSE_HOST optional, Standard: https://cloud.langfuse.com
Wenn LANGFUSE_PUBLIC_KEY nicht gesetzt ist, wird ein NoOp-Tracer zurückgegeben.
"""
from __future__ import annotations
import os
from contextlib import contextmanager
from typing import Any, Generator, Optional
from graphiti_core.tracer import NoOpSpan, NoOpTracer, Tracer, TracerSpan
class LangfuseTracerSpan(TracerSpan):
"""Wrapper um einen aktiven Langfuse-Span."""
def __init__(self, langfuse_client: Any) -> None:
self._lf = langfuse_client
def add_attributes(self, attributes: dict[str, Any]) -> None:
"""Schreibt Graphiti-Attribute als Langfuse-Metadata."""
try:
self._lf.update_current_span(metadata=attributes)
except Exception:
pass
def set_status(self, status: str, description: str | None = None) -> None:
"""Übersetzt Graphiti-Status in den Langfuse-Level."""
try:
level = "ERROR" if status == "error" else "DEFAULT"
self._lf.update_current_span(level=level, status_message=description)
except Exception:
pass
def record_exception(self, exception: Exception) -> None:
"""Schreibt Exception als ERROR in den aktuellen Span."""
try:
self._lf.update_current_span(
level="ERROR",
status_message=f"{type(exception).__name__}: {exception}",
)
except Exception:
pass
class LangfuseTracer(Tracer):
"""Graphiti-Tracer-Implementierung auf Basis von Langfuse v3.
Jeder Graphiti-Span wird als Langfuse-Observation (type='span')
unter dem konfigurierten Präfix verschachtelt.
"""
def __init__(
self,
langfuse_client: Any,
span_prefix: str = "graphiti",
) -> None:
"""
Args:
langfuse_client: Initialisierter Langfuse()-Client.
span_prefix: Präfix für alle Span-Namen (z. B. "graphiti").
"""
self._lf = langfuse_client
self._prefix = span_prefix.rstrip(".")
@contextmanager
def start_span(self, name: str) -> Generator[LangfuseTracerSpan | NoOpSpan, None, None]:
"""Startet einen Langfuse-Span für den angegebenen Graphiti-Step."""
full_name = f"{self._prefix}.{name}"
try:
with self._lf.start_as_current_observation(name=full_name, as_type="span"):
yield LangfuseTracerSpan(self._lf)
except Exception:
yield NoOpSpan()
def build_langfuse_tracer(
span_prefix: str = "graphiti",
ctx: Optional[Any] = None,
) -> Tracer:
"""
Erstellt einen Langfuse-Tracer oder einen NoOp-Tracer (wenn Langfuse nicht konfiguriert).
Args:
span_prefix: Präfix für Span-Namen.
ctx: Optionaler Motia-Context für Logging.
Returns:
LangfuseTracer wenn LANGFUSE_PUBLIC_KEY gesetzt, sonst NoOpTracer.
"""
public_key = os.environ.get("LANGFUSE_PUBLIC_KEY")
secret_key = os.environ.get("LANGFUSE_SECRET_KEY")
host = os.environ.get("LANGFUSE_HOST") # None → cloud.langfuse.com
def _log(msg: str) -> None:
if ctx is not None and hasattr(ctx, "logger"):
ctx.logger.info(f"[GraphitiTracer] {msg}")
else:
print(f"[GraphitiTracer] {msg}")
if not public_key or not secret_key:
_log("LANGFUSE_PUBLIC_KEY/SECRET_KEY nicht gesetzt Tracing deaktiviert (NoOp)")
return NoOpTracer()
try:
from langfuse import Langfuse
kwargs: dict[str, Any] = {
"public_key": public_key,
"secret_key": secret_key,
}
if host:
kwargs["host"] = host
lf_client = Langfuse(**kwargs)
_log(f"Langfuse-Tracer initialisiert (host={host or 'cloud.langfuse.com'}, prefix='{span_prefix}')")
return LangfuseTracer(lf_client, span_prefix=span_prefix)
except ImportError:
_log("langfuse nicht installiert NoOp-Tracer")
return NoOpTracer()
except Exception as e:
_log(f"Langfuse-Init fehlgeschlagen: {e} NoOp-Tracer")
return NoOpTracer()

View File

@@ -10,6 +10,7 @@ from typing import Any
from motia import FlowContext, queue
from services.graphiti_client import get_graphiti, GraphitiError
from services.graphiti_schema import ENTITY_TYPES, EDGE_TYPES, EDGE_TYPE_MAP, EXTRACTION_INSTRUCTIONS
from graphiti_core.nodes import EpisodeType
@@ -81,6 +82,10 @@ async def handler(event_data: dict[str, Any], ctx: FlowContext[Any]) -> None:
reference_time=reference_time,
source=EpisodeType.text,
group_id=rag_akten_id,
entity_types=ENTITY_TYPES,
edge_types=EDGE_TYPES,
edge_type_map=EDGE_TYPE_MAP,
custom_extraction_instructions=EXTRACTION_INSTRUCTIONS,
)
episode_id = result.episode.uuid