Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Thank you for your interest in contributing to Sequin! This document provides gu
- GitHub CLI (`gh`)
- Node.js (for frontend assets)
- Go (only necessary for CLI development)
- [CMake](https://cmake.org/download/)

### Getting started

Expand Down
16 changes: 12 additions & 4 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,21 @@ end
# Configure SQS integration for HTTP Push sinks
sqs_config =
if System.get_env("HTTP_PUSH_VIA_SQS_QUEUE_URL") do
%{
base_config = %{
main_queue_url: System.fetch_env!("HTTP_PUSH_VIA_SQS_QUEUE_URL"),
dlq_url: System.fetch_env!("HTTP_PUSH_VIA_SQS_DLQ_URL"),
region: System.fetch_env!("HTTP_PUSH_VIA_SQS_REGION"),
access_key_id: System.fetch_env!("HTTP_PUSH_VIA_SQS_ACCESS_KEY_ID"),
secret_access_key: System.fetch_env!("HTTP_PUSH_VIA_SQS_SECRET_ACCESS_KEY")
region: System.fetch_env!("HTTP_PUSH_VIA_SQS_REGION")
}

# Support both explicit credentials and task role
if System.get_env("HTTP_PUSH_VIA_SQS_USE_TASK_ROLE") == "true" do
Map.put(base_config, :use_task_role, true)
else
Map.merge(base_config, %{
access_key_id: System.fetch_env!("HTTP_PUSH_VIA_SQS_ACCESS_KEY_ID"),
secret_access_key: System.fetch_env!("HTTP_PUSH_VIA_SQS_SECRET_ACCESS_KEY")
})
end
end

# Enable via_sqs_for_new_sinks? flag for HttpPushSink
Expand Down
43 changes: 19 additions & 24 deletions lib/sequin/aws/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,32 @@ defmodule Sequin.Aws.Client do

@impl Sequin.Aws
def get_client(region) when is_binary(region) do
if Sequin.Config.self_hosted?() do
case get_credentials() do
{:ok, credentials} ->
client = build_client(credentials, region)
{:ok, client}
case get_credentials() do
{:ok, credentials} ->
client = build_client(credentials, region)
{:ok, client}

{:error, reason} ->
Logger.error("Failed to get task role credentials: #{inspect(reason)}")
{:error, reason}
end
else
{:error, Error.service(service: :aws, message: "Task role credentials are only available in self-hosted mode")}
{:error, reason} ->
Logger.error("Failed to get task role credentials: #{inspect(reason)}")
{:error, reason}
end
end

defp get_credentials do
case Application.ensure_all_started(:aws_credentials) do
{:ok, _} ->
case :aws_credentials.get_credentials() do
:undefined ->
{:error, Error.service(service: :aws, message: "Task role credentials not found")}
case :aws_credentials.get_credentials() do
:undefined ->
{:error,
Error.service(
service: :aws,
message:
"Task role credentials not found. Ensure AWS credentials are available via environment variables, credentials file, ECS task role, web identity token, or EC2 metadata."
)}

credentials when is_map(credentials) ->
{:ok, credentials}
credentials when is_map(credentials) ->
{:ok, credentials}

other ->
{:error, Error.service(service: :aws, message: "Unexpected credential format: #{inspect(other)}")}
end

{:error, reason} ->
{:error, Error.service(service: :aws, message: "Failed to start aws_credentials: #{inspect(reason)}")}
other ->
{:error, Error.service(service: :aws, message: "Unexpected credential format: #{inspect(other)}")}
end
end

Expand Down
28 changes: 24 additions & 4 deletions lib/sequin/consumers/kafka_sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Sequin.Consumers.KafkaSink do
alias Sequin.Encrypted.Field, as: EncryptedField
alias Sequin.Sinks.Kafka.AwsMskIam

@derive {Jason.Encoder, only: [:hosts, :topic]}
@derive {Jason.Encoder, only: [:hosts, :topic, :use_task_role]}
@derive {Inspect, except: [:password, :aws_secret_access_key]}
@primary_key false
typed_embedded_schema do
Expand All @@ -23,6 +23,7 @@ defmodule Sequin.Consumers.KafkaSink do
field :aws_region, :string
field :aws_access_key_id, :string
field :aws_secret_access_key, EncryptedField
field :use_task_role, :boolean, default: false
field :connection_id, :string
field :routing_mode, Ecto.Enum, values: [:dynamic, :static]
field :compression, Ecto.Enum, values: [:none, :gzip, :snappy, :lz4, :zstd], default: :none
Expand All @@ -40,6 +41,7 @@ defmodule Sequin.Consumers.KafkaSink do
:aws_region,
:aws_access_key_id,
:aws_secret_access_key,
:use_task_role,
:routing_mode,
:compression
])
Expand Down Expand Up @@ -100,6 +102,7 @@ defmodule Sequin.Consumers.KafkaSink do
sasl_mechanism = get_field(changeset, :sasl_mechanism)
username = get_field(changeset, :username)
password = get_field(changeset, :password)
use_task_role = get_field(changeset, :use_task_role)

