diff --git a/async_postgres/pg_pool.nim b/async_postgres/pg_pool.nim
index f620e47..7b1ff46 100644
--- a/async_postgres/pg_pool.nim
+++ b/async_postgres/pg_pool.nim
@@ -40,6 +40,12 @@ type
     maxPipelineSize*: int
       ## Max operations per pipeline batch per connection (default 0=unlimited).
       ## Only used when `pipelined` is true.
+    connectBackoffInitial*: Duration
+      ## Delay applied after the first maintenance-loop connect failure (default
+      ## 1s). `ZeroDuration` disables backoff: retry every `maintenanceInterval`.
+    connectBackoffMax*: Duration
+      ## Upper bound for the backoff delay (default 60s). The delay doubles on
+      ## each consecutive failure until it reaches this value.
 
   PooledConn = object
     ## An idle connection held by the pool with its last-used timestamp.
@@ -86,6 +92,12 @@ type
     dispatchScheduled: bool ## Whether a dispatch callback is pending
     pendingCloses: seq[Future[void]]
       ## Fire-and-forget close tasks spawned by closeNoWait, awaited on pool.close()
+    consecutiveConnectFailures: int
+      ## Counter for exponential backoff in the maintenance loop. Reset to 0
+      ## whenever a connect succeeds (in maintenance or acquire).
+    nextConnectRetryAt: Moment
+      ## Monotonic deadline before which the maintenance loop skips opening new
+      ## connections. Consulted only while `consecutiveConnectFailures > 0`.
 
 proc initPoolConfig*(
     connConfig: ConnConfig,
@@ -101,11 +113,17 @@ proc initPoolConfig*(
     resetQuery = "",
     pipelined = false,
     maxPipelineSize = 0,
+    connectBackoffInitial = seconds(1),
+    connectBackoffMax = seconds(60),
 ): PoolConfig =
   ## Create a pool configuration with sensible defaults.
   ## `minSize` idle connections are maintained; up to `maxSize` total.
   ## Set `resetQuery` to clean session state on release (e.g. "DISCARD ALL" for PgBouncer).
   ## Set `pipelined` to true to enable implicit query batching for `pool.exec`/`pool.query`.
+  ## When the maintenance loop fails to open a connection, subsequent retries
+  ## use exponential backoff starting at `connectBackoffInitial`, doubling up to
+  ## `connectBackoffMax`. Set `connectBackoffInitial = ZeroDuration` to disable
+  ## backoff and fall back to fixed-interval retries.
   ##
   ## Raises `ValueError` if parameters are invalid.
   if minSize < 0:
@@ -118,6 +136,10 @@ proc initPoolConfig*(
     )
   if maxWaiters < -1:
     raise newException(ValueError, "maxWaiters must be >= -1, got " & $maxWaiters)
+  if connectBackoffInitial < ZeroDuration:
+    raise newException(ValueError, "connectBackoffInitial must be >= 0")
+  if connectBackoffMax < connectBackoffInitial:
+    raise newException(ValueError, "connectBackoffMax must be >= connectBackoffInitial")
 
   PoolConfig(
     connConfig: connConfig,
@@ -133,6 +155,8 @@ proc initPoolConfig*(
     resetQuery: resetQuery,
     pipelined: pipelined,
     maxPipelineSize: maxPipelineSize,
+    connectBackoffInitial: connectBackoffInitial,
+    connectBackoffMax: connectBackoffMax,
   )
 
 proc poolConfig*(pool: PgPool): PoolConfig =
@@ -232,6 +256,20 @@ proc resetSession*(pool: PgPool, conn: PgConnection) {.async.} =
     except CatchableError:
       discard
 
+proc computeConnectBackoff*(initial, maxDelay: Duration, failures: int): Duration =
+  ## Exponential backoff for repeated connect failures: returns
+  ## `initial * 2^(failures-1)` capped at `maxDelay`, without overflowing. Returns
+  ## `ZeroDuration` when `initial <= ZeroDuration` (disabled) or `failures <= 0`.
+  if failures <= 0 or initial <= ZeroDuration:
+    return ZeroDuration
+  result = initial
+  for _ in 1 ..< failures:
+    if result >= maxDelay or result > maxDelay - result:
+      return maxDelay # doubling would pass the cap; never perform that add
+    result = result + result
+  if result > maxDelay:
+    result = maxDelay
+
 proc maintenanceLoop(pool: PgPool) {.async.} =
   while not pool.closed:
     await sleepAsync(pool.config.maintenanceInterval)
@@ -270,6 +308,13 @@ proc maintenanceLoop(pool: PgPool) {.async.} =
 
     pool.idle = remaining
 
+    # Skip the replenish phase while we are inside a backoff window from a
+    # recent failure. Idle pruning above still runs every interval — only the
+    # connect attempts are throttled. The window is checked once per
+    # `maintenanceInterval`, so a shorter backoff delay has no extra effect.
+    if pool.consecutiveConnectFailures > 0 and Moment.now() < pool.nextConnectRetryAt:
+      continue
+
     # Replenish to minSize (best-effort)
     let currentTotal = pool.idle.len + pool.active
     let needed = max(0, pool.config.minSize - currentTotal)
@@ -287,8 +332,16 @@ proc maintenanceLoop(pool: PgPool) {.async.} =
         conn.ownerPool = pool
         pool.metrics.createCount.inc
         pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: now))
