diff --git a/interceptor.go b/interceptor.go new file mode 100644 index 0000000..5be4dce --- /dev/null +++ b/interceptor.go @@ -0,0 +1,37 @@ +package flow + +import "context" + +// StepInterceptor intercepts the full lifecycle of a step (all retry attempts). +// Skipped and Canceled steps do not enter the interceptor chain. +type StepInterceptor interface { + InterceptStep(ctx context.Context, step Steper, next func(context.Context) error) error +} + +// AttemptInterceptor intercepts each individual attempt (Before → Do → After). +// The error returned by next (if any) is the attempt's failure — it is available +// for inspection before being returned. +type AttemptInterceptor interface { + InterceptAttempt(ctx context.Context, step Steper, attempt uint64, next func(context.Context) error) error +} + +// StepInterceptorFunc is a function adapter for StepInterceptor. +type StepInterceptorFunc func(ctx context.Context, step Steper, next func(context.Context) error) error + +func (f StepInterceptorFunc) InterceptStep(ctx context.Context, step Steper, next func(context.Context) error) error { + return f(ctx, step, next) +} + +// AttemptInterceptorFunc is a function adapter for AttemptInterceptor. +type AttemptInterceptorFunc func(ctx context.Context, step Steper, attempt uint64, next func(context.Context) error) error + +func (f AttemptInterceptorFunc) InterceptAttempt(ctx context.Context, step Steper, attempt uint64, next func(context.Context) error) error { + return f(ctx, step, attempt, next) +} + +// InterceptorReceiver is implemented by steps that contain a sub-workflow. +// stepExecution calls PrependInterceptors once (in executeWithRetry, before the retry loop) +// so that parent interceptors wrap child interceptors for the entire step lifetime. +type InterceptorReceiver interface { + PrependInterceptors(step []StepInterceptor, attempt []AttemptInterceptor) +} diff --git a/openspec/specs/step-interceptor/spec.md b/openspec/specs/step-interceptor/spec.md new file mode 100644 index 0000000..e061ef2 --- /dev/null +++ b/openspec/specs/step-interceptor/spec.md @@ -0,0 +1,241 @@ +## ADDED Requirements + +### Requirement: Two-layer interceptor types + +go-workflow SHALL provide two orthogonal interceptor interfaces for global, structured +observability across all Steps in a Workflow: + +- `StepInterceptor` wraps the **full lifecycle** of a Step (all retry attempts, called once + per Step). +- `AttemptInterceptor` wraps **each individual attempt** (called once per attempt, including + retried attempts). + +```go +type StepInterceptor interface { + InterceptStep(ctx context.Context, step Steper, next func(context.Context) error) error +} +type AttemptInterceptor interface { + InterceptAttempt(ctx context.Context, step Steper, attempt uint64, next func(context.Context) error) error +} +``` + +Function adapters `StepInterceptorFunc` and `AttemptInterceptorFunc` are provided so callers +can pass plain functions. + +The `Steper` value passed to interceptors is the canonical Step identifier — the same +pointer used as the map key inside `Workflow`. Callers needing a human-readable name SHALL +call `flow.String(step)`. + +#### Scenario: StepInterceptor fires exactly once per step +- **WHEN** a Step executes (succeeds, fails, or retries any number of times) +- **THEN** each registered `StepInterceptor.InterceptStep` is invoked exactly once + +#### Scenario: AttemptInterceptor fires once per attempt +- **WHEN** a Step is retried N times (i.e. N+1 attempts total) +- **THEN** each registered `AttemptInterceptor.InterceptAttempt` is invoked N+1 times, + with `attempt` taking values `0, 1, ..., N` + +#### Scenario: Attempt error is observable +- **WHEN** an `AttemptInterceptor` calls `next(ctx)` and the attempt fails +- **THEN** `next` returns the attempt's error and the interceptor MAY inspect it before + returning + +--- + +### Requirement: Skipped and Canceled steps bypass the interceptor chain + +Steps whose `Condition` evaluates to a terminal status (`Skipped` or `Canceled`) before +execution SHALL NOT enter the `StepInterceptor` chain. The Workflow SHALL evaluate the +Condition inline in the scheduling loop (`tick()`) and settle the step's terminal +`StepResult` directly — without spawning a worker goroutine and without consuming a +`MaxConcurrency` lease. The post-run status remains queryable via +`workflow.StateOf(step).GetStatus()`. + +This avoids the footgun of forcing every interceptor to check whether the step "will +actually execute" before calling `next`, and ensures terminal-by-condition steps do not +serialize behind a low concurrency limit. + +#### Scenario: Skipped step does not invoke interceptors +- **WHEN** a Step's Condition returns `Skipped` +- **THEN** no `StepInterceptor` or `AttemptInterceptor` is invoked for that step +- **AND** `workflow.StateOf(step).GetStatus()` returns `Skipped` + +#### Scenario: Canceled-by-condition step does not invoke interceptors +- **WHEN** a Step's Condition returns `Canceled` +- **THEN** no `StepInterceptor` or `AttemptInterceptor` is invoked for that step + +#### Scenario: Skipped step does not consume a concurrency lease +- **GIVEN** a Workflow with `MaxConcurrency = 1` and a chain `a → b → c` where `b`'s + Condition returns `Skipped` +- **WHEN** the Workflow runs +- **THEN** `b` is settled inline; no worker goroutine is spawned for `b`; `b` does not + occupy the single available lease while `a` or `c` are running + +--- + +### Requirement: Workflow registration of interceptors + +`Workflow` SHALL expose two slice fields for global interceptor registration: + +```go +type Workflow struct { + StepInterceptors []StepInterceptor // [0] outermost, [len-1] innermost + AttemptInterceptors []AttemptInterceptor // [0] outermost, [len-1] innermost + IsolateInterceptors bool // if true, do not inherit from a parent workflow +} +``` + +Nil/empty slices mean no interceptors. Existing workflows without interceptors SHALL behave +identically to before this feature was added (zero-value safe, no allocations on the hot +path). + +#### Scenario: Outer-to-inner ordering +- **WHEN** `StepInterceptors = [A, B]` are registered +- **THEN** the execution order is `A:before → B:before → step → B:after → A:after` + +#### Scenario: No interceptors means no behavioural change +- **WHEN** a Workflow is constructed without `StepInterceptors` or `AttemptInterceptors` +- **THEN** all existing semantics (retries, conditions, BeforeStep/AfterStep) are unchanged + +--- + +### Requirement: BeforeStep / AfterStep are orthogonal to interceptors + +`BeforeStep` and `AfterStep` callbacks (configured per-step via `StepConfig`) execute +**inside** the `AttemptInterceptor` chain — they wrap a single `Do` call. Interceptors are +workflow-level and apply globally; `BeforeStep`/`AfterStep` are step-level and configured +per-step. Both mechanisms are preserved and complementary. + +The full execution stack for a single attempt is: + +``` +StepInterceptor[0] → ... → StepInterceptor[N-1] + → retry loop + → AttemptInterceptor[0] → ... → AttemptInterceptor[M-1] + → BeforeStep callbacks + → step.Do(ctx) + → AfterStep callbacks +``` + +#### Scenario: BeforeStep runs inside AttemptInterceptor +- **WHEN** an `AttemptInterceptor` calls `next(ctx)` +- **THEN** the chain reaches the per-step `BeforeStep` callbacks before `step.Do` runs + +--- + +### Requirement: Interceptor propagation to nested workflows + +`Workflow` SHALL implement the `InterceptorReceiver` interface so that when a `*Workflow` +(or a step embedding `SubWorkflow`) is used as a Step inside another Workflow, the parent's +interceptors are prepended to the child's interceptor stack. + +```go +type InterceptorReceiver interface { + PrependInterceptors(step []StepInterceptor, attempt []AttemptInterceptor) +} +``` + +`stepExecution` calls `PrependInterceptors` exactly once per step, in `executeWithRetry` +before the retry loop begins. Inheritance is **per-run scoped**: + +- The user-supplied `StepInterceptors` / `AttemptInterceptors` slices SHALL NOT be mutated. +- The inherited prefix SHALL be stored on private `inheritedStep` / `inheritedAttempt` + fields and combined with the base only when constructing the run-time chain. +- The inherited fields SHALL be cleared via `defer` at the start of every `Do()` so all + exit paths (success, preflight error, panic) reset the per-run state. +- The public `Reset()` method SHALL also clear the inherited fields. The internal + `reset()` (called by `Do()` itself) SHALL NOT, since clearing there would wipe the + prefix the parent just wrote and break inheritance. + +`SubWorkflow.PrependInterceptors` SHALL delegate to the embedded `Workflow.PrependInterceptors`. + +#### Scenario: Nested *Workflow inherits parent interceptors +- **GIVEN** a parent Workflow with a `StepInterceptor` X, and a child `*Workflow` containing + step `S` added as a step in the parent +- **WHEN** the parent runs +- **THEN** X is invoked for both the child workflow step and the inner step S + +#### Scenario: SubWorkflow inherits parent interceptors +- **GIVEN** a parent Workflow with a `StepInterceptor` X, and a step embedding `SubWorkflow` + containing step `S` +- **WHEN** the parent runs +- **THEN** X is invoked for both the outer step and the inner step S + +#### Scenario: PrependInterceptors does not duplicate across retries +- **WHEN** a sub-workflow step is retried N times +- **THEN** parent interceptors are prepended exactly once, not N times + +#### Scenario: PrependInterceptors does not accumulate across repeated Do() runs +- **GIVEN** a parent containing a child sub-workflow +- **WHEN** the parent's `Do()` is invoked N times in succession +- **THEN** each invocation results in the parent's interceptors firing exactly once per + step (no compounding across runs) + +--- + +### Requirement: Opting out of inheritance via IsolateInterceptors + +A nested `Workflow` MAY set `IsolateInterceptors = true` to opt out of inheriting +interceptors from its parent. When true, `Workflow.PrependInterceptors` SHALL be a no-op +and the workflow runs only with its own registered interceptors. + +This is intended for self-contained sub-workflows that define their own observability +pipeline (e.g., their own tracer or event sink) that must not be wrapped by parent +interceptors. + +#### Scenario: Isolated child does not see parent interceptors +- **GIVEN** a parent Workflow with `StepInterceptor` X and a child Workflow with + `IsolateInterceptors = true` and its own `StepInterceptor` Y, containing inner step S +- **WHEN** the parent runs the child as a step +- **THEN** X is invoked exactly once (for the child workflow step itself) +- **AND** Y is invoked for inner step S +- **AND** X is NOT invoked for inner step S + +--- + +### Requirement: Attempt counter ownership and increment timing + +The internal `stepExecution` SHALL own the attempt counter (`uint64`), exposed to +`AttemptInterceptor` as the `attempt` parameter. The counter is incremented after each +attempt completes — including attempts that are short-circuited by an +`AttemptInterceptor` (e.g., one that returns without calling `next`). + +This guarantees the value passed as `attempt` is monotonically increasing and zero-indexed, +regardless of interceptor behaviour. + +#### Scenario: Attempt counter starts at zero +- **WHEN** a Step's first attempt runs +- **THEN** the `attempt` argument to `AttemptInterceptor.InterceptAttempt` is `0` + +#### Scenario: Attempt counter increments even when interceptor short-circuits +- **WHEN** an `AttemptInterceptor` returns without calling `next` +- **THEN** the next attempt (if retried) still receives `attempt = previous + 1` + +--- + +### Requirement: DontPanic protects interceptor panics + +When `Workflow.DontPanic` is `true`, panics raised inside user-provided `StepInterceptor` +or `AttemptInterceptor` implementations SHALL be caught and converted to errors using the +same `catchPanicAsError` mechanism already applied to `Before` / `Do` / `After`. This +prevents: + +- Process crashes from a faulty user interceptor. +- `MaxConcurrency` lease leaks (an unrecovered panic skips the deferred `unlease`). +- Loss of `signalStatusChange`, which would otherwise hang the main `Do()` loop. + +When `DontPanic` is `false` (the default), interceptor panics propagate as in normal Go +semantics. + +#### Scenario: Panicking StepInterceptor under DontPanic +- **GIVEN** a Workflow with `DontPanic = true` and a `StepInterceptor` that panics +- **WHEN** the Workflow runs +- **THEN** `Do()` returns an error within a bounded time +- **AND** the step's `StepResult.Err` carries the panic value +- **AND** the workflow does not hang waiting for a status signal + +#### Scenario: Panicking AttemptInterceptor under DontPanic +- **GIVEN** a Workflow with `DontPanic = true` and an `AttemptInterceptor` that panics +- **WHEN** the Workflow runs +- **THEN** `Do()` returns an error within a bounded time +- **AND** the step's `StepResult.Err` carries the panic value diff --git a/workflow.go b/workflow.go index 57c817d..9f9f487 100644 --- a/workflow.go +++ b/workflow.go @@ -44,10 +44,31 @@ type Workflow struct { Clock clock.Clock // Clock for retry and unit test DefaultOption *StepOption // DefaultOption is the default option for all Steps + StepInterceptors []StepInterceptor // per-step global interceptors (immutable base) + AttemptInterceptors []AttemptInterceptor // per-attempt global interceptors (immutable base) + IsolateInterceptors bool // if true, do not inherit interceptors from a parent workflow + StepBuilder // StepBuilder to call BuildStep() for Steps steps map[Steper]*State // the internal states of Steps + // inheritedStep / inheritedAttempt are populated by PrependInterceptors when + // this workflow runs as a child step under a parent. The lifecycle is: + // 1. Parent writes them BEFORE calling child.Do() (in executeWithRetry). + // 2. child.Do() reads them while building the effective interceptor chain. + // 3. child.Do()'s defer clears them after waitGroup.Wait() (covers all + // exit paths: success, preflight error, panic). + // + // They are NOT cleared by the internal reset() — reset() runs at the start + // of Do(), which would wipe out what the parent just wrote and break + // inheritance. The public Reset() method does clear them, since users call + // Reset() between runs and expect a fully-fresh state. + // + // They are never merged into StepInterceptors / AttemptInterceptors so the + // user-supplied base stays untouched and repeated runs do not accumulate. + inheritedStep []StepInterceptor + inheritedAttempt []AttemptInterceptor + statusChange *sync.Cond // a condition to signal the status change to proceed tick leaseBucket chan struct{} // constraint max concurrency of running Steps, nil means no limit waitGroup sync.WaitGroup // to prevent goroutine leak @@ -239,6 +260,13 @@ func (w *Workflow) Reset() error { } defer w.isRunning.Unlock() w.reset() + // Unlike the internal reset() (which Do() calls at its own start), Reset() also + // clears interceptors inherited from a parent during a previous run. The internal + // reset() must not clear them, because the parent writes them just before calling + // child.Do(), and child.Do() then calls reset() — clearing there would wipe the + // just-written prefix and break inheritance. + w.inheritedStep = nil + w.inheritedAttempt = nil return nil } @@ -258,6 +286,56 @@ func (w *Workflow) reset() { } } +// PrependInterceptors implements InterceptorReceiver on Workflow itself, +// so a Workflow used directly as a step (or embedded via SubWorkflow) can +// inherit interceptors from its parent. If IsolateInterceptors is true, +// the call is a no-op and the workflow uses only its own interceptors. +// +// The inherited slices are stored separately from StepInterceptors / +// AttemptInterceptors so the user-supplied base is never mutated and +// repeated runs do not accumulate. +func (w *Workflow) PrependInterceptors(step []StepInterceptor, attempt []AttemptInterceptor) { + if w.IsolateInterceptors { + return + } + if len(step) > 0 { + merged := make([]StepInterceptor, 0, len(step)+len(w.inheritedStep)) + merged = append(merged, step...) + merged = append(merged, w.inheritedStep...) + w.inheritedStep = merged + } + if len(attempt) > 0 { + mergedA := make([]AttemptInterceptor, 0, len(attempt)+len(w.inheritedAttempt)) + mergedA = append(mergedA, attempt...) + mergedA = append(mergedA, w.inheritedAttempt...) + w.inheritedAttempt = mergedA + } +} + +// effectiveStepInterceptors returns the chain to invoke for this run: +// inherited (from parent, if any) prepended to the user-configured base. +// The result is never written back to either field. +func (w *Workflow) effectiveStepInterceptors() []StepInterceptor { + if len(w.inheritedStep) == 0 { + return w.StepInterceptors + } + out := make([]StepInterceptor, 0, len(w.inheritedStep)+len(w.StepInterceptors)) + out = append(out, w.inheritedStep...) + out = append(out, w.StepInterceptors...) + return out +} + +// effectiveAttemptInterceptors mirrors effectiveStepInterceptors for AttemptInterceptors. +func (w *Workflow) effectiveAttemptInterceptors() []AttemptInterceptor { + if len(w.inheritedAttempt) == 0 { + return w.AttemptInterceptors + } + out := make([]AttemptInterceptor, 0, len(w.inheritedAttempt)+len(w.AttemptInterceptors)) + out = append(out, w.inheritedAttempt...) + out = append(out, w.AttemptInterceptors...) + return out +} + // Do starts the Step execution in topological order, // and waits until all Steps terminated. // @@ -268,6 +346,14 @@ func (w *Workflow) Do(ctx context.Context) error { return ErrWorkflowIsRunning } defer w.isRunning.Unlock() + // Clear inherited interceptors set by a parent during this run on every exit + // path, so the next time this workflow runs (under any parent, or standalone) + // it starts fresh and PrependInterceptors does not accumulate. Using defer + // ensures even early exits (Empty, preflight failure, panic) reset state. + defer func() { + w.inheritedStep = nil + w.inheritedAttempt = nil + }() // if no steps to run if w.Empty() { return nil @@ -303,6 +389,14 @@ func (w *Workflow) Do(ctx context.Context) error { } const scanned StepStatus = "scanned" // a private status for preflight + +type stepExecution struct { + w *Workflow + step Steper + state *State + attempt uint64 +} + func isAllUpstreamScanned(ups map[Steper]StepResult) bool { for _, up := range ups { if up.Status != scanned { @@ -362,82 +456,69 @@ func (w *Workflow) preflight() error { // tick will not block, it starts a goroutine for each runnable Step. // tick returns true if all steps in all phases are terminated. +// +// The Step's Condition is evaluated here (in the tick goroutine, holding +// statusChange.L) so that: +// - Steps whose Condition resolves to a terminal status (Skipped/Canceled) +// are settled inline without spawning a goroutine or consuming a +// concurrency lease. +// - Steps that will execute have their status set to Running before the +// worker goroutine is spawned, so a subsequent tick cannot double-spawn +// them. +// +// Inline-settled steps may unblock downstream steps in the same tick. Because +// no goroutine is spawned for them, no signalStatusChange is fired — so we +// loop until a single pass produces no inline progress, otherwise the main +// loop in Do() would Wait() forever for a signal that never comes. func (w *Workflow) tick(ctx context.Context) bool { - if w.IsTerminated() { - return true - } - for step := range w.steps { - state := w.StateOf(step) - // we only process pending Steps - if state.GetStatus() != Pending { - continue - } - // we only process Steps whose all upstreams are terminated - ups := w.UpstreamOf(step) - if isAnyUpstreamNotTerminated(ups) { - continue - } - option := state.Option() - cond := DefaultCondition - if option != nil && option.Condition != nil { - cond = option.Condition + for { + if w.IsTerminated() { + return true } - // if condition is evaluated to terminate - if nextStatus := cond(ctx, ups); nextStatus.IsTerminated() { - state.SetStepResult(StepResult{ - Status: nextStatus, - FinishedAt: w.Clock.Now(), - }) - w.waitGroup.Add(1) - go func() { - defer w.waitGroup.Done() - w.signalStatusChange() // it locks w.statusChange.L, so we need another goroutine - }() - continue + progressed := false + for step := range w.steps { + state := w.StateOf(step) + // we only process pending Steps + if state.GetStatus() != Pending { + continue + } + // we only process Steps whose all upstreams are terminated + ups := w.UpstreamOf(step) + if isAnyUpstreamNotTerminated(ups) { + continue + } + + // Evaluate Condition inline. If terminal (Skipped/Canceled), settle + // the step here — no goroutine, no lease, no interceptor chain. + cond := DefaultCondition + if option := state.Option(); option != nil && option.Condition != nil { + cond = option.Condition + } + if nextStatus := cond(ctx, ups); nextStatus.IsTerminated() { + state.SetStepResult(StepResult{ + Status: nextStatus, + FinishedAt: w.Clock.Now(), + }) + progressed = true + continue + } + + // Step will execute: take a lease and spawn a worker goroutine. + // SetStatus(Running) happens here (under statusChange.L) so a + // subsequent tick won't see it as Pending and double-spawn. + if w.lease() { + state.SetStatus(Running) + w.waitGroup.Add(1) + ex := &stepExecution{w: w, step: step, state: state} + go ex.run(ctx) + } } - // kick off the Step - if w.lease() { - state.SetStatus(Running) - w.waitGroup.Add(1) - go func(ctx context.Context, step Steper, state *State) { - defer w.waitGroup.Done() - - var err error - status := Failed - defer func() { - state.SetStepResult(StepResult{ - Status: status, - Err: err, - FinishedAt: w.Clock.Now(), - }) - // Release the lease BEFORE signalling, so that when the main - // loop wakes up in tick() it can immediately acquire a new lease. - // Previously unlease() was a separate earlier defer (LIFO), meaning - // signal fired first → tick() saw a full bucket → went back to - // Wait() → deadlock when MaxConcurrency=1 with chained steps. - w.unlease() - w.signalStatusChange() - }() - - err = w.runStep(ctx, step, state) - if err == nil { - status = Succeeded - return - } - status = StatusFromError(err) - if status == Failed { // do some extra checks - switch { - case - DefaultIsCanceled(err), - errors.Is(err, context.Canceled), - errors.Is(err, context.DeadlineExceeded): - status = Canceled - } - } - }(ctx, step, state) + // If we settled any step inline this pass, re-iterate to give downstream + // steps a chance to be picked up without waiting for a signal. + if !progressed { + return false } } - return false } func (w *Workflow) signalStatusChange() { @@ -446,45 +527,128 @@ func (w *Workflow) signalStatusChange() { w.statusChange.Signal() } -func (w *Workflow) runStep(ctx context.Context, step Steper, state *State) error { - // set Step-level timeout for the Step +func (ex *stepExecution) run(ctx context.Context) { + defer ex.w.waitGroup.Done() + + // By the time we get here, tick() has already evaluated the Condition + // (terminal results are settled inline) and set the status to Running. + // Build the StepInterceptor chain; innermost next is executeWithRetry. + // When DontPanic is true, each interceptor invocation is wrapped in + // catchPanicAsError so a panicking user interceptor cannot crash the + // process or leave the lease unreleased / status unsignalled. + stepNext := func(ctx context.Context) error { return ex.executeWithRetry(ctx) } + stepICs := ex.w.effectiveStepInterceptors() + for i := len(stepICs) - 1; i >= 0; i-- { + // ic and nextLocal are declared inside the loop body with :=, so they + // are fresh variables on every iteration and the closure below captures + // each iteration's instance independently. The explicit naming is to + // make the per-iteration scoping obvious to readers. + ic := stepICs[i] + nextLocal := stepNext + stepNext = func(ctx context.Context) error { + if ex.w.DontPanic { + return catchPanicAsError(func() error { + return ic.InterceptStep(ctx, ex.step, nextLocal) + }) + } + return ic.InterceptStep(ctx, ex.step, nextLocal) + } + } + + err := stepNext(ctx) + status := StatusFromError(err) + if status == Failed { + switch { + case DefaultIsCanceled(err), + errors.Is(err, context.Canceled), + errors.Is(err, context.DeadlineExceeded): + status = Canceled + } + } + + ex.state.SetStepResult(StepResult{ + Status: status, + Err: err, + FinishedAt: ex.w.Clock.Now(), + }) + // Release the lease BEFORE signalling, so that when the main loop wakes up + // in tick() it can immediately acquire a new lease. + ex.w.unlease() + ex.w.signalStatusChange() +} + +func (ex *stepExecution) executeWithRetry(ctx context.Context) error { + option := ex.state.Option() + + // Propagate the effective chain (inherited prefix + this workflow's own base) + // so multi-level nesting (grandparent → parent → child) accumulates correctly + // within one run, while the user-supplied base on each workflow stays untouched. + if recv, ok := ex.step.(InterceptorReceiver); ok { + recv.PrependInterceptors(ex.w.effectiveStepInterceptors(), ex.w.effectiveAttemptInterceptors()) + } + + attemptChain := ex.buildAttemptChain() + var notAfter time.Time - option := state.Option() if option != nil && option.Timeout != nil { - notAfter = w.Clock.Now().Add(*option.Timeout) + notAfter = ex.w.Clock.Now().Add(*option.Timeout) var cancel func() - ctx, cancel = w.Clock.WithDeadline(ctx, notAfter) + ctx, cancel = ex.w.Clock.WithDeadline(ctx, notAfter) defer cancel() } - // run the Step with or without retry - do := w.makeDoForStep(step, state) - return w.retry(option.RetryOption)(ctx, do, notAfter) + + return ex.w.retry(option.RetryOption)(ctx, attemptChain, notAfter) } -// makeDoForStep is panic-free from Step's Do and Input. -func (w *Workflow) makeDoForStep(step Steper, state *State) func(ctx context.Context) error { - return func(root context.Context) error { - do := func(fn func() error) error { return fn() } - if w.DontPanic { - do = catchPanicAsError - } - // call Before callbacks - var ctxStep context.Context - err := do(func() error { - ctxBefore, errBefore := state.Before(root, step, do) // pass do to Before to guard each Before callback - ctxStep = ctxBefore // use the context returned by Before for the following Do - return errBefore - }) - if err != nil { - err = ErrBeforeStep{err} - } else { // only call step.Do if all Before callbacks succeed - err = do(func() error { return step.Do(ctxStep) }) // step.Do will not change ctxStep +func (ex *stepExecution) buildAttemptChain() func(context.Context) error { + chain := func(ctx context.Context) error { + return ex.runAttempt(ctx) + } + attemptICs := ex.w.effectiveAttemptInterceptors() + for i := len(attemptICs) - 1; i >= 0; i-- { + // ic and nextLocal are declared inside the loop body with :=, so they + // are fresh variables on every iteration and the closure below captures + // each iteration's instance independently. + ic := attemptICs[i] + nextLocal := chain + chain = func(ctx context.Context) error { + if ex.w.DontPanic { + return catchPanicAsError(func() error { + return ic.InterceptAttempt(ctx, ex.step, ex.attempt, nextLocal) + }) + } + return ic.InterceptAttempt(ctx, ex.step, ex.attempt, nextLocal) } - // call After callbacks, will use the ctxStep for After callbacks - return do(func() error { return state.After(ctxStep, step, err) }) + } + // Wrap the full attempt chain (including interceptors) so ex.attempt is always + // incremented after each attempt regardless of whether interceptors short-circuit. + inner := chain + return func(ctx context.Context) error { + defer func() { ex.attempt++ }() + return inner(ctx) } } +func (ex *stepExecution) runAttempt(ctx context.Context) error { + do := func(fn func() error) error { return fn() } + if ex.w.DontPanic { + do = catchPanicAsError + } + + var ctxStep context.Context + err := do(func() error { + ctxBefore, errBefore := ex.state.Before(ctx, ex.step, do) + ctxStep = ctxBefore + return errBefore + }) + if err != nil { + err = ErrBeforeStep{err} + } else { + err = do(func() error { return ex.step.Do(ctxStep) }) + } + return do(func() error { return ex.state.After(ctxStep, ex.step, err) }) +} + func (w *Workflow) lease() bool { if w.leaseBucket == nil { return true @@ -556,3 +720,9 @@ func (s *SubWorkflow) Do(ctx context.Context) error { return s.w.Do(ctx) } // Reset resets the sub-workflow to ready for BuildStep() func (s *SubWorkflow) Reset() { s.w = Workflow{} } + +// PrependInterceptors implements InterceptorReceiver by delegating to the +// embedded Workflow. +func (s *SubWorkflow) PrependInterceptors(step []StepInterceptor, attempt []AttemptInterceptor) { + s.w.PrependInterceptors(step, attempt) +} diff --git a/workflow_test.go b/workflow_test.go index ae5168f..48e4958 100644 --- a/workflow_test.go +++ b/workflow_test.go @@ -2,12 +2,14 @@ package flow import ( "context" + "errors" "fmt" "sync" "sync/atomic" "testing" "time" + "github.com/cenkalti/backoff/v4" "github.com/stretchr/testify/assert" ) @@ -303,3 +305,301 @@ func TestMaxConcurrencyDeadlockStress(t *testing.T) { } wg.Wait() } + +func TestStepExecution_BasicSuccess(t *testing.T) { + t.Parallel() + var stepped []Steper + step := NoOp("a") + w := &Workflow{ + StepInterceptors: []StepInterceptor{ + StepInterceptorFunc(func(ctx context.Context, s Steper, next func(context.Context) error) error { + stepped = append(stepped, s) + return next(ctx) + }), + }, + } + w.Add(Step(step)) + assert.NoError(t, w.Do(context.Background())) + assert.Equal(t, []Steper{step}, stepped) +} + +func TestStepExecution_StepInterceptorOrder(t *testing.T) { + t.Parallel() + var order []string + makeIC := func(name string) StepInterceptor { + return StepInterceptorFunc(func(ctx context.Context, s Steper, next func(context.Context) error) error { + order = append(order, name+":before") + err := next(ctx) + order = append(order, name+":after") + return err + }) + } + w := &Workflow{ + StepInterceptors: []StepInterceptor{makeIC("A"), makeIC("B")}, + } + w.Add(Step(NoOp("s"))) + assert.NoError(t, w.Do(context.Background())) + assert.Equal(t, []string{"A:before", "B:before", "B:after", "A:after"}, order) +} + +// TestStepExecution_StepInterceptorChain_NoVariableCapture guards against the +// classic Go closure-over-loop-variable bug in the chain builder. With 3+ +// interceptors, a buggy closure would either reorder calls, call the same +// interceptor multiple times, or self-recurse via `next`. We verify (a) the +// exact order of before/after across 4 interceptors, (b) the inner step runs +// exactly once, and (c) no stack explosion. +func TestStepExecution_StepInterceptorChain_NoVariableCapture(t *testing.T) { + t.Parallel() + var order []string + var stepRan int + makeIC := func(name string) StepInterceptor { + return StepInterceptorFunc(func(ctx context.Context, s Steper, next func(context.Context) error) error { + order = append(order, name+":before") + err := next(ctx) + order = append(order, name+":after") + return err + }) + } + step := Func("s", func(ctx context.Context) error { + stepRan++ + return nil + }) + w := &Workflow{ + StepInterceptors: []StepInterceptor{makeIC("A"), makeIC("B"), makeIC("C"), makeIC("D")}, + } + w.Add(Step(step)) + assert.NoError(t, w.Do(context.Background())) + assert.Equal(t, 1, stepRan, "inner step must run exactly once") + assert.Equal(t, []string{ + "A:before", "B:before", "C:before", "D:before", + "D:after", "C:after", "B:after", "A:after", + }, order) +} + +// TestStepExecution_AttemptInterceptorChain_NoVariableCapture mirrors the above +// for AttemptInterceptors. With retries, the chain is built once but invoked +// per attempt — any closure capture bug would surface as wrong order, missing +// before/after pairs, or recursion. +func TestStepExecution_AttemptInterceptorChain_NoVariableCapture(t *testing.T) { + t.Parallel() + var order []string + makeIC := func(name string) AttemptInterceptor { + return AttemptInterceptorFunc(func(ctx context.Context, s Steper, attempt uint64, next func(context.Context) error) error { + order = append(order, fmt.Sprintf("%s:before:%d", name, attempt)) + err := next(ctx) + order = append(order, fmt.Sprintf("%s:after:%d", name, attempt)) + return err + }) + } + calls := 0 + step := Func("s", func(ctx context.Context) error { + calls++ + if calls < 3 { + return errors.New("boom") + } + return nil + }) + w := &Workflow{ + AttemptInterceptors: []AttemptInterceptor{makeIC("X"), makeIC("Y"), makeIC("Z")}, + } + w.Add(Step(step).Retry(func(o *RetryOption) { + o.Attempts = 3 + o.Backoff = &backoff.ZeroBackOff{} + })) + assert.NoError(t, w.Do(context.Background())) + assert.Equal(t, []string{ + "X:before:0", "Y:before:0", "Z:before:0", "Z:after:0", "Y:after:0", "X:after:0", + "X:before:1", "Y:before:1", "Z:before:1", "Z:after:1", "Y:after:1", "X:after:1", + "X:before:2", "Y:before:2", "Z:before:2", "Z:after:2", "Y:after:2", "X:after:2", + }, order) +} + +func TestStepExecution_AttemptInterceptorOrder(t *testing.T) { + t.Parallel() + var order []string + makeIC := func(name string) AttemptInterceptor { + return AttemptInterceptorFunc(func(ctx context.Context, s Steper, attempt uint64, next func(context.Context) error) error { + order = append(order, name+":before") + err := next(ctx) + order = append(order, name+":after") + return err + }) + } + w := &Workflow{ + AttemptInterceptors: []AttemptInterceptor{makeIC("X"), makeIC("Y")}, + } + w.Add(Step(NoOp("s"))) + assert.NoError(t, w.Do(context.Background())) + assert.Equal(t, []string{"X:before", "Y:before", "Y:after", "X:after"}, order) +} + +func TestStepExecution_SkippedStep(t *testing.T) { + t.Parallel() + interceptorCalled := false + step := NoOp("a") + w := &Workflow{ + StepInterceptors: []StepInterceptor{ + StepInterceptorFunc(func(ctx context.Context, s Steper, next func(context.Context) error) error { + interceptorCalled = true + return next(ctx) + }), + }, + } + w.Add(Step(step).When(func(_ context.Context, _ map[Steper]StepResult) StepStatus { + return Skipped + })) + assert.NoError(t, w.Do(context.Background())) + assert.False(t, interceptorCalled, "interceptor must not be called for skipped steps") +} + +func TestStepExecution_RetryingStep(t *testing.T) { + t.Parallel() + var attempts []uint64 + mu := sync.Mutex{} + boom := errors.New("boom") + callCount := 0 + step := Func("s", func(ctx context.Context) error { + callCount++ + if callCount < 3 { + return boom + } + return nil + }) + w := &Workflow{ + AttemptInterceptors: []AttemptInterceptor{ + AttemptInterceptorFunc(func(ctx context.Context, s Steper, attempt uint64, next func(context.Context) error) error { + mu.Lock() + attempts = append(attempts, attempt) + mu.Unlock() + return next(ctx) + }), + }, + } + w.Add(Step(step).Retry(func(o *RetryOption) { + o.Attempts = 3 + o.Backoff = &backoff.ZeroBackOff{} + })) + assert.NoError(t, w.Do(context.Background())) + assert.Equal(t, []uint64{0, 1, 2}, attempts) +} + +func TestWorkflow_NoInterceptors_NoRegression(t *testing.T) { + t.Parallel() + // Workflows without interceptors must not regress existing behaviour. + step := NoOp("a") + w := &Workflow{} + w.Add(Step(step)) + assert.NoError(t, w.Do(context.Background())) + assert.Equal(t, Succeeded, w.StateOf(step).GetStatus()) +} + +// TestSkippedStep_DoesNotConsumeLease verifies that a Skipped step does NOT +// occupy a concurrency lease nor spawn a worker goroutine. With +// MaxConcurrency=1 and a chain a → b(skip) → c, b being skipped must not +// block c from running concurrently with a's *next* tick — and most +// importantly, b must not even briefly hold the only lease. +// +// We assert this by attaching an AttemptInterceptor that records every step +// that actually runs through the worker path. b must not appear there. +func TestSkippedStep_DoesNotConsumeLease(t *testing.T) { + t.Parallel() + + var ranSteps []string + mu := sync.Mutex{} + ic := AttemptInterceptorFunc(func(ctx context.Context, s Steper, attempt uint64, next func(context.Context) error) error { + mu.Lock() + ranSteps = append(ranSteps, String(s)) + mu.Unlock() + return next(ctx) + }) + + a, b, c := NoOp("a"), NoOp("b"), NoOp("c") + w := &Workflow{ + MaxConcurrency: 1, + AttemptInterceptors: []AttemptInterceptor{ic}, + } + w.Add( + Step(a), + Step(b).DependsOn(a).When(func(_ context.Context, _ map[Steper]StepResult) StepStatus { + return Skipped + }), + Step(c).DependsOn(b).When(AllSucceededOrSkipped), + ) + + done := make(chan error, 1) + go func() { done <- w.Do(context.Background()) }() + select { + case err := <-done: + assert.NoError(t, err) + case <-time.After(5 * time.Second): + t.Fatal("workflow did not complete within 5s") + } + + // Skipped step b must not have entered the worker path. + assert.Equal(t, Skipped, w.StateOf(b).GetStatus()) + mu.Lock() + defer mu.Unlock() + for _, name := range ranSteps { + assert.NotEqual(t, "b", name, "skipped step must not consume a worker lease / fire AttemptInterceptor") + } + assert.ElementsMatch(t, []string{"a", "c"}, ranSteps) +} + +// TestInterceptorPanic_DontPanic ensures that when DontPanic is true, a panic +// inside a user StepInterceptor is converted to an error rather than crashing +// the process or leaving the lease unreleased / status unsignalled. +func TestInterceptorPanic_DontPanic(t *testing.T) { + t.Parallel() + step := NoOp("a") + w := &Workflow{ + DontPanic: true, + StepInterceptors: []StepInterceptor{ + StepInterceptorFunc(func(ctx context.Context, s Steper, next func(context.Context) error) error { + panic("boom from StepInterceptor") + }), + }, + } + w.Add(Step(step)) + + done := make(chan error, 1) + go func() { done <- w.Do(context.Background()) }() + select { + case err := <-done: + // Workflow returns ErrWorkflow because the step ended in Failed. + assert.Error(t, err) + stepErr := w.StateOf(step).GetStepResult().Err + assert.Error(t, stepErr) + assert.Contains(t, stepErr.Error(), "boom from StepInterceptor") + case <-time.After(5 * time.Second): + t.Fatal("workflow hung after panicking interceptor — lease leak suspected") + } +} + +// TestAttemptInterceptorPanic_DontPanic mirrors the StepInterceptor panic test +// but for AttemptInterceptor. It ensures the panic is caught for retried +// attempts as well. +func TestAttemptInterceptorPanic_DontPanic(t *testing.T) { + t.Parallel() + step := NoOp("a") + w := &Workflow{ + DontPanic: true, + AttemptInterceptors: []AttemptInterceptor{ + AttemptInterceptorFunc(func(ctx context.Context, s Steper, attempt uint64, next func(context.Context) error) error { + panic("boom from AttemptInterceptor") + }), + }, + } + w.Add(Step(step)) + + done := make(chan error, 1) + go func() { done <- w.Do(context.Background()) }() + select { + case err := <-done: + assert.Error(t, err) + stepErr := w.StateOf(step).GetStepResult().Err + assert.Error(t, stepErr) + assert.Contains(t, stepErr.Error(), "boom from AttemptInterceptor") + case <-time.After(5 * time.Second): + t.Fatal("workflow hung after panicking AttemptInterceptor — lease leak suspected") + } +} diff --git a/wrap_test.go b/wrap_test.go index df695bb..6d8bd0d 100644 --- a/wrap_test.go +++ b/wrap_test.go @@ -2,9 +2,13 @@ package flow import ( "context" + "errors" "strings" + "sync" + "sync/atomic" "testing" + "github.com/cenkalti/backoff/v4" "github.com/stretchr/testify/assert" ) @@ -150,3 +154,198 @@ func TestBuildStep(t *testing.T) { assert.Equal(t, []string{"Reset", "BuildStep"}, s.calls) }) } + +func TestSubWorkflow_InterceptorPropagation(t *testing.T) { + t.Parallel() + + var stepped []Steper + mu := sync.Mutex{} + ic := StepInterceptorFunc(func(ctx context.Context, s Steper, next func(context.Context) error) error { + mu.Lock() + stepped = append(stepped, s) + mu.Unlock() + return next(ctx) + }) + + innerStep := NoOp("inner") + type mySubStep struct{ SubWorkflow } + sub := &mySubStep{} + sub.Add(Step(innerStep)) + + w := &Workflow{StepInterceptors: []StepInterceptor{ic}} + w.Add(Step(sub)) + + assert.NoError(t, w.Do(context.Background())) + + // Parent interceptor must have seen both the sub step and the inner step. + assert.GreaterOrEqual(t, len(stepped), 2) + found := false + for _, s := range stepped { + if s == innerStep { + found = true + } + } + assert.True(t, found, "parent interceptor should see inner step via propagation") +} + +func TestSubWorkflow_ChildInterceptorPreserved(t *testing.T) { + t.Parallel() + + var parentStepped, childStepped []Steper + pmu, cmu := sync.Mutex{}, sync.Mutex{} + + parentIC := StepInterceptorFunc(func(ctx context.Context, s Steper, next func(context.Context) error) error { + pmu.Lock() + parentStepped = append(parentStepped, s) + pmu.Unlock() + return next(ctx) + }) + childIC := StepInterceptorFunc(func(ctx context.Context, s Steper, next func(context.Context) error) error { + cmu.Lock() + childStepped = append(childStepped, s) + cmu.Unlock() + return next(ctx) + }) + + innerStep := NoOp("inner") + type mySubStep struct{ SubWorkflow } + sub := &mySubStep{} + sub.Add(Step(innerStep)) + sub.w.StepInterceptors = []StepInterceptor{childIC} + + w := &Workflow{StepInterceptors: []StepInterceptor{parentIC}} + w.Add(Step(sub)) + + assert.NoError(t, w.Do(context.Background())) + + // Parent sees sub + inner (propagated); child sees inner only. + assert.GreaterOrEqual(t, len(parentStepped), 2) + assert.GreaterOrEqual(t, len(childStepped), 1) +} + +func TestSubWorkflow_InterceptorNotDuplicatedOnRetry(t *testing.T) { + t.Parallel() + + var count atomic.Int32 + sink := StepInterceptorFunc(func(ctx context.Context, s Steper, next func(context.Context) error) error { + count.Add(1) + return next(ctx) + }) + + attempts := 0 + inner := Func("inner", func(ctx context.Context) error { + attempts++ + if attempts < 2 { + return errors.New("fail once") + } + return nil + }) + + type mySubStep struct{ SubWorkflow } + sub := &mySubStep{} + sub.Add(Step(inner).Retry(func(o *RetryOption) { + o.Attempts = 3 + o.Backoff = &backoff.ZeroBackOff{} + })) + + w := &Workflow{StepInterceptors: []StepInterceptor{sink}} + w.Add(Step(sub)) + assert.NoError(t, w.Do(context.Background())) + + // parent interceptor must fire exactly twice: + // once for the outer sub step, once for the inner step (regardless of retry count). + assert.Equal(t, int32(2), count.Load()) +} + +func TestWorkflow_AsStep_InheritsInterceptors(t *testing.T) { + t.Parallel() + + var stepped []Steper + mu := sync.Mutex{} + ic := StepInterceptorFunc(func(ctx context.Context, s Steper, next func(context.Context) error) error { + mu.Lock() + stepped = append(stepped, s) + mu.Unlock() + return next(ctx) + }) + + innerStep := NoOp("inner") + child := &Workflow{} + child.Add(Step(innerStep)) + + parent := &Workflow{StepInterceptors: []StepInterceptor{ic}} + parent.Add(Step(child)) + assert.NoError(t, parent.Do(context.Background())) + + // parent's interceptor should see both the child workflow step and the inner step + found := false + for _, s := range stepped { + if s == innerStep { + found = true + } + } + assert.True(t, found, "parent interceptor should see inner step via Workflow.PrependInterceptors") +} + +// TestSubWorkflow_PrependInterceptorsIdempotentAcrossDo ensures that running the +// same parent (with a sub-workflow child) multiple times does NOT accumulate +// prepended interceptors on the child. The parent's interceptor should fire +// exactly twice per run (outer sub step + inner step), regardless of how many +// times Do() is called. +func TestSubWorkflow_PrependInterceptorsIdempotentAcrossDo(t *testing.T) { + t.Parallel() + + var count atomic.Int32 + ic := StepInterceptorFunc(func(ctx context.Context, s Steper, next func(context.Context) error) error { + count.Add(1) + return next(ctx) + }) + + innerStep := NoOp("inner") + type mySubStep struct{ SubWorkflow } + sub := &mySubStep{} + sub.Add(Step(innerStep)) + + parent := &Workflow{StepInterceptors: []StepInterceptor{ic}} + parent.Add(Step(sub)) + + const runs = 3 + for i := 0; i < runs; i++ { + count.Store(0) + // reset both parent and child step states so the workflow is re-runnable + assert.NoError(t, parent.Reset()) + assert.NoError(t, parent.Do(context.Background())) + assert.Equalf(t, int32(2), count.Load(), + "run %d: parent interceptor must fire exactly 2 times (outer sub + inner), accumulation detected", i) + } +} + +func TestSubWorkflow_IsolateInterceptors(t *testing.T) { + t.Parallel() + + var parentCount, childCount atomic.Int32 + parentIC := StepInterceptorFunc(func(ctx context.Context, s Steper, next func(context.Context) error) error { + parentCount.Add(1) + return next(ctx) + }) + childIC := StepInterceptorFunc(func(ctx context.Context, s Steper, next func(context.Context) error) error { + childCount.Add(1) + return next(ctx) + }) + + innerStep := NoOp("inner") + type mySubStep struct{ SubWorkflow } + sub := &mySubStep{} + sub.Add(Step(innerStep)) + sub.w.StepInterceptors = []StepInterceptor{childIC} + sub.w.IsolateInterceptors = true + + w := &Workflow{StepInterceptors: []StepInterceptor{parentIC}} + w.Add(Step(sub)) + assert.NoError(t, w.Do(context.Background())) + + // parent only sees the outer sub step (1), not the inner step (since isolated) + assert.Equal(t, int32(1), parentCount.Load()) + // child only sees the inner step + assert.Equal(t, int32(1), childCount.Load()) +}