From e00bb8a0890cab33f4f167e73c7a0c4ea94ef275 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Wed, 11 Mar 2026 18:46:51 +0000 Subject: [PATCH 1/3] feat: preserve OBJ_REF_DTYPE table schemas on save to_gbq --- bigframes/dataframe.py | 26 +++++++++++++++++++++++++ bigframes/dtypes.py | 7 +++++++ tests/system/small/test_dataframe_io.py | 25 ++++++++++++++++++++++++ 3 files changed, 58 insertions(+) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 25cedda8f4..c4f1eefc6a 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -4301,6 +4301,32 @@ def to_gbq( result_table = result.query_job.destination assert result_table is not None + obj_ref_dest_cols = [] + for col_id in id_overrides.keys(): + try: + if ( + export_array.get_column_type(col_id) + == bigframes.dtypes.OBJ_REF_DTYPE + ): + obj_ref_dest_cols.append(id_overrides[col_id]) + except Exception: + pass + + if obj_ref_dest_cols: + table = self._session.bqclient.get_table(result_table) + new_schema = [] + for field in table.schema: + if field.name in obj_ref_dest_cols: + field_dict = field.to_api_repr() + field_dict["description"] = "bigframes_dtype: OBJ_REF_DTYPE" + new_schema.append( + google.cloud.bigquery.SchemaField.from_api_repr(field_dict) + ) + else: + new_schema.append(field) + table.schema = new_schema + self._session.bqclient.update_table(table, ["schema"]) + if temp_table_ref: bigframes.session._io.bigquery.set_table_expiration( self._session.bqclient, diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index 304428ef2f..1fdce9fe44 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -772,6 +772,13 @@ def convert_schema_field( ) -> typing.Tuple[str, Dtype]: is_repeated = field.mode == "REPEATED" if field.field_type == "RECORD": + if field.description == "bigframes_dtype: OBJ_REF_DTYPE": + bf_dtype = OBJ_REF_DTYPE # type: ignore + if is_repeated: + pa_type = pa.list_(bigframes_dtype_to_arrow_dtype(bf_dtype)) + bf_dtype = pd.ArrowDtype(pa_type) + return field.name, bf_dtype + mapped_fields = map(convert_schema_field, field.fields) fields = [] for name, dtype in mapped_fields: diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index fece679d06..33cf11096d 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -1002,6 +1002,31 @@ def test_to_gbq_timedelta_tag_ignored_when_appending(bigquery_client, dataset_id assert table.schema[0].description is None +def test_to_gbq_obj_ref(session, dataset_id: str, bigquery_client): + destination_table = f"{dataset_id}.test_to_gbq_obj_ref" + sql = """ + SELECT + 'gs://cloud-samples-data/vision/ocr/sign.jpg' AS uri_col + """ + df = session.read_gbq(sql) + df["obj_ref_col"] = df["uri_col"].str.to_blob() + df = df.drop(columns=["uri_col"]) + + # Save the dataframe to bigquery + df.to_gbq(destination_table) + + # Verify the table schema description is added + table = bigquery_client.get_table(destination_table) + obj_ref_field = next(f for f in table.schema if f.name == "obj_ref_col") + assert obj_ref_field.field_type == "RECORD" + assert obj_ref_field.description == "bigframes_dtype: OBJ_REF_DTYPE" + + # Verify reloading it correctly restores the dtype + reloaded_df = session.read_gbq(destination_table) + assert reloaded_df["obj_ref_col"].dtype == dtypes.OBJ_REF_DTYPE + assert len(reloaded_df) == 1 + + @pytest.mark.parametrize( ("index"), [True, False], From a1b262fef20f43615b14c4c61a67bf9fae6486cd Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Mon, 16 Mar 2026 21:57:15 +0000 Subject: [PATCH 2/3] fix: round-trip persistence for OBJ_REF_DTYPE --- bigframes/dataframe.py | 26 ------------------------ bigframes/dtypes.py | 8 ++++++-- bigframes/session/bq_caching_executor.py | 9 ++++---- tests/system/small/test_dataframe_io.py | 3 --- 4 files changed, 11 insertions(+), 35 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index c4f1eefc6a..25cedda8f4 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -4301,32 +4301,6 @@ def to_gbq( result_table = result.query_job.destination assert result_table is not None - obj_ref_dest_cols = [] - for col_id in id_overrides.keys(): - try: - if ( - export_array.get_column_type(col_id) - == bigframes.dtypes.OBJ_REF_DTYPE - ): - obj_ref_dest_cols.append(id_overrides[col_id]) - except Exception: - pass - - if obj_ref_dest_cols: - table = self._session.bqclient.get_table(result_table) - new_schema = [] - for field in table.schema: - if field.name in obj_ref_dest_cols: - field_dict = field.to_api_repr() - field_dict["description"] = "bigframes_dtype: OBJ_REF_DTYPE" - new_schema.append( - google.cloud.bigquery.SchemaField.from_api_repr(field_dict) - ) - else: - new_schema.append(field) - table.schema = new_schema - self._session.bqclient.update_table(table, ["schema"]) - if temp_table_ref: bigframes.session._io.bigquery.set_table_expiration( self._session.bqclient, diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index 1fdce9fe44..347f947f6c 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -772,7 +772,7 @@ def convert_schema_field( ) -> typing.Tuple[str, Dtype]: is_repeated = field.mode == "REPEATED" if field.field_type == "RECORD": - if field.description == "bigframes_dtype: OBJ_REF_DTYPE": + if field.description == OBJ_REF_DESCRIPTION_TAG: bf_dtype = OBJ_REF_DTYPE # type: ignore if is_repeated: pa_type = pa.list_(bigframes_dtype_to_arrow_dtype(bf_dtype)) @@ -834,8 +834,11 @@ def convert_to_schema_field( convert_to_schema_field(field.name, inner_bf_type, overrides) ) + description = ( + OBJ_REF_DESCRIPTION_TAG if bigframes_dtype == OBJ_REF_DTYPE else None + ) return google.cloud.bigquery.SchemaField( - name, "RECORD", fields=inner_fields + name, "RECORD", fields=inner_fields, description=description ) if bigframes_dtype.pyarrow_dtype == pa.duration("us"): # Timedeltas are represented as integers in microseconds. @@ -978,6 +981,7 @@ def lcd_type_or_throw(dtype1: Dtype, dtype2: Dtype) -> Dtype: TIMEDELTA_DESCRIPTION_TAG = "#microseconds" +OBJ_REF_DESCRIPTION_TAG = "bigframes_dtype: OBJ_REF_DTYPE" def contains_db_dtypes_json_arrow_type(type_): diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 1e240a841c..fbcdfd33f5 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -334,13 +334,14 @@ def _export_gbq( session=array_value.session, ) - has_timedelta_col = any( - t == bigframes.dtypes.TIMEDELTA_DTYPE for t in array_value.schema.dtypes + has_special_dtype_col = any( + t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE) + for t in array_value.schema.dtypes ) - if spec.if_exists != "append" and has_timedelta_col: + if spec.if_exists != "append" and has_special_dtype_col: # Only update schema if this is not modifying an existing table, and the - # new table contains timedelta columns. + # new table contains special columns (like timedelta or obj_ref). table = self.bqclient.get_table(spec.table) table.schema = array_value.schema.to_bigquery() self.bqclient.update_table(table, ["schema"]) diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index 33cf11096d..3da3544cbb 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -1012,16 +1012,13 @@ def test_to_gbq_obj_ref(session, dataset_id: str, bigquery_client): df["obj_ref_col"] = df["uri_col"].str.to_blob() df = df.drop(columns=["uri_col"]) - # Save the dataframe to bigquery df.to_gbq(destination_table) - # Verify the table schema description is added table = bigquery_client.get_table(destination_table) obj_ref_field = next(f for f in table.schema if f.name == "obj_ref_col") assert obj_ref_field.field_type == "RECORD" assert obj_ref_field.description == "bigframes_dtype: OBJ_REF_DTYPE" - # Verify reloading it correctly restores the dtype reloaded_df = session.read_gbq(destination_table) assert reloaded_df["obj_ref_col"].dtype == dtypes.OBJ_REF_DTYPE assert len(reloaded_df) == 1 From 63f65a7ee718965e9980ac7e097c691cd6237720 Mon Sep 17 00:00:00 2001 From: Shuowei Li Date: Mon, 16 Mar 2026 22:10:36 +0000 Subject: [PATCH 3/3] fix: resolve mypy description type mismatch error --- bigframes/dtypes.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index 347f947f6c..c1e7579ab9 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -834,11 +834,16 @@ def convert_to_schema_field( convert_to_schema_field(field.name, inner_bf_type, overrides) ) - description = ( - OBJ_REF_DESCRIPTION_TAG if bigframes_dtype == OBJ_REF_DTYPE else None - ) + if bigframes_dtype == OBJ_REF_DTYPE: + return google.cloud.bigquery.SchemaField( + name, + "RECORD", + fields=inner_fields, + description=OBJ_REF_DESCRIPTION_TAG, + ) + return google.cloud.bigquery.SchemaField( - name, "RECORD", fields=inner_fields, description=description + name, "RECORD", fields=inner_fields ) if bigframes_dtype.pyarrow_dtype == pa.duration("us"): # Timedeltas are represented as integers in microseconds.