Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
BindForStepResponse,
ConnectorClient,
ConnectorContext,
DaprConnectorClient,
FakeConnectorClient,
NoopConnectorClient,
SlotSpec,
Expand All @@ -62,6 +63,7 @@
"ConnectorClient",
"ConnectorContext",
"DaprActivityRuntimeClient",
"DaprConnectorClient",
"FakeActivityRuntimeClient",
"FakeConnectorClient",
"NoopActivityRuntimeClient",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,41 @@
from dataclasses import dataclass, field
from datetime import datetime
from types import MappingProxyType
from typing import Protocol, runtime_checkable
from typing import Any, Final, Protocol, runtime_checkable

import httpx

from custos_workflow.clients._dapr_invoke import (
DEFAULT_OUTBOUND_RPC_TIMEOUT_SECONDS,
DaprInvokeEndpoint,
build_invoke_url,
)
from custos_workflow.clients._errors import OutboundRpcError

__all__ = [
"BIND_FOR_STEP_DAPR_METHOD",
"BindForStepRequest",
"BindForStepResponse",
"ConnectorClient",
"ConnectorContext",
"DaprConnectorClient",
"FakeConnectorClient",
"NoopConnectorClient",
"SlotSpec",
]

#: Dapr Service-Invocation ``method`` name for Connector Service's
#: ``BindForStep`` RPC. Pinned here so the adapter and any
#: smoke-test fixture key off the same constant.
#: Passed to ``build_invoke_url`` by
#: ``DaprConnectorClient.bind_for_step`` to form the request URL.
BIND_FOR_STEP_DAPR_METHOD: Final[str] = "BindForStep"

#: HTTP status code the Dapr sidecar surfaces when an upstream
#: cancelled the request (nginx-style ``client-closed-request``).
#: Mapped to :class:`OutboundRpcCancelledError` rather than
#: :class:`OutboundRpcStatusError` so callers can short-circuit
#: cleanly instead of retrying a request that no longer matters.
#: ``DaprConnectorClient.bind_for_step`` compares the response status
#: against this value *before* its generic non-2xx check, so a 499
#: never reaches the status-error branch.
_CLIENT_CLOSED_REQUEST_STATUS: Final[int] = 499


