301 changes: 301 additions & 0 deletions pip/pip-470.md
@@ -0,0 +1,301 @@
# PIP-470: Close Inactive Topics Without Deleting Them

# Background Knowledge

Pulsar has a long-standing inactive-topic GC mechanism controlled by the broker
configuration `brokerDeleteInactiveTopicsEnabled`. When enabled, the broker
periodically iterates over loaded topics, detects topics with no active
producers/consumers (and optionally no subscriptions or no backlog), and
**deletes** them — both the in-memory topic instance and the persistent data
in BookKeeper, plus topic-level metadata.

The detection logic lives in `PersistentTopic.checkGC()` /
`NonPersistentTopic.checkGC()`. The policy object is
`InactiveTopicPolicies`, which has two modes:

- `delete_when_no_subscriptions`
- `delete_when_subscriptions_caught_up`

Both modes lead to permanent deletion of the topic and its data.

Each loaded `PersistentTopic` pins a non-trivial amount of broker memory:
a managed-ledger with its cache, subscription/cursor state, rate limiters,
dispatchers, schema references, and — notably — per-topic metric series.
Deployments with millions of low-traffic topics routinely hit memory pressure
and metrics cardinality pressure even when most topics are idle most of the
time.

Pulsar also supports topic **unload** via the admin API
(`pulsar-admin topics unload <topic>`) and namespace-bundle unload. Unload
evicts the topic from the broker's topic map and releases all broker-side
resources, while the ledgers in BookKeeper are preserved. The next
produce/consume call transparently reloads the topic. There is currently no
automatic mechanism that unloads idle topics based on inactivity.

# Motivation

Operators with very large topic counts (commonly tens of thousands to millions
per cluster, with a long tail of low-traffic topics) face two problems today:

1. **Broker memory pressure.** Every loaded topic holds a managed-ledger,
cursors, rate limiters, dispatchers, and metric series. An idle topic that
hasn't been produced/consumed for hours still occupies all of that memory.
2. **Metrics cardinality.** Per-topic metrics grow linearly with the number
of loaded topics. Scrape endpoints get large and the monitoring system
gets expensive.

The natural remedy today is `brokerDeleteInactiveTopicsEnabled`, but that
**deletes the data**. Many users explicitly do not want deletion — they want
to keep the data around for later consumers, replay, compliance retention, etc.
Their only options today are:

- Leave all idle topics loaded and accept the memory/metrics cost.
- Build an external cron job that polls `topics stats` for zero
producers/consumers and calls `pulsar-admin topics unload` per topic.

The external-cron approach works but is awkward: it reimplements the existing
broker inactivity detection, has to run with admin credentials, adds a moving
part to the deployment, and lacks integration with the existing
`InactiveTopicPolicies` configuration surface.
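For concreteness, such a job reduces to "no publishers and no connected consumers ⇒ unload". A minimal, standalone sketch of that decision follows; the `pulsar-admin topics list` / `topics stats` / `topics unload` calls are deliberately omitted, and the class and method names are illustrative, not Pulsar's:

```java
// Illustrative, standalone reduction of the external-cron workaround's core decision.
// A real job would page through `pulsar-admin topics list`, read `topics stats` for
// each topic, and call `topics unload` for every hit; those admin calls are omitted
// here so the decision itself stays self-contained.
final class IdleTopicSweep {

    /**
     * Mirrors what the cron job reads from `topics stats`: a topic is an
     * unload candidate only when nothing at all is connected to it.
     */
    static boolean isUnloadCandidate(int publisherCount, int connectedConsumerCount) {
        return publisherCount == 0 && connectedConsumerCount == 0;
    }
}
```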

This PIP adds a first-class broker option that performs the same idle-topic
detection the broker already does for delete-GC, but performs a **close**
(unload) instead of a delete when the topic is determined to be inactive.

# Goals

## In Scope

- A new broker configuration `brokerCloseInactiveTopicsEnabled` (default
`false`) that, when enabled, causes the inactivity monitor to **close**
(unload) topics determined to be inactive rather than delete them.
- Mutual exclusion with `brokerDeleteInactiveTopicsEnabled`: only one of the
two may be enabled at a time. Broker start fails fast if both are set.
- Reuse of the existing inactivity detection: same
`brokerDeleteInactiveTopicsMode`, `brokerDeleteInactiveTopicsFrequencySeconds`,
and `brokerDeleteInactiveTopicsMaxInactiveDurationSeconds` settings determine
when a topic is considered inactive. No new detection surface is introduced
for v1.
- Data preservation guarantee: only the in-memory topic instance is released.
BookKeeper ledgers, topic metadata, subscriptions, cursors, and topic-level
policies are all retained. The next producer/consumer reconnect transparently
reloads the topic.
- Coverage for both `PersistentTopic` and `NonPersistentTopic`.

## Out of Scope

- A per-topic or per-namespace override for close-on-inactive. The v1 knob is
broker-level only; a follow-up PIP can extend `InactiveTopicPolicies` with
an action field (`delete` vs `close`) if operators want per-namespace
control.
- Changing the existing `InactiveTopicDeleteMode` enum or
`InactiveTopicPolicies` schema.
- A new admin API. Manual `pulsar-admin topics unload` remains available for
ad-hoc use.
- Partitioned-topic metadata handling changes. Because nothing is deleted,
`brokerDeleteInactivePartitionedTopicMetadataEnabled` has no interaction
with the new close mode.
- Changing the detection cadence or detection semantics beyond reusing
what's there.

# High Level Design

The existing inactivity monitor in `BrokerService.startInactivityMonitor()`
schedules `checkGC()` on all loaded topics when
`brokerDeleteInactiveTopicsEnabled` is true. With this PIP, the monitor also
runs when `brokerCloseInactiveTopicsEnabled` is true.

`PersistentTopic.checkGC()` and `NonPersistentTopic.checkGC()` gain a branch:
if close mode is enabled (and delete mode is not), after the same
isActive / duration / replication-producer checks pass, the topic calls its
own `close(disconnectClients=true, closeWithoutWaitingClientDisconnect=false)`
path. That path already:

- disconnects producers and consumers,
- closes subscriptions and the managed ledger (without deleting ledgers),
- removes the topic entry from `BrokerService.topics` via `disposeTopic` →
`removeTopicFromCache`.

Data on BookKeeper is untouched. Subsequent lookups reload the topic from
metadata and BookKeeper exactly as they do after a manual unload or a broker
restart.

The **retention-window check** (`shouldTopicBeRetained`) is intentionally
bypassed for the close branch — that check exists to avoid deleting data
that's still within the retention window, which is moot when we are not
deleting anything.

# Detailed Design

## Public-facing Changes

### Configuration

One new dynamic broker configuration in `ServiceConfiguration`:

```java
@FieldContext(
        category = CATEGORY_POLICIES,
        dynamic = true,
        doc = "Enable closing (unloading from broker memory) of inactive topics without deleting their data. ..."
)
private boolean brokerCloseInactiveTopicsEnabled = false;
```

Behavior:

- Default: `false` (no behavior change for existing deployments).
- Mutually exclusive with `brokerDeleteInactiveTopicsEnabled`. If both are
`true` at broker startup, the broker fails fast with an
`IllegalArgumentException`.
- Dynamic: can be changed at runtime via the existing dynamic-config
mechanism. Dynamic transitions that would produce a conflicting state
(both flags simultaneously true) are rejected by the same validation path.

Reused configuration (no changes):

- `brokerDeleteInactiveTopicsMode` — selects `delete_when_no_subscriptions`
vs `delete_when_subscriptions_caught_up` semantics for the inactivity check.
The name is retained for v1 to avoid churn; a rename to a neutral name
(e.g. `inactiveTopicsMode`) could be proposed separately.
- `brokerDeleteInactiveTopicsFrequencySeconds` — how often the monitor runs.
- `brokerDeleteInactiveTopicsMaxInactiveDurationSeconds` — inactivity
threshold before the action fires.
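Putting these together, a broker that should close topics idle for an hour, checked once a minute, might be configured as follows (illustrative values):

```properties
# Close, rather than delete, inactive topics. Mutually exclusive with the delete flag.
brokerCloseInactiveTopicsEnabled=true
brokerDeleteInactiveTopicsEnabled=false
# Reused detection knobs (names unchanged in v1):
brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
brokerDeleteInactiveTopicsFrequencySeconds=60
brokerDeleteInactiveTopicsMaxInactiveDurationSeconds=3600
```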

### Public API / Binary / Wire

None. No changes to the Pulsar admin API, client protocol, or topic
policies schema. Topic policies — including `InactiveTopicPolicies` — are
untouched.

### CLI

None.

### Metrics

None added. Existing per-topic metrics naturally shrink when topics are
closed. A follow-up could add a broker-level counter
`pulsar_broker_topic_inactive_close_total` but it is not required for v1.

## Design & Implementation Details

### Validation

`PulsarService.start()` already performs static config validation. We add:

```java
if (config.isBrokerDeleteInactiveTopicsEnabled() && config.isBrokerCloseInactiveTopicsEnabled()) {
    throw new IllegalArgumentException(
            "brokerDeleteInactiveTopicsEnabled and brokerCloseInactiveTopicsEnabled are mutually "
                    + "exclusive. Enable at most one of them.");
}
```

### Scheduler

`BrokerService.startInactivityMonitor()` is updated to start the monitor
when **either** flag is enabled:

```java
if (config.isBrokerDeleteInactiveTopicsEnabled() || config.isBrokerCloseInactiveTopicsEnabled()) {
    int interval = config.getBrokerDeleteInactiveTopicsFrequencySeconds();
    inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> checkGC(), interval, interval,
            TimeUnit.SECONDS);
}
```

### Helper on AbstractTopic

```java
public boolean isCloseWhileInactive() {
    return brokerService.pulsar().getConfiguration().isBrokerCloseInactiveTopicsEnabled();
}
```

Kept broker-level only in v1; future work may read from a per-topic policy.

### PersistentTopic.checkGC

The method now branches on action:

1. Compute `deleteEnabled = isDeleteWhileInactive()` and
`closeEnabled = !deleteEnabled && isCloseWhileInactive()`. If neither is
true, return.
2. Detect inactivity using the existing `isActive(deleteMode)` and the
inactivity-duration check.
3. **New for close mode:** skip the `shouldTopicBeRetained()` guard, since
retention only guards against data loss, which is irrelevant here.
4. Close replication producers first (same as today). If remote producers
are connected, abort with `TopicBusyException`.
5. If close mode is active, call `close(true, false)`. That path already
disconnects clients, closes subscriptions, closes the managed ledger,
and removes the topic from the broker cache via `disposeTopic`. Log
success; on `TopicBusyException`, log at debug (topic became active
again and will be reloaded).
6. Otherwise proceed with the existing delete branch unchanged.
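The branch order above can be modeled as a small, standalone decision function. This is an illustrative sketch, not the actual `PersistentTopic` code; the names are ours:

```java
// Standalone model of the checkGC branch order. `inactive` stands in for the
// existing isActive(deleteMode) + inactivity-duration checks.
final class InactiveTopicAction {
    enum Action { NONE, DELETE, CLOSE }

    static Action decide(boolean deleteWhileInactive, boolean closeWhileInactive, boolean inactive) {
        boolean deleteEnabled = deleteWhileInactive;
        // Step 1: close only applies when delete is not enabled.
        boolean closeEnabled = !deleteEnabled && closeWhileInactive;
        if (!deleteEnabled && !closeEnabled) {
            return Action.NONE;          // neither action configured
        }
        if (!inactive) {
            return Action.NONE;          // step 2: inactivity checks did not pass
        }
        // Step 3: close mode deliberately skips the shouldTopicBeRetained() guard,
        // since no data is at risk.
        return closeEnabled ? Action.CLOSE : Action.DELETE;
    }
}
```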

### NonPersistentTopic.checkGC

Parallel change: same `deleteEnabled`/`closeEnabled` gate; if close mode is
active, call `close(true, false)` and skip `tryToDeletePartitionedMetadata`.

### Race: topic reloaded before close completes

A client can connect between the inactivity check and the call to `close()`.
This is already handled by `close()`'s fencing logic: `fenceTopicToCloseOrDelete`
rejects new client operations during close. If a lookup races in and sees
the topic is gone from the cache it will re-create a fresh `PersistentTopic`
instance from metadata, which is exactly the desired behavior.

### Race: close completes and topic is immediately reloaded

Expected. The new `PersistentTopic` instance has its own `lastActive`
initialized to now; another full inactivity window must elapse before it
becomes a close candidate again. Activity produced after reload resets the
clock as usual.
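The reset behavior can be modeled as follows (illustrative, standalone sketch; field and method names are ours, not Pulsar's):

```java
// Minimal model of the inactivity window: a freshly (re)loaded topic starts its
// clock at "now", so a full inactivity window must elapse before it becomes a
// close candidate again, and any activity resets the clock.
final class InactivityWindow {
    private long lastActiveMillis;

    InactivityWindow(long nowMillis) {
        this.lastActiveMillis = nowMillis;   // set at topic (re)load
    }

    void recordActivity(long nowMillis) {
        this.lastActiveMillis = nowMillis;   // produce/consume resets the clock
    }

    boolean isCloseCandidate(long nowMillis, long maxInactiveMillis) {
        return nowMillis - lastActiveMillis >= maxInactiveMillis;
    }
}
```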

### Interaction with unload-based load balancing

The existing bundle-unload mechanism and this new inactive-close mechanism
are independent. A bundle unload closes every topic in the bundle
regardless of activity; the new mechanism closes only inactive topics,
cluster-wide. They compose without conflict.

### Interaction with `brokerDeleteInactivePartitionedTopicMetadataEnabled`

Because nothing is deleted, partitioned-topic metadata cleanup does not run
under close mode. The metadata (partition count, topic policies) is
preserved.

### Backwards compatibility

The new configuration defaults to `false`, so existing deployments see no
behavioral change.

# Security Considerations

None. This PIP introduces no new network-accessible endpoints, no new
credentials, and no new authorization surface. Inactivity detection and
close use the same internal code paths that the broker already executes for
the delete variant and for admin-driven unload.

# Monitoring

Operators can watch for the log line
`Topic closed successfully due to inactivity` to observe the feature
firing. The existing `pulsar_topics_count` metric and per-topic metric
series naturally shrink as idle topics are closed. Producers/consumers
reconnecting to a closed topic exercise the same load-topic path that's
already exercised at broker startup.

# General Notes

This PIP is intentionally minimal — one new flag, one branch in `checkGC`.
Everything else is reused: the inactivity detection, the close code path,
the dispose-from-cache machinery, and the admin-driven unload semantics.

# Links

* Mailing List discussion thread: TBD
* Mailing List voting thread: TBD
@@ -763,6 +763,21 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
    )
    private Integer brokerDeleteInactiveTopicsMaxInactiveDurationSeconds = null;

    @FieldContext(
            category = CATEGORY_POLICIES,
            dynamic = true,
            doc = "Enable closing (unloading from broker memory) of inactive topics without deleting their data.\n"
                    + "When a topic is deemed inactive (no producers, no subscriptions, or subscriptions caught up based on\n"
                    + "the configured mode), the broker will close the topic instance, releasing in-memory resources such as\n"
                    + "the managed ledger cache, subscription state, and per-topic metrics. The topic data in BookKeeper is\n"
                    + "preserved; clients will transparently reload the topic on the next produce/consume.\n"
                    + "This option is mutually exclusive with 'brokerDeleteInactiveTopicsEnabled': only one of the two may\n"
                    + "be enabled at a time. The inactivity detection reuses 'brokerDeleteInactiveTopicsMode',\n"
                    + "'brokerDeleteInactiveTopicsFrequencySeconds', and\n"
                    + "'brokerDeleteInactiveTopicsMaxInactiveDurationSeconds'."
    )
    private boolean brokerCloseInactiveTopicsEnabled = false;

    @FieldContext(
            category = CATEGORY_POLICIES,
            dynamic = true,
@@ -881,6 +881,12 @@ public void start() throws PulsarServerException {
                    config.getDefaultRetentionTimeInMinutes() * 60));
        }

        if (config.isBrokerDeleteInactiveTopicsEnabled() && config.isBrokerCloseInactiveTopicsEnabled()) {
            throw new IllegalArgumentException(
                    "brokerDeleteInactiveTopicsEnabled and brokerCloseInactiveTopicsEnabled are mutually "
                            + "exclusive. Enable at most one of them.");
        }

        openTelemetryTopicStats = new OpenTelemetryTopicStats(this);
        openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this);
        openTelemetryProducerStats = new OpenTelemetryProducerStats(this);
@@ -1228,6 +1228,10 @@ public boolean isDeleteWhileInactive() {
        return topicPolicies.getInactiveTopicPolicies().get().isDeleteWhileInactive();
    }

    public boolean isCloseWhileInactive() {
        return brokerService.pulsar().getConfiguration().isBrokerCloseInactiveTopicsEnabled();
    }

    public boolean deletePartitionedTopicMetadataWhileInactive() {
        return brokerService.pulsar().getConfiguration().isBrokerDeleteInactivePartitionedTopicMetadataEnabled();
    }
@@ -719,8 +719,9 @@ protected void startDeduplicationSnapshotMonitor() {
     }

     protected void startInactivityMonitor() {
-        if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) {
-            int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
+        ServiceConfiguration config = pulsar().getConfiguration();
+        if (config.isBrokerDeleteInactiveTopicsEnabled() || config.isBrokerCloseInactiveTopicsEnabled()) {
+            int interval = config.getBrokerDeleteInactiveTopicsFrequencySeconds();
             inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> checkGC(), interval, interval,
                     TimeUnit.SECONDS);
         }