[SPARK-53339][CONNECT] Fix interrupt on pending operations by moving postStarted() and allowing Pending to Canceled/Failed transition #54774
Closed
sarutak wants to merge 3 commits into apache:master from
Conversation
HyukjinKwon
approved these changes
Mar 16, 2026
sarutak added a commit that referenced this pull request on Mar 16, 2026
…`postStarted()` and allowing Pending to Canceled/Failed transition

Closes #54774 from sarutak/SPARK-53339-2.

Authored-by: Kousuke Saruta <sarutak@amazon.co.jp>
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
(cherry picked from commit 09979af)
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
sarutak added a commit that referenced this pull request on Mar 16, 2026
…`postStarted()` and allowing Pending to Canceled/Failed transition

Closes #54774 from sarutak/SPARK-53339-2. (cherry picked from commit 09979af)
Merged to
What changes were proposed in this pull request?
This PR aims to solve SPARK-53339 using a different approach than #52083.
The issue is that interrupting an operation in `Pending` state causes an `IllegalStateException` and leaves the operation in a broken state where subsequent interrupts never work.

The root cause is that in `SparkConnectExecutionManager#createExecuteHolderAndAttach`, there was a window between `createExecuteHolder` (which registers the operation) and `postStarted()` where the operation was registered but still in `Pending` state. If an interrupt arrived during this window:

1. `ExecuteThreadRunner#interrupt()` transitioned `state` from `notStarted` to `interrupted` via CAS.
2. `ErrorUtils.handleError` was called with `isInterrupted=true`, which called `postCanceled()`.
3. `postCanceled()` threw `IllegalStateException` because `Pending` was not in its allowed source statuses.
4. All subsequent interrupts for the same operation failed silently because `ExecuteThreadRunner.state` was already in the terminal `interrupted` state.
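The window can be sketched with a minimal model. This is not Spark's actual code: `runnerState`, `status`, and `postCanceled` are simplified stand-ins for `ExecuteThreadRunner.state`, `ExecuteEventsManager._status`, and the real `postCanceled()`, and the allowed-status set shown is illustrative:

```scala
import java.util.concurrent.atomic.AtomicReference

// Simplified stand-in for ExecuteThreadRunner.state.
val runnerState = new AtomicReference[String]("notStarted")

// Simplified stand-in for ExecuteEventsManager._status;
// postStarted() has not run yet, so it is still Pending.
var status = "Pending"

// Stand-in for postCanceled(): it rejects any source status
// outside its allowed set.
def postCanceled(allowedSources: Set[String]): Unit = {
  if (!allowedSources.contains(status)) {
    throw new IllegalStateException(s"illegal transition from $status")
  }
  status = "Canceled"
}

// An interrupt arriving in the window wins the CAS...
val interruptWon = runnerState.compareAndSet("notStarted", "interrupted")

// ...and the subsequent postCanceled() blows up, because Pending
// is not among the allowed source statuses (the pre-fix behavior).
val postCanceledFailed =
  try { postCanceled(Set("Started")); false }
  catch { case _: IllegalStateException => true }
```

After this failure `runnerState` is already terminal (`interrupted`), so later interrupts find nothing to CAS and give up silently, matching step 4 above.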
This issue can be reproduced by inserting a `Thread.sleep(1000)` into `SparkConnectExecutionManager#createExecuteHolderAndAttach` as follows:

```
  val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
  try {
+   Thread.sleep(1000)
    executeHolder.eventsManager.postStarted()
    executeHolder.start()
  } catch {
```
Then run the test `interrupt all - background queries, foreground interrupt` in `SparkSessionE2ESuite`:

```
$ build/sbt 'connect-client-jvm/testOnly org.apache.spark.sql.connect.SparkSessionE2ESuite -- -z "interrupt all - background queries, foreground interrupt"'
```

The fix consists of:
1. **Move `postStarted()` into `ExecuteThreadRunner#executeInternal()`.** Previously, `postStarted()` was called in `createExecuteHolderAndAttach` before `start()`, creating a window where an interrupt could race with the status transition. By moving `postStarted()` to right after the `notStarted -> started` CAS in `executeInternal()`, the status transition and the CAS are now sequenced: if the interrupt wins the CAS (`notStarted -> interrupted`), `postStarted()` is never called.
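The resulting ordering can be sketched as follows (a simplified model, not the actual `ExecuteThreadRunner`; the boolean flag stands in for `postStarted()`). Whichever side wins the single CAS on the runner state decides whether the Started event fires at all:

```scala
import java.util.concurrent.atomic.AtomicReference

val state = new AtomicReference[String]("notStarted")
var startedEventFired = false

// Simplified executeInternal() after the fix: the Started event is
// posted only if this thread wins the notStarted -> started CAS.
def executeInternal(): Unit = {
  if (state.compareAndSet("notStarted", "started")) {
    startedEventFired = true // stand-in for postStarted()
  }
}

// Simplified interrupt(): competes for the same CAS.
def interrupt(): Boolean = state.compareAndSet("notStarted", "interrupted")
```

If `interrupt()` wins, `executeInternal()`'s CAS fails and the Started event is never posted, so the events manager is still in `Pending` when `postCanceled()` runs.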
2. **Allow `Pending -> Canceled` and `Pending -> Failed` transitions.** When the interrupt wins the CAS before `postStarted()` is called, `ExecuteEventsManager._status` is still `Pending`. The `postCanceled()` call from `ErrorUtils.handleError` needs to transition from `Pending` to `Canceled`. Similarly, `postFailed()` needs to handle the case where `postStarted()` itself throws an exception (e.g., a session state check failure) while `_status` is still `Pending`.
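Change 2 can be pictured as enlarging the allowed source-status sets for the `Canceled` and `Failed` targets. The sets below are illustrative; the real checks live in `ExecuteEventsManager` and may admit additional source statuses:

```scala
// Allowed source statuses per target status, with Pending newly
// admitted for Canceled and Failed (illustrative sets only).
val allowedSources: Map[String, Set[String]] = Map(
  "Canceled" -> Set("Started", "Pending"),
  "Failed"   -> Set("Started", "Pending")
)

// A transition is legal when the source appears in the target's set.
def canTransition(from: String, to: String): Boolean =
  allowedSources.get(to).exists(_.contains(from))
```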
3. **Remove plan validation from `postStarted()`.** `postStarted()` previously threw `UnsupportedOperationException` for unknown `OpTypeCase` values (e.g., `OPTYPE_NOT_SET`). This was an implicit validation that doesn't belong in `postStarted()`, whose responsibility is status transition and listener event firing. The `case _` branch now falls back to `request.getPlan` instead of throwing, since the `plan` variable is only used for generating the `statement` text in the listener event. Actual plan validation is handled by `executeInternal()`.
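The fallback can be sketched with hypothetical stand-ins for the proto op types (the real code matches on the request plan's `OpTypeCase`; the strings here merely represent whatever text each branch extracts):

```scala
// Hypothetical stand-ins for the plan's OpTypeCase values.
sealed trait OpTypeCase
case object Command      extends OpTypeCase
case object Root         extends OpTypeCase
case object OpTypeNotSet extends OpTypeCase

// Builds the `statement` text for the listener event. The default
// branch no longer throws UnsupportedOperationException; it falls
// back to the whole plan, since the text is only informational.
def statementText(opType: OpTypeCase): String = opType match {
  case Command => "command"
  case Root    => "root relation"
  case _       => "full plan" // stand-in for request.getPlan
}
```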
createExecuteHolderAndAttach— SincepostStarted()was moved intoexecuteInternal()(change 1) and no longer validates the plan (change 3), invalid plans that previously failed synchronously inpostStarted()would now fail asynchronously inside the execution thread. This means the existingcatchblock increateExecuteHolderAndAttach— which callsremoveExecuteHolderto clean up the holder — would no longer be triggered for invalid plans. To preserve this behavior, an explicitOpTypeCasevalidation is added beforestart(), ensuring that invalid plans are still caught synchronously and the holder is properly removed from theexecutionsmap.Why are the changes needed?
Bug fix.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Add new tests.
I also confirmed that the `SparkSessionE2ESuite` test mentioned above succeeded.

Was this patch authored or co-authored using generative AI tooling?
Kiro CLI / Opus 4.6