[AURON #1471] Add the conversion execution list in the SparkUI #2150
guixiaowen wants to merge 6 commits into apache:master
Conversation
…ist in the Spark UI.
Pull request overview
Adds Spark UI support for visualizing Auron native-conversion results per SQL execution (physical plan, converted vs fallback nodes, and fallback reasons), backed by new listener events and KVStore data.
Changes:
- Emit a new AuronPlanFallbackEvent during post-columnar transitions and persist it in the UI KVStore.
- Introduce AuronExplainUtils to render physical plans with operator IDs and collect fallback reasons.
- Add a new “Queries” (executions) table to the Auron Spark UI page.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala | Posts per-execution fallback/conversion event for the UI. |
| spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronExplainUtils.scala | Generates operator IDs, formats plans, and collects fallback reasons/counts. |
| spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala | Adds a new test related to Spark UI conversion behavior (currently not asserting UI state). |
| auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala | Adds KVStore accessors for execution UI data and defines AuronSQLExecutionUIData. |
| auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusListener.scala | Handles SQL start/end + fallback events, writes execution UI data, and implements retention cleanup. |
| auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala | Renders the new executions list table and details expansion in the Auron tab. |
| auron-spark-ui/src/main/scala/org/apache/auron/spark/ui/AuronEvent.scala | Introduces the new listener event AuronPlanFallbackEvent. |
```scala
def collect(tmp: QueryPlan[_]): Unit = {
  tmp.foreachUp {
    case p: ExecutedCommandExec =>
      handleVanillaSparkPlan(p, fallbackNodeToReason)
    case p: AdaptiveSparkPlanExec =>
      handleVanillaSparkPlan(p, fallbackNodeToReason)
      collect(p.executedPlan)
    case p: QueryStageExec =>
      handleVanillaSparkPlan(p, fallbackNodeToReason)
      collect(p.plan)
    case p: NativeSupports =>
      numAuronNodes += 1
      p.innerChildren.foreach(collect)
    case p: SparkPlan =>
      handleVanillaSparkPlan(p, fallbackNodeToReason)
      p.innerChildren.foreach(collect)
    case _ =>
  }
}

collect(plan)
```
collectFallbackNodes traverses the plan multiple times because it uses foreachUp (which already walks the subtree) and also recursively calls collect(...) on children/subplans inside the match arms. This will over-count numAuronNodes and can duplicate fallback collection work (and can degrade significantly on large plans). Refactor to do a single traversal (e.g., one foreachUp/foreach walk) and handle adaptive/query-stage subplans without re-traversing already-visited nodes (use a visited identity set if needed).
Suggested change:

```scala
// Use an identity-based visited set to avoid re-traversing shared subplans
val visited =
  java.util.Collections.newSetFromMap[QueryPlan[_]](
    new java.util.IdentityHashMap[QueryPlan[_], java.lang.Boolean]())

def traverse(root: QueryPlan[_]): Unit = {
  // Skip traversal if we've already seen this plan instance
  if (!visited.add(root)) {
    return
  }
  root.foreachUp {
    case p: ExecutedCommandExec =>
      handleVanillaSparkPlan(p, fallbackNodeToReason)
    case p: AdaptiveSparkPlanExec =>
      handleVanillaSparkPlan(p, fallbackNodeToReason)
      // Traverse the executed plan as a separate root, guarded by the visited set
      traverse(p.executedPlan)
    case p: QueryStageExec =>
      handleVanillaSparkPlan(p, fallbackNodeToReason)
      // Traverse the underlying stage plan as a separate root, guarded by the visited set
      traverse(p.plan)
    case p: NativeSupports =>
      numAuronNodes += 1
    case p: SparkPlan =>
      handleVanillaSparkPlan(p, fallbackNodeToReason)
    case _ =>
  }
}

traverse(plan)
```
```scala
UIUtils.headerSparkPage(
  request,
  "Auron",
  buildInfoSummary(sqlStore.buildInfo()) ++
    buildExecutionsListSummary(sqlStore.executionsList(), request),
  parent)
```
The page currently calls sqlStore.executionsList() and passes the full result into the table, and the datasource later sorts that full in-memory sequence. For applications with many SQL executions this can make the Auron tab slow and memory-heavy. Prefer a KVStore-backed paged datasource: use executionsCount() for the total, and fetch only the requested page via executionsList(offset, length) (and ideally push sorting down to the KVStore view when possible), mirroring Spark’s own UI pattern.
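The paged-access pattern suggested above can be sketched with plain collections. This is a minimal, self-contained sketch: `executionsCount()` and `executionsList(offset, length)` are the accessor names from this PR, but `ExecRow` and `PagedStore` are simplified stand-ins, not the real KVStore-backed types.

```scala
// Simplified stand-ins for the KVStore-backed store; the real implementation
// would serve these from a KVStore view rather than an in-memory Seq.
final case class ExecRow(executionId: Long)

class PagedStore(rows: Seq[ExecRow]) {
  def executionsCount(): Long = rows.size.toLong

  // Fetch only the requested window instead of materializing everything.
  def executionsList(offset: Int, length: Int): Seq[ExecRow] =
    rows.slice(offset, offset + length)
}

object PagedTableDemo {
  def main(args: Array[String]): Unit = {
    val store = new PagedStore((1L to 10L).map(i => ExecRow(i)))
    val page = 2 // 1-based page index, as Spark's PagedTable uses
    val pageSize = 3
    // The table datasource asks for one page: offset = (page - 1) * pageSize.
    val window = store.executionsList((page - 1) * pageSize, pageSize)
    println(window.map(_.executionId).mkString(",")) // 4,5,6
    println(store.executionsCount()) // 10
  }
}
```

The key point is that the page renderer only ever holds `pageSize` rows, while `executionsCount()` supplies the total needed for the pagination controls.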
```scala
private def descriptionCell(execution: AuronSQLExecutionUIData): Seq[Node] = {
  val details = if (execution.description != null && execution.description.nonEmpty) {
    val concat = new PlanStringConcat()
    concat.append("== Fallback Summary ==\n")
    val fallbackSummary = execution.fallbackNodeToReason
      .map { case (name, reason) =>
        val id = name.substring(0, 3)
        val nodeName = name.substring(4)
        s"(${id.toInt}) $nodeName: $reason"
      }
      .mkString("\n")
    concat.append(fallbackSummary)
    if (execution.fallbackNodeToReason.isEmpty) {
      concat.append("No fallback nodes")
    }
    concat.append("\n\n")
    concat.append(execution.fallbackDescription)

    <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
          class="expand-details">
      +details
    </span> ++
    <div class="stage-details collapsed">
      <pre>{concat.toString()}</pre>
    </div>
  } else {
    Nil
  }
```
The “+details” expansion is currently hidden whenever execution.description is empty/null, even though fallback details (reasons/plan text) may still exist and would be useful. Gate details on the presence of fallback/plan content (e.g., fallbackDescription / fallbackNodeToReason.nonEmpty) instead of description.
```scala
val ordering: Ordering[AuronExecutionTableRowData] = sortColumn match {
  case "ID" => Ordering.by(_.executionUIData.executionId)
  case "Description" => Ordering.by(_.executionUIData.fallbackDescription)
  case "Num Auron Nodes" => Ordering.by(_.executionUIData.numAuronNodes)
  case "Num Fallback Nodes" => Ordering.by(_.executionUIData.numFallbackNodes)
  case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
}
```
Sorting by the “Description” column uses fallbackDescription (physical plan text) rather than executionUIData.description. This makes sorting behavior inconsistent with the displayed column label/content. Update the ordering for "Description" to use the description field.
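A minimal, self-contained sketch of the corrected mapping. `Row` and `orderingFor` are simplified stand-ins for illustration, not the PR's actual `AuronExecutionTableRowData` type.

```scala
// Simplified stand-in for AuronExecutionTableRowData, for illustration only.
final case class Row(executionId: Long, description: String, fallbackDescription: String)

object OrderingDemo {
  // Map the "Description" column to the field the table actually displays.
  def orderingFor(sortColumn: String): Ordering[Row] = sortColumn match {
    case "ID"          => Ordering.by(_.executionId)
    case "Description" => Ordering.by(_.description)
    case other         => throw new IllegalArgumentException(s"Unknown column: $other")
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row(1, "b-query", "plan-z"), Row(2, "a-query", "plan-a"))
    // Sorting by "Description" now follows the description text, not the plan text.
    println(rows.sorted(orderingFor("Description")).map(_.executionId)) // List(2, 1)
  }
}
```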
```scala
def getAuronParameterOtherTable(request: HttpServletRequest, tableTag: String): String = {
  request.getParameterMap.asScala
    .filterNot(_._1.startsWith(tableTag))
    .map(parameter => parameter._1 + "=" + parameter._2(0))
```
Query parameter values are concatenated into URLs without URL-encoding. This can break navigation when values contain &, =, spaces, or other reserved characters. Encode both keys and values (or use a Spark UI helper if available) when rebuilding the query string.
Suggested change:

```scala
.map { case (key, values) =>
  val encodedKey = URLEncoder.encode(key, UTF_8.name())
  val value = if (values != null && values.nonEmpty) values(0) else ""
  val encodedValue = URLEncoder.encode(value, UTF_8.name())
  s"$encodedKey=$encodedValue"
}
```
```scala
test("test convert table in spark UI ") {
  withTable("t1") {
    sql(
      "create table t1 using parquet PARTITIONED BY (part) as select 1 as c1, 2 as c2, 'test test' as part")
    val df = sql("select * from t1")
    checkAnswer(df, Seq(Row(1, 2, "test test")))
  }
}
```
This test exercises table creation/querying but does not assert the new Spark UI functionality (that AuronPlanFallbackEvent is posted and persisted as AuronSQLExecutionUIData, and that counts/reasons/plan text are stored). Add assertions against the AuronSQLAppStatusListener/KVStore (or AuronSQLAppStatusStore) to verify at least one execution record is written with expected fields (executionId, num nodes, fallback map/plan description).
```scala
if (SparkEnv.get.conf
    .get(SparkAuronConfiguration.UI_ENABLED.key, "true")
    .equals("true")) {
  val sc = sparkSession.sparkContext
  val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  if (executionId == null) {
    logDebug(s"Unknown execution id for plan: $sparkPlan")
    return sparkPlan
  }
  val concat = new PlanStringConcat()
  concat.append("== Physical Plan ==\n")

  val (numAuronNodes, fallbackNodeToReason) =
    AuronExplainUtils.processPlan(sparkPlan, concat.append)

  val event = AuronPlanFallbackEvent(
    executionId.toLong,
    numAuronNodes,
    fallbackNodeToReason.size,
    concat.toString(),
    fallbackNodeToReason)
```
Two issues here can cause unexpected failures/behavior: (1) UI enablement is checked via a case-sensitive string comparison; prefer a boolean conf read (or a safer parse) so values like TRUE / true behave consistently. (2) executionId.toLong can throw if the local property is non-numeric; guard the parse and skip posting the event (with debug logging) if it cannot be parsed.
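Both guards can be sketched in a few lines. `parseExecutionId` and `uiEnabled` are hypothetical helper names for illustration; the real code would read the flag through Spark's typed conf API rather than a raw string compare.

```scala
import scala.util.Try

object SafeParseDemo {
  // Guarded parse: None means "skip posting the event" instead of throwing
  // NumberFormatException when the local property is non-numeric.
  def parseExecutionId(raw: String): Option[Long] =
    Option(raw).flatMap(s => Try(s.trim.toLong).toOption)

  // Case-insensitive boolean read; null (conf unset) defaults to enabled.
  def uiEnabled(raw: String): Boolean =
    Option(raw).forall(_.trim.equalsIgnoreCase("true"))

  def main(args: Array[String]): Unit = {
    println(parseExecutionId("42"))       // Some(42)
    println(parseExecutionId("not-a-id")) // None
    println(parseExecutionId(null))       // None
    println(uiEnabled("TRUE"))            // true
    println(uiEnabled("false"))           // false
  }
}
```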
Thanks for implementing this! Being able to see why stages weren't converted directly in the SparkUI is super helpful for debugging.
@yew1eb Thank you very much for helping review the code. I really appreciate it.
@ShreyeshArangath Would you mind helping to review the code? I’d really appreciate it.
ShreyeshArangath left a comment
Really excited to see this! Left a few comments
```scala
buildInfoSummary(sqlStore.buildInfo()) ++
  buildExecutionsListSummary(sqlStore.executionsList(), request),
```
nit: let's add a helper where we can arrange all the elements within an Auron view, so we can re-use it and have a consolidated place where we can design this page?
```scala
}

@sparkver("3.0 / 3.1 / 3.2 / 3.3 / 3.4 / 3.5")
class AuronExecutionPagedTable30(
```
nit: This name did not read cleanly to me; thoughts on something like this?

Suggested change:

```scala
class AuronExecutionPagedTableSpark3x(
```
```scala
val auronPageTable =
  try {
    new AuronExecutionPagedTable40(
```
```scala
@KVIndex("executionId")
class AuronSQLExecutionUIData(
    @KVIndexParam val executionId: Long,
    val description: String,
```
Storing the entire plan here might be expensive for large plans. The structured fallbackNodeToReason is the bit that's actually unique and adds value, so I'm not sure the full plan needs to live in AuronSQLExecutionUIData at all. One option: the +details popover could link to Spark's existing /SQL/execution/?id=... page (which already retains its own plan, deduped at the Spark layer).
```scala
    plan: T,
    append: String => Unit,
    collectFallbackFunc: Option[QueryPlan[_] => (Int, Map[String, String])] = None)
  : (Int, Map[String, String]) = synchronized {
```
Does this need to be synchronized?
```scala
}

@sparkver("3.0")
private def removeTags(plan: QueryPlan[_]): Unit = {
```
I think the operation here is quadratic (AuronExplainUtils.scala:L204-238). plan.foreach { case ... => remove(p, p.innerChildren) } already visits every node, but inside remove, children.foreach(removeTags) recurses again from each visited node, so each subtree is visited O(depth) times. For a plan with N nodes and depth D, this is O(N·D) instead of O(N). Replace with a single plan.foreach(_.unsetTagValue(...)) plus an explicit walk of AdaptiveSparkPlanExec.executedPlan / QueryStageExec.plan if those aren't reached by the default foreach.
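The cost difference can be demonstrated on a toy tree. `Node` here is a stand-in for QueryPlan, not Spark's type; on this chain shape the nested recursion actually compounds even faster than the O(N·D) bound, but it shows the same failure mode.

```scala
// A toy tree standing in for QueryPlan; `foreach` visits the whole subtree.
final case class Node(children: List[Node]) {
  def foreach(f: Node => Unit): Unit = {
    f(this)
    children.foreach(_.foreach(f))
  }
}

object TraversalDemo {
  // Single pass: one foreach over the root does all the work.
  def countLinear(root: Node): Int = {
    var visits = 0
    root.foreach(_ => visits += 1)
    visits
  }

  // Quadratic pattern: recurse again from every node foreach already visits.
  def countQuadratic(root: Node): Int = {
    var visits = 0
    def walk(n: Node): Unit =
      n.foreach { p =>
        visits += 1
        p.children.foreach(walk)
      }
    walk(root)
    visits
  }

  def main(args: Array[String]): Unit = {
    // A linear chain of depth 4.
    val chain = Node(List(Node(List(Node(List(Node(Nil)))))))
    println(countLinear(chain))    // 4
    println(countQuadratic(chain)) // 15
  }
}
```

Even at depth 4 the redundant recursion nearly quadruples the visit count; on deep adaptive plans the gap grows much faster.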
```scala
      event.fallbackNodeToReason.toSeq.sortBy(_._1))
    kvstore.write(uiData)
  } else {
    executionIdToFallbackEvent.put(event.executionId, event.copy())
```
Do we need the event.copy() here? The event should be immutable, right?
```scala
  Utils.tryWithResource(view.closeableIterator())(iter => iter.asScala.toList)
}

def executionsList(): Seq[AuronSQLExecutionUIData] = {
```
executionsList() loads everything into memory and the page then sorts in memory: AuronSQLAppStatusStore.scala:42-44 (iter.asScala.toList) and AuronAllExecutionsPage.scala:560-561 (executionData.map(...).sorted(...)). The paginated overload executionsList(offset, length) exists but isn't used. With 1000 retained executions times MB-scale plan strings, this could have a non-trivial effect on the heap. I think the retention cap helps, but pagination at the store layer would help more. WDYT?
Which issue does this PR close?
Closes #1471
Rationale for this change
Add the conversion execution list.
Display the physical execution plan of the SQL, show the version that has been converted to the native execution plan, and also present the reasons for stages that were not converted, helping users visualize the overall transformation process more intuitively.
What changes are included in this PR?
Add the conversion execution list. The core display functions of this list are described as follows:
Feature 1: Show the number of queries;
Feature 2: Execution plans that were not converted to native and the reasons for the non-conversion;
Feature 3: Original physical execution plans;
Feature 4: The number of nodes converted to Auron nodes;
Feature 5: The number of nodes that were not converted.
The UI information can also be displayed through the Spark History Server, for example after running the explain command.
Are there any user-facing changes?
How was this patch tested?
UT