cond do
sasl_mechanism in [:plain, :scram_sha_256, :scram_sha_512] ->
Expand All @@ -109,9 +112,18 @@ defmodule Sequin.Consumers.KafkaSink do

sasl_mechanism == :aws_msk_iam ->
changeset =
validate_required(changeset, [:aws_access_key_id, :aws_secret_access_key, :aws_region],
message: "is required when SASL Mechanism is #{sasl_mechanism}"
)
if use_task_role do
# When using task role, we only need region
changeset
|> validate_required([:aws_region])
|> put_change(:aws_access_key_id, nil)
|> put_change(:aws_secret_access_key, nil)
else
# When using explicit credentials, we need all three
validate_required(changeset, [:aws_access_key_id, :aws_secret_access_key, :aws_region],
message: "is required when SASL Mechanism is #{sasl_mechanism}"
)
end

if get_field(changeset, :tls) do
changeset
Expand Down Expand Up @@ -186,6 +198,14 @@ defmodule Sequin.Consumers.KafkaSink do
defp brod_compression(other), do: other

# Add SASL authentication if username/password are configured
defp maybe_add_sasl(config, %{sasl_mechanism: :aws_msk_iam, use_task_role: true} = sink) do
Keyword.put(
config,
:sasl,
{:callback, AwsMskIam.Auth, {:AWS_MSK_IAM, :task_role, sink.aws_region}}
)
end

