Skip to content

Commit f1ffb2e

Browse files
committed
feat(transaction): Add atomic snapshot tagging to FastAppendAction
Add the ability to create a tag reference atomically in the same transaction that creates a new snapshot via `fast_append`.

This adds a `with_tag()` method to `FastAppendAction` that allows specifying a tag name. When set, the tag will be created pointing to the newly created snapshot, all within a single atomic catalog update.

Example usage:

```rust
let tx = Transaction::new(&table);
let action = tx
    .fast_append()
    .add_data_files(data_files)
    .with_tag("v1.0.0");
let tx = action.apply(tx)?;
let table = tx.commit(&catalog).await?;
```
1 parent 6bda658 commit f1ffb2e

File tree

2 files changed

+85
-1
lines changed

2 files changed

+85
-1
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ pub struct FastAppendAction {
3737
key_metadata: Option<Vec<u8>>,
3838
snapshot_properties: HashMap<String, String>,
3939
added_data_files: Vec<DataFile>,
40+
// Optional tag name to create atomically with the snapshot.
41+
tag_ref: Option<String>,
4042
}
4143

4244
impl FastAppendAction {
@@ -47,6 +49,7 @@ impl FastAppendAction {
4749
key_metadata: None,
4850
snapshot_properties: HashMap::default(),
4951
added_data_files: vec![],
52+
tag_ref: None,
5053
}
5154
}
5255

@@ -79,6 +82,12 @@ impl FastAppendAction {
7982
self.snapshot_properties = snapshot_properties;
8083
self
8184
}
85+
86+
/// Set a tag name to be created atomically with the snapshot.
///
/// The name is stored on the action and handed to the snapshot producer when
/// the action is committed; the producer then appends a `SetSnapshotRef`
/// update for this name pointing at the newly created snapshot.
///
/// Calling this method more than once keeps only the most recent name,
/// because each call overwrites the previously stored value.
///
/// NOTE(review): nothing in this method validates `tag_name` (for example an
/// empty string, or a name that collides with an existing ref such as the
/// main branch) — confirm whether that is checked elsewhere before commit.
87+
pub fn with_tag(mut self, tag_name: impl Into<String>) -> Self {
88+
self.tag_ref = Some(tag_name.into());
89+
self
90+
}
8291
}
8392

8493
#[async_trait]
@@ -90,6 +99,7 @@ impl TransactionAction for FastAppendAction {
9099
self.key_metadata.clone(),
91100
self.snapshot_properties.clone(),
92101
self.added_data_files.clone(),
102+
self.tag_ref.clone(),
93103
);
94104

95105
// validate added files
@@ -333,4 +343,65 @@ mod tests {
333343
);
334344
assert_eq!(data_file, *manifest.entries()[0].data_file());
335345
}
346+
347+
// Verifies that a fast-append action configured with `with_tag("v1.0.0")`
// produces exactly three table updates, in this order:
//   1. `AddSnapshot` for the new snapshot,
//   2. `SetSnapshotRef` for `MAIN_BRANCH` (a branch reference),
//   3. `SetSnapshotRef` for `"v1.0.0"` (a non-branch reference, i.e. the tag),
// and that both references point at the id of the newly added snapshot.
#[tokio::test]
348+
async fn test_fast_append_with_tag() {
349+
let table = make_v2_minimal_table();
350+
let tx = Transaction::new(&table);
351+
// Build a single data file to append.
// NOTE(review): the one-element partition tuple presumably matches the
// partition spec of the table returned by `make_v2_minimal_table` — confirm.
352+
let data_file = DataFileBuilder::default()
353+
.content(DataContentType::Data)
354+
.file_path("test/tagged.parquet".to_string())
355+
.file_format(DataFileFormat::Parquet)
356+
.file_size_in_bytes(100)
357+
.record_count(1)
358+
.partition_spec_id(table.metadata().default_partition_spec_id())
359+
.partition(Struct::from_iter([Some(Literal::long(300))]))
360+
.build()
361+
.unwrap();
362+
363+
let action = tx
364+
.fast_append()
365+
.add_data_files(vec![data_file])
366+
.with_tag("v1.0.0");
367+
// Call the action's `commit` directly against the table so the generated
// updates can be inspected without applying them.
368+
let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
369+
let updates = action_commit.take_updates();
370+
371+
// Should have 3 updates: AddSnapshot, SetSnapshotRef (main), SetSnapshotRef (tag)
372+
assert_eq!(updates.len(), 3);
373+
374+
// First update: AddSnapshot
// Its snapshot id is captured so both reference updates can be checked against it.
375+
let snapshot_id = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
376+
snapshot.snapshot_id()
377+
} else {
378+
panic!("Expected AddSnapshot as first update");
379+
};
380+
381+
// Second update: SetSnapshotRef for main branch
382+
if let TableUpdate::SetSnapshotRef {
383+
ref_name,
384+
reference,
385+
} = &updates[1]
386+
{
387+
assert_eq!(ref_name, MAIN_BRANCH);
388+
assert_eq!(reference.snapshot_id, snapshot_id);
389+
assert!(reference.is_branch());
390+
} else {
391+
panic!("Expected SetSnapshotRef for main branch as second update");
392+
}
393+
394+
// Third update: SetSnapshotRef for tag
395+
if let TableUpdate::SetSnapshotRef {
396+
ref_name,
397+
reference,
398+
} = &updates[2]
399+
{
400+
assert_eq!(ref_name, "v1.0.0");
401+
assert_eq!(reference.snapshot_id, snapshot_id);
402+
assert!(!reference.is_branch()); // Should be a tag, not a branch
403+
} else {
404+
panic!("Expected SetSnapshotRef for tag as third update");
405+
}
406+
}
336407
}

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ pub(crate) struct SnapshotProducer<'a> {
118118
// It starts from 0 and increments for each new manifest file.
119119
// Note: This counter is limited to the range of (0..u64::MAX).
120120
manifest_counter: RangeFrom<u64>,
121+
// Optional tag name to create atomically with the snapshot.
122+
tag_ref: Option<String>,
121123
}
122124

123125
impl<'a> SnapshotProducer<'a> {
@@ -127,6 +129,7 @@ impl<'a> SnapshotProducer<'a> {
127129
key_metadata: Option<Vec<u8>>,
128130
snapshot_properties: HashMap<String, String>,
129131
added_data_files: Vec<DataFile>,
132+
tag_ref: Option<String>,
130133
) -> Self {
131134
Self {
132135
table,
@@ -136,6 +139,7 @@ impl<'a> SnapshotProducer<'a> {
136139
snapshot_properties,
137140
added_data_files,
138141
manifest_counter: (0..),
142+
tag_ref,
139143
}
140144
}
141145

@@ -485,7 +489,7 @@ impl<'a> SnapshotProducer<'a> {
485489
new_snapshot.build()
486490
};
487491

488-
let updates = vec![
492+
let mut updates = vec![
489493
TableUpdate::AddSnapshot {
490494
snapshot: new_snapshot,
491495
},
@@ -498,6 +502,15 @@ impl<'a> SnapshotProducer<'a> {
498502
},
499503
];
500504

505+
if let Some(tag_name) = self.tag_ref {
506+
updates.push(TableUpdate::SetSnapshotRef {
507+
ref_name: tag_name,
508+
reference: SnapshotReference::new(self.snapshot_id, SnapshotRetention::Tag {
509+
max_ref_age_ms: None,
510+
}),
511+
});
512+
}
513+
501514
let requirements = vec![
502515
TableRequirement::UuidMatch {
503516
uuid: self.table.metadata().uuid(),

0 commit comments

Comments
 (0)