Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@

package org.apache.rocketmq.broker.subscription;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
import org.apache.rocketmq.common.BrokerConfig;
Expand All @@ -34,15 +43,6 @@
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -78,24 +78,28 @@ public void destroy() {
if (notToBeExecuted()) {
return;
}
Path pathToBeDeleted = Paths.get(basePath);

try {
Files.walk(pathToBeDeleted)
.sorted(Comparator.reverseOrder())
.forEach(path -> {
try {
Files.delete(path);
} catch (IOException e) {
// ignore
}
});
} catch (IOException e) {
// ignore
}

if (rocksDBSubscriptionGroupManager != null) {
rocksDBSubscriptionGroupManager.stop();
}

Path root = Paths.get(basePath);
if (Files.notExists(root)) {
return;
}

try (Stream<Path> walk = Files.walk(root)) {
walk.sorted(Comparator.reverseOrder())
.forEach(p -> {
try {
Files.deleteIfExists(p);
} catch (IOException e) {
// ignore
}
});
} catch (IOException e) {
// ignore
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo.QueueFilter;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.StartAndShutdown;

public class MQFaultStrategy {
public class MQFaultStrategy implements StartAndShutdown {
private LatencyFaultTolerance<String> latencyFaultTolerance;
private volatile boolean sendLatencyFaultEnable;
private volatile boolean startDetectorEnable;
Expand Down Expand Up @@ -130,6 +131,11 @@ public void startDetector() {
this.latencyFaultTolerance.startDetector();
}

@Override
public void start() throws Exception {
this.startDetector();
}

public void shutdown() {
this.latencyFaultTolerance.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@
package org.apache.rocketmq.proxy.service.route;

import com.google.common.base.MoreObjects;
import java.util.Objects;
import org.apache.rocketmq.common.message.MessageQueue;

public class AddressableMessageQueue implements Comparable<AddressableMessageQueue> {

private final MessageQueue messageQueue;
public class AddressableMessageQueue extends MessageQueue {
private final String brokerAddr;

public AddressableMessageQueue(MessageQueue messageQueue, String brokerAddr) {
this.messageQueue = messageQueue;
super(messageQueue);
this.brokerAddr = brokerAddr;
}

public String getBrokerAddr() {
return brokerAddr;
}

public MessageQueue getMessageQueue() {
return new MessageQueue(getTopic(), getBrokerName(), getQueueId());
}

@Override
public int compareTo(AddressableMessageQueue o) {
return messageQueue.compareTo(o.messageQueue);
public int hashCode() {
return super.hashCode();
}

@Override
Expand All @@ -43,39 +48,13 @@ public boolean equals(Object o) {
if (!(o instanceof AddressableMessageQueue)) {
return false;
}
AddressableMessageQueue queue = (AddressableMessageQueue) o;
return Objects.equals(messageQueue, queue.messageQueue);
}

@Override
public int hashCode() {
return messageQueue == null ? 1 : messageQueue.hashCode();
}

public int getQueueId() {
return this.messageQueue.getQueueId();
}

public String getBrokerName() {
return this.messageQueue.getBrokerName();
}

public String getTopic() {
return messageQueue.getTopic();
}

public MessageQueue getMessageQueue() {
return messageQueue;
}

public String getBrokerAddr() {
return brokerAddr;
return super.equals(o);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("messageQueue", messageQueue)
.add("messageQueue", super.toString())
.add("brokerAddr", brokerAddr)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.service.route;

public class DefaultMessageQueuePriorityProvider implements MessageQueuePriorityProvider<AddressableMessageQueue> {
@Override
public int priorityOf(AddressableMessageQueue queue) {
return 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.proxy.service.route;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.message.MessageQueue;

@FunctionalInterface
public interface MessageQueuePenalizer<Q extends MessageQueue> {

/**
* Returns the penalty value for the given MessageQueue; lower is better.
*/
int penaltyOf(Q messageQueue);

/**
* Aggregates penalties from multiple penalizers for the same MessageQueue (by summing them up).
*/
static <Q extends MessageQueue> int evaluatePenalty(Q messageQueue, List<MessageQueuePenalizer<Q>> penalizers) {
Objects.requireNonNull(messageQueue, "messageQueue");
if (penalizers == null || penalizers.isEmpty()) {
return 0;
}
int sum = 0;
for (MessageQueuePenalizer<Q> p : penalizers) {
sum += p.penaltyOf(messageQueue);
}
return sum;
}

/**
* Selects the queue with the lowest evaluated penalty from the given queue list.
*
* <p>The method iterates through all queues exactly once, but starts from a rotating index
* derived from {@code startIndex} (round-robin) to avoid always scanning from position 0 .</p>
*
* <p>For each queue, it computes a penalty via {@link #evaluatePenalty} using
* the provided {@code penalizers}. The queue with the smallest penalty is selected.</p>
*
* <p>Short-circuit rule: if any queue has a {@code penalty<= 0}, it is returned immediately,
* since no better result than 0 is expected.</p>
*
* @param queues candidate queues to select from
* @param penalizers penalty evaluators applied to each queue
* @param startIndex atomic counter used to determine the rotating start position (round-robin)
* @param <Q> queue type
* @return a {@code Pair} of (selected queue, penalty), or {@code null} if {@code queues} is null/empty
*/
static <Q extends MessageQueue> Pair<Q, Integer> selectLeastPenalty(List<Q> queues,
List<MessageQueuePenalizer<Q>> penalizers, AtomicInteger startIndex) {
if (queues == null || queues.isEmpty()) {
return null;
}
Q bestQueue = null;
int bestPenalty = Integer.MAX_VALUE;

for (int i = 0; i < queues.size(); i++) {
int index = Math.floorMod(startIndex.getAndIncrement(), queues.size());
Q messageQueue = queues.get(index);
int penalty = evaluatePenalty(messageQueue, penalizers);

// Short-circuit: cannot do better than 0
if (penalty <= 0) {
return Pair.of(messageQueue, penalty);
}

if (penalty < bestPenalty) {
bestPenalty = penalty;
bestQueue = messageQueue;
}
}
return Pair.of(bestQueue, bestPenalty);
}

/**
* Selects a queue with the lowest computed penalty from multiple priority groups.
*
* <p>The input {@code queuesWithPriority} is a list of queue groups ordered by priority.
* For each priority group, this method delegates to {@link #selectLeastPenalty} to pick the best queue
* within that group and obtain its penalty.</p>
*
* <p>Short-circuit rule: if any priority group yields a queue whose {@code penalty <= 0},
* that result is returned immediately.</p>
*
* <p>Otherwise, it returns the queue with the smallest positive penalty among all groups.
* If multiple groups produce the same minimum penalty, the first encountered one wins.</p>
*
* @param queuesWithPriority priority-ordered groups of queues; each inner list represents one priority level
* @param penalizers penalty calculators used by {@code selectLeastPenalty} to score queues
* @param startIndex round-robin start index forwarded to {@code selectLeastPenalty} to reduce contention/hotspots
* @param <Q> queue type
* @return a {@code Pair} of (selected queue, penalty), or {@code null} if {@code queuesWithPriority} is null/empty
*/
static <Q extends MessageQueue> Pair<Q, Integer> selectLeastPenaltyWithPriority(List<List<Q>> queuesWithPriority,
List<MessageQueuePenalizer<Q>> penalizers, AtomicInteger startIndex) {
if (queuesWithPriority == null || queuesWithPriority.isEmpty()) {
return null;
}
if (queuesWithPriority.size() == 1) {
return selectLeastPenalty(queuesWithPriority.get(0), penalizers, startIndex);
}
Q bestQueue = null;
int bestPenalty = Integer.MAX_VALUE;
for (List<Q> queues : queuesWithPriority) {
Pair<Q, Integer> queueAndPenalty = selectLeastPenalty(queues, penalizers, startIndex);
int penalty = queueAndPenalty.getRight();
if (queueAndPenalty.getRight() <= 0) {
return queueAndPenalty;
}
if (penalty < bestPenalty) {
bestPenalty = penalty;
bestQueue = queueAndPenalty.getLeft();
}
}
return Pair.of(bestQueue, bestPenalty);
}
}
Loading
Loading