Skip to content

feat: add dictionary_columns to to_arrow() / to_arrow_batch_reader() for memory-efficient reads#3461

Merged
Fokko merged 4 commits into
apache:mainfrom
GayathriSrividya:feat/issue-3170-dictionary-columns-scan
Jun 18, 2026
Merged

feat: add dictionary_columns to to_arrow() / to_arrow_batch_reader() for memory-efficient reads#3461
Fokko merged 4 commits into
apache:mainfrom
GayathriSrividya:feat/issue-3170-dictionary-columns-scan

Conversation

@GayathriSrividya

Copy link
Copy Markdown
Contributor

Closes #3170

Rationale

Columns that contain large or frequently repeated string values (e.g. JSON blobs, low-cardinality categoricals) can exhaust memory when PyArrow loads them as plain string arrays. PyArrow's Parquet reader natively supports dictionary-encoded reads via its dictionary_columns kwarg, which deduplicates values and can dramatically reduce peak memory usage.

This was previously discussed in #3168 and a prior implementation (#3234) was closed as stale.

Changes

  • Added dictionary_columns: tuple[str, ...] = () to Table.scan(), TableScan.__init__, and StagedTable.scan().
  • Forwarded through DataScan.to_arrow() and to_arrow_batch_reader()ArrowScan.__init___task_to_record_batches_get_file_format().
  • Only applied when task.file.file_format == FileFormat.PARQUET; silently ignored for ORC (which does not support this kwarg).

Usage

# Read the "payload" column as dictionary-encoded to save memory
df = table.scan(dictionary_columns=("payload",)).to_arrow()

Verification

  • Added test_dictionary_columns_produces_dict_encoded_output — confirms the requested column is dict-encoded, non-requested columns are plain, and values are identical.
  • make lint
  • pytest tests/table/ tests/io/test_pyarrow.py

…icient reads

Columns that contain large or frequently repeated strings (e.g. JSON
blobs, low-cardinality categoricals) can exhaust memory when PyArrow
loads them as plain string arrays.  PyArrow's Parquet reader supports
reading such columns as dictionary-encoded arrays, which deduplicates
values and can dramatically reduce memory usage.

Add a dictionary_columns: tuple[str, ...] parameter to Table.scan()
(and the underlying TableScan / ArrowScan classes) that is forwarded
to _get_file_format() as PyArrow's dictionary_columns kwarg.  Only
applies to Parquet files; silently ignored for ORC.

Usage:
    table.scan(dictionary_columns=("payload",)).to_arrow()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

@Fokko Fokko left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More of a meta question, but I don't think this helps for large JSON blobs since it would directly exceed the dictionary limit and fall back to plain encoding. I do think this helps a lot for low-cardinality strings.

Do we know how Arrow decodes the data? For example, if the Parquet column is dictionary encoded, would Arrow do something smart with the buffers to not repeat this value many times?

Comment thread pyiceberg/table/__init__.py Outdated
An integer representing the number of rows to
return in the scan result. If None, fetches all
matching rows.
dictionary_columns:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hesitant to add Arrow specific things to the public API

@GayathriSrividya GayathriSrividya Jun 6, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point @Fokko I have moved dictionary_columns off the public scan() API and onto the Arrow-specific output methods instead:

table.scan(...).to_arrow(dictionary_columns=("payload",))
table.scan(...).to_arrow_batch_reader(dictionary_columns=("payload",))

That way it doesn't disturb the general scan interface. ArrowScan still accepts it for lower-level use. Pushed in the latest commit. Let me know if you have any further suggestions.

…w_batch_reader()

Addresses reviewer feedback that dictionary_columns is an Arrow-specific
concern and should not be part of the general-purpose scan() public API.

The parameter is now accepted directly by the Arrow output methods:
  table.scan(...).to_arrow(dictionary_columns=("payload",))
  table.scan(...).to_arrow_batch_reader(dictionary_columns=("payload",))

ArrowScan still accepts dictionary_columns for lower-level use.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@GayathriSrividya GayathriSrividya changed the title feat: add dictionary_columns parameter to Table.scan() for memory-efficient reads feat: add dictionary_columns to to_arrow() / to_arrow_batch_reader() for memory-efficient reads Jun 6, 2026

