Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ http = "1.2"
iceberg = { version = "0.8.0", path = "./crates/iceberg" }
iceberg-catalog-glue = { version = "0.8.0", path = "./crates/catalog/glue" }
iceberg-catalog-hms = { version = "0.8.0", path = "./crates/catalog/hms" }
iceberg-catalog-loader = { version = "0.8.0", path = "./crates/catalog/loader" }
iceberg-catalog-rest = { version = "0.8.0", path = "./crates/catalog/rest" }
iceberg-catalog-s3tables = { version = "0.8.0", path = "./crates/catalog/s3tables" }
iceberg-catalog-sql = { version = "0.8.0", path = "./crates/catalog/sql" }
Expand Down
1 change: 1 addition & 0 deletions crates/sqllogictest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ datafusion-sqllogictest = { workspace = true }
enum-ordinalize = { workspace = true }
env_logger = { workspace = true }
iceberg = { workspace = true }
iceberg-catalog-loader = { workspace = true }
iceberg-datafusion = { workspace = true }
indicatif = { workspace = true }
log = { workspace = true }
Expand Down
123 changes: 101 additions & 22 deletions crates/sqllogictest/src/engine/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use datafusion::catalog::CatalogProvider;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_sqllogictest::DataFusion;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec};
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation};
use iceberg_catalog_loader::CatalogLoader;
use iceberg_datafusion::IcebergCatalogProvider;
use indicatif::ProgressBar;
use toml::Table as TomlTable;

use crate::engine::{EngineRunner, run_slt_with_runner};
use crate::error::Result;

const DEFAULT_CATALOG_TYPE: &str = "memory";

