diff --git a/bigframes/bigquery/_operations/table.py b/bigframes/bigquery/_operations/table.py index c90f88dcd6..cad025412d 100644 --- a/bigframes/bigquery/_operations/table.py +++ b/bigframes/bigquery/_operations/table.py @@ -19,8 +19,8 @@ import google.cloud.bigquery import pandas as pd +import bigframes.core.compile.sqlglot.sql as sg_sql import bigframes.core.logging.log_adapter as log_adapter -import bigframes.core.sql.table import bigframes.session @@ -80,14 +80,16 @@ def create_external_table( """ import bigframes.pandas as bpd - sql = bigframes.core.sql.table.create_external_table_ddl( - table_name=table_name, - replace=replace, - if_not_exists=if_not_exists, - columns=columns, - partition_columns=partition_columns, - connection_name=connection_name, - options=options, + sql = sg_sql.to_sql( + sg_sql.create_external_table( + table_name=table_name, + replace=replace, + if_not_exists=if_not_exists, + columns=columns, + partition_columns=partition_columns, + connection_name=connection_name, + options=options, + ) ) if session is None: diff --git a/bigframes/core/compile/sqlglot/sql/__init__.py b/bigframes/core/compile/sqlglot/sql/__init__.py index 17c78ba379..047fb73d30 100644 --- a/bigframes/core/compile/sqlglot/sql/__init__.py +++ b/bigframes/core/compile/sqlglot/sql/__init__.py @@ -22,7 +22,7 @@ table, to_sql, ) -from bigframes.core.compile.sqlglot.sql.ddl import load_data +from bigframes.core.compile.sqlglot.sql.ddl import create_external_table, load_data from bigframes.core.compile.sqlglot.sql.dml import insert, replace __all__ = [ @@ -35,6 +35,7 @@ "table", "to_sql", # From ddl.py + "create_external_table", "load_data", # From dml.py "insert", diff --git a/bigframes/core/compile/sqlglot/sql/ddl.py b/bigframes/core/compile/sqlglot/sql/ddl.py index 911c63781b..5134368b89 100644 --- a/bigframes/core/compile/sqlglot/sql/ddl.py +++ b/bigframes/core/compile/sqlglot/sql/ddl.py @@ -22,51 +22,6 @@ from bigframes.core.compile.sqlglot.sql import base -def _loaddata_sql(self: sg.Generator, expression: sge.LoadData) -> str: - out = ["LOAD DATA"] - if expression.args.get("overwrite"): - out.append("OVERWRITE") - - out.append(f"INTO {self.sql(expression, 'this').strip()}") - - # We ignore inpath as it's just a dummy to satisfy sqlglot requirements - # but BigQuery uses FROM FILES instead. - - columns = self.sql(expression, "columns").strip() - if columns: - out.append(columns) - - partition_by = self.sql(expression, "partition_by").strip() - if partition_by: - out.append(partition_by) - - cluster_by = self.sql(expression, "cluster_by").strip() - if cluster_by: - out.append(cluster_by) - - options = self.sql(expression, "options").strip() - if options: - out.append(options) - - from_files = self.sql(expression, "from_files").strip() - if from_files: - out.append(f"FROM FILES {from_files}") - - with_partition_columns = self.sql(expression, "with_partition_columns").strip() - if with_partition_columns: - out.append(f"WITH PARTITION COLUMNS {with_partition_columns}") - - connection = self.sql(expression, "connection").strip() - if connection: - out.append(f"WITH CONNECTION {connection}") - - return " ".join(out) - - -# Register the transform for BigQuery generator -sg.dialects.bigquery.BigQuery.Generator.TRANSFORMS[sge.LoadData] = _loaddata_sql - - def load_data( table_name: str, *, @@ -84,21 +39,6 @@ def load_data( # Quoting is handled by the dialect. table_expr = sge.Table(this=base.identifier(table_name)) - sge_columns = ( - sge.Schema( - this=None, - expressions=[ - sge.ColumnDef( - this=base.identifier(name), - kind=sge.DataType.build(typ, dialect="bigquery"), - ) - for name, typ in columns.items() - ], - ) - if columns - else None - ) - sge_partition_by = ( sge.PartitionedByProperty( this=base.identifier(partition_by[0]) @@ -115,17 +55,6 @@ def load_data( else None ) - sge_table_options = ( - sge.Properties( - expressions=[ - sge.Property(this=base.identifier(k), value=base.literal(v)) - for k, v in table_options.items() - ] - ) - if table_options - else None - ) - sge_from_files = sge.Tuple( expressions=[ sge.Property(this=base.identifier(k), value=base.literal(v)) @@ -133,32 +62,159 @@ def load_data( ] ) - sge_with_partition_columns = ( - sge.Schema( - this=None, - expressions=[ - sge.ColumnDef( - this=base.identifier(name), - kind=sge.DataType.build(typ, dialect="bigquery"), - ) - for name, typ in with_partition_columns.items() - ], - ) - if with_partition_columns - else None - ) - sge_connection = base.identifier(connection_name) if connection_name else None return sge.LoadData( this=table_expr, overwrite=(write_disposition == "OVERWRITE"), inpath=sge.convert("fake"), # satisfy sqlglot's required inpath arg - columns=sge_columns, + columns=_get_sge_schema(columns), partition_by=sge_partition_by, cluster_by=sge_cluster_by, - options=sge_table_options, + options=_get_sge_properties(table_options), from_files=sge_from_files, - with_partition_columns=sge_with_partition_columns, + with_partition_columns=_get_sge_schema(with_partition_columns), + connection=sge_connection, + ) + + +def create_external_table( + table_name: str, + *, + replace: bool = False, + if_not_exists: bool = False, + columns: Optional[Mapping[str, str]] = None, + partition_columns: Optional[Mapping[str, str]] = None, + connection_name: Optional[str] = None, + options: Optional[Mapping[str, Union[str, int, float, bool, list]]] = None, +) -> sge.Create: + """Generates the CREATE EXTERNAL TABLE DDL statement.""" + sge_connection = base.identifier(connection_name) if connection_name else None + + table_expr = sge.Table(this=base.identifier(table_name)) + + # sqlglot.expressions.Create usually takes 'this' (Table or Schema) + sge_schema = _get_sge_schema(columns) + this: sge.Table | sge.Schema + if sge_schema: + sge_schema.set("this", table_expr) + this = sge_schema + else: + this = table_expr + + return sge.Create( + this=this, + kind="EXTERNAL TABLE", + replace=replace, + exists_ok=if_not_exists, + properties=_get_sge_properties(options), connection=sge_connection, + partition_columns=_get_sge_schema(partition_columns), ) + + +def _get_sge_schema( + columns: Optional[Mapping[str, str]] = None +) -> Optional[sge.Schema]: + if not columns: + return None + + return sge.Schema( + this=None, + expressions=[ + sge.ColumnDef( + this=base.identifier(name), + kind=sge.DataType.build(typ, dialect=base.DIALECT), + ) + for name, typ in columns.items() + ], + ) + + +def _get_sge_properties( + options: Optional[Mapping[str, Union[str, int, float, bool, list]]] = None +) -> Optional[sge.Properties]: + if not options: + return None + + return sge.Properties( + expressions=[ + sge.Property(this=base.identifier(k), value=base.literal(v)) + for k, v in options.items() + ] + ) + + +def _loaddata_sql(self: sg.Generator, expression: sge.LoadData) -> str: + out = ["LOAD DATA"] + if expression.args.get("overwrite"): + out.append("OVERWRITE") + + out.append(f"INTO {self.sql(expression, 'this').strip()}") + + # We ignore inpath as it's just a dummy to satisfy sqlglot requirements + # but BigQuery uses FROM FILES instead. + + columns = self.sql(expression, "columns").strip() + if columns: + out.append(columns) + + partition_by = self.sql(expression, "partition_by").strip() + if partition_by: + out.append(partition_by) + + cluster_by = self.sql(expression, "cluster_by").strip() + if cluster_by: + out.append(cluster_by) + + options = self.sql(expression, "options").strip() + if options: + out.append(options) + + from_files = self.sql(expression, "from_files").strip() + if from_files: + out.append(f"FROM FILES {from_files}") + + with_partition_columns = self.sql(expression, "with_partition_columns").strip() + if with_partition_columns: + out.append(f"WITH PARTITION COLUMNS {with_partition_columns}") + + connection = self.sql(expression, "connection").strip() + if connection: + out.append(f"WITH CONNECTION {connection}") + + return " ".join(out) + + +def _create_sql(self: sg.Generator, expression: sge.Create) -> str: + kind = expression.args.get("kind") + if kind != "EXTERNAL TABLE": + return self.create_sql(expression) + + out = ["CREATE"] + if expression.args.get("replace"): + out.append("OR REPLACE") + out.append("EXTERNAL TABLE") + if expression.args.get("exists_ok"): + out.append("IF NOT EXISTS") + + out.append(self.sql(expression, "this")) + + connection = self.sql(expression, "connection").strip() + if connection: + out.append(f"WITH CONNECTION {connection}") + + partition_columns = self.sql(expression, "partition_columns").strip() + if partition_columns: + out.append(f"WITH PARTITION COLUMNS {partition_columns}") + + properties = self.sql(expression, "properties").strip() + if properties: + out.append(properties) + + return " ".join(out) + + +# Register the transform for BigQuery generator +base.DIALECT.Generator.TRANSFORMS[sge.LoadData] = _loaddata_sql +base.DIALECT.Generator.TRANSFORMS[sge.Create] = _create_sql diff --git a/bigframes/core/sql/table.py b/bigframes/core/sql/table.py deleted file mode 100644 index 24a97ed159..0000000000 --- a/bigframes/core/sql/table.py +++ /dev/null @@ -1,68 +0,0 @@ -# Copyright 2026 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -from typing import Mapping, Optional, Union - - -def create_external_table_ddl( - table_name: str, - *, - replace: bool = False, - if_not_exists: bool = False, - columns: Optional[Mapping[str, str]] = None, - partition_columns: Optional[Mapping[str, str]] = None, - connection_name: Optional[str] = None, - options: Mapping[str, Union[str, int, float, bool, list]], -) -> str: - """Generates the CREATE EXTERNAL TABLE DDL statement.""" - statement = ["CREATE"] - if replace: - statement.append("OR REPLACE") - statement.append("EXTERNAL TABLE") - if if_not_exists: - statement.append("IF NOT EXISTS") - statement.append(table_name) - - if columns: - column_defs = ", ".join([f"{name} {typ}" for name, typ in columns.items()]) - statement.append(f"({column_defs})") - - if connection_name: - statement.append(f"WITH CONNECTION `{connection_name}`") - - if partition_columns: - part_defs = ", ".join( - [f"{name} {typ}" for name, typ in partition_columns.items()] - ) - statement.append(f"WITH PARTITION COLUMNS ({part_defs})") - - if options: - opts = [] - for key, value in options.items(): - if isinstance(value, str): - value_sql = repr(value) - opts.append(f"{key} = {value_sql}") - elif isinstance(value, bool): - opts.append(f"{key} = {str(value).upper()}") - elif isinstance(value, list): - list_str = ", ".join([repr(v) for v in value]) - opts.append(f"{key} = [{list_str}]") - else: - opts.append(f"{key} = {value}") - options_str = ", ".join(opts) - statement.append(f"OPTIONS ({options_str})") - - return " ".join(statement) diff --git a/tests/unit/bigquery/test_table.py b/tests/unit/bigquery/test_table.py deleted file mode 100644 index badce5e5e2..0000000000 --- a/tests/unit/bigquery/test_table.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright 2026 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License""); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from unittest import mock - -import pytest - -import bigframes.bigquery -import bigframes.core.sql.table -import bigframes.session - - -@pytest.fixture -def mock_session(): - return mock.create_autospec(spec=bigframes.session.Session) - - -def test_create_external_table_ddl(): - sql = bigframes.core.sql.table.create_external_table_ddl( - "my-project.my_dataset.my_table", - columns={"col1": "INT64", "col2": "STRING"}, - options={"format": "CSV", "uris": ["gs://bucket/path*"]}, - ) - expected = "CREATE EXTERNAL TABLE my-project.my_dataset.my_table (col1 INT64, col2 STRING) OPTIONS (format = 'CSV', uris = ['gs://bucket/path*'])" - assert sql == expected - - -def test_create_external_table_ddl_replace(): - sql = bigframes.core.sql.table.create_external_table_ddl( - "my-project.my_dataset.my_table", - replace=True, - columns={"col1": "INT64", "col2": "STRING"}, - options={"format": "CSV", "uris": ["gs://bucket/path*"]}, - ) - expected = "CREATE OR REPLACE EXTERNAL TABLE my-project.my_dataset.my_table (col1 INT64, col2 STRING) OPTIONS (format = 'CSV', uris = ['gs://bucket/path*'])" - assert sql == expected - - -def test_create_external_table_ddl_if_not_exists(): - sql = bigframes.core.sql.table.create_external_table_ddl( - "my-project.my_dataset.my_table", - if_not_exists=True, - columns={"col1": "INT64", "col2": "STRING"}, - options={"format": "CSV", "uris": ["gs://bucket/path*"]}, - ) - expected = "CREATE EXTERNAL TABLE IF NOT EXISTS my-project.my_dataset.my_table (col1 INT64, col2 STRING) OPTIONS (format = 'CSV', uris = ['gs://bucket/path*'])" - assert sql == expected - - -def test_create_external_table_ddl_partition_columns(): - sql = bigframes.core.sql.table.create_external_table_ddl( - "my-project.my_dataset.my_table", - columns={"col1": "INT64", "col2": "STRING"}, - partition_columns={"part1": "DATE", "part2": "STRING"}, - options={"format": "CSV", "uris": ["gs://bucket/path*"]}, - ) - expected = "CREATE EXTERNAL TABLE my-project.my_dataset.my_table (col1 INT64, col2 STRING) WITH PARTITION COLUMNS (part1 DATE, part2 STRING) OPTIONS (format = 'CSV', uris = ['gs://bucket/path*'])" - assert sql == expected - - -def test_create_external_table_ddl_connection(): - sql = bigframes.core.sql.table.create_external_table_ddl( - "my-project.my_dataset.my_table", - columns={"col1": "INT64", "col2": "STRING"}, - connection_name="my-connection", - options={"format": "CSV", "uris": ["gs://bucket/path*"]}, - ) - expected = "CREATE EXTERNAL TABLE my-project.my_dataset.my_table (col1 INT64, col2 STRING) WITH CONNECTION `my-connection` OPTIONS (format = 'CSV', uris = ['gs://bucket/path*'])" - assert sql == expected - - -@mock.patch("bigframes.bigquery._operations.table._get_table_metadata") -def test_create_external_table(get_table_metadata_mock, mock_session): - bigframes.bigquery.create_external_table( - "my-project.my_dataset.my_table", - columns={"col1": "INT64", "col2": "STRING"}, - options={"format": "CSV", "uris": ["gs://bucket/path*"]}, - session=mock_session, - ) - mock_session.read_gbq_query.assert_called_once() - generated_sql = mock_session.read_gbq_query.call_args[0][0] - expected = "CREATE EXTERNAL TABLE my-project.my_dataset.my_table (col1 INT64, col2 STRING) OPTIONS (format = 'CSV', uris = ['gs://bucket/path*'])" - assert generated_sql == expected - get_table_metadata_mock.assert_called_once() diff --git a/tests/unit/core/compile/sqlglot/sql/snapshots/test_ddl/test_create_external_table/out.sql b/tests/unit/core/compile/sqlglot/sql/snapshots/test_ddl/test_create_external_table/out.sql new file mode 100644 index 0000000000..867282de0e --- /dev/null +++ b/tests/unit/core/compile/sqlglot/sql/snapshots/test_ddl/test_create_external_table/out.sql @@ -0,0 +1,7 @@ +CREATE EXTERNAL TABLE `my-project.my_dataset.my_table` ( + `col1` INT64, + `col2` STRING +) OPTIONS ( + format='CSV', + uris=['gs://bucket/path*'] +) \ No newline at end of file diff --git a/tests/unit/core/compile/sqlglot/sql/snapshots/test_ddl/test_create_external_table_all_options/out.sql b/tests/unit/core/compile/sqlglot/sql/snapshots/test_ddl/test_create_external_table_all_options/out.sql new file mode 100644 index 0000000000..a08ddf5ee5 --- /dev/null +++ b/tests/unit/core/compile/sqlglot/sql/snapshots/test_ddl/test_create_external_table_all_options/out.sql @@ -0,0 +1,10 @@ +CREATE OR REPLACE EXTERNAL TABLE `my-project.my_dataset.my_table` ( + `col1` INT64, + `col2` STRING +) WITH CONNECTION `my-connection` WITH PARTITION COLUMNS ( + `part1` DATE, + `part2` STRING +) OPTIONS ( + format='CSV', + uris=['gs://bucket/path*'] +) \ No newline at end of file diff --git a/tests/unit/core/compile/sqlglot/sql/snapshots/test_ddl/test_create_external_table_if_not_exists/out.sql b/tests/unit/core/compile/sqlglot/sql/snapshots/test_ddl/test_create_external_table_if_not_exists/out.sql new file mode 100644 index 0000000000..e05a553317 --- /dev/null +++ b/tests/unit/core/compile/sqlglot/sql/snapshots/test_ddl/test_create_external_table_if_not_exists/out.sql @@ -0,0 +1,6 @@ +CREATE EXTERNAL TABLE IF NOT EXISTS `my-project.my_dataset.my_table` ( + `col1` INT64 +) OPTIONS ( + format='CSV', + uris=['gs://bucket/path*'] +) \ No newline at end of file diff --git a/tests/unit/core/compile/sqlglot/sql/test_ddl.py b/tests/unit/core/compile/sqlglot/sql/test_ddl.py index 14d3708883..48080cd6b9 100644 --- a/tests/unit/core/compile/sqlglot/sql/test_ddl.py +++ b/tests/unit/core/compile/sqlglot/sql/test_ddl.py @@ -12,13 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +from unittest import mock + import pytest +import bigframes.bigquery import bigframes.core.compile.sqlglot.sql as sql +import bigframes.session pytest.importorskip("pytest_snapshot") +@pytest.fixture +def mock_session(): + return mock.create_autospec(spec=bigframes.session.Session) + + def test_load_data_minimal(snapshot): expr = sql.load_data( "my-project.my_dataset.my_table", @@ -40,3 +49,39 @@ def test_load_data_all_options(snapshot): connection_name="my-connection", ) snapshot.assert_match(sql.to_sql(expr), "out.sql") + + +@mock.patch("bigframes.bigquery._operations.table._get_table_metadata") +def test_create_external_table(get_table_metadata_mock, mock_session, snapshot): + bigframes.bigquery.create_external_table( + "my-project.my_dataset.my_table", + columns={"col1": "INT64", "col2": "STRING"}, + options={"format": "CSV", "uris": ["gs://bucket/path*"]}, + session=mock_session, + ) + mock_session.read_gbq_query.assert_called_once() + generated_sql = mock_session.read_gbq_query.call_args[0][0] + snapshot.assert_match(generated_sql, "out.sql") + get_table_metadata_mock.assert_called_once() + + +def test_create_external_table_all_options(snapshot): + expr = sql.create_external_table( + "my-project.my_dataset.my_table", + replace=True, + columns={"col1": "INT64", "col2": "STRING"}, + partition_columns={"part1": "DATE", "part2": "STRING"}, + connection_name="my-connection", + options={"format": "CSV", "uris": ["gs://bucket/path*"]}, + ) + snapshot.assert_match(sql.to_sql(expr), "out.sql") + + +def test_create_external_table_if_not_exists(snapshot): + expr = sql.create_external_table( + "my-project.my_dataset.my_table", + if_not_exists=True, + columns={"col1": "INT64"}, + options={"format": "CSV", "uris": ["gs://bucket/path*"]}, + ) + snapshot.assert_match(sql.to_sql(expr), "out.sql")