+        pool.consecutiveConnectFailures = 0
       except CatchableError:
-        break # best-effort, retry next interval
+        pool.consecutiveConnectFailures.inc
+        let delay = computeConnectBackoff(
+          pool.config.connectBackoffInitial, pool.config.connectBackoffMax,
+          pool.consecutiveConnectFailures,
+        )
+        if delay > ZeroDuration:
+          pool.nextConnectRetryAt = Moment.now() + delay
+        break # best-effort, retry next interval (or after backoff)
 
 proc newPool*(config: PoolConfig): Future[PgPool] {.async.} =
   ## Create a new connection pool and establish `minSize` initial connections.
@@ -445,6 +498,9 @@ proc acquireImpl(pool: PgPool): Future[AcquireResult] {.async.} =
       let conn = await connect(pool.config.connConfig)
       conn.ownerPool = pool
       pool.metrics.createCount.inc
+      # A successful caller-driven connect signals the DB is reachable —
+      # let the maintenance loop resume immediate replenishment.
+      pool.consecutiveConnectFailures = 0
       recordAcquire()
       return (conn, true)
     except CatchableError as e:
diff --git a/tests/test_pool.nim b/tests/test_pool.nim
index 74eed01..351e9c1 100644
--- a/tests/test_pool.nim
+++ b/tests/test_pool.nim
@@ -180,6 +180,68 @@ suite "initPoolConfig":
     let cfg = initPoolConfig(ConnConfig(host: "localhost", port: 5432), minSize = 0)
     check cfg.minSize == 0
 
+  test "backoff defaults":
+    let cfg = initPoolConfig(ConnConfig(host: "localhost", port: 5432))
+    check cfg.connectBackoffInitial == seconds(1)
+    check cfg.connectBackoffMax == seconds(60)
+
+  test "backoff custom overrides":
+    let cfg = initPoolConfig(
+      ConnConfig(host: "localhost", port: 5432),
+      connectBackoffInitial = milliseconds(100),
+      connectBackoffMax = seconds(10),
+    )
+    check cfg.connectBackoffInitial == milliseconds(100)
+    check cfg.connectBackoffMax == seconds(10)
+
+  test "backoff disabled with ZeroDuration initial":
+    let cfg = initPoolConfig(
+      ConnConfig(host: "localhost", port: 5432),
+      connectBackoffInitial = ZeroDuration,
+      connectBackoffMax = ZeroDuration,
+    )
+    check cfg.connectBackoffInitial == ZeroDuration
+
+  test "validation: connectBackoffMax < connectBackoffInitial":
+    expect(ValueError):
+      discard initPoolConfig(
+        ConnConfig(host: "localhost", port: 5432),
+        connectBackoffInitial = seconds(10),
+        connectBackoffMax = seconds(1),
+      )
+
+suite "computeConnectBackoff":
+  test "zero failures returns ZeroDuration":
+    check computeConnectBackoff(seconds(1), seconds(60), 0) == ZeroDuration
+
+  test "negative failures returns ZeroDuration":
+    check computeConnectBackoff(seconds(1), seconds(60), -1) == ZeroDuration
+
+  test "disabled when initial is ZeroDuration":
+    check computeConnectBackoff(ZeroDuration, seconds(60), 5) == ZeroDuration
+
+  test "first failure returns initial":
+    check computeConnectBackoff(seconds(1), seconds(60), 1) == seconds(1)
+
+  test "doubles on each failure up to max":
+    check computeConnectBackoff(seconds(1), seconds(60), 2) == seconds(2)
+    check computeConnectBackoff(seconds(1), seconds(60), 3) == seconds(4)
+    check computeConnectBackoff(seconds(1), seconds(60), 4) == seconds(8)
+    check computeConnectBackoff(seconds(1), seconds(60), 5) == seconds(16)
+    check computeConnectBackoff(seconds(1), seconds(60), 6) == seconds(32)
+
+  test "caps at maxDelay":
+    # 2^6 = 64 > 60, so 7th failure caps
+    check computeConnectBackoff(seconds(1), seconds(60), 7) == seconds(60)
+    check computeConnectBackoff(seconds(1), seconds(60), 50) == seconds(60)
+
+  test "initial already exceeds max returns max":
+    check computeConnectBackoff(seconds(120), seconds(60), 1) == seconds(60)
+
+  test "huge maxDelay does not overflow while doubling":
+    let hugeCap = seconds(high(int64) div 1_000_000_000)
+    check computeConnectBackoff(seconds(1), hugeCap, 100) == hugeCap
+
 suite "Pool release":
   test "release to idle queue":
     let pool = makePool()