Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 1 addition & 48 deletions internal/scheduler/jobdb/jobdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ type JobDb struct {
jobsByQueue map[string]immutable.SortedSet[*Job]
jobsByPoolAndQueue map[string]map[string]immutable.SortedSet[*Job]
leasedJobs *immutable.Set[*Job]
terminalJobs *immutable.Set[*Job]
unvalidatedJobs *immutable.Set[*Job]
// Configured priority classes.
priorityClasses map[string]types.PriorityClass
Expand Down Expand Up @@ -137,15 +136,13 @@ func NewJobDbWithSchedulingKeyGenerator(
}
unvalidatedJobs := immutable.NewSet[*Job](JobHasher{})
leasedJobs := immutable.NewSet[*Job](JobHasher{})
terminalJobs := immutable.NewSet[*Job](JobHasher{})
return &JobDb{
jobsById: immutable.NewMap[string, *Job](nil),
jobsByRunId: immutable.NewMap[string, string](nil),
jobsByGangKey: map[gangKey]immutable.Set[string]{},
jobsByQueue: map[string]immutable.SortedSet[*Job]{},
jobsByPoolAndQueue: map[string]map[string]immutable.SortedSet[*Job]{},
leasedJobs: &leasedJobs,
terminalJobs: &terminalJobs,
unvalidatedJobs: &unvalidatedJobs,
priorityClasses: priorityClasses,
defaultPriorityClass: defaultPriorityClass,
Expand Down Expand Up @@ -178,7 +175,6 @@ func (jobDb *JobDb) Clone() *JobDb {
jobsByQueue: maps.Clone(jobDb.jobsByQueue),
jobsByPoolAndQueue: deepClone(jobDb.jobsByPoolAndQueue),
leasedJobs: jobDb.leasedJobs,
terminalJobs: jobDb.terminalJobs,
unvalidatedJobs: jobDb.unvalidatedJobs,
priorityClasses: jobDb.priorityClasses,
defaultPriorityClass: jobDb.defaultPriorityClass,
Expand Down Expand Up @@ -353,7 +349,6 @@ func (jobDb *JobDb) ReadTxn() *Txn {
jobsByQueue: jobDb.jobsByQueue,
jobsByPoolAndQueue: jobDb.jobsByPoolAndQueue,
leasedJobs: jobDb.leasedJobs,
terminalJobs: jobDb.terminalJobs,
unvalidatedJobs: jobDb.unvalidatedJobs,
bidPriceSnapshot: jobDb.bidPriceSnapshot,
active: true,
Expand All @@ -376,7 +371,6 @@ func (jobDb *JobDb) WriteTxn() *Txn {
jobsByQueue: maps.Clone(jobDb.jobsByQueue),
jobsByPoolAndQueue: deepClone(jobDb.jobsByPoolAndQueue),
leasedJobs: jobDb.leasedJobs,
terminalJobs: jobDb.terminalJobs,
unvalidatedJobs: jobDb.unvalidatedJobs,
bidPriceSnapshot: jobDb.bidPriceSnapshot,
active: true,
Expand All @@ -399,7 +393,6 @@ func (jobDb *JobDb) DryRunTxn() *Txn {
jobsByQueue: maps.Clone(jobDb.jobsByQueue),
jobsByPoolAndQueue: deepClone(jobDb.jobsByPoolAndQueue),
leasedJobs: jobDb.leasedJobs,
terminalJobs: jobDb.terminalJobs,
unvalidatedJobs: jobDb.unvalidatedJobs,
bidPriceSnapshot: jobDb.bidPriceSnapshot,
active: true,
Expand Down Expand Up @@ -444,8 +437,6 @@ type Txn struct {
jobsByPoolAndQueue map[string]map[string]immutable.SortedSet[*Job]
// Jobs that are currently leased
leasedJobs *immutable.Set[*Job]
// Jobs that are currently in a terminal state
terminalJobs *immutable.Set[*Job]
// Jobs that require submit checking
unvalidatedJobs *immutable.Set[*Job]
// The current snapshot of bid prices - allowing look up of bidding prices on job creation
Expand Down Expand Up @@ -473,7 +464,6 @@ func (txn *Txn) Commit() {
txn.jobDb.jobsByQueue = txn.jobsByQueue
txn.jobDb.jobsByPoolAndQueue = txn.jobsByPoolAndQueue
txn.jobDb.leasedJobs = txn.leasedJobs
txn.jobDb.terminalJobs = txn.terminalJobs
txn.jobDb.unvalidatedJobs = txn.unvalidatedJobs
txn.jobDb.bidPriceSnapshot = txn.bidPriceSnapshot

Expand Down Expand Up @@ -614,11 +604,6 @@ func (txn *Txn) Upsert(jobs []*Job) error {
txn.leasedJobs = &newLeasedJobs
}

if existingJob.InTerminalState() {
newTerminalJobs := txn.terminalJobs.Delete(existingJob)
txn.terminalJobs = &newTerminalJobs
}

if !existingJob.Validated() {
newUnvalidatedJobs := txn.unvalidatedJobs.Delete(existingJob)
txn.unvalidatedJobs = &newUnvalidatedJobs
Expand All @@ -629,7 +614,7 @@ func (txn *Txn) Upsert(jobs []*Job) error {

// Now need to insert jobs, runs and queuedJobs. This can be done in parallel.
wg := sync.WaitGroup{}
wg.Add(7)
wg.Add(6)

// jobs
go func() {
Expand Down Expand Up @@ -792,30 +777,6 @@ func (txn *Txn) Upsert(jobs []*Job) error {
}
}()

// Terminal jobs
go func() {
defer wg.Done()
if hasJobs {
for _, job := range jobs {
if job.InTerminalState() {
terminalJobs := txn.terminalJobs.Add(job)
txn.terminalJobs = &terminalJobs
}
}
} else {
terminalJobs := map[*Job]bool{}

for _, job := range jobs {
if job.InTerminalState() {
terminalJobs[job] = true
}
}

terminalJobsImmutable := immutable.NewSet[*Job](JobHasher{}, maps.Keys(terminalJobs)...)
txn.terminalJobs = &terminalJobsImmutable
}
}()

// Unvalidated jobs
go func() {
defer wg.Done()
Expand Down Expand Up @@ -957,11 +918,6 @@ func (txn *Txn) GetAllLeasedJobs() []*Job {
return txn.leasedJobs.Items()
}

// GetAllTerminalJobs returns the jobs currently held in this transaction's
// terminalJobs set.
func (txn *Txn) GetAllTerminalJobs() []*Job {
	terminal := txn.terminalJobs
	return terminal.Items()
}

// GetAll returns all jobs in the database.
func (txn *Txn) GetAll() []*Job {
allJobs := make([]*Job, 0, txn.jobsById.Len())
Expand Down Expand Up @@ -1034,9 +990,6 @@ func (txn *Txn) delete(jobId string) {
newLeasedJobs := txn.leasedJobs.Delete(job)
txn.leasedJobs = &newLeasedJobs

newTerminalJobs := txn.terminalJobs.Delete(job)
txn.terminalJobs = &newTerminalJobs

newUnvalidatedJobs := txn.unvalidatedJobs.Delete(job)
txn.unvalidatedJobs = &newUnvalidatedJobs
}
Expand Down
62 changes: 0 additions & 62 deletions internal/scheduler/jobdb/jobdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,68 +147,6 @@ func TestJobDb_LeasedJobs_Deleted(t *testing.T) {
assert.Empty(t, txn.GetAllLeasedJobs())
}

// TestJobDb_TestGetTerminalJobs upserts six jobs in different states and
// checks that GetAllTerminalJobs returns exactly the succeeded, cancelled and
// failed ones (including a job that is both queued and failed).
func TestJobDb_TestGetTerminalJobs(t *testing.T) {
	jobDb := NewTestJobDb()
	leased := newJob().WithQueued(false).WithNewRun("executor", "nodeId", "nodeName", "pool", 5)
	queued := newJob().WithQueued(true)
	succeeded := newJob().WithQueued(false).WithSucceeded(true)
	cancelled := newJob().WithQueued(false).WithCancelled(true)
	failed := newJob().WithQueued(false).WithFailed(true)
	queuedAndFailed := newJob().WithQueued(true).WithFailed(true)
	txn := jobDb.WriteTxn()

	require.NoError(t, txn.Upsert([]*Job{leased, queued, succeeded, cancelled, failed, queuedAndFailed}))

	// Both slices are sorted by id before comparison so the assertion does not
	// depend on the order in which the set yields its items.
	sortByID := func(jobs []*Job) {
		sort.SliceStable(jobs, func(a, b int) bool { return jobs[a].id < jobs[b].id })
	}

	want := []*Job{succeeded, cancelled, failed, queuedAndFailed}
	got := txn.GetAllTerminalJobs()
	sortByID(got)
	sortByID(want)
	assert.Equal(t, want, got)
}

// TestJobDb_TerminalJobs_Lifecycle walks a single job through queued, leased
// and succeeded states, committing after each step, and checks that the
// terminal-job set is empty until the job has succeeded.
func TestJobDb_TerminalJobs_Lifecycle(t *testing.T) {
	jobDb := NewTestJobDb()

	// commit upserts one job in its own write transaction and commits it.
	commit := func(db *JobDb, j *Job) {
		writeTxn := db.WriteTxn()
		require.NoError(t, writeTxn.Upsert([]*Job{j}))
		writeTxn.Commit()
	}
	// terminalJobs reads the terminal set through a fresh read transaction.
	terminalJobs := func() []*Job {
		return jobDb.ReadTxn().GetAllTerminalJobs()
	}

	// queued
	job := newJob().WithQueued(true)
	commit(jobDb, job)
	assert.Empty(t, terminalJobs())

	// leased
	job = job.WithQueued(false).WithNewRun("executor", "nodeId", "nodeName", "pool", 5)
	commit(jobDb, job)
	assert.Empty(t, terminalJobs())

	// finished
	job = job.WithSucceeded(true)
	commit(jobDb, job)
	assert.NotEmpty(t, terminalJobs())
}

// TestJobDb_TerminalJobs_Deleted checks that a failed job appears in the
// terminal-job set after Upsert and is gone again after BatchDelete, all
// within the same write transaction.
func TestJobDb_TerminalJobs_Deleted(t *testing.T) {
	jobDb := NewTestJobDb()
	failedJob := newJob().WithFailed(true)
	txn := jobDb.WriteTxn()

	require.NoError(t, txn.Upsert([]*Job{failedJob}))
	assert.Equal(t, []*Job{failedJob}, txn.GetAllTerminalJobs())

	require.NoError(t, txn.BatchDelete([]string{failedJob.Id()}))
	assert.Empty(t, txn.GetAllTerminalJobs())
}

func TestJobDb_TestGetUnvalidated(t *testing.T) {
jobDb := NewTestJobDb()
job1 := newJob().WithValidated(false)
Expand Down
35 changes: 14 additions & 21 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke
}(ctx)
// Update job state.
ctx.Info("Syncing internal state with database")
updatedJobs, jsts, newJobsSerial, newRunsSerial, err := s.syncState(ctx, false, cycleNumber%10 == 0)
updatedJobs, jsts, newJobsSerial, newRunsSerial, err := s.syncState(ctx, false)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -438,7 +438,7 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke
// syncState updates jobs in jobDb to match state in postgres and returns all updated jobs along with
// the new jobsSerial and runsSerial cursor values that should be applied once the resulting events
// have been published successfully.
func (s *Scheduler) syncState(ctx *armadacontext.Context, initial, fullJobGc bool) ([]*jobdb.Job, []jobdb.JobStateTransitions, int64, int64, error) {
func (s *Scheduler) syncState(ctx *armadacontext.Context, initial bool) ([]*jobdb.Job, []jobdb.JobStateTransitions, int64, int64, error) {
txn := s.jobDb.WriteTxn()
defer txn.Abort()

Expand Down Expand Up @@ -501,25 +501,12 @@ func (s *Scheduler) syncState(ctx *armadacontext.Context, initial, fullJobGc boo

// Delete jobs in a terminal state.
idsOfJobsToDelete := make([]string, 0)
deletionCandidates := jobDbJobs
if fullJobGc {
// Occasional full gc so jobs that were not deleted
// earlier as ShortJobPenalty was being applied
// eventually get deleted.
deletionCandidates = txn.GetAll()
}
shortJobCount := 0
for _, j := range deletionCandidates {
if !j.InTerminalState() {
continue
}
if s.shortJobPenalty.ShouldApplyPenalty(j) {
shortJobCount++
continue
for _, j := range jobDbJobs {
if j.InTerminalState() {
idsOfJobsToDelete = append(idsOfJobsToDelete, j.Id())
}
idsOfJobsToDelete = append(idsOfJobsToDelete, j.Id())
}
ctx.Logger().Infof("Deleting %d jobs out of %d considered for deletion (%d short jobs, full job gc=%t)", len(idsOfJobsToDelete), len(deletionCandidates), shortJobCount, fullJobGc)
ctx.Logger().Infof("Deleting %d terminal jobs out of %d updated jobs", len(idsOfJobsToDelete), len(jobDbJobs))
if err := txn.BatchDelete(idsOfJobsToDelete); err != nil {
return nil, nil, 0, 0, err
}
Expand Down Expand Up @@ -1070,6 +1057,9 @@ func (s *Scheduler) generateUpdateMessagesFromJob(ctx *armadacontext.Context, jo
}

if !origJob.Equal(job) {
if job.InTerminalState() {
s.shortJobPenalty.ReportFinishedJob(job)
}
if err := txn.Upsert([]*jobdb.Job{job}); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1128,7 +1118,9 @@ func (s *Scheduler) expireJobsIfNecessary(ctx *armadacontext.Context, txn *jobdb
run := job.LatestRun()
if run != nil && !job.Queued() && staleExecutors[run.Executor()] {
ctx.Warnf("Cancelling job %s as it is running on lost executor %s", job.Id(), run.Executor())
jobsToUpdate = append(jobsToUpdate, job.WithQueued(false).WithFailed(true).WithUpdatedRun(run.WithFailed(true)))
expiredJob := job.WithQueued(false).WithFailed(true).WithUpdatedRun(run.WithFailed(true))
s.shortJobPenalty.ReportFinishedJob(expiredJob)
jobsToUpdate = append(jobsToUpdate, expiredJob)

leaseExpiredError := &armadaevents.Error{
Terminal: true,
Expand Down Expand Up @@ -1234,6 +1226,7 @@ func (s *Scheduler) submitCheck(ctx *armadacontext.Context, txn *jobdb.Txn) ([]*
}
} else {
job = job.WithFailed(true).WithQueued(false)
s.shortJobPenalty.ReportFinishedJob(job)
jobsToUpdate = append(jobsToUpdate, job)

es.Events[0].Event = &armadaevents.EventSequence_Event_JobErrors{
Expand Down Expand Up @@ -1275,7 +1268,7 @@ func (s *Scheduler) initialise(ctx *armadacontext.Context) error {
case <-ctx.Done():
return nil
default:
if _, _, newJobsSerial, newRunsSerial, err := s.syncState(ctx, true, false); err != nil {
if _, _, newJobsSerial, newRunsSerial, err := s.syncState(ctx, true); err != nil {
ctx.Logger().
WithStacktrace(err).
Error("failed to initialise; trying again in 1 second")
Expand Down
Loading
Loading