diff --git a/lib/finch.ex b/lib/finch.ex index 5459854a..0c0f96f3 100644 --- a/lib/finch.ex +++ b/lib/finch.ex @@ -105,7 +105,9 @@ defmodule Finch do doc: "HTTP/2-specific options. Only relevant when `protocols` includes `:http2`.", default: [ wait_for_server_settings?: false, - ping_interval: :infinity + ping_interval: :infinity, + max_connection_age: :infinity, + max_connection_age_jitter: 0 ], keys: [ wait_for_server_settings?: [ @@ -127,6 +129,28 @@ defmodule Finch do the connection has been idle for this duration. When set to `:infinity` (default), \ no PINGs are sent. """ + ], + max_connection_age: [ + type: :timeout, + doc: """ + Maximum lifetime in milliseconds for an HTTP/2 connection before it is gracefully \ + drained and replaced with a fresh one. When the timer expires the pool unregisters \ + from the Registry (so new requests go to a fresh connection), finishes any in-flight \ + requests, then terminates normally — the supervisor restarts it with a new DNS lookup. \ + Useful for Kubernetes headless-service load balancing where DNS entries rotate. \ + Defaults to `:infinity` (no age limit). + """, + default: :infinity + ], + max_connection_age_jitter: [ + type: :non_neg_integer, + doc: """ + Random jitter in milliseconds added to `:max_connection_age`. Prevents multiple \ + pool shards from draining simultaneously (thundering-herd). The actual age used is \ + `max_connection_age + :rand.uniform(max_connection_age_jitter)`. \ + Defaults to `0` (no jitter). + """, + default: 0 ] ] ] @@ -539,7 +563,9 @@ defmodule Finch do pool_max_idle_time: valid[:pool_max_idle_time], start_pool_metrics?: valid[:start_pool_metrics?], wait_for_server_settings?: valid[:http2][:wait_for_server_settings?], - ping_interval: valid[:http2][:ping_interval] + ping_interval: valid[:http2][:ping_interval], + max_connection_age: valid[:http2][:max_connection_age], + max_connection_age_jitter: valid[:http2][:max_connection_age_jitter] } end @@ -622,6 +648,13 @@ defmodule Finch do > when streaming large responses which you do not intend to keep > in memory. + > ### Connection draining {: .info} + > + > If the HTTP/2 pool this request is dispatched to is currently draining (see + > `http2: [max_connection_age: ...]`), the request is automatically retried on a fresh + > pool. The retry is transparent to the caller. See `async_request/3` for the async + > variant, which does not retry automatically. + ## Options Shares options with `request/3`. @@ -704,6 +737,13 @@ defmodule Finch do > when streaming large responses which you do not intend to keep > in memory. + > ### Connection draining {: .info} + > + > If the HTTP/2 pool this request is dispatched to is currently draining (see + > `http2: [max_connection_age: ...]`), the request is automatically retried on a fresh + > pool. The retry is transparent to the caller. See `async_request/3` for the async + > variant, which does not retry automatically. + ## Options Shares options with `request/3`. @@ -769,10 +809,21 @@ defmodule Finch do end end - defp __stream__(%Request{} = req, name, acc, fun, opts) do + defp __stream__(req, name, acc, fun, opts, retries \\ 3) + + defp __stream__(%Request{} = req, name, acc, fun, opts, retries) do case get_pool(req, name, opts) do - {pool, pool_mod} -> pool_mod.request(pool, req, acc, fun, name, opts) - _ -> {:error, Finch.Error.exception(:pool_not_available), acc} + {pool, pool_mod} -> + case pool_mod.request(pool, req, acc, fun, name, opts) do + {:error, %Finch.Error{reason: :read_only}, _acc} when retries > 0 -> + __stream__(req, name, acc, fun, opts, retries - 1) + + other -> + other + end + + _ -> + {:error, Finch.Error.exception(:pool_not_available), acc} end end @@ -783,6 +834,13 @@ defmodule Finch do See also `stream/5`. + > ### Connection draining {: .info} + > + > If the HTTP/2 pool this request is dispatched to is currently draining (see + > `http2: [max_connection_age: ...]`), the request is automatically retried on a fresh + > pool. The retry is transparent to the caller. See `async_request/3` for the async + > variant, which does not retry automatically. + ## Options * `:pool_timeout` - This timeout is applied when we check out a connection from the pool. @@ -898,6 +956,13 @@ defmodule Finch do {ref, {:data, "..."}} {ref, :done} + > ### Connection draining {: .info} + > + > Unlike `request/3` and `stream/5`, async requests are not automatically retried when a + > pool is draining (see `http2: [max_connection_age: ...]`). If the caller receives + > `{ref, {:error, %Finch.Error{reason: :read_only}}}`, it should retry by calling + > `async_request/3` again. + ## Options Shares options with `request/3`. diff --git a/lib/finch/http2/pool.ex b/lib/finch/http2/pool.ex index 7efe7510..6c130886 100644 --- a/lib/finch/http2/pool.ex +++ b/lib/finch/http2/pool.ex @@ -230,7 +230,10 @@ defmodule Finch.HTTP2.Pool do metrics_ref: metrics_ref, wait_for_server_settings?: pool_config.wait_for_server_settings?, ping_interval: pool_config.ping_interval, - pings: %{} + pings: %{}, + max_connection_age: pool_config.max_connection_age, + max_connection_age_jitter: pool_config.max_connection_age_jitter, + draining: false } {:ok, :disconnected, data, {:next_event, :internal, {:connect, 0}}} @@ -273,7 +276,8 @@ defmodule Finch.HTTP2.Pool do data | conn: nil, requests: %{}, - pings: %{} + pings: %{}, + draining: false } actions = [{{:timeout, :reconnect}, data.backoff_base, 1}] @@ -352,6 +356,11 @@ defmodule Finch.HTTP2.Pool do :keep_state_and_data end + # The connection age timer may fire after transitioning to disconnected. Ignore it. + def disconnected({:timeout, :max_connection_age}, _content, _data) do + :keep_state_and_data + end + # Its possible that we can receive an info message telling us that a socket # has been closed. This happens after we enter a disconnected state from a # read_only state but we don't have any requests that are open. We've already @@ -417,7 +426,7 @@ defmodule Finch.HTTP2.Pool do def connected(:enter, _old_state, data) do {:ok, _} = Registry.register(data.finch_name, data.pool_name, __MODULE__) update_max_concurrent_streams(data) - {:keep_state_and_data, ping_action(data)} + {:keep_state_and_data, [ping_action(data) | connection_age_action(data)]} end # Issue request to the upstream server. We store a ref to the request so we @@ -513,6 +522,21 @@ defmodule Finch.HTTP2.Pool do end end + # The connection has exceeded its maximum age. Unregister from the Registry so + # that new requests are routed to a fresh pool, then either stop immediately (if + # there are no in-flight requests) or drain remaining requests in read-only mode + # before stopping. The supervisor will restart the pool with a fresh DNS lookup. + def connected({:timeout, :max_connection_age}, _content, data) do + Registry.unregister(data.finch_name, data.pool_name) + data = %{data | draining: true} + + if Enum.empty?(data.requests) do + {:stop, :normal, data} + else + {:next_state, :connected_read_only, data} + end + end + def connected({:call, from}, :ping, data) do case HTTP2.ping(data.conn) do {:ok, conn, ref} -> @@ -590,12 +614,19 @@ defmodule Finch.HTTP2.Pool do {data, actions} = handle_responses(data, responses) # If the connection is still open for reading and we have pending requests - # to receive, we should try to wait for the responses. Otherwise enter - # the disconnected state so we can try to re-establish a connection. - if HTTP2.open?(conn, :read) && Enum.any?(data.requests) do - {:keep_state, data, actions} - else - {:next_state, :disconnected, data, actions} + # to receive, we should try to wait for the responses. If the pool is + # draining and all requests are done, stop normally so the supervisor + # restarts it with a fresh DNS lookup. Otherwise enter the disconnected + # state so we can try to re-establish a connection. + cond do + HTTP2.open?(conn, :read) && Enum.any?(data.requests) -> + {:keep_state, data, actions} + + data.draining && Enum.empty?(data.requests) -> + {:stop, :normal, data} + + true -> + {:next_state, :disconnected, data, actions} end {:error, conn, error, responses} -> @@ -607,13 +638,20 @@ defmodule Finch.HTTP2.Pool do data = put_in(data.conn, conn) {data, actions} = handle_responses(data, responses) - # Same as above, if we're still waiting on responses, we should stay in - # this state. Otherwise, we should enter the disconnected state and try - # to re-establish a connection. - if HTTP2.open?(conn, :read) && Enum.any?(data.requests) do - {:keep_state, data, actions} - else - {:next_state, :disconnected, data, actions} + # If the connection is still open for reading and we have pending requests + # to receive, we should try to wait for the responses. If the pool is + # draining and all requests are done, stop normally so the supervisor + # restarts it with a fresh DNS lookup. Otherwise enter the disconnected + # state so we can try to re-establish a connection. + cond do + HTTP2.open?(conn, :read) && Enum.any?(data.requests) -> + {:keep_state, data, actions} + + data.draining && Enum.empty?(data.requests) -> + {:stop, :normal, data} + + true -> + {:next_state, :disconnected, data, actions} end :unknown -> @@ -627,6 +665,11 @@ defmodule Finch.HTTP2.Pool do :keep_state_and_data end + # The connection age timer may fire after transitioning to read-only. Ignore it. + def connected_read_only({:timeout, :max_connection_age}, _content, _data) do + :keep_state_and_data + end + # In this state, we don't need to call HTTP2.cancel_request/2 since the connection # is closed for writing, so we can't tell the server to cancel the request anymore. def connected_read_only({:timeout, {:request_timeout, ref}}, _content, data) do @@ -640,12 +683,19 @@ defmodule Finch.HTTP2.Pool do send(request.from_pid, {request.request_ref, {:error, Error.exception(:request_timeout)}}) end - # If we're out of requests then we should enter the disconnected state. - # Otherwise wait for the remaining responses. - if Enum.empty?(data.requests) do - {:next_state, :disconnected, data} - else - {:keep_state, data} + # If requests remain, keep waiting for their responses. If the pool is + # draining and all requests are done, stop normally so the supervisor + # restarts it with a fresh DNS lookup. Otherwise enter the disconnected + # state so we can try to re-establish a connection. + cond do + Enum.any?(data.requests) -> + {:keep_state, data} + + data.draining -> + {:stop, :normal, data} + + true -> + {:next_state, :disconnected, data} end end @@ -962,6 +1012,18 @@ defmodule Finch.HTTP2.Pool do defp ping_action(data), do: {{:timeout, :ping}, data.ping_interval, :ping} + defp connection_age_action(%{max_connection_age: :infinity}), do: [] + + defp connection_age_action(%{max_connection_age_jitter: jitter} = data) + when is_integer(jitter) and jitter > 0 do + jitter_value = :rand.uniform(jitter) + [{{:timeout, :max_connection_age}, data.max_connection_age + jitter_value, nil}] + end + + defp connection_age_action(data) do + [{{:timeout, :max_connection_age}, data.max_connection_age, nil}] + end + defp reply(%{from: nil, from_pid: pid, request_ref: request_ref}, reply) do send(pid, {request_ref, reply}) :ok diff --git a/test/finch/http2/pool_test.exs b/test/finch/http2/pool_test.exs index 6ff6f74c..ba679e09 100644 --- a/test/finch/http2/pool_test.exs +++ b/test/finch/http2/pool_test.exs @@ -46,7 +46,9 @@ defmodule Finch.HTTP2.PoolTest do start_pool_metrics?: false, count: 1, wait_for_server_settings?: false, - ping_interval: :infinity + ping_interval: :infinity, + max_connection_age: :infinity, + max_connection_age_jitter: 0 } pool = Finch.Pool.from_name({:https, "localhost", port, :default}) @@ -738,6 +740,70 @@ defmodule Finch.HTTP2.PoolTest do end end + describe "connection draining" do + test "expiry unregisters pool and stops it" do + {:ok, pool} = + start_server_and_connect_with(fn port -> + start_pool(port, max_connection_age: 50, max_connection_age_jitter: 0) + end) + + ref = Process.monitor(pool) + assert [{^pool, _}] = Registry.lookup(:test, :pool_name) + + assert_receive {:unregister, _registry, _key, _pid}, 500 + assert [] = Registry.lookup(:test, :pool_name) + + assert_receive {:DOWN, ^ref, :process, ^pool, reason}, 1_000 + assert reason in [:normal, :noproc] + end + + test "drain with in-flight requests completes them before stopping", %{request: req} do + us = self() + + {:ok, pool} = + start_server_and_connect_with(fn port -> + start_pool(port, max_connection_age: 200, max_connection_age_jitter: 0) + end) + + ref = Process.monitor(pool) + + spawn(fn -> + result = request(pool, req, []) + send(us, {:resp, result}) + end) + + assert_recv_frames([headers(stream_id: stream_id)]) + + assert_receive {:unregister, _registry, _key, _pid}, 500 + + # New requests should fail with :read_only + assert {:error, %Finch.Error{reason: :read_only}, _acc} = request(pool, req, []) + + # Complete the in-flight request + hbf = server_encode_headers([{":status", "200"}]) + + server_send_frames([ + headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers])), + data(stream_id: stream_id, data: "drained", flags: set_flags(:data, [:end_stream])) + ]) + + assert_receive {:resp, {:ok, {200, [], "drained"}}} + + assert_receive {:DOWN, ^ref, :process, ^pool, :normal}, 1_000 + end + + test "jitter does not crash" do + {:ok, pool} = + start_server_and_connect_with(fn port -> + start_pool(port, max_connection_age: 50, max_connection_age_jitter: 50) + end) + + ref = Process.monitor(pool) + assert_receive {:DOWN, ^ref, :process, ^pool, reason}, 1_000 + assert reason in [:normal, :noproc] + end + end + @pdict_key {__MODULE__, :http2_test_server} describe "pool registration" do diff --git a/test/finch_test.exs b/test/finch_test.exs index 2f12dbe2..e7cc9cf4 100644 --- a/test/finch_test.exs +++ b/test/finch_test.exs @@ -1808,6 +1808,92 @@ defmodule FinchTest do end end + defp wait_for_pool(finch_name, key, timeout \\ 2_000) do + deadline = System.monotonic_time(:millisecond) + timeout + wait_for_pool_loop(finch_name, key, deadline) + end + + defp wait_for_pool_loop(finch_name, key, deadline) do + case Registry.lookup(finch_name, key) do + [{_pid, _}] -> + :ok + + [] -> + if System.monotonic_time(:millisecond) < deadline do + Process.sleep(10) + wait_for_pool_loop(finch_name, key, deadline) + else + flunk("Pool did not re-register within timeout") + end + end + end + + describe "connection draining" do + test "max_connection_age option is accepted and requests succeed", %{finch_name: finch_name} do + url = Application.get_env(:finch, :test_https_h2_url) + + Finch.TestHelper.start_finch!( + name: finch_name, + pools: %{ + url => [ + protocols: [:http2], + count: 1, + http2: [max_connection_age: 30_000, max_connection_age_jitter: 5_000], + conn_opts: [ + transport_opts: [ + verify: :verify_none + ] + ] + ] + } + ) + + assert {:ok, %Finch.Response{status: 200}} = + Finch.build(:get, url) |> Finch.request(finch_name) + end + + test "requests succeed after pool drains and restarts", %{finch_name: finch_name} do + url = Application.get_env(:finch, :test_https_h2_url) + + Finch.TestHelper.start_finch!( + name: finch_name, + pools: %{ + url => [ + protocols: [:http2], + count: 1, + http2: [max_connection_age: 500, max_connection_age_jitter: 0], + conn_opts: [ + transport_opts: [ + verify: :verify_none + ] + ] + ] + } + ) + + # Request before drain timer fires — should succeed normally. + assert {:ok, %Finch.Response{status: 200}} = + Finch.build(:get, url) |> Finch.request(finch_name) + + # Get the current pool PID and monitor it so we can deterministically + # wait for the drain-triggered shutdown instead of sleeping. + %URI{scheme: scheme, host: host, port: port} = URI.parse(url) + pool_key = {String.to_atom(scheme), host, port, :default} + [{old_pid, _}] = Registry.lookup(finch_name, pool_key) + ref = Process.monitor(old_pid) + + assert_receive {:DOWN, ^ref, :process, ^old_pid, :normal}, 2_000 + + # Wait for the supervisor to restart the pool and re-register it. + wait_for_pool(finch_name, pool_key) + + # After the supervisor restarts the pool with a fresh connection, + # requests should succeed again. + assert {:ok, %Finch.Response{status: 200}} = + Finch.build(:get, url) |> Finch.request(finch_name) + end + end + describe "find_pool/2" do test "returns {:ok, pid} for existing pools", %{bypass: bypass, finch_name: finch_name} do start_supervised!({Finch, name: finch_name})