[Pipe] Deduplicate historical tsfile events in IoTConsensusV2 pipes #17472
Pengzna wants to merge 1 commit into apache:master
Conversation
Pull request overview
This PR addresses duplicate tsfile transfer in IoTConsensusV2 “batch mode” pipes by introducing a per-task-instance dedup scope shared between the realtime and historical sources, allowing the historical source to skip tsfiles already captured by the realtime source.
Changes:
- Add a per-task `tsFileDedupScopeID` and use it as the key for tsfile progress index tracking/dedup.
- Register realtime-captured tsfiles into a scoped keeper and have the historical source skip duplicates under the same scope.
- Add unit tests covering scoped behavior (contains/check/clear) of the keeper.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java | Switch keeper keying from pipeName to task-scope ID; add scoped clear + contains APIs. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java | Generate/store task-scope ID and clear scoped keeper state on close; use scope in progress-index checks. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java | Register/eliminate tsfile progress index using the new task-scope ID. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java | Register/eliminate tsfile progress index using the new task-scope ID. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java | Compute matching task-scope ID and skip historical tsfile events already captured by realtime under that scope. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java | New unit tests verifying the keeper’s scope isolation and scoped clearing. |
Comments suppressed due to low confidence (1)
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java:83
The name `isProgressIndexAfterOrEquals` doesn't match its predicate: `!resource.getMaxProgressIndex().isAfter(progressIndex)` is equivalent to `maxProgressIndex <= progressIndex` (i.e., before or equal), not after or equal. This makes the API very easy to misuse. Please either invert the comparison to match the name, or rename the method to reflect the current behavior (and adjust tests/callers accordingly).
```java
public synchronized boolean isProgressIndexAfterOrEquals(
    final int dataRegionId,
    final String taskScopeID,
    final String tsFilePath,
    final ProgressIndex progressIndex) {
  return progressIndexKeeper
      .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
      .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
      .entrySet()
      .stream()
      .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
      .map(Entry::getValue)
      .filter(Objects::nonNull)
      .anyMatch(resource -> !resource.getMaxProgressIndex().isAfter(progressIndex));
}
```
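To make the naming mismatch concrete, here is a minimal, self-contained sketch: plain `long` values stand in for `ProgressIndex`, and `>` stands in for `isAfter`. All class and method names are illustrative, not IoTDB's actual API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the keeper's per-scope map: tsFilePath -> max index.
public class ScopedIndexKeeperSketch {
  private final Map<String, Long> maxIndexByTsFile = new ConcurrentHashMap<>();

  public void register(String tsFilePath, long maxProgressIndex) {
    maxIndexByTsFile.put(tsFilePath, maxProgressIndex);
  }

  // Current behavior: matches when some OTHER file's max index is
  // BEFORE-or-equal to the given index (!isAfter == <=), despite the name.
  public boolean isProgressIndexAfterOrEquals(String tsFilePath, long progressIndex) {
    return maxIndexByTsFile.entrySet().stream()
        .filter(e -> !e.getKey().equals(tsFilePath))
        .anyMatch(e -> !(e.getValue() > progressIndex)); // i.e. value <= progressIndex
  }

  // Name-matching variant: some other file's max index is AFTER-or-equal.
  public boolean isProgressIndexAfterOrEqualsFixed(String tsFilePath, long progressIndex) {
    return maxIndexByTsFile.entrySet().stream()
        .filter(e -> !e.getKey().equals(tsFilePath))
        .anyMatch(e -> e.getValue() >= progressIndex);
  }
}
```

With one file registered at index 5, a query at index 10 returns `true` from the current predicate but `false` from the name-matching variant, which is exactly the divergence the comment flags.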
```diff
 public synchronized void eliminateProgressIndex(
-    final int dataRegionId, final @Nonnull String pipeName, final String filePath) {
+    final int dataRegionId, final @Nonnull String taskScopeID, final String filePath) {
   progressIndexKeeper
       .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-      .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
+      .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
       .remove(filePath);
 }
```
PipeTsFileEpochProgressIndexKeeper changed the eliminateProgressIndex signature from pipeName to taskScopeID, but there are still call sites passing pipeName (e.g., in PipeTsFileInsertionEvent and PipeCompactedTsFileInsertionEvent). This will break compilation and also prevents removing the correct scoped entry. Please update all remaining callers to pass the same task-scope ID used during registerProgressIndex, or provide a compatible overload if some callers cannot access the scope ID.
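One way to keep legacy `pipeName` call sites compiling is a bridging overload. The sketch below is hypothetical: it replaces IoTDB's types with plain maps and *assumes* that a scope ID embeds its pipe name as a prefix, which is an invented convention, not the PR's actual scheme.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a bridging overload for pipeName-only callers.
public class EliminateOverloadSketch {
  // dataRegionId -> taskScopeID -> tsFilePath -> (stand-in resource)
  private final Map<Integer, Map<String, Map<String, Object>>> progressIndexKeeper =
      new ConcurrentHashMap<>();

  public void register(int dataRegionId, String taskScopeID, String filePath) {
    progressIndexKeeper
        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
        .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
        .put(filePath, new Object());
  }

  public void eliminateProgressIndex(int dataRegionId, String taskScopeID, String filePath) {
    progressIndexKeeper
        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
        .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
        .remove(filePath);
  }

  // Bridging overload: callers that only know the pipeName remove the file
  // from every scope of that pipe. Prefix matching is an assumption here.
  public void eliminateProgressIndexByPipeName(int dataRegionId, String pipeName, String filePath) {
    Map<String, Map<String, Object>> scopes = progressIndexKeeper.get(dataRegionId);
    if (scopes == null) {
      return;
    }
    scopes.forEach((scopeID, files) -> {
      if (scopeID.startsWith(pipeName)) {
        files.remove(filePath);
      }
    });
  }

  public boolean contains(int dataRegionId, String taskScopeID, String filePath) {
    Map<String, Map<String, Object>> scopes = progressIndexKeeper.get(dataRegionId);
    if (scopes == null) {
      return false;
    }
    Map<String, Object> files = scopes.get(taskScopeID);
    return files != null && files.containsKey(filePath);
  }
}
```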
```java
public synchronized void clearProgressIndex(
    final int dataRegionId, final @Nonnull String taskScopeID) {
  progressIndexKeeper
      .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
      .remove(taskScopeID);
}
```
```java
public synchronized boolean containsTsFile(
    final int dataRegionId, final @Nonnull String taskScopeID, final String tsFilePath) {
  return progressIndexKeeper
      .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
      .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
      .containsKey(tsFilePath);
}
```
`clearProgressIndex`/`containsTsFile` currently use `computeIfAbsent`, which creates new empty maps as a side effect (e.g., calling `clearProgressIndex` during `close()` when no tsfiles were ever registered will still allocate and retain an empty `dataRegionId` entry). Consider using `get(...)` with null checks instead, and optionally removing the `dataRegionId` entry when its scope map becomes empty, to avoid unbounded growth / unnecessary allocations.
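The allocation-free read path suggested above can be sketched as follows; class and helper names are illustrative, with plain maps standing in for the keeper's internals.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: get(...) with null checks instead of computeIfAbsent, plus
// cleanup of a dataRegion entry once its scope map becomes empty.
public class NonAllocatingKeeperSketch {
  private final Map<Integer, Map<String, Map<String, Object>>> progressIndexKeeper =
      new ConcurrentHashMap<>();

  public void register(int dataRegionId, String taskScopeID, String tsFilePath) {
    progressIndexKeeper
        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
        .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
        .put(tsFilePath, new Object());
  }

  public synchronized boolean containsTsFile(
      int dataRegionId, String taskScopeID, String tsFilePath) {
    Map<String, Map<String, Object>> scopes = progressIndexKeeper.get(dataRegionId);
    if (scopes == null) {
      return false; // nothing registered for this region: no allocation
    }
    Map<String, Object> files = scopes.get(taskScopeID);
    return files != null && files.containsKey(tsFilePath);
  }

  public synchronized void clearProgressIndex(int dataRegionId, String taskScopeID) {
    Map<String, Map<String, Object>> scopes = progressIndexKeeper.get(dataRegionId);
    if (scopes == null) {
      return;
    }
    scopes.remove(taskScopeID);
    if (scopes.isEmpty()) {
      progressIndexKeeper.remove(dataRegionId); // avoid retaining empty entries
    }
  }

  int regionCount() {
    return progressIndexKeeper.size();
  }
}
```

With this shape, querying or clearing an unknown region leaves the outer map untouched, so a `close()` on a task that never registered a tsfile allocates nothing.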
```java
if (shouldSkipHistoricalTsFileEvent(resource)) {
  filteredTsFileResources2TableNames.remove(resource);
  LOGGER.info(
      "Pipe {}@{}: skip historical tsfile {} because realtime source in current task {} has already captured it.",
      pipeName,
      dataRegionId,
      resource.getTsFilePath(),
      tsFileDedupScopeID);
  try {
    return null;
  } finally {
    try {
      PipeDataNodeResourceManager.tsfile()
          .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
    } catch (final IOException e) {
      LOGGER.warn(
          "Pipe {}@{}: failed to unpin skipped historical TsFileResource, original path: {}",
          pipeName,
          dataRegionId,
          resource.getTsFilePath());
    }
  }
}
```
When a historical tsfile is skipped (`shouldSkipHistoricalTsFileEvent`), `supplyTsFileEvent` returns null. In the processor pipeline, a null supply result is treated as "no work" and causes the worker to sleep (see `PipeProcessorSubtaskWorker.sleepIfNecessary`), which can significantly slow historical consumption if there are multiple skipped duplicates. Consider continuing to poll the next `PersistentResource` within the same `supply()` call (or emitting an appropriate progress/report event) instead of returning null for a skipped resource.
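The loop-within-`supply()` idea can be sketched as below. A queue of paths stands in for the source's pending `PersistentResource`s, and all names are illustrative rather than IoTDB's actual API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Sketch: instead of returning null for a skipped resource (which makes the
// worker sleep), keep polling within one supply() call until a non-duplicate
// resource is found or the queue is genuinely empty.
public class SkipInSupplySketch {
  private final Deque<String> pendingTsFiles = new ArrayDeque<>();
  private final Predicate<String> shouldSkip;

  public SkipInSupplySketch(Predicate<String> shouldSkip) {
    this.shouldSkip = shouldSkip;
  }

  public void offer(String tsFilePath) {
    pendingTsFiles.add(tsFilePath);
  }

  // Returns the next transferable tsfile path, or null only when the queue is
  // truly exhausted, so null again means "no work", not "skipped work".
  public String supply() {
    String resource;
    while ((resource = pendingTsFiles.poll()) != null) {
      if (shouldSkip.test(resource)) {
        // unpin / release the skipped resource here, then continue polling
        continue;
      }
      return resource;
    }
    return null;
  }
}
```

With several consecutive duplicates queued, a single `supply()` call skips past all of them and still hands back real work, so the worker never sleeps on account of dedup.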
```java
try {
  PipeDataNodeResourceManager.tsfile()
      .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
} catch (final IOException e) {
  LOGGER.warn(
      "Pipe {}@{}: failed to unpin skipped historical TsFileResource, original path: {}",
      pipeName,
      dataRegionId,
      resource.getTsFilePath());
}
```
Consider passing the caught exception `e` as the last argument to `LOGGER.warn` so that its stack trace is not lost.
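For illustration, here is a self-contained sketch of attaching the exception to the warning. `java.util.logging` stands in for the project's SLF4J logger; with SLF4J the equivalent fix is simply passing the `Throwable` as the final `warn(...)` argument.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch only: class and method names are hypothetical.
public class WarnWithCauseSketch {
  static final Logger LOGGER = Logger.getLogger("WarnWithCauseSketch");

  public static void warnUnpinFailure(String path, Exception e) {
    // Attaching the exception preserves its stack trace in the log output.
    LOGGER.log(Level.WARNING, "failed to unpin skipped historical TsFileResource: " + path, e);
  }
}
```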
```java
public synchronized boolean containsTsFile(
    final int dataRegionId, final @Nonnull String taskScopeID, final String tsFilePath) {
  return progressIndexKeeper
      .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
      .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
      .containsKey(tsFilePath);
}
```
No need to compute new entries when the `dataRegionId`/`taskScopeID` is missing; just return false.
Why

In IoTConsensusV2 batch mode, the same logical tsfile can be observed by both the realtime source and the historical source in the same consensus pipe task. When that happens, the duplicated historical tsfile event may allocate another `replicateIndex` for an already captured file and cause redundant receiver-side seal/retry behavior.

What Changed
Verification
- `mvn -pl iotdb-core/datanode -DskipITs -Dtest=PipeTsFileEpochProgressIndexKeeperTest test`
- `mvn clean package -DskipTests -T 1C`
- Repro with `data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensusV2`, running iot-benchmark (`DEVICE_NUMBER=10,SENSOR_NUMBER=1,BATCH_SIZE_PER_WRITE=100000,LOOP=10`) followed by a flush.
- Observed the log `skip historical tsfile ... because realtime source in current task ... has already captured it`; no `writing file null is not available`, `Failed to seal file`, or `2204` errors were observed in this repro.