26 changes: 26 additions & 0 deletions .agents/skills/memory-load-check/SKILL.md
@@ -49,10 +49,35 @@ Read these when doing a deeper pass:
- cap downloads and parsed output separately
- preserve partial results when a later item exceeds the cap
- never read untrusted response bodies without a byte cap
- KB connector file downloads in `apps/sim/connectors/utils.ts`
- `CONNECTOR_MAX_FILE_BYTES`: shared per-file cap (aligned with the manual KB upload limit)
- `readBodyWithLimit`: stream a download body to a Buffer with a hard byte cap (null on overflow)
- `stubOrSkipBySize`: listing-time skip when the reported size exceeds the cap
- `markSkipped` / `sizeLimitSkipReason`: surface oversized files as failed (skipped) KB rows
- `ConnectorFileTooLargeError`: thrown mid-download when the listing under-reported size
- Large workflow value payloads
- prefer durable references/manifests over inlining large arrays or files
- materialize refs only behind an explicit byte budget

## KB Connector File Size Handling

The connector size pattern in `apps/sim/connectors/utils.ts` (`CONNECTOR_MAX_FILE_BYTES` + `readBodyWithLimit` + `stubOrSkipBySize`/`markSkipped`) exists for one risk: a knowledge-base connector downloading **arbitrary, user-controlled file bytes** that the source does not hard-cap. Apply it by that risk, not by the connector's name.

Use the pattern when the connector downloads file content via a stream/`download_url` where the user controls the size:
- file-storage connectors: Dropbox, OneDrive, SharePoint, Google Drive, S3, GitHub, GitLab, Azure DevOps
- any connector that fetches a file via a download URL even if it is not a "storage" service (e.g. the Zoom transcript `.vtt`)

For those, require all three:
- stream the body with `readBodyWithLimit(resp, CONNECTOR_MAX_FILE_BYTES)` — never raw `response.text()`/`response.arrayBuffer()`
- skip oversize at listing (`stubOrSkipBySize` with the reported size) and again at fetch time (overflow -> `markSkipped`), since the listing size can be missing or under-reported
- never drop/truncate silently — oversized files become content-less failed rows carrying `skippedReason`, so they stay visible in the KB UI instead of vanishing from the index
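
The fetch-time half of these requirements can be sketched as follows. This is a minimal illustration of a capped streaming read, not the actual `readBodyWithLimit` from `apps/sim/connectors/utils.ts`: the name `readWithLimitSketch` is hypothetical, and it takes a generic `AsyncIterable<Uint8Array>` instead of a fetch response so the sketch stays self-contained. Only the "null on overflow" contract is taken from the text above.

```typescript
// Hypothetical sketch: accumulate chunks from a stream, but stop as soon as the
// running total exceeds maxBytes. Returns null on overflow so the caller can
// record the file as skipped instead of buffering it in full.
async function readWithLimitSketch(
  chunks: AsyncIterable<Uint8Array>,
  maxBytes: number
): Promise<Uint8Array | null> {
  const parts: Uint8Array[] = []
  let total = 0

  for await (const chunk of chunks) {
    total += chunk.byteLength
    if (total > maxBytes) {
      // Returning from inside for-await asks the iterator to stop, so the
      // rest of the body is never pulled into memory.
      return null
    }
    parts.push(chunk)
  }

  // Only reached when the whole body fits under the cap.
  const out = new Uint8Array(total)
  let offset = 0
  for (const part of parts) {
    out.set(part, offset)
    offset += part.byteLength
  }
  return out
}
```

The size check runs per chunk, before the chunk is kept, so memory use stays bounded by the cap plus one chunk. Checking only after the full body has been concatenated would defeat the purpose.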

Skip the pattern when the source already bounds the payload:
- pure API/structured-data connectors (Jira, Linear, Notion, Confluence, Sentry, Slack, Zendesk, Gmail, ...) — paginated JSON/text; apply normal pagination + concurrency bounds instead of a per-file byte cap
- native-document connectors capped by the platform (Google Docs ~50 MB, Google Sheets via `MAX_ROWS`, Evernote ~25 MB/note) — a 100 MB cap can never fire, and wrapping a `response.json()`/Thrift parse in `readBodyWithLimit` is cargo-culting

Litmus test: "Can a user make this one fetch arbitrarily large, with nothing upstream stopping it?" Yes -> use the pattern. No (platform hard-cap, or already paginated) -> a per-file byte cap adds noise, not safety. Borderline: a user-configured/self-hosted endpoint with no platform cap (e.g. Obsidian) — bound it only if the content is genuinely unbounded.
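
For connectors that pass the litmus test, the listing-time half of the pattern might look roughly like the sketch below. The `DocSketch` shape, the function names with a `Sketch` suffix, and the reason string are all assumptions made for illustration. The real `stubOrSkipBySize`, `markSkipped`, and `sizeLimitSkipReason` live in `apps/sim/connectors/utils.ts` and may differ.

```typescript
// Hypothetical document shape; the real ExternalDocument type has more fields.
interface DocSketch {
  externalId: string
  title: string
  content: string
  skippedReason?: string
}

// Assumed wording; the real reason text is not shown in this diff.
function sizeLimitSkipReasonSketch(limitBytes: number): string {
  return `File exceeds the ${limitBytes}-byte size limit`
}

// Turn a document into a content-less row that carries the skip reason,
// so it stays visible instead of disappearing from the index.
function markSkippedSketch(doc: DocSketch, reason: string): DocSketch {
  return { ...doc, content: '', skippedReason: reason }
}

// At listing time, trust the reported size only when it is present and over
// the cap. A missing or small reported size passes through, which is why the
// fetch-time byte cap is still required.
function stubOrSkipBySizeSketch(
  stub: DocSketch,
  reportedSize: number | undefined,
  limitBytes: number
): DocSketch {
  if (typeof reportedSize === 'number' && reportedSize > limitBytes) {
    return markSkippedSketch(stub, sizeLimitSkipReasonSketch(limitBytes))
  }
  return stub
}
```

A stub with an unknown size is deliberately not skipped here. The listing can omit or under-report sizes, so the decision is deferred to the capped download.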

## Review Workflow

1. Identify every changed data source:
@@ -96,6 +121,7 @@ Read these when doing a deeper pass:
- fetches all pages from an external API before processing
- reads an entire file, HTTP response, or stream without a max byte budget
- checks size only after `Buffer.concat`, `arrayBuffer`, `text`, `JSON.parse`, or parse expansion
- a KB connector silently drops or truncates an oversized file instead of recording it as a failed (skipped) row
- chunks only after loading the complete dataset
- paginates with unbounded/deep `OFFSET` on a mutable or large table
- creates one queue job per row without batching or a queue-level concurrency key
28 changes: 22 additions & 6 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
@@ -1235,12 +1235,28 @@ async function handleExecutePost(
const isBuffered = event.type !== 'stream:chunk' && event.type !== 'stream:done'
let eventToSend = event
if (isBuffered) {
const entry = terminalStatus
? await eventWriter.writeTerminal(event, terminalStatus)
: await eventWriter.write(event)
eventToSend = entry.event
eventToSend.eventId = entry.eventId
terminalEventPublished ||= Boolean(terminalStatus)
try {
const entry = terminalStatus
? await eventWriter.writeTerminal(event, terminalStatus)
: await eventWriter.write(event)
eventToSend = entry.event
eventToSend.eventId = entry.eventId
terminalEventPublished ||= Boolean(terminalStatus)
} catch (e) {
// The event buffer (Redis replay store) rejected this event — e.g. the flush
// batch exceeds the per-write byte cap for large block outputs. The buffer only
// backs reconnect/replay; the live SSE stream is the primary delivery. Fall
// through to enqueue the event live (below) instead of throwing, so terminal
// events still reach the active client and the UI doesn't hang on "running".
// Marking a live-delivered terminal event as published lets finalization close
// the stream cleanly instead of aborting it with controller.error().
reqLogger.warn('Event buffer write failed; delivering event over live stream only', {
eventType: event.type,
terminal: Boolean(terminalStatus),
error: toError(e).message,
})
terminalEventPublished ||= Boolean(terminalStatus)
}
}
if (!isStreamClosed) {
try {
34 changes: 31 additions & 3 deletions apps/sim/connectors/azure-devops/azure-devops.ts
@@ -3,7 +3,15 @@ import { getErrorMessage, toError } from '@sim/utils/errors'
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
import { azureDevopsConnectorMeta } from '@/connectors/azure-devops/meta'
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
import { htmlToPlainText, joinTagArray, parseTagDate, readBodyWithLimit } from '@/connectors/utils'
import {
CONNECTOR_MAX_FILE_BYTES,
htmlToPlainText,
joinTagArray,
markSkipped,
parseTagDate,
readBodyWithLimit,
sizeLimitSkipReason,
} from '@/connectors/utils'

const logger = createLogger('AzureDevOpsConnector')

@@ -30,7 +38,7 @@ const FILE_BATCH_SIZE = 100
* and aborts (returning null) the moment the cap is exceeded. Larger files are
* skipped without being fully buffered.
*/
const MAX_FILE_SIZE = 10 * 1024 * 1024
const MAX_FILE_SIZE = CONNECTOR_MAX_FILE_BYTES
/** Bytes sniffed for a NUL byte when detecting binary files (matches git's heuristic). */
const BINARY_SNIFF_BYTES = 8000
/**
@@ -1090,7 +1098,27 @@ async function getFileDocument(
const buffer = await readBodyWithLimit(contentResponse, MAX_FILE_SIZE)
if (buffer === null) {
logger.info('Skipping oversized Azure DevOps file', { path })
return null
const skippedTitle = path.split('/').filter(Boolean).pop() || path
return markSkipped(
{
externalId,
title: skippedTitle,
content: '',
mimeType: 'text/plain',
sourceUrl: buildFileSourceUrl(repo?.webUrl, branch, path),
contentHash: buildFileContentHash(repoId, item.objectId),
metadata: {
kind: 'file',
organization,
project,
repository: repo?.name ?? '',
repositoryId: repoId,
branch,
path,
},
},
sizeLimitSkipReason(MAX_FILE_SIZE)
)
}
if (isBinaryBuffer(buffer)) {
logger.info('Skipping binary Azure DevOps file', { path })
85 changes: 61 additions & 24 deletions apps/sim/connectors/dropbox/dropbox.ts
@@ -3,7 +3,18 @@ import { getErrorMessage, toError } from '@sim/utils/errors'
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
import { dropboxConnectorMeta } from '@/connectors/dropbox/meta'
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
import { htmlToPlainText, parseTagDate } from '@/connectors/utils'
import {
CONNECTOR_MAX_FILE_BYTES,
ConnectorFileTooLargeError,
htmlToPlainText,
isSkippedDocument,
markSkipped,
parseTagDate,
readBodyWithLimit,
sizeLimitSkipReason,
stubOrSkipBySize,
takeIndexableWithinCap,
} from '@/connectors/utils'

const logger = createLogger('DropboxConnector')

@@ -23,7 +34,7 @@ const SUPPORTED_EXTENSIONS = new Set([
'.tsv',
])

const MAX_FILE_SIZE = 10 * 1024 * 1024 // 10 MB
const MAX_FILE_SIZE = CONNECTOR_MAX_FILE_BYTES

interface DropboxFileEntry {
'.tag': 'file' | 'folder' | 'deleted'
@@ -44,16 +55,18 @@ interface DropboxListFolderResponse {
has_more: boolean
}

function isSupportedFile(entry: DropboxFileEntry): boolean {
if (entry['.tag'] !== 'file') return false
if (entry.is_downloadable === false) return false
if (entry.size && entry.size > MAX_FILE_SIZE) return false

const name = entry.name.toLowerCase()
const dotIndex = name.lastIndexOf('.')
function hasSupportedExtension(name: string): boolean {
const lower = name.toLowerCase()
const dotIndex = lower.lastIndexOf('.')
if (dotIndex === -1) return false
return SUPPORTED_EXTENSIONS.has(lower.slice(dotIndex))
}

return SUPPORTED_EXTENSIONS.has(name.slice(dotIndex))
/** A downloadable file with a supported extension, regardless of size. */
function isDownloadableFile(entry: DropboxFileEntry): boolean {
return (
entry['.tag'] === 'file' && entry.is_downloadable !== false && hasSupportedExtension(entry.name)
)
}

async function downloadFileContent(accessToken: string, filePath: string): Promise<string> {
@@ -69,7 +82,15 @@ async function downloadFileContent(accessToken: string, filePath: string): Promi
throw new Error(`Failed to download file ${filePath}: ${response.status}`)
}

const text = await response.text()
// Stream with a hard byte cap so a file whose listing metadata under-reported
// (or omitted) its size can never be fully buffered into memory. Oversize raises
// so getDocument can surface it as a skipped (failed) row rather than dropping it.
const buffer = await readBodyWithLimit(response, MAX_FILE_SIZE)
if (!buffer) {
throw new ConnectorFileTooLargeError(MAX_FILE_SIZE)
}

const text = buffer.toString('utf8')

if (filePath.endsWith('.html') || filePath.endsWith('.htm')) {
return htmlToPlainText(text)
@@ -162,23 +183,27 @@ export const dropboxConnector: ConnectorConfig = {
data = await response.json()
}

const supportedFiles = data.entries.filter(isSupportedFile)
// Keep oversized files and surface them as skipped (failed) documents instead
// of dropping them silently at listing time.
const candidateFiles = data.entries.filter(isDownloadableFile)

const maxFiles = sourceConfig.maxFiles ? Number(sourceConfig.maxFiles) : 0
const previouslyFetched = (syncContext?.totalDocsFetched as number) ?? 0

let documents = supportedFiles.map(fileToStub)
const stubs = candidateFiles.map((entry) =>
stubOrSkipBySize(fileToStub(entry), entry.size, MAX_FILE_SIZE)
)

if (maxFiles > 0) {
const remaining = maxFiles - previouslyFetched
if (documents.length > remaining) {
documents = documents.slice(0, remaining)
}
}
const { documents, indexableCount, capReached } = takeIndexableWithinCap(
stubs,
isSkippedDocument,
maxFiles,
previouslyFetched
)

const totalFetched = previouslyFetched + documents.length
const totalFetched = previouslyFetched + indexableCount
if (syncContext) syncContext.totalDocsFetched = totalFetched
const hitLimit = maxFiles > 0 && totalFetched >= maxFiles
const hitLimit = capReached
if (hitLimit && syncContext) syncContext.listingCapped = true

return {
@@ -210,12 +235,24 @@ export const dropboxConnector: ConnectorConfig = {

const entry = (await response.json()) as DropboxFileEntry

if (!isSupportedFile(entry)) return null
if (!isDownloadableFile(entry)) return null

const content = await downloadFileContent(accessToken, entry.path_lower)
const stub = fileToStub(entry)
if (entry.size && entry.size > MAX_FILE_SIZE) {
return markSkipped(stub, sizeLimitSkipReason(MAX_FILE_SIZE))
}

let content: string
try {
content = await downloadFileContent(accessToken, entry.path_lower)
} catch (error) {
if (error instanceof ConnectorFileTooLargeError) {
return markSkipped(stub, sizeLimitSkipReason(error.limitBytes))
}
throw error
}
if (!content.trim()) return null

const stub = fileToStub(entry)
return { ...stub, content, contentDeferred: false }
} catch (error) {
logger.warn(`Failed to fetch document ${externalId}`, {
51 changes: 43 additions & 8 deletions apps/sim/connectors/github/github.ts
@@ -3,14 +3,21 @@ import { getErrorMessage, toError } from '@sim/utils/errors'
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
import { githubConnectorMeta } from '@/connectors/github/meta'
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
import { parseTagDate } from '@/connectors/utils'
import {
CONNECTOR_MAX_FILE_BYTES,
markSkipped,
parseTagDate,
sizeLimitSkipReason,
stubOrSkipBySize,
takeIndexableWithinCap,
} from '@/connectors/utils'

const logger = createLogger('GitHubConnector')

const GITHUB_API_URL = 'https://api.github.com'
const BATCH_SIZE = 30
const GIT_SHA_PREFIX = 'git-sha:'
const MAX_FILE_SIZE = 10 * 1024 * 1024 // 10 MB
const MAX_FILE_SIZE = CONNECTOR_MAX_FILE_BYTES
const BINARY_SNIFF_BYTES = 8000

/**
@@ -197,16 +204,25 @@ export const githubConnector: ConnectorConfig = {
} else {
const tree = await fetchTree(accessToken, owner, repo, branch)

// Filter by path prefix, extensions, and size
// Filter by path prefix and extensions. Oversized files are kept here and
// surfaced as skipped (failed) documents at stub time so they stay visible.
const filtered = tree.filter((item) => {
if (pathPrefix && !item.path.startsWith(pathPrefix)) return false
if (!matchesExtension(item.path, extSet)) return false
if (typeof item.size === 'number' && item.size > MAX_FILE_SIZE) return false
return true
})

// Apply max files limit
capped = maxFiles > 0 ? filtered.slice(0, maxFiles) : filtered
// Apply the max-files limit to indexable files only; oversized files within
// the capped window are kept (and surfaced as skipped) but never consume the cap.
capped =
maxFiles > 0
? takeIndexableWithinCap(
filtered,
(item) => Boolean(item.size && item.size > MAX_FILE_SIZE),
maxFiles,
0
).documents
: filtered
if (syncContext) syncContext.filteredTree = capped
}

@@ -223,7 +239,9 @@ export const githubConnector: ConnectorConfig = {
batchSize: batch.length,
})

const documents = batch.map((item) => treeItemToStub(owner, repo, branch, item))
const documents = batch.map((item) =>
stubOrSkipBySize(treeItemToStub(owner, repo, branch, item), item.size, MAX_FILE_SIZE)
)

const nextOffset = offset + BATCH_SIZE
const hasMore = nextOffset < capped.length
@@ -281,7 +299,24 @@ export const githubConnector: ConnectorConfig = {
size,
limit: MAX_FILE_SIZE,
})
return null
return markSkipped(
{
externalId,
title: path.split('/').pop() || path,
content: '',
mimeType: 'text/plain',
sourceUrl: `https://github.com/${owner}/${repo}/blob/${branch.split('/').map(encodeURIComponent).join('/')}/${path.split('/').map(encodeURIComponent).join('/')}`,
contentHash: `${GIT_SHA_PREFIX}${data.sha as string}`,
metadata: {
path,
sha: data.sha as string,
size,
branch,
repository: `${owner}/${repo}`,
},
},
sizeLimitSkipReason(MAX_FILE_SIZE)
)
}

const rawContent = (data.content as string) || ''