feat(bigframes): Experimental local sample cache#17587
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a local peek caching mechanism (PeekCache) to cache query relation samples, enabling faster interactive iteration on subsequent compatible operations. The review feedback highlights two critical issues in the subplan substitution logic: a correctness bug where a cached sample could be used even if the requested peek size exceeds the cached size, and a SQL escaping bug where using field.id.sql instead of field.id.name causes column mismatches with the local Arrow table. The feedback provides actionable code suggestions to enforce a minimum row requirement and use the raw column names, along with the necessary updates to the executor and unit tests.
| def substitute_peek_cached_subplans( | ||
| root: nodes.BigFrameNode, | ||
| peek_cache: PeekCache, | ||
| ) -> nodes.BigFrameNode: | ||
| """ | ||
| Recursively replaces subplans in the tree that have a cached local sample | ||
| in the peek cache with a ReadLocalNode, provided that all ancestors | ||
| of the subplan are compatible with running on a sample. | ||
| """ | ||
| # Intermediate nodes that preserve the semantic validity of a sample. | ||
| # WindowOpNode, AggregateNode, OrderByNode, JoinNode, etc. are excluded | ||
| # because evaluating them on a sample breaks semantic contracts. | ||
| _COMPATIBLE_ANCESTOR_CLASSES = ( | ||
| nodes.SelectionNode, | ||
| nodes.ProjectionNode, | ||
| nodes.FilterNode, | ||
| nodes.PromoteOffsetsNode, | ||
| ) | ||
|
|
||
| def traverse(node: nodes.BigFrameNode, ancestors_compatible: bool) -> nodes.BigFrameNode: | ||
| if ancestors_compatible: | ||
| cached_sample = peek_cache.get(node) | ||
| if cached_sample is not None: | ||
| # Replace the node with a ReadLocalNode containing the cached sample | ||
| scan_list = nodes.ScanList( | ||
| tuple(nodes.ScanItem(field.id, field.id.sql) for field in node.fields) | ||
| ) | ||
| session = node.session if node.session is not None else root.session | ||
| return nodes.ReadLocalNode( | ||
| local_data_source=cached_sample, | ||
| scan_list=scan_list, | ||
| session=session, | ||
| ) | ||
|
|
||
| # If we didn't replace, recursively transform children | ||
| is_current_compatible = isinstance(node, _COMPATIBLE_ANCESTOR_CLASSES) | ||
| next_ancestors_compatible = ancestors_compatible and is_current_compatible | ||
|
|
||
| return node.transform_children(lambda child: traverse(child, next_ancestors_compatible)) | ||
|
|
||
| return traverse(root, True) |
There was a problem hiding this comment.
There are two issues in substitute_peek_cached_subplans:
- Correctness / Truncation Bug: If a user requests a larger peek size (e.g., `df.head(20000)`) than what is currently cached (e.g., 10000 rows), the function will still substitute the cached subplan and return the truncated 10000-row sample. We should only use the cached sample if it contains at least the requested number of rows (`min_rows_required`).
- SQL Escaping Bug: Using `field.id.sql` as the `source_id` for `ScanItem` will return the SQL-escaped representation of the column name (e.g., with backticks or double quotes). This will cause a runtime `KeyError` or column mismatch when reading from the local Arrow table, which contains raw, unescaped column names. Use `field.id.name` instead to get the raw column name.
def substitute_peek_cached_subplans(
    root: nodes.BigFrameNode,
    peek_cache: PeekCache,
    min_rows_required: int,
) -> nodes.BigFrameNode:
    """
    Swap cached subplans of ``root`` for local reads of their cached samples.

    The tree is walked top-down. A subplan is replaced by a ReadLocalNode
    when the peek cache holds a sample for it, that sample has at least
    ``min_rows_required`` rows, and every node above the subplan is one
    that can safely be evaluated on a sample. Nodes that are not replaced
    have their children rewritten recursively.

    Args:
        root: Root of the plan to rewrite.
        peek_cache: Cache queried (via ``get``) for a sample of each
            candidate subplan.
        min_rows_required: Smallest ``num_rows`` a cached sample may have
            and still be substituted.

    Returns:
        The rewritten plan.
    """
    # Node types whose result stays meaningful when computed on a sample.
    # WindowOpNode, AggregateNode, OrderByNode, JoinNode, etc. are left out
    # on purpose: evaluating them on a sample breaks semantic contracts.
    sample_safe_types = (
        nodes.SelectionNode,
        nodes.ProjectionNode,
        nodes.FilterNode,
        nodes.PromoteOffsetsNode,
    )

    def as_local_read(target: nodes.BigFrameNode, sample) -> nodes.BigFrameNode:
        # Build the local-read replacement for ``target``; each field is
        # scanned from the sample by the raw name of its column id.
        items = tuple(
            nodes.ScanItem(column.id, column.id.name) for column in target.fields
        )
        # Fall back to the root's session when the node carries none.
        owner = root.session if target.session is None else target.session
        return nodes.ReadLocalNode(
            local_data_source=sample,
            scan_list=nodes.ScanList(items),
            session=owner,
        )

    def rewrite(current: nodes.BigFrameNode, sampling_ok: bool) -> nodes.BigFrameNode:
        # The cache is only consulted while every ancestor is sample-safe.
        sample = peek_cache.get(current) if sampling_ok else None
        if sample is not None and sample.num_rows >= min_rows_required:
            return as_local_read(current, sample)

        # No substitution here: descend, noting whether this node keeps the
        # path sample-safe for its children.
        children_ok = sampling_ok and isinstance(current, sample_safe_types)

        def descend(child: nodes.BigFrameNode) -> nodes.BigFrameNode:
            return rewrite(child, children_ok)

        return current.transform_children(descend)

    return rewrite(root, True)
