feat(scheduler): Extract short job penalty service #4969
Greptile Summary
This PR extracts short-job-penalty tracking from the scheduling hot path into a dedicated, mutex-protected
Confidence Score: 3/5
Safe to merge for steady-state operation; scheduler restarts within the short-job cutoff window will silently discard all in-flight penalties.
On every scheduler restart, terminal jobs loaded by FetchInitialJobs arrive at ReconcileDifferences already fully terminal. They are deleted from the in-memory jobDb without ReportFinishedJob being called, because generateUpdateMessagesFromJob early-returns for any job where InTerminalState() is true, and initialise discards the returned job slice entirely. Any queue that ran short jobs within the cutoff window before the restart enters the next scheduling cycle without those penalties applied.
internal/scheduler/scheduler.go: the syncState terminal-deletion loop and the initialise call site; adding ReportFinishedJob calls there would close the restart gap.
Important Files Changed
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant Executor
participant DB
participant syncState
participant generateUpdateMessages
participant ShortJobPenalty
participant Schedule
Executor->>DB: "run.succeeded = true"
syncState->>DB: FetchJobUpdates (run update, job.succeeded still false)
syncState->>syncState: "ReconcileDifferences - job non-terminal (run=succeeded, job=running)"
syncState-->>generateUpdateMessages: jobDbJobs (non-terminal job with succeeded run)
generateUpdateMessages->>generateUpdateMessages: lastRun.Succeeded() → WithSucceeded(true)
generateUpdateMessages->>ShortJobPenalty: ReportFinishedJob ✓
generateUpdateMessages->>DB: publish Succeeded event
Note over DB: Pulsar consumer marks job.succeeded=true
syncState->>DB: "next cycle FetchJobUpdates (job.succeeded=true now)"
syncState->>syncState: job already terminal in jobDb - no-op change
syncState->>syncState: BatchDelete terminal job
Note over ShortJobPenalty: On RESTART
syncState->>DB: "FetchInitialJobs (job.succeeded=true AND run.succeeded=true)"
syncState->>syncState: "ReconcileDifferences → job.WithSucceeded(true) → InTerminalState=true"
syncState->>syncState: BatchDelete without ReportFinishedJob ✗
ShortJobPenalty-->>Schedule: no penalty for recently-completed short jobs
@@ -49,5 +77,117 @@ func (sjp *ShortJobPenalty) ShouldApplyPenalty(job *jobdb.Job) bool {
 		return false
 	}

-	return sjp.now.Sub(*jobStart) < sjp.cutoffDurationByPool[jobRun.Pool()]
+	return sjp.now.Sub(*jobStart) < sjp.cutoffs[jobRun.Pool()]
 }
Undocumented mutex precondition on shouldApplyPenalty
shouldApplyPenalty reads sjp.now and sjp.cutoffs directly without acquiring sjp.mu. This is currently safe because every call site (ReportFinishedJob) already holds the lock before dispatching here. However, the function carries an implicit contract — it must be called with the mutex held — which isn't documented. A future internal caller that invokes it without the lock would silently introduce a data race that only surfaces under -race and only when SetNow is called concurrently. Adding a brief comment (e.g. // caller must hold sjp.mu) makes the contract explicit and avoids a subtle footgun.
func (sjp *ShortJobPenalty) subtractFromSums(pool, queue string, resources internaltypes.ResourceList) {
	queueSums := sjp.sums[pool]
	remaining := queueSums[queue].Subtract(resources)
	if remaining.AllZero() {
		delete(queueSums, queue)
		if len(queueSums) == 0 {
			delete(sjp.sums, pool)
		}
		return
	}
	queueSums[queue] = remaining
}
Nil map write in subtractFromSums is unreachable but unguarded
queueSums := sjp.sums[pool] returns nil if the pool key was previously deleted from sums (which subtractFromSums itself does on line 188). If a heap entry were ever present without a matching sums entry — an invariant violation — the code would reach queueSums[queue] = remaining on a nil map and panic. The invariant is maintained correctly today (every heap.Push is paired with an addToSums call), but adding a nil guard or an assertion would make the invariant self-documenting and prevent a silent panic if the invariant is ever accidentally broken.
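A guarded variant might look like the sketch below. It is not the PR's code: a plain int64 stands in for internaltypes.ResourceList, and the sums type and subtract method are hypothetical names chosen for illustration. The idea is to check both map lookups and report the invariant violation to the caller instead of writing to a nil map.

```go
package main

import "fmt"

// sums maps pool -> queue -> running total. A plain int64 stands in for the
// resource list type used in the real code (an assumption made for brevity).
type sums map[string]map[string]int64

// subtract removes amount from the (pool, queue) total and prunes empty maps.
// It returns false, leaving s unchanged, if no total is tracked for the pair,
// which would indicate an entry without a matching sum.
func (s sums) subtract(pool, queue string, amount int64) bool {
	queueSums, ok := s[pool]
	if !ok {
		return false
	}
	current, ok := queueSums[queue]
	if !ok {
		return false
	}
	remaining := current - amount
	if remaining == 0 {
		delete(queueSums, queue)
		if len(queueSums) == 0 {
			delete(s, pool)
		}
		return true
	}
	queueSums[queue] = remaining
	return true
}

func main() {
	s := sums{"cpu": {"queue-a": 5}}
	fmt.Println(s.subtract("cpu", "queue-a", 5), len(s)) // pool pruned once empty
	fmt.Println(s.subtract("cpu", "queue-a", 1))         // pool gone: reported, no panic
}
```

Whether to return a flag, log, or panic with a descriptive message on a violated invariant is a design choice for the maintainers; the sketch only shows that the nil-map write can be made impossible.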
Signed-off-by: Trey Guckian <24757349+tgucks@users.noreply.github.com>
What type of PR is this?
Feature/refactor
What this PR does / why we need it
Extracts short-job-penalty tracking out of the scheduling hot path and the jobDb retention logic into a dedicated, self-contained ShortJobPenalty service.
Previously the penalty was recomputed every scheduling cycle by scanning all terminal jobs held in the jobDb. To make those jobs available for the scan, terminal short jobs were deliberately kept in the jobDb while their penalty was active, which in turn required a periodic full GC to clean them up.
This PR makes ShortJobPenalty own its own state:
- ReportFinishedJob is called at each point where a job can go terminal. This records the job's resources keyed by (pool, queue).
- Penalties are captured via Snapshot() and read back per-pool from that immutable ShortJobPenaltySnapshot via GetPenaltiesForPool, replacing the inline per-job ShouldApplyPenalty accumulation in calculateJobSchedulingInfo.
- Entries expire at runStart + cutoff[pool], with a derived per-(pool, queue) running total cache kept in sync as entries are added and expired. Access is guarded by a mutex.

This removes the need for the periodic full GC of terminal jobs from the jobDb. They are now deleted as they go terminal.
Special notes for your reviewer
- ShortJobPenalty is now stateful and should be concurrency-safe (sync.Mutex); the entries are the source of truth and sums is a derived cache. A Snapshot() taken once per cycle gives every pool a consistent point-in-time view.
- syncState's signature changed (dropped the fullJobGc bool); call sites in cycle and initialize were updated accordingly.