diff --git a/core/src/main/java/org/apache/iceberg/SystemConfigs.java b/core/src/main/java/org/apache/iceberg/SystemConfigs.java index be59424992ee..5c958cba29a4 100644 --- a/core/src/main/java/org/apache/iceberg/SystemConfigs.java +++ b/core/src/main/java/org/apache/iceberg/SystemConfigs.java @@ -61,6 +61,18 @@ private SystemConfigs() {} 1, Integer::parseUnsignedInt); + /** + * Sets the size of the thread pool used for asynchronous scan planning in the REST catalog + * reference implementation. This controls the parallelism of server-side scan planning operations + * in {@link org.apache.iceberg.rest.CatalogHandlers}. + */ + public static final ConfigEntry REST_ASYNC_PLANNING_THREADS = + new ConfigEntry<>( + "iceberg.rest.async-planning-threads", + "ICEBERG_REST_ASYNC_PLANNING_THREADS", + Math.max(2, Runtime.getRuntime().availableProcessors()), + Integer::parseUnsignedInt); + /** Whether to use the shared worker pool when planning table scans. */ public static final ConfigEntry SCAN_THREAD_POOL_ENABLED = new ConfigEntry<>( diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index 3a1e62260aae..f127c9dd180b 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -39,7 +39,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; @@ -57,6 +56,7 @@ import org.apache.iceberg.Scan; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; +import org.apache.iceberg.SystemConfigs; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; @@ -103,6 +103,7 @@ import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; import org.apache.iceberg.view.BaseView; import org.apache.iceberg.view.SQLViewRepresentation; import org.apache.iceberg.view.View; @@ -116,7 +117,17 @@ public class CatalogHandlers { private static final String INITIAL_PAGE_TOKEN = ""; private static final InMemoryPlanningState IN_MEMORY_PLANNING_STATE = InMemoryPlanningState.getInstance(); - private static final ExecutorService ASYNC_PLANNING_POOL = Executors.newSingleThreadExecutor(); + + /** + * Thread pool size for asynchronous scan planning operations. Controlled by the system property + * {@code iceberg.rest.async-planning-threads} or environment variable {@code + * ICEBERG_REST_ASYNC_PLANNING_THREADS}. Defaults to the number of available processors. + */ + public static final int ASYNC_PLANNING_THREADS = + SystemConfigs.REST_ASYNC_PLANNING_THREADS.value(); + + private static final ExecutorService ASYNC_PLANNING_POOL = + ThreadPools.newExitingWorkerPool("iceberg-rest-async-plan", ASYNC_PLANNING_THREADS); // Advanced idempotency store with TTL and in-flight coalescing. // diff --git a/core/src/test/java/org/apache/iceberg/rest/TestAsyncPlanningPoolConfig.java b/core/src/test/java/org/apache/iceberg/rest/TestAsyncPlanningPoolConfig.java new file mode 100644 index 000000000000..7fe911920e80 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/TestAsyncPlanningPoolConfig.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.SystemConfigs; +import org.junit.jupiter.api.Test; + +/** + * Tests for the {@link CatalogHandlers#ASYNC_PLANNING_THREADS} configuration and related system + * properties. + */ +public class TestAsyncPlanningPoolConfig { + + @Test + public void testDefaultAsyncPlanningThreadsIsReasonable() { + // The default should be at least 2 to enable parallelism + int defaultThreads = SystemConfigs.REST_ASYNC_PLANNING_THREADS.defaultValue(); + assertThat(defaultThreads) + .as("Default async planning threads should be at least 2") + .isGreaterThanOrEqualTo(2); + } + + @Test + public void testDefaultMatchesAvailableProcessors() { + int expected = Math.max(2, Runtime.getRuntime().availableProcessors()); + int defaultThreads = SystemConfigs.REST_ASYNC_PLANNING_THREADS.defaultValue(); + assertThat(defaultThreads) + .as("Default async planning threads should be max(2, availableProcessors) = %d", expected) + .isEqualTo(expected); + } + + @Test + public void testPropertyKeyIsCorrect() { + String propertyKey = SystemConfigs.REST_ASYNC_PLANNING_THREADS.propertyKey(); + assertThat(propertyKey) + .as("System property key should follow Iceberg convention") + .isEqualTo("iceberg.rest.async-planning-threads"); + } + + @Test + public void testEnvVarKeyIsCorrect() { + String envKey = SystemConfigs.REST_ASYNC_PLANNING_THREADS.envKey(); + assertThat(envKey) + .as("Environment variable key should follow Iceberg convention") + .isEqualTo("ICEBERG_REST_ASYNC_PLANNING_THREADS"); + } + + @Test + public void testCatalogHandlersExposesThreadCount() { + // Verify the constant is publicly accessible for server implementations + assertThat(CatalogHandlers.ASYNC_PLANNING_THREADS) + .as("CatalogHandlers should expose the configured thread count") + .isGreaterThanOrEqualTo(2); + } +}