Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions vortex-file/src/read/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl State {
tracing::debug!(?event, "Received ReadEvent");
match event {
ReadEvent::Request(req) => {
if req.callback.is_closed() {
if req.is_closed() {
tracing::debug!(?req, "ReadRequest dropped before registration");
return;
}
Expand All @@ -145,7 +145,7 @@ impl State {
}
ReadEvent::Polled(req_id) => {
if let Some(req) = self.requests.remove(&req_id) {
if req.callback.is_closed() {
if req.is_closed() {
self.requests_by_offset.remove(&(req.offset, req_id));
tracing::debug!(?req, "ReadRequest dropped before poll");
} else {
Expand Down Expand Up @@ -192,7 +192,7 @@ impl State {
fn next_uncoalesced(&mut self) -> Option<ReadRequest> {
while let Some((req_id, req)) = self.polled_requests.pop_first() {
self.requests_by_offset.remove(&(req.offset, req_id));
if req.callback.is_closed() {
if req.is_closed() {
tracing::debug!("Dropping canceled request");
continue;
}
Expand Down Expand Up @@ -250,7 +250,7 @@ impl State {
.vortex_expect("Missing request in requests_by_offset");

// Skip any cancelled requests
if req.callback.is_closed() {
if req.is_closed() {
if ids_to_remove.insert(req_id) {
keys_to_remove.push((req_offset, req_id));
}
Expand Down
3 changes: 3 additions & 0 deletions vortex-file/src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ mod driver;
mod request;

pub(crate) use driver::IoRequestStream;
#[cfg(test)]
pub(crate) use request::CoalescedRequest;
pub(crate) use request::IoRequest;
pub(crate) use request::ReadRequest;
pub(crate) use request::RequestId;
18 changes: 17 additions & 1 deletion vortex-file/src/read/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ impl IoRequest {
IoRequestInner::Coalesced(req) => req.resolve(result),
}
}

/// Returns true if no callback remains to receive the read result.
pub(crate) fn is_cancelled(&self) -> bool {
match &self.0 {
IoRequestInner::Single(req) => req.is_closed(),
IoRequestInner::Coalesced(req) => req.is_cancelled(),
}
}
}

// Testing functionality
Expand Down Expand Up @@ -100,12 +108,16 @@ impl Debug for ReadRequest {
.field("offset", &self.offset)
.field("length", &self.length)
.field("alignment", &self.alignment)
.field("is_closed", &self.callback.is_closed())
.field("is_closed", &self.is_closed())
.finish()
}
}

impl ReadRequest {
pub(crate) fn is_closed(&self) -> bool {
self.callback.is_closed()
}

pub(crate) fn resolve(self, result: VortexResult<BufferHandle>) {
if let Err(e) = self.callback.send(result) {
tracing::debug!("ReadRequest {} dropped before resolving: {e}", self.id);
Expand All @@ -132,6 +144,10 @@ impl Debug for CoalescedRequest {
}

impl CoalescedRequest {
pub(crate) fn is_cancelled(&self) -> bool {
self.requests.iter().all(ReadRequest::is_closed)
}

pub fn resolve(self, result: VortexResult<BufferHandle>) {
match result {
Ok(buffer) => {
Expand Down
Loading
Loading