Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion packages/pg-pool/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ class Pool extends EventEmitter {
return this._clients.length > this.options.min
}

_hasActiveTransaction(client) {
return client && (client._txStatus === 'T' || client._txStatus === 'E')
}

_pulseQueue() {
this.log('pulse queue')
if (this.ended) {
Expand Down Expand Up @@ -359,10 +363,20 @@ class Pool extends EventEmitter {
this.emit('release', err, client)

// TODO(bmc): expose a proper, public interface _queryable and _ending
if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) {
if (
err ||
this.ending ||
!client._queryable ||
client._ending ||
this._hasActiveTransaction(client) ||
client._poolUseCount >= this.options.maxUses
) {
if (client._poolUseCount >= this.options.maxUses) {
this.log('remove expended client')
}
if (this._hasActiveTransaction(client)) {
this.log('remove client with leaked transaction')
}

return this._remove(client, this._pulseQueue.bind(this))
}
Expand Down
139 changes: 139 additions & 0 deletions packages/pg-pool/test/leaked-pool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
'use strict'

const expect = require('expect.js')
const describe = require('mocha').describe
const it = require('mocha').it
const Pool = require('..')

describe('leaked connection pool guard', function () {
it('removes a client with an open transaction on release', async function () {
const logMessages = []
const pool = new Pool({
max: 1,
log: (msg) => logMessages.push(msg),
})
const client = await pool.connect()
await client.query('BEGIN')
expect(client._txStatus).to.be('T')

client.release()
expect(pool.totalCount).to.be(0)
expect(pool.idleCount).to.be(0)
expect(logMessages).to.contain('remove client with leaked transaction')

// pool recovers by creating a fresh connection
const { rows } = await pool.query('SELECT 1 as num')
expect(rows[0].num).to.be(1)
expect(pool.totalCount).to.be(1)
expect(pool.idleCount).to.be(1)

await pool.end()
})

it('removes a client in a failed transaction state on release', async function () {
const pool = new Pool({ max: 1 })
const client = await pool.connect()
await client.query('BEGIN')
try {
await client.query('SELECT invalid_column FROM nonexistent_table')
} catch (e) {
// swallow the error to avoid pool close the connection
}
// The ReadyForQuery message with status 'E' may arrive on a separate I/O event.
// Issue a follow-up query to ensure it has been processed — this will also fail
// (since the transaction is aborted) but guarantees _txStatus is updated.
try {
await client.query('SELECT 1')
} catch (e) {
// expected — "current transaction is aborted"
}
expect(client._txStatus).to.be('E')

client.release()
expect(pool.totalCount).to.be(0)
expect(pool.idleCount).to.be(0)

// pool recovers by creating a fresh connection
const { rows } = await pool.query('SELECT 1 as num')
expect(rows[0].num).to.be(1)
expect(pool.totalCount).to.be(1)
expect(pool.idleCount).to.be(1)

await pool.end()
})

it('only removes connections with open transactions, keeps idle ones', async function () {
const pool = new Pool({ max: 3 })
const clientA = await pool.connect()
const clientB = await pool.connect()
const clientC = await pool.connect()

// Client A: open transaction (leaked)
await clientA.query('BEGIN')
expect(clientA._txStatus).to.be('T')

// Client B: normal query (idle)
await clientB.query('SELECT 1')
expect(clientB._txStatus).to.be('I')

// Client C: committed transaction (idle)
await clientC.query('BEGIN')
await clientC.query('COMMIT')
expect(clientC._txStatus).to.be('I')

clientA.release()
clientB.release()
clientC.release()

// A was removed, B and C kept
expect(pool.totalCount).to.be(2)
expect(pool.idleCount).to.be(2)
await pool.end()
})

describe('pool.query', function () {
it('removes a client after pool.query leaks transaction via BEGIN', async function () {
const logMessages = []
const pool = new Pool({ max: 1, log: (msg) => logMessages.push(msg) })

await pool.query('BEGIN')

// Client auto-released with txStatus='T', should be removed
expect(pool.totalCount).to.be(0)
expect(pool.idleCount).to.be(0)
expect(logMessages).to.contain('remove client with leaked transaction')

// Verify pool recovers
const { rows } = await pool.query('SELECT 1 as num')
expect(rows[0].num).to.be(1)
expect(pool.totalCount).to.be(1)
expect(pool.idleCount).to.be(1)

await pool.end()
})

it('removes a client after pool.query in failed transaction state', async function () {
const pool = new Pool({ max: 1 })

await pool.query('BEGIN')

try {
await pool.query('SELECT invalid_column FROM nonexistent_table')
} catch (e) {
// Expected error
}

// Client with txStatus='E' should be removed
expect(pool.totalCount).to.be(0)
expect(pool.idleCount).to.be(0)

// Pool recovers
const { rows } = await pool.query('SELECT 1 as num')
expect(rows[0].num).to.be(1)
expect(pool.totalCount).to.be(1)
expect(pool.idleCount).to.be(1)

await pool.end()
})
})
})
2 changes: 2 additions & 0 deletions packages/pg/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class Client extends EventEmitter {
this._connectionError = false
this._queryable = true
this._activeQuery = null
this._txStatus = null

this.enableChannelBinding = Boolean(c.enableChannelBinding) // set true to use SCRAM-SHA-256-PLUS when offered
this.connection =
Expand Down Expand Up @@ -356,6 +357,7 @@ class Client extends EventEmitter {
}
const activeQuery = this._getActiveQuery()
this._activeQuery = null
this._txStatus = msg?.status ?? null
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check just for tests and pg-native? Both should probably be updated to consistently report a status in their readyForQuery events.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added integration tests and updated PR description. However pg-native is not supported because this is a protocol level feature.

this.readyForQuery = true
if (activeQuery) {
activeQuery.handleReadyForQuery(this.connection)
Expand Down
81 changes: 81 additions & 0 deletions packages/pg/test/integration/client/txstatus-tests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
'use strict'
const helper = require('./test-helper')
const suite = new helper.Suite()
const pg = helper.pg
const assert = require('assert')

// txStatus tracking is not implemented in native client
if (!helper.args.native) {
suite.test('txStatus tracking', function (done) {
const client = new pg.Client()
client.connect(
assert.success(function () {
// Run a simple query to initialize txStatus
client.query(
'SELECT 1',
assert.success(function () {
// Test 1: Initial state after query (should be idle)
assert.equal(client._txStatus, 'I', 'should start in idle state')

// Test 2: BEGIN transaction
client.query(
'BEGIN',
assert.success(function () {
assert.equal(client._txStatus, 'T', 'should be in transaction state')

// Test 3: COMMIT
client.query(
'COMMIT',
assert.success(function () {
assert.equal(client._txStatus, 'I', 'should return to idle after commit')

client.end(done)
})
)
})
)
})
)
})
)
})

suite.test('txStatus error state', function (done) {
const client = new pg.Client()
client.connect(
assert.success(function () {
// Run a simple query to initialize txStatus
client.query(
'SELECT 1',
assert.success(function () {
client.query(
'BEGIN',
assert.success(function () {
// Execute invalid SQL to trigger error state
client.query('INVALID SQL SYNTAX', function (err) {
assert(err, 'should receive error from invalid query')

// Issue a sync query to ensure ReadyForQuery has been processed
// This guarantees _txStatus has been updated
client.query('SELECT 1', function () {
// This callback fires after ReadyForQuery is processed
assert.equal(client._txStatus, 'E', 'should be in error state')

// Rollback to recover
client.query(
'ROLLBACK',
assert.success(function () {
assert.equal(client._txStatus, 'I', 'should return to idle after rollback from error')
client.end(done)
})
)
})
})
})
)
})
)
})
)
})
}