@@ -916,6 +916,189 @@ describe("TriggerChatTransport", () => {
916916 } ) ;
917917 } ) ;
918918
919+ describe ( "lastEventId tracking" , ( ) => {
920+ it ( "should pass lastEventId to SSE subscription on subsequent turns" , async ( ) => {
921+ const controlChunk = {
922+ type : "__trigger_waitpoint_ready" ,
923+ tokenId : "wp_token_eid" ,
924+ publicAccessToken : "wp_access_eid" ,
925+ } ;
926+
927+ let triggerCallCount = 0 ;
928+ const streamFetchCalls : { url : string ; headers : Record < string , string > } [ ] = [ ] ;
929+
930+ global . fetch = vi . fn ( ) . mockImplementation ( async ( url : string | URL , init ?: RequestInit ) => {
931+ const urlStr = typeof url === "string" ? url : url . toString ( ) ;
932+
933+ if ( urlStr . includes ( "/api/v1/tasks/" ) && urlStr . includes ( "/trigger" ) ) {
934+ triggerCallCount ++ ;
935+ return new Response (
936+ JSON . stringify ( { id : "run_eid" } ) ,
937+ {
938+ status : 200 ,
939+ headers : {
940+ "content-type" : "application/json" ,
941+ "x-trigger-jwt" : "pub_token_eid" ,
942+ } ,
943+ }
944+ ) ;
945+ }
946+
947+ if ( urlStr . includes ( "/api/v1/waitpoints/tokens/" ) && urlStr . includes ( "/complete" ) ) {
948+ return new Response (
949+ JSON . stringify ( { success : true } ) ,
950+ {
951+ status : 200 ,
952+ headers : { "content-type" : "application/json" } ,
953+ }
954+ ) ;
955+ }
956+
957+ if ( urlStr . includes ( "/realtime/v1/streams/" ) ) {
958+ streamFetchCalls . push ( {
959+ url : urlStr ,
960+ headers : ( init ?. headers as Record < string , string > ) ?? { } ,
961+ } ) ;
962+
963+ const chunks = [
964+ ...sampleChunks ,
965+ { type : "finish" as const , id : "part-1" } as UIMessageChunk ,
966+ controlChunk ,
967+ ] ;
968+ return new Response ( createSSEStream ( sseEncode ( chunks ) ) , {
969+ status : 200 ,
970+ headers : {
971+ "content-type" : "text/event-stream" ,
972+ "X-Stream-Version" : "v1" ,
973+ } ,
974+ } ) ;
975+ }
976+
977+ throw new Error ( `Unexpected fetch URL: ${ urlStr } ` ) ;
978+ } ) ;
979+
980+ const transport = new TriggerChatTransport ( {
981+ task : "my-task" ,
982+ accessToken : "token" ,
983+ baseURL : "https://api.test.trigger.dev" ,
984+ } ) ;
985+
986+ // First message — triggers a new run
987+ const stream1 = await transport . sendMessages ( {
988+ trigger : "submit-message" ,
989+ chatId : "chat-eid" ,
990+ messageId : undefined ,
991+ messages : [ createUserMessage ( "Hello" ) ] ,
992+ abortSignal : undefined ,
993+ } ) ;
994+
995+ const reader1 = stream1 . getReader ( ) ;
996+ while ( true ) {
997+ const { done } = await reader1 . read ( ) ;
998+ if ( done ) break ;
999+ }
1000+
1001+ // Second message — completes the waitpoint
1002+ const stream2 = await transport . sendMessages ( {
1003+ trigger : "submit-message" ,
1004+ chatId : "chat-eid" ,
1005+ messageId : undefined ,
1006+ messages : [ createUserMessage ( "Hello" ) , createAssistantMessage ( "Hi!" ) , createUserMessage ( "What's up?" ) ] ,
1007+ abortSignal : undefined ,
1008+ } ) ;
1009+
1010+ const reader2 = stream2 . getReader ( ) ;
1011+ while ( true ) {
1012+ const { done } = await reader2 . read ( ) ;
1013+ if ( done ) break ;
1014+ }
1015+
1016+ // The second stream subscription should include a Last-Event-ID header
1017+ expect ( streamFetchCalls . length ) . toBe ( 2 ) ;
1018+ const secondStreamHeaders = streamFetchCalls [ 1 ] ! . headers ;
1019+ // SSEStreamSubscription passes lastEventId as the Last-Event-ID header
1020+ expect ( secondStreamHeaders [ "Last-Event-ID" ] ) . toBeDefined ( ) ;
1021+ } ) ;
1022+ } ) ;
1023+
1024+ describe ( "AbortController cleanup" , ( ) => {
1025+ it ( "should terminate SSE connection after intercepting control chunk" , async ( ) => {
1026+ const controlChunk = {
1027+ type : "__trigger_waitpoint_ready" ,
1028+ tokenId : "wp_token_abort" ,
1029+ publicAccessToken : "wp_access_abort" ,
1030+ } ;
1031+
1032+ let streamAborted = false ;
1033+
1034+ global . fetch = vi . fn ( ) . mockImplementation ( async ( url : string | URL , init ?: RequestInit ) => {
1035+ const urlStr = typeof url === "string" ? url : url . toString ( ) ;
1036+
1037+ if ( urlStr . includes ( "/trigger" ) ) {
1038+ return new Response (
1039+ JSON . stringify ( { id : "run_abort_cleanup" } ) ,
1040+ {
1041+ status : 200 ,
1042+ headers : {
1043+ "content-type" : "application/json" ,
1044+ "x-trigger-jwt" : "pub_token" ,
1045+ } ,
1046+ }
1047+ ) ;
1048+ }
1049+
1050+ if ( urlStr . includes ( "/realtime/v1/streams/" ) ) {
1051+ // Track abort signal
1052+ const signal = init ?. signal ;
1053+ if ( signal ) {
1054+ signal . addEventListener ( "abort" , ( ) => {
1055+ streamAborted = true ;
1056+ } ) ;
1057+ }
1058+
1059+ const chunks = [
1060+ ...sampleChunks ,
1061+ { type : "finish" as const , id : "part-1" } as UIMessageChunk ,
1062+ controlChunk ,
1063+ ] ;
1064+ return new Response ( createSSEStream ( sseEncode ( chunks ) ) , {
1065+ status : 200 ,
1066+ headers : {
1067+ "content-type" : "text/event-stream" ,
1068+ "X-Stream-Version" : "v1" ,
1069+ } ,
1070+ } ) ;
1071+ }
1072+
1073+ throw new Error ( `Unexpected fetch URL: ${ urlStr } ` ) ;
1074+ } ) ;
1075+
1076+ const transport = new TriggerChatTransport ( {
1077+ task : "my-task" ,
1078+ accessToken : "token" ,
1079+ baseURL : "https://api.test.trigger.dev" ,
1080+ } ) ;
1081+
1082+ const stream = await transport . sendMessages ( {
1083+ trigger : "submit-message" ,
1084+ chatId : "chat-abort-cleanup" ,
1085+ messageId : undefined ,
1086+ messages : [ createUserMessage ( "Hello" ) ] ,
1087+ abortSignal : undefined ,
1088+ } ) ;
1089+
1090+ // Consume all chunks
1091+ const reader = stream . getReader ( ) ;
1092+ while ( true ) {
1093+ const { done } = await reader . read ( ) ;
1094+ if ( done ) break ;
1095+ }
1096+
1097+ // The internal AbortController should have aborted the fetch
1098+ expect ( streamAborted ) . toBe ( true ) ;
1099+ } ) ;
1100+ } ) ;
1101+
9191102 describe ( "async accessToken" , ( ) => {
9201103 it ( "should accept an async function for accessToken" , async ( ) => {
9211104 let tokenCallCount = 0 ;
@@ -974,6 +1157,108 @@ describe("TriggerChatTransport", () => {
9741157
9751158 expect ( tokenCallCount ) . toBe ( 1 ) ;
9761159 } ) ;
1160+
1161+ it ( "should resolve async token for waitpoint completion flow" , async ( ) => {
1162+ const controlChunk = {
1163+ type : "__trigger_waitpoint_ready" ,
1164+ tokenId : "wp_token_async" ,
1165+ publicAccessToken : "wp_access_async" ,
1166+ } ;
1167+
1168+ let tokenCallCount = 0 ;
1169+ let completeWaitpointCalled = false ;
1170+
1171+ global . fetch = vi . fn ( ) . mockImplementation ( async ( url : string | URL ) => {
1172+ const urlStr = typeof url === "string" ? url : url . toString ( ) ;
1173+
1174+ if ( urlStr . includes ( "/api/v1/tasks/" ) && urlStr . includes ( "/trigger" ) ) {
1175+ return new Response (
1176+ JSON . stringify ( { id : "run_async_wp" } ) ,
1177+ {
1178+ status : 200 ,
1179+ headers : {
1180+ "content-type" : "application/json" ,
1181+ "x-trigger-jwt" : "stream-token" ,
1182+ } ,
1183+ }
1184+ ) ;
1185+ }
1186+
1187+ if ( urlStr . includes ( "/api/v1/waitpoints/tokens/" ) && urlStr . includes ( "/complete" ) ) {
1188+ completeWaitpointCalled = true ;
1189+ return new Response (
1190+ JSON . stringify ( { success : true } ) ,
1191+ {
1192+ status : 200 ,
1193+ headers : { "content-type" : "application/json" } ,
1194+ }
1195+ ) ;
1196+ }
1197+
1198+ if ( urlStr . includes ( "/realtime/v1/streams/" ) ) {
1199+ const chunks = [
1200+ ...sampleChunks ,
1201+ { type : "finish" as const , id : "part-1" } as UIMessageChunk ,
1202+ controlChunk ,
1203+ ] ;
1204+ return new Response ( createSSEStream ( sseEncode ( chunks ) ) , {
1205+ status : 200 ,
1206+ headers : {
1207+ "content-type" : "text/event-stream" ,
1208+ "X-Stream-Version" : "v1" ,
1209+ } ,
1210+ } ) ;
1211+ }
1212+
1213+ throw new Error ( `Unexpected fetch URL: ${ urlStr } ` ) ;
1214+ } ) ;
1215+
1216+ const transport = new TriggerChatTransport ( {
1217+ task : "my-task" ,
1218+ accessToken : async ( ) => {
1219+ tokenCallCount ++ ;
1220+ await new Promise ( ( r ) => setTimeout ( r , 1 ) ) ;
1221+ return `async-wp-token-${ tokenCallCount } ` ;
1222+ } ,
1223+ baseURL : "https://api.test.trigger.dev" ,
1224+ } ) ;
1225+
1226+ // First message — triggers a new run (calls async token)
1227+ const stream1 = await transport . sendMessages ( {
1228+ trigger : "submit-message" ,
1229+ chatId : "chat-async-wp" ,
1230+ messageId : undefined ,
1231+ messages : [ createUserMessage ( "Hello" ) ] ,
1232+ abortSignal : undefined ,
1233+ } ) ;
1234+
1235+ const reader1 = stream1 . getReader ( ) ;
1236+ while ( true ) {
1237+ const { done } = await reader1 . read ( ) ;
1238+ if ( done ) break ;
1239+ }
1240+
1241+ const firstTokenCount = tokenCallCount ;
1242+
1243+ // Second message — should complete waitpoint (does NOT call async token)
1244+ const stream2 = await transport . sendMessages ( {
1245+ trigger : "submit-message" ,
1246+ chatId : "chat-async-wp" ,
1247+ messageId : undefined ,
1248+ messages : [ createUserMessage ( "Hello" ) , createAssistantMessage ( "Hi!" ) , createUserMessage ( "More" ) ] ,
1249+ abortSignal : undefined ,
1250+ } ) ;
1251+
1252+ const reader2 = stream2 . getReader ( ) ;
1253+ while ( true ) {
1254+ const { done } = await reader2 . read ( ) ;
1255+ if ( done ) break ;
1256+ }
1257+
1258+ // Token function should NOT have been called again for the waitpoint path
1259+ expect ( tokenCallCount ) . toBe ( firstTokenCount ) ;
1260+ expect ( completeWaitpointCalled ) . toBe ( true ) ;
1261+ } ) ;
9771262 } ) ;
9781263
9791264 describe ( "single-run mode (waitpoint loop)" , ( ) => {
0 commit comments