diff --git a/build/test/compose.yml b/build/test/compose.yml index d486ce32..24399309 100644 --- a/build/test/compose.yml +++ b/build/test/compose.yml @@ -35,8 +35,15 @@ services: test-kafka: image: confluentinc/confluent-local:7.4.0 environment: - - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://test-kafka:29092,PLAINTEXT_HOST://localhost:39092 - - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092 + KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://test-kafka:29092,SASL_PLAINTEXT_HOST://localhost:39092 + KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:29092,CONTROLLER://localhost:29093,SASL_PLAINTEXT_HOST://0.0.0.0:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_PLAINTEXT_HOST:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN + KAFKA_SASL_ENABLED_MECHANISMS: PLAIN + KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas.conf" + volumes: + - ./kafka_jaas.conf:/etc/kafka/jaas.conf:ro ports: - 39092:9092 test-gcp: diff --git a/build/test/kafka_jaas.conf b/build/test/kafka_jaas.conf new file mode 100644 index 00000000..8d39217b --- /dev/null +++ b/build/test/kafka_jaas.conf @@ -0,0 +1,6 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="admin" + password="admin-secret" + user_admin="admin-secret"; +}; diff --git a/internal/destregistry/providers/destkafka/destkafka_publish_test.go b/internal/destregistry/providers/destkafka/destkafka_publish_test.go index 99629760..fec90b9d 100644 --- a/internal/destregistry/providers/destkafka/destkafka_publish_test.go +++ b/internal/destregistry/providers/destkafka/destkafka_publish_test.go @@ -14,6 +14,7 @@ import ( "github.com/hookdeck/outpost/internal/util/testinfra" "github.com/hookdeck/outpost/internal/util/testutil" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -29,9 +30,15 @@ type KafkaConsumer struct { } func NewKafkaConsumer(brokerAddr, topic string) (*KafkaConsumer, error) { + mechanism := plain.Mechanism{Username: "admin", Password: "admin-secret"} + dialer := &kafka.Dialer{ + SASLMechanism: mechanism, + Timeout: 10 * time.Second, + } reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{brokerAddr}, Topic: topic, + Dialer: dialer, StartOffset: kafka.FirstOffset, MaxWait: 500 * time.Millisecond, }) @@ -143,8 +150,8 @@ func (s *KafkaPublishSuite) SetupSuite() { "sasl_mechanism": "plain", }), testutil.DestinationFactory.WithCredentials(map[string]string{ - "username": "user", - "password": "pass", + "username": "admin", + "password": "admin-secret", }), ) @@ -242,7 +249,10 @@ func TestKafkaPublisher_ConnectionErrors(t *testing.T) { func ensureKafkaTopic(t *testing.T, brokerAddr, topic string) { t.Helper() - conn, err := kafka.Dial("tcp", brokerAddr) + mechanism := plain.Mechanism{Username: "admin", Password: "admin-secret"} + dialer := &kafka.Dialer{SASLMechanism: mechanism, Timeout: 10 * time.Second} + + conn, err := dialer.DialContext(context.Background(), "tcp", brokerAddr) require.NoError(t, err) defer conn.Close() diff --git a/internal/util/testinfra/kafka.go b/internal/util/testinfra/kafka.go index 9d72ed0c..dc507bba 100644 --- a/internal/util/testinfra/kafka.go +++ b/internal/util/testinfra/kafka.go @@ -29,15 +29,22 @@ func startKafkaTestContainer(cfg *Config) { // maps to a random port but the broker advertises 9092, clients would fail. const hostPort = "19092" + jaasConfig := `org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret";` + req := testcontainers.ContainerRequest{ Image: "confluentinc/confluent-local:7.4.0", ExposedPorts: []string{hostPort + ":9092/tcp"}, Env: map[string]string{ - "KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:" + hostPort, - "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT", - "KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093", - "KAFKA_INTER_BROKER_LISTENER_NAME": "PLAINTEXT", - "KAFKA_CONTROLLER_LISTENER_NAMES": "CONTROLLER", + "KAFKA_ADVERTISED_LISTENERS": "SASL_PLAINTEXT://localhost:29092,SASL_PLAINTEXT_HOST://localhost:" + hostPort, + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_PLAINTEXT_HOST:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT", + "KAFKA_LISTENERS": "SASL_PLAINTEXT://0.0.0.0:29092,SASL_PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093", + "KAFKA_INTER_BROKER_LISTENER_NAME": "SASL_PLAINTEXT", + "KAFKA_CONTROLLER_LISTENER_NAMES": "CONTROLLER", + "KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL": "PLAIN", + "KAFKA_SASL_ENABLED_MECHANISMS": "PLAIN", + "KAFKA_LISTENER_NAME_SASL__PLAINTEXT_PLAIN_SASL_JAAS_CONFIG": jaasConfig, + "KAFKA_LISTENER_NAME_SASL__PLAINTEXT__HOST_PLAIN_SASL_JAAS_CONFIG": jaasConfig, + "KAFKA_OPTS": "-Djava.security.auth.login.config=/dev/null", "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1", "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": "1", "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1",