Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
Expand Down Expand Up @@ -134,7 +133,7 @@ public void testBookieFailure() throws Exception {
metadataStore.unsetAlwaysFail();

bkc = new PulsarBookKeeperTestClient(baseClientConf);
int port = startNewBookie();
startNewBookie();

// Reconnect a new bk client
factory.shutdown();
Expand Down Expand Up @@ -164,7 +163,6 @@ public void testBookieFailure() throws Exception {
assertEquals("entry-2", new String(entries.get(0).getData()));
entries.forEach(Entry::release);
factory.shutdown();
releaseLockedPort(port);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package org.apache.bookkeeper.test;

import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertFalse;
import com.google.common.base.Stopwatch;
import java.io.File;
Expand Down Expand Up @@ -65,7 +64,6 @@
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.Auditor;
import org.apache.bookkeeper.replication.ReplicationWorker;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
Expand Down Expand Up @@ -291,19 +289,12 @@ protected void stopBKCluster() throws Exception {
t.shutdown();
}
servers.clear();
bookiePorts.removeIf(PortManager::releaseLockedPort);
bookiePorts.clear();
}

protected ServerConfiguration newServerConfiguration() throws Exception {
File f = tmpDirs.createNew("bookie", "test");

int port;
if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) {
port = nextLockedFreePort();
} else {
port = 0;
}
return newServerConfiguration(port, f, new File[] { f });
return newServerConfiguration(0, f, new File[] { f });
}

protected ClientConfiguration newClientConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ public void setBkEnsemble(LocalBookkeeperEnsemble bkEnsemble) {
this.bkEnsemble = bkEnsemble;
}

public void setBkPort(int bkPort) {
this.bkPort = bkPort;
}

public void setBkDir(String bkDir) {
this.bkDir = bkDir;
}
Expand Down Expand Up @@ -172,10 +168,6 @@ public int getZkPort() {
return zkPort;
}

public int getBkPort() {
return bkPort;
}

public String getZkDir() {
return zkDir;
}
Expand Down Expand Up @@ -237,9 +229,6 @@ public boolean isHelp() {
hidden = true)
private int zkPort = 2181;

@Option(names = { "--bookkeeper-port" }, description = "Local bookies base port")
private int bkPort = 3181;

@Option(names = { "--zookeeper-dir" },
description = "Local zooKeeper's data directory",
hidden = true)
Expand Down Expand Up @@ -470,7 +459,6 @@ void startBookieWithMetadataStore() throws Exception {
bkCluster = BKCluster.builder()
.baseServerConfiguration(bkServerConf)
.metadataServiceUri(metadataStoreUrl)
.bkPort(bkPort)
.numBookies(numOfBk)
.dataDir(bkDir)
.clearOldData(wipeData)
Expand All @@ -484,9 +472,9 @@ private void startBookieWithZookeeper() throws Exception {
ServerConfiguration bkServerConf = new ServerConfiguration();
bkServerConf.loadConf(new File(configFile).toURI().toURL());
calculateCacheSize(bkServerConf);
// Start LocalBookKeeper
// Start LocalBookKeeper. Bookies bind to kernel-assigned ports.
bkEnsemble = new LocalBookkeeperEnsemble(
this.getNumOfBk(), this.getZkPort(), this.getBkPort(), this.getStreamStoragePort(), this.getZkDir(),
this.getNumOfBk(), this.getZkPort(), this.getStreamStoragePort(), this.getZkDir(),
this.getBkDir(), this.isWipeData(), "127.0.0.1");
bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage());
config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ public PulsarStandaloneBuilder withZkPort(int zkPort) {
return this;
}

public PulsarStandaloneBuilder withBkPort(int bkPort) {
pulsarStandalone.setBkPort(bkPort);
return this;
}

public PulsarStandaloneBuilder withZkDir(String zkDir) {
pulsarStandalone.setZkDir(zkDir);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import lombok.CustomLog;
Expand Down Expand Up @@ -91,46 +90,18 @@ public class LocalBookkeeperEnsemble {
int numberOfBookies;
private final boolean clearOldData;

private static class BasePortManager implements Supplier<Integer> {

private int port;

public BasePortManager(int basePort) {
this.port = basePort;
}

@Override
public synchronized Integer get() {
return port++;
}
public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort) {
this(numberOfBookies, zkPort, 4181, null, null, true, null);
}

private final Supplier<Integer> portManager;

public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, Supplier<Integer> portManager) {
this(numberOfBookies, zkPort, 4181, null, null, true, null, portManager);
}

public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName,
public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String zkDataDirName,
String bkDataDirName, boolean clearOldData) {
this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, null);
this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, null);
}

