diff --git a/bigframes/core/compile/__init__.py b/bigframes/core/compile/__init__.py index 15d2d0e52c..c1b9c5d902 100644 --- a/bigframes/core/compile/__init__.py +++ b/bigframes/core/compile/__init__.py @@ -13,28 +13,30 @@ # limitations under the License. from __future__ import annotations -from typing import Any +from typing import Literal -from bigframes import options from bigframes.core.compile.api import test_only_ibis_inferred_schema from bigframes.core.compile.configs import CompileRequest, CompileResult -def compiler() -> Any: - """Returns the appropriate compiler module based on session options.""" - if options.experiments.sql_compiler == "experimental": +def compile_sql( + request: CompileRequest, + compiler_name: Literal["sqlglot", "ibis"] = "sqlglot", +) -> CompileResult: + """Compiles a BigFrameNode according to the request into SQL.""" + if compiler_name == "sqlglot": import bigframes.core.compile.sqlglot.compiler as sqlglot_compiler - return sqlglot_compiler + return sqlglot_compiler.compile_sql(request) else: import bigframes.core.compile.ibis_compiler.ibis_compiler as ibis_compiler - return ibis_compiler + return ibis_compiler.compile_sql(request) __all__ = [ "test_only_ibis_inferred_schema", "CompileRequest", "CompileResult", - "compiler", + "compile_sql", ] diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 1e240a841c..fd617d9332 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -17,6 +17,8 @@ import math import threading from typing import Literal, Mapping, Optional, Sequence, Tuple +import uuid +import warnings import weakref import google.api_core.exceptions @@ -24,6 +26,7 @@ import google.cloud.bigquery.job as bq_job import google.cloud.bigquery.table as bq_table import google.cloud.bigquery_storage_v1 +import google.cloud.exceptions import bigframes from bigframes import exceptions as bfe @@ -160,6 +163,43 @@ def __init__( ) self._upload_lock = threading.Lock() + def _compile( + self, + node: nodes.BigFrameNode, + *, + ordered: bool = False, + peek: Optional[int] = None, + materialize_all_order_keys: bool = False, + compiler_name: Literal["sqlglot", "ibis"] = "sqlglot", + ) -> compile.CompileResult: + return compile.compile_sql( + compile.CompileRequest( + node, + sort_rows=ordered, + peek_count=peek, + materialize_all_order_keys=materialize_all_order_keys, + ), + compiler_name=compiler_name, + ) + + def _with_fallback(self, run_fn): + compiler_option = bigframes.options.experiments.sql_compiler + if compiler_option == "legacy": + return run_fn("ibis") + elif compiler_option == "experimental": + return run_fn("sqlglot") + else: # stable + compiler_id = f"{uuid.uuid1().hex[:12]}" + try: + return run_fn("sqlglot", compiler_id=compiler_id) + except google.cloud.exceptions.BadRequest as e: + msg = bfe.format_message( + f"Compiler ID {compiler_id}: BadRequest on sqlglot. " + f"Falling back to ibis. Details: {e.message}" + ) + warnings.warn(msg, category=UserWarning) + return run_fn("ibis", compiler_id=compiler_id) + def to_sql( self, array_value: bigframes.core.ArrayValue, @@ -175,9 +215,7 @@ def to_sql( else array_value.node ) node = self._substitute_large_local_sources(node) - compiled = compile.compiler().compile_sql( - compile.CompileRequest(node, sort_rows=ordered) - ) + compiled = self._compile(node, ordered=ordered) return compiled.sql def execute( @@ -293,46 +331,56 @@ def _export_gbq( # validate destination table existing_table = self._maybe_find_existing_table(spec) - compiled = compile.compiler().compile_sql( - compile.CompileRequest(plan, sort_rows=False) - ) - sql = compiled.sql + def run_with_compiler(compiler_name, compiler_id=None): + compiled = self._compile(plan, ordered=False, compiler_name=compiler_name) + sql = compiled.sql - if (existing_table is not None) and _if_schema_match( - existing_table.schema, array_value.schema - ): - # b/409086472: Uses DML for table appends and replacements to avoid - # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: - # https://cloud.google.com/bigquery/quotas#standard_tables - job_config = bigquery.QueryJobConfig() + if (existing_table is not None) and _if_schema_match( + existing_table.schema, array_value.schema + ): + # b/409086472: Uses DML for table appends and replacements to avoid + # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: + # https://cloud.google.com/bigquery/quotas#standard_tables + job_config = bigquery.QueryJobConfig() + + ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql) + if spec.if_exists == "append": + sql = sg_sql.to_sql( + sg_sql.insert(ir.expr.as_select_all(), spec.table) + ) + else: # for "replace" + assert spec.if_exists == "replace" + sql = sg_sql.to_sql( + sg_sql.replace(ir.expr.as_select_all(), spec.table) + ) + else: + dispositions = { + "fail": bigquery.WriteDisposition.WRITE_EMPTY, + "replace": bigquery.WriteDisposition.WRITE_TRUNCATE, + "append": bigquery.WriteDisposition.WRITE_APPEND, + } + job_config = bigquery.QueryJobConfig( + write_disposition=dispositions[spec.if_exists], + destination=spec.table, + clustering_fields=spec.cluster_cols if spec.cluster_cols else None, + ) - ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql) - if spec.if_exists == "append": - sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table)) - else: # for "replace" - assert spec.if_exists == "replace" - sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table)) - else: - dispositions = { - "fail": bigquery.WriteDisposition.WRITE_EMPTY, - "replace": bigquery.WriteDisposition.WRITE_TRUNCATE, - "append": bigquery.WriteDisposition.WRITE_APPEND, - } - job_config = bigquery.QueryJobConfig( - write_disposition=dispositions[spec.if_exists], - destination=spec.table, - clustering_fields=spec.cluster_cols if spec.cluster_cols else None, + # Attach data type usage to the job labels + job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs + job_config.labels["bigframes-compiler"] = ( + f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name ) - # Attach data type usage to the job labels - job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs - # TODO(swast): plumb through the api_name of the user-facing api that - # caused this query. - iterator, job = self._run_execute_query( - sql=sql, - job_config=job_config, - session=array_value.session, - ) + # TODO(swast): plumb through the api_name of the user-facing api that + # caused this query. + iterator, job = self._run_execute_query( + sql=sql, + job_config=job_config, + session=array_value.session, + ) + return iterator, job + + iterator, job = self._with_fallback(run_with_compiler) has_timedelta_col = any( t == bigframes.dtypes.TIMEDELTA_DTYPE for t in array_value.schema.dtypes @@ -648,34 +696,44 @@ def _execute_plan_gbq( ] cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS] - compiled = compile.compiler().compile_sql( - compile.CompileRequest( + def run_with_compiler(compiler_name, compiler_id=None): + compiled = self._compile( plan, - sort_rows=ordered, - peek_count=peek, + ordered=ordered, + peek=peek, materialize_all_order_keys=(cache_spec is not None), + compiler_name=compiler_name, ) - ) - # might have more columns than og schema, for hidden ordering columns - compiled_schema = compiled.sql_schema + # might have more columns than og schema, for hidden ordering columns + compiled_schema = compiled.sql_schema - destination_table: Optional[bigquery.TableReference] = None + destination_table: Optional[bigquery.TableReference] = None - job_config = bigquery.QueryJobConfig() - if create_table: - destination_table = self.storage_manager.create_temp_table( - compiled_schema, cluster_cols + job_config = bigquery.QueryJobConfig() + if create_table: + destination_table = self.storage_manager.create_temp_table( + compiled_schema, cluster_cols + ) + job_config.destination = destination_table + + # Attach data type usage to the job labels + job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs + job_config.labels["bigframes-compiler"] = ( + f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name ) - job_config.destination = destination_table - - # Attach data type usage to the job labels - job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs - iterator, query_job = self._run_execute_query( - sql=compiled.sql, - job_config=job_config, - query_with_job=(destination_table is not None), - session=plan.session, - ) + + iterator, query_job = self._run_execute_query( + sql=compiled.sql, + job_config=job_config, + query_with_job=(destination_table is not None), + session=plan.session, + ) + return iterator, query_job, compiled + + iterator, query_job, compiled = self._with_fallback(run_with_compiler) + + # might have more columns than og schema, for hidden ordering columns + compiled_schema = compiled.sql_schema # we could actually cache even when caching is not explicitly requested, but being conservative for now result_bq_data = None diff --git a/tests/unit/session/test_bq_caching_executor.py b/tests/unit/session/test_bq_caching_executor.py new file mode 100644 index 0000000000..1418db87fa --- /dev/null +++ b/tests/unit/session/test_bq_caching_executor.py @@ -0,0 +1,110 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock + +import google.cloud.bigquery as bigquery +import google.cloud.exceptions +import pyarrow as pa +import pytest + +import bigframes +import bigframes.core.nodes as nodes +import bigframes.core.schema as schemata +from bigframes.session.bq_caching_executor import BigQueryCachingExecutor + + +@pytest.fixture +def mock_executor(): + bqclient = mock.create_autospec(bigquery.Client) + bqclient.project = "test-project" + storage_manager = mock.Mock() + bqstoragereadclient = mock.Mock() + loader = mock.Mock() + publisher = mock.Mock() + return BigQueryCachingExecutor( + bqclient, storage_manager, bqstoragereadclient, loader, publisher=publisher + ) + + +def test_compiler_with_fallback_legacy(mock_executor): + run_fn = mock.Mock() + with bigframes.option_context("experiments.sql_compiler", "legacy"): + mock_executor._with_fallback(run_fn) + run_fn.assert_called_once_with("ibis") + + +def test_compiler_with_fallback_experimental(mock_executor): + run_fn = mock.Mock() + with bigframes.option_context("experiments.sql_compiler", "experimental"): + mock_executor._with_fallback(run_fn) + run_fn.assert_called_once_with("sqlglot") + + +def test_compiler_with_fallback_stable_success(mock_executor): + run_fn = mock.Mock() + with bigframes.option_context("experiments.sql_compiler", "stable"): + mock_executor._with_fallback(run_fn) + run_fn.assert_called_once_with("sqlglot", compiler_id=mock.ANY) + + +def test_compiler_execute_plan_gbq_fallback_labels(mock_executor): + plan = mock.create_autospec(nodes.BigFrameNode) + plan.schema = schemata.ArraySchema(tuple()) + plan.session = None + + # Mock prepare_plan + mock_executor.prepare_plan = mock.Mock(return_value=plan) + + # Mock _compile + from bigframes.core.compile.configs import CompileResult + + fake_compiled = CompileResult( + sql="SELECT 1", sql_schema=[], row_order=None, encoded_type_refs="fake_refs" + ) + mock_executor._compile = mock.Mock(return_value=fake_compiled) + + # Mock _run_execute_query to fail first time, then succeed + mock_iterator = mock.Mock() + mock_iterator.total_rows = 0 + mock_iterator.to_arrow.return_value = pa.Table.from_arrays([], names=[]) + mock_query_job = mock.Mock(spec=bigquery.QueryJob) + mock_query_job.destination = None + + error = google.cloud.exceptions.BadRequest("failed") + error.job = mock.Mock(spec=bigquery.QueryJob) # type: ignore + error.job.job_id = "failed_job_id" # type: ignore + + mock_executor._run_execute_query = mock.Mock( + side_effect=[error, (mock_iterator, mock_query_job)] + ) + + with bigframes.option_context("experiments.sql_compiler", "stable"), pytest.warns( + UserWarning, match="Falling back to ibis" + ): + mock_executor._execute_plan_gbq(plan, ordered=False, must_create_table=False) + + # Verify labels for both calls + assert mock_executor._run_execute_query.call_count == 2 + + call_1_kwargs = mock_executor._run_execute_query.call_args_list[0][1] + call_2_kwargs = mock_executor._run_execute_query.call_args_list[1][1] + + label_1 = call_1_kwargs["job_config"].labels["bigframes-compiler"] + label_2 = call_2_kwargs["job_config"].labels["bigframes-compiler"] + + assert label_1.startswith("sqlglot-") + assert label_2.startswith("ibis-") + # Both should have the same compiler_id suffix + assert label_1.split("-")[1] == label_2.split("-")[1]