Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
d007c64
feat(core): add _coalesce module skeleton with CoalesceOptions and stub
d-v-b Apr 24, 2026
abac6d3
test(core): add failing tests for coalesced_get basic cases
d-v-b Apr 24, 2026
f65018a
feat(core): implement coalesced_get for basic sequential cases
d-v-b Apr 24, 2026
9e1f1d2
test(core): cover Offset/Suffix/None and mixed-cluster cases in coale…
d-v-b Apr 24, 2026
cd5097b
feat(core): run coalesced fetches concurrently under max_concurrency
d-v-b Apr 24, 2026
3a85488
test(core): cover key-missing (start/mid) and fetch-raises in coalesc…
d-v-b Apr 24, 2026
4553523
test(core): cover max_coalesced_bytes cap in coalesced_get
d-v-b Apr 24, 2026
162dd6d
test(core): add coverage-invariant property test for coalesced_get
d-v-b Apr 24, 2026
401e28b
test(core): drop unused HEAVY_MERGE/NO_MERGE constants
d-v-b Apr 24, 2026
913928c
docs(core): shorten coalesced_get docstring summary line
d-v-b Apr 24, 2026
850b9cd
refactor(core): drop dead invariant checks in merged-group path
d-v-b Apr 24, 2026
b2ec638
test(core): cover key-missing on uncoalescable input
d-v-b Apr 24, 2026
a4c3330
fix(core): cancel pending fetches on early exit and stop-after-miss
d-v-b Apr 24, 2026
5b1d8cd
test(core): cover mid-stream miss with concurrency > 1
d-v-b Apr 24, 2026
6aa6f4b
test(core): verify consumer-break cancels pending fetches
d-v-b Apr 24, 2026
dded848
fix(core): type coalesced_get as AsyncGenerator
d-v-b Apr 24, 2026
865baf0
refactor(test): drop redundant pytestmark asyncio
d-v-b Apr 24, 2026
17d9f75
feat(storage): add private SupportsGetRanges protocol
d-v-b Apr 24, 2026
3ab711d
test(storage): add failing tests for FsspecStore.get_ranges
d-v-b Apr 24, 2026
913be10
feat(storage): add FsspecStore.get_ranges and coalesce_options kwarg
d-v-b Apr 24, 2026
0328e01
test(storage): add SupportsGetRanges conformance tests
d-v-b Apr 24, 2026
e7432c5
chore: add towncrier fragment for get_ranges
d-v-b Apr 24, 2026
349bd9c
docs: drop private-symbol mention from get_ranges changelog
d-v-b Apr 24, 2026
79e9927
test: refactor tests
d-v-b Apr 24, 2026
8754f85
test: skip fsspec-backed get_ranges tests on old fsspec
d-v-b Apr 24, 2026
8d86f05
Merge branch 'main' into feat/get-many
d-v-b Apr 30, 2026
287b00d
Merge branch 'main' into feat/get-many
d-v-b May 1, 2026
1607420
Merge branch 'main' into feat/get-many
d-v-b May 6, 2026
ba6fef2
Merge branch 'main' into feat/get-many
d-v-b May 6, 2026
49289de
Merge branch 'main' into feat/get-many
d-v-b May 7, 2026
f158af3
Update src/zarr/core/_coalesce.py
d-v-b May 7, 2026
ef65b65
Update src/zarr/core/_coalesce.py
d-v-b May 7, 2026
3e1f22c
Update src/zarr/core/_coalesce.py
d-v-b May 7, 2026
a4906cc
Merge branch 'main' into feat/get-many
d-v-b May 7, 2026
2d556d4
chore: lint
d-v-b May 7, 2026
0e5b2c5
refactor: better function design with explicit context
d-v-b May 7, 2026
6e6a1ad
Merge branch 'main' into feat/get-many
d-v-b May 8, 2026
a997792
Merge branch 'main' into feat/get-many
d-v-b May 8, 2026
17f8d64
refactor(coalesce): split coalescing from coalesced get
d-v-b May 8, 2026
5eb25bc
refactor(coalesce): use sequence instead of iterable
d-v-b May 8, 2026
249ea2c
Merge branch 'main' into feat/get-many
d-v-b May 10, 2026
391985a
Update src/zarr/storage/_fsspec.py
d-v-b May 11, 2026
e56fc01
Merge branch 'main' into feat/get-many
d-v-b May 11, 2026
5731be6
refactor: banish typeddict, we are fine with kwargs
d-v-b May 12, 2026
b96dd14
refactor: apply suggestions from code review
d-v-b May 12, 2026
cbb42e9
refactor: use drop queue in favor of as_completed; abort early with F…
d-v-b May 12, 2026
7368206
refactor: define get_ranges on the store abc
d-v-b May 12, 2026
aad4e0e
Merge branch 'main' into feat/get-many
d-v-b May 12, 2026
76a3a9b
Update src/zarr/core/_coalesce.py
d-v-b May 12, 2026
470ab80
Apply suggestion from @chuckwondo
d-v-b May 12, 2026
236fe05
Apply suggestion from @chuckwondo
d-v-b May 12, 2026
4d7a24e
refactor: use function-scoped defaults
d-v-b May 12, 2026
c55ae5b
Merge branch 'feat/get-many' of https://github.com/d-v-b/zarr-python …
d-v-b May 12, 2026
11e6c39
fix: fix failing test
d-v-b May 12, 2026
83cc824
refactor: defaults in one place
d-v-b May 12, 2026
ca6477a
Merge branch 'main' into feat/get-many
d-v-b May 12, 2026
c22eef6
hoist imports
d-v-b May 12, 2026
63da755
Merge branch 'feat/get-many' of https://github.com/d-v-b/zarr-python …
d-v-b May 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/3925.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `zarr.abc.store.Store.get_ranges` for concurrent, coalesced multi-range reads from a single key. The method is defined on the `Store` ABC with a default implementation built on `Store.get`, so every store inherits a working version; stores with native multi-range backends (e.g. `FsspecStore`) can override for efficiency. Coalescing knobs (`max_concurrency`, `max_gap_bytes`, `max_coalesced_bytes`) are passed as keyword arguments to `get_ranges`.
56 changes: 55 additions & 1 deletion src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import partial
from itertools import starmap
from typing import TYPE_CHECKING, Literal, Protocol, runtime_checkable