@Fokko Fokko left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @GayathriSrividya Moving this to the Arrow API seems a good solution. I think it is reasonable to add this. I've left some minor comments that I think would be good to clean up, but apart from that, this looks good to me

Comment thread pyiceberg/io/pyarrow.py Outdated
self._case_sensitive = case_sensitive
self._limit = limit
self._downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE)
self._dictionary_columns = frozenset(dictionary_columns)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not keep it a tuple here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated this to keep as a tuple.

Comment thread pyiceberg/io/pyarrow.py Outdated
arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
format_kwargs: dict[str, Any] = {"pre_buffer": True, "buffer_size": ONE_MEGABYTE * 8}
if dictionary_columns and task.file.file_format == FileFormat.PARQUET:
format_kwargs["dictionary_columns"] = tuple(dictionary_columns)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we convert the frozenset back to a tuple

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko I updated this to keep dictionary_columns as a tuple throughout the Arrow path, so we no longer convert frozenset -> tuple at read time. This is now in commit 33e4c6d.

@Fokko Fokko left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @GayathriSrividya for addressing the remaining nits 👍

@Fokko Fokko merged commit 6da06ad into apache:main Jun 18, 2026
16 checks passed
@kevinjqliu

Copy link
Copy Markdown
Contributor

I found one propagation gap: dictionary_columns reaches ArrowScan, but to_arrow_batch_reader() wraps the batches with the plain projected schema and casts back to it:

pa.RecordBatchReader.from_batches(target_schema, batches).cast(target_schema)

That erases the dictionary type, so to_arrow(dictionary_columns=...) preserves dictionary encoding, but to_arrow_batch_reader(dictionary_columns=...).read_all() returns the normal string type.

def to_arrow_batch_reader(self, dictionary_columns: tuple[str, ...] = ()) -> pa.RecordBatchReader:
"""Return an Arrow RecordBatchReader from this DataScan.
For large results, using a RecordBatchReader requires less memory than
loading an Arrow Table for the same DataScan, because a RecordBatch
is read one at a time.
Args:
dictionary_columns:
A tuple of column names that PyArrow should read as
dictionary-encoded (``pa.DictionaryArray``). Dictionary
encoding can substantially reduce memory usage for columns
with low-cardinality repeated string values.
Only applies to Parquet files; silently ignored for ORC.
Returns:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
which can be used to read a stream of record batches one by one.
"""
import pyarrow as pa
from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
target_schema = schema_to_pyarrow(self.projection())
batches = ArrowScan(
self.table_metadata,
self.io,
self.projection(),
self.row_filter,
self.case_sensitive,
self.limit,
dictionary_columns=dictionary_columns,
).to_record_batches(self.plan_files())
return pa.RecordBatchReader.from_batches(
target_schema,
batches,
).cast(target_schema)

Example public-path regression test:

def test_to_arrow_batch_reader_preserves_dictionary_columns(catalog: Catalog) -> None:
    arrow_table = pa.table(
        {
            "id": pa.array([1, 2, 3, 4], type=pa.int32()),
            "label": pa.array(["a", "b", "a", "b"], type=pa.string()),
        }
    )
    catalog.create_namespace_if_not_exists("default")
    table = catalog.create_table("default.dict_test", schema=arrow_table.schema)
    table.append(arrow_table)

    result = table.scan().to_arrow_batch_reader(dictionary_columns=("label",)).read_all()

    assert pa.types.is_dictionary(result.schema.field("label").type)
    assert result.column("label").to_pylist() == ["a", "b", "a", "b"]

I didn’t see another user-facing path where the option is accepted and dropped. Minor adjacent cleanup: update the abstract TableScan.to_arrow signature so type checkers see the new keyword.

kevinjqliu pushed a commit that referenced this pull request Jun 28, 2026
<!-- Closes #2634 -->

Closes #2634.

# Rationale for this change

Adds `IncrementalAppendScan`, which reads the data appended between two
snapshots — the building block for incremental ingestion. Largely a
revival of the work in #2235; see #2634 and the previous PRs for
motivation.

