Skip to content

Commit 03c98b0

Browse files
improvement(tables): filter-aware select-all runs, delete-job read mask, keyset index + autovacuum tuning
1 parent b324ac0 commit 03c98b0

16 files changed

Lines changed: 17630 additions & 61 deletions

File tree

apps/sim/app/api/table/[tableId]/columns/run/route.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
66
import { generateRequestId } from '@/lib/core/utils/request'
77
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
88
import { runWorkflowColumn } from '@/lib/table/workflow-columns'
9-
import { accessError, checkAccess } from '@/app/api/table/utils'
9+
import { accessError, checkAccess, tableFilterError } from '@/app/api/table/utils'
1010

1111
const logger = createLogger('TableRunColumnAPI')
1212

@@ -25,16 +25,21 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
2525
const parsed = await parseRequest(runColumnContract, request, { params })
2626
if (!parsed.success) return parsed.response
2727
const { tableId } = parsed.data.params
28-
const { workspaceId, groupIds, runMode, rowIds, limit } = parsed.data.body
28+
const { workspaceId, groupIds, runMode, rowIds, filter, limit } = parsed.data.body
2929
const access = await checkAccess(tableId, auth.userId, 'write')
3030
if (!access.ok) return accessError(access, requestId, tableId)
3131

32+
// Validate the filter up front (the dispatcher reuses it) so a bad field fails fast.
33+
const filterError = tableFilterError(filter, access.table.schema.columns)
34+
if (filterError) return filterError
35+
3236
const { dispatchId } = await runWorkflowColumn({
3337
tableId,
3438
workspaceId,
3539
groupIds,
3640
mode: runMode,
3741
rowIds,
42+
filter,
3843
limit,
3944
requestId,
4045
})

apps/sim/app/api/table/[tableId]/delete-async/route.test.ts

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,24 @@
22
* @vitest-environment node
33
*/
44
import { hybridAuthMockFns } from '@sim/testing'
5-
import { NextRequest } from 'next/server'
5+
import { NextRequest, NextResponse } from 'next/server'
66
import { beforeEach, describe, expect, it, vi } from 'vitest'
77
import type { TableDefinition } from '@/lib/table'
88

9-
const {
10-
mockCheckAccess,
11-
mockMarkTableJobRunning,
12-
mockRunTableDelete,
13-
mockBuildFilterClause,
14-
TableQueryValidationError,
15-
} = vi.hoisted(() => ({
16-
mockCheckAccess: vi.fn(),
17-
mockMarkTableJobRunning: vi.fn(),
18-
mockRunTableDelete: vi.fn(),
19-
mockBuildFilterClause: vi.fn(),
20-
TableQueryValidationError: class extends Error {},
21-
}))
9+
const { mockCheckAccess, mockMarkTableJobRunning, mockRunTableDelete, mockTableFilterError } =
10+
vi.hoisted(() => ({
11+
mockCheckAccess: vi.fn(),
12+
mockMarkTableJobRunning: vi.fn(),
13+
mockRunTableDelete: vi.fn(),
14+
mockTableFilterError: vi.fn(),
15+
}))
2216

