Is your feature request related to a problem or challenge?
In the current implementation you cannot, in a single commit, create a snapshot through the FastAppendAction transaction API and add a tag that references the new snapshot.
Without atomic transaction guarantees, other processes can add new snapshots and clean up old ones (including the one we just created) before a tag can be added to protect the snapshot from most automated snapshot expiration policies.
With atomic guarantees, either our data was not committed to the table, or it was committed and carries a tagged snapshot reference that protects it from expiry.
The FastAppendAction currently creates a new snapshot and then updates the main reference to point to this new snapshot.
There is no set of Transaction actions that supports the SetSnapshotRef update for custom tags.
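For clarity, the update that would need to be expressible through the transaction is a SetSnapshotRef that points a tag at the newly created snapshot. A minimal sketch of what that looks like at the TableUpdate level follows; the type and field names are taken from the iceberg crate as I understand them (treat the exact shapes as assumptions), and the snapshot id and tag name are placeholder values:

use iceberg::spec::{SnapshotReference, SnapshotRetention};
use iceberg::TableUpdate;

// Placeholder: the id of the snapshot that fast_append() just produced.
let snapshot_id: i64 = 1234567890;

// A tag-style reference pins the snapshot under a name; retention of the
// tag itself can optionally be bounded via max_ref_age_ms.
let tag_update = TableUpdate::SetSnapshotRef {
    ref_name: "v1.0.0".to_string(),
    reference: SnapshotReference {
        snapshot_id,
        retention: SnapshotRetention::Tag { max_ref_age_ms: None },
    },
};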
Describe the solution you'd like
I would like a method (.with_tag(tag: &str)?) on the FastAppendAction that allows for this sort of calling pattern:
let tx = Transaction::new(&table);
let action = tx
    .fast_append()
    .add_data_files(data_files)
    .with_tag("v1.0.0");
let tx = action.apply(tx)?;
let table = tx.commit(&catalog).await?;

This extends the fast_append() action, which already uses the SetSnapshotRef update (but only for the main branch), so that it can also SetSnapshotRef for custom tags.
Willingness to contribute
I can contribute to this feature independently
Motivation
I am building a consumer that sinks data into an Apache Iceberg table, and my design goal is to atomically commit both the data and producer watermarks (e.g. Kafka offsets) into a table snapshot's properties. This enables exactly-once semantics even in the case of a system failure, because we atomically record how much of the source data stream we have persisted into the table. Producers like Kafka have mechanisms for tracking what has been consumed (via consumer groups and offset commits), but you cannot atomically commit an Iceberg transaction and commit offsets back to the producer, so depending on how and when things fail you can end up with inconsistencies between the upstream producer's watermark and the data persisted into the downstream table. Storing the watermark progress (again, offsets in the context of Kafka) in the table gives us the necessary guarantees to prevent data inconsistency.
Currently iceberg-rust allows for atomic properties + data writes, so the watermark storage is guaranteed, but if there are other writers to the table, the snapshot that stores the watermarks from this protocol can quickly be expired when older snapshots are removed. Adding a reference to a snapshot is one way to control its expiration behavior, so it would be desirable to add a reference to the snapshot that contains the most current watermark information in order to prevent it from being expired. If the snapshot ref is not created atomically, we cannot guarantee that the snapshot will still exist by the time we try to create the reference on the just-created snapshot.
Having such a named reference also avoids having to search backwards through all the snapshots to find the most recent one with stored watermark properties; you can access it directly via a known snapshot reference name.
I've built a workaround that wraps the RestCatalog implementation to effectively insert a TableUpdate action that sets a snapshot ref into the REST API call to the catalog, but it feels hacky, and it would be much better to use the existing tooling already resident in iceberg-rust to add a SetSnapshotRef action to the transaction.
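For context, the injection in that wrapper boils down to appending one extra update to the list of updates the catalog is about to commit, so the tag rides along in the same REST commit as the appended data. A rough sketch of that step, with the same caveats as above about exact type and field names, and with a hypothetical helper name:

use iceberg::spec::{SnapshotReference, SnapshotRetention};
use iceberg::TableUpdate;

// Hypothetical helper used inside the RestCatalog wrapper: given the id of
// the snapshot produced by the fast append and a tag name, append an extra
// SetSnapshotRef update so the tag is created in the same commit as the data.
fn append_tag_update(updates: &mut Vec<TableUpdate>, snapshot_id: i64, tag: &str) {
    updates.push(TableUpdate::SetSnapshotRef {
        ref_name: tag.to_string(),
        reference: SnapshotReference {
            snapshot_id,
            retention: SnapshotRetention::Tag { max_ref_age_ms: None },
        },
    });
}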
Edit: Add motivation section