Skip to content

Commit a583d8f

Browse files
author
ukumawat
committed
HBASE-29141 enforce max one call to initializeQueues
1 parent 243928e commit a583d8f

2 files changed

Lines changed: 15 additions & 8 deletions

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public abstract class RpcExecutor {
5858

5959
protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
6060
protected static final float DEFAULT_CALL_QUEUE_HANDLER_FACTOR = 0.1f;
61+
protected static final int UNDEFINED_MAX_CALLQUEUE_LENGTH = -1;
6162
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
6263
"hbase.ipc.server.callqueue.handler.factor";
6364

@@ -156,7 +157,7 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
156157

157158
// If soft limit of queue is not provided, then calculate using
158159
// DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER
159-
if (maxQueueLength == -1) {
160+
if (maxQueueLength == UNDEFINED_MAX_CALLQUEUE_LENGTH) {
160161
int handlerCountPerQueue = this.handlerCount / this.numCallQueues;
161162
maxQueueLength = handlerCountPerQueue * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
162163
}
@@ -230,12 +231,16 @@ public Map<String, Long> getCallQueueSizeSummary() {
230231
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond)));
231232
}
232233

233-
// IMPORTANT: Call this method only ONCE per executor instance.
234+
// This method can only be called ONCE per executor instance.
234235
// Before calling: queueInitArgs[0] contains the soft limit (desired queue capacity)
235-
// After calling: queueInitArgs[0] is set to hard limit (max(soft limit,
236-
// DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT)) and currentQueueLimit stores the original soft limit.
236+
// After calling: queueInitArgs[0] is set to hard limit and currentQueueLimit stores the original
237+
// soft limit.
237238
// Multiple calls would incorrectly use the hard limit as the soft limit.
239+
// As all the queues has same initArgs and queueClass, there should be no need to call this again.
238240
protected void initializeQueues(final int numQueues) {
241+
if (!queues.isEmpty()) {
242+
throw new RuntimeException("Queues are already initialized");
243+
}
239244
if (queueInitArgs.length > 0) {
240245
currentQueueLimit = (int) queueInitArgs[0];
241246
queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,13 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
6363
public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
6464
int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
6565
Abortable server, int highPriorityLevel) {
66-
int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, -1);
67-
int maxPriorityQueueLength =
68-
conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, -1);
66+
int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
67+
RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH);
68+
int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
69+
RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH);
6970
int maxReplicationQueueLength =
70-
conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH, -1);
71+
conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
72+
RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH);
7173

7274
this.priority = priority;
7375
this.highPriorityLevel = highPriorityLevel;

0 commit comments

Comments
 (0)