From 0a0792bd25305b05757dd3a85b15051b9d6aa4a4 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Mon, 8 Dec 2025 02:30:49 +0800 Subject: [PATCH 1/5] fix: resolve auth before join --- packages/loro-websocket/src/client/index.ts | 102 ++++++++++++++------ packages/loro-websocket/tests/e2e.test.ts | 47 +++++++++ 2 files changed, 117 insertions(+), 32 deletions(-) diff --git a/packages/loro-websocket/src/client/index.ts b/packages/loro-websocket/src/client/index.ts index bdb79f3..a593775 100644 --- a/packages/loro-websocket/src/client/index.ts +++ b/packages/loro-websocket/src/client/index.ts @@ -24,6 +24,9 @@ import type { CrdtDocAdaptor } from "loro-adaptors"; export * from "loro-adaptors"; +export type AuthProvider = () => Uint8Array | Promise; +type AuthOption = Uint8Array | AuthProvider; + interface FragmentBatch { header: DocUpdateFragmentHeader; fragments: Map; @@ -173,7 +176,7 @@ export class LoroWebsocketClient { private roomAdaptors: Map = new Map(); // Track roomId for each active id so we can rejoin on reconnect private roomIds: Map = new Map(); - private roomAuth: Map = new Map(); + private roomAuth: Map = new Map(); private roomStatusListeners: Map< string, Set<(s: RoomJoinStatusValue) => void> @@ -206,6 +209,17 @@ export class LoroWebsocketClient { void this.connect(); } + private async resolveAuth(auth?: AuthOption): Promise { + if (typeof auth === "function") { + const value = await auth(); + if (!(value instanceof Uint8Array)) { + throw new Error("Auth provider must return Uint8Array"); + } + return value; + } + return auth ?? new Uint8Array(); + } + get socket(): WebSocket { return this.ws; } @@ -562,17 +576,27 @@ export class LoroWebsocketClient { if (!roomId) continue; const active = this.activeRooms.get(id); if (!active) continue; - this.sendRejoinRequest(id, roomId, adaptor, active.room, this.roomAuth.get(id)); + void this.sendRejoinRequest(id, roomId, adaptor, active.room, this.roomAuth.get(id)); } } - private sendRejoinRequest( + private async sendRejoinRequest( id: string, roomId: string, adaptor: CrdtDocAdaptor, room: LoroWebsocketClientRoom, - auth?: Uint8Array + auth?: AuthOption ) { + let authValue: Uint8Array; + try { + authValue = await this.resolveAuth(auth); + } catch (e) { + console.error("Failed to resolve auth for rejoin:", e); + this.cleanupRoom(roomId, adaptor.crdtType); + this.emitRoomStatus(id, RoomJoinStatus.Error); + return; + } + // Prepare a lightweight pending entry so JoinError handling can retry version formats const pending: PendingRoom = { room: Promise.resolve(room), @@ -594,7 +618,7 @@ export class LoroWebsocketClient { }, adaptor, roomId, - auth, + auth: authValue, isRejoin: true, }; this.pendingRooms.set(id, pending); @@ -603,7 +627,7 @@ export class LoroWebsocketClient { type: MessageType.JoinRequest, crdt: adaptor.crdtType, roomId, - auth: auth ?? new Uint8Array(), + auth: authValue, version: adaptor.getVersion(), } as JoinRequest); @@ -677,7 +701,7 @@ export class LoroWebsocketClient { // Drop any in-flight join since the server explicitly removed us this.pendingRooms.delete(roomId); if (shouldRejoin && active && adaptor) { - this.sendRejoinRequest(roomId, msg.roomId, adaptor, active.room, auth); + void this.sendRejoinRequest(roomId, msg.roomId, adaptor, active.room, auth); } else { // Remove local room state so client does not auto-retry unless requested this.cleanupRoom(msg.roomId, msg.crdt); @@ -925,7 +949,7 @@ export class LoroWebsocketClient { }: { roomId: string; crdtAdaptor: CrdtDocAdaptor; - auth?: Uint8Array; + auth?: AuthOption; onStatusChange?: (s: RoomJoinStatusValue) => void; }): Promise { const id = crdtAdaptor.crdtType + roomId; @@ -940,8 +964,8 @@ export class LoroWebsocketClient { return Promise.resolve(active.room); } - let resolve: (res: JoinResponseOk) => void; - let reject: (error: Error) => void; + let resolve!: (res: JoinResponseOk) => void; + let reject!: (error: Error) => void; const response = new Promise((resolve_, reject_) => { resolve = resolve_; @@ -1005,31 +1029,45 @@ export class LoroWebsocketClient { return room; }); - this.pendingRooms.set(id, { - room, - resolve: resolve!, - reject: reject!, - adaptor: crdtAdaptor, - roomId, - auth, - }); this.roomAuth.set(id, auth); - const joinPayload = encode({ - type: MessageType.JoinRequest, - crdt: crdtAdaptor.crdtType, - roomId, - auth: auth ?? new Uint8Array(), - version: crdtAdaptor.getVersion(), - } as JoinRequest); + // Resolve auth before registering pending room to avoid race condition + // where JoinError retry might use undefined auth + void this.resolveAuth(auth) + .then(authValue => { + // Register pending room only after auth is resolved + this.pendingRooms.set(id, { + room, + resolve: resolve!, + reject: reject!, + adaptor: crdtAdaptor, + roomId, + auth: authValue, + }); - if (this.ws && this.ws.readyState === WebSocket.OPEN) { - this.ws.send(joinPayload); - } else { - this.enqueueJoin(joinPayload); - // ensure a connection attempt is running - void this.connect(); - } + const joinPayload = encode({ + type: MessageType.JoinRequest, + crdt: crdtAdaptor.crdtType, + roomId, + auth: authValue, + version: crdtAdaptor.getVersion(), + } as JoinRequest); + + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.send(joinPayload); + } else { + this.enqueueJoin(joinPayload); + // ensure a connection attempt is running + void this.connect(); + } + }) + .catch(err => { + const error = err instanceof Error ? err : new Error(String(err)); + this.emitRoomStatus(id, RoomJoinStatus.Error); + reject(error); + this.cleanupRoom(roomId, crdtAdaptor.crdtType); + this.pendingRooms.delete(id); + }); return room; } diff --git a/packages/loro-websocket/tests/e2e.test.ts b/packages/loro-websocket/tests/e2e.test.ts index 67f6fcb..d9776fd 100644 --- a/packages/loro-websocket/tests/e2e.test.ts +++ b/packages/loro-websocket/tests/e2e.test.ts @@ -571,6 +571,53 @@ describe("E2E: Client-Server Sync", () => { await authServer.stop(); }, 15000); + it("fetches fresh auth on rejoin when auth provider is used", async () => { + const port = await getPort(); + const tokens: string[] = []; + + const server = new SimpleServer({ + port, + authenticate: async (_roomId, _crdt, auth) => { + tokens.push(new TextDecoder().decode(auth)); + return "write"; + }, + }); + await server.start(); + + const client = new LoroWebsocketClient({ + url: `ws://localhost:${port}`, + reconnect: { initialDelayMs: 20, maxDelayMs: 100, jitter: 0 }, + }); + + let room: LoroWebsocketClientRoom | undefined; + try { + await client.waitConnected(); + let call = 0; + const adaptor = new LoroAdaptor(); + + room = await client.join({ + roomId: "auth-refresh", + crdtAdaptor: adaptor, + auth: async () => new TextEncoder().encode(`token-${++call}`), + }); + + await waitUntil(() => tokens.length >= 1, 5000, 25); + + await server.stop(); + await new Promise(resolve => setTimeout(resolve, 60)); + await server.start(); + + await waitUntil(() => tokens.some(t => t === "token-2"), 10000, 50); + + expect(tokens[0]).toBe("token-1"); + expect(tokens.some(t => t === "token-2")).toBe(true); + } finally { + await room?.destroy(); + client.destroy(); + await server.stop(); + } + }, 15000); + it("destroy rejects pending ping waiters", async () => { const client = new LoroWebsocketClient({ url: `ws://localhost:${port}` }); await client.waitConnected(); From 44d4e986d85be6547af91290f252bbe0c70d2f4f Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Mon, 8 Dec 2025 02:37:11 +0800 Subject: [PATCH 2/5] fix: dedupe concurrent joins before auth resolution --- packages/loro-websocket/src/client/index.ts | 23 +++++++------- packages/loro-websocket/tests/e2e.test.ts | 35 +++++++++++++++++++++ 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/packages/loro-websocket/src/client/index.ts b/packages/loro-websocket/src/client/index.ts index a593775..289b3cf 100644 --- a/packages/loro-websocket/src/client/index.ts +++ b/packages/loro-websocket/src/client/index.ts @@ -1029,21 +1029,22 @@ export class LoroWebsocketClient { return room; }); + // Register pending room immediately so concurrent join calls dedupe + this.pendingRooms.set(id, { + room, + resolve: resolve!, + reject: reject!, + adaptor: crdtAdaptor, + roomId, + auth: undefined, + }); this.roomAuth.set(id, auth); - // Resolve auth before registering pending room to avoid race condition - // where JoinError retry might use undefined auth void this.resolveAuth(auth) .then(authValue => { - // Register pending room only after auth is resolved - this.pendingRooms.set(id, { - room, - resolve: resolve!, - reject: reject!, - adaptor: crdtAdaptor, - roomId, - auth: authValue, - }); + const currentPending = this.pendingRooms.get(id); + if (!currentPending) return; + currentPending.auth = authValue; const joinPayload = encode({ type: MessageType.JoinRequest, diff --git a/packages/loro-websocket/tests/e2e.test.ts b/packages/loro-websocket/tests/e2e.test.ts index d9776fd..5e17c1d 100644 --- a/packages/loro-websocket/tests/e2e.test.ts +++ b/packages/loro-websocket/tests/e2e.test.ts @@ -618,6 +618,41 @@ describe("E2E: Client-Server Sync", () => { } }, 15000); + it("dedupes concurrent join calls even before auth resolves", async () => { + const port = await getPort(); + const tokens: string[] = []; + + const server = new SimpleServer({ + port, + authenticate: async (_roomId, _crdt, auth) => { + tokens.push(new TextDecoder().decode(auth)); + return "write"; + }, + }); + await server.start(); + + const client = new LoroWebsocketClient({ url: `ws://localhost:${port}` }); + await client.waitConnected(); + + const adaptor = new LoroAdaptor(); + const auth = () => new TextEncoder().encode("token-once"); + + const joinPromise1 = client.join({ roomId: "dedupe", crdtAdaptor: adaptor, auth }); + const joinPromise2 = client.join({ roomId: "dedupe", crdtAdaptor: adaptor, auth }); + + expect(joinPromise1).toBe(joinPromise2); + + const [room1, room2] = await Promise.all([joinPromise1, joinPromise2]); + expect(room1).toBe(room2); + + await waitUntil(() => tokens.length >= 1, 5000, 25); + expect(tokens).toHaveLength(1); + + await room1.destroy(); + client.destroy(); + await server.stop(); + }, 15000); + it("destroy rejects pending ping waiters", async () => { const client = new LoroWebsocketClient({ url: `ws://localhost:${port}` }); await client.waitConnected(); From b8e64b564cbc658b0a94a3d6dc3ccc66e81fbe1c Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Mon, 8 Dec 2025 02:44:37 +0800 Subject: [PATCH 3/5] Update packages/loro-websocket/src/client/index.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- packages/loro-websocket/src/client/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/loro-websocket/src/client/index.ts b/packages/loro-websocket/src/client/index.ts index 289b3cf..ab84f92 100644 --- a/packages/loro-websocket/src/client/index.ts +++ b/packages/loro-websocket/src/client/index.ts @@ -1067,7 +1067,6 @@ export class LoroWebsocketClient { this.emitRoomStatus(id, RoomJoinStatus.Error); reject(error); this.cleanupRoom(roomId, crdtAdaptor.crdtType); - this.pendingRooms.delete(id); }); return room; From 994026f1b5c749ed0373b3c6df96f87d5ada5e14 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Mon, 8 Dec 2025 02:55:41 +0800 Subject: [PATCH 4/5] fix: re-resolve auth option on join retries --- packages/loro-websocket/src/client/index.ts | 33 ++++++++++++++------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/packages/loro-websocket/src/client/index.ts b/packages/loro-websocket/src/client/index.ts index ab84f92..8560b28 100644 --- a/packages/loro-websocket/src/client/index.ts +++ b/packages/loro-websocket/src/client/index.ts @@ -39,7 +39,7 @@ interface PendingRoom { reject: (error: Error) => void; adaptor: CrdtDocAdaptor; roomId: string; - auth?: Uint8Array; + auth?: AuthOption; isRejoin?: boolean; } @@ -618,7 +618,7 @@ export class LoroWebsocketClient { }, adaptor, roomId, - auth: authValue, + auth, isRejoin: true, }; this.pendingRooms.set(id, pending); @@ -839,6 +839,19 @@ export class LoroWebsocketClient { roomId: string ) { if (msg.code === JoinErrorCode.VersionUnknown) { + let authValue: Uint8Array; + try { + authValue = await this.resolveAuth(pending.auth); + } catch (e) { + pending.reject(e as Error); + this.pendingRooms.delete(roomId); + this.emitRoomStatus( + pending.adaptor.crdtType + pending.roomId, + RoomJoinStatus.Error + ); + return; + } + // Try alternative version format const currentVersion = pending.adaptor.getVersion(); const alternativeVersion = @@ -850,7 +863,7 @@ export class LoroWebsocketClient { type: MessageType.JoinRequest, crdt: pending.adaptor.crdtType, roomId: pending.roomId, - auth: pending.auth ?? new Uint8Array(), + auth: authValue, version: alternativeVersion, } as JoinRequest) ); @@ -862,7 +875,7 @@ export class LoroWebsocketClient { type: MessageType.JoinRequest, crdt: pending.adaptor.crdtType, roomId: pending.roomId, - auth: pending.auth ?? new Uint8Array(), + auth: authValue, version: new Uint8Array(), } as JoinRequest) ); @@ -939,7 +952,11 @@ export class LoroWebsocketClient { } /** - * Join a room; `auth` carries application-defined join metadata forwarded to the server. + * Join a room. + * - `auth` may be a `Uint8Array` or a provider function. + * - The provider is invoked on the initial join and again on protocol-driven retries + * (e.g. `VersionUnknown`) and reconnect rejoins, so it can refresh short-lived tokens. + * If callers need a stable token, memoize in the provider. */ join({ roomId, @@ -1036,16 +1053,12 @@ export class LoroWebsocketClient { reject: reject!, adaptor: crdtAdaptor, roomId, - auth: undefined, + auth, }); this.roomAuth.set(id, auth); void this.resolveAuth(auth) .then(authValue => { - const currentPending = this.pendingRooms.get(id); - if (!currentPending) return; - currentPending.auth = authValue; - const joinPayload = encode({ type: MessageType.JoinRequest, crdt: crdtAdaptor.crdtType, From 82c5480c1c199fd6771d9bc753c89b696f9d8711 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 7 Dec 2025 18:47:44 +0000 Subject: [PATCH 5/5] fix: add explicit pendingRooms.delete before cleanupRoom in reject handler Co-authored-by: zxch3n <18425020+zxch3n@users.noreply.github.com> --- packages/loro-websocket/src/client/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/loro-websocket/src/client/index.ts b/packages/loro-websocket/src/client/index.ts index 8560b28..f528c72 100644 --- a/packages/loro-websocket/src/client/index.ts +++ b/packages/loro-websocket/src/client/index.ts @@ -613,6 +613,7 @@ export class LoroWebsocketClient { }, reject: (error: Error) => { console.error("Rejoin failed:", error); + this.pendingRooms.delete(id); this.cleanupRoom(roomId, adaptor.crdtType); this.emitRoomStatus(id, RoomJoinStatus.Error); },