From 6f19d03fce0dbfd310941520cf6d53d3f8035c32 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 11 May 2026 00:45:33 +0800 Subject: [PATCH 1/3] [SPARK-56811][UI] Restore sub-execution grouping on the SQL tab listing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? This PR restores sub-execution grouping on the SQL tab listing that was silently dropped when the listing was switched to server-side pagination in SPARK-56140. - Backend (`SqlResource.sqlTable`): accept a new `groupSubExecution` query parameter (default = cluster config `spark.ui.groupSQLSubExecutionEnabled`). When enabled, paginate over root executions only and embed each root's children as `subExecutions: [...]` on the row. Sub-executions whose root is missing from the filtered set surface as roots so they are never hidden. Fix `recordsTotal` / `recordsFiltered` to count root rows in grouped mode so the DataTables "Showing X to Y of Z entries" matches the visible rows. - Frontend (`allexecutionspage.js`): read the existing `group-sub-exec-config` data attribute, forward `groupSubExecution=true|false` to the server, and append a trailing **Sub Executions** column showing a `+N sub` toggle on roots that have children. Toggling expands the row using DataTables `row().child()` with a nested table — matches the SPARK-41752 / Spark 4.1 layout. - CSS (`webui-dataTables.css`): minimal styling for the toggle link and the nested child table (indent + tertiary background). ### Why are the changes needed? SPARK-41752 introduced sub-execution grouping in Spark 3.4, and SPARK-55875 carried it over when the listing moved to client-side DataTables. SPARK-56140 then switched the listing to server-side pagination but didn't carry over the grouping logic — every sub-execution now shows up as its own flat row, regressing the UX for queries such as `CACHE TABLE` and nested CTAS.
### Does this PR introduce _any_ user-facing change? Yes — the SQL tab listing again folds sub-executions under their root, with a `+N sub` toggle to expand. Default behaviour is controlled by the existing `spark.ui.groupSQLSubExecutionEnabled` config (default true). ### How was this patch tested? - New unit test in `SqlResourceWithActualMetricsSuite` covering both grouped and flat modes against a session that runs CACHE TABLE (which produces a root + sub-execution pair). - Existing `SqlResourceWithActualMetricsSuite` and `AllExecutionsPageWithInMemoryStoreSuite` continue to pass. - Manual verification in a local Spark UI: ran a workload with two sub-execution-producing statements (CACHE TABLE, CTAS) and confirmed both modes render correctly via screenshot. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: GitHub Copilot CLI 1.0.44-2 with Claude Opus 4.7 (Extra high reasoning) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../spark/ui/static/webui-dataTables.css | 32 +++ .../execution/ui/static/allexecutionspage.js | 195 ++++++++++++------ .../spark/status/api/v1/sql/SqlResource.scala | 85 ++++++-- .../SqlResourceWithActualMetricsSuite.scala | 63 ++++++ 4 files changed, 288 insertions(+), 87 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui-dataTables.css b/core/src/main/resources/org/apache/spark/ui/static/webui-dataTables.css index 202579c6b67ce..e7a8f3ab0839a 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui-dataTables.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui-dataTables.css @@ -58,4 +58,36 @@ table.dataTable thead .sorting_desc_disabled::after { div.dataTables_wrapper div.dataTables_length select { width: 100%; +} + +/* SQL tab sub-execution disclosure (SPARK-56811) */ +table#sql-table td.sub-exec-toggle { + white-space: nowrap; +} + +table#sql-table td.sub-exec-toggle a.toggle-sub-exec { + text-decoration: none; +} + 
+table#sql-table td.sub-exec-toggle a.toggle-sub-exec:hover { + text-decoration: underline; +} + +table#sql-table tr.shown td.sub-exec-toggle a.toggle-sub-exec { + font-weight: 600; +} + +table#sql-table tr.shown + tr > td { + background-color: var(--bs-tertiary-bg, #f4f7fa); +} + +table.sub-exec-table { + margin-left: 1.5rem !important; + width: calc(100% - 1.5rem) !important; + background-color: transparent; +} + +table.sub-exec-table thead th { + font-weight: 600; + background-color: transparent; } \ No newline at end of file diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js index 34e3be4913ce4..f8889cf25a171 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js @@ -116,6 +116,13 @@ $(document).ready(function () { } } + // Read the cluster-level grouping toggle rendered into the page by Scala + var groupSubExecEnabled = true; + var configEl = document.getElementById("group-sub-exec-config"); + if (configEl) { + groupSubExecEnabled = configEl.getAttribute("data-value") === "true"; + } + function init(resolvedAppId) { var sqlTableEndPoint = createSQLTableEndPoint(resolvedAppId); @@ -131,6 +138,91 @@ $(document).ready(function () { '
'; + var columns = [ + { + data: "id", name: "id", title: "ID", + render: function (data, type) { + if (type !== "display") return data; + var basePath = uiRoot + appBasePath; + return '' + + data + ''; + } + }, + { + data: "queryId", name: "queryId", title: "Query ID", + orderable: false, + render: function (data, type) { + if (type !== "display" || !data) return data || ""; + return '' + data.substring(0, 8) + '...'; + } + }, + { + data: "status", name: "status", title: "Status", + render: function (data, type) { + if (type !== "display") return data; + return statusBadge(data); + } + }, + { + data: "description", name: "description", title: "Description", + render: function (data, type, row) { + if (type !== "display") return data || ""; + return descriptionHtml({ id: row.id, description: data }); + } + }, + { + data: "submissionTime", name: "submissionTime", title: "Submitted", + render: function (data, type) { + if (type !== "display") return data; + return formatDateSql(data); + } + }, + { + data: "duration", name: "duration", title: "Duration", + render: function (data, type) { + if (type !== "display") return data; + return formatDurationSql(data); + } + }, + { + data: "jobIds", name: "jobIds", title: "Succeeded Jobs", + orderable: false, + render: function (data, type) { + if (type !== "display") return (data || []).join(","); + return jobIdLinks(data || []); + } + }, + { + data: "errorMessage", name: "errorMessage", title: "Error Message", + orderable: false, + render: function (data, type) { + if (type !== "display" || !data) return data || ""; + if (data.length > 100) { + return '' + + escapeHtml(data.substring(0, 100)) + '...'; + } + return escapeHtml(data); + } + } + ]; + if (groupSubExecEnabled) { + // Trailing "Sub Executions" column matching the SPARK-41752 / 4.1 layout: + // shows "+N sub" when the root has children, blank otherwise. Click to + // expand a child row containing the sub-execution rows. 
+ columns.push({ + data: null, name: "subExecutions", title: "Sub Executions", + orderable: false, searchable: false, + className: "sub-exec-toggle", + render: function (data, type, row) { + if (type !== "display") return ""; + var subs = row.subExecutions || []; + if (subs.length === 0) return ""; + return '' + + '+' + subs.length + ' sub'; + } + }); + } + var table = $("#sql-table").DataTable({ serverSide: true, processing: true, @@ -146,83 +238,52 @@ $(document).ready(function () { if (sel) { d.status = sel; } + d.groupSubExecution = groupSubExecEnabled ? "true" : "false"; }, dataSrc: function (json) { return json.aaData; }, error: function () { $("#sql-table_processing").css("display", "none"); } }, - columns: [ - { - data: "id", name: "id", title: "ID", - render: function (data, type) { - if (type !== "display") return data; - var basePath = uiRoot + appBasePath; - return '' + - data + ''; - } - }, - { - data: "queryId", name: "queryId", title: "Query ID", - orderable: false, - render: function (data, type) { - if (type !== "display" || !data) return data || ""; - return '' + data.substring(0, 8) + '...'; - } - }, - { - data: "status", name: "status", title: "Status", - render: function (data, type) { - if (type !== "display") return data; - return statusBadge(data); - } - }, - { - data: "description", name: "description", title: "Description", - render: function (data, type, row) { - if (type !== "display") return data || ""; - return descriptionHtml({ id: row.id, description: data }); - } - }, - { - data: "submissionTime", name: "submissionTime", title: "Submitted", - render: function (data, type) { - if (type !== "display") return data; - return formatDateSql(data); - } - }, - { - data: "duration", name: "duration", title: "Duration", - render: function (data, type) { - if (type !== "display") return data; - return formatDurationSql(data); - } - }, - { - data: "jobIds", name: "jobIds", title: "Succeeded Jobs", - orderable: false, - render: function (data, 
type) { - if (type !== "display") return (data || []).join(","); - return jobIdLinks(data || []); - } - }, - { - data: "errorMessage", name: "errorMessage", title: "Error Message", - orderable: false, - render: function (data, type) { - if (type !== "display" || !data) return data || ""; - if (data.length > 100) { - return '' + - escapeHtml(data.substring(0, 100)) + '...'; - } - return escapeHtml(data); - } - } - ], + columns: columns, order: [[0, "desc"]], language: { search: "Search: " } }); + // Child-row expansion for sub-executions. Sub data is embedded per root row + // in the server payload (`row.subExecutions`), so no second fetch is needed. + if (groupSubExecEnabled) { + $("#sql-table tbody").on("click", "a.toggle-sub-exec", function (e) { + e.preventDefault(); + var tr = $(this).closest("tr"); + var dtRow = table.row(tr); + var rowData = dtRow.data(); + var subs = (rowData && rowData.subExecutions) || []; + if (dtRow.child.isShown()) { + dtRow.child.hide(); + tr.removeClass("shown"); + $(this).text("+" + subs.length + " sub"); + } else { + var basePath = uiRoot + appBasePath; + var html = ''; + html += '' + + ''; + subs.forEach(function (child) { + html += ''; + html += ''; + html += ''; + html += ''; + html += ''; + }); + html += '
IDStatusDescriptionDurationSucceeded Jobs
' + child.id + '' + statusBadge(child.status) + '' + escapeHtml(child.description || "") + '' + formatDurationSql(child.duration) + '' + jobIdLinks(child.jobIds || []) + '
'; + dtRow.child(html).show(); + tr.addClass("shown"); + $(this).text("\u2212" + subs.length + " sub"); + } + }); + } + $("#status-filter").on("change", function () { table.draw(); }); diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala index 91d3f9a484e24..15109a43f7891 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala @@ -25,6 +25,7 @@ import jakarta.ws.rs._ import jakarta.ws.rs.core.{Context, MediaType, UriInfo} import org.apache.spark.JobExecutionStatus +import org.apache.spark.internal.config.UI.UI_SQL_GROUP_SUB_EXECUTION_ENABLED import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode, SQLAppStatusStore, SQLExecutionUIData} import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException} import org.apache.spark.ui.UIUtils @@ -74,6 +75,11 @@ private[v1] class SqlResource extends BaseAppResource { * Server-side DataTables endpoint for SQL executions listing. * Accepts DataTables server-side parameters (start, length, order, search) * and returns paginated results with recordsTotal/recordsFiltered counts. + * + * When `groupSubExecution=true` (default = `spark.ui.sql.groupSubExecutionEnabled`), + * pagination is over root executions only and each row carries its sub-executions + * inline as `subExecutions: [...]`. Sub-executions whose root is missing from the + * filtered set (orphans) are surfaced as roots so they don't disappear. 
*/ @GET @Path("sqlTable") @@ -85,7 +91,10 @@ private[v1] class SqlResource extends BaseAppResource { // Echo draw counter to prevent stale responses val draw = Option(uriParams.getFirst("draw")).map(_.toInt).getOrElse(0) - val totalRecords = sqlStore.executionsCount() + // Sub-execution grouping flag; default to the cluster config + val groupSubExec = Option(uriParams.getFirst("groupSubExecution")) + .map(_.toBoolean) + .getOrElse(ui.conf.get(UI_SQL_GROUP_SUB_EXECUTION_ENABLED)) // Search and status filter val searchValue = Option(uriParams.getFirst("search[value]")) @@ -94,9 +103,12 @@ private[v1] class SqlResource extends BaseAppResource { .filter(_.nonEmpty) val needsFilter = searchValue.isDefined || statusFilter.isDefined + // Always load all execs once. The KVStore-level pagination optimization + // referenced in earlier comments is no longer effective once grouping is + // enabled because we need the full set to identify roots and orphans. + val allExecs = sqlStore.executionsList() + val filteredExecs = if (needsFilter) { - // When filtering, we must load all and filter in memory - val allExecs = sqlStore.executionsList() allExecs.filter { exec => val matchesSearch = searchValue.forall { search => val lower = search.toLowerCase(java.util.Locale.ROOT) @@ -110,10 +122,21 @@ private[v1] class SqlResource extends BaseAppResource { matchesSearch && matchesStatus } } else { - // No filter — will use KVStore pagination below - Seq.empty + allExecs + } + + // Split filteredExecs into root rows and a sub-execution map. A root row is + // either an execution whose id equals its rootExecutionId, or an orphan sub + // whose root parent is absent from the filtered set. 
+ val (rootRows, subsByRoot) = if (groupSubExec) { + val filteredIds = filteredExecs.iterator.map(_.executionId).toSet + val (roots, subs) = filteredExecs.partition { e => + e.executionId == e.rootExecutionId || !filteredIds.contains(e.rootExecutionId) + } + (roots, subs.groupBy(_.rootExecutionId)) + } else { + (filteredExecs, Map.empty[Long, Seq[SQLExecutionUIData]]) } - val filteredRecords = if (needsFilter) filteredExecs.size else totalRecords // Sort val sortCol = Option(uriParams.getFirst("order[0][column]")) @@ -125,26 +148,48 @@ private[v1] class SqlResource extends BaseAppResource { val start = Option(uriParams.getFirst("start")).map(_.toInt).getOrElse(0) val length = Option(uriParams.getFirst("length")).map(_.toInt).getOrElse(20) - val page = if (needsFilter) { - // Filter/search: sort and paginate in memory - val sorted = sortExecs(filteredExecs, sortCol, sortDir) - if (length > 0) sorted.slice(start, start + length) else sorted - } else { - // No filter: use KVStore-level pagination for efficiency - // KVStore returns in insertion order; sort in memory for the page - val execs = sqlStore.executionsList() - val sorted = sortExecs(execs, sortCol, sortDir) - if (length > 0) sorted.slice(start, start + length) else sorted + val sortedRoots = sortExecs(rootRows, sortCol, sortDir) + val page = if (length > 0) sortedRoots.slice(start, start + length) else sortedRoots + + // Convert to Java-compatible row data; embed sub-executions when grouping + val aaData = page.map { exec => + val row = execToRow(exec) + if (groupSubExec) { + val subs = subsByRoot.getOrElse(exec.executionId, Seq.empty) + if (subs.nonEmpty) { + // Sort subs by id ascending so they appear in chronological order + val subRows = new java.util.ArrayList[java.util.LinkedHashMap[String, Object]]() + sortExecs(subs, "id", "asc").foreach(s => subRows.add(execToRow(s))) + row.put("subExecutions", subRows) + } + } + row } - // Convert to Java-compatible row data - val aaData = page.map(execToRow) + // 
Counts: when grouping, totals reflect root-only counts so DataTables shows + // "Showing X to Y of Z entries" matching the rows the user actually sees. + val recordsTotal = if (groupSubExec) { + if (needsFilter) { + // Re-derive root rows from the unfiltered set + val allIds = allExecs.iterator.map(_.executionId).toSet + allExecs.count { e => + e.executionId == e.rootExecutionId || !allIds.contains(e.rootExecutionId) + } + } else { + rootRows.size + } + } else if (needsFilter) { + filteredExecs.size + } else { + sqlStore.executionsCount() + } + val recordsFiltered = if (groupSubExec) rootRows.size else filteredExecs.size val ret = new HashMap[String, Object]() ret.put("draw", Integer.valueOf(draw)) ret.put("aaData", aaData) - ret.put("recordsTotal", java.lang.Long.valueOf(filteredRecords)) - ret.put("recordsFiltered", java.lang.Long.valueOf(filteredRecords)) + ret.put("recordsTotal", java.lang.Long.valueOf(recordsTotal)) + ret.put("recordsFiltered", java.lang.Long.valueOf(recordsFiltered)) ret } } diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala index 6cae4fb30668e..55e185a36495d 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala @@ -215,6 +215,69 @@ class SqlResourceWithActualMetricsSuite } } + test("SPARK-56811: sqlTable groups sub-executions under their root execution") { + // CACHE TABLE produces a root execution plus an inner sub-execution that + // shares its rootExecutionId. This is the canonical case where the SQL + // listing should fold the sub row under the root rather than flattening it. 
+ spark.sql("CREATE OR REPLACE TEMP VIEW spark_56811 AS SELECT id FROM RANGE(10)") + .collect() + spark.sql("CACHE TABLE spark_56811_cached AS SELECT * FROM spark_56811").collect() + + eventually(timeout(10.seconds), interval(1.second)) { + val baseUrl = spark.sparkContext.ui.get.webUrl + + s"/api/v1/applications/${spark.sparkContext.applicationId}/sql/sqlTable" + + // Grouping ON: roots only, with subExecutions embedded on the root that + // owns a sub-execution. + val groupedUrl = new URI( + s"$baseUrl?start=0&length=100&draw=1&groupSubExecution=true").toURL + val (groupedCode, groupedOpt, _) = getContentAndCode(groupedUrl) + assert(groupedCode === HttpServletResponse.SC_OK) + val groupedJson = JsonMethods.parse(groupedOpt.get) + val groupedRecordsTotal = (groupedJson \ "recordsTotal").extract[Long] + val groupedRecordsFiltered = (groupedJson \ "recordsFiltered").extract[Long] + val groupedRows = (groupedJson \ "aaData").children + assert(groupedRecordsTotal === groupedRows.size, + "with no filter, recordsTotal should match returned root count") + assert(groupedRecordsFiltered === groupedRows.size, + "with no filter, recordsFiltered should match returned root count") + groupedRows.foreach { row => + // Every row in grouped mode is a root: id == rootExecutionId. 
+ val id = (row \ "id").extract[Long] + val rootId = (row \ "rootExecutionId").extract[Long] + assert(id === rootId, s"grouped row $id should be a root (root=$rootId)") + } + val rootsWithSubs = groupedRows.filter { row => + (row \ "subExecutions").children.nonEmpty + } + assert(rootsWithSubs.nonEmpty, + "CACHE TABLE should produce at least one root with sub-executions") + rootsWithSubs.foreach { row => + val rootId = (row \ "id").extract[Long] + (row \ "subExecutions").children.foreach { sub => + assert((sub \ "rootExecutionId").extract[Long] === rootId, + "sub-execution should reference its parent root") + assert((sub \ "id").extract[Long] !== rootId, + "sub-execution must not have the same id as its root") + } + } + + // Grouping OFF: flat list of every execution, with no embedded subs. + val flatUrl = new URI( + s"$baseUrl?start=0&length=100&draw=2&groupSubExecution=false").toURL + val (flatCode, flatOpt, _) = getContentAndCode(flatUrl) + assert(flatCode === HttpServletResponse.SC_OK) + val flatJson = JsonMethods.parse(flatOpt.get) + val flatRows = (flatJson \ "aaData").children + assert(flatRows.size > groupedRows.size, + "flat listing should contain at least one extra sub-execution row") + flatRows.foreach { row => + assert((row \ "subExecutions").children.isEmpty, + "flat listing should not embed subExecutions") + } + } + } + test("SPARK-56137: sqlList returns ISO date format in submissionTime") { withSQLConf(ADAPTIVE_EXECUTION_ENABLED.key -> "false") { spark.sql("SELECT 'date_format_test'").collect() From 88c5155c18dd5068a56f989165b1ce2e66b5f178 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 12 May 2026 22:28:02 +0800 Subject: [PATCH 2/3] [SPARK-56811][UI][FOLLOWUP] Address review comments Address review feedback on PR #55787: Backend (SqlResource.scala): - Fix docstring config key: spark.ui.groupSQLSubExecutionEnabled - Replace inline KVStore-pagination comment with a clearer explanation of why allExecs is materialized once (orphan promotion + 
recordsTotal) - Extract the root/orphan partition predicate into SqlResource.partitionRoots so the filtered and unfiltered branches share the same implementation Frontend (allexecutionspage.js): - Escape queryId before rendering the title and short-form text - Add a11y attributes on the sub-execution toggle: role=button, aria-expanded, aria-controls pointing at the child table id - Render sub-execution descriptions via descriptionHtml so they match the parent row affordance instead of plain escapeHtml Tests (SqlResourceWithActualMetricsSuite.scala): - Wrap the SPARK-56811 grouping test in try/finally to UNCACHE the table even if assertions fail - Generalize per-row assertion to allow orphan rows - Add a synchronous unit test for SqlResource.partitionRoots that exercises true roots, grouped subs, and an orphan sub whose parent is absent from the input set Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../execution/ui/static/allexecutionspage.js | 19 ++- .../spark/status/api/v1/sql/SqlResource.scala | 45 ++++-- .../SqlResourceWithActualMetricsSuite.scala | 148 ++++++++++++------ 3 files changed, 139 insertions(+), 73 deletions(-) diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js index f8889cf25a171..e1f9abc5c0d8a 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js @@ -153,7 +153,8 @@ $(document).ready(function () { orderable: false, render: function (data, type) { if (type !== "display" || !data) return data || ""; - return '' + data.substring(0, 8) + '...'; + var safe = escapeHtml(data); + return '' + escapeHtml(data.substring(0, 8)) + '...'; } }, { @@ -217,7 +218,9 @@ $(document).ready(function () { if (type !== "display") return ""; var subs = 
row.subExecutions || []; if (subs.length === 0) return ""; - return '' + + var childId = "sub-exec-" + row.id; + return '' + '+' + subs.length + ' sub'; } }); @@ -262,24 +265,28 @@ $(document).ready(function () { if (dtRow.child.isShown()) { dtRow.child.hide(); tr.removeClass("shown"); - $(this).text("+" + subs.length + " sub"); + $(this).text("+" + subs.length + " sub").attr("aria-expanded", "false"); } else { var basePath = uiRoot + appBasePath; - var html = ''; + var childId = "sub-exec-" + (rowData && rowData.id); + var html = '
'; html += '' + ''; subs.forEach(function (child) { html += ''; html += ''; - html += ''; + html += ''; html += ''; html += ''; }); html += '
IDStatusDescriptionDurationSucceeded Jobs
' + child.id + '' + statusBadge(child.status) + '' + escapeHtml(child.description || "") + '' + descriptionHtml({ + id: child.id, description: child.description || "" + }) + '' + formatDurationSql(child.duration) + '' + jobIdLinks(child.jobIds || []) + '
'; dtRow.child(html).show(); tr.addClass("shown"); - $(this).text("\u2212" + subs.length + " sub"); + $(this).text("\u2212" + subs.length + " sub").attr("aria-expanded", "true"); } }); } diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala index 15109a43f7891..3d13d4f93b941 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala @@ -76,7 +76,7 @@ private[v1] class SqlResource extends BaseAppResource { * Accepts DataTables server-side parameters (start, length, order, search) * and returns paginated results with recordsTotal/recordsFiltered counts. * - * When `groupSubExecution=true` (default = `spark.ui.sql.groupSubExecutionEnabled`), + * When `groupSubExecution=true` (default = `spark.ui.groupSQLSubExecutionEnabled`), * pagination is over root executions only and each row carries its sub-executions * inline as `subExecutions: [...]`. Sub-executions whose root is missing from the * filtered set (orphans) are surfaced as roots so they don't disappear. @@ -103,9 +103,11 @@ private[v1] class SqlResource extends BaseAppResource { .filter(_.nonEmpty) val needsFilter = searchValue.isDefined || statusFilter.isDefined - // Always load all execs once. The KVStore-level pagination optimization - // referenced in earlier comments is no longer effective once grouping is - // enabled because we need the full set to identify roots and orphans. + // Always load all execs once. We need the full set to (a) identify orphan + // sub-executions whose root is filtered out and (b) count root rows for + // `recordsTotal`. `sqlStore.executionsList()` is already a full + // materialization, so there is no separate "KVStore-pagination" path being + // disabled here. 
val allExecs = sqlStore.executionsList() val filteredExecs = if (needsFilter) { @@ -125,15 +127,8 @@ private[v1] class SqlResource extends BaseAppResource { allExecs } - // Split filteredExecs into root rows and a sub-execution map. A root row is - // either an execution whose id equals its rootExecutionId, or an orphan sub - // whose root parent is absent from the filtered set. val (rootRows, subsByRoot) = if (groupSubExec) { - val filteredIds = filteredExecs.iterator.map(_.executionId).toSet - val (roots, subs) = filteredExecs.partition { e => - e.executionId == e.rootExecutionId || !filteredIds.contains(e.rootExecutionId) - } - (roots, subs.groupBy(_.rootExecutionId)) + SqlResource.partitionRoots(filteredExecs) } else { (filteredExecs, Map.empty[Long, Seq[SQLExecutionUIData]]) } @@ -170,11 +165,8 @@ private[v1] class SqlResource extends BaseAppResource { // "Showing X to Y of Z entries" matching the rows the user actually sees. val recordsTotal = if (groupSubExec) { if (needsFilter) { - // Re-derive root rows from the unfiltered set - val allIds = allExecs.iterator.map(_.executionId).toSet - allExecs.count { e => - e.executionId == e.rootExecutionId || !allIds.contains(e.rootExecutionId) - } + // Re-derive root rows from the unfiltered set using the same predicate + SqlResource.partitionRoots(allExecs)._1.size } else { rootRows.size } @@ -320,3 +312,22 @@ private[v1] class SqlResource extends BaseAppResource { } } + +private[v1] object SqlResource { + + /** + * Split a set of executions into root rows and a sub-execution map. A root row is + * either an execution whose id equals its rootExecutionId, or an orphan sub whose + * root parent is absent from the input set. Called on the filtered set (for paging) + * and on the full set (for `recordsTotal`), so the predicate lives in one place + * rather than being inlined twice. 
+ */ + def partitionRoots(execs: Seq[SQLExecutionUIData]) + : (Seq[SQLExecutionUIData], Map[Long, Seq[SQLExecutionUIData]]) = { + val ids = execs.iterator.map(_.executionId).toSet + val (roots, subs) = execs.partition { e => + e.executionId == e.rootExecutionId || !ids.contains(e.rootExecutionId) + } + (roots, subs.groupBy(_.rootExecutionId)) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala index 55e185a36495d..df473373ed5fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.deploy.history.HistoryServerSuite.getContentAndCode import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils +import org.apache.spark.sql.execution.ui.SQLExecutionUIData import org.apache.spark.sql.internal.SQLConf.ADAPTIVE_EXECUTION_ENABLED import org.apache.spark.sql.test.SharedSparkSession @@ -222,62 +223,109 @@ class SqlResourceWithActualMetricsSuite spark.sql("CREATE OR REPLACE TEMP VIEW spark_56811 AS SELECT id FROM RANGE(10)") .collect() spark.sql("CACHE TABLE spark_56811_cached AS SELECT * FROM spark_56811").collect() - - eventually(timeout(10.seconds), interval(1.second)) { - val baseUrl = spark.sparkContext.ui.get.webUrl + - s"/api/v1/applications/${spark.sparkContext.applicationId}/sql/sqlTable" - - // Grouping ON: roots only, with subExecutions embedded on the root that - // owns a sub-execution. 
- val groupedUrl = new URI( - s"$baseUrl?start=0&length=100&draw=1&groupSubExecution=true").toURL - val (groupedCode, groupedOpt, _) = getContentAndCode(groupedUrl) - assert(groupedCode === HttpServletResponse.SC_OK) - val groupedJson = JsonMethods.parse(groupedOpt.get) - val groupedRecordsTotal = (groupedJson \ "recordsTotal").extract[Long] - val groupedRecordsFiltered = (groupedJson \ "recordsFiltered").extract[Long] - val groupedRows = (groupedJson \ "aaData").children - assert(groupedRecordsTotal === groupedRows.size, - "with no filter, recordsTotal should match returned root count") - assert(groupedRecordsFiltered === groupedRows.size, - "with no filter, recordsFiltered should match returned root count") - groupedRows.foreach { row => - // Every row in grouped mode is a root: id == rootExecutionId. - val id = (row \ "id").extract[Long] - val rootId = (row \ "rootExecutionId").extract[Long] - assert(id === rootId, s"grouped row $id should be a root (root=$rootId)") - } - val rootsWithSubs = groupedRows.filter { row => - (row \ "subExecutions").children.nonEmpty - } - assert(rootsWithSubs.nonEmpty, - "CACHE TABLE should produce at least one root with sub-executions") - rootsWithSubs.foreach { row => - val rootId = (row \ "id").extract[Long] - (row \ "subExecutions").children.foreach { sub => - assert((sub \ "rootExecutionId").extract[Long] === rootId, - "sub-execution should reference its parent root") - assert((sub \ "id").extract[Long] !== rootId, - "sub-execution must not have the same id as its root") + try { + eventually(timeout(10.seconds), interval(1.second)) { + val baseUrl = spark.sparkContext.ui.get.webUrl + + s"/api/v1/applications/${spark.sparkContext.applicationId}/sql/sqlTable" + + // Grouping ON: roots only, with subExecutions embedded on the root that + // owns a sub-execution. 
+ val groupedUrl = new URI( + s"$baseUrl?start=0&length=100&draw=1&groupSubExecution=true").toURL + val (groupedCode, groupedOpt, _) = getContentAndCode(groupedUrl) + assert(groupedCode === HttpServletResponse.SC_OK) + val groupedJson = JsonMethods.parse(groupedOpt.get) + val groupedRecordsTotal = (groupedJson \ "recordsTotal").extract[Long] + val groupedRecordsFiltered = (groupedJson \ "recordsFiltered").extract[Long] + val groupedRows = (groupedJson \ "aaData").children + assert(groupedRecordsTotal === groupedRows.size, + "with no filter, recordsTotal should match returned root count") + assert(groupedRecordsFiltered === groupedRows.size, + "with no filter, recordsFiltered should match returned root count") + // Every row in grouped mode is either a true root (id == rootExecutionId) + // or an orphan sub whose real parent is absent from the result set. + val visibleIds = groupedRows.map(r => (r \ "id").extract[Long]).toSet + groupedRows.foreach { row => + val id = (row \ "id").extract[Long] + val rootId = (row \ "rootExecutionId").extract[Long] + assert(id == rootId || !visibleIds.contains(rootId), + s"grouped row $id (rootId=$rootId) is neither a root nor an orphan") + } + val rootsWithSubs = groupedRows.filter { row => + (row \ "subExecutions").children.nonEmpty + } + assert(rootsWithSubs.nonEmpty, + "CACHE TABLE should produce at least one root with sub-executions") + rootsWithSubs.foreach { row => + val rootId = (row \ "id").extract[Long] + (row \ "subExecutions").children.foreach { sub => + assert((sub \ "rootExecutionId").extract[Long] === rootId, + "sub-execution should reference its parent root") + assert((sub \ "id").extract[Long] !== rootId, + "sub-execution must not have the same id as its root") + } } - } - // Grouping OFF: flat list of every execution, with no embedded subs. 
- val flatUrl = new URI( - s"$baseUrl?start=0&length=100&draw=2&groupSubExecution=false").toURL - val (flatCode, flatOpt, _) = getContentAndCode(flatUrl) - assert(flatCode === HttpServletResponse.SC_OK) - val flatJson = JsonMethods.parse(flatOpt.get) - val flatRows = (flatJson \ "aaData").children - assert(flatRows.size > groupedRows.size, - "flat listing should contain at least one extra sub-execution row") - flatRows.foreach { row => - assert((row \ "subExecutions").children.isEmpty, - "flat listing should not embed subExecutions") + // Grouping OFF: flat list of every execution, with no embedded subs. + val flatUrl = new URI( + s"$baseUrl?start=0&length=100&draw=2&groupSubExecution=false").toURL + val (flatCode, flatOpt, _) = getContentAndCode(flatUrl) + assert(flatCode === HttpServletResponse.SC_OK) + val flatJson = JsonMethods.parse(flatOpt.get) + val flatRows = (flatJson \ "aaData").children + assert(flatRows.size > groupedRows.size, + "flat listing should contain at least one extra sub-execution row") + flatRows.foreach { row => + assert((row \ "subExecutions").children.isEmpty, + "flat listing should not embed subExecutions") + } } + } finally { + spark.sql("UNCACHE TABLE IF EXISTS spark_56811_cached") } } + test("SPARK-56811: partitionRoots surfaces orphan sub-executions as root rows") { + def mkExec(id: Long, rootId: Long): SQLExecutionUIData = new SQLExecutionUIData( + executionId = id, + rootExecutionId = rootId, + description = s"exec $id", + details = "", + physicalPlanDescription = "", + modifiedConfigs = Map.empty, + metrics = Seq.empty, + submissionTime = id, + completionTime = None, + errorMessage = None, + jobs = Map.empty, + stages = Set.empty, + metricValues = null, + queryId = null) + + // Tree: + // 1 (root) -> 2, 3 (subs) + // 4 (root, no subs) + // 6 (sub of 5, but 5 is missing -> orphan) + val root1 = mkExec(1, 1) + val sub2 = mkExec(2, 1) + val sub3 = mkExec(3, 1) + val root4 = mkExec(4, 4) + val orphan6 = mkExec(6, 5) + + val (roots, 
subsByRoot) = + SqlResource.partitionRoots(Seq(root1, sub2, sub3, root4, orphan6)) + + assert(roots.map(_.executionId).toSet === Set(1L, 4L, 6L), + "true roots and orphan subs should both be promoted to root rows") + assert(subsByRoot.keySet === Set(1L), + "only execs with a parent present in the input should appear in subsByRoot") + assert(subsByRoot(1L).map(_.executionId).toSet === Set(2L, 3L), + "subs should be grouped under their parent root id") + val orphanRow = roots.find(_.executionId == 6L).get + assert(orphanRow.rootExecutionId === 5L, + "orphan promoted to a root row preserves its original rootExecutionId") + } + test("SPARK-56137: sqlList returns ISO date format in submissionTime") { withSQLConf(ADAPTIVE_EXECUTION_ENABLED.key -> "false") { spark.sql("SELECT 'date_format_test'").collect() From ca0daf62c42a890a34df7d255476618720944a93 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 13 May 2026 18:07:50 +0800 Subject: [PATCH 3/3] [SPARK-56811][UI][FOLLOWUP] Address review comments - SqlResource: defensive boolean parse for groupSubExecution param; always emit a (possibly empty) subExecutions field in grouped mode for consistent JSON schema; restore unfiltered executionsCount() as flat-mode recordsTotal so DataTables shows the 'filtered from W total entries' suffix. - SqlResourceWithActualMetricsSuite: assert flat row count equals grouped roots plus embedded sub rows. - allexecutionspage.js: track expanded row IDs out-of-band and re-attach the child row on every DataTables draw, so expand state survives sort, filter, and page-change under serverSide:true. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../execution/ui/static/allexecutionspage.js | 63 +++++++++++++------ .../spark/status/api/v1/sql/SqlResource.scala | 24 +++---- .../SqlResourceWithActualMetricsSuite.scala | 3 + 3 files changed, 60 insertions(+), 30 deletions(-) diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js index e1f9abc5c0d8a..b741a18789d68 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js @@ -255,7 +255,34 @@ $(document).ready(function () { // Child-row expansion for sub-executions. Sub data is embedded per root row // in the server payload (`row.subExecutions`), so no second fetch is needed. + // Under serverSide: true DataTables destroys/recreates rows on every sort, + // filter or page change, so we track expanded row IDs out-of-band and + // re-attach the child on each draw. if (groupSubExecEnabled) { + var expandedRowIds = {}; + + var renderSubExecutionsHtml = function (rowData) { + var subs = (rowData && rowData.subExecutions) || []; + var basePath = uiRoot + appBasePath; + var childId = "sub-exec-" + (rowData && rowData.id); + var html = ''; + html += '' + + ''; + subs.forEach(function (child) { + html += ''; + html += ''; + html += ''; + html += ''; + html += ''; + }); + html += '
IDStatusDescriptionDurationSucceeded Jobs
' + child.id + '' + statusBadge(child.status) + '' + descriptionHtml({ + id: child.id, description: child.description || "" + }) + '' + formatDurationSql(child.duration) + '' + jobIdLinks(child.jobIds || []) + '
'; + return html; + }; + $("#sql-table tbody").on("click", "a.toggle-sub-exec", function (e) { e.preventDefault(); var tr = $(this).closest("tr"); @@ -266,29 +293,29 @@ $(document).ready(function () { dtRow.child.hide(); tr.removeClass("shown"); $(this).text("+" + subs.length + " sub").attr("aria-expanded", "false"); + delete expandedRowIds[rowData.id]; } else { - var basePath = uiRoot + appBasePath; - var childId = "sub-exec-" + (rowData && rowData.id); - var html = ''; - html += '' + - ''; - subs.forEach(function (child) { - html += ''; - html += ''; - html += ''; - html += ''; - html += ''; - }); - html += '
IDStatusDescriptionDurationSucceeded Jobs
' + child.id + '' + statusBadge(child.status) + '' + descriptionHtml({ - id: child.id, description: child.description || "" - }) + '' + formatDurationSql(child.duration) + '' + jobIdLinks(child.jobIds || []) + '
'; - dtRow.child(html).show(); + dtRow.child(renderSubExecutionsHtml(rowData)).show(); tr.addClass("shown"); $(this).text("\u2212" + subs.length + " sub").attr("aria-expanded", "true"); + expandedRowIds[rowData.id] = true; } }); + + table.on("draw", function () { + $("#sql-table tbody > tr").each(function () { + var dtRow = table.row(this); + var data = dtRow.data(); + if (data && expandedRowIds[data.id]) { + var subs = data.subExecutions || []; + dtRow.child(renderSubExecutionsHtml(data)).show(); + $(this).addClass("shown"); + $(this).find("a.toggle-sub-exec") + .text("\u2212" + subs.length + " sub") + .attr("aria-expanded", "true"); + } + }); + }); } $("#status-filter").on("change", function () { diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala index 3d13d4f93b941..59a1264993e19 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala @@ -19,6 +19,7 @@ package org.apache.spark.status.api.v1.sql import java.util.{Date, HashMap} +import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success, Try} import jakarta.ws.rs._ @@ -91,9 +92,10 @@ private[v1] class SqlResource extends BaseAppResource { // Echo draw counter to prevent stale responses val draw = Option(uriParams.getFirst("draw")).map(_.toInt).getOrElse(0) - // Sub-execution grouping flag; default to the cluster config + // Sub-execution grouping flag; default to the cluster config. Defensive + // parse - bad values should not 500 the public REST endpoint. 
val groupSubExec = Option(uriParams.getFirst("groupSubExecution")) - .map(_.toBoolean) + .flatMap(v => Try(v.toBoolean).toOption) .getOrElse(ui.conf.get(UI_SQL_GROUP_SUB_EXECUTION_ENABLED)) // Search and status filter @@ -146,23 +148,23 @@ private[v1] class SqlResource extends BaseAppResource { val sortedRoots = sortExecs(rootRows, sortCol, sortDir) val page = if (length > 0) sortedRoots.slice(start, start + length) else sortedRoots - // Convert to Java-compatible row data; embed sub-executions when grouping + // Convert to Java-compatible row data; embed sub-executions when grouping. + // Always emit a `subExecutions` field (possibly empty) in grouped mode so + // JSON consumers see a consistent schema; flat mode never includes it. val aaData = page.map { exec => val row = execToRow(exec) if (groupSubExec) { val subs = subsByRoot.getOrElse(exec.executionId, Seq.empty) - if (subs.nonEmpty) { - // Sort subs by id ascending so they appear in chronological order - val subRows = new java.util.ArrayList[java.util.LinkedHashMap[String, Object]]() - sortExecs(subs, "id", "asc").foreach(s => subRows.add(execToRow(s))) - row.put("subExecutions", subRows) - } + // Sort subs by id ascending so they appear in chronological order + row.put("subExecutions", sortExecs(subs, "id", "asc").map(execToRow).asJava) } row } - // Counts: when grouping, totals reflect root-only counts so DataTables shows + // Counts: grouped totals reflect root-only counts so DataTables shows // "Showing X to Y of Z entries" matching the rows the user actually sees. + // Flat mode's recordsTotal is the unfiltered total (from the KVStore), + // which lets DataTables show the "filtered from W total entries" suffix. 
val recordsTotal = if (groupSubExec) { if (needsFilter) { // Re-derive root rows from the unfiltered set using the same predicate @@ -170,8 +172,6 @@ private[v1] class SqlResource extends BaseAppResource { } else { rootRows.size } - } else if (needsFilter) { - filteredExecs.size } else { sqlStore.executionsCount() } diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala index df473373ed5fc..f03aff39b532d 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala @@ -275,6 +275,9 @@ class SqlResourceWithActualMetricsSuite val flatRows = (flatJson \ "aaData").children assert(flatRows.size > groupedRows.size, "flat listing should contain at least one extra sub-execution row") + val embeddedSubs = groupedRows.map(r => (r \ "subExecutions").children.size).sum + assert(flatRows.size === groupedRows.size + embeddedSubs, + "flat size should equal grouped roots plus embedded sub rows") flatRows.foreach { row => assert((row \ "subExecutions").children.isEmpty, "flat listing should not embed subExecutions")