Skip to content

Commit 7e8d26f

Browse files
committed
feat(sqllogictest): support dynamic catalog configuration
This PR adds support for configuring the catalog type in sqllogictest schedule files, addressing issue #1780. Changes: - Add [catalog] section parsing in schedule files - Support 'memory' (default) and all catalog types from iceberg-catalog-loader - Update DataFusionEngine to accept catalog from schedule configuration - Add comprehensive tests for catalog configuration parsing The catalog can now be configured in schedule files like: [catalog] type = "rest" [catalog.properties] uri = "http://localhost:8181" If no catalog section is specified, defaults to memory catalog. Closes #1780
1 parent b7ba2e8 commit 7e8d26f

File tree

6 files changed

+221
-34
lines changed

6 files changed

+221
-34
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ http = "1.2"
8181
iceberg = { version = "0.8.0", path = "./crates/iceberg" }
8282
iceberg-catalog-glue = { version = "0.8.0", path = "./crates/catalog/glue" }
8383
iceberg-catalog-hms = { version = "0.8.0", path = "./crates/catalog/hms" }
84+
iceberg-catalog-loader = { version = "0.8.0", path = "./crates/catalog/loader" }
8485
iceberg-catalog-rest = { version = "0.8.0", path = "./crates/catalog/rest" }
8586
iceberg-catalog-s3tables = { version = "0.8.0", path = "./crates/catalog/s3tables" }
8687
iceberg-catalog-sql = { version = "0.8.0", path = "./crates/catalog/sql" }

crates/sqllogictest/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ datafusion-sqllogictest = { workspace = true }
3232
enum-ordinalize = { workspace = true }
3333
env_logger = { workspace = true }
3434
iceberg = { workspace = true }
35+
iceberg-catalog-loader = { workspace = true }
3536
iceberg-datafusion = { workspace = true }
3637
indicatif = { workspace = true }
3738
log = { workspace = true }

crates/sqllogictest/src/engine/datafusion.rs

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@ use std::collections::HashMap;
1919
use std::path::{Path, PathBuf};
2020
use std::sync::Arc;
2121

22-
use datafusion::catalog::CatalogProvider;
2322
use datafusion::prelude::{SessionConfig, SessionContext};
2423
use datafusion_sqllogictest::DataFusion;
25-
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
2624
use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec};
27-
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation};
25+
use iceberg::{Catalog, NamespaceIdent, TableCreation};
2826
use iceberg_datafusion::IcebergCatalogProvider;
2927
use indicatif::ProgressBar;
3028
use toml::Table as TomlTable;
@@ -59,49 +57,54 @@ impl EngineRunner for DataFusionEngine {
5957
}
6058