pub struct DataFusionEngine {
test_data_path: PathBuf,
session_context: SessionContext,
Expand Down Expand Up @@ -59,49 +61,126 @@ impl EngineRunner for DataFusionEngine {
}

impl DataFusionEngine {
/// Create a new DataFusion engine with catalog configuration from the TOML config.
///
/// # Configuration
///
/// The engine reads catalog configuration from the TOML config:
/// - `catalog_type`: The type of catalog to use (e.g., "memory", "rest"). Defaults to "memory".
/// - `catalog_properties`: Additional properties for the catalog (optional).
///
/// # Example configuration
///
/// ```toml
/// [engines]
/// df = { type = "datafusion", catalog_type = "rest", catalog_properties = { uri = "http://localhost:8181" } }
/// ```
pub async fn new(config: TomlTable) -> Result<Self> {
let catalog = Self::create_catalog(&config).await?;

let session_config = SessionConfig::new()
.with_target_partitions(4)
.with_information_schema(true);
let ctx = SessionContext::new_with_config(session_config);
ctx.register_catalog("default", Self::create_catalog(&config).await?);

// Create test namespace and tables in the catalog
Self::setup_test_data(&catalog).await?;

// Register the catalog with DataFusion
let catalog_provider = IcebergCatalogProvider::try_new(catalog)
.await
.map_err(|e| {
crate::error::Error(anyhow::anyhow!("Failed to create catalog provider: {e}"))
})?;
ctx.register_catalog("default", Arc::new(catalog_provider));

Ok(Self {
test_data_path: PathBuf::from("testdata"),
session_context: ctx,
})
}

async fn create_catalog(_: &TomlTable) -> anyhow::Result<Arc<dyn CatalogProvider>> {
// TODO: support dynamic catalog configuration
// See: https://github.com/apache/iceberg-rust/issues/1780
let catalog = MemoryCatalogBuilder::default()
.load(
"memory",
HashMap::from([(
/// Create a catalog from the engine configuration.
///
/// Supported catalog types:
/// - "memory": In-memory catalog (default), useful for testing
/// - "rest": REST catalog
/// - "glue": AWS Glue catalog
/// - "hms": Hive Metastore catalog
/// - "s3tables": S3 Tables catalog
/// - "sql": SQL catalog
async fn create_catalog(config: &TomlTable) -> Result<Arc<dyn Catalog>> {
let catalog_type = config
.get("catalog_type")
.and_then(|v| v.as_str())
.unwrap_or(DEFAULT_CATALOG_TYPE);

let catalog_properties: HashMap<String, String> = config
.get("catalog_properties")
.and_then(|v| v.as_table())
.map(|t| {
t.iter()
.filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
.collect()
})
.unwrap_or_default();

if catalog_type == "memory" {
// Memory catalog is built-in to iceberg crate, not in catalog-loader
// Ensure warehouse is set for memory catalog
let mut props = catalog_properties;
if !props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
// Use a temp directory as default warehouse for testing
props.insert(
MEMORY_CATALOG_WAREHOUSE.to_string(),
"memory://".to_string(),
)]),
)
.await?;
std::env::temp_dir()
.join("iceberg-sqllogictest")
.to_string_lossy()
.to_string(),
);
}
let catalog = MemoryCatalogBuilder::default()
.load("default", props)
.await
.map_err(|e| {
crate::error::Error(anyhow::anyhow!("Failed to load memory catalog: {e}"))
})?;
Ok(Arc::new(catalog))
} else {
// Use catalog-loader for other catalog types
let catalog = CatalogLoader::from(catalog_type)
.load("default".to_string(), catalog_properties)
.await
.map_err(|e| crate::error::Error(anyhow::anyhow!("Failed to load catalog: {e}")))?;
Ok(catalog)
}
}

/// Set up the test namespace and tables in the catalog.
async fn setup_test_data(catalog: &Arc<dyn Catalog>) -> anyhow::Result<()> {
// Create a test namespace for INSERT INTO tests
let namespace = NamespaceIdent::new("default".to_string());
catalog.create_namespace(&namespace, HashMap::new()).await?;

// Create test tables
Self::create_unpartitioned_table(&catalog, &namespace).await?;
Self::create_partitioned_table(&catalog, &namespace).await?;
// Try to create the namespace, ignore if it already exists
if catalog
.create_namespace(&namespace, HashMap::new())
.await
.is_err()
{
// Namespace might already exist, that's ok
}

Ok(Arc::new(
IcebergCatalogProvider::try_new(Arc::new(catalog)).await?,
))
// Create test tables (ignore errors if they already exist)
let _ = Self::create_unpartitioned_table(catalog, &namespace).await;
let _ = Self::create_partitioned_table(catalog, &namespace).await;

Ok(())
}

/// Create an unpartitioned test table with id and name columns
/// TODO: this can be removed when we support CREATE TABLE
async fn create_unpartitioned_table(
catalog: &impl Catalog,
catalog: &Arc<dyn Catalog>,
namespace: &NamespaceIdent,
) -> anyhow::Result<()> {
let schema = Schema::builder()
Expand All @@ -128,7 +207,7 @@ impl DataFusionEngine {
/// Partitioned by category using identity transform
/// TODO: this can be removed when we support CREATE TABLE
async fn create_partitioned_table(
catalog: &impl Catalog,
catalog: &Arc<dyn Catalog>,
namespace: &NamespaceIdent,
) -> anyhow::Result<()> {
let schema = Schema::builder()
Expand Down
19 changes: 17 additions & 2 deletions crates/sqllogictest/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub trait EngineRunner: Send {
async fn run_slt_file(&mut self, path: &Path) -> Result<()>;
}

/// Load an engine runner based on the engine type and configuration.
/// Each engine is responsible for creating its own catalog based on the
/// `catalog_type` and `catalog_properties` fields in the config.
pub async fn load_engine_runner(
engine_type: &str,
cfg: TomlTable,
Expand Down Expand Up @@ -80,14 +83,26 @@ mod tests {
}

#[tokio::test]
async fn test_load_datafusion() {
async fn test_load_datafusion_default_catalog() {
let input = r#"
[engines]
df = { type = "datafusion" }
"#;
let tbl = toml::from_str(input).unwrap();
let result = load_engine_runner(TYPE_DATAFUSION, tbl).await;

assert!(result.is_ok());
assert!(result.is_ok(), "Failed to load engine: {:?}", result.err());
}

#[tokio::test]
async fn test_load_datafusion_with_memory_catalog() {
let input = r#"
[engines]
df = { type = "datafusion", catalog_type = "memory" }
"#;
let tbl = toml::from_str(input).unwrap();
let result = load_engine_runner(TYPE_DATAFUSION, tbl).await;

assert!(result.is_ok(), "Failed to load engine: {:?}", result.err());
}
}
27 changes: 27 additions & 0 deletions crates/sqllogictest/src/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,31 @@ mod tests {

assert!(result.is_err());
}

#[tokio::test]
async fn test_parse_engines_with_catalog_config() {
let toml_content = r#"
[engines]
df = { type = "datafusion", catalog_type = "memory" }
"#;

let table: TomlTable = toml::from_str(toml_content).unwrap();
let result = Schedule::parse_engines(&table).await;

assert!(result.is_ok());
}

#[tokio::test]
async fn test_parse_engines_default_catalog() {
let toml_content = r#"
[engines]
df = { type = "datafusion" }
"#;

let table: TomlTable = toml::from_str(toml_content).unwrap();
let result = Schedule::parse_engines(&table).await;

// Should default to memory catalog
assert!(result.is_ok());
}
}
Loading