[None][feat] Fully non-blocking pipeline parallelism executor loop. #10349
yuxianq merged 1 commit into NVIDIA:main from
Conversation
|
/bot run --disable-fail-fast |
|
PR_Github #30199 [ run ] triggered by Bot. Commit: |
|
PR_Github #30199 [ run ] completed with state
|
aeca707 to 593f514
|
/bot run --disable-fail-fast |
|
PR_Github #30276 [ run ] triggered by Bot. Commit: |
|
PR_Github #30276 [ run ] completed with state
|
d7b0ea8 to 715b997
|
/bot run --disable-fail-fast |
|
PR_Github #30707 [ run ] triggered by Bot. Commit: |
|
PR_Github #30707 [ run ] completed with state
|
|
/bot run --disable-fail-fast |
|
PR_Github #30820 [ run ] triggered by Bot. Commit: |
|
PR_Github #30820 [ run ] completed with state
|
5212efa to 4d06e37
|
/bot run --disable-fail-fast |
|
PR_Github #31939 [ run ] triggered by Bot. Commit: |
📝 Walkthrough
The pull request introduces diff tracking in PyResult for incremental output changes, refactors micro-batch scheduling with per-microbatch communication handles in the PP pipeline, adds thread-local MPI communicator support, wraps MPI sub-communicators for serialization, and adds profiling instrumentation and utility formatting functions.
Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@tensorrt_llm/_utils.py`:
- Around line 507-518: Remove the unused local_mpi_comm function and the
local_comm variable to avoid caching a module-level MPI communicator that
bypasses thread-local overrides; specifically delete the local_comm assignment
and the def local_mpi_comm(...) helper and any references to them, leaving
mpi_comm(), set_thread_local_mpi_comm(), and thread_local_comm intact so all
dynamic calls continue to honor thread-local overrides.
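For context, here is a minimal sketch of the lookup order that comment implies, assuming thread_local_comm is a threading.local() holder and COMM_WORLD is the process-wide default; the real helpers in tensorrt_llm/_utils.py may differ in detail.

```python
import threading

from mpi4py import MPI

# Process-wide default communicator plus a per-thread override slot (assumed names).
G_DEFAULT_COMM = MPI.COMM_WORLD
thread_local_comm = threading.local()


def set_thread_local_mpi_comm(comm):
    """Install (or clear, by passing None) an MPI communicator override for this thread."""
    thread_local_comm.comm = comm


def mpi_comm():
    """Return the thread-local communicator if one is set, otherwise the default."""
    comm = getattr(thread_local_comm, "comm", None)
    return comm if comm is not None else G_DEFAULT_COMM
```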
🧹 Nitpick comments (4)
tensorrt_llm/llmapi/disagg_utils.py (1)
282-283: Update the return type annotation to reflect the actual return type. The function signature declares the return type as Tuple[bool, int, Comm], but the function actually returns Tuple[bool, int, pkl5.Intracomm] (line 323). The return type annotation should be updated to match the implementation and align with how the global comm is consistently initialized with pkl5.Intracomm elsewhere in the codebase.
Suggested fix
```diff
-def split_world_comm(
-        server_configs: List[CtxGenServerConfig]) -> Tuple[bool, int, Comm]:
+def split_world_comm(
+        server_configs: List[CtxGenServerConfig]) -> Tuple[bool, int, pkl5.Intracomm]:
```
tensorrt_llm/_torch/utils.py (1)
410-431: Guard empty tensors (and make CPU sampling explicit) to avoid crashes/surprising sync. max()/min() will throw on numel==0, and .tolist() on CUDA will sync anyway; better to make the copy explicit and bounded.
Proposed patch
```diff
 def tensor_to_str(x: torch.Tensor, num_elements: int = 10) -> str:
     # Pass num_elements=-1 will print the whole tensor
-    if num_elements < 0:
-        num_elements = torch.numel(x)
-    if x.dtype in (torch.int32, torch.int64):
-        float_x = x.to(dtype=float)
+    numel = x.numel()
+    if num_elements < 0:
+        num_elements = numel
+    if x.dtype in (torch.int32, torch.int64):
+        float_x = x.to(dtype=torch.float32)
     else:
         float_x = x
+    if numel == 0:
+        return ("Tensor("
+                f"shape={tuple(x.shape)}, "
+                f"dtype={torch_dtype_to_str(x.dtype)}, "
+                f"device={x.device}, "
+                "stats=(empty), "
+                "values=[]"
+                ")")
     return ("Tensor("
             f"shape={tuple(x.shape)}, "
             f"dtype={torch_dtype_to_str(x.dtype)}, "
             f"device={x.device}, "
             f"stats=("
             f"abs_mean={float_x.abs().mean().item():.3f}, "
             f"mean={float_x.mean().item():.3f}, "
             f"std={float_x.std().item():.3f}, "
-            f"max={x.max().item():.3f}, "
-            f"min={x.min().item():.3f}"
+            f"max={float_x.max().item():.3f}, "
+            f"min={float_x.min().item():.3f}"
             "), "
-            f"values={x.flatten()[:num_elements].tolist()}"
+            f"values={x.flatten()[:num_elements].detach().cpu().tolist()}"
             ")")
```
tensorrt_llm/_torch/pyexecutor/llm_request.py (2)
230-248: Diff structure is reasonable; consider clarifying CPU/device expectations for tensor fields. Right now Diff can transiently hold CUDA tensors until get_diff() runs; if any caller forgets to call get_diff() before transport, you’ll get non-serializable payloads.
299-333: Make get_diff()/apply_diff() more defensive (optional), to avoid future flag mismatches. Today it relies on “all ranks constructed PyResult with identical return_* flags”; a small divergence will become an AttributeError.
Sketch
```diff
     def get_diff(self) -> Diff:
-        for i, context_logits in enumerate(self.diff.context_logits_list):
-            self.diff.context_logits_list[i] = context_logits.to("cpu")
+        for i, context_logits in enumerate(self.diff.context_logits_list):
+            if isinstance(context_logits, torch.Tensor) and context_logits.is_cuda:
+                self.diff.context_logits_list[i] = context_logits.to("cpu", non_blocking=True)
         for i, generation_logits in enumerate(self.diff.generation_logits_list):
-            self.diff.generation_logits_list[i] = generation_logits.to("cpu")
+            if isinstance(generation_logits, torch.Tensor) and generation_logits.is_cuda:
+                self.diff.generation_logits_list[i] = generation_logits.to("cpu", non_blocking=True)
         return self.diff

     def apply_diff(self, diff: Diff):
         if diff.exclude_last_generation_logits is not None:
             self._exclude_last_generation_logits = diff.exclude_last_generation_logits
         if len(diff.context_logits_list) > 0:
-            for context_logits in diff.context_logits_list:
-                self._context_logits.append(context_logits)
+            if self._context_logits is not None:
+                for context_logits in diff.context_logits_list:
+                    self._context_logits.append(context_logits)
         if len(diff.generation_logits_list) > 0:
-            for generation_logits in diff.generation_logits_list:
-                self._generation_logits.append(generation_logits)
+            if self._generation_logits is not None:
+                for generation_logits in diff.generation_logits_list:
+                    self._generation_logits.append(generation_logits)
         if diff.reset_log_probs is not None:
-            self._log_probs.set_log_probs(*diff.reset_log_probs)
+            if self._log_probs is not None:
+                self._log_probs.set_log_probs(*diff.reset_log_probs)
         if len(diff.log_probs_list) > 0:
-            for log_probs, cum_log_probs in diff.log_probs_list:
-                self._log_probs.append(log_probs, cum_log_probs)
+            if self._log_probs is not None:
+                for log_probs, cum_log_probs in diff.log_probs_list:
+                    self._log_probs.append(log_probs, cum_log_probs)
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
tensorrt_llm/_torch/pyexecutor/executor_request_queue.py
tensorrt_llm/_torch/pyexecutor/llm_request.py
tensorrt_llm/_torch/pyexecutor/py_executor.py
tensorrt_llm/_torch/utils.py
tensorrt_llm/_utils.py
tensorrt_llm/llmapi/disagg_utils.py
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (CODING_GUIDELINES.md)
**/*.py: The code developed for TensorRT-LLM should conform to Python 3.8+
Indent Python code with 4 spaces. Do not use tabs
Always maintain the namespace when importing Python modules, even if only one class or function from a module is used
Python filenames should use snake_case (e.g., some_file.py)
Python classes should use PascalCase (e.g., class SomeClass)
Python functions and methods should use snake_case (e.g., def my_awesome_function():)
Python local variables should use snake_case, with prefix k for variable names that start with a number (e.g., k_99th_percentile)
Python global variables should use upper snake_case with prefix G (e.g., G_MY_GLOBAL)
Python constants should use upper snake_case (e.g., MY_CONSTANT)
Avoid shadowing variables declared in an outer scope in Python
Initialize all externally visible members of a Python class in the constructor
For Python interfaces that may be used outside a file, prefer docstrings over comments
Use comments in Python for code within a function, or interfaces that are local to a file
Use Google-style docstrings for Python classes and functions, which can be parsed by Sphinx
Python attributes and variables can be documented inline with the format """<type>: Description"""
Avoid using reflection in Python when functionality can be easily achieved without reflection
When using try-except blocks in Python, limit the except clause to the smallest set of errors possible
When using try-except blocks in Python to handle multiple possible variable types (duck-typing), keep the body of the try as small as possible and use the else block for the main logic
Files:
tensorrt_llm/_torch/pyexecutor/executor_request_queue.py
tensorrt_llm/_torch/utils.py
tensorrt_llm/llmapi/disagg_utils.py
tensorrt_llm/_torch/pyexecutor/llm_request.py
tensorrt_llm/_utils.py
tensorrt_llm/_torch/pyexecutor/py_executor.py
**/*.{cpp,cc,cxx,h,hpp,hxx,cu,cuh,py}
📄 CodeRabbit inference engine (CODING_GUIDELINES.md)
All TensorRT-LLM source files (.cpp, .h, .cu, .py, and other source files) should contain an NVIDIA copyright header with the year of latest meaningful modification
Files:
tensorrt_llm/_torch/pyexecutor/executor_request_queue.py
tensorrt_llm/_torch/utils.py
tensorrt_llm/llmapi/disagg_utils.py
tensorrt_llm/_torch/pyexecutor/llm_request.py
tensorrt_llm/_utils.py
tensorrt_llm/_torch/pyexecutor/py_executor.py
🧠 Learnings (6)
📚 Learning: 2025-12-12T03:27:18.859Z
Learnt from: tongyuantongyu
Repo: NVIDIA/TensorRT-LLM PR: 9655
File: tensorrt_llm/_torch/pyexecutor/sampler.py:3031-3031
Timestamp: 2025-12-12T03:27:18.859Z
Learning: In tensorrt_llm/_torch/pyexecutor/sampler.py, when reviewing code that iterates through requests, ensure it does not convert excessive data into Python lists. Instead, the code should use torch.gather or indexing to gather only the data that will be used in the for loop before converting to Python lists. This minimizes data movement and improves performance.
Applied to files:
tensorrt_llm/_torch/pyexecutor/executor_request_queue.py
tensorrt_llm/_torch/pyexecutor/py_executor.py
📚 Learning: 2025-09-23T14:58:05.372Z
Learnt from: nv-lschneider
Repo: NVIDIA/TensorRT-LLM PR: 7910
File: cpp/tensorrt_llm/kernels/nccl_device/config.cu:42-49
Timestamp: 2025-09-23T14:58:05.372Z
Learning: In TensorRT-LLM NCCL device kernels (cpp/tensorrt_llm/kernels/nccl_device/), the token partitioning intentionally uses ceil-like distribution (same token_per_rank for all ranks) to ensure all ranks launch the same number of blocks. This is required for optimal NCCL device API barrier performance, even though it may launch extra blocks for non-existent tokens on later ranks. Runtime bounds checking in the kernel (blockID validation) handles the overshoot cases.
Applied to files:
tensorrt_llm/_torch/pyexecutor/executor_request_queue.py
📚 Learning: 2025-09-02T13:42:44.885Z
Learnt from: pcastonguay
Repo: NVIDIA/TensorRT-LLM PR: 7455
File: tensorrt_llm/_torch/pyexecutor/py_executor.py:1852-1860
Timestamp: 2025-09-02T13:42:44.885Z
Learning: In MPI communication within TensorRT-LLM pipeline parallelism, different communication types (tokens, logits, termination sync) must use disjoint tag namespaces to avoid message routing collisions when using the same source/destination patterns.
Applied to files:
tensorrt_llm/_torch/pyexecutor/executor_request_queue.py
tensorrt_llm/llmapi/disagg_utils.py
tensorrt_llm/_utils.py
tensorrt_llm/_torch/pyexecutor/py_executor.py
📚 Learning: 2025-12-12T03:27:08.565Z
Learnt from: tongyuantongyu
Repo: NVIDIA/TensorRT-LLM PR: 9655
File: tensorrt_llm/_torch/pyexecutor/sampler.py:3031-3031
Timestamp: 2025-12-12T03:27:08.565Z
Learning: In files under tensorrt_llm/_torch/pyexecutor, avoid accessing torch.Tensor objects inside for-loops when iterating over requests. Convert batched tensors to Python lists beforehand using tensor.tolist(), and then iterate over those lists. This improves performance by reducing tensor-bound operations inside hot loops. Apply this pattern to similar code paths that process batches to access simple Python data structures (lists) inside loops.
Applied to files:
tensorrt_llm/_torch/pyexecutor/executor_request_queue.py
tensorrt_llm/_torch/pyexecutor/llm_request.py
tensorrt_llm/_torch/pyexecutor/py_executor.py
📚 Learning: 2025-08-09T02:04:49.623Z
Learnt from: Fridah-nv
Repo: NVIDIA/TensorRT-LLM PR: 6760
File: tensorrt_llm/_torch/auto_deploy/models/quant_config_reader.py:81-98
Timestamp: 2025-08-09T02:04:49.623Z
Learning: In TensorRT-LLM's auto_deploy module, torch.dtype values in configuration dictionaries must be stored as string representations (e.g., "float16" instead of torch.float16) because OmegaConf.merge does not support torch.dtype types. These string representations are converted to actual torch.dtype objects in downstream code.
Applied to files:
tensorrt_llm/_torch/utils.py
📚 Learning: 2025-08-19T12:45:11.997Z
Learnt from: amitz-nv
Repo: NVIDIA/TensorRT-LLM PR: 7033
File: tensorrt_llm/_torch/pyexecutor/model_engine.py:0-0
Timestamp: 2025-08-19T12:45:11.997Z
Learning: In tensorrt_llm/_torch/pyexecutor/model_engine.py, DoRA (Delta Orthogonal Rank Adaptation) functionality was removed from the PyTorch flow to eliminate issues with inverted DoRA detection logic. The original is_dora condition was checking if scaling_vec_pointer == 0, which was potentially incorrect.
Applied to files:
tensorrt_llm/_torch/pyexecutor/py_executor.py
🧬 Code graph analysis (4)
tensorrt_llm/_torch/pyexecutor/executor_request_queue.py (2)
tensorrt_llm/_utils.py (1)
nvtx_range (911-930)
tensorrt_llm/_torch/distributed/communicator.py (1)
tp_cp_broadcast (188-198)
tensorrt_llm/_torch/utils.py (1)
tensorrt_llm/_utils.py (7)
TensorWrapper (995-1062)
torch_dtype_to_str (233-234)
numel (1041-1042)
dtype (1017-1018)
dtype (1025-1035)
shape (1021-1022)
shape (1038-1039)
tensorrt_llm/_torch/pyexecutor/llm_request.py (2)
tensorrt_llm/executor/result.py (1)
context_logits (233-234)
tensorrt_llm/_torch/pyexecutor/executor_request_queue.py (1)
set_exclude_last_generation_logits(837-851)
tensorrt_llm/_torch/pyexecutor/py_executor.py (4)
tensorrt_llm/_utils.py (4)
mpi_comm (514-518)
nvtx_range (911-930)
set_thread_local_mpi_comm (510-511)
trace_func (797-838)
tensorrt_llm/_torch/expert_statistic.py (2)
ExpertStatistic (10-101)
get (14-15)
tensorrt_llm/_torch/pyexecutor/resource_manager.py (3)
ResourceManager (1389-1436)
ResourceManagerType (50-55)
request_context (118-142)
tensorrt_llm/_torch/pyexecutor/scheduler.py (3)
SerializableSchedulerOutput (69-120)
from_scheduler_result (81-100)
to_scheduler_result (102-120)
🪛 Ruff (0.14.11)
tensorrt_llm/_torch/pyexecutor/py_executor.py
1310-1310: zip() without an explicit strict= parameter
Add explicit value for parameter strict=
(B905)
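For reference, the change this lint asks for is just an explicit strict flag on the zip call; the variable names below are placeholders, not the actual code at line 1310.

```python
send_handles = ["h0", "h1"]
micro_batches = ["mb0", "mb1"]

# strict=True (Python 3.10+) raises ValueError if the iterables have different
# lengths, instead of silently truncating to the shorter one.
for handle, micro_batch in zip(send_handles, micro_batches, strict=True):
    print(handle, micro_batch)
```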
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Pre-commit Check
🔇 Additional comments (4)
tensorrt_llm/llmapi/disagg_utils.py (1)
8-8: LGTM on import addition. The pkl5 import from mpi4py.util is appropriate for enabling pickle protocol 5 support for large object serialization in MPI communication.
tensorrt_llm/_torch/pyexecutor/executor_request_queue.py (1)
603-606: NVTX range around TP/CP broadcast looks good. Adds useful profiling signal without changing semantics.
tensorrt_llm/_torch/pyexecutor/py_executor.py (2)
69-74: Nice: explicit PP comm tags reduce collision risk. This aligns with the “disjoint tag namespaces” requirement for PP/MPI traffic. Based on learnings, this is the right direction.
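A small sketch of the disjoint-tag pattern being praised here; the constant names echo the PR description, but the numeric values and the helper function are illustrative assumptions.

```python
from mpi4py import MPI

# Hypothetical, non-overlapping tag values so token, sample-state, and
# termination traffic between the same rank pair can never be confused.
PP_COMM_TAG_TOKENS = 100
PP_COMM_TAG_SAMPLE_STATE = 200
TERMINATION_COMM_TAG = 300


def isend_sample_state(comm, sample_state, dest_rank):
    # Non-blocking pickle-based send; the caller keeps the request handle and
    # waits on it later, so the executor loop is never blocked here.
    return comm.isend(sample_state, dest=dest_rank, tag=PP_COMM_TAG_SAMPLE_STATE)
```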
1283-1292: MPI comm duplication per thread makes sense; please verify mpi4py/pkl5 semantics. In particular: confirm Dup()/Free() in this thread doesn’t interact badly with process shutdown paths or with ENABLE_MULTI_DEVICE=False configurations.
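And a rough sketch of the per-thread duplication lifecycle the comment asks to verify, assuming the set_thread_local_mpi_comm helper from tensorrt_llm/_utils.py; the actual shutdown ordering in py_executor.py may differ.

```python
import threading

from mpi4py import MPI


def response_thread_main(parent_comm):
    # Duplicate the communicator so this thread's messages use a separate
    # communication context from the main executor loop.
    thread_comm = parent_comm.Dup()
    try:
        # set_thread_local_mpi_comm(thread_comm)  # route mpi_comm() calls here
        pass  # ... receive and handle responses on thread_comm ...
    finally:
        # Free the duplicate before the process tears MPI down.
        thread_comm.Free()


worker = threading.Thread(target=response_thread_main, args=(MPI.COMM_WORLD,), daemon=True)
worker.start()
worker.join()
```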
|
PR_Github #31939 [ run ] completed with state
|
|
/bot run --disable-fail-fast |
|
PR_Github #32106 [ run ] triggered by Bot. Commit: |
|
/bot run --disable-fail-fast |
|
PR_Github #35074 [ run ] triggered by Bot. Commit: |
|
PR_Github #35074 [ run ] completed with state |
|
/bot run --disable-fail-fast |
|
PR_Github #35090 [ run ] triggered by Bot. Commit: |
|
PR_Github #35090 [ run ] completed with state
|
|
/bot run --disable-fail-fast |
|
PR_Github #35173 [ run ] triggered by Bot. Commit: |
|
PR_Github #35173 [ run ] completed with state
|
|
/bot run --disable-fail-fast |
|
PR_Github #35187 [ run ] triggered by Bot. Commit: |
|
PR_Github #35187 [ run ] completed with state
|
|
/bot run --disable-fail-fast |
|
PR_Github #35262 [ run ] triggered by Bot. Commit: |
|
PR_Github #35262 [ run ] completed with state
|
Signed-off-by: Yuxian Qiu <142763828+yuxianq@users.noreply.github.com>
910624d to b5c4d6e
|
/bot run --disable-fail-fast |
|
PR_Github #35316 [ run ] triggered by Bot. Commit: |
|
PR_Github #35316 [ run ] completed with state
|
|
/bot run --disable-fail-fast |
|
PR_Github #35387 [ run ] triggered by Bot. Commit: |
|
PR_Github #35387 [ run ] completed with state |
|
/bot run --disable-fail-fast |
|
PR_Github #35414 [ run ] triggered by Bot. Commit: |
|
PR_Github #35414 [ run ] completed with state |
Description
In the PP executor loop, there are two kinds of data flows between ranks:
We interlace request/response flows in the same executor loop.
Since we always wait for the sample state n iterations after submitting its forward step, the last rank will block rank 0's executor loop if the forward steps of different ranks are uneven.
By executing response flows in a background thread, this PR enables a fully non-blocking PP executor loop.
In the PP executor loop, we only handle these responses when they are available in _handle_executed_batch. To keep the execution state of different ranks in sync, we count the available responses on rank 0 and send the response number to other ranks, so that all ranks handle the same set of responses in the same iteration.
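As an illustration of that synchronization step (the queue and handler names are hypothetical; the real logic lives in _handle_executed_batch and py_executor.py):

```python
from mpi4py import MPI

comm = MPI.COMM_WORLD


def handle_available_responses(response_queue, handle_response):
    # Rank 0 decides how many responses every rank consumes this iteration and
    # broadcasts the count, so all ranks stay in lockstep without blocking on
    # responses that are not ready yet.
    num_ready = len(response_queue) if comm.rank == 0 else None
    num_ready = comm.bcast(num_ready, root=0)
    for _ in range(num_ready):
        handle_response(response_queue.pop(0))
```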
There are two response flows in the previous implementation:
In this PR, we merge these two response flows into a single flow, and send the diff of logits instead to reduce communication overhead.
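Conceptually, the merged diff-based flow looks roughly like this; get_diff/apply_diff are the PyResult methods discussed in the review above, while the communicator plumbing, attribute access, and tag here are simplified assumptions.

```python
def send_result_diff(comm, request, dest_rank, tag):
    # Ship only what changed since the last send; get_diff() is expected to
    # move any CUDA logits to CPU so the payload is picklable.
    return comm.isend(request.py_result.get_diff(), dest=dest_rank, tag=tag)


def recv_result_diff(comm, request, src_rank, tag):
    # Fold the received increment into the locally tracked PyResult.
    diff = comm.recv(source=src_rank, tag=tag)
    request.py_result.apply_diff(diff)
```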
This PR also contains the following improvements:
TLLM_PP_ASYNC_BROADCAST_SAMPLE_STATE to disable the non-blocking sample state broadcast, which is useful to get a deterministic result in unit tests or as a fallback.
MIN_ASYNC_MICRO_BATCH_NUM.
PP_COMM_TAG_SAMPLE_STATE to avoid tag conflicts. So does TERMINATION_COMM_TAG.
tensor_to_str for debugging.
Test Coverage
PR Checklist
Please review the following before submitting your PR:
PR description clearly explains what and why. If using CodeRabbit's summary, please make sure it makes sense.
PR Follows TRT-LLM CODING GUIDELINES to the best of your knowledge.
Test cases are provided for new code paths (see test instructions)
Any new dependencies have been scanned for license and vulnerabilities
CODEOWNERS updated if ownership changes
Documentation updated as needed
Update tava architecture diagram if there is a significant design change in PR.
The reviewers assigned automatically/manually are appropriate for the PR.
Please check this after reviewing the above items as appropriate for this PR.
GitHub Bot Help
/bot [-h] ['run', 'kill', 'skip', 'reuse-pipeline'] ...
Provide a user friendly way for developers to interact with a Jenkins server.
Run
/bot [-h|--help] to print this help message.
See details below for each supported subcommand.
Details
run [--reuse-test (optional)pipeline-id --disable-fail-fast --skip-test --stage-list "A10-PyTorch-1, xxx" --gpu-type "A30, H100_PCIe" --test-backend "pytorch, cpp" --add-multi-gpu-test --only-multi-gpu-test --disable-multi-gpu-test --post-merge --extra-stage "H100_PCIe-TensorRT-Post-Merge-1, xxx" --detailed-log --debug(experimental)]
Launch build/test pipelines. All previously running jobs will be killed.
--reuse-test (optional)pipeline-id (OPTIONAL) : Allow the new pipeline to reuse build artifacts and skip successful test stages from a specified pipeline or the last pipeline if no pipeline-id is indicated. If the Git commit ID has changed, this option will be always ignored. The DEFAULT behavior of the bot is to reuse build artifacts and successful test results from the last pipeline.
--disable-reuse-test (OPTIONAL) : Explicitly prevent the pipeline from reusing build artifacts and skipping successful test stages from a previous pipeline. Ensure that all builds and tests are run regardless of previous successes.
--disable-fail-fast (OPTIONAL) : Disable fail fast on build/tests/infra failures.
--skip-test (OPTIONAL) : Skip all test stages, but still run build stages, package stages and sanity check stages. Note: Does NOT update GitHub check status.
--stage-list "A10-PyTorch-1, xxx" (OPTIONAL) : Only run the specified test stages. Examples: "A10-PyTorch-1, xxx". Note: Does NOT update GitHub check status.
--gpu-type "A30, H100_PCIe" (OPTIONAL) : Only run the test stages on the specified GPU types. Examples: "A30, H100_PCIe". Note: Does NOT update GitHub check status.
--test-backend "pytorch, cpp" (OPTIONAL) : Skip test stages which don't match the specified backends. Only support [pytorch, cpp, tensorrt, triton]. Examples: "pytorch, cpp" (does not run test stages with tensorrt or triton backend). Note: Does NOT update GitHub pipeline status.
--only-multi-gpu-test (OPTIONAL) : Only run the multi-GPU tests. Note: Does NOT update GitHub check status.
--disable-multi-gpu-test (OPTIONAL) : Disable the multi-GPU tests. Note: Does NOT update GitHub check status.
--add-multi-gpu-test (OPTIONAL) : Force run the multi-GPU tests in addition to running L0 pre-merge pipeline.
--post-merge (OPTIONAL) : Run the L0 post-merge pipeline instead of the ordinary L0 pre-merge pipeline.
--extra-stage "H100_PCIe-TensorRT-Post-Merge-1, xxx" (OPTIONAL) : Run the ordinary L0 pre-merge pipeline and specified test stages. Examples: --extra-stage "H100_PCIe-TensorRT-Post-Merge-1, xxx".
--detailed-log (OPTIONAL) : Enable flushing out all logs to the Jenkins console. This will significantly increase the log volume and may slow down the job.
--debug (OPTIONAL) : Experimental feature. Enable access to the CI container for debugging purpose. Note: Specify exactly one stage in the stage-list parameter to access the appropriate container environment. Note: Does NOT update GitHub check status.
For guidance on mapping tests to stage names, see
docs/source/reference/ci-overview.md and the scripts/test_to_stage_mapping.py helper.
kill
kill
Kill all running builds associated with pull request.
skip
skip --comment COMMENT
Skip testing for latest commit on pull request.
--comment "Reason for skipping build/test" is required. IMPORTANT NOTE: This is dangerous since lack of user care and validation can cause top of tree to break.
reuse-pipeline
reuse-pipeline
Reuse a previous pipeline to validate current commit. This action will also kill all currently running builds associated with the pull request. IMPORTANT NOTE: This is dangerous since lack of user care and validation can cause top of tree to break.