diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index 5e5fe9fbfef1..2e238f287f37 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -342,6 +342,35 @@ default boolean tryToWriteAtomic(Path path, String content) throws IOException {
return success;
}
+ /**
+     * Write content atomically, failing if the target file already exists. Uses native conditional
+ * writes on supported object stores. Falls back to {@link #tryToWriteAtomic} on filesystems
+ * without native conditional write support.
+ *
+ * @param path the target file path
+ * @param content the content to write
+     * @return true if the write succeeded, false if the file already exists
+ * @throws IOException on I/O errors (not including "file exists" condition)
+ */
+ default boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException {
+        // The default implementation delegates to tryToWriteAtomic, which is safe on HDFS
+        // but requires external locking on object stores without native conditional writes.
+ return tryToWriteAtomic(path, content);
+ }
+
+ /**
+ * Returns whether this FileIO supports native conditional writes (write-if-absent semantics).
+ *
+     * <p>When true, {@link #tryToWriteAtomicIfAbsent} uses native conditional write operations that
+ * atomically fail if the target file exists. This eliminates the need for external locking on
+ * object stores.
+ *
+ * @return true if native conditional writes are supported
+ */
+ default boolean supportsConditionalWrite() {
+ return false;
+ }
+
default void writeFile(Path path, String content, boolean overwrite) throws IOException {
try (PositionOutputStream out = newOutputStream(path, overwrite)) {
OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
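For callers, the intended pairing of the two new methods looks roughly like the following. This is a hypothetical caller sketch, not part of this patch; `lock`, `path`, and `json` are illustrative names.

    // Hypothetical caller sketch: prefer the native conditional write when
    // the store supports it, otherwise fall back to rename-plus-lock.
    boolean committed;
    if (fileIO.supportsConditionalWrite()) {
        // Atomically fails (returns false) if the file already exists.
        committed = fileIO.tryToWriteAtomicIfAbsent(path, json);
    } else {
        // tryToWriteAtomic is rename-based; object stores need external locking.
        committed = lock.runWithLock(() -> fileIO.tryToWriteAtomic(path, json));
    }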
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
index 367bce383719..6ac60eb01dfa 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
@@ -47,6 +47,17 @@
/** Test static methods and methods with default implementations of {@link FileIO}. */
public class FileIOTest {
+ @Test
+ public void testConditionalWriteDefaults(@TempDir java.nio.file.Path tempDir) throws Exception {
+ FileIO fileIO = new DummyFileIO();
+ Path file = new Path(tempDir.resolve("test.txt").toUri());
+
+ assertThat(fileIO.supportsConditionalWrite()).isFalse();
+ assertThat(fileIO.tryToWriteAtomicIfAbsent(file, "first")).isTrue();
+ assertThat(fileIO.tryToWriteAtomicIfAbsent(file, "second")).isFalse();
+ assertThat(fileIO.readFileUtf8(file)).isEqualTo("first");
+ }
+
@TempDir java.nio.file.Path tempDir;
@Test
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java
index 5cc9d2a0199a..801d4cbf0af6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java
@@ -32,7 +32,8 @@
* A {@link SnapshotCommit} using file renaming to commit.
*
 * <p>Note that when the file system is local or HDFS, rename is atomic. But if the file system is
- * object storage, we need additional lock protection.
+ * object storage, we need additional lock protection unless the storage supports native conditional
+ * writes.
*/
public class RenamingSnapshotCommit implements SnapshotCommit {
@@ -54,6 +55,17 @@ public boolean commit(Snapshot snapshot, String branch, List<PartitionStatistics>
         Callable<Boolean> callable =
() -> {
boolean committed = fileIO.tryToWriteAtomic(newSnapshotPath, snapshot.toJson());
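The rest of this hunk is truncated in the excerpt above. Based on the tests added below, the new commit logic is approximately the following — a sketch inferred from the test expectations, not the verbatim patch:

    // Sketch inferred from RenamingSnapshotCommitTest (assumed shape):
    if (fileIO.supportsConditionalWrite()) {
        // A conditional write is atomic on its own, so the external lock is skipped.
        return fileIO.tryToWriteAtomicIfAbsent(newSnapshotPath, snapshot.toJson());
    }
    // Fallback: rename-based atomic write guarded by the external lock.
    return lock.runWithLock(callable);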
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java
new file mode 100644
index 000000000000..9cadaf2ba996
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RenamingSnapshotCommit}. */
+public class RenamingSnapshotCommitTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private Path tablePath;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ tablePath = new Path(tempDir.toString());
+ LocalFileIO.create().mkdirs(new Path(tablePath, "snapshot"));
+ }
+
+ @Test
+ public void testConditionalWritePathSkipsLock() throws Exception {
+ ConditionalWriteFileIO fileIO = new ConditionalWriteFileIO();
+ RenamingSnapshotCommit commit = newCommit(fileIO, Lock.empty());
+
+ assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue();
+ assertThat(fileIO.conditionalWriteCalls.get()).isEqualTo(1);
+ }
+
+ @Test
+ public void testConditionalWriteFailsOnConflict() throws Exception {
+ ConditionalWriteFileIO fileIO = new ConditionalWriteFileIO();
+ RenamingSnapshotCommit commit = newCommit(fileIO, Lock.empty());
+
+ assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue();
+ assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isFalse();
+ }
+
+ @Test
+ public void testFallbackPathUsesLock() throws Exception {
+ AtomicInteger lockCalls = new AtomicInteger();
+ RenamingSnapshotCommit commit = newCommit(LocalFileIO.create(), trackingLock(lockCalls));
+
+ assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue();
+ assertThat(lockCalls.get()).isEqualTo(1);
+ }
+
+ @Test
+ public void testFallbackPathFailsOnConflict() throws Exception {
+ AtomicInteger lockCalls = new AtomicInteger();
+ RenamingSnapshotCommit commit = newCommit(LocalFileIO.create(), trackingLock(lockCalls));
+
+ assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue();
+ assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isFalse();
+ assertThat(lockCalls.get()).isEqualTo(2);
+ }
+
+ private RenamingSnapshotCommit newCommit(LocalFileIO fileIO, Lock lock) {
+ return new RenamingSnapshotCommit(
+ new SnapshotManager(fileIO, tablePath, "main", null, null), lock);
+ }
+
+ private static Snapshot newSnapshot(long id) {
+ return createSnapshotWithMillis(id, System.currentTimeMillis());
+ }
+
+ private static Lock trackingLock(AtomicInteger counter) {
+ return new Lock() {
+ @Override
+            public <T> T runWithLock(java.util.concurrent.Callable<T> callable) throws Exception {
+ counter.incrementAndGet();
+ return callable.call();
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+
+ private static class ConditionalWriteFileIO extends LocalFileIO {
+ final AtomicInteger conditionalWriteCalls = new AtomicInteger();
+
+ @Override
+ public boolean supportsConditionalWrite() {
+ return true;
+ }
+
+ @Override
+ public boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException {
+ conditionalWriteCalls.incrementAndGet();
+ if (exists(path)) {
+ return false;
+ }
+ writeFile(path, content, false);
+ return true;
+ }
+ }
+}
diff --git a/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/AzureFileIO.java b/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/AzureFileIO.java
index 36f7d5f37c0d..434652fe15de 100644
--- a/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/AzureFileIO.java
+++ b/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/AzureFileIO.java
@@ -20,9 +20,12 @@
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
@@ -32,6 +35,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -61,6 +65,25 @@ public boolean isObjectStore() {
return true;
}
+ @Override
+ public boolean supportsConditionalWrite() {
+ return true;
+ }
+
+ @Override
+ public boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException {
+ org.apache.hadoop.fs.Path hadoopPath = path(path);
+ FileSystem fs = getFileSystem(hadoopPath);
+ byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
+ try (FSDataOutputStream out = fs.createFile(hadoopPath).create().overwrite(false).build()) {
+ out.write(bytes);
+ return true;
+ } catch (FileAlreadyExistsException e) {
+ LOG.debug("Conditional write failed, file already exists: {}", path);
+ return false;
+ }
+ }
+
@Override
public void configure(CatalogContext context) {
this.hadoopOptions = mirrorCertainHadoopConfig(loadHadoopConfigFromContext(context));
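The Azure implementation leans on the Hadoop createFile builder: overwrite(false) makes the final create conditional, and ABFS surfaces a conflict as FileAlreadyExistsException. A minimal usage sketch — the path, container, and credentials are illustrative, and fileIO is assumed to be a configured AzureFileIO:

    // Illustrative only: assumes a configured AzureFileIO and abfs:// credentials.
    Path marker = new Path("abfs://container@account.dfs.core.windows.net/tbl/snapshot-1");
    boolean first = fileIO.tryToWriteAtomicIfAbsent(marker, "v1");   // true: file created
    boolean second = fileIO.tryToWriteAtomicIfAbsent(marker, "v2");  // false: "v1" is kept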
diff --git a/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/HadoopCompliantFileIO.java
index 0758bb44370d..4c9585f1b513 100644
--- a/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/HadoopCompliantFileIO.java
+++ b/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/HadoopCompliantFileIO.java
@@ -102,11 +102,11 @@ public boolean rename(Path src, Path dst) throws IOException {
return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
}
- private org.apache.hadoop.fs.Path path(Path path) {
+ protected org.apache.hadoop.fs.Path path(Path path) {
return new org.apache.hadoop.fs.Path(path.toUri());
}
- private FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
+ protected FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
if (fs == null) {
synchronized (this) {
if (fs == null) {
diff --git a/paimon-filesystems/paimon-azure-impl/src/main/resources/META-INF/NOTICE b/paimon-filesystems/paimon-azure-impl/src/main/resources/META-INF/NOTICE
index 1c4727b121b3..c0db4285d7be 100644
--- a/paimon-filesystems/paimon-azure-impl/src/main/resources/META-INF/NOTICE
+++ b/paimon-filesystems/paimon-azure-impl/src/main/resources/META-INF/NOTICE
@@ -30,12 +30,12 @@ This project bundles the following dependencies under the Apache Software Licens
- org.apache.commons:commons-configuration2:2.1.1
- org.apache.commons:commons-lang3:3.18.0
- org.apache.commons:commons-text:1.4
-- org.apache.hadoop:hadoop-annotations:3.3.4
-- org.apache.hadoop:hadoop-auth:3.3.4
-- org.apache.hadoop:hadoop-azure:3.3.4
-- org.apache.hadoop:hadoop-common:3.3.4
-- org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.1.1
-- org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_7:1.1.1
+- org.apache.hadoop:hadoop-annotations:3.4.2
+- org.apache.hadoop:hadoop-auth:3.4.2
+- org.apache.hadoop:hadoop-azure:3.4.2
+- org.apache.hadoop:hadoop-common:3.4.2
+- org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.4.0
+- org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_25:1.4.0
- org.apache.httpcomponents:httpclient:4.5.13
- org.apache.httpcomponents:httpcore:4.4.13
- org.apache.kerby:kerb-core:1.0.1
@@ -79,12 +79,14 @@ The bundled Apache Hadoop Relocated (Shaded) Third-party Miscellaneous Libs
org.apache.hadoop.thirdparty:hadoop-shaded-guava dependency bundles the following dependencies under
the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- com.google.guava:guava:30.1.1-jre
+- com.google.guava:guava:33.4.6-jre
The bundled Apache Hadoop Relocated (Shaded) Third-party Miscellaneous Libs
-org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_7 dependency bundles the following dependencies under
+org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_25 dependency bundles the following dependencies under
the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+- com.google.protobuf:protobuf-java:3.25.5
+
This project bundles the following dependencies under the Eclipse Distribution License - v 1.0
You find it under licenses/LICENSE.jakarta
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
index 827251837342..8d50e48d1a51 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
@@ -25,6 +25,8 @@
import org.apache.paimon.options.Options;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.slf4j.Logger;
@@ -33,6 +35,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -85,6 +88,39 @@ public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite
new S3MultiPartUpload(fs, fs.getConf()), hadoopPath, path);
}
+ @Override
+ public boolean supportsConditionalWrite() {
+ return true;
+ }
+
+ /**
+     * Write content atomically using S3 conditional writes via the Hadoop 3.4+ native API.
+ *
+ * @param path the target file path
+ * @param content the content to write
+     * @return true if the write succeeded, false if the file already exists
+ * @throws IOException on I/O errors
+ */
+ @Override
+ public boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException {
+ org.apache.hadoop.fs.Path hadoopPath = path(path);
+ S3AFileSystem fs = (S3AFileSystem) getFileSystem(hadoopPath);
+
+ byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
+
+ try (FSDataOutputStream out =
+ fs.createFile(hadoopPath)
+ .create()
+ .overwrite(false) // Fails if file exists (uses If-None-Match: * on S3)
+ .build()) {
+ out.write(contentBytes);
+ return true;
+ } catch (FileAlreadyExistsException e) {
+ LOG.debug("Conditional write failed, file already exists: {}", path);
+ return false;
+ }
+ }
+
// add additional config entries from the IO config to the Hadoop config
private Options loadHadoopConfigFromContext(CatalogContext context) {
Options hadoopConfig = new Options();
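On S3 the same builder path maps to a conditional PUT (If-None-Match: *) when the S3A client's conditional-create support is active (Hadoop 3.4+). The useful property for snapshot commits is that concurrent writers race safely; a sketch, with fileIO assumed to be a configured S3FileIO and the path illustrative:

    // Illustrative race: with conditional PUTs, exactly one writer wins.
    Path snapshot = new Path("s3://bucket/table/snapshot/snapshot-5");
    boolean a = fileIO.tryToWriteAtomicIfAbsent(snapshot, "{\"id\":5,\"committer\":\"A\"}");
    boolean b = fileIO.tryToWriteAtomicIfAbsent(snapshot, "{\"id\":5,\"committer\":\"B\"}");
    // Exactly one of a/b is true; the loser sees false instead of overwriting.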
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
index ca112bac4ec4..ecd46867874e 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
@@ -20,30 +20,33 @@
import org.apache.paimon.fs.MultiPartUploadStore;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.UploadPartRequest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
-/** Provides the multipart upload by Amazon S3. */
+/** Provides multipart upload for Amazon S3 using the Hadoop 3.4+ API (AWS SDK v2). */
public class S3MultiPartUpload
-        implements MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> {
+        implements MultiPartUploadStore<CompletedPart, CompleteMultipartUploadResponse> {
private final S3AFileSystem s3a;
-
private final InternalWriteOperationHelper s3accessHelper;
public S3MultiPartUpload(S3AFileSystem s3a, Configuration conf) {
@@ -65,25 +68,37 @@ public Path workingDirectory() {
@Override
public String startMultiPartUpload(String objectName) throws IOException {
- return s3accessHelper.initiateMultiPartUpload(objectName);
+ return s3accessHelper.initiateMultiPartUpload(objectName, PutObjectOptions.keepingDirs());
}
@Override
-    public CompleteMultipartUploadResult completeMultipartUpload(
-            String objectName, String uploadId, List<PartETag> partETags, long numBytesInParts)
+    public CompleteMultipartUploadResponse completeMultipartUpload(
+            String objectName, String uploadId, List<CompletedPart> parts, long numBytesInParts)
throws IOException {
return s3accessHelper.completeMPUwithRetries(
- objectName, uploadId, partETags, numBytesInParts, new AtomicInteger(0));
+ objectName,
+ uploadId,
+ parts,
+ numBytesInParts,
+ new AtomicInteger(0),
+ PutObjectOptions.keepingDirs());
}
@Override
- public PartETag uploadPart(
+ public CompletedPart uploadPart(
String objectName, String uploadId, int partNumber, File file, int byteLength)
throws IOException {
- final UploadPartRequest uploadRequest =
- s3accessHelper.newUploadPartRequest(
- objectName, uploadId, partNumber, byteLength, null, file, 0L);
- return s3accessHelper.uploadPart(uploadRequest).getPartETag();
+ UploadPartRequest request =
+ UploadPartRequest.builder()
+ .bucket(s3a.getBucket())
+ .key(objectName)
+ .uploadId(uploadId)
+ .partNumber(partNumber)
+ .contentLength((long) byteLength)
+ .build();
+ RequestBody body = RequestBody.fromBytes(Files.readAllBytes(file.toPath()));
+ UploadPartResponse response = s3accessHelper.uploadPart(request, body, null);
+ return CompletedPart.builder().partNumber(partNumber).eTag(response.eTag()).build();
}
@Override
@@ -99,7 +114,7 @@ private static final class InternalWriteOperationHelper extends WriteOperationHe
S3AStatisticsContext statisticsContext,
AuditSpanSource auditSpanSource,
AuditSpan auditSpan) {
- super(owner, conf, statisticsContext, auditSpanSource, auditSpan);
+ super(owner, conf, statisticsContext, auditSpanSource, auditSpan, null);
}
}
}
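End to end, the SDK v2 types now thread through the store as follows — a compressed sketch of the call sequence for a single part, where store is an S3MultiPartUpload and key, partFile, partLength, and totalBytes are illustrative:

    // Compressed call sequence (single part, illustrative names):
    String uploadId = store.startMultiPartUpload(key);
    List<CompletedPart> parts = new ArrayList<>();
    parts.add(store.uploadPart(key, uploadId, 1, partFile, partLength)); // captures the part eTag
    CompleteMultipartUploadResponse response =
            store.completeMultipartUpload(key, uploadId, parts, totalBytes);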
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
index 9d663e7be4de..6551f5e8a9b8 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
@@ -23,19 +23,20 @@
import org.apache.paimon.fs.MultiPartUploadStore;
import org.apache.paimon.fs.Path;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.PartETag;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
import java.io.IOException;
import java.util.List;
-/** S3 implementation of MultiPartUploadCommitter. */
+/** S3 implementation of MultiPartUploadCommitter (AWS SDK v2). */
public class S3MultiPartUploadCommitter
-        extends BaseMultiPartUploadCommitter<PartETag, CompleteMultipartUploadResult> {
+        extends BaseMultiPartUploadCommitter<CompletedPart, CompleteMultipartUploadResponse> {
+
public S3MultiPartUploadCommitter(
String uploadId,
-            List<PartETag> uploadedParts,
+            List<CompletedPart> uploadedParts,
String objectName,
long position,
Path targetPath) {
@@ -43,8 +44,8 @@ public S3MultiPartUploadCommitter(
}
@Override
-    protected MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> multiPartUploadStore(
-            FileIO fileIO, Path targetPath) throws IOException {
+    protected MultiPartUploadStore<CompletedPart, CompleteMultipartUploadResponse>
+            multiPartUploadStore(FileIO fileIO, Path targetPath) throws IOException {
S3FileIO s3FileIO = (S3FileIO) fileIO;
org.apache.hadoop.fs.Path hadoopPath = s3FileIO.path(targetPath);
S3AFileSystem fs = (S3AFileSystem) s3FileIO.getFileSystem(hadoopPath);
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
index 03ad1e097757..c221d5d04922 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
@@ -22,17 +22,19 @@
import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
import org.apache.paimon.fs.Path;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.PartETag;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
import java.io.IOException;
-/** S3 implementation of TwoPhaseOutputStream using multipart upload. */
+/** S3 implementation of TwoPhaseOutputStream using multipart upload (AWS SDK v2). */
public class S3TwoPhaseOutputStream
-        extends MultiPartUploadTwoPhaseOutputStream<PartETag, CompleteMultipartUploadResult> {
+ extends MultiPartUploadTwoPhaseOutputStream<
+ CompletedPart, CompleteMultipartUploadResponse> {
public S3TwoPhaseOutputStream(
-            MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> multiPartUploadStore,
+ MultiPartUploadStore
+ multiPartUploadStore,
org.apache.hadoop.fs.Path hadoopPath,
Path targetPath)
throws IOException {
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/resources/META-INF/NOTICE b/paimon-filesystems/paimon-s3-impl/src/main/resources/META-INF/NOTICE
index 211c3c322a3e..38b3aae53f27 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/resources/META-INF/NOTICE
+++ b/paimon-filesystems/paimon-s3-impl/src/main/resources/META-INF/NOTICE
@@ -31,12 +31,13 @@ This project bundles the following dependencies under the Apache Software Licens
- org.apache.commons:commons-configuration2:2.1.1
- org.apache.commons:commons-lang3:3.18.0
- org.apache.commons:commons-text:1.10.0
-- org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.1.1
-- org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_7:1.1.1
-- org.apache.hadoop:hadoop-annotations:3.3.4
-- org.apache.hadoop:hadoop-auth:3.3.4
-- org.apache.hadoop:hadoop-aws:3.3.4
-- org.apache.hadoop:hadoop-common:3.3.4
+- org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.4.0
+- org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_25:1.4.0
+- org.apache.hadoop:hadoop-annotations:3.4.2
+- org.apache.hadoop:hadoop-auth:3.4.2
+- org.apache.hadoop:hadoop-aws:3.4.2
+- org.apache.hadoop:hadoop-common:3.4.2
+- software.amazon.awssdk:bundle:2.29.52
- org.apache.httpcomponents:httpclient:4.5.13
- org.apache.httpcomponents:httpcore:4.4.13
- org.apache.kerby:kerb-core:1.0.1
@@ -83,13 +84,13 @@ The bundled Apache Hadoop Relocated (Shaded) Third-party Miscellaneous Libs
org.apache.hadoop.thirdparty:hadoop-shaded-guava dependency bundles the following dependencies under
the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- com.google.guava:guava:30.1.1-jre
+- com.google.guava:guava:33.4.6-jre
The bundled Apache Hadoop Relocated (Shaded) Third-party Miscellaneous Libs
-org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_7 dependency bundles the following dependencies under
+org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_25 dependency bundles the following dependencies under
the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- com.google.protobuf:protobuf-java:3.7.1
+- com.google.protobuf:protobuf-java:3.25.5
This project bundles the following dependencies under the Eclipse Distribution License - v 1.0
You find it under licenses/LICENSE.jakarta
diff --git a/paimon-filesystems/pom.xml b/paimon-filesystems/pom.xml
index 1550b9d46aa6..d8e20940985a 100644
--- a/paimon-filesystems/pom.xml
+++ b/paimon-filesystems/pom.xml
@@ -51,7 +51,7 @@
-        <fs.hadoop.version>3.3.4</fs.hadoop.version>
+        <fs.hadoop.version>3.4.2</fs.hadoop.version>
         <fs.oss.sdk.version>3.4.2</fs.oss.sdk.version>
         <fs.s3.aws.version>1.12.319</fs.s3.aws.version>
         <fs.beanutils.version>1.9.4</fs.beanutils.version>