Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/iceberg/SystemConfigs.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ private SystemConfigs() {}
1,
Integer::parseUnsignedInt);

/**
* Sets the size of the thread pool used for asynchronous scan planning in the REST catalog
* reference implementation. This controls the parallelism of server-side scan planning operations
* in {@link org.apache.iceberg.rest.CatalogHandlers}.
*/
public static final ConfigEntry<Integer> REST_ASYNC_PLANNING_THREADS =
new ConfigEntry<>(
"iceberg.rest.async-planning-threads",
"ICEBERG_REST_ASYNC_PLANNING_THREADS",
Math.max(2, Runtime.getRuntime().availableProcessors()),
Integer::parseUnsignedInt);

/** Whether to use the shared worker pool when planning table scans. */
public static final ConfigEntry<Boolean> SCAN_THREAD_POOL_ENABLED =
new ConfigEntry<>(
Expand Down
15 changes: 13 additions & 2 deletions core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
Expand All @@ -57,6 +56,7 @@
import org.apache.iceberg.Scan;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SystemConfigs;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
Expand Down Expand Up @@ -103,6 +103,7 @@
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.iceberg.view.BaseView;
import org.apache.iceberg.view.SQLViewRepresentation;
import org.apache.iceberg.view.View;
Expand All @@ -116,7 +117,17 @@ public class CatalogHandlers {
private static final String INITIAL_PAGE_TOKEN = "";
private static final InMemoryPlanningState IN_MEMORY_PLANNING_STATE =
InMemoryPlanningState.getInstance();
private static final ExecutorService ASYNC_PLANNING_POOL = Executors.newSingleThreadExecutor();

/**
* Thread pool size for asynchronous scan planning operations. Controlled by the system property
* {@code iceberg.rest.async-planning-threads} or environment variable {@code
* ICEBERG_REST_ASYNC_PLANNING_THREADS}. Defaults to the number of available processors.
*/
public static final int ASYNC_PLANNING_THREADS =
SystemConfigs.REST_ASYNC_PLANNING_THREADS.value();

private static final ExecutorService ASYNC_PLANNING_POOL =
ThreadPools.newExitingWorkerPool("iceberg-rest-async-plan", ASYNC_PLANNING_THREADS);

// Advanced idempotency store with TTL and in-flight coalescing.
//
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest;

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.iceberg.SystemConfigs;
import org.junit.jupiter.api.Test;

/**
* Tests for the {@link CatalogHandlers#ASYNC_PLANNING_THREADS} configuration and related system
* properties.
*/
public class TestAsyncPlanningPoolConfig {

@Test
public void testDefaultAsyncPlanningThreadsIsReasonable() {
// The default should be at least 2 to enable parallelism
int defaultThreads = SystemConfigs.REST_ASYNC_PLANNING_THREADS.defaultValue();
assertThat(defaultThreads)
.as("Default async planning threads should be at least 2")
.isGreaterThanOrEqualTo(2);
}

@Test
public void testDefaultMatchesAvailableProcessors() {
int expected = Math.max(2, Runtime.getRuntime().availableProcessors());
int defaultThreads = SystemConfigs.REST_ASYNC_PLANNING_THREADS.defaultValue();
assertThat(defaultThreads)
.as("Default async planning threads should be max(2, availableProcessors) = %d", expected)
.isEqualTo(expected);
}

@Test
public void testPropertyKeyIsCorrect() {
String propertyKey = SystemConfigs.REST_ASYNC_PLANNING_THREADS.propertyKey();
assertThat(propertyKey)
.as("System property key should follow Iceberg convention")
.isEqualTo("iceberg.rest.async-planning-threads");
}

@Test
public void testEnvVarKeyIsCorrect() {
String envKey = SystemConfigs.REST_ASYNC_PLANNING_THREADS.envKey();
assertThat(envKey)
.as("Environment variable key should follow Iceberg convention")
.isEqualTo("ICEBERG_REST_ASYNC_PLANNING_THREADS");
}

@Test
public void testCatalogHandlersExposesThreadCount() {
// Verify the constant is publicly accessible for server implementations
assertThat(CatalogHandlers.ASYNC_PLANNING_THREADS)
.as("CatalogHandlers should expose the configured thread count")
.isGreaterThanOrEqualTo(2);
}
}