Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@replit/river",
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
"version": "0.217.2",
"version": "0.218.0",
"type": "module",
"exports": {
".": "./dist/router/index.js",
Expand Down
2 changes: 2 additions & 0 deletions protobuf/handshake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ValidateHandshake<Schema extends DescMessage, ParsedMetadata> = (
export function createClientHandshakeOptions<Schema extends DescMessage>(
schema: Schema,
construct: ConstructHandshake<Schema>,
eager?: boolean,
): ClientHandshakeOptions<typeof HandshakeBytesSchema> {
return createTransportClientHandshakeOptions(
HandshakeBytesSchema,
Expand All @@ -50,6 +51,7 @@ export function createClientHandshakeOptions<Schema extends DescMessage>(

return encodeMessageBytes(schema, metadata);
},
eager,
);
}

Expand Down
13 changes: 12 additions & 1 deletion router/handshake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ export interface ClientHandshakeOptions<
* Gets the {@link HandshakeRequestMetadata} to send to the server.
*/
construct: ConstructHandshake<MetadataSchema>;

/**
* When true, the client constructs handshake metadata as soon as it begins
* dialing, so a slow {@link construct} (e.g. fetching a fresh token) overlaps
* establishing the connection rather than running after it. The trade-off is
* that `construct` then runs on every connection attempt, including ones that
* never connect, so leave it unset when constructing is expensive or
* rate-limited.
*/
eager?: boolean;
}

export interface ServerHandshakeOptions<
Expand Down Expand Up @@ -77,8 +87,9 @@ export function createClientHandshakeOptions<
>(
schema: MetadataSchema,
construct: ConstructHandshake<MetadataSchema>,
eager?: boolean,
): ClientHandshakeOptions<MetadataSchema> {
return { schema, construct };
return { schema, construct, eager };
}

export function createServerHandshakeOptions<
Expand Down
78 changes: 71 additions & 7 deletions transport/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
defaultClientTransportOptions,
} from './options';
import { LeakyBucketRateLimit } from './rateLimit';
import { Transport } from './transport';
import { DeleteSessionOptions, Transport } from './transport';
import { coerceErrorString } from './stringifyError';
import { ProtocolError } from './events';
import { Value } from 'typebox/value';
Expand All @@ -30,11 +30,24 @@ import { SessionConnected } from './sessionStateMachine/SessionConnected';
import {
ClientSession,
ClientSessionStateGraph,
Session,
} from './sessionStateMachine/transitions';
import { SessionState } from './sessionStateMachine/common';
import { SessionNoConnection } from './sessionStateMachine/SessionNoConnection';
import { SessionBackingOff } from './sessionStateMachine/SessionBackingOff';

/**
* Outcome of constructing client handshake metadata, kept as a value (rather than a
* rejecting promise) so it can be prefetched when a connection attempt starts and
* awaited later even if that attempt is abandoned before the handshake is sent.
*/
type ConstructedHandshakeMetadata =
| {
ok: true;
metadata: Awaited<ReturnType<ClientHandshakeOptions['construct']>>;
}
| { ok: false; reason: string };

export abstract class ClientTransport<
ConnType extends Connection,
> extends Transport<ConnType> {
Expand All @@ -58,6 +71,18 @@ export abstract class ClientTransport<
*/
handshakeExtensions?: ClientHandshakeOptions;

/**
* Handshake-metadata constructions prefetched when a connection attempt begins
* (only when {@link ClientHandshakeOptions.eager} is set), so the
* token fetch overlaps the dial instead of following it. Keyed by the peer being
* connected to; consumed when the handshake is sent and cleared when an attempt
* is abandoned.
*/
private pendingHandshakeMetadata = new Map<
TransportClientId,
Promise<ConstructedHandshakeMetadata>
>();

sessions: Map<TransportClientId, ClientSession<ConnType>>;

constructor(
Expand Down Expand Up @@ -203,6 +228,15 @@ export abstract class ClientTransport<
return session;
}

protected deleteSession(
session: Session<ConnType>,
options?: DeleteSessionOptions,
): void {
// drop any handshake metadata prefetched for an attempt that's being torn down
this.pendingHandshakeMetadata.delete(session.to);
super.deleteSession(session, options);
}

// listeners
protected onConnectingFailed(session: SessionConnecting<ConnType>) {
const noConnectionSession = super.onConnectingFailed(session);
Expand Down Expand Up @@ -545,6 +579,15 @@ export abstract class ClientTransport<
},
);

