
[SPARK-53339][CONNECT] Fix interrupt on pending operations by moving postStarted() and allowing Pending to Canceled/Failed transition#54774

Closed
sarutak wants to merge 3 commits into apache:master from sarutak:SPARK-53339-2

Conversation


@sarutak sarutak commented Mar 12, 2026

What changes were proposed in this pull request?

This PR aims to solve SPARK-53339 using a different approach than #52083.
The issue is that interrupting an operation in `Pending` state causes an `IllegalStateException` and leaves the operation in a broken state where subsequent interrupts never work.
The root cause is that in `SparkConnectExecutionManager#createExecuteHolderAndAttach`, there was a window between `createExecuteHolder` (which registers the operation) and `postStarted()` where the operation was registered but still in `Pending` state. If an interrupt arrived during this window:

  1. `ExecuteThreadRunner#interrupt()` transitioned `state` from `notStarted` to `interrupted` via CAS
  2. `ErrorUtils.handleError` was called with `isInterrupted=true`, which called `postCanceled()`
  3. `postCanceled()` threw `IllegalStateException` because `Pending` was not in its allowed source statuses
  4. All subsequent interrupts for the same operation failed silently because `ExecuteThreadRunner.state` was already in the terminal `interrupted` state
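The race above can be modeled with a tiny CAS state machine. The sketch below is illustrative only: the object, the method bodies, and the status strings are simplifications of the behavior described in this PR, not the actual Spark Connect sources.

```scala
import java.util.concurrent.atomic.AtomicReference

// Illustrative model of the pre-fix race (hypothetical names; not the actual
// Spark Connect code). interrupt() can win the CAS while the events manager
// is still in Pending status, and the subsequent postCanceled() then throws
// because Pending is not an allowed source status.
object PreFixRace {
  sealed trait ThreadState
  case object NotStarted extends ThreadState
  case object Interrupted extends ThreadState

  val state = new AtomicReference[ThreadState](NotStarted)
  var eventsStatus: String = "Pending" // set to "Started" only by postStarted()

  // Step 1: interrupt wins the notStarted -> interrupted CAS.
  def interrupt(): Boolean = state.compareAndSet(NotStarted, Interrupted)

  // Steps 2-3: ErrorUtils.handleError calls postCanceled(), which rejects
  // Pending as a source status and throws IllegalStateException.
  def postCanceled(): Unit = {
    val allowedSources = Set("Started", "Analyzed", "ReadyForExecution", "Finished")
    if (!allowedSources.contains(eventsStatus)) {
      throw new IllegalStateException(s"Cannot cancel from status $eventsStatus")
    }
    eventsStatus = "Canceled"
  }
}
```

Calling `interrupt()` followed by `postCanceled()` reproduces steps 1-3; step 4 follows because `state` is already in its terminal value, so later interrupts never retry.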

This issue can be reproduced by inserting `Thread.sleep(1000)` into `SparkConnectExecutionManager#createExecuteHolderAndAttach` as follows:

```
     val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
     try {
+      Thread.sleep(1000)
       executeHolder.eventsManager.postStarted()
       executeHolder.start()
     } catch {
```

And then run the test `interrupt all - background queries, foreground interrupt` in `SparkSessionE2ESuite`:

```
$ build/sbt 'connect-client-jvm/testOnly org.apache.spark.sql.connect.SparkSessionE2ESuite -- -z "interrupt all - background queries, foreground interrupt"'
```

The fix consists of:

  1. **Move `postStarted()` into `ExecuteThreadRunner#executeInternal()`** — Previously, `postStarted()` was called in `createExecuteHolderAndAttach` before `start()`, creating a window where an interrupt could race with the status transition. By moving `postStarted()` to right after the `notStarted -> started` CAS in `executeInternal()`, the status transition and the CAS are now sequenced — if interrupt wins the CAS (`notStarted -> interrupted`), `postStarted()` is never called.

  2. **Allow `Pending -> Canceled` and `Pending -> Failed` transitions** — When interrupt wins the CAS before `postStarted()` is called, `ExecuteEventsManager._status` is still `Pending`. The `postCanceled()` call from `ErrorUtils.handleError` needs to transition from `Pending` to `Canceled`. Similarly, `postFailed()` needs to handle the case where `postStarted()` itself throws an exception (e.g., session state check failure) while `_status` is still `Pending`.

  3. **Remove plan validation from `postStarted()`** — `postStarted()` previously threw `UnsupportedOperationException` for unknown `OpTypeCase` values (e.g., `OPTYPE_NOT_SET`). This was an implicit validation that doesn't belong in `postStarted()`, whose responsibility is status transition and listener event firing. The `case _` branch now falls back to `request.getPlan` instead of throwing, since the `plan` variable is only used for generating the `statement` text in the listener event. Actual plan validation is handled by `executeInternal()`.

  4. **Add early plan validation in `createExecuteHolderAndAttach`** — Since `postStarted()` was moved into `executeInternal()` (change 1) and no longer validates the plan (change 3), invalid plans that previously failed synchronously in `postStarted()` would now fail asynchronously inside the execution thread. This means the existing `catch` block in `createExecuteHolderAndAttach` — which calls `removeExecuteHolder` to clean up the holder — would no longer be triggered for invalid plans. To preserve this behavior, an explicit `OpTypeCase` validation is added before `start()`, ensuring that invalid plans are still caught synchronously and the holder is properly removed from the `executions` map.
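Changes 1 and 2 can be sketched together as a sequenced state machine. As before, this is a hypothetical simplification of the description above (invented names and status strings), not the real sources.

```scala
import java.util.concurrent.atomic.AtomicReference

// Illustrative model of the fix (hypothetical names; not the actual Spark
// Connect sources). postStarted() now runs only after the
// notStarted -> started CAS succeeds (change 1), and Pending is an allowed
// source status for Canceled/Failed (change 2).
object PostFixRace {
  sealed trait ThreadState
  case object NotStarted extends ThreadState
  case object Started extends ThreadState
  case object Interrupted extends ThreadState

  val state = new AtomicReference[ThreadState](NotStarted)
  var eventsStatus: String = "Pending"

  def interrupt(): Boolean = state.compareAndSet(NotStarted, Interrupted)

  // Change 1: the status transition is sequenced after the CAS, so if an
  // interrupt already won, postStarted() is never called.
  def executeInternal(): Unit = {
    if (state.compareAndSet(NotStarted, Started)) {
      eventsStatus = "Started" // postStarted()
      // ... execute the plan ...
    }
  }

  // Change 2: Pending is now an allowed source status, so cancelling an
  // operation that never reached postStarted() no longer throws.
  def postCanceled(): Unit = {
    val allowedSources =
      Set("Pending", "Started", "Analyzed", "ReadyForExecution", "Finished")
    require(allowedSources.contains(eventsStatus),
      s"Cannot cancel from status $eventsStatus")
    eventsStatus = "Canceled"
  }
}
```

With this ordering, an interrupt that arrives before `executeInternal()` leaves the events status at `Pending`, and `postCanceled()` then transitions it cleanly to `Canceled` instead of throwing.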

Why are the changes needed?

Bug fix.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Add new tests.
I also confirmed that SparkSessionE2ESuite mentioned above succeeded.

Was this patch authored or co-authored using generative AI tooling?

Kiro CLI / Opus 4.6

@sarutak sarutak closed this in 09979af Mar 16, 2026
sarutak added a commit that referenced this pull request Mar 16, 2026
…`postStarted()` and allowing Pending to Canceled/Failed transition

### What changes were proposed in this pull request?
This PR aims to solve SPARK-53339 using a different approach than #52083.
The issue is that interrupting an operation in `Pending` state causes an `IllegalStateException` and leaves the operation in a broken state where subsequent interrupts never work.
The root cause is that in `SparkConnectExecutionManager#createExecuteHolderAndAttach`, there was a window between `createExecuteHolder` (which registers the operation) and `postStarted()` where the operation was registered but still in `Pending` state. If an interrupt arrived during this window:

1. `ExecuteThreadRunner#interrupt()` transitioned `state` from `notStarted` to `interrupted` via CAS
2. `ErrorUtils.handleError` was called with `isInterrupted=true`, which called `postCanceled()`
3. `postCanceled()` threw `IllegalStateException` because `Pending` was not in its allowed source statuses
4. All subsequent interrupts for the same operation failed silently because `ExecuteThreadRunner.state` was already in the terminal `interrupted` state

This issue can be reproduced by inserting `Thread.sleep(1000)` into `SparkConnectExecutionManager#createExecuteHolderAndAttach` as follows:

```
     val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
     try {
+      Thread.sleep(1000)
       executeHolder.eventsManager.postStarted()
       executeHolder.start()
     } catch {
```

And then run a test `interrupt all - background queries, foreground interrupt` in `SparkSessionE2ESuite`.
```
$ build/sbt 'connect-client-jvm/testOnly org.apache.spark.sql.connect.SparkSessionE2ESuite -- -z "interrupt all - background queries, foreground interrupt"'
```

The fix consists of:

1. **Move `postStarted()` into `ExecuteThreadRunner#executeInternal()`** — Previously, `postStarted()` was called in `createExecuteHolderAndAttach` before `start()`, creating a window where an interrupt could race with the status transition. By moving `postStarted()` to right after the `notStarted -> started` CAS in `executeInternal()`, the status transition and the CAS are now sequenced — if interrupt wins the CAS (`notStarted -> interrupted`), `postStarted()` is never called.

2. **Allow `Pending -> Canceled` and `Pending -> Failed` transitions** — When interrupt wins the CAS before `postStarted()` is called, `ExecuteEventsManager._status` is still `Pending`. The `postCanceled()` call from `ErrorUtils.handleError` needs to transition from `Pending` to `Canceled`. Similarly, `postFailed()` needs to handle the case where `postStarted()` itself throws an exception (e.g., session state check failure) while `_status` is still `Pending`.

3. **Remove plan validation from `postStarted()`** — `postStarted()` previously threw `UnsupportedOperationException` for unknown `OpTypeCase` values (e.g., `OPTYPE_NOT_SET`). This was an implicit validation that doesn't belong in `postStarted()`, whose responsibility is status transition and listener event firing. The `case _` branch now falls back to `request.getPlan` instead of throwing, since the `plan` variable is only used for generating the `statement` text in the listener event. Actual plan validation is handled by `executeInternal()`.

4. **Add early plan validation in `createExecuteHolderAndAttach`** — Since `postStarted()` was moved into `executeInternal()` (change 1) and no longer validates the plan (change 3), invalid plans that previously failed synchronously in `postStarted()` would now fail asynchronously inside the execution thread. This means the existing `catch` block in `createExecuteHolderAndAttach` — which calls `removeExecuteHolder` to clean up the holder — would no longer be triggered for invalid plans. To preserve this behavior, an explicit `OpTypeCase` validation is added before `start()`, ensuring that invalid plans are still caught synchronously and the holder is properly removed from the `executions` map.

### Why are the changes needed?
Bug fix.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Add new tests.
I also confirmed that `SparkSessionE2ESuite` mentioned above succeeded.

### Was this patch authored or co-authored using generative AI tooling?
Kiro CLI / Opus 4.6

Closes #54774 from sarutak/SPARK-53339-2.

Authored-by: Kousuke Saruta <sarutak@amazon.co.jp>
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
(cherry picked from commit 09979af)
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
sarutak added a commit that referenced this pull request Mar 16, 2026
…`postStarted()` and allowing Pending to Canceled/Failed transition

(The commit message for this second cherry-pick is identical to the one above.)

sarutak commented Mar 16, 2026

Merged to master, branch-4.1 and branch-4.0. Thank you, @HyukjinKwon .

