Skip to content

Commit 7708538

Browse files
CopilotCopilot
andcommitted
fix: update InMemoryOrchestrationBackend suspend/resume to set instance status
The in-memory backend's suspend() and resume() methods queued the appropriate history events but never updated instance.status, causing getOrchestrationState() to return incorrect status. In the real DTS sidecar, calling the suspend/resume RPCs immediately transitions the orchestration status. This fix aligns the in-memory backend with that behavior: - suspend() now sets status to ORCHESTRATION_STATUS_SUSPENDED and notifies state waiters - resume() now transitions status from SUSPENDED back to RUNNING and notifies state waiters Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent cc97f44 commit 7708538

2 files changed

Lines changed: 130 additions & 0 deletions

File tree

packages/durabletask-js/src/testing/in-memory-backend.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,19 @@ export class InMemoryOrchestrationBackend {
175175
return;
176176
}
177177

178+
// Update status immediately to match real sidecar behavior, where the
179+
// suspend RPC transitions the orchestration to SUSPENDED right away.
180+
instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED;
181+
178182
const event = pbh.newSuspendEvent();
179183
instance.pendingEvents.push(event);
180184
instance.lastUpdatedAt = new Date();
181185

182186
if (!this.orchestrationQueueSet.has(instanceId)) {
183187
this.enqueueOrchestration(instanceId);
184188
}
189+
190+
this.notifyWaiters(instanceId);
185191
}
186192

187193
/**
@@ -193,13 +199,21 @@ export class InMemoryOrchestrationBackend {
193199
throw new Error(`Orchestration instance '${instanceId}' not found`);
194200
}
195201

202+
// Transition from SUSPENDED back to RUNNING to match real sidecar behavior.
203+
// Only update if the instance was actually suspended.
204+
if (instance.status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED) {
205+
instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING;
206+
}
207+
196208
const event = pbh.newResumeEvent();
197209
instance.pendingEvents.push(event);
198210
instance.lastUpdatedAt = new Date();
199211

200212
if (!this.orchestrationQueueSet.has(instanceId)) {
201213
this.enqueueOrchestration(instanceId);
202214
}
215+
216+
this.notifyWaiters(instanceId);
203217
}
204218

205219
/**

packages/durabletask-js/test/in-memory-backend.spec.ts

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,4 +377,120 @@ describe("In-Memory Backend", () => {
377377
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED);
378378
expect(state?.serializedOutput).toEqual(JSON.stringify(42));
379379
});
380+
381+
describe("suspend and resume status", () => {
382+
it("should update status to SUSPENDED when suspend is called", async () => {
383+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
384+
yield ctx.waitForExternalEvent("proceed");
385+
return "done";
386+
};
387+
388+
worker.addOrchestrator(orchestrator);
389+
await worker.start();
390+
391+
const id = await client.scheduleNewOrchestration(orchestrator);
392+
await client.waitForOrchestrationStart(id, false, 10);
393+
394+
await client.suspendOrchestration(id);
395+
396+
const state = await client.getOrchestrationState(id);
397+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED);
398+
});
399+
400+
it("should update status to RUNNING when resume is called after suspend", async () => {
401+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
402+
yield ctx.waitForExternalEvent("proceed");
403+
return "done";
404+
};
405+
406+
worker.addOrchestrator(orchestrator);
407+
await worker.start();
408+
409+
const id = await client.scheduleNewOrchestration(orchestrator);
410+
await client.waitForOrchestrationStart(id, false, 10);
411+
412+
await client.suspendOrchestration(id);
413+
let state = await client.getOrchestrationState(id);
414+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED);
415+
416+
await client.resumeOrchestration(id);
417+
state = await client.getOrchestrationState(id);
418+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.RUNNING);
419+
});
420+
421+
it("should complete successfully after suspend and resume", async () => {
422+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
423+
const val: number = yield ctx.waitForExternalEvent("proceed");
424+
return val * 2;
425+
};
426+
427+
worker.addOrchestrator(orchestrator);
428+
await worker.start();
429+
430+
const id = await client.scheduleNewOrchestration(orchestrator);
431+
await client.waitForOrchestrationStart(id, false, 10);
432+
433+
// Suspend the orchestration
434+
await client.suspendOrchestration(id);
435+
let state = await client.getOrchestrationState(id);
436+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED);
437+
438+
// Send an event while suspended (will be buffered)
439+
await client.raiseOrchestrationEvent(id, "proceed", 21);
440+
441+
// Resume the orchestration
442+
await client.resumeOrchestration(id);
443+
444+
// Wait for completion — the buffered event should be processed
445+
state = await client.waitForOrchestrationCompletion(id, true, 10);
446+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED);
447+
expect(state?.serializedOutput).toEqual(JSON.stringify(42));
448+
});
449+
450+
it("should be idempotent when suspend is called twice", async () => {
451+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
452+
yield ctx.waitForExternalEvent("proceed");
453+
return "done";
454+
};
455+
456+
worker.addOrchestrator(orchestrator);
457+
await worker.start();
458+
459+
const id = await client.scheduleNewOrchestration(orchestrator);
460+
await client.waitForOrchestrationStart(id, false, 10);
461+
462+
// Call suspend twice — should not throw
463+
await client.suspendOrchestration(id);
464+
await client.suspendOrchestration(id);
465+
466+
const state = await client.getOrchestrationState(id);
467+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED);
468+
});
469+
470+
it("should notify state waiters on suspend", async () => {
471+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
472+
yield ctx.waitForExternalEvent("proceed");
473+
return "done";
474+
};
475+
476+
worker.addOrchestrator(orchestrator);
477+
await worker.start();
478+
479+
const id = await client.scheduleNewOrchestration(orchestrator);
480+
await client.waitForOrchestrationStart(id, false, 10);
481+
482+
// Set up a waiter for SUSPENDED status, then suspend
483+
const suspendedPromise = backend.waitForState(
484+
id,
485+
(inst) => backend.toClientStatus(inst.status) === OrchestrationStatus.SUSPENDED,
486+
5000,
487+
);
488+
489+
await client.suspendOrchestration(id);
490+
491+
const suspendedInstance = await suspendedPromise;
492+
expect(suspendedInstance).toBeDefined();
493+
expect(backend.toClientStatus(suspendedInstance!.status)).toEqual(OrchestrationStatus.SUSPENDED);
494+
});
495+
});
380496
});

0 commit comments

Comments
 (0)