From 83e09985b0c16ebf6b0aba8732363c85903ec711 Mon Sep 17 00:00:00 2001 From: Taylor Steinberg Date: Wed, 8 Apr 2026 23:07:56 -0400 Subject: [PATCH] fix(sessions): strip Authorization on cross-origin POST redirects The custom Session.post manually followed redirects and re-dispatched via self.request without stripping credentials, bypassing requests' SessionRedirectMixin.rebuild_auth. A malicious or compromised Connect server could return a Location header pointing to an attacker-controlled host (including protocol-relative //attacker/... resolved via urljoin) and receive the Connect API key or bootstrap JWT. Compare origins (scheme, hostname, port) between the current response URL and the resolved redirect target; on mismatch, drop any Authorization header from the outgoing headers kwarg, suppress the session-level Authorization header, and set auth=None for the next hop. Also reject redirect schemes other than http/https. Co-Authored-By: Claude Sonnet 4.6 --- src/posit/connect/sessions.py | 29 ++++++++++- tests/posit/connect/test_sessions.py | 78 ++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 2 deletions(-) diff --git a/src/posit/connect/sessions.py b/src/posit/connect/sessions.py index 5ce73e5e..337fc850 100644 --- a/src/posit/connect/sessions.py +++ b/src/posit/connect/sessions.py @@ -1,4 +1,4 @@ -from urllib.parse import urljoin +from urllib.parse import urljoin, urlparse import requests @@ -85,6 +85,10 @@ def post(self, url, data=None, json=None, preserve_post=True, max_redirects=5, * redirect_url = urljoin(response.url, redirect_url) + parsed_redirect = urlparse(redirect_url) + if parsed_redirect.scheme not in ("http", "https"): + break + # For 307 and 308 the HTTP spec mandates preserving the method and body. if response.status_code in (307, 308): method = "POST" @@ -96,8 +100,29 @@ def post(self, url, data=None, json=None, preserve_post=True, max_redirects=5, * data = None json = None + # Strip credentials on cross-origin redirects to prevent leaking + # the Authorization header / session auth to a different host. + request_kwargs = dict(kwargs) + parsed_current = urlparse(response.url) + same_origin = ( + parsed_current.scheme == parsed_redirect.scheme + and parsed_current.hostname == parsed_redirect.hostname + and parsed_current.port == parsed_redirect.port + ) + if not same_origin: + headers = dict(request_kwargs.get("headers") or {}) + headers = { + k: v for k, v in headers.items() if k.lower() != "authorization" + } + # Setting to None suppresses any matching session-level header. + headers["Authorization"] = None + request_kwargs["headers"] = headers + request_kwargs["auth"] = None + # Perform the next request in the redirect chain. - response = self.request(method, redirect_url, data=data, json=json, **kwargs) + response = self.request( + method, redirect_url, data=data, json=json, **request_kwargs + ) redirect_count += 1 return response diff --git a/tests/posit/connect/test_sessions.py b/tests/posit/connect/test_sessions.py index de5af25b..7d3f2625 100644 --- a/tests/posit/connect/test_sessions.py +++ b/tests/posit/connect/test_sessions.py @@ -116,6 +116,84 @@ def test_post_redirect_no_location(): assert response.status_code == 302 +@responses.activate +def test_post_cross_origin_redirect_strips_authorization(): + initial_url = "https://example.com/api" + attacker_url = "https://attacker.example/steal" + + responses.add( + responses.POST, initial_url, status=302, headers={"location": attacker_url} + ) + responses.add(responses.POST, attacker_url, json={"ok": True}, status=200) + + session = Session() + session.headers["Authorization"] = "Key super-secret" + + response = session.post(initial_url, data={"key": "value"}, preserve_post=True) + + assert response.status_code == 200 + assert len(responses.calls) == 2 + assert responses.calls[0].request.headers.get("Authorization") == "Key super-secret" + assert "Authorization" not in responses.calls[1].request.headers + + +@responses.activate +def test_post_cross_origin_redirect_strips_header_kwarg_authorization(): + initial_url = "https://example.com/api" + attacker_url = "https://attacker.example/steal" + + responses.add( + responses.POST, initial_url, status=302, headers={"location": attacker_url} + ) + responses.add(responses.POST, attacker_url, json={"ok": True}, status=200) + + session = Session() + response = session.post( + initial_url, + data={"key": "value"}, + headers={"Authorization": "Bearer jwt-token"}, + preserve_post=True, + ) + + assert response.status_code == 200 + assert "Authorization" not in responses.calls[1].request.headers + + +@responses.activate +def test_post_same_origin_redirect_preserves_authorization(): + initial_url = "https://example.com/api" + redirect_url = "https://example.com/next" + + responses.add( + responses.POST, initial_url, status=302, headers={"location": "/next"} + ) + responses.add(responses.POST, redirect_url, json={"ok": True}, status=200) + + session = Session() + session.headers["Authorization"] = "Key super-secret" + + response = session.post(initial_url, data={"key": "value"}, preserve_post=True) + + assert response.status_code == 200 + assert len(responses.calls) == 2 + assert responses.calls[1].request.headers.get("Authorization") == "Key super-secret" + + +@responses.activate +def test_post_redirect_rejects_non_http_scheme(): + initial_url = "https://example.com/api" + + responses.add( + responses.POST, initial_url, status=302, headers={"location": "file:///etc/passwd"} + ) + + session = Session() + response = session.post(initial_url, data={"key": "value"}) + + assert len(responses.calls) == 1 + assert response.status_code == 302 + + @responses.activate def test_post_redirect_location_none_explicit(): url = "http://connect.example.com/api"