Integrate DataFusion as execution engine for compute-heavy operations #3554

Description

@qzyu999

Feature Request / Improvement

Problem

PyIceberg cannot perform several operations at production scale due to unbounded memory requirements in the PyArrow execution path:

  • Tables with equality deletes are unreadable (hard ValueError)
  • CoW deletes OOM on large Parquet files (~1GB)
  • CoW overwrite OOMs (same pattern as delete)
  • Upsert uses O(n) row-by-row comparison
  • Compaction is not implemented and requires an external sort (infeasible in memory for large tables)
  • Orphan file deletion OOMs (LEFT ANTI JOIN of millions of paths)

The list continues below with further operations that do not scale in PyIceberg's typical single-node environment. Together, these gaps block PyIceberg from reaching feature parity with Java Iceberg for V2/V3 tables.
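The compaction item depends on external sorting. Here is a minimal, engine-free sketch of the technique in plain Python (not DataFusion code). It sorts chunks that fit in memory, spills each sorted run to a temporary file, and then streams a k-way merge of the runs. `external_sort` and `max_in_memory` are illustrative names, not PyIceberg or DataFusion APIs, and integers stand in for rows.

```python
import heapq
import os
import pickle
import tempfile
from typing import Iterable, Iterator, List


def _spill_run(sorted_chunk: List[int], directory: str) -> str:
    """Write one sorted run to a temporary file and return its path."""
    fd, path = tempfile.mkstemp(dir=directory, suffix=".run")
    with os.fdopen(fd, "wb") as f:
        for value in sorted_chunk:
            pickle.dump(value, f)
    return path


def _read_run(path: str) -> Iterator[int]:
    """Stream a spilled run back one value at a time."""
    with open(path, "rb") as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return


def external_sort(values: Iterable[int], max_in_memory: int) -> Iterator[int]:
    """Sort a stream of any length while sorting at most max_in_memory values at once."""
    with tempfile.TemporaryDirectory() as tmp:
        run_paths: List[str] = []
        chunk: List[int] = []
        for value in values:
            chunk.append(value)
            if len(chunk) >= max_in_memory:
                chunk.sort()
                run_paths.append(_spill_run(chunk, tmp))  # spill the sorted run to disk
                chunk = []
        if chunk:
            chunk.sort()
            run_paths.append(_spill_run(chunk, tmp))
        # k-way merge of the runs; heapq.merge keeps one pending value per run.
        yield from heapq.merge(*(_read_run(p) for p in run_paths))
```

Peak memory depends on `max_in_memory` plus one pending value per run, not on the total input size. A spill-to-disk engine is meant to provide the same property for sort-based compaction.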

Proposed Solution

Integrate Apache DataFusion (already available via pip install 'pyiceberg[pyiceberg-core]') as an optional execution engine behind an automatic engine-resolution layer. When it is installed, compute-heavy operations use DataFusion's spill-to-disk execution (bounded memory). When it is not installed, the existing PyArrow path remains unchanged: it works for small data and still runs out of memory on large data, as it does today.

No existing behavior changes. No forced dependency. The UX is DuckDB-style: a developer only needs to configure a memory budget if they choose to.
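Below is a minimal sketch of how the engine-resolution layer might look. It assumes that DataFusion availability is detected by whether `pyiceberg_core` can be imported. The proposal names the module (`pyiceberg/execution/engine.py`) but not its contents, so `resolve_engine` and `EngineChoice` are hypothetical.

```python
import importlib.util
from dataclasses import dataclass
from typing import Callable, Optional


def _module_available(name: str) -> bool:
    # find_spec returns None when a top-level module cannot be found.
    return importlib.util.find_spec(name) is not None


@dataclass(frozen=True)
class EngineChoice:
    name: str                          # "datafusion" or "pyarrow"
    memory_limit_bytes: Optional[int]  # None means "use the engine default"


def resolve_engine(
    memory_limit_bytes: Optional[int] = None,
    is_available: Callable[[str], bool] = _module_available,
) -> EngineChoice:
    """Pick DataFusion when pyiceberg_core is installed, else fall back to PyArrow.

    The memory budget is optional: callers who never set one get the
    engine's default behaviour.
    """
    if is_available("pyiceberg_core"):
        return EngineChoice("datafusion", memory_limit_bytes)
    # The PyArrow path cannot spill, so a budget is not enforced there.
    return EngineChoice("pyarrow", None)
```

Injecting `is_available` lets you test the resolution logic without installing or uninstalling packages.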

Design Doc

Support for PyIceberg DataFusion Integration

Implementation Approach

There are two ways to execute DataFusion operations from PyIceberg. Both use the same underlying Rust DataFusion engine; the difference is where orchestration happens.

Track 1: Python-side DataFusion (datafusion-python)

Python orchestrates the plan: it configures the session, registers Parquet files, runs SQL, and gets Arrow data back. Writing new files happens in Python.
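For the streaming CoW rewrite, Track 1 boils down to a loop over result batches that keeps the surviving rows and passes them to a Python writer. The sketch below uses lists of dicts in place of Arrow record batches, so it runs without DataFusion. `cow_rewrite` and `should_delete` are hypothetical names.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, object]
Batch = List[Row]


def cow_rewrite(
    batches: Iterable[Batch],
    should_delete: Callable[[Row], bool],
) -> Iterator[Tuple[Batch, int]]:
    """Stream a copy-on-write delete one batch at a time.

    Yields (surviving_rows, deleted_count) per input batch; a writer would
    append surviving_rows to the replacement file before the next batch is read.
    """
    for batch in batches:
        kept = [row for row in batch if not should_delete(row)]
        yield kept, len(batch) - len(kept)
```

Each batch is filtered and released before the next one is read, so memory stays proportional to the batch size rather than to the size of the file being rewritten.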

Pros: Works today, no upstream iceberg-rust changes needed.
Cons: Object store config must be duplicated (DataFusion side + PyIceberg FileIO). File writing in Python is less capable than Rust's IcebergWriteExec (no automatic target-size splitting, no partition routing).
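The object-store duplication amounts to a translation table like the one below. The left-hand keys are PyIceberg's S3 FileIO properties. The right-hand names follow `object_store`-style `aws_*` option naming; they are an assumption about the target config, not a confirmed DataFusion interface. Only S3 is covered.

```python
from typing import Dict, Mapping

# PyIceberg FileIO property -> object_store-style S3 option name.
# The right-hand names are an assumption about the target config format.
_S3_KEY_MAP: Dict[str, str] = {
    "s3.endpoint": "aws_endpoint",
    "s3.region": "aws_region",
    "s3.access-key-id": "aws_access_key_id",
    "s3.secret-access-key": "aws_secret_access_key",
    "s3.session-token": "aws_session_token",
}


def fileio_to_object_store_config(props: Mapping[str, str]) -> Dict[str, str]:
    """Translate the S3 subset of FileIO properties; unrelated keys are ignored."""
    return {target: props[source] for source, target in _S3_KEY_MAP.items() if source in props}
```

The same credentials have to be maintained in both places. That is the duplication Track 2 avoids by reusing Iceberg's FileIO.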

Checklist

  • Engine resolution module (pyiceberg/execution/engine.py)
  • Object store bridge (translate FileIO properties → DataFusion object store config)
  • Equality delete resolution (register files, anti-join SQL, return Arrow table)
  • Orphan file deletion (register path arrays, anti-join SQL, return paths)
  • Streaming CoW rewrite (register file, filter SQL, iterate batches, write from Python)
  • Compaction (register files, sort SQL, iterate batches, write from Python)
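At its core, the equality-delete resolution item is a hash anti-join filtered by sequence number. Below is a minimal in-memory sketch of the Iceberg rule: a delete file applies only when its data sequence number is strictly greater than the data file's. Partition matching is omitted, rows are dicts, and `apply_equality_deletes` is a hypothetical name. Track 1 would express the same anti-join in SQL and let DataFusion execute it.

```python
from typing import Dict, Iterable, List, Sequence, Set, Tuple

Row = Dict[str, object]
Key = Tuple[object, ...]


def apply_equality_deletes(
    rows: Iterable[Row],
    data_sequence_number: int,
    delete_files: Sequence[Tuple[int, Sequence[str], Iterable[Row]]],
) -> List[Row]:
    """Hash anti-join of data rows against equality delete files.

    Each delete file is (sequence_number, equality_columns, delete_rows). A delete
    file applies only if its sequence number is strictly greater than the data file's.
    """
    # Build side: one hash set of key tuples per distinct set of equality columns.
    keys_by_columns: Dict[Tuple[str, ...], Set[Key]] = {}
    for seq, columns, delete_rows in delete_files:
        if seq <= data_sequence_number:
            continue  # not newer than the data file, so it does not apply
        cols = tuple(columns)
        keys = keys_by_columns.setdefault(cols, set())
        keys.update(tuple(r[c] for c in cols) for r in delete_rows)

    # Probe side: keep a row only if none of the applicable key sets contains it.
    return [
        row
        for row in rows
        if not any(tuple(row[c] for c in cols) in keys for cols, keys in keys_by_columns.items())
    ]
```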

Track 2: Rust-side execution (pyiceberg_core.execution)

A single Python function call crosses the FFI boundary. Rust handles everything (file I/O, plan construction, execution, and file writing) and returns only metadata. This matches how pyiceberg_core already works (transforms, TableProvider).

Pros: No object store duplication (uses Iceberg's FileIO). Rust writes files via IcebergWriteExec (target-size splitting, partition routing built in). No per-batch FFI overhead. Consistent with existing pyiceberg_core patterns.
Cons: Requires contributions to iceberg-rust (bounded session helper, execution module, overwrite commit node). Write-path operations are blocked until upstream OverwriteAction/RewriteFiles lands.
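IcebergWriteExec provides target-size splitting, and a Track 1 Python writer would have to reimplement it. It reduces to rolling over to a new output file once the current one reaches a byte threshold. The minimal sketch below uses hypothetical names and per-batch byte estimates in place of real Parquet writes. In practice, the threshold would come from a table property such as `write.target-file-size-bytes`.

```python
from typing import Iterable, List


def plan_output_files(batch_sizes: Iterable[int], target_file_size: int) -> List[List[int]]:
    """Group consecutive batch sizes (in bytes) into output files.

    A new file is started once the current one has reached target_file_size,
    so a file exceeds the target by less than one batch.
    """
    files: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for size in batch_sizes:
        if current and current_bytes >= target_file_size:
            files.append(current)  # roll over: close the current file
            current, current_bytes = [], 0
        current.append(size)
        current_bytes += size
    if current:
        files.append(current)
    return files
```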

Checklist (iceberg-rust contributions)

  • Bounded-memory session helper (iceberg-datafusion — no blockers)
  • pyiceberg_core.execution module stubs (bindings/python/ — no blockers)
  • execute_antijoin_paths implementation (needs session helper)
  • execute_equality_resolution implementation (needs session helper)
  • execute_cow_rewrite implementation (needs session helper + OverwriteAction)
  • execute_compaction implementation (needs session helper + OverwriteAction)
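The checklist names `execute_antijoin_paths` without giving a signature. The sketch below therefore shows only the technique that keeps such an anti-join within a memory budget. It hash-partitions both path lists to disk, then anti-joins one partition at a time (a grace-hash-style join). It is plain Python, not a description of DataFusion's internals. `orphan_paths` and `num_buckets` are hypothetical, and paths are assumed not to contain newlines.

```python
import os
import tempfile
import zlib
from typing import Iterable, Iterator, List


def _bucket(path: str, num_buckets: int) -> int:
    # Deterministic hash so both sides agree on a path's bucket.
    return zlib.crc32(path.encode("utf-8")) % num_buckets


def _partition(paths: Iterable[str], directory: str, prefix: str, num_buckets: int) -> List[str]:
    """Spill paths into num_buckets files by hash and return the file names."""
    files = [os.path.join(directory, f"{prefix}-{i}.txt") for i in range(num_buckets)]
    handles = [open(f, "w", encoding="utf-8") for f in files]
    try:
        for p in paths:
            handles[_bucket(p, num_buckets)].write(p + "\n")
    finally:
        for h in handles:
            h.close()
    return files


def orphan_paths(listed: Iterable[str], referenced: Iterable[str], num_buckets: int = 16) -> Iterator[str]:
    """Yield listed paths that no snapshot references (listed LEFT ANTI JOIN referenced)."""
    with tempfile.TemporaryDirectory() as tmp:
        listed_files = _partition(listed, tmp, "listed", num_buckets)
        referenced_files = _partition(referenced, tmp, "referenced", num_buckets)
        for listed_file, referenced_file in zip(listed_files, referenced_files):
            # Only this bucket's referenced paths are held in memory.
            with open(referenced_file, encoding="utf-8") as f:
                known = {line.rstrip("\n") for line in f}
            with open(listed_file, encoding="utf-8") as f:
                for line in f:
                    path = line.rstrip("\n")
                    if path not in known:
                        yield path
```

Raising `num_buckets` trades more temporary files for a smaller peak. Output order follows bucket order, not input order.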

Checklist (this repo)

  • Engine resolution module (pyiceberg/execution/engine.py)
  • Dispatch wiring: call pyiceberg_core.execution when available, else PyArrow fallback
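Here is a sketch of the dispatch wiring. It assumes the native entry point is exposed as `pyiceberg_core.execution.execute_antijoin_paths(listed, referenced)`; that module does not exist yet, and its argument list is a guess. The fallback is a plain in-memory set difference standing in for the PyArrow path.

```python
import importlib
from typing import Any, Callable, List


def _fallback_antijoin(listed: List[str], referenced: List[str]) -> List[str]:
    # In-memory fallback: fine for small tables, unbounded memory for large ones.
    known = set(referenced)
    return [p for p in listed if p not in known]


def antijoin_paths(
    listed: List[str],
    referenced: List[str],
    load_native: Callable[[], Any] = lambda: importlib.import_module("pyiceberg_core.execution"),
) -> List[str]:
    """Call the native implementation when it is importable, else fall back."""
    try:
        native = load_native()
    except ImportError:  # also covers ModuleNotFoundError
        return _fallback_antijoin(listed, referenced)
    return native.execute_antijoin_paths(listed, referenced)  # hypothetical signature
```

Both branches take the same inputs and return the same output. Replacing the fallback body, or a Track 1 implementation, with the native call therefore touches only this function.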

How they relate

  • Both tracks share the same user-facing API (table.compact(), table.delete(), etc.)
  • Both share the same commit path (Python Transaction API)
  • Both share the same engine resolution layer
  • Per operation, you pick one — they are interchangeable behind the dispatch layer
  • Swapping Track 1 → Track 2 for any operation is a single function body change (same inputs, same outputs)
  • Track 2 is the preferred long-term path (matches existing pyiceberg_core patterns)
  • Track 1 can serve as an interim implementation for operations where Track 2 isn't ready yet

Operations Unblocked

  • Equality delete read resolution
  • Streaming CoW delete/overwrite
  • Table compaction (sort + rewrite)
  • Orphan file deletion
  • Upsert via hash join
  • Equality-to-positional conversion
  • Position delete compaction
  • Full MoR compaction
  • Z-Order / Hilbert sorting
  • DV compaction
  • Incremental compaction
  • Sort-order enforcement on write
  • Dynamic partition overwrite (bounded memory)
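To illustrate the upsert item: a hash join splits incoming rows into updates and inserts in one pass over each side. This is a minimal in-memory sketch with hypothetical names, not PyIceberg's existing upsert implementation. Here the build side must fit in memory, which is the limit a bounded-memory engine is meant to lift.

```python
from typing import Dict, Iterable, List, Sequence, Tuple

Row = Dict[str, object]


def classify_upsert(
    existing: Iterable[Row],
    incoming: Iterable[Row],
    join_cols: Sequence[str],
) -> Tuple[List[Row], List[Row]]:
    """Split incoming rows into (updates, inserts) with one hash join.

    Builds a hash table on the existing rows' join keys, then probes it once
    per incoming row: O(n + m) expected time.
    """
    existing_by_key: Dict[Tuple[object, ...], Row] = {
        tuple(row[c] for c in join_cols): row for row in existing
    }
    updates: List[Row] = []
    inserts: List[Row] = []
    for row in incoming:
        match = existing_by_key.get(tuple(row[c] for c in join_cols))
        if match is None:
            inserts.append(row)
        elif match != row:
            # Rows identical to the existing row are skipped; only changed rows update.
            updates.append(row)
    return updates, inserts
```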

Related Issues

PyIceberg

iceberg-rust

datafusion-python
