"""Langfuse-Tracer für Graphiti Knowledge-Graph. Implementiert das graphiti_core.tracer.Tracer-Interface und leitet alle Graphiti-Spans an Langfuse weiter. Konfiguration (Umgebungsvariablen): LANGFUSE_PUBLIC_KEY – Langfuse Public Key (Pflicht) LANGFUSE_SECRET_KEY – Langfuse Secret Key (Pflicht) LANGFUSE_HOST – optional, Standard: https://cloud.langfuse.com Wenn LANGFUSE_PUBLIC_KEY nicht gesetzt ist, wird ein NoOp-Tracer zurückgegeben. """ from __future__ import annotations import os from contextlib import contextmanager from typing import Any, Generator, Optional from graphiti_core.tracer import NoOpSpan, NoOpTracer, Tracer, TracerSpan class LangfuseTracerSpan(TracerSpan): """Wrapper um einen aktiven Langfuse-Span.""" def __init__(self, langfuse_client: Any) -> None: self._lf = langfuse_client def add_attributes(self, attributes: dict[str, Any]) -> None: """Schreibt Graphiti-Attribute als Langfuse-Metadata.""" try: self._lf.update_current_span(metadata=attributes) except Exception: pass def set_status(self, status: str, description: str | None = None) -> None: """Übersetzt Graphiti-Status in den Langfuse-Level.""" try: level = "ERROR" if status == "error" else "DEFAULT" self._lf.update_current_span(level=level, status_message=description) except Exception: pass def record_exception(self, exception: Exception) -> None: """Schreibt Exception als ERROR in den aktuellen Span.""" try: self._lf.update_current_span( level="ERROR", status_message=f"{type(exception).__name__}: {exception}", ) except Exception: pass class LangfuseTracer(Tracer): """Graphiti-Tracer-Implementierung auf Basis von Langfuse v3. Jeder Graphiti-Span wird als Langfuse-Observation (type='span') unter dem konfigurierten Präfix verschachtelt. """ def __init__( self, langfuse_client: Any, span_prefix: str = "graphiti", ) -> None: """ Args: langfuse_client: Initialisierter Langfuse()-Client. span_prefix: Präfix für alle Span-Namen (z. B. "graphiti"). """ self._lf = langfuse_client self._prefix = span_prefix.rstrip(".") @contextmanager def start_span(self, name: str) -> Generator[LangfuseTracerSpan | NoOpSpan, None, None]: """Startet einen Langfuse-Span für den angegebenen Graphiti-Step.""" full_name = f"{self._prefix}.{name}" try: with self._lf.start_as_current_observation(name=full_name, as_type="span"): yield LangfuseTracerSpan(self._lf) except Exception: yield NoOpSpan() def build_langfuse_tracer( span_prefix: str = "graphiti", ctx: Optional[Any] = None, ) -> Tracer: """ Erstellt einen Langfuse-Tracer oder einen NoOp-Tracer (wenn Langfuse nicht konfiguriert). Args: span_prefix: Präfix für Span-Namen. ctx: Optionaler Motia-Context für Logging. Returns: LangfuseTracer wenn LANGFUSE_PUBLIC_KEY gesetzt, sonst NoOpTracer. """ public_key = os.environ.get("LANGFUSE_PUBLIC_KEY") secret_key = os.environ.get("LANGFUSE_SECRET_KEY") host = os.environ.get("LANGFUSE_HOST") # None → cloud.langfuse.com def _log(msg: str) -> None: if ctx is not None and hasattr(ctx, "logger"): ctx.logger.info(f"[GraphitiTracer] {msg}") else: print(f"[GraphitiTracer] {msg}") if not public_key or not secret_key: _log("LANGFUSE_PUBLIC_KEY/SECRET_KEY nicht gesetzt – Tracing deaktiviert (NoOp)") return NoOpTracer() try: from langfuse import Langfuse kwargs: dict[str, Any] = { "public_key": public_key, "secret_key": secret_key, } if host: kwargs["host"] = host lf_client = Langfuse(**kwargs) _log(f"Langfuse-Tracer initialisiert (host={host or 'cloud.langfuse.com'}, prefix='{span_prefix}')") return LangfuseTracer(lf_client, span_prefix=span_prefix) except ImportError: _log("langfuse nicht installiert – NoOp-Tracer") return NoOpTracer() except Exception as e: _log(f"Langfuse-Init fehlgeschlagen: {e} – NoOp-Tracer") return NoOpTracer()