diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hadoop/TestHiveHadoopCommits.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hadoop/TestHiveHadoopCommits.java index c304b7c205e0..dffa8d4f2bdd 100644 --- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hadoop/TestHiveHadoopCommits.java +++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hadoop/TestHiveHadoopCommits.java @@ -21,6 +21,9 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -34,6 +37,7 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.util.Tasks; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -55,7 +59,7 @@ void testCommitFailedBeforeChangeVersionHint() { BaseTable baseTable = (BaseTable) table; HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); - HadoopTableOperations spyOps2 = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps2 = spy(tableOperations); doReturn(10000).when(spyOps2).findVersionWithOutVersionHint(any()); TableMetadata metadataV1 = spyOps2.current(); SortOrder dataSort = SortOrder.builderFor(baseTable.schema()).asc("data").build(); @@ -64,7 +68,7 @@ void testCommitFailedBeforeChangeVersionHint() { .isInstanceOf(CommitFailedException.class) .hasMessageContaining("Are there other clients running in parallel with the current task?"); - HadoopTableOperations spyOps3 = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps3 = spy(tableOperations); doReturn(false).when(spyOps3).nextVersionIsLatest(anyInt(), anyInt()); assertCommitNotChangeVersion( baseTable, @@ -72,7 +76,7 @@ void testCommitFailedBeforeChangeVersionHint() { CommitFailedException.class, "Are there other clients running in parallel with the current task?"); - HadoopTableOperations spyOps4 = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps4 = spy(tableOperations); doThrow(new RuntimeException("FileSystem crash!")) .when(spyOps4) .renameMetaDataFileAndCheck(any(), any(), any(), anyBoolean()); @@ -85,7 +89,7 @@ void testCommitFailedAndCheckFailed() throws IOException { table.newFastAppend().appendFile(FILE_A).commit(); BaseTable baseTable = (BaseTable) table; HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); - HadoopTableOperations spyOps = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps = spy(tableOperations); doThrow(new IOException("FileSystem crash!")) .when(spyOps) .renameMetaDataFile(any(), any(), any()); @@ -95,13 +99,13 @@ void testCommitFailedAndCheckFailed() throws IOException { assertCommitNotChangeVersion( baseTable, spyOps, CommitStateUnknownException.class, "FileSystem crash!"); - HadoopTableOperations spyOps2 = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps2 = spy(tableOperations); doThrow(new OutOfMemoryError("Java heap space")) .when(spyOps2) .renameMetaDataFile(any(), any(), any()); assertCommitFail(baseTable, spyOps2, OutOfMemoryError.class, "Java heap space"); - HadoopTableOperations spyOps3 = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps3 = spy(tableOperations); doThrow(new RuntimeException("UNKNOWN ERROR")) .when(spyOps3) .renameMetaDataFile(any(), any(), any()); @@ -114,7 +118,7 @@ void testCommitFailedAndRenameNotSuccess() throws IOException { table.newFastAppend().appendFile(FILE_A).commit(); BaseTable baseTable = (BaseTable) table; HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); - HadoopTableOperations spyOps = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps = spy(tableOperations); doThrow(new IOException("FileSystem crash!")) .when(spyOps) .renameMetaDataFile(any(), any(), any()); @@ -130,7 +134,7 @@ void testCommitFailedButActualSuccess() throws IOException { table.newFastAppend().appendFile(FILE_A).commit(); BaseTable baseTable = (BaseTable) table; HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); - HadoopTableOperations spyOps = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps = spy(tableOperations); doThrow(new IOException("FileSystem crash!")) .when(spyOps) .renameMetaDataFile(any(), any(), any()); @@ -174,7 +178,7 @@ void testCommitFailedAfterChangeVersionHintRepeatCommit() { table.newFastAppend().appendFile(FILE_A).commit(); BaseTable baseTable = (BaseTable) table; HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); - HadoopTableOperations spyOps = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps = spy(tableOperations); doThrow(new RuntimeException("FileSystem crash!")) .when(spyOps) .deleteRemovedMetadataFiles(any(), any()); @@ -204,22 +208,15 @@ void testCommitFailedAfterChangeVersionHintRepeatCommit() { @Test void testTwoClientCommitSameVersion() throws InterruptedException { - // In the linux environment, the JDK FileSystem interface implementation class is - // java.io.UnixFileSystem. - // Its behavior follows the posix protocol, which causes rename operations to overwrite the - // target file (because linux is compatible with some of the unix interfaces). - // However, linux also supports renaming without overwriting the target file. In addition, some - // other file systems such as Windows, HDFS, etc. also support renaming without overwriting the - // target file. - // We use the `mv -n` command to simulate the behavior of such filesystems. table.newFastAppend().appendFile(FILE_A).commit(); - ExecutorService executorService = Executors.newFixedThreadPool(8); + ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); AtomicReference unexpectedException = new AtomicReference<>(null); AtomicReference expectedException = new AtomicReference<>(null); CountDownLatch countDownLatch = new CountDownLatch(2); BaseTable baseTable = (BaseTable) table; assertThat(((HadoopTableOperations) baseTable.operations()).findVersion()).isEqualTo(2); - executorService.execute(() -> { + final Object lock = new Object(); + Runnable commitTask = () -> { try { HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); HadoopTableOperations spyOps = spy(tableOperations); @@ -232,14 +229,16 @@ void testTwoClientCommitSameVersion() throws InterruptedException { doAnswer(x -> { Path srcPath = x.getArgument(1); Path dstPath = x.getArgument(2); - File src = new File(srcPath.toUri()); - File dst = new File(dstPath.toUri()); - String srcPathStr = src.getAbsolutePath(); - String dstPathStr = dst.getAbsolutePath(); - String cmd = String.format("mv -n %s %s", srcPathStr, dstPathStr); - Process process = Runtime.getRuntime().exec(cmd); - assertThat(process.waitFor()).isZero(); - return dst.exists() && !src.exists(); + var src = Paths.get(srcPath.toUri()); + var dst = Paths.get(dstPath.toUri()); + synchronized (lock) { + if (Files.exists(dst)) { + return false; + } else { + Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE); + return true; + } + } }).when(spyOps).renameMetaDataFile(any(), any(), any()); TableMetadata metadataV1 = spyOps.current(); SortOrder dataSort = SortOrder.builderFor(baseTable.schema()).asc("data").build(); @@ -250,40 +249,12 @@ void testTwoClientCommitSameVersion() throws InterruptedException { } catch (Throwable e) { unexpectedException.set(e); } - }); + }; + + Tasks.range(2) + .executeWith(executorService) + .run(i -> commitTask.run()); - executorService.execute(() -> { - try { - HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); - HadoopTableOperations spyOps = (HadoopTableOperations) spy(tableOperations); - doNothing().when(spyOps).tryLock(any(), any()); - doAnswer(x -> { - countDownLatch.countDown(); - countDownLatch.await(); - return x.callRealMethod(); - }).when(spyOps).renameMetaDataFileAndCheck(any(), any(), any(), anyBoolean()); - doAnswer(x -> { - Path srcPath = x.getArgument(1); - Path dstPath = x.getArgument(2); - File src = new File(srcPath.toUri()); - File dst = new File(dstPath.toUri()); - String srcPathStr = src.getAbsolutePath(); - String dstPathStr = dst.getAbsolutePath(); - String cmd = String.format("mv -n %s %s", srcPathStr, dstPathStr); - Process process = Runtime.getRuntime().exec(cmd); - assertThat(process.waitFor()).isZero(); - return dst.exists() && !src.exists(); - }).when(spyOps).renameMetaDataFile(any(), any(), any()); - TableMetadata metadataV1 = spyOps.current(); - SortOrder dataSort = SortOrder.builderFor(baseTable.schema()).asc("data").build(); - TableMetadata metadataV2 = metadataV1.replaceSortOrder(dataSort); - spyOps.commit(metadataV1, metadataV2); - } catch (CommitFailedException e) { - expectedException.set(e); - } catch (Throwable e) { - unexpectedException.set(e); - } - }); executorService.shutdown(); if (!executorService.awaitTermination(610, TimeUnit.SECONDS)) { executorService.shutdownNow(); @@ -302,7 +273,7 @@ void testTwoClientCommitSameVersion() throws InterruptedException { void testConcurrentCommitAndRejectCommitAlreadyExistsVersion() throws InterruptedException { table.newFastAppend().appendFile(FILE_A).commit(); - ExecutorService executorService = Executors.newFixedThreadPool(8); + ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); AtomicReference unexpectedException = new AtomicReference<>(null); AtomicInteger commitTimes = new AtomicInteger(0); int maxCommitTimes = 20; @@ -312,7 +283,7 @@ void testConcurrentCommitAndRejectCommitAlreadyExistsVersion() try { BaseTable baseTable = (BaseTable) table; HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); - HadoopTableOperations spyOps = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps = spy(tableOperations); doNothing().when(spyOps).tryLock(any(), any()); doAnswer(x -> { countDownLatch2.countDown(); @@ -353,7 +324,7 @@ void testRejectCommitAlreadyExistsVersionWithUsingObjectStore() // memory locks. So we can use the local file system to simulate the use of object storage. table.updateProperties().set(TableProperties.OBJECT_STORE_ENABLED, "true").commit(); table.newFastAppend().appendFile(FILE_A).commit(); - ExecutorService executorService = Executors.newFixedThreadPool(8); + ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); AtomicReference unexpectedException = new AtomicReference<>(null); AtomicInteger commitTimes = new AtomicInteger(0); int maxCommitTimes = 20; @@ -363,7 +334,7 @@ void testRejectCommitAlreadyExistsVersionWithUsingObjectStore() try { BaseTable baseTable = (BaseTable) table; HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); - HadoopTableOperations spyOps = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps = spy(tableOperations); doAnswer(x -> { countDownLatch2.countDown(); countDownLatch.await(); @@ -403,7 +374,7 @@ void testConcurrentCommitAndRejectTooOldCommit() throws InterruptedException { table.updateProperties().set(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, "2").commit(); table.updateProperties().set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true") .commit(); - ExecutorService executorService = Executors.newFixedThreadPool(8); + ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); AtomicReference unexpectedException = new AtomicReference<>(null); AtomicInteger commitTimes = new AtomicInteger(0); int maxCommitTimes = 20; @@ -414,7 +385,7 @@ void testConcurrentCommitAndRejectTooOldCommit() throws InterruptedException { try { BaseTable baseTable = (BaseTable) table; HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); - HadoopTableOperations spyOps = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps = spy(tableOperations); doNothing().when(spyOps).tryLock(any(), any()); doAnswer(x -> { countDownLatch2.countDown(); @@ -466,7 +437,7 @@ void testRejectTooOldCommitWithUsingObjectStore() throws InterruptedException { table.updateProperties() .set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true") .commit(); - ExecutorService executorService = Executors.newFixedThreadPool(8); + ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); AtomicReference unexpectedException = new AtomicReference<>(null); AtomicInteger commitTimes = new AtomicInteger(0); int maxCommitTimes = 20; @@ -476,7 +447,7 @@ void testRejectTooOldCommitWithUsingObjectStore() throws InterruptedException { try { BaseTable baseTable = (BaseTable) table; HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); - HadoopTableOperations spyOps = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps = spy(tableOperations); doNothing().when(spyOps).tryLock(any(), any()); doAnswer(x -> { countDownLatch2.countDown(); @@ -521,7 +492,7 @@ void testConcurrentCommitAndRejectDirtyCommit() throws InterruptedException { table.updateProperties() .set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true") .commit(); - ExecutorService executorService = Executors.newFixedThreadPool(8); + ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); AtomicReference unexpectedException = new AtomicReference<>(null); AtomicInteger commitTimes = new AtomicInteger(0); CountDownLatch countDownLatch = new CountDownLatch(5); @@ -531,7 +502,7 @@ void testConcurrentCommitAndRejectDirtyCommit() throws InterruptedException { try { BaseTable baseTable = (BaseTable) table; HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); - HadoopTableOperations spyOps = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps = spy(tableOperations); TableMetadata metadataV1 = spyOps.current(); SortOrder dataSort = SortOrder.builderFor(baseTable.schema()).asc("data").build(); TableMetadata metadataV2 = metadataV1.replaceSortOrder(dataSort); @@ -579,13 +550,13 @@ void testCleanTooOldDirtyCommit() throws InterruptedException { table.updateProperties() .set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true") .commit(); - ExecutorService executorService = Executors.newFixedThreadPool(8); + ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); AtomicReference unexpectedException = new AtomicReference<>(null); AtomicInteger commitTimes = new AtomicInteger(0); int maxCommitTimes = 20; BaseTable baseTable = (BaseTable) table; HadoopTableOperations tableOperations = (HadoopTableOperations) baseTable.operations(); - HadoopTableOperations spyOps = (HadoopTableOperations) spy(tableOperations); + HadoopTableOperations spyOps = spy(tableOperations); CountDownLatch countDownLatch = new CountDownLatch(5); CountDownLatch countDownLatch2 = new CountDownLatch(1); AtomicReference dirtyCommitFile = new AtomicReference<>(null);