diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index a3b511aac262..56958a8c09a7 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -21299,7 +21299,8 @@ def to_json(col: "ColumnOrName", options: Optional[Mapping[str, str]] = None) -> See `Data Source Option `_ for the version you use. Additionally the function supports the `pretty` option which enables - pretty JSON generation. + pretty JSON generation, and the `sortKeys` option which sorts map keys + in JSON objects for deterministic output. .. # noqa diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index cdfd8cc24cd0..8a147e90b9f9 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -3396,6 +3396,21 @@ def test_parse_json(self): self.assertEqual("""{"a":1}""", actual["var"]) self.assertEqual("""{"b":[{"c":"str2"}]}""", actual["var_lit"]) + def test_to_json_sort_keys_option(self): + df = self.spark.createDataFrame([({"b": 1, "a": 2},)], ["m"]) + res = df.select(F.to_json("m", {"sortKeys": "true"}).alias("j")).collect() + self.assertEqual("""{"a":2,"b":1}""", res[0]["j"]) + + nested = self.spark.createDataFrame( + [({"outerB": {"y": 2, "x": 1}, "outerA": {"d": 4, "c": 3}},)], + ["m"], + ) + res_nested = nested.select(F.to_json("m", {"sortKeys": "true"}).alias("j")).collect() + self.assertEqual( + """{"outerA":{"c":3,"d":4},"outerB":{"x":1,"y":2}}""", + res_nested[0]["j"], + ) + def test_variant_expressions(self): df = self.spark.createDataFrame( [Row(json="""{ "a" : 1 }""", path="$.a"), Row(json="""{ "b" : 2 }""", path="$.b")] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index d90491c2cf3d..51eb1b99692c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -191,6 +191,13 @@ class JSONOptions( */ val pretty: Boolean = parameters.get(PRETTY).map(_.toBoolean).getOrElse(false) + /** + * Sorting map keys in JSON objects if the parameter is enabled. + * This is mainly useful for deterministic JSON output, for example + * when comparing JSON strings generated from map-typed columns. + */ + val sortKeys: Boolean = parameters.get(SORT_KEYS).map(_.toBoolean).getOrElse(false) + /** * Enables inferring of TimestampType and TimestampNTZType from strings matched to the * corresponding timestamp pattern defined by the timestampFormat and timestampNTZFormat options @@ -304,6 +311,7 @@ object JSONOptions extends DataSourceOptions { val LINE_SEP = newOption("lineSep") val PRETTY = newOption("pretty") val INFER_TIMESTAMP = newOption("inferTimestamp") + val SORT_KEYS = newOption("sortKeys") val COLUMN_NAME_OF_CORRUPT_RECORD = newOption(DataSourceOptions.COLUMN_NAME_OF_CORRUPT_RECORD) val TIME_ZONE = newOption("timeZone") val WRITE_NON_ASCII_CHARACTER_AS_CODEPOINT = newOption("writeNonAsciiCharacterAsCodePoint") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 9d6f712c6b20..bccb0f7121ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -276,15 +276,35 @@ class JacksonGenerator( map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = { val keyArray = map.keyArray() val valueArray = map.valueArray() - var i = 0 - while (i < map.numElements()) { - gen.writeFieldName(keyArray.get(i, mapType.keyType).toString) - if (!valueArray.isNullAt(i)) { - fieldWriter.apply(valueArray, i) - } else { - gen.writeNull() + val numElements = map.numElements() + + if (!options.sortKeys) { + var i = 0 + while (i < numElements) { + gen.writeFieldName(keyArray.get(i, mapType.keyType).toString) + if (!valueArray.isNullAt(i)) { + fieldWriter.apply(valueArray, i) + } else { + gen.writeNull() + } + i += 1 + } + } else { + val keyType = mapType.keyType + val indices = Array.tabulate(numElements)(identity) + val sortedIndices = indices.sortBy(i => keyArray.get(i, keyType).toString) + + var pos = 0 + while (pos < numElements) { + val i = sortedIndices(pos) + gen.writeFieldName(keyArray.get(i, keyType).toString) + if (!valueArray.isNullAt(i)) { + fieldWriter.apply(valueArray, i) + } else { + gen.writeNull() + } + pos += 1 } - i += 1 } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala index dc9a5816a335..d030fd4bdb17 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala @@ -106,6 +106,18 @@ class JacksonGeneratorSuite extends SparkFunSuite { assert(writer.toString === """{"a":1}""") } + test("initial with Map and write out a map data with sortKeys=true") { + val dataType = MapType(StringType, IntegerType) + // Intentionally construct map data with keys in non-sorted order. + val input = ArrayBasedMapData(Array("b", "a"), Array(1, 2)) + val writer = new CharArrayWriter() + val sortOption = new JSONOptions(Map("sortKeys" -> "true"), utcId) + val gen = new JacksonGenerator(dataType, writer, sortOption) + gen.write(input) + gen.flush() + assert(writer.toString === """{"a":2,"b":1}""") + } + test("initial with Map and write out an array of maps") { val dataType = MapType(StringType, IntegerType) val input = new GenericArrayData( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 6d3be98bd0cd..392131d3cb2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -423,6 +423,22 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { Row("""{"a":1}""") :: Nil) } + test("to_json with option (sortKeys)") { + val df = Seq(Map("b" -> 1, "a" -> 2)).toDF("a") + val options = Map("sortKeys" -> "true") + + checkAnswer( + df.select(to_json($"a", options)), + Row("""{"a":2,"b":1}""") :: Nil) + + val nested = Seq(Map("outerB" -> Map("y" -> 2, "x" -> 1), + "outerA" -> Map("d" -> 4, "c" -> 3))).toDF("m") + + checkAnswer( + nested.select(to_json($"m", options)), + Row("""{"outerA":{"c":3,"d":4},"outerB":{"x":1,"y":2}}""") :: Nil) + } + test("to_json with option (timestampFormat)") { val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a") val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")