Skip to content

Commit a6b1215

Browse files
committed
feat: add skip_time parameter to test runner
1 parent 65a4d59 commit a6b1215

6 files changed

Lines changed: 75 additions & 14 deletions

File tree

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,26 @@ def test_my_durable_functions():
111111
three_result: StepOperation = result.get_step("three")
112112
assert three_result.result == '"5 6"'
113113
```
114+
115+
### Skipping Time in Tests
116+
117+
By default, wait operations use real time delays. For faster test execution, use the `skip_time` parameter:
118+
119+
```python
120+
def test_with_skip_time():
121+
with DurableFunctionTestRunner(handler=function_under_test) as runner:
122+
# Wait operations complete immediately
123+
result = runner.run(input="input str", timeout=10, skip_time=True)
124+
125+
assert result.status is InvocationStatus.SUCCEEDED
126+
```
127+
128+
You can also use the `DURABLE_EXECUTION_TIME_SCALE` environment variable to control wait durations:
129+
- `DURABLE_EXECUTION_TIME_SCALE=0` - Skip all waits (same as `skip_time=True`)
130+
- `DURABLE_EXECUTION_TIME_SCALE=0.5` - Half speed
131+
- `DURABLE_EXECUTION_TIME_SCALE=2.0` - Double speed
132+
133+
114134
## Architecture
115135
![Durable Functions Python Test Framework Architecture](/assets/dar-python-test-framework-architecture.svg)
116136

examples/test/wait/test_wait.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
def test_wait(durable_runner):
1616
"""Test wait example."""
1717
with durable_runner:
18-
result = durable_runner.run(input="test", timeout=10)
18+
result = durable_runner.run(input="test", timeout=10, skip_time=True)
1919

2020
assert result.status is InvocationStatus.SUCCEEDED
2121
assert deserialize_operation_payload(result.result) == "Wait completed"

src/aws_durable_execution_sdk_python_testing/checkpoint/processor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@
3333
class CheckpointProcessor:
3434
"""Handle OperationUpdate transformations and execution state updates."""
3535

36-
def __init__(self, store: ExecutionStore, scheduler: Scheduler):
36+
def __init__(
37+
self, store: ExecutionStore, scheduler: Scheduler, time_scale: float = 1.0
38+
):
3739
self._store = store
3840
self._scheduler = scheduler
3941
self._notifier = ExecutionNotifier()
40-
self._transformer = OperationTransformer()
42+
self._transformer = OperationTransformer(time_scale=time_scale)
4143

4244
def add_execution_observer(self, observer) -> None:
4345
"""Add observer for execution events."""

src/aws_durable_execution_sdk_python_testing/checkpoint/processors/wait.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@
3030
class WaitProcessor(OperationProcessor):
3131
"""Processes WAIT operation updates with timer scheduling."""
3232

33+
def __init__(self, time_scale: float = 1.0):
34+
"""Initialize WaitProcessor with time scale.
35+
36+
Args:
37+
time_scale: Multiplier for wait durations. Use 0.0 to skip waits entirely.
38+
Defaults to 1.0 (real time). Can also be overridden by
39+
DURABLE_EXECUTION_TIME_SCALE environment variable.
40+
"""
41+
self._time_scale = time_scale
42+
3343
def process(
3444
self,
3545
update: OperationUpdate,
@@ -43,8 +53,11 @@ def process(
4353
wait_seconds = (
4454
update.wait_options.wait_seconds if update.wait_options else 0
4555
)
46-
time_scale = float(os.getenv("DURABLE_EXECUTION_TIME_SCALE", "1.0"))
47-
logging.info("Using DURABLE_EXECUTION_TIME_SCALE: %f", time_scale)
56+
# Environment variable takes precedence
57+
time_scale = float(
58+
os.getenv("DURABLE_EXECUTION_TIME_SCALE", str(self._time_scale))
59+
)
60+
logging.info("Using time_scale: %f", time_scale)
4861
scaled_wait_seconds = wait_seconds * time_scale
4962

5063
scheduled_end_timestamp = datetime.now(UTC) + timedelta(

src/aws_durable_execution_sdk_python_testing/checkpoint/transformer.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,21 @@
4343
class OperationTransformer:
4444
"""Transforms OperationUpdates to Operations while maintaining order and triggering scheduler actions."""
4545

46-
_DEFAULT_PROCESSORS: ClassVar[dict[OperationType, OperationProcessor]] = {
47-
OperationType.STEP: StepProcessor(),
48-
OperationType.WAIT: WaitProcessor(),
49-
OperationType.CONTEXT: ContextProcessor(),
50-
OperationType.CALLBACK: CallbackProcessor(),
51-
OperationType.EXECUTION: ExecutionProcessor(),
52-
}
53-
5446
def __init__(
5547
self,
5648
processors: MutableMapping[OperationType, OperationProcessor] | None = None,
49+
time_scale: float = 1.0,
5750
):
58-
self.processors = processors if processors else self._DEFAULT_PROCESSORS
51+
if processors:
52+
self.processors = processors
53+
else:
54+
self.processors = {
55+
OperationType.STEP: StepProcessor(),
56+
OperationType.WAIT: WaitProcessor(time_scale=time_scale),
57+
OperationType.CONTEXT: ContextProcessor(),
58+
OperationType.CALLBACK: CallbackProcessor(),
59+
OperationType.EXECUTION: ExecutionProcessor(),
60+
}
5961

6062
def process_updates(
6163
self,

src/aws_durable_execution_sdk_python_testing/runner.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
from aws_durable_execution_sdk_python_testing.checkpoint.processor import (
3636
CheckpointProcessor,
3737
)
38+
from aws_durable_execution_sdk_python_testing.checkpoint.processors.wait import (
39+
WaitProcessor,
40+
)
3841
from aws_durable_execution_sdk_python_testing.client import InMemoryServiceClient
3942
from aws_durable_execution_sdk_python_testing.exceptions import (
4043
DurableFunctionsLocalRunnerError,
@@ -606,7 +609,28 @@ def run(
606609
function_name: str = "test-function",
607610
execution_name: str = "execution-name",
608611
account_id: str = "123456789012",
612+
skip_time: bool = False,
609613
) -> DurableFunctionTestResult:
614+
"""Run the durable function and wait for completion.
615+
616+
Args:
617+
input: Input payload for the function
618+
timeout: Maximum execution time in seconds
619+
function_name: Name of the function
620+
execution_name: Name of the execution
621+
account_id: AWS account ID
622+
skip_time: If True, wait operations complete immediately. If False (default),
623+
wait operations use real time delays.
624+
625+
Returns:
626+
Test result containing execution status and operations
627+
"""
628+
# Update time_scale in checkpoint processor for this run
629+
time_scale = 0.0 if skip_time else 1.0
630+
self._checkpoint_processor._transformer.processors[OperationType.WAIT] = (
631+
WaitProcessor(time_scale=time_scale)
632+
)
633+
610634
execution_arn = self.run_async(
611635
input=input,
612636
timeout=timeout,

0 commit comments

Comments
 (0)