Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions packages/bigframes/bigframes/_config/compute_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ class ComputeOptions:
int | None: Number of rows, if set.
"""

enable_peek_cache: bool = False
"""
If enabled, peeking at a relation will pull a larger local sample (e.g. 10k rows)
and cache it locally. Subsequent compatible operations on the relation will run
locally on the cached sample, enabling fast interactive iteration.
"""

peek_cache_size: int = 10000
"""
The size of the local sample to pull and cache when peeking at a relation.
Defaults to 10000.
"""

semantic_ops_confirmation_threshold: Optional[int] = 0
"""
Deprecated.
Expand Down
52 changes: 52 additions & 0 deletions packages/bigframes/bigframes/session/bq_caching_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ def __init__(
labels=dict(labels),
)
self._function_manager = function_manager
from bigframes.session.peek_cache import PeekCache
self._peek_cache = PeekCache()

def to_sql(
self,
Expand Down Expand Up @@ -209,6 +211,56 @@ async def _execute_async(
execution_spec: ex_spec.ExecutionSpec,
) -> executor.ExecuteResult:
await self._publisher.publish_async(bigframes.core.events.ExecutionStarted())

enable_peek_cache = (
execution_spec.bigquery_config.enable_peek_cache
if execution_spec.bigquery_config
else False
)

if execution_spec.peek is not None and enable_peek_cache:
from bigframes.session.peek_cache import substitute_peek_cached_subplans
rewritten_node = substitute_peek_cached_subplans(array_value.node, self._peek_cache)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Pass min_rows_required=execution_spec.peek to substitute_peek_cached_subplans to ensure we only use the cached sample if it contains enough rows to satisfy the current peek request.

            rewritten_node = substitute_peek_cached_subplans(
                array_value.node, self._peek_cache, min_rows_required=execution_spec.peek
            )

if rewritten_node != array_value.node:
rewritten_array_value = bigframes.core.ArrayValue(rewritten_node)
maybe_result = await self._try_execute_semi_executors(
rewritten_array_value, execution_spec
)
if maybe_result is not None:
return maybe_result

sample_size = (
execution_spec.bigquery_config.peek_cache_size
if execution_spec.bigquery_config
else 10000
)
actual_sample_size = max(execution_spec.peek, sample_size)
cache_execution_spec = dataclasses.replace(execution_spec, peek=actual_sample_size)

bq_result = await self._execute_bigquery(
array_value,
cache_execution_spec,
)

arrow_table = await asyncio.to_thread(bq_result.batches().to_arrow_table)
managed_table = local_data.ManagedArrowTable.from_pyarrow(arrow_table, bq_result.schema)
self._peek_cache.put(array_value.node, managed_table)

sliced_table = arrow_table.slice(0, execution_spec.peek)
result: executor.ExecuteResult = executor.LocalExecuteResult(
sliced_table,
bq_result.schema,
execution_metadata=bq_result.execution_metadata,
)

await self._publisher.publish_async(
bigframes.core.events.EventEnvelope(
event=bigframes.core.events.ExecutionFinished(result=result),
cell_execution_count=execution_spec.cell_execution_count,
)
)
return result

maybe_result = await self._try_execute_semi_executors(
array_value, execution_spec
)
Expand Down
4 changes: 4 additions & 0 deletions packages/bigframes/bigframes/session/execution_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ class BqComputeOptions:
enable_multi_query_execution: bool = True
maximum_bytes_billed: Optional[int] = None
extra_query_labels: tuple[tuple[str, str], ...] = ()
enable_peek_cache: bool = False
peek_cache_size: int = 10000

@classmethod
def from_compute_options(cls, compute_options: ComputeOptions) -> BqComputeOptions:
return cls(
enable_multi_query_execution=compute_options.enable_multi_query_execution,
maximum_bytes_billed=compute_options.maximum_bytes_billed,
extra_query_labels=tuple(compute_options.extra_query_labels.items()),
enable_peek_cache=compute_options.enable_peek_cache,
peek_cache_size=compute_options.peek_cache_size,
)

def push_labels(self, labels: Mapping[str, str]) -> BqComputeOptions:
Expand Down
96 changes: 96 additions & 0 deletions packages/bigframes/bigframes/session/peek_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from collections import OrderedDict
import threading
from typing import Optional

from bigframes.core import local_data, nodes


class PeekCache:
"""
Thread-safe LRU cache for storing local samples of query relations.
This enables fast iteration on subsequent compatible operations.
"""

def __init__(self, capacity: int = 100):
self.capacity = capacity
self._cache: OrderedDict[nodes.BigFrameNode, local_data.ManagedArrowTable] = OrderedDict()
self._lock = threading.Lock()

def get(self, key: nodes.BigFrameNode) -> Optional[local_data.ManagedArrowTable]:
with self._lock:
if key not in self._cache:
return None
# Move to end (most recently used)
self._cache.move_to_end(key)
return self._cache[key]

def put(self, key: nodes.BigFrameNode, value: local_data.ManagedArrowTable) -> None:
with self._lock:
if key in self._cache:
self._cache.move_to_end(key)
self._cache[key] = value
if len(self._cache) > self.capacity:
self._cache.popitem(last=False)

def clear(self) -> None:
with self._lock:
self._cache.clear()


def substitute_peek_cached_subplans(
root: nodes.BigFrameNode,
peek_cache: PeekCache,
) -> nodes.BigFrameNode:
"""
Recursively replaces subplans in the tree that have a cached local sample
in the peek cache with a ReadLocalNode, provided that all ancestors
of the subplan are compatible with running on a sample.
"""
# Intermediate nodes that preserve the semantic validity of a sample.
# WindowOpNode, AggregateNode, OrderByNode, JoinNode, etc. are excluded
# because evaluating them on a sample breaks semantic contracts.
_COMPATIBLE_ANCESTOR_CLASSES = (
nodes.SelectionNode,
nodes.ProjectionNode,
nodes.FilterNode,
nodes.PromoteOffsetsNode,
)

def traverse(node: nodes.BigFrameNode, ancestors_compatible: bool) -> nodes.BigFrameNode:
if ancestors_compatible:
cached_sample = peek_cache.get(node)
if cached_sample is not None:
# Replace the node with a ReadLocalNode containing the cached sample
scan_list = nodes.ScanList(
tuple(nodes.ScanItem(field.id, field.id.sql) for field in node.fields)
)
session = node.session if node.session is not None else root.session
return nodes.ReadLocalNode(
local_data_source=cached_sample,
scan_list=scan_list,
session=session,
)

# If we didn't replace, recursively transform children
is_current_compatible = isinstance(node, _COMPATIBLE_ANCESTOR_CLASSES)
next_ancestors_compatible = ancestors_compatible and is_current_compatible

return node.transform_children(lambda child: traverse(child, next_ancestors_compatible))

return traverse(root, True)
Comment on lines +56 to +96

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are two issues in substitute_peek_cached_subplans:

  1. Correctness / Truncation Bug: If a user requests a larger peek size (e.g., df.head(20000)) than what is currently cached (e.g., 10000 rows), the function will still substitute the cached subplan and return the truncated 10000-row sample. We should only use the cached sample if it contains at least the requested number of rows (min_rows_required).
  2. SQL Escaping Bug: Using field.id.sql as the source_id for ScanItem will return the SQL-escaped representation of the column name (e.g., with backticks or double quotes). This will cause a runtime KeyError or column mismatch when reading from the local Arrow table, which contains raw, unescaped column names. Use field.id.name instead to get the raw column name.
def substitute_peek_cached_subplans(
    root: nodes.BigFrameNode,
    peek_cache: PeekCache,
    min_rows_required: int,
) -> nodes.BigFrameNode:
    """
    Recursively replaces subplans in the tree that have a cached local sample
    in the peek cache with a ReadLocalNode, provided that all ancestors
    of the subplan are compatible with running on a sample, and the cached
    sample contains at least the required number of rows.
    """
    # Intermediate nodes that preserve the semantic validity of a sample.
    # WindowOpNode, AggregateNode, OrderByNode, JoinNode, etc. are excluded
    # because evaluating them on a sample breaks semantic contracts.
    _COMPATIBLE_ANCESTOR_CLASSES = (
        nodes.SelectionNode,
        nodes.ProjectionNode,
        nodes.FilterNode,
        nodes.PromoteOffsetsNode,
    )

    def traverse(node: nodes.BigFrameNode, ancestors_compatible: bool) -> nodes.BigFrameNode:
        if ancestors_compatible:
            cached_sample = peek_cache.get(node)
            if cached_sample is not None and cached_sample.num_rows >= min_rows_required:
                # Replace the node with a ReadLocalNode containing the cached sample
                scan_list = nodes.ScanList(
                    tuple(nodes.ScanItem(field.id, field.id.name) for field in node.fields)
                )
                session = node.session if node.session is not None else root.session
                return nodes.ReadLocalNode(
                    local_data_source=cached_sample,
                    scan_list=scan_list,
                    session=session,
                )

        # If we didn't replace, recursively transform children
        is_current_compatible = isinstance(node, _COMPATIBLE_ANCESTOR_CLASSES)
        next_ancestors_compatible = ancestors_compatible and is_current_compatible

        return node.transform_children(lambda child: traverse(child, next_ancestors_compatible))

    return traverse(root, True)

Loading
Loading