feat:get_ranges #3925
Open
d-v-b wants to merge 58 commits into zarr-developers:main from d-v-b:feat/get-many
d007c64
feat(core): add _coalesce module skeleton with CoalesceOptions and stub
d-v-b abac6d3
test(core): add failing tests for coalesced_get basic cases
d-v-b f65018a
feat(core): implement coalesced_get for basic sequential cases
d-v-b 9e1f1d2
test(core): cover Offset/Suffix/None and mixed-cluster cases in coale…
d-v-b cd5097b
feat(core): run coalesced fetches concurrently under max_concurrency
d-v-b 3a85488
test(core): cover key-missing (start/mid) and fetch-raises in coalesc…
d-v-b 4553523
test(core): cover max_coalesced_bytes cap in coalesced_get
d-v-b 162dd6d
test(core): add coverage-invariant property test for coalesced_get
d-v-b 401e28b
test(core): drop unused HEAVY_MERGE/NO_MERGE constants
d-v-b 913928c
docs(core): shorten coalesced_get docstring summary line
d-v-b 850b9cd
refactor(core): drop dead invariant checks in merged-group path
d-v-b b2ec638
test(core): cover key-missing on uncoalescable input
d-v-b a4c3330
fix(core): cancel pending fetches on early exit and stop-after-miss
d-v-b 5b1d8cd
test(core): cover mid-stream miss with concurrency > 1
d-v-b 6aa6f4b
test(core): verify consumer-break cancels pending fetches
d-v-b dded848
fix(core): type coalesced_get as AsyncGenerator
d-v-b 865baf0
refactor(test): drop redundant pytestmark asyncio
d-v-b 17d9f75
feat(storage): add private SupportsGetRanges protocol
d-v-b 3ab711d
test(storage): add failing tests for FsspecStore.get_ranges
d-v-b 913be10
feat(storage): add FsspecStore.get_ranges and coalesce_options kwarg
d-v-b 0328e01
test(storage): add SupportsGetRanges conformance tests
d-v-b e7432c5
chore: add towncrier fragment for get_ranges
d-v-b 349bd9c
docs: drop private-symbol mention from get_ranges changelog
d-v-b 79e9927
test: refactor tests
d-v-b 8754f85
test: skip fsspec-backed get_ranges tests on old fsspec
d-v-b 8d86f05
Merge branch 'main' into feat/get-many
d-v-b 287b00d
Merge branch 'main' into feat/get-many
d-v-b 1607420
Merge branch 'main' into feat/get-many
d-v-b ba6fef2
Merge branch 'main' into feat/get-many
d-v-b 49289de
Merge branch 'main' into feat/get-many
d-v-b f158af3
Update src/zarr/core/_coalesce.py
d-v-b ef65b65
Update src/zarr/core/_coalesce.py
d-v-b 3e1f22c
Update src/zarr/core/_coalesce.py
d-v-b a4906cc
Merge branch 'main' into feat/get-many
d-v-b 2d556d4
chore: lint
d-v-b 0e5b2c5
refactor: better function design with explicit context
d-v-b 6e6a1ad
Merge branch 'main' into feat/get-many
d-v-b a997792
Merge branch 'main' into feat/get-many
d-v-b 17f8d64
refactor(coalesce): split coalescing from coalesced get
d-v-b 5eb25bc
refactor(coalesce): use sequence instead of iterable
d-v-b 249ea2c
Merge branch 'main' into feat/get-many
d-v-b 391985a
Update src/zarr/storage/_fsspec.py
d-v-b e56fc01
Merge branch 'main' into feat/get-many
d-v-b 5731be6
refactor: banish typeddict, we are fine with kwargs
d-v-b b96dd14
refactor: apply suggestions from code review
d-v-b cbb42e9
refactor: use drop queue in favor of as_completed; abort early with F…
d-v-b 7368206
refactor: define get_ranges on the store abc
d-v-b aad4e0e
Merge branch 'main' into feat/get-many
d-v-b 76a3a9b
Update src/zarr/core/_coalesce.py
d-v-b 470ab80
Apply suggestion from @chuckwondo
d-v-b 236fe05
Apply suggestion from @chuckwondo
d-v-b 4d7a24e
refactor: use function-scoped defaults
d-v-b c55ae5b
Merge branch 'feat/get-many' of https://github.com/d-v-b/zarr-python …
d-v-b 11e6c39
fix: fix failing test
d-v-b 83cc824
refactor: defaults in one place
d-v-b ca6477a
Merge branch 'main' into feat/get-many
d-v-b c22eef6
hoist imports
d-v-b 63da755
Merge branch 'feat/get-many' of https://github.com/d-v-b/zarr-python …
d-v-b
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Add `zarr.abc.store.Store.get_ranges` for concurrent, coalesced multi-range reads from a single key. The method is defined on the `Store` ABC with a default implementation built on `Store.get`, so every store inherits a working version; stores with native multi-range backends (e.g. `FsspecStore`) can override for efficiency. Coalescing knobs (`max_concurrency`, `max_gap_bytes`, `max_coalesced_bytes`) are passed as keyword arguments to `get_ranges`. |
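
To make the two byte-oriented knobs concrete, here is a minimal, self-contained sketch of the merge rule using plain `(start, end)` tuples. It is not the PR's implementation: the name `plan` is hypothetical, ranges are assumed to be half-open, and `max_concurrency` is left out because it only affects how the planned fetches are executed.

```python
def plan(ranges, *, max_gap_bytes, max_coalesced_bytes):
    """Group half-open (start, end) ranges into fetches.

    Returns one list of input indices per planned fetch.
    Hypothetical helper for illustration only.
    """
    # Visit inputs in order of start offset, remembering original indices.
    order = sorted(range(len(ranges)), key=lambda i: ranges[i][0])
    groups = []
    group_start = group_end = 0
    for i in order:
        start, end = ranges[i]
        if groups and start - group_end <= max_gap_bytes:
            new_end = max(group_end, end)
            # Merge only if the combined span stays under the size cap.
            if new_end - group_start <= max_coalesced_bytes:
                groups[-1].append(i)
                group_end = new_end
                continue
        # Otherwise this range opens a new group.
        groups.append([i])
        group_start, group_end = start, end
    return groups


ranges = [(0, 10), (100, 110), (12, 20), (105, 400)]
merged = plan(ranges, max_gap_bytes=4, max_coalesced_bytes=64)
print(merged)  # [[0, 2], [1], [3]]
```

Inputs 0 and 2 merge because only 2 bytes separate them. Input 3 overlaps input 1, but merging them would produce a 300-byte span, which exceeds the 64-byte cap, so each is fetched on its own.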
`src/zarr/core/_coalesce.py` (new file, `@@ -0,0 +1,222 @@`)

```python
# src/zarr/core/_coalesce.py
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, NamedTuple

from zarr.abc.store import RangeByteRequest

if TYPE_CHECKING:
    from collections.abc import AsyncIterator, Awaitable, Callable, Sequence

    from zarr.abc.store import ByteRequest
    from zarr.core.buffer import Buffer


class _WorkerCtx(NamedTuple):
    """Shared state passed to the per-task worker coroutines.

    Bundling these lets the workers declare their dependencies as one
    parameter instead of capturing them implicitly via closure.
    """

    fetch: Callable[[ByteRequest | None], Awaitable[Buffer | None]]
    semaphore: asyncio.Semaphore


async def _fetch_single(
    ctx: _WorkerCtx, idx: int, req: ByteRequest | None
) -> Sequence[tuple[int, Buffer | None]]:
    """Fetch one byte range. Raises FileNotFoundError if the key is absent."""
    async with ctx.semaphore:
        buf = await ctx.fetch(req)
    if buf is None:
        raise FileNotFoundError
    return ((idx, buf),)


async def _fetch_group(
    ctx: _WorkerCtx, members: list[tuple[int, RangeByteRequest]]
) -> Sequence[tuple[int, Buffer | None]]:
    """Fetch one merged byte range and slice it back into per-input buffers.

    `members` must already be sorted by `start`; callers in this module
    build it from the sorted mergeable list. Raises `FileNotFoundError`
    if the key is absent.
    """
    if len(members) == 1:
        solo_idx, solo_req = members[0]
        return await _fetch_single(ctx, solo_idx, solo_req)

    start = members[0][1].start
    end = max(r.end for _, r in members)
    async with ctx.semaphore:
        big = await ctx.fetch(RangeByteRequest(start, end))
    if big is None:
        raise FileNotFoundError
    sliced = [(idx, big[r.start - start : r.end - start]) for idx, r in members]
    return tuple(sliced)


def coalesce_ranges(
    byte_ranges: Sequence[ByteRequest | None],
    *,
    max_gap_bytes: int,
    max_coalesced_bytes: int,
) -> tuple[
    list[list[tuple[int, RangeByteRequest]]],
    list[tuple[int, ByteRequest | None]],
]:
    """Plan a set of byte-range fetches: which inputs merge, which stand alone.

    Pure (no I/O). The result is the I/O plan a caller would execute: each
    group corresponds to one fetch of a coalesced byte range, and each
    uncoalescable item corresponds to one fetch of the original request.

    All tuning knobs are required keyword arguments. `Store.get_ranges` is
    the public entry point and owns the canonical default values; this
    function takes them explicitly to avoid duplicating policy.

    Parameters
    ----------
    byte_ranges
        Input ranges. `None` means "the whole value".
    max_gap_bytes
        Two `RangeByteRequest`s separated by at most this many bytes may be
        merged into one fetch.
    max_coalesced_bytes
        Upper bound on the size of a single merged fetch.

    Returns
    -------
    groups
        List of merged groups. Each group is a list of
        `(input_index, RangeByteRequest)` pairs sorted by `start`. A
        single-element group represents a `RangeByteRequest` that did not
        merge with any neighbor.
    uncoalescable
        List of `(input_index, request)` pairs for inputs that are not
        `RangeByteRequest` (`OffsetByteRequest`, `SuffixByteRequest`,
        `None`). Indices are preserved from the input order.

    Notes
    -----
    Only `RangeByteRequest` inputs participate in coalescing. Two ranges
    merge when both: their gap (next `start` minus current group's running
    `end`) is `<= max_gap_bytes`, and the resulting merged span is
    `<= max_coalesced_bytes`.
    """
    indexed: list[tuple[int, ByteRequest | None]] = list(enumerate(byte_ranges))
    mergeable: list[tuple[int, RangeByteRequest]] = [
        (i, r) for i, r in indexed if isinstance(r, RangeByteRequest)
    ]
    uncoalescable: list[tuple[int, ByteRequest | None]] = [
        (i, r) for i, r in indexed if not isinstance(r, RangeByteRequest)
    ]

    # Sort mergeables by start offset, then merge. Track running start/end of the
    # current group so each merge step is O(1) instead of O(group size).
    mergeable.sort(key=lambda pair: pair[1].start)
    groups: list[list[tuple[int, RangeByteRequest]]] = []
    group_start = 0
    group_end = 0
    for pair in mergeable:
        _i, r = pair
        if groups and r.start - group_end <= max_gap_bytes:
            prospective_end = max(group_end, r.end)
            if prospective_end - group_start <= max_coalesced_bytes:
                groups[-1].append(pair)
                group_end = prospective_end
                continue
        groups.append([pair])
        group_start = r.start
        group_end = r.end

    return groups, uncoalescable


async def coalesced_get(
    fetch: Callable[[ByteRequest | None], Awaitable[Buffer | None]],
    byte_ranges: Sequence[ByteRequest | None],
    *,
    max_concurrency: int,
    max_gap_bytes: int,
    max_coalesced_bytes: int,
) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]:
    """Read many byte ranges through `fetch` with coalescing and concurrency.

    Nearby ranges are merged into a single underlying I/O, and merged fetches
    are run concurrently. Each yield corresponds to exactly one underlying I/O
    operation: a sequence of `(input_index, result)` tuples for all input
    ranges served by that I/O. Tuples within a yielded sequence are ordered by
    start offset. Yields across groups are in completion order, not input
    order.

    All tuning knobs are required keyword arguments. `Store.get_ranges` is
    the public entry point and owns the canonical default values; this
    function takes them explicitly to avoid duplicating policy.

    Parameters
    ----------
    fetch
        Callable that reads one byte range and returns a `Buffer` (or `None`
        if the underlying key does not exist). Typically constructed via
        `functools.partial(store.get, key, prototype)`.
    byte_ranges
        Input ranges. `None` means "the whole value".
    max_concurrency
        Maximum number of merged fetches in flight at once.
    max_gap_bytes
        Forwarded to `coalesce_ranges`.
    max_coalesced_bytes
        Forwarded to `coalesce_ranges`.

    Yields
    ------
    Sequence[tuple[int, Buffer | None]]
        Per-I/O batch of `(input_index, result)` tuples.

    Notes
    -----
    - Only `RangeByteRequest` inputs are coalesced. `OffsetByteRequest`,
      `SuffixByteRequest`, and `None` are each treated as uncoalescable
      (one fetch, one single-tuple yield per input).
    - If any fetch returns `None`, the iterator raises `FileNotFoundError`
      after cancelling pending fetches. Groups completed before the miss
      remain observable on the yields preceding the raise.
    - If a fetch raises, the exception propagates on the yield that produced
      the failing group; earlier-completed groups remain observable.
    """
    if not byte_ranges:
        return

    groups, singles = coalesce_ranges(
        byte_ranges,
        max_gap_bytes=max_gap_bytes,
        max_coalesced_bytes=max_coalesced_bytes,
    )

    ctx = _WorkerCtx(fetch=fetch, semaphore=asyncio.Semaphore(max_concurrency))

    # Launch all work as tasks. The semaphore bounds actual I/O concurrency.
    # TaskGroup wraps any task exception in BaseExceptionGroup; we unwrap it
    # so callers see the underlying error directly (e.g. FileNotFoundError).
    # GeneratorExit (raised when the consumer calls aclose()) is also caught
    # and re-raised bare so close completes cleanly.
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                *(tg.create_task(_fetch_group(ctx, group)) for group in groups),
                *(tg.create_task(_fetch_single(ctx, i, single)) for i, single in singles),
            ]

            for fut in asyncio.as_completed(tasks):
                yield await fut
    except BaseExceptionGroup as eg:
        # Unwrap: prefer GeneratorExit, then a single inner exception, otherwise raise group.
        for exc in eg.exceptions:
            if isinstance(exc, GeneratorExit):
                raise exc from None
        if len(eg.exceptions) == 1:
            raise eg.exceptions[0] from None
        raise
```

Contributor, on the return annotation of `coalesced_get`:
So we don't have to
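
Because `coalesced_get` yields batches in completion order, a caller that needs results in input order has to place each buffer by its `input_index`. A minimal sketch of that consumer pattern follows. The generator `fake_coalesced_get` is a hypothetical stand-in that yields `bytes` payloads in a fixed out-of-order sequence, so the example runs without zarr installed.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator, Sequence


async def fake_coalesced_get() -> AsyncIterator[Sequence[tuple[int, bytes]]]:
    """Hypothetical stand-in: one batch per simulated I/O, out of input order."""
    batches = [((2, b"c"),), ((0, b"a"), (1, b"b")), ((3, b"d"),)]
    for batch in batches:
        await asyncio.sleep(0)  # give the event loop a turn, as real I/O would
        yield batch


async def gather_in_input_order(n: int) -> list[bytes | None]:
    # Pre-size the output so each result can be placed by its input index.
    results: list[bytes | None] = [None] * n
    async for batch in fake_coalesced_get():
        for idx, payload in batch:
            results[idx] = payload
    return results


ordered = asyncio.run(gather_in_input_order(4))
print(ordered)  # [b'a', b'b', b'c', b'd']
```

Pre-sizing the list keeps the reassembly O(n) and avoids sorting the batches after the fact.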

Contributor, on lines +216 to +222:
We do NOT want to reraise
Nice. I think this makes good sense. Perhaps just add some comments for clarity: