
Commit 09979af

[SPARK-53339][CONNECT] Fix interrupt on pending operations by moving postStarted() and allowing Pending to Canceled/Failed transition
### What changes were proposed in this pull request?

This PR aims to solve SPARK-53339 using a different approach than #52083.

The issue is that interrupting an operation in the `Pending` state causes an `IllegalStateException` and leaves the operation in a broken state where subsequent interrupts never work.

The root cause is that in `SparkConnectExecutionManager#createExecuteHolderAndAttach`, there was a window between `createExecuteHolder` (which registers the operation) and `postStarted()` where the operation was registered but still in the `Pending` state. If an interrupt arrived during this window:

1. `ExecuteThreadRunner#interrupt()` transitioned `state` from `notStarted` to `interrupted` via CAS.
2. `ErrorUtils.handleError` was called with `isInterrupted=true`, which called `postCanceled()`.
3. `postCanceled()` threw `IllegalStateException` because `Pending` was not in its allowed source statuses.
4. All subsequent interrupts for the same operation failed silently because `ExecuteThreadRunner.state` was already in the terminal `interrupted` state.

This issue can be reproduced by inserting `Thread.sleep(1000)` into `SparkConnectExecutionManager#createExecuteHolderAndAttach` as follows:

```
val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
try {
+ Thread.sleep(1000)
  executeHolder.eventsManager.postStarted()
  executeHolder.start()
} catch {
```

And then run the test `interrupt all - background queries, foreground interrupt` in `SparkSessionE2ESuite`:

```
$ build/sbt 'connect-client-jvm/testOnly org.apache.spark.sql.connect.SparkSessionE2ESuite -- -z "interrupt all - background queries, foreground interrupt"'
```

The fix consists of:

1. **Move `postStarted()` into `ExecuteThreadRunner#executeInternal()`.** Previously, `postStarted()` was called in `createExecuteHolderAndAttach` before `start()`, creating a window where an interrupt could race with the status transition. By moving `postStarted()` to right after the `notStarted -> started` CAS in `executeInternal()`, the status transition and the CAS are now sequenced: if the interrupt wins the CAS (`notStarted -> interrupted`), `postStarted()` is never called.

2. **Allow `Pending -> Canceled` and `Pending -> Failed` transitions.** When the interrupt wins the CAS before `postStarted()` is called, `ExecuteEventsManager._status` is still `Pending`. The `postCanceled()` call from `ErrorUtils.handleError` needs to transition from `Pending` to `Canceled`. Similarly, `postFailed()` needs to handle the case where `postStarted()` itself throws an exception (e.g., a session state check failure) while `_status` is still `Pending`.

3. **Remove plan validation from `postStarted()`.** `postStarted()` previously threw `UnsupportedOperationException` for unknown `OpTypeCase` values (e.g., `OPTYPE_NOT_SET`). This was an implicit validation that doesn't belong in `postStarted()`, whose responsibility is the status transition and firing the listener event. The `case _` branch now falls back to `request.getPlan` instead of throwing, since the `plan` variable is only used for generating the `statement` text in the listener event. Actual plan validation is handled by `executeInternal()`.

4. **Add early plan validation in `createExecuteHolderAndAttach`.** Since `postStarted()` was moved into `executeInternal()` (change 1) and no longer validates the plan (change 3), invalid plans that previously failed synchronously in `postStarted()` would now fail asynchronously inside the execution thread. This means the existing `catch` block in `createExecuteHolderAndAttach`, which calls `removeExecuteHolder` to clean up the holder, would no longer be triggered for invalid plans. To preserve this behavior, an explicit `OpTypeCase` validation is added before `start()`, ensuring that invalid plans are still caught synchronously and the holder is properly removed from the `executions` map.

### Why are the changes needed?

Bug fix.
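The race and its fix described above can be modeled as a minimal, self-contained Scala sketch (all names here are illustrative, not the actual Spark Connect identifiers): the execution thread and the interrupt path both CAS on a shared state, and whichever wins determines whether the Started or the Canceled event fires.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical model of the ExecuteThreadRunner state machine; not the real code.
object CasRaceSketch {
  sealed trait State
  case object NotStarted extends State
  case object Started extends State
  case object Interrupted extends State

  val state = new AtomicReference[State](NotStarted)

  // Execution thread: postStarted() now runs only if this CAS wins,
  // so it can never race with a completed interrupt.
  def executeInternal(postStarted: () => Unit): Boolean = {
    val won = state.compareAndSet(NotStarted, Started)
    if (won) postStarted()
    won
  }

  // Interrupt path: if this CAS wins, postStarted() will never run and the
  // events manager is still Pending, hence the new Pending -> Canceled transition.
  def interrupt(postCanceled: () => Unit): Boolean = {
    val won = state.compareAndSet(NotStarted, Interrupted)
    if (won) postCanceled()
    won
  }
}
```

Because `compareAndSet` on the same `AtomicReference` can succeed for only one of the two transitions out of `NotStarted`, exactly one of `postStarted()` and `postCanceled()` fires, whatever the thread interleaving.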
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests are added. I also confirmed that the `SparkSessionE2ESuite` test mentioned above succeeds.

### Was this patch authored or co-authored using generative AI tooling?

Kiro CLI / Opus 4.6

Closes #54774 from sarutak/SPARK-53339-2.

Authored-by: Kousuke Saruta <sarutak@amazon.co.jp>
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
1 parent ceba3da commit 09979af

4 files changed

Lines changed: 53 additions & 4 deletions


sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala

Lines changed: 5 additions & 0 deletions

```diff
@@ -193,6 +193,11 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
       return
     }

+    // SPARK-53339: Post the Started event here, right after the CAS succeeds, to ensure that
+    // postStarted() is never called when interrupt() has already transitioned the state to
+    // interrupted. This eliminates the race between postStarted() and interrupt().
+    executeHolder.eventsManager.postStarted()
+
     // `withSession` ensures that session-specific artifacts (such as JARs and class files) are
     // available during processing.
     executeHolder.sessionHolder.withSession { session =>
```

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala

Lines changed: 7 additions & 3 deletions

```diff
@@ -188,9 +188,7 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
     request.getPlan.getOpTypeCase match {
       case proto.Plan.OpTypeCase.COMMAND => request.getPlan.getCommand
       case proto.Plan.OpTypeCase.ROOT => request.getPlan.getRoot
-      case _ =>
-        throw new UnsupportedOperationException(
-          s"${request.getPlan.getOpTypeCase} not supported.")
+      case _ => request.getPlan
     }

     val event = SparkListenerConnectOperationStarted(
@@ -248,8 +246,11 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
    * Post @link org.apache.spark.sql.connect.service.SparkListenerConnectOperationCanceled.
    */
   def postCanceled(): Unit = {
+    // SPARK-53339: Pending is included to handle the case where interrupt() is called before
+    // postStarted() transitions the status from Pending to Started.
     assertStatus(
       List(
+        ExecuteStatus.Pending,
         ExecuteStatus.Started,
         ExecuteStatus.Analyzed,
         ExecuteStatus.ReadyForExecution,
@@ -269,8 +270,11 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
    * The message of the error thrown during the request.
    */
   def postFailed(errorMessage: String): Unit = {
+    // SPARK-53339: Pending is included to handle the case where postStarted() itself throws
+    // an exception (e.g., session state check failure) before transitioning from Pending.
     assertStatus(
       List(
+        ExecuteStatus.Pending,
         ExecuteStatus.Started,
         ExecuteStatus.Analyzed,
         ExecuteStatus.ReadyForExecution,
```
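For context, the `assertStatus` pattern extended above can be sketched as follows (a simplified model with hypothetical names, not the real `ExecuteEventsManager` code): it is a plain allow-list check on the current status, which is why adding `ExecuteStatus.Pending` to the list is sufficient to permit the new transitions.

```scala
// Simplified sketch of an allow-list status guard; illustrative names only.
object StatusGuardSketch {
  sealed trait ExecStatus
  case object Pending extends ExecStatus
  case object Started extends ExecStatus
  case object Canceled extends ExecStatus

  private var status: ExecStatus = Pending

  private def assertStatus(allowed: List[ExecStatus]): Unit = {
    if (!allowed.contains(status)) {
      throw new IllegalStateException(s"Illegal status transition from $status")
    }
  }

  def postCanceled(): Unit = {
    // Before the fix, Pending was missing from this list, so an interrupt
    // arriving before postStarted() made the guard throw.
    assertStatus(List(Pending, Started))
    status = Canceled
  }
}
```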

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala

Lines changed: 11 additions & 1 deletion

```diff
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
 import org.apache.spark.sql.connect.IllegalStateErrors
 import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE, CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL}
 import org.apache.spark.sql.connect.execution.ExecuteGrpcResponseSender
+import org.apache.spark.sql.connect.planner.InvalidInputErrors
 import org.apache.spark.util.ThreadUtils

 // Unique key identifying execution by combination of user, session and operation id
@@ -191,7 +192,16 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
       responseObserver: StreamObserver[proto.ExecutePlanResponse]): ExecuteHolder = {
     val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
     try {
-      executeHolder.eventsManager.postStarted()
+      // SPARK-53339: Validate the plan before starting the execution thread.
+      // postStarted() was moved into executeInternal(), so invalid plans that previously
+      // caused postStarted() to throw (and thus triggered removeExecuteHolder in this
+      // catch block) now fail asynchronously inside the execution thread. This early
+      // validation ensures that invalid plans are still caught synchronously here.
+      request.getPlan.getOpTypeCase match {
+        case proto.Plan.OpTypeCase.ROOT | proto.Plan.OpTypeCase.COMMAND => // valid
+        case other =>
+          throw InvalidInputErrors.invalidOneOfField(other, request.getPlan.getDescriptorForType)
+      }
       executeHolder.start()
     } catch {
       // Errors raised before the execution holder has finished spawning a thread are considered
```

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala

Lines changed: 30 additions & 0 deletions

```diff
@@ -138,6 +138,36 @@ class ExecuteEventsManagerSuite
       .isInstanceOf[SparkListenerConnectOperationCanceled])
   }

+  test("SPARK-53339: post canceled from Pending state") {
+    val events = setupEvents(ExecuteStatus.Pending)
+    events.postCanceled()
+    assert(events.status == ExecuteStatus.Canceled)
+    assert(events.terminationReason.contains(TerminationReason.Canceled))
+  }
+
+  test("SPARK-53339: post failed from Pending state") {
+    val events = setupEvents(ExecuteStatus.Pending)
+    events.postFailed(DEFAULT_ERROR)
+    assert(events.status == ExecuteStatus.Failed)
+    assert(events.terminationReason.contains(TerminationReason.Failed))
+  }
+
+  test("SPARK-53339: Pending to Canceled to Closed transition") {
+    val events = setupEvents(ExecuteStatus.Pending)
+    events.postCanceled()
+    events.postClosed()
+    assert(events.status == ExecuteStatus.Closed)
+    assert(events.terminationReason.contains(TerminationReason.Canceled))
+  }
+
+  test("SPARK-53339: Pending to Failed to Closed transition") {
+    val events = setupEvents(ExecuteStatus.Pending)
+    events.postFailed(DEFAULT_ERROR)
+    events.postClosed()
+    assert(events.status == ExecuteStatus.Closed)
+    assert(events.terminationReason.contains(TerminationReason.Failed))
+  }
+
   test("SPARK-43923: post failed") {
     val events = setupEvents(ExecuteStatus.Started)
     events.postFailed(DEFAULT_ERROR)
```
