diff --git a/utility-belt/src/main/java/io/confluent/admin/utils/cli/KafkaReadyCommand.java b/utility-belt/src/main/java/io/confluent/admin/utils/cli/KafkaReadyCommand.java index 014b031529..56bf2aebc0 100644 --- a/utility-belt/src/main/java/io/confluent/admin/utils/cli/KafkaReadyCommand.java +++ b/utility-belt/src/main/java/io/confluent/admin/utils/cli/KafkaReadyCommand.java @@ -28,6 +28,7 @@ import org.apache.logging.log4j.Logger; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import io.confluent.admin.utils.ClusterStatus; @@ -52,6 +53,13 @@ public class KafkaReadyCommand { private static final Logger log = LogManager.getLogger(KafkaReadyCommand.class); public static final String KAFKA_READY = "kafka-ready"; + private static final String CONFIG_PROVIDERS_PREFIX = "config.providers"; + + // When set to "true", config.providers entries are stripped from the worker config + // before running the kafka-ready check. This prevents ClassNotFoundException when + // config provider plugin JARs are on the worker's plugin.path but not on the + // CUB_CLASSPATH used by kafka-ready. Default: disabled (original behavior). + static final String SKIP_CONFIG_PROVIDERS_ENV = "CUB_KAFKA_READY_SKIP_CONFIG_PROVIDERS"; private static ArgumentParser createArgsParser() { ArgumentParser kafkaReady = ArgumentParsers @@ -130,6 +138,11 @@ public static void main(String[] args) { "Bootstrap servers should be provided through config or bootstrap_servers" ); } + + if (isSkipConfigProvidersEnabled()) { + workerProps = stripConfigProviders(workerProps); + } + success = ClusterStatus.isKafkaReady( workerProps, res.getInt("min_expected_brokers"), @@ -156,4 +169,39 @@ public static void main(String[] args) { System.exit(1); } } + + /** + * Returns a copy of the properties map with config.providers entries removed. + * If no config.providers entries are found, returns the original map unmodified. + */ + static Map stripConfigProviders(Map props) { + boolean hasProviderKeys = false; + for (String key : props.keySet()) { + if (key.startsWith(CONFIG_PROVIDERS_PREFIX)) { + hasProviderKeys = true; + break; + } + } + if (!hasProviderKeys) { + return props; + } + + Map result = new HashMap<>(props); + int count = 0; + Iterator it = result.keySet().iterator(); + while (it.hasNext()) { + String key = it.next(); + if (key.startsWith(CONFIG_PROVIDERS_PREFIX)) { + it.remove(); + count++; + } + } + log.info("Stripped {} config.providers properties from kafka-ready config ({}=true).", + count, SKIP_CONFIG_PROVIDERS_ENV); + return result; + } + + static boolean isSkipConfigProvidersEnabled() { + return "true".equalsIgnoreCase(System.getenv(SKIP_CONFIG_PROVIDERS_ENV)); + } } diff --git a/utility-belt/src/test/java/io/confluent/admin/utils/cli/KafkaReadyCommandTest.java b/utility-belt/src/test/java/io/confluent/admin/utils/cli/KafkaReadyCommandTest.java new file mode 100644 index 0000000000..76e36631cf --- /dev/null +++ b/utility-belt/src/test/java/io/confluent/admin/utils/cli/KafkaReadyCommandTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2017 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.confluent.admin.utils.cli; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +public class KafkaReadyCommandTest { + + @Test + public void stripConfigProviders_removesAllConfigProviderEntries() { + Map props = new HashMap<>(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("security.protocol", "SASL_SSL"); + props.put("sasl.mechanism", "PLAIN"); + props.put("config.providers", "secretmanager"); + props.put("config.providers.secretmanager.class", + "io.confluent.csid.config.provider.aws.SecretsManagerConfigProvider"); + props.put("config.providers.secretmanager.param.aws.region", "us-east-1"); + + Map result = KafkaReadyCommand.stripConfigProviders(props); + + assertThat(result).containsOnlyKeys( + "bootstrap.servers", "security.protocol", "sasl.mechanism"); + // original map is not mutated + assertThat(props).hasSize(6); + } + + @Test + public void stripConfigProviders_returnsOriginalWhenNoProviders() { + Map props = new HashMap<>(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("security.protocol", "PLAINTEXT"); + + Map result = KafkaReadyCommand.stripConfigProviders(props); + + assertThat(result).isSameAs(props); + } + + @Test + public void stripConfigProviders_handlesMultipleProviders() { + Map props = new HashMap<>(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("config.providers", "secretmanager,vault"); + props.put("config.providers.secretmanager.class", "com.example.SecretsProvider"); + props.put("config.providers.vault.class", "com.example.VaultProvider"); + props.put("config.providers.vault.param.address", "https://vault:8200"); + props.put("ssl.truststore.location", "/etc/ssl/truststore.jks"); + + Map result = KafkaReadyCommand.stripConfigProviders(props); + + assertThat(result).containsOnlyKeys("bootstrap.servers", "ssl.truststore.location"); + } + + @Test + public void stripConfigProviders_doesNotRemoveSimilarKeys() { + Map props = new HashMap<>(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("config.providers", "secretmanager"); + props.put("config.providers.secretmanager.class", "com.example.Provider"); + props.put("some.config.providers.unrelated", "value"); + + Map result = KafkaReadyCommand.stripConfigProviders(props); + + assertThat(result).containsOnlyKeys("bootstrap.servers", "some.config.providers.unrelated"); + } +}