from zarr.core.sync import sync

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, AsyncIterator, Iterable
from collections.abc import AsyncGenerator, AsyncIterator, Iterable, Sequence
from types import TracebackType
from typing import Any, Self

Expand Down Expand Up @@ -616,6 +617,59 @@ async def _get_many(
for req in requests:
yield (req[0], await self.get(*req))

async def get_ranges(
    self,
    key: str,
    byte_ranges: Sequence[ByteRequest | None],
    *,
    prototype: BufferPrototype,
    max_concurrency: int = 10,
    max_gap_bytes: int = 1 << 20,  # 1 MiB
    max_coalesced_bytes: int = 16 << 20,  # 16 MiB
) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]:
    """Read many byte ranges from `key`.

    Yields one batch per underlying I/O operation, each a sequence of
    `(input_index, Buffer | None)` tuples. Batches across yields arrive in
    completion order, not input order. The default implementation built
    into `Store` runs the coalescer over `self.get`, so subclasses get a
    working implementation for free; stores that have a more efficient
    backend (e.g. ranged HTTP, S3 byte-range fetches) should override.

    Parameters
    ----------
    key
        Storage key to read from.
    byte_ranges
        Input ranges. `None` means "the whole value".
    prototype
        Buffer prototype, forwarded to `self.get`.
    max_concurrency
        Maximum number of merged fetches in flight at once.
    max_gap_bytes
        Two `RangeByteRequest`s separated by at most this many bytes may
        be merged into one fetch.
    max_coalesced_bytes
        Upper bound on the size of a single merged fetch.

    Raises
    ------
    FileNotFoundError
        If any underlying fetch returns `None` (i.e. `key` is absent).
    """
    # Local import: zarr.core._coalesce imports symbols from this module.
    from zarr.core._coalesce import coalesced_get

    fetch = partial(self.get, key, prototype)
    async for group in coalesced_get(
        fetch,
        byte_ranges,
        max_concurrency=max_concurrency,
        max_gap_bytes=max_gap_bytes,
        max_coalesced_bytes=max_coalesced_bytes,
    ):
        yield group