defp maybe_add_sasl(config, %{sasl_mechanism: :aws_msk_iam} = sink) do
Keyword.put(
config,
Expand Down
16 changes: 0 additions & 16 deletions lib/sequin/consumers/kinesis_sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ defmodule Sequin.Consumers.KinesisSink do
|> validate_credentials()
|> validate_stream_arn()
|> validate_routing()
|> validate_cloud_mode_restrictions()
end

defp validate_credentials(changeset) do
Expand Down Expand Up @@ -135,19 +134,4 @@ defmodule Sequin.Consumers.KinesisSink do
{:error, reason}
end
end

defp validate_cloud_mode_restrictions(changeset) do
self_hosted? = Sequin.Config.self_hosted?()
use_task_role? = get_field(changeset, :use_task_role)

if not self_hosted? and use_task_role? do
add_error(
changeset,
:use_task_role,
"Task role credentials are not supported in Sequin Cloud. Please use explicit credentials instead."
)
else
changeset
end
end
end
16 changes: 0 additions & 16 deletions lib/sequin/consumers/sns_sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ defmodule Sequin.Consumers.SnsSink do
|> validate_routing()
|> put_is_fifo()
|> validate_emulator_base_url()
|> validate_cloud_mode_restrictions()
end

defp validate_credentials(changeset) do
Expand Down Expand Up @@ -170,19 +169,4 @@ defmodule Sequin.Consumers.SnsSink do
def sns_arn_regex do
~r/^arn:aws:sns:(?<region>[a-z0-9-]+):(?<account_id>\d{12}):(?<topic_name>[a-zA-Z0-9_.-]+)$/
end

defp validate_cloud_mode_restrictions(changeset) do
self_hosted? = Sequin.Config.self_hosted?()
use_task_role? = get_field(changeset, :use_task_role)

if not self_hosted? and use_task_role? do
add_error(
changeset,
:use_task_role,
"Task role credentials are not supported in Sequin Cloud. Please use explicit credentials instead."
)
else
changeset
end
end
end
16 changes: 0 additions & 16 deletions lib/sequin/consumers/sqs_sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ defmodule Sequin.Consumers.SqsSink do
|> put_is_fifo()
|> validate_emulator_base_url()
|> validate_routing()
|> validate_cloud_mode_restrictions()
end

defp validate_credentials(changeset) do
Expand Down Expand Up @@ -170,19 +169,4 @@ defmodule Sequin.Consumers.SqsSink do
def sqs_url_regex do
~r/^https?:\/\/sqs\.(?<region>[a-z0-9-]+)\.([a-zA-Z0-9.-]+)(?::\d+)?\/\d{12}\/[a-zA-Z0-9_-]+(?:\.fifo)?$/
end

defp validate_cloud_mode_restrictions(changeset) do
self_hosted? = Sequin.Config.self_hosted?()
use_task_role? = get_field(changeset, :use_task_role)

if not self_hosted? and use_task_role? do
add_error(
changeset,
:use_task_role,
"Task role credentials are not supported in Sequin Cloud. Please use explicit credentials instead."
)
else
changeset
end
end
end
33 changes: 22 additions & 11 deletions lib/sequin/runtime/http_push_sqs_pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,23 @@ defmodule Sequin.Runtime.HttpPushSqsPipeline do
queue_kind = Keyword.fetch!(opts, :queue_kind)
name = Keyword.fetch!(opts, :name)

{:ok,
%{
region: region,
access_key_id: access_key_id,
secret_access_key: secret_access_key
}} = fetch_sqs_config()
{:ok, sqs_config} = fetch_sqs_config()
region = Map.fetch!(sqs_config, :region)

{access_key_id, secret_access_key, token} =
if Map.get(sqs_config, :use_task_role) do
# Use aws_credentials provider chain
case :aws_credentials.get_credentials() do
:undefined ->
raise "Task role credentials not found"

credentials ->
{credentials.access_key_id, credentials.secret_access_key, credentials[:token]}
end
else
# Use explicit credentials
{Map.fetch!(sqs_config, :access_key_id), Map.fetch!(sqs_config, :secret_access_key), nil}
end

producer_mod = Keyword.get(opts, :producer_mod, BroadwaySQS.Producer)

Expand All @@ -125,11 +136,8 @@ defmodule Sequin.Runtime.HttpPushSqsPipeline do
module: {
producer_mod,
queue_url: queue_url,
config: [
access_key_id: access_key_id,
secret_access_key: secret_access_key,
region: region
],
config:
maybe_put_token([access_key_id: access_key_id, secret_access_key: secret_access_key, region: region], token),
attribute_names: [:sent_timestamp, :approximate_receive_count, :approximate_first_receive_timestamp],
receive_interval: 1_000,
max_number_of_messages: 10,
Expand Down Expand Up @@ -423,4 +431,7 @@ defmodule Sequin.Runtime.HttpPushSqsPipeline do
defp default_req_opts do
Application.get_env(:sequin, __MODULE__)[:req_opts] || []
end

defp maybe_put_token(config, nil), do: config
defp maybe_put_token(config, token), do: Keyword.put(config, :token, token)
end
25 changes: 25 additions & 0 deletions lib/sequin/sinks/kafka/aws_msk_iam/auth.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,31 @@ defmodule Sequin.Sinks.Kafka.AwsMskIam.Auth do

@handshake_version 1

# Handle task role credentials
def auth(host, sock, mod, client_id, timeout, {:AWS_MSK_IAM = mechanism, :task_role, aws_region} = _sasl_opts) do
case :aws_credentials.get_credentials() do
:undefined ->
{:error,
"Task role credentials not found. Ensure AWS credentials are available via environment variables, credentials file, ECS task role, web identity token, or EC2 metadata."}

credentials when is_map(credentials) ->
aws_secret_key_id = Map.get(credentials, :access_key_id)
aws_secret_access_key = Map.get(credentials, :secret_access_key)

auth(
host,
sock,
mod,
client_id,
timeout,
{mechanism, aws_secret_key_id, aws_secret_access_key, aws_region}
)

other ->
{:error, "Unexpected credential format: #{inspect(other)}"}
end
end

def auth(
_host,
_sock,
Expand Down
5 changes: 2 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ defmodule Sequin.MixProject do
def application do
[
mod: {Sequin.Application, []},
extra_applications: [:logger, :runtime_tools] ++ extra_applications(Mix.env()),
included_applications: [:aws_credentials]
extra_applications: [:logger, :runtime_tools] ++ extra_applications(Mix.env())
]
end

Expand Down Expand Up @@ -70,7 +69,7 @@ defmodule Sequin.MixProject do

# AWS and Cloud Services
{:aws, "~> 1.0"},
{:aws_credentials, "~> 1.0.0", runtime: false},
{:aws_credentials, "~> 1.0.0"},
{:aws_rds_castore, "~> 1.2.0"},
{:aws_signature, "~> 0.3.2"},

Expand Down
Loading
Loading