Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions src/posit/connect/sessions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from urllib.parse import urljoin
from urllib.parse import urljoin, urlparse

import requests

Expand Down Expand Up @@ -85,6 +85,10 @@ def post(self, url, data=None, json=None, preserve_post=True, max_redirects=5, *

redirect_url = urljoin(response.url, redirect_url)

parsed_redirect = urlparse(redirect_url)
if parsed_redirect.scheme not in ("http", "https"):
break

# For 307 and 308 the HTTP spec mandates preserving the method and body.
if response.status_code in (307, 308):
method = "POST"
Expand All @@ -96,8 +100,29 @@ def post(self, url, data=None, json=None, preserve_post=True, max_redirects=5, *
data = None
json = None

# Strip credentials on cross-origin redirects to prevent leaking
# the Authorization header / session auth to a different host.
request_kwargs = dict(kwargs)
parsed_current = urlparse(response.url)
same_origin = (
parsed_current.scheme == parsed_redirect.scheme
and parsed_current.hostname == parsed_redirect.hostname
and parsed_current.port == parsed_redirect.port
)
if not same_origin:
headers = dict(request_kwargs.get("headers") or {})
headers = {
k: v for k, v in headers.items() if k.lower() != "authorization"
}
# Setting to None suppresses any matching session-level header.
headers["Authorization"] = None
request_kwargs["headers"] = headers
request_kwargs["auth"] = None

# Perform the next request in the redirect chain.
response = self.request(method, redirect_url, data=data, json=json, **kwargs)
response = self.request(
method, redirect_url, data=data, json=json, **request_kwargs
)
redirect_count += 1

return response
78 changes: 78 additions & 0 deletions tests/posit/connect/test_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,84 @@ def test_post_redirect_no_location():
assert response.status_code == 302


@responses.activate
def test_post_cross_origin_redirect_strips_authorization():
initial_url = "https://example.com/api"
attacker_url = "https://attacker.example/steal"

responses.add(
responses.POST, initial_url, status=302, headers={"location": attacker_url}
)
responses.add(responses.POST, attacker_url, json={"ok": True}, status=200)

session = Session()
session.headers["Authorization"] = "Key super-secret"

response = session.post(initial_url, data={"key": "value"}, preserve_post=True)

assert response.status_code == 200
assert len(responses.calls) == 2
assert responses.calls[0].request.headers.get("Authorization") == "Key super-secret"
assert "Authorization" not in responses.calls[1].request.headers


@responses.activate
def test_post_cross_origin_redirect_strips_header_kwarg_authorization():
initial_url = "https://example.com/api"
attacker_url = "https://attacker.example/steal"

responses.add(
responses.POST, initial_url, status=302, headers={"location": attacker_url}
)
responses.add(responses.POST, attacker_url, json={"ok": True}, status=200)

session = Session()
response = session.post(
initial_url,
data={"key": "value"},
headers={"Authorization": "Bearer jwt-token"},
preserve_post=True,
)

assert response.status_code == 200
assert "Authorization" not in responses.calls[1].request.headers


@responses.activate
def test_post_same_origin_redirect_preserves_authorization():
initial_url = "https://example.com/api"
redirect_url = "https://example.com/next"

responses.add(
responses.POST, initial_url, status=302, headers={"location": "/next"}
)
responses.add(responses.POST, redirect_url, json={"ok": True}, status=200)

session = Session()
session.headers["Authorization"] = "Key super-secret"

response = session.post(initial_url, data={"key": "value"}, preserve_post=True)

assert response.status_code == 200
assert len(responses.calls) == 2
assert responses.calls[1].request.headers.get("Authorization") == "Key super-secret"


@responses.activate
def test_post_redirect_rejects_non_http_scheme():
initial_url = "https://example.com/api"

responses.add(
responses.POST, initial_url, status=302, headers={"location": "file:///etc/passwd"}
)

session = Session()
response = session.post(initial_url, data={"key": "value"})

assert len(responses.calls) == 1
assert response.status_code == 302


@responses.activate
def test_post_redirect_location_none_explicit():
url = "http://connect.example.com/api"
Expand Down
Loading