diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui-dataTables.css b/core/src/main/resources/org/apache/spark/ui/static/webui-dataTables.css index 202579c6b67ce..e7a8f3ab0839a 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui-dataTables.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui-dataTables.css @@ -58,4 +58,36 @@ table.dataTable thead .sorting_desc_disabled::after { div.dataTables_wrapper div.dataTables_length select { width: 100%; +} + +/* SQL tab sub-execution disclosure (SPARK-56811) */ +table#sql-table td.sub-exec-toggle { + white-space: nowrap; +} + +table#sql-table td.sub-exec-toggle a.toggle-sub-exec { + text-decoration: none; +} + +table#sql-table td.sub-exec-toggle a.toggle-sub-exec:hover { + text-decoration: underline; +} + +table#sql-table tr.shown td.sub-exec-toggle a.toggle-sub-exec { + font-weight: 600; +} + +table#sql-table tr.shown + tr > td { + background-color: var(--bs-tertiary-bg, #f4f7fa); +} + +table.sub-exec-table { + margin-left: 1.5rem !important; + width: calc(100% - 1.5rem) !important; + background-color: transparent; +} + +table.sub-exec-table thead th { + font-weight: 600; + background-color: transparent; } \ No newline at end of file diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js index 34e3be4913ce4..b741a18789d68 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js @@ -116,6 +116,13 @@ $(document).ready(function () { } } + // Read the cluster-level grouping toggle rendered into the page by Scala + var groupSubExecEnabled = true; + var configEl = document.getElementById("group-sub-exec-config"); + if (configEl) { + groupSubExecEnabled = configEl.getAttribute("data-value") === "true"; + } + function init(resolvedAppId) { var sqlTableEndPoint = createSQLTableEndPoint(resolvedAppId); @@ -131,6 +138,94 @@ $(document).ready(function () { '
'; + var columns = [ + { + data: "id", name: "id", title: "ID", + render: function (data, type) { + if (type !== "display") return data; + var basePath = uiRoot + appBasePath; + return '' + + data + ''; + } + }, + { + data: "queryId", name: "queryId", title: "Query ID", + orderable: false, + render: function (data, type) { + if (type !== "display" || !data) return data || ""; + var safe = escapeHtml(data); + return '' + escapeHtml(data.substring(0, 8)) + '...'; + } + }, + { + data: "status", name: "status", title: "Status", + render: function (data, type) { + if (type !== "display") return data; + return statusBadge(data); + } + }, + { + data: "description", name: "description", title: "Description", + render: function (data, type, row) { + if (type !== "display") return data || ""; + return descriptionHtml({ id: row.id, description: data }); + } + }, + { + data: "submissionTime", name: "submissionTime", title: "Submitted", + render: function (data, type) { + if (type !== "display") return data; + return formatDateSql(data); + } + }, + { + data: "duration", name: "duration", title: "Duration", + render: function (data, type) { + if (type !== "display") return data; + return formatDurationSql(data); + } + }, + { + data: "jobIds", name: "jobIds", title: "Succeeded Jobs", + orderable: false, + render: function (data, type) { + if (type !== "display") return (data || []).join(","); + return jobIdLinks(data || []); + } + }, + { + data: "errorMessage", name: "errorMessage", title: "Error Message", + orderable: false, + render: function (data, type) { + if (type !== "display" || !data) return data || ""; + if (data.length > 100) { + return '' + + escapeHtml(data.substring(0, 100)) + '...'; + } + return escapeHtml(data); + } + } + ]; + if (groupSubExecEnabled) { + // Trailing "Sub Executions" column matching the SPARK-41752 / 4.1 layout: + // shows "+N sub" when the root has children, blank otherwise. Click to + // expand a child row containing the sub-execution rows. + columns.push({ + data: null, name: "subExecutions", title: "Sub Executions", + orderable: false, searchable: false, + className: "sub-exec-toggle", + render: function (data, type, row) { + if (type !== "display") return ""; + var subs = row.subExecutions || []; + if (subs.length === 0) return ""; + var childId = "sub-exec-" + row.id; + return '' + + '+' + subs.length + ' sub'; + } + }); + } + var table = $("#sql-table").DataTable({ serverSide: true, processing: true, @@ -146,83 +241,83 @@ $(document).ready(function () { if (sel) { d.status = sel; } + d.groupSubExecution = groupSubExecEnabled ? "true" : "false"; }, dataSrc: function (json) { return json.aaData; }, error: function () { $("#sql-table_processing").css("display", "none"); } }, - columns: [ - { - data: "id", name: "id", title: "ID", - render: function (data, type) { - if (type !== "display") return data; - var basePath = uiRoot + appBasePath; - return '' + - data + ''; - } - }, - { - data: "queryId", name: "queryId", title: "Query ID", - orderable: false, - render: function (data, type) { - if (type !== "display" || !data) return data || ""; - return '' + data.substring(0, 8) + '...'; - } - }, - { - data: "status", name: "status", title: "Status", - render: function (data, type) { - if (type !== "display") return data; - return statusBadge(data); - } - }, - { - data: "description", name: "description", title: "Description", - render: function (data, type, row) { - if (type !== "display") return data || ""; - return descriptionHtml({ id: row.id, description: data }); - } - }, - { - data: "submissionTime", name: "submissionTime", title: "Submitted", - render: function (data, type) { - if (type !== "display") return data; - return formatDateSql(data); - } - }, - { - data: "duration", name: "duration", title: "Duration", - render: function (data, type) { - if (type !== "display") return data; - return formatDurationSql(data); - } - }, - { - data: "jobIds", name: "jobIds", title: "Succeeded Jobs", - orderable: false, - render: function (data, type) { - if (type !== "display") return (data || []).join(","); - return jobIdLinks(data || []); - } - }, - { - data: "errorMessage", name: "errorMessage", title: "Error Message", - orderable: false, - render: function (data, type) { - if (type !== "display" || !data) return data || ""; - if (data.length > 100) { - return '' + - escapeHtml(data.substring(0, 100)) + '...'; - } - return escapeHtml(data); - } - } - ], + columns: columns, order: [[0, "desc"]], language: { search: "Search: " } }); + // Child-row expansion for sub-executions. Sub data is embedded per root row + // in the server payload (`row.subExecutions`), so no second fetch is needed. + // Under serverSide: true DataTables destroys/recreates rows on every sort, + // filter or page change, so we track expanded row IDs out-of-band and + // re-attach the child on each draw. + if (groupSubExecEnabled) { + var expandedRowIds = {}; + + var renderSubExecutionsHtml = function (rowData) { + var subs = (rowData && rowData.subExecutions) || []; + var basePath = uiRoot + appBasePath; + var childId = "sub-exec-" + (rowData && rowData.id); + var html = ''; + html += '' + + ''; + subs.forEach(function (child) { + html += ''; + html += ''; + html += ''; + html += ''; + html += ''; + }); + html += '
IDStatusDescriptionDurationSucceeded Jobs
' + child.id + '' + statusBadge(child.status) + '' + descriptionHtml({ + id: child.id, description: child.description || "" + }) + '' + formatDurationSql(child.duration) + '' + jobIdLinks(child.jobIds || []) + '
'; + return html; + }; + + $("#sql-table tbody").on("click", "a.toggle-sub-exec", function (e) { + e.preventDefault(); + var tr = $(this).closest("tr"); + var dtRow = table.row(tr); + var rowData = dtRow.data(); + var subs = (rowData && rowData.subExecutions) || []; + if (dtRow.child.isShown()) { + dtRow.child.hide(); + tr.removeClass("shown"); + $(this).text("+" + subs.length + " sub").attr("aria-expanded", "false"); + delete expandedRowIds[rowData.id]; + } else { + dtRow.child(renderSubExecutionsHtml(rowData)).show(); + tr.addClass("shown"); + $(this).text("\u2212" + subs.length + " sub").attr("aria-expanded", "true"); + expandedRowIds[rowData.id] = true; + } + }); + + table.on("draw", function () { + $("#sql-table tbody > tr").each(function () { + var dtRow = table.row(this); + var data = dtRow.data(); + if (data && expandedRowIds[data.id]) { + var subs = data.subExecutions || []; + dtRow.child(renderSubExecutionsHtml(data)).show(); + $(this).addClass("shown"); + $(this).find("a.toggle-sub-exec") + .text("\u2212" + subs.length + " sub") + .attr("aria-expanded", "true"); + } + }); + }); + } + $("#status-filter").on("change", function () { table.draw(); }); diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala index 91d3f9a484e24..59a1264993e19 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala @@ -19,12 +19,14 @@ package org.apache.spark.status.api.v1.sql import java.util.{Date, HashMap} +import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success, Try} import jakarta.ws.rs._ import jakarta.ws.rs.core.{Context, MediaType, UriInfo} import org.apache.spark.JobExecutionStatus +import org.apache.spark.internal.config.UI.UI_SQL_GROUP_SUB_EXECUTION_ENABLED import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode, SQLAppStatusStore, SQLExecutionUIData} import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException} import org.apache.spark.ui.UIUtils @@ -74,6 +76,11 @@ private[v1] class SqlResource extends BaseAppResource { * Server-side DataTables endpoint for SQL executions listing. * Accepts DataTables server-side parameters (start, length, order, search) * and returns paginated results with recordsTotal/recordsFiltered counts. + * + * When `groupSubExecution=true` (default = `spark.ui.groupSQLSubExecutionEnabled`), + * pagination is over root executions only and each row carries its sub-executions + * inline as `subExecutions: [...]`. Sub-executions whose root is missing from the + * filtered set (orphans) are surfaced as roots so they don't disappear. */ @GET @Path("sqlTable") @@ -85,7 +92,11 @@ private[v1] class SqlResource extends BaseAppResource { // Echo draw counter to prevent stale responses val draw = Option(uriParams.getFirst("draw")).map(_.toInt).getOrElse(0) - val totalRecords = sqlStore.executionsCount() + // Sub-execution grouping flag; default to the cluster config. Defensive + // parse - bad values should not 500 the public REST endpoint. + val groupSubExec = Option(uriParams.getFirst("groupSubExecution")) + .flatMap(v => Try(v.toBoolean).toOption) + .getOrElse(ui.conf.get(UI_SQL_GROUP_SUB_EXECUTION_ENABLED)) // Search and status filter val searchValue = Option(uriParams.getFirst("search[value]")) @@ -94,9 +105,14 @@ private[v1] class SqlResource extends BaseAppResource { .filter(_.nonEmpty) val needsFilter = searchValue.isDefined || statusFilter.isDefined + // Always load all execs once. We need the full set to (a) identify orphan + // sub-executions whose root is filtered out and (b) count root rows for + // `recordsTotal`. `sqlStore.executionsList()` is already a full + // materialization, so there is no separate "KVStore-pagination" path being + // disabled here. + val allExecs = sqlStore.executionsList() + val filteredExecs = if (needsFilter) { - // When filtering, we must load all and filter in memory - val allExecs = sqlStore.executionsList() allExecs.filter { exec => val matchesSearch = searchValue.forall { search => val lower = search.toLowerCase(java.util.Locale.ROOT) @@ -110,10 +126,14 @@ private[v1] class SqlResource extends BaseAppResource { matchesSearch && matchesStatus } } else { - // No filter — will use KVStore pagination below - Seq.empty + allExecs + } + + val (rootRows, subsByRoot) = if (groupSubExec) { + SqlResource.partitionRoots(filteredExecs) + } else { + (filteredExecs, Map.empty[Long, Seq[SQLExecutionUIData]]) } - val filteredRecords = if (needsFilter) filteredExecs.size else totalRecords // Sort val sortCol = Option(uriParams.getFirst("order[0][column]")) @@ -125,26 +145,43 @@ private[v1] class SqlResource extends BaseAppResource { val start = Option(uriParams.getFirst("start")).map(_.toInt).getOrElse(0) val length = Option(uriParams.getFirst("length")).map(_.toInt).getOrElse(20) - val page = if (needsFilter) { - // Filter/search: sort and paginate in memory - val sorted = sortExecs(filteredExecs, sortCol, sortDir) - if (length > 0) sorted.slice(start, start + length) else sorted - } else { - // No filter: use KVStore-level pagination for efficiency - // KVStore returns in insertion order; sort in memory for the page - val execs = sqlStore.executionsList() - val sorted = sortExecs(execs, sortCol, sortDir) - if (length > 0) sorted.slice(start, start + length) else sorted + val sortedRoots = sortExecs(rootRows, sortCol, sortDir) + val page = if (length > 0) sortedRoots.slice(start, start + length) else sortedRoots + + // Convert to Java-compatible row data; embed sub-executions when grouping. + // Always emit a `subExecutions` field (possibly empty) in grouped mode so + // JSON consumers see a consistent schema; flat mode never includes it. + val aaData = page.map { exec => + val row = execToRow(exec) + if (groupSubExec) { + val subs = subsByRoot.getOrElse(exec.executionId, Seq.empty) + // Sort subs by id ascending so they appear in chronological order + row.put("subExecutions", sortExecs(subs, "id", "asc").map(execToRow).asJava) + } + row } - // Convert to Java-compatible row data - val aaData = page.map(execToRow) + // Counts: grouped totals reflect root-only counts so DataTables shows + // "Showing X to Y of Z entries" matching the rows the user actually sees. + // Flat mode's recordsTotal is the unfiltered total (from the KVStore), + // which lets DataTables show the "filtered from W total entries" suffix. + val recordsTotal = if (groupSubExec) { + if (needsFilter) { + // Re-derive root rows from the unfiltered set using the same predicate + SqlResource.partitionRoots(allExecs)._1.size + } else { + rootRows.size + } + } else { + sqlStore.executionsCount() + } + val recordsFiltered = if (groupSubExec) rootRows.size else filteredExecs.size val ret = new HashMap[String, Object]() ret.put("draw", Integer.valueOf(draw)) ret.put("aaData", aaData) - ret.put("recordsTotal", java.lang.Long.valueOf(filteredRecords)) - ret.put("recordsFiltered", java.lang.Long.valueOf(filteredRecords)) + ret.put("recordsTotal", java.lang.Long.valueOf(recordsTotal)) + ret.put("recordsFiltered", java.lang.Long.valueOf(recordsFiltered)) ret } } @@ -275,3 +312,22 @@ private[v1] class SqlResource extends BaseAppResource { } } + +private[v1] object SqlResource { + + /** + * Split a set of executions into root rows and a sub-execution map. A root row is + * either an execution whose id equals its rootExecutionId, or an orphan sub whose + * root parent is absent from the input set. Called on the filtered set (for paging) + * and on the full set (for `recordsTotal`), so the predicate lives in one place + * rather than being inlined twice. + */ + def partitionRoots(execs: Seq[SQLExecutionUIData]) + : (Seq[SQLExecutionUIData], Map[Long, Seq[SQLExecutionUIData]]) = { + val ids = execs.iterator.map(_.executionId).toSet + val (roots, subs) = execs.partition { e => + e.executionId == e.rootExecutionId || !ids.contains(e.rootExecutionId) + } + (roots, subs.groupBy(_.rootExecutionId)) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala index 6cae4fb30668e..f03aff39b532d 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.deploy.history.HistoryServerSuite.getContentAndCode import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils +import org.apache.spark.sql.execution.ui.SQLExecutionUIData import org.apache.spark.sql.internal.SQLConf.ADAPTIVE_EXECUTION_ENABLED import org.apache.spark.sql.test.SharedSparkSession @@ -215,6 +216,119 @@ class SqlResourceWithActualMetricsSuite } } + test("SPARK-56811: sqlTable groups sub-executions under their root execution") { + // CACHE TABLE produces a root execution plus an inner sub-execution that + // shares its rootExecutionId. This is the canonical case where the SQL + // listing should fold the sub row under the root rather than flattening it. + spark.sql("CREATE OR REPLACE TEMP VIEW spark_56811 AS SELECT id FROM RANGE(10)") + .collect() + spark.sql("CACHE TABLE spark_56811_cached AS SELECT * FROM spark_56811").collect() + try { + eventually(timeout(10.seconds), interval(1.second)) { + val baseUrl = spark.sparkContext.ui.get.webUrl + + s"/api/v1/applications/${spark.sparkContext.applicationId}/sql/sqlTable" + + // Grouping ON: roots only, with subExecutions embedded on the root that + // owns a sub-execution. + val groupedUrl = new URI( + s"$baseUrl?start=0&length=100&draw=1&groupSubExecution=true").toURL + val (groupedCode, groupedOpt, _) = getContentAndCode(groupedUrl) + assert(groupedCode === HttpServletResponse.SC_OK) + val groupedJson = JsonMethods.parse(groupedOpt.get) + val groupedRecordsTotal = (groupedJson \ "recordsTotal").extract[Long] + val groupedRecordsFiltered = (groupedJson \ "recordsFiltered").extract[Long] + val groupedRows = (groupedJson \ "aaData").children + assert(groupedRecordsTotal === groupedRows.size, + "with no filter, recordsTotal should match returned root count") + assert(groupedRecordsFiltered === groupedRows.size, + "with no filter, recordsFiltered should match returned root count") + // Every row in grouped mode is either a true root (id == rootExecutionId) + // or an orphan sub whose real parent is absent from the result set. + val visibleIds = groupedRows.map(r => (r \ "id").extract[Long]).toSet + groupedRows.foreach { row => + val id = (row \ "id").extract[Long] + val rootId = (row \ "rootExecutionId").extract[Long] + assert(id == rootId || !visibleIds.contains(rootId), + s"grouped row $id (rootId=$rootId) is neither a root nor an orphan") + } + val rootsWithSubs = groupedRows.filter { row => + (row \ "subExecutions").children.nonEmpty + } + assert(rootsWithSubs.nonEmpty, + "CACHE TABLE should produce at least one root with sub-executions") + rootsWithSubs.foreach { row => + val rootId = (row \ "id").extract[Long] + (row \ "subExecutions").children.foreach { sub => + assert((sub \ "rootExecutionId").extract[Long] === rootId, + "sub-execution should reference its parent root") + assert((sub \ "id").extract[Long] !== rootId, + "sub-execution must not have the same id as its root") + } + } + + // Grouping OFF: flat list of every execution, with no embedded subs. + val flatUrl = new URI( + s"$baseUrl?start=0&length=100&draw=2&groupSubExecution=false").toURL + val (flatCode, flatOpt, _) = getContentAndCode(flatUrl) + assert(flatCode === HttpServletResponse.SC_OK) + val flatJson = JsonMethods.parse(flatOpt.get) + val flatRows = (flatJson \ "aaData").children + assert(flatRows.size > groupedRows.size, + "flat listing should contain at least one extra sub-execution row") + val embeddedSubs = groupedRows.map(r => (r \ "subExecutions").children.size).sum + assert(flatRows.size === groupedRows.size + embeddedSubs, + "flat size should equal grouped roots plus embedded sub rows") + flatRows.foreach { row => + assert((row \ "subExecutions").children.isEmpty, + "flat listing should not embed subExecutions") + } + } + } finally { + spark.sql("UNCACHE TABLE IF EXISTS spark_56811_cached") + } + } + + test("SPARK-56811: partitionRoots surfaces orphan sub-executions as root rows") { + def mkExec(id: Long, rootId: Long): SQLExecutionUIData = new SQLExecutionUIData( + executionId = id, + rootExecutionId = rootId, + description = s"exec $id", + details = "", + physicalPlanDescription = "", + modifiedConfigs = Map.empty, + metrics = Seq.empty, + submissionTime = id, + completionTime = None, + errorMessage = None, + jobs = Map.empty, + stages = Set.empty, + metricValues = null, + queryId = null) + + // Tree: + // 1 (root) -> 2, 3 (subs) + // 4 (root, no subs) + // 6 (sub of 5, but 5 is missing -> orphan) + val root1 = mkExec(1, 1) + val sub2 = mkExec(2, 1) + val sub3 = mkExec(3, 1) + val root4 = mkExec(4, 4) + val orphan6 = mkExec(6, 5) + + val (roots, subsByRoot) = + SqlResource.partitionRoots(Seq(root1, sub2, sub3, root4, orphan6)) + + assert(roots.map(_.executionId).toSet === Set(1L, 4L, 6L), + "true roots and orphan subs should both be promoted to root rows") + assert(subsByRoot.keySet === Set(1L), + "only execs with a parent present in the input should appear in subsByRoot") + assert(subsByRoot(1L).map(_.executionId).toSet === Set(2L, 3L), + "subs should be grouped under their parent root id") + val orphanRow = roots.find(_.executionId == 6L).get + assert(orphanRow.rootExecutionId === 5L, + "orphan promoted to a root row preserves its original rootExecutionId") + } + test("SPARK-56137: sqlList returns ISO date format in submissionTime") { withSQLConf(ADAPTIVE_EXECUTION_ENABLED.key -> "false") { spark.sql("SELECT 'date_format_test'").collect()