Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 15 additions & 26 deletions tests/system/large/functions/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,7 @@ def square(x):


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_via_session_custom_sa(scalars_dfs):
def test_remote_function_via_session_custom_sa(scalars_pandas_df_index):
# TODO(shobs): Automate the following set-up during testing in the test project.
#
# For upfront convenience, the following set up has been statically created
Expand All @@ -1249,14 +1249,13 @@ def test_remote_function_via_session_custom_sa(scalars_dfs):
rf_session = bigframes.Session(context=bigframes.BigQueryOptions(project=project))

try:
# TODO(shobs): Figure out why the default ingress setting
# (internal-only) does not work here

@rf_session.remote_function(
input_types=[int],
output_type=int,
reuse=False,
cloud_function_service_account=gcf_service_account,
cloud_function_ingress_settings="all",
cloud_function_ingress_settings="internal-and-gclb",
)
def double_num(x):
if x is None:
Expand All @@ -1270,13 +1269,12 @@ def double_num(x):
assert gcf.service_config.service_account_email == gcf_service_account

# assert that the function works as expected on data
scalars_df, scalars_pandas_df = scalars_dfs

bf_int64_col = scalars_df["int64_col"]
bf_int64_col = rf_session.read_pandas(scalars_pandas_df_index.int64_col)
bf_result_col = bf_int64_col.apply(double_num)
bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()

pd_int64_col = scalars_pandas_df["int64_col"]
pd_int64_col = scalars_pandas_df_index.int64_col
pd_result_col = pd_int64_col.apply(lambda x: x if x is None else x + x)
pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

Expand All @@ -1303,7 +1301,7 @@ def double_num(x):
)
@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_via_session_custom_build_sa(
scalars_dfs, set_build_service_account
set_build_service_account, scalars_pandas_df_index
):
# TODO(shobs): Automate the following set-up during testing in the test project.
#
Expand All @@ -1321,15 +1319,14 @@ def test_remote_function_via_session_custom_build_sa(
rf_session = bigframes.Session(context=bigframes.BigQueryOptions(project=project))

try:
# TODO(shobs): Figure out why the default ingress setting
# (internal-only) does not work here

@rf_session.remote_function(
input_types=[int],
output_type=int,
reuse=False,
cloud_function_service_account="default",
cloud_build_service_account=set_build_service_account,
cloud_function_ingress_settings="all",
cloud_function_ingress_settings="internal-and-gclb",
)
def double_num(x):
if x is None:
Expand All @@ -1342,14 +1339,11 @@ def double_num(x):
)
assert gcf.build_config.service_account == expected_build_service_account

# assert that the function works as expected on data
scalars_df, scalars_pandas_df = scalars_dfs

bf_int64_col = scalars_df["int64_col"]
bf_int64_col = rf_session.read_pandas(scalars_pandas_df_index.int64_col)
bf_result_col = bf_int64_col.apply(double_num)
bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()

pd_int64_col = scalars_pandas_df["int64_col"]
pd_int64_col = scalars_pandas_df_index.int64_col
pd_result_col = pd_int64_col.apply(lambda x: x if x is None else x + x)
pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

Expand Down Expand Up @@ -1436,7 +1430,7 @@ def square_num(x):


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_via_session_vpc(scalars_dfs):
def test_remote_function_via_session_vpc(scalars_pandas_df_index):
# TODO(shobs): Automate the following set-up during testing in the test project.
#
# For upfront convenience, the following set up has been statically created
Expand Down Expand Up @@ -1466,16 +1460,14 @@ def double_num(x):
return x
return x + x

# TODO(shobs): See if the test vpc can be configured to make this flow
# work with the default ingress setting (internal-only)
double_num_remote = rf_session.remote_function(
input_types=[int],
output_type=int,
reuse=False,
cloud_function_service_account="default",
cloud_function_vpc_connector=gcf_vpc_connector,
cloud_function_vpc_connector_egress_settings="all",
cloud_function_ingress_settings="all",
cloud_function_ingress_settings="internal-and-gclb",
)(double_num)

gcf = rf_session.cloudfunctionsclient.get_function(
Expand All @@ -1489,15 +1481,12 @@ def double_num(x):
# cloud_function_vpc_connector_egress_settings="all" earlier.
assert gcf.service_config.vpc_connector_egress_settings == 2

# assert that the function works as expected on data
scalars_df, scalars_pandas_df = scalars_dfs

bf_int64_col = scalars_df["int64_col"]
bf_int64_col = rf_session.read_pandas(scalars_pandas_df_index.int64_col)
bf_result_col = bf_int64_col.apply(double_num_remote)
bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()

pd_int64_col = scalars_pandas_df["int64_col"]
pd_result_col = pd_int64_col.apply(double_num).astype("Int64")
pd_int64_col = scalars_pandas_df_index.int64_col
pd_result_col = pd_int64_col.apply(double_num)
pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

assert_frame_equal(bf_result, pd_result, check_dtype=False)
Expand Down
Loading