Skip to content

Commit 7d02f45

Browse files
committed
some io stuff
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 5e5572b commit 7d02f45

7 files changed

Lines changed: 388 additions & 17 deletions

File tree

vortex-file/src/read/driver.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ impl State {
136136
tracing::debug!(?event, "Received ReadEvent");
137137
match event {
138138
ReadEvent::Request(req) => {
139-
if req.callback.is_closed() {
139+
if req.is_closed() {
140140
tracing::debug!(?req, "ReadRequest dropped before registration");
141141
return;
142142
}
@@ -145,7 +145,7 @@ impl State {
145145
}
146146
ReadEvent::Polled(req_id) => {
147147
if let Some(req) = self.requests.remove(&req_id) {
148-
if req.callback.is_closed() {
148+
if req.is_closed() {
149149
self.requests_by_offset.remove(&(req.offset, req_id));
150150
tracing::debug!(?req, "ReadRequest dropped before poll");
151151
} else {
@@ -192,7 +192,7 @@ impl State {
192192
fn next_uncoalesced(&mut self) -> Option<ReadRequest> {
193193
while let Some((req_id, req)) = self.polled_requests.pop_first() {
194194
self.requests_by_offset.remove(&(req.offset, req_id));
195-
if req.callback.is_closed() {
195+
if req.is_closed() {
196196
tracing::debug!("Dropping canceled request");
197197
continue;
198198
}
@@ -250,7 +250,7 @@ impl State {
250250
.vortex_expect("Missing request in requests_by_offset");
251251

252252
// Skip any cancelled requests
253-
if req.callback.is_closed() {
253+
if req.is_closed() {
254254
if ids_to_remove.insert(req_id) {
255255
keys_to_remove.push((req_offset, req_id));
256256
}

vortex-file/src/read/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,8 @@ mod driver;
55
mod request;
66

77
pub(crate) use driver::IoRequestStream;
8+
#[cfg(test)]
9+
pub(crate) use request::CoalescedRequest;
10+
pub(crate) use request::IoRequest;
811
pub(crate) use request::ReadRequest;
912
pub(crate) use request::RequestId;

vortex-file/src/read/request.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ impl IoRequest {
5757
IoRequestInner::Coalesced(req) => req.resolve(result),
5858
}
5959
}
60+
61+
/// Returns true if no callback remains to receive the read result.
62+
pub(crate) fn is_cancelled(&self) -> bool {
63+
match &self.0 {
64+
IoRequestInner::Single(req) => req.is_closed(),
65+
IoRequestInner::Coalesced(req) => req.is_cancelled(),
66+
}
67+
}
6068
}
6169

6270
// Testing functionality
@@ -100,12 +108,16 @@ impl Debug for ReadRequest {
100108
.field("offset", &self.offset)
101109
.field("length", &self.length)
102110
.field("alignment", &self.alignment)
103-
.field("is_closed", &self.callback.is_closed())
111+
.field("is_closed", &self.is_closed())
104112
.finish()
105113
}
106114
}
107115

108116
impl ReadRequest {
117+
pub(crate) fn is_closed(&self) -> bool {
118+
self.callback.is_closed()
119+
}
120+
109121
pub(crate) fn resolve(self, result: VortexResult<BufferHandle>) {
110122
if let Err(e) = self.callback.send(result) {
111123
tracing::debug!("ReadRequest {} dropped before resolving: {e}", self.id);
@@ -132,6 +144,10 @@ impl Debug for CoalescedRequest {
132144
}
133145

134146
impl CoalescedRequest {
147+
pub(crate) fn is_cancelled(&self) -> bool {
148+
self.requests.iter().all(ReadRequest::is_closed)
149+
}
150+
135151
pub fn resolve(self, result: VortexResult<BufferHandle>) {
136152
match result {
137153
Ok(buffer) => {

vortex-file/src/segments/source.rs

Lines changed: 148 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use vortex_metrics::MetricBuilder;
3030
use vortex_metrics::MetricsRegistry;
3131

3232
use crate::SegmentSpec;
33+
use crate::read::IoRequest;
3334
use crate::read::IoRequestStream;
3435
use crate::read::ReadRequest;
3536
use crate::read::RequestId;
@@ -111,12 +112,7 @@ impl FileSegmentSource {
111112
stream
112113
.map(move |req| {
113114
let reader = reader.clone();
114-
async move {
115-
let result = reader
116-
.read_at(req.offset(), req.len(), req.alignment())
117-
.await;
118-
req.resolve(result);
119-
}
115+
drive_request(reader, req)
120116
})
121117
.buffer_unordered(concurrency)
122118
.collect::<()>()
@@ -133,6 +129,22 @@ impl FileSegmentSource {
133129
}
134130
}
135131

132+
async fn drive_request<R: VortexReadAt>(reader: R, req: IoRequest) {
133+
if req.is_cancelled() {
134+
tracing::debug!(
135+
offset = req.offset(),
136+
length = req.len(),
137+
"Skipping cancelled I/O request"
138+
);
139+
return;
140+
}
141+
142+
let result = reader
143+
.read_at(req.offset(), req.len(), req.alignment())
144+
.await;
145+
req.resolve(result);
146+
}
147+
136148
impl SegmentSource for FileSegmentSource {
137149
fn request(&self, id: SegmentId) -> SegmentFuture {
138150
// We eagerly register the read request here assuming the behaviour of [`FileRead`], where
@@ -295,3 +307,133 @@ impl SegmentSource for BufferSegmentSource {
295307
future::ready(Ok(BufferHandle::new_host(slice))).boxed()
296308
}
297309
}
310+
311+
#[cfg(test)]
312+
mod tests {
313+
use std::sync::Arc;
314+
use std::sync::atomic::AtomicUsize;
315+
use std::sync::atomic::Ordering;
316+
317+
use futures::future::BoxFuture;
318+
319+
use super::*;
320+
use crate::read::CoalescedRequest;
321+
322+
#[derive(Clone, Default)]
323+
struct CountingRead {
324+
read_count: Arc<AtomicUsize>,
325+
}
326+
327+
impl VortexReadAt for CountingRead {
328+
fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
329+
async { Ok(1024) }.boxed()
330+
}
331+
332+
fn concurrency(&self) -> usize {
333+
1
334+
}
335+
336+
fn read_at(
337+
&self,
338+
_offset: u64,
339+
length: usize,
340+
alignment: Alignment,
341+
) -> BoxFuture<'static, VortexResult<BufferHandle>> {
342+
self.read_count.fetch_add(1, Ordering::Relaxed);
343+
async move {
344+
let buffer = ByteBuffer::copy_from(vec![0; length]).aligned(alignment);
345+
Ok(BufferHandle::new_host(buffer))
346+
}
347+
.boxed()
348+
}
349+
}
350+
351+
#[tokio::test]
352+
async fn drive_request_skips_cancelled_single_request() {
353+
let reader = CountingRead::default();
354+
let (callback, receiver) = oneshot::channel();
355+
drop(receiver);
356+
357+
let req = IoRequest::new_single(ReadRequest {
358+
id: 0,
359+
offset: 0,
360+
length: 16,
361+
alignment: Alignment::none(),
362+
callback,
363+
});
364+
365+
drive_request(reader.clone(), req).await;
366+
367+
assert_eq!(reader.read_count.load(Ordering::Relaxed), 0);
368+
}
369+
370+
#[tokio::test]
371+
async fn drive_request_skips_fully_cancelled_coalesced_request() {
372+
let reader = CountingRead::default();
373+
let (callback1, receiver1) = oneshot::channel();
374+
let (callback2, receiver2) = oneshot::channel();
375+
drop(receiver1);
376+
drop(receiver2);
377+
378+
let req = IoRequest::new_coalesced(CoalescedRequest {
379+
range: 0..32,
380+
alignment: Alignment::none(),
381+
requests: vec![
382+
ReadRequest {
383+
id: 0,
384+
offset: 0,
385+
length: 16,
386+
alignment: Alignment::none(),
387+
callback: callback1,
388+
},
389+
ReadRequest {
390+
id: 1,
391+
offset: 16,
392+
length: 16,
393+
alignment: Alignment::none(),
394+
callback: callback2,
395+
},
396+
],
397+
});
398+
399+
drive_request(reader.clone(), req).await;
400+
401+
assert_eq!(reader.read_count.load(Ordering::Relaxed), 0);
402+
}
403+
404+
#[tokio::test]
405+
async fn drive_request_reads_coalesced_request_with_live_receiver() -> VortexResult<()> {
406+
let reader = CountingRead::default();
407+
let (callback1, receiver1) = oneshot::channel();
408+
let (callback2, receiver2) = oneshot::channel();
409+
drop(receiver1);
410+
411+
let req = IoRequest::new_coalesced(CoalescedRequest {
412+
range: 0..32,
413+
alignment: Alignment::none(),
414+
requests: vec![
415+
ReadRequest {
416+
id: 0,
417+
offset: 0,
418+
length: 16,
419+
alignment: Alignment::none(),
420+
callback: callback1,
421+
},
422+
ReadRequest {
423+
id: 1,
424+
offset: 16,
425+
length: 16,
426+
alignment: Alignment::none(),
427+
callback: callback2,
428+
},
429+
],
430+
});
431+
432+
drive_request(reader.clone(), req).await;
433+
434+
let buffer = receiver2.await.expect("live receiver should resolve")?;
435+
assert_eq!(buffer.len(), 16);
436+
assert_eq!(reader.read_count.load(Ordering::Relaxed), 1);
437+
Ok(())
438+
}
439+
}

0 commit comments

Comments
 (0)