Split out of #3364 at the reviewers' request, and builds on the
now-merged `BaseScan` / `ManifestGroupPlanner` refactor (#3511), so this
PR's diff is the append-scan feature alone.

The surface mirrors Iceberg-Java's engine-facing API (snapshot IDs,
inclusive/exclusive start, optional start) rather than the narrower
Spark read options, since PyIceberg is increasingly used by engines
(e.g. Polars). See the API discussion on this PR.

References: https://github.com/apache/iceberg (Iceberg-Java and Spark)
and apache/iceberg-cpp#590. Inline review-aid
comments (prefixed `[AI reviewer aid]`) point at the relevant reference
code.

# API

`Table.incremental_append_scan(...)` returns an `IncrementalAppendScan`;
`StagedTable` overrides it to raise, mirroring `scan()`. The scan reads
the rows added by **append** snapshots in `(from, to]`, projected onto
the table's current schema; delete / overwrite / replace snapshots in
the range (e.g. compaction) are ignored.

The range is set via the factory's Spark-style kwargs or the builder
methods, each of which returns a refined copy (like `select()` /
`filter()`):

```python
table.incremental_append_scan(
    from_snapshot_id_exclusive=None,   # optional; defaults to the oldest ancestor of `to`
    to_snapshot_id_inclusive=None,     # optional; defaults to the current snapshot
    row_filter=..., selected_fields=..., case_sensitive=..., options=..., limit=...,
)

scan.from_snapshot_id_exclusive(id)    # or .from_snapshot_id_inclusive(id)
    .to_snapshot_id_inclusive(id)
```

The range is held as public attributes — `from_snapshot_id` +
`from_snapshot_inclusive` + `to_snapshot_id` — a single start slot plus
an inclusive flag, mirroring Java's `TableScanContext` and consistent
with the other scans.

# Changes

- Range resolution mirrors Java's `BaseIncrementalScan`: an unset start
scans from the oldest ancestor of the end; an inclusive start resolves
to its parent as the exclusive boundary; an exclusive start is validated
with `is_parent_ancestor_of`, so an expired start cursor is accepted as
long as the lineage still passes through it; the end defaults to the
current snapshot; an empty table with no range set scans nothing.
- Planning walks the append-only ancestors in the range, dedups the data
manifests whose `added_snapshot_id` is in range (set semantics via
`ManifestFile.__eq__` / `__hash__`), and filters manifest entries to
`ADDED`-in-range via a new `manifest_entry_filter` on
`ManifestGroupPlanner.plan_files`. Compacted (`rewrite_data_files`)
output is therefore not picked up — no double counting.
- Projects onto the table's **current** schema (matching Java/C++), so
rows written under an older schema in the range get `NULL` for newer
columns.
- Adds snapshot helpers `ancestors_between_ids`, `is_ancestor_of`, and
`is_parent_ancestor_of`.
- Arrow materialization (`to_arrow` / `to_arrow_batch_reader`) is shared
with `DataScan` via small module-level helpers that take the projected
schema explicitly, so `BaseScan` stays projection-free (per the #3511
review).

# Out of scope (tracked follow-ups)

- Branch selection (`use_branch`) and per-endpoint ref/tag start & end
(`from_ref_*` / `to_ref_*`) — the rest of the engine-facing surface Java
exposes.
- `count()`, REST server-side planning, and user-facing doc examples
(`mkdocs`).
- `dictionary_columns` on `IncrementalAppendScan.to_arrow` /
`to_arrow_batch_reader` (added to `DataScan` in #3461; the shared
helpers already thread it) — kept out to isolate this PR.

# Are these changes tested?

Yes — unit tests (range resolution including unset / inclusive /
exclusive and expired start, current-schema projection, builder and
`update()` copies, empty table, staged-table guard) and integration
tests (append-only, non-append snapshots ignored, compaction not
double-counted, schema evolution within range, partition- and
metrics-evaluator pruning, disconnected snapshots), plus the
`test_incremental_read` provision fixture.

# Are there any user-facing changes?

Yes — the new `Table.incremental_append_scan(...)` API and
`IncrementalAppendScan` class. No changes to existing public surface.

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feature request: pass optional parameters to DataScan/pyarrow

3 participants