Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ rust-version = "1.88"

[workspace.dependencies]
anyhow = "1.0.72"
apache-avro = { version = "0.21", features = ["zstandard"] }
apache-avro = { version = "0.21", features = ["zstandard", "snappy"] }
array-init = "2"
arrow-arith = "57.0"
arrow-array = "57.0"
Expand Down Expand Up @@ -93,6 +93,7 @@ log = "0.4.28"
metainfo = "0.7.14"
mimalloc = "0.1.46"
minijinja = "2.12.0"
miniz_oxide = "0.8"
mockall = "0.13.1"
mockito = "1"
motore-macros = "0.4.3"
Expand Down
10 changes: 7 additions & 3 deletions crates/catalog/glue/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ mod tests {
let table_name = "my_table".to_string();
let location = "s3a://warehouse/hive".to_string();
let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
let properties = HashMap::new();
let schema = Schema::builder()
.with_schema_id(1)
.with_fields(vec![
Expand Down Expand Up @@ -336,8 +335,13 @@ mod tests {
.location(metadata.location())
.build();

let result =
convert_to_glue_table(&table_name, metadata_location, &metadata, &properties, None)?;
let result = convert_to_glue_table(
&table_name,
metadata_location,
&metadata,
metadata.properties(),
None,
)?;

assert_eq!(result.name(), &table_name);
assert_eq!(result.description(), None);
Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ flate2 = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
miniz_oxide = { workspace = true }
moka = { version = "0.12.10", features = ["future"] }
murmur3 = { workspace = true }
num-bigint = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ impl ObjectCache {
mod tests {
use std::fs;

use apache_avro::Codec;
use minijinja::value::Value;
use minijinja::{AutoEscape, Environment, context};
use tempfile::TempDir;
Expand Down Expand Up @@ -275,6 +276,7 @@ mod tests {
None,
current_schema.clone(),
current_partition_spec.as_ref().clone(),
Codec::Null,
)
.build_v2_data();
writer
Expand Down Expand Up @@ -307,6 +309,7 @@ mod tests {
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
Codec::Null,
);
manifest_list_write
.add_manifests(vec![data_file_manifest].into_iter())
Expand Down
14 changes: 9 additions & 5 deletions crates/iceberg/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,11 +565,12 @@ pub mod tests {
//! shared tests for the table scan API
#![allow(missing_docs)]

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::fs::File;
use std::sync::Arc;

use apache_avro::Codec;
use arrow_array::cast::AsArray;
use arrow_array::{
Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
Expand Down Expand Up @@ -763,6 +764,7 @@ pub mod tests {
None,
current_schema.clone(),
current_partition_spec.as_ref().clone(),
Codec::Null,
)
.build_v2_data();
writer
Expand Down Expand Up @@ -840,6 +842,7 @@ pub mod tests {
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
Codec::Null,
);
manifest_list_write
.add_manifests(vec![data_file_manifest].into_iter())
Expand Down Expand Up @@ -975,6 +978,7 @@ pub mod tests {
None,
current_schema.clone(),
current_partition_spec.as_ref().clone(),
Codec::Null,
)
.build_v2_data();

Expand Down Expand Up @@ -1059,6 +1063,7 @@ pub mod tests {
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
Codec::Null,
);
manifest_list_write
.add_manifests(vec![data_file_manifest].into_iter())
Expand Down Expand Up @@ -1186,6 +1191,7 @@ pub mod tests {
None,
current_schema.clone(),
current_partition_spec.as_ref().clone(),
Codec::Null,
)
.build_v2_data();

Expand Down Expand Up @@ -1221,6 +1227,7 @@ pub mod tests {
None,
current_schema.clone(),
current_partition_spec.as_ref().clone(),
Codec::Null,
)
.build_v2_deletes();

Expand Down Expand Up @@ -1255,6 +1262,7 @@ pub mod tests {
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
Codec::Null,
);
manifest_list_write
.add_manifests(vec![data_manifest, delete_manifest].into_iter())
Expand Down Expand Up @@ -1908,8 +1916,6 @@ pub mod tests {

#[tokio::test]
async fn test_select_with_file_column() {
use arrow_array::cast::AsArray;

let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

Expand Down Expand Up @@ -2031,8 +2037,6 @@ pub mod tests {

#[tokio::test]
async fn test_file_column_with_multiple_files() {
use std::collections::HashSet;

let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

Expand Down
167 changes: 167 additions & 0 deletions crates/iceberg/src/spec/avro_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Utilities for working with Apache Avro in Iceberg.

use apache_avro::{Codec, DeflateSettings, ZstandardSettings};
use log::warn;
use miniz_oxide::deflate::CompressionLevel;

/// Codec name for gzip compression
pub const CODEC_GZIP: &str = "gzip";
/// Codec name for zstd compression
pub const CODEC_ZSTD: &str = "zstd";
/// Codec name for snappy compression
pub const CODEC_SNAPPY: &str = "snappy";
/// Codec name for uncompressed
pub const CODEC_UNCOMPRESSED: &str = "uncompressed";

/// Default compression level for gzip (matches Java implementation)
const DEFAULT_GZIP_LEVEL: u8 = 9;
/// Default compression level for zstd (matches Java implementation)
const DEFAULT_ZSTD_LEVEL: u8 = 1;
/// Max supported level for ZSTD
const MAX_ZSTD_LEVEL: u8 = 22;

/// Convert codec name and level to apache_avro::Codec.
/// Returns Codec::Null for unknown or unsupported codecs.
///
/// # Arguments
///
/// * `codec` - The name of the compression codec (e.g., "gzip", "zstd", "snappy", "uncompressed")
/// * `level` - The compression level. For gzip/deflate:
/// - 0: NoCompression
/// - 1: BestSpeed
/// - 9: BestCompression
/// - 10: UberCompression
/// - 6: DefaultLevel (balanced speed/compression)
/// - Other values: DefaultLevel
///
/// For zstd, level is clamped to valid range (0-22).
/// When `None`, uses codec-specific defaults.
///
/// # Supported Codecs
///
/// - `gzip`: Uses Deflate compression with specified level
/// - `zstd`: Uses Zstandard compression (level clamped to valid zstd range 0-22)
/// - `snappy`: Uses Snappy compression (level parameter ignored)
/// - `uncompressed` or `None`: No compression
/// - Any other value: Defaults to no compression (Codec::Null)
pub(crate) fn codec_from_str(codec: Option<&str>, level: Option<u8>) -> Codec {
// Use case-insensitive comparison to match Java implementation
match codec.map(|s| s.to_lowercase()).as_deref() {
Some(c) if c == CODEC_GZIP => {
// Map compression level to miniz_oxide::deflate::CompressionLevel
// Reference: https://docs.rs/miniz_oxide/latest/miniz_oxide/deflate/enum.CompressionLevel.html
let compression_level = match level.unwrap_or(DEFAULT_GZIP_LEVEL) {
0 => CompressionLevel::NoCompression,
1 => CompressionLevel::BestSpeed,
9 => CompressionLevel::BestCompression,
10 => CompressionLevel::UberCompression,
_ => CompressionLevel::DefaultLevel,
};

Codec::Deflate(DeflateSettings::new(compression_level))
}
Some(c) if c == CODEC_ZSTD => {
// Zstandard supports levels 0-22, clamp to valid range
let zstd_level = level.unwrap_or(DEFAULT_ZSTD_LEVEL).min(MAX_ZSTD_LEVEL);
Codec::Zstandard(ZstandardSettings::new(zstd_level))
}
Some(c) if c == CODEC_SNAPPY => Codec::Snappy,
Some(c) if c == CODEC_UNCOMPRESSED => Codec::Null,
None => Codec::Null,
Some(unknown) => {
warn!("Unrecognized compression codec '{unknown}', using no compression (Codec::Null)");
Codec::Null
}
}
}

#[cfg(test)]
mod tests {
use apache_avro::{DeflateSettings, ZstandardSettings};
use miniz_oxide::deflate::CompressionLevel;

use super::*;

#[test]
fn test_codec_from_str_gzip() {
// Test with mixed case to verify case-insensitive matching
let codec = codec_from_str(Some("GZip"), Some(5));
assert_eq!(
codec,
Codec::Deflate(DeflateSettings::new(CompressionLevel::DefaultLevel))
);
}

#[test]
fn test_codec_from_str_snappy() {
let codec = codec_from_str(Some("snappy"), None);
assert_eq!(codec, Codec::Snappy);
}

#[test]
fn test_codec_from_str_zstd() {
let codec = codec_from_str(Some("zstd"), Some(3));
assert_eq!(codec, Codec::Zstandard(ZstandardSettings::new(3)));
}

#[test]
fn test_codec_from_str_zstd_clamping() {
let codec = codec_from_str(Some("zstd"), Some(MAX_ZSTD_LEVEL + 1));
assert_eq!(
codec,
Codec::Zstandard(ZstandardSettings::new(MAX_ZSTD_LEVEL))
);
}

#[test]
fn test_codec_from_str_uncompressed() {
let codec = codec_from_str(Some("uncompressed"), None);
assert!(matches!(codec, Codec::Null));
}

#[test]
fn test_codec_from_str_null() {
let codec = codec_from_str(None, None);
assert!(matches!(codec, Codec::Null));
}

#[test]
fn test_codec_from_str_unknown() {
let codec = codec_from_str(Some("unknown"), Some(1));
assert!(matches!(codec, Codec::Null));
}

#[test]
fn test_codec_from_str_gzip_default_level() {
// Test that None level defaults to 9 for gzip
let codec = codec_from_str(Some("gzip"), None);
assert_eq!(
codec,
Codec::Deflate(DeflateSettings::new(CompressionLevel::BestCompression))
);
}

#[test]
fn test_codec_from_str_zstd_default_level() {
// Test that None level defaults to 1 for zstd
let codec = codec_from_str(Some("zstd"), None);
assert_eq!(codec, Codec::Zstandard(ZstandardSettings::new(1)));
}
}
Loading
Loading