Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 78 additions & 16 deletions aieng-eval-agents/aieng/agent_evals/async_utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
"""Utils for async workflows."""

import asyncio
import types
from typing import Any, Awaitable, Callable, Coroutine, Sequence, TypeVar
import contextvars
import threading
from concurrent.futures import Future
from typing import Any, Awaitable, Callable, Coroutine, ParamSpec, Sequence, TypeVar

from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn, TimeRemainingColumn
from aieng.agent_evals.progress import create_progress


T = TypeVar("T")
P = ParamSpec("P")

__all__ = ["rate_limited", "gather_with_progress"]
__all__ = ["rate_limited", "gather_with_progress", "run_coroutine_sync"]


async def rate_limited(_fn: Callable[[], Awaitable[T]], semaphore: asyncio.Semaphore) -> T:
Expand All @@ -32,7 +35,7 @@ async def rate_limited(_fn: Callable[[], Awaitable[T]], semaphore: asyncio.Semap


async def gather_with_progress(
coros: "list[types.CoroutineType[Any, Any, T]]",
coros: Sequence[Coroutine[Any, Any, T]],
description: str = "Running tasks",
) -> Sequence[T]:
"""Run a list of coroutines concurrently and display a rich.Progress bar.
Expand All @@ -41,7 +44,7 @@ async def gather_with_progress(

Parameters
----------
coros : list[CoroutineType[Any, Any, T]]
coros : Sequence[Coroutine[Any, Any, T]]
List of coroutines to run.
description : str, optional
Description to show in the progress bar, by default "Running tasks".
Expand All @@ -57,14 +60,8 @@ async def gather_with_progress(
# Pre‐allocate a results list; we'll fill in each slot as its Task completes
results: list[T | None] = [None] * len(tasks)

# Create and start a Progress bar with a total equal to the number of tasks
with Progress(
TextColumn("[bold blue]{task.description}"),
BarColumn(),
TextColumn("{task.completed}/{task.total}"),
TimeRemainingColumn(),
TimeElapsedColumn(),
) as progress:
# Use the shared progress style with a total equal to the number of tasks.
with create_progress() as progress:
progress_task = progress.add_task(description, total=len(tasks))

# as_completed yields each Task as soon as it finishes
Expand All @@ -78,14 +75,79 @@ async def gather_with_progress(
return results # type: ignore


async def _indexed(index: int, coro: Coroutine[None, None, T]) -> tuple[int, T]:
def run_coroutine_sync(
coro_fn: Callable[P, Coroutine[Any, Any, T]],
/,
*args: P.args,
**kwargs: P.kwargs,
) -> T:
"""Run an async callable from synchronous code.

This helper supports both normal Python scripts (no active event loop)
and environments that already have a running loop (for example notebooks).

Parameters
----------
coro_fn : Callable[P, Coroutine[Any, Any, T]]
Async callable to execute.
*args : P.args
Positional arguments forwarded to ``coro_fn``.
**kwargs : P.kwargs
Keyword arguments forwarded to ``coro_fn``.

Returns
-------
T
Value returned by the awaited coroutine.

Raises
------
BaseException
Re-raises any exception raised by ``coro_fn``.

Notes
-----
If a loop is already running, the coroutine is executed in a dedicated
worker thread with its own event loop.
"""
# Check whether this thread already has a running event loop.
try:
asyncio.get_running_loop()
except RuntimeError:
# No running loop: asyncio.run is the simplest and safest path.
return asyncio.run(coro_fn(*args, **kwargs))

# Create a thread-safe container for either a return value or an exception.
result: Future[T] = Future()
# Copy ContextVar values so request-local state survives the thread hop.
context = contextvars.copy_context()

def _runner() -> None:
"""Execute coroutine on a dedicated event loop."""
try:
# Run the coroutine in the copied context on a new loop in worker thread.
result.set_result(context.run(asyncio.run, coro_fn(*args, **kwargs)))
except BaseException as exc: # pragma: no cover - surfaced to caller
# Capture failure so the caller re-raises the original exception.
result.set_exception(exc)

# Use a separate thread to avoid calling asyncio.run inside an active loop.
thread = threading.Thread(target=_runner)
thread.start()
thread.join()

# Return the value, or re-raise any exception stored in the Future.
return result.result()


async def _indexed(index: int, coro: Coroutine[Any, Any, T]) -> tuple[int, T]:
"""Return (index, await coro).

Parameters
----------
index : int
The index to pair with the coroutine result.
coro : Coroutine[None, None, T]
coro : Coroutine[Any, Any, T]
The coroutine to await.

Returns
Expand Down
47 changes: 47 additions & 0 deletions aieng-eval-agents/aieng/agent_evals/evaluation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Evaluation harness.

This package provides a beginner-friendly wrapper around Langfuse's
``dataset.run_experiment`` workflow, plus optional trace-based
second-pass evaluators.
"""

from .experiment import run_experiment, run_experiment_with_trace_evals
from .trace import extract_trace_metrics, run_trace_evaluations
from .types import (
CompositeEvaluatorFunction,
Evaluation,
EvaluationResult,
EvaluatorFunction,
ExperimentItemResult,
ExperimentResult,
RunEvaluatorFunction,
TaskFunction,
TraceEvalResult,
TraceEvalStatus,
TraceEvaluatorFunction,
TraceMetrics,
TraceObservationPredicate,
TraceWaitConfig,
)


# Public API of the evaluation package. Keep this list in sync with the
# relative imports above so ``import *`` exposes exactly these names.
__all__ = [
    "run_experiment",
    "run_experiment_with_trace_evals",
    "run_trace_evaluations",
    "extract_trace_metrics",
    "CompositeEvaluatorFunction",
    "Evaluation",
    "EvaluatorFunction",
    "ExperimentItemResult",
    "ExperimentResult",
    "EvaluationResult",
    "RunEvaluatorFunction",
    "TaskFunction",
    "TraceEvaluatorFunction",
    "TraceEvalStatus",
    "TraceEvalResult",
    "TraceMetrics",
    "TraceObservationPredicate",
    "TraceWaitConfig",
]
Loading