2 changes: 2 additions & 0 deletions docs/cn/io.md
@@ -14,6 +14,8 @@ linux一般使用non-blocking IO提高IO并发度。当IO并发度很低时,no

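Because of [a bug](https://web.archive.org/web/20150423184820/https://patchwork.kernel.org/patch/1970231/) in epoll (still present when brpc was developed) and the relatively high overhead of epoll_ctl, EDISP uses edge-triggered mode. When an event arrives, EDISP adds 1 to an atomic variable, and starts a bthread to handle the data on the corresponding fd only if the value before the addition was 0. Behind the scenes, EDISP yields its pthread to the newly created bthread, so that the new bthread has better cache locality and can read the data on the fd as soon as possible. The bthread in which EDISP runs is stolen by another pthread and continues running there; this is bthread's work-stealing scheduling. To understand exactly how that atomic variable works, first read [atomic instructions](atomic_instructions.md), then look at [Socket::StartInputEvent](https://github.com/apache/brpc/blob/master/src/brpc/socket.cpp). These techniques make the contention that arises when brpc reads from the same fd [wait-free](http://en.wikipedia.org/wiki/Non-blocking_algorithm#Wait-freedom).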

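+In the current implementation, `Transport::ProcessEvent` picks how to start the handler based on `EventDispatcherUnsched()`: it uses `bthread_start_urgent` when that returns `false` and `bthread_start_background` when it returns `true`. In addition, RDMA handles `last_msg` differently in polling mode and event mode: with `rdma_use_polling=false`, `last_msg` is not processed in `RdmaTransport::QueueMessage`, whereas in polling mode it still is.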

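[InputMessenger](https://github.com/apache/brpc/blob/master/src/brpc/input_messenger.h) cuts messages off the fd and processes them, relying on user callbacks to understand different formats. Parse generally cuts a message off the binary stream and has a fairly stable running time; Process parses the message further (for example, deserializing it into protobuf) and then calls the user callback, so its running time is unpredictable. If n messages (n > 1) are read from an fd in one go, InputMessenger starts n-1 bthreads to handle the first n-1 messages, and the last message is processed in place. InputMessenger tries multiple protocols one by one; since a connection usually carries only one message format, it records the last choice to avoid retrying every time.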

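As can be seen, messages both across fds and within one fd are processed concurrently in brpc. This makes brpc very good at reading large messages, and lets it still handle messages from different sources promptly under high load, reducing long tails.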
25 changes: 23 additions & 2 deletions docs/cn/rdma.md
@@ -47,7 +47,27 @@ RDMA要求数据收发所使用的内存空间必须被注册(memory register

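RDMA is a hardware-dependent communication technology with many concepts of its own, such as device, port, GID, LID and MaxSge. These parameters are read from the corresponding NIC at initialization, and a default choice is made (see src/brpc/rdma/rdma_helper.cpp). When the default choice is not what the user expects, it can be overridden through flags.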

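-RDMA supports two modes, event-driven and polling. The default is event-driven; setting rdma_use_polling enables polling mode. In polling mode you can also set the number of pollers (rdma_poller_num) and whether pollers voluntarily yield the CPU (rdma_poller_yield). In polling mode you can also set a callback that is invoked on every poll, which can be combined with io_uring, spdk and the like. When used together with drivers such as spdk, which supports only polling mode and can only be used from a single thread (Run To Completion mode), a task must not be rescheduled onto another thread while it is executing. In this case rdma_edisp_unsched needs to be set to true, so that the event dispatcher keeps occupying one worker thread and no other task can be scheduled on it.
+RDMA supports two modes, event-driven and polling. The default is event-driven; setting rdma_use_polling enables polling mode. In polling mode you can also set the number of pollers (rdma_poller_num) and whether pollers voluntarily yield the CPU (rdma_poller_yield). In polling mode you can also set a callback that is invoked on every poll, which can be combined with io_uring, spdk and the like.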

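+`event_dispatcher_edisp_unsched` is a global switch that affects EventDispatcher scheduling in both normal mode (TCP) and RDMA mode.
+It replaces `rdma_edisp_unsched`. `rdma_edisp_unsched` is currently kept only for compatibility with existing command lines and will be removed in a future release. The two flags have the same semantics: a value of `true` means EventDispatcher is unschedulable.
+
+Historical note: the RDMA path previously contained an `if`-condition bug that made behavior inconsistent with the flag semantics. The logic is now fixed and follows the unified semantics.
+
+The effective condition is unified as:
+`event_dispatcher_edisp_unsched || rdma_edisp_unsched`
+
+User-provided flags are no longer rewritten at startup; at runtime the user-configured values take effect exactly as given.
+
+Recommended usage:
+1. New deployments: set only `event_dispatcher_edisp_unsched`.
+2. Existing deployments: treat `rdma_edisp_unsched` as transitional only and migrate step by step to `event_dispatcher_edisp_unsched`.
+3. Avoid "conflicting values" in scripts; under the unified OR semantics, EventDispatcher is unschedulable as soon as either flag is `true`.
+
+Behavior examples:
+1. Only `-rdma_edisp_unsched=true` is set: `rdma_edisp_unsched=true`, `event_dispatcher_edisp_unsched=false`; both TCP and RDMA are unschedulable.
+2. Only `-event_dispatcher_edisp_unsched=true` is set: `rdma_edisp_unsched=false`, `event_dispatcher_edisp_unsched=true`; both TCP and RDMA are unschedulable.
+3. Both `-rdma_edisp_unsched=true -event_dispatcher_edisp_unsched=false` are set: `rdma_edisp_unsched=true`, `event_dispatcher_edisp_unsched=false`; both TCP and RDMA are unschedulable.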

# Parameters

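@@ -73,5 +93,6 @@ RDMA支持事件驱动和轮询两种模式,默认是事件驱动模式,通
* rdma_use_polling: Whether to use RDMA polling mode, default is false.
* rdma_poller_num: Number of pollers in polling mode, default is 1.
* rdma_poller_yield: Whether pollers in polling mode voluntarily yield the CPU, default is false.
-* rdma_edisp_unsched: Makes the event dispatcher unschedulable, default is false.
+* event_dispatcher_edisp_unsched: Global switch controlling whether EventDispatcher is unschedulable (unschedulable when true), default is false.
+* rdma_edisp_unsched: Deprecated compatibility flag (planned for removal in a future release). It still takes part in the unified effective condition, default is false.
* rdma_disable_bthread: Disables bthread, default is false.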
2 changes: 2 additions & 0 deletions docs/en/io.md
@@ -14,6 +14,8 @@ A message is a bounded binary data read from a connection, which may be a reques

Because of a [bug](https://web.archive.org/web/20150423184820/https://patchwork.kernel.org/patch/1970231/) in epoll (present at the time brpc was developed) and the overhead of epoll_ctl, EDISP uses edge-triggered mode. After receiving an event, EDISP atomically adds one to an atomic variable associated with the fd. If the variable was zero before the addition, a bthread is started to handle the data from the fd. The pthread worker in which EDISP runs is yielded to the newly created bthread so that it starts reading ASAP and has better cache locality. The bthread in which EDISP runs is stolen by another pthread and keeps running there; this mechanism is the work stealing used in bthreads. To understand exactly how that atomic variable works, you can read [atomic instructions](atomic_instructions.md) first, then check [Socket::StartInputEvent](https://github.com/apache/brpc/blob/master/src/brpc/socket.cpp). These methods make contention on dispatching events of one fd [wait-free](http://en.wikipedia.org/wiki/Non-blocking_algorithm#Wait-freedom).
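
The counter protocol in the paragraph above can be sketched in isolation. This is a minimal illustration rather than brpc's actual code: `FdEvents`, `OnEvent` and `MoreToRead` are hypothetical names, and the consumer-side reset through compare-and-swap is an assumption about how a reader would hand the fd back to the dispatcher.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the per-fd event counter described above.
struct FdEvents {
    std::atomic<int> nevent{0};
};

// Dispatcher side: called once per edge-triggered event.
// Returns true only for the caller that moved the counter from 0 to 1,
// i.e. the one that should start a consumer for this fd.
inline bool OnEvent(FdEvents& fd) {
    return fd.nevent.fetch_add(1) == 0;
}

// Consumer side (assumed protocol): called after draining the fd.
// `seen` holds the counter value the consumer last observed.
// If the counter still equals `*seen`, it is reset to 0 and the consumer
// may exit (returns false). Otherwise new events arrived meanwhile:
// `*seen` is refreshed and the consumer should read again (returns true).
inline bool MoreToRead(FdEvents& fd, int* seen) {
    return !fd.nevent.compare_exchange_strong(*seen, 0);
}
```

In this sketch the dispatcher never waits: every event costs one `fetch_add`, and at most one consumer per fd is active at any time.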

+In the current implementation, `Transport::ProcessEvent` chooses the start mode based on `EventDispatcherUnsched()`: `false` uses `bthread_start_urgent`, and `true` uses `bthread_start_background`. In addition, RDMA handles `last_msg` differently in polling and event modes: when `rdma_use_polling=false`, `RdmaTransport::QueueMessage` does not process `last_msg`; in polling mode it still does.
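
A minimal sketch of the selection described above, assuming the branch order used by the transport code in this change (run inline when `usercode_in_coroutine` is set, otherwise pick urgent or background). `StartMode` and `ChooseStartMode` are hypothetical names used only for illustration; the real code calls `bthread_start_urgent` and `bthread_start_background` directly.

```cpp
#include <cassert>

// Hypothetical labels for the three ways ProcessEvent can run OnEdge.
enum class StartMode { kInline, kUrgent, kBackground };

// Mirrors the branch order only; it does not start any bthread.
inline StartMode ChooseStartMode(bool usercode_in_coroutine,
                                 bool edisp_unsched) {
    if (usercode_in_coroutine) {
        return StartMode::kInline;      // OnEdge runs in the caller
    }
    // unsched == false -> urgent start, unsched == true -> background start
    return edisp_unsched ? StartMode::kBackground : StartMode::kUrgent;
}
```

Keeping the decision in a pure function like this is one way to unit-test the urgent/background choice without spinning up a dispatcher.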

[InputMessenger](https://github.com/apache/brpc/blob/master/src/brpc/input_messenger.h) cuts messages and uses customizable callbacks to handle different data formats. The `Parse` callback cuts messages from binary data and has a relatively stable running time; `Process` parses messages further (such as parsing by protobuf) and calls users' callbacks, whose running time varies. If n (n > 1) messages are read from the fd, InputMessenger launches n-1 bthreads to handle the first n-1 messages respectively, and processes the last message in place. InputMessenger tries protocols one by one. Since one connection often carries only one type of message, InputMessenger remembers the current protocol to avoid trying all protocols again next time.
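
The "n-1 workers plus one in place" pattern can be illustrated as follows. This is only a sketch under stated assumptions: `std::thread` stands in for bthreads, `ProcessBatch` is a hypothetical helper, and the workers are joined before returning, which InputMessenger itself is not claimed to do.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <vector>

// Runs `process` on every message: the first n-1 messages each get their own
// worker thread, the last one is handled by the calling thread.
// Returns the number of workers that were spawned.
inline std::size_t ProcessBatch(
        const std::vector<std::string>& msgs,
        const std::function<void(const std::string&)>& process) {
    if (msgs.empty()) {
        return 0;
    }
    std::vector<std::thread> workers;
    workers.reserve(msgs.size() - 1);
    for (std::size_t i = 0; i + 1 < msgs.size(); ++i) {
        workers.emplace_back(process, std::cref(msgs[i]));
    }
    process(msgs.back());               // last message: processed in place
    for (std::thread& t : workers) {
        t.join();                       // sketch only: wait for the workers
    }
    return workers.size();
}
```

Handling the last message in place saves one thread start per batch and keeps the common single-message case free of any extra scheduling.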

It can be seen that messages from different fds, or even from the same fd, are processed concurrently in brpc. This makes brpc good at handling large messages and at reducing long tails when processing messages from different sources under high workloads.
23 changes: 22 additions & 1 deletion docs/en/rdma.md
@@ -47,6 +47,26 @@ The application can manage memory by itself and send data with IOBuf::append_use

RDMA is hardware-related. It has its own concepts such as device, port, GID, LID, MaxSge and so on. These parameters are read from the NICs at initialization, and brpc makes a default choice (see src/brpc/rdma/rdma_helper.cpp). When the default choice is not what you want, it can be overridden with flags.

+`event_dispatcher_edisp_unsched` is a global flag that affects EventDispatcher scheduling in both normal (TCP) mode and RDMA mode.
+It replaces `rdma_edisp_unsched`, which is kept only for command-line compatibility and is planned for removal in a future release. The two flags have the same semantics: `true` means EventDispatcher is unschedulable.
+
+Historical note: the RDMA path previously had an `if`-condition bug that made behavior disagree with the flag semantics. The logic is now fixed and follows the unified semantics.
+
+The effective unsched condition is unified as:
+`event_dispatcher_edisp_unsched || rdma_edisp_unsched`
+
+User-provided flags are no longer rewritten at startup; runtime behavior is determined directly from the values the user passes.
+
+Recommended usage:
+1. New deployments: set only `event_dispatcher_edisp_unsched`.
+2. Existing deployments: keep `rdma_edisp_unsched` temporarily, but migrate to `event_dispatcher_edisp_unsched`.
+3. Avoid conflicting values in scripts; under the unified OR semantics, either flag being `true` makes EventDispatcher unschedulable.
+
+Examples:
+1. Only `-rdma_edisp_unsched=true`: `rdma_edisp_unsched=true`, `event_dispatcher_edisp_unsched=false`; both TCP and RDMA are unschedulable.
+2. Only `-event_dispatcher_edisp_unsched=true`: `rdma_edisp_unsched=false`, `event_dispatcher_edisp_unsched=true`; both TCP and RDMA are unschedulable.
+3. Both `-rdma_edisp_unsched=true -event_dispatcher_edisp_unsched=false`: `rdma_edisp_unsched=true`, `event_dispatcher_edisp_unsched=false`; both TCP and RDMA are unschedulable.
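
The OR semantics and the three examples above can be checked with a small stand-alone sketch. `EffectiveUnsched` is a hypothetical helper, not part of brpc; its `with_rdma` parameter models whether the build has `BRPC_WITH_RDMA` enabled, since the legacy flag only exists in that case.

```cpp
#include <cassert>

// Hypothetical model of the unified condition.
// With RDMA compiled in, either flag being true makes EventDispatcher
// unschedulable; without RDMA, only the global flag is consulted.
inline bool EffectiveUnsched(bool event_dispatcher_edisp_unsched,
                             bool rdma_edisp_unsched,
                             bool with_rdma = true) {
    if (with_rdma) {
        return event_dispatcher_edisp_unsched || rdma_edisp_unsched;
    }
    return event_dispatcher_edisp_unsched;
}
```

Example 3 is the case worth remembering: passing `-event_dispatcher_edisp_unsched=false` does not switch the behavior off while the legacy flag is still `true`.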

# Parameters

Configurable parameters:
@@ -71,5 +91,6 @@ Configurable parameters:
* rdma_use_polling: Whether to use RDMA polling mode, default is false.
* rdma_poller_num: The number of pollers in polling mode, default is 1.
* rdma_poller_yield: Whether pollers in polling mode voluntarily relinquish the CPU, default is false.
-* rdma_edisp_unsched`: Prevents the event driver from being scheduled, default is false.
+* event_dispatcher_edisp_unsched: Global switch for EventDispatcher scheduling (true means unschedulable), default is false.
+* rdma_edisp_unsched: Deprecated compatibility flag (planned for removal in a future release). It still participates in the unified unsched condition, default is false.
* rdma_disable_bthread: Disables bthread, default is false.
19 changes: 19 additions & 0 deletions src/brpc/event_dispatcher.cpp
@@ -30,6 +30,16 @@ DECLARE_int32(task_group_ntags);
 namespace brpc {

 DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
+DEFINE_bool(event_dispatcher_edisp_unsched, false,
+            "Disable event dispatcher schedule");
+
+#if BRPC_WITH_RDMA
+namespace rdma {
+DEFINE_bool(rdma_edisp_unsched, false,
+            "Deprecated and will be removed in a future release, "
+            "use event_dispatcher_edisp_unsched instead");
+} // namespace rdma
+#endif

 DEFINE_bool(usercode_in_pthread, false,
             "Call user's callback in pthreads, use bthreads otherwise");
@@ -41,6 +51,15 @@ static bvar::LatencyRecorder* g_edisp_read_lantency = NULL;
 static bvar::LatencyRecorder* g_edisp_write_lantency = NULL;
 static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT;

+bool EventDispatcherUnsched() {
+#if BRPC_WITH_RDMA
+    return FLAGS_event_dispatcher_edisp_unsched ||
+           rdma::FLAGS_rdma_edisp_unsched;
+#else
+    return FLAGS_event_dispatcher_edisp_unsched;
+#endif
+}
Comment on lines 32 to +61

Copilot AI (Mar 7, 2026):

EventDispatcherUnsched() and the new unified flag introduce non-trivial scheduling behavior changes, but there are no unit tests in the repository exercising the new flag/legacy-flag interaction or the urgent/background selection. Please add/adjust tests (e.g. in test/brpc_event_dispatcher_unittest.cpp) to cover: unified flag toggling, legacy rdma flag behavior when BRPC_WITH_RDMA, and that Tcp/Rdma ProcessEvent choose the intended bthread_start_* path.

Author:

@yanglimingcn the evaluation here isn't very meaningful; I think your review would suffice.


 static void StopAndJoinGlobalDispatchers() {
     for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
         for (int j = 0; j < FLAGS_event_dispatcher_num; ++j) {
13 changes: 11 additions & 2 deletions src/brpc/event_dispatcher.h
@@ -19,13 +19,16 @@
 #ifndef BRPC_EVENT_DISPATCHER_H
 #define BRPC_EVENT_DISPATCHER_H

+#include <gflags/gflags_declare.h> // DECLARE_bool
 #include "butil/macros.h" // DISALLOW_COPY_AND_ASSIGN
 #include "bthread/types.h" // bthread_t, bthread_attr_t
 #include "brpc/versioned_ref_with_id.h"


 namespace brpc {

+DECLARE_bool(event_dispatcher_edisp_unsched);
+
 // Unique identifier of a IOEventData.
 // Users shall store EventDataId instead of EventData and call EventData::Address()
 // to convert the identifier to an unique_ptr at each access. Whenever a
@@ -87,8 +90,9 @@ namespace rdma {
 class RdmaEndpoint;
 }

-// Dispatch edge-triggered events of file descriptors to consumers
-// running in separate bthreads.
+// Dispatch edge-triggered events of file descriptors to consumers.
+// By default callbacks run in spawned bthreads; when usercode-in-coroutine is
+// enabled, the callback may run inline in the current coroutine.
 class EventDispatcher {
 friend class Socket;
 friend class rdma::RdmaEndpoint;
@@ -188,6 +192,11 @@ template <typename T> friend class IOEvent;

 EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag);

+// Unified unsched switch for transport layer.
+// false -> urgent start (foreground scheduling before caller continues),
+// true -> background start (allowing schedule away).
+bool EventDispatcherUnsched();
+
 // IOEvent class manages the IO events of a file descriptor conveniently.
 template <typename T>
 class IOEvent {
1 change: 0 additions & 1 deletion src/brpc/rdma/rdma_endpoint.cpp
@@ -63,7 +63,6 @@ BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate);
 DEFINE_bool(rdma_use_polling, false, "Use polling mode for RDMA.");
 DEFINE_int32(rdma_poller_num, 1, "Poller number in RDMA polling mode.");
 DEFINE_bool(rdma_poller_yield, false, "Yield thread in RDMA polling mode.");
-DEFINE_bool(rdma_edisp_unsched, false, "Disable event dispatcher schedule");
 DEFINE_bool(rdma_disable_bthread, false, "Disable bthread in RDMA");

 static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent
1 change: 0 additions & 1 deletion src/brpc/rdma/rdma_endpoint.h
@@ -38,7 +38,6 @@ namespace rdma {

 DECLARE_bool(rdma_use_polling);
 DECLARE_int32(rdma_poller_num);
-DECLARE_bool(rdma_edisp_unsched);
 DECLARE_bool(rdma_disable_bthread);

 class RdmaConnect : public AppConnect {
9 changes: 5 additions & 4 deletions src/brpc/rdma_transport.cpp
@@ -18,6 +18,7 @@
 #if BRPC_WITH_RDMA

 #include "brpc/rdma_transport.h"
+#include "brpc/event_dispatcher.h"
 #include "brpc/tcp_transport.h"
 #include "brpc/rdma/rdma_endpoint.h"
 #include "brpc/rdma/rdma_helper.h"
@@ -127,13 +128,13 @@ void RdmaTransport::ProcessEvent(bthread_attr_t attr) {
     bthread_t tid;
     if (FLAGS_usercode_in_coroutine) {
         OnEdge(_socket);
-    } else if (rdma::FLAGS_rdma_edisp_unsched == false) {
-        auto rc = bthread_start_background(&tid, &attr, OnEdge, _socket);
+    } else if (!EventDispatcherUnsched()) {
+        auto rc = bthread_start_urgent(&tid, &attr, OnEdge, _socket);
         if (rc != 0) {
             LOG(FATAL) << "Fail to start ProcessEvent";
             OnEdge(_socket);
         }
-    } else if (bthread_start_urgent(&tid, &attr, OnEdge, _socket) != 0) {
+    } else if (bthread_start_background(&tid, &attr, OnEdge, _socket) != 0) {
         LOG(FATAL) << "Fail to start ProcessEvent";
         OnEdge(_socket);
     }
@@ -235,4 +236,4 @@ bool RdmaTransport::OptionsAvailableOverRdma(const ServerOptions* opt) {
     return true;
 }
 } // namespace brpc
-#endif
+#endif
11 changes: 9 additions & 2 deletions src/brpc/tcp_transport.cpp
@@ -16,6 +16,7 @@
 // under the License.

 #include "brpc/tcp_transport.h"
+#include "brpc/event_dispatcher.h"

 namespace brpc {
 DECLARE_bool(usercode_in_coroutine);
@@ -68,7 +69,13 @@ void TcpTransport::ProcessEvent(bthread_attr_t attr) {
     bthread_t tid;
     if (FLAGS_usercode_in_coroutine) {
         OnEdge(_socket);
-    } else if (bthread_start_urgent(&tid, &attr, OnEdge, _socket) != 0) {
+    } else if (!EventDispatcherUnsched()) {
+        auto rc = bthread_start_urgent(&tid, &attr, OnEdge, _socket);
+        if (rc != 0) {
+            LOG(FATAL) << "Fail to start ProcessEvent";
+            OnEdge(_socket);
+        }
+    } else if (bthread_start_background(&tid, &attr, OnEdge, _socket) != 0) {
         LOG(FATAL) << "Fail to start ProcessEvent";
         OnEdge(_socket);
     }
@@ -96,4 +103,4 @@ void TcpTransport::QueueMessage(InputMessageClosure& input_msg,
     }
 }

-} // namespace brpc
+} // namespace brpc