[CELEBORN-2313] Extend E2E checked zone to batch assembly point #3670
xumingming wants to merge 1 commit into apache:main
Conversation
Force-pushed: cf055e4 to a4ee01e
Codecov Report ✅ All modified and coverable lines are covered by tests.

Coverage diff vs main:

| | main | #3670 | +/- |
|---|---|---|---|
| Coverage | 66.91% | 67.05% | +0.15% |
| Files | 358 | 359 | +1 |
| Lines | 21986 | 22197 | +211 |
| Branches | 1946 | 1970 | +24 |
| Hits | 14710 | 14883 | +173 |
| Misses | 6262 | 6292 | +30 |
| Partials | 1014 | 1022 | +8 |
Pull request overview
Extends Celeborn’s E2E shuffle integrity “checked zone” upstream by computing per-batch CRC immediately after batch assembly (writer thread) rather than inside the async DataPusher pipeline, aiming to detect corruption occurring between batch assembly and async dispatch.
Changes:
- Adds `ShuffleClient.computeBatchCRC()` and implements it in `ShuffleClientImpl` (no-op in `DummyShuffleClient`).
- Moves CRC accumulation call sites into Spark shuffle writers and `SortBasedPusher`, right before enqueue/push/merge.
- Adds a unit test validating CRC accumulation behavior.
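To illustrate the timing shift this PR makes, here is a minimal, self-contained sketch (not Celeborn code; all names are hypothetical): when the CRC is taken in the writer thread at batch assembly, corruption of the shared buffer in the gap between enqueue and async dispatch becomes detectable, whereas a CRC taken later by the consumer would silently match the corrupted bytes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.zip.CRC32;

// Hypothetical model of the checked-zone extension: the writer computes the
// batch CRC *before* handing the buffer to the async pusher queue, so any
// mutation of the shared buffer after enqueue becomes detectable downstream.
public class CheckedZoneSketch {
  record Batch(byte[] data, long writerCrc) {}

  static long crcOf(byte[] data) {
    CRC32 crc = new CRC32();
    crc.update(data, 0, data.length);
    return crc.getValue();
  }

  // Returns true if the pusher-side CRC still matches the writer-side CRC.
  static boolean pushAndVerify(BlockingQueue<Batch> queue, byte[] data,
      boolean corruptAfterEnqueue) throws InterruptedException {
    Batch batch = new Batch(data, crcOf(data)); // CRC at batch assembly
    queue.put(batch);
    if (corruptAfterEnqueue) {
      data[0] ^= 0x5A; // simulate shared-buffer corruption in the gap
    }
    Batch taken = queue.take(); // async pusher side
    return crcOf(taken.data()) == taken.writerCrc();
  }
}
```

A single-byte flip is always within CRC32's guaranteed burst-detection range, so the corrupted case reliably fails verification.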
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java | Adds UT for computeBatchCRC() accumulation behavior. |
| client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java | Removes CRC accumulation from pushOrMergeData() and adds computeBatchCRC() implementation. |
| client/src/main/java/org/apache/celeborn/client/ShuffleClient.java | Introduces the new computeBatchCRC() API with Javadoc. |
| client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java | Implements new abstract method as a no-op. |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | Calls computeBatchCRC() for giant-record pushes. |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | Calls computeBatchCRC() before enqueue and per-partition final flush. |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | Calls computeBatchCRC() for giant-record pushes. |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | Calls computeBatchCRC() before enqueue and per-partition final flush. |
| client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java | Calls computeBatchCRC() before partition-change flush, overflow flush, and final flush. |
```java
/**
 * Pre-compute CRC for a batch immediately after assembly in the writer, before the data enters
 * the async push pipeline. Calling this prevents double-computation in pushOrMergeData.
 */
public abstract void computeBatchCRC(
    int shuffleId,
    int mapId,
    int attemptId,
    int partitionId,
    byte[] data,
    int offset,
    int length);
```
The computeBatchCRC() Javadoc says calling it “prevents double-computation in pushOrMergeData”, but this PR removes CRC accumulation from pushOrMergeData entirely. Please update the comment to match the new semantics (i.e., computeBatchCRC is now required for integrity-check metadata unless a fallback remains in pushOrMergeData), otherwise API users will be misled.
Good catch, fixed.
```java
// compute expected CRC for partition 1: two batches accumulated via CelebornCRC32.combine
crcUtil.reset();
crcUtil.update(batch1a, 0, batch1a.length);
int crc1a = (int) crcUtil.getValue();
crcUtil.reset();
crcUtil.update(batch1b, 0, batch1b.length);
int crc1b = (int) crcUtil.getValue();
// mirrors CelebornCRC32.combine(second, first): byte-wise addition
int expected1 = celebornCrcCombine(celebornCrcCombine(0, crc1a), crc1b);
assertEquals(expected1, crcPerPartition[1]);
assertEquals(batch1a.length + batch1b.length, bytesPerPartition[1]);
}

// mirrors CelebornCRC32.combine(second=newChecksum, first=runningVal)
private static int celebornCrcCombine(int runningVal, int newChecksum) {
  return (((byte) newChecksum + (byte) runningVal) & 0xFF)
      | ((((byte) (newChecksum >> 8) + (byte) (runningVal >> 8)) & 0xFF) << 8)
      | ((((byte) (newChecksum >> 16) + (byte) (runningVal >> 16)) & 0xFF) << 16)
      | (((byte) (newChecksum >> 24) + (byte) (runningVal >> 24)) << 24);
}
```
This test re-implements CelebornCRC32.combine logic in celebornCrcCombine(), which duplicates production code and makes the test brittle if the checksum combination strategy ever changes (also the “second/first” wording is confusing given the current CelebornCRC32.combine signature). Prefer deriving the expected checksum via existing public APIs (e.g., build a CommitMetadata and call addDataWithOffsetAndLength twice, then assert its checksum/bytes match PushState), so the test validates behavior without copying internal implementation details.
Good catch, fixed.
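The shape of the fix can be illustrated with a toy accumulator (a hypothetical stand-in for Celeborn's `CommitMetadata`; the combine strategy here is deliberately simplified): the test derives the expected value through the same public accumulation API as the code under test, so it no longer encodes the combine algorithm and survives a change of strategy.

```java
import java.util.zip.CRC32;

// Illustrative sketch of the review suggestion: compute "expected" and
// "actual" through the same accumulator type rather than re-implementing
// the combine logic inside the test. CrcAccumulator is hypothetical, not
// Celeborn API; the XOR combine is a toy placeholder.
public class AccumulatorTestSketch {
  static class CrcAccumulator {
    private long running; // running combined checksum (toy combine: XOR)
    private long bytes;

    void addDataWithOffsetAndLength(byte[] data, int offset, int length) {
      CRC32 crc = new CRC32();
      crc.update(data, offset, length);
      running ^= crc.getValue(); // toy combine; the real combine differs
      bytes += length;
    }

    long checksum() { return running; }
    long bytes() { return bytes; }
  }
}
```

Because both sides go through `CrcAccumulator`, swapping the combine implementation changes expected and actual together and the test keeps validating behavior, not internals.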
```
@@ -1044,12 +1044,6 @@ public int pushOrMergeData(
  // increment batchId
  final int nextBatchId = pushState.nextBatchId();
```
Removing commit-metadata/CRC tracking from pushOrMergeData means integrity-check metadata is now only updated when callers explicitly invoke computeBatchCRC(). There are existing call sites that still call pushData()/mergeData() directly without computeBatchCRC (e.g. client-tez/tez/CelebornTezWriter.mergeData, client-mr/mr/CelebornSortBasedPusher.pushData, client-spark/columnar/ColumnarHashBasedShuffleWriter.closeColumnarWrite mergeData), so enabling celeborn.client.shuffle.integrityCheck.enabled will silently stop producing correct per-partition CRC/bytes for those paths. Please either keep a fallback CRC accumulation in pushOrMergeData (only when not already precomputed) or update all remaining writers/clients that call pushData/mergeData to invoke computeBatchCRC before enqueue/push.
```java
// Preserve integrity-check metadata for callers that still invoke pushData()/mergeData()
// directly without explicitly calling computeBatchCRC() first. This must run before
// compression so CRC / bytes are recorded for the original batch payload.
computeBatchCRC(shuffleId, mapId, attemptId, partitionId, nextBatchId, data, offset, length);
```
End-to-End Integrity currently does not support Tez/MR even before this change. If we want to add support, the changes will be similar.
Thanks for the fix! I have been thinking about this change as well, and I am also interested in what other folks in the community think about the current explicit-call approach from all writer call sites.
One option might be to keep two checksums (from both the writer and the shuffle client) on the client, and compare them for equality before sending the metadata. If there is a mismatch, fail the task. This would catch both cases: a call site that was missed, and a DataPusher that did not push all data.
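A minimal sketch of this two-layer idea (none of these names are Celeborn API; this is purely illustrative): the writer and the shuffle client each compute a CRC over the same batch, and a comparison before metadata is sent fails the task on mismatch.

```java
import java.util.zip.CRC32;

// Hypothetical sketch of the dual-checksum proposal: two independent layers
// (writer and shuffle client) compute their own CRC over a batch, and the
// client compares them before propagating commit metadata at the mapper end.
public class DualChecksumSketch {
  static long crcOf(byte[] data, int offset, int length) {
    CRC32 crc = new CRC32();
    crc.update(data, offset, length);
    return crc.getValue();
  }

  /** Throws if the writer-side and client-side checksums disagree. */
  static void verifyBeforeSend(long writerCrc, long clientCrc) {
    if (writerCrc != clientCrc) {
      throw new IllegalStateException(
          "integrity mismatch: writer=" + writerCrc + " client=" + clientCrc);
    }
  }
}
```

Failing here, at the mapper, is what makes the scheme cheap: the task is retried locally instead of the corruption surfacing at reducers.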
Thanks for this PR — the analysis of the coverage gap is spot-on. I have a suggestion on the API design that could make it more maintainable.

Concern: Implicit CRC contract

The current approach requires every call site to remember to invoke `computeBatchCRC()` before enqueue/push.

Suggestion: Encapsulate CRC + push as atomic operations

Introduce CRC-inclusive wrappers:

```java
// ShuffleClient.java
public int pushDataWithCRC(int shuffleId, int mapId, int attemptId,
    int partitionId, byte[] data, int offset, int length,
    int numMappers, int numPartitions) throws IOException {
  computeBatchCRC(shuffleId, mapId, attemptId, partitionId, data, offset, length);
  return pushData(shuffleId, mapId, attemptId, partitionId,
      data, offset, length, numMappers, numPartitions);
}

public int mergeDataWithCRC(int shuffleId, int mapId, int attemptId,
    int partitionId, byte[] data, int offset, int length,
    int numMappers, int numPartitions) throws IOException {
  computeBatchCRC(shuffleId, mapId, attemptId, partitionId, data, offset, length);
  return mergeData(shuffleId, mapId, attemptId, partitionId,
      data, offset, length, numMappers, numPartitions);
}

// For the DataPusher path
public void addTaskWithCRC(int shuffleId, int mapId, int attemptId,
    int partitionId, byte[] buffer, int size) throws InterruptedException {
  computeBatchCRC(shuffleId, mapId, attemptId, partitionId, buffer, 0, size);
  dataPusher.addTask(partitionId, buffer, size);
}
```

Then the 7 writer call sites each become a single call — no way to forget CRC:

```java
// Before (two operations, easy to miss CRC)
shuffleClient.computeBatchCRC(shuffleId, mapId, attemptId, partitionId, buffer, 0, size);
dataPusher.addTask(partitionId, buffer, size);

// After (one atomic operation)
shuffleClient.addTaskWithCRC(shuffleId, mapId, attemptId, partitionId, buffer, size);
```

This doesn't eliminate the 7 call sites, but it makes them self-contained and impossible to get wrong. The invariant shifts from "callers must remember to compute CRC first" (implicit, fragile) to "use the CRC-inclusive API" (explicit, hard to misuse).

Other than that, I had a couple of minor notes on the test.

Review generated with the assistance of Claude AI
@gauravkm Interesting idea! Failing the task at the mapper side is far cheaper than discovering corruption at the reducers and rerunning the entire job. We can explore this as a follow-up improvement.
@RexXiong Thanks for the review. After some thought, I find the suggestion not feasible. The reason I want to separate the CRC calculation from pushOrMergeData is that the current CRC calculation happens too late, so I moved it to an earlier call site.

Adding pushDataWithCRC to replace pushData is technically doable, but it ends up similar to the original pushOrMergeData, which is where we started. Even with all the xxxWithCRC methods added, it seems more complicated than my current solution. What do you think?
@gauravkm Interesting idea! Looking forward to a detailed proposal.
Force-pushed: a4ee01e to f3dd0f0
@RexXiong @SteNicholas I have made the corresponding changes, please take a look again.
There isn't a lot more to it. Essentially we keep the existing checksum computation, and separately store the CRC computed in the writers (added in this PR). Before we send metadata, we ensure that both computations match; otherwise we fail the task. In short, two layers (writer and shuffle client) compute their own checksums, and the shuffle client compares them before propagation at the mapper end.
@RexXiong @SteNicholas Gentle ping :)
What changes were proposed in this pull request?

Celeborn's E2E integrity check computes CRC_M inside `ShuffleClientImpl.pushOrMergeData()`, which runs in the async `DataPusher` thread. This leaves the segment from batch assembly in the writer thread through the `DataPusher` queue entirely outside the checked zone — meaning any corruption that occurs in that window is invisible to the integrity check and reaches reducers silently.

This change closes that gap and enables detection of a class of correctness bugs where data corruption occurs between batch assembly and async push dispatch, including bugs involving shared buffer pool references.

Introduce `ShuffleClient.computeBatchCRC()` and call it immediately before each assembled batch enters the async push pipeline, at 7 call sites across 3 classes: `HashBasedShuffleWriter` (spark-2/3) at `flushSendBuffer()`, `pushGiantRecord()`, and the per-partition flush in `close()`; and `SortBasedPusher` at the partition-change flush, buffer-overflow flush, `pushGiantRecord()`, and final flush. The now-redundant CRC computation inside `pushOrMergeData()` is removed.

This approach is less elegant than the original design, which had a single CRC call site inside `pushOrMergeData()` — one place to reason about and maintain. The new design scatters `computeBatchCRC()` across 7 call sites, but the trade-off is justified: the checked zone now starts at batch assembly rather than at async push dispatch, covering more of the data pipeline and enabling detection of a broader class of correctness bugs.

Why are the changes needed?

Celeborn's E2E integrity check computes CRC_M inside `ShuffleClientImpl.pushOrMergeData()`, which runs in the async `DataPusher` thread. This leaves the segment from batch assembly in the writer thread through the `DataPusher` queue entirely outside the checked zone — meaning any corruption that occurs in that window is invisible to the integrity check and reaches reducers silently.

Does this PR resolve a correctness bug?
It could help us detect more correctness bugs.
Does this PR introduce any user-facing change?
No
How was this patch tested?
UT