diff --git a/vortex-datafusion/public-api.lock b/vortex-datafusion/public-api.lock index 0c40e916296..f0985084367 100644 --- a/vortex-datafusion/public-api.lock +++ b/vortex-datafusion/public-api.lock @@ -200,6 +200,8 @@ pub fn vortex_datafusion::VortexSource::with_projection_pushdown(self, bool) -> pub fn vortex_datafusion::VortexSource::with_scan_concurrency(self, usize) -> Self +pub fn vortex_datafusion::VortexSource::with_segment_cache_builder(self, alloc::sync::Arc) -> Self + pub fn vortex_datafusion::VortexSource::with_vortex_reader_factory(self, alloc::sync::Arc) -> Self impl core::clone::Clone for vortex_datafusion::VortexSource diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index acff2a22806..f13142b43b4 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -30,6 +30,7 @@ use futures::StreamExt; use futures::TryStreamExt; use futures::stream; use itertools::Itertools; +use object_store::ObjectMeta; use object_store::path::Path; use tracing::Instrument; use vortex::array::VortexSessionExecute; @@ -55,6 +56,10 @@ use crate::convert::exprs::make_vortex_predicate; use crate::convert::schema::calculate_physical_schema; use crate::metrics::PARTITION_LABEL; use crate::metrics::PATH_LABEL; +use vortex::layout::segments::FileIdentity; +use vortex::layout::segments::FileVersion; +use vortex::layout::segments::SegmentCacheBuilder; + use crate::persistent::cache::CachedVortexMetadata; use crate::persistent::reader::VortexReaderFactory; use crate::persistent::stream::PrunableStream; @@ -97,6 +102,7 @@ pub(crate) struct VortexOpener { pub expression_convertor: Arc, pub file_metadata_cache: Option>, + pub segment_cache_builder: Option>, /// Whether to enable expression pushdown into the underlying Vortex scan. pub projection_pushdown: bool, pub scan_concurrency: Option, @@ -122,6 +128,7 @@ impl FileOpener for VortexOpener { let file_pruning_predicate = self.file_pruning_predicate.clone(); let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory); let file_metadata_cache = self.file_metadata_cache.clone(); + let segment_cache_builder = self.segment_cache_builder.clone(); let unified_file_schema = Arc::clone(self.table_schema.file_schema()); let batch_size = self.batch_size; @@ -200,6 +207,11 @@ impl FileOpener for VortexOpener { open_opts = open_opts.with_footer(vortex_metadata.footer().clone()); } + if let Some(builder) = segment_cache_builder { + let identity = file_identity(&file.object_meta); + open_opts = open_opts.with_segment_cache(builder.cache_for(&identity)); + } + let vxf = open_opts .open_read(reader) .await @@ -445,6 +457,18 @@ impl FileOpener for VortexOpener { } } +/// Build a [`FileIdentity`] from object store metadata, preferring the etag and falling +/// back to `(size, last_modified)` when no etag is available. +fn file_identity(meta: &ObjectMeta) -> FileIdentity { + let path = Arc::from(meta.location.as_ref()); + let version = if let Some(etag) = meta.e_tag.as_deref() { + FileVersion::Etag(Arc::from(etag)) + } else { + FileVersion::SizeMtime(meta.size, meta.last_modified.timestamp()) + }; + FileIdentity { path, version } +} + fn natural_split_ranges_for_file( natural_split_ranges: &DashMap]>>, path: &Path, @@ -680,6 +704,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache_builder: None, projection_pushdown: false, scan_concurrency: None, } @@ -812,6 +837,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache_builder: None, projection_pushdown: false, scan_concurrency: None, }; @@ -899,6 +925,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache_builder: None, projection_pushdown: false, scan_concurrency: None, }; @@ -1056,6 +1083,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache_builder: None, projection_pushdown: false, scan_concurrency: None, }; @@ -1116,6 +1144,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache_builder: None, projection_pushdown: false, scan_concurrency: None, } @@ -1320,6 +1349,7 @@ mod tests { has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), file_metadata_cache: None, + segment_cache_builder: None, projection_pushdown: false, scan_concurrency: None, }; diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index aed2024d5d9..1866ada114a 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -30,6 +30,7 @@ use object_store::path::Path; use vortex::error::VortexExpect; use vortex::file::VORTEX_FILE_EXTENSION; use vortex::layout::LayoutReader; +use vortex::layout::segments::SegmentCacheBuilder; use vortex::metrics::DefaultMetricsRegistry; use vortex::metrics::MetricsRegistry; use vortex::session::VortexSession; @@ -192,6 +193,7 @@ pub struct VortexSource { pub(crate) vortex_reader_factory: Option>, vx_metrics_registry: Arc, file_metadata_cache: Option>, + segment_cache_builder: Option>, /// Whether to enable expression pushdown into the underlying Vortex scan. options: VortexTableOptions, } @@ -224,6 +226,7 @@ impl VortexSource { vortex_reader_factory: None, vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()), file_metadata_cache: None, + segment_cache_builder: None, options: VortexTableOptions::default(), } } @@ -283,6 +286,25 @@ impl VortexSource { self } + /// Sets a [`SegmentCacheBuilder`] to reuse segment bytes across scans of the same files. + /// + /// Without a builder every query re-reads zone map and data segments from object storage. + /// The builder is invoked once per opened file with that file's + /// [`FileIdentity`](vortex::layout::segments::FileIdentity); the returned per-file + /// [`SegmentCache`](vortex::layout::segments::SegmentCache) is wired into the file open + /// path. Use + /// [`NamespacedMokaSegmentCacheBuilder`](vortex::layout::segments::NamespacedMokaSegmentCacheBuilder) + /// for cross-query reuse with a global memory budget, optionally wrapped in + /// [`InstrumentedSegmentCacheBuilder`](vortex::layout::segments::InstrumentedSegmentCacheBuilder) + /// for hit/miss metrics. + pub fn with_segment_cache_builder( + mut self, + builder: Arc, + ) -> Self { + self.segment_cache_builder = Some(builder); + self + } + /// Sets the per-file Vortex scan concurrency. /// /// This is separate from DataFusion's partition-level parallelism. @@ -339,6 +361,7 @@ impl VortexSource { has_output_ordering: !base_config.output_ordering.is_empty(), expression_convertor: Arc::clone(&self.expression_convertor), file_metadata_cache: self.file_metadata_cache.clone(), + segment_cache_builder: self.segment_cache_builder.clone(), projection_pushdown: self.options.projection_pushdown, scan_concurrency: self.options.scan_concurrency, }; diff --git a/vortex-file/public-api.lock b/vortex-file/public-api.lock index a20ab092751..e80a60246a8 100644 --- a/vortex-file/public-api.lock +++ b/vortex-file/public-api.lock @@ -40,7 +40,7 @@ pub fn vortex_file::segments::FileSegmentSource::request(&self, vortex_layout::s pub struct vortex_file::segments::InitialReadSegmentCache -pub vortex_file::segments::InitialReadSegmentCache::fallback: alloc::sync::Arc +pub vortex_file::segments::InitialReadSegmentCache::fallback: vortex_layout::segments::cache::SharedSegmentCache pub vortex_file::segments::InitialReadSegmentCache::initial: parking_lot::rwlock::RwLock> @@ -312,7 +312,7 @@ pub fn vortex_file::VortexOpenOptions::with_labels(self, alloc::vec::Vec) -> Self -pub fn vortex_file::VortexOpenOptions::with_segment_cache(self, alloc::sync::Arc) -> Self +pub fn vortex_file::VortexOpenOptions::with_segment_cache(self, vortex_layout::segments::cache::SharedSegmentCache) -> Self pub fn vortex_file::VortexOpenOptions::with_some_file_size(self, core::option::Option) -> Self diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 3a5d3cf90e5..8db573492d2 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -17,9 +17,9 @@ use vortex_io::VortexReadAt; use vortex_io::session::RuntimeSessionExt; use vortex_layout::segments::InstrumentedSegmentCache; use vortex_layout::segments::NoOpSegmentCache; -use vortex_layout::segments::SegmentCache; use vortex_layout::segments::SegmentCacheSourceAdapter; use vortex_layout::segments::SegmentId; +use vortex_layout::segments::SharedSegmentCache; use vortex_layout::segments::SharedSegmentSource; use vortex_layout::session::LayoutSessionExt; use vortex_metrics::DefaultMetricsRegistry; @@ -45,7 +45,7 @@ pub struct VortexOpenOptions { /// The session to use for opening the file. session: VortexSession, /// Cache to use for file segments. - segment_cache: Option>, + segment_cache: Option, /// The number of bytes to read when parsing the footer. initial_read_size: usize, /// An optional, externally provided, file size. @@ -93,7 +93,18 @@ impl VortexOpenOptions { } /// Configure a custom [`SegmentCache`]. - pub fn with_segment_cache(mut self, segment_cache: Arc) -> Self { + /// + /// The supplied cache must be **scoped to a single file**: [`SegmentId`] is a + /// file-local index, so reusing one cache across multiple files will alias entries + /// from different files onto the same key. For cross-file sharing use a + /// [`SegmentCacheBuilder`] (e.g. + /// [`NamespacedMokaSegmentCacheBuilder`](vortex_layout::segments::NamespacedMokaSegmentCacheBuilder)) + /// at the layer that opens files, and pass the per-file [`SegmentCache`] it returns + /// here. + /// + /// [`SegmentId`]: vortex_layout::segments::SegmentId + /// [`SegmentCacheBuilder`]: vortex_layout::segments::SegmentCacheBuilder + pub fn with_segment_cache(mut self, segment_cache: SharedSegmentCache) -> Self { self.segment_cache = Some(segment_cache); self } diff --git a/vortex-file/src/segments/cache.rs b/vortex-file/src/segments/cache.rs index a17e3866bf4..9d465864eb1 100644 --- a/vortex-file/src/segments/cache.rs +++ b/vortex-file/src/segments/cache.rs @@ -1,20 +1,19 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use std::sync::Arc; - use async_trait::async_trait; use parking_lot::RwLock; use vortex_buffer::ByteBuffer; use vortex_error::VortexResult; use vortex_layout::segments::SegmentCache; use vortex_layout::segments::SegmentId; +use vortex_layout::segments::SharedSegmentCache; use vortex_utils::aliases::hash_map::HashMap; /// Segment cache containing the initial read segments. pub struct InitialReadSegmentCache { pub initial: RwLock>, - pub fallback: Arc, + pub fallback: SharedSegmentCache, } #[async_trait] diff --git a/vortex-layout/public-api.lock b/vortex-layout/public-api.lock index c0f3acec787..4a9c4a2230a 100644 --- a/vortex-layout/public-api.lock +++ b/vortex-layout/public-api.lock @@ -1138,6 +1138,58 @@ impl core::marker::Copy for vortex_layout::scan::split_by::SplitBy pub mod vortex_layout::segments +pub enum vortex_layout::segments::FileVersion + +pub vortex_layout::segments::FileVersion::Etag(alloc::sync::Arc) + +pub vortex_layout::segments::FileVersion::SizeMtime(u64, i64) + +impl core::clone::Clone for vortex_layout::segments::FileVersion + +pub fn vortex_layout::segments::FileVersion::clone(&self) -> vortex_layout::segments::FileVersion + +impl core::cmp::Eq for vortex_layout::segments::FileVersion + +impl core::cmp::PartialEq for vortex_layout::segments::FileVersion + +pub fn vortex_layout::segments::FileVersion::eq(&self, &vortex_layout::segments::FileVersion) -> bool + +impl core::fmt::Debug for vortex_layout::segments::FileVersion + +pub fn vortex_layout::segments::FileVersion::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::hash::Hash for vortex_layout::segments::FileVersion + +pub fn vortex_layout::segments::FileVersion::hash<__H: core::hash::Hasher>(&self, &mut __H) + +impl core::marker::StructuralPartialEq for vortex_layout::segments::FileVersion + +pub struct vortex_layout::segments::FileIdentity + +pub vortex_layout::segments::FileIdentity::path: alloc::sync::Arc + +pub vortex_layout::segments::FileIdentity::version: vortex_layout::segments::FileVersion + +impl core::clone::Clone for vortex_layout::segments::FileIdentity + +pub fn vortex_layout::segments::FileIdentity::clone(&self) -> vortex_layout::segments::FileIdentity + +impl core::cmp::Eq for vortex_layout::segments::FileIdentity + +impl core::cmp::PartialEq for vortex_layout::segments::FileIdentity + +pub fn vortex_layout::segments::FileIdentity::eq(&self, &vortex_layout::segments::FileIdentity) -> bool + +impl core::fmt::Debug for vortex_layout::segments::FileIdentity + +pub fn vortex_layout::segments::FileIdentity::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::hash::Hash for vortex_layout::segments::FileIdentity + +pub fn vortex_layout::segments::FileIdentity::hash<__H: core::hash::Hasher>(&self, &mut __H) + +impl core::marker::StructuralPartialEq for vortex_layout::segments::FileIdentity + pub struct vortex_layout::segments::InstrumentedSegmentCache impl vortex_layout::segments::InstrumentedSegmentCache @@ -1150,6 +1202,16 @@ pub fn vortex_layout::segments::InstrumentedSegmentCache::get<'life0, 'async_ pub fn vortex_layout::segments::InstrumentedSegmentCache::put<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId, vortex_buffer::ByteBuffer) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait +pub struct vortex_layout::segments::InstrumentedSegmentCacheBuilder + +impl vortex_layout::segments::InstrumentedSegmentCacheBuilder + +pub fn vortex_layout::segments::InstrumentedSegmentCacheBuilder::new(B, alloc::sync::Arc, alloc::vec::Vec) -> Self + +impl vortex_layout::segments::SegmentCacheBuilder for vortex_layout::segments::InstrumentedSegmentCacheBuilder + +pub fn vortex_layout::segments::InstrumentedSegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + pub struct vortex_layout::segments::MokaSegmentCache(_) impl vortex_layout::segments::MokaSegmentCache @@ -1162,6 +1224,16 @@ pub fn vortex_layout::segments::MokaSegmentCache::get<'life0, 'async_trait>(&'li pub fn vortex_layout::segments::MokaSegmentCache::put<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId, vortex_buffer::ByteBuffer) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait +pub struct vortex_layout::segments::NamespacedMokaSegmentCacheBuilder + +impl vortex_layout::segments::NamespacedMokaSegmentCacheBuilder + +pub fn vortex_layout::segments::NamespacedMokaSegmentCacheBuilder::new(u64) -> Self + +impl vortex_layout::segments::SegmentCacheBuilder for vortex_layout::segments::NamespacedMokaSegmentCacheBuilder + +pub fn vortex_layout::segments::NamespacedMokaSegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + pub struct vortex_layout::segments::NoOpSegmentCache impl vortex_layout::segments::SegmentCache for vortex_layout::segments::NoOpSegmentCache @@ -1170,11 +1242,17 @@ pub fn vortex_layout::segments::NoOpSegmentCache::get<'life0, 'async_trait>(&'li pub fn vortex_layout::segments::NoOpSegmentCache::put<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId, vortex_buffer::ByteBuffer) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait +pub struct vortex_layout::segments::NoOpSegmentCacheBuilder + +impl vortex_layout::segments::SegmentCacheBuilder for vortex_layout::segments::NoOpSegmentCacheBuilder + +pub fn vortex_layout::segments::NoOpSegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + pub struct vortex_layout::segments::SegmentCacheSourceAdapter impl vortex_layout::segments::SegmentCacheSourceAdapter -pub fn vortex_layout::segments::SegmentCacheSourceAdapter::new(alloc::sync::Arc, alloc::sync::Arc) -> Self +pub fn vortex_layout::segments::SegmentCacheSourceAdapter::new(vortex_layout::segments::SharedSegmentCache, alloc::sync::Arc) -> Self impl vortex_layout::segments::SegmentSource for vortex_layout::segments::SegmentCacheSourceAdapter @@ -1264,12 +1342,34 @@ pub fn vortex_layout::segments::NoOpSegmentCache::get<'life0, 'async_trait>(&'li pub fn vortex_layout::segments::NoOpSegmentCache::put<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId, vortex_buffer::ByteBuffer) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait +impl vortex_layout::segments::SegmentCache for alloc::sync::Arc + +pub fn alloc::sync::Arc::get<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait + +pub fn alloc::sync::Arc::put<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId, vortex_buffer::ByteBuffer) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait + impl vortex_layout::segments::SegmentCache for vortex_layout::segments::InstrumentedSegmentCache pub fn vortex_layout::segments::InstrumentedSegmentCache::get<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait pub fn vortex_layout::segments::InstrumentedSegmentCache::put<'life0, 'async_trait>(&'life0 self, vortex_layout::segments::SegmentId, vortex_buffer::ByteBuffer) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait +pub trait vortex_layout::segments::SegmentCacheBuilder: core::marker::Send + core::marker::Sync + +pub fn vortex_layout::segments::SegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + +impl vortex_layout::segments::SegmentCacheBuilder for vortex_layout::segments::NamespacedMokaSegmentCacheBuilder + +pub fn vortex_layout::segments::NamespacedMokaSegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + +impl vortex_layout::segments::SegmentCacheBuilder for vortex_layout::segments::NoOpSegmentCacheBuilder + +pub fn vortex_layout::segments::NoOpSegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + +impl vortex_layout::segments::SegmentCacheBuilder for vortex_layout::segments::InstrumentedSegmentCacheBuilder + +pub fn vortex_layout::segments::InstrumentedSegmentCacheBuilder::cache_for(&self, &vortex_layout::segments::FileIdentity) -> vortex_layout::segments::SharedSegmentCache + pub trait vortex_layout::segments::SegmentSink: core::marker::Send + core::marker::Sync pub fn vortex_layout::segments::SegmentSink::write<'life0, 'async_trait>(&'life0 self, vortex_layout::sequence::SequenceId, alloc::vec::Vec) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait @@ -1290,6 +1390,8 @@ pub type vortex_layout::segments::SegmentFuture = futures_core::future::BoxFutur pub type vortex_layout::segments::SegmentSinkRef = alloc::sync::Arc +pub type vortex_layout::segments::SharedSegmentCache = alloc::sync::Arc + pub mod vortex_layout::sequence pub struct vortex_layout::sequence::SequenceId diff --git a/vortex-layout/src/segments/cache.rs b/vortex-layout/src/segments/cache.rs index 37675c19d2a..2626881366c 100644 --- a/vortex-layout/src/segments/cache.rs +++ b/vortex-layout/src/segments/cache.rs @@ -2,6 +2,8 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::sync::Arc; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering; use async_trait::async_trait; use futures::FutureExt; @@ -17,6 +19,7 @@ use vortex_metrics::Counter; use vortex_metrics::Label; use vortex_metrics::MetricBuilder; use vortex_metrics::MetricsRegistry; +use vortex_utils::aliases::dash_map::DashMap; use crate::segments::SegmentFuture; use crate::segments::SegmentId; @@ -29,6 +32,25 @@ pub trait SegmentCache: Send + Sync { async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()>; } +/// Shared, type-erased reference to a [`SegmentCache`]. +/// +/// This is the form used at almost every API boundary that hands off a [`SegmentCache`] +/// (builder outputs, file open options, source adapters). The alias exists primarily so +/// that IDE "find references" can locate every shared cache hand-off without matching +/// every `Arc` in the codebase. +pub type SharedSegmentCache = Arc; + +#[async_trait] +impl SegmentCache for Arc { + async fn get(&self, id: SegmentId) -> VortexResult> { + (**self).get(id).await + } + + async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> { + (**self).put(id, buffer).await + } +} + pub struct NoOpSegmentCache; #[async_trait] @@ -124,13 +146,53 @@ impl SegmentCache for InstrumentedSegmentCache { } } +/// Decorator [`SegmentCacheBuilder`] that wraps each per-file cache with an +/// [`InstrumentedSegmentCache`] for hit/miss/store metrics. +/// +/// # Example +/// +/// ```ignore +/// let cache = Arc::new(InstrumentedSegmentCacheBuilder::new( +/// NamespacedMokaSegmentCacheBuilder::new(2 << 30), +/// metrics_registry, +/// vec![], +/// )); +/// ``` +pub struct InstrumentedSegmentCacheBuilder { + inner: B, + metrics_registry: Arc, + labels: Vec