Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion async_postgres/pg_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ type
maxPipelineSize*: int
## Max operations per pipeline batch per connection (default 0=unlimited).
## Only used when `pipelined` is true.
connectBackoffInitial*: Duration
## First backoff after a maintenance-loop connect failure (default 1s,
## ZeroDuration=disabled, falls back to fixed `maintenanceInterval` retries).
connectBackoffMax*: Duration
## Cap for exponential backoff growth (default 60s). Doubles each failure
## until reaching this value.

PooledConn = object
## An idle connection held by the pool with its last-used timestamp.
Expand Down Expand Up @@ -86,6 +92,12 @@ type
dispatchScheduled: bool ## Whether a dispatch callback is pending
pendingCloses: seq[Future[void]]
## Fire-and-forget close tasks spawned by closeNoWait, awaited on pool.close()
consecutiveConnectFailures: int
## Counter for exponential backoff in the maintenance loop. Reset to 0
## whenever a connect succeeds (in maintenance or acquire).
nextConnectRetryAt: Moment
## Monotonic deadline before the maintenance loop is allowed to retry
## opening a new connection. Zero means "no pending backoff".

proc initPoolConfig*(
connConfig: ConnConfig,
Expand All @@ -101,11 +113,17 @@ proc initPoolConfig*(
resetQuery = "",
pipelined = false,
maxPipelineSize = 0,
connectBackoffInitial = seconds(1),
connectBackoffMax = seconds(60),
): PoolConfig =
## Create a pool configuration with sensible defaults.
## `minSize` idle connections are maintained; up to `maxSize` total.
## Set `resetQuery` to clean session state on release (e.g. "DISCARD ALL" for PgBouncer).
## Set `pipelined` to true to enable implicit query batching for `pool.exec`/`pool.query`.
## When the maintenance loop fails to open a connection, subsequent retries
## use exponential backoff starting at `connectBackoffInitial`, doubling up to
## `connectBackoffMax`. Set `connectBackoffInitial = ZeroDuration` to disable
## backoff and fall back to fixed-interval retries.
##
## Raises `ValueError` if parameters are invalid.
if minSize < 0:
Expand All @@ -118,6 +136,10 @@ proc initPoolConfig*(
)
if maxWaiters < -1:
raise newException(ValueError, "maxWaiters must be >= -1, got " & $maxWaiters)
if connectBackoffInitial < ZeroDuration:
raise newException(ValueError, "connectBackoffInitial must be >= 0")
if connectBackoffMax < connectBackoffInitial:
raise newException(ValueError, "connectBackoffMax must be >= connectBackoffInitial")

PoolConfig(
connConfig: connConfig,
Expand All @@ -133,6 +155,8 @@ proc initPoolConfig*(
resetQuery: resetQuery,
pipelined: pipelined,
maxPipelineSize: maxPipelineSize,
connectBackoffInitial: connectBackoffInitial,
connectBackoffMax: connectBackoffMax,
)

proc poolConfig*(pool: PgPool): PoolConfig =
Expand Down Expand Up @@ -232,6 +256,20 @@ proc resetSession*(pool: PgPool, conn: PgConnection) {.async.} =
except CatchableError:
discard

proc computeConnectBackoff*(initial, maxDelay: Duration, failures: int): Duration =
  ## Exponential backoff delay for repeated connect failures.
  ##
  ## Returns `initial * 2^(failures-1)`, capped at `maxDelay`. Yields
  ## `ZeroDuration` when backoff is disabled (`initial == ZeroDuration`)
  ## or when `failures <= 0` (no failure has been recorded yet).
  if failures <= 0 or initial == ZeroDuration:
    return ZeroDuration
  var delay = initial
  var doublingsLeft = failures - 1
  # Double once per additional failure; stop early once the cap is reached
  # so the Duration arithmetic cannot grow without bound.
  while doublingsLeft > 0 and delay < maxDelay:
    delay = delay + delay
    dec doublingsLeft
  # Final clamp also covers `initial` already exceeding `maxDelay`
  # (e.g. failures == 1 with an oversized initial value).
  if delay > maxDelay: maxDelay else: delay

proc maintenanceLoop(pool: PgPool) {.async.} =
while not pool.closed:
await sleepAsync(pool.config.maintenanceInterval)
Expand Down Expand Up @@ -270,6 +308,13 @@ proc maintenanceLoop(pool: PgPool) {.async.} =

pool.idle = remaining

# Skip the replenish phase while we are inside a backoff window from a
# recent failure. Idle pruning above still runs every interval — only the
# connect attempts are throttled, so a backed-off pool keeps closing dead
# idle/expired connections normally.
if pool.consecutiveConnectFailures > 0 and Moment.now() < pool.nextConnectRetryAt:
continue

# Replenish to minSize (best-effort)
let currentTotal = pool.idle.len + pool.active
let needed = max(0, pool.config.minSize - currentTotal)
Expand All @@ -287,8 +332,16 @@ proc maintenanceLoop(pool: PgPool) {.async.} =
conn.ownerPool = pool
pool.metrics.createCount.inc
pool.idle.addLast(PooledConn(conn: conn, lastUsedAt: now))
pool.consecutiveConnectFailures = 0
except CatchableError:
break # best-effort, retry next interval
pool.consecutiveConnectFailures.inc
let delay = computeConnectBackoff(
pool.config.connectBackoffInitial, pool.config.connectBackoffMax,
pool.consecutiveConnectFailures,
)
if delay > ZeroDuration:
pool.nextConnectRetryAt = Moment.now() + delay
break # best-effort, retry next interval (or after backoff)

proc newPool*(config: PoolConfig): Future[PgPool] {.async.} =
## Create a new connection pool and establish `minSize` initial connections.
Expand Down Expand Up @@ -445,6 +498,9 @@ proc acquireImpl(pool: PgPool): Future[AcquireResult] {.async.} =
let conn = await connect(pool.config.connConfig)
conn.ownerPool = pool
pool.metrics.createCount.inc
# A successful caller-driven connect signals the DB is reachable —
# let the maintenance loop resume immediate replenishment.
pool.consecutiveConnectFailures = 0
recordAcquire()
return (conn, true)
except CatchableError as e:
Expand Down
58 changes: 58 additions & 0 deletions tests/test_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,64 @@ suite "initPoolConfig":
let cfg = initPoolConfig(ConnConfig(host: "localhost", port: 5432), minSize = 0)
check cfg.minSize == 0

test "backoff defaults":
  # Omitting the backoff parameters must yield the documented defaults
  # (1s initial, 60s cap).
  let cfg = initPoolConfig(ConnConfig(host: "localhost", port: 5432))
  check cfg.connectBackoffInitial == seconds(1)
  check cfg.connectBackoffMax == seconds(60)

test "backoff custom overrides":
  # Explicit values must be stored verbatim on the config.
  let cfg = initPoolConfig(
    ConnConfig(host: "localhost", port: 5432),
    connectBackoffInitial = milliseconds(100),
    connectBackoffMax = seconds(10),
  )
  check cfg.connectBackoffInitial == milliseconds(100)
  check cfg.connectBackoffMax == seconds(10)

test "backoff disabled with ZeroDuration initial":
  # ZeroDuration initial is the documented "backoff off" sentinel and must
  # pass validation (max == initial is allowed by the >= check).
  let cfg = initPoolConfig(
    ConnConfig(host: "localhost", port: 5432),
    connectBackoffInitial = ZeroDuration,
    connectBackoffMax = ZeroDuration,
  )
  check cfg.connectBackoffInitial == ZeroDuration

test "validation: connectBackoffMax < connectBackoffInitial":
  # A cap below the starting delay is contradictory and must be rejected.
  expect(ValueError):
    discard initPoolConfig(
      ConnConfig(host: "localhost", port: 5432),
      connectBackoffInitial = seconds(10),
      connectBackoffMax = seconds(1),
    )

suite "computeConnectBackoff":
  # Pure-function tests: no pool or connection is needed, only Durations.

  test "zero failures returns ZeroDuration":
    check computeConnectBackoff(seconds(1), seconds(60), 0) == ZeroDuration

  test "negative failures returns ZeroDuration":
    # Defensive: callers should never pass < 0, but the helper tolerates it.
    check computeConnectBackoff(seconds(1), seconds(60), -1) == ZeroDuration

  test "disabled when initial is ZeroDuration":
    # ZeroDuration initial disables backoff regardless of the failure count.
    check computeConnectBackoff(ZeroDuration, seconds(60), 5) == ZeroDuration

  test "first failure returns initial":
    check computeConnectBackoff(seconds(1), seconds(60), 1) == seconds(1)

  test "doubles on each failure up to max":
    # delay(n) = initial * 2^(n-1) while below the cap.
    check computeConnectBackoff(seconds(1), seconds(60), 2) == seconds(2)
    check computeConnectBackoff(seconds(1), seconds(60), 3) == seconds(4)
    check computeConnectBackoff(seconds(1), seconds(60), 4) == seconds(8)
    check computeConnectBackoff(seconds(1), seconds(60), 5) == seconds(16)
    check computeConnectBackoff(seconds(1), seconds(60), 6) == seconds(32)

  test "caps at maxDelay":
    # 2^6 = 64 > 60, so 7th failure caps
    check computeConnectBackoff(seconds(1), seconds(60), 7) == seconds(60)
    # Very large counts must also stay pinned at the cap (no overflow/growth).
    check computeConnectBackoff(seconds(1), seconds(60), 50) == seconds(60)

  test "initial already exceeds max returns max":
    # The cap applies even before any doubling happens.
    check computeConnectBackoff(seconds(120), seconds(60), 1) == seconds(60)

suite "Pool release":
test "release to idle queue":
let pool = makePool()
Expand Down