From 17db5bf725286232343be9f91950dafb372c8bac Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 16 Jun 2026 11:00:31 -0700 Subject: [PATCH 01/11] perf(execution): parallelize preflight gates, cache deployed state, memoize Anthropic client - Memoize Anthropic + Azure-Anthropic SDK clients (new client-cache.ts) keyed by apiKey (+beta header; +baseURL/version/pinnedIP for Azure) so HTTP keep-alive connections are reused instead of a fresh TLS handshake per call. apiKey is the tenant boundary. - Parallelize the read-only preflight gates in preprocessing.ts (ban + subscription, then usage + org-member + rate-limit) while preserving exact error precedence (ban 403 -> usage 402 -> rate 429) and keeping the sole write (admission reservation) last. - Parallelize the independent workflow-state and env-var loads in execution-core. - Cache deployed workflow state by immutable deploymentVersionId with deep-clone-on-read, oldest-first eviction, and a 5-min TTL bounding the credential-mapping edge across ECS tasks. - Parallelize the independent personal-subscription + membership queries in getHighestPrioritySubscription. - BYOK: drop the redundant getWorkspaceById existence check (auth already validates the workspace); read the key list fresh every call for zero cross-instance staleness. Billing/usage/ban/permission reads stay fresh on the primary (no cache, no replica). Adds tests for every new mechanism and fixes a pre-existing vitest class-mock incompatibility that had execution-core.test.ts fully red on staging. --- apps/sim/lib/api-key/byok.test.ts | 26 +- apps/sim/lib/api-key/byok.ts | 11 +- .../lib/billing/calculations/usage-monitor.ts | 4 + apps/sim/lib/billing/core/plan.test.ts | 194 ++++++++ apps/sim/lib/billing/core/plan.ts | 29 +- apps/sim/lib/execution/preprocessing.test.ts | 38 +- apps/sim/lib/execution/preprocessing.ts | 414 +++++++++++------- .../workflows/executor/execution-core.test.ts | 116 ++++- .../lib/workflows/executor/execution-core.ts | 106 +++-- .../lib/workflows/persistence/utils.test.ts | 170 +++++++ apps/sim/lib/workflows/persistence/utils.ts | 68 ++- .../providers/anthropic/client-cache.test.ts | 162 +++++++ apps/sim/providers/anthropic/client-cache.ts | 41 ++ apps/sim/providers/anthropic/index.ts | 21 +- apps/sim/providers/azure-anthropic/index.ts | 41 +- 15 files changed, 1157 insertions(+), 284 deletions(-) create mode 100644 apps/sim/lib/billing/core/plan.test.ts create mode 100644 apps/sim/providers/anthropic/client-cache.test.ts create mode 100644 apps/sim/providers/anthropic/client-cache.ts diff --git a/apps/sim/lib/api-key/byok.test.ts b/apps/sim/lib/api-key/byok.test.ts index 439c392d946..6b288b6213f 100644 --- a/apps/sim/lib/api-key/byok.test.ts +++ b/apps/sim/lib/api-key/byok.test.ts @@ -3,9 +3,8 @@ */ import { beforeEach, describe, expect, it, vi } from 'vitest' -const { mockOrderBy, mockGetWorkspaceById, mockDecryptSecret } = vi.hoisted(() => ({ +const { mockOrderBy, mockDecryptSecret } = vi.hoisted(() => ({ mockOrderBy: vi.fn(), - mockGetWorkspaceById: vi.fn(), mockDecryptSecret: vi.fn(), })) @@ -19,10 +18,6 @@ vi.mock('@sim/db', () => ({ }, })) -vi.mock('@/lib/workspaces/permissions/utils', () => ({ - getWorkspaceById: mockGetWorkspaceById, -})) - vi.mock('@/lib/core/security/encryption', () => ({ decryptSecret: mockDecryptSecret, })) @@ -70,7 +65,6 @@ const storedKey = (id: string) => ({ id, encryptedApiKey: `encrypted-${id}` }) describe('getBYOKKey', () => { beforeEach(() => { vi.clearAllMocks() - mockGetWorkspaceById.mockResolvedValue({ id: 'workspace' }) mockOrderBy.mockResolvedValue([]) mockDecryptSecret.mockImplementation(async (encrypted: string) => ({ decrypted: encrypted.replace('encrypted-', 'decrypted-'), @@ -80,13 +74,6 @@ describe('getBYOKKey', () => { it('returns null when no workspaceId is provided', async () => { expect(await getBYOKKey(undefined, 'openai')).toBeNull() expect(await getBYOKKey(null, 'openai')).toBeNull() - expect(mockGetWorkspaceById).not.toHaveBeenCalled() - }) - - it('returns null when the workspace does not exist', async () => { - mockGetWorkspaceById.mockResolvedValue(null) - - expect(await getBYOKKey(uniqueWorkspaceId(), 'openai')).toBeNull() }) it('returns null when the workspace has no keys for the provider', async () => { @@ -123,6 +110,17 @@ describe('getBYOKKey', () => { ]) }) + it('reads the key list fresh from the database on every call', async () => { + const workspaceId = uniqueWorkspaceId() + mockOrderBy.mockResolvedValue([storedKey('key-1')]) + + await getBYOKKey(workspaceId, 'openai') + await getBYOKKey(workspaceId, 'openai') + await getBYOKKey(workspaceId, 'openai') + + expect(mockOrderBy).toHaveBeenCalledTimes(3) + }) + it('tracks rotation independently per provider within a workspace', async () => { const workspaceId = uniqueWorkspaceId() mockOrderBy.mockResolvedValue([storedKey('key-1'), storedKey('key-2')]) diff --git a/apps/sim/lib/api-key/byok.ts b/apps/sim/lib/api-key/byok.ts index b131d2f7425..a3f08a7d2cd 100644 --- a/apps/sim/lib/api-key/byok.ts +++ b/apps/sim/lib/api-key/byok.ts @@ -6,7 +6,6 @@ import { getRotatingApiKey } from '@/lib/core/config/api-keys' import { env } from '@/lib/core/config/env' import { isHosted } from '@/lib/core/config/env-flags' import { decryptSecret } from '@/lib/core/security/encryption' -import { getWorkspaceById } from '@/lib/workspaces/permissions/utils' import { getHostedModels } from '@/providers/models' import { PROVIDER_PLACEHOLDER_KEY } from '@/providers/utils' import { useProvidersStore } from '@/stores/providers/store' @@ -37,6 +36,11 @@ function nextRotationIndex(poolKey: string, poolSize: number): number { * multiple keys stored for the provider, requests round-robin across them in * creation order. A key that fails to decrypt is skipped in favor of the next * one in the pool. + * + * The key list is read fresh from the database on every call rather than + * cached: BYOK lookups are not a hot database query, and reading fresh keeps + * key revocation/rotation effective immediately across every ECS task with no + * cross-instance cache-coherence concern. */ export async function getBYOKKey( workspaceId: string | undefined | null, @@ -47,11 +51,6 @@ export async function getBYOKKey( } try { - const activeWorkspace = await getWorkspaceById(workspaceId) - if (!activeWorkspace) { - return null - } - const keys = await db .select({ id: workspaceBYOKKeys.id, encryptedApiKey: workspaceBYOKKeys.encryptedApiKey }) .from(workspaceBYOKKeys) diff --git a/apps/sim/lib/billing/calculations/usage-monitor.ts b/apps/sim/lib/billing/calculations/usage-monitor.ts index 906007689f8..567a70e0b56 100644 --- a/apps/sim/lib/billing/calculations/usage-monitor.ts +++ b/apps/sim/lib/billing/calculations/usage-monitor.ts @@ -467,6 +467,10 @@ export async function checkOrgMemberUsageLimit( return { isExceeded: false, currentUsage: 0, limit: null } } + // Resolve the member cap first and short-circuit when none is set — the + // common case. Computing usage is only worthwhile once a cap exists, so the + // two queries stay sequential rather than racing (parallelizing would add a + // usage query on every uncapped member's execution). const limit = await getOrgMemberUsageLimit(organizationId, userId) if (limit === null) { return { isExceeded: false, currentUsage: 0, limit: null } diff --git a/apps/sim/lib/billing/core/plan.test.ts b/apps/sim/lib/billing/core/plan.test.ts new file mode 100644 index 00000000000..241afedd3af --- /dev/null +++ b/apps/sim/lib/billing/core/plan.test.ts @@ -0,0 +1,194 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +/** + * Drizzle mock for `getHighestPrioritySubscription`. It issues up to four + * queries keyed by table: + * - `subscription` for the user's personal subs (parallelized with members) + * - `member` for the user's org memberships (parallelized with subs) + * - `organization` for the org-existence follow-up + * - `subscription` again for the org-scoped subs follow-up + * + * The mock routes results by the table object passed to `.from()`, serving the + * (twice-read) `subscription` table from a FIFO queue (first read = personal, + * second = org). It records which tables were queried so we can assert the + * parallelized pair both run and that follow-ups are skipped when appropriate. + * + * Table sentinels and shared mock state live inside `vi.hoisted` so the + * `vi.mock` factories (hoisted to the top of the file) can reference them. + */ +const { SUBSCRIPTION_TABLE, MEMBER_TABLE, ORGANIZATION_TABLE, resultsByTable, fromCalls, select } = + vi.hoisted(() => { + const SUBSCRIPTION_TABLE = { __table: 'subscription' } + const MEMBER_TABLE = { __table: 'member' } + const ORGANIZATION_TABLE = { __table: 'organization' } + + const resultsByTable: Record = { + subscription: [], + member: [], + organization: [], + } + const fromCalls: string[] = [] + + const select = vi.fn(() => ({ + from: (table: { __table: string }) => { + fromCalls.push(table.__table) + const where = () => { + const queue = resultsByTable[table.__table] + const next = queue.length > 0 ? queue.shift() : [] + return Promise.resolve(next ?? []) + } + return { where } + }, + })) + + return { + SUBSCRIPTION_TABLE, + MEMBER_TABLE, + ORGANIZATION_TABLE, + resultsByTable, + fromCalls, + select, + } + }) + +vi.mock('@sim/db', () => ({ + db: { select }, +})) + +vi.mock('@sim/db/schema', () => ({ + subscription: SUBSCRIPTION_TABLE, + member: MEMBER_TABLE, + organization: ORGANIZATION_TABLE, +})) + +/** + * Realistic plan-check predicates so `pickHighestPrioritySubscription` exercises + * the real Enterprise > Team > Pro priority ordering over the rows we feed it. + */ +vi.mock('@/lib/billing/subscriptions/utils', () => ({ + ENTITLED_SUBSCRIPTION_STATUSES: ['active', 'past_due'], + checkEnterprisePlan: (s: any) => + s?.plan === 'enterprise' && ['active', 'past_due'].includes(s?.status), + checkTeamPlan: (s: any) => s?.plan === 'team' && ['active', 'past_due'].includes(s?.status), + checkProPlan: (s: any) => s?.plan === 'pro' && ['active', 'past_due'].includes(s?.status), +})) + +import { getHighestPrioritySubscription } from '@/lib/billing/core/plan' + +interface SubRow { + id: string + referenceId: string + plan: string + status: string +} + +function personalPro(userId: string): SubRow { + return { id: 'sub-personal-pro', referenceId: userId, plan: 'pro', status: 'active' } +} + +function orgEnterprise(orgId: string): SubRow { + return { id: 'sub-org-enterprise', referenceId: orgId, plan: 'enterprise', status: 'active' } +} + +function queue(table: 'subscription' | 'member' | 'organization', rows: unknown[]) { + resultsByTable[table].push(rows) +} + +describe('getHighestPrioritySubscription', () => { + beforeEach(() => { + vi.clearAllMocks() + resultsByTable.subscription = [] + resultsByTable.member = [] + resultsByTable.organization = [] + fromCalls.length = 0 + }) + + it('picks the org Enterprise sub over a personal Pro sub (priority order)', async () => { + queue('subscription', [personalPro('user-1')]) // personalSubs query + queue('member', [{ organizationId: 'org-1' }]) // memberships query + queue('organization', [{ id: 'org-1' }]) // org-existence query + queue('subscription', [orgEnterprise('org-1')]) // org-subscriptions query + + const result = await getHighestPrioritySubscription('user-1') + + expect(result).not.toBeNull() + expect(result?.id).toBe('sub-org-enterprise') + expect(result?.plan).toBe('enterprise') + }) + + it('selection is deterministic regardless of which parallelized query resolves first', async () => { + queue('subscription', [personalPro('user-1')]) + queue('member', [{ organizationId: 'org-1' }]) + queue('organization', [{ id: 'org-1' }]) + queue('subscription', [orgEnterprise('org-1')]) + + const result = await getHighestPrioritySubscription('user-1') + + // Enterprise (org) always wins over Pro (personal). + expect(result?.id).toBe('sub-org-enterprise') + }) + + it('issues BOTH the personal-subscriptions and memberships queries (parallelized pair)', async () => { + queue('subscription', [personalPro('user-1')]) + queue('member', [{ organizationId: 'org-1' }]) + queue('organization', [{ id: 'org-1' }]) + queue('subscription', [orgEnterprise('org-1')]) + + await getHighestPrioritySubscription('user-1') + + expect(fromCalls).toContain('subscription') + expect(fromCalls).toContain('member') + // First two queries are exactly the parallelized pair (in either order). + expect(fromCalls.slice(0, 2).sort()).toEqual(['member', 'subscription']) + }) + + it('returns the personal sub and skips org follow-ups when there are no memberships', async () => { + queue('subscription', [personalPro('user-1')]) + queue('member', []) // no org memberships + + const result = await getHighestPrioritySubscription('user-1') + + expect(result?.id).toBe('sub-personal-pro') + expect(result?.plan).toBe('pro') + // org-existence + org-subscription follow-ups are NOT issued. + expect(fromCalls).not.toContain('organization') + expect(fromCalls.filter((t) => t === 'subscription')).toHaveLength(1) + }) + + it('returns null when neither personal nor org subscriptions exist', async () => { + queue('subscription', []) // no personal subs + queue('member', []) // no memberships + + const result = await getHighestPrioritySubscription('user-1') + + expect(result).toBeNull() + }) + + it('excludes orphaned org memberships whose organization row no longer exists', async () => { + queue('subscription', []) // no personal subs + queue('member', [{ organizationId: 'ghost-org' }]) // membership points at a deleted org + queue('organization', []) // org-existence returns nothing -> orphaned + + const result = await getHighestPrioritySubscription('user-1') + + // Org subs are never fetched (no valid org ids) -> falls back to null. + expect(result).toBeNull() + expect(fromCalls).toContain('organization') + // Only the initial personal-subs read on `subscription`; org-subs query skipped. + expect(fromCalls.filter((t) => t === 'subscription')).toHaveLength(1) + }) + + it('falls back to the personal sub when the only org is orphaned', async () => { + queue('subscription', [personalPro('user-1')]) + queue('member', [{ organizationId: 'ghost-org' }]) + queue('organization', []) // orphaned org + + const result = await getHighestPrioritySubscription('user-1') + + expect(result?.id).toBe('sub-personal-pro') + expect(fromCalls.filter((t) => t === 'subscription')).toHaveLength(1) + }) +}) diff --git a/apps/sim/lib/billing/core/plan.ts b/apps/sim/lib/billing/core/plan.ts index b4a56dab136..633d2e55c83 100644 --- a/apps/sim/lib/billing/core/plan.ts +++ b/apps/sim/lib/billing/core/plan.ts @@ -82,20 +82,21 @@ export async function getHighestPrioritySubscription( ) { const { onError = 'return-null', executor = db } = options try { - const personalSubs = await executor - .select() - .from(subscription) - .where( - and( - eq(subscription.referenceId, userId), - inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES) - ) - ) - - const memberships = await executor - .select({ organizationId: member.organizationId }) - .from(member) - .where(eq(member.userId, userId)) + const [personalSubs, memberships] = await Promise.all([ + executor + .select() + .from(subscription) + .where( + and( + eq(subscription.referenceId, userId), + inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES) + ) + ), + executor + .select({ organizationId: member.organizationId }) + .from(member) + .where(eq(member.userId, userId)), + ]) const orgIds = memberships.map((m: { organizationId: string }) => m.organizationId) diff --git a/apps/sim/lib/execution/preprocessing.test.ts b/apps/sim/lib/execution/preprocessing.test.ts index c00e5653ec5..9049ac2af83 100644 --- a/apps/sim/lib/execution/preprocessing.test.ts +++ b/apps/sim/lib/execution/preprocessing.test.ts @@ -176,7 +176,7 @@ describe('preprocessExecution ban gate', () => { } as any) }) - it('blocks execution with 403 when the actor is banned, before any billing queries', async () => { + it('blocks execution with 403 when the actor is banned (ban wins over the parallel gates)', async () => { mockGetActivelyBannedUserIds.mockResolvedValue(['billed-account-1']) const loggingSession = { @@ -194,8 +194,39 @@ describe('preprocessExecution ban gate', () => { error: { statusCode: 403, logCreated: true, message: 'Account suspended' }, }) expect(loggingSession.safeStart).toHaveBeenCalled() - expect(getHighestPrioritySubscription).not.toHaveBeenCalled() - expect(checkServerSideUsageLimits).not.toHaveBeenCalled() + }) + + it('returns 403 (ban precedence) when ban, usage, and rate limit all fail simultaneously', async () => { + mockGetActivelyBannedUserIds.mockResolvedValue(['billed-account-1']) + vi.mocked(checkServerSideUsageLimits).mockResolvedValue({ + isExceeded: true, + currentUsage: 20, + limit: 10, + message: 'Usage limit exceeded. Please upgrade your plan to continue.', + } as any) + mockCheckRateLimit.mockResolvedValue({ + allowed: false, + remaining: 0, + resetAt: new Date(), + }) + + const loggingSession = { + safeStart: vi.fn().mockResolvedValue(true), + safeCompleteWithError: vi.fn().mockResolvedValue(undefined), + } + + const result = await preprocessExecution({ + ...baseOptions, + checkRateLimit: true, + loggingSession: loggingSession as any, + }) + + // Ban (403) takes precedence over usage (402) and rate limit (429), + // independent of which parallel gate's promise settled first. + expect(result).toMatchObject({ + success: false, + error: { statusCode: 403, logCreated: true, message: 'Account suspended' }, + }) }) it('checks the billing actor, caller-provided userId, and workflow owner in one call', async () => { @@ -234,6 +265,5 @@ describe('preprocessExecution ban gate', () => { success: false, error: { statusCode: 500, logCreated: true }, }) - expect(checkServerSideUsageLimits).not.toHaveBeenCalled() }) }) diff --git a/apps/sim/lib/execution/preprocessing.ts b/apps/sim/lib/execution/preprocessing.ts index 075ee57af90..4f1fd56211d 100644 --- a/apps/sim/lib/execution/preprocessing.ts +++ b/apps/sim/lib/execution/preprocessing.ts @@ -322,85 +322,130 @@ export async function preprocessExecution( } } - // ========== STEP 3.5: Reject Banned Accounts ========== - // Blocks executions when the billing actor, the workflow owner, or the - // caller-provided userId (chat deployer, authenticated caller) has an - // active ban or a blocked email domain. The owner comes from the workflow - // record so schedules — which pass the 'unknown' sentinel — are covered. - const banCandidateIds = [actorUserId] - if (userId && userId !== 'unknown' && userId !== actorUserId) { - banCandidateIds.push(userId) + // ========== STEPS 3.5–6: Parallel Read-Only Preflight Gates ========== + // All remaining gates before the STEP 7 write are read-only with no side + // effects, so they run concurrently to cut sequential latency. The ban check + // (STEP 3.5) and the subscription fetch (STEP 4) are mutually independent and + // start together. Once the subscription resolves, the usage gates (STEP 5a/5b) + // and the rate-limit gate (STEP 6) — all of which depend only on the + // subscription, actor, and workspace — start together. + // + // Each gate resolves to either `null` (passed/skipped) or a deferred outcome: + // the `PreprocessExecutionResult` to return plus the optional error-log write + // to perform. Because every gate completes regardless of which fails first, + // outcomes are evaluated IN A FIXED PRECEDENCE ORDER — ban (403) → usage (402) + // → rate limit (429) — to reproduce the sequential version's behavior exactly. + // Wasted work on a request that ends up rejected is acceptable here: the gates + // are read-only and the sole write (STEP 7) stays after every gate passes. + + /** + * A failing gate's deferred outcome: the response to return, plus an optional + * error-log write to flush before returning. Evaluated in precedence order. + */ + interface GateFailure { + response: PreprocessExecutionResult + recordError?: Parameters[0] } - if (workflowRecord.userId && !banCandidateIds.includes(workflowRecord.userId)) { - banCandidateIds.push(workflowRecord.userId) + + /** Usage figures captured by STEP 5 and reused by the STEP 7 reservation. */ + interface UsageSnapshot { + currentUsage: number + limit: number } - try { - const bannedUserIds = await getActivelyBannedUserIds(banCandidateIds) - if (bannedUserIds.length > 0) { - logger.warn(`[${requestId}] Execution blocked: banned account`, { - workflowId, - bannedUserIds, - triggerType, - }) - await recordPreprocessingError({ - workflowId, - executionId, - triggerType, - requestId, - userId: actorUserId, - workspaceId, - errorMessage: 'This account has been suspended. Workflow executions are blocked.', - loggingSession: providedLoggingSession, - triggerData, - }) + const banCheck = (async (): Promise => { + // Blocks executions when the billing actor, the workflow owner, or the + // caller-provided userId (chat deployer, authenticated caller) has an + // active ban or a blocked email domain. The owner comes from the workflow + // record so schedules — which pass the 'unknown' sentinel — are covered. + const banCandidateIds = [actorUserId] + if (userId && userId !== 'unknown' && userId !== actorUserId) { + banCandidateIds.push(userId) + } + if (workflowRecord.userId && !banCandidateIds.includes(workflowRecord.userId)) { + banCandidateIds.push(workflowRecord.userId) + } + try { + const bannedUserIds = await getActivelyBannedUserIds(banCandidateIds) + if (bannedUserIds.length > 0) { + logger.warn(`[${requestId}] Execution blocked: banned account`, { + workflowId, + bannedUserIds, + triggerType, + }) + + return { + response: { + success: false, + error: { + message: 'Account suspended', + statusCode: 403, + logCreated: true, + }, + }, + recordError: { + workflowId, + executionId, + triggerType, + requestId, + userId: actorUserId, + workspaceId, + errorMessage: 'This account has been suspended. Workflow executions are blocked.', + loggingSession: providedLoggingSession, + triggerData, + }, + } + } + return null + } catch (error) { + logger.error(`[${requestId}] Error checking account ban status`, { error, actorUserId }) return { - success: false, - error: { - message: 'Account suspended', - statusCode: 403, - logCreated: true, + response: { + success: false, + error: { + message: 'Unable to verify account status. Execution blocked for security.', + statusCode: 500, + logCreated: true, + retryable: isRetryableInfrastructureError(error), + cause: describeRetryableInfrastructureError(error), + }, + }, + recordError: { + workflowId, + executionId, + triggerType, + requestId, + userId: actorUserId, + workspaceId, + errorMessage: 'Unable to verify account status. Execution blocked for security.', + loggingSession: providedLoggingSession, + triggerData, }, } } - } catch (error) { - logger.error(`[${requestId}] Error checking account ban status`, { error, actorUserId }) - - await recordPreprocessingError({ - workflowId, - executionId, - triggerType, - requestId, - userId: actorUserId, - workspaceId, - errorMessage: 'Unable to verify account status. Execution blocked for security.', - loggingSession: providedLoggingSession, - triggerData, - }) - - return { - success: false, - error: { - message: 'Unable to verify account status. Execution blocked for security.', - statusCode: 500, - logCreated: true, - retryable: isRetryableInfrastructureError(error), - cause: describeRetryableInfrastructureError(error), - }, - } - } + })() // ========== STEP 4: Get Subscription ========== - const userSubscription = await getHighestPrioritySubscription(actorUserId) + const subscriptionFetch = getHighestPrioritySubscription(actorUserId) + + const [banFailure, userSubscription] = await Promise.all([banCheck, subscriptionFetch]) - // ========== STEP 5: Check Usage Limits ========== - // Snapshot reused by the STEP 7 admission reservation. - let usageSnapshot: { currentUsage: number; limit: number } | null = null - if (!skipUsageLimits) { + /** + * STEP 5: usage + per-member org usage gate. Returns the failure outcome (or + * `null` on pass/skip) plus the usage snapshot reused by the STEP 7 admission + * reservation. The snapshot is returned rather than written to an outer + * variable so concurrent gate tasks share no mutable state. + */ + const usageCheckTask = (async (): Promise<{ + failure: GateFailure | null + snapshot: UsageSnapshot | null + }> => { + if (skipUsageLimits) return { failure: null, snapshot: null } + let snapshot: UsageSnapshot | null = null try { const usageCheck = await checkServerSideUsageLimits(actorUserId, userSubscription) - usageSnapshot = { currentUsage: usageCheck.currentUsage, limit: usageCheck.limit } + snapshot = { currentUsage: usageCheck.currentUsage, limit: usageCheck.limit } if (usageCheck.isExceeded) { logger.warn( `[${requestId}] User ${actorUserId} has exceeded usage limits. Blocking execution.`, @@ -412,28 +457,33 @@ export async function preprocessExecution( } ) - await recordPreprocessingError({ - workflowId, - executionId, - triggerType, - requestId, - userId: actorUserId, - workspaceId, - errorMessage: - usageCheck.message || - `Usage limit exceeded: $${usageCheck.currentUsage?.toFixed(2)} used of $${usageCheck.limit?.toFixed(2)} limit. Please upgrade your plan to continue.`, - loggingSession: providedLoggingSession, - triggerData, - }) - return { - success: false, - error: { - message: - usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.', - statusCode: 402, - logCreated: true, + failure: { + response: { + success: false, + error: { + message: + usageCheck.message || + 'Usage limit exceeded. Please upgrade your plan to continue.', + statusCode: 402, + logCreated: true, + }, + }, + recordError: { + workflowId, + executionId, + triggerType, + requestId, + userId: actorUserId, + workspaceId, + errorMessage: + usageCheck.message || + `Usage limit exceeded: $${usageCheck.currentUsage?.toFixed(2)} used of $${usageCheck.limit?.toFixed(2)} limit. Please upgrade your plan to continue.`, + loggingSession: providedLoggingSession, + triggerData, + }, }, + snapshot, } } @@ -457,126 +507,160 @@ export async function preprocessExecution( } ) - await recordPreprocessingError({ - workflowId, - executionId, - triggerType, - requestId, - userId: actorUserId, - workspaceId, - errorMessage: memberLimitMessage, - loggingSession: providedLoggingSession, - triggerData, - }) - return { - success: false, - error: { - message: memberLimitMessage, - statusCode: 402, - logCreated: true, + failure: { + response: { + success: false, + error: { + message: memberLimitMessage, + statusCode: 402, + logCreated: true, + }, + }, + recordError: { + workflowId, + executionId, + triggerType, + requestId, + userId: actorUserId, + workspaceId, + errorMessage: memberLimitMessage, + loggingSession: providedLoggingSession, + triggerData, + }, }, + snapshot, } } + return { failure: null, snapshot } } catch (error) { logger.error(`[${requestId}] Error checking usage limits`, { error, actorUserId, }) - await recordPreprocessingError({ - workflowId, - executionId, - triggerType, - requestId, - userId: actorUserId, - workspaceId, - errorMessage: - 'Unable to determine usage limits. Execution blocked for security. Please contact support.', - loggingSession: providedLoggingSession, - triggerData, - }) - return { - success: false, - error: { - message: 'Unable to determine usage limits. Execution blocked for security.', - statusCode: 500, - logCreated: true, - retryable: isRetryableInfrastructureError(error), - cause: describeRetryableInfrastructureError(error), + failure: { + response: { + success: false, + error: { + message: 'Unable to determine usage limits. Execution blocked for security.', + statusCode: 500, + logCreated: true, + retryable: isRetryableInfrastructureError(error), + cause: describeRetryableInfrastructureError(error), + }, + }, + recordError: { + workflowId, + executionId, + triggerType, + requestId, + userId: actorUserId, + workspaceId, + errorMessage: + 'Unable to determine usage limits. Execution blocked for security. Please contact support.', + loggingSession: providedLoggingSession, + triggerData, + }, }, + snapshot, } } - } + })() // ========== STEP 6: Check Rate Limits ========== let rateLimitInfo: { allowed: boolean; remaining: number; resetAt: Date } | undefined - if (checkRateLimit) { + /** + * STEP 6: rate-limit gate. Returns the failure outcome, or `null` on + * pass/skip. On a non-error outcome it populates `rateLimitInfo` for the + * success result, matching the sequential version. + */ + const rateLimitTask = (async (): Promise => { + if (!checkRateLimit) return null try { const rateLimiter = new RateLimiter() - rateLimitInfo = await rateLimiter.checkRateLimitWithSubscription( + const info = await rateLimiter.checkRateLimitWithSubscription( actorUserId, userSubscription, triggerType, false // not async ) + rateLimitInfo = info - if (!rateLimitInfo.allowed) { + if (!info.allowed) { logger.warn(`[${requestId}] Rate limit exceeded for user ${actorUserId}`, { triggerType, - remaining: rateLimitInfo.remaining, - resetAt: rateLimitInfo.resetAt, - }) - - await recordPreprocessingError({ - workflowId, - executionId, - triggerType, - requestId, - userId: actorUserId, - workspaceId, - errorMessage: `Rate limit exceeded. ${rateLimitInfo.remaining} requests remaining. Resets at ${rateLimitInfo.resetAt.toISOString()}.`, - loggingSession: providedLoggingSession, - triggerData, + remaining: info.remaining, + resetAt: info.resetAt, }) return { - success: false, - error: { - message: `Rate limit exceeded. Please try again later.`, - statusCode: 429, - logCreated: true, + response: { + success: false, + error: { + message: `Rate limit exceeded. Please try again later.`, + statusCode: 429, + logCreated: true, + }, + }, + recordError: { + workflowId, + executionId, + triggerType, + requestId, + userId: actorUserId, + workspaceId, + errorMessage: `Rate limit exceeded. ${info.remaining} requests remaining. Resets at ${info.resetAt.toISOString()}.`, + loggingSession: providedLoggingSession, + triggerData, }, } } + return null } catch (error) { logger.error(`[${requestId}] Error checking rate limits`, { error, actorUserId }) - await recordPreprocessingError({ - workflowId, - executionId, - triggerType, - requestId, - userId: actorUserId, - workspaceId, - errorMessage: 'Error checking rate limits. Execution blocked for safety.', - loggingSession: providedLoggingSession, - triggerData, - }) - return { - success: false, - error: { - message: 'Error checking rate limits', - statusCode: 500, - logCreated: true, - retryable: isRetryableInfrastructureError(error), - cause: describeRetryableInfrastructureError(error), + response: { + success: false, + error: { + message: 'Error checking rate limits', + statusCode: 500, + logCreated: true, + retryable: isRetryableInfrastructureError(error), + cause: describeRetryableInfrastructureError(error), + }, + }, + recordError: { + workflowId, + executionId, + triggerType, + requestId, + userId: actorUserId, + workspaceId, + errorMessage: 'Error checking rate limits. Execution blocked for safety.', + loggingSession: providedLoggingSession, + triggerData, }, } } + })() + + const [usageResult, rateLimitFailure] = await Promise.all([usageCheckTask, rateLimitTask]) + const usageFailure = usageResult.failure + const usageSnapshot = usageResult.snapshot + + // Evaluate outcomes in fixed precedence order — ban (403) → usage (402) → + // rate limit (429) — so the response matches the sequential version exactly, + // independent of which gate's promise settled first. + const gateFailure = banFailure ?? usageFailure ?? rateLimitFailure + if (gateFailure) { + if (gateFailure.recordError) { + await recordPreprocessingError(gateFailure.recordError) + } + return gateFailure.response } /** diff --git a/apps/sim/lib/workflows/executor/execution-core.test.ts b/apps/sim/lib/workflows/executor/execution-core.test.ts index eba2011484a..58fbca16d58 100644 --- a/apps/sim/lib/workflows/executor/execution-core.test.ts +++ b/apps/sim/lib/workflows/executor/execution-core.test.ts @@ -72,26 +72,22 @@ vi.mock('@/lib/workflows/triggers/triggers', () => ({ vi.mock('@/lib/workflows/utils', () => workflowsUtilsMock) vi.mock('@/executor', () => ({ - Executor: vi.fn().mockImplementation( - class { - constructor(args: unknown) { - executorConstructorMock(args) - // biome-ignore lint/correctness/noConstructorReturn: vitest 4 constructs mocks via Reflect.construct; returning the instance overrides `new Executor(...)` - return { - execute: executorExecuteMock, - executeFromBlock: executorExecuteMock, - } + Executor: class { + constructor(args: unknown) { + executorConstructorMock(args) + // biome-ignore lint/correctness/noConstructorReturn: returning the instance overrides `new Executor(...)` so consumers get the mocked methods + return { + execute: executorExecuteMock, + executeFromBlock: executorExecuteMock, } } - ), + }, })) vi.mock('@/serializer', () => ({ - Serializer: vi.fn().mockImplementation( - class { - serializeWorkflow = serializeWorkflowMock - } - ), + Serializer: class { + serializeWorkflow = serializeWorkflowMock + }, })) import { @@ -192,6 +188,96 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { clearExecutionCancellationMock.mockResolvedValue(undefined) }) + it('loads workflow state and env vars concurrently, then starts logging before constructing the executor', async () => { + const callOrder: string[] = [] + + let releaseWorkflowLoad: (() => void) | undefined + let releaseEnvLoad: (() => void) | undefined + const workflowLoadGate = new Promise((resolve) => { + releaseWorkflowLoad = resolve + }) + const envLoadGate = new Promise((resolve) => { + releaseEnvLoad = resolve + }) + + loadWorkflowFromNormalizedTablesMock.mockImplementation(async () => { + callOrder.push('load-workflow:start') + await workflowLoadGate + callOrder.push('load-workflow:end') + return { + blocks: { + 'start-block': { + id: 'start-block', + type: 'start_trigger', + subBlocks: {}, + name: 'Start', + }, + }, + edges: [], + loops: {}, + parallels: {}, + } + }) + + getPersonalAndWorkspaceEnvMock.mockImplementation(async () => { + callOrder.push('load-env:start') + await envLoadGate + callOrder.push('load-env:end') + return { + personalEncrypted: {}, + workspaceEncrypted: {}, + personalDecrypted: {}, + workspaceDecrypted: {}, + } + }) + + safeStartMock.mockImplementation(async () => { + callOrder.push('safeStart') + return true + }) + + executorConstructorMock.mockImplementation(() => { + callOrder.push('executor-construct') + }) + + executorExecuteMock.mockResolvedValue({ + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + }) + + const executionPromise = executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: {}, + loggingSession: loggingSession as any, + }) + + await Promise.resolve() + + expect(callOrder).toContain('load-workflow:start') + expect(callOrder).toContain('load-env:start') + expect(callOrder).not.toContain('safeStart') + expect(callOrder).not.toContain('executor-construct') + + releaseWorkflowLoad?.() + releaseEnvLoad?.() + + await executionPromise + + expect(callOrder).toEqual([ + 'load-workflow:start', + 'load-env:start', + 'load-workflow:end', + 'load-env:end', + 'safeStart', + 'executor-construct', + ]) + expect(safeStartMock).toHaveBeenCalledTimes(1) + expect(executorConstructorMock).toHaveBeenCalledTimes(1) + }) + it('routes onBlockStart through logging session persistence path', async () => { executorExecuteMock.mockResolvedValue({ success: true, diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index 3763203aaae..068e4bf9e31 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -349,62 +349,78 @@ export async function executeWorkflowCore( } try { - let blocks - let edges: Edge[] - let loops - let parallels - - // Use workflowStateOverride if provided (for diff workflows) - if (metadata.workflowStateOverride) { - blocks = metadata.workflowStateOverride.blocks - edges = metadata.workflowStateOverride.edges - loops = metadata.workflowStateOverride.loops || {} - parallels = metadata.workflowStateOverride.parallels || {} - deploymentVersionId = metadata.workflowStateOverride.deploymentVersionId - - logger.info(`[${requestId}] Using workflow state override (diff workflow execution)`, { - blocksCount: Object.keys(blocks).length, - edgesCount: edges.length, - }) - } else if (useDraftState) { - const draftData = await loadWorkflowFromNormalizedTables(workflowId) + const personalEnvUserId = + metadata.isClientSession && metadata.sessionUserId + ? metadata.sessionUserId + : metadata.workflowUserId - if (!draftData) { - throw new Error('Workflow not found or not yet saved') + if (!personalEnvUserId) { + throw new Error('Missing workflowUserId in execution metadata') + } + + /** + * Resolves the workflow state from the override, the draft tables, or the + * deployed snapshot. The async load (draft/deployed) has no data dependency + * on the environment load, so the two are awaited concurrently below. + */ + const loadWorkflowState = async () => { + if (metadata.workflowStateOverride) { + const override = metadata.workflowStateOverride + logger.info(`[${requestId}] Using workflow state override (diff workflow execution)`, { + blocksCount: Object.keys(override.blocks).length, + edgesCount: override.edges.length, + }) + return { + blocks: override.blocks, + edges: override.edges, + loops: override.loops || {}, + parallels: override.parallels || {}, + deploymentVersionId: override.deploymentVersionId, + } } - blocks = draftData.blocks - edges = draftData.edges - loops = draftData.loops - parallels = draftData.parallels + if (useDraftState) { + const draftData = await loadWorkflowFromNormalizedTables(workflowId) - logger.info( - `[${requestId}] Using draft workflow state from normalized tables (client execution)` - ) - } else { - const deployedData = await loadDeployedWorkflowState(workflowId) - blocks = deployedData.blocks - edges = deployedData.edges - loops = deployedData.loops - parallels = deployedData.parallels - deploymentVersionId = deployedData.deploymentVersionId + if (!draftData) { + throw new Error('Workflow not found or not yet saved') + } + logger.info( + `[${requestId}] Using draft workflow state from normalized tables (client execution)` + ) + return { + blocks: draftData.blocks, + edges: draftData.edges, + loops: draftData.loops, + parallels: draftData.parallels, + deploymentVersionId: undefined, + } + } + + const deployedData = await loadDeployedWorkflowState(workflowId) logger.info(`[${requestId}] Using deployed workflow state (deployed execution)`) + return { + blocks: deployedData.blocks, + edges: deployedData.edges, + loops: deployedData.loops, + parallels: deployedData.parallels, + deploymentVersionId: deployedData.deploymentVersionId, + } } - const mergedStates = mergeSubblockStateWithValues(blocks) + const [workflowState, env] = await Promise.all([ + loadWorkflowState(), + getPersonalAndWorkspaceEnv(personalEnvUserId, providedWorkspaceId), + ]) - const personalEnvUserId = - metadata.isClientSession && metadata.sessionUserId - ? metadata.sessionUserId - : metadata.workflowUserId + const { blocks, loops, parallels } = workflowState + const edges: Edge[] = workflowState.edges + deploymentVersionId = workflowState.deploymentVersionId - if (!personalEnvUserId) { - throw new Error('Missing workflowUserId in execution metadata') - } + const mergedStates = mergeSubblockStateWithValues(blocks) - const { personalEncrypted, workspaceEncrypted, personalDecrypted, workspaceDecrypted } = - await getPersonalAndWorkspaceEnv(personalEnvUserId, providedWorkspaceId) + const { personalEncrypted, workspaceEncrypted, personalDecrypted, workspaceDecrypted } = env // Use encrypted values for logging (don't log decrypted secrets) const variables = EnvVarsSchema.parse({ ...personalEncrypted, ...workspaceEncrypted }) diff --git a/apps/sim/lib/workflows/persistence/utils.test.ts b/apps/sim/lib/workflows/persistence/utils.test.ts index afcc4081fad..964577f9cf8 100644 --- a/apps/sim/lib/workflows/persistence/utils.test.ts +++ b/apps/sim/lib/workflows/persistence/utils.test.ts @@ -113,6 +113,22 @@ vi.mock('@sim/db', () => ({ webhook: {}, })) +const { mockSanitizeAgentToolsInBlocks } = vi.hoisted(() => ({ + mockSanitizeAgentToolsInBlocks: vi.fn(), +})) + +/** + * Default identity behavior for the mocked migration step. Re-applied in the + * cache describe block's `beforeEach` because the outer `afterEach` calls + * `vi.resetAllMocks()`, which clears implementations. + */ +const sanitizeIdentity = (blocks: unknown) => ({ blocks }) +mockSanitizeAgentToolsInBlocks.mockImplementation(sanitizeIdentity) + +vi.mock('@/lib/workflows/sanitization/validation', () => ({ + sanitizeAgentToolsInBlocks: mockSanitizeAgentToolsInBlocks, +})) + import * as dbHelpers from '@/lib/workflows/persistence/utils' const mockWorkflowId = 'test-workflow-123' @@ -307,6 +323,7 @@ const mockWorkflowState = createWorkflowState({ describe('Database Helpers', () => { beforeEach(() => { vi.clearAllMocks() + mockSanitizeAgentToolsInBlocks.mockImplementation(sanitizeIdentity) }) afterEach(() => { @@ -1550,4 +1567,157 @@ describe('Database Helpers', () => { expect(messages2).toEqual([{ role: 'system', content: 'System' }]) }) }) + + describe('loadDeployedWorkflowState deployed-state cache', () => { + /** + * Minimal but realistic deployed state: a couple of plain (non-agent, + * credential-free) blocks plus an edge. Plain blocks make the real + * downstream migration steps (agent-message, subblock-id, credential, + * canonical-mode) no-ops, so the only observable "heavy work" is the + * mocked `sanitizeAgentToolsInBlocks` first step, which we use as the + * migration call counter. + */ + function buildDeployedState() { + return { + blocks: { + 'block-1': { + id: 'block-1', + type: 'api', + name: 'API Block', + position: { x: 0, y: 0 }, + enabled: true, + subBlocks: { url: { id: 'url', type: 'short-input', value: 'https://example.com' } }, + outputs: {}, + data: {}, + }, + 'block-2': { + id: 'block-2', + type: 'function', + name: 'Function Block', + position: { x: 100, y: 0 }, + enabled: true, + subBlocks: { code: { id: 'code', type: 'code', value: 'return 1' } }, + outputs: {}, + data: {}, + }, + }, + edges: [ + { + id: 'edge-1', + source: 'block-1', + target: 'block-2', + sourceHandle: 'output', + targetHandle: 'input', + }, + ], + loops: {}, + parallels: {}, + variables: { threshold: 5 }, + } + } + + /** + * Wires `db.select` to return a single active deployment-version row for the + * given id. Returns the inner `where` spy so tests can assert how many times + * the active-version SELECT ran. + */ + function mockActiveVersionSelect(versionId: string, state: unknown) { + const where = vi.fn().mockReturnValue({ + orderBy: vi.fn().mockReturnValue({ + limit: vi.fn().mockResolvedValue([{ id: versionId, state, createdAt: new Date() }]), + }), + }) + mockDb.select.mockReturnValue({ + from: vi.fn().mockReturnValue({ where }), + }) + return where + } + + beforeEach(() => { + vi.clearAllMocks() + mockSanitizeAgentToolsInBlocks.mockImplementation(sanitizeIdentity) + dbHelpers.invalidateDeployedStateCache() + }) + + it('serves a cache HIT, skipping migrations on the second call for the same active version', async () => { + const where = mockActiveVersionSelect('dv-hit', buildDeployedState()) + + const first = await dbHelpers.loadDeployedWorkflowState('wf-1', 'workspace-1') + const second = await dbHelpers.loadDeployedWorkflowState('wf-1', 'workspace-1') + + expect(first).toBeDefined() + expect(second).toBeDefined() + expect(mockSanitizeAgentToolsInBlocks).toHaveBeenCalledTimes(1) + expect(where).toHaveBeenCalledTimes(2) + }) + + it('still runs the active-version SELECT on every call so rollback/redeploy stays observable', async () => { + const where = mockActiveVersionSelect('dv-active', buildDeployedState()) + + await dbHelpers.loadDeployedWorkflowState('wf-2', 'workspace-1') + await dbHelpers.loadDeployedWorkflowState('wf-2', 'workspace-1') + + expect(where).toHaveBeenCalledTimes(2) + }) + + it('deep-clones on read: mutating the first result does not corrupt the cached copy', async () => { + mockActiveVersionSelect('dv-clone', buildDeployedState()) + + const first = await dbHelpers.loadDeployedWorkflowState('wf-3', 'workspace-1') + ;(first.blocks['block-1'] as any).name = 'MUTATED' + ;(first.blocks['block-1'].subBlocks.url as any).value = 'https://hacked.example' + first.edges.push({ + id: 'edge-injected', + source: 'block-2', + target: 'block-1', + } as any) + + const second = await dbHelpers.loadDeployedWorkflowState('wf-3', 'workspace-1') + + expect(second.blocks['block-1'].name).toBe('API Block') + expect(second.blocks['block-1'].subBlocks.url.value).toBe('https://example.com') + expect(second.edges).toHaveLength(1) + expect(second.blocks).toEqual(buildDeployedState().blocks) + }) + + it('keys the cache by deploymentVersionId: a different active id triggers a fresh build', async () => { + mockActiveVersionSelect('dv-old', buildDeployedState()) + await dbHelpers.loadDeployedWorkflowState('wf-4', 'workspace-1') + expect(mockSanitizeAgentToolsInBlocks).toHaveBeenCalledTimes(1) + + mockActiveVersionSelect('dv-new', buildDeployedState()) + await dbHelpers.loadDeployedWorkflowState('wf-4', 'workspace-1') + expect(mockSanitizeAgentToolsInBlocks).toHaveBeenCalledTimes(2) + }) + + it('invalidateDeployedStateCache(id) forces a rebuild on the next call', async () => { + mockActiveVersionSelect('dv-inv', buildDeployedState()) + + await dbHelpers.loadDeployedWorkflowState('wf-5', 'workspace-1') + await dbHelpers.loadDeployedWorkflowState('wf-5', 'workspace-1') + expect(mockSanitizeAgentToolsInBlocks).toHaveBeenCalledTimes(1) + + dbHelpers.invalidateDeployedStateCache('dv-inv') + + await dbHelpers.loadDeployedWorkflowState('wf-5', 'workspace-1') + expect(mockSanitizeAgentToolsInBlocks).toHaveBeenCalledTimes(2) + }) + + it('throws when there is no active deployment and does not cache the failure', async () => { + const where = vi.fn().mockReturnValue({ + orderBy: vi.fn().mockReturnValue({ + limit: vi.fn().mockResolvedValue([]), + }), + }) + mockDb.select.mockReturnValue({ + from: vi.fn().mockReturnValue({ where }), + }) + + await expect(dbHelpers.loadDeployedWorkflowState('wf-6', 'workspace-1')).rejects.toThrow( + 'Workflow wf-6 has no active deployment' + ) + + expect(mockSanitizeAgentToolsInBlocks).not.toHaveBeenCalled() + }) + }) }) diff --git a/apps/sim/lib/workflows/persistence/utils.ts b/apps/sim/lib/workflows/persistence/utils.ts index 5663d0f3fd7..42a32d43339 100644 --- a/apps/sim/lib/workflows/persistence/utils.ts +++ b/apps/sim/lib/workflows/persistence/utils.ts @@ -99,6 +99,51 @@ export async function blockExistsInDeployment( } } +/** + * Maximum number of deployed-state entries retained in the process-local cache. + * Each entry is keyed by an immutable `deploymentVersionId` and holds a + * fully-migrated {@link DeployedWorkflowData} snapshot (tens of KB to ~1MB), + * so the bound keeps worst-case memory within a sane envelope. + */ +const DEPLOYED_STATE_CACHE_MAX_ENTRIES = 500 +const DEPLOYED_STATE_CACHE_TTL_MS = 5 * 60 * 1000 + +interface DeployedStateCacheEntry { + data: DeployedWorkflowData + expiresAt: number +} + +/** + * Process-local cache of fully-loaded, post-migration deployed workflow state, + * keyed by the immutable `deploymentVersionId`. + * + * The id is unique per deploy — a redeploy mints a new id and a rollback + * reactivates an existing id — so the active-version lookup naturally selects a + * different (or already-cached) key whenever the active deployment changes, + * making the cache self-invalidating across redeploy/rollback. Insertion order + * is used for oldest-first (LRU-style) eviction once the bound is reached. + * + * A short TTL bounds the one piece of the cached state that is not strictly + * immutable: `applyBlockMigrations` resolves legacy credential references via a + * live lookup, so the TTL lets a credential change propagate across ECS tasks + * without waiting for redeploy or eviction. + */ +const deployedStateCache = new Map() + +/** + * Drop cached deployed state. Pass a `deploymentVersionId` to evict a single + * entry, or omit it to clear the entire cache. Explicit invalidation is not + * required for correctness — redeploy/rollback change the active id and key the + * cache anew — but the helper is exported for completeness and testing. + */ +export function invalidateDeployedStateCache(deploymentVersionId?: string): void { + if (deploymentVersionId) { + deployedStateCache.delete(deploymentVersionId) + return + } + deployedStateCache.clear() +} + export async function loadDeployedWorkflowState( workflowId: string, providedWorkspaceId?: string @@ -124,6 +169,14 @@ export async function loadDeployedWorkflowState( throw new Error(`Workflow ${workflowId} has no active deployment`) } + const cached = deployedStateCache.get(active.id) + if (cached) { + if (cached.expiresAt > Date.now()) { + return structuredClone(cached.data) + } + deployedStateCache.delete(active.id) + } + const state = active.state as WorkflowState & { variables?: Record } let resolvedWorkspaceId = providedWorkspaceId @@ -141,7 +194,7 @@ export async function loadDeployedWorkflowState( resolvedWorkspaceId ) - return { + const deployedState: DeployedWorkflowData = { blocks: migratedBlocks, edges: state.edges || [], loops: state.loops || {}, @@ -150,6 +203,19 @@ export async function loadDeployedWorkflowState( isFromNormalizedTables: false, deploymentVersionId: active.id, } + + if (deployedStateCache.size >= DEPLOYED_STATE_CACHE_MAX_ENTRIES) { + const oldestKey = deployedStateCache.keys().next().value + if (oldestKey !== undefined) { + deployedStateCache.delete(oldestKey) + } + } + deployedStateCache.set(active.id, { + data: deployedState, + expiresAt: Date.now() + DEPLOYED_STATE_CACHE_TTL_MS, + }) + + return structuredClone(deployedState) } catch (error) { logger.error(`Error loading deployed workflow state ${workflowId}:`, error) throw error diff --git a/apps/sim/providers/anthropic/client-cache.test.ts b/apps/sim/providers/anthropic/client-cache.test.ts new file mode 100644 index 00000000000..8f1618c2fb0 --- /dev/null +++ b/apps/sim/providers/anthropic/client-cache.test.ts @@ -0,0 +1,162 @@ +/** + * @vitest-environment node + */ +import type Anthropic from '@anthropic-ai/sdk' +import { describe, expect, it, vi } from 'vitest' +import { getCachedAnthropicClient } from '@/providers/anthropic/client-cache' + +/** + * Builds a fresh fake "client" object on every call so identity comparisons + * (`toBe`) tell us whether the cache returned the memoized instance or a new + * one from the factory. We never construct a real Anthropic SDK client — these + * tests exercise the cache, not the SDK. + */ +function makeFactory() { + return vi.fn(() => ({}) as Anthropic) +} + +/** + * Generates a unique suffix per test so distinct tests never collide on cache + * keys. The cache util exposes no reset hook, so isolation is achieved by + * namespacing keys rather than clearing shared state. + */ +let keyCounter = 0 +function uniqueNs(): string { + keyCounter += 1 + return `ns-${keyCounter}-${Date.now()}` +} + +/** Mirrors the anthropic provider's cache-key shape (index.ts). */ +function anthropicKey(apiKey: string, useNativeStructuredOutputs: boolean): string { + return `${apiKey}::${useNativeStructuredOutputs ? 'beta' : 'default'}` +} + +/** Mirrors the azure-anthropic provider's cache-key shape (index.ts). */ +function azureKey(opts: { + apiKey: string + baseURL: string + anthropicVersion: string + pinnedIP: string | null + useNativeStructuredOutputs: boolean +}): string { + return [ + opts.apiKey, + opts.baseURL, + opts.anthropicVersion, + opts.pinnedIP ?? 'no-pin', + opts.useNativeStructuredOutputs ? 'beta' : 'default', + ].join('::') +} + +describe('getCachedAnthropicClient', () => { + it('returns the SAME instance for an identical key and runs the factory once (memoized)', () => { + const ns = uniqueNs() + const key = anthropicKey(`${ns}-key`, false) + const factory = makeFactory() + + const first = getCachedAnthropicClient(key, factory) + const second = getCachedAnthropicClient(key, factory) + + expect(second).toBe(first) + expect(factory).toHaveBeenCalledTimes(1) + }) + + it('returns a DIFFERENT instance for a different apiKey (tenant isolation)', () => { + const ns = uniqueNs() + const factoryA = makeFactory() + const factoryB = makeFactory() + + const tenantA = getCachedAnthropicClient(anthropicKey(`${ns}-tenant-a`, false), factoryA) + const tenantB = getCachedAnthropicClient(anthropicKey(`${ns}-tenant-b`, false), factoryB) + + expect(tenantB).not.toBe(tenantA) + expect(factoryA).toHaveBeenCalledTimes(1) + expect(factoryB).toHaveBeenCalledTimes(1) + }) + + it('returns a different instance when the useNativeStructuredOutputs flag (beta header) differs', () => { + const ns = uniqueNs() + const apiKey = `${ns}-same-key` + const defaultFactory = makeFactory() + const betaFactory = makeFactory() + + const defaultClient = getCachedAnthropicClient(anthropicKey(apiKey, false), defaultFactory) + const betaClient = getCachedAnthropicClient(anthropicKey(apiKey, true), betaFactory) + + expect(betaClient).not.toBe(defaultClient) + expect(defaultFactory).toHaveBeenCalledTimes(1) + expect(betaFactory).toHaveBeenCalledTimes(1) + }) + + describe('azure key dimensions', () => { + it('memoizes when every azure dimension matches', () => { + const ns = uniqueNs() + const base = { + apiKey: `${ns}-azure-key`, + baseURL: 'https://example.openai.azure.com/anthropic', + anthropicVersion: '2023-06-01', + pinnedIP: '10.0.0.1', + useNativeStructuredOutputs: false, + } + const factory = makeFactory() + + const first = getCachedAnthropicClient(azureKey(base), factory) + const second = getCachedAnthropicClient(azureKey(base), factory) + + expect(second).toBe(first) + expect(factory).toHaveBeenCalledTimes(1) + }) + + it('produces a distinct instance for each differing azure dimension', () => { + const ns = uniqueNs() + const base = { + apiKey: `${ns}-azure-key`, + baseURL: 'https://a.openai.azure.com/anthropic', + anthropicVersion: '2023-06-01', + pinnedIP: '10.0.0.1', + useNativeStructuredOutputs: false, + } + + const baseFactory = makeFactory() + const baseClient = getCachedAnthropicClient(azureKey(base), baseFactory) + + // Each variant flips exactly one dimension and must NOT reuse baseClient. + const variants = [ + { ...base, baseURL: 'https://b.openai.azure.com/anthropic' }, + { ...base, anthropicVersion: '2024-10-22' }, + { ...base, pinnedIP: '10.0.0.2' }, + { ...base, pinnedIP: null }, + { ...base, useNativeStructuredOutputs: true }, + ] + + for (const variant of variants) { + const factory = makeFactory() + const client = getCachedAnthropicClient(azureKey(variant), factory) + expect(client).not.toBe(baseClient) + expect(factory).toHaveBeenCalledTimes(1) + } + }) + }) + + it('evicts the least-recently-used entry once the cache cap is exceeded', () => { + const ns = uniqueNs() + const CAP = 1_000 + + const oldestKey = `${ns}-evict-0` + const oldestFactory = makeFactory() + getCachedAnthropicClient(oldestKey, oldestFactory) + expect(oldestFactory).toHaveBeenCalledTimes(1) + + // Fill the remaining capacity, then push one past the cap. Since the oldest + // key has not been touched since insertion, it is the LRU victim. + for (let i = 1; i <= CAP; i += 1) { + getCachedAnthropicClient(`${ns}-evict-${i}`, makeFactory()) + } + + // The oldest key was evicted: requesting it again re-runs its factory. + const reFactory = makeFactory() + getCachedAnthropicClient(oldestKey, reFactory) + expect(reFactory).toHaveBeenCalledTimes(1) + expect(oldestFactory).toHaveBeenCalledTimes(1) + }) +}) diff --git a/apps/sim/providers/anthropic/client-cache.ts b/apps/sim/providers/anthropic/client-cache.ts new file mode 100644 index 00000000000..77b48c67f25 --- /dev/null +++ b/apps/sim/providers/anthropic/client-cache.ts @@ -0,0 +1,41 @@ +import type Anthropic from '@anthropic-ai/sdk' +import { LRUCache } from 'lru-cache' + +/** + * Bounded, idle-expiring cache of Anthropic SDK clients keyed by the inputs that + * affect client construction. Reusing a client across requests lets the + * underlying HTTP agent keep connections alive, avoiding a fresh TLS handshake + * on every request. + * + * The SDK client holds no per-request mutable state — abort signals are passed + * at the `.messages.create()` / `.stream()` call sites, not on the client — so a + * single client can be shared safely across concurrent requests. + * + * The `apiKey` is always part of the cache key, making it the tenant security + * boundary: clients are never shared across different API keys. + */ + +const CLIENT_CACHE_MAX_ENTRIES = 1_000 +const CLIENT_CACHE_TTL_MS = 30 * 60 * 1_000 + +const clientCache = new LRUCache({ + max: CLIENT_CACHE_MAX_ENTRIES, + ttl: CLIENT_CACHE_TTL_MS, +}) + +/** + * Returns a cached Anthropic client for the given key, constructing and storing + * one via `factory` on a miss. The key must encode every input that varies the + * constructed client (at minimum the API key); identical keys safely share a + * single client instance. + */ +export function getCachedAnthropicClient(key: string, factory: () => Anthropic): Anthropic { + const existing = clientCache.get(key) + if (existing) { + return existing + } + + const client = factory() + clientCache.set(key, client) + return client +} diff --git a/apps/sim/providers/anthropic/index.ts b/apps/sim/providers/anthropic/index.ts index 543c328fb18..f41aaee97fb 100644 --- a/apps/sim/providers/anthropic/index.ts +++ b/apps/sim/providers/anthropic/index.ts @@ -1,6 +1,7 @@ import Anthropic from '@anthropic-ai/sdk' import { createLogger } from '@sim/logger' import type { StreamingExecution } from '@/executor/types' +import { getCachedAnthropicClient } from '@/providers/anthropic/client-cache' import { executeAnthropicProviderRequest } from '@/providers/anthropic/core' import { getProviderDefaultModel, getProviderModels } from '@/providers/models' import type { ProviderConfig, ProviderRequest, ProviderResponse } from '@/providers/types' @@ -21,13 +22,19 @@ export const anthropicProvider: ProviderConfig = { return executeAnthropicProviderRequest(request, { providerId: 'anthropic', providerLabel: 'Anthropic', - createClient: (apiKey, useNativeStructuredOutputs) => - new Anthropic({ - apiKey, - defaultHeaders: useNativeStructuredOutputs - ? { 'anthropic-beta': 'structured-outputs-2025-11-13' } - : undefined, - }), + createClient: (apiKey, useNativeStructuredOutputs) => { + const cacheKey = `${apiKey}::${useNativeStructuredOutputs ? 'beta' : 'default'}` + return getCachedAnthropicClient( + cacheKey, + () => + new Anthropic({ + apiKey, + defaultHeaders: useNativeStructuredOutputs + ? { 'anthropic-beta': 'structured-outputs-2025-11-13' } + : undefined, + }) + ) + }, logger, }) }, diff --git a/apps/sim/providers/azure-anthropic/index.ts b/apps/sim/providers/azure-anthropic/index.ts index 39980d77c2e..f8dfd982bd7 100644 --- a/apps/sim/providers/azure-anthropic/index.ts +++ b/apps/sim/providers/azure-anthropic/index.ts @@ -3,6 +3,7 @@ import { createLogger } from '@sim/logger' import { env } from '@/lib/core/config/env' import { createPinnedFetch, validateUrlWithDNS } from '@/lib/core/security/input-validation.server' import type { StreamingExecution } from '@/executor/types' +import { getCachedAnthropicClient } from '@/providers/anthropic/client-cache' import { executeAnthropicProviderRequest } from '@/providers/anthropic/core' import { getProviderDefaultModel, getProviderModels } from '@/providers/models' import type { ProviderConfig, ProviderRequest, ProviderResponse } from '@/providers/types' @@ -29,6 +30,7 @@ export const azureAnthropicProvider: ProviderConfig = { } let pinnedFetch: typeof fetch | undefined + let pinnedIP: string | undefined if (userProvidedEndpoint) { const validation = await validateUrlWithDNS(userProvidedEndpoint, 'azureEndpoint') if (!validation.isValid) { @@ -41,7 +43,8 @@ export const azureAnthropicProvider: ProviderConfig = { if (!validation.resolvedIP) { throw new Error('Invalid Azure Anthropic endpoint: could not resolve a pinnable IP address') } - pinnedFetch = createPinnedFetch(validation.resolvedIP) + pinnedIP = validation.resolvedIP + pinnedFetch = createPinnedFetch(pinnedIP) } const apiKey = request.apiKey @@ -68,19 +71,31 @@ export const azureAnthropicProvider: ProviderConfig = { { providerId: 'azure-anthropic', providerLabel: 'Azure Anthropic', - createClient: (apiKey, useNativeStructuredOutputs) => - new Anthropic({ - baseURL, + createClient: (apiKey, useNativeStructuredOutputs) => { + const cacheKey = [ apiKey, - ...(pinnedFetch ? { fetch: pinnedFetch } : {}), - defaultHeaders: { - 'api-key': apiKey, - 'anthropic-version': anthropicVersion, - ...(useNativeStructuredOutputs - ? { 'anthropic-beta': 'structured-outputs-2025-11-13' } - : {}), - }, - }), + baseURL, + anthropicVersion, + pinnedIP ?? 'no-pin', + useNativeStructuredOutputs ? 'beta' : 'default', + ].join('::') + return getCachedAnthropicClient( + cacheKey, + () => + new Anthropic({ + baseURL, + apiKey, + ...(pinnedFetch ? { fetch: pinnedFetch } : {}), + defaultHeaders: { + 'api-key': apiKey, + 'anthropic-version': anthropicVersion, + ...(useNativeStructuredOutputs + ? { 'anthropic-beta': 'structured-outputs-2025-11-13' } + : {}), + }, + }) + ) + }, logger, } ) From 45e0fe6d884c2cef8cbbd9bc42d69dc3bfde91ce Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 16 Jun 2026 11:19:10 -0700 Subject: [PATCH 02/11] fix(execution): run rate-limit gate only after ban/usage pass MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The rate-limit gate is not read-only — checkRateLimitWithSubscription consumes a token — so running it in parallel with the read-only gates debited rate-limit quota for requests that the ban (403) or usage (402) gates reject, which the original sequential flow never did. Move the rate-limit gate to run sequentially after the ban and usage gates pass, preserving the read-only gates' parallelism (ban + subscription + usage) and the exact ban -> usage -> rate precedence. Add regression tests asserting the rate limiter is not consumed when an earlier gate rejects, and is consumed once when they pass. Caught by Cursor Bugbot review. --- apps/sim/lib/execution/preprocessing.test.ts | 34 ++++++++++++++++ apps/sim/lib/execution/preprocessing.ts | 43 +++++++++++++------- 2 files changed, 62 insertions(+), 15 deletions(-) diff --git a/apps/sim/lib/execution/preprocessing.test.ts b/apps/sim/lib/execution/preprocessing.test.ts index 9049ac2af83..320b18d3fdc 100644 --- a/apps/sim/lib/execution/preprocessing.test.ts +++ b/apps/sim/lib/execution/preprocessing.test.ts @@ -229,6 +229,40 @@ describe('preprocessExecution ban gate', () => { }) }) + it('does not debit rate-limit quota when the ban gate rejects', async () => { + // The rate-limit gate consumes a token, so it must not run for a request + // an earlier gate (ban) already rejects. + mockGetActivelyBannedUserIds.mockResolvedValue(['billed-account-1']) + + const result = await preprocessExecution({ ...baseOptions, checkRateLimit: true }) + + expect(result).toMatchObject({ success: false, error: { statusCode: 403 } }) + expect(mockCheckRateLimit).not.toHaveBeenCalled() + }) + + it('does not debit rate-limit quota when the usage gate rejects', async () => { + vi.mocked(checkServerSideUsageLimits).mockResolvedValue({ + isExceeded: true, + currentUsage: 20, + limit: 10, + message: 'Usage limit exceeded. Please upgrade your plan to continue.', + } as any) + + const result = await preprocessExecution({ ...baseOptions, checkRateLimit: true }) + + expect(result).toMatchObject({ success: false, error: { statusCode: 402 } }) + expect(mockCheckRateLimit).not.toHaveBeenCalled() + }) + + it('consumes the rate-limit gate exactly once when the ban and usage gates pass', async () => { + mockCheckRateLimit.mockResolvedValue({ allowed: true, remaining: 5, resetAt: new Date() }) + + const result = await preprocessExecution({ ...baseOptions, checkRateLimit: true }) + + expect(result.success).toBe(true) + expect(mockCheckRateLimit).toHaveBeenCalledTimes(1) + }) + it('checks the billing actor, caller-provided userId, and workflow owner in one call', async () => { const result = await preprocessExecution(baseOptions) diff --git a/apps/sim/lib/execution/preprocessing.ts b/apps/sim/lib/execution/preprocessing.ts index 4f1fd56211d..111c29be161 100644 --- a/apps/sim/lib/execution/preprocessing.ts +++ b/apps/sim/lib/execution/preprocessing.ts @@ -573,11 +573,14 @@ export async function preprocessExecution( let rateLimitInfo: { allowed: boolean; remaining: number; resetAt: Date } | undefined /** - * STEP 6: rate-limit gate. Returns the failure outcome, or `null` on - * pass/skip. On a non-error outcome it populates `rateLimitInfo` for the - * success result, matching the sequential version. + * STEP 6: rate-limit gate. Unlike the other gates this one is NOT read-only — + * `checkRateLimitWithSubscription` consumes a token — so it is invoked + * sequentially only after the ban and usage gates pass, matching the original + * order. Running it eagerly or in parallel would debit rate-limit quota for + * requests that ban or usage rejects. Returns the failure outcome, or `null` + * on pass/skip; on a non-error outcome it populates `rateLimitInfo`. */ - const rateLimitTask = (async (): Promise => { + const runRateLimitGate = async (): Promise => { if (!checkRateLimit) return null try { const rateLimiter = new RateLimiter() @@ -646,21 +649,31 @@ export async function preprocessExecution( }, } } - })() + } - const [usageResult, rateLimitFailure] = await Promise.all([usageCheckTask, rateLimitTask]) - const usageFailure = usageResult.failure + const usageResult = await usageCheckTask const usageSnapshot = usageResult.snapshot - // Evaluate outcomes in fixed precedence order — ban (403) → usage (402) → - // rate limit (429) — so the response matches the sequential version exactly, - // independent of which gate's promise settled first. - const gateFailure = banFailure ?? usageFailure ?? rateLimitFailure - if (gateFailure) { - if (gateFailure.recordError) { - await recordPreprocessingError(gateFailure.recordError) + // The read-only gates (ban, subscription, usage) ran concurrently; evaluate + // them in fixed precedence order — ban (403) → usage (402) — so the response + // matches the sequential version regardless of which settled first. + const readGateFailure = banFailure ?? usageResult.failure + if (readGateFailure) { + if (readGateFailure.recordError) { + await recordPreprocessingError(readGateFailure.recordError) + } + return readGateFailure.response + } + + // Rate limiting (429) runs only after ban and usage pass, because it debits a + // token — matching the original sequential order so rejected requests never + // consume quota. + const rateLimitFailure = await runRateLimitGate() + if (rateLimitFailure) { + if (rateLimitFailure.recordError) { + await recordPreprocessingError(rateLimitFailure.recordError) } - return gateFailure.response + return rateLimitFailure.response } /** From a258a384b87584b9241218848802d9950007cae2 Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 16 Jun 2026 11:25:04 -0700 Subject: [PATCH 03/11] chore(execution): trim redundant preflight comments Tighten the gate overview to match the sequential rate-limit gate and drop inline notes that duplicated it or the runRateLimitGate doc. --- apps/sim/lib/execution/preprocessing.ts | 28 +++++++------------------ 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/apps/sim/lib/execution/preprocessing.ts b/apps/sim/lib/execution/preprocessing.ts index 111c29be161..96f64f630e0 100644 --- a/apps/sim/lib/execution/preprocessing.ts +++ b/apps/sim/lib/execution/preprocessing.ts @@ -322,21 +322,12 @@ export async function preprocessExecution( } } - // ========== STEPS 3.5–6: Parallel Read-Only Preflight Gates ========== - // All remaining gates before the STEP 7 write are read-only with no side - // effects, so they run concurrently to cut sequential latency. The ban check - // (STEP 3.5) and the subscription fetch (STEP 4) are mutually independent and - // start together. Once the subscription resolves, the usage gates (STEP 5a/5b) - // and the rate-limit gate (STEP 6) — all of which depend only on the - // subscription, actor, and workspace — start together. - // - // Each gate resolves to either `null` (passed/skipped) or a deferred outcome: - // the `PreprocessExecutionResult` to return plus the optional error-log write - // to perform. Because every gate completes regardless of which fails first, - // outcomes are evaluated IN A FIXED PRECEDENCE ORDER — ban (403) → usage (402) - // → rate limit (429) — to reproduce the sequential version's behavior exactly. - // Wasted work on a request that ends up rejected is acceptable here: the gates - // are read-only and the sole write (STEP 7) stays after every gate passes. + // ========== STEPS 3.5–6: Preflight Gates ========== + // The read-only gates run concurrently to cut latency: ban + subscription + // together, then usage (which needs the subscription). The rate-limit gate is + // stateful — it debits a token — so it runs sequentially only after ban and + // usage pass. Failures apply in fixed precedence (ban 403 → usage 402 → rate + // 429), and the sole write (the STEP 7 reservation) stays last. /** * A failing gate's deferred outcome: the response to return, plus an optional @@ -654,9 +645,7 @@ export async function preprocessExecution( const usageResult = await usageCheckTask const usageSnapshot = usageResult.snapshot - // The read-only gates (ban, subscription, usage) ran concurrently; evaluate - // them in fixed precedence order — ban (403) → usage (402) — so the response - // matches the sequential version regardless of which settled first. + // Precedence: ban (403) wins over usage (402). const readGateFailure = banFailure ?? usageResult.failure if (readGateFailure) { if (readGateFailure.recordError) { @@ -665,9 +654,6 @@ export async function preprocessExecution( return readGateFailure.response } - // Rate limiting (429) runs only after ban and usage pass, because it debits a - // token — matching the original sequential order so rejected requests never - // consume quota. const rateLimitFailure = await runRateLimitGate() if (rateLimitFailure) { if (rateLimitFailure.recordError) { From eb074db7ea35178bd8b692429a9b121641d9be87 Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 16 Jun 2026 11:30:34 -0700 Subject: [PATCH 04/11] =?UTF-8?q?refactor(cache):=20address=20review=20?= =?UTF-8?q?=E2=80=94=20idle=20TTL=20for=20client=20cache,=20LRUCache=20for?= =?UTF-8?q?=20deployed=20state?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - client-cache: add updateAgeOnGet so the TTL is genuinely idle-based (active clients keep their warm keep-alive connections; the JSDoc now matches behavior). - deployed-state: replace the hand-rolled Map + manual FIFO eviction/TTL with LRUCache (real LRU eviction, built-in TTL), matching the effectiveDecryptedEnv and integration-tool-schema caches. TTL stays absolute (not reset on read) so the credential-migration remap still propagates across ECS tasks. Both per review feedback from Greptile. --- apps/sim/lib/workflows/persistence/utils.ts | 43 +++++++------------- apps/sim/providers/anthropic/client-cache.ts | 3 ++ 2 files changed, 17 insertions(+), 29 deletions(-) diff --git a/apps/sim/lib/workflows/persistence/utils.ts b/apps/sim/lib/workflows/persistence/utils.ts index 42a32d43339..55d6cafc6c9 100644 --- a/apps/sim/lib/workflows/persistence/utils.ts +++ b/apps/sim/lib/workflows/persistence/utils.ts @@ -13,6 +13,7 @@ import type { DbOrTx, NormalizedWorkflowData } from '@sim/workflow-persistence/t import type { BlockState, Loop, Parallel, WorkflowState } from '@sim/workflow-types/workflow' import type { InferSelectModel } from 'drizzle-orm' import { and, desc, eq, inArray, lt, sql } from 'drizzle-orm' +import { LRUCache } from 'lru-cache' import type { Edge } from 'reactflow' import { remapConditionBlockIds, remapConditionEdgeHandle } from '@/lib/workflows/condition-ids' import { @@ -100,19 +101,13 @@ export async function blockExistsInDeployment( } /** - * Maximum number of deployed-state entries retained in the process-local cache. * Each entry is keyed by an immutable `deploymentVersionId` and holds a - * fully-migrated {@link DeployedWorkflowData} snapshot (tens of KB to ~1MB), - * so the bound keeps worst-case memory within a sane envelope. + * fully-migrated {@link DeployedWorkflowData} snapshot (tens of KB to ~1MB); + * the bound keeps worst-case memory within a sane envelope. */ const DEPLOYED_STATE_CACHE_MAX_ENTRIES = 500 const DEPLOYED_STATE_CACHE_TTL_MS = 5 * 60 * 1000 -interface DeployedStateCacheEntry { - data: DeployedWorkflowData - expiresAt: number -} - /** * Process-local cache of fully-loaded, post-migration deployed workflow state, * keyed by the immutable `deploymentVersionId`. @@ -120,15 +115,17 @@ interface DeployedStateCacheEntry { * The id is unique per deploy — a redeploy mints a new id and a rollback * reactivates an existing id — so the active-version lookup naturally selects a * different (or already-cached) key whenever the active deployment changes, - * making the cache self-invalidating across redeploy/rollback. Insertion order - * is used for oldest-first (LRU-style) eviction once the bound is reached. + * making the cache self-invalidating across redeploy/rollback. * - * A short TTL bounds the one piece of the cached state that is not strictly - * immutable: `applyBlockMigrations` resolves legacy credential references via a - * live lookup, so the TTL lets a credential change propagate across ECS tasks - * without waiting for redeploy or eviction. + * The TTL is absolute (not reset on read) on purpose: it bounds the one piece + * of the cached state that is not strictly immutable — `applyBlockMigrations` + * resolves legacy credential references via a live lookup — so a credential + * change propagates across ECS tasks even for a continuously-running workflow. */ -const deployedStateCache = new Map() +const deployedStateCache = new LRUCache({ + max: DEPLOYED_STATE_CACHE_MAX_ENTRIES, + ttl: DEPLOYED_STATE_CACHE_TTL_MS, +}) /** * Drop cached deployed state. Pass a `deploymentVersionId` to evict a single @@ -171,10 +168,7 @@ export async function loadDeployedWorkflowState( const cached = deployedStateCache.get(active.id) if (cached) { - if (cached.expiresAt > Date.now()) { - return structuredClone(cached.data) - } - deployedStateCache.delete(active.id) + return structuredClone(cached) } const state = active.state as WorkflowState & { variables?: Record } @@ -204,16 +198,7 @@ export async function loadDeployedWorkflowState( deploymentVersionId: active.id, } - if (deployedStateCache.size >= DEPLOYED_STATE_CACHE_MAX_ENTRIES) { - const oldestKey = deployedStateCache.keys().next().value - if (oldestKey !== undefined) { - deployedStateCache.delete(oldestKey) - } - } - deployedStateCache.set(active.id, { - data: deployedState, - expiresAt: Date.now() + DEPLOYED_STATE_CACHE_TTL_MS, - }) + deployedStateCache.set(active.id, deployedState) return structuredClone(deployedState) } catch (error) { diff --git a/apps/sim/providers/anthropic/client-cache.ts b/apps/sim/providers/anthropic/client-cache.ts index 77b48c67f25..9ba702bf26c 100644 --- a/apps/sim/providers/anthropic/client-cache.ts +++ b/apps/sim/providers/anthropic/client-cache.ts @@ -21,6 +21,9 @@ const CLIENT_CACHE_TTL_MS = 30 * 60 * 1_000 const clientCache = new LRUCache({ max: CLIENT_CACHE_MAX_ENTRIES, ttl: CLIENT_CACHE_TTL_MS, + // Idle expiry: the TTL resets on every hit so a continuously-used client + // (and its warm keep-alive connections) survives, while idle keys age out. + updateAgeOnGet: true, }) /** From b87f8701238b49079e48d91eafad3450f1f51e21 Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 16 Jun 2026 11:40:08 -0700 Subject: [PATCH 05/11] test(execution): isolate rate-limit gate test from STEP 7 reservation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 'consumes the rate-limit gate once' test reached the STEP 7 admission reservation, which depends on Redis — it passed locally (reserve throws and is swallowed) but failed in CI (reserve returns not-reserved -> 429). Pass skipConcurrencyReservation so the test isolates the rate gate deterministically. --- apps/sim/lib/execution/preprocessing.test.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/execution/preprocessing.test.ts b/apps/sim/lib/execution/preprocessing.test.ts index 320b18d3fdc..657973a01a6 100644 --- a/apps/sim/lib/execution/preprocessing.test.ts +++ b/apps/sim/lib/execution/preprocessing.test.ts @@ -257,7 +257,13 @@ describe('preprocessExecution ban gate', () => { it('consumes the rate-limit gate exactly once when the ban and usage gates pass', async () => { mockCheckRateLimit.mockResolvedValue({ allowed: true, remaining: 5, resetAt: new Date() }) - const result = await preprocessExecution({ ...baseOptions, checkRateLimit: true }) + // skipConcurrencyReservation bypasses the STEP 7 admission reservation so the + // assertion isolates the rate gate and does not depend on Redis availability. + const result = await preprocessExecution({ + ...baseOptions, + checkRateLimit: true, + skipConcurrencyReservation: true, + }) expect(result.success).toBe(true) expect(mockCheckRateLimit).toHaveBeenCalledTimes(1) From 980aaf9b9a35bc37cae1664fad208da7559bdc75 Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 16 Jun 2026 11:56:42 -0700 Subject: [PATCH 06/11] perf(providers): memoize SDK clients where the pool is per-client (bedrock, vllm) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Generalize the Anthropic client cache into one shared memoizer (providers/client-cache.ts) and apply it only where each new client owns its own connection pool — so reuse actually keeps connections warm: - bedrock: AWS SDK clients hold a per-client connection pool (reuse is the AWS best practice). Keyed by region + credential identity. - vllm: a pinned endpoint creates its own undici Agent per call; key by the resolved IP so DNS re-validation still runs each request. - anthropic + azure-anthropic: migrated onto the shared memoizer. Deliberately NOT applied to the OpenAI-compatible providers, groq, cerebras, or google: their SDKs share a process-global keep-alive pool (Node openai-sdk module singleton agent; anthropic/global undici), so a fresh client per request already reuses connections and memoization would add complexity with ~no benefit. litellm uses a plain shared-agent client (no pinning) and is likewise skipped. Bounded LRU (max 1000, 30m idle TTL) with no close-on-eviction, avoiding the unbounded-growth and eviction-closes-in-use-client failure modes seen in similar client caches. --- .../providers/anthropic/client-cache.test.ts | 162 ------------------ apps/sim/providers/anthropic/client-cache.ts | 44 ----- apps/sim/providers/anthropic/index.ts | 6 +- apps/sim/providers/azure-anthropic/index.ts | 5 +- apps/sim/providers/bedrock/index.ts | 10 +- apps/sim/providers/client-cache.test.ts | 107 ++++++++++++ apps/sim/providers/client-cache.ts | 43 +++++ apps/sim/providers/vllm/index.ts | 22 ++- 8 files changed, 181 insertions(+), 218 deletions(-) delete mode 100644 apps/sim/providers/anthropic/client-cache.test.ts delete mode 100644 apps/sim/providers/anthropic/client-cache.ts create mode 100644 apps/sim/providers/client-cache.test.ts create mode 100644 apps/sim/providers/client-cache.ts diff --git a/apps/sim/providers/anthropic/client-cache.test.ts b/apps/sim/providers/anthropic/client-cache.test.ts deleted file mode 100644 index 8f1618c2fb0..00000000000 --- a/apps/sim/providers/anthropic/client-cache.test.ts +++ /dev/null @@ -1,162 +0,0 @@ -/** - * @vitest-environment node - */ -import type Anthropic from '@anthropic-ai/sdk' -import { describe, expect, it, vi } from 'vitest' -import { getCachedAnthropicClient } from '@/providers/anthropic/client-cache' - -/** - * Builds a fresh fake "client" object on every call so identity comparisons - * (`toBe`) tell us whether the cache returned the memoized instance or a new - * one from the factory. We never construct a real Anthropic SDK client — these - * tests exercise the cache, not the SDK. - */ -function makeFactory() { - return vi.fn(() => ({}) as Anthropic) -} - -/** - * Generates a unique suffix per test so distinct tests never collide on cache - * keys. The cache util exposes no reset hook, so isolation is achieved by - * namespacing keys rather than clearing shared state. - */ -let keyCounter = 0 -function uniqueNs(): string { - keyCounter += 1 - return `ns-${keyCounter}-${Date.now()}` -} - -/** Mirrors the anthropic provider's cache-key shape (index.ts). */ -function anthropicKey(apiKey: string, useNativeStructuredOutputs: boolean): string { - return `${apiKey}::${useNativeStructuredOutputs ? 'beta' : 'default'}` -} - -/** Mirrors the azure-anthropic provider's cache-key shape (index.ts). */ -function azureKey(opts: { - apiKey: string - baseURL: string - anthropicVersion: string - pinnedIP: string | null - useNativeStructuredOutputs: boolean -}): string { - return [ - opts.apiKey, - opts.baseURL, - opts.anthropicVersion, - opts.pinnedIP ?? 'no-pin', - opts.useNativeStructuredOutputs ? 'beta' : 'default', - ].join('::') -} - -describe('getCachedAnthropicClient', () => { - it('returns the SAME instance for an identical key and runs the factory once (memoized)', () => { - const ns = uniqueNs() - const key = anthropicKey(`${ns}-key`, false) - const factory = makeFactory() - - const first = getCachedAnthropicClient(key, factory) - const second = getCachedAnthropicClient(key, factory) - - expect(second).toBe(first) - expect(factory).toHaveBeenCalledTimes(1) - }) - - it('returns a DIFFERENT instance for a different apiKey (tenant isolation)', () => { - const ns = uniqueNs() - const factoryA = makeFactory() - const factoryB = makeFactory() - - const tenantA = getCachedAnthropicClient(anthropicKey(`${ns}-tenant-a`, false), factoryA) - const tenantB = getCachedAnthropicClient(anthropicKey(`${ns}-tenant-b`, false), factoryB) - - expect(tenantB).not.toBe(tenantA) - expect(factoryA).toHaveBeenCalledTimes(1) - expect(factoryB).toHaveBeenCalledTimes(1) - }) - - it('returns a different instance when the useNativeStructuredOutputs flag (beta header) differs', () => { - const ns = uniqueNs() - const apiKey = `${ns}-same-key` - const defaultFactory = makeFactory() - const betaFactory = makeFactory() - - const defaultClient = getCachedAnthropicClient(anthropicKey(apiKey, false), defaultFactory) - const betaClient = getCachedAnthropicClient(anthropicKey(apiKey, true), betaFactory) - - expect(betaClient).not.toBe(defaultClient) - expect(defaultFactory).toHaveBeenCalledTimes(1) - expect(betaFactory).toHaveBeenCalledTimes(1) - }) - - describe('azure key dimensions', () => { - it('memoizes when every azure dimension matches', () => { - const ns = uniqueNs() - const base = { - apiKey: `${ns}-azure-key`, - baseURL: 'https://example.openai.azure.com/anthropic', - anthropicVersion: '2023-06-01', - pinnedIP: '10.0.0.1', - useNativeStructuredOutputs: false, - } - const factory = makeFactory() - - const first = getCachedAnthropicClient(azureKey(base), factory) - const second = getCachedAnthropicClient(azureKey(base), factory) - - expect(second).toBe(first) - expect(factory).toHaveBeenCalledTimes(1) - }) - - it('produces a distinct instance for each differing azure dimension', () => { - const ns = uniqueNs() - const base = { - apiKey: `${ns}-azure-key`, - baseURL: 'https://a.openai.azure.com/anthropic', - anthropicVersion: '2023-06-01', - pinnedIP: '10.0.0.1', - useNativeStructuredOutputs: false, - } - - const baseFactory = makeFactory() - const baseClient = getCachedAnthropicClient(azureKey(base), baseFactory) - - // Each variant flips exactly one dimension and must NOT reuse baseClient. - const variants = [ - { ...base, baseURL: 'https://b.openai.azure.com/anthropic' }, - { ...base, anthropicVersion: '2024-10-22' }, - { ...base, pinnedIP: '10.0.0.2' }, - { ...base, pinnedIP: null }, - { ...base, useNativeStructuredOutputs: true }, - ] - - for (const variant of variants) { - const factory = makeFactory() - const client = getCachedAnthropicClient(azureKey(variant), factory) - expect(client).not.toBe(baseClient) - expect(factory).toHaveBeenCalledTimes(1) - } - }) - }) - - it('evicts the least-recently-used entry once the cache cap is exceeded', () => { - const ns = uniqueNs() - const CAP = 1_000 - - const oldestKey = `${ns}-evict-0` - const oldestFactory = makeFactory() - getCachedAnthropicClient(oldestKey, oldestFactory) - expect(oldestFactory).toHaveBeenCalledTimes(1) - - // Fill the remaining capacity, then push one past the cap. Since the oldest - // key has not been touched since insertion, it is the LRU victim. - for (let i = 1; i <= CAP; i += 1) { - getCachedAnthropicClient(`${ns}-evict-${i}`, makeFactory()) - } - - // The oldest key was evicted: requesting it again re-runs its factory. - const reFactory = makeFactory() - getCachedAnthropicClient(oldestKey, reFactory) - expect(reFactory).toHaveBeenCalledTimes(1) - expect(oldestFactory).toHaveBeenCalledTimes(1) - }) -}) diff --git a/apps/sim/providers/anthropic/client-cache.ts b/apps/sim/providers/anthropic/client-cache.ts deleted file mode 100644 index 9ba702bf26c..00000000000 --- a/apps/sim/providers/anthropic/client-cache.ts +++ /dev/null @@ -1,44 +0,0 @@ -import type Anthropic from '@anthropic-ai/sdk' -import { LRUCache } from 'lru-cache' - -/** - * Bounded, idle-expiring cache of Anthropic SDK clients keyed by the inputs that - * affect client construction. Reusing a client across requests lets the - * underlying HTTP agent keep connections alive, avoiding a fresh TLS handshake - * on every request. - * - * The SDK client holds no per-request mutable state — abort signals are passed - * at the `.messages.create()` / `.stream()` call sites, not on the client — so a - * single client can be shared safely across concurrent requests. - * - * The `apiKey` is always part of the cache key, making it the tenant security - * boundary: clients are never shared across different API keys. - */ - -const CLIENT_CACHE_MAX_ENTRIES = 1_000 -const CLIENT_CACHE_TTL_MS = 30 * 60 * 1_000 - -const clientCache = new LRUCache({ - max: CLIENT_CACHE_MAX_ENTRIES, - ttl: CLIENT_CACHE_TTL_MS, - // Idle expiry: the TTL resets on every hit so a continuously-used client - // (and its warm keep-alive connections) survives, while idle keys age out. - updateAgeOnGet: true, -}) - -/** - * Returns a cached Anthropic client for the given key, constructing and storing - * one via `factory` on a miss. The key must encode every input that varies the - * constructed client (at minimum the API key); identical keys safely share a - * single client instance. - */ -export function getCachedAnthropicClient(key: string, factory: () => Anthropic): Anthropic { - const existing = clientCache.get(key) - if (existing) { - return existing - } - - const client = factory() - clientCache.set(key, client) - return client -} diff --git a/apps/sim/providers/anthropic/index.ts b/apps/sim/providers/anthropic/index.ts index f41aaee97fb..043ae4b0f09 100644 --- a/apps/sim/providers/anthropic/index.ts +++ b/apps/sim/providers/anthropic/index.ts @@ -1,8 +1,8 @@ import Anthropic from '@anthropic-ai/sdk' import { createLogger } from '@sim/logger' import type { StreamingExecution } from '@/executor/types' -import { getCachedAnthropicClient } from '@/providers/anthropic/client-cache' import { executeAnthropicProviderRequest } from '@/providers/anthropic/core' +import { getCachedProviderClient } from '@/providers/client-cache' import { getProviderDefaultModel, getProviderModels } from '@/providers/models' import type { ProviderConfig, ProviderRequest, ProviderResponse } from '@/providers/types' @@ -23,8 +23,8 @@ export const anthropicProvider: ProviderConfig = { providerId: 'anthropic', providerLabel: 'Anthropic', createClient: (apiKey, useNativeStructuredOutputs) => { - const cacheKey = `${apiKey}::${useNativeStructuredOutputs ? 'beta' : 'default'}` - return getCachedAnthropicClient( + const cacheKey = `anthropic::${apiKey}::${useNativeStructuredOutputs ? 'beta' : 'default'}` + return getCachedProviderClient( cacheKey, () => new Anthropic({ diff --git a/apps/sim/providers/azure-anthropic/index.ts b/apps/sim/providers/azure-anthropic/index.ts index f8dfd982bd7..2f0498992a0 100644 --- a/apps/sim/providers/azure-anthropic/index.ts +++ b/apps/sim/providers/azure-anthropic/index.ts @@ -3,8 +3,8 @@ import { createLogger } from '@sim/logger' import { env } from '@/lib/core/config/env' import { createPinnedFetch, validateUrlWithDNS } from '@/lib/core/security/input-validation.server' import type { StreamingExecution } from '@/executor/types' -import { getCachedAnthropicClient } from '@/providers/anthropic/client-cache' import { executeAnthropicProviderRequest } from '@/providers/anthropic/core' +import { getCachedProviderClient } from '@/providers/client-cache' import { getProviderDefaultModel, getProviderModels } from '@/providers/models' import type { ProviderConfig, ProviderRequest, ProviderResponse } from '@/providers/types' @@ -73,13 +73,14 @@ export const azureAnthropicProvider: ProviderConfig = { providerLabel: 'Azure Anthropic', createClient: (apiKey, useNativeStructuredOutputs) => { const cacheKey = [ + 'azure-anthropic', apiKey, baseURL, anthropicVersion, pinnedIP ?? 'no-pin', useNativeStructuredOutputs ? 'beta' : 'default', ].join('::') - return getCachedAnthropicClient( + return getCachedProviderClient( cacheKey, () => new Anthropic({ diff --git a/apps/sim/providers/bedrock/index.ts b/apps/sim/providers/bedrock/index.ts index 32be4078675..e22e5ca619b 100644 --- a/apps/sim/providers/bedrock/index.ts +++ b/apps/sim/providers/bedrock/index.ts @@ -24,6 +24,7 @@ import { generateToolUseId, getBedrockInferenceProfileId, } from '@/providers/bedrock/utils' +import { getCachedProviderClient } from '@/providers/client-cache' import { getProviderDefaultModel, getProviderModels } from '@/providers/models' import { createStreamingExecution } from '@/providers/streaming-execution' import { enrichLastModelSegment } from '@/providers/trace-enrichment' @@ -138,7 +139,14 @@ export const bedrockProvider: ProviderConfig = { } } - const client = new BedrockRuntimeClient(clientConfig) + // Memoized: each BedrockRuntimeClient owns its own connection pool (AWS SDK + // best practice is to reuse the client), so reusing it keeps connections warm + // across requests. Keyed by region + credential identity (a rotated key pair + // changes the access key id and so yields a fresh client). + const client = getCachedProviderClient( + `bedrock::${region}::${request.bedrockAccessKeyId ?? 'default-chain'}`, + () => new BedrockRuntimeClient(clientConfig) + ) const messages: BedrockMessage[] = [] const systemContent: SystemContentBlock[] = [] diff --git a/apps/sim/providers/client-cache.test.ts b/apps/sim/providers/client-cache.test.ts new file mode 100644 index 00000000000..6fd03ee97b5 --- /dev/null +++ b/apps/sim/providers/client-cache.test.ts @@ -0,0 +1,107 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it, vi } from 'vitest' +import { getCachedProviderClient } from '@/providers/client-cache' + +/** + * Builds a fresh fake "client" object on every call so identity comparisons + * (`toBe`) tell us whether the cache returned the memoized instance or a new one + * from the factory. We never construct a real SDK client — these tests exercise + * the cache, not any provider SDK. + */ +function makeFactory() { + return vi.fn(() => ({}) as object) +} + +/** + * Generates a unique suffix per test so distinct tests never collide on cache + * keys. The cache util exposes no reset hook, so isolation is achieved by + * namespacing keys rather than clearing shared state. + */ +let keyCounter = 0 +function uniqueNs(): string { + keyCounter += 1 + return `ns-${keyCounter}-${Date.now()}` +} + +describe('getCachedProviderClient', () => { + it('returns the SAME instance for an identical key and runs the factory once (memoized)', () => { + const key = `anthropic::${uniqueNs()}::default` + const factory = makeFactory() + + const first = getCachedProviderClient(key, factory) + const second = getCachedProviderClient(key, factory) + + expect(second).toBe(first) + expect(factory).toHaveBeenCalledTimes(1) + }) + + it('returns a DIFFERENT instance for a different apiKey (tenant isolation)', () => { + const ns = uniqueNs() + const factoryA = makeFactory() + const factoryB = makeFactory() + + const tenantA = getCachedProviderClient(`anthropic::${ns}-tenant-a::default`, factoryA) + const tenantB = getCachedProviderClient(`anthropic::${ns}-tenant-b::default`, factoryB) + + expect(tenantB).not.toBe(tenantA) + expect(factoryA).toHaveBeenCalledTimes(1) + expect(factoryB).toHaveBeenCalledTimes(1) + }) + + it('namespaces by provider: the same apiKey under different provider prefixes does not collide', () => { + const ns = uniqueNs() + const apiKey = `${ns}-shared-key` + const anthropicFactory = makeFactory() + const bedrockFactory = makeFactory() + + const anthropicClient = getCachedProviderClient(`anthropic::${apiKey}`, anthropicFactory) + const bedrockClient = getCachedProviderClient(`bedrock::${apiKey}`, bedrockFactory) + + expect(bedrockClient).not.toBe(anthropicClient) + }) + + it('treats every distinct key dimension as a distinct client', () => { + const ns = uniqueNs() + const base = `azure-anthropic::${ns}-key::https://a.example.com::2023-06-01::10.0.0.1::default` + const baseFactory = makeFactory() + const baseClient = getCachedProviderClient(base, baseFactory) + + const variants = [ + `azure-anthropic::${ns}-key::https://b.example.com::2023-06-01::10.0.0.1::default`, + `azure-anthropic::${ns}-key::https://a.example.com::2024-10-22::10.0.0.1::default`, + `azure-anthropic::${ns}-key::https://a.example.com::2023-06-01::10.0.0.2::default`, + `azure-anthropic::${ns}-key::https://a.example.com::2023-06-01::no-pin::default`, + `azure-anthropic::${ns}-key::https://a.example.com::2023-06-01::10.0.0.1::beta`, + ] + + for (const key of variants) { + const factory = makeFactory() + const client = getCachedProviderClient(key, factory) + expect(client).not.toBe(baseClient) + expect(factory).toHaveBeenCalledTimes(1) + } + }) + + it('evicts the least-recently-used entry once the cache cap is exceeded', () => { + const ns = uniqueNs() + const CAP = 1_000 + + const oldestKey = `evict::${ns}::0` + const oldestFactory = makeFactory() + getCachedProviderClient(oldestKey, oldestFactory) + expect(oldestFactory).toHaveBeenCalledTimes(1) + + // Fill the remaining capacity, then push one past the cap. The oldest key has + // not been touched since insertion, so it is the LRU eviction victim. + for (let i = 1; i <= CAP; i += 1) { + getCachedProviderClient(`evict::${ns}::${i}`, makeFactory()) + } + + const reFactory = makeFactory() + getCachedProviderClient(oldestKey, reFactory) + expect(reFactory).toHaveBeenCalledTimes(1) + expect(oldestFactory).toHaveBeenCalledTimes(1) + }) +}) diff --git a/apps/sim/providers/client-cache.ts b/apps/sim/providers/client-cache.ts new file mode 100644 index 00000000000..ca64c8de48f --- /dev/null +++ b/apps/sim/providers/client-cache.ts @@ -0,0 +1,43 @@ +import { LRUCache } from 'lru-cache' + +/** + * Shared, bounded, idle-expiring cache of provider SDK clients. Reusing a client + * across requests lets the underlying HTTP agent keep connections alive, avoiding + * a fresh TLS handshake (and client construction) on every request. + * + * Provider SDK clients (Anthropic, OpenAI, Groq, …) hold no per-request mutable + * state — abort signals and timeouts are passed at the call site, not on the + * client — so a single instance is safe to share across concurrent requests. + * + * Keys must be namespaced per provider and must encode every input that varies + * the constructed client. The API key is always part of the key, making it the + * tenant security boundary: clients are never shared across different keys. + */ + +const CLIENT_CACHE_MAX_ENTRIES = 1_000 +const CLIENT_CACHE_TTL_MS = 30 * 60 * 1_000 + +const clientCache = new LRUCache({ + max: CLIENT_CACHE_MAX_ENTRIES, + ttl: CLIENT_CACHE_TTL_MS, + // Idle expiry: the TTL resets on every hit so a continuously-used client + // (and its warm keep-alive connections) survives, while idle keys age out. + updateAgeOnGet: true, +}) + +/** + * Returns a cached provider client for the given key, constructing and storing + * one via `factory` on a miss. The key must encode every input that varies the + * constructed client (provider namespace + API key at minimum); identical keys + * safely share a single client instance. + */ +export function getCachedProviderClient(key: string, factory: () => T): T { + const existing = clientCache.get(key) + if (existing) { + return existing as T + } + + const client = factory() + clientCache.set(key, client) + return client +} diff --git a/apps/sim/providers/vllm/index.ts b/apps/sim/providers/vllm/index.ts index 572b5df51ca..5f0dfa326ca 100644 --- a/apps/sim/providers/vllm/index.ts +++ b/apps/sim/providers/vllm/index.ts @@ -7,6 +7,7 @@ import { createPinnedFetch, validateUrlWithDNS } from '@/lib/core/security/input import type { StreamingExecution } from '@/executor/types' import { MAX_TOOL_ITERATIONS } from '@/providers' import { formatMessagesForProvider } from '@/providers/attachments' +import { getCachedProviderClient } from '@/providers/client-cache' import { getProviderDefaultModel, getProviderModels } from '@/providers/models' import { createStreamingExecution } from '@/providers/streaming-execution' import { adaptOpenAIChatToolSchema } from '@/providers/tool-schema-adapter' @@ -114,6 +115,7 @@ export const vllmProvider: ProviderConfig = { * IP blocklist and blocked-port checks still apply, so SSRF protection is intact. */ let pinnedFetch: typeof fetch | undefined + let pinnedIP: string | undefined if (userProvidedEndpoint) { const validation = await validateUrlWithDNS(userProvidedEndpoint, 'vLLM endpoint', { allowHttp: true, @@ -128,15 +130,23 @@ export const vllmProvider: ProviderConfig = { if (!validation.resolvedIP) { throw new Error('Invalid vLLM endpoint: could not resolve a pinnable IP address') } - pinnedFetch = createPinnedFetch(validation.resolvedIP) + pinnedIP = validation.resolvedIP + pinnedFetch = createPinnedFetch(pinnedIP) } const apiKey = request.apiKey || env.VLLM_API_KEY || 'empty' - const vllm = new OpenAI({ - apiKey, - baseURL: `${baseUrl}/v1`, - ...(pinnedFetch ? { fetch: pinnedFetch } : {}), - }) + // Memoized: a pinned endpoint gets its own undici Agent per call, so reusing + // the client (keyed by the resolved IP) keeps its connections warm. DNS + // re-validation still runs every request; a new IP yields a new key/client. + const vllm = getCachedProviderClient( + `vllm::${apiKey}::${baseUrl}::${pinnedIP ?? 'no-pin'}`, + () => + new OpenAI({ + apiKey, + baseURL: `${baseUrl}/v1`, + ...(pinnedFetch ? { fetch: pinnedFetch } : {}), + }) + ) const allMessages: Message[] = [] From b6d207f46db6fdfc84b5e2a3f2cb65d6514a95f3 Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 16 Jun 2026 12:02:29 -0700 Subject: [PATCH 07/11] chore(perf): trim verbose comments to terse why-notes --- apps/sim/lib/api-key/byok.ts | 6 ++-- .../lib/billing/calculations/usage-monitor.ts | 7 ++--- apps/sim/lib/billing/core/plan.test.ts | 13 ++++----- apps/sim/lib/execution/preprocessing.ts | 8 ++---- apps/sim/lib/workflows/persistence/utils.ts | 28 ++++--------------- apps/sim/providers/bedrock/index.ts | 6 ++-- apps/sim/providers/client-cache.ts | 26 ++++------------- apps/sim/providers/vllm/index.ts | 5 ++-- 8 files changed, 29 insertions(+), 70 deletions(-) diff --git a/apps/sim/lib/api-key/byok.ts b/apps/sim/lib/api-key/byok.ts index a3f08a7d2cd..73df4a3a1b0 100644 --- a/apps/sim/lib/api-key/byok.ts +++ b/apps/sim/lib/api-key/byok.ts @@ -37,10 +37,8 @@ function nextRotationIndex(poolKey: string, poolSize: number): number { * creation order. A key that fails to decrypt is skipped in favor of the next * one in the pool. * - * The key list is read fresh from the database on every call rather than - * cached: BYOK lookups are not a hot database query, and reading fresh keeps - * key revocation/rotation effective immediately across every ECS task with no - * cross-instance cache-coherence concern. + * The key list is read fresh every call (not cached): BYOK is not a hot query, + * and reading fresh keeps revocation immediate across ECS tasks. */ export async function getBYOKKey( workspaceId: string | undefined | null, diff --git a/apps/sim/lib/billing/calculations/usage-monitor.ts b/apps/sim/lib/billing/calculations/usage-monitor.ts index 567a70e0b56..4e259f2036e 100644 --- a/apps/sim/lib/billing/calculations/usage-monitor.ts +++ b/apps/sim/lib/billing/calculations/usage-monitor.ts @@ -467,10 +467,9 @@ export async function checkOrgMemberUsageLimit( return { isExceeded: false, currentUsage: 0, limit: null } } - // Resolve the member cap first and short-circuit when none is set — the - // common case. Computing usage is only worthwhile once a cap exists, so the - // two queries stay sequential rather than racing (parallelizing would add a - // usage query on every uncapped member's execution). + // Resolve the cap first and short-circuit when unset (the common case); only + // then is computing usage worthwhile. Kept sequential, not raced, to avoid a + // usage query on every uncapped member's execution. const limit = await getOrgMemberUsageLimit(organizationId, userId) if (limit === null) { return { isExceeded: false, currentUsage: 0, limit: null } diff --git a/apps/sim/lib/billing/core/plan.test.ts b/apps/sim/lib/billing/core/plan.test.ts index 241afedd3af..cf223952b0c 100644 --- a/apps/sim/lib/billing/core/plan.test.ts +++ b/apps/sim/lib/billing/core/plan.test.ts @@ -127,7 +127,6 @@ describe('getHighestPrioritySubscription', () => { const result = await getHighestPrioritySubscription('user-1') - // Enterprise (org) always wins over Pro (personal). expect(result?.id).toBe('sub-org-enterprise') }) @@ -147,7 +146,7 @@ describe('getHighestPrioritySubscription', () => { it('returns the personal sub and skips org follow-ups when there are no memberships', async () => { queue('subscription', [personalPro('user-1')]) - queue('member', []) // no org memberships + queue('member', []) const result = await getHighestPrioritySubscription('user-1') @@ -159,8 +158,8 @@ describe('getHighestPrioritySubscription', () => { }) it('returns null when neither personal nor org subscriptions exist', async () => { - queue('subscription', []) // no personal subs - queue('member', []) // no memberships + queue('subscription', []) + queue('member', []) const result = await getHighestPrioritySubscription('user-1') @@ -168,9 +167,9 @@ describe('getHighestPrioritySubscription', () => { }) it('excludes orphaned org memberships whose organization row no longer exists', async () => { - queue('subscription', []) // no personal subs + queue('subscription', []) queue('member', [{ organizationId: 'ghost-org' }]) // membership points at a deleted org - queue('organization', []) // org-existence returns nothing -> orphaned + queue('organization', []) const result = await getHighestPrioritySubscription('user-1') @@ -184,7 +183,7 @@ describe('getHighestPrioritySubscription', () => { it('falls back to the personal sub when the only org is orphaned', async () => { queue('subscription', [personalPro('user-1')]) queue('member', [{ organizationId: 'ghost-org' }]) - queue('organization', []) // orphaned org + queue('organization', []) const result = await getHighestPrioritySubscription('user-1') diff --git a/apps/sim/lib/execution/preprocessing.ts b/apps/sim/lib/execution/preprocessing.ts index 96f64f630e0..e36b9006b6c 100644 --- a/apps/sim/lib/execution/preprocessing.ts +++ b/apps/sim/lib/execution/preprocessing.ts @@ -323,11 +323,9 @@ export async function preprocessExecution( } // ========== STEPS 3.5–6: Preflight Gates ========== - // The read-only gates run concurrently to cut latency: ban + subscription - // together, then usage (which needs the subscription). The rate-limit gate is - // stateful — it debits a token — so it runs sequentially only after ban and - // usage pass. Failures apply in fixed precedence (ban 403 → usage 402 → rate - // 429), and the sole write (the STEP 7 reservation) stays last. + // Read-only gates run concurrently (ban + subscription, then usage). The + // rate-limit gate debits a token, so it runs sequentially only after ban and + // usage pass. Failures apply in fixed precedence: ban 403 → usage 402 → rate 429. /** * A failing gate's deferred outcome: the response to return, plus an optional diff --git a/apps/sim/lib/workflows/persistence/utils.ts b/apps/sim/lib/workflows/persistence/utils.ts index 55d6cafc6c9..725e7a13479 100644 --- a/apps/sim/lib/workflows/persistence/utils.ts +++ b/apps/sim/lib/workflows/persistence/utils.ts @@ -100,39 +100,21 @@ export async function blockExistsInDeployment( } } -/** - * Each entry is keyed by an immutable `deploymentVersionId` and holds a - * fully-migrated {@link DeployedWorkflowData} snapshot (tens of KB to ~1MB); - * the bound keeps worst-case memory within a sane envelope. - */ const DEPLOYED_STATE_CACHE_MAX_ENTRIES = 500 const DEPLOYED_STATE_CACHE_TTL_MS = 5 * 60 * 1000 /** - * Process-local cache of fully-loaded, post-migration deployed workflow state, - * keyed by the immutable `deploymentVersionId`. - * - * The id is unique per deploy — a redeploy mints a new id and a rollback - * reactivates an existing id — so the active-version lookup naturally selects a - * different (or already-cached) key whenever the active deployment changes, - * making the cache self-invalidating across redeploy/rollback. - * - * The TTL is absolute (not reset on read) on purpose: it bounds the one piece - * of the cached state that is not strictly immutable — `applyBlockMigrations` - * resolves legacy credential references via a live lookup — so a credential - * change propagates across ECS tasks even for a continuously-running workflow. + * Caches post-migration deployed state by the immutable `deploymentVersionId`, so + * a redeploy/rollback (which changes the active id) self-invalidates. The TTL is + * absolute on purpose — it bounds the one non-immutable part, the live credential + * remap in `applyBlockMigrations` — so credential changes still propagate. */ const deployedStateCache = new LRUCache({ max: DEPLOYED_STATE_CACHE_MAX_ENTRIES, ttl: DEPLOYED_STATE_CACHE_TTL_MS, }) -/** - * Drop cached deployed state. Pass a `deploymentVersionId` to evict a single - * entry, or omit it to clear the entire cache. Explicit invalidation is not - * required for correctness — redeploy/rollback change the active id and key the - * cache anew — but the helper is exported for completeness and testing. - */ +/** Evicts one deployed-state entry, or clears the cache when no id is given. */ export function invalidateDeployedStateCache(deploymentVersionId?: string): void { if (deploymentVersionId) { deployedStateCache.delete(deploymentVersionId) diff --git a/apps/sim/providers/bedrock/index.ts b/apps/sim/providers/bedrock/index.ts index e22e5ca619b..9d4b91dbf36 100644 --- a/apps/sim/providers/bedrock/index.ts +++ b/apps/sim/providers/bedrock/index.ts @@ -139,10 +139,8 @@ export const bedrockProvider: ProviderConfig = { } } - // Memoized: each BedrockRuntimeClient owns its own connection pool (AWS SDK - // best practice is to reuse the client), so reusing it keeps connections warm - // across requests. Keyed by region + credential identity (a rotated key pair - // changes the access key id and so yields a fresh client). + // AWS SDK clients own a per-client connection pool and are meant to be reused. + // Keyed by region + credential identity (a rotated key pair yields a new key). const client = getCachedProviderClient( `bedrock::${region}::${request.bedrockAccessKeyId ?? 'default-chain'}`, () => new BedrockRuntimeClient(clientConfig) diff --git a/apps/sim/providers/client-cache.ts b/apps/sim/providers/client-cache.ts index ca64c8de48f..f87f5d0335c 100644 --- a/apps/sim/providers/client-cache.ts +++ b/apps/sim/providers/client-cache.ts @@ -1,35 +1,21 @@ import { LRUCache } from 'lru-cache' -/** - * Shared, bounded, idle-expiring cache of provider SDK clients. Reusing a client - * across requests lets the underlying HTTP agent keep connections alive, avoiding - * a fresh TLS handshake (and client construction) on every request. - * - * Provider SDK clients (Anthropic, OpenAI, Groq, …) hold no per-request mutable - * state — abort signals and timeouts are passed at the call site, not on the - * client — so a single instance is safe to share across concurrent requests. - * - * Keys must be namespaced per provider and must encode every input that varies - * the constructed client. The API key is always part of the key, making it the - * tenant security boundary: clients are never shared across different keys. - */ - const CLIENT_CACHE_MAX_ENTRIES = 1_000 const CLIENT_CACHE_TTL_MS = 30 * 60 * 1_000 +// updateAgeOnGet makes the TTL idle-based, so a continuously-used client keeps +// its warm keep-alive connections while idle keys age out. const clientCache = new LRUCache({ max: CLIENT_CACHE_MAX_ENTRIES, ttl: CLIENT_CACHE_TTL_MS, - // Idle expiry: the TTL resets on every hit so a continuously-used client - // (and its warm keep-alive connections) survives, while idle keys age out. updateAgeOnGet: true, }) /** - * Returns a cached provider client for the given key, constructing and storing - * one via `factory` on a miss. The key must encode every input that varies the - * constructed client (provider namespace + API key at minimum); identical keys - * safely share a single client instance. + * Memoizes provider SDK clients so connections stay warm across requests rather + * than re-handshaking per call. The key must be namespaced per provider and + * encode every input that varies the client; the API key is always part of it, + * making it the tenant boundary (clients are never shared across keys). */ export function getCachedProviderClient(key: string, factory: () => T): T { const existing = clientCache.get(key) diff --git a/apps/sim/providers/vllm/index.ts b/apps/sim/providers/vllm/index.ts index 5f0dfa326ca..d02b33c641d 100644 --- a/apps/sim/providers/vllm/index.ts +++ b/apps/sim/providers/vllm/index.ts @@ -135,9 +135,8 @@ export const vllmProvider: ProviderConfig = { } const apiKey = request.apiKey || env.VLLM_API_KEY || 'empty' - // Memoized: a pinned endpoint gets its own undici Agent per call, so reusing - // the client (keyed by the resolved IP) keeps its connections warm. DNS - // re-validation still runs every request; a new IP yields a new key/client. + // A pinned endpoint gets its own undici Agent, so reuse keeps connections + // warm. DNS re-validation still runs every request; a new IP rekeys. const vllm = getCachedProviderClient( `vllm::${apiKey}::${baseUrl}::${pinnedIP ?? 'no-pin'}`, () => From a349fc7de540fa225f180452859fff84cf120860 Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 16 Jun 2026 12:06:44 -0700 Subject: [PATCH 08/11] chore(perf): drop obvious inline comments, keep nuance as TSDoc --- apps/sim/lib/execution/preprocessing.ts | 6 ++---- apps/sim/providers/bedrock/index.ts | 2 -- apps/sim/providers/client-cache.ts | 6 ++++-- apps/sim/providers/vllm/index.ts | 2 -- 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/apps/sim/lib/execution/preprocessing.ts b/apps/sim/lib/execution/preprocessing.ts index e36b9006b6c..a41d3dbbbca 100644 --- a/apps/sim/lib/execution/preprocessing.ts +++ b/apps/sim/lib/execution/preprocessing.ts @@ -323,9 +323,8 @@ export async function preprocessExecution( } // ========== STEPS 3.5–6: Preflight Gates ========== - // Read-only gates run concurrently (ban + subscription, then usage). The - // rate-limit gate debits a token, so it runs sequentially only after ban and - // usage pass. Failures apply in fixed precedence: ban 403 → usage 402 → rate 429. + // Read-only gates (ban, subscription, usage) run concurrently; the stateful + // rate-limit gate runs after they pass. Precedence: ban 403 → usage 402 → rate 429. /** * A failing gate's deferred outcome: the response to return, plus an optional @@ -643,7 +642,6 @@ export async function preprocessExecution( const usageResult = await usageCheckTask const usageSnapshot = usageResult.snapshot - // Precedence: ban (403) wins over usage (402). const readGateFailure = banFailure ?? usageResult.failure if (readGateFailure) { if (readGateFailure.recordError) { diff --git a/apps/sim/providers/bedrock/index.ts b/apps/sim/providers/bedrock/index.ts index 9d4b91dbf36..1d4335707b9 100644 --- a/apps/sim/providers/bedrock/index.ts +++ b/apps/sim/providers/bedrock/index.ts @@ -139,8 +139,6 @@ export const bedrockProvider: ProviderConfig = { } } - // AWS SDK clients own a per-client connection pool and are meant to be reused. - // Keyed by region + credential identity (a rotated key pair yields a new key). const client = getCachedProviderClient( `bedrock::${region}::${request.bedrockAccessKeyId ?? 'default-chain'}`, () => new BedrockRuntimeClient(clientConfig) diff --git a/apps/sim/providers/client-cache.ts b/apps/sim/providers/client-cache.ts index f87f5d0335c..1f91a9778aa 100644 --- a/apps/sim/providers/client-cache.ts +++ b/apps/sim/providers/client-cache.ts @@ -3,8 +3,10 @@ import { LRUCache } from 'lru-cache' const CLIENT_CACHE_MAX_ENTRIES = 1_000 const CLIENT_CACHE_TTL_MS = 30 * 60 * 1_000 -// updateAgeOnGet makes the TTL idle-based, so a continuously-used client keeps -// its warm keep-alive connections while idle keys age out. +/** + * `updateAgeOnGet` makes the TTL idle-based: a continuously-used client keeps its + * warm keep-alive connections, while idle keys age out. + */ const clientCache = new LRUCache({ max: CLIENT_CACHE_MAX_ENTRIES, ttl: CLIENT_CACHE_TTL_MS, diff --git a/apps/sim/providers/vllm/index.ts b/apps/sim/providers/vllm/index.ts index d02b33c641d..936bf3af633 100644 --- a/apps/sim/providers/vllm/index.ts +++ b/apps/sim/providers/vllm/index.ts @@ -135,8 +135,6 @@ export const vllmProvider: ProviderConfig = { } const apiKey = request.apiKey || env.VLLM_API_KEY || 'empty' - // A pinned endpoint gets its own undici Agent, so reuse keeps connections - // warm. DNS re-validation still runs every request; a new IP rekeys. const vllm = getCachedProviderClient( `vllm::${apiKey}::${baseUrl}::${pinnedIP ?? 'no-pin'}`, () => From 0a269adce6571bf1f58cf37d162e9abb80754e4f Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 16 Jun 2026 12:08:36 -0700 Subject: [PATCH 09/11] fix(bedrock): key client cache on full credential, not just access key id A corrected secret under the same access key id would otherwise keep serving the stale cached client until TTL/eviction. Caught by Cursor Bugbot. --- apps/sim/providers/bedrock/index.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/apps/sim/providers/bedrock/index.ts b/apps/sim/providers/bedrock/index.ts index 1d4335707b9..4e512d15b48 100644 --- a/apps/sim/providers/bedrock/index.ts +++ b/apps/sim/providers/bedrock/index.ts @@ -139,8 +139,14 @@ export const bedrockProvider: ProviderConfig = { } } + // Key on the full credential (access key id + secret) so a corrected secret + // under the same access key id yields a fresh client rather than a stale one. + const credentialKey = + request.bedrockAccessKeyId && request.bedrockSecretKey + ? `${request.bedrockAccessKeyId}:${request.bedrockSecretKey}` + : 'default-chain' const client = getCachedProviderClient( - `bedrock::${region}::${request.bedrockAccessKeyId ?? 'default-chain'}`, + `bedrock::${region}::${credentialKey}`, () => new BedrockRuntimeClient(clientConfig) ) From de40ca6705bb7bd2097143b8df697fcc6d0381b2 Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 16 Jun 2026 12:30:31 -0700 Subject: [PATCH 10/11] test(execution,providers): fix preflight mock reset + isolate provider client cache in tests - preprocessing.test: re-establish the checkOrgMemberUsageLimit mock in beforeEach (the only gate mock not re-set). In the full suite its implementation was reset so the success-path test got undefined -> threw -> 500 -> success:false. Mirrors how checkServerSideUsageLimits is handled. - client-cache: add clearProviderClientCacheForTests; call it in the bedrock and vllm test beforeEach so construction assertions always start from a cache miss now that those providers memoize their client. --- apps/sim/lib/execution/preprocessing.test.ts | 7 ++++++- apps/sim/providers/bedrock/index.test.ts | 2 ++ apps/sim/providers/client-cache.ts | 5 +++++ apps/sim/providers/vllm/index.test.ts | 2 ++ 4 files changed, 15 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/execution/preprocessing.test.ts b/apps/sim/lib/execution/preprocessing.test.ts index 657973a01a6..b0d8ccf0489 100644 --- a/apps/sim/lib/execution/preprocessing.test.ts +++ b/apps/sim/lib/execution/preprocessing.test.ts @@ -44,7 +44,10 @@ vi.mock('@sim/workflow-authz', () => ({ }), })) -import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor' +import { + checkOrgMemberUsageLimit, + checkServerSideUsageLimits, +} from '@/lib/billing/calculations/usage-monitor' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' import { preprocessExecution } from './preprocessing' @@ -125,6 +128,7 @@ describe('preprocessExecution logPreprocessingErrors option', () => { remaining: 100, resetAt: new Date(), }) + vi.mocked(checkOrgMemberUsageLimit).mockResolvedValue({ isExceeded: false } as any) }) it('suppresses preprocessing-error logging when logPreprocessingErrors is false', async () => { @@ -174,6 +178,7 @@ describe('preprocessExecution ban gate', () => { currentUsage: 1, limit: 10, } as any) + vi.mocked(checkOrgMemberUsageLimit).mockResolvedValue({ isExceeded: false } as any) }) it('blocks execution with 403 when the actor is banned (ban wins over the parallel gates)', async () => { diff --git a/apps/sim/providers/bedrock/index.test.ts b/apps/sim/providers/bedrock/index.test.ts index 38cb857425e..3a9abceefbd 100644 --- a/apps/sim/providers/bedrock/index.test.ts +++ b/apps/sim/providers/bedrock/index.test.ts @@ -50,10 +50,12 @@ vi.mock('@/tools', () => ({ import { BedrockRuntimeClient } from '@aws-sdk/client-bedrock-runtime' import { bedrockProvider } from '@/providers/bedrock/index' +import { clearProviderClientCacheForTests } from '@/providers/client-cache' describe('bedrockProvider credential handling', () => { beforeEach(() => { vi.clearAllMocks() + clearProviderClientCacheForTests() mockSend.mockResolvedValue({ output: { message: { content: [{ text: 'response' }] } }, usage: { inputTokens: 10, outputTokens: 5 }, diff --git a/apps/sim/providers/client-cache.ts b/apps/sim/providers/client-cache.ts index 1f91a9778aa..7908a94d211 100644 --- a/apps/sim/providers/client-cache.ts +++ b/apps/sim/providers/client-cache.ts @@ -29,3 +29,8 @@ export function getCachedProviderClient(key: string, factory: clientCache.set(key, client) return client } + +/** Clears the cache so tests asserting client construction start from a miss. */ +export function clearProviderClientCacheForTests(): void { + clientCache.clear() +} diff --git a/apps/sim/providers/vllm/index.test.ts b/apps/sim/providers/vllm/index.test.ts index 8739c95f989..925c61ca2d2 100644 --- a/apps/sim/providers/vllm/index.test.ts +++ b/apps/sim/providers/vllm/index.test.ts @@ -79,6 +79,7 @@ vi.mock('@/stores/providers', () => ({ useProvidersStore: { getState: () => ({ setProviderModels: vi.fn() }) }, })) +import { clearProviderClientCacheForTests } from '@/providers/client-cache' import type { ProviderToolConfig } from '@/providers/types' import { vllmProvider } from '@/providers/vllm/index' @@ -117,6 +118,7 @@ const createPayload = (callIndex: number) => mockCreate.mock.calls[callIndex][0] describe('vllmProvider', () => { beforeEach(() => { vi.clearAllMocks() + clearProviderClientCacheForTests() openAIArgs.length = 0 envState.VLLM_BASE_URL = 'http://localhost:8000' envState.VLLM_API_KEY = undefined From be0228e40465728dc6c595e3897866ef06818a5d Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 16 Jun 2026 12:43:39 -0700 Subject: [PATCH 11/11] test(execution): make RateLimiter mock constructable under vitest 4.x The RateLimiter mock used an arrow factory (vi.fn(() => ({...}))). vitest 4.x (CI) rejects `new` on an arrow-implemented mock ("not a constructor"); 3.2.4 allowed it. The new rate-gate test is the first to actually `new RateLimiter()`, so it surfaced the failure only in CI. Switch the mock to a regular function and drop the speculative beforeEach re-establishments that didn't address it. --- apps/sim/lib/execution/preprocessing.test.ts | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/apps/sim/lib/execution/preprocessing.test.ts b/apps/sim/lib/execution/preprocessing.test.ts index b0d8ccf0489..e72c44a567e 100644 --- a/apps/sim/lib/execution/preprocessing.test.ts +++ b/apps/sim/lib/execution/preprocessing.test.ts @@ -28,7 +28,11 @@ vi.mock('@/lib/core/execution-limits', () => ({ getExecutionTimeout: vi.fn(() => 0), })) vi.mock('@/lib/core/rate-limiter/rate-limiter', () => ({ - RateLimiter: vi.fn(() => ({ checkRateLimitWithSubscription: mockCheckRateLimit })), + // Regular function (not an arrow) so `new RateLimiter()` is constructable under + // vitest 4.x, which rejects `new` on an arrow-implemented mock. + RateLimiter: vi.fn(function (this: unknown) { + return { checkRateLimitWithSubscription: mockCheckRateLimit } + }), })) vi.mock('@/lib/logs/execution/logging-session', () => loggingSessionMock) vi.mock('@/lib/workspaces/utils', () => ({ @@ -44,10 +48,7 @@ vi.mock('@sim/workflow-authz', () => ({ }), })) -import { - checkOrgMemberUsageLimit, - checkServerSideUsageLimits, -} from '@/lib/billing/calculations/usage-monitor' +import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' import { preprocessExecution } from './preprocessing' @@ -128,7 +129,6 @@ describe('preprocessExecution logPreprocessingErrors option', () => { remaining: 100, resetAt: new Date(), }) - vi.mocked(checkOrgMemberUsageLimit).mockResolvedValue({ isExceeded: false } as any) }) it('suppresses preprocessing-error logging when logPreprocessingErrors is false', async () => { @@ -178,7 +178,6 @@ describe('preprocessExecution ban gate', () => { currentUsage: 1, limit: 10, } as any) - vi.mocked(checkOrgMemberUsageLimit).mockResolvedValue({ isExceeded: false } as any) }) it('blocks execution with 403 when the actor is banned (ban wins over the parallel gates)', async () => {