From b4841524c3df1cf7a6b6ed602a930276b37df238 Mon Sep 17 00:00:00 2001 From: fcogidi <41602287+fcogidi@users.noreply.github.com> Date: Wed, 4 Feb 2026 16:54:53 -0500 Subject: [PATCH 1/5] Add evaluation harness with experiment and trace evaluation support --- .../aieng/agent_evals/async_utils.py | 94 +++- .../aieng/agent_evals/evaluation/__init__.py | 47 ++ .../agent_evals/evaluation/experiment.py | 213 ++++++++ .../aieng/agent_evals/evaluation/trace.py | 481 ++++++++++++++++++ .../aieng/agent_evals/evaluation/types.py | 182 +++++++ 5 files changed, 1001 insertions(+), 16 deletions(-) create mode 100644 aieng-eval-agents/aieng/agent_evals/evaluation/__init__.py create mode 100644 aieng-eval-agents/aieng/agent_evals/evaluation/experiment.py create mode 100644 aieng-eval-agents/aieng/agent_evals/evaluation/trace.py create mode 100644 aieng-eval-agents/aieng/agent_evals/evaluation/types.py diff --git a/aieng-eval-agents/aieng/agent_evals/async_utils.py b/aieng-eval-agents/aieng/agent_evals/async_utils.py index cf4e5cc..b64769c 100644 --- a/aieng-eval-agents/aieng/agent_evals/async_utils.py +++ b/aieng-eval-agents/aieng/agent_evals/async_utils.py @@ -1,15 +1,18 @@ """Utils for async workflows.""" import asyncio -import types -from typing import Any, Awaitable, Callable, Coroutine, Sequence, TypeVar +import contextvars +import threading +from concurrent.futures import Future +from typing import Any, Awaitable, Callable, Coroutine, ParamSpec, Sequence, TypeVar -from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn, TimeRemainingColumn +from aieng.agent_evals.progress import create_progress T = TypeVar("T") +P = ParamSpec("P") -__all__ = ["rate_limited", "gather_with_progress"] +__all__ = ["rate_limited", "gather_with_progress", "run_coroutine_sync"] async def rate_limited(_fn: Callable[[], Awaitable[T]], semaphore: asyncio.Semaphore) -> T: @@ -32,7 +35,7 @@ async def rate_limited(_fn: Callable[[], Awaitable[T]], semaphore: asyncio.Semap async def 
gather_with_progress( - coros: "list[types.CoroutineType[Any, Any, T]]", + coros: Sequence[Coroutine[Any, Any, T]], description: str = "Running tasks", ) -> Sequence[T]: """Run a list of coroutines concurrently and display a rich.Progress bar. @@ -41,7 +44,7 @@ async def gather_with_progress( Parameters ---------- - coros : list[CoroutineType[Any, Any, T]] + coros : Sequence[Coroutine[Any, Any, T]] List of coroutines to run. description : str, optional Description to show in the progress bar, by default "Running tasks". @@ -57,14 +60,8 @@ async def gather_with_progress( # Pre‐allocate a results list; we'll fill in each slot as its Task completes results: list[T | None] = [None] * len(tasks) - # Create and start a Progress bar with a total equal to the number of tasks - with Progress( - TextColumn("[bold blue]{task.description}"), - BarColumn(), - TextColumn("{task.completed}/{task.total}"), - TimeRemainingColumn(), - TimeElapsedColumn(), - ) as progress: + # Use the shared progress style with a total equal to the number of tasks. + with create_progress() as progress: progress_task = progress.add_task(description, total=len(tasks)) # as_completed yields each Task as soon as it finishes @@ -78,14 +75,79 @@ async def gather_with_progress( return results # type: ignore -async def _indexed(index: int, coro: Coroutine[None, None, T]) -> tuple[int, T]: +def run_coroutine_sync( + coro_fn: Callable[P, Coroutine[Any, Any, T]], + /, + *args: P.args, + **kwargs: P.kwargs, +) -> T: + """Run an async callable from synchronous code. + + This helper supports both normal Python scripts (no active event loop) + and environments that already have a running loop (for example notebooks). + + Parameters + ---------- + coro_fn : Callable[P, Coroutine[Any, Any, T]] + Async callable to execute. + *args : P.args + Positional arguments forwarded to ``coro_fn``. + **kwargs : P.kwargs + Keyword arguments forwarded to ``coro_fn``. 
+ + Returns + ------- + T + Value returned by the awaited coroutine. + + Raises + ------ + BaseException + Re-raises any exception raised by ``coro_fn``. + + Notes + ----- + If a loop is already running, the coroutine is executed in a dedicated + worker thread with its own event loop. + """ + # Check whether this thread already has a running event loop. + try: + asyncio.get_running_loop() + except RuntimeError: + # No running loop: asyncio.run is the simplest and safest path. + return asyncio.run(coro_fn(*args, **kwargs)) + + # Create a thread-safe container for either a return value or an exception. + result: Future[T] = Future() + # Copy ContextVar values so request-local state survives the thread hop. + context = contextvars.copy_context() + + def _runner() -> None: + """Execute coroutine on a dedicated event loop.""" + try: + # Run the coroutine in the copied context on a new loop in worker thread. + result.set_result(context.run(asyncio.run, coro_fn(*args, **kwargs))) + except BaseException as exc: # pragma: no cover - surfaced to caller + # Capture failure so the caller re-raises the original exception. + result.set_exception(exc) + + # Use a separate thread to avoid calling asyncio.run inside an active loop. + thread = threading.Thread(target=_runner) + thread.start() + thread.join() + + # Return the value, or re-raise any exception stored in the Future. + return result.result() + + +async def _indexed(index: int, coro: Coroutine[Any, Any, T]) -> tuple[int, T]: """Return (index, await coro). Parameters ---------- index : int The index to pair with the coroutine result. - coro : Coroutine[None, None, T] + coro : Coroutine[Any, Any, T] The coroutine to await. Returns diff --git a/aieng-eval-agents/aieng/agent_evals/evaluation/__init__.py b/aieng-eval-agents/aieng/agent_evals/evaluation/__init__.py new file mode 100644 index 0000000..310dc98 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/evaluation/__init__.py @@ -0,0 +1,47 @@ +"""Evaluation harness. 
+ +This package provides a beginner-friendly wrapper around Langfuse's +``dataset.run_experiment`` workflow, plus optional trace-based +second-pass evaluators. +""" + +from .experiment import run_experiment, run_experiment_with_trace_evals +from .trace import extract_trace_metrics, run_trace_evaluations +from .types import ( + CompositeEvaluatorFunction, + Evaluation, + EvaluationResult, + EvaluatorFunction, + ExperimentItemResult, + ExperimentResult, + RunEvaluatorFunction, + TaskFunction, + TraceEvalResult, + TraceEvalStatus, + TraceEvaluatorFunction, + TraceMetrics, + TraceObservationPredicate, + TraceWaitConfig, +) + + +__all__ = [ + "run_experiment", + "run_experiment_with_trace_evals", + "run_trace_evaluations", + "extract_trace_metrics", + "CompositeEvaluatorFunction", + "Evaluation", + "EvaluatorFunction", + "ExperimentItemResult", + "ExperimentResult", + "EvaluationResult", + "RunEvaluatorFunction", + "TaskFunction", + "TraceEvaluatorFunction", + "TraceEvalStatus", + "TraceEvalResult", + "TraceMetrics", + "TraceObservationPredicate", + "TraceWaitConfig", +] diff --git a/aieng-eval-agents/aieng/agent_evals/evaluation/experiment.py b/aieng-eval-agents/aieng/agent_evals/evaluation/experiment.py new file mode 100644 index 0000000..5a5cd15 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/evaluation/experiment.py @@ -0,0 +1,213 @@ +"""Langfuse experiment wrapper for the evaluation harness.""" + +from typing import Any + +from aieng.agent_evals.async_client_manager import AsyncClientManager +from aieng.agent_evals.evaluation.trace import run_trace_evaluations +from aieng.agent_evals.evaluation.types import ( + CompositeEvaluatorFunction, + EvaluationResult, + EvaluatorFunction, + ExperimentResult, + RunEvaluatorFunction, + TaskFunction, + TraceEvaluatorFunction, + TraceWaitConfig, +) + + +def run_experiment( + dataset_name: str, + *, + name: str, + task: TaskFunction, + evaluators: list[EvaluatorFunction], + composite_evaluator: CompositeEvaluatorFunction 
| None = None, + run_evaluators: list[RunEvaluatorFunction] | None = None, + description: str | None = None, + run_name: str | None = None, + max_concurrency: int = 10, + metadata: dict[str, Any] | None = None, +) -> ExperimentResult: + """Run evaluators over a Langfuse dataset as an experiment. + + This is a thin convenience layer around Langfuse's ``dataset.run_experiment`` + that includes fetching the dataset by name and reusing the shared client. + + Parameters + ---------- + dataset_name : str + Name of the Langfuse dataset to evaluate against. + name : str + Human-readable name for the experiment run. + task : TaskFunction + Function that executes the agent for a single dataset item. + evaluators : list[EvaluatorFunction] + Item-level evaluators that grade each output. + composite_evaluator : CompositeEvaluatorFunction | None, optional, default=None + Receives the same inputs as item-level evaluators + ``(input, output, expected_output, metadata)`` plus the list of evaluations + from item-level evaluators. Useful for weighted averages, pass/fail decisions + based on multiple criteria, or custom scoring logic combining multiple metrics. + run_evaluators : list[RunEvaluatorFunction] | None, optional, default=None + Run-level evaluators that compute aggregate metrics. + description : str | None, optional, default=None + Description of the experiment for the Langfuse UI. + run_name : str | None, optional, default=None + Explicit dataset run name override. + max_concurrency : int, optional, default=10 + Maximum number of concurrent task executions. + metadata : dict[str, Any] | None, optional, default=None + Metadata attached to the dataset run and traces. + + Returns + ------- + ExperimentResult + The Langfuse experiment result with item and run evaluations. + + Examples + -------- + >>> from aieng.agent_evals.evaluation import run_experiment + >>> from langfuse.experiment import Evaluation + >>> def task(*, input, **kwargs): + ... 
return {"answer": input["question"]} + >>> def exact_match(*, output, expected_output, **kwargs): + ... is_match = output["answer"] == expected_output["answer"] + ... return Evaluation(name="exact_match", value=is_match) + >>> result = run_experiment( + ... "qa_dataset", + ... name="baseline", + ... task=task, + ... evaluators=[exact_match], + ... ) + >>> isinstance(result.item_results, list) + True + """ + # The client manager keeps a shared Langfuse client so we avoid re-auth + # and let users decide when to close it. + client_manager = AsyncClientManager.get_instance() + langfuse_client = client_manager.langfuse_client + + # Fetch the dataset by name + dataset = langfuse_client.get_dataset(dataset_name) + + return dataset.run_experiment( + name=name, + run_name=run_name, + description=description, + task=task, + evaluators=evaluators, + composite_evaluator=composite_evaluator, + run_evaluators=run_evaluators or [], + max_concurrency=max_concurrency, + metadata=metadata, + ) + + +def run_experiment_with_trace_evals( + dataset_name: str, + *, + name: str, + task: TaskFunction, + evaluators: list[EvaluatorFunction], + trace_evaluators: list[TraceEvaluatorFunction], + composite_evaluator: CompositeEvaluatorFunction | None = None, + run_evaluators: list[RunEvaluatorFunction] | None = None, + description: str | None = None, + run_name: str | None = None, + max_concurrency: int = 10, + metadata: dict[str, Any] | None = None, + trace_wait: TraceWaitConfig | None = None, + trace_max_concurrency: int = 10, +) -> EvaluationResult: + """Run an experiment and then evaluate traces in a second pass. + + This helper encapsulates a two-pass workflow that first runs an experiment + to produce outputs and trace IDs, then waits for trace ingestion and runs + trace evaluators in a second pass. + + Parameters + ---------- + dataset_name : str + Name of the Langfuse dataset to evaluate against. + name : str + Human-readable name for the experiment run. 
+ task : TaskFunction + Function that executes the agent for a dataset item. + evaluators : list[EvaluatorFunction] + Item-level evaluators that grade each output. + trace_evaluators : list[TraceEvaluatorFunction] + Trace-level evaluators that grade tool use, groundedness, and + other trace-derived metrics. + composite_evaluator : CompositeEvaluatorFunction | None, optional, default=None + Receives the same inputs as item-level evaluators + ``(input, output, expected_output, metadata)`` plus the list of evaluations + from item-level evaluators. Useful for weighted averages, pass/fail decisions + based on multiple criteria, or custom scoring logic combining multiple metrics. + run_evaluators : list[RunEvaluatorFunction] | None, optional, default=None + Run-level evaluators that compute aggregate metrics. + description : str | None, optional, default=None + Description of the experiment for the Langfuse UI. + run_name : str | None, optional, default=None + Explicit dataset run name override. + max_concurrency : int, optional, default=10 + Maximum number of concurrent task executions. + metadata : dict[str, Any] | None, optional, default=None + Metadata attached to the dataset run and traces. + trace_wait : TraceWaitConfig | None, optional, default=None + Trace polling configuration for the second pass. + trace_max_concurrency : int, optional, default=10 + Maximum number of concurrent trace evaluations. + + Returns + ------- + EvaluationResult + A container with the experiment result and trace evaluations. + + Examples + -------- + >>> from aieng.agent_evals.evaluation import run_experiment_with_trace_evals + >>> from langfuse.experiment import Evaluation + >>> def task(*, input, **kwargs): + ... return {"answer": input["question"]} + >>> def exact_match(*, output, expected_output, **kwargs): + ... is_match = output["answer"] == expected_output["answer"] + ... 
return Evaluation(name="exact_match", value=is_match) + >>> def trace_latency(*, trace, item_result, **kwargs): + ... return Evaluation(name="latency_sec", value=trace.latency or 0.0) + >>> result = run_experiment_with_trace_evals( + ... "qa_dataset", + ... name="baseline-with-trace-evals", + ... task=task, + ... evaluators=[exact_match], + ... trace_evaluators=[trace_latency], + ... ) + >>> result.trace_evaluations is not None + True + """ + # Pass 1 produces outputs and trace IDs; trace data itself may still be ingesting. + experiment_result = run_experiment( + dataset_name, + name=name, + task=task, + evaluators=evaluators, + composite_evaluator=composite_evaluator, + run_evaluators=run_evaluators, + description=description, + run_name=run_name, + max_concurrency=max_concurrency, + metadata=metadata, + ) + + # Pass 2 waits for trace completeness before grading tool use and groundedness. + trace_result = run_trace_evaluations( + experiment_result, + trace_evaluators, + wait=trace_wait, + max_concurrency=trace_max_concurrency, + ) + + return EvaluationResult(experiment=experiment_result, trace_evaluations=trace_result) + + +__all__ = ["run_experiment", "run_experiment_with_trace_evals"] diff --git a/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py new file mode 100644 index 0000000..75e9173 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py @@ -0,0 +1,481 @@ +"""Trace-based evaluation helpers for the harness.""" + +import asyncio +import functools +import inspect +import logging +from typing import Any, Awaitable, Literal + +import httpx +from aieng.agent_evals.async_client_manager import AsyncClientManager +from aieng.agent_evals.async_utils import gather_with_progress, rate_limited, run_coroutine_sync +from aieng.agent_evals.evaluation.types import ( + Evaluation, + ExperimentItemResult, + ExperimentResult, + TraceEvalResult, + TraceEvalStatus, + TraceEvaluatorFunction, + 
TraceMetrics, + TraceObservationPredicate, + TraceWaitConfig, +) +from aieng.agent_evals.langfuse import flush_traces +from langfuse import Langfuse +from langfuse.api import ObservationsView +from langfuse.api.core import ApiError +from langfuse.api.resources import NotFoundError +from langfuse.api.resources.commons.types.trace_with_full_details import TraceWithFullDetails +from tenacity import AsyncRetrying, RetryError, retry_if_exception, stop_after_delay, wait_exponential + + +logger = logging.getLogger(__name__) + + +def run_trace_evaluations( + experiment_result: ExperimentResult, + trace_evaluators: list[TraceEvaluatorFunction], + *, + wait: TraceWaitConfig | None = None, + max_concurrency: int = 10, +) -> TraceEvalResult: + """Evaluate traces for each experiment item. + + Parameters + ---------- + experiment_result : ExperimentResult + Result returned by Langfuse ``run_experiment``. + trace_evaluators : list[TraceEvaluatorFunction] + Trace-level evaluators to apply to each trace. Evaluators can return + ``Evaluation``/``list[Evaluation]`` directly or as awaitables. + wait : TraceWaitConfig | None, optional, default=None + Configuration for waiting until traces are fully populated. + max_concurrency : int, optional, default=10 + Maximum number of trace evaluations to run in parallel. + + Returns + ------- + TraceEvalResult + Container with trace evaluation outputs and error metadata. + + Examples + -------- + >>> from aieng.agent_evals.evaluation import run_experiment, run_trace_evaluations + >>> from langfuse.experiment import Evaluation + >>> def task(*, input, **kwargs): + ... return {"answer": input["question"]} + >>> def exact_match(*, output, expected_output, **kwargs): + ... is_match = output["answer"] == expected_output["answer"] + ... return Evaluation(name="exact_match", value=is_match) + >>> def trace_turn_count(*, trace, item_result, **kwargs): + ... 
return Evaluation(name="turn_count", value=len(trace.observations or [])) + >>> experiment_result = run_experiment( + ... "qa_dataset", + ... name="baseline", + ... task=task, + ... evaluators=[exact_match], + ... ) + >>> trace_result = run_trace_evaluations(experiment_result, [trace_turn_count]) + >>> isinstance(trace_result.evaluations_by_trace_id, dict) + True + """ + return run_coroutine_sync( + _run_trace_evaluations_async, + experiment_result=experiment_result, + trace_evaluators=trace_evaluators, + wait=wait, + max_concurrency=max_concurrency, + ) + + +async def _run_trace_evaluations_async( + experiment_result: ExperimentResult, + trace_evaluators: list[TraceEvaluatorFunction], + *, + wait: TraceWaitConfig | None = None, + max_concurrency: int = 10, +) -> TraceEvalResult: + """Run trace evaluations asynchronously with bounded concurrency.""" + result = TraceEvalResult() + wait_config = wait or TraceWaitConfig() + + item_results = [item_result for item_result in experiment_result.item_results if item_result.trace_id] + if not item_results or not trace_evaluators: + logger.info("No trace evaluations to run; skipping trace evaluation pass.") + return result + + client_manager = AsyncClientManager.get_instance() + langfuse_client = client_manager.langfuse_client + semaphore = asyncio.Semaphore(max_concurrency) + + async def _evaluate_item( + item_result: ExperimentItemResult, + ) -> tuple[str, list[Evaluation], TraceEvalStatus, str | None]: + trace_id = item_result.trace_id or "" + evaluations, status, error_message = await _evaluate_trace( + langfuse_client=langfuse_client, + item_result=item_result, + trace_evaluators=trace_evaluators, + wait=wait_config, + ) + return trace_id, evaluations, status, error_message + + trace_eval_coroutines = [ + rate_limited( + _fn=functools.partial(_evaluate_item, item_result), + semaphore=semaphore, + ) + for item_result in item_results + ] + task_results = await gather_with_progress(trace_eval_coroutines, 
description="Evaluating traces") + + for trace_id, evaluations, status, error_message in task_results: + if status == TraceEvalStatus.SKIPPED: + result.skipped_trace_ids.append(trace_id) + result.errors_by_trace_id[trace_id] = error_message or "Trace not ready for evaluation" + logger.info("Trace %s evaluation skipped: %s", trace_id, error_message) + continue + if status == TraceEvalStatus.FAILED: + result.failed_trace_ids.append(trace_id) + result.errors_by_trace_id[trace_id] = error_message or "Trace evaluation failed" + logger.warning("Trace %s evaluation failed: %s", trace_id, error_message) + continue + result.evaluations_by_trace_id[trace_id] = evaluations + + flush_traces() + return result + + +def extract_trace_metrics( + trace: TraceWithFullDetails, + *, + tool_call_predicate: TraceObservationPredicate | None = None, + turn_predicate: TraceObservationPredicate | None = None, +) -> TraceMetrics: + """Extract common metrics from a Langfuse trace. + + This helper provides a consistent, best-effort way to compute + trace-derived statistics such as tool call counts and turns. + Heuristics are intentionally conservative and can be overridden + with custom predicates when instrumentation differs. + + Parameters + ---------- + trace : TraceWithFullDetails + Trace object returned by the Langfuse API. + tool_call_predicate : TraceObservationPredicate | None, optional + Predicate that returns True for observations that represent tool calls. + Defaults to a heuristic based on observation type, name, and metadata. + turn_predicate : TraceObservationPredicate | None, optional + Predicate that returns True for observations that represent assistant + turns or model generations. Defaults to a heuristic based on + observation type and metadata. + + Returns + ------- + TraceMetrics + A dataclass containing extracted metrics. 
+ + Examples + -------- + >>> from types import SimpleNamespace + >>> from aieng.agent_evals.evaluation import extract_trace_metrics + >>> trace = SimpleNamespace(observations=[], latency=0.12, total_cost=None) + >>> metrics = extract_trace_metrics(trace) + >>> metrics.observation_count + 0 + """ + observations = trace.observations or [] + tool_predicate = tool_call_predicate or _default_tool_call_predicate + turn_predicate_fn = turn_predicate or _default_turn_predicate + + tool_call_count = sum(1 for obs in observations if tool_predicate(obs)) + turn_count = sum(1 for obs in observations if turn_predicate_fn(obs)) + total_input_tokens = _sum_token_usage(observations, token_type="input") + total_output_tokens = _sum_token_usage(observations, token_type="output") + total_cost = _extract_total_cost(trace, observations) + + # Latency is provided directly by Langfuse; we keep it as-is to avoid + # computing potentially misleading fallbacks from partial observations. + latency_sec = trace.latency + + return TraceMetrics( + tool_call_count=tool_call_count, + turn_count=turn_count, + observation_count=len(observations), + latency_sec=latency_sec, + total_input_tokens=total_input_tokens, + total_output_tokens=total_output_tokens, + total_cost=total_cost, + ) + + +async def _evaluate_trace( + langfuse_client: Langfuse, + item_result: ExperimentItemResult, + trace_evaluators: list[TraceEvaluatorFunction], + wait: TraceWaitConfig, +) -> tuple[list[Evaluation], TraceEvalStatus, str | None]: + """Fetch and evaluate a single trace. + + Returns + ------- + tuple[list[Evaluation], TraceEvalStatus, str | None] + Evaluations, status, and optional error detail. + Status is ``TraceEvalStatus.OK``, ``SKIPPED``, or ``FAILED``. + """ + trace_id = item_result.trace_id + if not trace_id: + return [], TraceEvalStatus.SKIPPED, "Missing `trace_id` on experiment item result." 
+ + try: + trace, ready = await _fetch_trace_with_wait(langfuse_client, trace_id, wait) + except Exception as exc: + return [], TraceEvalStatus.FAILED, f"Trace fetch failed: {exc}" + + if trace is None or not ready: + return [], TraceEvalStatus.SKIPPED, "Trace did not become ready within wait window." + + evaluations: list[Evaluation] = [] + for evaluator in trace_evaluators: + try: + raw_result = evaluator(trace=trace, item_result=item_result) + evaluations.extend(await _normalize_evaluations(raw_result)) + except Exception as exc: + evaluator_name = _get_evaluator_name(evaluator) + return [], TraceEvalStatus.FAILED, f"Trace evaluator '{evaluator_name}' failed: {exc}" + + # Persist scores so they appear alongside traces in the Langfuse UI. + _upload_trace_scores(langfuse_client, trace_id, evaluations) + + return evaluations, TraceEvalStatus.OK, None + + +async def _fetch_trace_with_wait( + langfuse_client: Langfuse, trace_id: str, wait: TraceWaitConfig +) -> tuple[TraceWithFullDetails | None, bool]: + """Fetch a trace with retry/backoff until it is ready or timeout expires.""" + last_trace: TraceWithFullDetails | None = None + + retrying = AsyncRetrying( + stop=stop_after_delay(wait.max_wait_sec), + wait=wait_exponential( + multiplier=wait.backoff_multiplier, + min=wait.initial_delay_sec, + max=wait.max_delay_sec, + ), + retry=retry_if_exception(_is_retryable_trace_fetch_error), + reraise=False, + ) + + try: + async for attempt in retrying: + with attempt: + trace = await langfuse_client.async_api.trace.get(trace_id) + last_trace = trace + + if _trace_ready(trace): + return trace, True + + raise _TraceNotReadyError("Trace input/output not ready.") + except RetryError: + pass + + return last_trace, bool(last_trace and _trace_ready(last_trace)) + + +class _TraceNotReadyError(Exception): + """Internal signal used to retry until trace readiness criteria are met.""" + + +def _is_retryable_trace_fetch_error(exc: BaseException) -> bool: + """Return True if the exception 
indicates a retryable trace fetch error.""" + if isinstance(exc, (_TraceNotReadyError, NotFoundError, httpx.TransportError)): + return True + + if isinstance(exc, ApiError): + status = exc.status_code + return status in (408, 429) or (status is not None and status >= 500) + + return False + + +def _trace_ready(trace: TraceWithFullDetails) -> bool: + """Check whether a trace has the required fields to evaluate.""" + # This is a heuristic. The input and output fields being populated is a strong + # signal that the trace is complete, however, not every field of the trace + # might be fully populated depending on instrumentation. + return trace.input is not None and trace.output is not None + + +def _default_tool_call_predicate(observation: ObservationsView) -> bool: + """Best-effort heuristic for identifying tool call observations.""" + obs_type = (observation.type or "").lower() + name = (observation.name or "").lower() + metadata = observation.metadata + + if "tool" in obs_type or "tool" in name: + return True + + # Some instrumentations store tool metadata for function calls. 
+ if isinstance(metadata, dict): + for key in ("tool_name", "tool", "function", "function_name"): + if key in metadata: + return True + + return False + + +def _default_turn_predicate(observation: ObservationsView) -> bool: + """Best-effort heuristic for identifying assistant turns.""" + obs_type = (observation.type or "").lower() + name = (observation.name or "").lower() + metadata = observation.metadata + + if "generation" in obs_type: + return True + + if "assistant" in name or "response" in name: + return True + + if isinstance(metadata, dict): + role = str(metadata.get("role", "")).lower() + if role == "assistant": + return True + + return False + + +def _sum_token_usage(observations: list[ObservationsView], *, token_type: str) -> int: + """Aggregate token usage for a specific type across observations.""" + total = 0 + usage_keys = _usage_keys_for_token_type(token_type) + + for observation in observations: + usage_details = observation.usage_details + for key in usage_keys: + value = usage_details.get(key) + if value is not None: + total += value + break # Only count the first matching key per observation. + + return total + + +def _extract_total_cost(trace: TraceWithFullDetails, observations: list[ObservationsView]) -> float | None: + """Extract total trace cost, preferring trace-level totals when available.""" + trace_total_cost = trace.total_cost + if trace_total_cost is not None: + return trace_total_cost + + total_cost = 0.0 + saw_cost = False + + for observation in observations: + cost_details = observation.cost_details + for key in ("total", "total_cost", "totalCost"): + value = cost_details.get(key) + if value is not None: + total_cost += value + saw_cost = True + break # Only count the first matching key per observation. 
+ + if saw_cost: + return total_cost + return None + + +def _usage_keys_for_token_type(token_type: str) -> tuple[str, ...]: + """Return common Langfuse/provider keys for a token usage type.""" + if token_type == "input": + return ("input", "input_tokens", "inputTokens", "prompt_tokens", "promptTokens") + if token_type == "output": + return ("output", "output_tokens", "outputTokens", "completion_tokens", "completionTokens") + return (token_type,) + + +def _get_evaluator_name(evaluator: Any) -> str: + """Best-effort evaluator name for error messages.""" + target = evaluator + if isinstance(target, functools.partial): + target = target.func + + name = getattr(target, "__name__", None) + if isinstance(name, str) and name: + return name + + return target.__class__.__name__ + + +async def _normalize_evaluations( + result: Evaluation | list[Evaluation] | Awaitable[Evaluation | list[Evaluation]], +) -> list[Evaluation]: + """Normalize evaluator outputs (including awaitables) to Evaluation objects.""" + resolved_result: Any = result + if inspect.isawaitable(resolved_result): + resolved_result = await resolved_result + + if isinstance(resolved_result, Evaluation): + return [resolved_result] + + # Accept dict outputs to mirror Langfuse evaluator return conventions. + if isinstance(resolved_result, dict): + return [Evaluation(**resolved_result)] + + if isinstance(resolved_result, list): + normalized: list[Evaluation] = [] + for item in resolved_result: + if isinstance(item, Evaluation): + normalized.append(item) + elif isinstance(item, dict): + normalized.append(Evaluation(**item)) + return normalized + + return [] + + +def _upload_trace_scores(langfuse_client: Langfuse, trace_id: str, evaluations: list[Evaluation]) -> None: + """Persist trace evaluations to Langfuse as scores.""" + for evaluation in evaluations: + # Skip missing values to avoid creating empty or invalid scores. 
+ if evaluation.value is None: + continue + + score_name = evaluation.name or "" + score_data_type = evaluation.data_type + score_value = evaluation.value + + if isinstance(score_value, str): + categorical_data_type: Literal["CATEGORICAL"] | None = None + if score_data_type == "CATEGORICAL": + categorical_data_type = "CATEGORICAL" + + langfuse_client.create_score( + name=score_name, + value=score_value, + trace_id=trace_id, # Link score to trace via trace_id + comment=evaluation.comment, + metadata=evaluation.metadata, + data_type=categorical_data_type, + config_id=evaluation.config_id, + ) + continue + + numeric_data_type: Literal["NUMERIC", "BOOLEAN"] | None = None + if score_data_type == "NUMERIC": + numeric_data_type = "NUMERIC" + elif score_data_type == "BOOLEAN" or isinstance(score_value, bool): + numeric_data_type = "BOOLEAN" + + langfuse_client.create_score( + name=score_name, + value=float(score_value), + trace_id=trace_id, # Link score to trace via trace_id + comment=evaluation.comment, + metadata=evaluation.metadata, + data_type=numeric_data_type, + config_id=evaluation.config_id, + ) + + +__all__ = ["run_trace_evaluations", "extract_trace_metrics"] diff --git a/aieng-eval-agents/aieng/agent_evals/evaluation/types.py b/aieng-eval-agents/aieng/agent_evals/evaluation/types.py new file mode 100644 index 0000000..c81a714 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/evaluation/types.py @@ -0,0 +1,182 @@ +"""Type definitions for the evaluation harness. + +This module centralizes the public typing surface for the harness so other modules +can depend on a stable API without importing Langfuse internals directly. 
+""" + +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Awaitable, Callable, Protocol + +from langfuse.api.resources.commons.types.observations_view import ObservationsView +from langfuse.api.resources.commons.types.trace_with_full_details import TraceWithFullDetails +from langfuse.batch_evaluation import CompositeEvaluatorFunction +from langfuse.experiment import ( + Evaluation, + EvaluatorFunction, + ExperimentItemResult, + ExperimentResult, + RunEvaluatorFunction, + TaskFunction, +) + + +class TraceEvalStatus(Enum): + """Enumeration of trace evaluation statuses.""" + + OK = "ok" + """Trace evaluated successfully.""" + + SKIPPED = "skipped" + """Trace evaluation was skipped due to incomplete or missing data.""" + + FAILED = "failed" + """Trace evaluation failed due to an error during processing.""" + + +class TraceEvaluatorFunction(Protocol): + """Protocol for trace-based evaluators. + + Trace evaluators run in a second pass after the experiment completes. + They receive the fully populated trace and the matching item result, + then return one or more Langfuse evaluations. Evaluators may return + results synchronously or as awaitables. + """ + + def __call__( + self, *, trace: TraceWithFullDetails, item_result: ExperimentItemResult, **kwargs: Any + ) -> Evaluation | list[Evaluation] | Awaitable[Evaluation | list[Evaluation]]: + """Evaluate a trace for a single experiment item. + + Parameters + ---------- + trace : TraceWithFullDetails + The Langfuse trace for the item, including observations and scores. + item_result : ExperimentItemResult + The experiment item result containing the task output and metadata. + **kwargs : Any + Additional keyword arguments forwarded by the harness. + + Returns + ------- + Evaluation | list[Evaluation] | Awaitable[Evaluation | list[Evaluation]] + One or more Langfuse evaluations to attach to the trace. Can be + returned directly or wrapped in an awaitable for async evaluators. + """ + ... 
+ + +# Predicate signature for classifying observations in trace metrics extraction. +TraceObservationPredicate = Callable[[ObservationsView], bool] + + +@dataclass(frozen=True) +class TraceMetrics: + """Common trace-derived metrics. + + Parameters + ---------- + tool_call_count : int + Estimated number of tool call observations in the trace. + turn_count : int + Estimated number of conversational turns in the trace. + observation_count : int + Total number of observations recorded for the trace. + latency_sec : float | None + End-to-end trace latency in seconds, if available. + total_input_tokens : int + Total input token count across trace observations. + total_output_tokens : int + Total output token count across trace observations. + total_cost : float | None + Total trace cost in USD, if available. + """ + + tool_call_count: int + turn_count: int + observation_count: int + latency_sec: float | None + total_input_tokens: int + total_output_tokens: int + total_cost: float | None + + +@dataclass(frozen=True) +class TraceWaitConfig: + """Configuration for trace fetch retries. + + Parameters + ---------- + max_wait_sec : float + Maximum total time to wait for trace readiness. + initial_delay_sec : float + Initial delay between retries. + max_delay_sec : float + Maximum delay between retries. + backoff_multiplier : float + Exponential backoff multiplier applied to retry delay. + """ + + max_wait_sec: float = 180.0 + initial_delay_sec: float = 1.0 + max_delay_sec: float = 10.0 + backoff_multiplier: float = 2.0 + + +@dataclass +class TraceEvalResult: + """Result container for trace evaluations. + + Parameters + ---------- + evaluations_by_trace_id : dict[str, list[Evaluation]] + Evaluations produced for each trace that completed successfully. + skipped_trace_ids : list[str] + Trace IDs skipped because trace data was incomplete or missing. + failed_trace_ids : list[str] + Trace IDs that failed due to errors during evaluation. 
+ errors_by_trace_id : dict[str, str] + Error messages associated with skipped or failed traces. + run_evaluations : list[Evaluation] + Aggregated trace evaluation metrics written at dataset-run level. + """ + + evaluations_by_trace_id: dict[str, list[Evaluation]] = field(default_factory=dict) + skipped_trace_ids: list[str] = field(default_factory=list) + failed_trace_ids: list[str] = field(default_factory=list) + errors_by_trace_id: dict[str, str] = field(default_factory=dict) + run_evaluations: list[Evaluation] = field(default_factory=list) + + +@dataclass(frozen=True) +class EvaluationResult: + """Aggregate result for an evaluation run. + + Parameters + ---------- + experiment : ExperimentResult + The Langfuse experiment result from the output-based pass. + trace_evaluations : TraceEvalResult | None + Trace evaluation results, if a second pass was performed. + """ + + experiment: ExperimentResult + trace_evaluations: TraceEvalResult | None + + +__all__ = [ + "CompositeEvaluatorFunction", + "Evaluation", + "EvaluatorFunction", + "ExperimentItemResult", + "ExperimentResult", + "RunEvaluatorFunction", + "TaskFunction", + "TraceEvaluatorFunction", + "TraceMetrics", + "TraceObservationPredicate", + "TraceEvalStatus", + "TraceEvalResult", + "TraceWaitConfig", + "EvaluationResult", +] From dce0b672d7b082d72e89a7c4f49c0c203699de6d Mon Sep 17 00:00:00 2001 From: fcogidi <41602287+fcogidi@users.noreply.github.com> Date: Thu, 5 Feb 2026 11:41:52 -0500 Subject: [PATCH 2/5] Refactor trace evaluation coroutine setup for improved readability --- aieng-eval-agents/aieng/agent_evals/evaluation/trace.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py index 75e9173..27cb6a9 100644 --- a/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py +++ b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py @@ -121,10 +121,7 @@ async def 
_evaluate_item( return trace_id, evaluations, status, error_message trace_eval_coroutines = [ - rate_limited( - _fn=functools.partial(_evaluate_item, item_result), - semaphore=semaphore, - ) + rate_limited(_fn=functools.partial(_evaluate_item, item_result), semaphore=semaphore) for item_result in item_results ] task_results = await gather_with_progress(trace_eval_coroutines, description="Evaluating traces") @@ -303,8 +300,8 @@ def _is_retryable_trace_fetch_error(exc: BaseException) -> bool: def _trace_ready(trace: TraceWithFullDetails) -> bool: """Check whether a trace has the required fields to evaluate.""" # This is a heuristic. The input and output fields being populated is a strong - # signal that the trace is complete, however, it not every field of the trace - # might be fully populated depending on instrumentation. + # signal that the trace is complete; however, every field of the trace might + # not be fully populated depending on instrumentation. return trace.input is not None and trace.output is not None From abe05eba8e6a6021de3b526355c94a55c8251bda Mon Sep 17 00:00:00 2001 From: fcogidi <41602287+fcogidi@users.noreply.github.com> Date: Thu, 5 Feb 2026 11:42:16 -0500 Subject: [PATCH 3/5] Add unit tests for trace evaluation helpers and metrics extraction --- .../agent_evals/evaluation/test_trace.py | 183 ++++++++++++++++++ 1 file changed, 183 insertions(+) create mode 100644 aieng-eval-agents/tests/aieng/agent_evals/evaluation/test_trace.py diff --git a/aieng-eval-agents/tests/aieng/agent_evals/evaluation/test_trace.py b/aieng-eval-agents/tests/aieng/agent_evals/evaluation/test_trace.py new file mode 100644 index 0000000..d941e9e --- /dev/null +++ b/aieng-eval-agents/tests/aieng/agent_evals/evaluation/test_trace.py @@ -0,0 +1,183 @@ +"""Tests for trace evaluation helpers.""" + +import asyncio +from types import SimpleNamespace +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +from aieng.agent_evals.evaluation.trace import 
extract_trace_metrics, run_trace_evaluations +from aieng.agent_evals.evaluation.types import TraceWaitConfig + + +def _make_trace(*, input, output, observations=None, latency=0.0, total_cost=None) -> SimpleNamespace: # noqa: A002 + return SimpleNamespace( + input=input, output=output, observations=observations or [], latency=latency, total_cost=total_cost + ) + + +def _make_observation(*, type, name, metadata=None, usage_details=None, cost_details=None) -> SimpleNamespace: # noqa: A002 + return SimpleNamespace( + type=type, + name=name, + metadata=metadata or {}, + usage_details=usage_details or {}, + cost_details=cost_details or {}, + usage=None, + ) + + +def _patch_langfuse_client(monkeypatch, trace_get) -> MagicMock: + fake_langfuse_client = MagicMock() + fake_langfuse_client.async_api = SimpleNamespace(trace=SimpleNamespace(get=trace_get)) + fake_langfuse_client.create_score = MagicMock() + + fake_manager = SimpleNamespace(langfuse_client=fake_langfuse_client) + monkeypatch.setattr("aieng.agent_evals.evaluation.trace.AsyncClientManager.get_instance", lambda: fake_manager) + monkeypatch.setattr("aieng.agent_evals.evaluation.trace.flush_traces", lambda: None) + + return fake_langfuse_client + + +def test_run_trace_evaluations_returns_default_on_no_trace_evaluators() -> None: + """Return empty result when no trace evaluators are provided.""" + experiment_result = SimpleNamespace(item_results=[], dataset_run_id=None) + + trace_result = run_trace_evaluations( + experiment_result=experiment_result, # pyright: ignore[reportArgumentType] + trace_evaluators=[], + ) + + assert trace_result.evaluations_by_trace_id == {} + assert trace_result.skipped_trace_ids == [] + assert trace_result.failed_trace_ids == [] + assert trace_result.errors_by_trace_id == {} + + +def test_run_trace_evaluations_returns_default_when_no_trace_ids() -> None: + """Ignore items without trace IDs and return empty results.""" + experiment_result = 
SimpleNamespace(item_results=[SimpleNamespace(trace_id=None)], dataset_run_id=None) + + trace_result = run_trace_evaluations( + experiment_result=experiment_result, # pyright: ignore[reportArgumentType] + trace_evaluators=[lambda trace, item_result: {"name": "metric", "value": 1}], # pyright: ignore[reportArgumentType] + ) + + assert trace_result.evaluations_by_trace_id == {} + assert trace_result.skipped_trace_ids == [] + assert trace_result.failed_trace_ids == [] + assert trace_result.errors_by_trace_id == {} + + +def test_run_trace_evaluations_skips_when_trace_not_ready(monkeypatch) -> None: + """Skip trace evaluation when trace output is not ready within wait window.""" + fake_trace = _make_trace(input={"case": "x"}, output=None, observations=[], latency=0.5) + _patch_langfuse_client(monkeypatch, trace_get=AsyncMock(return_value=fake_trace)) + + experiment_result = SimpleNamespace(item_results=[SimpleNamespace(trace_id="trace-skip")], dataset_run_id=None) + + trace_result = run_trace_evaluations( + experiment_result=experiment_result, # pyright: ignore[reportArgumentType] + trace_evaluators=[lambda trace, item_result: {"name": "metric", "value": 1}], # pyright: ignore[reportArgumentType] + wait=TraceWaitConfig(max_wait_sec=0.01, initial_delay_sec=0.01, max_delay_sec=0.01), + ) + + assert trace_result.evaluations_by_trace_id == {} + assert trace_result.skipped_trace_ids == ["trace-skip"] + assert "trace-skip" in trace_result.errors_by_trace_id + + +def test_run_trace_evaluations_ok_uploads_scores_from_async_evaluator(monkeypatch) -> None: + """Upload scores when async evaluator returns evaluations.""" + fake_trace = _make_trace(input={"case": "x"}, output={"done": True}, observations=[], latency=0.5) + fake_langfuse_client = _patch_langfuse_client(monkeypatch, trace_get=AsyncMock(return_value=fake_trace)) + + experiment_result = SimpleNamespace(item_results=[SimpleNamespace(trace_id="trace-ok")], dataset_run_id=None) + + async def async_trace_evaluator(trace, 
item_result) -> list[Any]: + await asyncio.sleep(0) + return [ + {"name": "verdict", "value": "pass", "data_type": "CATEGORICAL"}, + {"name": "policy_ok", "value": True}, + ] + + trace_result = run_trace_evaluations( + experiment_result=experiment_result, # pyright: ignore[reportArgumentType] + trace_evaluators=[async_trace_evaluator], # pyright: ignore[reportArgumentType] + ) + + assert trace_result.skipped_trace_ids == [] + assert trace_result.failed_trace_ids == [] + assert "trace-ok" in trace_result.evaluations_by_trace_id + assert len(trace_result.evaluations_by_trace_id["trace-ok"]) == 2 + assert fake_langfuse_client.create_score.call_count == 2 + + call_kwargs = [call.kwargs for call in fake_langfuse_client.create_score.call_args_list] + assert any(kwargs.get("data_type") == "CATEGORICAL" for kwargs in call_kwargs) + assert any(kwargs.get("data_type") == "BOOLEAN" for kwargs in call_kwargs) + + +def test_run_trace_evaluations_failed_when_evaluator_raises(monkeypatch) -> None: + """Record evaluator failures and include evaluator name in errors.""" + fake_trace = _make_trace(input={"case": "x"}, output={"done": True}, observations=[], latency=0.5) + fake_langfuse_client = _patch_langfuse_client(monkeypatch, trace_get=AsyncMock(return_value=fake_trace)) + + experiment_result = SimpleNamespace(item_results=[SimpleNamespace(trace_id="trace-fail")], dataset_run_id=None) + + def failing_eval(trace, item_result): + raise RuntimeError("boom") + + trace_result = run_trace_evaluations( + experiment_result=experiment_result, # pyright: ignore[reportArgumentType] + trace_evaluators=[failing_eval], # pyright: ignore[reportArgumentType] + ) + + assert trace_result.failed_trace_ids == ["trace-fail"] + assert "failing_eval" in trace_result.errors_by_trace_id["trace-fail"] + fake_langfuse_client.create_score.assert_not_called() + + +def test_extract_trace_metrics_handles_total_cost_and_observation_fallback() -> None: + """Prefer trace-level total_cost and fallback to 
observation sums.""" + observations = [ + _make_observation( + type="generation", + name="assistant_response", + metadata={}, + usage_details={"input": 10, "output": 6}, + cost_details={"total": 0.12}, + ), + _make_observation( + type="tool_call", + name="query_tool", + metadata={}, + usage_details={"input_tokens": 4, "output_tokens": 2}, + cost_details={"totalCost": 0.03}, + ), + ] + + trace_with_total = _make_trace( + input={"case": "a"}, + output={"done": True}, + observations=observations, + latency=1.5, + total_cost=0.5, + ) + metrics = extract_trace_metrics(trace_with_total) # pyright: ignore[reportArgumentType] + assert metrics.tool_call_count == 1 + assert metrics.turn_count == 1 + assert metrics.observation_count == 2 + assert metrics.total_input_tokens == 14 + assert metrics.total_output_tokens == 8 + assert metrics.total_cost == 0.5 + assert metrics.latency_sec == 1.5 + + trace_without_total = _make_trace( + input={"case": "b"}, + output={"done": True}, + observations=observations, + latency=0.7, + total_cost=None, + ) + metrics = extract_trace_metrics(trace_without_total) # pyright: ignore[reportArgumentType] + assert metrics.total_cost == 0.15 + assert metrics.latency_sec == 0.7 From 7011611380744a7a72118e3296f1c9db05512b4d Mon Sep 17 00:00:00 2001 From: fcogidi <41602287+fcogidi@users.noreply.github.com> Date: Thu, 5 Feb 2026 12:06:13 -0500 Subject: [PATCH 4/5] Fix type casting for trace_id in _evaluate_item function --- aieng-eval-agents/aieng/agent_evals/evaluation/trace.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py index 27cb6a9..f547994 100644 --- a/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py +++ b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py @@ -4,7 +4,7 @@ import functools import inspect import logging -from typing import Any, Awaitable, Literal +from typing import Any, Awaitable, 
Literal, cast import httpx from aieng.agent_evals.async_client_manager import AsyncClientManager @@ -111,7 +111,7 @@ async def _run_trace_evaluations_async( async def _evaluate_item( item_result: ExperimentItemResult, ) -> tuple[str, list[Evaluation], TraceEvalStatus, str | None]: - trace_id = item_result.trace_id or "" + trace_id = cast(str, item_result.trace_id) # item_result already filtered for non-None trace_id evaluations, status, error_message = await _evaluate_trace( langfuse_client=langfuse_client, item_result=item_result, From 4cfcdab0186c04120944efdbd74c4b4601d2c699 Mon Sep 17 00:00:00 2001 From: fcogidi <41602287+fcogidi@users.noreply.github.com> Date: Thu, 5 Feb 2026 12:08:48 -0500 Subject: [PATCH 5/5] Clarify comments in _sum_token_usage to explain key counting logic for token usage --- aieng-eval-agents/aieng/agent_evals/evaluation/trace.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py index f547994..34428dd 100644 --- a/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py +++ b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py @@ -354,7 +354,11 @@ def _sum_token_usage(observations: list[ObservationsView], *, token_type: str) - value = usage_details.get(key) if value is not None: total += value - break # Only count the first matching key per observation. + # Only count the first matching key per observation: all keys in + # `usage_keys` are aliases/alternative naming conventions for the + # same underlying usage value (for example, "input_tokens" vs + # "prompt_tokens"), so counting more than one would double-count. + break return total