6159
impl DataFusionEngine {
62-
pub async fn new(config: TomlTable) -> Result<Self> {
60+
pub async fn new(_config: TomlTable, catalog: Arc<dyn Catalog>) -> Result<Self> {
6361
let session_config = SessionConfig::new()
6462
.with_target_partitions(4)
6563
.with_information_schema(true);
6664
let ctx = SessionContext::new_with_config(session_config);
67-
ctx.register_catalog("default", Self::create_catalog(&config).await?);
65+
66+
// Create test namespace and tables in the catalog
67+
Self::setup_test_data(&catalog).await?;
68+
69+
// Register the catalog with DataFusion
70+
let catalog_provider = IcebergCatalogProvider::try_new(catalog)
71+
.await
72+
.map_err(|e| {
73+
crate::error::Error(anyhow::anyhow!("Failed to create catalog provider: {e}"))
74+
})?;
75+
ctx.register_catalog("default", Arc::new(catalog_provider));
6876

6977
Ok(Self {
7078
test_data_path: PathBuf::from("testdata"),
7179
session_context: ctx,
7280
})
7381
}
7482

75-
async fn create_catalog(_: &TomlTable) -> anyhow::Result<Arc<dyn CatalogProvider>> {
76-
// TODO: support dynamic catalog configuration
77-
// See: https://github.com/apache/iceberg-rust/issues/1780
78-
let catalog = MemoryCatalogBuilder::default()
79-
.load(
80-
"memory",
81-
HashMap::from([(
82-
MEMORY_CATALOG_WAREHOUSE.to_string(),
83-
"memory://".to_string(),
84-
)]),
85-
)
86-
.await?;
87-
83+
/// Set up the test namespace and tables in the catalog.
84+
async fn setup_test_data(catalog: &Arc<dyn Catalog>) -> anyhow::Result<()> {
8885
// Create a test namespace for INSERT INTO tests
8986
let namespace = NamespaceIdent::new("default".to_string());
90-
catalog.create_namespace(&namespace, HashMap::new()).await?;
9187

92-
// Create test tables
93-
Self::create_unpartitioned_table(&catalog, &namespace).await?;
94-
Self::create_partitioned_table(&catalog, &namespace).await?;
88+
// Try to create the namespace, ignore if it already exists
89+
if catalog
90+
.create_namespace(&namespace, HashMap::new())
91+
.await
92+
.is_err()
93+
{
94+
// Namespace might already exist, that's ok
95+
}
96+
97+
// Create test tables (ignore errors if they already exist)
98+
let _ = Self::create_unpartitioned_table(catalog, &namespace).await;
99+
let _ = Self::create_partitioned_table(catalog, &namespace).await;
95100

96-
Ok(Arc::new(
97-
IcebergCatalogProvider::try_new(Arc::new(catalog)).await?,
98-
))
101+
Ok(())
99102
}
100103

101104
/// Create an unpartitioned test table with id and name columns
102105
/// TODO: this can be removed when we support CREATE TABLE
103106
async fn create_unpartitioned_table(
104-
catalog: &impl Catalog,
107+
catalog: &Arc<dyn Catalog>,
105108
namespace: &NamespaceIdent,
106109
) -> anyhow::Result<()> {
107110
let schema = Schema::builder()
@@ -128,7 +131,7 @@ impl DataFusionEngine {
128131
/// Partitioned by category using identity transform
129132
/// TODO: this can be removed when we support CREATE TABLE
130133
async fn create_partitioned_table(
131-
catalog: &impl Catalog,
134+
catalog: &Arc<dyn Catalog>,
132135
namespace: &NamespaceIdent,
133136
) -> anyhow::Result<()> {
134137
let schema = Schema::builder()

crates/sqllogictest/src/engine/mod.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
mod datafusion;
1919

2020
use std::path::Path;
21+
use std::sync::Arc;
2122

2223
use anyhow::anyhow;
24+
use iceberg::Catalog;
2325
use sqllogictest::{AsyncDB, MakeConnection, Runner, parse_file};
2426
use toml::Table as TomlTable;
2527

@@ -36,9 +38,10 @@ pub trait EngineRunner: Send {
3638
pub async fn load_engine_runner(
3739
engine_type: &str,
3840
cfg: TomlTable,
41+
catalog: Arc<dyn Catalog>,
3942
) -> Result<Box<dyn EngineRunner>> {
4043
match engine_type {
41-
TYPE_DATAFUSION => Ok(Box::new(DataFusionEngine::new(cfg).await?)),
44+
TYPE_DATAFUSION => Ok(Box::new(DataFusionEngine::new(cfg, catalog).await?)),
4245
_ => Err(anyhow::anyhow!("Unsupported engine type: {engine_type}").into()),
4346
}
4447
}
@@ -65,16 +68,38 @@ where
6568

6669
#[cfg(test)]
6770
mod tests {
71+
use std::collections::HashMap;
72+
use std::sync::Arc;
73+
74+
use iceberg::CatalogBuilder;
75+
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
76+
6877
use crate::engine::{TYPE_DATAFUSION, load_engine_runner};
6978

79+
async fn create_test_catalog() -> Arc<dyn iceberg::Catalog> {
80+
Arc::new(
81+
MemoryCatalogBuilder::default()
82+
.load(
83+
"test",
84+
HashMap::from([(
85+
MEMORY_CATALOG_WAREHOUSE.to_string(),
86+
"memory://".to_string(),
87+
)]),
88+
)
89+
.await
90+
.unwrap(),
91+
)
92+
}
93+
7094
#[tokio::test]
7195
async fn test_engine_invalid_type() {
7296
let input = r#"
7397
[engines]
7498
random = { type = "random_engine", url = "http://localhost:8181" }
7599
"#;
76100
let tbl = toml::from_str(input).unwrap();
77-
let result = load_engine_runner("random_engine", tbl).await;
101+
let catalog = create_test_catalog().await;
102+
let result = load_engine_runner("random_engine", tbl, catalog).await;
78103

79104
assert!(result.is_err());
80105
}
@@ -86,7 +111,8 @@ mod tests {
86111
df = { type = "datafusion" }
87112
"#;
88113
let tbl = toml::from_str(input).unwrap();
89-
let result = load_engine_runner(TYPE_DATAFUSION, tbl).await;
114+
let catalog = create_test_catalog().await;
115+
let result = load_engine_runner(TYPE_DATAFUSION, tbl, catalog).await;
90116

91117
assert!(result.is_ok());
92118
}

0 commit comments

Comments
 (0)