|
||
| if execution_spec.peek is not None and enable_peek_cache: | ||
| from bigframes.session.peek_cache import substitute_peek_cached_subplans | ||
| rewritten_node = substitute_peek_cached_subplans(array_value.node, self._peek_cache) |
There was a problem hiding this comment.
Pass min_rows_required=execution_spec.peek to substitute_peek_cached_subplans to ensure we only use the cached sample if it contains enough rows to satisfy the current peek request.
rewritten_node = substitute_peek_cached_subplans(
array_value.node, self._peek_cache, min_rows_required=execution_spec.peek
)| cache.put(leaf, cached_ds) | ||
|
|
||
| # Now perform the tree substitution | ||
| rewritten = substitute_peek_cached_subplans(leaf, cache) |
There was a problem hiding this comment.
| predicate=bigframes.core.expression.ScalarConstantExpression(True), # Dummy expression | ||
| ) | ||
|
|
||
| rewritten_compatible = substitute_peek_cached_subplans(plan_compatible, cache) |
There was a problem hiding this comment.
Update the test to pass min_rows_required=1 to match the updated signature of substitute_peek_cached_subplans.
| rewritten_compatible = substitute_peek_cached_subplans(plan_compatible, cache) | |
| rewritten_compatible = substitute_peek_cached_subplans(plan_compatible, cache, min_rows_required=1) |
| # ReversedNode is an incompatible ancestor. | ||
| plan_incompatible = nodes.ReversedNode(child=leaf) | ||
|
|
||
| rewritten_incompatible = substitute_peek_cached_subplans(plan_incompatible, cache) |
There was a problem hiding this comment.
Update the test to pass min_rows_required=1 to match the updated signature of substitute_peek_cached_subplans.
| rewritten_incompatible = substitute_peek_cached_subplans(plan_incompatible, cache) | |
| rewritten_incompatible = substitute_peek_cached_subplans(plan_incompatible, cache, min_rows_required=1) |
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
Fixes #<issue_number_goes_here> 🦕