Skip to content

Commit cbe81ef

Browse files
committed
fix(replication): key logical replication leader lock on slot name
The LogicalReplicationClient leader lock (Redlock) was keyed on the client name, but a Postgres logical replication slot allows exactly one consumer, so the lock must serialize consumers of a given slot. When two clients target the same slot with different names (e.g. across a rolling deploy where the name changed but the slot did not), each acquired a distinct lock, both became leader, and the second to run START_REPLICATION failed with 'replication slot is active'. Because START_REPLICATION is fire-and-forget and its failure is only logged, that consumer stopped and replication stalled until the consumer was restarted. Key the lock on slotName instead. Adds a regression test (two clients, same slot, different names) verified to fail before this change and pass after it.
1 parent 70bca82 commit cbe81ef

3 files changed

Lines changed: 66 additions & 2 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Key the logical-replication leader lock on the slot name (not the client name) so consumers of the same replication slot serialize correctly across restarts and rolling deploys.

internal-packages/replication/src/client.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,4 +181,58 @@ describe("Replication Client", () => {
181181
expect(slotExists[0].exists).toBe(false);
182182
}
183183
);
184+
185+
postgresAndRedisTest(
186+
"two clients on the same slot must not both lead (rolling-deploy handoff)",
187+
async ({ postgresContainer, prisma, redisOptions }) => {
188+
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
189+
190+
const shared = {
191+
slotName: "handoff_slot",
192+
publicationName: "handoff_publication",
193+
redisOptions,
194+
table: "TaskRun",
195+
pgConfig: { connectionString: postgresContainer.getConnectionUri() },
196+
};
197+
198+
// Leader on the shared slot.
199+
const a = new LogicalReplicationClient({ ...shared, name: "runs-replication" });
200+
const aElections: boolean[] = [];
201+
a.events.on("leaderElection", (won) => aElections.push(won));
202+
a.events.on("error", () => {});
203+
await a.subscribe();
204+
// Let A's walsender actually attach to the slot before B races it.
205+
await setTimeout(1000);
206+
207+
// Second client, SAME slot, DIFFERENT name — the rolling-deploy shape that
208+
// regressed (name changed "runs-replication" -> "runs-replication:legacy").
209+
const b = new LogicalReplicationClient({
210+
...shared,
211+
name: "runs-replication:legacy",
212+
leaderLockTimeoutMs: 1000,
213+
leaderLockAcquireAdditionalTimeMs: 250,
214+
leaderLockRetryIntervalMs: 200,
215+
});
216+
const bElections: boolean[] = [];
217+
const bErrors: Array<unknown> = [];
218+
b.events.on("leaderElection", (won) => bElections.push(won));
219+
b.events.on("error", (error) => bErrors.push(error));
220+
await b.subscribe();
221+
await setTimeout(500);
222+
223+
expect(aElections).toContain(true);
224+
// B must not also win leadership on the same slot, nor race START_REPLICATION
225+
// into a "slot is active" error. With a name-keyed lock it did both.
226+
expect(bElections).not.toContain(true);
227+
expect(bElections).toContain(false);
228+
expect(
229+
bErrors
230+
.map((e) => String((e as Error)?.message ?? e))
231+
.some((m) => /replication slot .* is active|already active/i.test(m))
232+
).toBe(false);
233+
234+
await a.stop();
235+
await b.stop();
236+
}
237+
);
184238
});

internal-packages/replication/src/client.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ export interface LogicalReplicationClientOptions {
1919
pgConfig: ClientConfig;
2020

2121
/**
22-
* The name of this LogicalReplicationClient instance, used for leader election.
22+
* The name of this LogicalReplicationClient instance, used for logging and the
23+
* Postgres application_name. Leader election is keyed on `slotName`.
2324
*/
2425
name: string;
2526
/**
@@ -703,8 +704,11 @@ export class LogicalReplicationClient {
703704

704705
while (Date.now() - startTime < maxWaitTime) {
705706
try {
707+
// Key the leader lock on the SLOT, not `name`: Postgres allows one
708+
// consumer per slot, so consumers of the same slot must contend on the
709+
// same lock (a name-keyed lock lets old+new pods race it across a deploy).
706710
this.leaderLock = await this.redlock.acquire(
707-
[`logical-replication-client:${this.options.name}`],
711+
[`logical-replication-client:${this.options.slotName}`],
708712
this.leaderLockTimeoutMs
709713
);
710714

0 commit comments

Comments
 (0)