diff --git a/pip/pip-470.md b/pip/pip-470.md new file mode 100644 index 0000000000000..7fa1f9a066a52 --- /dev/null +++ b/pip/pip-470.md @@ -0,0 +1,301 @@ +# PIP-470: Close Inactive Topics Without Deleting Them + +# Background Knowledge + +Pulsar has a long-standing inactive-topic GC mechanism controlled by the broker +configuration `brokerDeleteInactiveTopicsEnabled`. When enabled, the broker +periodically iterates over loaded topics, detects topics with no active +producers/consumers (and optionally no subscriptions or no backlog), and +**deletes** them — both the in-memory topic instance and the persistent data +in BookKeeper, plus topic-level metadata. + +The detection logic lives in `PersistentTopic.checkGC()` / +`NonPersistentTopic.checkGC()`. The policy object is +`InactiveTopicPolicies`, which has two modes: + +- `delete_when_no_subscriptions` +- `delete_when_subscriptions_caught_up` + +Both modes lead to permanent deletion of the topic and its data. + +Each loaded `PersistentTopic` pins a non-trivial amount of broker memory: +a managed-ledger with its cache, subscription/cursor state, rate limiters, +dispatchers, schema references, and — notably — per-topic metric series. +Deployments with millions of low-traffic topics routinely hit memory pressure +and metrics cardinality pressure even when most topics are idle most of the +time. + +Pulsar also supports topic **unload** via the admin API +(`pulsar-admin topics unload `) and namespace-bundle unload. Unload +evicts the topic from the broker's topic map and releases all broker-side +resources, while the ledgers in BookKeeper are preserved. The next +produce/consume call transparently reloads the topic. There is currently no +automatic mechanism that unloads idle topics based on inactivity. + +# Motivation + +Operators with very large topic counts (commonly tens of thousands to millions +per cluster, with a long tail of low-traffic topics) face two problems today: + +1. **Broker memory pressure.** Every loaded topic holds a managed-ledger, + cursors, rate limiters, dispatchers, and metric series. An idle topic that + hasn't been produced/consumed for hours still occupies all of that memory. +2. **Metrics cardinality.** Per-topic metrics grow linearly with the number + of loaded topics. Scrape endpoints get large and the monitoring system + gets expensive. + +The natural remedy today is `brokerDeleteInactiveTopicsEnabled`, but that +**deletes the data**. Many users explicitly do not want deletion — they want +to keep the data around for later consumers, replay, compliance retention, etc. +Their only options today are: + +- Leave all idle topics loaded and accept the memory/metrics cost. +- Build an external cron job that polls `topics stats` for zero + producers/consumers and calls `pulsar-admin topics unload` per topic. + +The external-cron approach works but is awkward: it reimplements the existing +broker inactivity detection, has to run with admin credentials, adds a moving +part to the deployment, and lacks integration with the existing +`InactiveTopicPolicies` configuration surface. + +This PIP adds a first-class broker option that performs the same idle-topic +detection the broker already does for delete-GC, but performs a **close** +(unload) instead of a delete when the topic is determined to be inactive. + +# Goals + +## In Scope + +- A new broker configuration `brokerCloseInactiveTopicsEnabled` (default + `false`) that, when enabled, causes the inactivity monitor to **close** + (unload) topics determined to be inactive rather than delete them. +- Mutual exclusion with `brokerDeleteInactiveTopicsEnabled`: only one of the + two may be enabled at a time. Broker start fails fast if both are set. +- Reuse of the existing inactivity detection: same + `brokerDeleteInactiveTopicsMode`, `brokerDeleteInactiveTopicsFrequencySeconds`, + and `brokerDeleteInactiveTopicsMaxInactiveDurationSeconds` settings determine + when a topic is considered inactive. No new detection surface is introduced + for v1. +- Data preservation guarantee: only the in-memory topic instance is released. + BookKeeper ledgers, topic metadata, subscriptions, cursors, and topic-level + policies are all retained. The next producer/consumer reconnect transparently + reloads the topic. +- Coverage for both `PersistentTopic` and `NonPersistentTopic`. + +## Out of Scope + +- A per-topic or per-namespace override for close-on-inactive. The v1 knob is + broker-level only; a follow-up PIP can extend `InactiveTopicPolicies` with + an action field (`delete` vs `close`) if operators want per-namespace + control. +- Changing the existing `InactiveTopicDeleteMode` enum or + `InactiveTopicPolicies` schema. +- A new admin API. Manual `pulsar-admin topics unload` remains available for + ad-hoc use. +- Partitioned-topic metadata handling changes. Because nothing is deleted, + `brokerDeleteInactivePartitionedTopicMetadataEnabled` has no interaction + with the new close mode. +- Changing the detection cadence or detection semantics beyond reusing + what's there. + +# High Level Design + +The existing inactivity monitor in `BrokerService.startInactivityMonitor()` +schedules `checkGC()` on all loaded topics when +`brokerDeleteInactiveTopicsEnabled` is true. With this PIP, the monitor also +runs when `brokerCloseInactiveTopicsEnabled` is true. + +`PersistentTopic.checkGC()` and `NonPersistentTopic.checkGC()` gain a branch: +if close mode is enabled (and delete mode is not), after the same +isActive / duration / replication-producer checks pass, the topic calls its +own `close(disconnectClients=true, closeWithoutWaitingClientDisconnect=false)` +path. That path already: + +- disconnects producers and consumers, +- closes subscriptions and the managed ledger (without deleting ledgers), +- removes the topic entry from `BrokerService.topics` via `disposeTopic` → + `removeTopicFromCache`. + +Data on BookKeeper is untouched. Subsequent lookups reload the topic from +metadata and BookKeeper exactly as they do after a manual unload or a broker +restart. + +The **retention-window check** (`shouldTopicBeRetained`) is intentionally +bypassed for the close branch — that check exists to avoid deleting data +that's still within the retention window, which is moot when we are not +deleting anything. + +# Detailed Design + +## Public-facing Changes + +### Configuration + +One new dynamic broker configuration in `ServiceConfiguration`: + +```java +@FieldContext( + category = CATEGORY_POLICIES, + dynamic = true, + doc = "Enable closing (unloading from broker memory) of inactive topics without deleting their data. ..." +) +private boolean brokerCloseInactiveTopicsEnabled = false; +``` + +Behavior: + +- Default: `false` (no behavior change for existing deployments). +- Mutually exclusive with `brokerDeleteInactiveTopicsEnabled`. If both are + `true` at broker startup, the broker fails fast with an + `IllegalArgumentException`. +- Dynamic: can be changed at runtime via the existing dynamic-config + mechanism. Dynamic transitions that would produce a conflicting state + (both flags simultaneously true) are rejected by the same validation path. + +Reused configuration (no changes): + +- `brokerDeleteInactiveTopicsMode` — selects `delete_when_no_subscriptions` + vs `delete_when_subscriptions_caught_up` semantics for the inactivity check. + The name is retained for v1 to avoid churn; a rename to a neutral name + (e.g. `inactiveTopicsMode`) could be proposed separately. +- `brokerDeleteInactiveTopicsFrequencySeconds` — how often the monitor runs. +- `brokerDeleteInactiveTopicsMaxInactiveDurationSeconds` — inactivity + threshold before the action fires. + +### Public API / Binary / Wire + +None. No changes to the Pulsar admin API, client protocol, or topic +policies schema. Topic policies — including `InactiveTopicPolicies` — are +untouched. + +### CLI + +None. + +### Metrics + +None added. Existing per-topic metrics naturally shrink when topics are +closed. A follow-up could add a broker-level counter +`pulsar_broker_topic_inactive_close_total` but it is not required for v1. + +## Design & Implementation Details + +### Validation + +`PulsarService.start()` already performs static config validation. We add: + +```java +if (config.isBrokerDeleteInactiveTopicsEnabled() && config.isBrokerCloseInactiveTopicsEnabled()) { + throw new IllegalArgumentException( + "brokerDeleteInactiveTopicsEnabled and brokerCloseInactiveTopicsEnabled are mutually " + + "exclusive. Enable at most one of them."); +} +``` + +### Scheduler + +`BrokerService.startInactivityMonitor()` is updated to start the monitor +when **either** flag is enabled: + +```java +if (config.isBrokerDeleteInactiveTopicsEnabled() || config.isBrokerCloseInactiveTopicsEnabled()) { + int interval = config.getBrokerDeleteInactiveTopicsFrequencySeconds(); + inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> checkGC(), interval, interval, + TimeUnit.SECONDS); +} +``` + +### Helper on AbstractTopic + +```java +public boolean isCloseWhileInactive() { + return brokerService.pulsar().getConfiguration().isBrokerCloseInactiveTopicsEnabled(); +} +``` + +Kept broker-level only in v1; future work may read from a per-topic policy. + +### PersistentTopic.checkGC + +The method now branches on action: + +1. Compute `deleteEnabled = isDeleteWhileInactive()` and + `closeEnabled = !deleteEnabled && isCloseWhileInactive()`. If neither is + true, return. +2. Detect inactivity using the existing `isActive(deleteMode)` and the + inactivity-duration check. +3. **New for close mode:** skip the `shouldTopicBeRetained()` guard, since + retention only guards against data loss, which is irrelevant here. +4. Close replication producers first (same as today). If remote producers + are connected, abort with `TopicBusyException`. +5. If close mode is active, call `close(true, false)`. That path already + disconnects clients, closes subscriptions, closes the managed ledger, + and removes the topic from the broker cache via `disposeTopic`. Log + success; on `TopicBusyException`, log at debug (topic became active + again and will be reloaded). +6. Otherwise proceed with the existing delete branch unchanged. + +### NonPersistentTopic.checkGC + +Parallel change: same `deleteEnabled`/`closeEnabled` gate; if close mode is +active, call `close(true, false)` and skip `tryToDeletePartitionedMetadata`. + +### Race: topic reloaded before close completes + +A client can connect between the inactivity check and the call to `close()`. +This is already handled by `close()`'s fencing logic: `fenceTopicToCloseOrDelete` +rejects new client operations during close. If a lookup races in and sees +the topic is gone from the cache it will re-create a fresh `PersistentTopic` +instance from metadata, which is exactly the desired behavior. + +### Race: close completes and topic is immediately reloaded + +Expected. The new `PersistentTopic` instance has its own `lastActive` +initialized to now; another full inactivity window must elapse before it +becomes a close candidate again. Activity produced after reload resets the +clock as usual. + +### Interaction with unload-based load balancing + +The existing bundle-unload mechanism and this new inactive-close mechanism +are independent. A bundle unload closes every topic in the bundle +regardless of activity; the new mechanism closes only inactive topics, +cluster-wide. They compose without conflict. + +### Interaction with `brokerDeleteInactivePartitionedTopicMetadataEnabled` + +Because nothing is deleted, partitioned-topic metadata cleanup does not run +under close mode. The metadata (partition count, topic policies) is +preserved. + +### Backwards compatibility + +The new configuration defaults to `false`, so existing deployments see no +behavioral change. + +## Security Considerations + +None. This PIP introduces no new network-accessible endpoints, no new +credentials, and no new authorization surface. Inactivity detection and +close use the same internal code paths that the broker already executes for +the delete variant and for admin-driven unload. + +# Monitoring + +Operators can watch for the log line +`Topic closed successfully due to inactivity` to observe the feature +firing. The existing `pulsar_topics_count` metric and per-topic metric +series naturally shrink as idle topics are closed. Producers/consumers +reconnecting to a closed topic exercise the same load-topic path that's +already exercised at broker startup. + +# General Notes + +This PIP is intentionally minimal — one new flag, one branch in `checkGC`. +Everything else is reused: the inactivity detection, the close code path, +the dispose-from-cache machinery, and the admin-driven unload semantics. + +# Links + +* Mailing List discussion thread: TBD +* Mailing List voting thread: TBD diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 0931aa64114a1..11c09c957105f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -763,6 +763,21 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private Integer brokerDeleteInactiveTopicsMaxInactiveDurationSeconds = null; + @FieldContext( + category = CATEGORY_POLICIES, + dynamic = true, + doc = "Enable closing (unloading from broker memory) of inactive topics without deleting their data.\n" + + "When a topic is deemed inactive (no producers, no subscriptions, or subscriptions caught up based on\n" + + "the configured mode), the broker will close the topic instance, releasing in-memory resources such as\n" + + "the managed ledger cache, subscription state, and per-topic metrics. The topic data in BookKeeper is\n" + + "preserved; clients will transparently reload the topic on the next produce/consume.\n" + + "This option is mutually exclusive with 'brokerDeleteInactiveTopicsEnabled': only one of the two may\n" + + "be enabled at a time. The inactivity detection reuses 'brokerDeleteInactiveTopicsMode',\n" + + "'brokerDeleteInactiveTopicsFrequencySeconds', and\n" + + "'brokerDeleteInactiveTopicsMaxInactiveDurationSeconds'." + ) + private boolean brokerCloseInactiveTopicsEnabled = false; + @FieldContext( category = CATEGORY_POLICIES, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 341154f3b7111..a71e13b29f195 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -881,6 +881,12 @@ public void start() throws PulsarServerException { config.getDefaultRetentionTimeInMinutes() * 60)); } + if (config.isBrokerDeleteInactiveTopicsEnabled() && config.isBrokerCloseInactiveTopicsEnabled()) { + throw new IllegalArgumentException( + "brokerDeleteInactiveTopicsEnabled and brokerCloseInactiveTopicsEnabled are mutually " + + "exclusive. Enable at most one of them."); + } + openTelemetryTopicStats = new OpenTelemetryTopicStats(this); openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this); openTelemetryProducerStats = new OpenTelemetryProducerStats(this); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 8be1002b3a8db..4989dedf14ecb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1228,6 +1228,10 @@ public boolean isDeleteWhileInactive() { return topicPolicies.getInactiveTopicPolicies().get().isDeleteWhileInactive(); } + public boolean isCloseWhileInactive() { + return brokerService.pulsar().getConfiguration().isBrokerCloseInactiveTopicsEnabled(); + } + public boolean deletePartitionedTopicMetadataWhileInactive() { return brokerService.pulsar().getConfiguration().isBrokerDeleteInactivePartitionedTopicMetadataEnabled(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d6226254f3c45..67fd4f08242e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -719,8 +719,9 @@ protected void startDeduplicationSnapshotMonitor() { } protected void startInactivityMonitor() { - if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) { - int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds(); + ServiceConfiguration config = pulsar().getConfiguration(); + if (config.isBrokerDeleteInactiveTopicsEnabled() || config.isBrokerCloseInactiveTopicsEnabled()) { + int interval = config.getBrokerDeleteInactiveTopicsFrequencySeconds(); inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> checkGC(), interval, interval, TimeUnit.SECONDS); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 45a92c92f7ee9..616d7c059e8b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -1037,7 +1037,9 @@ private CompletableFuture disconnectReplicators() { @Override public void checkGC() { - if (!isDeleteWhileInactive()) { + boolean deleteEnabled = isDeleteWhileInactive(); + boolean closeEnabled = !deleteEnabled && isCloseWhileInactive(); + if (!deleteEnabled && !closeEnabled) { // This topic is not included in GC return; } @@ -1048,13 +1050,32 @@ public void checkGC() { if (System.nanoTime() - lastActive > TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) { // Close repl producers first. - // Once all repl producers are closed, we can delete the topic, + // Once all repl producers are closed, we can delete/close the topic, // provided no remote producers connected to the broker. if (log.isDebugEnabled()) { log.debug("[{}] Topic inactive for {} seconds, closing repl producers.", topic, maxInactiveDurationInSec); } + if (closeEnabled) { + stopReplProducers().thenCompose(v -> close(true, false)) + .thenRun(() -> log.info("[{}] Topic closed successfully due to inactivity", topic)) + .exceptionally(e -> { + Throwable throwable = e.getCause(); + if (throwable instanceof TopicBusyException) { + if (log.isDebugEnabled()) { + log.debug("[{}] Did not close busy topic: {}", topic, + throwable.getMessage()); + } + replicators.forEach((region, replicator) -> replicator.startProducer()); + } else { + log.warn("[{}] Inactive topic close failed", topic, e); + } + return null; + }); + return; + } + stopReplProducers().thenCompose(v -> delete(true, false)) .thenCompose(__ -> tryToDeletePartitionedMetadata()) .thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic)) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 88d511217c473..3432b8baa83af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3390,7 +3390,9 @@ public boolean isReplicationBacklogExist() { @Override public void checkGC() { - if (!isDeleteWhileInactive()) { + boolean deleteEnabled = isDeleteWhileInactive(); + boolean closeEnabled = !deleteEnabled && isCloseWhileInactive(); + if (!deleteEnabled && !closeEnabled) { // This topic is not included in GC return; } @@ -3402,14 +3404,14 @@ public void checkGC() { } else if (System.nanoTime() - lastActive < SECONDS.toNanos(maxInactiveDurationInSec)) { // Gc interval did not expire yet return; - } else if (shouldTopicBeRetained()) { + } else if (deleteEnabled && shouldTopicBeRetained()) { // Topic activity is still within the retention period return; } else { CompletableFuture replCloseFuture = new CompletableFuture<>(); // Close repl producers first. - // Once all repl producers are closed, we can delete the topic, + // Once all repl producers are closed, we can delete/close the topic, // provided no remote producers connected to the broker. if (log.isDebugEnabled()) { log.debug("[{}] Topic inactive for {} seconds, closing repl producers.", topic, @@ -3445,6 +3447,22 @@ public void checkGC() { return null; }); + if (closeEnabled) { + replCloseFuture.thenCompose(v -> close(true, false)) + .thenRun(() -> log.info("[{}] Topic closed successfully due to inactivity", topic)) + .exceptionally(e -> { + if (e.getCause() instanceof TopicBusyException) { + if (log.isDebugEnabled()) { + log.debug("[{}] Did not close busy topic: {}", topic, e.getCause().getMessage()); + } + } else { + log.warn("[{}] Inactive topic close failed", topic, e); + } + return null; + }); + return; + } + replCloseFuture.thenCompose(v -> delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions, deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, false)) .thenCompose((res) -> tryToDeletePartitionedMetadata()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicCloseTest.java new file mode 100644 index 0000000000000..3622efa17eae0 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicCloseTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class InactiveTopicCloseTest extends BrokerTestBase { + + @BeforeMethod + protected void setup() throws Exception { + //No-op + } + + @AfterMethod(alwaysRun = true) + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testCloseInactiveTopicKeepsDataAndEvictsFromCache() throws Exception { + conf.setBrokerDeleteInactiveTopicsEnabled(false); + conf.setBrokerCloseInactiveTopicsEnabled(true); + conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); + conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1); + conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(1); + super.baseSetup(); + + final String topic = "persistent://prop/ns-abc/testCloseInactive"; + + Producer producer = pulsarClient.newProducer().topic(topic).create(); + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscribe(); + producer.send("hello".getBytes()); + consumer.close(); + producer.close(); + + // Drop the only subscription so the topic qualifies as inactive in delete_when_no_subscriptions mode. + admin.topics().deleteSubscription(topic, "sub"); + + // Topic should be evicted from broker cache (closed) but still present in metadata. + Awaitility.await().untilAsserted(() -> + assertFalse(pulsar.getBrokerService().getTopicReference(topic).isPresent())); + assertTrue(admin.topics().getList("prop/ns-abc").contains(topic)); + + // Data is preserved: a fresh consumer on a new subscription reading from earliest must see the message. + Consumer reReader = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub2") + .subscriptionInitialPosition(org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest) + .subscribe(); + org.apache.pulsar.client.api.Message msg = reReader.receive(10, java.util.concurrent.TimeUnit.SECONDS); + org.testng.Assert.assertNotNull(msg); + org.testng.Assert.assertEquals(new String(msg.getValue()), "hello"); + reReader.close(); + } + + @Test + public void testMutualExclusionWithDeleteInactive() throws Exception { + conf.setBrokerDeleteInactiveTopicsEnabled(true); + conf.setBrokerCloseInactiveTopicsEnabled(true); + try { + super.baseSetup(); + org.testng.Assert.fail("expected broker startup to fail"); + } catch (Exception e) { + Throwable cause = e; + boolean found = false; + while (cause != null) { + if (cause instanceof IllegalArgumentException + && cause.getMessage() != null + && cause.getMessage().contains("mutually exclusive")) { + found = true; + break; + } + cause = cause.getCause(); + } + assertTrue(found, "expected IllegalArgumentException about mutual exclusion, got: " + e); + } + } + + @Test + public void testActiveTopicIsNotClosed() throws Exception { + conf.setBrokerDeleteInactiveTopicsEnabled(false); + conf.setBrokerCloseInactiveTopicsEnabled(true); + conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); + conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1); + conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(1); + super.baseSetup(); + + final String topic = "persistent://prop/ns-abc/testActiveNotClosed"; + Producer producer = pulsarClient.newProducer().topic(topic).create(); + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscribe(); + try { + // Wait past the inactivity window; topic must remain loaded because a subscription exists. + Thread.sleep(3000); + assertTrue(pulsar.getBrokerService().getTopicReference(topic).isPresent()); + } finally { + consumer.close(); + producer.close(); + } + } +}