
Commit bd1b83d

Add RuntimeHandle support for Tokio runtime segregation
1 parent d4c4bd4 commit bd1b83d

File tree

3 files changed (+350, -25 lines)


crates/iceberg/src/io/file_io.rs

Lines changed: 47 additions & 0 deletions
@@ -182,6 +182,53 @@ impl Extensions {
     }
 }
 
+/// Runtime handle for executing async I/O operations.
+///
+/// When provided via FileIOBuilder extensions, OpenDAL operations will spawn
+/// tasks on this runtime instead of using the current runtime context.
+///
+/// This is useful for runtime segregation scenarios where you want to separate
+/// CPU-bound query execution from I/O-bound operations (e.g., blob storage access).
+///
+/// # Example
+///
+/// ```rust,ignore
+/// use iceberg::io::{FileIOBuilder, RuntimeHandle};
+/// use tokio::runtime::Builder;
+///
+/// // Create dedicated I/O runtime
+/// let io_runtime = Builder::new_multi_thread()
+///     .worker_threads(8)
+///     .thread_name("io-pool")
+///     .enable_io()
+///     .enable_time()
+///     .build()?;
+///
+/// // Configure FileIO with runtime handle
+/// let file_io = FileIOBuilder::new("s3")
+///     .with_extension(RuntimeHandle::new(io_runtime.handle().clone()))
+///     .with_props(s3_config)
+///     .build()?;
+/// ```
+#[derive(Clone, Debug)]
+pub struct RuntimeHandle(pub tokio::runtime::Handle);
+
+impl RuntimeHandle {
+    /// Create a new RuntimeHandle from a Tokio runtime handle.
+    pub fn new(handle: tokio::runtime::Handle) -> Self {
+        Self(handle)
+    }
+
+    /// Get the current runtime handle.
+    ///
+    /// # Panics
+    ///
+    /// Panics if called outside of a Tokio runtime context.
+    pub fn current() -> Self {
+        Self(tokio::runtime::Handle::current())
+    }
+}
+
 /// Builder for [`FileIO`].
 #[derive(Clone, Debug)]
 pub struct FileIOBuilder {
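Beyond the doc-comment example above, here is a minimal end-to-end sketch of the intended segregation pattern: one runtime drives query execution while the FileIO carries a handle to a separate I/O pool. It assumes the in-memory scheme (feature `storage-memory`); the `new_input`/`exists` calls at the end are illustrative and may not match the exact FileIO method names.

```rust
use iceberg::io::{FileIOBuilder, RuntimeHandle};
use tokio::runtime::Builder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Dedicated pool reserved for object-store I/O.
    let io_runtime = Builder::new_multi_thread()
        .worker_threads(4)
        .thread_name("iceberg-io")
        .enable_all()
        .build()?;

    // Separate runtime that drives CPU-bound query execution.
    let query_runtime = Builder::new_multi_thread().enable_all().build()?;

    // The handle travels with the FileIO via the extensions mechanism.
    let file_io = FileIOBuilder::new("memory")
        .with_extension(RuntimeHandle::new(io_runtime.handle().clone()))
        .build()?;

    // Futures are driven from the query runtime; OpenDAL work is spawned onto `io_runtime`.
    query_runtime.block_on(async {
        // Illustrative read path; exact FileIO method names may differ.
        let input = file_io.new_input("memory:/warehouse/db/table/metadata.json")?;
        let _exists = input.exists().await?;
        Ok::<_, iceberg::Error>(())
    })?;

    Ok(())
}
```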

crates/iceberg/src/io/storage.rs

Lines changed: 74 additions & 25 deletions
@@ -22,6 +22,8 @@
     feature = "storage-azdls",
 ))]
 use std::sync::Arc;
+use std::future::Future;
+use std::pin::Pin;
 
 use opendal::layers::RetryLayer;
 #[cfg(feature = "storage-azdls")]

@@ -36,14 +38,34 @@ use opendal::{Operator, Scheme};
 
 #[cfg(feature = "storage-azdls")]
 use super::AzureStorageScheme;
-use super::FileIOBuilder;
+use super::{FileIOBuilder, RuntimeHandle};
 #[cfg(feature = "storage-s3")]
 use crate::io::CustomAwsCredentialLoader;
 use crate::{Error, ErrorKind};
 
+/// Custom OpenDAL executor that spawns tasks on a specific Tokio runtime.
+///
+/// This executor implements the OpenDAL Execute trait and routes all spawned
+/// tasks to a configured Tokio runtime handle, enabling runtime segregation.
+#[derive(Clone)]
+struct CustomTokioExecutor {
+    handle: tokio::runtime::Handle,
+}
+
+impl opendal::Execute for CustomTokioExecutor {
+    fn execute(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
+        self.handle.spawn(f);
+    }
+}
+
 /// The storage carries all supported storage services in iceberg
+pub(crate) struct Storage {
+    backend: StorageBackend,
+    executor: Option<opendal::Executor>,
+}
+
 #[derive(Debug)]
-pub(crate) enum Storage {
+enum StorageBackend {
     #[cfg(feature = "storage-memory")]
     Memory(Operator),
     #[cfg(feature = "storage-fs")]

@@ -73,48 +95,69 @@ pub(crate) enum Storage {
     },
 }
 
+impl std::fmt::Debug for Storage {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Storage")
+            .field("backend", &self.backend)
+            .field("executor", &self.executor.as_ref().map(|_| "Some(Executor)"))
+            .finish()
+    }
+}
+
 impl Storage {
     /// Convert iceberg config to opendal config.
     pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
         let (scheme_str, props, extensions) = file_io_builder.into_parts();
         let _ = (&props, &extensions);
         let scheme = Self::parse_scheme(&scheme_str)?;
 
-        match scheme {
+        // Extract runtime handle and create executor if provided
+        let executor = extensions
+            .get::<RuntimeHandle>()
+            .map(|runtime_handle| {
+                let handle = Arc::unwrap_or_clone(runtime_handle).0;
+                opendal::Executor::with(CustomTokioExecutor { handle })
+            });
+
+        let backend = match scheme {
             #[cfg(feature = "storage-memory")]
-            Scheme::Memory => Ok(Self::Memory(super::memory_config_build()?)),
+            Scheme::Memory => StorageBackend::Memory(super::memory_config_build()?),
             #[cfg(feature = "storage-fs")]
-            Scheme::Fs => Ok(Self::LocalFs),
+            Scheme::Fs => StorageBackend::LocalFs,
             #[cfg(feature = "storage-s3")]
-            Scheme::S3 => Ok(Self::S3 {
+            Scheme::S3 => StorageBackend::S3 {
                 configured_scheme: scheme_str,
                 config: super::s3_config_parse(props)?.into(),
                 customized_credential_load: extensions
                     .get::<CustomAwsCredentialLoader>()
                     .map(Arc::unwrap_or_clone),
-            }),
+            },
             #[cfg(feature = "storage-gcs")]
-            Scheme::Gcs => Ok(Self::Gcs {
+            Scheme::Gcs => StorageBackend::Gcs {
                 config: super::gcs_config_parse(props)?.into(),
-            }),
+            },
             #[cfg(feature = "storage-oss")]
-            Scheme::Oss => Ok(Self::Oss {
+            Scheme::Oss => StorageBackend::Oss {
                 config: super::oss_config_parse(props)?.into(),
-            }),
+            },
             #[cfg(feature = "storage-azdls")]
             Scheme::Azdls => {
                 let scheme = scheme_str.parse::<AzureStorageScheme>()?;
-                Ok(Self::Azdls {
+                StorageBackend::Azdls {
                     config: super::azdls_config_parse(props)?.into(),
                     configured_scheme: scheme,
-                })
+                }
             }
             // Update doc on [`FileIO`] when adding new schemes.
-            _ => Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                format!("Constructing file io from scheme: {scheme} not supported now",),
-            )),
-        }
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    format!("Constructing file io from scheme: {scheme} not supported now",),
+                ))
+            }
+        };
+
+        Ok(Self { backend, executor })
     }
 
     /// Creates operator from path.

@@ -135,17 +178,17 @@ impl Storage {
     ) -> crate::Result<(Operator, &'a str)> {
         let path = path.as_ref();
         let _ = path;
-        let (operator, relative_path): (Operator, &str) = match self {
+        let (operator, relative_path): (Operator, &str) = match &self.backend {
             #[cfg(feature = "storage-memory")]
-            Storage::Memory(op) => {
+            StorageBackend::Memory(op) => {
                 if let Some(stripped) = path.strip_prefix("memory:/") {
                     Ok::<_, crate::Error>((op.clone(), stripped))
                 } else {
                     Ok::<_, crate::Error>((op.clone(), &path[1..]))
                 }
             }
             #[cfg(feature = "storage-fs")]
-            Storage::LocalFs => {
+            StorageBackend::LocalFs => {
                 let op = super::fs_config_build()?;
 
                 if let Some(stripped) = path.strip_prefix("file:/") {

@@ -155,7 +198,7 @@ impl Storage {
                 }
             }
             #[cfg(feature = "storage-s3")]
-            Storage::S3 {
+            StorageBackend::S3 {
                 configured_scheme,
                 config,
                 customized_credential_load,

@@ -175,7 +218,7 @@ impl Storage {
                 }
             }
             #[cfg(feature = "storage-gcs")]
-            Storage::Gcs { config } => {
+            StorageBackend::Gcs { config } => {
                 let operator = super::gcs_config_build(config, path)?;
                 let prefix = format!("gs://{}/", operator.info().name());
                 if path.starts_with(&prefix) {

@@ -188,7 +231,7 @@ impl Storage {
                 }
             }
             #[cfg(feature = "storage-oss")]
-            Storage::Oss { config } => {
+            StorageBackend::Oss { config } => {
                 let op = super::oss_config_build(config, path)?;
 
                 // Check prefix of oss path.

@@ -203,7 +246,7 @@ impl Storage {
                 }
             }
             #[cfg(feature = "storage-azdls")]
-            Storage::Azdls {
+            StorageBackend::Azdls {
                 configured_scheme,
                 config,
             } => super::azdls_create_operator(path, config, configured_scheme),

@@ -220,6 +263,12 @@ impl Storage {
             )),
         }?;
 
+        // Apply custom executor if configured for runtime segregation
+        if let Some(ref executor) = self.executor {
+            let executor_clone = executor.clone();
+            operator.update_executor(|_| executor_clone);
+        }
+
         // Transient errors are common for object stores; however there's no
         // harm in retrying temporary failures for other storage backends as well.
         let operator = operator.layer(RetryLayer::new());
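To make the routing concrete, here is a self-contained sketch (tokio only, no opendal types, so it compiles on its own) of the pattern CustomTokioExecutor relies on: storing a runtime Handle lets code running anywhere spawn futures onto that one runtime, which is what the `update_executor` call above wires into the operator. `PinnedExecutor` and the thread-name check are illustrative and not part of this commit.

```rust
use std::future::Future;
use std::pin::Pin;
use std::thread;
use std::time::Duration;

use tokio::runtime::Builder;

/// Same shape as CustomTokioExecutor above: hold a Handle, spawn onto it.
#[derive(Clone)]
struct PinnedExecutor {
    handle: tokio::runtime::Handle,
}

impl PinnedExecutor {
    fn execute(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
        // The future runs on the runtime behind `handle`, not on the caller's runtime.
        self.handle.spawn(f);
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let io_runtime = Builder::new_multi_thread()
        .worker_threads(2)
        .thread_name("io-pool")
        .enable_all()
        .build()?;

    let executor = PinnedExecutor {
        handle: io_runtime.handle().clone(),
    };

    // Spawned from the main thread, but the task executes on an "io-pool" worker.
    executor.execute(Box::pin(async {
        let name = thread::current().name().map(str::to_owned);
        println!("running on thread: {name:?}"); // prints Some("io-pool")
    }));

    // Give the spawned task a moment to finish before the runtime is dropped.
    thread::sleep(Duration::from_millis(100));
    Ok(())
}
```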

0 commit comments
