Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fix-synced-data-writes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@tanstack/db': patch
'@tanstack/query-db-collection': patch
---

Fix syncedData not updating when manual write operations (writeUpsert, writeInsert, etc.) are called after async operations in mutation handlers. Previously, the sync transaction would be blocked by the persisting user transaction, leaving syncedData stale until the next sync cycle.
23 changes: 22 additions & 1 deletion packages/db/src/collection/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ interface PendingSyncedTransaction<
upserts: Map<TKey, T>
deletes: Set<TKey>
}
/**
* When true, this transaction should be processed immediately even if there
* are persisting user transactions. Used by manual write operations (writeInsert,
* writeUpdate, writeDelete, writeUpsert) which need synchronous updates to syncedData.
*/
immediate?: boolean
}

export class CollectionStateManager<
Expand Down Expand Up @@ -437,13 +443,17 @@ export class CollectionStateManager<
committedSyncedTransactions,
uncommittedSyncedTransactions,
hasTruncateSync,
hasImmediateSync,
} = this.pendingSyncedTransactions.reduce(
(acc, t) => {
if (t.committed) {
acc.committedSyncedTransactions.push(t)
if (t.truncate === true) {
acc.hasTruncateSync = true
}
if (t.immediate === true) {
acc.hasImmediateSync = true
}
} else {
acc.uncommittedSyncedTransactions.push(t)
}
Expand All @@ -457,10 +467,21 @@ export class CollectionStateManager<
PendingSyncedTransaction<TOutput, TKey>
>,
hasTruncateSync: false,
hasImmediateSync: false,
},
)

if (!hasPersistingTransaction || hasTruncateSync) {
// Process committed transactions if:
// 1. No persisting user transaction (normal sync flow), OR
// 2. There's a truncate operation (must be processed immediately), OR
// 3. There's an immediate transaction (manual writes must be processed synchronously)
//
// Note: When hasImmediateSync or hasTruncateSync is true, we process ALL committed
// sync transactions (not just the immediate/truncate ones). This is intentional for
// ordering correctness: if we only processed the immediate transaction, earlier
// non-immediate transactions would be applied later and could overwrite newer state.
// Processing all committed transactions together preserves causal ordering.
if (!hasPersistingTransaction || hasTruncateSync || hasImmediateSync) {
// Set flag to prevent redundant optimistic state recalculations
this.isCommittingSyncTransactions = true

Expand Down
3 changes: 2 additions & 1 deletion packages/db/src/collection/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ export class CollectionSyncManager<
const syncRes = normalizeSyncFnResult(
this.config.sync.sync({
collection: this.collection,
begin: () => {
begin: (options?: { immediate?: boolean }) => {
this.state.pendingSyncedTransactions.push({
committed: false,
operations: [],
deletedKeys: new Set(),
immediate: options?.immediate,
})
},
write: (
Expand Down
7 changes: 6 additions & 1 deletion packages/db/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,12 @@ export interface SyncConfig<
> {
sync: (params: {
collection: Collection<T, TKey, any, any, any>
begin: () => void
/**
* Begin a new sync transaction.
* @param options.immediate - When true, the transaction will be processed immediately
* even if there are persisting user transactions. Used by manual write operations.
*/
begin: (options?: { immediate?: boolean }) => void
write: (message: ChangeMessageOrDeleteKeyMessage<T, TKey>) => void
commit: () => void
markReady: () => void
Expand Down
11 changes: 9 additions & 2 deletions packages/query-db-collection/src/manual-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ export interface SyncContext<
queryClient: QueryClient
queryKey: Array<unknown>
getKey: (item: TRow) => TKey
begin: () => void
/**
* Begin a new sync transaction.
* @param options.immediate - When true, the transaction will be processed immediately
* even if there are persisting user transactions. Used by manual write operations.
*/
begin: (options?: { immediate?: boolean }) => void
write: (message: Omit<ChangeMessage<TRow>, `key`>) => void
commit: () => void
/**
Expand Down Expand Up @@ -144,7 +149,9 @@ export function performWriteOperations<
const normalized = normalizeOperations(operations, ctx)
validateOperations(normalized, ctx)

ctx.begin()
// Use immediate: true to ensure syncedData is updated synchronously,
// even when called from within a mutationFn with an active persisting transaction
ctx.begin({ immediate: true })

for (const op of normalized) {
switch (op.type) {
Expand Down
93 changes: 93 additions & 0 deletions packages/query-db-collection/tests/query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2286,6 +2286,99 @@ describe(`QueryCollection`, () => {
expect(todo?.id).not.toBe(clientId)
})

it(`should update syncedData immediately when writeUpsert is called after async API in onUpdate handler`, async () => {
// Reproduces bug where syncedData shows stale values when writeUpsert is called
// AFTER an async API call in a mutation handler. The async await causes the
// transaction to be added to state.transactions before writeUpsert runs,
// which means commitPendingTransactions() sees hasPersistingTransaction=true
// and would skip processing the sync transaction without the immediate flag.
const queryKey = [`writeUpsert-after-api-test`]

type Brand = {
id: string
brandName: string
}

const serverBrands: Array<Brand> = [{ id: `123`, brandName: `A` }]

const queryFn = vi.fn().mockImplementation(async () => {
return [...serverBrands]
})

// Track syncedData state immediately after writeUpsert
let syncedDataAfterWriteUpsert: Brand | undefined
let hasPersistingTransactionDuringWrite = false

const collection = createCollection(
queryCollectionOptions<Brand>({
id: `writeUpsert-after-api-test`,
queryKey,
queryFn,
queryClient,
getKey: (item: Brand) => item.id,
startSync: true,
onUpdate: async ({ transaction }) => {
const updates = transaction.mutations.map((m) => m.modified)

// Simulate async API call - THIS IS KEY!
// After this await, the transaction will be in state.transactions
await new Promise((resolve) => setTimeout(resolve, 10))

// Check if there's now a persisting transaction
hasPersistingTransactionDuringWrite = Array.from(
collection._state.transactions.values(),
).some((tx) => tx.state === `persisting`)

// Update server state
for (const update of updates) {
const idx = serverBrands.findIndex((b) => b.id === update.id)
if (idx !== -1) {
serverBrands[idx] = { ...serverBrands[idx], ...update }
}
}

// Write the server response back to syncedData
// Without the immediate flag, this would be blocked by the persisting transaction
collection.utils.writeBatch(() => {
for (const update of updates) {
collection.utils.writeUpsert(update)
}
})

// Check syncedData IMMEDIATELY after writeUpsert
syncedDataAfterWriteUpsert = collection._state.syncedData.get(`123`)

return { refetch: false }
},
}),
)

await vi.waitFor(() => {
expect(collection.status).toBe(`ready`)
})

// Verify initial state
expect(collection._state.syncedData.get(`123`)?.brandName).toBe(`A`)

// Update brandName from A to B
collection.update(`123`, (draft) => {
draft.brandName = `B`
})

// Wait for mutation to complete
await flushPromises()
await new Promise((resolve) => setTimeout(resolve, 50))

// Verify we had a persisting transaction during the write
expect(hasPersistingTransactionDuringWrite).toBe(true)

// The CRITICAL assertion: syncedData should have been updated IMMEDIATELY after writeUpsert
// Without the fix, this would fail because commitPendingTransactions() would skip
// processing due to hasPersistingTransaction being true
expect(syncedDataAfterWriteUpsert).toBeDefined()
expect(syncedDataAfterWriteUpsert?.brandName).toBe(`B`)
})

it(`should not rollback object field updates after server response with refetch: false`, async () => {
const queryKey = [`object-field-update-test`]

Expand Down
Loading