14 changes: 1 addition & 13 deletions src/storage/src/error.rs
@@ -209,18 +209,6 @@ pub enum ReadError {
#[cfg(google_cloud_unstable_storage_bidi)]
#[error("unknown range id in bidi response: {0}")]
UnknownBidiRangeId(i64),

/// A `read_range()` request failed because the object descriptor is closed.
///
/// # Troubleshooting
///
/// The object descriptor closes only when there is an unrecoverable I/O
/// error. Consider using a more lenient [ReadResumePolicy].
///
/// [ReadResumePolicy]: [crate::read_resume_policy::ReadResumePolicy]
#[cfg(google_cloud_unstable_storage_bidi)]
#[error("read worker terminated on an unrecoverable read error")]
CannotScheduleRangeRead(#[source] BoxedSource),
}

/// An unrecoverable problem in the upload protocol.
@@ -338,7 +326,7 @@ impl SigningError {
SigningError(SigningErrorKind::Signing(source.into()))
}

/// A problem to sign the URL due to invalid input.
/// A problem to sign the URL due to invalid input.
pub(crate) fn invalid_parameter<S: Into<String>, T>(field: S, source: T) -> SigningError
where
T: Into<BoxError>,
42 changes: 29 additions & 13 deletions src/storage/src/storage/bidi/transport.rs
@@ -65,12 +65,11 @@ impl ObjectDescriptor for ObjectDescriptorTransport {

async fn read_range(&self, range: ReadRange) -> ReadObjectResponse {
let (tx, rx) = tokio::sync::mpsc::channel(100);
let range = ActiveRead::new(tx.clone(), range.0);
if let Err(e) = self.tx.send(range).await {
let _ = tx
.send(Err(ReadError::CannotScheduleRangeRead(e.into())))
.await;
}
let range = ActiveRead::new(tx, range.0);
self.tx
.send(range)
.await
.expect("worker never exits while ObjectDescriptor is live");
ReadObjectResponse::new(Box::new(RangeReader::new(
rx,
self.object.clone(),
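The new `read_range()` no longer reports a scheduling failure: it relies on the invariant that the worker task owns the receiving end of `self.tx` and only exits after every sender has been dropped, so the `send()` cannot fail while the descriptor is alive. Below is a minimal standalone sketch of that invariant using plain `tokio::sync::mpsc`; the names `Descriptor` and `spawn_worker` are hypothetical and stand in for the crate's own types.

```rust
use tokio::sync::mpsc;

/// Hypothetical stand-in for the object descriptor: it owns a `Sender`,
/// so the worker's `Receiver` stays open for as long as it exists.
struct Descriptor {
    tx: mpsc::Sender<u64>,
}

impl Descriptor {
    async fn read_range(&self, id: u64) {
        // The worker loops until `recv()` returns `None`, which can only
        // happen after every `Sender` (including ours) is dropped. While
        // `self` is alive the channel cannot close, so `expect` is safe.
        self.tx
            .send(id)
            .await
            .expect("worker never exits while a descriptor is live");
    }
}

fn spawn_worker() -> Descriptor {
    let (tx, mut rx) = mpsc::channel::<u64>(16);
    tokio::spawn(async move {
        // The worker exits only when all senders are gone.
        while let Some(id) = rx.recv().await {
            println!("scheduling range read {id}");
        }
    });
    Descriptor { tx }
}

#[tokio::main]
async fn main() {
    let descriptor = spawn_worker();
    descriptor.read_range(42).await;
}
```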
@@ -174,7 +173,7 @@ mod tests {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn read_range_error() -> anyhow::Result<()> {
use std::error::Error as _;

@@ -211,29 +210,46 @@
.set_generation(123456);
assert_eq!(transport.object(), &want, "{transport:?}");

let mut existing = transport.read_range(ReadRange::segment(100, 200)).await;

// Inject an unrecoverable error on the mock connection.
// This should stop the worker's main read loop, and reads through the
// object descriptor should start returning errors.
connect_tx
.send(Err(tonic::Status::permission_denied("uh-oh")))
.await?;
drop(connect_tx);

// Wait for the worker to stop and drop the transport.tx receiver.
transport.tx.closed().await;
// Reading from the existing range should surface the unrecoverable error
// once the worker stops its main loop.
let err = existing.next().await.transpose().unwrap_err();
let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
assert!(
matches!(
source,
Some(ReadError::UnrecoverableBidiReadInterrupt(e)) if e.status().is_some()
),
"{err:?}"
);
let got = existing.next().await;
assert!(got.is_none(), "{got:?}");

// Close the mock I/O stream. From this point the `transport.read_range()`
// calls should fail.
drop(connect_tx);

// This read is now guaranteed to fail; verify that it reports the
// expected error.
let mut reader = transport.read_range(ReadRange::segment(100, 200)).await;
let err = reader.next().await.transpose().unwrap_err();
assert!(err.is_io(), "{err:?}");
let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
assert!(
matches!(
err.source().and_then(|e| e.downcast_ref::<ReadError>()),
Some(ReadError::CannotScheduleRangeRead(_))
source,
Some(ReadError::UnrecoverableBidiReadInterrupt(e)) if e.status().is_some()
),
"{err:?}"
);
let got = reader.next().await;
assert!(got.is_none(), "{got:?}");

Ok(())
}
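The updated test asserts that both the in-flight read and a later `read_range()` call surface `ReadError::UnrecoverableBidiReadInterrupt` through the returned error's `source()` chain. Below is a standalone model of that detection pattern (walk `source()`, then `downcast_ref`); the error types are hypothetical local stand-ins, and `thiserror` is assumed as a dependency, mirroring the derives in error.rs.

```rust
use std::error::Error;

/// Hypothetical stand-in for the crate's `ReadError`.
#[derive(Debug, thiserror::Error)]
#[error("read worker interrupted by an unrecoverable error")]
struct ReadError;

/// Hypothetical outer error, playing the role of the I/O error that wraps
/// the `ReadError` as its source.
#[derive(Debug, thiserror::Error)]
#[error("I/O error")]
struct IoError(#[source] ReadError);

fn is_unrecoverable_interrupt(err: &(dyn Error + 'static)) -> bool {
    // Same shape as the assertions above: inspect the source chain and
    // downcast to the domain error.
    err.source()
        .and_then(|e| e.downcast_ref::<ReadError>())
        .is_some()
}

fn main() {
    let err = IoError(ReadError);
    assert!(is_unrecoverable_interrupt(&err));
}
```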
61 changes: 43 additions & 18 deletions src/storage/src/storage/bidi/worker.rs
@@ -58,15 +58,15 @@
// Note how the main loop exits when the `requests` queue is closed,
// when the stream ends successfully, or on an unrecoverable error. On
// an error, any remaining readers are notified before returning.
loop {
let error = loop {
tokio::select! {
m = rx.next_message() => {
match self.handle_response(m).await {
// Successful end of stream, return without error.
None => return Ok(()),
None => break None,
// An unrecoverable error in the stream or its data, return
// the error.
Some(Err(e)) => return Err(e),
Some(Err(e)) => break Some(e),
// New message on the stream handled successfully,
// continue.
Some(Ok(None)) => {},
@@ -79,16 +79,24 @@
},
r = requests.recv_many(&mut ranges, 16) => {
if r == 0 {
break;
break None;
};
self.insert_ranges(tx.clone(), std::mem::take(&mut ranges)).await;
},
}
};
drop(rx);
drop(tx);
let Some(e) = error else {
// A successfully closed stream *and* there are no more readers.
return Ok(());
};
// Return errors for any future readers.
while let Some(mut r) = requests.recv().await {
r.interrupted(e.clone()).await;
}
// No need to continue reading. The `requests` queue closes
// only when the ObjectDescriptor is gone and when all the
// associated ReadResponseReaders are gone.
Ok(())
Err(e)
}
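The reworked `run()` loop now breaks out with an optional error instead of returning early: a clean shutdown returns `Ok(())`, while an unrecoverable error is first delivered to every queued and future request before being returned. Below is a standalone sketch of this break-then-drain shutdown pattern; `Request`, `run`, and the `String` error type are hypothetical simplifications, not the crate's API.

```rust
use tokio::sync::mpsc;

/// Hypothetical request: the reader waits on `reply` for data or an error.
struct Request {
    reply: mpsc::Sender<Result<Vec<u8>, String>>,
}

async fn run(
    mut responses: mpsc::Receiver<Result<Vec<u8>, String>>,
    mut requests: mpsc::Receiver<Request>,
) -> Result<(), String> {
    let mut pending: Vec<Request> = Vec::new();
    // Break with `None` on a clean end of stream, `Some(e)` on an
    // unrecoverable error.
    let error = loop {
        tokio::select! {
            m = responses.recv() => match m {
                Some(Ok(data)) => {
                    if let Some(r) = pending.pop() {
                        let _ = r.reply.send(Ok(data)).await;
                    }
                }
                Some(Err(e)) => break Some(e),
                None => break None,
            },
            r = requests.recv() => match r {
                Some(req) => pending.push(req),
                // The request queue closes only when the descriptor and
                // all readers are gone.
                None => break None,
            },
        }
    };
    let Some(e) = error else { return Ok(()) };
    // Notify in-flight readers, then drain the queue so readers that race
    // with the shutdown still receive an explicit error.
    for r in pending.drain(..) {
        let _ = r.reply.send(Err(e.clone())).await;
    }
    while let Some(r) = requests.recv().await {
        let _ = r.reply.send(Err(e.clone())).await;
    }
    Err(e)
}

#[tokio::main]
async fn main() {
    let (resp_tx, resp_rx) = mpsc::channel(4);
    let (req_tx, req_rx) = mpsc::channel(4);
    let worker = tokio::spawn(run(resp_rx, req_rx));

    // Fail the stream, then show that a request racing with the shutdown
    // still gets an explicit error.
    resp_tx.send(Err("uh-oh".to_string())).await.unwrap();
    let (reply_tx, mut reply_rx) = mpsc::channel(1);
    req_tx.send(Request { reply: reply_tx }).await.unwrap();
    assert!(reply_rx.recv().await.unwrap().is_err());

    // Closing the request queue lets the worker finish and report the error.
    drop(req_tx);
    assert!(worker.await.unwrap().is_err());
}
```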

async fn handle_response(
@@ -163,6 +171,7 @@
closing.push(active.interrupted(error.clone()));
}
let _ = closing.count().await;
guard.clear();
}

async fn insert_ranges(&mut self, tx: Sender<BidiReadObjectRequest>, readers: Vec<ActiveRead>) {
@@ -230,17 +239,19 @@ mod tests {
async fn run_immediately_closed() -> anyhow::Result<()> {
let (request_tx, _request_rx) = tokio::sync::mpsc::channel(1);
let (response_tx, response_rx) = mock_stream();
let (_tx, rx) = tokio::sync::mpsc::channel(1);
let (tx, rx) = tokio::sync::mpsc::channel(1);
let connection = Connection::new(request_tx, response_rx);

// Closing the stream without an error should not attempt a reconnect.
drop(response_tx);
let mut mock = MockTestClient::new();
mock.expect_start().never();

let connector = mock_connector(mock);
let worker = Worker::new(connector);
let result = worker.run(connection, rx).await;
let handle = tokio::spawn(worker.run(connection, rx));
// Closing the stream without an error should not attempt a reconnect.
drop(response_tx);
drop(tx);
let result = handle.await?;
assert!(result.is_ok(), "{result:?}");
Ok(())
}
@@ -251,7 +262,7 @@
async fn run_bad_response(range_end: bool) -> anyhow::Result<()> {
let (request_tx, _request_rx) = tokio::sync::mpsc::channel(1);
let (response_tx, response_rx) = mock_stream();
let (_tx, rx) = tokio::sync::mpsc::channel(1);
let (tx, rx) = tokio::sync::mpsc::channel(1);
let connection = Connection::new(request_tx, response_rx);

// Simulate a response for an unexpected read id.
@@ -269,7 +280,12 @@

let connector = mock_connector(mock);
let worker = Worker::new(connector);
let err = worker.run(connection, rx).await.unwrap_err();
let handle = tokio::spawn(worker.run(connection, rx));
// Wait until the response_tx/response_rx pair is closed, then close the
// request queue to terminate the worker task.
response_tx.closed().await;
drop(tx);
let err = handle.await?.unwrap_err();
assert!(err.is_transport(), "{err:?}");
let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
assert!(
@@ -283,20 +299,28 @@
async fn run_reconnect() -> anyhow::Result<()> {
let (request_tx, _request_rx) = tokio::sync::mpsc::channel(1);
let (response_tx, response_rx) = mock_stream();
let (_tx, rx) = tokio::sync::mpsc::channel(1);
let (tx, rx) = tokio::sync::mpsc::channel(1);
let connection = Connection::new(request_tx, response_rx);

// Simulate a redirect response.
response_tx
.send(Err(redirect_status("redirect-01")))
.await?;
let mut mock = MockTestClient::new();
mock.expect_start()
.return_once(|_, _, _, _, _, _| Err(permanent_error()));
let (reconnected_tx, reconnected_rx) = tokio::sync::oneshot::channel();
mock.expect_start().return_once(move |_, _, _, _, _, _| {
let _ = reconnected_tx.send(());
Err(permanent_error())
});

let connector = mock_connector(mock);
let worker = Worker::new(connector);
let err = worker.run(connection, rx).await.unwrap_err();
let handle = tokio::spawn(worker.run(connection, rx));
// Wait until the reconnect call is made, then close the
// request queue to terminate the worker task.
reconnected_rx.await?;
drop(tx);
let err = handle.await?.unwrap_err();
assert_eq!(err.status(), permanent_error().status());
Ok(())
}
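These tests now spawn `worker.run(...)` instead of awaiting it inline, so each one needs a way to know the worker reached the interesting point (here, a oneshot fired from the mocked reconnect) and a way to make the spawned task terminate (dropping the request sender). A standalone sketch of that synchronization shape, with a hypothetical `worker` function and test name:

```rust
use tokio::sync::{mpsc, oneshot};

/// Hypothetical worker: signals over a oneshot when it reaches its
/// "reconnect" step, then runs until its request queue is closed.
async fn worker(
    reconnected: oneshot::Sender<()>,
    mut requests: mpsc::Receiver<u64>,
) -> Result<(), String> {
    let _ = reconnected.send(());
    while let Some(id) = requests.recv().await {
        println!("request {id}");
    }
    Err("permanent error".to_string())
}

#[tokio::test]
async fn run_reconnect_shape() {
    let (reconnected_tx, reconnected_rx) = oneshot::channel();
    let (tx, rx) = mpsc::channel(1);
    let handle = tokio::spawn(worker(reconnected_tx, rx));

    // Wait until the worker has made its reconnect attempt, then close
    // the request queue so the spawned task can terminate.
    reconnected_rx.await.unwrap();
    drop(tx);

    let err = handle.await.unwrap().unwrap_err();
    assert_eq!(err, "permanent error");
}
```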
@@ -634,6 +658,7 @@ mod tests {
);

// Wait for the worker to finish.
drop(tx);
let err = worker.await?.unwrap_err();
assert_eq!(err.status(), permanent_error().status());
Ok(())