Commit cc56408

perf(execution): parallelize preflight gates, cache deployed state, memoize Anthropic client (#5098)

* perf(execution): parallelize preflight gates, cache deployed state, memoize Anthropic client

  - Memoize Anthropic + Azure-Anthropic SDK clients (new client-cache.ts) keyed by apiKey (+beta header; +baseURL/version/pinnedIP for Azure) so HTTP keep-alive connections are reused instead of a fresh TLS handshake per call. apiKey is the tenant boundary.
  - Parallelize the read-only preflight gates in preprocessing.ts (ban + subscription, then usage + org-member + rate-limit) while preserving exact error precedence (ban 403 -> usage 402 -> rate 429) and keeping the sole write (admission reservation) last.
  - Parallelize the independent workflow-state and env-var loads in execution-core.
  - Cache deployed workflow state by immutable deploymentVersionId with deep-clone-on-read, oldest-first eviction, and a 5-min TTL bounding the credential-mapping edge across ECS tasks.
  - Parallelize the independent personal-subscription + membership queries in getHighestPrioritySubscription.
  - BYOK: drop the redundant getWorkspaceById existence check (auth already validates the workspace); read the key list fresh every call for zero cross-instance staleness.

  Billing/usage/ban/permission reads stay fresh on the primary (no cache, no replica).

  Adds tests for every new mechanism and fixes a pre-existing vitest class-mock incompatibility that had execution-core.test.ts fully red on staging.

* fix(execution): run rate-limit gate only after ban/usage pass

  The rate-limit gate is not read-only (checkRateLimitWithSubscription consumes a token), so running it in parallel with the read-only gates debited rate-limit quota for requests that the ban (403) or usage (402) gates reject, which the original sequential flow never did.

  Move the rate-limit gate to run sequentially after the ban and usage gates pass, preserving the read-only gates' parallelism (ban + subscription + usage) and the exact ban -> usage -> rate precedence.

  Add regression tests asserting the rate limiter is not consumed when an earlier gate rejects, and is consumed once when they pass. Caught by Cursor Bugbot review.

* chore(execution): trim redundant preflight comments

  Tighten the gate overview to match the sequential rate-limit gate and drop inline notes that duplicated it or the runRateLimitGate doc.

* refactor(cache): address review: idle TTL for client cache, LRUCache for deployed state

  - client-cache: add updateAgeOnGet so the TTL is genuinely idle-based (active clients keep their warm keep-alive connections; the JSDoc now matches behavior).
  - deployed-state: replace the hand-rolled Map + manual FIFO eviction/TTL with LRUCache (real LRU eviction, built-in TTL), matching the effectiveDecryptedEnv and integration-tool-schema caches. TTL stays absolute (not reset on read) so the credential-migration remap still propagates across ECS tasks.

  Both per review feedback from Greptile.

* test(execution): isolate rate-limit gate test from STEP 7 reservation

  The 'consumes the rate-limit gate once' test reached the STEP 7 admission reservation, which depends on Redis: it passed locally (reserve throws and is swallowed) but failed in CI (reserve returns not-reserved -> 429). Pass skipConcurrencyReservation so the test isolates the rate gate deterministically.

* perf(providers): memoize SDK clients where the pool is per-client (bedrock, vllm)

  Generalize the Anthropic client cache into one shared memoizer (providers/client-cache.ts) and apply it only where each new client owns its own connection pool, so reuse actually keeps connections warm:

  - bedrock: AWS SDK clients hold a per-client connection pool (reuse is the AWS best practice). Keyed by region + credential identity.
  - vllm: a pinned endpoint creates its own undici Agent per call; key by the resolved IP so DNS re-validation still runs each request.
  - anthropic + azure-anthropic: migrated onto the shared memoizer.

  Deliberately NOT applied to the OpenAI-compatible providers, groq, cerebras, or google: their SDKs share a process-global keep-alive pool (Node openai-sdk module singleton agent; anthropic/global undici), so a fresh client per request already reuses connections and memoization would add complexity with ~no benefit. litellm uses a plain shared-agent client (no pinning) and is likewise skipped.

  Bounded LRU (max 1000, 30m idle TTL) with no close-on-eviction, avoiding the unbounded-growth and eviction-closes-in-use-client failure modes seen in similar client caches.

* chore(perf): trim verbose comments to terse why-notes

* chore(perf): drop obvious inline comments, keep nuance as TSDoc

* fix(bedrock): key client cache on full credential, not just access key id

  A corrected secret under the same access key id would otherwise keep serving the stale cached client until TTL/eviction. Caught by Cursor Bugbot.

* test(execution,providers): fix preflight mock reset + isolate provider client cache in tests

  - preprocessing.test: re-establish the checkOrgMemberUsageLimit mock in beforeEach (the only gate mock not re-set). In the full suite its implementation was reset so the success-path test got undefined -> threw -> 500 -> success:false. Mirrors how checkServerSideUsageLimits is handled.
  - client-cache: add clearProviderClientCacheForTests; call it in the bedrock and vllm test beforeEach so construction assertions always start from a cache miss now that those providers memoize their client.

* test(execution): make RateLimiter mock constructable under vitest 4.x

  The RateLimiter mock used an arrow factory (vi.fn(() => ({...}))). vitest 4.x (CI) rejects `new` on an arrow-implemented mock ("not a constructor"); 3.2.4 allowed it. The new rate-gate test is the first to actually `new RateLimiter()`, so it surfaced the failure only in CI. Switch the mock to a regular function and drop the speculative beforeEach re-establishments that didn't address it.
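The gate ordering described in the commit message (read-only gates in parallel, the token-consuming rate-limit gate afterwards, precedence ban 403 -> usage 402 -> rate 429) can be sketched roughly as follows. This is an illustration only: `preflight`, the `Gates` interface, and the boolean return values are hypothetical and are not taken from preprocessing.ts.

```typescript
// Hypothetical sketch; gate names and result shapes are illustrative only.
type GateResult = { ok: true } | { ok: false; status: number }

interface Gates {
  checkBan: () => Promise<boolean> // read-only: resolves true if the caller is banned
  checkUsage: () => Promise<boolean> // read-only: resolves true if usage is exceeded
  consumeRateLimit: () => Promise<boolean> // NOT read-only: debits a token, true if allowed
}

async function preflight(gates: Gates): Promise<GateResult> {
  // The read-only gates are independent, so both start before either is awaited.
  const [banned, overUsage] = await Promise.all([gates.checkBan(), gates.checkUsage()])

  // Precedence comes from the order of these checks, not from which promise settled first.
  if (banned) return { ok: false, status: 403 }
  if (overUsage) return { ok: false, status: 402 }

  // The rate limiter consumes quota, so it runs only after the earlier gates pass.
  const allowed = await gates.consumeRateLimit()
  if (!allowed) return { ok: false, status: 429 }

  return { ok: true }
}
```

With this shape, a request rejected by the ban or usage gate never reaches `consumeRateLimit`, which is the property the regression tests in the commit message assert.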
1 parent f238184 commit cc56408

19 files changed

Lines changed: 1128 additions & 291 deletions
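
The commit message describes memoizing SDK clients in a bounded cache with an idle TTL and no close-on-eviction. A dependency-free sketch of that idea is below. It is an assumption-laden stand-in: the commit says the real cache uses LRUCache with updateAgeOnGet, whereas `IdleLruCache`, `getOrCreate`, and the injected clock here are hypothetical names built on a plain `Map`.

```typescript
// Hypothetical sketch; the commit's client-cache.ts is not shown and uses LRUCache.
class IdleLruCache<T> {
  private readonly entries = new Map<string, { value: T; lastUsed: number }>()
  private readonly max: number
  private readonly idleTtlMs: number
  private readonly now: () => number

  constructor(max: number, idleTtlMs: number, now: () => number = Date.now) {
    this.max = max
    this.idleTtlMs = idleTtlMs
    this.now = now
  }

  get size(): number {
    return this.entries.size
  }

  getOrCreate(key: string, create: () => T): T {
    const t = this.now()
    const hit = this.entries.get(key)
    if (hit && t - hit.lastUsed <= this.idleTtlMs) {
      // Re-insert so the entry becomes most recently used and its idle timer restarts.
      this.entries.delete(key)
      this.entries.set(key, { value: hit.value, lastUsed: t })
      return hit.value
    }
    // Miss or idle-expired: drop any stale entry and build a fresh value.
    this.entries.delete(key)
    const value = create()
    this.entries.set(key, { value, lastUsed: t })
    if (this.entries.size > this.max) {
      // Map iterates in insertion order, so the first key is the least recently used.
      const oldest = this.entries.keys().next().value
      if (oldest !== undefined) this.entries.delete(oldest)
    }
    return value
  }
}
```

Evicted values are simply dropped rather than closed, so a caller still holding an evicted client can keep using it.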

apps/sim/lib/api-key/byok.test.ts

Lines changed: 12 additions & 14 deletions
@@ -3,9 +3,8 @@
  */
 import { beforeEach, describe, expect, it, vi } from 'vitest'

-const { mockOrderBy, mockGetWorkspaceById, mockDecryptSecret } = vi.hoisted(() => ({
+const { mockOrderBy, mockDecryptSecret } = vi.hoisted(() => ({
   mockOrderBy: vi.fn(),
-  mockGetWorkspaceById: vi.fn(),
   mockDecryptSecret: vi.fn(),
 }))

@@ -19,10 +18,6 @@ vi.mock('@sim/db', () => ({
   },
 }))

-vi.mock('@/lib/workspaces/permissions/utils', () => ({
-  getWorkspaceById: mockGetWorkspaceById,
-}))
-
 vi.mock('@/lib/core/security/encryption', () => ({
   decryptSecret: mockDecryptSecret,
 }))
@@ -70,7 +65,6 @@ const storedKey = (id: string) => ({ id, encryptedApiKey: `encrypted-${id}` })
 describe('getBYOKKey', () => {
   beforeEach(() => {
     vi.clearAllMocks()
-    mockGetWorkspaceById.mockResolvedValue({ id: 'workspace' })
     mockOrderBy.mockResolvedValue([])
     mockDecryptSecret.mockImplementation(async (encrypted: string) => ({
       decrypted: encrypted.replace('encrypted-', 'decrypted-'),
@@ -80,13 +74,6 @@ describe('getBYOKKey', () => {
   it('returns null when no workspaceId is provided', async () => {
     expect(await getBYOKKey(undefined, 'openai')).toBeNull()
     expect(await getBYOKKey(null, 'openai')).toBeNull()
-    expect(mockGetWorkspaceById).not.toHaveBeenCalled()
-  })
-
-  it('returns null when the workspace does not exist', async () => {
-    mockGetWorkspaceById.mockResolvedValue(null)
-
-    expect(await getBYOKKey(uniqueWorkspaceId(), 'openai')).toBeNull()
   })

   it('returns null when the workspace has no keys for the provider', async () => {
@@ -123,6 +110,17 @@ describe('getBYOKKey', () => {
     ])
   })

+  it('reads the key list fresh from the database on every call', async () => {
+    const workspaceId = uniqueWorkspaceId()
+    mockOrderBy.mockResolvedValue([storedKey('key-1')])
+
+    await getBYOKKey(workspaceId, 'openai')
+    await getBYOKKey(workspaceId, 'openai')
+    await getBYOKKey(workspaceId, 'openai')
+
+    expect(mockOrderBy).toHaveBeenCalledTimes(3)
+  })
+
   it('tracks rotation independently per provider within a workspace', async () => {
     const workspaceId = uniqueWorkspaceId()
     mockOrderBy.mockResolvedValue([storedKey('key-1'), storedKey('key-2')])

apps/sim/lib/api-key/byok.ts

Lines changed: 3 additions & 6 deletions
@@ -6,7 +6,6 @@ import { getRotatingApiKey } from '@/lib/core/config/api-keys'
 import { env } from '@/lib/core/config/env'
 import { isHosted } from '@/lib/core/config/env-flags'
 import { decryptSecret } from '@/lib/core/security/encryption'
-import { getWorkspaceById } from '@/lib/workspaces/permissions/utils'
 import { getHostedModels } from '@/providers/models'
 import { PROVIDER_PLACEHOLDER_KEY } from '@/providers/utils'
 import { useProvidersStore } from '@/stores/providers/store'
@@ -37,6 +36,9 @@ function nextRotationIndex(poolKey: string, poolSize: number): number {
  * multiple keys stored for the provider, requests round-robin across them in
  * creation order. A key that fails to decrypt is skipped in favor of the next
  * one in the pool.
+ *
+ * The key list is read fresh every call (not cached): BYOK is not a hot query,
+ * and reading fresh keeps revocation immediate across ECS tasks.
  */
 export async function getBYOKKey(
   workspaceId: string | undefined | null,
@@ -47,11 +49,6 @@ export async function getBYOKKey(
   }

   try {
-    const activeWorkspace = await getWorkspaceById(workspaceId)
-    if (!activeWorkspace) {
-      return null
-    }
-
     const keys = await db
       .select({ id: workspaceBYOKKeys.id, encryptedApiKey: workspaceBYOKKeys.encryptedApiKey })
       .from(workspaceBYOKKeys)
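
The doc comment in this hunk describes round-robin selection across a provider's keys, skipping any key that fails to decrypt. A minimal sketch of that behavior follows. The function bodies are guesses: `nextSlot` only mirrors the `nextRotationIndex(poolKey, poolSize)` signature visible in the hunk header, `pickKey` is a hypothetical name, and decryption is modeled as a synchronous callback returning `null` on failure, whereas the real `decryptSecret` is async.

```typescript
// Hypothetical sketch; the real rotation and decryption code is not shown in the diff.
const rotationCounters = new Map<string, number>()

// Returns the current slot for a pool and advances its counter.
function nextSlot(poolKey: string, poolSize: number): number {
  const current = rotationCounters.get(poolKey) ?? 0
  rotationCounters.set(poolKey, (current + 1) % poolSize)
  return current % poolSize
}

// Tries keys starting at the rotated slot; a key that fails to decrypt is skipped.
function pickKey(
  poolKey: string,
  encryptedKeys: string[],
  decrypt: (encrypted: string) => string | null
): string | null {
  if (encryptedKeys.length === 0) return null
  const start = nextSlot(poolKey, encryptedKeys.length)
  for (let offset = 0; offset < encryptedKeys.length; offset++) {
    const decrypted = decrypt(encryptedKeys[(start + offset) % encryptedKeys.length])
    if (decrypted !== null) return decrypted
  }
  return null
}
```

Keeping one counter per pool key is what lets rotation advance independently per provider within a workspace, as the last test in the byok.test.ts hunk expects.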

apps/sim/lib/billing/calculations/usage-monitor.ts

Lines changed: 3 additions & 0 deletions
@@ -467,6 +467,9 @@ export async function checkOrgMemberUsageLimit(
     return { isExceeded: false, currentUsage: 0, limit: null }
   }

+  // Resolve the cap first and short-circuit when unset (the common case); only
+  // then is computing usage worthwhile. Kept sequential, not raced, to avoid a
+  // usage query on every uncapped member's execution.
   const limit = await getOrgMemberUsageLimit(organizationId, userId)
   if (limit === null) {
     return { isExceeded: false, currentUsage: 0, limit: null }
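
The comment added in this hunk explains why the cap lookup stays sequential: when no cap is set, the usage query is skipped entirely. A small sketch of that trade-off, under assumptions: `checkMemberUsage` and its injected loaders are hypothetical, and the `>=` comparison is a guess, since the rest of `checkOrgMemberUsageLimit` is not shown.

```typescript
// Hypothetical sketch of the cap-first short-circuit; not the real function body.
interface UsageCheck {
  isExceeded: boolean
  currentUsage: number
  limit: number | null
}

async function checkMemberUsage(
  getLimit: () => Promise<number | null>,
  getUsage: () => Promise<number>
): Promise<UsageCheck> {
  // Resolve the cap first; an unset cap means usage never needs to be computed.
  const limit = await getLimit()
  if (limit === null) {
    return { isExceeded: false, currentUsage: 0, limit: null }
  }

  // Only capped members pay for the usage query.
  const currentUsage = await getUsage()
  return { isExceeded: currentUsage >= limit, currentUsage, limit }
}
```

Running both loaders through `Promise.all` would shorten the capped path but would issue a usage query for every uncapped member, which is the cost the comment is avoiding.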
Lines changed: 193 additions & 0 deletions
@@ -0,0 +1,193 @@
+/**
+ * @vitest-environment node
+ */
+import { beforeEach, describe, expect, it, vi } from 'vitest'
+
+/**
+ * Drizzle mock for `getHighestPrioritySubscription`. It issues up to four
+ * queries keyed by table:
+ *   - `subscription` for the user's personal subs (parallelized with members)
+ *   - `member` for the user's org memberships (parallelized with subs)
+ *   - `organization` for the org-existence follow-up
+ *   - `subscription` again for the org-scoped subs follow-up
+ *
+ * The mock routes results by the table object passed to `.from()`, serving the
+ * (twice-read) `subscription` table from a FIFO queue (first read = personal,
+ * second = org). It records which tables were queried so we can assert the
+ * parallelized pair both run and that follow-ups are skipped when appropriate.
+ *
+ * Table sentinels and shared mock state live inside `vi.hoisted` so the
+ * `vi.mock` factories (hoisted to the top of the file) can reference them.
+ */
+const { SUBSCRIPTION_TABLE, MEMBER_TABLE, ORGANIZATION_TABLE, resultsByTable, fromCalls, select } =
+  vi.hoisted(() => {
+    const SUBSCRIPTION_TABLE = { __table: 'subscription' }
+    const MEMBER_TABLE = { __table: 'member' }
+    const ORGANIZATION_TABLE = { __table: 'organization' }
+
+    const resultsByTable: Record<string, unknown[][]> = {
+      subscription: [],
+      member: [],
+      organization: [],
+    }
+    const fromCalls: string[] = []
+
+    const select = vi.fn(() => ({
+      from: (table: { __table: string }) => {
+        fromCalls.push(table.__table)
+        const where = () => {
+          const queue = resultsByTable[table.__table]
+          const next = queue.length > 0 ? queue.shift() : []
+          return Promise.resolve(next ?? [])
+        }
+        return { where }
+      },
+    }))
+
+    return {
+      SUBSCRIPTION_TABLE,
+      MEMBER_TABLE,
+      ORGANIZATION_TABLE,
+      resultsByTable,
+      fromCalls,
+      select,
+    }
+  })
+
+vi.mock('@sim/db', () => ({
+  db: { select },
+}))
+
+vi.mock('@sim/db/schema', () => ({
+  subscription: SUBSCRIPTION_TABLE,
+  member: MEMBER_TABLE,
+  organization: ORGANIZATION_TABLE,
+}))
+
+/**
+ * Realistic plan-check predicates so `pickHighestPrioritySubscription` exercises
+ * the real Enterprise > Team > Pro priority ordering over the rows we feed it.
+ */
+vi.mock('@/lib/billing/subscriptions/utils', () => ({
+  ENTITLED_SUBSCRIPTION_STATUSES: ['active', 'past_due'],
+  checkEnterprisePlan: (s: any) =>
+    s?.plan === 'enterprise' && ['active', 'past_due'].includes(s?.status),
+  checkTeamPlan: (s: any) => s?.plan === 'team' && ['active', 'past_due'].includes(s?.status),
+  checkProPlan: (s: any) => s?.plan === 'pro' && ['active', 'past_due'].includes(s?.status),
+}))
+
+import { getHighestPrioritySubscription } from '@/lib/billing/core/plan'
+
+interface SubRow {
+  id: string
+  referenceId: string
+  plan: string
+  status: string
+}
+
+function personalPro(userId: string): SubRow {
+  return { id: 'sub-personal-pro', referenceId: userId, plan: 'pro', status: 'active' }
+}
+
+function orgEnterprise(orgId: string): SubRow {
+  return { id: 'sub-org-enterprise', referenceId: orgId, plan: 'enterprise', status: 'active' }
+}
+
+function queue(table: 'subscription' | 'member' | 'organization', rows: unknown[]) {
+  resultsByTable[table].push(rows)
+}
+
+describe('getHighestPrioritySubscription', () => {
+  beforeEach(() => {
+    vi.clearAllMocks()
+    resultsByTable.subscription = []
+    resultsByTable.member = []
+    resultsByTable.organization = []
+    fromCalls.length = 0
+  })
+
+  it('picks the org Enterprise sub over a personal Pro sub (priority order)', async () => {
+    queue('subscription', [personalPro('user-1')]) // personalSubs query
+    queue('member', [{ organizationId: 'org-1' }]) // memberships query
+    queue('organization', [{ id: 'org-1' }]) // org-existence query
+    queue('subscription', [orgEnterprise('org-1')]) // org-subscriptions query
+
+    const result = await getHighestPrioritySubscription('user-1')
+
+    expect(result).not.toBeNull()
+    expect(result?.id).toBe('sub-org-enterprise')
+    expect(result?.plan).toBe('enterprise')
+  })
+
+  it('selection is deterministic regardless of which parallelized query resolves first', async () => {
+    queue('subscription', [personalPro('user-1')])
+    queue('member', [{ organizationId: 'org-1' }])
+    queue('organization', [{ id: 'org-1' }])
+    queue('subscription', [orgEnterprise('org-1')])
+
+    const result = await getHighestPrioritySubscription('user-1')
+
+    expect(result?.id).toBe('sub-org-enterprise')
+  })
+
+  it('issues BOTH the personal-subscriptions and memberships queries (parallelized pair)', async () => {
+    queue('subscription', [personalPro('user-1')])
+    queue('member', [{ organizationId: 'org-1' }])
+    queue('organization', [{ id: 'org-1' }])
+    queue('subscription', [orgEnterprise('org-1')])
+
+    await getHighestPrioritySubscription('user-1')
+
+    expect(fromCalls).toContain('subscription')
+    expect(fromCalls).toContain('member')
+    // First two queries are exactly the parallelized pair (in either order).
+    expect(fromCalls.slice(0, 2).sort()).toEqual(['member', 'subscription'])
+  })
+
+  it('returns the personal sub and skips org follow-ups when there are no memberships', async () => {
+    queue('subscription', [personalPro('user-1')])
+    queue('member', [])
+
+    const result = await getHighestPrioritySubscription('user-1')
+
+    expect(result?.id).toBe('sub-personal-pro')
+    expect(result?.plan).toBe('pro')
+    // org-existence + org-subscription follow-ups are NOT issued.
+    expect(fromCalls).not.toContain('organization')
+    expect(fromCalls.filter((t) => t === 'subscription')).toHaveLength(1)
+  })
+
+  it('returns null when neither personal nor org subscriptions exist', async () => {
+    queue('subscription', [])
+    queue('member', [])
+
+    const result = await getHighestPrioritySubscription('user-1')
+
+    expect(result).toBeNull()
+  })
+
+  it('excludes orphaned org memberships whose organization row no longer exists', async () => {
+    queue('subscription', [])
+    queue('member', [{ organizationId: 'ghost-org' }]) // membership points at a deleted org
+    queue('organization', [])
+
+    const result = await getHighestPrioritySubscription('user-1')
+
+    // Org subs are never fetched (no valid org ids) -> falls back to null.
+    expect(result).toBeNull()
+    expect(fromCalls).toContain('organization')
+    // Only the initial personal-subs read on `subscription`; org-subs query skipped.
+    expect(fromCalls.filter((t) => t === 'subscription')).toHaveLength(1)
+  })
+
+  it('falls back to the personal sub when the only org is orphaned', async () => {
+    queue('subscription', [personalPro('user-1')])
+    queue('member', [{ organizationId: 'ghost-org' }])
+    queue('organization', [])
+
+    const result = await getHighestPrioritySubscription('user-1')
+
+    expect(result?.id).toBe('sub-personal-pro')
+    expect(fromCalls.filter((t) => t === 'subscription')).toHaveLength(1)
+  })
+})
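
The mock in this test file routes query results by table and serves the twice-read `subscription` table from a FIFO queue. Stripped of vitest, the same pattern might look like the sketch below. `createQueryRouter` and `enqueue` are hypothetical names, and tables are plain strings here rather than the sentinel objects the test uses.

```typescript
// Hypothetical, framework-free sketch of the table-routed FIFO mock.
type Rows = unknown[]

function createQueryRouter() {
  const queues = new Map<string, Rows[]>()
  const fromCalls: string[] = []

  // Queue one result set for the next read of `table`.
  const enqueue = (table: string, rows: Rows): void => {
    const queue = queues.get(table) ?? []
    queue.push(rows)
    queues.set(table, queue)
  }

  // Mimics the select().from(table).where() chain; each where() pops one result set.
  const select = () => ({
    from: (table: string) => {
      fromCalls.push(table)
      return {
        where: (): Promise<Rows> => Promise.resolve(queues.get(table)?.shift() ?? []),
      }
    },
  })

  return { enqueue, select, fromCalls }
}
```

Because each table has its own queue, the order in which different tables are read does not affect which rows they receive; only repeated reads of the same table depend on enqueue order.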

apps/sim/lib/billing/core/plan.ts

Lines changed: 15 additions & 14 deletions
@@ -82,20 +82,21 @@ export async function getHighestPrioritySubscription(
 ) {
   const { onError = 'return-null', executor = db } = options
   try {
-    const personalSubs = await executor
-      .select()
-      .from(subscription)
-      .where(
-        and(
-          eq(subscription.referenceId, userId),
-          inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES)
-        )
-      )
-
-    const memberships = await executor
-      .select({ organizationId: member.organizationId })
-      .from(member)
-      .where(eq(member.userId, userId))
+    const [personalSubs, memberships] = await Promise.all([
+      executor
+        .select()
+        .from(subscription)
+        .where(
+          and(
+            eq(subscription.referenceId, userId),
+            inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES)
+          )
+        ),
+      executor
+        .select({ organizationId: member.organizationId })
+        .from(member)
+        .where(eq(member.userId, userId)),
+    ])

     const orgIds = memberships.map((m: { organizationId: string }) => m.organizationId)
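
This hunk replaces two sequential awaits with `Promise.all` over two queries that do not depend on each other. The general shape, reduced to a hypothetical helper (`loadPair` is not part of the codebase), is:

```typescript
// Hypothetical helper illustrating the Promise.all pattern used in the hunk above.
async function loadPair<A, B>(
  loadA: () => Promise<A>,
  loadB: () => Promise<B>
): Promise<[A, B]> {
  // Both loaders are invoked before either result is awaited, so they overlap in time.
  const [a, b] = await Promise.all([loadA(), loadB()])
  // Results arrive in argument order, regardless of which promise settled first.
  return [a, b]
}
```

Two properties matter for the change. The destructured results follow input order, so later selection logic stays deterministic. And `Promise.all` rejects as soon as either input rejects, so the surrounding `try` still sees a failure from either query.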