diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 24699b03b2..373aec55cf 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -209,18 +209,6 @@ pub enum ReadError { #[cfg(google_cloud_unstable_storage_bidi)] #[error("unknown range id in bidi response: {0}")] UnknownBidiRangeId(i64), - - /// A `read_range()` request failed because the object descriptor is closed. - /// - /// # Troubleshooting - /// - /// The object descriptor closes only when there is an unrecoverable I/O - /// error. Consider using a more lenient [ReadResumePolicy]. - /// - /// [ReadResumePolicy]: [crate::read_resume_policy::ReadResumePolicy] - #[cfg(google_cloud_unstable_storage_bidi)] - #[error("read worker terminated on an unrecoverable read error")] - CannotScheduleRangeRead(#[source] BoxedSource), } /// An unrecoverable problem in the upload protocol. @@ -338,7 +326,7 @@ impl SigningError { SigningError(SigningErrorKind::Signing(source.into())) } - /// A problem to sign the URL due to invalid input. + /// A problem signing the URL due to invalid input.
pub(crate) fn invalid_parameter, T>(field: S, source: T) -> SigningError where T: Into, diff --git a/src/storage/src/storage/bidi/transport.rs b/src/storage/src/storage/bidi/transport.rs index 8b5b7e8d72..5fe2b5e305 100644 --- a/src/storage/src/storage/bidi/transport.rs +++ b/src/storage/src/storage/bidi/transport.rs @@ -65,12 +65,11 @@ impl ObjectDescriptor for ObjectDescriptorTransport { async fn read_range(&self, range: ReadRange) -> ReadObjectResponse { let (tx, rx) = tokio::sync::mpsc::channel(100); - let range = ActiveRead::new(tx.clone(), range.0); - if let Err(e) = self.tx.send(range).await { - let _ = tx - .send(Err(ReadError::CannotScheduleRangeRead(e.into()))) - .await; - } + let range = ActiveRead::new(tx, range.0); + self.tx + .send(range) + .await + .expect("worker never exits while ObjectDescriptor is live"); ReadObjectResponse::new(Box::new(RangeReader::new( rx, self.object.clone(), @@ -174,7 +173,7 @@ mod tests { Ok(()) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn read_range_error() -> anyhow::Result<()> { use std::error::Error as _; @@ -211,29 +210,46 @@ mod tests { .set_generation(123456); assert_eq!(transport.object(), &want, "{transport:?}"); + let mut existing = transport.read_range(ReadRange::segment(100, 200)).await; + // Close the mock connection with an unrecoverable error. - // This should terminate the worker task, and the object descriptor - // should stop accepting requests. + // This should stop the worker's main loop. The object descriptor + // should fail any read requested afterwards with this error. connect_tx .send(Err(tonic::Status::permission_denied("uh-oh"))) .await?; - drop(connect_tx); - // Wait for the worker to stop and drop the transport.tx receiver. - transport.tx.closed().await; + // Wait for the worker to stop the main loop; the pending read then reports the error.
+ let err = existing.next().await.transpose().unwrap_err(); + let source = err.source().and_then(|e| e.downcast_ref::()); + assert!( + matches!( + source, + Some(ReadError::UnrecoverableBidiReadInterrupt(e)) if e.status().is_some() + ), + "{err:?}" + ); + let got = existing.next().await; + assert!(got.is_none(), "{got:?}"); + + // Close the mock I/O stream. From this point the `transport.read_range()` + // calls should fail. + drop(connect_tx); // Now we know this call will fail, and we verify we get the correct // error. let mut reader = transport.read_range(ReadRange::segment(100, 200)).await; let err = reader.next().await.transpose().unwrap_err(); - assert!(err.is_io(), "{err:?}"); + let source = err.source().and_then(|e| e.downcast_ref::()); assert!( matches!( - err.source().and_then(|e| e.downcast_ref::()), - Some(ReadError::CannotScheduleRangeRead(_)) + source, + Some(ReadError::UnrecoverableBidiReadInterrupt(e)) if e.status().is_some() ), "{err:?}" ); + let got = reader.next().await; + assert!(got.is_none(), "{got:?}"); Ok(()) } diff --git a/src/storage/src/storage/bidi/worker.rs b/src/storage/src/storage/bidi/worker.rs index 64105a4e59..82b8b072a2 100644 --- a/src/storage/src/storage/bidi/worker.rs +++ b/src/storage/src/storage/bidi/worker.rs @@ -58,15 +58,15 @@ where - // Note how this loop only exits when the `requests` queue is - // closed. A successfully closed stream and unrecoverable errors - // return immediately. - loop { + // This loop exits when the `requests` queue is closed, when the + // stream closes successfully, or on an unrecoverable error. Its + // value records whether an error ended the loop. + let error = loop { tokio::select! { m = rx.next_message() => { match self.handle_response(m).await { - // Successful end of stream, return without error. - None => return Ok(()), + // Successful end of stream, leave the loop without error. + None => break None, - // An unrecoverable in the stream or its data, return - // the error. - Some(Err(e)) => return Err(e), + // An unrecoverable error in the stream or its data, leave + // the loop with the error. + Some(Err(e)) => break Some(e), // New message on the stream handled successfully, // continue.
Some(Ok(None)) => {}, @@ -79,16 +79,24 @@ where }, r = requests.recv_many(&mut ranges, 16) => { if r == 0 { - break; + break None; }; self.insert_ranges(tx.clone(), std::mem::take(&mut ranges)).await; }, } + }; + drop(rx); + drop(tx); + let Some(e) = error else { + // No error: the stream closed cleanly or the request queue closed. + return Ok(()); + }; + // Return errors for any future readers. + while let Some(mut r) = requests.recv().await { + // The stream is closed: fail this reader with the same error. + r.interrupted(e.clone()).await; } - // No need to continue reading. The `requests` queue closes - // only when the ObjectDescriptor is gone and when all the - // associated ReadResponseReaders are gone. - Ok(()) + Err(e) } async fn handle_response( @@ -163,6 +171,7 @@ where closing.push(active.interrupted(error.clone())); } let _ = closing.count().await; + guard.clear(); } async fn insert_ranges(&mut self, tx: Sender, readers: Vec) { @@ -230,17 +239,19 @@ mod tests { async fn run_immediately_closed() -> anyhow::Result<()> { let (request_tx, _request_rx) = tokio::sync::mpsc::channel(1); let (response_tx, response_rx) = mock_stream(); - let (_tx, rx) = tokio::sync::mpsc::channel(1); + let (tx, rx) = tokio::sync::mpsc::channel(1); let connection = Connection::new(request_tx, response_rx); - // Closing the stream without an error should not attempt a reconnect. - drop(response_tx); let mut mock = MockTestClient::new(); mock.expect_start().never(); let connector = mock_connector(mock); let worker = Worker::new(connector); - let result = worker.run(connection, rx).await; + let handle = tokio::spawn(worker.run(connection, rx)); + // Closing the stream without an error should not attempt a reconnect.
+ drop(response_tx); + drop(tx); + let result = handle.await?; assert!(result.is_ok(), "{result:?}"); Ok(()) } @@ -251,7 +262,7 @@ mod tests { async fn run_bad_response(range_end: bool) -> anyhow::Result<()> { let (request_tx, _request_rx) = tokio::sync::mpsc::channel(1); let (response_tx, response_rx) = mock_stream(); - let (_tx, rx) = tokio::sync::mpsc::channel(1); + let (tx, rx) = tokio::sync::mpsc::channel(1); let connection = Connection::new(request_tx, response_rx); // Simulate a response for an unexpected read id. @@ -269,7 +280,12 @@ mod tests { let connector = mock_connector(mock); let worker = Worker::new(connector); - let err = worker.run(connection, rx).await.unwrap_err(); + let handle = tokio::spawn(worker.run(connection, rx)); + // Wait until the response_tx/response_rx pair is closed, then close the + // request queue to let the spawned worker task finish. + response_tx.closed().await; + drop(tx); + let err = handle.await?.unwrap_err(); assert!(err.is_transport(), "{err:?}"); let source = err.source().and_then(|e| e.downcast_ref::()); assert!( @@ -283,7 +299,7 @@ mod tests { async fn run_reconnect() -> anyhow::Result<()> { let (request_tx, _request_rx) = tokio::sync::mpsc::channel(1); let (response_tx, response_rx) = mock_stream(); - let (_tx, rx) = tokio::sync::mpsc::channel(1); + let (tx, rx) = tokio::sync::mpsc::channel(1); let connection = Connection::new(request_tx, response_rx); // Simulate a redirect response.
@@ -291,12 +307,20 @@ mod tests { .send(Err(redirect_status("redirect-01"))) .await?; let mut mock = MockTestClient::new(); - mock.expect_start() - .return_once(|_, _, _, _, _, _| Err(permanent_error())); + let (reconnected_tx, reconnected_rx) = tokio::sync::oneshot::channel(); + mock.expect_start().return_once(move |_, _, _, _, _, _| { + let _ = reconnected_tx.send(()); + Err(permanent_error()) + }); let connector = mock_connector(mock); let worker = Worker::new(connector); - let err = worker.run(connection, rx).await.unwrap_err(); + let handle = tokio::spawn(worker.run(connection, rx)); + // Wait until the reconnect call is made, then close the + // request queue to let the spawned worker task finish. + reconnected_rx.await?; + drop(tx); + let err = handle.await?.unwrap_err(); assert_eq!(err.status(), permanent_error().status()); Ok(()) } @@ -634,6 +658,7 @@ mod tests { ); // Wait for the worker to finish. + drop(tx); let err = worker.await?.unwrap_err(); assert_eq!(err.status(), permanent_error().status()); Ok(())