From fc0d6f20cb981fe2903e631cbdf21f220c58287d Mon Sep 17 00:00:00 2001 From: Bri <34875062+Monkatraz@users.noreply.github.com> Date: Sun, 31 May 2026 17:46:08 -0700 Subject: [PATCH] Add eager token construction option for faster connections --- package.json | 2 +- protobuf/handshake.ts | 2 + router/handshake.ts | 13 +++++- transport/client.ts | 78 +++++++++++++++++++++++++++++++--- transport/transport.test.ts | 85 +++++++++++++++++++++++++++++++++++++ 5 files changed, 171 insertions(+), 9 deletions(-) diff --git a/package.json b/package.json index 78e9e0c6..f91a8965 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@replit/river", "description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", - "version": "0.217.2", + "version": "0.218.0", "type": "module", "exports": { ".": "./dist/router/index.js", diff --git a/protobuf/handshake.ts b/protobuf/handshake.ts index 06693985..f441ee4c 100644 --- a/protobuf/handshake.ts +++ b/protobuf/handshake.ts @@ -42,6 +42,7 @@ type ValidateHandshake = ( export function createClientHandshakeOptions( schema: Schema, construct: ConstructHandshake, + eager?: boolean, ): ClientHandshakeOptions { return createTransportClientHandshakeOptions( HandshakeBytesSchema, @@ -50,6 +51,7 @@ export function createClientHandshakeOptions( return encodeMessageBytes(schema, metadata); }, + eager, ); } diff --git a/router/handshake.ts b/router/handshake.ts index 801690c0..427b86b6 100644 --- a/router/handshake.ts +++ b/router/handshake.ts @@ -33,6 +33,16 @@ export interface ClientHandshakeOptions< * Gets the {@link HandshakeRequestMetadata} to send to the server. */ construct: ConstructHandshake; + + /** + * When true, the client constructs handshake metadata as soon as it begins + * dialing, so a slow {@link construct} (e.g. fetching a fresh token) overlaps + * establishing the connection rather than running after it. The trade-off is + * that `construct` then runs on every connection attempt, including ones that + * never connect, so leave it unset when constructing is expensive or + * rate-limited. + */ + eager?: boolean; } export interface ServerHandshakeOptions< @@ -77,8 +87,9 @@ export function createClientHandshakeOptions< >( schema: MetadataSchema, construct: ConstructHandshake, + eager?: boolean, ): ClientHandshakeOptions { - return { schema, construct }; + return { schema, construct, eager }; } export function createServerHandshakeOptions< diff --git a/transport/client.ts b/transport/client.ts index 4ab0d531..8f61bfda 100644 --- a/transport/client.ts +++ b/transport/client.ts @@ -17,7 +17,7 @@ import { defaultClientTransportOptions, } from './options'; import { LeakyBucketRateLimit } from './rateLimit'; -import { Transport } from './transport'; +import { DeleteSessionOptions, Transport } from './transport'; import { coerceErrorString } from './stringifyError'; import { ProtocolError } from './events'; import { Value } from 'typebox/value'; @@ -30,11 +30,24 @@ import { SessionConnected } from './sessionStateMachine/SessionConnected'; import { ClientSession, ClientSessionStateGraph, + Session, } from './sessionStateMachine/transitions'; import { SessionState } from './sessionStateMachine/common'; import { SessionNoConnection } from './sessionStateMachine/SessionNoConnection'; import { SessionBackingOff } from './sessionStateMachine/SessionBackingOff'; +/** + * Outcome of constructing client handshake metadata, kept as a value (rather than a + * rejecting promise) so it can be prefetched when a connection attempt starts and + * awaited later even if that attempt is abandoned before the handshake is sent. + */ +type ConstructedHandshakeMetadata = + | { + ok: true; + metadata: Awaited>; + } + | { ok: false; reason: string }; + export abstract class ClientTransport< ConnType extends Connection, > extends Transport { @@ -58,6 +71,18 @@ export abstract class ClientTransport< */ handshakeExtensions?: ClientHandshakeOptions; + /** + * Handshake-metadata constructions prefetched when a connection attempt begins + * (only when {@link ClientHandshakeOptions.eager} is set), so the + * token fetch overlaps the dial instead of following it. Keyed by the peer being + * connected to; consumed when the handshake is sent and cleared when an attempt + * is abandoned. + */ + private pendingHandshakeMetadata = new Map< + TransportClientId, + Promise + >(); + sessions: Map>; constructor( @@ -203,6 +228,15 @@ export abstract class ClientTransport< return session; } + protected deleteSession( + session: Session, + options?: DeleteSessionOptions, + ): void { + // drop any handshake metadata prefetched for an attempt that's being torn down + this.pendingHandshakeMetadata.delete(session.to); + super.deleteSession(session, options); + } + // listeners protected onConnectingFailed(session: SessionConnecting) { const noConnectionSession = super.onConnectingFailed(session); @@ -545,6 +579,15 @@ export abstract class ClientTransport< }, ); + // when opted in, kick off the handshake-metadata construction now so a slow + // token fetch overlaps establishing the connection instead of running after it + if (this.handshakeExtensions?.eager) { + this.pendingHandshakeMetadata.set( + session.to, + this.constructHandshakeMetadata(), + ); + } + // transition to connecting const connectingSession = ClientSessionStateGraph.transition.BackingOffToConnecting( @@ -600,27 +643,48 @@ export abstract class ClientTransport< this.updateSession(connectingSession); } + /** + * Constructs handshake metadata via the configured handshake extension, capturing + * a failure as a value so a prefetched result can be awaited without rejecting. + */ + private async constructHandshakeMetadata(): Promise { + if (!this.handshakeExtensions) { + return { ok: true, metadata: undefined }; + } + + try { + return { ok: true, metadata: await this.handshakeExtensions.construct() }; + } catch (err) { + return { ok: false, reason: coerceErrorString(err) }; + } + } + private async sendHandshake(session: SessionHandshaking) { let metadata: unknown = undefined; if (this.handshakeExtensions) { - try { - metadata = await this.handshakeExtensions.construct(); - } catch (err) { - const errStr = coerceErrorString(err); + // consume the metadata prefetched when the connection attempt began, falling + // back to constructing now if none is in flight (e.g. a direct transition) + const pending = this.pendingHandshakeMetadata.get(session.to); + this.pendingHandshakeMetadata.delete(session.to); + const result = await (pending ?? this.constructHandshakeMetadata()); + + if (!result.ok) { this.log?.error( - `failed to construct handshake metadata for session to ${session.to}: ${errStr}`, + `failed to construct handshake metadata for session to ${session.to}: ${result.reason}`, session.loggingMetadata, ); this.protocolError({ type: ProtocolError.HandshakeFailed, - message: `failed to construct handshake metadata: ${errStr}`, + message: `failed to construct handshake metadata: ${result.reason}`, }); this.deleteSession(session, { unhealthy: true }); return; } + + metadata = result.metadata; } // double-check to make sure we haven't transitioned the session yet diff --git a/transport/transport.test.ts b/transport/transport.test.ts index 2af14770..ba1bc0d8 100644 --- a/transport/transport.test.ts +++ b/transport/transport.test.ts @@ -1535,6 +1535,91 @@ describe.each(testMatrix())( }); }); + test('eager fetches handshake metadata concurrently with the connection', async () => { + const schema = Type.Object({ foo: Type.String() }); + + const serverTransport = getServerTransport('SERVER', { + schema, + validate: async (metadata: { foo: string }) => ({ foo: metadata.foo }), + }); + const clientTransport = getClientTransport('client'); + + // record the client session state at the moment construct() is invoked; an + // eager fetch runs while still connecting, before the handshaking state + let stateAtConstruct: SessionState | undefined; + const get = vi.fn(async () => { + stateAtConstruct = clientTransport.sessions.get( + serverTransport.clientId, + )?.state; + + return { foo: 'bar' }; + }); + clientTransport.extendHandshake({ + schema, + construct: get, + eager: true, + }); + + clientTransport.connect(serverTransport.clientId); + addPostTestCleanup(async () => { + await cleanupTransports([clientTransport, serverTransport]); + }); + + await waitFor(() => { + expect(serverTransport.sessions.size).toBe(1); + expect(get).toHaveBeenCalledTimes(1); + }); + + // the token fetch began before the connection reached the handshaking state, + // so it overlapped the dial rather than running after it + expect([SessionState.BackingOff, SessionState.Connecting]).toContain( + stateAtConstruct, + ); + + await testFinishesCleanly({ + clientTransports: [clientTransport], + serverTransport, + }); + }); + + test('constructs handshake metadata after connecting by default', async () => { + const schema = Type.Object({ foo: Type.String() }); + + const serverTransport = getServerTransport('SERVER', { + schema, + validate: async (metadata: { foo: string }) => ({ foo: metadata.foo }), + }); + const clientTransport = getClientTransport('client'); + + let stateAtConstruct: SessionState | undefined; + const get = vi.fn(async () => { + stateAtConstruct = clientTransport.sessions.get( + serverTransport.clientId, + )?.state; + + return { foo: 'bar' }; + }); + clientTransport.extendHandshake({ schema, construct: get }); + + clientTransport.connect(serverTransport.clientId); + addPostTestCleanup(async () => { + await cleanupTransports([clientTransport, serverTransport]); + }); + + await waitFor(() => { + expect(serverTransport.sessions.size).toBe(1); + expect(get).toHaveBeenCalledTimes(1); + }); + + // without opting in, construct only runs once the connection is established + expect(stateAtConstruct).toBe(SessionState.Handshaking); + + await testFinishesCleanly({ + clientTransports: [clientTransport], + serverTransport, + }); + }); + test('client checks request schema on construction', async () => { const schema = Type.Object({ foo: Type.String(),