Files
motia-iii/services/graphiti_tracer.py
bsiggel b4ff370823 feat(graphiti): Integrate Langfuse Tracer for enhanced monitoring and tracing of Graphiti operations
feat(graphiti): Define entity and edge types in new graphiti_schema for structured data extraction
feat(graphiti): Enhance ingest_episode_event_step with schema integration for improved episode processing
2026-03-30 22:00:53 +00:00

133 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Langfuse-Tracer für Graphiti Knowledge-Graph.
Implementiert das graphiti_core.tracer.Tracer-Interface und leitet
alle Graphiti-Spans an Langfuse weiter.
Konfiguration (Umgebungsvariablen):
LANGFUSE_PUBLIC_KEY Langfuse Public Key (Pflicht)
LANGFUSE_SECRET_KEY Langfuse Secret Key (Pflicht)
LANGFUSE_HOST optional, Standard: https://cloud.langfuse.com
Wenn LANGFUSE_PUBLIC_KEY nicht gesetzt ist, wird ein NoOp-Tracer zurückgegeben.
"""
from __future__ import annotations
import os
from contextlib import contextmanager
from typing import Any, Generator, Optional
from graphiti_core.tracer import NoOpSpan, NoOpTracer, Tracer, TracerSpan
class LangfuseTracerSpan(TracerSpan):
    """Span handle that forwards Graphiti span updates to a Langfuse client.

    Every update is applied through the client's ``update_current_span``.
    Any exception raised while doing so is swallowed, so a tracing failure
    never propagates to the caller.
    """

    def __init__(self, langfuse_client: Any) -> None:
        """Keep a reference to the Langfuse client used for span updates."""
        self._lf = langfuse_client

    def add_attributes(self, attributes: dict[str, Any]) -> None:
        """Attach the given Graphiti attributes as Langfuse span metadata."""
        try:
            update = self._lf.update_current_span
            update(metadata=attributes)
        except Exception:
            # Best effort: tracing problems are ignored on purpose.
            return

    def set_status(self, status: str, description: str | None = None) -> None:
        """Map a Graphiti status onto a Langfuse level.

        The status ``"error"`` becomes level ``"ERROR"``; every other value
        becomes ``"DEFAULT"``. ``description`` is passed on as the status
        message.
        """
        try:
            if status == "error":
                level = "ERROR"
            else:
                level = "DEFAULT"
            self._lf.update_current_span(status_message=description, level=level)
        except Exception:
            # Best effort: tracing problems are ignored on purpose.
            return

    def record_exception(self, exception: Exception) -> None:
        """Mark the current span as ``ERROR`` with the exception type and text."""
        try:
            # Formatting stays inside the try: str(exception) may itself raise.
            message = f"{type(exception).__name__}: {exception}"
            self._lf.update_current_span(level="ERROR", status_message=message)
        except Exception:
            # Best effort: tracing problems are ignored on purpose.
            return
class LangfuseTracer(Tracer):
    """Graphiti tracer implementation backed by a Langfuse client.

    Each Graphiti span is opened via the client's
    ``start_as_current_observation(..., as_type="span")`` and is named
    ``"<prefix>.<name>"``.
    """

    def __init__(
        self,
        langfuse_client: Any,
        span_prefix: str = "graphiti",
    ) -> None:
        """Store the client and the span-name prefix.

        Args:
            langfuse_client: Initialized ``Langfuse()`` client.
            span_prefix: Prefix for all span names (e.g. ``"graphiti"``).
                Trailing dots are stripped.
        """
        self._lf = langfuse_client
        self._prefix = span_prefix.rstrip(".")

    @contextmanager
    def start_span(self, name: str) -> Generator[LangfuseTracerSpan | NoOpSpan, None, None]:
        """Open a Langfuse span for the given Graphiti step.

        Yields a ``LangfuseTracerSpan`` when the Langfuse span could be
        started, otherwise a ``NoOpSpan``. Exceptions raised inside the
        caller's ``with`` body always propagate unchanged; only failures of
        the tracing backend itself are swallowed.

        Args:
            name: Step name; combined with the prefix as ``"<prefix>.<name>"``.
        """
        full_name = f"{self._prefix}.{name}"

        # Only the span *start* is guarded here. Wrapping the ``yield`` itself
        # in ``except Exception`` would catch exceptions thrown in from the
        # caller's body and yield a second time, which makes
        # ``contextlib.contextmanager`` raise
        # ``RuntimeError("generator didn't stop after throw()")`` and hides
        # the caller's real error.
        try:
            observation = self._lf.start_as_current_observation(
                name=full_name,
                as_type="span",
            )
            observation.__enter__()
        except Exception:
            # Tracing is unavailable: hand out a no-op span and finish.
            yield NoOpSpan()
            return

        try:
            yield LangfuseTracerSpan(self._lf)
        except BaseException as exc:
            # Give Langfuse the chance to close the span with the error
            # details, but never let a tracing failure replace the caller's
            # exception.
            try:
                observation.__exit__(type(exc), exc, exc.__traceback__)
            except Exception:
                pass
            raise
        else:
            # Normal completion: close the span; ignore backend failures.
            try:
                observation.__exit__(None, None, None)
            except Exception:
                pass
def build_langfuse_tracer(
    span_prefix: str = "graphiti",
    ctx: Optional[Any] = None,
) -> Tracer:
    """Build a Langfuse-backed tracer, falling back to a no-op tracer.

    Reads ``LANGFUSE_PUBLIC_KEY``, ``LANGFUSE_SECRET_KEY`` and
    ``LANGFUSE_HOST`` from the environment. Status messages are sent to
    ``ctx.logger.info`` when ``ctx`` has a ``logger`` attribute, otherwise
    they are printed.

    Args:
        span_prefix: Prefix for span names.
        ctx: Optional context object used for logging.

    Returns:
        A ``LangfuseTracer`` when both keys are set and the Langfuse client
        could be created; otherwise a ``NoOpTracer`` (keys missing,
        ``langfuse`` not importable, or client initialization failed).
    """

    def _log(msg: str) -> None:
        # Prefer the context logger; fall back to stdout.
        line = f"[GraphitiTracer] {msg}"
        has_ctx_logger = ctx is not None and hasattr(ctx, "logger")
        if not has_ctx_logger:
            print(line)
            return
        ctx.logger.info(line)

    env = os.environ
    public_key = env.get("LANGFUSE_PUBLIC_KEY")
    secret_key = env.get("LANGFUSE_SECRET_KEY")
    # Unset host means the client is created without an explicit host.
    host = env.get("LANGFUSE_HOST")

    if not (public_key and secret_key):
        _log("LANGFUSE_PUBLIC_KEY/SECRET_KEY nicht gesetzt Tracing deaktiviert (NoOp)")
        return NoOpTracer()

    tracer: Tracer
    try:
        # Imported lazily so a missing langfuse package only disables tracing.
        from langfuse import Langfuse

        client_options: dict[str, Any] = {
            "public_key": public_key,
            "secret_key": secret_key,
        }
        if host:
            client_options["host"] = host

        client = Langfuse(**client_options)
        _log(f"Langfuse-Tracer initialisiert (host={host or 'cloud.langfuse.com'}, prefix='{span_prefix}')")
        tracer = LangfuseTracer(client, span_prefix=span_prefix)
    except ImportError:
        _log("langfuse nicht installiert NoOp-Tracer")
        tracer = NoOpTracer()
    except Exception as e:
        _log(f"Langfuse-Init fehlgeschlagen: {e} NoOp-Tracer")
        tracer = NoOpTracer()
    return tracer