Skip to content

feat(bigframes): Add productionize mode to bigframes#17595

Draft
TrevorBergeron wants to merge 1 commit into
mainfrom
tbergeron_production_mode
Draft

feat(bigframes): Add productionize mode to bigframes#17595
TrevorBergeron wants to merge 1 commit into
mainfrom
tbergeron_production_mode

Conversation

@TrevorBergeron

Copy link
Copy Markdown
Contributor

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕

@TrevorBergeron TrevorBergeron added the do not merge Indicates a pull request not ready for merge, due to either quality or timing. label Jun 30, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new 'productionize' mode to BigQuery DataFrames, allowing users to compile their lazy data pipelines into sequential SQL scripts or export them as Dataform projects. It adds the productionize context manager, query parameters, and execution interception. The review feedback highlights several critical areas for improvement, including respecting the if_exists configuration during SQL generation, safely parsing table names with domain-scoped project IDs, ensuring parameters are registered when using the SQLGlot compiler, and replacing regex-based SQL modifications with a safer AST-based IR representation.

Comment on lines +161 to +166
# 2. Add table creation statements
for table_name in order:
array_value, _ = self.recorded_writes[table_name]
# Compile to SQL using the session's executor
raw_sql = self.session._executor.to_sql(array_value, ordered=False)
statements.append(f"CREATE OR REPLACE TABLE `{table_name}` AS\n{raw_sql};")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The compiled SQL script currently always uses CREATE OR REPLACE TABLE regardless of the if_exists configuration (e.g., "append" or "fail"). This can lead to silent data loss or incorrect execution behavior. We should respect the if_exists configuration of the recorded write.

Suggested change
# 2. Add table creation statements
for table_name in order:
array_value, _ = self.recorded_writes[table_name]
# Compile to SQL using the session's executor
raw_sql = self.session._executor.to_sql(array_value, ordered=False)
statements.append(f"CREATE OR REPLACE TABLE `{table_name}` AS\n{raw_sql};")
# 2. Add table creation statements
for table_name in order:
array_value, dest_spec = self.recorded_writes[table_name]
# Compile to SQL using the session's executor
raw_sql = self.session._executor.to_sql(array_value, ordered=False)
if dest_spec.if_exists == "replace":
statements.append(f"CREATE OR REPLACE TABLE `{table_name}` AS\n{raw_sql};")
elif dest_spec.if_exists == "append":
statements.append(f"INSERT INTO `{table_name}`\n{raw_sql};")
else:
statements.append(f"CREATE TABLE `{table_name}` AS\n{raw_sql};")

Comment on lines +207 to +215
parts = table_name.split(".")
if len(parts) == 3:
project_id, dataset_id, table_id = parts[0], parts[1], parts[2]
elif len(parts) == 2:
project_id = self.session.bqclient.project
dataset_id, table_id = parts[0], parts[1]
else:
raise ValueError(f"Invalid recorded table name: {table_name}")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Splitting the table name by . and expecting exactly 3 parts will fail for domain-scoped Google Cloud projects (e.g., example.com:my-project.dataset.table), which contain a dot in the project ID. We should split from the right to safely extract the table ID and dataset ID.

Suggested change
parts = table_name.split(".")
if len(parts) == 3:
project_id, dataset_id, table_id = parts[0], parts[1], parts[2]
elif len(parts) == 2:
project_id = self.session.bqclient.project
dataset_id, table_id = parts[0], parts[1]
else:
raise ValueError(f"Invalid recorded table name: {table_name}")
# Parse the table name
parts = table_name.split(".")
if len(parts) >= 3:
table_id = parts[-1]
dataset_id = parts[-2]
project_id = ".".join(parts[:-2])
elif len(parts) == 2:
project_id = self.session.bqclient.project
dataset_id, table_id = parts[0], parts[1]
else:
raise ValueError(f"Invalid recorded table name: {table_name}")

Comment on lines +232 to +238
for dep_table in dependencies[table_name]:
dep_parts = dep_table.split(".")
dep_project, dep_dataset, dep_table_id = (
dep_parts[0],
dep_parts[1],
dep_parts[2],
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Unpacking dep_table.split(".") into exactly 3 variables will fail for domain-scoped project IDs. We should split from the right to safely extract the table ID and dataset ID.

Suggested change
for dep_table in dependencies[table_name]:
dep_parts = dep_table.split(".")
dep_project, dep_dataset, dep_table_id = (
dep_parts[0],
dep_parts[1],
dep_parts[2],
)
for dep_table in dependencies[table_name]:
dep_parts = dep_table.split(".")
if len(dep_parts) >= 3:
dep_table_id = dep_parts[-1]
dep_dataset = dep_parts[-2]
dep_project = ".".join(dep_parts[:-2])
elif len(dep_parts) == 2:
dep_project = self.session.bqclient.project
dep_dataset, dep_table_id = dep_parts[0], dep_parts[1]
else:
raise ValueError(f"Invalid dependent table name: {dep_table}")

Comment on lines +82 to +84
@compile_expression.register
def _(self, expr: ex.ParameterExpression) -> sge.Expression:
return sge.Parameter(this=sge.to_identifier(expr.name))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In the SQLGlot compiler, ParameterExpression is compiled directly without recording the parameter name in recorded_params. This means export_dataform won't know about the parameters when SQLGlot is used as the compiler, leading to unreplaced parameters in the exported Dataform files. We should record the parameter name when productionize mode is active.

Suggested change
@compile_expression.register
def _(self, expr: ex.ParameterExpression) -> sge.Expression:
return sge.Parameter(this=sge.to_identifier(expr.name))
@compile_expression.register
def _(self, expr: ex.ParameterExpression) -> sge.Expression:
from bigframes.session.productionize import _state as prod_state
if prod_state.active and prod_state.pipeline is not None:
prod_state.pipeline.recorded_params[expr.name] = expr.name
return sge.Parameter(this=sge.to_identifier(expr.name))

Comment on lines +220 to +228
for ibis_name, user_name in self.recorded_params.items():
sql = re.sub(
rf"(?<!\w)@{ibis_name}(?!\w)",
f"${{dataform.projectConfig.vars.{user_name}}}",
sql,
)

# Rewrite query parameters to Dataform project variables (e.g. from SQLGlot)
sql = _replace_parameter_refs(sql)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Instead of using regex to replace parameter references in the SQL string, which is error-prone and can corrupt string literals or comments, we should use sqlglot_ir.SQLGlotIR.from_unparsed_query to wrap the SQL. This allows for safe subsequent modifications and avoids parsing issues across different compilers.

            from bigframes.core.compile.sqlglot_ir import SQLGlotIR
            ir = SQLGlotIR.from_unparsed_query(sql)
            # Now safely modify the query using the IR representation
References
  1. The function sqlglot_ir.SQLGlotIR.from_unparsed_query can be used to wrap SQL from different compilers (e.g., Ibis, sqlglot) without parsing issues, allowing for safe subsequent modifications.

Comment on lines +374 to +378
def _replace_parameter_refs(sql: str) -> str:
"""Replaces BigQuery query parameters (@param) with Dataform project variables (${dataform.projectConfig.vars.param})."""
return re.sub(
r"(?<!\w)@([a-zA-Z_][a-zA-Z0-9_]*)", r"${dataform.projectConfig.vars.\1}", sql
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The helper function _replace_parameter_refs is no longer needed if we use precise parameter replacement, and removing it eliminates the risk of generic regex replacement bugs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

do not merge Indicates a pull request not ready for merge, due to either quality or timing.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant