WF-IMPL-078: DaprConnectorClient.bind_for_step adapter#501
Merged
Conversation
Implement the production ConnectorClient HTTP adapter against the local Dapr sidecar's Service Invocation API. Serializes BindForStepRequest to the canonical camelCase wire envelope, POSTs to Connector Service, and reconstructs BindForStepResponse with MappingProxyType-frozen contexts and tz-aware ConnectorContext.expires_at. Source ------ - New constant BIND_FOR_STEP_DAPR_METHOD = 'BindForStep'. - New private constant _CLIENT_CLOSED_REQUEST_STATUS = 499. - Helper _request_to_wire(request) -> camelCase wire envelope with capability order preserved per slot. - Helper _parse_iso_utc(value) -> tz-aware datetime; rejects naive timestamps with ValueError so callers can re-raise as OutboundRpcDecodeError. - Helper _response_from_wire(body) -> BindForStepResponse with full shape validation (body must be Mapping, must carry a 'contexts' Mapping, each context entry must be a Mapping with the four required keys, expiresAt must be tz-aware, slot-name must align with map key, ConnectorContext invariants must hold). Every contract violation surfaces as OutboundRpcDecodeError (always permanent). - New @DataClass(slots=True) class DaprConnectorClient with fields http_client (httpx.AsyncClient), endpoint (DaprInvokeEndpoint), timeout (defaults to DEFAULT_OUTBOUND_RPC_TIMEOUT_SECONDS). - async def bind_for_step(request) -> BindForStepResponse: * Transport failure -> OutboundRpcTransportError (with the original httpx exception on __cause__). * HTTP 499 -> OutboundRpcCancelledError. * Any other non-2xx -> OutboundRpcStatusError with the observed status_code (the WF-IMPL-075 mapper classifies 408 / 429 / 5xx as retryable and the remaining 4xx as permanent). * JSON decode failure / shape mismatch -> OutboundRpcDecodeError. - __all__ updated with BIND_FOR_STEP_DAPR_METHOD + DaprConnectorClient; clients.__init__.py re-exports both. Tests (45 new tests) -------------------- - Pure helpers: TestParseIsoUtc (Z suffix, explicit offset, non-UTC offset, naive rejected, garbage rejected, non-string rejected), TestRequestToWire (single-slot round-trip, multi-slot order, capability order, empty capabilities -> empty list), TestResponseFromWire (happy path, every shape- validation failure mode). - Happy path single-slot + multi-slot, capability order preserved on the wire, URL targets connector-service + BindForStep, Content-Type: application/json header. - Slot-name mismatch -> OutboundRpcDecodeError. - Naive expiresAt in response -> OutboundRpcDecodeError. - Invalid JSON body / non-Mapping body -> OutboundRpcDecodeError. - Status-code matrix: 400/401/403/404/422 -> StatusError (permanent via taxonomy); 408/429/500/502/503/504 -> StatusError (retryable via taxonomy). status_code asserted. - HTTP 499 -> OutboundRpcCancelledError. - httpx.ConnectTimeout / ConnectError -> OutboundRpcTransportError with __cause__ preserved. - camelCase envelope shape assertion. - Per-call timeout propagates to req.extensions['timeout']. - type(response.contexts) is MappingProxyType (locked invariant). Quality ------- - ruff check / format clean. - mypy --strict clean (159 source files). - 1847 passed, 1 pre-existing flake (tests/test_observability.py::test_module_imports_under_noop_providers). - connector.py: 100 % coverage; full-suite ~ 99 %. Closes #489.
Contributor
There was a problem hiding this comment.
Pull request overview
Implements the production DaprConnectorClient.bind_for_step adapter in workflow-service, using Dapr Service Invocation over httpx to call Connector Service and translating between the internal BindForStep* dataclasses and the canonical camelCase JSON wire contract.
Changes:
- Added
DaprConnectorClientwith Dapr invoke URL construction, request serialization, response validation, and WF-IMPL-075 outbound-RPC error normalization. - Added pure helpers for request/response wire translation and ISO-8601 parsing with tz-awareness enforcement.
- Added a comprehensive unit test suite using
httpx.MockTransportto pin wire shape, timeout propagation, and the transport/status/decode/cancel error matrix.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| src/services/workflow-service/src/custos_workflow/clients/connector.py | Adds the Dapr-backed production connector adapter plus wire helpers and error handling. |
| src/services/workflow-service/src/custos_workflow/clients/init.py | Re-exports DaprConnectorClient from the clients package. |
| src/services/workflow-service/tests/clients/test_dapr_connector.py | Adds tests covering happy path, contract validation failures, and error/status matrices. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Validate that slotName / handle / connectorKind are strings in _response_from_wire. ConnectorContext.__post_init__ only checks truthiness, so a non-empty non-string value (e.g. an int) would slip through and leak invalid types into downstream scheduling. Now surfaces as OutboundRpcDecodeError (always permanent), with a parametrised test covering all three fields.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Closes #489. Implements the production
DaprConnectorClient.bind_for_stepHTTP adapter against the local Dapr sidecar's Service Invocation API, behind the lockedConnectorClientProtocol (WF-IMPL-050). SerializesBindForStepRequestto the canonical camelCase JSON envelope, POSTs to Connector Service, and reconstructsBindForStepResponsewithMappingProxyType-frozen contexts and tz-awareConnectorContext.expires_at.Changes
Source —
src/services/workflow-service/src/custos_workflow/clients/connector.pyBIND_FOR_STEP_DAPR_METHOD = "BindForStep"(exported via__all__)._CLIENT_CLOSED_REQUEST_STATUS = 499mirroring the activity-runtime adapter._request_to_wire(request)→ camelCase wire envelope (stepKey,slots[].name,connectorRef,capabilities). Capability order preserved per slot._parse_iso_utc(value)→ tz-awaredatetime; rejects naïve timestamps withValueErrorso callers can re-raise asOutboundRpcDecodeError._response_from_wire(body)→ fully validatedBindForStepResponse. Every contract violation (non-Mapping body, missingcontexts, missing context fields, naïveexpiresAt, slot-name ↔ key mismatch,ConnectorContextinvariant violation) surfaces asOutboundRpcDecodeError(always permanent per WF-IMPL-075 taxonomy).@dataclass(slots=True) class DaprConnectorClientwith fieldshttp_client: httpx.AsyncClient,endpoint: DaprInvokeEndpoint,timeout: float = DEFAULT_OUTBOUND_RPC_TIMEOUT_SECONDS.async def bind_for_step(request)mirrorsDaprActivityRuntimeClient.schedule_activity's error surface:OutboundRpcTransportError(originalhttpxexception on__cause__).OutboundRpcCancelledError.OutboundRpcStatusErrorcarrying the observedstatus_code(the WF-IMPL-075 mapper classifies 408 / 429 / 5xx as retryable and the remaining 4xx as permanent).OutboundRpcDecodeError.clients/__init__.pyre-exportsDaprConnectorClient.Tests —
tests/clients/test_dapr_connector.py(45 new tests)TestParseIsoUtc(Z suffix, explicit offset, non-UTC offset, naïve rejected, garbage rejected, non-string rejected),TestRequestToWire(single-slot round-trip, multi-slot order, capability order, empty capabilities → empty list),TestResponseFromWire(happy path + every shape-validation failure mode)./v1.0/invoke/connector-service/method/BindForStep,Content-Type: application/jsonheader.OutboundRpcDecodeError: slot-name mismatch, naïveexpiresAt, invalid JSON body, non-Mapping body.OutboundRpcStatusError(permanent via taxonomy); 408/429/500/502/503/504 →OutboundRpcStatusError(retryable via taxonomy);status_codeechoed on the exception.OutboundRpcCancelledError.httpx.ConnectTimeout/httpx.ConnectError→OutboundRpcTransportErrorwith__cause__preserved.req.extensions["timeout"].type(response.contexts) is MappingProxyType(asserted on happy path).Quality
ruff check/ruff format --checkclean.mypy --strictclean (159 source files).tests/test_observability.py::test_module_imports_under_noop_providers).connector.py: 100 % coverage; full-suite ~ 99 %.Design references
design/components/workflow-service/design.md§ Internal RPC outbound — BindForStep.design/components/connector-service/design.md§ Internal RPCs.design/components/workflow-service/changes/2026-05-18-002-bundle-g-binding-completion.md.Tracker