public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName,
public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, String zkDataDirName,
String bkDataDirName, boolean clearOldData, String advertisedAddress) {
this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress);
}

public LocalBookkeeperEnsemble(int numberOfBookies,
int zkPort,
int bkBasePort,
int streamStoragePort,
String zkDataDirName,
String bkDataDirName,
boolean clearOldData,
String advertisedAddress) {
this(numberOfBookies, zkPort, streamStoragePort, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress,
bkBasePort != 0 ? new BasePortManager(bkBasePort) : () -> 0);
this(numberOfBookies, zkPort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress);
}

public LocalBookkeeperEnsemble(int numberOfBookies,
Expand All @@ -139,10 +110,8 @@ public LocalBookkeeperEnsemble(int numberOfBookies,
String zkDataDirName,
String bkDataDirName,
boolean clearOldData,
String advertisedAddress,
Supplier<Integer> portManager) {
String advertisedAddress) {
this.numberOfBookies = numberOfBookies;
this.portManager = portManager;
this.streamStoragePort = streamStoragePort;
this.zkDataDirName = zkDataDirName;
this.bkDataDirName = bkDataDirName;
Expand Down Expand Up @@ -301,7 +270,8 @@ private void runBookies(ServerConfiguration baseConf) throws Exception {
cleanDirectory(bkDataDir);
}

int bookiePort = portManager.get();
// Bookies bind to a kernel-assigned port; identity is established via bookieId.
int bookiePort = 0;
String bookieId = "bk" + i + "test";
// Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully
deleteBookieRegistrationZnode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public void testStandaloneWithRocksDB() throws Exception {

PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args);
standalone.setBkDir(tempDir.getAbsolutePath());
standalone.setBkPort(0);
standalone.setNumOfBk(bookieNum);

standalone.startBookieWithMetadataStore();
Expand All @@ -67,10 +66,12 @@ public void testStandaloneWithRocksDB() throws Exception {
List<ServerConfiguration> secondBsConfs = standalone.bkCluster.getBsConfs();
Assert.assertEquals(secondBsConfs.size(), bookieNum);

// Cookies must be preserved across restart (otherwise bookie startup would have failed
// with InvalidCookieException). The bookieId is the persistent identity.
for (int i = 0; i < bookieNum; i++) {
ServerConfiguration conf1 = firstBsConfs.get(i);
ServerConfiguration conf2 = secondBsConfs.get(i);
Assert.assertEquals(conf1.getBookiePort(), conf2.getBookiePort());
Assert.assertEquals(conf1.getBookieId(), conf2.getBookieId());
}
standalone.close();
cleanDirectory(tempDir);
Expand All @@ -93,7 +94,6 @@ public void testMetadataInitialization(boolean enableBrokerClientAuth) throws Ex
}
final File bkDir = IOUtils.createTempDir("standalone", "bk");
standalone.setNumOfBk(1);
standalone.setBkPort(0);
standalone.setBkDir(bkDir.getAbsolutePath());
standalone.start();

Expand Down Expand Up @@ -148,7 +148,6 @@ public void testShutdownHookClosesBkCluster() throws Exception {
bkDir.getAbsolutePath()
});
standalone.setTestMode(true);
standalone.setBkPort(0);
standalone.start();
BKCluster bkCluster = standalone.bkCluster;
standalone.runShutdownHook();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.CustomLog;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
Expand All @@ -35,7 +34,6 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.PortManager;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;

Expand All @@ -46,24 +44,18 @@ public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest {
protected List<PulsarAdmin> additionalBrokerAdmins;
protected List<PulsarClient> additionalBrokerClients;
protected PulsarMockBookKeeper mockBookKeeper;
// Populated after broker startup with kernel-assigned ports.
protected int mainBrokerPort;
protected List<Integer> additionalBrokerPorts = new ArrayList<>();

protected int numberOfAdditionalBrokers() {
return 2;
}

protected boolean useDynamicBrokerPorts() {
return true;
}

@BeforeClass(alwaysRun = true)
@Override
public final void setup() throws Exception {
beforeSetup();
if (!useDynamicBrokerPorts()) {
mainBrokerPort = PortManager.nextLockedFreePort();
}
OrderedExecutor mockBookKeeperExecutor = OrderedExecutor.newBuilder().numThreads(1)
.name(MultiBrokerBaseTest.class.getSimpleName() + "-bk-executor").build();
registerCloseable(() -> GracefulExecutorServicesShutdown.initiate()
Expand All @@ -75,6 +67,7 @@ public final void setup() throws Exception {
((NonClosableMockBookKeeper) mockBookKeeper).reallyShutdown();
});
super.internalSetup();
mainBrokerPort = pulsar.getBrokerListenPort().orElse(0);
additionalBrokersSetup();
pulsarResourcesSetup();
additionalSetup();
Expand All @@ -89,14 +82,6 @@ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder p
pulsarTestContextBuilder.bookKeeperClient(mockBookKeeper);
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
if (!useDynamicBrokerPorts()) {
conf.setBrokerServicePort(Optional.of(mainBrokerPort));
}
}

protected void additionalSetup() throws Exception {
// override this method to add any additional setup logic

Expand All @@ -115,17 +100,12 @@ protected void additionalBrokersSetup() throws Exception {
additionalBrokerClients = new ArrayList<>(numberOfAdditionalBrokers);
additionalPulsarTestContexts = new ArrayList<>(numberOfAdditionalBrokers);
additionalBrokerPorts = new ArrayList<>(numberOfAdditionalBrokers);
if (!useDynamicBrokerPorts()) {
for (int i = 0; i < numberOfAdditionalBrokers; i++) {
int port = PortManager.nextLockedFreePort();
additionalBrokerPorts.add(port);
}
}
for (int i = 0; i < numberOfAdditionalBrokers; i++) {
PulsarTestContext pulsarTestContext = createAdditionalBroker(i);
additionalPulsarTestContexts.add(i, pulsarTestContext);
PulsarService pulsarService = pulsarTestContext.getPulsarService();
additionalBrokers.add(i, pulsarService);
additionalBrokerPorts.add(pulsarService.getBrokerListenPort().orElse(0));
PulsarAdminBuilder pulsarAdminBuilder =
PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress() != null
? pulsarService.getWebServiceAddress()
Expand Down Expand Up @@ -159,9 +139,6 @@ protected ServiceConfiguration createConfForAdditionalBroker(int additionalBroke

protected PulsarTestContext createAdditionalBroker(int additionalBrokerIndex) throws Exception {
ServiceConfiguration conf = createConfForAdditionalBroker(additionalBrokerIndex);
if (!useDynamicBrokerPorts()) {
conf.setBrokerServicePort(Optional.of(additionalBrokerPorts.get(additionalBrokerIndex)));
}
return createAdditionalPulsarTestContext(conf);
}

Expand All @@ -175,14 +152,6 @@ public final void cleanup() throws Exception {
log.warn().exception(e).log("Exception during additional cleanup");
}
super.internalCleanup();
if (!useDynamicBrokerPorts()) {
if (mainBrokerPort > 0) {
PortManager.releaseLockedPort(mainBrokerPort);
}
for (Integer port : additionalBrokerPorts) {
PortManager.releaseLockedPort(port);
}
}
}

protected void additionalCleanup() throws Exception {
Expand Down Expand Up @@ -212,9 +181,6 @@ protected void additionalBrokersCleanup() {
try {
pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L);
pulsarTestContext.close();
pulsarService.getConfiguration().getBrokerServicePort().ifPresent(PortManager::releaseLockedPort);
pulsarService.getConfiguration().getWebServicePort().ifPresent(PortManager::releaseLockedPort);
pulsarService.getConfiguration().getWebServicePortTls().ifPresent(PortManager::releaseLockedPort);
} catch (Exception e) {
log.warn().exception(e).log("Failed to stop additional broker");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void setup() throws Exception {
new LinkedBlockingQueue<>());
log.info("---- Initializing SLAMonitoringTest -----");
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();

// start brokers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class GetPartitionMetadataTest extends TestRetrySupport {
@BeforeClass(alwaysRun = true)
protected void setup() throws Exception {
incrementSetupNumber();
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble = new LocalBookkeeperEnsemble(3, 0);
bkEnsemble.start();
// Start broker.
setupBrokers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,6 @@ protected int numberOfAdditionalBrokers() {
return 1;
}

@Override
protected boolean useDynamicBrokerPorts() {
return false;
}

@BeforeMethod(alwaysRun = true)
public final void doBeforeMethod() {
beforeMethod();
Expand Down
Loading