# ---------------------------------------------------------------------------
# Connector slot declarations
Expand Down Expand Up @@ -328,3 +349,267 @@ class ConnectorBindError(OutboundRpcError):
:class:`OutboundRpcError.__init_subclass__`, so a concrete
bind-error subclass cannot ship with an unknown ``kind``.
"""


# ---------------------------------------------------------------------------
# Production adapter: Dapr Service-Invocation HTTP transport
# ---------------------------------------------------------------------------


def _request_to_wire(request: BindForStepRequest) -> Mapping[str, Any]:
    """Serialise a :class:`BindForStepRequest` into its camelCase payload.

    The payload carries two keys, in this order:

    * ``stepKey`` — copied from ``request.step_key``.
    * ``slots`` — a list with one object per entry of
      ``request.slots``, in iteration order. Each object holds
      ``name``, ``connectorRef`` and ``capabilities``.

    Capabilities are copied into a fresh list with their order kept;
    ``design.md`` § *Internal RPC outbound* pins this envelope so that
    Connector Service's audit log reflects exactly what the Step
    Coordinator declared.

    :param request: Bind request whose step key and slot specs are
        rendered.
    :returns: A newly built mapping suitable for use as a JSON body.
    """
    step_key = request.step_key
    wire_slots: list[dict[str, Any]] = []
    for slot in request.slots:
        wire_slots.append(
            {
                "name": slot.name,
                "connectorRef": slot.connector_ref,
                # Copy so the payload never aliases the spec's own sequence.
                "capabilities": list(slot.capabilities),
            }
        )
    return {"stepKey": step_key, "slots": wire_slots}


def _parse_iso_utc(value: Any) -> datetime:
    """Turn a wire ``expiresAt`` value into a timezone-aware datetime.

    Both the canonical trailing-``Z`` form emitted by Connector Service
    (per ``design.md`` § *Internal RPCs*) and explicit ``±HH:MM``
    offsets are accepted. The parsed offset is kept as-is; no
    conversion to UTC is performed.

    :param value: Raw JSON value found under ``expiresAt``.
    :returns: The parsed, timezone-aware :class:`~datetime.datetime`.
    :raises ValueError: If ``value`` is not a string, is not parseable
        by :meth:`datetime.fromisoformat`, or parses to a naïve
        datetime. The caller is expected to re-raise this as
        :class:`OutboundRpcDecodeError`.
    """
    if not isinstance(value, str):
        raise ValueError(f"expiresAt must be an ISO-8601 string, got {type(value).__name__}")

    # ``datetime.fromisoformat`` only understands a trailing ``Z`` from
    # Python 3.11 onwards, so rewrite it to the equivalent ``+00:00``
    # offset to behave the same on every interpreter in the CI matrix.
    if value.endswith("Z"):
        candidate = value[:-1] + "+00:00"
    else:
        candidate = value

    try:
        moment = datetime.fromisoformat(candidate)
    except ValueError as exc:
        raise ValueError(f"expiresAt is not a valid ISO-8601 timestamp: {value!r}") from exc

    if moment.tzinfo is not None:
        return moment
    raise ValueError(f"expiresAt must be timezone-aware (no trailing Z or offset): {value!r}")


def _response_from_wire(body: Any) -> BindForStepResponse:
    """Rebuild a :class:`BindForStepResponse` from a decoded JSON body.

    The following contracts are checked, in this order:

    * ``body`` is a mapping that has a ``"contexts"`` entry; an absent
      key and an explicit ``null`` are both reported as missing.
    * ``contexts`` is itself a mapping, and every value in it is a
      mapping.
    * Each context entry carries the four required keys
      (``slotName`` / ``handle`` / ``expiresAt`` / ``connectorKind``).
    * ``slotName``, ``handle`` and ``connectorKind`` are strings.
    * ``expiresAt`` parses to a timezone-aware datetime via
      :func:`_parse_iso_utc`.
    * :class:`ConnectorContext` and :class:`BindForStepResponse`
      accept the rebuilt values (their constructors may raise on
      their own invariants, e.g. slot-name ↔ key alignment).

    :param body: The value obtained from decoding the response JSON.
    :returns: The reconstructed response.
    :raises OutboundRpcDecodeError: On any contract violation above, so
        the retry driver routes the failure as ``permanent`` (a
        malformed response is a contract violation, not a transient).
    """
    # Deferred import, kept from the original implementation, whose
    # stated purpose is to avoid closing an import cycle through
    # ``_errors``.
    # NOTE(review): this module already imports ``OutboundRpcError``
    # from ``_errors`` at top level — confirm whether the deferral is
    # still required for ``OutboundRpcDecodeError``.
    from custos_workflow.clients._errors import OutboundRpcDecodeError

    if not isinstance(body, Mapping):
        raise OutboundRpcDecodeError(
            f"Connector BindForStep response body must be a JSON object, got {type(body).__name__}"
        )
    contexts_raw = body.get("contexts")
    if contexts_raw is None:
        raise OutboundRpcDecodeError(
            "Connector BindForStep response is missing the required 'contexts' field"
        )
    if not isinstance(contexts_raw, Mapping):
        raise OutboundRpcDecodeError(
            f"Connector BindForStep response 'contexts' must be a JSON object, "
            f"got {type(contexts_raw).__name__}"
        )

    # Loop-invariant field tables, built once rather than per entry.
    required_fields = frozenset({"slotName", "handle", "expiresAt", "connectorKind"})
    string_fields = ("slotName", "handle", "connectorKind")

    rebuilt: dict[str, ConnectorContext] = {}
    for slot_name, raw_ctx in contexts_raw.items():
        if not isinstance(raw_ctx, Mapping):
            raise OutboundRpcDecodeError(
                f"Connector BindForStep response contexts[{slot_name!r}] "
                f"must be a JSON object, got {type(raw_ctx).__name__}"
            )
        missing = required_fields - set(raw_ctx)
        if missing:
            # ``sorted`` keeps the message deterministic regardless of
            # set iteration order.
            raise OutboundRpcDecodeError(
                f"Connector BindForStep response contexts[{slot_name!r}] "
                f"is missing required field(s): {sorted(missing)!r}"
            )
        # ``ConnectorContext.__post_init__`` only checks truthiness,
        # so a non-empty non-string value (e.g. an int) would slip
        # through and leak invalid types into downstream scheduling.
        # Enforce string typing here so the contract violation
        # surfaces as a decode error (always permanent) instead.
        for field_name in string_fields:
            field_value = raw_ctx[field_name]
            if not isinstance(field_value, str):
                raise OutboundRpcDecodeError(
                    f"Connector BindForStep response contexts[{slot_name!r}]."
                    f"{field_name} must be a string, "
                    f"got {type(field_value).__name__}"
                )
        try:
            expires_at = _parse_iso_utc(raw_ctx["expiresAt"])
        except ValueError as exc:
            raise OutboundRpcDecodeError(
                f"Connector BindForStep response contexts[{slot_name!r}].expiresAt "
                f"is invalid: {exc}"
            ) from exc
        try:
            ctx = ConnectorContext(
                slot_name=raw_ctx["slotName"],
                handle=raw_ctx["handle"],
                expires_at=expires_at,
                connector_kind=raw_ctx["connectorKind"],
            )
        except (TypeError, ValueError) as exc:
            raise OutboundRpcDecodeError(
                f"Connector BindForStep response contexts[{slot_name!r}] "
                f"failed ConnectorContext invariants: {exc}"
            ) from exc
        rebuilt[slot_name] = ctx

    try:
        return BindForStepResponse(contexts=rebuilt)
    except ValueError as exc:
        # Slot-name ↔ key mismatch enforced by
        # ``BindForStepResponse.__post_init__``.
        raise OutboundRpcDecodeError(
            f"Connector BindForStep response failed BindForStepResponse invariants: {exc}"
        ) from exc


@dataclass(slots=True)
class DaprConnectorClient:
    """:class:`ConnectorClient` adapter that talks to Connector Service via Dapr.

    Every :meth:`bind_for_step` call is sent as a JSON ``POST``
    (``Content-Type: application/json``) to
    ``…/v1.0/invoke/<connector-app-id>/method/BindForStep`` on the local
    Dapr sidecar. Failures are translated into the WF-IMPL-075
    :class:`~custos_workflow.clients._errors.OutboundRpcError` taxonomy
    so that the retry-decision driver treats bind failures the same way
    it treats activity-scheduling failures.

    Ownership
    ---------

    The :class:`httpx.AsyncClient` is borrowed, not owned: the FastAPI
    lifespan hook (wired in WF-IMPL-080) builds it and calls
    ``aclose`` on it. This adapter never closes the client.

    Async boundary
    --------------

    :meth:`bind_for_step` is a coroutine because the transport is
    async. Adapting it to the synchronous :class:`ConnectorClient`
    Protocol is the job of the Step Coordinator's activity-task bridge
    (WF-IMPL-079).

    :param http_client: Lifespan-owned async HTTP client.
    :param endpoint: Resolved Dapr Service-Invocation endpoint for the
        Connector Service app-id (built by
        :func:`~custos_workflow.clients._dapr_invoke.read_dapr_env`).
    :param timeout: Per-request timeout in seconds. Defaults to
        :data:`~custos_workflow.clients._dapr_invoke.DEFAULT_OUTBOUND_RPC_TIMEOUT_SECONDS`.
    """

    http_client: httpx.AsyncClient
    endpoint: DaprInvokeEndpoint
    timeout: float = DEFAULT_OUTBOUND_RPC_TIMEOUT_SECONDS

    async def bind_for_step(self, request: BindForStepRequest) -> BindForStepResponse:
        """Send a single ``BindForStep`` request through the Dapr sidecar.

        On success the decoded body is handed to
        :func:`_response_from_wire` and its result is returned.

        :param request: The bind request to render and post.
        :returns: The :class:`BindForStepResponse` rebuilt from the
            response body.
        :raises OutboundRpcTransportError: ``httpx`` raised an
            :class:`httpx.HTTPError` before a response was observed.
        :raises OutboundRpcCancelledError: The sidecar answered with
            HTTP 499 (upstream cancelled).
        :raises OutboundRpcStatusError: Any other non-2xx status; the
            observed ``status_code`` is attached (the WF-IMPL-075
            mapper classifies 408 / 429 / 5xx as retryable and the
            remaining 4xx as permanent).
        :raises OutboundRpcDecodeError: The body is not valid JSON, or
            :func:`_response_from_wire` rejected its shape (always
            permanent — a malformed response is a contract violation).
        """
        # Imported inside the method, as in the original code, to keep
        # these error classes out of the module's import-time graph.
        from custos_workflow.clients._errors import (
            OutboundRpcCancelledError,
            OutboundRpcDecodeError,
            OutboundRpcStatusError,
            OutboundRpcTransportError,
        )

        invoke_url = build_invoke_url(self.endpoint, BIND_FOR_STEP_DAPR_METHOD)
        payload = _request_to_wire(request)

        try:
            reply = await self.http_client.post(
                invoke_url,
                json=payload,
                timeout=self.timeout,
                headers={"Content-Type": "application/json"},
            )
        except httpx.HTTPError as exc:
            # Nothing came back from the sidecar. Chaining with
            # ``from exc`` keeps the ``httpx`` error on ``__cause__``
            # for the envelope mapper's ``cause`` chain.
            raise OutboundRpcTransportError(f"Dapr BindForStep transport failure: {exc!r}") from exc

        status_code = reply.status_code

        # 499 is singled out first so it is reported as a cancellation
        # rather than as a generic status failure.
        if status_code == _CLIENT_CLOSED_REQUEST_STATUS:
            raise OutboundRpcCancelledError(
                f"Dapr BindForStep cancelled upstream (HTTP {status_code})"
            )

        is_2xx = status_code // 100 == 2
        if not is_2xx:
            reply_text = reply.text
            # Only the first 200 characters of the body go into the message.
            body_preview = reply_text[:200] if reply_text else ""
            raise OutboundRpcStatusError(
                f"Dapr BindForStep returned HTTP {status_code}: {body_preview!r}",
                status_code=status_code,
            )

        try:
            decoded = reply.json()
        except ValueError as exc:
            # ``json.JSONDecodeError`` is a ``ValueError`` subclass, so
            # this also catches any other ``ValueError`` raised while
            # decoding the body.
            raise OutboundRpcDecodeError(
                f"Dapr BindForStep response is not valid JSON: {exc!r}"
            ) from exc

        return _response_from_wire(decoded)
Loading
Loading