feat(bigframes): Experimental local sample cache#17587

Draft
TrevorBergeron wants to merge 1 commit into main from tbergeron_peek_cache

@TrevorBergeron

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a local peek caching mechanism (PeekCache) to cache query relation samples, enabling faster interactive iteration on subsequent compatible operations. The review feedback highlights two critical issues in the subplan substitution logic: a correctness bug where a cached sample could be used even if the requested peek size exceeds the cached size, and a SQL escaping bug where using field.id.sql instead of field.id.name causes column mismatches with the local Arrow table. The feedback provides actionable code suggestions to enforce a minimum row requirement and use the raw column names, along with the necessary updates to the executor and unit tests.
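
The column-name mismatch described in this summary can be illustrated without any bigframes code. The sketch below is only an analogy: `sql_quote` is a hypothetical stand-in for whatever `field.id.sql` actually returns (assumed here to be a backtick-quoted identifier), and a plain dict stands in for the local Arrow table, whose keys are raw column names.

```python
def sql_quote(name: str) -> str:
    """Hypothetical stand-in for SQL identifier quoting (backtick style assumed)."""
    return "`" + name.replace("`", "\\`") + "`"


# A plain dict plays the role of the local table: keys are raw column names.
local_columns = {"my col": [1, 2, 3]}

raw_name = "my col"
escaped_name = sql_quote(raw_name)

# Looking up by the raw name finds the column; the quoted form does not match any key.
by_raw = local_columns.get(raw_name)
by_escaped = local_columns.get(escaped_name)

print(escaped_name, by_raw, by_escaped)
```

The quoted identifier is a different string from the stored key, which is why the review asks for the raw name as the scan source.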

Comment on lines +56 to +96
def substitute_peek_cached_subplans(
    root: nodes.BigFrameNode,
    peek_cache: PeekCache,
) -> nodes.BigFrameNode:
    """
    Recursively replaces subplans in the tree that have a cached local sample
    in the peek cache with a ReadLocalNode, provided that all ancestors
    of the subplan are compatible with running on a sample.
    """
    # Intermediate nodes that preserve the semantic validity of a sample.
    # WindowOpNode, AggregateNode, OrderByNode, JoinNode, etc. are excluded
    # because evaluating them on a sample breaks semantic contracts.
    _COMPATIBLE_ANCESTOR_CLASSES = (
        nodes.SelectionNode,
        nodes.ProjectionNode,
        nodes.FilterNode,
        nodes.PromoteOffsetsNode,
    )

    def traverse(node: nodes.BigFrameNode, ancestors_compatible: bool) -> nodes.BigFrameNode:
        if ancestors_compatible:
            cached_sample = peek_cache.get(node)
            if cached_sample is not None:
                # Replace the node with a ReadLocalNode containing the cached sample
                scan_list = nodes.ScanList(
                    tuple(nodes.ScanItem(field.id, field.id.sql) for field in node.fields)
                )
                session = node.session if node.session is not None else root.session
                return nodes.ReadLocalNode(
                    local_data_source=cached_sample,
                    scan_list=scan_list,
                    session=session,
                )

        # If we didn't replace, recursively transform children
        is_current_compatible = isinstance(node, _COMPATIBLE_ANCESTOR_CLASSES)
        next_ancestors_compatible = ancestors_compatible and is_current_compatible

        return node.transform_children(lambda child: traverse(child, next_ancestors_compatible))

    return traverse(root, True)

high

There are two issues in substitute_peek_cached_subplans:

  1. Correctness / Truncation Bug: If a user requests a larger peek size (e.g., df.head(20000)) than what is currently cached (e.g., 10000 rows), the function will still substitute the cached subplan and return the truncated 10000-row sample. We should only use the cached sample if it contains at least the requested number of rows (min_rows_required).
  2. SQL Escaping Bug: Using field.id.sql as the source_id for ScanItem will return the SQL-escaped representation of the column name (e.g., with backticks or double quotes). This will cause a runtime KeyError or column mismatch when reading from the local Arrow table, which contains raw, unescaped column names. Use field.id.name instead to get the raw column name.
def substitute_peek_cached_subplans(
    root: nodes.BigFrameNode,
    peek_cache: PeekCache,
    min_rows_required: int,
) -> nodes.BigFrameNode:
    """
    Recursively replaces subplans in the tree that have a cached local sample
    in the peek cache with a ReadLocalNode, provided that all ancestors
    of the subplan are compatible with running on a sample, and the cached
    sample contains at least the required number of rows.
    """
    # Intermediate nodes that preserve the semantic validity of a sample.
    # WindowOpNode, AggregateNode, OrderByNode, JoinNode, etc. are excluded
    # because evaluating them on a sample breaks semantic contracts.
    _COMPATIBLE_ANCESTOR_CLASSES = (
        nodes.SelectionNode,
        nodes.ProjectionNode,
        nodes.FilterNode,
        nodes.PromoteOffsetsNode,
    )

    def traverse(node: nodes.BigFrameNode, ancestors_compatible: bool) -> nodes.BigFrameNode:
        if ancestors_compatible:
            cached_sample = peek_cache.get(node)
            if cached_sample is not None and cached_sample.num_rows >= min_rows_required:
                # Replace the node with a ReadLocalNode containing the cached sample
                scan_list = nodes.ScanList(
                    tuple(nodes.ScanItem(field.id, field.id.name) for field in node.fields)
                )
                session = node.session if node.session is not None else root.session
                return nodes.ReadLocalNode(
                    local_data_source=cached_sample,
                    scan_list=scan_list,
                    session=session,
                )

        # If we didn't replace, recursively transform children
        is_current_compatible = isinstance(node, _COMPATIBLE_ANCESTOR_CLASSES)
        next_ancestors_compatible = ancestors_compatible and is_current_compatible

        return node.transform_children(lambda child: traverse(child, next_ancestors_compatible))

    return traverse(root, True)
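
The traversal rule in the suggestion above can be exercised in isolation with a toy tree. This is a sketch under stated assumptions, not the bigframes implementation: `ToyNode`, `COMPATIBLE_KINDS`, and `substitute` are hypothetical names, each node has at most one child, and the cache maps a node directly to its cached row count rather than to a local data source.

```python
from dataclasses import dataclass, replace
from typing import Dict, Optional

# Hypothetical stand-ins for the compatible ancestor node classes.
COMPATIBLE_KINDS = frozenset({"selection", "projection", "filter", "promote_offsets"})


@dataclass(frozen=True)
class ToyNode:
    kind: str
    child: Optional["ToyNode"] = None
    cached_rows: Optional[int] = None  # set only on substituted sample nodes


def substitute(root: ToyNode, cache: Dict[ToyNode, int], min_rows_required: int) -> ToyNode:
    def traverse(node: ToyNode, ancestors_compatible: bool) -> ToyNode:
        if ancestors_compatible:
            rows = cache.get(node)
            # Use the sample only if it can satisfy the requested peek size.
            if rows is not None and rows >= min_rows_required:
                return ToyNode("local_sample", cached_rows=rows)
        if node.child is None:
            return node
        # A child may be replaced only if every node above it is sample-safe.
        next_ok = ancestors_compatible and node.kind in COMPATIBLE_KINDS
        return replace(node, child=traverse(node.child, next_ok))

    return traverse(root, True)


leaf = ToyNode("table_scan")
cache = {leaf: 10_000}

hit = substitute(ToyNode("filter", leaf), cache, min_rows_required=100)
too_small = substitute(ToyNode("filter", leaf), cache, min_rows_required=20_000)
blocked = substitute(ToyNode("aggregate", leaf), cache, min_rows_required=100)
```

Here `hit` has its leaf replaced by the sample, while `too_small` (peek larger than the cached sample) and `blocked` (incompatible ancestor) come back unchanged.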


        if execution_spec.peek is not None and enable_peek_cache:
            from bigframes.session.peek_cache import substitute_peek_cached_subplans
            rewritten_node = substitute_peek_cached_subplans(array_value.node, self._peek_cache)

high

Pass min_rows_required=execution_spec.peek to substitute_peek_cached_subplans to ensure we only use the cached sample if it contains enough rows to satisfy the current peek request.

            rewritten_node = substitute_peek_cached_subplans(
                array_value.node, self._peek_cache, min_rows_required=execution_spec.peek
            )

cache.put(leaf, cached_ds)

# Now perform the tree substitution
rewritten = substitute_peek_cached_subplans(leaf, cache)

medium

Update the test to pass min_rows_required=1 to match the updated signature of substitute_peek_cached_subplans.

Suggested change
- rewritten = substitute_peek_cached_subplans(leaf, cache)
+ rewritten = substitute_peek_cached_subplans(leaf, cache, min_rows_required=1)

predicate=bigframes.core.expression.ScalarConstantExpression(True), # Dummy expression
)

rewritten_compatible = substitute_peek_cached_subplans(plan_compatible, cache)

medium

Update the test to pass min_rows_required=1 to match the updated signature of substitute_peek_cached_subplans.

Suggested change
- rewritten_compatible = substitute_peek_cached_subplans(plan_compatible, cache)
+ rewritten_compatible = substitute_peek_cached_subplans(plan_compatible, cache, min_rows_required=1)

# ReversedNode is an incompatible ancestor.
plan_incompatible = nodes.ReversedNode(child=leaf)

rewritten_incompatible = substitute_peek_cached_subplans(plan_incompatible, cache)

medium

Update the test to pass min_rows_required=1 to match the updated signature of substitute_peek_cached_subplans.

Suggested change
- rewritten_incompatible = substitute_peek_cached_subplans(plan_incompatible, cache)
+ rewritten_incompatible = substitute_peek_cached_subplans(plan_incompatible, cache, min_rows_required=1)
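
Passing min_rows_required=1 in these tests only covers the path where the cached sample is large enough. A boundary test for the new guard might look like the sketch below. It is self-contained rather than written against the PR's test module: `FakeSample` and `usable_sample` are hypothetical stand-ins that mirror only the `num_rows >= min_rows_required` condition from the suggested change.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FakeSample:
    """Hypothetical stand-in for a cached local sample exposing num_rows."""
    num_rows: int


def usable_sample(sample: Optional[FakeSample], min_rows_required: int) -> Optional[FakeSample]:
    """Return the sample only if it exists and is large enough for the request."""
    if sample is not None and sample.num_rows >= min_rows_required:
        return sample
    return None


def test_guard_boundaries() -> None:
    sample = FakeSample(num_rows=10)
    assert usable_sample(sample, 1) is sample    # smaller request: cache hit
    assert usable_sample(sample, 10) is sample   # exact size: still a hit
    assert usable_sample(sample, 11) is None     # larger request: fall through
    assert usable_sample(None, 1) is None        # nothing cached


test_guard_boundaries()
```

In the real test module, the equivalent check would presumably cache a small sample on the leaf and assert that a larger min_rows_required leaves the plan unchanged.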
