[SPARK-53339][CONNECT] Fix an issue which occurs when an operation in pending state is interrupted #52083

sarutak wants to merge 7 commits into apache:master
Conversation
cc: @peter-toth

cc: @dongjoon-hyun too.

Thank you for pinging me, @sarutak . I just came back from my vacation today.
| */ | ||
| def interrupt(): Boolean = { | ||
| if (eventsManager.status == ExecuteStatus.Pending) { | ||
| return false |
- According to the function description, `false` is already occupied for the status where it was already interrupted.
- For the code change, according to the state transition, can we change the status to `ExecuteStatus.Canceled` directly from the current `ExecuteStatus.Pending`, because it's not started yet? In this case, we can return `true`.
> For the code change, according to the state transition, can we change the status to `ExecuteStatus.Canceled` directly from the current `ExecuteStatus.Pending`, because it's not started yet? In this case, we can return `true`.
Actually, that was my first idea to solve this issue. But as I mentioned in the description, I found that didn't work because transitioning from Pending to Canceled causes another issue.
> According to the function description, `false` is already occupied for the status where it was already interrupted.
Hmm, if it's OK to ignore an interruption of a pending-state operation, and we need to tell "already interrupted" apart from "interruption failed" exactly, how about returning the exact interruption result rather than a boolean?
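One way to sketch the suggestion above (returning the exact interruption result rather than a boolean) is a small result enum. This is a hypothetical illustration, not Spark's actual API; `InterruptResult` and all of its cases are invented for the example:

```java
// Hypothetical sketch (not Spark's actual API): a richer result type so
// callers can tell "already interrupted" apart from "cannot interrupt yet".
enum InterruptResult {
    INTERRUPTED,          // this call won the race and interrupted the operation
    ALREADY_INTERRUPTED,  // a previous call already interrupted it
    NOT_INTERRUPTIBLE;    // e.g. the operation is still Pending

    String describe() {
        switch (this) {
            case INTERRUPTED:         return "operation interrupted";
            case ALREADY_INTERRUPTED: return "operation was already interrupted";
            default:                  return "operation cannot be interrupted in its current state";
        }
    }
}
```

With such a type, the caller no longer has to guess whether `false` means "already interrupted" or "interruption refused".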
Got it. Thank you. As long as the code and description are consistent, I'm okay with both: (1) updating the description by changing the meaning of `false`, or (2) changing the return type.
Thank you for your suggestion. I'll simply update the description.
BTW, thank you for the investigation to identify the root cause. cc @grundprinzip , @hvanhovell , @zhengruifeng to ask whether this was the intentional design of the state transition or not.
There are two things at play here: the internal state of the operation itself and the notification on the listener bus. If this patch simply ignores the interrupt on an operation in a pending state, there is a new edge case where we can never cancel this operation if it's stuck in a pending state for whatever reason. Previously, it seems that while the physical query was cancelled, only the observable operation state on the listener bus was not properly handled. I understand that there is another race condition when the interrupt happens right between the incoming request and the different posting states. I think the better solution is not to ignore the interruption, but we need to figure out how to avoid double-posting of events.
Given that this is one of the long-standing issues, we can discuss more in order to figure out the correct steps in the Apache Spark 4.1.0 timeframe, as @grundprinzip suggested above.
@dongjoon-hyun

@dongjoon-hyun So, this issue should be documented only for… Also, thank you for sharing related PRs. As far as I know, we have only one issue which blocks SPARK-48139 besides this issue, and I believe that's the last one.

@grundprinzip

@grundprinzip Gentle ping.

cc: @hvanhovell

@hvanhovell Gentle ping.

cc: @HyukjinKwon too.
…`postStarted()` and allowing Pending to Canceled/Failed transition

### What changes were proposed in this pull request?

This PR aims to solve SPARK-53339 using a different approach than #52083. The issue is that interrupting an operation in `Pending` state causes an `IllegalStateException` and leaves the operation in a broken state where subsequent interrupts never work.

The root cause is that in `SparkConnectExecutionManager#createExecuteHolderAndAttach`, there was a window between `createExecuteHolder` (which registers the operation) and `postStarted()` where the operation was registered but still in `Pending` state. If an interrupt arrived during this window:

1. `ExecuteThreadRunner#interrupt()` transitioned `state` from `notStarted` to `interrupted` via CAS
2. `ErrorUtils.handleError` was called with `isInterrupted=true`, which called `postCanceled()`
3. `postCanceled()` threw `IllegalStateException` because `Pending` was not in its allowed source statuses
4. All subsequent interrupts for the same operation failed silently because `ExecuteThreadRunner.state` was already in the terminal `interrupted` state

This issue can be reproduced by inserting `Thread.sleep(1000)` into `SparkConnectExecutionManager#createExecuteHolderAndAttach` as follows:

```
val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
try {
+ Thread.sleep(1000)
  executeHolder.eventsManager.postStarted()
  executeHolder.start()
} catch {
```

And then run the test `interrupt all - background queries, foreground interrupt` in `SparkSessionE2ESuite`:

```
$ build/sbt 'connect-client-jvm/testOnly org.apache.spark.sql.connect.SparkSessionE2ESuite -- -z "interrupt all - background queries, foreground interrupt"'
```

The fix consists of:

1. **Move `postStarted()` into `ExecuteThreadRunner#executeInternal()`** — Previously, `postStarted()` was called in `createExecuteHolderAndAttach` before `start()`, creating a window where an interrupt could race with the status transition. By moving `postStarted()` to right after the `notStarted -> started` CAS in `executeInternal()`, the status transition and the CAS are now sequenced: if interrupt wins the CAS (`notStarted -> interrupted`), `postStarted()` is never called.
2. **Allow `Pending -> Canceled` and `Pending -> Failed` transitions** — When interrupt wins the CAS before `postStarted()` is called, `ExecuteEventsManager._status` is still `Pending`. The `postCanceled()` call from `ErrorUtils.handleError` needs to transition from `Pending` to `Canceled`. Similarly, `postFailed()` needs to handle the case where `postStarted()` itself throws an exception (e.g., a session state check failure) while `_status` is still `Pending`.
3. **Remove plan validation from `postStarted()`** — `postStarted()` previously threw `UnsupportedOperationException` for unknown `OpTypeCase` values (e.g., `OPTYPE_NOT_SET`). This was an implicit validation that doesn't belong in `postStarted()`, whose responsibility is status transition and listener event firing. The `case _` branch now falls back to `request.getPlan` instead of throwing, since the `plan` variable is only used for generating the `statement` text in the listener event. Actual plan validation is handled by `executeInternal()`.
4. **Add early plan validation in `createExecuteHolderAndAttach`** — Since `postStarted()` was moved into `executeInternal()` (change 1) and no longer validates the plan (change 3), invalid plans that previously failed synchronously in `postStarted()` would now fail asynchronously inside the execution thread. This means the existing `catch` block in `createExecuteHolderAndAttach`, which calls `removeExecuteHolder` to clean up the holder, would no longer be triggered for invalid plans. To preserve this behavior, an explicit `OpTypeCase` validation is added before `start()`, ensuring that invalid plans are still caught synchronously and the holder is properly removed from the `executions` map.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new tests. I also confirmed that the `SparkSessionE2ESuite` test mentioned above succeeded.

### Was this patch authored or co-authored using generative AI tooling?

Kiro CLI / Opus 4.6

Closes #54774 from sarutak/SPARK-53339-2.

Authored-by: Kousuke Saruta <sarutak@amazon.co.jp>
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
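The sequencing argument in change 1 can be modeled with a plain `AtomicReference` CAS. The sketch below is a standalone toy model (string-valued states mirroring the names in the description), not the actual `ExecuteThreadRunner` code:

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal model of the notStarted/started/interrupted CAS described above.
// Whichever of start() or interrupt() wins the CAS from "notStarted"
// determines whether postStarted() ever runs.
class RunnerStateModel {
    private final AtomicReference<String> state = new AtomicReference<>("notStarted");
    boolean postStartedCalled = false;

    // postStarted() runs only after winning the notStarted -> started CAS.
    boolean start() {
        if (state.compareAndSet("notStarted", "started")) {
            postStartedCalled = true;
            return true;
        }
        return false;
    }

    // An interrupt succeeds only if it beats start() to the CAS.
    boolean interrupt() {
        return state.compareAndSet("notStarted", "interrupted");
    }
}
```

Because both transitions compete for the same single CAS, exactly one of them can ever observe `notStarted`, which is what makes the "interrupt wins, `postStarted()` never fires" guarantee hold.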
(cherry picked from commit 09979af)
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
This issue was resolved by another PR (#54774).
What changes were proposed in this pull request?
This PR fixes an issue which occurs when an operation in pending state is interrupted. Once an operation in pending state is interrupted, that interruption and all following interruptions for the operation never work correctly.

You can easily reproduce this issue by modifying `SparkConnectExecutionManager#createExecuteHolderAndAttach` as follows, and then running the test `interrupt all - background queries, foreground interrupt` in `SparkSessionE2ESuite`. You will see the following error.

If an operation in pending state is interrupted, the interruption is handled in `ExecuteHolder#interrupt` and `ErrorUtils.handleError` is called. In `ErrorUtils#handleError`, the operation status transitions to `Canceled` by calling `executeEventsManager.postCanceled`. But `postCanceled` does not expect a transition from the pending state, so an exception is thrown and propagated to the caller of `ExecuteThreadRunner#interrupt`.

The reason all following interruptions for the same operation never work correctly is that `ExecuteThreadRunner#state` has already been changed to `interrupted` at the first call of `ExecuteThreadRunner#interrupt`, so following interruptions don't enter this loop and the method always returns `false`, causing the result of the interruption not to be recognized correctly.

The solution in this PR includes:
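The `postCanceled` rejection described above can be modeled as a simple allowed-source-status check. The sketch below is illustrative only (the status names and sets are simplified, not Spark's actual code); it shows why adding `Pending` to the allowed source statuses makes the cancel transition succeed:

```java
import java.util.Set;

// Illustrative model of the status-transition check described above; the real
// ExecuteEventsManager uses its own status enum and error classes. Before the
// fix, Pending was not an allowed source status for Canceled.
class StatusTransitionModel {
    static final Set<String> CANCELED_SOURCES_BEFORE_FIX = Set.of("Started", "Finished");
    static final Set<String> CANCELED_SOURCES_AFTER_FIX = Set.of("Pending", "Started", "Finished");

    // Transitions to Canceled, or throws if the current status is not allowed,
    // mirroring the IllegalStateException described in the root-cause analysis.
    static String postCanceled(String current, Set<String> allowed) {
        if (!allowed.contains(current)) {
            throw new IllegalStateException("olderStatus: " + current + ", newStatus: Canceled");
        }
        return "Canceled";
    }
}
```

Under the pre-fix set, canceling a `Pending` operation throws; under the post-fix set, the same call transitions cleanly to `Canceled`.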
Why are the changes needed?
Bug fix.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Add new tests.
I also confirmed that the `SparkSessionE2ESuite` test mentioned above succeeded.

Was this patch authored or co-authored using generative AI tooling?
No.