// when opted in, kick off the handshake-metadata construction now so a slow
// token fetch overlaps establishing the connection instead of running after it
if (this.handshakeExtensions?.eager) {
this.pendingHandshakeMetadata.set(
session.to,
this.constructHandshakeMetadata(),
);
}

// transition to connecting
const connectingSession =
ClientSessionStateGraph.transition.BackingOffToConnecting(
Expand Down Expand Up @@ -600,27 +643,48 @@ export abstract class ClientTransport<
this.updateSession(connectingSession);
}

/**
* Constructs handshake metadata via the configured handshake extension, capturing
* a failure as a value so a prefetched result can be awaited without rejecting.
*/
private async constructHandshakeMetadata(): Promise<ConstructedHandshakeMetadata> {
if (!this.handshakeExtensions) {
return { ok: true, metadata: undefined };
}

try {
return { ok: true, metadata: await this.handshakeExtensions.construct() };
} catch (err) {
return { ok: false, reason: coerceErrorString(err) };
}
}

private async sendHandshake(session: SessionHandshaking<ConnType>) {
let metadata: unknown = undefined;

if (this.handshakeExtensions) {
try {
metadata = await this.handshakeExtensions.construct();
} catch (err) {
const errStr = coerceErrorString(err);
// consume the metadata prefetched when the connection attempt began, falling
// back to constructing now if none is in flight (e.g. a direct transition)
const pending = this.pendingHandshakeMetadata.get(session.to);
this.pendingHandshakeMetadata.delete(session.to);
const result = await (pending ?? this.constructHandshakeMetadata());

if (!result.ok) {
this.log?.error(
`failed to construct handshake metadata for session to ${session.to}: ${errStr}`,
`failed to construct handshake metadata for session to ${session.to}: ${result.reason}`,
session.loggingMetadata,
);

this.protocolError({
type: ProtocolError.HandshakeFailed,
message: `failed to construct handshake metadata: ${errStr}`,
message: `failed to construct handshake metadata: ${result.reason}`,
});
this.deleteSession(session, { unhealthy: true });

return;
}

metadata = result.metadata;
}

// double-check to make sure we haven't transitioned the session yet
Expand Down
85 changes: 85 additions & 0 deletions transport/transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1535,6 +1535,91 @@ describe.each(testMatrix())(
});
});

test('eager fetches handshake metadata concurrently with the connection', async () => {
const schema = Type.Object({ foo: Type.String() });

const serverTransport = getServerTransport('SERVER', {
schema,
validate: async (metadata: { foo: string }) => ({ foo: metadata.foo }),
});
const clientTransport = getClientTransport('client');

// record the client session state at the moment construct() is invoked; an
// eager fetch runs while still connecting, before the handshaking state
let stateAtConstruct: SessionState | undefined;
const get = vi.fn(async () => {
stateAtConstruct = clientTransport.sessions.get(
serverTransport.clientId,
)?.state;

return { foo: 'bar' };
});
clientTransport.extendHandshake({
schema,
construct: get,
eager: true,
});

clientTransport.connect(serverTransport.clientId);
addPostTestCleanup(async () => {
await cleanupTransports([clientTransport, serverTransport]);
});

await waitFor(() => {
expect(serverTransport.sessions.size).toBe(1);
expect(get).toHaveBeenCalledTimes(1);
});

// the token fetch began before the connection reached the handshaking state,
// so it overlapped the dial rather than running after it
expect([SessionState.BackingOff, SessionState.Connecting]).toContain(
stateAtConstruct,
);

await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
});
});

test('constructs handshake metadata after connecting by default', async () => {
const schema = Type.Object({ foo: Type.String() });

const serverTransport = getServerTransport('SERVER', {
schema,
validate: async (metadata: { foo: string }) => ({ foo: metadata.foo }),
});
const clientTransport = getClientTransport('client');

let stateAtConstruct: SessionState | undefined;
const get = vi.fn(async () => {
stateAtConstruct = clientTransport.sessions.get(
serverTransport.clientId,
)?.state;

return { foo: 'bar' };
});
clientTransport.extendHandshake({ schema, construct: get });

clientTransport.connect(serverTransport.clientId);
addPostTestCleanup(async () => {
await cleanupTransports([clientTransport, serverTransport]);
});

await waitFor(() => {
expect(serverTransport.sessions.size).toBe(1);
expect(get).toHaveBeenCalledTimes(1);
});

// without opting in, construct only runs once the connection is established
expect(stateAtConstruct).toBe(SessionState.Handshaking);

await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
});
});

test('client checks request schema on construction', async () => {
const schema = Type.Object({
foo: Type.String(),
Expand Down
Loading