diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index 5e5fe9fbfef1..2e238f287f37 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -342,6 +342,35 @@ default boolean tryToWriteAtomic(Path path, String content) throws IOException {
         return success;
     }
 
+    /**
+     * Write content atomically, failing if the target file already exists. Uses native
+     * conditional writes on supported object stores. Falls back to {@link #tryToWriteAtomic} on
+     * file systems without native conditional write support.
+     *
+     * @param path the target file path
+     * @param content the content to write
+     * @return true if the write succeeded, false if the file already exists
+     * @throws IOException on I/O errors (not including the "file exists" condition)
+     */
+    default boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException {
+        // The default implementation uses tryToWriteAtomic, which is safe for HDFS but
+        // requires external locking for object stores without native conditional writes.
+        return tryToWriteAtomic(path, content);
+    }
+
+    /**
+     * Returns whether this FileIO supports native conditional writes (write-if-absent
+     * semantics).
+     *
+     * <p>When true, {@link #tryToWriteAtomicIfAbsent} uses native conditional write operations
+     * that atomically fail if the target file exists. This eliminates the need for external
+     * locking on object stores.
+     *
+     * @return true if native conditional writes are supported
+     */
+    default boolean supportsConditionalWrite() {
+        return false;
+    }
+
     default void writeFile(Path path, String content, boolean overwrite) throws IOException {
         try (PositionOutputStream out = newOutputStream(path, overwrite)) {
             OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
index 367bce383719..6ac60eb01dfa 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
@@ -47,6 +47,17 @@
 /** Test static methods and methods with default implementations of {@link FileIO}. */
 public class FileIOTest {
 
+    @Test
+    public void testConditionalWriteDefaults(@TempDir java.nio.file.Path tempDir)
+            throws Exception {
+        FileIO fileIO = new DummyFileIO();
+        Path file = new Path(tempDir.resolve("test.txt").toUri());
+
+        assertThat(fileIO.supportsConditionalWrite()).isFalse();
+        assertThat(fileIO.tryToWriteAtomicIfAbsent(file, "first")).isTrue();
+        assertThat(fileIO.tryToWriteAtomicIfAbsent(file, "second")).isFalse();
+        assertThat(fileIO.readFileUtf8(file)).isEqualTo("first");
+    }
+
     @TempDir java.nio.file.Path tempDir;
 
     @Test
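Reviewer note: a minimal caller-side sketch of the contract added above, assuming only the two new FileIO methods; the helper class, method name, and payload are illustrative, not part of this change.

    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.fs.Path;

    import java.io.IOException;

    // Hypothetical helper: one call works against every FileIO. Stores that
    // return true from supportsConditionalWrite() reject the write atomically
    // when the file exists; all others fall back to tryToWriteAtomic().
    final class WriteIfAbsentExample {
        static boolean writeIfAbsent(FileIO fileIO, Path path, String json) throws IOException {
            return fileIO.tryToWriteAtomicIfAbsent(path, json);
        }
    }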

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java
index 5cc9d2a0199a..801d4cbf0af6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java
@@ -32,7 +32,8 @@
  * A {@link SnapshotCommit} using file renaming to commit.
  *
  * <p>Note that when the file system is local or HDFS, rename is atomic. But if the file system is
- * object storage, we need additional lock protection.
+ * object storage, we need additional lock protection unless the storage supports native
+ * conditional writes.
  */
 public class RenamingSnapshotCommit implements SnapshotCommit {
 
@@ -54,6 +55,17 @@ public boolean commit(Snapshot snapshot, String branch, List<PartitionStatistics> statistics)
 
+        if (fileIO.supportsConditionalWrite()) {
+            // Native conditional writes atomically fail when the snapshot file
+            // already exists, so the external lock can be skipped entirely.
+            boolean committed =
+                    fileIO.tryToWriteAtomicIfAbsent(newSnapshotPath, snapshot.toJson());
+            if (committed) {
+                snapshotManager.commitLatestHint(snapshot.id());
+            }
+            return committed;
+        }
+
         Callable<Boolean> callable =
                 () -> {
                     boolean committed =
                             fileIO.tryToWriteAtomic(newSnapshotPath, snapshot.toJson());
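Reviewer note: a condensed paraphrase of the two branches commit() now takes, with the fields inlined as parameters — not the verbatim method body. On object stores a rename may neither fail on an existing target nor be atomic, hence the exists-check serialized under the lock in the fallback.

    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.operation.Lock;

    final class CommitFlowSketch {
        static boolean commitSketch(FileIO fileIO, Lock lock, Path snapshotPath, String json)
                throws Exception {
            if (fileIO.supportsConditionalWrite()) {
                // Fast path: the store itself rejects the write if the file exists.
                return fileIO.tryToWriteAtomicIfAbsent(snapshotPath, json);
            }
            // Fallback: exists-check + write must run inside the external lock.
            return lock.runWithLock(
                    () ->
                            !fileIO.exists(snapshotPath)
                                    && fileIO.tryToWriteAtomic(snapshotPath, json));
        }
    }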
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java
new file mode 100644
index 000000000000..9cadaf2ba996
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RenamingSnapshotCommit}. */
+public class RenamingSnapshotCommitTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private Path tablePath;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        tablePath = new Path(tempDir.toString());
+        LocalFileIO.create().mkdirs(new Path(tablePath, "snapshot"));
+    }
+
+    @Test
+    public void testConditionalWritePathSkipsLock() throws Exception {
+        ConditionalWriteFileIO fileIO = new ConditionalWriteFileIO();
+        RenamingSnapshotCommit commit = newCommit(fileIO, Lock.empty());
+
+        assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue();
+        assertThat(fileIO.conditionalWriteCalls.get()).isEqualTo(1);
+    }
+
+    @Test
+    public void testConditionalWriteFailsOnConflict() throws Exception {
+        ConditionalWriteFileIO fileIO = new ConditionalWriteFileIO();
+        RenamingSnapshotCommit commit = newCommit(fileIO, Lock.empty());
+
+        assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue();
+        assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isFalse();
+    }
+
+    @Test
+    public void testFallbackPathUsesLock() throws Exception {
+        AtomicInteger lockCalls = new AtomicInteger();
+        RenamingSnapshotCommit commit = newCommit(LocalFileIO.create(), trackingLock(lockCalls));
+
+        assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue();
+        assertThat(lockCalls.get()).isEqualTo(1);
+    }
+
+    @Test
+    public void testFallbackPathFailsOnConflict() throws Exception {
+        AtomicInteger lockCalls = new AtomicInteger();
+        RenamingSnapshotCommit commit = newCommit(LocalFileIO.create(), trackingLock(lockCalls));
+
+        assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue();
+        assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isFalse();
+        assertThat(lockCalls.get()).isEqualTo(2);
+    }
+
+    private RenamingSnapshotCommit newCommit(LocalFileIO fileIO, Lock lock) {
+        return new RenamingSnapshotCommit(
+                new SnapshotManager(fileIO, tablePath, "main", null, null), lock);
+    }
+
+    private static Snapshot newSnapshot(long id) {
+        return createSnapshotWithMillis(id, System.currentTimeMillis());
+    }
+
+    private static Lock trackingLock(AtomicInteger counter) {
+        return new Lock() {
+            @Override
+            public <T> T runWithLock(java.util.concurrent.Callable<T> callable) throws Exception {
+                counter.incrementAndGet();
+                return callable.call();
+            }
+
+            @Override
+            public void close() {}
+        };
+    }
+
+    private static class ConditionalWriteFileIO extends LocalFileIO {
+        final AtomicInteger conditionalWriteCalls = new AtomicInteger();
+
+        @Override
+        public boolean supportsConditionalWrite() {
+            return true;
+        }
+
+        @Override
+        public boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException {
+            conditionalWriteCalls.incrementAndGet();
+            if (exists(path)) {
+                return false;
+            }
+            writeFile(path, content, false);
+            return true;
+        }
+    }
+}
diff --git a/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/AzureFileIO.java b/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/AzureFileIO.java
index 36f7d5f37c0d..434652fe15de 100644
--- a/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/AzureFileIO.java
+++ b/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/AzureFileIO.java
@@ -20,9 +20,12 @@
 
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
@@ -32,6 +35,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -61,6 +65,25 @@ public boolean isObjectStore() {
         return true;
     }
 
+    @Override
+    public boolean supportsConditionalWrite() {
+        return true;
+    }
+
+    @Override
+    public boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        FileSystem fs = getFileSystem(hadoopPath);
+        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
+        try (FSDataOutputStream out =
+                fs.createFile(hadoopPath).create().overwrite(false).build()) {
+            out.write(bytes);
+            return true;
+        } catch (FileAlreadyExistsException e) {
+            LOG.debug("Conditional write failed, file already exists: {}", path);
+            return false;
+        }
+    }
+
     @Override
     public void configure(CatalogContext context) {
         this.hadoopOptions = mirrorCertainHadoopConfig(loadHadoopConfigFromContext(context));
diff --git a/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/HadoopCompliantFileIO.java
index 0758bb44370d..4c9585f1b513 100644
--- a/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/HadoopCompliantFileIO.java
+++ b/paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/HadoopCompliantFileIO.java
@@ -102,11 +102,11 @@ public boolean rename(Path src, Path dst) throws IOException {
         return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
     }
 
-    private org.apache.hadoop.fs.Path path(Path path) {
+    protected org.apache.hadoop.fs.Path path(Path path) {
         return new org.apache.hadoop.fs.Path(path.toUri());
     }
 
-    private FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
+    protected FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
         if (fs == null) {
             synchronized (this) {
                 if (fs == null) {
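Reviewer note: the builder-based create with overwrite(false) is what carries the conditional semantics here — the existence check happens inside the store's create operation rather than as a separate exists() round trip (assumed behavior of recent hadoop-azure releases). A store-agnostic contract check that applies equally to AzureFileIO; the target path is supplied by the caller:

    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.fs.Path;

    import java.io.IOException;

    // Contract check: the first writer wins, the second write is rejected,
    // and the surviving content is the first writer's.
    final class ConditionalWriteContract {
        static void check(FileIO fileIO, Path target) throws IOException {
            if (!fileIO.tryToWriteAtomicIfAbsent(target, "a")
                    || fileIO.tryToWriteAtomicIfAbsent(target, "b")
                    || !"a".equals(fileIO.readFileUtf8(target))) {
                throw new AssertionError("write-if-absent contract violated: " + target);
            }
        }
    }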
diff --git a/paimon-filesystems/paimon-azure-impl/src/main/resources/META-INF/NOTICE b/paimon-filesystems/paimon-azure-impl/src/main/resources/META-INF/NOTICE
index 1c4727b121b3..c0db4285d7be 100644
--- a/paimon-filesystems/paimon-azure-impl/src/main/resources/META-INF/NOTICE
+++ b/paimon-filesystems/paimon-azure-impl/src/main/resources/META-INF/NOTICE
@@ -30,12 +30,12 @@ This project bundles the following dependencies under the Apache Software Licens
 - org.apache.commons:commons-configuration2:2.1.1
 - org.apache.commons:commons-lang3:3.18.0
 - org.apache.commons:commons-text:1.4
-- org.apache.hadoop:hadoop-annotations:3.3.4
-- org.apache.hadoop:hadoop-auth:3.3.4
-- org.apache.hadoop:hadoop-azure:3.3.4
-- org.apache.hadoop:hadoop-common:3.3.4
-- org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.1.1
-- org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_7:1.1.1
+- org.apache.hadoop:hadoop-annotations:3.4.2
+- org.apache.hadoop:hadoop-auth:3.4.2
+- org.apache.hadoop:hadoop-azure:3.4.2
+- org.apache.hadoop:hadoop-common:3.4.2
+- org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.4.0
+- org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_25:1.4.0
 - org.apache.httpcomponents:httpclient:4.5.13
 - org.apache.httpcomponents:httpcore:4.4.13
 - org.apache.kerby:kerb-core:1.0.1
@@ -79,12 +79,14 @@ The bundled Apache Hadoop Relocated (Shaded) Third-party Miscellaneous Libs
 org.apache.hadoop.thirdparty:hadoop-shaded-guava dependency bundles the following dependencies under
 the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- com.google.guava:guava:30.1.1-jre
+- com.google.guava:guava:33.4.6-jre
 
 The bundled Apache Hadoop Relocated (Shaded) Third-party Miscellaneous Libs
-org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_7 dependency bundles the following dependencies under
+org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_25 dependency bundles the following dependencies under
 the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
+- com.google.protobuf:protobuf-java:3.25.5
+
 This project bundles the following dependencies under the Eclipse Distribution License - v 1.0
 You find it under licenses/LICENSE.jakarta
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
index 827251837342..8d50e48d1a51 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
@@ -25,6 +25,8 @@
 import org.apache.paimon.options.Options;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.slf4j.Logger;
@@ -33,6 +35,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -85,6 +88,39 @@ public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite
                 new S3MultiPartUpload(fs, fs.getConf()), hadoopPath, path);
     }
 
+    @Override
+    public boolean supportsConditionalWrite() {
+        return true;
+    }
+
+    /**
+     * Write content atomically using S3 conditional writes via the Hadoop 3.4+ native API.
+     *
+     * @param path the target file path
+     * @param content the content to write
+     * @return true if the write succeeded, false if the file already exists
+     * @throws IOException on I/O errors
+     */
+    @Override
+    public boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        S3AFileSystem fs = (S3AFileSystem) getFileSystem(hadoopPath);
+
+        byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
+
+        try (FSDataOutputStream out =
+                fs.createFile(hadoopPath)
+                        .create()
+                        .overwrite(false) // fails if the file exists (uses If-None-Match: * on S3)
+                        .build()) {
+            out.write(contentBytes);
+            return true;
+        } catch (FileAlreadyExistsException e) {
+            LOG.debug("Conditional write failed, file already exists: {}", path);
+            return false;
+        }
+    }
+
     // add additional config entries from the IO config to the Hadoop config
     private Options loadHadoopConfigFromContext(CatalogContext context) {
         Options hadoopConfig = new Options();
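Reviewer note: for context, the semantics the S3A builder path relies on can also be expressed directly against the AWS SDK v2 — a sketch only, with bucket/key as placeholders; ifNoneMatch requires an SDK release and an S3 endpoint recent enough to support conditional writes:

    import software.amazon.awssdk.core.sync.RequestBody;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.PutObjectRequest;
    import software.amazon.awssdk.services.s3.model.S3Exception;

    // Raw SDK v2 sketch of a write-if-absent PUT; Paimon goes through the
    // Hadoop builder API instead, this only illustrates the request shape.
    final class ConditionalPutSketch {
        static boolean putIfAbsent(S3Client s3, String bucket, String key, byte[] bytes) {
            try {
                s3.putObject(
                        PutObjectRequest.builder()
                                .bucket(bucket)
                                .key(key)
                                .ifNoneMatch("*") // conditional write: fail if the key exists
                                .build(),
                        RequestBody.fromBytes(bytes));
                return true;
            } catch (S3Exception e) {
                if (e.statusCode() == 412) { // PreconditionFailed: object already exists
                    return false;
                }
                throw e;
            }
        }
    }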
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
index ca112bac4ec4..ecd46867874e 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
@@ -20,30 +20,33 @@
 
 import org.apache.paimon.fs.MultiPartUploadStore;
 
-import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.UploadPartRequest;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 import org.apache.hadoop.fs.store.audit.AuditSpanSource;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
-/** Provides the multipart upload by Amazon S3. */
+/** Provides the multipart upload by Amazon S3 using the Hadoop 3.4+ API (AWS SDK v2). */
 public class S3MultiPartUpload
-        implements MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> {
+        implements MultiPartUploadStore<CompletedPart, CompleteMultipartUploadResponse> {
 
     private final S3AFileSystem s3a;
-
     private final InternalWriteOperationHelper s3accessHelper;
 
     public S3MultiPartUpload(S3AFileSystem s3a, Configuration conf) {
@@ -65,25 +68,37 @@ public Path workingDirectory() {
 
     @Override
     public String startMultiPartUpload(String objectName) throws IOException {
-        return s3accessHelper.initiateMultiPartUpload(objectName);
+        return s3accessHelper.initiateMultiPartUpload(objectName, PutObjectOptions.keepingDirs());
     }
 
     @Override
-    public CompleteMultipartUploadResult completeMultipartUpload(
-            String objectName, String uploadId, List<PartETag> partETags, long numBytesInParts)
+    public CompleteMultipartUploadResponse completeMultipartUpload(
+            String objectName, String uploadId, List<CompletedPart> parts, long numBytesInParts)
             throws IOException {
         return s3accessHelper.completeMPUwithRetries(
-                objectName, uploadId, partETags, numBytesInParts, new AtomicInteger(0));
+                objectName,
+                uploadId,
+                parts,
+                numBytesInParts,
+                new AtomicInteger(0),
+                PutObjectOptions.keepingDirs());
     }
 
     @Override
-    public PartETag uploadPart(
+    public CompletedPart uploadPart(
             String objectName, String uploadId, int partNumber, File file, int byteLength)
             throws IOException {
-        final UploadPartRequest uploadRequest =
-                s3accessHelper.newUploadPartRequest(
-                        objectName, uploadId, partNumber, byteLength, null, file, 0L);
-        return s3accessHelper.uploadPart(uploadRequest).getPartETag();
+        UploadPartRequest request =
+                UploadPartRequest.builder()
+                        .bucket(s3a.getBucket())
+                        .key(objectName)
+                        .uploadId(uploadId)
+                        .partNumber(partNumber)
+                        .contentLength((long) byteLength)
+                        .build();
+        RequestBody body = RequestBody.fromBytes(Files.readAllBytes(file.toPath()));
+        UploadPartResponse response = s3accessHelper.uploadPart(request, body, null);
+        return CompletedPart.builder().partNumber(partNumber).eTag(response.eTag()).build();
     }
 
     @Override
@@ -99,7 +114,7 @@ private static final class InternalWriteOperationHelper extends WriteOperationHe
                 S3AStatisticsContext statisticsContext,
                 AuditSpanSource auditSpanSource,
                 AuditSpan auditSpan) {
-            super(owner, conf, statisticsContext, auditSpanSource, auditSpan);
+            super(owner, conf, statisticsContext, auditSpanSource, auditSpan, null);
         }
     }
 }
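Reviewer note: the SDK v1 -> v2 migration above renames types rather than changing the protocol — PartETag becomes CompletedPart, CompleteMultipartUploadResult becomes CompleteMultipartUploadResponse, and request objects move to immutable builders. The per-part bookkeeping reduces to:

    import software.amazon.awssdk.services.s3.model.CompletedPart;

    final class PartMapping {
        // SDK v1's "new PartETag(partNumber, eTag)" expressed in SDK v2.
        static CompletedPart toCompletedPart(int partNumber, String eTag) {
            return CompletedPart.builder().partNumber(partNumber).eTag(eTag).build();
        }
    }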
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
index 9d663e7be4de..6551f5e8a9b8 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
@@ -23,19 +23,20 @@
 import org.apache.paimon.fs.MultiPartUploadStore;
 import org.apache.paimon.fs.Path;
 
-import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.PartETag;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
 
 import java.io.IOException;
 import java.util.List;
 
-/** S3 implementation of MultiPartUploadCommitter. */
+/** S3 implementation of MultiPartUploadCommitter (AWS SDK v2). */
 public class S3MultiPartUploadCommitter
-        extends BaseMultiPartUploadCommitter<PartETag, CompleteMultipartUploadResult> {
+        extends BaseMultiPartUploadCommitter<CompletedPart, CompleteMultipartUploadResponse> {
+
     public S3MultiPartUploadCommitter(
             String uploadId,
-            List<PartETag> uploadedParts,
+            List<CompletedPart> uploadedParts,
             String objectName,
             long position,
             Path targetPath) {
@@ -43,8 +44,8 @@ public S3MultiPartUploadCommitter(
     }
 
     @Override
-    protected MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> multiPartUploadStore(
-            FileIO fileIO, Path targetPath) throws IOException {
+    protected MultiPartUploadStore<CompletedPart, CompleteMultipartUploadResponse>
+            multiPartUploadStore(FileIO fileIO, Path targetPath) throws IOException {
         S3FileIO s3FileIO = (S3FileIO) fileIO;
         org.apache.hadoop.fs.Path hadoopPath = s3FileIO.path(targetPath);
         S3AFileSystem fs = (S3AFileSystem) s3FileIO.getFileSystem(hadoopPath);
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
index 03ad1e097757..c221d5d04922 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
@@ -22,17 +22,19 @@
 import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
 import org.apache.paimon.fs.Path;
 
-import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.PartETag;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
 
 import java.io.IOException;
 
-/** S3 implementation of TwoPhaseOutputStream using multipart upload. */
+/** S3 implementation of TwoPhaseOutputStream using multipart upload (AWS SDK v2). */
 public class S3TwoPhaseOutputStream
-        extends MultiPartUploadTwoPhaseOutputStream<PartETag, CompleteMultipartUploadResult> {
+        extends MultiPartUploadTwoPhaseOutputStream<
+                CompletedPart, CompleteMultipartUploadResponse> {
 
     public S3TwoPhaseOutputStream(
-            MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> multiPartUploadStore,
+            MultiPartUploadStore<CompletedPart, CompleteMultipartUploadResponse>
+                    multiPartUploadStore,
             org.apache.hadoop.fs.Path hadoopPath,
             Path targetPath)
             throws IOException {
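Reviewer note: completeMPUwithRetries ultimately issues a single CompleteMultipartUpload call; a raw SDK v2 sketch of that request for reference (Hadoop's WriteOperationHelper layers retries and audit spans on top). S3 requires the parts list in ascending part-number order:

    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
    import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
    import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
    import software.amazon.awssdk.services.s3.model.CompletedPart;

    final class CompleteUploadSketch {
        static CompleteMultipartUploadResponse complete(
                S3Client s3, String bucket, String key, String uploadId, List<CompletedPart> parts) {
            // Sort defensively: S3 rejects completions with out-of-order parts.
            List<CompletedPart> sorted =
                    parts.stream()
                            .sorted(Comparator.comparingInt(CompletedPart::partNumber))
                            .collect(Collectors.toList());
            return s3.completeMultipartUpload(
                    CompleteMultipartUploadRequest.builder()
                            .bucket(bucket)
                            .key(key)
                            .uploadId(uploadId)
                            .multipartUpload(
                                    CompletedMultipartUpload.builder().parts(sorted).build())
                            .build());
        }
    }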
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/resources/META-INF/NOTICE b/paimon-filesystems/paimon-s3-impl/src/main/resources/META-INF/NOTICE
index 211c3c322a3e..38b3aae53f27 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/resources/META-INF/NOTICE
+++ b/paimon-filesystems/paimon-s3-impl/src/main/resources/META-INF/NOTICE
@@ -31,12 +31,13 @@ This project bundles the following dependencies under the Apache Software Licens
 - org.apache.commons:commons-configuration2:2.1.1
 - org.apache.commons:commons-lang3:3.18.0
 - org.apache.commons:commons-text:1.10.0
-- org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.1.1
-- org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_7:1.1.1
-- org.apache.hadoop:hadoop-annotations:3.3.4
-- org.apache.hadoop:hadoop-auth:3.3.4
-- org.apache.hadoop:hadoop-aws:3.3.4
-- org.apache.hadoop:hadoop-common:3.3.4
+- org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.4.0
+- org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_25:1.4.0
+- org.apache.hadoop:hadoop-annotations:3.4.2
+- org.apache.hadoop:hadoop-auth:3.4.2
+- org.apache.hadoop:hadoop-aws:3.4.2
+- org.apache.hadoop:hadoop-common:3.4.2
+- software.amazon.awssdk:bundle:2.29.52
 - org.apache.httpcomponents:httpclient:4.5.13
 - org.apache.httpcomponents:httpcore:4.4.13
 - org.apache.kerby:kerb-core:1.0.1
@@ -83,13 +84,13 @@ The bundled Apache Hadoop Relocated (Shaded) Third-party Miscellaneous Libs
 org.apache.hadoop.thirdparty:hadoop-shaded-guava dependency bundles the following dependencies under
 the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- com.google.guava:guava:30.1.1-jre
+- com.google.guava:guava:33.4.6-jre
 
 The bundled Apache Hadoop Relocated (Shaded) Third-party Miscellaneous Libs
-org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_7 dependency bundles the following dependencies under
+org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_25 dependency bundles the following dependencies under
 the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- com.google.protobuf:protobuf-java:3.7.1
+- com.google.protobuf:protobuf-java:3.25.5
 
 This project bundles the following dependencies under the Eclipse Distribution License - v 1.0
 You find it under licenses/LICENSE.jakarta
diff --git a/paimon-filesystems/pom.xml b/paimon-filesystems/pom.xml
index 1550b9d46aa6..d8e20940985a 100644
--- a/paimon-filesystems/pom.xml
+++ b/paimon-filesystems/pom.xml
@@ -51,7 +51,7 @@
-        3.3.4
+        3.4.2
         3.4.2
         1.12.319
         1.9.4