From 39571e8d9c02f3f6c86b5a74a446619173ef55c0 Mon Sep 17 00:00:00 2001 From: Matthew Greenwald Date: Tue, 24 Feb 2026 16:34:10 -0500 Subject: [PATCH 1/2] Enable automatic AWS credential discovery via provider chain Add support for AWS credential provider chain across all AWS integrations, eliminating the need for long-lived IAM access keys in production environments. Changes: - Enable aws_credentials library at runtime (remove runtime: false) - Remove self-hosted mode restriction for task role credentials - Add task role support to Kafka sinks with AWS MSK IAM authentication - Add task role support to HttpPushSqsPipeline (BroadwaySQS) - Update all AWS sink validations to support use_task_role flag This enables automatic credential discovery from: - Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) - AWS credentials file (~/.aws/credentials) - IRSA/EKS Pod Identity (web identity tokens) - ECS task credentials - EC2 instance metadata Benefits: - Eliminates security risk of long-lived IAM keys - Supports modern Kubernetes auth patterns (IRSA, Pod Identity) - Automatic credential rotation for temporary credentials - Follows AWS security best practices - Fully backward compatible (use_task_role defaults to false) Affected services: SQS, SNS, Kinesis, Kafka (MSK IAM), HttpPushSQS Signed-off-by: Matthew Greenwald --- CONTRIBUTING.md | 1 + config/runtime.exs | 16 +++- lib/sequin/aws/client.ex | 43 +++++----- lib/sequin/consumers/kafka_sink.ex | 28 +++++- lib/sequin/consumers/kinesis_sink.ex | 15 ---- lib/sequin/consumers/sns_sink.ex | 15 ---- lib/sequin/consumers/sqs_sink.ex | 15 ---- lib/sequin/runtime/http_push_sqs_pipeline.ex | 38 ++++++--- lib/sequin/sinks/kafka/aws_msk_iam/auth.ex | 32 +++++++ mix.exs | 5 +- test/sequin/consumers/sns_sink_test.exs | 50 ----------- test/sequin/kafka_sink_test.exs | 90 ++++++++++++++++++++ test/sequin/kinesis_sink_test.exs | 50 ----------- test/sequin/sqs_sink_test.exs | 50 ----------- 14 files changed, 207 insertions(+), 241 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7ab891a30..a76ec2d5e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -11,6 +11,7 @@ Thank you for your interest in contributing to Sequin! This document provides gu - GitHub CLI (`gh`) - Node.js (for frontend assets) - Go (only necessary for CLI development) +- [CMake](https://cmake.org/download/) ### Getting started diff --git a/config/runtime.exs b/config/runtime.exs index 32914c665..93c7a3cfd 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -63,13 +63,21 @@ end # Configure SQS integration for HTTP Push sinks sqs_config = if System.get_env("HTTP_PUSH_VIA_SQS_QUEUE_URL") do - %{ + base_config = %{ main_queue_url: System.fetch_env!("HTTP_PUSH_VIA_SQS_QUEUE_URL"), dlq_url: System.fetch_env!("HTTP_PUSH_VIA_SQS_DLQ_URL"), - region: System.fetch_env!("HTTP_PUSH_VIA_SQS_REGION"), - access_key_id: System.fetch_env!("HTTP_PUSH_VIA_SQS_ACCESS_KEY_ID"), - secret_access_key: System.fetch_env!("HTTP_PUSH_VIA_SQS_SECRET_ACCESS_KEY") + region: System.fetch_env!("HTTP_PUSH_VIA_SQS_REGION") } + + # Support both explicit credentials and task role + if System.get_env("HTTP_PUSH_VIA_SQS_USE_TASK_ROLE") == "true" do + Map.put(base_config, :use_task_role, true) + else + Map.merge(base_config, %{ + access_key_id: System.fetch_env!("HTTP_PUSH_VIA_SQS_ACCESS_KEY_ID"), + secret_access_key: System.fetch_env!("HTTP_PUSH_VIA_SQS_SECRET_ACCESS_KEY") + }) + end end # Enable via_sqs_for_new_sinks? flag for HttpPushSink diff --git a/lib/sequin/aws/client.ex b/lib/sequin/aws/client.ex index 0029ce3db..006b8757d 100644 --- a/lib/sequin/aws/client.ex +++ b/lib/sequin/aws/client.ex @@ -18,37 +18,32 @@ defmodule Sequin.Aws.Client do @impl Sequin.Aws def get_client(region) when is_binary(region) do - if Sequin.Config.self_hosted?() do - case get_credentials() do - {:ok, credentials} -> - client = build_client(credentials, region) - {:ok, client} + case get_credentials() do + {:ok, credentials} -> + client = build_client(credentials, region) + {:ok, client} - {:error, reason} -> - Logger.error("Failed to get task role credentials: #{inspect(reason)}") - {:error, reason} - end - else - {:error, Error.service(service: :aws, message: "Task role credentials are only available in self-hosted mode")} + {:error, reason} -> + Logger.error("Failed to get task role credentials: #{inspect(reason)}") + {:error, reason} end end defp get_credentials do - case Application.ensure_all_started(:aws_credentials) do - {:ok, _} -> - case :aws_credentials.get_credentials() do - :undefined -> - {:error, Error.service(service: :aws, message: "Task role credentials not found")} + case :aws_credentials.get_credentials() do + :undefined -> + {:error, + Error.service( + service: :aws, + message: + "Task role credentials not found. Ensure AWS credentials are available via environment variables, credentials file, ECS task role, web identity token, or EC2 metadata." + )} - credentials when is_map(credentials) -> - {:ok, credentials} + credentials when is_map(credentials) -> + {:ok, credentials} - other -> - {:error, Error.service(service: :aws, message: "Unexpected credential format: #{inspect(other)}")} - end - - {:error, reason} -> - {:error, Error.service(service: :aws, message: "Failed to start aws_credentials: #{inspect(reason)}")} + other -> + {:error, Error.service(service: :aws, message: "Unexpected credential format: #{inspect(other)}")} end end diff --git a/lib/sequin/consumers/kafka_sink.ex b/lib/sequin/consumers/kafka_sink.ex index 6595a4ed5..0c3735203 100644 --- a/lib/sequin/consumers/kafka_sink.ex +++ b/lib/sequin/consumers/kafka_sink.ex @@ -9,7 +9,7 @@ defmodule Sequin.Consumers.KafkaSink do alias Sequin.Encrypted.Field, as: EncryptedField alias Sequin.Sinks.Kafka.AwsMskIam - @derive {Jason.Encoder, only: [:hosts, :topic]} + @derive {Jason.Encoder, only: [:hosts, :topic, :use_task_role]} @derive {Inspect, except: [:password, :aws_secret_access_key]} @primary_key false typed_embedded_schema do @@ -23,6 +23,7 @@ defmodule Sequin.Consumers.KafkaSink do field :aws_region, :string field :aws_access_key_id, :string field :aws_secret_access_key, EncryptedField + field :use_task_role, :boolean, default: false field :connection_id, :string field :routing_mode, Ecto.Enum, values: [:dynamic, :static] field :compression, Ecto.Enum, values: [:none, :gzip, :snappy, :lz4, :zstd], default: :none @@ -40,6 +41,7 @@ defmodule Sequin.Consumers.KafkaSink do :aws_region, :aws_access_key_id, :aws_secret_access_key, + :use_task_role, :routing_mode, :compression ]) @@ -100,6 +102,7 @@ defmodule Sequin.Consumers.KafkaSink do sasl_mechanism = get_field(changeset, :sasl_mechanism) username = get_field(changeset, :username) password = get_field(changeset, :password) + use_task_role = get_field(changeset, :use_task_role) cond do sasl_mechanism in [:plain, :scram_sha_256, :scram_sha_512] -> @@ -109,9 +112,18 @@ defmodule Sequin.Consumers.KafkaSink do sasl_mechanism == :aws_msk_iam -> changeset = - validate_required(changeset, [:aws_access_key_id, :aws_secret_access_key, :aws_region], - message: "is required when SASL Mechanism is #{sasl_mechanism}" - ) + if use_task_role do + # When using task role, we only need region + changeset + |> validate_required([:aws_region]) + |> put_change(:aws_access_key_id, nil) + |> put_change(:aws_secret_access_key, nil) + else + # When using explicit credentials, we need all three + validate_required(changeset, [:aws_access_key_id, :aws_secret_access_key, :aws_region], + message: "is required when SASL Mechanism is #{sasl_mechanism}" + ) + end if get_field(changeset, :tls) do changeset @@ -186,6 +198,14 @@ defmodule Sequin.Consumers.KafkaSink do defp brod_compression(other), do: other # Add SASL authentication if username/password are configured + defp maybe_add_sasl(config, %{sasl_mechanism: :aws_msk_iam, use_task_role: true} = sink) do + Keyword.put( + config, + :sasl, + {:callback, AwsMskIam.Auth, {:AWS_MSK_IAM, :task_role, sink.aws_region}} + ) + end + defp maybe_add_sasl(config, %{sasl_mechanism: :aws_msk_iam} = sink) do Keyword.put( config, diff --git a/lib/sequin/consumers/kinesis_sink.ex b/lib/sequin/consumers/kinesis_sink.ex index e3bf9fb02..8d0be4d84 100644 --- a/lib/sequin/consumers/kinesis_sink.ex +++ b/lib/sequin/consumers/kinesis_sink.ex @@ -29,7 +29,6 @@ defmodule Sequin.Consumers.KinesisSink do |> validate_credentials() |> validate_stream_arn() |> validate_routing() - |> validate_cloud_mode_restrictions() end defp validate_credentials(changeset) do @@ -136,18 +135,4 @@ defmodule Sequin.Consumers.KinesisSink do end end - defp validate_cloud_mode_restrictions(changeset) do - self_hosted? = Sequin.Config.self_hosted?() - use_task_role? = get_field(changeset, :use_task_role) - - if not self_hosted? and use_task_role? do - add_error( - changeset, - :use_task_role, - "Task role credentials are not supported in Sequin Cloud. Please use explicit credentials instead." - ) - else - changeset - end - end end diff --git a/lib/sequin/consumers/sns_sink.ex b/lib/sequin/consumers/sns_sink.ex index 169a90fc1..4f202718f 100644 --- a/lib/sequin/consumers/sns_sink.ex +++ b/lib/sequin/consumers/sns_sink.ex @@ -42,7 +42,6 @@ defmodule Sequin.Consumers.SnsSink do |> validate_routing() |> put_is_fifo() |> validate_emulator_base_url() - |> validate_cloud_mode_restrictions() end defp validate_credentials(changeset) do @@ -171,18 +170,4 @@ defmodule Sequin.Consumers.SnsSink do ~r/^arn:aws:sns:(?[a-z0-9-]+):(?\d{12}):(?[a-zA-Z0-9_.-]+)$/ end - defp validate_cloud_mode_restrictions(changeset) do - self_hosted? = Sequin.Config.self_hosted?() - use_task_role? = get_field(changeset, :use_task_role) - - if not self_hosted? and use_task_role? do - add_error( - changeset, - :use_task_role, - "Task role credentials are not supported in Sequin Cloud. Please use explicit credentials instead." - ) - else - changeset - end - end end diff --git a/lib/sequin/consumers/sqs_sink.ex b/lib/sequin/consumers/sqs_sink.ex index fd7c000c7..550b1441b 100644 --- a/lib/sequin/consumers/sqs_sink.ex +++ b/lib/sequin/consumers/sqs_sink.ex @@ -43,7 +43,6 @@ defmodule Sequin.Consumers.SqsSink do |> put_is_fifo() |> validate_emulator_base_url() |> validate_routing() - |> validate_cloud_mode_restrictions() end defp validate_credentials(changeset) do @@ -171,18 +170,4 @@ defmodule Sequin.Consumers.SqsSink do ~r/^https?:\/\/sqs\.(?[a-z0-9-]+)\.([a-zA-Z0-9.-]+)(?::\d+)?\/\d{12}\/[a-zA-Z0-9_-]+(?:\.fifo)?$/ end - defp validate_cloud_mode_restrictions(changeset) do - self_hosted? = Sequin.Config.self_hosted?() - use_task_role? = get_field(changeset, :use_task_role) - - if not self_hosted? and use_task_role? do - add_error( - changeset, - :use_task_role, - "Task role credentials are not supported in Sequin Cloud. Please use explicit credentials instead." - ) - else - changeset - end - end end diff --git a/lib/sequin/runtime/http_push_sqs_pipeline.ex b/lib/sequin/runtime/http_push_sqs_pipeline.ex index a698ac99c..b63e49e63 100644 --- a/lib/sequin/runtime/http_push_sqs_pipeline.ex +++ b/lib/sequin/runtime/http_push_sqs_pipeline.ex @@ -110,12 +110,23 @@ defmodule Sequin.Runtime.HttpPushSqsPipeline do queue_kind = Keyword.fetch!(opts, :queue_kind) name = Keyword.fetch!(opts, :name) - {:ok, - %{ - region: region, - access_key_id: access_key_id, - secret_access_key: secret_access_key - }} = fetch_sqs_config() + {:ok, sqs_config} = fetch_sqs_config() + region = Map.fetch!(sqs_config, :region) + + {access_key_id, secret_access_key, token} = + if Map.get(sqs_config, :use_task_role) do + # Use aws_credentials provider chain + case :aws_credentials.get_credentials() do + :undefined -> + raise "Task role credentials not found" + + credentials -> + {credentials.access_key_id, credentials.secret_access_key, credentials[:token]} + end + else + # Use explicit credentials + {Map.fetch!(sqs_config, :access_key_id), Map.fetch!(sqs_config, :secret_access_key), nil} + end producer_mod = Keyword.get(opts, :producer_mod, BroadwaySQS.Producer) @@ -125,11 +136,13 @@ defmodule Sequin.Runtime.HttpPushSqsPipeline do module: { producer_mod, queue_url: queue_url, - config: [ - access_key_id: access_key_id, - secret_access_key: secret_access_key, - region: region - ], + config: + [ + access_key_id: access_key_id, + secret_access_key: secret_access_key, + region: region + ] + |> maybe_put_token(token), attribute_names: [:sent_timestamp, :approximate_receive_count, :approximate_first_receive_timestamp], receive_interval: 1_000, max_number_of_messages: 10, @@ -423,4 +436,7 @@ defmodule Sequin.Runtime.HttpPushSqsPipeline do defp default_req_opts do Application.get_env(:sequin, __MODULE__)[:req_opts] || [] end + + defp maybe_put_token(config, nil), do: config + defp maybe_put_token(config, token), do: Keyword.put(config, :token, token) end diff --git a/lib/sequin/sinks/kafka/aws_msk_iam/auth.ex b/lib/sequin/sinks/kafka/aws_msk_iam/auth.ex index 6128d0e6d..75aa13ef6 100644 --- a/lib/sequin/sinks/kafka/aws_msk_iam/auth.ex +++ b/lib/sequin/sinks/kafka/aws_msk_iam/auth.ex @@ -11,6 +11,38 @@ defmodule Sequin.Sinks.Kafka.AwsMskIam.Auth do @handshake_version 1 + # Handle task role credentials + def auth( + host, + sock, + mod, + client_id, + timeout, + {:AWS_MSK_IAM = mechanism, :task_role, aws_region} = _sasl_opts + ) do + case :aws_credentials.get_credentials() do + :undefined -> + {:error, + "Task role credentials not found. Ensure AWS credentials are available via environment variables, credentials file, ECS task role, web identity token, or EC2 metadata."} + + credentials when is_map(credentials) -> + aws_secret_key_id = Map.get(credentials, :access_key_id) + aws_secret_access_key = Map.get(credentials, :secret_access_key) + + auth( + host, + sock, + mod, + client_id, + timeout, + {mechanism, aws_secret_key_id, aws_secret_access_key, aws_region} + ) + + other -> + {:error, "Unexpected credential format: #{inspect(other)}"} + end + end + def auth( _host, _sock, diff --git a/mix.exs b/mix.exs index 22b4f9d3c..243dbb632 100644 --- a/mix.exs +++ b/mix.exs @@ -23,8 +23,7 @@ defmodule Sequin.MixProject do def application do [ mod: {Sequin.Application, []}, - extra_applications: [:logger, :runtime_tools] ++ extra_applications(Mix.env()), - included_applications: [:aws_credentials] + extra_applications: [:logger, :runtime_tools] ++ extra_applications(Mix.env()) ] end @@ -70,7 +69,7 @@ defmodule Sequin.MixProject do # AWS and Cloud Services {:aws, "~> 1.0"}, - {:aws_credentials, "~> 1.0.0", runtime: false}, + {:aws_credentials, "~> 1.0.0"}, {:aws_rds_castore, "~> 1.2.0"}, {:aws_signature, "~> 0.3.2"}, diff --git a/test/sequin/consumers/sns_sink_test.exs b/test/sequin/consumers/sns_sink_test.exs index ef90e95e3..18c9f3935 100644 --- a/test/sequin/consumers/sns_sink_test.exs +++ b/test/sequin/consumers/sns_sink_test.exs @@ -87,56 +87,6 @@ defmodule Sequin.Consumers.SnsSinkTest do end end - describe "changeset/2 cloud mode restrictions" do - @tag self_hosted: false - test "validates that use_task_role is false when not self_hosted" do - params = %{ - topic_arn: "arn:aws:sns:us-east-1:123456789012:test-topic", - region: "us-east-1", - use_task_role: true, - routing_mode: :static - } - - changeset = SnsSink.changeset(%SnsSink{}, params) - - refute changeset.valid? - - assert changeset.errors[:use_task_role] == - {"Task role credentials are not supported in Sequin Cloud. Please use explicit credentials instead.", []} - end - - @tag self_hosted: true - test "allows use_task_role when self_hosted" do - params = %{ - topic_arn: "arn:aws:sns:us-east-1:123456789012:test-topic", - region: "us-east-1", - use_task_role: true, - routing_mode: :static - } - - changeset = SnsSink.changeset(%SnsSink{}, params) - - assert changeset.valid? - refute changeset.errors[:use_task_role] - end - - @tag self_hosted: false - test "allows use_task_role=false in cloud mode" do - params = %{ - topic_arn: "arn:aws:sns:us-east-1:123456789012:test-topic", - region: "us-east-1", - use_task_role: false, - access_key_id: "test_key", - secret_access_key: "test_secret", - routing_mode: :static - } - - changeset = SnsSink.changeset(%SnsSink{}, params) - - assert changeset.valid? - refute changeset.errors[:use_task_role] - end - end describe "aws_client/1" do test "creates client with explicit credentials when use_task_role is false" do diff --git a/test/sequin/kafka_sink_test.exs b/test/sequin/kafka_sink_test.exs index 72a2de17c..8609e4e00 100644 --- a/test/sequin/kafka_sink_test.exs +++ b/test/sequin/kafka_sink_test.exs @@ -215,6 +215,96 @@ defmodule Sequin.Consumers.KafkaSinkTest do refute :topic in changeset.changes end + + test "validates AWS MSK IAM credentials when mechanism is aws_msk_iam" do + # Test missing credentials + changeset = + KafkaSink.changeset(%KafkaSink{}, %{ + hosts: "localhost:9092", + topic: "test-topic", + tls: true, + sasl_mechanism: :aws_msk_iam, + routing_mode: :static + }) + + refute changeset.valid? + assert "is required when SASL Mechanism is aws_msk_iam" in errors_on(changeset).aws_access_key_id + assert "is required when SASL Mechanism is aws_msk_iam" in errors_on(changeset).aws_secret_access_key + assert "is required when SASL Mechanism is aws_msk_iam" in errors_on(changeset).aws_region + + # Test with valid credentials + changeset = + KafkaSink.changeset(%KafkaSink{}, %{ + hosts: "localhost:9092", + topic: "test-topic", + tls: true, + sasl_mechanism: :aws_msk_iam, + aws_access_key_id: "test_key", + aws_secret_access_key: "test_secret", + aws_region: "us-east-1", + routing_mode: :static + }) + + assert changeset.valid? + end + + test "validates AWS MSK IAM with use_task_role requires only region" do + changeset = + KafkaSink.changeset(%KafkaSink{}, %{ + hosts: "localhost:9092", + topic: "test-topic", + tls: true, + sasl_mechanism: :aws_msk_iam, + use_task_role: true, + aws_region: "us-east-1", + routing_mode: :static + }) + + assert changeset.valid? + assert Ecto.Changeset.get_field(changeset, :aws_access_key_id) == nil + assert Ecto.Changeset.get_field(changeset, :aws_secret_access_key) == nil + end + + test "nullifies AWS credentials when use_task_role is true" do + # Start with a sink that has credentials + existing_sink = %KafkaSink{ + aws_access_key_id: "existing_key", + aws_secret_access_key: "existing_secret" + } + + changeset = + KafkaSink.changeset(existing_sink, %{ + hosts: "localhost:9092", + topic: "test-topic", + tls: true, + sasl_mechanism: :aws_msk_iam, + use_task_role: true, + aws_region: "us-east-1", + routing_mode: :static + }) + + assert changeset.valid? + # Verify credentials are explicitly nullified in changes + assert changeset.changes.aws_access_key_id == nil + assert changeset.changes.aws_secret_access_key == nil + end + + test "validates AWS MSK IAM requires TLS" do + changeset = + KafkaSink.changeset(%KafkaSink{}, %{ + hosts: "localhost:9092", + topic: "test-topic", + tls: false, + sasl_mechanism: :aws_msk_iam, + aws_access_key_id: "test_key", + aws_secret_access_key: "test_secret", + aws_region: "us-east-1", + routing_mode: :static + }) + + refute changeset.valid? + assert "is required when SASL Mechanism is aws_msk_iam" in errors_on(changeset).tls + end end # Helper function to extract error messages diff --git a/test/sequin/kinesis_sink_test.exs b/test/sequin/kinesis_sink_test.exs index e3cad3bf8..d8790b53b 100644 --- a/test/sequin/kinesis_sink_test.exs +++ b/test/sequin/kinesis_sink_test.exs @@ -124,56 +124,6 @@ defmodule Sequin.Consumers.KinesisSinkTest do end end - describe "changeset/2 cloud mode restrictions" do - @tag self_hosted: false - test "validates that use_task_role is false when not self_hosted" do - params = %{ - stream_arn: "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream", - region: "us-east-1", - use_task_role: true, - routing_mode: :static - } - - changeset = KinesisSink.changeset(%KinesisSink{}, params) - - refute changeset.valid? - - assert changeset.errors[:use_task_role] == - {"Task role credentials are not supported in Sequin Cloud. Please use explicit credentials instead.", []} - end - - @tag self_hosted: true - test "allows use_task_role when self_hosted" do - params = %{ - stream_arn: "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream", - region: "us-east-1", - use_task_role: true, - routing_mode: :static - } - - changeset = KinesisSink.changeset(%KinesisSink{}, params) - - assert changeset.valid? - refute changeset.errors[:use_task_role] - end - - @tag self_hosted: false - test "allows use_task_role=false in cloud mode" do - params = %{ - stream_arn: "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream", - region: "us-east-1", - use_task_role: false, - access_key_id: "test_key", - secret_access_key: "test_secret", - routing_mode: :static - } - - changeset = KinesisSink.changeset(%KinesisSink{}, params) - - assert changeset.valid? - refute changeset.errors[:use_task_role] - end - end describe "region_from_arn/1" do test "extracts region from valid ARN" do diff --git a/test/sequin/sqs_sink_test.exs b/test/sequin/sqs_sink_test.exs index bb7ab2824..7e90681d1 100644 --- a/test/sequin/sqs_sink_test.exs +++ b/test/sequin/sqs_sink_test.exs @@ -70,56 +70,6 @@ defmodule Sequin.Consumers.SqsSinkTest do end end - describe "changeset/2 cloud mode restrictions" do - @tag self_hosted: false - test "validates that use_task_role is false when not self_hosted" do - params = %{ - queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/test", - region: "us-east-1", - use_task_role: true, - routing_mode: :static - } - - changeset = SqsSink.changeset(%SqsSink{}, params) - - refute changeset.valid? - - assert changeset.errors[:use_task_role] == - {"Task role credentials are not supported in Sequin Cloud. Please use explicit credentials instead.", []} - end - - @tag self_hosted: true - test "allows use_task_role when self_hosted" do - params = %{ - queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/test", - region: "us-east-1", - use_task_role: true, - routing_mode: :static - } - - changeset = SqsSink.changeset(%SqsSink{}, params) - - assert changeset.valid? - refute changeset.errors[:use_task_role] - end - - @tag self_hosted: false - test "allows use_task_role=false in cloud mode" do - params = %{ - queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/test", - region: "us-east-1", - use_task_role: false, - access_key_id: "test_key", - secret_access_key: "test_secret", - routing_mode: :static - } - - changeset = SqsSink.changeset(%SqsSink{}, params) - - assert changeset.valid? - refute changeset.errors[:use_task_role] - end - end describe "aws_client/1" do test "creates client with explicit credentials when use_task_role is false" do From 967716257423059fb7e2484536e07789272a40f0 Mon Sep 17 00:00:00 2001 From: Matthew Greenwald Date: Tue, 24 Feb 2026 16:56:54 -0500 Subject: [PATCH 2/2] fix: formatting + signoff Signed-off-by: Matthew Greenwald --- lib/sequin/consumers/kinesis_sink.ex | 1 - lib/sequin/consumers/sns_sink.ex | 1 - lib/sequin/consumers/sqs_sink.ex | 1 - lib/sequin/runtime/http_push_sqs_pipeline.ex | 7 +------ lib/sequin/sinks/kafka/aws_msk_iam/auth.ex | 9 +-------- test/sequin/consumers/sns_sink_test.exs | 1 - test/sequin/kinesis_sink_test.exs | 1 - test/sequin/sqs_sink_test.exs | 1 - 8 files changed, 2 insertions(+), 20 deletions(-) diff --git a/lib/sequin/consumers/kinesis_sink.ex b/lib/sequin/consumers/kinesis_sink.ex index 8d0be4d84..489d110f1 100644 --- a/lib/sequin/consumers/kinesis_sink.ex +++ b/lib/sequin/consumers/kinesis_sink.ex @@ -134,5 +134,4 @@ defmodule Sequin.Consumers.KinesisSink do {:error, reason} end end - end diff --git a/lib/sequin/consumers/sns_sink.ex b/lib/sequin/consumers/sns_sink.ex index 4f202718f..4e879698a 100644 --- a/lib/sequin/consumers/sns_sink.ex +++ b/lib/sequin/consumers/sns_sink.ex @@ -169,5 +169,4 @@ defmodule Sequin.Consumers.SnsSink do def sns_arn_regex do ~r/^arn:aws:sns:(?[a-z0-9-]+):(?\d{12}):(?[a-zA-Z0-9_.-]+)$/ end - end diff --git a/lib/sequin/consumers/sqs_sink.ex b/lib/sequin/consumers/sqs_sink.ex index 550b1441b..ebcb21c10 100644 --- a/lib/sequin/consumers/sqs_sink.ex +++ b/lib/sequin/consumers/sqs_sink.ex @@ -169,5 +169,4 @@ defmodule Sequin.Consumers.SqsSink do def sqs_url_regex do ~r/^https?:\/\/sqs\.(?[a-z0-9-]+)\.([a-zA-Z0-9.-]+)(?::\d+)?\/\d{12}\/[a-zA-Z0-9_-]+(?:\.fifo)?$/ end - end diff --git a/lib/sequin/runtime/http_push_sqs_pipeline.ex b/lib/sequin/runtime/http_push_sqs_pipeline.ex index b63e49e63..ffa1ead06 100644 --- a/lib/sequin/runtime/http_push_sqs_pipeline.ex +++ b/lib/sequin/runtime/http_push_sqs_pipeline.ex @@ -137,12 +137,7 @@ defmodule Sequin.Runtime.HttpPushSqsPipeline do producer_mod, queue_url: queue_url, config: - [ - access_key_id: access_key_id, - secret_access_key: secret_access_key, - region: region - ] - |> maybe_put_token(token), + maybe_put_token([access_key_id: access_key_id, secret_access_key: secret_access_key, region: region], token), attribute_names: [:sent_timestamp, :approximate_receive_count, :approximate_first_receive_timestamp], receive_interval: 1_000, max_number_of_messages: 10, diff --git a/lib/sequin/sinks/kafka/aws_msk_iam/auth.ex b/lib/sequin/sinks/kafka/aws_msk_iam/auth.ex index 75aa13ef6..ffe413c86 100644 --- a/lib/sequin/sinks/kafka/aws_msk_iam/auth.ex +++ b/lib/sequin/sinks/kafka/aws_msk_iam/auth.ex @@ -12,14 +12,7 @@ defmodule Sequin.Sinks.Kafka.AwsMskIam.Auth do @handshake_version 1 # Handle task role credentials - def auth( - host, - sock, - mod, - client_id, - timeout, - {:AWS_MSK_IAM = mechanism, :task_role, aws_region} = _sasl_opts - ) do + def auth(host, sock, mod, client_id, timeout, {:AWS_MSK_IAM = mechanism, :task_role, aws_region} = _sasl_opts) do case :aws_credentials.get_credentials() do :undefined -> {:error, diff --git a/test/sequin/consumers/sns_sink_test.exs b/test/sequin/consumers/sns_sink_test.exs index 18c9f3935..11889a034 100644 --- a/test/sequin/consumers/sns_sink_test.exs +++ b/test/sequin/consumers/sns_sink_test.exs @@ -87,7 +87,6 @@ defmodule Sequin.Consumers.SnsSinkTest do end end - describe "aws_client/1" do test "creates client with explicit credentials when use_task_role is false" do sink = %SnsSink{ diff --git a/test/sequin/kinesis_sink_test.exs b/test/sequin/kinesis_sink_test.exs index d8790b53b..0b963f715 100644 --- a/test/sequin/kinesis_sink_test.exs +++ b/test/sequin/kinesis_sink_test.exs @@ -124,7 +124,6 @@ defmodule Sequin.Consumers.KinesisSinkTest do end end - describe "region_from_arn/1" do test "extracts region from valid ARN" do arn = "arn:aws:kinesis:us-west-2:123456789012:stream/my-stream" diff --git a/test/sequin/sqs_sink_test.exs b/test/sequin/sqs_sink_test.exs index 7e90681d1..a56fa8d17 100644 --- a/test/sequin/sqs_sink_test.exs +++ b/test/sequin/sqs_sink_test.exs @@ -70,7 +70,6 @@ defmodule Sequin.Consumers.SqsSinkTest do end end - describe "aws_client/1" do test "creates client with explicit credentials when use_task_role is false" do sink = %SqsSink{