-
-
Notifications
You must be signed in to change notification settings - Fork 959
fix(webapp): reconcile trace with run lifecycle to handle ClickHouse lag #2875
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
bharathkumar39293
wants to merge
11
commits into
triggerdotdev:main
Choose a base branch
from
bharathkumar39293:fix/task-run-refresh-lag-2798
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+270
−48
Open
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
bfda16b
Apply throttle changes
bharathkumar39293 e8d84fa
fix(webapp): reconcile trace with run lifecycle to handle clickhouse lag
bharathkumar39293 b591e28
chore(webapp): revert unrelated throttle changes
bharathkumar39293 425c3b4
style(webapp): clean up imports and logic in RunPresenter
bharathkumar39293 4b19a1f
fix(webapp): remove root span assumption and fix duplication in RunPr…
bharathkumar39293 e030cc2
fix(webapp): optimize reconciliation to O(1) and add trailing-edge th…
bharathkumar39293 4c3f0e8
fix(webapp): add missing createdAt to runData for accurate duration r…
bharathkumar39293 4dfadb9
refactor(webapp): move reconciliation logic to separate file for bett…
bharathkumar39293 80c5baf
refactor(webapp): complete modularization of reconciliation logic
bharathkumar39293 17dbcd0
refactor(webapp): modularize reconciliation logic and move tests to c…
bharathkumar39293 960b23f
refactor(webapp): remove unused import in RunPresenter.server.ts
bharathkumar39293 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
116 changes: 116 additions & 0 deletions
116
apps/webapp/app/presenters/v3/reconcileTrace.server.test.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,116 @@ | ||
| import { describe, it, expect } from "vitest"; | ||
| import { reconcileTraceWithRunLifecycle } from "./reconcileTrace.server"; | ||
| import { millisecondsToNanoseconds } from "@trigger.dev/core/v3"; | ||
|
|
||
| describe("reconcileTraceWithRunLifecycle", () => { | ||
| const rootSpanId = "root-span-id"; | ||
| const createdAt = new Date("2024-01-01T00:00:00Z"); | ||
| const completedAt = new Date("2024-01-01T00:00:05Z"); | ||
|
|
||
| const runData: any = { | ||
| isFinished: true, | ||
| status: "COMPLETED_SUCCESSFULLY", | ||
| createdAt, | ||
| completedAt, | ||
| rootTaskRun: null, | ||
| }; | ||
|
|
||
| const initialEvents = [ | ||
| { | ||
| id: rootSpanId, | ||
| data: { | ||
| isPartial: true, | ||
| duration: millisecondsToNanoseconds(1000), // 1s, less than the 5s run duration | ||
| isError: false, | ||
| }, | ||
| }, | ||
| { | ||
| id: "child-span-id", | ||
| data: { | ||
| isPartial: false, | ||
| duration: millisecondsToNanoseconds(500), | ||
| isError: false, | ||
| }, | ||
| }, | ||
| ]; | ||
|
|
||
| it("should reconcile a finished run with lagging partial telemetry", () => { | ||
| const totalDuration = millisecondsToNanoseconds(1000); | ||
| const result = reconcileTraceWithRunLifecycle(runData, rootSpanId, initialEvents as any, totalDuration); | ||
|
|
||
| expect(result.rootSpanStatus).toBe("completed"); | ||
|
|
||
| const rootEvent = result.events.find((e: any) => e.id === rootSpanId); | ||
| expect(rootEvent?.data.isPartial).toBe(false); | ||
| // 5s duration = 5000ms | ||
| expect(rootEvent?.data.duration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(5000)); | ||
| expect(result.totalDuration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(5000)); | ||
| }); | ||
|
|
||
| it("should not override duration if Clickhouse already has a longer finished duration", () => { | ||
| const longDuration = millisecondsToNanoseconds(10000); | ||
| const finishedEvents = [ | ||
| { | ||
| id: rootSpanId, | ||
| data: { | ||
| isPartial: false, | ||
| duration: longDuration, | ||
| isError: false, | ||
| }, | ||
| }, | ||
| ]; | ||
|
|
||
| const result = reconcileTraceWithRunLifecycle(runData, rootSpanId, finishedEvents as any, longDuration); | ||
|
|
||
| const rootEvent = result.events.find((e: any) => e.id === rootSpanId); | ||
| expect(rootEvent?.data.duration).toBe(longDuration); | ||
| expect(rootEvent?.data.isPartial).toBe(false); | ||
| expect(result.totalDuration).toBe(longDuration); | ||
| }); | ||
|
|
||
| it("should handle unfinished runs without modification", () => { | ||
| const unfinishedRun = { ...runData, isFinished: false, completedAt: null }; | ||
| const totalDuration = millisecondsToNanoseconds(1000); | ||
| const result = reconcileTraceWithRunLifecycle(unfinishedRun, rootSpanId, initialEvents as any, totalDuration); | ||
|
|
||
| expect(result.rootSpanStatus).toBe("executing"); | ||
|
|
||
| const rootEvent = result.events.find((e: any) => e.id === rootSpanId); | ||
| expect(rootEvent?.data.isPartial).toBe(true); | ||
| expect(rootEvent?.data.duration).toBe(millisecondsToNanoseconds(1000)); | ||
| }); | ||
|
|
||
| it("should reconcile failed runs correctly", () => { | ||
| const failedRun = { ...runData, status: "COMPLETED_WITH_ERRORS" }; | ||
| const result = reconcileTraceWithRunLifecycle(failedRun, rootSpanId, initialEvents as any, millisecondsToNanoseconds(1000)); | ||
|
|
||
| expect(result.rootSpanStatus).toBe("failed"); | ||
| const rootEvent = result.events.find((e: any) => e.id === rootSpanId); | ||
| expect(rootEvent?.data.isError).toBe(true); | ||
| expect(rootEvent?.data.isPartial).toBe(false); | ||
| }); | ||
|
|
||
| it("should use rootTaskRun createdAt if available for duration calculation", () => { | ||
| const rootTaskCreatedAt = new Date("2023-12-31T23:59:50Z"); // 10s before run.createdAt | ||
| const runDataWithRoot: any = { | ||
| ...runData, | ||
| rootTaskRun: { createdAt: rootTaskCreatedAt }, | ||
| }; | ||
|
|
||
| const result = reconcileTraceWithRunLifecycle(runDataWithRoot, rootSpanId, initialEvents as any, millisecondsToNanoseconds(1000)); | ||
|
|
||
| // Duration should be from 23:59:50 to 00:00:05 = 15s | ||
| const rootEvent = result.events.find((e: any) => e.id === rootSpanId); | ||
| expect(rootEvent?.data.duration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(15000)); | ||
| expect(result.totalDuration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(15000)); | ||
| }); | ||
|
|
||
| it("should handle missing root span gracefully", () => { | ||
| const result = reconcileTraceWithRunLifecycle(runData, "non-existent-id", initialEvents as any, millisecondsToNanoseconds(1000)); | ||
|
|
||
| expect(result.rootSpanStatus).toBe("completed"); | ||
| expect(result.events).toEqual(initialEvents); | ||
| // totalDuration should still be updated to postgres duration even if root span is missing from events list | ||
| expect(result.totalDuration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(5000)); | ||
| }); | ||
| }); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| import { millisecondsToNanoseconds } from "@trigger.dev/core/v3"; | ||
| import { isFailedRunStatus } from "~/v3/taskStatus"; | ||
| import type { TaskRunStatus } from "@trigger.dev/database"; | ||
|
|
||
| export type ReconcileRunData = { | ||
| isFinished: boolean; | ||
| status: TaskRunStatus; | ||
| createdAt: Date; | ||
| completedAt: Date | null; | ||
| rootTaskRun: { createdAt: Date } | null; | ||
| }; | ||
|
|
||
| export type ReconcileEvent = { | ||
| id: string; | ||
| data: { | ||
| isPartial: boolean; | ||
| isError: boolean; | ||
| duration?: number | null; | ||
| }; | ||
| }; | ||
|
|
||
| export type ReconcileResult = { | ||
| events: any[]; | ||
| totalDuration: number; | ||
| rootSpanStatus: "executing" | "completed" | "failed"; | ||
| }; | ||
|
|
||
| // NOTE: Clickhouse trace ingestion is eventually consistent. | ||
| // When a run is marked finished in Postgres, we reconcile the | ||
| // root span to reflect completion even if telemetry is still partial. | ||
| // This is a deliberate UI-layer tradeoff to prevent stale or "stuck" | ||
| // run states in the dashboard. | ||
| export function reconcileTraceWithRunLifecycle( | ||
| runData: ReconcileRunData, | ||
| rootSpanId: string, | ||
| events: any[], | ||
| totalDuration: number | ||
| ): ReconcileResult { | ||
| const rootEvent = events[0]; | ||
| const isActualRoot = rootEvent?.id === rootSpanId; | ||
|
|
||
| const currentStatus: "executing" | "completed" | "failed" = | ||
| isActualRoot && rootEvent | ||
| ? rootEvent.data.isError | ||
| ? "failed" | ||
| : !rootEvent.data.isPartial | ||
| ? "completed" | ||
| : "executing" | ||
| : "executing"; | ||
|
|
||
| if (!runData.isFinished) { | ||
| return { events, totalDuration, rootSpanStatus: currentStatus }; | ||
| } | ||
|
|
||
| const postgresRunDuration = runData.completedAt | ||
| ? millisecondsToNanoseconds( | ||
| runData.completedAt.getTime() - | ||
| (runData.rootTaskRun?.createdAt ?? runData.createdAt).getTime() | ||
| ) | ||
| : 0; | ||
|
|
||
| const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration); | ||
|
|
||
| // We only need to potentially update the root event (the first one) if it matches our ID | ||
| if (isActualRoot && rootEvent && rootEvent.data.isPartial) { | ||
| const updatedEvents = [...events]; | ||
| updatedEvents[0] = { | ||
| ...rootEvent, | ||
| data: { | ||
| ...rootEvent.data, | ||
| isPartial: false, | ||
| duration: Math.max(rootEvent.data.duration ?? 0, postgresRunDuration), | ||
| isError: isFailedRunStatus(runData.status), | ||
| }, | ||
| }; | ||
|
|
||
| return { | ||
| events: updatedEvents, | ||
| totalDuration: updatedTotalDuration, | ||
| rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed", | ||
| }; | ||
| } | ||
|
|
||
| return { | ||
| events, | ||
| totalDuration: updatedTotalDuration, | ||
| rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed", | ||
| }; | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,20 +1,28 @@ | ||
| //From: https://kettanaito.com/blog/debounce-vs-throttle | ||
|
|
||
| /** A very simple throttle. Will execute the function at the end of each period and discard any other calls during that period. */ | ||
| /** A throttle that fires the first call immediately and ensures the last call during the duration is also fired. */ | ||
| export function throttle( | ||
| func: (...args: any[]) => void, | ||
| durationMs: number | ||
| ): (...args: any[]) => void { | ||
| let isPrimedToFire = false; | ||
|
|
||
| return (...args: any[]) => { | ||
| if (!isPrimedToFire) { | ||
| isPrimedToFire = true; | ||
| let timeoutId: NodeJS.Timeout | null = null; | ||
| let nextArgs: any[] | null = null; | ||
|
|
||
| setTimeout(() => { | ||
| func(...args); | ||
| isPrimedToFire = false; | ||
| }, durationMs); | ||
| const wrapped = (...args: any[]) => { | ||
| if (timeoutId) { | ||
| nextArgs = args; | ||
| return; | ||
| } | ||
|
|
||
| func(...args); | ||
|
|
||
| timeoutId = setTimeout(() => { | ||
| timeoutId = null; | ||
| if (nextArgs) { | ||
| const argsToUse = nextArgs; | ||
| nextArgs = null; | ||
| wrapped(...argsToUse); | ||
| } | ||
| }, durationMs); | ||
| }; | ||
|
|
||
| return wrapped; | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this throttle change is here now, just curious why you keep adding this to your PRs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding the throttle change: It was originally in PR #2874, but after your feedback that it was unrelated to the SDK fix, I've isolated it here. In this PR (#2875), it's one of the two main components of the fix for the dashboard refresh bug (#2798). The original throttle could drop final status updates; this trailing-edge version ensures the completion events are always delivered to the UI