diff --git a/.changeset/idempotency-key-catalog-eviction.md b/.changeset/idempotency-key-catalog-eviction.md new file mode 100644 index 00000000000..b14ce3efd9d --- /dev/null +++ b/.changeset/idempotency-key-catalog-eviction.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/core": patch +"trigger.dev": patch +--- + +Fix idempotency key metadata (original key + scope) being silently dropped when a single run creates more than 1000 idempotency keys. The in-process catalog that maps a key's hash back to its original key/scope is no longer bounded to 1000 entries, so `idempotencyKeys.create()` results retain their metadata regardless of how many are created in a run. The catalog is now cleared at each run boundary so it does not accumulate across warm-start runs. diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index b7f621954c9..9953e07c831 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -35,6 +35,7 @@ import { realtimeStreams, inputStreams, sessionStreams, + resetIdempotencyKeyCatalog, } from "@trigger.dev/core/v3"; import { TriggerTracer } from "@trigger.dev/core/v3/tracer"; import { @@ -378,6 +379,7 @@ function resetExecutionEnvironment() { taskContext.disable(); standardTraceContextManager.reset(); standardHeartbeatsManager.reset(); + resetIdempotencyKeyCatalog(); // Wait for all streams to finish before completing the run waitUntil.register({ diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index ed8fc9be5e7..88b1b3e6c2f 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -34,6 +34,7 @@ import { realtimeStreams, inputStreams, sessionStreams, + resetIdempotencyKeyCatalog, } from "@trigger.dev/core/v3"; import { TriggerTracer } from "@trigger.dev/core/v3/tracer"; import { @@ -350,6 +351,7 @@ function resetExecutionEnvironment() { taskContext.disable(); standardTraceContextManager.reset(); standardHeartbeatsManager.reset(); + resetIdempotencyKeyCatalog(); // Wait for all streams to finish before completing the run waitUntil.register({ diff --git a/packages/core/src/v3/idempotency-key-catalog/catalog.ts b/packages/core/src/v3/idempotency-key-catalog/catalog.ts index bcc74c90c9b..bccad383a23 100644 --- a/packages/core/src/v3/idempotency-key-catalog/catalog.ts +++ b/packages/core/src/v3/idempotency-key-catalog/catalog.ts @@ -8,4 +8,5 @@ export type IdempotencyKeyOptions = { export interface IdempotencyKeyCatalog { registerKeyOptions(hash: string, options: IdempotencyKeyOptions): void; getKeyOptions(hash: string): IdempotencyKeyOptions | undefined; + clear(): void; } diff --git a/packages/core/src/v3/idempotency-key-catalog/inMemoryIdempotencyKeyCatalog.test.ts b/packages/core/src/v3/idempotency-key-catalog/inMemoryIdempotencyKeyCatalog.test.ts new file mode 100644 index 00000000000..29c6aa134d4 --- /dev/null +++ b/packages/core/src/v3/idempotency-key-catalog/inMemoryIdempotencyKeyCatalog.test.ts @@ -0,0 +1,56 @@ +import { describe, it, expect } from "vitest"; +import { InMemoryIdempotencyKeyCatalog } from "./inMemoryIdempotencyKeyCatalog.js"; + +describe("InMemoryIdempotencyKeyCatalog", () => { + it("stores and retrieves options", () => { + const catalog = new InMemoryIdempotencyKeyCatalog(); + const options = { key: "my-key", scope: "global" as const }; + + catalog.registerKeyOptions("hash1", options); + + expect(catalog.getKeyOptions("hash1")).toEqual(options); + }); + + it("returns undefined for non-existent keys", () => { + const catalog = new InMemoryIdempotencyKeyCatalog(); + + expect(catalog.getKeyOptions("non-existent")).toBeUndefined(); + }); + + it("updates options when registering the same hash twice", () => { + const catalog = new InMemoryIdempotencyKeyCatalog(); + + catalog.registerKeyOptions("hash1", { key: "key1", scope: "global" }); + catalog.registerKeyOptions("hash1", { key: "key1-updated", scope: "run" }); + + expect(catalog.getKeyOptions("hash1")).toEqual({ key: "key1-updated", scope: "run" }); + }); + + it("retains every entry regardless of count (no eviction)", () => { + const catalog = new InMemoryIdempotencyKeyCatalog(); + const count = 5000; + + for (let i = 0; i < count; i++) { + catalog.registerKeyOptions(`hash${i}`, { key: `key${i}`, scope: "global" }); + } + + // The very first entry must still be present — nothing is silently evicted. + expect(catalog.getKeyOptions("hash0")).toEqual({ key: "key0", scope: "global" }); + expect(catalog.getKeyOptions(`hash${count - 1}`)).toEqual({ + key: `key${count - 1}`, + scope: "global", + }); + }); + + it("clear() removes all entries", () => { + const catalog = new InMemoryIdempotencyKeyCatalog(); + + catalog.registerKeyOptions("hash1", { key: "key1", scope: "global" }); + catalog.registerKeyOptions("hash2", { key: "key2", scope: "run" }); + + catalog.clear(); + + expect(catalog.getKeyOptions("hash1")).toBeUndefined(); + expect(catalog.getKeyOptions("hash2")).toBeUndefined(); + }); +}); diff --git a/packages/core/src/v3/idempotency-key-catalog/inMemoryIdempotencyKeyCatalog.ts b/packages/core/src/v3/idempotency-key-catalog/inMemoryIdempotencyKeyCatalog.ts new file mode 100644 index 00000000000..6c1996a930d --- /dev/null +++ b/packages/core/src/v3/idempotency-key-catalog/inMemoryIdempotencyKeyCatalog.ts @@ -0,0 +1,25 @@ +import type { IdempotencyKeyCatalog, IdempotencyKeyOptions } from "./catalog.js"; + +/** + * Maps an idempotency-key hash back to the original user-provided key and scope. + * + * The mapping is held for the lifetime of a single run: the worker clears it at + * each run boundary (warm starts reuse the process), so it never accumulates + * across runs. Within a run every registered key is retained regardless of how + * many are created, so the key/scope metadata is never silently dropped. + */ +export class InMemoryIdempotencyKeyCatalog implements IdempotencyKeyCatalog { + private cache = new Map(); + + registerKeyOptions(hash: string, options: IdempotencyKeyOptions): void { + this.cache.set(hash, options); + } + + getKeyOptions(hash: string): IdempotencyKeyOptions | undefined { + return this.cache.get(hash); + } + + clear(): void { + this.cache.clear(); + } +} diff --git a/packages/core/src/v3/idempotency-key-catalog/index.ts b/packages/core/src/v3/idempotency-key-catalog/index.ts index 685639ed498..1b395b8f243 100644 --- a/packages/core/src/v3/idempotency-key-catalog/index.ts +++ b/packages/core/src/v3/idempotency-key-catalog/index.ts @@ -2,7 +2,7 @@ const API_NAME = "idempotency-key-catalog"; import { getGlobal, registerGlobal } from "../utils/globals.js"; import type { IdempotencyKeyCatalog, IdempotencyKeyOptions } from "./catalog.js"; -import { LRUIdempotencyKeyCatalog } from "./lruIdempotencyKeyCatalog.js"; +import { InMemoryIdempotencyKeyCatalog } from "./inMemoryIdempotencyKeyCatalog.js"; export class IdempotencyKeyCatalogAPI { private static _instance?: IdempotencyKeyCatalogAPI; @@ -24,11 +24,15 @@ export class IdempotencyKeyCatalogAPI { return this.#getCatalog().getKeyOptions(hash); } + public clear(): void { + this.#getCatalog().clear(); + } + #getCatalog(): IdempotencyKeyCatalog { let catalog = getGlobal(API_NAME); if (!catalog) { - // Auto-initialize with LRU catalog on first access - catalog = new LRUIdempotencyKeyCatalog(); + // Auto-initialize on first access + catalog = new InMemoryIdempotencyKeyCatalog(); registerGlobal(API_NAME, catalog, true); } return catalog; diff --git a/packages/core/src/v3/idempotency-key-catalog/lruIdempotencyKeyCatalog.test.ts b/packages/core/src/v3/idempotency-key-catalog/lruIdempotencyKeyCatalog.test.ts deleted file mode 100644 index ba523f60407..00000000000 --- a/packages/core/src/v3/idempotency-key-catalog/lruIdempotencyKeyCatalog.test.ts +++ /dev/null @@ -1,209 +0,0 @@ -import { describe, it, expect } from "vitest"; -import { LRUIdempotencyKeyCatalog } from "./lruIdempotencyKeyCatalog.js"; - -describe("LRUIdempotencyKeyCatalog", () => { - describe("registerKeyOptions and getKeyOptions", () => { - it("should store and retrieve options", () => { - const catalog = new LRUIdempotencyKeyCatalog(); - const options = { key: "my-key", scope: "global" as const }; - - catalog.registerKeyOptions("hash1", options); - - expect(catalog.getKeyOptions("hash1")).toEqual(options); - }); - - it("should return undefined for non-existent keys", () => { - const catalog = new LRUIdempotencyKeyCatalog(); - - expect(catalog.getKeyOptions("non-existent")).toBeUndefined(); - }); - - it("should store multiple keys", () => { - const catalog = new LRUIdempotencyKeyCatalog(); - const options1 = { key: "key1", scope: "global" as const }; - const options2 = { key: "key2", scope: "run" as const }; - const options3 = { key: "key3", scope: "attempt" as const }; - - catalog.registerKeyOptions("hash1", options1); - catalog.registerKeyOptions("hash2", options2); - catalog.registerKeyOptions("hash3", options3); - - expect(catalog.getKeyOptions("hash1")).toEqual(options1); - expect(catalog.getKeyOptions("hash2")).toEqual(options2); - expect(catalog.getKeyOptions("hash3")).toEqual(options3); - }); - - it("should update options when registering same key twice", () => { - const catalog = new LRUIdempotencyKeyCatalog(); - const options1 = { key: "key1", scope: "global" as const }; - const options2 = { key: "key1-updated", scope: "run" as const }; - - catalog.registerKeyOptions("hash1", options1); - catalog.registerKeyOptions("hash1", options2); - - expect(catalog.getKeyOptions("hash1")).toEqual(options2); - }); - }); - - describe("LRU eviction", () => { - it("should evict oldest entry when over capacity", () => { - const catalog = new LRUIdempotencyKeyCatalog(3); - - catalog.registerKeyOptions("hash1", { key: "key1", scope: "global" }); - catalog.registerKeyOptions("hash2", { key: "key2", scope: "global" }); - catalog.registerKeyOptions("hash3", { key: "key3", scope: "global" }); - - // All three should exist - expect(catalog.getKeyOptions("hash1")).toBeDefined(); - expect(catalog.getKeyOptions("hash2")).toBeDefined(); - expect(catalog.getKeyOptions("hash3")).toBeDefined(); - - // After the gets above, the LRU order from oldest to newest is: hash1, hash2, hash3 - // (each get moves the key to the most recent position) - - // Add a fourth - hash1 should be evicted (it was accessed first, so it's the oldest) - catalog.registerKeyOptions("hash4", { key: "key4", scope: "global" }); - - expect(catalog.getKeyOptions("hash1")).toBeUndefined(); - expect(catalog.getKeyOptions("hash2")).toBeDefined(); - expect(catalog.getKeyOptions("hash3")).toBeDefined(); - expect(catalog.getKeyOptions("hash4")).toBeDefined(); - }); - - it("should evict least recently registered entry when capacity exceeded", () => { - const catalog = new LRUIdempotencyKeyCatalog(3); - - catalog.registerKeyOptions("hash1", { key: "key1", scope: "global" }); - catalog.registerKeyOptions("hash2", { key: "key2", scope: "global" }); - catalog.registerKeyOptions("hash3", { key: "key3", scope: "global" }); - - // Adding fourth should evict hash1 (oldest) - catalog.registerKeyOptions("hash4", { key: "key4", scope: "global" }); - - expect(catalog.getKeyOptions("hash1")).toBeUndefined(); - expect(catalog.getKeyOptions("hash2")).toBeDefined(); - expect(catalog.getKeyOptions("hash3")).toBeDefined(); - expect(catalog.getKeyOptions("hash4")).toBeDefined(); - }); - - it("should evict multiple entries when adding many at once would exceed capacity", () => { - const catalog = new LRUIdempotencyKeyCatalog(2); - - catalog.registerKeyOptions("hash1", { key: "key1", scope: "global" }); - catalog.registerKeyOptions("hash2", { key: "key2", scope: "global" }); - catalog.registerKeyOptions("hash3", { key: "key3", scope: "global" }); - catalog.registerKeyOptions("hash4", { key: "key4", scope: "global" }); - - // Only hash3 and hash4 should remain - expect(catalog.getKeyOptions("hash1")).toBeUndefined(); - expect(catalog.getKeyOptions("hash2")).toBeUndefined(); - expect(catalog.getKeyOptions("hash3")).toBeDefined(); - expect(catalog.getKeyOptions("hash4")).toBeDefined(); - }); - - it("should work with maxSize of 1", () => { - const catalog = new LRUIdempotencyKeyCatalog(1); - - catalog.registerKeyOptions("hash1", { key: "key1", scope: "global" }); - expect(catalog.getKeyOptions("hash1")).toBeDefined(); - - catalog.registerKeyOptions("hash2", { key: "key2", scope: "global" }); - expect(catalog.getKeyOptions("hash1")).toBeUndefined(); - expect(catalog.getKeyOptions("hash2")).toBeDefined(); - }); - }); - - describe("LRU ordering", () => { - it("should move accessed key to most recent position", () => { - const catalog = new LRUIdempotencyKeyCatalog(3); - - catalog.registerKeyOptions("hash1", { key: "key1", scope: "global" }); - catalog.registerKeyOptions("hash2", { key: "key2", scope: "global" }); - catalog.registerKeyOptions("hash3", { key: "key3", scope: "global" }); - - // Access hash1, moving it to most recent - catalog.getKeyOptions("hash1"); - - // Add hash4 - should evict hash2 (now the oldest) - catalog.registerKeyOptions("hash4", { key: "key4", scope: "global" }); - - expect(catalog.getKeyOptions("hash1")).toBeDefined(); - expect(catalog.getKeyOptions("hash2")).toBeUndefined(); - expect(catalog.getKeyOptions("hash3")).toBeDefined(); - expect(catalog.getKeyOptions("hash4")).toBeDefined(); - }); - - it("should move re-registered key to most recent position", () => { - const catalog = new LRUIdempotencyKeyCatalog(3); - - catalog.registerKeyOptions("hash1", { key: "key1", scope: "global" }); - catalog.registerKeyOptions("hash2", { key: "key2", scope: "global" }); - catalog.registerKeyOptions("hash3", { key: "key3", scope: "global" }); - - // Re-register hash1, moving it to most recent - catalog.registerKeyOptions("hash1", { key: "key1-updated", scope: "run" }); - - // Add hash4 - should evict hash2 (now the oldest) - catalog.registerKeyOptions("hash4", { key: "key4", scope: "global" }); - - expect(catalog.getKeyOptions("hash1")).toEqual({ key: "key1-updated", scope: "run" }); - expect(catalog.getKeyOptions("hash2")).toBeUndefined(); - expect(catalog.getKeyOptions("hash3")).toBeDefined(); - expect(catalog.getKeyOptions("hash4")).toBeDefined(); - }); - - it("should not affect order when getting non-existent key", () => { - const catalog = new LRUIdempotencyKeyCatalog(2); - - catalog.registerKeyOptions("hash1", { key: "key1", scope: "global" }); - catalog.registerKeyOptions("hash2", { key: "key2", scope: "global" }); - - // Try to get non-existent key - catalog.getKeyOptions("non-existent"); - - // Add hash3 - should still evict hash1 (oldest) - catalog.registerKeyOptions("hash3", { key: "key3", scope: "global" }); - - expect(catalog.getKeyOptions("hash1")).toBeUndefined(); - expect(catalog.getKeyOptions("hash2")).toBeDefined(); - expect(catalog.getKeyOptions("hash3")).toBeDefined(); - }); - }); - - describe("default maxSize", () => { - it("should use default maxSize of 1000", () => { - const catalog = new LRUIdempotencyKeyCatalog(); - - // Register 1001 entries - for (let i = 0; i < 1001; i++) { - catalog.registerKeyOptions(`hash${i}`, { key: `key${i}`, scope: "global" }); - } - - // First entry should be evicted - expect(catalog.getKeyOptions("hash0")).toBeUndefined(); - // Last entry should exist - expect(catalog.getKeyOptions("hash1000")).toBeDefined(); - }); - }); - - describe("edge cases", () => { - it("should handle negative maxSize by clamping to 0", () => { - const catalog = new LRUIdempotencyKeyCatalog(-5); - - // With maxSize clamped to 0, nothing should be stored - catalog.registerKeyOptions("hash1", { key: "key1", scope: "global" }); - - // Should be immediately evicted since maxSize is 0 - expect(catalog.getKeyOptions("hash1")).toBeUndefined(); - }); - - it("should handle maxSize of 0", () => { - const catalog = new LRUIdempotencyKeyCatalog(0); - - catalog.registerKeyOptions("hash1", { key: "key1", scope: "global" }); - - // Should be immediately evicted since maxSize is 0 - expect(catalog.getKeyOptions("hash1")).toBeUndefined(); - }); - }); -}); diff --git a/packages/core/src/v3/idempotency-key-catalog/lruIdempotencyKeyCatalog.ts b/packages/core/src/v3/idempotency-key-catalog/lruIdempotencyKeyCatalog.ts deleted file mode 100644 index 2fcd072d15c..00000000000 --- a/packages/core/src/v3/idempotency-key-catalog/lruIdempotencyKeyCatalog.ts +++ /dev/null @@ -1,36 +0,0 @@ -import type { IdempotencyKeyCatalog, IdempotencyKeyOptions } from "./catalog.js"; - -export class LRUIdempotencyKeyCatalog implements IdempotencyKeyCatalog { - private cache: Map; - private readonly maxSize: number; - - constructor(maxSize: number = 1_000) { - this.cache = new Map(); - // Clamp to non-negative to prevent infinite loop in eviction - this.maxSize = Math.max(0, maxSize); - } - - registerKeyOptions(hash: string, options: IdempotencyKeyOptions): void { - // Delete and re-add to update position (most recently used) - this.cache.delete(hash); - this.cache.set(hash, options); - - // Evict oldest entries if over capacity - while (this.cache.size > this.maxSize) { - const oldest = this.cache.keys().next().value; - if (oldest !== undefined) { - this.cache.delete(oldest); - } - } - } - - getKeyOptions(hash: string): IdempotencyKeyOptions | undefined { - const options = this.cache.get(hash); - if (options) { - // Move to end (most recently used) - this.cache.delete(hash); - this.cache.set(hash, options); - } - return options; - } -} diff --git a/packages/core/src/v3/idempotencyKeys.test.ts b/packages/core/src/v3/idempotencyKeys.test.ts new file mode 100644 index 00000000000..f511a85f869 --- /dev/null +++ b/packages/core/src/v3/idempotencyKeys.test.ts @@ -0,0 +1,42 @@ +import { describe, it, expect } from "vitest"; +import { + createIdempotencyKey, + getIdempotencyKeyOptions, + resetIdempotencyKeyCatalog, +} from "./idempotencyKeys.js"; + +describe("idempotencyKeys metadata retention", () => { + it("retains key/scope options for every key created in a run, even beyond 1000", async () => { + const count = 3000; + const keys: string[] = []; + + for (let i = 0; i < count; i++) { + const key = await createIdempotencyKey(`item-${i}`, { scope: "global" }); + keys.push(key); + } + + // The very first key created should still resolve its original options. + // With a fixed-size LRU catalog (cap 1000), the earliest ~2000 keys are + // silently evicted and this returns undefined. + const firstOptions = getIdempotencyKeyOptions(keys[0]!); + expect(firstOptions).toEqual({ key: "item-0", scope: "global" }); + + // Every key should resolve to its own original options. + for (let i = 0; i < count; i++) { + const options = getIdempotencyKeyOptions(keys[i]!); + expect(options, `options missing for key index ${i}`).toEqual({ + key: `item-${i}`, + scope: "global", + }); + } + }); + + it("forgets options after the catalog is reset at a run boundary", async () => { + const key = await createIdempotencyKey("boundary-key", { scope: "global" }); + expect(getIdempotencyKeyOptions(key)).toEqual({ key: "boundary-key", scope: "global" }); + + resetIdempotencyKeyCatalog(); + + expect(getIdempotencyKeyOptions(key)).toBeUndefined(); + }); +}); diff --git a/packages/core/src/v3/idempotencyKeys.ts b/packages/core/src/v3/idempotencyKeys.ts index aafe9f906f0..8e3d6e9da14 100644 --- a/packages/core/src/v3/idempotencyKeys.ts +++ b/packages/core/src/v3/idempotencyKeys.ts @@ -38,6 +38,17 @@ export function getIdempotencyKeyOptions( return undefined; } +/** + * Clears the in-process idempotency key catalog. + * + * The catalog maps an idempotency-key hash back to its original key and scope so + * the SDK can attach that metadata when triggering. The worker calls this at each + * run boundary so the mapping does not accumulate across warm-start runs. + */ +export function resetIdempotencyKeyCatalog(): void { + idempotencyKeyCatalog.clear(); +} + export function isIdempotencyKey( value: string | string[] | IdempotencyKey ): value is IdempotencyKey {