async def getsize(self, key: str) -> int:
"""
Return the size, in bytes, of a value in a Store.
Expand Down
222 changes: 222 additions & 0 deletions src/zarr/core/_coalesce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
# src/zarr/core/_coalesce.py
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, NamedTuple

from zarr.abc.store import RangeByteRequest

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator, AsyncIterator, Awaitable, Callable, Sequence

    from zarr.abc.store import ByteRequest
    from zarr.core.buffer import Buffer


class _WorkerCtx(NamedTuple):
    """Shared state passed to the per-task worker coroutines.

    Bundling these lets the workers declare their dependencies as one
    parameter instead of capturing them implicitly via closure.
    """

    # Reads one byte range; returns None when the underlying key is absent.
    fetch: Callable[[ByteRequest | None], Awaitable[Buffer | None]]
    # Bounds the number of fetches actually in flight at once.
    semaphore: asyncio.Semaphore


async def _fetch_single(
    ctx: _WorkerCtx, idx: int, req: ByteRequest | None
) -> Sequence[tuple[int, Buffer | None]]:
    """Fetch a single byte range and wrap it as a one-tuple batch.

    Raises `FileNotFoundError` when the fetch reports a missing key.
    """
    async with ctx.semaphore:
        result = await ctx.fetch(req)
    if result is None:
        raise FileNotFoundError
    return ((idx, result),)


async def _fetch_group(
    ctx: _WorkerCtx, members: list[tuple[int, RangeByteRequest]]
) -> Sequence[tuple[int, Buffer | None]]:
    """Fetch one coalesced span and carve it into per-input buffers.

    `members` must already be sorted by `start`; callers in this module
    build it from the sorted mergeable list. Raises `FileNotFoundError`
    if the key is absent.
    """
    # A one-member group degenerates to a plain single fetch.
    if len(members) == 1:
        lone_idx, lone_req = members[0]
        return await _fetch_single(ctx, lone_idx, lone_req)

    span_start = members[0][1].start
    span_end = max(req.end for _, req in members)
    async with ctx.semaphore:
        merged = await ctx.fetch(RangeByteRequest(span_start, span_end))
    if merged is None:
        raise FileNotFoundError
    # Slice each member's window back out of the merged buffer.
    return tuple(
        (idx, merged[req.start - span_start : req.end - span_start]) for idx, req in members
    )


def coalesce_ranges(
    byte_ranges: Sequence[ByteRequest | None],
    *,
    max_gap_bytes: int,
    max_coalesced_bytes: int,
) -> tuple[
    list[list[tuple[int, RangeByteRequest]]],
    list[tuple[int, ByteRequest | None]],
]:
    """Plan a set of byte-range fetches: which inputs merge, which stand alone.

    Pure (no I/O). The result is the I/O plan a caller would execute: each
    group corresponds to one fetch of a coalesced byte range, and each
    uncoalescable item corresponds to one fetch of the original request.

    All tuning knobs are required keyword arguments. `Store.get_ranges` is
    the public entry point and owns the canonical default values; this
    function takes them explicitly to avoid duplicating policy.

    Parameters
    ----------
    byte_ranges
        Input ranges. `None` means "the whole value".
    max_gap_bytes
        Two `RangeByteRequest`s separated by at most this many bytes may be
        merged into one fetch.
    max_coalesced_bytes
        Upper bound on the size of a single merged fetch.

    Returns
    -------
    groups
        List of merged groups. Each group is a list of
        `(input_index, RangeByteRequest)` pairs sorted by `start`. A
        single-element group represents a `RangeByteRequest` that did not
        merge with any neighbor.
    uncoalescable
        List of `(input_index, request)` pairs for inputs that are not
        `RangeByteRequest` (`OffsetByteRequest`, `SuffixByteRequest`,
        `None`). Indices are preserved from the input order.

    Notes
    -----
    Only `RangeByteRequest` inputs participate in coalescing. Two ranges
    merge when both: their gap (next `start` minus current group's running
    `end`) is `<= max_gap_bytes`, and the resulting merged span is
    `<= max_coalesced_bytes`.
    """
    # Single pass: split inputs into coalescing candidates and everything else,
    # keeping original indices so results can be routed back to the caller.
    mergeable: list[tuple[int, RangeByteRequest]] = []
    uncoalescable: list[tuple[int, ByteRequest | None]] = []
    for idx, req in enumerate(byte_ranges):
        if isinstance(req, RangeByteRequest):
            mergeable.append((idx, req))
        else:
            uncoalescable.append((idx, req))

    # Walk candidates in start order, growing the current group while the gap
    # and total-span limits allow. Tracking the running span keeps each merge
    # decision O(1) instead of O(group size).
    groups: list[list[tuple[int, RangeByteRequest]]] = []
    span_start = span_end = 0
    for idx, req in sorted(mergeable, key=lambda pair: pair[1].start):
        if groups and req.start - span_end <= max_gap_bytes:
            candidate_end = max(span_end, req.end)
            if candidate_end - span_start <= max_coalesced_bytes:
                groups[-1].append((idx, req))
                span_end = candidate_end
                continue
        groups.append([(idx, req)])
        span_start = req.start
        span_end = req.end

    return groups, uncoalescable


async def coalesced_get(
    fetch: Callable[[ByteRequest | None], Awaitable[Buffer | None]],
    byte_ranges: Sequence[ByteRequest | None],
    *,
    max_concurrency: int,
    max_gap_bytes: int,
    max_coalesced_bytes: int,
) -> AsyncGenerator[Sequence[tuple[int, Buffer | None]], None]:
    """Read many byte ranges through `fetch` with coalescing and concurrency.

    Nearby ranges are merged into a single underlying I/O, and merged fetches
    are run concurrently. Each yield corresponds to exactly one underlying I/O
    operation: a sequence of `(input_index, result)` tuples for all input
    ranges served by that I/O. Tuples within a yielded sequence are ordered by
    start offset. Yields across groups are in completion order, not input
    order.

    All tuning knobs are required keyword arguments. `Store.get_ranges` is
    the public entry point and owns the canonical default values; this
    function takes them explicitly to avoid duplicating policy.

    Typed as `AsyncGenerator` (not `AsyncIterator`) so callers can invoke
    `aclose()` without casting.

    Parameters
    ----------
    fetch
        Callable that reads one byte range and returns a `Buffer` (or `None`
        if the underlying key does not exist). Typically constructed via
        `functools.partial(store.get, key, prototype)`.
    byte_ranges
        Input ranges. `None` means "the whole value".
    max_concurrency
        Maximum number of merged fetches in flight at once.
    max_gap_bytes
        Forwarded to `coalesce_ranges`.
    max_coalesced_bytes
        Forwarded to `coalesce_ranges`.

    Yields
    ------
    Sequence[tuple[int, Buffer | None]]
        Per-I/O batch of `(input_index, result)` tuples.

    Notes
    -----
    - Only `RangeByteRequest` inputs are coalesced. `OffsetByteRequest`,
      `SuffixByteRequest`, and `None` are each treated as uncoalescable
      (one fetch, one single-tuple yield per input).
    - If any fetch returns `None`, the iterator raises `FileNotFoundError`
      after cancelling pending fetches. Groups completed before the miss
      remain observable on the yields preceding the raise.
    - If a fetch raises, the exception propagates on the yield that produced
      the failing group; earlier-completed groups remain observable.
    """
    if not byte_ranges:
        return

    groups, singles = coalesce_ranges(
        byte_ranges,
        max_gap_bytes=max_gap_bytes,
        max_coalesced_bytes=max_coalesced_bytes,
    )

    ctx = _WorkerCtx(fetch=fetch, semaphore=asyncio.Semaphore(max_concurrency))

    # Launch all work as tasks. The semaphore bounds actual I/O concurrency.
    # TaskGroup wraps any task exception in BaseExceptionGroup; we unwrap it
    # so callers see the underlying error directly (e.g. FileNotFoundError).
    # GeneratorExit (raised at the yield when the consumer calls aclose())
    # must NOT be re-raised explicitly from inside this handler; it is
    # filtered out of the group, and close then completes cleanly.
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                *(tg.create_task(_fetch_group(ctx, group)) for group in groups),
                *(tg.create_task(_fetch_single(ctx, i, single)) for i, single in singles),
            ]

            for fut in asyncio.as_completed(tasks):
                yield await fut
    except BaseExceptionGroup as eg:
        # Unwrap: prefer a single inner exception, otherwise raise the group.
        if subgroup := eg.subgroup(lambda e: not isinstance(e, GeneratorExit)):
            err = subgroup.exceptions[0] if len(subgroup.exceptions) == 1 else subgroup
            raise err from None
Comment on lines +216 to +222
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do NOT want to reraise GeneratorExit:

Suggested change
# Unwrap: prefer GeneratorExit, then a single inner exception, otherwise raise group.
for exc in eg.exceptions:
if isinstance(exc, GeneratorExit):
raise exc from None
if len(eg.exceptions) == 1:
raise eg.exceptions[0] from None
raise
# Unwrap: prefer a single inner exception, otherwise raise group.
if subgroup := eg.subgroup(lambda e: not isinstance(e, GeneratorExit)):
e = subgroup.exceptions[0] if len(subgroup.exceptions) == 1 else subgroup
raise e from None

28 changes: 27 additions & 1 deletion src/zarr/storage/_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import TYPE_CHECKING, cast

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, AsyncIterator, Iterable
from collections.abc import AsyncGenerator, AsyncIterator, Iterable, Sequence
from types import TracebackType
from typing import Any, Self

Expand Down Expand Up @@ -103,6 +103,32 @@ async def get_partial_values(
) -> list[Buffer | None]:
return await self._store.get_partial_values(prototype, key_ranges)

async def get_ranges(
    self,
    key: str,
    byte_ranges: Sequence[ByteRequest | None],
    *,
    prototype: BufferPrototype,
    max_concurrency: int | None = None,
    max_gap_bytes: int | None = None,
    max_coalesced_bytes: int | None = None,
) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]:
    """Forward `get_ranges` to the wrapped store.

    No default values for the coalescing kwargs are declared here; the
    wrapped store owns them. `None` for any knob means "don't override
    the wrapped store's default".
    """
    # Only knobs the caller actually set are forwarded; the rest fall
    # through to the wrapped store's own defaults.
    overrides: dict[str, int] = {
        name: value
        for name, value in (
            ("max_concurrency", max_concurrency),
            ("max_gap_bytes", max_gap_bytes),
            ("max_coalesced_bytes", max_coalesced_bytes),
        )
        if value is not None
    }
    async for batch in self._store.get_ranges(key, byte_ranges, prototype=prototype, **overrides):
        yield batch

async def exists(self, key: str) -> bool:
return await self._store.exists(key)

Expand Down
Loading
Loading