From 3e52ba6ec4735903e15ba30c95986027420e68f7 Mon Sep 17 00:00:00 2001 From: Jeremy McEntire Date: Thu, 4 Jun 2026 19:18:01 -0500 Subject: [PATCH] feat(custody): define opaque provider-use authorization boundary --- docs/credential-custody-boundary.md | 77 +++++ src/baton/credential_custody.py | 424 ++++++++++++++++++++++++++++ tests/test_credential_custody.py | 266 +++++++++++++++++ 3 files changed, 767 insertions(+) create mode 100644 docs/credential-custody-boundary.md create mode 100644 src/baton/credential_custody.py create mode 100644 tests/test_credential_custody.py diff --git a/docs/credential-custody-boundary.md b/docs/credential-custody-boundary.md new file mode 100644 index 0000000..5c47903 --- /dev/null +++ b/docs/credential-custody-boundary.md @@ -0,0 +1,77 @@ +# Provider Credential Custody Boundary + +## Purpose + +Baton owns the cloud-neutral egress control boundary for provider-backed +operations. A business workflow may select a connector and provide opaque +recipient and payload references, but it must never receive or invoke with an email, +telephony, or other external-provider credential value. + +`src/baton/credential_custody.py` defines the reusable boundary: + +- `ProviderCredentialHandle` is an administrator-configured opaque handle, + not a secret-store URI or credential value. +- `SignedWorkloadAuthorizationVerifier` converts a signed authorization + reference into a verified, credential-free outcome. +- `CredentialCustodyAuthorizer` obtains a trusted verifier outcome and + `authorize_provider_dispatch` binds it to the exact workload, initial + connector handle, channel, purpose, opaque recipient and payload references, + request fingerprint, and provider-attempt budget before consuming the + reservation. +- `AuthorizationConsumptionLedger` is required for every provider operation and + is invoked inside authorization to atomically bind authorization, request + fingerprint, and idempotency key. +- `CustodiedReferenceResolver` is the only permitted provider-operation + boundary. It resolves material internally and returns a sanitized outcome. +- The resulting `AuthorizedProviderDispatch` carries one ledger reservation + across primary and backup attempts, but can select only connector handles + already included in its verified scope and is bounded by its verified + provider-attempt budget. + +## Failure Semantics + +- Missing, expired, mismatched, or out-of-scope authority is denied before any + provider operation. +- Only single-dispatch authority with an exact durable ledger reservation is + accepted. This prevents a provider send from accepting reusable authority or + a `one_time` claim without replay enforcement. +- Sanitized outcomes carry provider identity, status, correlation identifiers, + audit reference, and bounded failure code only. They do not carry material, + provider response bodies, recipient data, or message data. + +## Integration Gate + +This module is a contract surface, not a configured production custody store. +MEA integration remains blocked until all of the following exist: + +1. A trusted Signet-compatible verifier implementation with issuer and rotation + policy. +2. A durable consumption ledger with atomic reserve/complete/replay behavior. +3. A custody-internal resolver implementation, potentially backed by OpenBao + after license, deployment, assurance, and operational review. +4. Tamper-evident audit and failure notification sink implementations. +5. Key-free executable evidence for each boundary and the provider executor. + +No provider credential values or signing keys are used or packaged by this +contract or its tests. + +## OpenBao Candidate Evidence + +OpenBao is a candidate implementation backend for the internal resolver, not +an adopted dependency in this change. Official project documentation checked +on 2026-06-04 records: + +- Source licensing as MPL-2.0 with OSI and FSF recognition: + +- Versioned arbitrary secret storage and ACL separation through KV v2: + +- Cryptographic processing without retaining submitted data through Transit: + +- Request audit-device behavior: + +- JWT/OIDC authentication capability: + + +These documents establish candidate fit and commercial open-source eligibility; +they do not establish certification, deployment security, high availability, +unseal policy, vulnerability disposition, or approval for MEA production use. diff --git a/src/baton/credential_custody.py b/src/baton/credential_custody.py new file mode 100644 index 0000000..b3d3431 --- /dev/null +++ b/src/baton/credential_custody.py @@ -0,0 +1,424 @@ +"""Cloud-neutral provider credential custody contracts. + +This module defines the boundary between a business workflow and a +single-purpose provider executor. No public type carries credential values. +A concrete resolver may access provider material only while executing an +already authorized operation inside the custody boundary. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass +from datetime import datetime, timezone +from enum import StrEnum +from collections.abc import Callable +from typing import Protocol + + +_FINGERPRINT_RE = re.compile(r"^[a-f0-9]{64}$") +_CODE_RE = re.compile(r"^[a-z][a-z0-9_]{0,63}$") + + +class ProviderChannel(StrEnum): + SMS = "sms" + EMAIL = "email" + + +class CustodyResultStatus(StrEnum): + ACCEPTED = "accepted" + DELIVERED = "delivered" + FAILED = "failed" + + +class CustodyAuditKind(StrEnum): + AUTHORIZATION_DENIED = "authorization_denied" + AUTHORIZATION_ACCEPTED = "authorization_accepted" + RESERVATION_REQUIRED = "reservation_required" + REPLAY_REJECTED = "replay_rejected" + INVOCATION_COMPLETED = "invocation_completed" + INVOCATION_FAILED = "invocation_failed" + + +class CustodyError(Exception): + """Base error for the non-material custody boundary.""" + + +class CustodyAuthorizationDenied(CustodyError): + """A verified outcome cannot authorize this provider operation.""" + + +class ConsumptionReservationRequired(CustodyAuthorizationDenied): + """Bounded-use authority lacks an atomic consumption reservation.""" + + +@dataclass(frozen=True) +class ProviderCredentialHandle: + """Opaque lookup handle configured by an administrator. + + ``handle_id`` is not a provider credential, secret-store URI, or ciphertext. + The custody implementation owns the mapping to a concrete backend record. + """ + + handle_id: str + connector_id: str + provider_key: str + channel: ProviderChannel + version_ref: str + + def __post_init__(self) -> None: + for name in ("handle_id", "connector_id", "provider_key", "version_ref"): + if not getattr(self, name): + raise ValueError(f"{name} is required") + + +@dataclass(frozen=True) +class WorkloadAuthorizationReference: + """Opaque reference to a signed authorization checked by a trusted verifier.""" + + reference: str + + def __post_init__(self) -> None: + if not self.reference: + raise ValueError("authorization reference is required") + + +@dataclass(frozen=True) +class CredentialUseRequest: + """Immutable provider-use scope supplied to the custody boundary.""" + + operation_id: str + dispatch_id: str + workload_id: str + connector_id: str + channel: ProviderChannel + purpose: str + recipient_ref: str + payload_ref: str + idempotency_key: str + request_fingerprint: str + + def __post_init__(self) -> None: + for name in ( + "operation_id", + "dispatch_id", + "workload_id", + "connector_id", + "purpose", + "recipient_ref", + "payload_ref", + "idempotency_key", + ): + if not getattr(self, name): + raise ValueError(f"{name} is required") + if not _FINGERPRINT_RE.fullmatch(self.request_fingerprint): + raise ValueError("request_fingerprint must be a lowercase SHA-256 digest") + + +@dataclass(frozen=True) +class VerifiedWorkloadAuthorization: + """Credential-free outcome returned after trusted signature verification.""" + + authorization_id: str + workload_id: str + allowed_connector_ids: frozenset[str] + allowed_channels: frozenset[ProviderChannel] + allowed_purposes: frozenset[str] + request_fingerprint: str + not_before: datetime + not_after: datetime + max_uses: int | None + max_provider_attempts: int + + def __post_init__(self) -> None: + if not self.authorization_id or not self.workload_id: + raise ValueError("authorization_id and workload_id are required") + if not _FINGERPRINT_RE.fullmatch(self.request_fingerprint): + raise ValueError("request_fingerprint must be a lowercase SHA-256 digest") + if self.not_before.tzinfo is None or self.not_after.tzinfo is None: + raise ValueError("authorization timestamps must be timezone-aware") + if self.not_after <= self.not_before: + raise ValueError("authorization expiry must be after not_before") + if self.max_uses is not None and self.max_uses < 1: + raise ValueError("max_uses must be positive when bounded") + if self.max_provider_attempts < 1: + raise ValueError("max_provider_attempts must be positive") + + +@dataclass(frozen=True) +class ConsumptionReservation: + """Atomic bounded-use reservation produced by a durable ledger.""" + + reservation_id: str + authorization_id: str + request_fingerprint: str + idempotency_key: str + + def __post_init__(self) -> None: + if not self.reservation_id or not self.authorization_id or not self.idempotency_key: + raise ValueError("reservation identifiers are required") + if not _FINGERPRINT_RE.fullmatch(self.request_fingerprint): + raise ValueError("request_fingerprint must be a lowercase SHA-256 digest") + + +@dataclass(frozen=True) +class AuthorizedCredentialUse: + """Non-material authorization passed to a custody-internal invoker.""" + + authorization_id: str + handle_id: str + connector_id: str + provider_key: str + operation_id: str + dispatch_id: str + recipient_ref: str + payload_ref: str + idempotency_key: str + request_fingerprint: str + reservation_id: str = "" + + +@dataclass(frozen=True) +class AuthorizedProviderDispatch: + """Reserved provider dispatch that may select only scoped connector handles.""" + + authorization_id: str + workload_id: str + allowed_connector_ids: frozenset[str] + channel: ProviderChannel + purpose: str + operation_id: str + dispatch_id: str + recipient_ref: str + payload_ref: str + idempotency_key: str + request_fingerprint: str + reservation_id: str + max_provider_attempts: int + + def authorize_handle(self, handle: ProviderCredentialHandle) -> AuthorizedCredentialUse: + if handle.connector_id not in self.allowed_connector_ids: + raise CustodyAuthorizationDenied("connector is outside authorization scope") + if handle.channel is not self.channel: + raise CustodyAuthorizationDenied("channel is outside authorization scope") + return AuthorizedCredentialUse( + authorization_id=self.authorization_id, + handle_id=handle.handle_id, + connector_id=handle.connector_id, + provider_key=handle.provider_key, + operation_id=self.operation_id, + dispatch_id=self.dispatch_id, + recipient_ref=self.recipient_ref, + payload_ref=self.payload_ref, + idempotency_key=self.idempotency_key, + request_fingerprint=self.request_fingerprint, + reservation_id=self.reservation_id, + ) + + +@dataclass(frozen=True) +class SanitizedCustodyOutcome: + """Result safe to emit outside the provider custody boundary.""" + + operation_id: str + dispatch_id: str + provider_key: str + status: CustodyResultStatus + audit_ref: str + failure_code: str = "" + + def __post_init__(self) -> None: + if not self.operation_id or not self.dispatch_id or not self.provider_key: + raise ValueError("outcome identity fields are required") + if not self.audit_ref: + raise ValueError("audit_ref is required") + if self.failure_code and not _CODE_RE.fullmatch(self.failure_code): + raise ValueError("failure_code must be a sanitized identifier") + if self.status is CustodyResultStatus.FAILED and not self.failure_code: + raise ValueError("failed outcomes require a failure_code") + if self.status is not CustodyResultStatus.FAILED and self.failure_code: + raise ValueError("successful outcomes cannot contain a failure_code") + + +@dataclass(frozen=True) +class CustodyAuditEvent: + """Sanitized accountability event, suitable for alerting and audit sinks.""" + + kind: CustodyAuditKind + operation_id: str + dispatch_id: str + workload_id: str + connector_id: str + provider_key: str + authorization_id: str = "" + failure_code: str = "" + + def __post_init__(self) -> None: + for name in ( + "operation_id", + "dispatch_id", + "workload_id", + "connector_id", + "provider_key", + ): + if not getattr(self, name): + raise ValueError(f"{name} is required") + if self.failure_code and not _CODE_RE.fullmatch(self.failure_code): + raise ValueError("failure_code must be a sanitized identifier") + + +class SignedWorkloadAuthorizationVerifier(Protocol): + """Validate origin and scope of a signed workload authorization.""" + + async def verify( + self, + reference: WorkloadAuthorizationReference, + request: CredentialUseRequest, + ) -> VerifiedWorkloadAuthorization: + ... + + +class AuthorizationConsumptionLedger(Protocol): + """Reserve bounded authority atomically and persist terminal outcomes. + + A concrete implementation must reject reuse with a different request + fingerprint. The delegated executor journal is responsible for returning + an already completed sanitized delivery outcome rather than invoking twice. + """ + + async def reserve_once( + self, + authorization: VerifiedWorkloadAuthorization, + request: CredentialUseRequest, + ) -> ConsumptionReservation: + ... + + async def complete( + self, + reservation: ConsumptionReservation, + outcome: SanitizedCustodyOutcome, + ) -> None: + ... + + +class CustodiedReferenceResolver(Protocol): + """Invoke a provider operation while keeping resolved material internal.""" + + async def invoke( + self, + handle: ProviderCredentialHandle, + authorized_use: AuthorizedCredentialUse, + ) -> SanitizedCustodyOutcome: + ... + + +class CustodyAuditSink(Protocol): + """Persist sanitized custody transitions for accountability.""" + + async def emit(self, event: CustodyAuditEvent) -> None: + ... + + +class CustodyFailureNotifier(Protocol): + """Raise sanitized operational notifications for custody failures.""" + + async def notify(self, event: CustodyAuditEvent) -> None: + ... + + +async def authorize_provider_dispatch( + initial_handle: ProviderCredentialHandle, + request: CredentialUseRequest, + authorization: VerifiedWorkloadAuthorization, + *, + ledger: AuthorizationConsumptionLedger, + now: datetime | None = None, +) -> AuthorizedProviderDispatch: + """Validate and reserve a credential-free verifier outcome for one dispatch. + + Provider-use authority must describe exactly one dispatch and cannot cross + this boundary unless a ledger has atomically reserved the exact + authorization, request fingerprint, and idempotency key. + """ + + current_time = now or datetime.now(timezone.utc) + if current_time < authorization.not_before or current_time >= authorization.not_after: + raise CustodyAuthorizationDenied("workload authorization is outside its validity window") + if authorization.workload_id != request.workload_id: + raise CustodyAuthorizationDenied("workload identity is outside authorization scope") + if authorization.request_fingerprint != request.request_fingerprint: + raise CustodyAuthorizationDenied("request fingerprint is outside authorization scope") + if initial_handle.connector_id != request.connector_id: + raise CustodyAuthorizationDenied( + "initial credential handle does not match requested connector" + ) + if request.connector_id not in authorization.allowed_connector_ids: + raise CustodyAuthorizationDenied("initial connector is outside authorization scope") + if ( + initial_handle.channel is not request.channel + or request.channel not in authorization.allowed_channels + ): + raise CustodyAuthorizationDenied("channel is outside authorization scope") + if request.purpose not in authorization.allowed_purposes: + raise CustodyAuthorizationDenied("purpose is outside authorization scope") + + if authorization.max_uses != 1: + raise CustodyAuthorizationDenied( + "provider invocation requires single-dispatch authorization" + ) + reservation = await ledger.reserve_once(authorization, request) + if ( + reservation.authorization_id != authorization.authorization_id + or reservation.request_fingerprint != request.request_fingerprint + or reservation.idempotency_key != request.idempotency_key + ): + raise ConsumptionReservationRequired( + "ledger reservation does not bind this authorization request" + ) + + return AuthorizedProviderDispatch( + authorization_id=authorization.authorization_id, + workload_id=request.workload_id, + allowed_connector_ids=authorization.allowed_connector_ids, + channel=request.channel, + purpose=request.purpose, + operation_id=request.operation_id, + dispatch_id=request.dispatch_id, + recipient_ref=request.recipient_ref, + payload_ref=request.payload_ref, + idempotency_key=request.idempotency_key, + request_fingerprint=request.request_fingerprint, + reservation_id=reservation.reservation_id, + max_provider_attempts=authorization.max_provider_attempts, + ) + + +class CredentialCustodyAuthorizer: + """Obtain verified authority and reserve its exact provider operation.""" + + def __init__( + self, + verifier: SignedWorkloadAuthorizationVerifier, + ledger: AuthorizationConsumptionLedger, + *, + clock: Callable[[], datetime] | None = None, + ): + self._verifier = verifier + self._ledger = ledger + self._clock = clock or (lambda: datetime.now(timezone.utc)) + + async def authorize( + self, + reference: WorkloadAuthorizationReference, + handle: ProviderCredentialHandle, + request: CredentialUseRequest, + ) -> AuthorizedProviderDispatch: + authorization = await self._verifier.verify(reference, request) + reserved_dispatch = await authorize_provider_dispatch( + handle, + request, + authorization, + ledger=self._ledger, + now=self._clock(), + ) + return reserved_dispatch diff --git a/tests/test_credential_custody.py b/tests/test_credential_custody.py new file mode 100644 index 0000000..08b1a13 --- /dev/null +++ b/tests/test_credential_custody.py @@ -0,0 +1,266 @@ +"""Credential-free contract checks for the provider custody boundary.""" + +from datetime import datetime, timedelta, timezone + +import pytest + +from baton.credential_custody import ( + ConsumptionReservation, + ConsumptionReservationRequired, + CredentialUseRequest, + CredentialCustodyAuthorizer, + CustodyAuthorizationDenied, + CustodyResultStatus, + ProviderChannel, + ProviderCredentialHandle, + SanitizedCustodyOutcome, + VerifiedWorkloadAuthorization, + WorkloadAuthorizationReference, + authorize_provider_dispatch, +) + + +FINGERPRINT = "a" * 64 +NOW = datetime(2026, 6, 4, tzinfo=timezone.utc) + + +def _handle( + connector_id: str = "sms-primary", + provider_key: str = "provider-primary", +) -> ProviderCredentialHandle: + return ProviderCredentialHandle( + handle_id=f"handle-{connector_id}-v3", + connector_id=connector_id, + provider_key=provider_key, + channel=ProviderChannel.SMS, + version_ref="active-v3", + ) + + +def _request(**overrides) -> CredentialUseRequest: + fields = { + "operation_id": "operation-1", + "dispatch_id": "dispatch-1", + "workload_id": "mea-comms", + "connector_id": "sms-primary", + "channel": ProviderChannel.SMS, + "purpose": "case_notification", + "recipient_ref": "recipient-ref-1", + "payload_ref": "payload-ref-1", + "idempotency_key": "dispatch-once-1", + "request_fingerprint": FINGERPRINT, + } + fields.update(overrides) + return CredentialUseRequest(**fields) + + +def _authorization(**overrides) -> VerifiedWorkloadAuthorization: + fields = { + "authorization_id": "authorization-1", + "workload_id": "mea-comms", + "allowed_connector_ids": frozenset({"sms-primary", "sms-backup"}), + "allowed_channels": frozenset({ProviderChannel.SMS}), + "allowed_purposes": frozenset({"case_notification"}), + "request_fingerprint": FINGERPRINT, + "not_before": NOW - timedelta(minutes=1), + "not_after": NOW + timedelta(minutes=5), + "max_uses": None, + "max_provider_attempts": 3, + } + fields.update(overrides) + return VerifiedWorkloadAuthorization(**fields) + + +def _reservation(**overrides) -> ConsumptionReservation: + fields = { + "reservation_id": "reservation-1", + "authorization_id": "authorization-1", + "request_fingerprint": FINGERPRINT, + "idempotency_key": "dispatch-once-1", + } + fields.update(overrides) + return ConsumptionReservation(**fields) + + +class OutcomeLedger: + def __init__(self, reservation: ConsumptionReservation | None = None): + self.reservation = reservation or _reservation() + self.requests: list[CredentialUseRequest] = [] + + async def reserve_once( + self, + _authorization: VerifiedWorkloadAuthorization, + request: CredentialUseRequest, + ) -> ConsumptionReservation: + self.requests.append(request) + return self.reservation + + +class OutcomeVerifier: + def __init__(self, authorization: VerifiedWorkloadAuthorization): + self.authorization = authorization + + async def verify( + self, + _reference: WorkloadAuthorizationReference, + _request: CredentialUseRequest, + ) -> VerifiedWorkloadAuthorization: + return self.authorization + + +async def test_single_dispatch_reservation_authorizes_only_opaque_handle_use(): + dispatch = await authorize_provider_dispatch( + _handle(), + _request(), + _authorization(max_uses=1), + ledger=OutcomeLedger(), + now=NOW, + ) + result = dispatch.authorize_handle(_handle()) + + assert result.handle_id == "handle-sms-primary-v3" + assert result.dispatch_id == "dispatch-1" + assert result.recipient_ref == "recipient-ref-1" + assert result.payload_ref == "payload-ref-1" + assert result.reservation_id == "reservation-1" + assert dispatch.max_provider_attempts == 3 + assert not hasattr(result, "material") + + +async def test_request_fingerprint_change_is_denied(): + with pytest.raises(CustodyAuthorizationDenied): + await authorize_provider_dispatch( + _handle(), + _request(request_fingerprint="b" * 64), + _authorization(max_uses=1), + ledger=OutcomeLedger(_reservation(request_fingerprint="b" * 64)), + now=NOW, + ) + + +async def test_connector_or_purpose_outside_scope_is_denied(): + with pytest.raises(CustodyAuthorizationDenied): + await authorize_provider_dispatch( + _handle(), + _request(purpose="unapproved_operation"), + _authorization(max_uses=1), + ledger=OutcomeLedger(), + now=NOW, + ) + + +async def test_expired_authorization_is_denied(): + with pytest.raises(CustodyAuthorizationDenied): + await authorize_provider_dispatch( + _handle(), + _request(), + _authorization(max_uses=1, not_after=NOW), + ledger=OutcomeLedger(), + now=NOW, + ) + + +async def test_unbounded_authorization_is_denied_before_ledger_reservation(): + ledger = OutcomeLedger() + with pytest.raises(CustodyAuthorizationDenied): + await authorize_provider_dispatch( + _handle(), _request(), _authorization(), ledger=ledger, now=NOW + ) + assert ledger.requests == [] + + +async def test_bounded_authorization_reserves_through_ledger(): + ledger = OutcomeLedger() + result = await authorize_provider_dispatch( + _handle(), + _request(), + _authorization(max_uses=1), + ledger=ledger, + now=NOW, + ) + assert result.reservation_id == "reservation-1" + assert ledger.requests == [_request()] + + +async def test_bounded_authorization_rejects_reservation_for_other_request(): + with pytest.raises(ConsumptionReservationRequired): + await authorize_provider_dispatch( + _handle(), + _request(), + _authorization(max_uses=1), + ledger=OutcomeLedger(_reservation(request_fingerprint="b" * 64)), + now=NOW, + ) + + +async def test_authorizer_obtains_verified_outcome_then_reserves_it(): + authorizer = CredentialCustodyAuthorizer( + OutcomeVerifier(_authorization(max_uses=1)), + OutcomeLedger(), + clock=lambda: NOW, + ) + result = await authorizer.authorize( + WorkloadAuthorizationReference("authorization-ref-1"), + _handle(), + _request(), + ) + assert result.authorization_id == "authorization-1" + + +async def test_invalid_initial_handle_is_denied_before_ledger_reservation(): + ledger = OutcomeLedger() + with pytest.raises(CustodyAuthorizationDenied): + await authorize_provider_dispatch( + _handle("sms-backup", "provider-backup"), + _request(), + _authorization(max_uses=1), + ledger=ledger, + now=NOW, + ) + assert ledger.requests == [] + + +async def test_one_reserved_dispatch_allows_scoped_primary_to_backup_selection(): + ledger = OutcomeLedger() + dispatch = await authorize_provider_dispatch( + _handle(), + _request(), + _authorization(max_uses=1), + ledger=ledger, + now=NOW, + ) + + primary = dispatch.authorize_handle(_handle()) + backup = dispatch.authorize_handle(_handle("sms-backup", "provider-backup")) + assert primary.reservation_id == backup.reservation_id == "reservation-1" + assert len(ledger.requests) == 1 + + with pytest.raises(CustodyAuthorizationDenied): + dispatch.authorize_handle(_handle("sms-unapproved", "provider-unapproved")) + + +def test_authorization_rejects_zero_provider_attempt_budget(): + with pytest.raises(ValueError): + _authorization(max_provider_attempts=0) + + +def test_outcome_allows_only_sanitized_failure_identifier(): + result = SanitizedCustodyOutcome( + operation_id="operation-1", + dispatch_id="dispatch-1", + provider_key="provider-primary", + status=CustodyResultStatus.FAILED, + audit_ref="audit-1", + failure_code="provider_timeout", + ) + assert result.failure_code == "provider_timeout" + + with pytest.raises(ValueError): + SanitizedCustodyOutcome( + operation_id="operation-1", + dispatch_id="dispatch-1", + provider_key="provider-primary", + status=CustodyResultStatus.FAILED, + audit_ref="audit-1", + failure_code="raw upstream detail: refused", + )