diff --git a/packages/client/lib/RESP/decoder.spec.ts b/packages/client/lib/RESP/decoder.spec.ts index 43b08e35662..0143a5c7b7a 100644 --- a/packages/client/lib/RESP/decoder.spec.ts +++ b/packages/client/lib/RESP/decoder.spec.ts @@ -257,12 +257,37 @@ describe('RESP Decoder', () => { replies: [Buffer.from('OK')] }); + test("'OK' as Uint8Array", { + typeMapping: { + [RESP_TYPES.BLOB_STRING]: Uint8Array + }, + toWrite: Buffer.from('$2\r\nOK\r\n'), + replies: [new Uint8Array(Buffer.from('OK'))] + }); + test("'é'", { toWrite: Buffer.from('$2\r\né\r\n'), replies: ['é'] }); }); + it('returns Uint8Array for chunked BLOB_STRING', () => { + const setup = setupTest({ + typeMapping: { + [RESP_TYPES.BLOB_STRING]: Uint8Array + }, + toWrite: Buffer.from('$10\r\n1234567890\r\n') + }); + setup.decoder.write(Buffer.from('$10\r\n')); + setup.decoder.write(Buffer.from('1234567890')); + setup.decoder.write(Buffer.from('\r\n')); + assert.equal(setup.onReplySpy.callCount, 1); + const [reply] = setup.onReplySpy.args[0]; + assert.equal(reply.constructor, Uint8Array); + assert.equal(Buffer.isBuffer(reply), false); + assert.deepEqual(reply, new Uint8Array(Buffer.from('1234567890'))); + }); + describe('VerbatimString', () => { test("''", { toWrite: Buffer.from('=4\r\ntxt:\r\n'), diff --git a/packages/client/lib/RESP/decoder.ts b/packages/client/lib/RESP/decoder.ts index 3bdcae66a4a..efeb9871ac8 100644 --- a/packages/client/lib/RESP/decoder.ts +++ b/packages/client/lib/RESP/decoder.ts @@ -492,9 +492,7 @@ export class Decoder { } const slice = chunk.subarray(start, crlfIndex); - return type === Buffer ? - slice : - slice.toString(); + return this.#decodeStringType(type, slice); } #continueDecodeSimpleString(chunks, type, chunk) { @@ -507,7 +505,7 @@ export class Decoder { chunks.push(chunk.subarray(start, crlfIndex)); const buffer = Buffer.concat(chunks); - return type === Buffer ? buffer : buffer.toString(); + return this.#decodeStringType(type, buffer); } #decodeBlobString(type, chunk) { @@ -555,9 +553,7 @@ export class Decoder { const slice = chunk.subarray(this.#cursor, end); this.#cursor = end + skip; - return type === Buffer ? - slice : - slice.toString(); + return this.#decodeStringType(type, slice); } #continueDecodeStringWithLength(length, chunks, skip, type, chunk) { @@ -578,7 +574,19 @@ export class Decoder { chunks.push(chunk.subarray(this.#cursor, end)); this.#cursor = end + skip; const buffer = Buffer.concat(chunks); - return type === Buffer ? buffer : buffer.toString(); + return this.#decodeStringType(type, buffer); + } + + #decodeStringType(type, chunk) { + if (type === Buffer) { + return chunk; + } + + if (type === Uint8Array) { + return new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength); + } + + return chunk.toString(); } #decodeBlobStringWithLength(length, type, chunk) { diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index df58f1be292..5013c1f38de 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -6,11 +6,9 @@ import assert from "node:assert"; import { setTimeout } from "node:timers/promises"; import { RedisTcpSocketOptions } from "./socket"; import diagnostics_channel from "node:diagnostics_channel"; -import { RedisArgument } from "../RESP/types"; +import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from "../RESP/types"; import { publish, CHANNELS } from "./tracing"; -type RedisType = RedisClient; - export const SMIGRATED_EVENT = "__SMIGRATED"; interface Address { @@ -72,13 +70,25 @@ export interface MaintenanceUpdate { relaxedSocketTimeout?: number; } -export default class EnterpriseMaintenanceManager { +export default class EnterpriseMaintenanceManager< + M extends RedisModules = RedisModules, + F extends RedisFunctions = RedisFunctions, + S extends RedisScripts = RedisScripts, + RESP extends RespVersions = RespVersions, + TYPE_MAPPING extends TypeMapping = TypeMapping +> { #commandsQueue: RedisCommandsQueue; - #options: RedisClientOptions; + #options: RedisClientOptions; #isMaintenance = 0; - #client: RedisType; - - static setupDefaultMaintOptions(options: RedisClientOptions) { + #client: RedisClient; + + static setupDefaultMaintOptions< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions = RespVersions, + TYPE_MAPPING extends TypeMapping = TypeMapping + >(options: RedisClientOptions) { if (options.maintNotifications === undefined) { options.maintNotifications = options?.RESP === 3 ? "auto" : "disabled"; @@ -94,8 +104,14 @@ export default class EnterpriseMaintenanceManager { } } - static async getHandshakeCommand( - options: RedisClientOptions, + static async getHandshakeCommand< + M extends RedisModules = RedisModules, + F extends RedisFunctions = RedisFunctions, + S extends RedisScripts = RedisScripts, + RESP extends RespVersions = RespVersions, + TYPE_MAPPING extends TypeMapping = TypeMapping + >( + options: RedisClientOptions, clientId: string, ): Promise< | { cmd: Array; errorHandler: (error: Error) => void } @@ -140,8 +156,8 @@ export default class EnterpriseMaintenanceManager { constructor( commandsQueue: RedisCommandsQueue, - client: RedisType, - options: RedisClientOptions, + client: RedisClient, + options: RedisClientOptions, ) { this.#commandsQueue = commandsQueue; this.#options = options; @@ -465,7 +481,7 @@ function isPrivateIP(ip: string): boolean { async function determineEndpoint( tlsEnabled: boolean, host: string, - options: RedisClientOptions, + options: RedisClientOptions, ): Promise { assert(options.maintEndpointType !== undefined); if (options.maintEndpointType !== "auto") { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index c20c75830e0..ed06d6069b1 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -31,7 +31,7 @@ export interface RedisClientOptions< M extends RedisModules = RedisModules, F extends RedisFunctions = RedisFunctions, S extends RedisScripts = RedisScripts, - RESP extends RespVersions = RespVersions, + RESP extends RespVersions = 2, TYPE_MAPPING extends TypeMapping = TypeMapping, SocketOptions extends RedisSocketOptions = RedisSocketOptions > extends CommanderConfig { @@ -365,7 +365,13 @@ export default class RedisClient< return RedisClient.factory(options)(options); } - static parseOptions(options: O): O { + static parseOptions< + M extends RedisModules = RedisModules, + F extends RedisFunctions = RedisFunctions, + S extends RedisScripts = RedisScripts, + RESP extends RespVersions = RespVersions, + TYPE_MAPPING extends TypeMapping = TypeMapping + >(options: RedisClientOptions): RedisClientOptions { if (options?.url) { const parsed = RedisClient.parseURL(options.url); if (options.socket) { @@ -380,15 +386,15 @@ export default class RedisClient< return options; } - static parseURL(url: string): RedisClientOptions & { - socket: Exclude & { + static parseURL(url: string): RedisClientOptions & { + socket: Exclude['socket'], undefined> & { tls: boolean } } { // https://www.iana.org/assignments/uri-schemes/prov/redis const { hostname, port, protocol, username, password, pathname } = new URL(url), - parsed: RedisClientOptions & { - socket: Exclude & { + parsed: RedisClientOptions & { + socket: Exclude['socket'], undefined> & { tls: boolean } } = { diff --git a/packages/client/lib/commands/XINFO_GROUPS.spec.ts b/packages/client/lib/commands/XINFO_GROUPS.spec.ts index a1196f4957a..2a2f67ff2f2 100644 --- a/packages/client/lib/commands/XINFO_GROUPS.spec.ts +++ b/packages/client/lib/commands/XINFO_GROUPS.spec.ts @@ -11,6 +11,44 @@ describe('XINFO GROUPS', () => { ); }); + it('transformReply', () => { + assert.deepEqual( + XINFO_GROUPS.transformReply[2]([[ + 'name', 'group', + 'consumers', 0, + 'pending', 0, + 'last-delivered-id', '0-0' + ]]), + [Object.assign(Object.create(null), { + name: 'group', + consumers: 0, + pending: 0, + 'last-delivered-id': '0-0' + })] + ); + }); + + it('transformReply - Redis 7 fields', () => { + assert.deepEqual( + XINFO_GROUPS.transformReply[2]([[ + 'name', 'group', + 'consumers', 0, + 'pending', 0, + 'last-delivered-id', '0-0', + 'entries-read', null, + 'lag', null + ]]), + [Object.assign(Object.create(null), { + name: 'group', + consumers: 0, + pending: 0, + 'last-delivered-id': '0-0', + 'entries-read': null, + lag: null + })] + ); + }); + testUtils.testAll('xInfoGroups', async client => { const [, reply] = await Promise.all([ client.xGroupCreate('key', 'group', '$', { diff --git a/packages/client/lib/commands/XINFO_GROUPS.ts b/packages/client/lib/commands/XINFO_GROUPS.ts index b2469c8551f..fb473a4e390 100644 --- a/packages/client/lib/commands/XINFO_GROUPS.ts +++ b/packages/client/lib/commands/XINFO_GROUPS.ts @@ -1,5 +1,6 @@ import { CommandParser } from '../client/parser'; -import { RedisArgument, ArrayReply, TuplesToMapReply, BlobStringReply, NumberReply, NullReply, UnwrapReply, Resp2Reply, Command } from '../RESP/types'; +import { RedisArgument, ArrayReply, TuplesToMapReply, BlobStringReply, NumberReply, NullReply, Command } from '../RESP/types'; +import { transformTuplesReply } from './generic-transformers'; /** * Reply structure for XINFO GROUPS command containing information about consumer groups @@ -8,11 +9,11 @@ export type XInfoGroupsReply = ArrayReply, BlobStringReply], [BlobStringReply<'consumers'>, NumberReply], [BlobStringReply<'pending'>, NumberReply], - [BlobStringReply<'last-delivered-id'>, NumberReply], + [BlobStringReply<'last-delivered-id'>, BlobStringReply], /** added in 7.0 */ [BlobStringReply<'entries-read'>, NumberReply | NullReply], /** added in 7.0 */ - [BlobStringReply<'lag'>, NumberReply], + [BlobStringReply<'lag'>, NumberReply | NullReply], ]>>; export default { @@ -34,19 +35,8 @@ export default { * entries-read - Number of entries read in the group (Redis 7.0+) * lag - Number of entries not read by the group (Redis 7.0+) */ - 2: (reply: UnwrapReply>) => { - return reply.map(group => { - const unwrapped = group as unknown as UnwrapReply; - return { - name: unwrapped[1], - consumers: unwrapped[3], - pending: unwrapped[5], - 'last-delivered-id': unwrapped[7], - 'entries-read': unwrapped[9], - lag: unwrapped[11] - }; - }); - }, + 2: (reply: Array>) => + reply.map(group => transformTuplesReply(group as unknown as ArrayReply)) as unknown as XInfoGroupsReply['DEFAULT'], 3: undefined as unknown as () => XInfoGroupsReply } } as const satisfies Command; diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index ccb16cc0f8b..9849dbf2e3d 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -19,6 +19,8 @@ import { RedisTcpSocketOptions } from '../client/socket'; import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache'; import { ClientIdentity, ClientRole, generateClientId } from '../client/identity'; +type BroadSentinelClient = RedisClientType; + interface ClientInfo { id: number; } @@ -162,7 +164,7 @@ export class RedisSentinelClient< async _execute( isReadonly: boolean | undefined, - fn: (client: RedisClient) => Promise + fn: (client: BroadSentinelClient) => Promise ): Promise { if (this._self.#clientInfo === undefined) { throw new Error("Attempted execution on released RedisSentinelClient lease"); @@ -675,7 +677,7 @@ export class RedisSentinelInternal< readonly #name: string; readonly #sentinelClientId: string; readonly #nodeClientOptions: RedisClientOptions; - readonly #sentinelClientOptions: RedisClientOptions; + readonly #sentinelClientOptions: RedisClientOptions; readonly #nodeAddressMap?: NodeAddressMap; readonly #scanInterval: number; readonly #passthroughClientErrorEvents: boolean; @@ -686,13 +688,13 @@ export class RedisSentinelInternal< readonly #sentinelSeedNodes: Array; #sentinelRootNodes: Array; - #sentinelClient?: RedisClientType; + #sentinelClient?: BroadSentinelClient; - #masterClients: Array> = []; + #masterClients: Array = []; #masterClientQueue: WaitQueue; readonly #masterPoolSize: number; - #replicaClients: Array> = []; + #replicaClients: Array = []; #replicaClientsIdx: number = 0; readonly #replicaPoolSize: number; @@ -702,7 +704,7 @@ export class RedisSentinelInternal< #connectPromise?: Promise; #maxCommandRediscovers: number; - readonly #pubSubProxy: PubSubProxy; + readonly #pubSubProxy: PubSubProxy; #scanTimer?: NodeJS.Timeout @@ -767,13 +769,23 @@ export class RedisSentinelInternal< } /* persistent object for life of sentinel object */ - this.#pubSubProxy = new PubSubProxy( + this.#pubSubProxy = new PubSubProxy( this.#nodeClientOptions, err => this.emit('error', err) ); } - #createClient(node: RedisNode, clientOptions: RedisClientOptions, reconnectStrategy?: false) { + #createClient< + _M extends RedisModules, + _F extends RedisFunctions, + _S extends RedisScripts, + _RESP extends RespVersions, + _TYPE_MAPPING extends TypeMapping + >( + node: RedisNode, + clientOptions: RedisClientOptions<_M, _F, _S, _RESP, _TYPE_MAPPING, RedisTcpSocketOptions>, + reconnectStrategy?: false + ): RedisClientType<_M, _F, _S, 2 | 3, _TYPE_MAPPING> { const socket = getMappedNode(node.host, node.port, this.#nodeAddressMap); const client = RedisClient.create({ //first take the globally set RESP @@ -887,7 +899,7 @@ export class RedisSentinelInternal< } async execute( - fn: (client: RedisClientType) => Promise, + fn: (client: BroadSentinelClient) => Promise, clientInfo?: ClientInfo ): Promise { let iter = 0; @@ -937,7 +949,7 @@ export class RedisSentinelInternal< } } - async #createPubSub(client: RedisClientType) { + async #createPubSub(client: BroadSentinelClient) { /* Whenever sentinels or slaves get added, or when slave configuration changes, reconfigure */ await client.pSubscribe(['switch-master', '[-+]sdown', '+slave', '+sentinel', '[-+]odown', '+slave-reconf-done'], (message, channel) => { this.#handlePubSubControlChannel(channel, message); @@ -952,7 +964,7 @@ export class RedisSentinelInternal< } // if clientInfo is defined, it corresponds to a master client in the #masterClients array, otherwise loop around replicaClients - #getClient(clientInfo?: ClientInfo): RedisClientType { + #getClient(clientInfo?: ClientInfo): BroadSentinelClient { if (clientInfo !== undefined) { return this.#masterClients[clientInfo.id]; } @@ -1380,7 +1392,7 @@ export class RedisSentinelInternal< replicaCloseSet.add(str); } - const newClientList: Array> = []; + const newClientList: Array = []; const removedSet = new Set(); for (const replica of this.#replicaClients) { diff --git a/packages/client/lib/sentinel/pub-sub-proxy.ts b/packages/client/lib/sentinel/pub-sub-proxy.ts index 1ba1d25bbf1..def6f9d449b 100644 --- a/packages/client/lib/sentinel/pub-sub-proxy.ts +++ b/packages/client/lib/sentinel/pub-sub-proxy.ts @@ -4,12 +4,13 @@ import { RedisClientOptions } from '../client'; import { PUBSUB_TYPE, PubSubListener, PubSubTypeListeners } from '../client/pub-sub'; import { RedisNode } from './types'; import RedisClient from '../client'; +import { RedisTcpSocketOptions } from '../client/socket'; -type Client = RedisClient< +type Client = RedisClient< RedisModules, RedisFunctions, RedisScripts, - RespVersions, + RESP, TypeMapping >; @@ -18,22 +19,22 @@ type Subscriptions = Record< PubSubTypeListeners >; -type PubSubState = { - client: Client; - connectPromise: Promise | undefined; +type PubSubState = { + client: Client; + connectPromise: Promise | undefined> | undefined; }; type OnError = (err: unknown) => unknown; -export class PubSubProxy extends EventEmitter { +export class PubSubProxy extends EventEmitter { #clientOptions; #onError; #node?: RedisNode; - #state?: PubSubState; + #state?: PubSubState; #subscriptions?: Subscriptions; - constructor(clientOptions: RedisClientOptions, onError: OnError) { + constructor(clientOptions: RedisClientOptions, onError: OnError) { super(); this.#clientOptions = clientOptions; @@ -127,7 +128,7 @@ export class PubSubProxy extends EventEmitter { await this.#initiatePubSubClient(true); } - #executeCommand(fn: (client: Client) => T) { + #executeCommand(fn: (client: Client) => T) { const client = this.#getPubSubClient(); if (client instanceof RedisClient) { return fn(client); @@ -158,7 +159,7 @@ export class PubSubProxy extends EventEmitter { ); } - #unsubscribe(fn: (client: Client) => Promise) { + #unsubscribe(fn: (client: Client) => Promise) { return this.#executeCommand(async client => { const reply = await fn(client); diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index c1888d0e68d..92a276ab177 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -264,7 +264,13 @@ export default class TestUtils { /** * Cleans up non-default ACL users using a temporary client connection */ - static async cleanupAclUsers(port: number, clientOptions?: Partial): Promise { + static async cleanupAclUsers< + M extends RedisModules = RedisModules, + F extends RedisFunctions = RedisFunctions, + S extends RedisScripts = RedisScripts, + RESP extends RespVersions = RespVersions, + TYPE_MAPPING extends TypeMapping = TypeMapping + >(port: number, clientOptions?: Partial>): Promise { const cleanupClient = createClient({ ...clientOptions, socket: {