Skip to content

[feat] PIP-468: DagWatchClient auto-reconnects on broker disconnect#25687

Open
merlimat wants to merge 1 commit intoapache:masterfrom
merlimat:st-v5-dag-reconnect
Open

[feat] PIP-468: DagWatchClient auto-reconnects on broker disconnect#25687
merlimat wants to merge 1 commit intoapache:masterfrom
merlimat:st-v5-dag-reconnect

Conversation

@merlimat
Copy link
Copy Markdown
Contributor

@merlimat merlimat commented May 5, 2026

Summary

Replace the TODO in DagWatchClient.connectionClosed() that left long-lived producers / consumers stranded on a stale layout after a transient broker disconnect. The DAG watch is the consumer / producer's only path to learning about split / merge events; without reconnect, a single network blip silently drops the client off future layout updates and never surfaces an error to the application.

The new logic mirrors the proven pattern used by ScalableTopicsWatcher (namespace-watcher session) and ScalableConsumerClient (controller session):

  • start() (initial create) keeps its current behavior — the future is failed on disconnect so producer.create() / consumer.subscribe() fails fast when the broker is unreachable.
  • After the initial layout has arrived, connectionClosed() schedules a reconnect with exponential backoff (100 ms → 30 s). The reconnect path calls getConnection again, re-registers the same sessionId on the fresh ClientCnx, and re-issues ScalableTopicLookup; the broker re-pushes the current layout via onUpdate, which resets the backoff.
  • The connect / reconnect logic is shared in a single attach(cnx) method so both paths use identical wiring.

Adds a forceCloseConnectionForTesting() hook (mirroring the one on ScalableConsumerClient) for cross-module integration tests.

Test plan

  • V5DagWatchAutoReconnectTest (3 integration tests):
    • Producer's DAG channel force-closed mid-life — sends after the close still succeed.
    • Consumer's DAG channel force-closed mid-life — receives after the close still succeed.
    • Reflective check that the cnx field on DagWatchClient gets re-populated within the backoff window.
  • Full V5 broker test suite green (128/128).
  • pulsar-client-v5 and pulsar-broker checkstyle clean.

…nnect

Replace the TODO in DagWatchClient.connectionClosed() that left long-lived
producers / consumers stranded on a stale layout after a transient broker
disconnect. The DAG watch is the consumer / producer's only path to
learning about split / merge events; without reconnect, a single network
blip silently drops the client off future layout updates and never
surfaces an error to the application.

The new logic mirrors the proven pattern used by ScalableTopicsWatcher
(used for the namespace-watcher session) and ScalableConsumerClient (used
for the controller session):

- start() (initial create) keeps its current behavior: the future is
  failed on disconnect so producer.create() / consumer.subscribe() fails
  fast when the broker is unreachable.
- After the initial layout has arrived, connectionClosed() schedules a
  reconnect with exponential backoff (100 ms initial, 30 s max). The
  reconnect path calls getConnection again, re-registers the same
  sessionId on the fresh ClientCnx, and re-issues ScalableTopicLookup;
  the broker re-pushes the current layout via onUpdate, which resets the
  backoff.
- The connect/reconnect logic is shared in a single attach(cnx) method
  so both paths use identical wiring.

Adds a forceCloseConnectionForTesting() hook (mirroring the one on
ScalableConsumerClient) for cross-module integration tests.

V5DagWatchAutoReconnectTest force-closes the DAG channel underneath a
producer and a consumer and asserts each continues operating. A third
test reflectively verifies the DagWatchClient's cnx field is repopulated
within the backoff window.
Copy link
Copy Markdown
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@merlimat merlimat changed the title PIP-468: DagWatchClient auto-reconnects on broker disconnect [feat] PIP-468: DagWatchClient auto-reconnects on broker disconnect May 5, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants