[fix][ml] Avoid blocking the factory-shutdown thread on serial managed-ledger close#26063
[fix][ml] Avoid blocking the factory-shutdown thread on serial managed-ledger close#26063merlimat wants to merge 1 commit into
Conversation
…d-ledger close ManagedLedgerFactoryImpl.shutdownAsync first initiates an async close (asyncClose) for every managed ledger known at the start of shutdown. A second pass, inside bookkeeperFuture.thenRun(...), then closes any ledgers that were added after that snapshot -- but it did so with a serial, blocking managedLedger.close() (which waits on a CountDownLatch), blocking the thread that completes the shutdown future. Change that second pass from thenRun to thenCompose, and replace the blocking close() with asyncClose(), collecting the per-ledger futures and awaiting them with Futures.waitForAll. In the common case this pass is empty (the first pass has already drained the ledgers map), so it is a no-op; when ledgers were added concurrently during shutdown they now close in parallel without blocking the continuation thread. The first pass and the rest of the shutdown sequence are unchanged.
| }, null); | ||
| } | ||
| })); | ||
| return Futures.waitForAll(remainingFutures); |
There was a problem hiding this comment.
Previously this path used managedLedger.close(), which bounded the wait with AsyncOperationTimeoutSeconds before continuing shutdown. With the new asyncClose() + waitForAll flow, do we still want to keep a bounded wait here?
| cacheEvictionExecutor.shutdownNow(); | ||
|
|
||
| List<String> ledgerNames = new ArrayList<>(this.ledgers.keySet()); | ||
| List<CompletableFuture<Void>> futures = new ArrayList<>(ledgerNames.size()); |
There was a problem hiding this comment.
In the existing code, futures is never used for other than adding the close futures to it. It seems that it would be necessary to also perform Futures.waitForAll for futures.
| if (!ledgerFuture.isDone()) { | ||
| ledgerFuture.completeExceptionally( | ||
| new ManagedLedgerException.ManagedLedgerFactoryClosedException()); |
There was a problem hiding this comment.
In the existing logic, it's possible to hit this branch since completion of futures isn't waited. Perhaps the intention has been to wait for completion for some time if the futures don't complete in time, the future would be completed. I think that this logic could all be rolled in to the first loop by adding timeout handling to complete the future. In that case wouldn't be a need for this second loop.
Motivation
ManagedLedgerFactoryImpl.shutdownAsyncfirst initiates an async close (asyncClose) for every managed ledger known at the start of shutdown. A second pass, insidebookkeeperFuture.thenRun(...), then closes any ledgers that were added after that snapshot — but it did so with a serial, blockingmanagedLedger.close()(which waits on aCountDownLatch), blocking the thread that completes the shutdown future.Modifications
Change that second pass from
thenRuntothenCompose, and replace the blockingclose()withasyncClose(), collecting the per-ledger futures and awaiting them withFutures.waitForAll. In the common case this pass is empty (the first pass has already drained theledgersmap), so it is a no-op; when ledgers were added concurrently during shutdown they now close in parallel without blocking the continuation thread. The first pass and the rest of the shutdown sequence are unchanged.Verifying this change
Covered by
ManagedLedgerFactoryShutdownTest.openEncounteredShutdown(shutdown while a ledger open is in progress) — passes.Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes