Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions crates/rmcp/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,17 @@ pub struct JsonRpcResponse<R = JsonObject> {
#[expect(clippy::exhaustive_structs, reason = "intentionally exhaustive")]
pub struct JsonRpcError {
pub jsonrpc: JsonRpcVersion2_0,
pub id: RequestId,
// MCP 2025-11-25 §Error Responses: `id` is optional and omitted when the
// server cannot read the request id (e.g. parse error / invalid request).
// https://modelcontextprotocol.io/specification/2025-11-25/basic#error-responses
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<RequestId>,
pub error: ErrorData,
}

impl JsonRpcError {
/// Create a new JsonRpcError.
pub fn new(id: RequestId, error: ErrorData) -> Self {
pub fn new(id: Option<RequestId>, error: ErrorData) -> Self {
Self {
jsonrpc: JsonRpcVersion2_0,
id,
Expand Down Expand Up @@ -601,7 +605,7 @@ impl<Req, Resp, Not> JsonRpcMessage<Req, Resp, Not> {
})
}
#[inline]
pub const fn error(error: ErrorData, id: RequestId) -> Self {
pub const fn error(error: ErrorData, id: Option<RequestId>) -> Self {
JsonRpcMessage::Error(JsonRpcError {
jsonrpc: JsonRpcVersion2_0,
id,
Expand Down Expand Up @@ -633,15 +637,15 @@ impl<Req, Resp, Not> JsonRpcMessage<Req, Resp, Not> {
_ => None,
}
}
pub fn into_error(self) -> Option<(ErrorData, RequestId)> {
pub fn into_error(self) -> Option<(ErrorData, Option<RequestId>)> {
match self {
JsonRpcMessage::Error(e) => Some((e.error, e.id)),
_ => None,
}
}
pub fn into_result(self) -> Option<(Result<Resp, ErrorData>, RequestId)> {
pub fn into_result(self) -> Option<(Result<Resp, ErrorData>, Option<RequestId>)> {
match self {
JsonRpcMessage::Response(r) => Some((Ok(r.result), r.id)),
JsonRpcMessage::Response(r) => Some((Ok(r.result), Some(r.id))),
JsonRpcMessage::Error(e) => Some((Err(e.error), e.id)),

_ => None,
Expand Down
10 changes: 8 additions & 2 deletions crates/rmcp/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ where
Event::ToSink(m) => {
if let Some(id) = match &m {
JsonRpcMessage::Response(response) => Some(&response.id),
JsonRpcMessage::Error(error) => Some(&error.id),
JsonRpcMessage::Error(error) => error.id.as_ref(),
_ => None,
} {
if let Some(ct) = local_ct_pool.remove(id) {
Expand Down Expand Up @@ -971,7 +971,7 @@ where
}
Err(error) => {
tracing::warn!(%id, ?error, "response error");
JsonRpcMessage::error(error, id)
JsonRpcMessage::error(error, Some(id))
}
};
let _send_result = sink.send(response).await;
Expand Down Expand Up @@ -1028,6 +1028,12 @@ where
}
}
Event::PeerMessage(JsonRpcMessage::Error(JsonRpcError { error, id, .. })) => {
let Some(id) = id else {
// MCP error responses without an id (e.g. Parse error / Invalid Request)
// can't be routed back to a pending request — log and drop.
tracing::debug!(?error, "received id-less peer error");
continue;
};
if let Some(responder) = local_responder_pool.remove(&id) {
let _response_result = responder.send(Err(ServiceError::McpError(error)));
if let Err(_error) = _response_result {
Expand Down
2 changes: 1 addition & 1 deletion crates/rmcp/src/service/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ where
}
Err(e) => {
transport
.send(ServerJsonRpcMessage::error(e.clone(), id))
.send(ServerJsonRpcMessage::error(e.clone(), Some(id)))
.await
.map_err(|error| {
ServerInitializeError::transport::<T>(error, "sending error response")
Expand Down
157 changes: 132 additions & 25 deletions crates/rmcp/src/transport/async_rw.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
use std::{marker::PhantomData, sync::Arc};

// use crate::schema::*;
use futures::{SinkExt, StreamExt};
use futures::SinkExt;
use serde::{Serialize, de::DeserializeOwned};
use thiserror::Error;
use tokio::{
io::{AsyncRead, AsyncWrite},
io::{AsyncBufReadExt, AsyncRead, AsyncWrite, BufReader},
sync::Mutex,
};
use tokio_util::{
bytes::{Buf, BufMut, BytesMut},
codec::{Decoder, Encoder, FramedRead, FramedWrite},
codec::{Decoder, Encoder, FramedWrite},
};

use super::{IntoTransport, Transport};
use crate::service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage};
use crate::{
model::ErrorData,
service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage},
};

#[non_exhaustive]
pub enum TransportAdapterAsyncRW {}
Expand Down Expand Up @@ -47,8 +49,10 @@ where
pub type TransportWriter<Role, W> = FramedWrite<W, JsonRpcMessageCodec<TxJsonRpcMessage<Role>>>;

pub struct AsyncRwTransport<Role: ServiceRole, R: AsyncRead, W: AsyncWrite> {
read: FramedRead<R, JsonRpcMessageCodec<RxJsonRpcMessage<Role>>>,
read: BufReader<R>,
line_buf: Vec<u8>,
write: Arc<Mutex<Option<TransportWriter<Role, W>>>>,
_role: PhantomData<fn() -> Role>,
}

impl<Role: ServiceRole, R, W> AsyncRwTransport<Role, R, W>
Expand All @@ -57,15 +61,17 @@ where
W: Send + AsyncWrite + Unpin + 'static,
{
pub fn new(read: R, write: W) -> Self {
let read = FramedRead::new(
read,
JsonRpcMessageCodec::<RxJsonRpcMessage<Role>>::default(),
);
let read = BufReader::new(read);
let write = Arc::new(Mutex::new(Some(FramedWrite::new(
write,
JsonRpcMessageCodec::<TxJsonRpcMessage<Role>>::default(),
))));
Self { read, write }
Self {
read,
line_buf: Vec::new(),
write,
_role: PhantomData,
}
}
}

Expand Down Expand Up @@ -116,15 +122,43 @@ where
}
}

fn receive(&mut self) -> impl Future<Output = Option<RxJsonRpcMessage<Role>>> {
let next = self.read.next();
async {
next.await.and_then(|e| {
e.inspect_err(|e| {
async fn receive(&mut self) -> Option<RxJsonRpcMessage<Role>> {
loop {
self.line_buf.clear();
match self.read.read_until(b'\n', &mut self.line_buf).await {
Ok(0) => return None,
Ok(_) => {}
Err(e) => {
tracing::error!("Error reading from stream: {}", e);
})
.ok()
})
return None;
}
}
let line = without_carriage_return(
self.line_buf.strip_suffix(b"\n").unwrap_or(&self.line_buf),
);
if line.is_empty() {
continue;
}
match try_parse_with_compatibility::<RxJsonRpcMessage<Role>>(line, "receive") {
Ok(Some(msg)) => return Some(msg),
Ok(None) => continue,
Err(JsonRpcMessageCodecError::Serde(e)) => {
tracing::debug!("Parse error on incoming message: {e}");
let mut write = self.write.lock().await;
let framed = write.as_mut()?;
let response = TxJsonRpcMessage::<Role>::error(
ErrorData::parse_error("Parse error", None),
None,
);
if framed.send(response).await.is_err() {
return None;
}
}
Err(e) => {
tracing::error!("Error reading from stream: {}", e);
return None;
}
}
}
}

Expand Down Expand Up @@ -172,13 +206,12 @@ impl<T> JsonRpcMessageCodec<T> {
}

fn without_carriage_return(s: &[u8]) -> &[u8] {
if let Some(&b'\r') = s.last() {
&s[..s.len() - 1]
} else {
s
}
s.strip_suffix(b"\r").unwrap_or(s)
}

/// UTF-8 byte order mark. RFC 8259 §8.1 allows JSON parsers to ignore a leading BOM.
const UTF8_BOM: &[u8; 3] = b"\xEF\xBB\xBF";

/// Check if a method is a standard MCP method (request, response, or notification).
/// This includes both requests and notifications defined in the MCP specification.
///
Expand Down Expand Up @@ -247,6 +280,7 @@ fn try_parse_with_compatibility<T: serde::de::DeserializeOwned>(
line: &[u8],
context: &str,
) -> Result<Option<T>, JsonRpcMessageCodecError> {
let line = line.strip_prefix(UTF8_BOM.as_slice()).unwrap_or(line);
if let Ok(line_str) = std::str::from_utf8(line) {
match serde_json::from_slice(line) {
Ok(item) => Ok(Some(item)),
Expand Down Expand Up @@ -406,7 +440,8 @@ impl<T: Serialize> Encoder<T> for JsonRpcMessageCodec<T> {

#[cfg(test)]
mod test {
use futures::{Sink, Stream};
use futures::{Sink, Stream, StreamExt};
use tokio_util::codec::FramedRead;

use super::*;
fn from_async_read<T: DeserializeOwned, R: AsyncRead>(reader: R) -> impl Stream<Item = T> {
Expand Down Expand Up @@ -555,4 +590,76 @@ mod test {

println!("Standard notifications are preserved, non-standard are handled gracefully");
}

#[tokio::test]
async fn test_decode_strips_utf8_bom() {
use futures::StreamExt;
use tokio::io::BufReader;

// Valid JSON-RPC message preceded by a UTF-8 BOM (EF BB BF). Some Windows
// tooling and editors prepend this; the codec should ignore it per RFC 8259 §8.1.
let mut data = Vec::new();
data.extend_from_slice(UTF8_BOM);
data.extend_from_slice(br#"{"jsonrpc":"2.0","method":"ping","id":1}"#);
data.push(b'\n');

let mut cursor = BufReader::new(&data[..]);
let mut stream = from_async_read::<serde_json::Value, _>(&mut cursor);

let item = stream
.next()
.await
.expect("should decode BOM-prefixed line");
assert_eq!(
item,
serde_json::json!({"jsonrpc": "2.0", "method": "ping", "id": 1})
);
}

#[cfg(feature = "server")]
#[tokio::test]
async fn receive_recovers_from_parse_error() {
use tokio::io::AsyncWriteExt;

use crate::{RoleServer, transport::Transport};

// Two paired streams: `server_io` is wrapped by the transport; the test
// drives `client_io` to act as the peer.
let (server_io, client_io) = tokio::io::duplex(4096);
let (server_r, server_w) = tokio::io::split(server_io);
let (mut client_r, mut client_w) = tokio::io::split(client_io);

let mut transport = AsyncRwTransport::<RoleServer, _, _>::new(server_r, server_w);

client_w
.write_all(
b"not json\n{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\"}\n",
)
.await
.unwrap();

let received = transport
.receive()
.await
.expect("transport should recover and yield the next valid message");

// Read one line back from the peer side and parse as JSON.
let mut reply_buf = Vec::new();
let mut peer = tokio::io::BufReader::new(&mut client_r);
peer.read_until(b'\n', &mut reply_buf).await.unwrap();
let reply: serde_json::Value = serde_json::from_slice(&reply_buf).unwrap();

// Per MCP 2025-11-25: id is omitted when the server can't read the request id.
assert_eq!(
reply,
serde_json::json!({
"jsonrpc": "2.0",
"error": {"code": -32700, "message": "Parse error"},
})
);
assert_eq!(
serde_json::to_value(&received).unwrap()["method"],
"notifications/initialized",
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -523,14 +523,12 @@ impl LocalSessionWorker {
}
}
ServerJsonRpcMessage::Error(json_rpc_error) => {
if let Some(id) = self
.resource_router
.get(&ResourceKey::McpRequestId(json_rpc_error.id.clone()))
{
OutboundChannel::RequestWise {
id: *id,
close: true,
}
if let Some(id) = json_rpc_error.id.clone().and_then(|rid| {
self.resource_router
.get(&ResourceKey::McpRequestId(rid))
.copied()
}) {
OutboundChannel::RequestWise { id, close: true }
} else {
OutboundChannel::Common
}
Expand Down Expand Up @@ -1041,8 +1039,7 @@ impl Worker for LocalSessionWorker {
Some(ResourceKey::McpRequestId(request_id))
}
crate::model::JsonRpcMessage::Error(json_rpc_error) => {
let request_id = json_rpc_error.id.clone();
Some(ResourceKey::McpRequestId(request_id))
json_rpc_error.id.clone().map(ResourceKey::McpRequestId)
}
_ => {
None
Expand Down
2 changes: 1 addition & 1 deletion crates/rmcp/tests/test_client_initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn test_client_init_handles_jsonrpc_error() {

let error_msg = ServerJsonRpcMessage::Error(JsonRpcError {
jsonrpc: JsonRpcVersion2_0,
id: RequestId::Number(1),
id: Some(RequestId::Number(1)),
error: ErrorData {
code: ErrorCode(-32600),
message: Cow::Borrowed("Invalid Request"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -862,15 +862,21 @@
"$ref": "#/definitions/ErrorData"
},
"id": {
"$ref": "#/definitions/NumberOrString"
"anyOf": [
{
"$ref": "#/definitions/NumberOrString"
},
{
"type": "null"
}
]
},
"jsonrpc": {
"$ref": "#/definitions/JsonRpcVersion2_0"
}
},
"required": [
"jsonrpc",
"id",
"error"
]
},
Expand Down
Loading
Loading