[CELEBORN-2312] Support committing uncommitted partitions for graceful shutdown#3668
[CELEBORN-2312] Support committing uncommitted partitions for graceful shutdown#3668SteNicholas wants to merge 1 commit intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds an opt-in mechanism for Celeborn workers to proactively commit uncommitted partitions during graceful shutdown (to reduce shutdown latency), controlled by a new worker configuration flag.
Changes:
- Add
snapshotUncommittedUniqueIdsto snapshot uncommitted partition unique IDs (primary + replica) by shuffle key. - Add
Controller.commitUncommittedPartitions()and invoke it fromWorker.shutdownGracefully()when enabled. - Add new config
celeborn.worker.graceful.shutdown.commitUncommittedPartitions.enabledand document it; add unit tests for the new behavior.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala |
Calls proactive commit during graceful shutdown when the new config is enabled. |
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala |
Implements proactive commit flow using existing commitFiles infrastructure and then removes/releases committed partitions. |
common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala |
Adds snapshot API for uncommitted partition IDs grouped by shuffle key. |
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala |
Introduces the new configuration entry and accessor. |
docs/configuration/worker.md |
Documents the new worker config flag. |
common/src/test/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfoSuite.scala |
Adds tests for the new snapshot behavior. |
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala |
Adds tests for proactive commit behavior and idempotency/failure retention. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
da4ba4c to
4104e87
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
4104e87 to
629e0c0
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
629e0c0 to
5ff02d4
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
@SteNicholas In this PR, are the committed files stored in the RocksDB by the storage manager? |
|
@FMX, both following steps are covered by existing code, and |
|
After some investigation, I think there is something wrong with this PR. |
|
Scenario : No restart, but worker proactively commits and clears partitions During graceful shutdown, Controller.commitUncommittedPartitions():
|
|
I think you'll need to extend the shuffleCommitInfos and persist it to make sure subsequent CommitFiles requests can be recognized as already committed. |
FMX
left a comment
There was a problem hiding this comment.
After careful review, I think this PR is not ready.
What changes were proposed in this pull request?
Support the worker to proactively commit uncommitted partitions during graceful shutdown, controlled by a new configuration
celeborn.worker.graceful.shutdown.commitUncommittedPartitions.enabled(default false).Key changes:
WorkerPartitionLocationInfo#snapshotUncommittedUniqueIds: Takes a weakly-consistent, point-in-time snapshot of uncommitted partition unique IDs grouped by shuffle key (primary + replica). UsesConcurrentHashMapiteration semantics - concurrent mutations after the snapshot are not visible.Controller#commitUncommittedPartitions(): Snapshots all uncommitted partitions, commits them in parallel via the existing commitFiles thread pool, waits with shuffleCommitTimeout, then removes successfully committed partitions and releases slots. Failed partitions are intentionally retained so the existing passiveLifecycleManagerCommitFiles retry path can still handle them.Worker#shutdownGracefully(): InvokesController#commitUncommittedPartitions()aftershutdown.set(true)when the config is enabled.CelebornConf: New configceleborn.worker.graceful.shutdown.commitUncommittedPartitions.enabled(version 0.7.0, default false).Why are the changes needed?
During graceful shutdown, the worker currently waits passively for
LifecycleManagerto send CommitFiles RPCs. This introduces unnecessary shutdown latency in scenarios where:LifecycleManageris slow to react (e.g., under GC pressure or network delays).LifecycleManagerhas already deregistered the worker and will not send CommitFiles.By allowing the worker to proactively commit its own partitions, the graceful shutdown window can be significantly shortened while maintaining backward compatibility (opt-in, default off).
Does this PR resolve a correctness bug?
No.
Does this PR introduce any user-facing change?
Yes. A new configuration is introduced:
celeborn.worker.graceful.shutdown.commitUncommittedPartitions.enabledfalseHow was this patch tested?
WorkerPartitionLocationInfoSuitesnapshotUncommittedUniqueIds - empty info returns empty mapssnapshotUncommittedUniqueIds - captures correct IDs across shufflessnapshotUncommittedUniqueIds - filters empty shuffle keyssnapshotUncommittedUniqueIds - snapshot is a point-in-time copyWorkerSuitecommitUncommittedPartitions - commits primary and replica partitionscommitUncommittedPartitions - no-op when no partitionscommitUncommittedPartitions - idempotent on double callcommitUncommittedPartitions - retains failed partitions for passive waitcommitUncommittedPartitions - commits across multiple shuffle keyscommitUncommittedPartitions - no cross-shuffle uniqueId collisioncommitUncommittedPartitions - cross-shuffle collision with partial failure