diff --git a/vortex-file/src/read/driver.rs b/vortex-file/src/read/driver.rs index d8fd638adc8..86cc15dced8 100644 --- a/vortex-file/src/read/driver.rs +++ b/vortex-file/src/read/driver.rs @@ -136,7 +136,7 @@ impl State { tracing::debug!(?event, "Received ReadEvent"); match event { ReadEvent::Request(req) => { - if req.callback.is_closed() { + if req.is_closed() { tracing::debug!(?req, "ReadRequest dropped before registration"); return; } @@ -145,7 +145,7 @@ impl State { } ReadEvent::Polled(req_id) => { if let Some(req) = self.requests.remove(&req_id) { - if req.callback.is_closed() { + if req.is_closed() { self.requests_by_offset.remove(&(req.offset, req_id)); tracing::debug!(?req, "ReadRequest dropped before poll"); } else { @@ -192,7 +192,7 @@ impl State { fn next_uncoalesced(&mut self) -> Option { while let Some((req_id, req)) = self.polled_requests.pop_first() { self.requests_by_offset.remove(&(req.offset, req_id)); - if req.callback.is_closed() { + if req.is_closed() { tracing::debug!("Dropping canceled request"); continue; } @@ -250,7 +250,7 @@ impl State { .vortex_expect("Missing request in requests_by_offset"); // Skip any cancelled requests - if req.callback.is_closed() { + if req.is_closed() { if ids_to_remove.insert(req_id) { keys_to_remove.push((req_offset, req_id)); } diff --git a/vortex-file/src/read/mod.rs b/vortex-file/src/read/mod.rs index a812b81f63b..8421b260fd8 100644 --- a/vortex-file/src/read/mod.rs +++ b/vortex-file/src/read/mod.rs @@ -5,5 +5,8 @@ mod driver; mod request; pub(crate) use driver::IoRequestStream; +#[cfg(test)] +pub(crate) use request::CoalescedRequest; +pub(crate) use request::IoRequest; pub(crate) use request::ReadRequest; pub(crate) use request::RequestId; diff --git a/vortex-file/src/read/request.rs b/vortex-file/src/read/request.rs index 7caaa08d3d8..d4cafbae3b0 100644 --- a/vortex-file/src/read/request.rs +++ b/vortex-file/src/read/request.rs @@ -57,6 +57,14 @@ impl IoRequest { IoRequestInner::Coalesced(req) => req.resolve(result), } } + + /// Returns true if no callback remains to receive the read result. + pub(crate) fn is_cancelled(&self) -> bool { + match &self.0 { + IoRequestInner::Single(req) => req.is_closed(), + IoRequestInner::Coalesced(req) => req.is_cancelled(), + } + } } // Testing functionality @@ -100,12 +108,16 @@ impl Debug for ReadRequest { .field("offset", &self.offset) .field("length", &self.length) .field("alignment", &self.alignment) - .field("is_closed", &self.callback.is_closed()) + .field("is_closed", &self.is_closed()) .finish() } } impl ReadRequest { + pub(crate) fn is_closed(&self) -> bool { + self.callback.is_closed() + } + pub(crate) fn resolve(self, result: VortexResult) { if let Err(e) = self.callback.send(result) { tracing::debug!("ReadRequest {} dropped before resolving: {e}", self.id); @@ -132,6 +144,10 @@ impl Debug for CoalescedRequest { } impl CoalescedRequest { + pub(crate) fn is_cancelled(&self) -> bool { + self.requests.iter().all(ReadRequest::is_closed) + } + pub fn resolve(self, result: VortexResult) { match result { Ok(buffer) => { diff --git a/vortex-file/src/segments/source.rs b/vortex-file/src/segments/source.rs index 8f83150c4bb..88e84d6df7a 100644 --- a/vortex-file/src/segments/source.rs +++ b/vortex-file/src/segments/source.rs @@ -12,6 +12,8 @@ use futures::FutureExt; use futures::StreamExt; use futures::channel::mpsc; use futures::future; +use futures::future::BoxFuture; +use futures::future::Shared; use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; @@ -30,10 +32,13 @@ use vortex_metrics::MetricBuilder; use vortex_metrics::MetricsRegistry; use crate::SegmentSpec; +use crate::read::IoRequest; use crate::read::IoRequestStream; use crate::read::ReadRequest; use crate::read::RequestId; +type IoStreamClosed = Shared>; + #[derive(Debug)] pub enum ReadEvent { Request(ReadRequest), @@ -66,6 +71,8 @@ pub struct FileSegmentSource { segments: Arc<[SegmentSpec]>, /// A queue for sending read request events to the I/O stream. events: mpsc::UnboundedSender, + /// Resolves when the spawned I/O driver stream is dropped. + io_stream_closed: IoStreamClosed, /// The next read request ID. next_id: Arc, } @@ -107,16 +114,19 @@ impl FileSegmentSource { ) .boxed(); + let (io_stream_closed_send, io_stream_closed_recv) = oneshot::channel(); + let io_stream_closed = async move { + let _ = io_stream_closed_recv.into_future().await; + } + .boxed() + .shared(); + let drive_fut = async move { + let _io_stream_closed = IoStreamClosedNotifier::new(io_stream_closed_send); stream .map(move |req| { let reader = reader.clone(); - async move { - let result = reader - .read_at(req.offset(), req.len(), req.alignment()) - .await; - req.resolve(result); - } + drive_request(reader, req) }) .buffer_unordered(concurrency) .collect::<()>() @@ -128,11 +138,44 @@ impl FileSegmentSource { Self { segments, events: send, + io_stream_closed, next_id: Arc::new(AtomicUsize::new(0)), } } } +struct IoStreamClosedNotifier(Option>); + +impl IoStreamClosedNotifier { + fn new(send: oneshot::Sender<()>) -> Self { + Self(Some(send)) + } +} + +impl Drop for IoStreamClosedNotifier { + fn drop(&mut self) { + if let Some(send) = self.0.take() { + drop(send.send(())); + } + } +} + +async fn drive_request(reader: R, req: IoRequest) { + if req.is_cancelled() { + tracing::debug!( + offset = req.offset(), + length = req.len(), + "Skipping cancelled I/O request" + ); + return; + } + + let result = reader + .read_at(req.offset(), req.len(), req.alignment()) + .await; + req.resolve(result); +} + impl SegmentSource for FileSegmentSource { fn request(&self, id: SegmentId) -> SegmentFuture { // We eagerly register the read request here assuming the behaviour of [`FileRead`], where @@ -171,6 +214,7 @@ impl SegmentSource for FileSegmentSource { polled: false, finished: false, events: self.events.clone(), + io_stream_closed: self.io_stream_closed.clone(), }; // One allocation: we only box the returned SegmentFuture, not the inner ReadFuture. @@ -188,6 +232,7 @@ struct ReadFuture { polled: bool, finished: bool, events: mpsc::UnboundedSender, + io_stream_closed: IoStreamClosed, } impl Future for ReadFuture { @@ -200,21 +245,34 @@ impl Future for ReadFuture { // note: we are skipping polled and dropped events for this if the future // is ready on the first poll, that means this request was completed // before it was polled, as part of a coalesced request. - Poll::Ready( + return Poll::Ready( result.unwrap_or_else(|e| { Err(vortex_err!("ReadRequest dropped by runtime: {e}")) }), - ) + ); } - Poll::Pending if !self.polled => { - self.polled = true; - // Notify the I/O stream that this request has been polled. - match self.events.unbounded_send(ReadEvent::Polled(self.id)) { - Ok(()) => Poll::Pending, - Err(e) => Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))), + Poll::Pending => {} + } + + if self.io_stream_closed.poll_unpin(cx).is_ready() { + self.finished = true; + return Poll::Ready(Err(vortex_err!( + "ReadRequest dropped by runtime: I/O request stream closed" + ))); + } + + if !self.polled { + self.polled = true; + // Notify the I/O stream that this request has been polled. + match self.events.unbounded_send(ReadEvent::Polled(self.id)) { + Ok(()) => Poll::Pending, + Err(e) => { + self.finished = true; + Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))) } } - _ => Poll::Pending, + } else { + Poll::Pending } } } @@ -295,3 +353,167 @@ impl SegmentSource for BufferSegmentSource { future::ready(Ok(BufferHandle::new_host(slice))).boxed() } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + + use futures::future::BoxFuture; + + use super::*; + use crate::read::CoalescedRequest; + + fn io_stream_closed_pair() -> (IoStreamClosedNotifier, IoStreamClosed) { + let (send, recv) = oneshot::channel(); + let closed = async move { + let _ = recv.into_future().await; + } + .boxed() + .shared(); + (IoStreamClosedNotifier::new(send), closed) + } + + #[derive(Clone, Default)] + struct CountingRead { + read_count: Arc, + } + + impl VortexReadAt for CountingRead { + fn size(&self) -> BoxFuture<'static, VortexResult> { + async { Ok(1024) }.boxed() + } + + fn concurrency(&self) -> usize { + 1 + } + + fn read_at( + &self, + _offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + self.read_count.fetch_add(1, Ordering::Relaxed); + async move { + let buffer = ByteBuffer::copy_from(vec![0; length]).aligned(alignment); + Ok(BufferHandle::new_host(buffer)) + } + .boxed() + } + } + + #[tokio::test] + async fn drive_request_skips_cancelled_single_request() { + let reader = CountingRead::default(); + let (callback, receiver) = oneshot::channel(); + drop(receiver); + + let req = IoRequest::new_single(ReadRequest { + id: 0, + offset: 0, + length: 16, + alignment: Alignment::none(), + callback, + }); + + drive_request(reader.clone(), req).await; + + assert_eq!(reader.read_count.load(Ordering::Relaxed), 0); + } + + #[tokio::test] + async fn drive_request_skips_fully_cancelled_coalesced_request() { + let reader = CountingRead::default(); + let (callback1, receiver1) = oneshot::channel(); + let (callback2, receiver2) = oneshot::channel(); + drop(receiver1); + drop(receiver2); + + let req = IoRequest::new_coalesced(CoalescedRequest { + range: 0..32, + alignment: Alignment::none(), + requests: vec![ + ReadRequest { + id: 0, + offset: 0, + length: 16, + alignment: Alignment::none(), + callback: callback1, + }, + ReadRequest { + id: 1, + offset: 16, + length: 16, + alignment: Alignment::none(), + callback: callback2, + }, + ], + }); + + drive_request(reader.clone(), req).await; + + assert_eq!(reader.read_count.load(Ordering::Relaxed), 0); + } + + #[tokio::test] + async fn drive_request_reads_coalesced_request_with_live_receiver() -> VortexResult<()> { + let reader = CountingRead::default(); + let (callback1, receiver1) = oneshot::channel(); + let (callback2, receiver2) = oneshot::channel(); + drop(receiver1); + + let req = IoRequest::new_coalesced(CoalescedRequest { + range: 0..32, + alignment: Alignment::none(), + requests: vec![ + ReadRequest { + id: 0, + offset: 0, + length: 16, + alignment: Alignment::none(), + callback: callback1, + }, + ReadRequest { + id: 1, + offset: 16, + length: 16, + alignment: Alignment::none(), + callback: callback2, + }, + ], + }); + + drive_request(reader.clone(), req).await; + + let buffer = receiver2.await.expect("live receiver should resolve")?; + assert_eq!(buffer.len(), 16); + assert_eq!(reader.read_count.load(Ordering::Relaxed), 1); + Ok(()) + } + + #[tokio::test] + async fn read_future_finishes_when_io_stream_closes_after_poll() { + let (_callback, recv) = oneshot::channel(); + let (events, _event_recv) = mpsc::unbounded(); + let (notifier, io_stream_closed) = io_stream_closed_pair(); + + let read = ReadFuture { + id: 0, + recv: recv.into_future(), + polled: true, + finished: false, + events, + io_stream_closed, + }; + + drop(notifier); + + let err = read.await.unwrap_err(); + assert!( + err.to_string().contains("I/O request stream closed"), + "unexpected error: {err}" + ); + } +} diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index 2ba10d96684..d587201399f 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -3,13 +3,16 @@ #![expect(clippy::cast_possible_truncation)] use std::iter; +use std::ops::Range; use std::sync::Arc; use std::sync::LazyLock; use bytes::Bytes; use futures::StreamExt; use futures::TryStreamExt; +use futures::future::BoxFuture; use futures::pin_mut; +use parking_lot::Mutex; use vortex_array::ArrayRef; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; @@ -27,6 +30,7 @@ use vortex_array::arrays::VarBinViewArray; use vortex_array::arrays::dict::DictArraySlotsExt; use vortex_array::arrays::struct_::StructArrayExt; use vortex_array::assert_arrays_eq; +use vortex_array::buffer::BufferHandle; use vortex_array::dtype::DType; use vortex_array::dtype::DecimalDType; use vortex_array::dtype::Nullability; @@ -58,12 +62,17 @@ use vortex_array::stats::PRUNING_STATS; use vortex_array::stream::ArrayStreamAdapter; use vortex_array::stream::ArrayStreamExt; use vortex_array::validity::Validity; +use vortex_buffer::Alignment; use vortex_buffer::Buffer; +use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; use vortex_buffer::buffer; +use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_io::VortexReadAt; use vortex_io::session::RuntimeSession; use vortex_layout::Layout; +use vortex_layout::LayoutChildType; use vortex_layout::scan::scan_builder::ScanBuilder; use vortex_layout::session::LayoutSession; use vortex_session::VortexSession; @@ -87,6 +96,34 @@ static SESSION: LazyLock = LazyLock::new(|| { session }); +#[derive(Clone)] +struct RecordingRead { + inner: R, + reads: Arc>>>, +} + +impl VortexReadAt for RecordingRead { + fn size(&self) -> BoxFuture<'static, VortexResult> { + self.inner.size() + } + + fn concurrency(&self) -> usize { + self.inner.concurrency() + } + + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + let end = + offset + u64::try_from(length).vortex_expect("read length must fit in u64 byte range"); + self.reads.lock().push(offset..end); + self.inner.read_at(offset, length, alignment) + } +} + #[tokio::test] async fn test_eof_values() { // this test exists as a reminder to think about whether we should increment the version @@ -515,6 +552,61 @@ async fn issue_5385_filter_casted_column() { ); } +#[tokio::test] +#[cfg_attr(miri, ignore)] +async fn all_false_filter_does_not_read_projected_column_segments() -> VortexResult<()> { + let row_count = 16_384; + let column_a = PrimitiveArray::from_iter(0..row_count).into_array(); + let column_b = + PrimitiveArray::from_iter((0..row_count).map(|value| value * 3 + 1)).into_array(); + let array = StructArray::from_fields(&[("a", column_a), ("b", column_b)])?.into_array(); + + let mut buf = ByteBufferMut::empty(); + let summary = SESSION + .write_options() + .with_file_statistics(vec![]) + .write(&mut buf, array.to_array_stream()) + .await?; + assert!(summary.footer().statistics().is_none()); + + let b_ranges = collect_field_segment_ranges( + summary.footer().layout().as_ref(), + "b", + summary.footer().segment_map(), + )?; + assert!(!b_ranges.is_empty(), "expected column b to have segments"); + + let reads = Arc::new(Mutex::new(Vec::new())); + let reader = RecordingRead { + inner: ByteBuffer::from(buf), + reads: Arc::clone(&reads), + }; + let file = SESSION + .open_options() + .with_footer(summary.footer().clone()) + .open_read(reader) + .await?; + + let result = file + .scan()? + .with_filter(gt(get_item("a", root()), lit(row_count))) + .with_projection(get_item("b", root())) + .into_array_stream()? + .read_all() + .await?; + + assert_eq!(result.len(), 0); + + let read_ranges = reads.lock().clone(); + assert!( + read_ranges + .iter() + .all(|read| b_ranges.iter().all(|b| !ranges_overlap(read, b))), + "projection column b was read despite an all-false filter: reads={read_ranges:?}, b_ranges={b_ranges:?}" + ); + Ok(()) +} + #[tokio::test] #[cfg_attr(miri, ignore)] async fn filter_string() { @@ -1801,6 +1893,59 @@ fn collect_segment_offsets_inner( } } +fn collect_field_segment_ranges( + layout: &dyn Layout, + field_name: &str, + segment_specs: &[SegmentSpec], +) -> VortexResult>> { + let mut result = Vec::new(); + collect_field_segment_ranges_inner(layout, field_name, segment_specs, &mut result)?; + Ok(result) +} + +fn collect_field_segment_ranges_inner( + layout: &dyn Layout, + field_name: &str, + segment_specs: &[SegmentSpec], + result: &mut Vec>, +) -> VortexResult<()> { + for idx in 0..layout.nchildren() { + let child = layout.child(idx)?; + match layout.child_type(idx) { + LayoutChildType::Field(name) if name == field_name => { + collect_segment_ranges(child.as_ref(), segment_specs, result)?; + } + _ => { + collect_field_segment_ranges_inner( + child.as_ref(), + field_name, + segment_specs, + result, + )?; + } + } + } + Ok(()) +} + +fn collect_segment_ranges( + layout: &dyn Layout, + segment_specs: &[SegmentSpec], + result: &mut Vec>, +) -> VortexResult<()> { + for seg_id in layout.segment_ids() { + result.push(segment_specs[*seg_id as usize].byte_range()); + } + for child in layout.children()? { + collect_segment_ranges(child.as_ref(), segment_specs, result)?; + } + Ok(()) +} + +fn ranges_overlap(left: &Range, right: &Range) -> bool { + left.start < right.end && right.start < left.end +} + /// Assert that all offsets in `before` are less than all offsets in `after`. fn assert_offsets_ordered(before: &[u64], after: &[u64], context: &str) { if let (Some(&max_before), Some(&min_after)) = (before.iter().max(), after.iter().min()) { diff --git a/vortex-file/src/v2/file_stats_reader.rs b/vortex-file/src/v2/file_stats_reader.rs index f197fea32b6..f335b85ae41 100644 --- a/vortex-file/src/v2/file_stats_reader.rs +++ b/vortex-file/src/v2/file_stats_reader.rs @@ -11,6 +11,8 @@ use std::collections::BTreeSet; use std::ops::Range; use std::sync::Arc; +use futures::FutureExt; +use futures::future; use vortex_array::Canonical; use vortex_array::IntoArray; use vortex_array::MaskFuture; @@ -202,6 +204,15 @@ impl LayoutReader for FileStatsLayoutReader { expr: &Expression, mask: MaskFuture, ) -> VortexResult { + let dtype = expr.return_dtype(self.dtype())?; + match mask.clone().now_or_never() { + Some(Ok(mask)) if mask.all_false() => { + return Ok(future::ready(Ok(Canonical::empty(&dtype).into_array())).boxed()); + } + Some(Err(err)) => return Ok(future::ready(Err(err)).boxed()), + Some(Ok(_)) | None => {} + } + self.child.projection_evaluation(row_range, expr, mask) } diff --git a/vortex-layout/src/layouts/chunked/reader.rs b/vortex-layout/src/layouts/chunked/reader.rs index 376b6654707..490dcb69d32 100644 --- a/vortex-layout/src/layouts/chunked/reader.rs +++ b/vortex-layout/src/layouts/chunked/reader.rs @@ -29,6 +29,7 @@ use crate::LayoutReaderRef; use crate::LazyReaderChildren; use crate::layouts::chunked::ChunkedLayout; use crate::reader::LayoutReader; +use crate::reader::empty_projection_if_mask_all_false; use crate::segments::SegmentSource; /// A [`LayoutReader`] for chunked layouts. @@ -294,6 +295,10 @@ impl LayoutReader for ChunkedReader { mask: MaskFuture, ) -> VortexResult>> { let dtype = expr.return_dtype(self.dtype())?; + if let Some(empty) = empty_projection_if_mask_all_false(&dtype, &mask) { + return Ok(empty); + } + if row_range.is_empty() { return Ok(future::ready(Ok(Canonical::empty(&dtype).into_array())).boxed()); } @@ -348,11 +353,13 @@ mod test { use vortex_buffer::buffer; use vortex_io::runtime::single::block_on; use vortex_io::session::RuntimeSessionExt; + use vortex_mask::Mask; use crate::LayoutRef; use crate::LayoutStrategy; use crate::layouts::chunked::writer::ChunkedLayoutStrategy; use crate::layouts::flat::writer::FlatLayoutStrategy; + use crate::segments::CountingSegmentSource; use crate::segments::SegmentSource; use crate::segments::TestSegments; use crate::sequence::SequenceId; @@ -415,4 +422,30 @@ mod test { assert_arrays_eq!(result, expected); }) } + + #[rstest] + fn chunked_projection_known_false_mask_does_not_request_child_segments( + #[from(chunked_layout)] (segments, layout): (Arc, LayoutRef), + ) { + block_on(|_h| async { + let counting = Arc::new(CountingSegmentSource::new(segments)); + let source: Arc = Arc::::clone(&counting); + let result = layout + .new_reader("".into(), source, &SESSION) + .unwrap() + .projection_evaluation( + &(0..layout.row_count()), + &root(), + MaskFuture::ready(Mask::new_false( + usize::try_from(layout.row_count()).unwrap(), + )), + ) + .unwrap() + .await + .unwrap(); + + assert_eq!(result.len(), 0); + assert_eq!(counting.request_count(), 0); + }) + } } diff --git a/vortex-layout/src/layouts/dict/reader.rs b/vortex-layout/src/layouts/dict/reader.rs index dd57926ac63..a95ba5d34a2 100644 --- a/vortex-layout/src/layouts/dict/reader.rs +++ b/vortex-layout/src/layouts/dict/reader.rs @@ -33,6 +33,7 @@ use super::DictLayout; use crate::LayoutReader; use crate::LayoutReaderRef; use crate::layouts::SharedArrayFuture; +use crate::reader::empty_projection_if_mask_all_false; use crate::segments::SegmentSource; pub struct DictReader { @@ -223,6 +224,11 @@ impl LayoutReader for DictReader { expr: &Expression, mask: MaskFuture, ) -> VortexResult>> { + let dtype = expr.return_dtype(self.dtype())?; + if let Some(empty) = empty_projection_if_mask_all_false(&dtype, &mask) { + return Ok(empty); + } + // TODO: fix up expr partitioning with fallible & null sensitive annotations let values_eval = self.values_array(); let codes_eval = self @@ -289,6 +295,7 @@ mod tests { use vortex_io::runtime::single::block_on; use vortex_io::session::RuntimeSession; use vortex_io::session::RuntimeSessionExt; + use vortex_mask::Mask; use vortex_session::VortexSession; use crate::LayoutId; @@ -297,6 +304,8 @@ mod tests { use crate::layouts::dict::writer::DictLayoutOptions; use crate::layouts::dict::writer::DictStrategy; use crate::layouts::flat::writer::FlatLayoutStrategy; + use crate::segments::CountingSegmentSource; + use crate::segments::SegmentSource; use crate::segments::TestSegments; use crate::sequence::SequenceId; use crate::sequence::SequentialArrayStreamExt; @@ -400,6 +409,61 @@ mod tests { }) } + #[test] + fn dict_projection_known_false_mask_does_not_request_segments() { + block_on(|handle| async move { + let session = session_with_handle(handle); + let strategy = DictStrategy::new( + FlatLayoutStrategy::default(), + FlatLayoutStrategy::default(), + FlatLayoutStrategy::default(), + DictLayoutOptions::default(), + ); + + let array = VarBinArray::from_iter( + [Some("abc"), Some("def"), None, Some("abc")], + DType::Utf8(Nullability::Nullable), + ) + .into_array(); + let ctx = ArrayContext::empty(); + let segments = Arc::new(TestSegments::default()); + let (ptr, eof) = SequenceId::root().split(); + let layout: LayoutRef = strategy + .write_stream( + ctx, + Arc::::clone(&segments), + SequentialStreamAdapter::new( + DType::Utf8(Nullability::Nullable), + array.to_array_stream().sequenced(ptr), + ) + .sendable(), + eof, + &session, + ) + .await + .unwrap(); + + let counting = Arc::new(CountingSegmentSource::new(segments)); + let source: Arc = Arc::::clone(&counting); + let result = layout + .new_reader("".into(), source, &session) + .unwrap() + .projection_evaluation( + &(0..layout.row_count()), + &root(), + MaskFuture::ready(Mask::new_false( + usize::try_from(layout.row_count()).unwrap(), + )), + ) + .unwrap() + .await + .unwrap(); + + assert_eq!(result.len(), 0); + assert_eq!(counting.request_count(), 0); + }) + } + #[rstest] #[case::all_true_case( vec![Some(""), None, Some("")], // Dict values: [""] diff --git a/vortex-layout/src/layouts/flat/reader.rs b/vortex-layout/src/layouts/flat/reader.rs index 66f9553c0de..0fbebbf9517 100644 --- a/vortex-layout/src/layouts/flat/reader.rs +++ b/vortex-layout/src/layouts/flat/reader.rs @@ -23,6 +23,7 @@ use vortex_session::VortexSession; use crate::LayoutReader; use crate::layouts::SharedArrayFuture; use crate::layouts::flat::FlatLayout; +use crate::reader::empty_projection_if_mask_all_false; use crate::segments::SegmentSource; /// The threshold of mask density below which we will evaluate the expression only over the @@ -186,6 +187,11 @@ impl LayoutReader for FlatReader { .vortex_expect("Row range begin must fit within FlatLayout size") ..usize::try_from(row_range.end) .vortex_expect("Row range end must fit within FlatLayout size"); + let dtype = expr.return_dtype(self.dtype())?; + if let Some(empty) = empty_projection_if_mask_all_false(&dtype, &mask) { + return Ok(empty); + } + let name = Arc::clone(&self.name); let array = self.array_future(); let expr = expr.clone(); @@ -240,9 +246,12 @@ mod test { use vortex_error::VortexResult; use vortex_io::runtime::single::block_on; use vortex_io::session::RuntimeSessionExt; + use vortex_mask::Mask; use crate::LayoutStrategy; use crate::layouts::flat::writer::FlatLayoutStrategy; + use crate::segments::CountingSegmentSource; + use crate::segments::SegmentSource; use crate::segments::TestSegments; use crate::sequence::SequenceId; use crate::sequence::SequentialArrayStreamExt; @@ -326,6 +335,44 @@ mod test { }) } + #[test] + fn flat_projection_known_false_mask_does_not_request_segment() -> VortexResult<()> { + block_on(|handle| async { + let session = SESSION.clone().with_handle(handle); + let ctx = ArrayContext::empty(); + + let segments = Arc::new(TestSegments::default()); + let (ptr, eof) = SequenceId::root().split(); + let array = + PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid).into_array(); + let layout = FlatLayoutStrategy::default() + .write_stream( + ctx, + Arc::::clone(&segments), + array.to_array_stream().sequenced(ptr), + eof, + &session, + ) + .await?; + + let counting = Arc::new(CountingSegmentSource::new(segments)); + let source: Arc = Arc::::clone(&counting); + let result = layout + .new_reader("".into(), source, &SESSION)? + .projection_evaluation( + &(0..layout.row_count()), + &root(), + MaskFuture::ready(Mask::new_false(layout.row_count().try_into()?)), + )? + .await?; + + assert_eq!(result.len(), 0); + assert_eq!(counting.request_count(), 0); + + Ok(()) + }) + } + #[test] fn flat_unaligned_row_mask() { block_on(|handle| async { diff --git a/vortex-layout/src/layouts/row_idx/mod.rs b/vortex-layout/src/layouts/row_idx/mod.rs index 33078617722..13c2b1596d8 100644 --- a/vortex-layout/src/layouts/row_idx/mod.rs +++ b/vortex-layout/src/layouts/row_idx/mod.rs @@ -44,6 +44,7 @@ use vortex_utils::aliases::dash_map::DashMap; use crate::ArrayFuture; use crate::LayoutReader; use crate::layouts::partitioned::PartitionedExprEval; +use crate::reader::empty_projection_if_mask_all_false; pub struct RowIdxLayoutReader { name: Arc, @@ -239,6 +240,11 @@ impl LayoutReader for RowIdxLayoutReader { expr: &Expression, mask: MaskFuture, ) -> VortexResult>> { + let dtype = expr.return_dtype(self.dtype())?; + if let Some(empty) = empty_projection_if_mask_all_false(&dtype, &mask) { + return Ok(empty); + } + match &self.partition_expr(expr) { Partitioning::RowIdx(expr) => Ok(row_idx_array_future( self.row_offset, @@ -336,12 +342,15 @@ mod tests { use vortex_buffer::buffer; use vortex_io::runtime::single::block_on; use vortex_io::session::RuntimeSessionExt; + use vortex_mask::Mask; use crate::LayoutReader; use crate::LayoutStrategy; use crate::layouts::flat::writer::FlatLayoutStrategy; use crate::layouts::row_idx::RowIdxLayoutReader; use crate::layouts::row_idx::row_idx; + use crate::segments::CountingSegmentSource; + use crate::segments::SegmentSource; use crate::segments::TestSegments; use crate::sequence::SequenceId; use crate::sequence::SequentialArrayStreamExt; @@ -429,6 +438,46 @@ mod tests { }) } + #[test] + fn row_idx_projection_known_false_mask_does_not_request_child_segments() { + block_on(|handle| async { + let session = SESSION.clone().with_handle(handle); + let ctx = ArrayContext::empty(); + let segments = Arc::new(TestSegments::default()); + let (ptr, eof) = SequenceId::root().split(); + let array = buffer![1..=5].into_array(); + let layout = FlatLayoutStrategy::default() + .write_stream( + ctx, + Arc::::clone(&segments), + array.to_array_stream().sequenced(ptr), + eof, + &session, + ) + .await + .unwrap(); + + let counting = Arc::new(CountingSegmentSource::new(segments)); + let source: Arc = Arc::::clone(&counting); + let result = RowIdxLayoutReader::new( + 0, + layout.new_reader("".into(), source, &SESSION).unwrap(), + SESSION.clone(), + ) + .projection_evaluation( + &(0..layout.row_count()), + &root(), + MaskFuture::ready(Mask::new_false(layout.row_count().try_into().unwrap())), + ) + .unwrap() + .await + .unwrap(); + + assert_eq!(result.len(), 0); + assert_eq!(counting.request_count(), 0); + }) + } + #[test] fn flat_expr_or() { block_on(|handle| async { diff --git a/vortex-layout/src/layouts/struct_/reader.rs b/vortex-layout/src/layouts/struct_/reader.rs index 94cf534ea16..9f772abad64 100644 --- a/vortex-layout/src/layouts/struct_/reader.rs +++ b/vortex-layout/src/layouts/struct_/reader.rs @@ -45,6 +45,7 @@ use crate::LayoutReaderRef; use crate::LazyReaderChildren; use crate::layouts::partitioned::PartitionedExprEval; use crate::layouts::struct_::StructLayout; +use crate::reader::empty_projection_if_mask_all_false; use crate::segments::SegmentSource; pub struct StructReader { @@ -323,6 +324,11 @@ impl LayoutReader for StructReader { expr: &Expression, mask_fut: MaskFuture, ) -> VortexResult { + let dtype = expr.return_dtype(self.dtype())?; + if let Some(empty) = empty_projection_if_mask_all_false(&dtype, &mask_fut) { + return Ok(empty); + } + let validity_fut = self .validity()? .map(|reader| reader.projection_evaluation(row_range, &root(), mask_fut.clone())) @@ -433,6 +439,7 @@ mod tests { use crate::LayoutStrategy; use crate::layouts::flat::writer::FlatLayoutStrategy; use crate::layouts::table::TableStrategy; + use crate::segments::CountingSegmentSource; use crate::segments::SegmentSource; use crate::segments::TestSegments; use crate::sequence::SequenceId; @@ -648,6 +655,24 @@ mod tests { assert_arrays_eq!(result, expected); } + #[rstest] + fn struct_projection_known_false_mask_does_not_request_child_segments( + #[from(struct_layout)] (segments, layout): (Arc, LayoutRef), + ) { + let counting = Arc::new(CountingSegmentSource::new(segments)); + let source: Arc = Arc::::clone(&counting); + let reader = layout.new_reader("".into(), source, &SESSION).unwrap(); + let result = block_on(|_| { + reader + .projection_evaluation(&(0..3), &root(), MaskFuture::ready(Mask::new_false(3))) + .unwrap() + }) + .unwrap(); + + assert_eq!(result.len(), 0); + assert_eq!(counting.request_count(), 0); + } + #[rstest] fn test_struct_layout_row_mask( #[from(struct_layout)] (segments, layout): (Arc, LayoutRef), diff --git a/vortex-layout/src/layouts/zoned/reader.rs b/vortex-layout/src/layouts/zoned/reader.rs index e3dc2bf2e00..f88e27b9ff7 100644 --- a/vortex-layout/src/layouts/zoned/reader.rs +++ b/vortex-layout/src/layouts/zoned/reader.rs @@ -25,6 +25,7 @@ use crate::LazyReaderChildren; use crate::layouts::zoned::ZonedLayout; use crate::layouts::zoned::pruning::PruningState; use crate::layouts::zoned::schema::stats_table_dtype; +use crate::reader::empty_projection_if_mask_all_false; use crate::segments::SegmentSource; pub struct ZonedReader { @@ -207,6 +208,11 @@ impl LayoutReader for ZonedReader { expr: &Expression, mask: MaskFuture, ) -> VortexResult>> { + let dtype = expr.return_dtype(self.dtype())?; + if let Some(empty) = empty_projection_if_mask_all_false(&dtype, &mask) { + return Ok(empty); + } + // TODO(ngates): there are some projection expressions that we may also be able to // short-circuit with statistics. self.data_child()? @@ -252,6 +258,7 @@ mod test { use crate::layouts::zoned::ZonedMetadata; use crate::layouts::zoned::writer::ZonedLayoutOptions; use crate::layouts::zoned::writer::ZonedStrategy; + use crate::segments::CountingSegmentSource; use crate::segments::SegmentSource; use crate::segments::TestSegments; use crate::sequence::SequenceId; @@ -323,6 +330,33 @@ mod test { }) } + #[rstest] + fn zoned_projection_known_false_mask_does_not_request_child_segments( + #[from(stats_layout)] (segments, layout): (Arc, LayoutRef), + ) { + block_on(|handle| async { + let session = session_with_handle(handle); + let counting = Arc::new(CountingSegmentSource::new(segments)); + let source: Arc = Arc::::clone(&counting); + let result = layout + .new_reader("".into(), source, &session) + .unwrap() + .projection_evaluation( + &(0..layout.row_count()), + &root(), + MaskFuture::ready(Mask::new_false( + usize::try_from(layout.row_count()).unwrap(), + )), + ) + .unwrap() + .await + .unwrap(); + + assert_eq!(result.len(), 0); + assert_eq!(counting.request_count(), 0); + }) + } + #[rstest] fn test_stats_pruning_mask( #[from(stats_layout)] (segments, layout): (Arc, LayoutRef), diff --git a/vortex-layout/src/reader.rs b/vortex-layout/src/reader.rs index f1467212c38..c3a8633cc0f 100644 --- a/vortex-layout/src/reader.rs +++ b/vortex-layout/src/reader.rs @@ -6,10 +6,13 @@ use std::collections::BTreeSet; use std::ops::Range; use std::sync::Arc; +use futures::FutureExt; +use futures::future; use futures::future::BoxFuture; use futures::try_join; use once_cell::sync::OnceCell; use vortex_array::ArrayRef; +use vortex_array::Canonical; use vortex_array::IntoArray; use vortex_array::MaskFuture; use vortex_array::builtins::ArrayBuiltins; @@ -94,6 +97,19 @@ pub trait LayoutReader: 'static + Send + Sync { pub type ArrayFuture = BoxFuture<'static, VortexResult>; +pub(crate) fn empty_projection_if_mask_all_false( + dtype: &DType, + mask: &MaskFuture, +) -> Option { + match mask.clone().now_or_never()? { + Ok(mask) if mask.all_false() => { + Some(future::ready(Ok(Canonical::empty(dtype).into_array())).boxed()) + } + Ok(_) => None, + Err(err) => Some(future::ready(Err(err)).boxed()), + } +} + pub trait ArrayFutureExt { fn masked(self, mask: MaskFuture) -> Self; } diff --git a/vortex-layout/src/scan/tasks.rs b/vortex-layout/src/scan/tasks.rs index dc0b489b1f2..a1623190a88 100644 --- a/vortex-layout/src/scan/tasks.rs +++ b/vortex-layout/src/scan/tasks.rs @@ -136,7 +136,7 @@ pub fn split_exec( } }; - // Step 4: execute the projection, only at the mask for rows which match the filter + // Step 4: execute the projection, only at the mask for rows which match the filter. let projection_future = ctx.reader .projection_evaluation(&row_range, &ctx.projection, filter_mask.clone())?; diff --git a/vortex-layout/src/segments/cache.rs b/vortex-layout/src/segments/cache.rs index 37675c19d2a..159596a0b18 100644 --- a/vortex-layout/src/segments/cache.rs +++ b/vortex-layout/src/segments/cache.rs @@ -145,6 +145,7 @@ impl SegmentSource for SegmentCacheSourceAdapter { tracing::debug!("Resolved segment {} from cache", id); return Ok(BufferHandle::new_host(segment)); } + let result = delegate.await?; // Cache only CPU buffers; device buffers are not cached. if let Some(buffer) = result.as_host_opt() @@ -157,3 +158,67 @@ impl SegmentSource for SegmentCacheSourceAdapter { .boxed() } } + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + + use vortex_buffer::ByteBuffer; + + use super::*; + use crate::segments::SegmentSink; + use crate::segments::TestSegments; + use crate::sequence::SequenceId; + + #[derive(Default, Clone)] + struct CountingSegmentSource { + segments: TestSegments, + request_count: Arc, + } + + impl SegmentSource for CountingSegmentSource { + fn request(&self, id: SegmentId) -> SegmentFuture { + self.request_count.fetch_add(1, Ordering::Relaxed); + self.segments.request(id) + } + } + + #[tokio::test] + async fn cache_hit_registers_inner_source_for_eager_io() -> VortexResult<()> { + let id = SegmentId::from(0); + let data = ByteBuffer::copy_from([1, 2, 3, 4]); + let cache = Arc::new(MokaSegmentCache::new(1024)); + cache.put(id, data.clone()).await?; + + let source = CountingSegmentSource::default(); + let adapter = SegmentCacheSourceAdapter::new(cache, Arc::new(source.clone())); + + let result = adapter.request(id).await?; + + assert_eq!(result.unwrap_host(), data); + assert_eq!(source.request_count.load(Ordering::Relaxed), 1); + Ok(()) + } + + #[tokio::test] + async fn cache_miss_requests_inner_source_and_stores_host_buffer() -> VortexResult<()> { + let data = ByteBuffer::copy_from([5, 6, 7, 8]); + let source = CountingSegmentSource::default(); + let id = source + .segments + .write(SequenceId::root().downgrade(), vec![data.clone()]) + .await?; + + let cache = Arc::new(MokaSegmentCache::new(1024)); + let cache_source: Arc = Arc::::clone(&cache); + let adapter = SegmentCacheSourceAdapter::new(cache_source, Arc::new(source.clone())); + + let result = adapter.request(id).await?; + + assert_eq!(result.unwrap_host(), data); + assert_eq!(source.request_count.load(Ordering::Relaxed), 1); + assert_eq!(cache.get(id).await?.as_ref(), Some(&data)); + Ok(()) + } +} diff --git a/vortex-layout/src/segments/test.rs b/vortex-layout/src/segments/test.rs index d880d15cc1a..bbac3a2b52b 100644 --- a/vortex-layout/src/segments/test.rs +++ b/vortex-layout/src/segments/test.rs @@ -2,6 +2,8 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use async_trait::async_trait; use futures::FutureExt; @@ -37,6 +39,32 @@ impl SegmentSource for TestSegments { } } +#[derive(Clone)] +pub struct CountingSegmentSource { + inner: Arc, + requests: Arc, +} + +impl CountingSegmentSource { + pub fn new(inner: Arc) -> Self { + Self { + inner, + requests: Arc::new(AtomicUsize::new(0)), + } + } + + pub fn request_count(&self) -> usize { + self.requests.load(Ordering::Relaxed) + } +} + +impl SegmentSource for CountingSegmentSource { + fn request(&self, id: SegmentId) -> SegmentFuture { + self.requests.fetch_add(1, Ordering::Relaxed); + self.inner.request(id) + } +} + #[async_trait] impl SegmentSink for TestSegments { async fn write(