Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/mcp/shared/inbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,20 @@ def find_invalid_x_mcp_header(input_schema: Any) -> str | None:
return f"{X_MCP_HEADER_KEY} found at a schema position not reachable via a pure `properties` chain"
where = ".".join(path)
header = schema[X_MCP_HEADER_KEY]
if not isinstance(header, str) or not _RFC9110_TOKEN.fullmatch(header):
# Wrong type and malformed value are distinct failures with distinct messages: the
# non-str arm returns before any interpolation, because `repr` of an arbitrary
# schema value is not total (a large `int` exceeds `sys.get_int_max_str_digits`).
if not isinstance(header, str):
return f"property {where!r}: {X_MCP_HEADER_KEY} must be a string, not {type(header).__name__}"
if not _RFC9110_TOKEN.fullmatch(header):
return f"property {where!r}: {X_MCP_HEADER_KEY} {header!r} is not an RFC 9110 token"
prop_type = schema.get("type")
if not isinstance(prop_type, str) or prop_type not in _X_MCP_HEADER_PRIMITIVE_TYPES:
if not isinstance(prop_type, str):
return (
f"property {where!r}: {X_MCP_HEADER_KEY} is only permitted on "
f"integer/string/boolean properties (the type keyword is {type(prop_type).__name__}, not a string)"
)
if prop_type not in _X_MCP_HEADER_PRIMITIVE_TYPES:
return (
f"property {where!r}: {X_MCP_HEADER_KEY} is only permitted on "
f"integer/string/boolean properties (got {prop_type!r})"
Expand Down
74 changes: 52 additions & 22 deletions tests/shared/test_inbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
from mcp_types.version import LATEST_HANDSHAKE_VERSION, LATEST_MODERN_VERSION, MODERN_PROTOCOL_VERSIONS

from mcp.shared.inbound import (
_SUBSCHEMA_LIST,
_SUBSCHEMA_MAP,
_SUBSCHEMA_SINGLE,
ERROR_CODE_HTTP_STATUS,
MCP_METHOD_HEADER,
MCP_NAME_HEADER,
Expand Down Expand Up @@ -378,6 +381,10 @@ def _schema(**props: Any) -> dict[str, Any]:
_schema(a={"type": "string", "const": {"x-mcp-header": "ignored"}}),
id="annotation-lookalike-in-const-is-data",
),
pytest.param(
_schema(a={"type": "string", "enum": [{"x-mcp-header": "ignored"}]}),
id="annotation-lookalike-in-enum-is-data",
),
pytest.param(
{"properties": {"a": {"type": "string", "x-mcp-header": "R"}}, "$ref": "#/$defs/loop"},
id="ref-is-not-dereferenced",
Expand All @@ -404,12 +411,14 @@ def test_find_invalid_x_mcp_header_accepts_valid_or_absent_annotations(input_sch
pytest.param(_schema(a={"type": "string", "x-mcp-header": "Région"}), id="non-ascii"),
pytest.param(_schema(a={"type": "string", "x-mcp-header": "Region\t1"}), id="control-char"),
pytest.param(_schema(a={"type": "string", "x-mcp-header": 42}), id="non-string"),
pytest.param(_schema(a={"type": "string", "x-mcp-header": 10**5000}), id="oversized-int-header"),
pytest.param(_schema(a={"type": "object", "x-mcp-header": "Data"}), id="on-object"),
pytest.param(_schema(a={"type": "array", "x-mcp-header": "Items"}), id="on-array"),
pytest.param(_schema(a={"type": "null", "x-mcp-header": "Nil"}), id="on-null"),
pytest.param(_schema(a={"type": "number", "x-mcp-header": "Ratio"}), id="on-number"),
pytest.param(_schema(a={"type": ["string", "null"], "x-mcp-header": "Maybe"}), id="array-type"),
pytest.param(_schema(a={"type": {"not": "valid"}, "x-mcp-header": "Bad"}), id="dict-type"),
pytest.param(_schema(a={"type": 10**5000, "x-mcp-header": "Big"}), id="oversized-int-type"),
pytest.param(_schema(a={"x-mcp-header": "NoType"}), id="missing-type"),
pytest.param(
_schema(a={"type": "string", "x-mcp-header": "Region"}, b={"type": "string", "x-mcp-header": "Region"}),
Expand All @@ -420,28 +429,8 @@ def test_find_invalid_x_mcp_header_accepts_valid_or_absent_annotations(input_sch
id="duplicate-diff-case",
),
pytest.param(
_schema(a={"type": "array", "items": {"type": "string", "x-mcp-header": "X"}}),
id="under-items",
),
pytest.param(
{"allOf": [{"properties": {"a": {"type": "string", "x-mcp-header": "X"}}}]},
id="under-allOf",
),
pytest.param(
{"oneOf": [{"type": "string", "x-mcp-header": "X"}]},
id="under-oneOf",
),
pytest.param(
_schema(a={"if": {"type": "string", "x-mcp-header": "X"}}),
id="under-if",
),
pytest.param(
{"$defs": {"T": {"type": "string", "x-mcp-header": "X"}}, "properties": {}},
id="under-defs",
),
pytest.param(
{"patternProperties": {"^a": {"type": "string", "x-mcp-header": "X"}}},
id="under-patternProperties",
{"allOf": [{"type": "object", "properties": {"a": {"type": "string", "x-mcp-header": "X"}}}]},
id="properties-chain-not-restored-below-an-applicator",
),
pytest.param(
{"type": "string", "x-mcp-header": "X"},
Expand Down Expand Up @@ -470,6 +459,47 @@ def test_find_invalid_x_mcp_header_rejects_malformed_annotations(input_schema: d
assert isinstance(find_invalid_x_mcp_header(input_schema), str)


# Keyword → a value of that keyword's own JSON Schema shape carrying an annotated subschema.
# Deliberately a literal table, independent of the `_SUBSCHEMA_*` sets in `inbound.py`:
# dropping a keyword from the walk must FAIL its case here, not shrink the parametrization.
_ANNOTATED = {"type": "string", "x-mcp-header": "Region"}
_APPLICATOR_CASES: dict[str, Any] = {
"$defs": {"T": _ANNOTATED},
"additionalProperties": _ANNOTATED,
"allOf": [_ANNOTATED],
"anyOf": [_ANNOTATED],
"contains": _ANNOTATED,
"contentSchema": _ANNOTATED,
"definitions": {"T": _ANNOTATED},
"dependentSchemas": {"k": _ANNOTATED},
"else": _ANNOTATED,
"if": _ANNOTATED,
"items": _ANNOTATED,
"not": _ANNOTATED,
"oneOf": [_ANNOTATED],
"patternProperties": {"^a": _ANNOTATED},
"prefixItems": [_ANNOTATED],
"propertyNames": _ANNOTATED,
"then": _ANNOTATED,
"unevaluatedItems": _ANNOTATED,
"unevaluatedProperties": _ANNOTATED,
}


@pytest.mark.parametrize("keyword", sorted(_APPLICATOR_CASES))
def test_find_invalid_x_mcp_header_rejects_annotations_under_every_non_properties_applicator(keyword: str) -> None:
"""Spec-mandated: a property reached through any applicator other than `properties` is not
statically reachable, so its annotation invalidates the whole tool definition."""
schema = _schema(ok={"type": "string"}) | {keyword: _APPLICATOR_CASES[keyword]}
assert isinstance(find_invalid_x_mcp_header(schema), str)


def test_schema_walk_applicator_keywords_match_the_pinned_reject_cases() -> None:
"""SDK-defined: a keyword added to the walk must gain a literal reject case above (a removed
keyword already fails its case there)."""
assert _SUBSCHEMA_LIST | _SUBSCHEMA_MAP | _SUBSCHEMA_SINGLE == set(_APPLICATOR_CASES)


def test_find_invalid_x_mcp_header_reports_dotted_path_for_nested_property() -> None:
"""SDK-defined: the reason string names the nested property by its dotted `properties` path."""
schema = _schema(outer={"type": "object", "properties": {"r": {"type": "object", "x-mcp-header": "R"}}})
Expand Down
Loading