Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/client/lib/RESP/decoder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,37 @@ describe('RESP Decoder', () => {
replies: [Buffer.from('OK')]
});

test("'OK' as Uint8Array", {
typeMapping: {
[RESP_TYPES.BLOB_STRING]: Uint8Array
},
toWrite: Buffer.from('$2\r\nOK\r\n'),
replies: [new Uint8Array(Buffer.from('OK'))]
});

test("'é'", {
toWrite: Buffer.from('$2\r\né\r\n'),
replies: ['é']
});
});

it('returns Uint8Array for chunked BLOB_STRING', () => {
const setup = setupTest({
typeMapping: {
[RESP_TYPES.BLOB_STRING]: Uint8Array
},
toWrite: Buffer.from('$10\r\n1234567890\r\n')
});
setup.decoder.write(Buffer.from('$10\r\n'));
setup.decoder.write(Buffer.from('1234567890'));
setup.decoder.write(Buffer.from('\r\n'));
assert.equal(setup.onReplySpy.callCount, 1);
const [reply] = setup.onReplySpy.args[0];
assert.equal(reply.constructor, Uint8Array);
assert.equal(Buffer.isBuffer(reply), false);
assert.deepEqual(reply, new Uint8Array(Buffer.from('1234567890')));
});

describe('VerbatimString', () => {
test("''", {
toWrite: Buffer.from('=4\r\ntxt:\r\n'),
Expand Down
24 changes: 16 additions & 8 deletions packages/client/lib/RESP/decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,9 +492,7 @@ export class Decoder {
}

const slice = chunk.subarray(start, crlfIndex);
return type === Buffer ?
slice :
slice.toString();
return this.#decodeStringType(type, slice);
}

#continueDecodeSimpleString(chunks, type, chunk) {
Expand All @@ -507,7 +505,7 @@ export class Decoder {

chunks.push(chunk.subarray(start, crlfIndex));
const buffer = Buffer.concat(chunks);
return type === Buffer ? buffer : buffer.toString();
return this.#decodeStringType(type, buffer);
}

#decodeBlobString(type, chunk) {
Expand Down Expand Up @@ -555,9 +553,7 @@ export class Decoder {

const slice = chunk.subarray(this.#cursor, end);
this.#cursor = end + skip;
return type === Buffer ?
slice :
slice.toString();
return this.#decodeStringType(type, slice);
}

#continueDecodeStringWithLength(length, chunks, skip, type, chunk) {
Expand All @@ -578,7 +574,19 @@ export class Decoder {
chunks.push(chunk.subarray(this.#cursor, end));
this.#cursor = end + skip;
const buffer = Buffer.concat(chunks);
return type === Buffer ? buffer : buffer.toString();
return this.#decodeStringType(type, buffer);
}

#decodeStringType(type, chunk) {
if (type === Buffer) {
return chunk;
}

if (type === Uint8Array) {
return new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uint8Array view leaks shared Buffer pool data

Medium Severity

#decodeStringType creates a Uint8Array view into the Buffer's underlying ArrayBuffer via new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength). In Node.js, small Buffers (including those from Buffer.concat, subarray, etc.) share a pooled ArrayBuffer (~8 KB). The returned Uint8Array's .buffer property exposes the entire pool — which may contain data from other Redis responses — and prevents garbage collection of the pool. Users passing result.buffer to web APIs (e.g. postMessage, WebSocket.send) or using ArrayBuffer.transfer() would encounter data leakage or corruption. Using new Uint8Array(chunk) instead would produce a copy with an independent ArrayBuffer.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 105bc16. Configure here.

}

return chunk.toString();
}

#decodeBlobStringWithLength(length, type, chunk) {
Expand Down
42 changes: 29 additions & 13 deletions packages/client/lib/client/enterprise-maintenance-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ import assert from "node:assert";
import { setTimeout } from "node:timers/promises";
import { RedisTcpSocketOptions } from "./socket";
import diagnostics_channel from "node:diagnostics_channel";
import { RedisArgument } from "../RESP/types";
import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from "../RESP/types";
import { publish, CHANNELS } from "./tracing";

type RedisType = RedisClient<any, any, any, any, any>;

export const SMIGRATED_EVENT = "__SMIGRATED";

interface Address {
Expand Down Expand Up @@ -72,13 +70,25 @@ export interface MaintenanceUpdate {
relaxedSocketTimeout?: number;
}

export default class EnterpriseMaintenanceManager {
export default class EnterpriseMaintenanceManager<
M extends RedisModules = RedisModules,
F extends RedisFunctions = RedisFunctions,
S extends RedisScripts = RedisScripts,
RESP extends RespVersions = RespVersions,
TYPE_MAPPING extends TypeMapping = TypeMapping
> {
#commandsQueue: RedisCommandsQueue;
#options: RedisClientOptions;
#options: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>;
#isMaintenance = 0;
#client: RedisType;

static setupDefaultMaintOptions(options: RedisClientOptions) {
#client: RedisClient<M, F, S, RESP, TYPE_MAPPING>;

static setupDefaultMaintOptions<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions = RespVersions,
TYPE_MAPPING extends TypeMapping = TypeMapping
>(options: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>) {
if (options.maintNotifications === undefined) {
options.maintNotifications =
options?.RESP === 3 ? "auto" : "disabled";
Expand All @@ -94,8 +104,14 @@ export default class EnterpriseMaintenanceManager {
}
}

static async getHandshakeCommand(
options: RedisClientOptions,
static async getHandshakeCommand<
M extends RedisModules = RedisModules,
F extends RedisFunctions = RedisFunctions,
S extends RedisScripts = RedisScripts,
RESP extends RespVersions = RespVersions,
TYPE_MAPPING extends TypeMapping = TypeMapping
>(
options: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>,
clientId: string,
): Promise<
| { cmd: Array<RedisArgument>; errorHandler: (error: Error) => void }
Expand Down Expand Up @@ -140,8 +156,8 @@ export default class EnterpriseMaintenanceManager {

constructor(
commandsQueue: RedisCommandsQueue,
client: RedisType,
options: RedisClientOptions,
client: RedisClient<M, F, S, RESP, TYPE_MAPPING>,
options: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>,
) {
this.#commandsQueue = commandsQueue;
this.#options = options;
Expand Down Expand Up @@ -465,7 +481,7 @@ function isPrivateIP(ip: string): boolean {
async function determineEndpoint(
tlsEnabled: boolean,
host: string,
options: RedisClientOptions,
options: RedisClientOptions<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, RedisTcpSocketOptions>,
): Promise<MovingEndpointType> {
assert(options.maintEndpointType !== undefined);
if (options.maintEndpointType !== "auto") {
Expand Down
18 changes: 12 additions & 6 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export interface RedisClientOptions<
M extends RedisModules = RedisModules,
F extends RedisFunctions = RedisFunctions,
S extends RedisScripts = RedisScripts,
RESP extends RespVersions = RespVersions,
RESP extends RespVersions = 2,
TYPE_MAPPING extends TypeMapping = TypeMapping,
SocketOptions extends RedisSocketOptions = RedisSocketOptions
> extends CommanderConfig<M, F, S, RESP> {
Expand Down Expand Up @@ -365,7 +365,13 @@ export default class RedisClient<
return RedisClient.factory(options)(options);
}

static parseOptions<O extends RedisClientOptions>(options: O): O {
static parseOptions<
M extends RedisModules = RedisModules,
F extends RedisFunctions = RedisFunctions,
S extends RedisScripts = RedisScripts,
RESP extends RespVersions = RespVersions,
TYPE_MAPPING extends TypeMapping = TypeMapping
>(options: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> {
if (options?.url) {
const parsed = RedisClient.parseURL(options.url);
if (options.socket) {
Expand All @@ -380,15 +386,15 @@ export default class RedisClient<
return options;
}

static parseURL(url: string): RedisClientOptions & {
socket: Exclude<RedisClientOptions['socket'], undefined> & {
static parseURL(url: string): RedisClientOptions<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, RedisSocketOptions> & {
socket: Exclude<RedisClientOptions<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, RedisSocketOptions>['socket'], undefined> & {
tls: boolean
}
} {
// https://www.iana.org/assignments/uri-schemes/prov/redis
const { hostname, port, protocol, username, password, pathname } = new URL(url),
parsed: RedisClientOptions & {
socket: Exclude<RedisClientOptions['socket'], undefined> & {
parsed: RedisClientOptions<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, RedisSocketOptions> & {
socket: Exclude<RedisClientOptions<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, RedisSocketOptions>['socket'], undefined> & {
tls: boolean
}
} = {
Expand Down
38 changes: 38 additions & 0 deletions packages/client/lib/commands/XINFO_GROUPS.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,44 @@ describe('XINFO GROUPS', () => {
);
});

it('transformReply', () => {
assert.deepEqual(
XINFO_GROUPS.transformReply[2]([[
'name', 'group',
'consumers', 0,
'pending', 0,
'last-delivered-id', '0-0'
]]),
[Object.assign(Object.create(null), {
name: 'group',
consumers: 0,
pending: 0,
'last-delivered-id': '0-0'
})]
);
});

it('transformReply - Redis 7 fields', () => {
assert.deepEqual(
XINFO_GROUPS.transformReply[2]([[
'name', 'group',
'consumers', 0,
'pending', 0,
'last-delivered-id', '0-0',
'entries-read', null,
'lag', null
]]),
[Object.assign(Object.create(null), {
name: 'group',
consumers: 0,
pending: 0,
'last-delivered-id': '0-0',
'entries-read': null,
lag: null
})]
);
});

testUtils.testAll('xInfoGroups', async client => {
const [, reply] = await Promise.all([
client.xGroupCreate('key', 'group', '$', {
Expand Down
22 changes: 6 additions & 16 deletions packages/client/lib/commands/XINFO_GROUPS.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { CommandParser } from '../client/parser';
import { RedisArgument, ArrayReply, TuplesToMapReply, BlobStringReply, NumberReply, NullReply, UnwrapReply, Resp2Reply, Command } from '../RESP/types';
import { RedisArgument, ArrayReply, TuplesToMapReply, BlobStringReply, NumberReply, NullReply, Command } from '../RESP/types';
import { transformTuplesReply } from './generic-transformers';

/**
* Reply structure for XINFO GROUPS command containing information about consumer groups
Expand All @@ -8,11 +9,11 @@ export type XInfoGroupsReply = ArrayReply<TuplesToMapReply<[
[BlobStringReply<'name'>, BlobStringReply],
[BlobStringReply<'consumers'>, NumberReply],
[BlobStringReply<'pending'>, NumberReply],
[BlobStringReply<'last-delivered-id'>, NumberReply],
[BlobStringReply<'last-delivered-id'>, BlobStringReply],
/** added in 7.0 */
[BlobStringReply<'entries-read'>, NumberReply | NullReply],
/** added in 7.0 */
[BlobStringReply<'lag'>, NumberReply],
[BlobStringReply<'lag'>, NumberReply | NullReply],
]>>;

export default {
Expand All @@ -34,19 +35,8 @@ export default {
* entries-read - Number of entries read in the group (Redis 7.0+)
* lag - Number of entries not read by the group (Redis 7.0+)
*/
2: (reply: UnwrapReply<Resp2Reply<XInfoGroupsReply>>) => {
return reply.map(group => {
const unwrapped = group as unknown as UnwrapReply<typeof group>;
return {
name: unwrapped[1],
consumers: unwrapped[3],
pending: unwrapped[5],
'last-delivered-id': unwrapped[7],
'entries-read': unwrapped[9],
lag: unwrapped[11]
};
});
},
2: (reply: Array<Array<BlobStringReply | NumberReply | NullReply>>) =>
reply.map(group => transformTuplesReply(group as unknown as ArrayReply<BlobStringReply>)) as unknown as XInfoGroupsReply['DEFAULT'],
3: undefined as unknown as () => XInfoGroupsReply
}
} as const satisfies Command;
Loading