WF-IMPL-076: DaprActivityRuntimeClient adapter — ScheduleActivity (#487)#499
Merged
Merged
Conversation
Adds the production ActivityRuntimeClient adapter that posts each schedule_activity call to the Dapr sidecar's Service Invocation API and reconstructs an ActivityResultEnvelope from the response. Highlights: - DaprActivityRuntimeClient(http_client, endpoint, timeout=10.0) with an async schedule_activity that POSTs to /v1.0/invoke/<arm-app-id>/method/ScheduleActivity with the canonical Idempotency-Key header built from IdempotencyTriple. - camelCase wire envelope (runId, stepId, attempt, activityRef, inputs, connectorContexts, deadline) per the ARM design § Internal RPCs; ConnectorContext is serialised to the spec'd nested shape (slotName, handle, expiresAt, connectorKind). - Transport / status / decode / cancelled failure modes are mapped through the WF-IMPL-075 OutboundRpcError taxonomy and rendered via map_to_activity_envelope, so the return value is always a shape-valid ActivityResultEnvelope (4xx -> permanent / 5xx + 408/429 -> retryable / HTTP 499 -> cancelled / httpx.HTTPError -> retryable / shape-mismatch body -> permanent decode). - cancel_activity raises NotImplementedError pending WF-IMPL-077. - 100% coverage on the new adapter; full suite 1785 passed (1 known pre-existing flake), 98.97% line coverage. Closes #487.
Contributor
There was a problem hiding this comment.
Pull request overview
Implements the production DaprActivityRuntimeClient.schedule_activity HTTP adapter for calling the Activity Runtime Manager via the local Dapr sidecar Service Invocation API, including full request/response wire-shape marshaling and WF-IMPL-075 outbound-RPC error-to-envelope mapping.
Changes:
- Added
DaprActivityRuntimeClientimplementation inactivity_runtime.py, including camelCase wire serialization, idempotency header, and status/transport/decode/cancelled error normalization intoActivityResultEnvelope. - Added a comprehensive
httpx.MockTransport-based unit test suite covering success, status mapping matrix, transport/decode failures, idempotency header invariants, and timeout propagation. - Re-exported
DaprActivityRuntimeClientfromcustos_workflow.clients.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/services/workflow-service/src/custos_workflow/clients/activity_runtime.py | Adds the production Dapr Service Invocation adapter + wire (de)serialization helpers for ScheduleActivity. |
| src/services/workflow-service/tests/clients/test_dapr_activity_runtime.py | Adds unit tests for the adapter’s wire contract and locked error taxonomy mapping. |
| src/services/workflow-service/src/custos_workflow/clients/init.py | Re-exports DaprActivityRuntimeClient from the clients package. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Rewrite _iso_utc docstring to accurately describe that neither ScheduleActivityRequest nor IdempotencyTriple validate deadline tzinfo; this helper is the enforcement point. - Update the naïve-tzinfo branch comment to match the actual behaviour (reject, not treat as UTC). - Rename test_invalid_json_body_mapped_to_retryable_decode → test_invalid_json_body_mapped_to_permanent_decode so the name matches the asserted permanent class. - Wrap the three remaining standalone tests (test_cancel_activity_raises_not_implemented, test_default_timeout_matches_constant, test_timeout_override_honoured) with async with httpx.AsyncClient(...) to avoid leaking the client.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Implements WF-IMPL-076 — the production
DaprActivityRuntimeClient.schedule_activityadapter against the local Dapr sidecar's Service Invocation API, behind the lockedActivityRuntimeClientProtocol (WF-IMPL-049) and the WF-IMPL-075 outbound-RPC error taxonomy.What
src/services/workflow-service/src/custos_workflow/clients/activity_runtime.pyDaprActivityRuntimeClient(http_client, endpoint, timeout=10.0).async def schedule_activity(request) -> ActivityResultEnvelope:runId,stepId,attempt,activityRef,inputs,connectorContexts,deadline) per the ARM design § Internal RPCs.…/v1.0/invoke/<arm-app-id>/method/ScheduleActivitywithIdempotency-Key: {runId}|{stepId}|{attempt}(canonicalIdempotencyTriple.to_str).ActivityResultEnvelope; shape mismatches raiseOutboundRpcDecodeError.httpx.HTTPError→ transport, non-2xx → status, HTTP 499 → cancelled, decode failure → decode) throughmap_to_activity_envelope(exc, attempt=request.attempt)so the return value is always a shape-valid envelope.cancel_activityraisesNotImplementedErrorpending WF-IMPL-077.custos_workflow.clients.__all__.src/services/workflow-service/tests/clients/test_dapr_activity_runtime.pyhttpx.MockTransportcovering success / permanent / parametrised status matrix (400/401/404/422/408/429/500/502/503/504) / HTTP 499 cancelled / transport timeout / arbitraryhttpx.HTTPError/ invalid JSON body / shape-mismatch decode / idempotency header invariants (canonical encoding + attempt rotation) / URL targets the ARM app-id /cancel_activityraises / default + override timeout propagation.Acceptance criteria checklist
class_="permanent"envelope passed through unchanged.class_="permanent".class_="retryable".class_="retryable".class_="cancelled".error.code = "workflow.client.decode". Note: maps toclass_="permanent"per the WF-IMPL-075 locked taxonomy (decode = contract violation, not transient). This is one bullet in the acceptance criteria that conflicts with the locked taxonomy; the adapter follows the taxonomy lock.ActivityResultEnvelope.__post_init__invariants on every path.Idempotency-Keyheader asserted on every outbound request (canonicalIdempotencyTripleencoding).activity_runtime.pyrises from 100 % to 100 % including all new lines).ruff check,ruff format --check,mypy --strictclean.Test results
tests/test_observability.py::test_module_imports_under_noop_providers— same subprocessModuleNotFoundErrorcross-test pollution called out in the WF-IMPL-074 / 075 PRs; passes in isolation).clients/activity_runtime.py; full suite 98.97 % (≥ 90 % floor).ruff check . && ruff format --check .clean.mypy --strict src testsclean.Tracker
WF-IMPL-076:DaprActivityRuntimeClientadapter —ScheduleActivity.Notes for reviewer
activity_runtime.pyper the issue scope; needed lazy imports for_errorsandconnectorinside the method body and helpers to avoid a top-level cycle (_errorsimportsActivityResultClass/ActivityResultEnvelopefrom this module).async; the WF-IMPL-079 worker-wiring task will bridge this to the syncActivityRuntimeClientProtocol that theFakeDaprActivityDispatcherand orchestrator-side generator drive.