-
Notifications
You must be signed in to change notification settings - Fork 375
feat!: Support compression codecs for Avro Files (including manifest and manifest lists) #1851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
emkornfield
wants to merge
29
commits into
apache:main
Choose a base branch
from
emkornfield:fix_compression
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
29 commits
Select commit
Hold shift + click to select a range
665b970
feat!: Support compression codecs for JSON metadata and Avro
emkornfield 091d3bc
fmt
emkornfield 41a8c1c
fix clippy
emkornfield 51e781e
clippy again
emkornfield 253bf59
wip
emkornfield 8bdb52d
address comments
emkornfield 9d27116
no clone needed
emkornfield d1ee0b2
test compression works
emkornfield 393622f
comments
emkornfield 46fdb8e
update tests
emkornfield d50cb7d
address comments
emkornfield 5370f77
remove parse optional property
emkornfield 6b3d8ed
fmt
emkornfield 5ec090f
wip
emkornfield 4337a34
address comments
emkornfield dba26a1
put parsing in table properties
emkornfield d81ba56
cargo fmt
emkornfield 23384ab
fmt
emkornfield 173fbef
remove unneeded tests
emkornfield a52d015
fix package import
emkornfield f4dd663
clippy and visibility
emkornfield 6745adf
merge main
emkornfield 95a217e
fix
emkornfield ef41b4d
update cargo lock
emkornfield d9603ff
add missing args
emkornfield e43e0cc
address clippy
emkornfield 0ec5784
Fmt
emkornfield ed71f0e
move use statements to top level
emkornfield edb6886
remove duplicate imports
emkornfield File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,167 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! Utilities for working with Apache Avro in Iceberg. | ||
|
|
||
| use apache_avro::{Codec, DeflateSettings, ZstandardSettings}; | ||
| use log::warn; | ||
| use miniz_oxide::deflate::CompressionLevel; | ||
|
|
||
| /// Codec name for gzip compression | ||
| pub const CODEC_GZIP: &str = "gzip"; | ||
| /// Codec name for zstd compression | ||
| pub const CODEC_ZSTD: &str = "zstd"; | ||
| /// Codec name for snappy compression | ||
| pub const CODEC_SNAPPY: &str = "snappy"; | ||
| /// Codec name for uncompressed | ||
| pub const CODEC_UNCOMPRESSED: &str = "uncompressed"; | ||
|
|
||
| /// Default compression level for gzip (matches Java implementation) | ||
| const DEFAULT_GZIP_LEVEL: u8 = 9; | ||
| /// Default compression level for zstd (matches Java implementation) | ||
| const DEFAULT_ZSTD_LEVEL: u8 = 1; | ||
| /// Max supported level for ZSTD | ||
| const MAX_ZSTD_LEVEL: u8 = 22; | ||
|
|
||
| /// Convert codec name and level to apache_avro::Codec. | ||
| /// Returns Codec::Null for unknown or unsupported codecs. | ||
| /// | ||
| /// # Arguments | ||
| /// | ||
| /// * `codec` - The name of the compression codec (e.g., "gzip", "zstd", "snappy", "uncompressed") | ||
| /// * `level` - The compression level. For gzip/deflate: | ||
| /// - 0: NoCompression | ||
| /// - 1: BestSpeed | ||
| /// - 9: BestCompression | ||
| /// - 10: UberCompression | ||
| /// - 6: DefaultLevel (balanced speed/compression) | ||
| /// - Other values: DefaultLevel | ||
| /// | ||
| /// For zstd, level is clamped to valid range (0-22). | ||
| /// When `None`, uses codec-specific defaults. | ||
| /// | ||
| /// # Supported Codecs | ||
| /// | ||
| /// - `gzip`: Uses Deflate compression with specified level | ||
| /// - `zstd`: Uses Zstandard compression (level clamped to valid zstd range 0-22) | ||
| /// - `snappy`: Uses Snappy compression (level parameter ignored) | ||
| /// - `uncompressed` or `None`: No compression | ||
| /// - Any other value: Defaults to no compression (Codec::Null) | ||
| pub(crate) fn codec_from_str(codec: Option<&str>, level: Option<u8>) -> Codec { | ||
| // Use case-insensitive comparison to match Java implementation | ||
| match codec.map(|s| s.to_lowercase()).as_deref() { | ||
| Some(c) if c == CODEC_GZIP => { | ||
| // Map compression level to miniz_oxide::deflate::CompressionLevel | ||
| // Reference: https://docs.rs/miniz_oxide/latest/miniz_oxide/deflate/enum.CompressionLevel.html | ||
| let compression_level = match level.unwrap_or(DEFAULT_GZIP_LEVEL) { | ||
| 0 => CompressionLevel::NoCompression, | ||
| 1 => CompressionLevel::BestSpeed, | ||
| 9 => CompressionLevel::BestCompression, | ||
| 10 => CompressionLevel::UberCompression, | ||
| _ => CompressionLevel::DefaultLevel, | ||
| }; | ||
|
|
||
| Codec::Deflate(DeflateSettings::new(compression_level)) | ||
| } | ||
| Some(c) if c == CODEC_ZSTD => { | ||
| // Zstandard supports levels 0-22, clamp to valid range | ||
| let zstd_level = level.unwrap_or(DEFAULT_ZSTD_LEVEL).min(MAX_ZSTD_LEVEL); | ||
| Codec::Zstandard(ZstandardSettings::new(zstd_level)) | ||
| } | ||
| Some(c) if c == CODEC_SNAPPY => Codec::Snappy, | ||
| Some(c) if c == CODEC_UNCOMPRESSED => Codec::Null, | ||
| None => Codec::Null, | ||
| Some(unknown) => { | ||
| warn!("Unrecognized compression codec '{unknown}', using no compression (Codec::Null)"); | ||
| Codec::Null | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use apache_avro::{DeflateSettings, ZstandardSettings}; | ||
| use miniz_oxide::deflate::CompressionLevel; | ||
|
|
||
| use super::*; | ||
|
|
||
| #[test] | ||
| fn test_codec_from_str_gzip() { | ||
| // Test with mixed case to verify case-insensitive matching | ||
| let codec = codec_from_str(Some("GZip"), Some(5)); | ||
| assert_eq!( | ||
| codec, | ||
| Codec::Deflate(DeflateSettings::new(CompressionLevel::DefaultLevel)) | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_codec_from_str_snappy() { | ||
| let codec = codec_from_str(Some("snappy"), None); | ||
| assert_eq!(codec, Codec::Snappy); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_codec_from_str_zstd() { | ||
| let codec = codec_from_str(Some("zstd"), Some(3)); | ||
| assert_eq!(codec, Codec::Zstandard(ZstandardSettings::new(3))); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_codec_from_str_zstd_clamping() { | ||
| let codec = codec_from_str(Some("zstd"), Some(MAX_ZSTD_LEVEL + 1)); | ||
| assert_eq!( | ||
| codec, | ||
| Codec::Zstandard(ZstandardSettings::new(MAX_ZSTD_LEVEL)) | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_codec_from_str_uncompressed() { | ||
| let codec = codec_from_str(Some("uncompressed"), None); | ||
| assert!(matches!(codec, Codec::Null)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_codec_from_str_null() { | ||
| let codec = codec_from_str(None, None); | ||
| assert!(matches!(codec, Codec::Null)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_codec_from_str_unknown() { | ||
| let codec = codec_from_str(Some("unknown"), Some(1)); | ||
| assert!(matches!(codec, Codec::Null)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_codec_from_str_gzip_default_level() { | ||
| // Test that None level defaults to 9 for gzip | ||
| let codec = codec_from_str(Some("gzip"), None); | ||
| assert_eq!( | ||
| codec, | ||
| Codec::Deflate(DeflateSettings::new(CompressionLevel::BestCompression)) | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_codec_from_str_zstd_default_level() { | ||
| // Test that None level defaults to 1 for zstd | ||
| let codec = codec_from_str(Some("zstd"), None); | ||
| assert_eq!(codec, Codec::Zstandard(ZstandardSettings::new(1))); | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.