diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py index 82a9cd9b4b..39f5d48872 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_base.py @@ -27,12 +27,15 @@ import locale import logging +import os +import re import sys import uuid from collections.abc import Mapping -from dataclasses import dataclass +from dataclasses import dataclass, field from decimal import Decimal as _Decimal from enum import Enum +from types import MappingProxyType from typing import Any, Literal, cast from agent_framework import ( @@ -58,6 +61,100 @@ logger = logging.getLogger(__name__) +_ENV_REFERENCE_RE = re.compile(r"\bEnv\.([A-Za-z_][A-Za-z0-9_]*)") + + +@dataclass(frozen=True) +class DeclarativeEnvConfig: + """Configuration that populates the PowerFx ``Env`` symbol for a workflow. + + Configuration values are always exposed under ``Env.``; + ``os.environ`` is consulted only when ``restrict_to_configuration`` + is ``False`` AND the YAML literally references the name in a PowerFx + expression (the allowlist enforced via ``referenced_names``). + + Attributes: + values: Caller-supplied configuration resolved by name when the + workflow YAML references ``=Env.NAME``. Always exposed in + the ``Env`` symbol regardless of ``restrict_to_configuration``. + restrict_to_configuration: When ``True`` (default), the ``Env`` + symbol is populated exclusively from ``values``; ``os.environ`` + is never consulted. Set to ``False`` to additionally fall back + to ``os.environ`` for names absent from ``values`` that the + workflow YAML explicitly references. + referenced_names: The set of ``Env.NAME`` symbols discovered in + PowerFx expressions inside the workflow definition. The + ``os.environ`` fallback is constrained to this allowlist so + unrelated environment variables never enter the PowerFx scope. + """ + + values: Mapping[str, str] = field(default_factory=lambda: MappingProxyType({})) + restrict_to_configuration: bool = True + referenced_names: frozenset[str] = field(default_factory=lambda: frozenset[str]()) + + def __post_init__(self) -> None: + # Defensive snapshots so the frozen guarantee extends to the + # contents of ``values`` / ``referenced_names``: caller mutations + # to the original objects after construction cannot leak into + # ``resolve()``. + object.__setattr__(self, "values", MappingProxyType(dict(self.values))) + object.__setattr__(self, "referenced_names", frozenset(self.referenced_names)) + + def resolve(self) -> dict[str, str]: + """Return the resolved ``Env`` symbol mapping for the workflow. + + Configuration values are always included (stringified). + ``os.environ`` is consulted only when ``restrict_to_configuration`` + is ``False`` and the name appears in ``referenced_names``, so + unrelated environment variables never enter the PowerFx scope. + Configuration values always win over the environment fallback. + """ + resolved = {name: str(value) for name, value in self.values.items()} + if self.restrict_to_configuration: + return resolved + for name in self.referenced_names.difference(resolved): + env_value = os.environ.get(name) + if env_value is not None: + resolved[name] = env_value + return resolved + + +def discover_env_references(node: Any) -> set[str]: + """Discover ``Env.NAME`` references in PowerFx expressions inside ``node``. + + Walks any nested ``Mapping``/``list``/scalar structure and inspects every + string value. To avoid false positives from doc/description fields that + happen to mention ``Env.SOMETHING`` as plain text, the scan only inspects + strings that begin with ``=`` (PowerFx expression marker, matching the + convention enforced by :meth:`DeclarativeWorkflowState.eval`). + + Args: + node: A parsed workflow definition (typically the dict produced by + ``yaml.safe_load``). + + Returns: + The set of ``Env`` identifier names referenced in PowerFx + expressions inside ``node``. + """ + names: set[str] = set() + + def visit(value: Any) -> None: + if isinstance(value, str): + if value.startswith("="): + names.update(_ENV_REFERENCE_RE.findall(value)) + return + if isinstance(value, Mapping): + for inner in cast(Mapping[Any, Any], value).values(): # type: ignore[redundant-cast] + visit(inner) + return + if isinstance(value, list): + for item in cast(list[Any], value): # type: ignore[redundant-cast] + visit(item) + + visit(node) + return names + + class ConversationData(TypedDict): """Structure for conversation-related state data. @@ -169,13 +266,18 @@ class DeclarativeWorkflowState: - Conversation: Conversation history """ - def __init__(self, state: State): + def __init__(self, state: State, env_config: DeclarativeEnvConfig | None = None): """Initialize with a State instance. Args: state: The workflow's state for persistence + env_config: Configuration that populates the PowerFx ``Env`` + symbol when ``_to_powerfx_symbols`` is called. Defaults to + an empty configuration which results in no ``Env`` binding, + matching the safe default of the :class:`WorkflowFactory`. """ self._state = state + self._env_config = env_config if env_config is not None else DeclarativeEnvConfig() def initialize(self, inputs: Mapping[str, Any] | None = None) -> None: """Initialize the declarative state with inputs. @@ -714,6 +816,14 @@ def _to_powerfx_symbols(self) -> dict[str, Any]: # Custom namespaces **state_data.get("Custom", {}), } + # Resolve the ``Env`` symbol from the workflow-level + # :class:`DeclarativeEnvConfig`. When both ``values`` and the + # ``os.environ`` allowlist produce no entries the symbol is + # omitted so ``=Env.X`` falls back to the literal expression + # string (preserving the legacy "unbound identifier" behaviour). + env_bound = self._env_config.resolve() + if env_bound: + symbols["Env"] = env_bound # Debug log the Local symbols to help diagnose type issues if local_data: for key, value in local_data.items(): @@ -867,6 +977,11 @@ def __init__( action_id = id or action_def.get("id") or f"{action_def.get('kind', 'action')}_{hash(str(action_def)) % 10000}" super().__init__(id=action_id, defer_discovery=True) self._action_def = action_def + # The active :class:`DeclarativeEnvConfig` is stamped onto the + # executor by :class:`DeclarativeWorkflowBuilder` after construction. + # Defaults to an empty configuration so direct ``DeclarativeActionExecutor`` + # construction (e.g. in unit tests) doesn't expose ``os.environ``. + self._declarative_env_config: DeclarativeEnvConfig = DeclarativeEnvConfig() # Manually register handlers after initialization self._handlers = {} @@ -874,6 +989,16 @@ def __init__( self._discover_handlers() self._discover_response_handlers() + def set_declarative_env_config(self, env_config: DeclarativeEnvConfig) -> None: + """Set the workflow-level :class:`DeclarativeEnvConfig` for this executor. + + Called by :class:`DeclarativeWorkflowBuilder` after each executor is + created so that ``_to_powerfx_symbols`` populates the ``Env`` symbol + according to the caller-supplied configuration on the + :class:`WorkflowFactory`. + """ + self._declarative_env_config = env_config + @property def action_def(self) -> dict[str, Any]: """Get the action definition.""" @@ -886,7 +1011,7 @@ def display_name(self) -> str | None: def _get_state(self, state: State) -> DeclarativeWorkflowState: """Get the declarative workflow state wrapper.""" - return DeclarativeWorkflowState(state) + return DeclarativeWorkflowState(state, env_config=self._declarative_env_config) async def _ensure_state_initialized( self, diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_builder.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_builder.py index 67b4a58273..6dad15f95b 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_builder.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_builder.py @@ -24,6 +24,7 @@ from ._declarative_base import ( ConditionResult, DeclarativeActionExecutor, + DeclarativeEnvConfig, LoopIterationResult, ) from ._errors import DeclarativeWorkflowError @@ -140,6 +141,7 @@ def __init__( max_iterations: int | None = None, http_request_handler: HttpRequestHandler | None = None, mcp_tool_handler: MCPToolHandler | None = None, + env_config: DeclarativeEnvConfig | None = None, ): """Initialize the builder. @@ -158,6 +160,10 @@ def __init__( mcp_tool_handler: Handler used to dispatch InvokeMcpTool calls. Must be supplied when the workflow contains any InvokeMcpTool; otherwise build raises ``DeclarativeWorkflowError``. + env_config: Optional :class:`DeclarativeEnvConfig` controlling + how the ``Env`` PowerFx symbol is populated for every + executor built by this builder. Defaults to an empty + configuration (``Env`` not exposed). """ self._yaml_def = yaml_definition self._workflow_id = workflow_id or yaml_definition.get("name", "declarative_workflow") @@ -171,6 +177,7 @@ def __init__( self._seen_explicit_ids: set[str] = set() # Track explicit IDs for duplicate detection self._http_request_handler = http_request_handler self._mcp_tool_handler = mcp_tool_handler + self._env_config: DeclarativeEnvConfig = env_config if env_config is not None else DeclarativeEnvConfig() # Resolve max_iterations: explicit arg > YAML maxTurns > core default resolved = max_iterations if max_iterations is not None else yaml_definition.get("maxTurns") if resolved is not None and (not isinstance(resolved, int) or resolved <= 0): @@ -221,6 +228,15 @@ def build(self) -> Workflow: # Resolve pending gotos (back-edges for loops, forward-edges for jumps) self._resolve_pending_gotos(builder) + # Stamp the resolved DeclarativeEnvConfig onto every executor so they + # expose the configured Env binding through their _get_state(). This + # happens after _create_executors_for_actions and _resolve_pending_gotos + # so it covers the entry node, join nodes, evaluators, foreach + # init/next/exit nodes, and goto placeholders. + for executor in self._executors.values(): + if isinstance(executor, DeclarativeActionExecutor): + executor.set_declarative_env_config(self._env_config) + return builder.build() def _validate_workflow(self, actions: list[dict[str, Any]]) -> None: diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_factory.py b/python/packages/declarative/agent_framework_declarative/_workflows/_factory.py index 221dfec3cc..10206b13c5 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_factory.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_factory.py @@ -26,6 +26,7 @@ ) from .._loader import AgentFactory +from ._declarative_base import DeclarativeEnvConfig, discover_env_references from ._declarative_builder import DeclarativeWorkflowBuilder from ._errors import DeclarativeWorkflowError from ._http_handler import HttpRequestHandler @@ -93,6 +94,8 @@ def __init__( max_iterations: int | None = None, http_request_handler: HttpRequestHandler | None = None, mcp_tool_handler: MCPToolHandler | None = None, + configuration: Mapping[str, str] | None = None, + restrict_env_to_configuration: bool = True, ) -> None: """Initialize the workflow factory. @@ -119,6 +122,23 @@ def __init__( for a default backed by :class:`agent_framework.MCPStreamableHTTPTool`, or supply your own implementation to enforce SSRF guards, allowlisting, or auth/connection resolution. + configuration: Optional mapping that populates the PowerFx ``Env`` + symbol referenced from workflow YAML expressions (e.g. + ``=Env.MY_KEY``). Keys supplied here are always exposed + under ``Env.``; the process ``os.environ`` is consulted + only when ``restrict_env_to_configuration`` is ``False``. + When neither source produces a value the ``Env`` symbol is + omitted so ``=Env.X`` evaluates to the literal expression + string. + restrict_env_to_configuration: When ``True`` (default), the + ``Env`` PowerFx symbol is populated exclusively from + ``configuration``; ``os.environ`` is never consulted. Set to + ``False`` to additionally fall back to ``os.environ`` for + names absent from ``configuration`` that the workflow YAML + explicitly references. The fallback is constrained to names + discovered in PowerFx expressions inside the workflow + definition so unrelated environment variables never enter + the PowerFx scope. Examples: .. code-block:: python @@ -151,6 +171,18 @@ def __init__( checkpoint_storage=FileCheckpointStorage("./checkpoints"), env_file=".env", ) + + .. code-block:: python + + from agent_framework.declarative import WorkflowFactory + + # Inject named values for =Env.* references in the workflow YAML + factory = WorkflowFactory( + configuration={ + "MY_SERVER_URL": "https://example.com", + "MY_TOOL_NAME": "search", + }, + ) """ self._agent_factory = agent_factory or AgentFactory(env_file_path=env_file) self._agents: dict[str, SupportsAgentRun | AgentExecutor] = dict(agents) if agents else {} @@ -160,6 +192,8 @@ def __init__( self._max_iterations = max_iterations self._http_request_handler = http_request_handler self._mcp_tool_handler = mcp_tool_handler + self._configuration: dict[str, str] = dict(configuration) if configuration else {} + self._restrict_env_to_configuration = restrict_env_to_configuration def create_workflow_from_yaml_path( self, @@ -394,6 +428,16 @@ def _create_workflow( if description: normalized_def["description"] = description + # Build the DeclarativeEnvConfig from the factory's configuration and the + # set of Env references actually used in the workflow PowerFx expressions. + # The referenced-name allowlist constrains ``os.environ`` fallback (when + # enabled) so unrelated variables never enter the PowerFx scope. + env_config = DeclarativeEnvConfig( + values=dict(self._configuration), + restrict_to_configuration=self._restrict_env_to_configuration, + referenced_names=frozenset(discover_env_references(normalized_def)), + ) + # Build the graph-based workflow, passing agents and tools for specialized executors try: graph_builder = DeclarativeWorkflowBuilder( @@ -405,6 +449,7 @@ def _create_workflow( max_iterations=self._max_iterations, http_request_handler=self._http_request_handler, mcp_tool_handler=self._mcp_tool_handler, + env_config=env_config, ) workflow = graph_builder.build() except ValueError as e: diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_mcp_handler.py b/python/packages/declarative/agent_framework_declarative/_workflows/_mcp_handler.py index 658ce42c23..4a26e74c88 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_mcp_handler.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_mcp_handler.py @@ -33,7 +33,7 @@ from collections import OrderedDict from collections.abc import Awaitable, Callable from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any, Protocol, cast, runtime_checkable +from typing import TYPE_CHECKING, Any, ClassVar, Protocol, cast, runtime_checkable import httpx @@ -194,6 +194,21 @@ class DefaultMCPToolHandler: Defaults to ``32``. """ + LIST_TOOLS_TOOL_NAME: ClassVar[str] = "tools/list" + """Reserved ``tool_name`` that maps an :class:`MCPToolHandler` invocation + to the MCP protocol ``tools/list`` discovery operation. + + The constant matches the underlying MCP method name so a single + string travels unchanged through host code, YAML, and the protocol + wire. When this handler receives an invocation with this name it + pages through ``session.list_tools()`` and returns the catalog as a + single ``TextContent`` containing JSON of shape + ``{"tools": [{name, description, inputSchema, outputSchema}, ...]}``. + Workflows can reference this name from an ``InvokeMcpTool`` declarative + action to introspect a server's tool surface without an extra round-trip + from host code. + """ + def __init__( self, *, @@ -217,10 +232,27 @@ def __init__( self._closed = False async def invoke_tool(self, invocation: MCPToolInvocation) -> MCPToolResult: - """Invoke ``invocation.tool_name`` on the cached MCP client for the server.""" + """Invoke ``invocation.tool_name`` on the cached MCP client for the server. + + The reserved name :attr:`LIST_TOOLS_TOOL_NAME` (``"tools/list"``) is + intercepted client-side: instead of being forwarded as a tool call, + it is translated to an MCP ``session.list_tools()`` discovery + operation (paginated automatically) and returned as a single + ``TextContent`` containing a JSON tool catalog. + """ from agent_framework import Content from agent_framework.exceptions import ToolExecutionException + # Reserved-name args validation runs before connect: rejecting bad + # input shouldn't require establishing an MCP session. + if invocation.tool_name == self.LIST_TOOLS_TOOL_NAME and invocation.arguments: + message = f"The reserved MCP '{self.LIST_TOOLS_TOOL_NAME}' operation does not accept tool arguments." + return MCPToolResult( + outputs=[Content.from_text(f"Error: {message}")], + is_error=True, + error_message=message, + ) + try: entry = await self._get_or_create_entry(invocation) except Exception as exc: @@ -240,6 +272,8 @@ async def invoke_tool(self, invocation: MCPToolInvocation) -> MCPToolResult: ) try: + if invocation.tool_name == self.LIST_TOOLS_TOOL_NAME: + return await self._invoke_list_tools(entry) raw = await entry.tool.call_tool(invocation.tool_name, **invocation.arguments) except ToolExecutionException as exc: logger.info( @@ -284,6 +318,59 @@ async def invoke_tool(self, invocation: MCPToolInvocation) -> MCPToolResult: outputs = list(raw) return MCPToolResult(outputs=outputs) + @staticmethod + async def _invoke_list_tools(entry: _CacheEntry) -> MCPToolResult: + """Handle the reserved :attr:`LIST_TOOLS_TOOL_NAME` invocation. + + Pages through ``session.list_tools()`` (mirroring the pagination loop + in :meth:`agent_framework.MCPTool.load_tools`) and serialises the + full catalog as a single ``TextContent`` containing JSON of shape + ``{"tools": [{name, description, inputSchema, outputSchema}, ...]}``. + + The output shape, property names, and property order are stable so + downstream PowerFx expressions can rely on the schema. ``indent=2`` + produces human-readable JSON for the conversation log; + ``allow_nan=False`` guards against producing non-conformant JSON + ``NaN``/``Infinity`` tokens if a misbehaving server returns such + values in a schema. + """ + from agent_framework import Content + + session = getattr(entry.tool, "session", None) + if session is None: + message = "MCP session is not connected; cannot list tools." + return MCPToolResult( + outputs=[Content.from_text(f"Error: {message}")], + is_error=True, + error_message=message, + ) + + # Lazy import keeps ``mcp`` types out of module import time. + from mcp import types as mcp_types + + collected: list[Any] = [] + params: mcp_types.PaginatedRequestParams | None = None + while True: + tool_list = await session.list_tools(params=params) + collected.extend(tool_list.tools) + next_cursor = getattr(tool_list, "nextCursor", None) + if not next_cursor: + break + params = mcp_types.PaginatedRequestParams(cursor=next_cursor) + + payload = { + "tools": [ + { + "name": tool.name, + "description": tool.description, + "inputSchema": tool.inputSchema, + "outputSchema": tool.outputSchema, + } + for tool in collected + ], + } + return MCPToolResult(outputs=[Content.from_text(json.dumps(payload, indent=2, allow_nan=False))]) + async def aclose(self) -> None: """Close all cached MCP clients and the owned httpx clients. diff --git a/python/packages/declarative/tests/test_default_mcp_tool_handler.py b/python/packages/declarative/tests/test_default_mcp_tool_handler.py index 3a5c67e1d6..58ce3144ee 100644 --- a/python/packages/declarative/tests/test_default_mcp_tool_handler.py +++ b/python/packages/declarative/tests/test_default_mcp_tool_handler.py @@ -13,6 +13,7 @@ from __future__ import annotations import asyncio +import json import sys from typing import Any from unittest.mock import patch @@ -33,6 +34,55 @@ ) +class FakeListToolsResult: # noqa: B903 - mimics ``mcp.types.ListToolsResult`` shape, not a value type + """Stand-in for ``mcp.types.ListToolsResult`` returned by ``session.list_tools()``.""" + + def __init__(self, tools: list[Any], next_cursor: str | None = None) -> None: + self.tools = tools + self.nextCursor = next_cursor + + +class FakeMcpTool: + """Stand-in for an MCP ``Tool`` (subset used by ``_invoke_list_tools``).""" + + def __init__( + self, + name: str, + description: str | None = None, + inputSchema: dict[str, Any] | None = None, + outputSchema: dict[str, Any] | None = None, + ) -> None: + self.name = name + self.description = description + self.inputSchema = inputSchema if inputSchema is not None else {"type": "object", "properties": {}} + self.outputSchema = outputSchema + + +class FakeMcpSession: + """Stand-in for ``mcp.ClientSession``. + + ``list_tools_pages`` lets a test enqueue multiple paginated responses; + when None (default), an empty single-page result is returned. ``list_tools_error`` + raises a synthetic error on the next call when set. + """ + + def __init__(self) -> None: + self.list_tools_pages: list[FakeListToolsResult] | None = None + self.list_tools_calls: list[Any] = [] + self.list_tools_error: BaseException | None = None + + async def list_tools(self, params: Any = None) -> FakeListToolsResult: + self.list_tools_calls.append(params) + if self.list_tools_error is not None: + raise self.list_tools_error + if self.list_tools_pages is None: + return FakeListToolsResult(tools=[]) + index = len(self.list_tools_calls) - 1 + if index >= len(self.list_tools_pages): + return FakeListToolsResult(tools=[]) + return self.list_tools_pages[index] + + class FakeTool: """Stand-in for ``MCPStreamableHTTPTool``. @@ -50,6 +100,7 @@ def __init__(self, **kwargs: Any) -> None: self.connect_error: BaseException | None = None self.call_handler: Any = lambda **_a: [Content.from_text("ok")] self._httpx_client: httpx.AsyncClient | None = None + self.session: FakeMcpSession | None = None # Mimic MCPStreamableHTTPTool: when no caller client AND header_provider # is set, lazily allocate an owned httpx client during connect. FakeTool.instances.append(self) @@ -63,6 +114,9 @@ async def connect(self) -> None: # Mimic lazy httpx allocation when no client provided AND header_provider set. if self.kwargs.get("http_client") is None and self.kwargs.get("header_provider") is not None: self._httpx_client = httpx.AsyncClient() + # Mimic MCPStreamableHTTPTool: a live session becomes available after connect. + if self.session is None: + self.session = FakeMcpSession() async def close(self) -> None: self.close_count += 1 @@ -541,3 +595,185 @@ def test_key_keeps_header_value_case(self) -> None: k1 = DefaultMCPToolHandler._cache_key("https://x/", None, None, {"X": "Bearer-A"}) k2 = DefaultMCPToolHandler._cache_key("https://x/", None, None, {"X": "bearer-a"}) assert k1 != k2 + + +# ---------- tools/list reserved name -------------------------------------- + + +class TestListTools: + """Exercise the reserved :attr:`DefaultMCPToolHandler.LIST_TOOLS_TOOL_NAME` interception path.""" + + @pytest.mark.asyncio + async def test_list_tools_returns_json_catalog(self) -> None: + handler = DefaultMCPToolHandler() + with _patch_tool(): + # Prime the cache so the FakeTool session exists. + await handler.invoke_tool(_invocation()) + FakeTool.instances[0].session.list_tools_pages = [ # type: ignore[union-attr] + FakeListToolsResult( + tools=[ + FakeMcpTool( + name="search", + description="Search docs", + inputSchema={"type": "object", "properties": {"q": {"type": "string"}}}, + outputSchema={"type": "object"}, + ), + FakeMcpTool(name="echo", description=None, outputSchema=None), + ], + ), + ] + result = await handler.invoke_tool(_invocation(tool_name=DefaultMCPToolHandler.LIST_TOOLS_TOOL_NAME)) + assert result.is_error is False + assert len(result.outputs) == 1 + payload = json.loads(result.outputs[0].text) # type: ignore[reportAttributeAccessIssue] + assert payload == { + "tools": [ + { + "name": "search", + "description": "Search docs", + "inputSchema": {"type": "object", "properties": {"q": {"type": "string"}}}, + "outputSchema": {"type": "object"}, + }, + { + "name": "echo", + "description": None, + "inputSchema": {"type": "object", "properties": {}}, + "outputSchema": None, + }, + ], + } + + @pytest.mark.asyncio + async def test_list_tools_property_order_is_stable(self) -> None: + """JSON property order is stable: name, description, inputSchema, outputSchema.""" + handler = DefaultMCPToolHandler() + with _patch_tool(): + await handler.invoke_tool(_invocation()) + FakeTool.instances[0].session.list_tools_pages = [ # type: ignore[union-attr] + FakeListToolsResult(tools=[FakeMcpTool(name="t1", description="d")]), + ] + result = await handler.invoke_tool(_invocation(tool_name=DefaultMCPToolHandler.LIST_TOOLS_TOOL_NAME)) + text = result.outputs[0].text # type: ignore[reportAttributeAccessIssue] + name_idx = text.find('"name"') + desc_idx = text.find('"description"') + input_idx = text.find('"inputSchema"') + output_idx = text.find('"outputSchema"') + assert 0 <= name_idx < desc_idx < input_idx < output_idx + + @pytest.mark.asyncio + async def test_list_tools_indented_output(self) -> None: + """Output is JSON with a 2-space indent so the conversation log is human-readable.""" + handler = DefaultMCPToolHandler() + with _patch_tool(): + await handler.invoke_tool(_invocation()) + FakeTool.instances[0].session.list_tools_pages = [ # type: ignore[union-attr] + FakeListToolsResult(tools=[FakeMcpTool(name="t1")]), + ] + result = await handler.invoke_tool(_invocation(tool_name=DefaultMCPToolHandler.LIST_TOOLS_TOOL_NAME)) + text = result.outputs[0].text # type: ignore[reportAttributeAccessIssue] + # Indented output contains newlines and a 2-space indented key. + assert "\n " in text + + @pytest.mark.asyncio + async def test_list_tools_rejects_arguments(self) -> None: + """Reserved name does NOT accept tool arguments. Fails fast before connect.""" + handler = DefaultMCPToolHandler() + with _patch_tool(): + result = await handler.invoke_tool( + _invocation(tool_name=DefaultMCPToolHandler.LIST_TOOLS_TOOL_NAME, arguments={"q": "test"}), + ) + assert result.is_error is True + assert "does not accept tool arguments" in (result.error_message or "") + # Args validation runs before connect, so no tool was instantiated. + assert FakeTool.instances == [] + + @pytest.mark.asyncio + async def test_list_tools_empty_args_dict_is_accepted(self) -> None: + """An empty arguments dict is equivalent to no arguments.""" + handler = DefaultMCPToolHandler() + with _patch_tool(): + await handler.invoke_tool(_invocation()) + result = await handler.invoke_tool( + _invocation(tool_name=DefaultMCPToolHandler.LIST_TOOLS_TOOL_NAME, arguments={}), + ) + assert result.is_error is False + + @pytest.mark.asyncio + async def test_list_tools_paginates(self) -> None: + """Pagination loop calls list_tools repeatedly until nextCursor is empty.""" + handler = DefaultMCPToolHandler() + with _patch_tool(): + await handler.invoke_tool(_invocation()) + FakeTool.instances[0].session.list_tools_pages = [ # type: ignore[union-attr] + FakeListToolsResult(tools=[FakeMcpTool(name="a")], next_cursor="cursor1"), + FakeListToolsResult(tools=[FakeMcpTool(name="b")], next_cursor="cursor2"), + FakeListToolsResult(tools=[FakeMcpTool(name="c")], next_cursor=None), + ] + result = await handler.invoke_tool(_invocation(tool_name=DefaultMCPToolHandler.LIST_TOOLS_TOOL_NAME)) + payload = json.loads(result.outputs[0].text) # type: ignore[reportAttributeAccessIssue] + assert [t["name"] for t in payload["tools"]] == ["a", "b", "c"] + session = FakeTool.instances[0].session + assert session is not None + assert len(session.list_tools_calls) == 3 + # First call has no cursor; second/third use the cursor from the prior page. + assert session.list_tools_calls[0] is None + assert getattr(session.list_tools_calls[1], "cursor", None) == "cursor1" + assert getattr(session.list_tools_calls[2], "cursor", None) == "cursor2" + + @pytest.mark.asyncio + async def test_list_tools_shares_cache_with_call_tool(self) -> None: + """tools/list reuses the same cached MCP session as a regular call_tool.""" + handler = DefaultMCPToolHandler() + with _patch_tool(): + await handler.invoke_tool(_invocation(tool_name="search")) + await handler.invoke_tool(_invocation(tool_name=DefaultMCPToolHandler.LIST_TOOLS_TOOL_NAME)) + assert len(FakeTool.instances) == 1 + assert FakeTool.instances[0].connect_count == 1 + + @pytest.mark.asyncio + async def test_list_tools_propagates_session_errors_as_error_result(self) -> None: + """Errors raised by session.list_tools become MCPToolResult(is_error=True), not crashes.""" + handler = DefaultMCPToolHandler() + with _patch_tool(): + await handler.invoke_tool(_invocation()) + FakeTool.instances[0].session.list_tools_error = httpx.ReadTimeout("read timed out") # type: ignore[union-attr] + result = await handler.invoke_tool(_invocation(tool_name=DefaultMCPToolHandler.LIST_TOOLS_TOOL_NAME)) + assert result.is_error is True + assert "ReadTimeout" in (result.error_message or "") + + @pytest.mark.asyncio + async def test_list_tools_returns_error_when_session_is_none(self) -> None: + """If somehow the cached tool has no session, return a clear error rather than crashing.""" + handler = DefaultMCPToolHandler() + with _patch_tool(): + await handler.invoke_tool(_invocation()) + FakeTool.instances[0].session = None + result = await handler.invoke_tool(_invocation(tool_name=DefaultMCPToolHandler.LIST_TOOLS_TOOL_NAME)) + assert result.is_error is True + assert "not connected" in (result.error_message or "") + + @pytest.mark.asyncio + async def test_list_tools_does_not_call_call_tool(self) -> None: + """The reserved name is intercepted; the inner call_tool path is bypassed.""" + handler = DefaultMCPToolHandler() + call_tool_invoked = False + + def fail(**_a: Any) -> Any: + nonlocal call_tool_invoked + call_tool_invoked = True + raise AssertionError("call_tool should not run for tools/list") + + with _patch_tool(): + await handler.invoke_tool(_invocation()) + FakeTool.instances[0].call_handler = fail + FakeTool.instances[0].session.list_tools_pages = [ # type: ignore[union-attr] + FakeListToolsResult(tools=[]), + ] + result = await handler.invoke_tool(_invocation(tool_name=DefaultMCPToolHandler.LIST_TOOLS_TOOL_NAME)) + assert call_tool_invoked is False + assert result.is_error is False + + def test_class_attribute_value(self) -> None: + # Constant must equal the MCP protocol method name so a single + # string travels unchanged through host code, YAML, and the wire. + assert DefaultMCPToolHandler.LIST_TOOLS_TOOL_NAME == "tools/list" diff --git a/python/samples/03-workflows/declarative/invoke_foundry_toolbox_mcp/__init__.py b/python/samples/03-workflows/declarative/invoke_foundry_toolbox_mcp/__init__.py new file mode 100644 index 0000000000..2a50eae894 --- /dev/null +++ b/python/samples/03-workflows/declarative/invoke_foundry_toolbox_mcp/__init__.py @@ -0,0 +1 @@ +# Copyright (c) Microsoft. All rights reserved. diff --git a/python/samples/03-workflows/declarative/invoke_foundry_toolbox_mcp/main.py b/python/samples/03-workflows/declarative/invoke_foundry_toolbox_mcp/main.py new file mode 100644 index 0000000000..9592919e33 --- /dev/null +++ b/python/samples/03-workflows/declarative/invoke_foundry_toolbox_mcp/main.py @@ -0,0 +1,139 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Invoke a Foundry toolbox MCP endpoint from a declarative workflow. + +The workflow calls ``microsoft_docs_search`` (the Microsoft Learn Docs +MCP server, bundled into a sample toolbox by ``toolbox_provisioning``) +through the toolbox proxy and asks a Foundry agent to summarise the +result. + +Required env vars: + FOUNDRY_PROJECT_ENDPOINT, FOUNDRY_MODEL. + +Run with: + python samples/03-workflows/declarative/invoke_foundry_toolbox_mcp/main.py +""" + +import asyncio +import os +from collections.abc import Generator +from pathlib import Path + +import httpx +from agent_framework import Agent +from agent_framework.declarative import ( + DefaultMCPToolHandler, + MCPToolInvocation, + WorkflowFactory, +) +from agent_framework.foundry import FoundryChatClient +from azure.core.credentials import TokenCredential +from azure.identity import AzureCliCredential, get_bearer_token_provider +from toolbox_provisioning import FOUNDRY_FEATURES_HEADERS, create_sample_toolbox + +AGENT_NAME = "FoundryToolboxMcpAgent" +TOOLBOX_NAME = "declarative_foundry_toolbox_mcp" +DOCS_SERVER_LABEL = "microsoft_docs" + +AGENT_INSTRUCTIONS = """\ +Answer the user's question using ONLY the Microsoft Learn docs search +result already present in the conversation. Cite document titles or +URLs when available. If the result does not contain an answer, say so +plainly rather than guessing. +""" + + +class _BearerAuth(httpx.Auth): + """Inject a fresh Azure AD bearer token on every request.""" + + def __init__(self, credential: TokenCredential) -> None: + self._get_token = get_bearer_token_provider(credential, "https://ai.azure.com/.default") + + def auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Response, None]: + request.headers["Authorization"] = f"Bearer {self._get_token()}" + yield request + + +async def main() -> None: + """Run the Foundry toolbox MCP workflow.""" + project_endpoint = os.environ["FOUNDRY_PROJECT_ENDPOINT"] + model = os.environ["FOUNDRY_MODEL"] + + print("=" * 60) + print("Invoke Foundry Toolbox MCP Workflow Demo") + print("=" * 60) + print(f"Provisioning toolbox '{TOOLBOX_NAME}' in Foundry...") + create_sample_toolbox( + name=TOOLBOX_NAME, + docs_server_label=DOCS_SERVER_LABEL, + project_endpoint=project_endpoint, + ) + + toolbox_endpoint = f"{project_endpoint.rstrip('/')}/toolboxes/{TOOLBOX_NAME}/mcp?api-version=v1" + print(f"Toolbox endpoint: {toolbox_endpoint}") + print() + + credential = AzureCliCredential() + chat_client = FoundryChatClient(project_endpoint=project_endpoint, model=model, credential=credential) + summary_agent = Agent(client=chat_client, name=AGENT_NAME, instructions=AGENT_INSTRUCTIONS) + + # ``headers=`` attaches the Foundry-Features preview flag on every + # request, including the MCP ``initialize`` handshake (the YAML's + # per-action ``headers`` only takes effect during ``call_tool``). + # ``timeout=`` matches the MCP-recommended values; httpx's 5s + # default breaks long-running tool calls. + http_client = httpx.AsyncClient( + auth=_BearerAuth(credential), + headers=FOUNDRY_FEATURES_HEADERS, + timeout=httpx.Timeout(30.0, read=300.0), + follow_redirects=True, + ) + + async def _client_provider(invocation: MCPToolInvocation) -> httpx.AsyncClient | None: + # Fail closed when the YAML resolves a different ``serverUrl`` + # so the bearer-bound client cannot be reused against an + # unexpected endpoint and ``DefaultMCPToolHandler`` cannot + # silently fall back to an unauthenticated client. + if invocation.server_url.casefold() != toolbox_endpoint.casefold(): + raise ValueError( + f"Refusing to attach Foundry bearer token to unexpected MCP URL: " + f"{invocation.server_url!r}. Expected: {toolbox_endpoint!r}." + ) + return http_client + + async with ( + http_client, + DefaultMCPToolHandler(client_provider=_client_provider) as mcp_handler, + ): + factory = WorkflowFactory( + agents={AGENT_NAME: summary_agent}, + mcp_tool_handler=mcp_handler, + configuration={ + "FOUNDRY_TOOLBOX_MCP_SERVER_URL": toolbox_endpoint, + "FOUNDRY_TOOLBOX_DOCS_SERVER_LABEL": DOCS_SERVER_LABEL, + }, + ) + workflow = factory.create_workflow_from_yaml_path(Path(__file__).parent / "workflow.yaml") + + print("Ask a question that can be answered from the Microsoft Learn docs.") + print() + user_input = input("You: ").strip() or "How do I configure logging in the Agent Framework?" # noqa: ASYNC250 + + printed_prefix = False + async for event in workflow.run({"text": user_input}, stream=True): + if event.type == "executor_invoked": + if event.executor_id == "search_docs_with_toolbox": + print("[Searching Microsoft Learn docs...]") + elif event.executor_id == "summarize_toolbox_result": + print("[Summarizing results...]") + elif event.type == "output" and isinstance(event.data, str): + if not printed_prefix: + print("\nAgent: ", end="", flush=True) + printed_prefix = True + print(event.data, end="", flush=True) + + print() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/03-workflows/declarative/invoke_foundry_toolbox_mcp/toolbox_provisioning.py b/python/samples/03-workflows/declarative/invoke_foundry_toolbox_mcp/toolbox_provisioning.py new file mode 100644 index 0000000000..73cd792346 --- /dev/null +++ b/python/samples/03-workflows/declarative/invoke_foundry_toolbox_mcp/toolbox_provisioning.py @@ -0,0 +1,62 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Foundry toolbox provisioning helper for ``invoke_foundry_toolbox_mcp``. + +Toolboxes are normally created through the Foundry portal or a separate +deployment script. Bundling the one-off setup here lets the sample run +end-to-end without manual steps. ``main.py`` owns the workflow +execution path. +""" + +from collections.abc import Mapping + +from azure.identity import AzureCliCredential + +# Toolbox admin and MCP runtime traffic are both gated by a preview +# feature flag. The Python ``AIProjectClient`` does not add it +# automatically, so we attach it to every admin call here AND to the +# ``httpx.AsyncClient`` in ``main.py`` so the MCP ``initialize`` +# handshake carries it too. Without the flag on admin calls, +# provisioning succeeds at the HTTP layer but the toolbox is never +# wired to the MCP endpoint — surfacing later as "MCP server failed to +# initialize: Session terminated" on the first ``InvokeMcpTool`` call. +FOUNDRY_FEATURES_HEADERS: Mapping[str, str] = {"Foundry-Features": "Toolboxes=V1Preview"} + + +def create_sample_toolbox(*, name: str, docs_server_label: str, project_endpoint: str) -> None: + """Provision a toolbox version (delete-then-create; idempotent). + + Bundles the Microsoft Learn Docs MCP server under ``docs_server_label``. + Uses ``AzureCliCredential`` because the sample expects ``az login``; + switch to a managed identity or service principal for production + deployments. + """ + from azure.ai.projects import AIProjectClient + from azure.ai.projects.models import MCPTool, Tool + from azure.core.exceptions import ResourceNotFoundError + + with ( + AzureCliCredential() as credential, + AIProjectClient(credential=credential, endpoint=project_endpoint) as project_client, + ): + try: + project_client.beta.toolboxes.delete(name, headers=FOUNDRY_FEATURES_HEADERS) + print(f"Toolbox '{name}' deleted (replacing with a fresh version).") + except ResourceNotFoundError: + pass + + tools: list[Tool] = [ + MCPTool( + server_label=docs_server_label, + server_url="https://learn.microsoft.com/api/mcp", + require_approval="never", + ), + ] + + created = project_client.beta.toolboxes.create_version( + name=name, + description="Sample toolbox exposing the Microsoft Learn Docs MCP server.", + tools=tools, + headers=FOUNDRY_FEATURES_HEADERS, + ) + print(f"Created toolbox {created.name}@{created.version} ({len(created.tools)} tool(s)).") diff --git a/python/samples/03-workflows/declarative/invoke_foundry_toolbox_mcp/workflow.yaml b/python/samples/03-workflows/declarative/invoke_foundry_toolbox_mcp/workflow.yaml new file mode 100644 index 0000000000..755bf7b18c --- /dev/null +++ b/python/samples/03-workflows/declarative/invoke_foundry_toolbox_mcp/workflow.yaml @@ -0,0 +1,48 @@ +# +# Calls the Microsoft Learn Docs MCP server through a Foundry toolbox +# proxy from a declarative workflow, then asks a Foundry agent to +# summarise the result. The toolbox surfaces MCP-server-backed tools +# as ``___``. +# +# Workflow inputs: +# text: The user's question (required). +# +kind: Workflow +trigger: + + kind: OnConversationStart + id: workflow_invoke_foundry_toolbox_mcp + actions: + + - kind: SetVariable + id: set_search_query + variable: Local.SearchQuery + value: =Workflow.Inputs.text + + # ``autoSend: false`` so the raw JSON tool result is not echoed to + # the host's output stream; ``conversationId`` still appends it to + # the conversation so the summarising agent can read it. + - kind: InvokeMcpTool + id: search_docs_with_toolbox + serverUrl: =Env.FOUNDRY_TOOLBOX_MCP_SERVER_URL + serverLabel: foundry_toolbox + toolName: =Env.FOUNDRY_TOOLBOX_DOCS_SERVER_LABEL & "___microsoft_docs_search" + conversationId: =System.ConversationId + headers: + Foundry-Features: Toolboxes=V1Preview + arguments: + query: =Local.SearchQuery + output: + autoSend: false + result: Local.SearchResult + + - kind: InvokeAzureAgent + id: summarize_toolbox_result + agent: + name: FoundryToolboxMcpAgent + conversationId: =System.ConversationId + input: + messages: '=Concat("Answer the query using the Microsoft Learn docs result already in the conversation: ", Local.SearchQuery)' + output: + autoSend: true + messages: Local.Summary