diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 9af9b9a3bef..8c82ad4076a 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -587,11 +587,11 @@ describe('Client', () => { }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('AbortError', async client => { - await blockSetImmediate(async () => { - await assert.rejects(client.sendCommand(['PING'], { - abortSignal: AbortSignal.timeout(5) - }), AbortError); - }) + await blockSetImmediate(async () => { + await assert.rejects(client.sendCommand(['PING'], { + abortSignal: AbortSignal.timeout(5) + }), AbortError); + }) }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('Timeout with custom timeout config', async client => { @@ -865,6 +865,37 @@ describe('Client', () => { } }); + + testUtils.testWithClient('proxies respect RedisClient command options', async client => { + + const TIMEOUT = 1234; + (client as any)._commandOptions = { timeout: TIMEOUT }; + + const bufferProxy = client.withCommandOptions({ + typeMapping: { [RESP_TYPES.BLOB_STRING]: Buffer } + }); + + const stringReply = await client.module.echo('hi'); + const bufferReply = await bufferProxy.module.echo('hi'); + + + assert.ok((bufferReply as unknown) instanceof Buffer, 'Proxy failed to return Buffer.'); + assert.strictEqual(typeof stringReply, 'string', 'Original client was corrupted.'); + assert.equal(bufferReply.toString(), stringReply); + + const proxyOptions = (bufferProxy.module as any)._commandOptions; + assert.equal(proxyOptions.timeout, TIMEOUT, 'Inherited options (timeout) were lost in the proxy chain.') + + assert.ok(!Object.prototype.hasOwnProperty.call(proxyOptions, 'timeout'), 'Timeout should be inherited, not copied.'); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + modules: { + module + } + } + }) + testUtils.testWithClient('duplicate should reuse command options', async client => { const duplicate = client.duplicate(); diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index bac6efa0770..be5f56ad67e 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -26,7 +26,7 @@ import { ClientIdentity, ClientRole, generateClientId } from './identity'; import { trace, sanitizeArgs, publish, CHANNELS, type CommandTraceContext } from './tracing'; import { DEFAULT_COMMAND_TIMEOUT } from '../defaults'; -const noop = () => {}; +const noop = () => { }; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -272,7 +272,10 @@ export type RedisClientType< type ProxyClient = RedisClient; -type NamespaceProxyClient = { _self: ProxyClient }; +type NamespaceProxyClient = { + _self: ProxyClient; + _commandOptions?: CommandOptions +}; export interface ScanIteratorOptions { cursor?: RedisArgument; @@ -305,7 +308,7 @@ export default class RedisClient< const parser = new BasicCommandParser(); command.parseCommand(parser, ...args); - return this._self._executeCommand(command, parser, this._self._commandOptions, transformReply); + return this._self._executeCommand(command, parser, this._commandOptions, transformReply); }; } @@ -318,7 +321,7 @@ export default class RedisClient< parser.push(...prefix); fn.parseCommand(parser, ...args); - return this._self._executeCommand(fn, parser, this._self._commandOptions, transformReply); + return this._self._executeCommand(fn, parser, this._commandOptions, transformReply); }; } @@ -665,7 +668,7 @@ export default class RedisClient< this.#registerForMetrics(); - if(this.#options.maintNotifications !== 'disabled') { + if (this.#options.maintNotifications !== 'disabled') { new EnterpriseMaintenanceManager(this.#queue, this, this.#options); }; @@ -743,7 +746,7 @@ export default class RedisClient< this._commandOptions = { timeout: DEFAULT_COMMAND_TIMEOUT, ...options.commandOptions }; - if(options.maintNotifications !== 'disabled') { + if (options.maintNotifications !== 'disabled') { EnterpriseMaintenanceManager.setupDefaultMaintOptions(options); } @@ -927,16 +930,16 @@ export default class RedisClient< } if (this.#clientSideCache) { - commands.push({cmd: this.#clientSideCache.trackingOn()}); + commands.push({ cmd: this.#clientSideCache.trackingOn() }); } if (this.#options?.emitInvalidate) { - commands.push({cmd: ['CLIENT', 'TRACKING', 'ON']}); + commands.push({ cmd: ['CLIENT', 'TRACKING', 'ON'] }); } const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(this.#options, this._clientId); - if(maintenanceHandshakeCmd) { + if (maintenanceHandshakeCmd) { commands.push(maintenanceHandshakeCmd); }; @@ -952,24 +955,24 @@ export default class RedisClient< this.emit('error', err); } }) - .on('error', err => { - this.emit('error', err); - this.#clientSideCache?.onError(); - if (this.#socket.isOpen && !this.#options.disableOfflineQueue) { - this.#queue.flushWaitingForReply(err); - } else { - this.#queue.flushAll(err); - } - }) - .on('connect', () => this.emit('connect')) - .on('ready', () => { - this.emit('ready'); - this.#setPingTimer(); - this.#maybeScheduleWrite(); - }) - .on('reconnecting', () => this.emit('reconnecting')) - .on('drain', () => this.#maybeScheduleWrite()) - .on('end', () => this.emit('end')); + .on('error', err => { + this.emit('error', err); + this.#clientSideCache?.onError(); + if (this.#socket.isOpen && !this.#options.disableOfflineQueue) { + this.#queue.flushWaitingForReply(err); + } else { + this.#queue.flushAll(err); + } + }) + .on('connect', () => this.emit('connect')) + .on('ready', () => { + this.emit('ready'); + this.#setPingTimer(); + this.#maybeScheduleWrite(); + }) + .on('reconnecting', () => this.emit('reconnecting')) + .on('drain', () => this.#maybeScheduleWrite()) + .on('end', () => this.emit('end')); } #initiateSocket(clientId: string): RedisSocket { @@ -1038,7 +1041,8 @@ export default class RedisClient< TYPE_MAPPING extends TypeMapping >(options: OPTIONS) { const proxy = Object.create(this._self); - proxy._commandOptions = { ...this._commandOptions, ...options }; + proxy._commandOptions = Object.assign( + Object.create(this._commandOptions ?? null),options); return proxy as RedisClientType< M, F, @@ -1134,61 +1138,61 @@ export default class RedisClient< /** * @internal */ - _ejectSocket(): RedisSocket { - const socket = this._self.#socket; - // @ts-expect-error null assignment is intentional during eject - this._self.#socket = null; - socket.removeAllListeners(); - return socket; - } - - /** - * @internal - */ - _insertSocket(socket: RedisSocket) { - if(this._self.#socket) { + _ejectSocket(): RedisSocket { + const socket = this._self.#socket; + // @ts-ignore + this._self.#socket = null; + socket.removeAllListeners(); + return socket; + } + + /** + * @internal + */ + _insertSocket(socket: RedisSocket) { + if (this._self.#socket) { this._self._ejectSocket().destroy(); - } - this._self.#socket = socket; - this._self.#attachListeners(this._self.#socket); - } - - /** - * @internal - */ - _maintenanceUpdate(update: MaintenanceUpdate) { - this._self.#socket.setMaintenanceTimeout(update.relaxedSocketTimeout); - this._self.#queue.setMaintenanceCommandTimeout(update.relaxedCommandTimeout); - } - - /** - * @internal - */ - _pause() { - this._self.#paused = true; - } - - /** - * @internal - */ - _unpause() { - this._self.#paused = false; - this._self.#maybeScheduleWrite(); - } - - /** - * @internal - */ - _handleSmigrated(smigratedEvent: SMigratedEvent) { - this._self.emit(SMIGRATED_EVENT, smigratedEvent); - } - - /** - * @internal - */ - _getQueue(): RedisCommandsQueue { - return this._self.#queue; - } + } + this._self.#socket = socket; + this._self.#attachListeners(this._self.#socket); + } + + /** + * @internal + */ + _maintenanceUpdate(update: MaintenanceUpdate) { + this._self.#socket.setMaintenanceTimeout(update.relaxedSocketTimeout); + this._self.#queue.setMaintenanceCommandTimeout(update.relaxedCommandTimeout); + } + + /** + * @internal + */ + _pause() { + this._self.#paused = true; + } + + /** + * @internal + */ + _unpause() { + this._self.#paused = false; + this._self.#maybeScheduleWrite(); + } + + /** + * @internal + */ + _handleSmigrated(smigratedEvent: SMigratedEvent) { + this._self.emit(SMIGRATED_EVENT, smigratedEvent); + } + + /** + * @internal + */ + _getQueue(): RedisCommandsQueue { + return this._self.#queue; + } /** * @internal @@ -1261,10 +1265,9 @@ export default class RedisClient< } // Merge global options with provided options - const opts = { - ...this._commandOptions, - ...options, - }; + const opts = options ? + Object.assign(Object.create(this._commandOptions ?? null), options) : + this._commandOptions; const promise = this._self.#queue.addCommand(args, opts); this._self.#scheduleWrite(); @@ -1450,7 +1453,7 @@ export default class RedisClient< } #write() { - if(this.#paused) { + if (this.#paused) { return } this.#socket.write(this.#queue.commandsToWrite()); diff --git a/packages/client/lib/client/pool.spec.ts b/packages/client/lib/client/pool.spec.ts index fa19504785d..8e5b7aba4c8 100644 --- a/packages/client/lib/client/pool.spec.ts +++ b/packages/client/lib/client/pool.spec.ts @@ -156,7 +156,7 @@ describe('RedisClientPool', () => { assert.equal(result2, 'task2'); }, { ...GLOBAL.SERVERS.OPEN, - poolOptions: { minimum: 1, maximum: 1, acquireTimeout: 2000, cleanupDelay: 400 } + poolOptions: { minimum: 1, maximum: 1, acquireTimeout: 2000, cleanupDelay: 400 } }); testUtils.testWithClientPool('execute rejects when pool is closing', async pool => { @@ -295,4 +295,65 @@ describe('RedisClientPool', () => { }, GLOBAL.SERVERS.OPEN ); + + testUtils.testWithClientPool('sendCommand respects proxy command options', async pool => { + const TIMEOUT = 1234; + + + (pool as any)._commandOptions = { timeout: TIMEOUT }; + + + const bufferProxy = pool.withCommandOptions({ + typeMapping: { + [RESP_TYPES.BLOB_STRING]: Buffer + } + }); + + + const stringReply = await pool.sendCommand(['ECHO', 'hello']); + assert.equal(typeof stringReply, 'string', 'Base pool should return a string'); + + + const bufferReply = await bufferProxy.sendCommand(['ECHO', 'hello']); + assert.ok(bufferReply instanceof Buffer, 'Proxy should return a Buffer'); + assert.equal(bufferReply.toString(), 'hello'); + + + const proxyOptions = (bufferProxy as any)._commandOptions; + + + assert.equal( + proxyOptions.timeout, + TIMEOUT, + 'Proxy should inherit timeout from base pool' + ); + + + assert.equal( + Object.prototype.hasOwnProperty.call(proxyOptions, 'timeout'), + false, + 'Timeout should be inherited via prototype chain, not copied' + ); + + assert.equal( + Object.prototype.hasOwnProperty.call(proxyOptions, 'typeMapping'), + true, + 'TypeMapping should be a direct property of the proxy options' + ); + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClientPool('namespace proxy sees updated base options', async pool => { + + const module = (pool as any).module; + + assert.equal(module._commandOptions, null); + + (pool as any)._commandOptions = { timeout: 5000 }; + + assert.equal(module._commandOptions.timeout, 5000); + + (pool as any)._commandOptions = { timeout: 9999 }; + + assert.equal(module._commandOptions.timeout, 9999); + }, GLOBAL.SERVERS.OPEN); }); diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts index 0602187ee33..9eba8feaca1 100644 --- a/packages/client/lib/client/pool.ts +++ b/packages/client/lib/client/pool.ts @@ -86,8 +86,8 @@ type PoolWithCommands< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [P in keyof typeof NON_STICKY_COMMANDS]: CommandSignature<(typeof NON_STICKY_COMMANDS)[P], RESP, TYPE_MAPPING>; -}; + [P in keyof typeof NON_STICKY_COMMANDS]: CommandSignature<(typeof NON_STICKY_COMMANDS)[P], RESP, TYPE_MAPPING>; + }; export type RedisClientPoolType< M extends RedisModules = {}, @@ -96,17 +96,20 @@ export type RedisClientPoolType< RESP extends RespVersions = 3, TYPE_MAPPING extends TypeMapping = {} > = ( - RedisClientPool & - PoolWithCommands & - WithModules & - WithFunctions & - WithScripts -); + RedisClientPool & + PoolWithCommands & + WithModules & + WithFunctions & + WithScripts + ); // eslint-disable-next-line @typescript-eslint/no-explicit-any -- variance markers for pool generics type ProxyPool = RedisClientPoolType; -type NamespaceProxyPool = { _self: ProxyPool }; +type NamespaceProxyPool = { + _self: ProxyPool; + _commandOptions?: CommandOptions +}; export class RedisClientPool< M extends RedisModules = {}, @@ -122,7 +125,7 @@ export class RedisClientPool< const parser = new BasicCommandParser(); command.parseCommand(parser, ...args); - return this.execute(client => client._executeCommand(command, parser, this._commandOptions, transformReply)) + return this._self.execute(client => client._executeCommand(command, parser, this._commandOptions, transformReply)) }; } @@ -133,7 +136,7 @@ export class RedisClientPool< const parser = new BasicCommandParser(); command.parseCommand(parser, ...args); - return this._self.execute(client => client._executeCommand(command, parser, this._self._commandOptions, transformReply)) + return this._self.execute(client => client._executeCommand(command, parser, this._commandOptions, transformReply)) }; } @@ -146,7 +149,8 @@ export class RedisClientPool< parser.push(...prefix); fn.parseCommand(parser, ...args); - return this._self.execute(client => client._executeCommand(fn, parser, this._self._commandOptions, transformReply)) }; + return this._self.execute(client => client._executeCommand(fn, parser, this._commandOptions, transformReply)) + }; } static #createScriptCommand(script: RedisScript, resp: RespVersions) { @@ -158,7 +162,7 @@ export class RedisClientPool< parser.pushVariadic(prefix); script.parseCommand(parser, ...args); - return this.execute(client => client._executeScript(script, parser, this._commandOptions, transformReply)) + return this._self.execute(client => client._executeScript(script, parser, this._commandOptions, transformReply)) }; } @@ -177,7 +181,7 @@ export class RedisClientPool< ) { let Pool = RedisClientPool.#SingleEntryCache.get(clientOptions); - if(!Pool) { + if (!Pool) { Pool = attachConfig({ BaseClass: RedisClientPool, commands: NON_STICKY_COMMANDS, @@ -301,7 +305,7 @@ export class RedisClientPool< super(); const socketOpts = clientOptions?.socket as { host?: string; port?: number } | undefined; - + this.#identity = { id: generateClientId(socketOpts?.host, socketOpts?.port, clientOptions?.database), role: ClientRole.POOL, @@ -320,7 +324,7 @@ export class RedisClientPool< } else { const cscConfig = options.clientSideCache; this.#clientSideCache = clientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); -// this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); + // this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); } } @@ -336,7 +340,10 @@ export class RedisClientPool< TYPE_MAPPING extends TypeMapping >(options: OPTIONS) { const proxy = Object.create(this._self); - proxy._commandOptions = options; + proxy._commandOptions = Object.assign( + Object.create(this._commandOptions ?? null), + options + ); return proxy as RedisClientPoolType< M, F, @@ -481,10 +488,10 @@ export class RedisClientPool< const result = fn(node.value); if (result instanceof Promise) { result - .then(resolve, reject) - .finally(() => { - this.#returnClient(node); - }) + .then(resolve, reject) + .finally(() => { + this.#returnClient(node); + }) } else { resolve(result); this.#returnClient(node); diff --git a/packages/client/lib/cluster/index.spec.ts b/packages/client/lib/cluster/index.spec.ts index 0360d7d66cb..5743d7d9812 100644 --- a/packages/client/lib/cluster/index.spec.ts +++ b/packages/client/lib/cluster/index.spec.ts @@ -5,6 +5,7 @@ import RedisCluster from '.'; import { SQUARE_SCRIPT } from '../client/index.spec'; import { RootNodesUnavailableError } from '../errors'; import { spy } from 'sinon'; +import { RESP_TYPES } from '../RESP/decoder'; import RedisClient from '../client'; import { RESP_TYPES } from '../RESP/decoder'; @@ -284,6 +285,50 @@ describe('Cluster', () => { minimizeConnections: true } }); + + testUtils.testWithCluster('proxies respect RedisCluster command options', async cluster => { + + const CUSTOM_MAPPING = { [RESP_TYPES.BLOB_STRING]: Buffer }; + + + (cluster as any)._commandOptions = { timeout: 5000 }; + + + const bufferProxy = cluster.withTypeMapping(CUSTOM_MAPPING); + + const baseModule = (cluster as any).module; + const proxyModule = (bufferProxy as any).module; + + + assert.equal( + proxyModule._commandOptions.timeout, + 5000, + 'Namespace proxy should inherit timeout from base cluster' + ); + + + const stringReply = await baseModule.echo('hello'); + assert.equal(typeof stringReply, 'string', 'Base module should return a string'); + + const bufferReply = await proxyModule.echo('hello'); + assert.ok( + bufferReply instanceof Buffer, + 'Proxied module command should return a Buffer' + ); + + + assert.equal( + Object.prototype.hasOwnProperty.call(proxyModule._commandOptions, 'typeMapping'), + true, + 'typeMapping should be an "own" property of the proxy options' + ); + assert.equal( + Object.prototype.hasOwnProperty.call(proxyModule._commandOptions, 'timeout'), + false, + 'timeout should be inherited, not copied' + ); + + }, GLOBAL.CLUSTERS.OPEN); }); describe('PubSub', () => { diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index 8035b0a7f3f..ef1219e7943 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -26,8 +26,8 @@ type WithCommands< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [P in keyof typeof NON_STICKY_COMMANDS]: CommandSignature<(typeof NON_STICKY_COMMANDS)[P], RESP, TYPE_MAPPING>; -}; + [P in keyof typeof NON_STICKY_COMMANDS]: CommandSignature<(typeof NON_STICKY_COMMANDS)[P], RESP, TYPE_MAPPING>; + }; interface ClusterCommander< @@ -36,7 +36,7 @@ interface ClusterCommander< S extends RedisScripts, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping, - // POLICIES extends CommandPolicies +// POLICIES extends CommandPolicies > extends CommanderConfig { commandOptions?: ClusterCommandOptions; } @@ -52,7 +52,7 @@ export interface RedisClusterOptions< S extends RedisScripts = RedisScripts, RESP extends RespVersions = 3, TYPE_MAPPING extends TypeMapping = TypeMapping, - // POLICIES extends CommandPolicies = CommandPolicies +// POLICIES extends CommandPolicies = CommandPolicies > extends ClusterCommander { /** * Should contain details for some of the cluster nodes that the client will use to discover @@ -135,23 +135,28 @@ export type RedisClusterType< S extends RedisScripts = {}, RESP extends RespVersions = 3, TYPE_MAPPING extends TypeMapping = {}, - // POLICIES extends CommandPolicies = {} +// POLICIES extends CommandPolicies = {} > = ( - RedisCluster & - WithCommands & - WithModules & - WithFunctions & - WithScripts -); + RedisCluster & + WithCommands & + WithModules & + WithFunctions & + WithScripts + ); export type ClusterCommandOptions< TYPE_MAPPING extends TypeMapping = TypeMapping - // POLICIES extends CommandPolicies = CommandPolicies -> = CommandOptions; +// POLICIES extends CommandPolicies = CommandPolicies +> extends CommandOptions { + // policies?: POLICIES; +} type ProxyCluster = RedisCluster; -type NamespaceProxyCluster = { _self: ProxyCluster }; +type NamespaceProxyCluster = { + _self: ProxyCluster; + _commandOptions?: ClusterCommandOptions; +}; export default class RedisCluster< M extends RedisModules, @@ -159,7 +164,7 @@ export default class RedisCluster< S extends RedisScripts, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping, - // POLICIES extends CommandPolicies +// POLICIES extends CommandPolicies > extends EventEmitter { static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); @@ -187,7 +192,7 @@ export default class RedisCluster< return this._self._execute( parser.firstKey, command.IS_READ_ONLY, - this._self._commandOptions, + this._commandOptions, (client, opts) => client._executeCommand(command, parser, opts, transformReply) ); }; @@ -205,7 +210,7 @@ export default class RedisCluster< return this._self._execute( parser.firstKey, fn.IS_READ_ONLY, - this._self._commandOptions, + this._commandOptions, (client, opts) => client._executeCommand(fn, parser, opts, transformReply) ); }; @@ -238,7 +243,7 @@ export default class RedisCluster< S extends RedisScripts = {}, RESP extends RespVersions = 3, TYPE_MAPPING extends TypeMapping = {}, - // POLICIES extends CommandPolicies = {} + // POLICIES extends CommandPolicies = {} >(config?: ClusterCommander) { let Cluster = RedisCluster.#SingleEntryCache.get(config); @@ -269,7 +274,7 @@ export default class RedisCluster< S extends RedisScripts = {}, RESP extends RespVersions = 3, TYPE_MAPPING extends TypeMapping = {}, - // POLICIES extends CommandPolicies = {} + // POLICIES extends CommandPolicies = {} >(options?: RedisClusterOptions) { return RedisCluster.factory(options)(options); } @@ -374,7 +379,7 @@ export default class RedisCluster< withCommandOptions< OPTIONS extends ClusterCommandOptions, TYPE_MAPPING extends TypeMapping, - // POLICIES extends CommandPolicies + // POLICIES extends CommandPolicies >(options: OPTIONS) { const proxy = Object.create(this); proxy._commandOptions = options; @@ -384,7 +389,7 @@ export default class RedisCluster< S, RESP, TYPE_MAPPING extends TypeMapping ? TYPE_MAPPING : {} - // POLICIES extends CommandPolicies ? POLICIES : {} + // POLICIES extends CommandPolicies ? POLICIES : {} >; } @@ -403,7 +408,7 @@ export default class RedisCluster< S, RESP, K extends 'typeMapping' ? V extends TypeMapping ? V : {} : TYPE_MAPPING - // K extends 'policies' ? V extends CommandPolicies ? V : {} : POLICIES + // K extends 'policies' ? V extends CommandPolicies ? V : {} : POLICIES >; } @@ -427,14 +432,14 @@ export default class RedisCluster< ) { return async (client: RedisClientType, options?: ClusterCommandOptions) => { const chainId = Symbol("asking chain"); - const opts = options ? {...options} : {}; + const opts = options ? { ...options } : {}; opts.chainId = chainId; const ret = await Promise.all( [ - client.sendCommand([ASKING_CMD], {chainId: chainId}), + client.sendCommand([ASKING_CMD], { chainId: chainId }), fn(client, opts) ] ); @@ -531,10 +536,10 @@ export default class RedisCluster< ): Promise { // Merge global options with local options - const opts = { - ...this._commandOptions, - ...options - } + const opts = options ? + Object.assign(Object.create(this._commandOptions ?? null), options) : + (this._commandOptions ?? {}); + return this._self._execute( firstKey, isReadonly, @@ -649,7 +654,7 @@ export default class RedisCluster< resubscribeAllPubSubListeners(allListeners: Partial) { if (allListeners.CHANNELS) { - for(const [channel, listeners] of allListeners.CHANNELS) { + for (const [channel, listeners] of allListeners.CHANNELS) { listeners.buffers.forEach(bufListener => { this.subscribe(channel, bufListener, true); }); diff --git a/packages/client/lib/commander.ts b/packages/client/lib/commander.ts index b5ab085edaa..16418a45f46 100644 --- a/packages/client/lib/commander.ts +++ b/packages/client/lib/commander.ts @@ -34,9 +34,8 @@ export function attachConfig< createScriptCommand, config }: AttachConfigOptions) { - const RESP = config?.RESP ?? DEFAULT_RESP, - // eslint-disable-next-line @typescript-eslint/no-explicit-any -- dynamic prototype patching - Class: any = class extends BaseClass {}; + const RESP = config?.RESP ?? 2, + Class: any = class extends BaseClass { }; for (const [name, command] of Object.entries(commands)) { Class.prototype[name] = createCommand(command, RESP); @@ -74,31 +73,29 @@ export function attachConfig< return Class; } - -// Per-receiver namespace cache. Keyed by the receiver (original instance or any -// `withCommandOptions(...)` proxy) so each one gets a namespace bound to itself -// via `_self`. Caching the namespace as an own property on the receiver — which -// is what an earlier version did — leaks across the prototype chain: a proxy -// created via `Object.create(original)` would inherit the original's cached -// namespace and `_self` would point back to the original, silently bypassing -// the proxy's command-options overrides for every module/function command. -// eslint-disable-next-line @typescript-eslint/no-explicit-any -- namespaces are dynamically shaped per module -const namespaceCache = new WeakMap>(); - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -- dynamic prototype patching helper +const namespaceCache = new WeakMap>(); function attachNamespace(prototype: any, name: PropertyKey, fns: any) { + Object.defineProperty(prototype, name, { get() { - let perReceiver = namespaceCache.get(this); - if (perReceiver === undefined) { - perReceiver = new Map(); - namespaceCache.set(this, perReceiver); + + let instanceCache = namespaceCache.get(this); + if (!instanceCache) { + instanceCache = new Map(); + namespaceCache.set(this, instanceCache); } - let value = perReceiver.get(name); - if (value === undefined) { + + let value = instanceCache.get(name); + if (!value) { value = Object.create(fns); value._self = this; - perReceiver.set(name, value); + + Object.defineProperty(value, '_commandOptions', { + get() { return this._self._commandOptions ?? null; }, + enumerable: true, + configurable: false + }) + instanceCache.set(name, value); } return value; } diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index da13450dc76..ab33e462f93 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -68,6 +68,14 @@ export class RedisSentinelClient< : this._self.#commandOptions; } + /** + * Internal getter for command dispatch functions. + * @returns the effective command options of the proxy. + */ + get _commandOptions(): CommandOptions | undefined { + return this.commandOptions; + } + #commandOptions?: CommandOptions; private _commandOptions?: CommandOptions; @@ -314,6 +322,15 @@ export default class RedisSentinel< return this._self.#identity; } + /** + * @internal + * Internal getter for command dispatch functions. + * Returns the effective command options of the proxy. + */ + get _commandOptions(): CommandOptions | undefined { + return this.commandOptions; + } + #commandOptions?: CommandOptions; private _commandOptions?: CommandOptions; @@ -820,7 +837,7 @@ export class RedisSentinelInternal< this.#scanInterval = options.scanInterval ?? 0; this.#passthroughClientErrorEvents = options.passthroughClientErrorEvents ?? false; - this.#nodeClientOptions = (options.nodeClientOptions ? {...options.nodeClientOptions} : {}) as RedisClientOptions; + this.#nodeClientOptions = (options.nodeClientOptions ? { ...options.nodeClientOptions } : {}) as RedisClientOptions; if (this.#nodeClientOptions.url !== undefined) { throw new Error("invalid nodeClientOptions for Sentinel"); } @@ -831,7 +848,7 @@ export class RedisSentinelInternal< } else { const cscConfig = options.clientSideCache; this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); -// this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); + // this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); } } @@ -939,7 +956,7 @@ export class RedisSentinelInternal< while (true) { this.#trace("starting connect loop"); - count+=1; + count += 1; if (this.#destroy) { this.#trace("in #connect and want to destroy") return; @@ -1089,10 +1106,10 @@ export class RedisSentinelInternal< #handleSentinelFailure(node: RedisNode) { const found = this.#sentinelRootNodes.findIndex( - (rootNode) => rootNode.host === node.host && rootNode.port === node.port + (rootNode) => rootNode.host === node.host && rootNode.port === node.port ); if (found !== -1) { - this.#sentinelRootNodes.splice(found, 1); + this.#sentinelRootNodes.splice(found, 1); } this.#restoreSentinelRootNodesIfEmpty(); this.#reset(); diff --git a/packages/client/lib/sentinel/types.ts b/packages/client/lib/sentinel/types.ts index 1831ba3f790..37f91b71556 100644 --- a/packages/client/lib/sentinel/types.ts +++ b/packages/client/lib/sentinel/types.ts @@ -134,7 +134,7 @@ export interface SentinelCommander< S extends RedisScripts, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping, - // POLICIES extends CommandPolicies +// POLICIES extends CommandPolicies > extends CommanderConfig { commandOptions?: CommandOptions; } @@ -149,36 +149,36 @@ type WithCommands< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [P in keyof typeof NON_STICKY_COMMANDS]: CommandSignature<(typeof NON_STICKY_COMMANDS)[P], RESP, TYPE_MAPPING>; -}; + [P in keyof typeof NON_STICKY_COMMANDS]: CommandSignature<(typeof NON_STICKY_COMMANDS)[P], RESP, TYPE_MAPPING>; + }; type WithModules< M extends RedisModules, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [P in keyof M]: { - [C in keyof M[P]]: CommandSignature; + [P in keyof M]: { + [C in keyof M[P]]: CommandSignature; + }; }; -}; type WithFunctions< F extends RedisFunctions, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [L in keyof F]: { - [C in keyof F[L]]: CommandSignature; + [L in keyof F]: { + [C in keyof F[L]]: CommandSignature; + }; }; -}; type WithScripts< S extends RedisScripts, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [P in keyof S]: CommandSignature; -}; + [P in keyof S]: CommandSignature; + }; export type RedisSentinelClientType< M extends RedisModules = {}, @@ -187,12 +187,12 @@ export type RedisSentinelClientType< RESP extends RespVersions = 3, TYPE_MAPPING extends TypeMapping = {}, > = ( - RedisSentinelClient & - WithCommands & - WithModules & - WithFunctions & - WithScripts -); + RedisSentinelClient & + WithCommands & + WithModules & + WithFunctions & + WithScripts + ); export type RedisSentinelType< M extends RedisModules = {}, @@ -200,25 +200,33 @@ export type RedisSentinelType< S extends RedisScripts = {}, RESP extends RespVersions = 3, TYPE_MAPPING extends TypeMapping = {}, - // POLICIES extends CommandPolicies = {} +// POLICIES extends CommandPolicies = {} > = ( - RedisSentinel & - WithCommands & - WithModules & - WithFunctions & - WithScripts -); + RedisSentinel & + WithCommands & + WithModules & + WithFunctions & + WithScripts + ); export interface SentinelCommandOptions< TYPE_MAPPING extends TypeMapping = TypeMapping -> extends CommandOptions {} +> extends CommandOptions { } -// eslint-disable-next-line @typescript-eslint/no-explicit-any -- variance markers for sentinel generics -export type ProxySentinel = RedisSentinel; -// eslint-disable-next-line @typescript-eslint/no-explicit-any -- variance markers for sentinel generics -export type ProxySentinelClient = RedisSentinelClient; -export type NamespaceProxySentinel = { _self: ProxySentinel }; -export type NamespaceProxySentinelClient = { _self: ProxySentinelClient }; +export type ProxySentinel = RedisSentinel & { + _commandOptions?: SentinelCommandOptions; +}; +export type ProxySentinelClient = RedisSentinelClient & { + _commandOptions?: SentinelCommandOptions; +}; +export type NamespaceProxySentinel = { + _self: ProxySentinel; + _commandOptions?: CommandOptions +}; +export type NamespaceProxySentinelClient = { + _self: ProxySentinelClient; + _commandOptions?: CommandOptions; +}; export type NodeInfo = { ip: string, @@ -227,7 +235,7 @@ export type NodeInfo = { }; export type RedisSentinelEvent = NodeChangeEvent | SizeChangeEvent; - + export type NodeChangeEvent = { type: "SENTINEL_CHANGE" | "MASTER_CHANGE" | "REPLICA_ADD" | "REPLICA_REMOVE"; node: RedisNode; diff --git a/packages/client/lib/sentinel/utils.ts b/packages/client/lib/sentinel/utils.ts index ab475919398..c3de36f43cf 100644 --- a/packages/client/lib/sentinel/utils.ts +++ b/packages/client/lib/sentinel/utils.ts @@ -5,7 +5,7 @@ import { functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } fro import { NamespaceProxySentinel, NamespaceProxySentinelClient, NodeAddressMap, ProxySentinel, ProxySentinelClient, RedisNode } from './types'; /* TODO: should use map interface, would need a transform reply probably? as resp2 is list form, which this depends on */ -export function parseNode(node: Record): RedisNode | undefined{ +export function parseNode(node: Record): RedisNode | undefined { if (node.flags.includes("s_down") || node.flags.includes("disconnected") || node.flags.includes("failover_in_progress")) { return undefined; @@ -72,7 +72,7 @@ export function createCommand(com return this._self._execute( command.IS_READ_ONLY, - client => client._executeCommand(command, parser, this.commandOptions, transformReply) + client => client._executeCommand(command, parser, this._commandOptions, transformReply) ); }; } @@ -88,7 +88,7 @@ export function createFunctionCommand client._executeCommand(fn, parser, this._self.commandOptions, transformReply) + client => client._executeCommand(fn, parser, this._commandOptions, transformReply) ); } }; @@ -102,7 +102,7 @@ export function createModuleCommand client._executeCommand(command, parser, this._self.commandOptions, transformReply) + client => client._executeCommand(command, parser, this._commandOptions, transformReply) ); } }; @@ -118,7 +118,7 @@ export function createScriptCommand client._executeScript(script, parser, this.commandOptions, transformReply) + client => client._executeScript(script, parser, this._commandOptions, transformReply) ); }; }