feat(bigframes): Add productionize mode to bigframes#17595
feat(bigframes): Add productionize mode to bigframes#17595TrevorBergeron wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a new 'productionize' mode to BigQuery DataFrames, allowing users to compile their lazy data pipelines into sequential SQL scripts or export them as Dataform projects. It adds the productionize context manager, query parameters, and execution interception. The review feedback highlights several critical areas for improvement, including respecting the if_exists configuration during SQL generation, safely parsing table names with domain-scoped project IDs, ensuring parameters are registered when using the SQLGlot compiler, and replacing regex-based SQL modifications with a safer AST-based IR representation.
| # 2. Add table creation statements | ||
| for table_name in order: | ||
| array_value, _ = self.recorded_writes[table_name] | ||
| # Compile to SQL using the session's executor | ||
| raw_sql = self.session._executor.to_sql(array_value, ordered=False) | ||
| statements.append(f"CREATE OR REPLACE TABLE `{table_name}` AS\n{raw_sql};") |
There was a problem hiding this comment.
The compiled SQL script currently always uses CREATE OR REPLACE TABLE regardless of the if_exists configuration (e.g., "append" or "fail"). This can lead to silent data loss or incorrect execution behavior. We should respect the if_exists configuration of the recorded write.
| # 2. Add table creation statements | |
| for table_name in order: | |
| array_value, _ = self.recorded_writes[table_name] | |
| # Compile to SQL using the session's executor | |
| raw_sql = self.session._executor.to_sql(array_value, ordered=False) | |
| statements.append(f"CREATE OR REPLACE TABLE `{table_name}` AS\n{raw_sql};") | |
| # 2. Add table creation statements | |
| for table_name in order: | |
| array_value, dest_spec = self.recorded_writes[table_name] | |
| # Compile to SQL using the session's executor | |
| raw_sql = self.session._executor.to_sql(array_value, ordered=False) | |
| if dest_spec.if_exists == "replace": | |
| statements.append(f"CREATE OR REPLACE TABLE `{table_name}` AS\n{raw_sql};") | |
| elif dest_spec.if_exists == "append": | |
| statements.append(f"INSERT INTO `{table_name}`\n{raw_sql};") | |
| else: | |
| statements.append(f"CREATE TABLE `{table_name}` AS\n{raw_sql};") |
| parts = table_name.split(".") | ||
| if len(parts) == 3: | ||
| project_id, dataset_id, table_id = parts[0], parts[1], parts[2] | ||
| elif len(parts) == 2: | ||
| project_id = self.session.bqclient.project | ||
| dataset_id, table_id = parts[0], parts[1] | ||
| else: | ||
| raise ValueError(f"Invalid recorded table name: {table_name}") | ||
|
|
There was a problem hiding this comment.
Splitting the table name by . and expecting exactly 3 parts will fail for domain-scoped Google Cloud projects (e.g., example.com:my-project.dataset.table), which contain a dot in the project ID. We should split from the right to safely extract the table ID and dataset ID.
| parts = table_name.split(".") | |
| if len(parts) == 3: | |
| project_id, dataset_id, table_id = parts[0], parts[1], parts[2] | |
| elif len(parts) == 2: | |
| project_id = self.session.bqclient.project | |
| dataset_id, table_id = parts[0], parts[1] | |
| else: | |
| raise ValueError(f"Invalid recorded table name: {table_name}") | |
| # Parse the table name | |
| parts = table_name.split(".") | |
| if len(parts) >= 3: | |
| table_id = parts[-1] | |
| dataset_id = parts[-2] | |
| project_id = ".".join(parts[:-2]) | |
| elif len(parts) == 2: | |
| project_id = self.session.bqclient.project | |
| dataset_id, table_id = parts[0], parts[1] | |
| else: | |
| raise ValueError(f"Invalid recorded table name: {table_name}") |
| for dep_table in dependencies[table_name]: | ||
| dep_parts = dep_table.split(".") | ||
| dep_project, dep_dataset, dep_table_id = ( | ||
| dep_parts[0], | ||
| dep_parts[1], | ||
| dep_parts[2], | ||
| ) |
There was a problem hiding this comment.
Unpacking dep_table.split(".") into exactly 3 variables will fail for domain-scoped project IDs. We should split from the right to safely extract the table ID and dataset ID.
| for dep_table in dependencies[table_name]: | |
| dep_parts = dep_table.split(".") | |
| dep_project, dep_dataset, dep_table_id = ( | |
| dep_parts[0], | |
| dep_parts[1], | |
| dep_parts[2], | |
| ) | |
| for dep_table in dependencies[table_name]: | |
| dep_parts = dep_table.split(".") | |
| if len(dep_parts) >= 3: | |
| dep_table_id = dep_parts[-1] | |
| dep_dataset = dep_parts[-2] | |
| dep_project = ".".join(dep_parts[:-2]) | |
| elif len(dep_parts) == 2: | |
| dep_project = self.session.bqclient.project | |
| dep_dataset, dep_table_id = dep_parts[0], dep_parts[1] | |
| else: | |
| raise ValueError(f"Invalid dependent table name: {dep_table}") |
| @compile_expression.register | ||
| def _(self, expr: ex.ParameterExpression) -> sge.Expression: | ||
| return sge.Parameter(this=sge.to_identifier(expr.name)) |
There was a problem hiding this comment.
In the SQLGlot compiler, ParameterExpression is compiled directly without recording the parameter name in recorded_params. This means export_dataform won't know about the parameters when SQLGlot is used as the compiler, leading to unreplaced parameters in the exported Dataform files. We should record the parameter name when productionize mode is active.
| @compile_expression.register | |
| def _(self, expr: ex.ParameterExpression) -> sge.Expression: | |
| return sge.Parameter(this=sge.to_identifier(expr.name)) | |
| @compile_expression.register | |
| def _(self, expr: ex.ParameterExpression) -> sge.Expression: | |
| from bigframes.session.productionize import _state as prod_state | |
| if prod_state.active and prod_state.pipeline is not None: | |
| prod_state.pipeline.recorded_params[expr.name] = expr.name | |
| return sge.Parameter(this=sge.to_identifier(expr.name)) |
| for ibis_name, user_name in self.recorded_params.items(): | ||
| sql = re.sub( | ||
| rf"(?<!\w)@{ibis_name}(?!\w)", | ||
| f"${{dataform.projectConfig.vars.{user_name}}}", | ||
| sql, | ||
| ) | ||
|
|
||
| # Rewrite query parameters to Dataform project variables (e.g. from SQLGlot) | ||
| sql = _replace_parameter_refs(sql) |
There was a problem hiding this comment.
Instead of using regex to replace parameter references in the SQL string, which is error-prone and can corrupt string literals or comments, we should use sqlglot_ir.SQLGlotIR.from_unparsed_query to wrap the SQL. This allows for safe subsequent modifications and avoids parsing issues across different compilers.
from bigframes.core.compile.sqlglot_ir import SQLGlotIR
ir = SQLGlotIR.from_unparsed_query(sql)
# Now safely modify the query using the IR representationReferences
- The function
sqlglot_ir.SQLGlotIR.from_unparsed_querycan be used to wrap SQL from different compilers (e.g., Ibis, sqlglot) without parsing issues, allowing for safe subsequent modifications.
| def _replace_parameter_refs(sql: str) -> str: | ||
| """Replaces BigQuery query parameters (@param) with Dataform project variables (${dataform.projectConfig.vars.param}).""" | ||
| return re.sub( | ||
| r"(?<!\w)@([a-zA-Z_][a-zA-Z0-9_]*)", r"${dataform.projectConfig.vars.\1}", sql | ||
| ) |
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
Fixes #<issue_number_goes_here> 🦕