2317
vi.mock('@sim/utils/id', () => ({
2418
generateId: vi.fn().mockReturnValue('job-id-xyz'),
2519
generateShortId: vi.fn().mockReturnValue('short-id'),
2620
}))
2721
vi.mock('@/lib/table/service', () => ({ markTableJobRunning: mockMarkTableJobRunning }))
2822
vi.mock('@/lib/table/delete-runner', () => ({ runTableDelete: mockRunTableDelete }))
29-
vi.mock('@/lib/table/sql', () => ({
30-
buildFilterClause: mockBuildFilterClause,
31-
TableQueryValidationError,
32-
}))
3323
vi.mock('@/lib/core/utils/background', () => ({
3424
runDetached: (_label: string, work: () => Promise<unknown>) => {
3525
void work()
@@ -41,6 +31,7 @@ vi.mock('@/app/api/table/utils', async () => {
4131
checkAccess: mockCheckAccess,
4232
accessError: (result: { status: number }) =>
4333
NextResponse.json({ error: 'denied' }, { status: result.status }),
34+
tableFilterError: mockTableFilterError,
4435
}
4536
})
4637

@@ -90,7 +81,7 @@ describe('POST /api/table/[tableId]/delete-async', () => {
9081
mockCheckAccess.mockResolvedValue({ ok: true, table: buildTable() })
9182
mockMarkTableJobRunning.mockResolvedValue(true)
9283
mockRunTableDelete.mockResolvedValue(undefined)
93-
mockBuildFilterClause.mockReturnValue({})
84+
mockTableFilterError.mockReturnValue(null)
9485
})
9586

9687
it('claims the job slot and kicks off the delete worker with filter + exclusions', async () => {
@@ -99,7 +90,11 @@ describe('POST /api/table/[tableId]/delete-async', () => {
9990

10091
expect(response.status).toBe(200)
10192
expect(data.data).toEqual({ tableId: 'tbl_1', jobId: 'job-id-xyz' })
102-
expect(mockMarkTableJobRunning).toHaveBeenCalledWith('tbl_1', 'job-id-xyz', 'delete')
93+
expect(mockMarkTableJobRunning).toHaveBeenCalledWith('tbl_1', 'job-id-xyz', 'delete', {
94+
filter: { status: 'archived' },
95+
excludeRowIds: ['row_keep'],
96+
cutoff: expect.any(String),
97+
})
10398
expect(mockRunTableDelete).toHaveBeenCalledWith(
10499
expect.objectContaining({
105100
jobId: 'job-id-xyz',
@@ -128,9 +123,7 @@ describe('POST /api/table/[tableId]/delete-async', () => {
128123
})
129124

130125
it('returns 400 on an invalid filter without claiming the slot', async () => {
131-
mockBuildFilterClause.mockImplementation(() => {
132-
throw new TableQueryValidationError('bad field')
133-
})
126+
mockTableFilterError.mockReturnValue(NextResponse.json({ error: 'bad field' }, { status: 400 }))
134127
const response = await makeRequest(validBody)
135128
expect(response.status).toBe(400)
136129
expect(mockMarkTableJobRunning).not.toHaveBeenCalled()

apps/sim/app/api/table/[tableId]/delete-async/route.ts

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@ import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
77
import { runDetached } from '@/lib/core/utils/background'
88
import { generateRequestId } from '@/lib/core/utils/request'
99
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
10-
import { USER_TABLE_ROWS_SQL_NAME } from '@/lib/table/constants'
1110
import { runTableDelete } from '@/lib/table/delete-runner'
1211
import { markTableJobRunning } from '@/lib/table/service'
13-
import { buildFilterClause, TableQueryValidationError } from '@/lib/table/sql'
14-
import type { Filter } from '@/lib/table/types'
15-
import { accessError, checkAccess } from '@/app/api/table/utils'
12+
import type { TableDeleteJobPayload } from '@/lib/table/types'
13+
import { accessError, checkAccess, tableFilterError } from '@/app/api/table/utils'
1614

1715
const logger = createLogger('TableDeleteAsync')
1816

@@ -57,24 +55,18 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
5755
}
5856

5957
// Validate the filter up front so the caller gets immediate feedback (the worker reuses it).
60-
if (filter) {
61-
try {
62-
buildFilterClause(filter as Filter, USER_TABLE_ROWS_SQL_NAME, table.schema.columns)
63-
} catch (error) {
64-
if (error instanceof TableQueryValidationError) {
65-
return NextResponse.json({ error: error.message }, { status: 400 })
66-
}
67-
throw error
68-
}
69-
}
58+
const filterError = tableFilterError(filter, table.schema.columns)
59+
if (filterError) return filterError
7060

7161
// Rows inserted after this instant are spared (created_at <= cutoff in the worker).
7262
const cutoff = new Date()
7363

7464
// Atomically claim the job slot — one background job per table, so this also blocks while an
75-
// import is in flight (and vice versa).
65+
// import is in flight (and vice versa). The scope is persisted to the job's payload so read
66+
// paths can mask the doomed rows while the job runs (see `pendingDeleteMask`).
7667
const jobId = generateId()
77-
const claimed = await markTableJobRunning(tableId, jobId, 'delete')
68+
const payload: TableDeleteJobPayload = { filter, excludeRowIds, cutoff: cutoff.toISOString() }
69+
const claimed = await markTableJobRunning(tableId, jobId, 'delete', payload)
7870
if (!claimed) {
7971
return NextResponse.json(
8072
{ error: 'A job is already in progress for this table' },
@@ -87,7 +79,7 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
8779
jobId,
8880
tableId,
8981
workspaceId,
90-
filter: filter as Filter | undefined,
82+
filter,
9183
excludeRowIds,
9284
cutoff,
9385
})

apps/sim/app/api/table/utils.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,32 @@ import {
66
updateTableColumnBodySchema,
77
} from '@/lib/api/contracts/tables'
88
import type { MultipartError } from '@/lib/core/utils/multipart'
9-
import type { ColumnDefinition, TableDefinition } from '@/lib/table'
10-
import { getTableById } from '@/lib/table'
9+
import type { ColumnDefinition, Filter, TableDefinition } from '@/lib/table'
10+
import { buildFilterClause, getTableById, TableQueryValidationError } from '@/lib/table'
11+
import { USER_TABLE_ROWS_SQL_NAME } from '@/lib/table/constants'
1112
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
1213

14+
/**
15+
* Validates a `filter` against the table's column schema by building its SQL clause (the clause itself is discarded).
16+
* Returns a 400 JSON response with the validation message on `TableQueryValidationError`, or `null` when the filter is valid or absent; any other error is rethrown.
17+
* Shared by the routes that accept a filter (`delete-async`, `columns/run`) so a bad field fails fast with a clear message.
18+
*/
19+
export function tableFilterError(
20+
filter: Filter | undefined,
21+
columns: ColumnDefinition[]
22+
): NextResponse | null {
23+
if (!filter) return null // absent filter: nothing to validate
24+
try {
25+
buildFilterClause(filter, USER_TABLE_ROWS_SQL_NAME, columns) // return value unused — called only so an invalid filter throws
26+
return null
27+
} catch (error) {
28+
if (error instanceof TableQueryValidationError) {
29+
return NextResponse.json({ error: error.message }, { status: 400 }) // surface the validation message to the caller
30+
}
31+
throw error // anything other than a validation error is rethrown unchanged
32+
}
33+
}
34+
1335
const logger = createLogger('TableUtils')
1436

1537
/**

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { Loader, TableX } from '@/components/emcn/icons'
1111
import type { RunLimit, RunMode, TableFindMatch } from '@/lib/api/contracts/tables'
1212
import { cn } from '@/lib/core/utils/cn'
1313
import { captureEvent } from '@/lib/posthog/client'
14-
import type { ColumnDefinition, TableRow as TableRowType, WorkflowGroup } from '@/lib/table'
14+
import type { ColumnDefinition, Filter, TableRow as TableRowType, WorkflowGroup } from '@/lib/table'
1515
import { getColumnId } from '@/lib/table/column-keys'
1616
import { TABLE_LIMITS } from '@/lib/table/constants'
1717
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
@@ -114,6 +114,8 @@ export interface SelectionSnapshot {
114114
rowIds: string[]
115115
allRows: boolean
116116
rowCount: number
117+
/** Active filter when `allRows` is set — lets a filtered "select all" run only matching rows. */
118+
filter?: Filter
117119
} | null
118120
/** Drives Play (`hasIncompleteOrFailed`) / Refresh (`hasCompleted`) /
119121
* Stop (`hasInFlight`) visibility on the action bar. */
@@ -3360,13 +3362,15 @@ export function TableGrid({
33603362
if (!rowSelectionIsEmpty(rowSelection)) {
33613363
if (rowSelection.kind === 'all') {
33623364
// `rowIds` is the loaded window (virtualized); the label total comes from the filter-scoped
3363-
// row count minus any deselected rows.
3365+
// row count minus any deselected rows. `filter` lets the run target the matching rows
3366+
// server-side (the dispatcher walks them) rather than the loaded window.
33643367
const excluded = rowSelection.excluded?.size ?? 0
33653368
return {
33663369
groupIds: tableWorkflowGroupIds,
33673370
rowIds: rows.map((r) => r.id),
33683371
allRows: true,
33693372
rowCount: Math.max(0, selectAllTotalRef.current - excluded),
3373+
filter: queryOptions.filter ?? undefined,
33703374
}
33713375
}
33723376
const rowIds = rows.filter((r) => rowSelectionIncludes(rowSelection, r.id)).map((r) => r.id)

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ export function Table({
251251
(args: {
252252
groupIds: string[]
253253
rowIds?: string[]
254+
filter?: Filter
254255
runMode: RunMode
255256
limit?: RunLimit
256257
source: 'row' | 'rows' | 'column'
@@ -631,6 +632,8 @@ export function Table({
631632
runScope({
632633
groupIds: scope.groupIds,
633634
rowIds: scope.allRows ? undefined : scope.rowIds,
635+
// `filter` is only populated on select-all, so it's already undefined otherwise.
636+
filter: scope.filter,
634637
runMode: 'incomplete',
635638
source: 'rows',
636639
})
@@ -641,6 +644,7 @@ export function Table({
641644
runScope({
642645
groupIds: scope.groupIds,
643646
rowIds: scope.allRows ? undefined : scope.rowIds,
647+
filter: scope.filter,
644648
runMode: 'all',
645649
source: 'rows',
646650
})

apps/sim/hooks/queries/tables.ts

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1633,6 +1633,10 @@ interface RunColumnVariables {
16331633
runMode?: RunMode
16341634
/** Restrict to these rows. Server applies the same eligibility predicate. */
16351635
rowIds?: string[]
1636+
/** "Select all under a filter" — run every row matching this filter (mutually exclusive with
1637+
* `rowIds`). Optimistic stamping is skipped (like `limit`) since the matching set isn't known
1638+
* client-side; the dispatcher's real pending stamps drive the UI. */
1639+
filter?: Filter
16361640
/** Cap the run to the first `max` eligible rows. Omit for an unbounded run.
16371641
* Optimistic stamping is skipped when set — the dispatcher's real pending
16381642
* stamps drive the UI for the actual capped rows. */
@@ -1758,24 +1762,31 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
17581762
const queryClient = useQueryClient()
17591763

17601764
return useMutation({
1761-
mutationFn: async ({ groupIds, runMode = 'all', rowIds, limit }: RunColumnVariables) => {
1765+
mutationFn: async ({
1766+
groupIds,
1767+
runMode = 'all',
1768+
rowIds,
1769+
filter,
1770+
limit,
1771+
}: RunColumnVariables) => {
17621772
return requestJson(runColumnContract, {
17631773
params: { tableId },
17641774
body: {
17651775
workspaceId,
17661776
groupIds,
17671777
runMode,
17681778
...(rowIds && rowIds.length > 0 ? { rowIds } : {}),
1779+
...(filter ? { filter } : {}),
17691780
...(limit ? { limit } : {}),
17701781
},
17711782
})
17721783
},
1773-
onMutate: async ({ groupIds, runMode = 'all', rowIds, limit }) => {
1774-
// Capped runs touch only the first N eligible rows, chosen server-side by
1775-
// position. We can't predict that set client-side, so optimistic stamping
1776-
// is skipped — the dispatcher's real pending stamps (cell SSE) drive the
1777-
// UI within the first window.
1778-
if (limit)
1784+
onMutate: async ({ groupIds, runMode = 'all', rowIds, filter, limit }) => {
1785+
// Capped and filtered runs target a set we can't predict client-side (capped picks the first
1786+
// N by position; filtered matches a server-evaluated predicate), so optimistic stamping is
1787+
// skipped — the dispatcher's real pending stamps (cell SSE) drive the UI within the first
1788+
// window.
1789+
if (limit || filter)
17791790
return { snapshots: undefined, runStateSnapshot: undefined, didBumpRunState: false }
17801791
const targetRowIds = rowIds && rowIds.length > 0 ? new Set(rowIds) : null
17811792
const targetGroupIds = new Set(groupIds)

apps/sim/lib/api/contracts/tables.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1109,6 +1109,9 @@ export const runColumnBodySchema = z.object({
11091109
groupIds: z.array(z.string().min(1)).min(1),
11101110
runMode: z.enum(['all', 'incomplete']).default('all'),
11111111
rowIds: z.array(z.string().min(1)).min(1).optional(),
1112+
/** "Select all under a filter" — run every row matching this filter instead of `rowIds`. The
1113+
* dispatcher walks only matching rows (paginated), so no id list is materialized. */
1114+
filter: nonEmptyFilterSchema.optional(),
11121115
/** Cap the run to the first `max` eligible rows. Omit for an unbounded run. */
11131116
limit: runLimitSchema.optional(),
11141117
})

apps/sim/lib/table/dispatcher.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import { generateId } from '@sim/utils/id'
66
import { and, asc, eq, gt, inArray, isNotNull, ne, or, type SQL, sql } from 'drizzle-orm'
77
import { getJobQueue } from '@/lib/core/async-jobs/config'
88
import { writeWorkflowGroupState } from '@/lib/table/cell-write'
9+
import { USER_TABLE_ROWS_SQL_NAME } from '@/lib/table/constants'
910
import { isExecCancelledAfter } from '@/lib/table/deps'
1011
import { appendTableEvent } from '@/lib/table/events'
11-
import type { RowExecutionMetadata, RowExecutions, TableRow } from '@/lib/table/types'
12+
import { buildFilterClause } from '@/lib/table/sql'
13+
import type { Filter, RowExecutionMetadata, RowExecutions, TableRow } from '@/lib/table/types'
1214
import {
1315
buildEnqueueItems,
1416
buildPendingRuns,
@@ -32,6 +34,9 @@ export type DispatchMode = 'all' | 'incomplete' | 'new'
3234
export interface DispatchScope {
3335
groupIds: string[]
3436
rowIds?: string[] // when non-empty, `dispatcherStep` restricts the walk to these row ids
37+
/** "Select all matching a filter" — run every row matching this filter. Intended to be mutually exclusive with
38+
* `rowIds`: `dispatcherStep` applies `rowIds` when it is non-empty and only otherwise falls back to this filter. Lets the action-bar Play/Refresh target a filtered view without materializing ids. */
39+
filter?: Filter
3540
}
3641

3742
/**
@@ -390,6 +395,15 @@ export async function dispatcherStep(dispatchId: string): Promise<DispatcherStep
390395
]
391396
if (dispatch.scope.rowIds && dispatch.scope.rowIds.length > 0) {
392397
filters.push(inArray(userTableRows.id, dispatch.scope.rowIds))
398+
} else if (dispatch.scope.filter) {
399+
// "Select all under a filter": walk only the matching rows. Same cursor/window mechanism —
400+
// non-matching rows are simply never selected, like mode eligibility.
401+
const filterClause = buildFilterClause(
402+
dispatch.scope.filter,
403+
USER_TABLE_ROWS_SQL_NAME,
404+
table.schema.columns
405+
)
406+
if (filterClause) filters.push(filterClause)
393407
}
394408
// `'new'` mode targets only rows whose targeted groups haven't been
395409
// attempted. Exclude a row only when EVERY targeted group already has a

0 commit comments

Comments
 (0)