diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
index 84937e2fdd61..03638ca037e9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
@@ -62,7 +62,7 @@ case class SparkV2FilterConverter(rowType: RowType) extends Logging {
         if (literal == null) {
           builder.isNull(transform)
         } else {
-          builder.equal(transform, literal)
+          PredicateBuilder.and(builder.isNotNull(transform), builder.equal(transform, literal))
         }
       case _ =>
         throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
index 58cf9868dc80..e8b685664c94 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
@@ -30,6 +30,24 @@ class PaimonSourceTest extends PaimonSparkTestBase with StreamTest {
 
   import testImplicits._
 
+  test("Paimon Source: EQUAL_NULL_SAFE") {
+    withTempDir {
+      _ =>
+        {
+          val TableSnapshotState(_, _, _, _, _) = prepareTableAndGetLocation(0, hasPk = true)
+          spark.sql("INSERT INTO T VALUES (1, CAST(null as string)), (2, CAST(null as string))")
+          val currentResult = () => spark.sql("SELECT * FROM T WHERE !(b <=> 'v_1')")
+          checkAnswer(currentResult(), Seq(Row(1, null), Row(2, null)))
+          spark.sql("INSERT INTO T VALUES (3, 'v_1'), (4, CAST(null as string))")
+          checkAnswer(currentResult(), Seq(Row(1, null), Row(2, null), Row(4, null)))
+          val valueDF = spark.sql("SELECT * FROM T WHERE b <=> 'v_1'")
+          checkAnswer(valueDF, Seq(Row(3, "v_1")))
+          val nullDF = spark.sql("SELECT * FROM T WHERE b <=> null")
+          checkAnswer(nullDF, Seq(Row(1, null), Row(2, null), Row(4, null)))
+        }
+    }
+  }
+
   test("Paimon Source: default scan mode") {
     withTempDir {
       checkpointDir =>
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
index 05704e3b365a..2d90777fd78c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
@@ -220,7 +220,7 @@ abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase {
   test("V2Filter: EqualNullSafe") {
     var filter = "int_col <=> 1"
     var actual = converter.convert(v2Filter(filter)).get
-    assert(actual.equals(builder.equal(3, 1)))
+    assert(actual.equals(PredicateBuilder.and(builder.isNotNull(3), builder.equal(3, 1))))
     checkAnswer(sql(s"SELECT int_col from test_tbl WHERE $filter ORDER BY int_col"), Seq(Row(1)))
     assert(scanFilesCount(filter) == 1)