Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 70 additions & 5 deletions lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ defmodule Finch do
doc: "HTTP/2-specific options. Only relevant when `protocols` includes `:http2`.",
default: [
wait_for_server_settings?: false,
ping_interval: :infinity
ping_interval: :infinity,
max_connection_age: :infinity,
max_connection_age_jitter: 0
],
keys: [
wait_for_server_settings?: [
Expand All @@ -127,6 +129,28 @@ defmodule Finch do
the connection has been idle for this duration. When set to `:infinity` (default), \
no PINGs are sent.
"""
],
max_connection_age: [
type: :timeout,
doc: """
Maximum lifetime in milliseconds for an HTTP/2 connection before it is gracefully \
drained and replaced with a fresh one. When the timer expires the pool unregisters \
from the Registry (so new requests go to a fresh connection), finishes any in-flight \
requests, then terminates normally — the supervisor restarts it with a new DNS lookup. \
Useful for Kubernetes headless-service load balancing where DNS entries rotate. \
Defaults to `:infinity` (no age limit).
""",
default: :infinity
],
max_connection_age_jitter: [
type: :non_neg_integer,
doc: """
Random jitter in milliseconds added to `:max_connection_age`. Prevents multiple \
pool shards from draining simultaneously (thundering-herd). The actual age used is \
`max_connection_age + :rand.uniform(max_connection_age_jitter)`. \
Defaults to `0` (no jitter).
""",
default: 0
]
]
]
Expand Down Expand Up @@ -539,7 +563,9 @@ defmodule Finch do
pool_max_idle_time: valid[:pool_max_idle_time],
start_pool_metrics?: valid[:start_pool_metrics?],
wait_for_server_settings?: valid[:http2][:wait_for_server_settings?],
ping_interval: valid[:http2][:ping_interval]
ping_interval: valid[:http2][:ping_interval],
max_connection_age: valid[:http2][:max_connection_age],
max_connection_age_jitter: valid[:http2][:max_connection_age_jitter]
}
end

Expand Down Expand Up @@ -622,6 +648,13 @@ defmodule Finch do
> when streaming large responses which you do not intend to keep
> in memory.

> ### Connection draining {: .info}
>
> If the HTTP/2 pool this request is dispatched to is currently draining (see
> `http2: [max_connection_age: ...]`), the request is automatically retried on a fresh
> pool. The retry is transparent to the caller. See `async_request/3` for the async
> variant, which does not retry automatically.

## Options

Shares options with `request/3`.
Expand Down Expand Up @@ -704,6 +737,13 @@ defmodule Finch do
> when streaming large responses which you do not intend to keep
> in memory.

> ### Connection draining {: .info}
>
> If the HTTP/2 pool this request is dispatched to is currently draining (see
> `http2: [max_connection_age: ...]`), the request is automatically retried on a fresh
> pool. The retry is transparent to the caller. See `async_request/3` for the async
> variant, which does not retry automatically.

## Options

Shares options with `request/3`.
Expand Down Expand Up @@ -769,10 +809,21 @@ defmodule Finch do
end
end

defp __stream__(%Request{} = req, name, acc, fun, opts) do
defp __stream__(req, name, acc, fun, opts, retries \\ 3)

defp __stream__(%Request{} = req, name, acc, fun, opts, retries) do
case get_pool(req, name, opts) do
{pool, pool_mod} -> pool_mod.request(pool, req, acc, fun, name, opts)
_ -> {:error, Finch.Error.exception(:pool_not_available), acc}
{pool, pool_mod} ->
case pool_mod.request(pool, req, acc, fun, name, opts) do
{:error, %Finch.Error{reason: :read_only}, _acc} when retries > 0 ->
__stream__(req, name, acc, fun, opts, retries - 1)

other ->
other
end

_ ->
{:error, Finch.Error.exception(:pool_not_available), acc}
end
end

Expand All @@ -783,6 +834,13 @@ defmodule Finch do

See also `stream/5`.

> ### Connection draining {: .info}
>
> If the HTTP/2 pool this request is dispatched to is currently draining (see
> `http2: [max_connection_age: ...]`), the request is automatically retried on a fresh
> pool. The retry is transparent to the caller. See `async_request/3` for the async
> variant, which does not retry automatically.

## Options

* `:pool_timeout` - This timeout is applied when we check out a connection from the pool.
Expand Down Expand Up @@ -898,6 +956,13 @@ defmodule Finch do
{ref, {:data, "..."}}
{ref, :done}

> ### Connection draining {: .info}
>
> Unlike `request/3` and `stream/5`, async requests are not automatically retried when a
> pool is draining (see `http2: [max_connection_age: ...]`). If the caller receives
> `{ref, {:error, %Finch.Error{reason: :read_only}}}`, it should retry by calling
> `async_request/3` again.

## Options

Shares options with `request/3`.
Expand Down
106 changes: 84 additions & 22 deletions lib/finch/http2/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,10 @@ defmodule Finch.HTTP2.Pool do
metrics_ref: metrics_ref,
wait_for_server_settings?: pool_config.wait_for_server_settings?,
ping_interval: pool_config.ping_interval,
pings: %{}
pings: %{},
max_connection_age: pool_config.max_connection_age,
max_connection_age_jitter: pool_config.max_connection_age_jitter,
draining: false
}

{:ok, :disconnected, data, {:next_event, :internal, {:connect, 0}}}
Expand Down Expand Up @@ -273,7 +276,8 @@ defmodule Finch.HTTP2.Pool do
data
| conn: nil,
requests: %{},
pings: %{}
pings: %{},
draining: false
}

actions = [{{:timeout, :reconnect}, data.backoff_base, 1}]
Expand Down Expand Up @@ -352,6 +356,11 @@ defmodule Finch.HTTP2.Pool do
:keep_state_and_data
end

# The connection age timer may fire after transitioning to disconnected. Ignore it.
def disconnected({:timeout, :max_connection_age}, _content, _data) do
:keep_state_and_data
end

# Its possible that we can receive an info message telling us that a socket
# has been closed. This happens after we enter a disconnected state from a
# read_only state but we don't have any requests that are open. We've already
Expand Down Expand Up @@ -417,7 +426,7 @@ defmodule Finch.HTTP2.Pool do
def connected(:enter, _old_state, data) do
{:ok, _} = Registry.register(data.finch_name, data.pool_name, __MODULE__)
update_max_concurrent_streams(data)
{:keep_state_and_data, ping_action(data)}
{:keep_state_and_data, [ping_action(data) | connection_age_action(data)]}
end

# Issue request to the upstream server. We store a ref to the request so we
Expand Down Expand Up @@ -513,6 +522,21 @@ defmodule Finch.HTTP2.Pool do
end
end

# The connection has exceeded its maximum age. Unregister from the Registry so
# that new requests are routed to a fresh pool, then either stop immediately (if
# there are no in-flight requests) or drain remaining requests in read-only mode
# before stopping. The supervisor will restart the pool with a fresh DNS lookup.
def connected({:timeout, :max_connection_age}, _content, data) do
Registry.unregister(data.finch_name, data.pool_name)
data = %{data | draining: true}

if Enum.empty?(data.requests) do
{:stop, :normal, data}
else
{:next_state, :connected_read_only, data}
end
end

def connected({:call, from}, :ping, data) do
case HTTP2.ping(data.conn) do
{:ok, conn, ref} ->
Expand Down Expand Up @@ -590,12 +614,19 @@ defmodule Finch.HTTP2.Pool do
{data, actions} = handle_responses(data, responses)

# If the connection is still open for reading and we have pending requests
# to receive, we should try to wait for the responses. Otherwise enter
# the disconnected state so we can try to re-establish a connection.
if HTTP2.open?(conn, :read) && Enum.any?(data.requests) do
{:keep_state, data, actions}
else
{:next_state, :disconnected, data, actions}
# to receive, we should try to wait for the responses. If the pool is
# draining and all requests are done, stop normally so the supervisor
# restarts it with a fresh DNS lookup. Otherwise enter the disconnected
# state so we can try to re-establish a connection.
cond do
HTTP2.open?(conn, :read) && Enum.any?(data.requests) ->
{:keep_state, data, actions}

data.draining && Enum.empty?(data.requests) ->
{:stop, :normal, data}

true ->
{:next_state, :disconnected, data, actions}
end

{:error, conn, error, responses} ->
Expand All @@ -607,13 +638,20 @@ defmodule Finch.HTTP2.Pool do
data = put_in(data.conn, conn)
{data, actions} = handle_responses(data, responses)

# Same as above, if we're still waiting on responses, we should stay in
# this state. Otherwise, we should enter the disconnected state and try
# to re-establish a connection.
if HTTP2.open?(conn, :read) && Enum.any?(data.requests) do
{:keep_state, data, actions}
else
{:next_state, :disconnected, data, actions}
# If the connection is still open for reading and we have pending requests
# to receive, we should try to wait for the responses. If the pool is
# draining and all requests are done, stop normally so the supervisor
# restarts it with a fresh DNS lookup. Otherwise enter the disconnected
# state so we can try to re-establish a connection.
cond do
HTTP2.open?(conn, :read) && Enum.any?(data.requests) ->
{:keep_state, data, actions}

data.draining && Enum.empty?(data.requests) ->
{:stop, :normal, data}

true ->
{:next_state, :disconnected, data, actions}
end

:unknown ->
Expand All @@ -627,6 +665,11 @@ defmodule Finch.HTTP2.Pool do
:keep_state_and_data
end

# The connection age timer may fire after transitioning to read-only. Ignore it.
def connected_read_only({:timeout, :max_connection_age}, _content, _data) do
:keep_state_and_data
end

# In this state, we don't need to call HTTP2.cancel_request/2 since the connection
# is closed for writing, so we can't tell the server to cancel the request anymore.
def connected_read_only({:timeout, {:request_timeout, ref}}, _content, data) do
Expand All @@ -640,12 +683,19 @@ defmodule Finch.HTTP2.Pool do
send(request.from_pid, {request.request_ref, {:error, Error.exception(:request_timeout)}})
end

# If we're out of requests then we should enter the disconnected state.
# Otherwise wait for the remaining responses.
if Enum.empty?(data.requests) do
{:next_state, :disconnected, data}
else
{:keep_state, data}
# If requests remain, keep waiting for their responses. If the pool is
# draining and all requests are done, stop normally so the supervisor
# restarts it with a fresh DNS lookup. Otherwise enter the disconnected
# state so we can try to re-establish a connection.
cond do
Enum.any?(data.requests) ->
{:keep_state, data}

data.draining ->
{:stop, :normal, data}

true ->
{:next_state, :disconnected, data}
end
end

Expand Down Expand Up @@ -962,6 +1012,18 @@ defmodule Finch.HTTP2.Pool do

defp ping_action(data), do: {{:timeout, :ping}, data.ping_interval, :ping}

defp connection_age_action(%{max_connection_age: :infinity}), do: []

defp connection_age_action(%{max_connection_age_jitter: jitter} = data)
when is_integer(jitter) and jitter > 0 do
jitter_value = :rand.uniform(jitter)
[{{:timeout, :max_connection_age}, data.max_connection_age + jitter_value, nil}]
end

defp connection_age_action(data) do
[{{:timeout, :max_connection_age}, data.max_connection_age, nil}]
end

defp reply(%{from: nil, from_pid: pid, request_ref: request_ref}, reply) do
send(pid, {request_ref, reply})
:ok
Expand Down
68 changes: 67 additions & 1 deletion test/finch/http2/pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ defmodule Finch.HTTP2.PoolTest do
start_pool_metrics?: false,
count: 1,
wait_for_server_settings?: false,
ping_interval: :infinity
ping_interval: :infinity,
max_connection_age: :infinity,
max_connection_age_jitter: 0
}

pool = Finch.Pool.from_name({:https, "localhost", port, :default})
Expand Down Expand Up @@ -738,6 +740,70 @@ defmodule Finch.HTTP2.PoolTest do
end
end

describe "connection draining" do
test "expiry unregisters pool and stops it" do
{:ok, pool} =
start_server_and_connect_with(fn port ->
start_pool(port, max_connection_age: 50, max_connection_age_jitter: 0)
end)

ref = Process.monitor(pool)
assert [{^pool, _}] = Registry.lookup(:test, :pool_name)

assert_receive {:unregister, _registry, _key, _pid}, 500
assert [] = Registry.lookup(:test, :pool_name)

assert_receive {:DOWN, ^ref, :process, ^pool, reason}, 1_000
assert reason in [:normal, :noproc]
end

test "drain with in-flight requests completes them before stopping", %{request: req} do
us = self()

{:ok, pool} =
start_server_and_connect_with(fn port ->
start_pool(port, max_connection_age: 200, max_connection_age_jitter: 0)
end)

ref = Process.monitor(pool)

spawn(fn ->
result = request(pool, req, [])
send(us, {:resp, result})
end)

assert_recv_frames([headers(stream_id: stream_id)])

assert_receive {:unregister, _registry, _key, _pid}, 500

# New requests should fail with :read_only
assert {:error, %Finch.Error{reason: :read_only}, _acc} = request(pool, req, [])

# Complete the in-flight request
hbf = server_encode_headers([{":status", "200"}])

server_send_frames([
headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers])),
data(stream_id: stream_id, data: "drained", flags: set_flags(:data, [:end_stream]))
])

assert_receive {:resp, {:ok, {200, [], "drained"}}}

assert_receive {:DOWN, ^ref, :process, ^pool, :normal}, 1_000
end

test "jitter does not crash" do
{:ok, pool} =
start_server_and_connect_with(fn port ->
start_pool(port, max_connection_age: 50, max_connection_age_jitter: 50)
end)

ref = Process.monitor(pool)
assert_receive {:DOWN, ^ref, :process, ^pool, reason}, 1_000
assert reason in [:normal, :noproc]
end
end

@pdict_key {__MODULE__, :http2_test_server}

describe "pool registration" do
Expand Down
Loading
Loading