Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/pyspark/sql/functions/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -21299,7 +21299,8 @@ def to_json(col: "ColumnOrName", options: Optional[Mapping[str, str]] = None) ->
See `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option>`_
for the version you use.
Additionally the function supports the `pretty` option which enables
pretty JSON generation.
pretty JSON generation, and the `sortKeys` option which sorts map keys
in JSON objects for deterministic output.

.. # noqa

Expand Down
15 changes: 15 additions & 0 deletions python/pyspark/sql/tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3396,6 +3396,21 @@ def test_parse_json(self):
self.assertEqual("""{"a":1}""", actual["var"])
self.assertEqual("""{"b":[{"c":"str2"}]}""", actual["var_lit"])

def test_to_json_sort_keys_option(self):
    """to_json with sortKeys=true must emit map keys in sorted order."""
    # A flat map: keys must come out sorted regardless of insertion order.
    flat_df = self.spark.createDataFrame([({"b": 1, "a": 2},)], ["m"])
    flat_rows = flat_df.select(F.to_json("m", {"sortKeys": "true"}).alias("j")).collect()
    self.assertEqual("""{"a":2,"b":1}""", flat_rows[0]["j"])

    # Nested maps: sorting must apply recursively at every level.
    nested_df = self.spark.createDataFrame(
        [({"outerB": {"y": 2, "x": 1}, "outerA": {"d": 4, "c": 3}},)],
        ["m"],
    )
    nested_rows = nested_df.select(
        F.to_json("m", {"sortKeys": "true"}).alias("j")
    ).collect()
    self.assertEqual(
        """{"outerA":{"c":3,"d":4},"outerB":{"x":1,"y":2}}""",
        nested_rows[0]["j"],
    )

def test_variant_expressions(self):
df = self.spark.createDataFrame(
[Row(json="""{ "a" : 1 }""", path="$.a"), Row(json="""{ "b" : 2 }""", path="$.b")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ class JSONOptions(
*/
val pretty: Boolean = parameters.get(PRETTY).map(_.toBoolean).getOrElse(false)

/**
 * When set to true, keys of JSON objects generated from map values are
 * written in sorted order, making the produced JSON deterministic — useful,
 * for example, when comparing JSON strings built from map-typed columns.
 * Defaults to false (entries are emitted in the map's storage order).
 */
val sortKeys: Boolean = parameters.get(SORT_KEYS).exists(_.toBoolean)

/**
* Enables inferring of TimestampType and TimestampNTZType from strings matched to the
* corresponding timestamp pattern defined by the timestampFormat and timestampNTZFormat options
Expand Down Expand Up @@ -304,6 +311,7 @@ object JSONOptions extends DataSourceOptions {
val LINE_SEP = newOption("lineSep")
val PRETTY = newOption("pretty")
val INFER_TIMESTAMP = newOption("inferTimestamp")
// Write-side option: when "true", map keys are emitted in sorted (string)
// order so that generated JSON is deterministic. Read by JSONOptions.sortKeys.
val SORT_KEYS = newOption("sortKeys")
val COLUMN_NAME_OF_CORRUPT_RECORD = newOption(DataSourceOptions.COLUMN_NAME_OF_CORRUPT_RECORD)
val TIME_ZONE = newOption("timeZone")
val WRITE_NON_ASCII_CHARACTER_AS_CODEPOINT = newOption("writeNonAsciiCharacterAsCodePoint")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,15 +276,35 @@ class JacksonGenerator(
map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = {
val keyArray = map.keyArray()
val valueArray = map.valueArray()
var i = 0
while (i < map.numElements()) {
gen.writeFieldName(keyArray.get(i, mapType.keyType).toString)
if (!valueArray.isNullAt(i)) {
fieldWriter.apply(valueArray, i)
} else {
gen.writeNull()
val numElements = map.numElements()

if (!options.sortKeys) {
var i = 0
while (i < numElements) {
gen.writeFieldName(keyArray.get(i, mapType.keyType).toString)
if (!valueArray.isNullAt(i)) {
fieldWriter.apply(valueArray, i)
} else {
gen.writeNull()
}
i += 1
}
} else {
val keyType = mapType.keyType
val indices = Array.tabulate(numElements)(identity)
val sortedIndices = indices.sortBy(i => keyArray.get(i, keyType).toString)

var pos = 0
while (pos < numElements) {
val i = sortedIndices(pos)
gen.writeFieldName(keyArray.get(i, keyType).toString)
if (!valueArray.isNullAt(i)) {
fieldWriter.apply(valueArray, i)
} else {
gen.writeNull()
}
pos += 1
}
i += 1
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ class JacksonGeneratorSuite extends SparkFunSuite {
assert(writer.toString === """{"a":1}""")
}

test("initial with Map and write out a map data with sortKeys=true") {
  val dataType = MapType(StringType, IntegerType)
  // Keys are deliberately out of order so this test only passes if the
  // generator actually sorts them.
  val mapData = ArrayBasedMapData(Array("b", "a"), Array(1, 2))
  val out = new CharArrayWriter()
  val opts = new JSONOptions(Map("sortKeys" -> "true"), utcId)
  val generator = new JacksonGenerator(dataType, out, opts)
  generator.write(mapData)
  generator.flush()
  assert(out.toString === """{"a":2,"b":1}""")
}

test("initial with Map and write out an array of maps") {
val dataType = MapType(StringType, IntegerType)
val input = new GenericArrayData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,22 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
Row("""{"a":1}""") :: Nil)
}

test("to_json with option (sortKeys)") {
  val sortOpts = Map("sortKeys" -> "true")

  // Top-level map keys come out lexicographically sorted.
  val flat = Seq(Map("b" -> 1, "a" -> 2)).toDF("a")
  checkAnswer(
    flat.select(to_json($"a", sortOpts)),
    Row("""{"a":2,"b":1}""") :: Nil)

  // Sorting also applies to maps nested inside map values.
  val nested = Seq(
    Map(
      "outerB" -> Map("y" -> 2, "x" -> 1),
      "outerA" -> Map("d" -> 4, "c" -> 3))).toDF("m")
  checkAnswer(
    nested.select(to_json($"m", sortOpts)),
    Row("""{"outerA":{"c":3,"d":4},"outerB":{"x":1,"y":2}}""") :: Nil)
}

test("to_json with option (timestampFormat)") {
val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")
val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
Expand Down