feat: add dictionary_columns to to_arrow() / to_arrow_batch_reader() for memory-efficient reads#3461
Conversation
…icient reads
Columns that contain large or frequently repeated strings (e.g. JSON
blobs, low-cardinality categoricals) can exhaust memory when PyArrow
loads them as plain string arrays. PyArrow's Parquet reader supports
reading such columns as dictionary-encoded arrays, which deduplicates
values and can dramatically reduce memory usage.
Add a dictionary_columns: tuple[str, ...] parameter to Table.scan()
(and the underlying TableScan / ArrowScan classes) that is forwarded
to _get_file_format() as PyArrow's dictionary_columns kwarg. Only
applies to Parquet files; silently ignored for ORC.
Usage:
table.scan(dictionary_columns=("payload",)).to_arrow()
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Fokko
left a comment
There was a problem hiding this comment.
More of a meta question, but I don't think this helps for large JSON blobs since it would directly exceed the dictionary limit and fall back to plain encoding. I do think this helps a lot for low-cardinality strings.
Do we know how Arrow decodes the data? For example, if the Parquet column is dictionary encoded, would Arrow do something smart with the buffers to not repeat this value many times?
| An integer representing the number of rows to | ||
| return in the scan result. If None, fetches all | ||
| matching rows. | ||
| dictionary_columns: |
There was a problem hiding this comment.
I'm hesitant to add Arrow specific things to the public API
There was a problem hiding this comment.
Good point @Fokko I have moved dictionary_columns off the public scan() API and onto the Arrow-specific output methods instead:
table.scan(...).to_arrow(dictionary_columns=("payload",))
table.scan(...).to_arrow_batch_reader(dictionary_columns=("payload",))That way it doesn't disturb the general scan interface. ArrowScan still accepts it for lower-level use. Pushed in the latest commit. Let me know if you have any further suggestions.
…w_batch_reader()
Addresses reviewer feedback that dictionary_columns is an Arrow-specific
concern and should not be part of the general-purpose scan() public API.
The parameter is now accepted directly by the Arrow output methods:
table.scan(...).to_arrow(dictionary_columns=("payload",))
table.scan(...).to_arrow_batch_reader(dictionary_columns=("payload",))
ArrowScan still accepts dictionary_columns for lower-level use.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Fokko
left a comment
There was a problem hiding this comment.
Thanks @GayathriSrividya Moving this to the Arrow API seems a good solution. I think it is reasonable to add this. I've left some minor comments that I think would be good to clean up, but apart from that, this looks good to me
| self._case_sensitive = case_sensitive | ||
| self._limit = limit | ||
| self._downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) | ||
| self._dictionary_columns = frozenset(dictionary_columns) |
There was a problem hiding this comment.
Why not keep it a tuple here?
There was a problem hiding this comment.
I updated this to keep as a tuple.
| arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) | ||
| format_kwargs: dict[str, Any] = {"pre_buffer": True, "buffer_size": ONE_MEGABYTE * 8} | ||
| if dictionary_columns and task.file.file_format == FileFormat.PARQUET: | ||
| format_kwargs["dictionary_columns"] = tuple(dictionary_columns) |
There was a problem hiding this comment.
Here we convert the frozenset back to a tuple
Fokko
left a comment
There was a problem hiding this comment.
Thanks @GayathriSrividya for addressing the remaining nits 👍
|
I found one propagation gap:
That erases the dictionary type, so iceberg-python/pyiceberg/table/__init__.py Lines 2242 to 2279 in 6da06ad Example public-path regression test: def test_to_arrow_batch_reader_preserves_dictionary_columns(catalog: Catalog) -> None:
arrow_table = pa.table(
{
"id": pa.array([1, 2, 3, 4], type=pa.int32()),
"label": pa.array(["a", "b", "a", "b"], type=pa.string()),
}
)
catalog.create_namespace_if_not_exists("default")
table = catalog.create_table("default.dict_test", schema=arrow_table.schema)
table.append(arrow_table)
result = table.scan().to_arrow_batch_reader(dictionary_columns=("label",)).read_all()
assert pa.types.is_dictionary(result.schema.field("label").type)
assert result.column("label").to_pylist() == ["a", "b", "a", "b"]I didn’t see another user-facing path where the option is accepted and dropped. Minor adjacent cleanup: update the abstract |
<!-- Closes #2634 --> Closes #2634. # Rationale for this change Adds `IncrementalAppendScan`, which reads the data appended between two snapshots — the building block for incremental ingestion. Largely a revival of the work in #2235; see #2634 and the previous PRs for motivation. Split out of #3364 at the reviewers' request, and builds on the now-merged `BaseScan` / `ManifestGroupPlanner` refactor (#3511), so this PR's diff is the append-scan feature alone. The surface mirrors Iceberg-Java's engine-facing API (snapshot IDs, inclusive/exclusive start, optional start) rather than the narrower Spark read options, since PyIceberg is increasingly used by engines (e.g. Polars). See the API discussion on this PR. References: https://github.com/apache/iceberg (Iceberg-Java and Spark) and apache/iceberg-cpp#590. Inline review-aid comments (prefixed `[AI reviewer aid]`) point at the relevant reference code. # API `Table.incremental_append_scan(...)` returns an `IncrementalAppendScan`; `StagedTable` overrides it to raise, mirroring `scan()`. The scan reads the rows added by **append** snapshots in `(from, to]`, projected onto the table's current schema; delete / overwrite / replace snapshots in the range (e.g. compaction) are ignored. The range is set via the factory's Spark-style kwargs or the builder methods, each of which returns a refined copy (like `select()` / `filter()`): ```python table.incremental_append_scan( from_snapshot_id_exclusive=None, # optional; defaults to the oldest ancestor of `to` to_snapshot_id_inclusive=None, # optional; defaults to the current snapshot row_filter=..., selected_fields=..., case_sensitive=..., options=..., limit=..., ) scan.from_snapshot_id_exclusive(id) # or .from_snapshot_id_inclusive(id) .to_snapshot_id_inclusive(id) ``` The range is held as public attributes — `from_snapshot_id` + `from_snapshot_inclusive` + `to_snapshot_id` — a single start slot plus an inclusive flag, mirroring Java's `TableScanContext` and consistent with the other scans. # Changes - Range resolution mirrors Java's `BaseIncrementalScan`: an unset start scans from the oldest ancestor of the end; an inclusive start resolves to its parent as the exclusive boundary; an exclusive start is validated with `is_parent_ancestor_of`, so an expired start cursor is accepted as long as the lineage still passes through it; the end defaults to the current snapshot; an empty table with no range set scans nothing. - Planning walks the append-only ancestors in the range, dedups the data manifests whose `added_snapshot_id` is in range (set semantics via `ManifestFile.__eq__` / `__hash__`), and filters manifest entries to `ADDED`-in-range via a new `manifest_entry_filter` on `ManifestGroupPlanner.plan_files`. Compacted (`rewrite_data_files`) output is therefore not picked up — no double counting. - Projects onto the table's **current** schema (matching Java/C++), so rows written under an older schema in the range get `NULL` for newer columns. - Adds snapshot helpers `ancestors_between_ids`, `is_ancestor_of`, and `is_parent_ancestor_of`. - Arrow materialization (`to_arrow` / `to_arrow_batch_reader`) is shared with `DataScan` via small module-level helpers that take the projected schema explicitly, so `BaseScan` stays projection-free (per the #3511 review). # Out of scope (tracked follow-ups) - Branch selection (`use_branch`) and per-endpoint ref/tag start & end (`from_ref_*` / `to_ref_*`) — the rest of the engine-facing surface Java exposes. - `count()`, REST server-side planning, and user-facing doc examples (`mkdocs`). - `dictionary_columns` on `IncrementalAppendScan.to_arrow` / `to_arrow_batch_reader` (added to `DataScan` in #3461; the shared helpers already thread it) — kept out to isolate this PR. # Are these changes tested? Yes — unit tests (range resolution including unset / inclusive / exclusive and expired start, current-schema projection, builder and `update()` copies, empty table, staged-table guard) and integration tests (append-only, non-append snapshots ignored, compaction not double-counted, schema evolution within range, partition- and metrics-evaluator pruning, disconnected snapshots), plus the `test_incremental_read` provision fixture. # Are there any user-facing changes? Yes — the new `Table.incremental_append_scan(...)` API and `IncrementalAppendScan` class. No changes to existing public surface. --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Closes #3170
Rationale
Columns that contain large or frequently repeated string values (e.g. JSON blobs, low-cardinality categoricals) can exhaust memory when PyArrow loads them as plain string arrays. PyArrow's Parquet reader natively supports dictionary-encoded reads via its
dictionary_columnskwarg, which deduplicates values and can dramatically reduce peak memory usage.This was previously discussed in #3168 and a prior implementation (#3234) was closed as stale.
Changes
dictionary_columns: tuple[str, ...] = ()toTable.scan(),TableScan.__init__, andStagedTable.scan().DataScan.to_arrow()andto_arrow_batch_reader()→ArrowScan.__init__→_task_to_record_batches→_get_file_format().task.file.file_format == FileFormat.PARQUET; silently ignored for ORC (which does not support this kwarg).Usage
Verification
test_dictionary_columns_produces_dict_encoded_output— confirms the requested column is dict-encoded, non-requested columns are plain, and values are identical.make lint✓pytest tests/table/ tests/io/test_pyarrow.py✓