diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt index a4c48eff31a6a..7d09c6fd187e6 100644 --- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt +++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt @@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Identity Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -BooleanUpdater 0 0 0 16954.6 0.1 1.0X -ByteUpdater (INT32 -> Byte) 0 0 0 3764.8 0.3 0.2X -ShortUpdater (INT32 -> Short) 1 1 0 1677.4 0.6 0.1X -IntegerUpdater 0 0 0 10269.9 0.1 0.6X -LongUpdater 0 0 0 5120.9 0.2 0.3X -FloatUpdater 0 0 0 10252.8 0.1 0.6X -DoubleUpdater 0 0 0 5133.4 0.2 0.3X -BinaryUpdater 15 15 3 71.0 14.1 0.0X +BooleanUpdater 0 0 0 17004.4 0.1 1.0X +ByteUpdater (INT32 -> Byte) 0 0 0 3758.6 0.3 0.2X +ShortUpdater (INT32 -> Short) 1 1 0 1678.4 0.6 0.1X +IntegerUpdater 0 0 0 10276.1 0.1 0.6X +LongUpdater 0 0 0 5112.7 0.2 0.3X +FloatUpdater 0 0 0 10293.3 0.1 0.6X +DoubleUpdater 0 0 0 3870.8 0.3 0.2X +BinaryUpdater 15 15 1 71.3 14.0 0.0X ================================================================================================ @@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Type-converting Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -IntegerToLongUpdater 0 0 0 6220.9 0.2 1.0X -IntegerToDoubleUpdater 0 0 0 6215.0 0.2 1.0X -FloatToDoubleUpdater 2 2 0 489.5 2.0 0.1X -DateToTimestampNTZUpdater 29 29 1 36.1 27.7 0.0X -DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.2 2.2 0.1X +IntegerToLongUpdater 0 0 0 5145.1 0.2 1.0X +IntegerToDoubleUpdater 0 0 0 5120.5 0.2 1.0X +FloatToDoubleUpdater 0 0 0 2527.2 0.4 0.5X +DateToTimestampNTZUpdater 29 29 0 36.3 27.5 0.0X +DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.6 2.2 0.1X ================================================================================================ @@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------- -IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3645.1 0.3 1.0X -LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2663.5 0.4 0.7X -LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3 0 419.9 2.4 0.1X +IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3647.4 0.3 1.0X +LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2668.6 0.4 0.7X +LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3 0 420.3 2.4 0.1X ================================================================================================ @@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Unsigned Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------- -UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 5846.6 0.2 1.0X -UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 18 0 60.3 16.6 0.0X +UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 5089.6 0.2 1.0X +UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 18 1 60.2 16.6 0.0X ================================================================================================ @@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Decimal Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -IntegerToDecimalUpdater 0 0 0 10296.2 0.1 1.0X -LongToDecimalUpdater 0 0 0 5145.8 0.2 0.5X -FixedLenByteArrayToDecimalUpdater 21 21 1 50.2 19.9 0.0X +IntegerToDecimalUpdater 0 0 0 7746.5 0.1 1.0X +LongToDecimalUpdater 0 0 0 3874.1 0.3 0.5X +FixedLenByteArrayToDecimalUpdater 21 21 0 50.7 19.7 0.0X ================================================================================================ @@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor FixedLenByteArray Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------------- -FixedLenByteArrayUpdater (len=16 -> Binary) 20 21 1 52.7 19.0 1.0X +FixedLenByteArrayUpdater (len=16 -> Binary) 19 20 2 54.1 18.5 1.0X FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 160.1 6.2 3.0X -FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 132.7 7.5 2.5X +FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 133.2 7.5 2.5X diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt index a22613b7dd4f4..627835735a088 100644 --- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt +++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt @@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Identity Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -BooleanUpdater 0 0 0 17171.8 0.1 1.0X -ByteUpdater (INT32 -> Byte) 0 0 0 3675.3 0.3 0.2X -ShortUpdater (INT32 -> Short) 1 1 0 1661.2 0.6 0.1X -IntegerUpdater 0 0 0 10301.4 0.1 0.6X -LongUpdater 0 0 0 5147.9 0.2 0.3X -FloatUpdater 0 0 0 10294.3 0.1 0.6X -DoubleUpdater 0 0 0 5113.4 0.2 0.3X -BinaryUpdater 16 16 1 66.9 14.9 0.0X +BooleanUpdater 0 0 0 17138.1 0.1 1.0X +ByteUpdater (INT32 -> Byte) 0 0 0 3678.2 0.3 0.2X +ShortUpdater (INT32 -> Short) 1 1 0 1662.8 0.6 0.1X +IntegerUpdater 0 0 0 10231.8 0.1 0.6X +LongUpdater 0 0 0 5103.2 0.2 0.3X +FloatUpdater 0 0 0 10203.9 0.1 0.6X +DoubleUpdater 0 0 0 5105.4 0.2 0.3X +BinaryUpdater 17 17 0 62.8 15.9 0.0X ================================================================================================ @@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Type-converting Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -IntegerToLongUpdater 0 0 0 6351.2 0.2 1.0X -IntegerToDoubleUpdater 0 0 0 6423.7 0.2 1.0X -FloatToDoubleUpdater 2 2 0 483.3 2.1 0.1X -DateToTimestampNTZUpdater 29 29 1 36.6 27.3 0.0X -DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.4 2.2 0.1X +IntegerToLongUpdater 0 0 0 6239.9 0.2 1.0X +IntegerToDoubleUpdater 0 0 0 6382.6 0.2 1.0X +FloatToDoubleUpdater 0 0 0 2570.7 0.4 0.4X +DateToTimestampNTZUpdater 26 26 0 40.5 24.7 0.0X +DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 637.7 1.6 0.1X ================================================================================================ @@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------- -IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3653.2 0.3 1.0X -LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2659.6 0.4 0.7X -LongAsMicrosUpdater (TIMESTAMP_MILLIS) 3 3 0 371.1 2.7 0.1X +IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3660.5 0.3 1.0X +LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2663.6 0.4 0.7X +LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2 0 521.3 1.9 0.1X ================================================================================================ @@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Unsigned Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------- -UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 6324.8 0.2 1.0X -UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 17 0 60.4 16.5 0.0X +UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 6196.3 0.2 1.0X +UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 18 1 60.3 16.6 0.0X ================================================================================================ @@ -64,8 +64,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Decimal Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -IntegerToDecimalUpdater 0 0 0 10258.9 0.1 1.0X -LongToDecimalUpdater 0 0 0 5144.1 0.2 0.5X +IntegerToDecimalUpdater 0 0 0 10247.8 0.1 1.0X +LongToDecimalUpdater 0 0 0 5125.7 0.2 0.5X FixedLenByteArrayToDecimalUpdater 21 21 0 50.7 19.7 0.0X @@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor FixedLenByteArray Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------------- -FixedLenByteArrayUpdater (len=16 -> Binary) 21 22 1 50.2 19.9 1.0X -FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 152.5 6.6 3.0X -FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 127.5 7.8 2.5X +FixedLenByteArrayUpdater (len=16 -> Binary) 21 21 1 51.0 19.6 1.0X +FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 152.6 6.6 3.0X +FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 127.7 7.8 2.5X diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt index fe4f716d2e15f..5e2c31d2b5668 100644 --- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt +++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt @@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Identity Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -BooleanUpdater 0 0 0 20526.1 0.0 1.0X -ByteUpdater (INT32 -> Byte) 0 0 0 3669.4 0.3 0.2X -ShortUpdater (INT32 -> Short) 1 1 0 2050.4 0.5 0.1X -IntegerUpdater 0 0 0 10107.4 0.1 0.5X -LongUpdater 0 0 0 5037.7 0.2 0.2X -FloatUpdater 0 0 0 10239.0 0.1 0.5X -DoubleUpdater 0 0 0 5095.8 0.2 0.2X -BinaryUpdater 15 15 0 70.1 14.3 0.0X +BooleanUpdater 0 0 0 14526.2 0.1 1.0X +ByteUpdater (INT32 -> Byte) 0 0 0 3679.3 0.3 0.3X +ShortUpdater (INT32 -> Short) 1 1 0 2054.1 0.5 0.1X +IntegerUpdater 0 0 0 10178.0 0.1 0.7X +LongUpdater 0 0 0 5054.4 0.2 0.3X +FloatUpdater 0 0 0 10212.8 0.1 0.7X +DoubleUpdater 0 0 0 5051.2 0.2 0.3X +BinaryUpdater 15 15 0 68.4 14.6 0.0X ================================================================================================ @@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Type-converting Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -IntegerToLongUpdater 1 1 0 1277.6 0.8 1.0X -IntegerToDoubleUpdater 1 1 0 1475.2 0.7 1.2X -FloatToDoubleUpdater 2 2 0 480.2 2.1 0.4X -DateToTimestampNTZUpdater 36 36 0 29.0 34.5 0.0X -DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.0 2.2 0.4X +IntegerToLongUpdater 1 1 0 1280.6 0.8 1.0X +IntegerToDoubleUpdater 1 1 0 1537.9 0.7 1.2X +FloatToDoubleUpdater 1 1 0 1418.8 0.7 1.1X +DateToTimestampNTZUpdater 36 36 0 29.5 33.9 0.0X +DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.3 2.2 0.4X ================================================================================================ @@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------- -IntegerWithRebaseUpdater (DATE legacy) 0 0 0 2400.9 0.4 1.0X -LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1 0 2079.6 0.5 0.9X -LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2 0 454.0 2.2 0.2X +IntegerWithRebaseUpdater (DATE legacy) 0 0 0 2407.3 0.4 1.0X +LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1 0 2030.8 0.5 0.8X +LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2 0 454.4 2.2 0.2X ================================================================================================ @@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Unsigned Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------- -UnsignedIntegerUpdater (UINT32 -> Long) 1 1 0 1091.4 0.9 1.0X -UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18 0 59.0 16.9 0.1X +UnsignedIntegerUpdater (UINT32 -> Long) 1 1 0 1093.1 0.9 1.0X +UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18 0 59.1 16.9 0.1X ================================================================================================ @@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor Decimal Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -IntegerToDecimalUpdater 0 0 0 10204.0 0.1 1.0X -LongToDecimalUpdater 0 0 0 4883.9 0.2 0.5X -FixedLenByteArrayToDecimalUpdater 21 21 1 51.0 19.6 0.0X +IntegerToDecimalUpdater 0 0 0 10195.9 0.1 1.0X +LongToDecimalUpdater 0 0 0 5049.2 0.2 0.5X +FixedLenByteArrayToDecimalUpdater 21 21 0 51.0 19.6 0.0X ================================================================================================ @@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure AMD EPYC 7763 64-Core Processor FixedLenByteArray Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------------- -FixedLenByteArrayUpdater (len=16 -> Binary) 19 19 0 55.2 18.1 1.0X +FixedLenByteArrayUpdater (len=16 -> Binary) 19 19 0 54.9 18.2 1.0X FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 160.1 6.2 2.9X -FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 9 9 0 123.1 8.1 2.2X +FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 9 9 0 123.0 8.1 2.2X diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java index 46a0d3dd212f8..eba4b426a0d84 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java @@ -949,9 +949,7 @@ public void readValues( int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) { - for (int i = 0; i < total; ++i) { - values.putDouble(offset + i, valuesReader.readFloat()); - } + valuesReader.readFloatsAsDoubles(total, values, offset); } @Override diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index 0329b1ff8ebff..d91ba5e2b87dc 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -180,6 +180,18 @@ public final void readIntegersAsDoubles(int total, WritableColumnVector c, int r } } + @Override + public final void readFloatsAsDoubles(int total, WritableColumnVector c, int rowId) { + int requiredBytes = total * 4; + ByteBuffer buffer = getBuffer(requiredBytes); + // No `hasArray` bulk-copy path: source (float, 4 bytes) and target (double, 8 bytes) + // have different widths so a contiguous byte copy is impossible. Matches the pattern + // in `readIntegersAsLongs`. + for (int i = 0; i < total; i += 1) { + c.putDouble(rowId + i, buffer.getFloat()); + } + } + // A fork of `readIntegers` to rebase the date values. For performance reasons, this method // iterates the values twice: check if we need to rebase first, then go to the optimized branch // if rebase is not needed. diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java index 8cc9a7dae2cd7..a90e9bf01c819 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java @@ -94,6 +94,27 @@ default void readIntegersAsDoubles(int total, WritableColumnVector c, int rowId) } } + /** + * Reads {@code total} FLOAT values, widens each to a double, and writes them into + * {@code c} starting at {@code c[rowId]}. The widening is Java's primitive + * float-to-double conversion: exact for every finite and infinite float; a NaN + * float widens to a double NaN (the payload may be canonicalized by the JVM). + * Used by the type-converting updater that reads parquet FLOAT columns into + * Spark {@code DoubleType} targets. + * + *
The default implementation falls back to a per-row read+widen+write loop and is + * therefore equivalent in cost to the legacy per-row Updater path. Subclasses backed + * by contiguous bulk storage (e.g. PLAIN encoding via {@link VectorizedPlainValuesReader}) + * should override to read source bytes once and run a tight in-method conversion loop, + * avoiding {@code total} virtual dispatches on {@link #readFloat()}. Readers without + * an override preserve correctness but gain no speedup. + */ + default void readFloatsAsDoubles(int total, WritableColumnVector c, int rowId) { + for (int i = 0; i < total; i += 1) { + c.putDouble(rowId + i, readFloat()); + } + } + void readBinary(int total, WritableColumnVector c, int rowId); void readGeometry(int total, WritableColumnVector c, int rowId); void readGeography(int total, WritableColumnVector c, int rowId); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index bab3d4e36f5e6..692306fb52f29 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -1972,6 +1972,55 @@ class ParquetIOSuite extends ParquetTest with SharedSparkSession { } } } + + test("FLOAT -> Double widening end-to-end via vectorized read path") { + // Round-trips a FLOAT Parquet file read back as DoubleType, exercising + // FloatToDoubleUpdater. Same REQUIRED/OPTIONAL coverage as the INT32 -> Long + // sibling test above. The widening is Java's primitive float-to-double conversion; + // `checkAnswer`'s Double comparison treats any NaN as equal to any NaN, so NaN + // payload preservation is not asserted at this level. + withTempPath { file => + val n = 5000 + def sampleAt(i: Int): Float = i % 7 match { + case 0 => Float.MinValue + case 1 => -1.5f + case 2 => -0.0f + case 3 => 0.0f + case 4 => Float.MaxValue + case 5 => Float.NaN + case _ => i * 0.125f - 3.25f + } + + val nonNullData = (0 until n).map(i => Row(sampleAt(i))) + // Every 11th row is null. Mixed with NaN entries above, this exercises the PACKED + // def-level path interleaving runs of values with runs of nulls. + val nullableData = (0 until n).map { i => + if (i % 11 == 0) Row(null) else Row(sampleAt(i)) + } + + val nonNullWriteSchema = new StructType().add("v", FloatType, nullable = false) + val nonNullReadSchema = new StructType().add("v", DoubleType, nullable = false) + val nullableWriteSchema = new StructType().add("v", FloatType, nullable = true) + val nullableReadSchema = new StructType().add("v", DoubleType, nullable = true) + + val nonNullPath = new java.io.File(file, "nonnull").getCanonicalPath + val nullablePath = new java.io.File(file, "nullable").getCanonicalPath + spark.createDataFrame(spark.sparkContext.parallelize(nonNullData, 4), nonNullWriteSchema) + .write.parquet(nonNullPath) + spark.createDataFrame(spark.sparkContext.parallelize(nullableData, 4), nullableWriteSchema) + .write.parquet(nullablePath) + + val expectedNonNull = nonNullData.map(r => Row(r.getFloat(0).toDouble)) + val expectedNullable = nullableData.map { r => + if (r.isNullAt(0)) Row(null) else Row(r.getFloat(0).toDouble) + } + + withAllParquetReaders { + checkAnswer(spark.read.schema(nonNullReadSchema).parquet(nonNullPath), expectedNonNull) + checkAnswer(spark.read.schema(nullableReadSchema).parquet(nullablePath), expectedNullable) + } + } + } } class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala index 64a9b2032b658..2ed6f84e68b9a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.types._ * whose `readValues` is a bulk delegation to `VectorizedValuesReader`: * - `IntegerToLongUpdater` (INT32 -> Long, plus long-decimal dispatch) * - `IntegerToDoubleUpdater` (INT32 -> Double) + * - `FloatToDoubleUpdater` (FLOAT -> Double) * * Covers boundary batch lengths, sign-extension on negative INT32 values, the singular * `readValue` path, and the factory's long-decimal dispatch @@ -71,6 +72,13 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite { buf.array() } + private def plainFloatBytes(values: Array[Float]): Array[Byte] = { + val buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN) + var i = 0 + while (i < values.length) { buf.putFloat(values(i)); i += 1 } + buf.array() + } + private def newPlainReader(bytes: Array[Byte], numValues: Int): VectorizedPlainValuesReader = { val r = new VectorizedPlainValuesReader r.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes))) @@ -211,4 +219,95 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite { assert(actual === input.map(_.toDouble)) } + // ---- FloatToDoubleUpdater: same bulk-path shape, source is FLOAT, target is DoubleType ---- + + // FLOAT column descriptor with no logical-type annotation. + private val floatDescriptor: ColumnDescriptor = { + val pt = Types.primitive(PrimitiveTypeName.FLOAT, Repetition.OPTIONAL).named("col") + new ColumnDescriptor(Array("col"), pt, 0, 1) + } + + // Reads `values.length` FLOATs through `FloatToDoubleUpdater.readValues` and returns the + // resulting double column. + private def readViaFloatToDoubleUpdater(values: Array[Float]): Array[Double] = { + val fac = newFactory(floatDescriptor) + val updater = fac.getUpdater(floatDescriptor, DataTypes.DoubleType) + val out = new OnHeapColumnVector(values.length.max(1), DataTypes.DoubleType) + val reader = newPlainReader(plainFloatBytes(values), values.length) + updater.readValues(values.length, 0, out, reader) + val result = new Array[Double](values.length) + var i = 0 + while (i < values.length) { result(i) = out.getDouble(i); i += 1 } + result + } + + // Sample mixes finite, signed-zero, NaN, and infinity to catch sign and special-value bugs. + private def floatSampleValues(n: Int): Array[Float] = { + val out = new Array[Float](n) + var i = 0 + while (i < n) { + out(i) = i % 7 match { + case 0 => Float.MinValue + case 1 => -1.5f + case 2 => -0.0f + case 3 => 0.0f + case 4 => Float.MaxValue + case 5 => Float.NaN + case _ => i * 0.125f - 3.25f + } + i += 1 + } + out + } + + // Java's float-to-double widening is exact for finite/infinite values and produces + // some double NaN for a NaN input (the payload may be canonicalized). Use + // `java.lang.Double.compare` to give NaN well-defined equality and to distinguish + // -0.0 from +0.0. + private def assertDoublesEqual(actual: Array[Double], expected: Array[Double]): Unit = { + assert(actual.length === expected.length) + var i = 0 + while (i < actual.length) { + assert(java.lang.Double.compare(actual(i), expected(i)) === 0, + s"mismatch at $i: actual=${actual(i)} expected=${expected(i)}") + i += 1 + } + } + + for (n <- Seq(0, 1, 7, 8, 9, 17, 1024, 4097)) { + test(s"FloatToDoubleUpdater produces correct widened output (total=$n)") { + val input = floatSampleValues(n) + assertDoublesEqual(readViaFloatToDoubleUpdater(input), input.map(_.toDouble)) + } + } + + test("FloatToDoubleUpdater: readValue widens a single FLOAT -> Double") { + // Same rationale as the IntegerToLongUpdater readValue test: the def-level-decoder's + // run-of-1 path calls `readFloat()` directly rather than the bulk method. + val input = Array(0.0f, 1.5f, -1.5f, Float.MinValue, Float.MaxValue, Float.NaN) + val fac = newFactory(floatDescriptor) + val updater = fac.getUpdater(floatDescriptor, DataTypes.DoubleType) + val out = new OnHeapColumnVector(input.length, DataTypes.DoubleType) + val reader = newPlainReader(plainFloatBytes(input), input.length) + var i = 0 + while (i < input.length) { + updater.readValue(i, out, reader) + i += 1 + } + val actual = (0 until input.length).map(out.getDouble).toArray + assertDoublesEqual(actual, input.map(_.toDouble)) + } + + test("FloatToDoubleUpdater: special values (signed zeros, NaN, +/-Infinity) widen exactly") { + // -0.0f widens to -0.0d (distinct from +0.0d), and Float.NaN widens to a double NaN. + val input = Array(-0.0f, 0.0f, Float.NegativeInfinity, Float.PositiveInfinity, Float.NaN) + val actual = readViaFloatToDoubleUpdater(input) + val expected = input.map(_.toDouble) + assertDoublesEqual(actual, expected) + // Spot-check signed-zero preservation directly (===-based comparison would conflate). + assert(java.lang.Double.doubleToRawLongBits(actual(0)) === + java.lang.Double.doubleToRawLongBits(-0.0d)) + assert(java.lang.Double.doubleToRawLongBits(actual(1)) === + java.lang.Double.doubleToRawLongBits(0.0d)) + } }