diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml index a87cacb2770..6bea9242c89 100644 --- a/config/scheduler/config.yaml +++ b/config/scheduler/config.yaml @@ -6,7 +6,6 @@ pulsarSendTimeout: 5s internedStringsCacheSize: 100000 queueRefreshPeriod: 10s publishMetricsToPulsar: false -jobMetadataMigrationPhase: cutover metrics: port: 9000 jobStateMetricsResetInterval: 12h diff --git a/config/scheduleringester/config.yaml b/config/scheduleringester/config.yaml index bdd9ae62398..43604d2fbfb 100644 --- a/config/scheduleringester/config.yaml +++ b/config/scheduleringester/config.yaml @@ -21,4 +21,3 @@ pulsar: subscriptionName: "scheduler-ingester" batchSize: 10000 batchDuration: 500ms -jobMetadataMigrationPhase: cutover diff --git a/internal/scheduler/configuration/configuration.go b/internal/scheduler/configuration/configuration.go index e7bf618d5ac..6cfd82bc990 100644 --- a/internal/scheduler/configuration/configuration.go +++ b/internal/scheduler/configuration/configuration.go @@ -13,7 +13,6 @@ import ( profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration" armadaresource "github.com/armadaproject/armada/internal/common/resource" "github.com/armadaproject/armada/internal/common/types" - schedulerdb "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/server/configuration" "github.com/armadaproject/armada/pkg/client" ) @@ -91,14 +90,6 @@ type Configuration struct { PricingApi PricingApiConfig // Whether to publish metrics To Pulsar. This is currently experimental PublishMetricsToPulsar bool - // JobMetadataMigrationPhase controls whether submit_message and groups are - // read from the jobs table, the job_metadata table, or both (coalesced), - // during the migration. Required; default ("cutover"). - // Operators must coordinate this value with the scheduleringester's JobMetadataMigrationPhase: - // a mismatch can cause new submissions to be unreadable - // (e.g. ingester=cutover with scheduler=legacy omits submit_message/groups from leases) - // or to fail entirely (e.g. scheduler=cutover while rows without a job_metadata counterpart still exist). - JobMetadataMigrationPhase schedulerdb.JobMetadataMigrationPhase `validate:"required,oneof=legacy dualWrite cutover"` } type SubmitCheckConfig struct { diff --git a/internal/scheduler/configuration/validation_test.go b/internal/scheduler/configuration/validation_test.go index 653db94c415..459977d3d47 100644 --- a/internal/scheduler/configuration/validation_test.go +++ b/internal/scheduler/configuration/validation_test.go @@ -9,7 +9,6 @@ import ( commonconfig "github.com/armadaproject/armada/internal/common/config" "github.com/armadaproject/armada/internal/common/types" - schedulerdb "github.com/armadaproject/armada/internal/scheduler/database" ) func TestMutate(t *testing.T) { @@ -160,7 +159,6 @@ func createValidMinimalConfig() Configuration { Pulsar: commonconfig.PulsarConfig{ URL: "pulsar", }, - JobMetadataMigrationPhase: schedulerdb.JobMetadataMigrationPhaseLegacy, } } diff --git a/internal/scheduler/database/db_pruner_test.go b/internal/scheduler/database/db_pruner_test.go index 79097db89f1..2b4906bf632 100644 --- a/internal/scheduler/database/db_pruner_test.go +++ b/internal/scheduler/database/db_pruner_test.go @@ -241,7 +241,6 @@ func populateRequiredJobFields(job Job) Job { job.Queue = "test-queue" job.UserID = "" job.Submitted = 0 - job.SubmitMessage = []byte{} job.SchedulingInfo = []byte{} return job } diff --git a/internal/scheduler/database/job_metadata_migration.go b/internal/scheduler/database/job_metadata_migration.go deleted file mode 100644 index f203329ae60..00000000000 --- a/internal/scheduler/database/job_metadata_migration.go +++ /dev/null @@ -1,46 +0,0 @@ -package database - -import "github.com/pkg/errors" - -// JobMetadataMigrationPhase is a temporary flag controlling the lazy migration -// of submit_message and groups from the jobs table to a separate job_metadata -// table. It is set in both the scheduler and scheduleringester configs -// controlling read and write behavior, respectively. An unrecognized or empty -// value is rejected. -type JobMetadataMigrationPhase string - -const ( - // JobMetadataMigrationPhaseLegacy: reads come from jobs only. - // Pre-migration baseline. Safe to hold while scheduleringester writes to jobs. - JobMetadataMigrationPhaseLegacy JobMetadataMigrationPhase = "legacy" - // JobMetadataMigrationPhaseDualWrite: reads use a LEFT JOIN + COALESCE - // so pre- and post-migration row shapes both resolve. - JobMetadataMigrationPhaseDualWrite JobMetadataMigrationPhase = "dualWrite" - // JobMetadataMigrationPhaseCutover: reads use an INNER JOIN job_metadata. - // Migration end state. Operators must verify job_metadata has been backfilled - // for every non-terminal jobs row before selecting this phase - // (e.g. via: SELECT COUNT(*) FROM jobs j LEFT JOIN job_metadata jm ON j.job_id=jm.job_id - // WHERE jm.job_id IS NULL AND j.terminated = false); - JobMetadataMigrationPhaseCutover JobMetadataMigrationPhase = "cutover" -) - -func (p JobMetadataMigrationPhase) Validate() error { - switch p { - case JobMetadataMigrationPhaseLegacy, JobMetadataMigrationPhaseDualWrite, JobMetadataMigrationPhaseCutover: - return nil - } - return errors.Errorf("invalid JobMetadataMigrationPhase %q: must be one of %q, %q, %q", - p, JobMetadataMigrationPhaseLegacy, JobMetadataMigrationPhaseDualWrite, JobMetadataMigrationPhaseCutover) -} - -// WritesJobs reports whether the scheduleringester should populate the legacy -// jobs.submit_message / jobs.groups columns on new submissions. -func (p JobMetadataMigrationPhase) WritesJobs() bool { - return p == JobMetadataMigrationPhaseLegacy || p == JobMetadataMigrationPhaseDualWrite -} - -// WritesJobMetadata reports whether the scheduleringester should insert a -// job_metadata row alongside each new jobs row. -func (p JobMetadataMigrationPhase) WritesJobMetadata() bool { - return p == JobMetadataMigrationPhaseDualWrite || p == JobMetadataMigrationPhaseCutover -} diff --git a/internal/scheduler/database/job_metadata_migration_test.go b/internal/scheduler/database/job_metadata_migration_test.go deleted file mode 100644 index 32b1f76a18e..00000000000 --- a/internal/scheduler/database/job_metadata_migration_test.go +++ /dev/null @@ -1,181 +0,0 @@ -package database - -import ( - "testing" - "time" - - "github.com/google/uuid" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - clocktesting "k8s.io/utils/clock/testing" - - "github.com/armadaproject/armada/internal/common/armadacontext" - "github.com/armadaproject/armada/internal/common/util" -) - -func TestFetchJobRunLeases_MigrationPhases(t *testing.T) { - const executorName = "test-executor" - - legacyJobID := util.NewULID() - migratedJobID := util.NewULID() - bothJobID := util.NewULID() - - legacyGroups := []byte("legacy-groups") - legacySubmit := []byte("legacy-submit") - migratedGroups := []byte("migrated-groups") - migratedSubmit := []byte("migrated-submit") - bothGroupsOnJobs := []byte("both-jobs-groups") - bothSubmitOnJobs := []byte("both-jobs-submit") - bothGroupsOnMetadata := []byte("both-metadata-groups") - bothSubmitOnMetadata := []byte("both-metadata-submit") - - seed := func(ctx *armadacontext.Context, db *pgxpool.Pool) ([]Run, error) { - legacyJob := Job{JobID: legacyJobID, JobSet: "js", Queue: "q", Groups: legacyGroups, SubmitMessage: legacySubmit, SchedulingInfo: []byte{}} - migratedJob := Job{JobID: migratedJobID, JobSet: "js", Queue: "q", SchedulingInfo: []byte{}} - bothJob := Job{JobID: bothJobID, JobSet: "js", Queue: "q", Groups: bothGroupsOnJobs, SubmitMessage: bothSubmitOnJobs, SchedulingInfo: []byte{}} - if err := upsertJobs(ctx, db, []Job{legacyJob, migratedJob, bothJob}); err != nil { - return nil, err - } - // Seed job_metadata for migrated + both - for _, r := range []struct { - jobID, groups, submit []byte - }{ - {[]byte(migratedJobID), migratedGroups, migratedSubmit}, - {[]byte(bothJobID), bothGroupsOnMetadata, bothSubmitOnMetadata}, - } { - if _, err := db.Exec(ctx, - "INSERT INTO job_metadata (job_id, groups, submit_message) VALUES ($1, $2, $3)", - string(r.jobID), r.groups, r.submit, - ); err != nil { - return nil, err - } - } - // Clear jobs.submit_message for migrated row - if _, err := db.Exec(ctx, - "UPDATE jobs SET submit_message = NULL, groups = NULL WHERE job_id = $1", - migratedJobID, - ); err != nil { - return nil, err - } - runs := []Run{ - {RunID: uuid.NewString(), JobID: legacyJobID, JobSet: "js", Executor: executorName, Pool: "p"}, - {RunID: uuid.NewString(), JobID: migratedJobID, JobSet: "js", Executor: executorName, Pool: "p"}, - {RunID: uuid.NewString(), JobID: bothJobID, JobSet: "js", Executor: executorName, Pool: "p"}, - } - return runs, upsertRuns(ctx, db, runs) - } - - type expected struct { - groups []byte - submit []byte - } - - cases := []struct { - name string - phase JobMetadataMigrationPhase - want map[string]expected - }{ - { - name: "dualWrite prefers job_metadata when present", - phase: JobMetadataMigrationPhaseDualWrite, - want: map[string]expected{ - legacyJobID: {legacyGroups, legacySubmit}, // falls back to jobs - migratedJobID: {migratedGroups, migratedSubmit}, // from job_metadata - bothJobID: {bothGroupsOnMetadata, bothSubmitOnMetadata}, // job_metadata wins - }, - }, - { - name: "legacy phase reads only the jobs columns", - phase: JobMetadataMigrationPhaseLegacy, - want: map[string]expected{ - legacyJobID: {legacyGroups, legacySubmit}, - migratedJobID: {nil, nil}, - bothJobID: {bothGroupsOnJobs, bothSubmitOnJobs}, - }, - }, - { - name: "cutover phase reads only the job_metadata columns", - phase: JobMetadataMigrationPhaseCutover, - want: map[string]expected{ - legacyJobID: {nil, nil}, - migratedJobID: {migratedGroups, migratedSubmit}, - bothJobID: {bothGroupsOnMetadata, bothSubmitOnMetadata}, - }, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - err := WithTestDb(func(_ *Queries, db *pgxpool.Pool) error { - ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 10*time.Second) - defer cancel() - - runs, err := seed(ctx, db) - require.NoError(t, err) - - repo := NewPostgresJobRepository(db, defaultBatchSize, tc.phase) - leases, err := repo.FetchJobRunLeases(ctx, executorName, 100, nil) - require.NoError(t, err) - - runByJobID := map[string]string{} - for _, r := range runs { - runByJobID[r.JobID] = r.RunID - } - got := map[string]expected{} - for _, lease := range leases { - // Invert run-id -> job-id - for jobID, runID := range runByJobID { - if runID == lease.RunID { - got[jobID] = expected{lease.Groups, lease.SubmitMessage} - } - } - } - for jobID, want := range tc.want { - assert.Equal(t, want, got[jobID], "job %s", jobID) - } - return nil - }) - require.NoError(t, err) - }) - } -} - -func TestPruner_DeletesJobMetadataRows(t *testing.T) { - err := WithTestDb(func(_ *Queries, db *pgxpool.Pool) error { - ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 30*time.Second) - defer cancel() - - jobID := util.NewULID() - terminatedJob := Job{ - JobID: jobID, JobSet: "js", Queue: "q", SchedulingInfo: []byte{}, - Succeeded: true, - } - require.NoError(t, upsertJobs(ctx, db, []Job{terminatedJob})) - _, err := db.Exec(ctx, - "INSERT INTO job_metadata (job_id, submit_message, groups) VALUES ($1, $2, $3)", - jobID, []byte("submit"), []byte("groups"), - ) - require.NoError(t, err) - - // Backdate last_modified so the pruner considers this row. - _, err = db.Exec(ctx, - "UPDATE jobs SET last_modified = now() - interval '1 day' WHERE job_id = $1", jobID) - require.NoError(t, err) - - conn, err := db.Acquire(ctx) - require.NoError(t, err) - defer conn.Release() - require.NoError(t, PruneDb(ctx, conn.Conn(), 100, time.Minute, clocktesting.NewFakeClock(time.Now().Add(time.Hour)))) - - var metadataCount int - require.NoError(t, db.QueryRow(ctx, "SELECT count(*) FROM job_metadata WHERE job_id = $1", jobID).Scan(&metadataCount)) - assert.Equal(t, 0, metadataCount, "pruner should delete the job_metadata row") - - var jobCount int - require.NoError(t, db.QueryRow(ctx, "SELECT count(*) FROM jobs WHERE job_id = $1", jobID).Scan(&jobCount)) - assert.Equal(t, 0, jobCount, "pruner should delete the jobs row") - return nil - }) - require.NoError(t, err) -} diff --git a/internal/scheduler/database/job_repository.go b/internal/scheduler/database/job_repository.go index 1f1dc8dd012..2c8ba223630 100644 --- a/internal/scheduler/database/job_repository.go +++ b/internal/scheduler/database/job_repository.go @@ -68,38 +68,13 @@ type PostgresJobRepository struct { db *pgxpool.Pool // maximum number of rows to fetch from postgres in a single query batchSize int32 - // migrationPhase controls how FetchJobRunLeases resolves the submit_message - // and groups columns during the job_metadata lazy migration. - migrationPhase JobMetadataMigrationPhase - // Pre-built expressions computed from migrationPhase, used by FetchJobRunLeases - leaseGroupsExpr string - leaseSubmitMessageExpr string - leaseJoinClause string } -func NewPostgresJobRepository(db *pgxpool.Pool, batchSize int32, migrationPhase JobMetadataMigrationPhase) *PostgresJobRepository { - r := &PostgresJobRepository{ - db: db, - batchSize: batchSize, - migrationPhase: migrationPhase, +func NewPostgresJobRepository(db *pgxpool.Pool, batchSize int32) *PostgresJobRepository { + return &PostgresJobRepository{ + db: db, + batchSize: batchSize, } - - switch migrationPhase { - case JobMetadataMigrationPhaseLegacy: - r.leaseGroupsExpr = "j.groups" - r.leaseSubmitMessageExpr = "j.submit_message" - r.leaseJoinClause = "" - case JobMetadataMigrationPhaseCutover: - r.leaseGroupsExpr = "jm.groups" - r.leaseSubmitMessageExpr = "jm.submit_message" - r.leaseJoinClause = "JOIN job_metadata jm ON j.job_id = jm.job_id" - default: - r.leaseGroupsExpr = "COALESCE(jm.groups, j.groups)" - r.leaseSubmitMessageExpr = "COALESCE(jm.submit_message, j.submit_message)" - r.leaseJoinClause = "LEFT JOIN job_metadata jm ON j.job_id = jm.job_id" - } - - return r } func (r *PostgresJobRepository) FetchInitialJobs(ctx *armadacontext.Context) ([]Job, []Run, *int64, *int64, error) { @@ -380,18 +355,18 @@ func (r *PostgresJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, ex } query := fmt.Sprintf(` - SELECT jr.run_id, jr.node, j.queue, j.job_set, jr.pool, j.user_id, %s, %s, jr.pod_requirements_overlay + SELECT jr.run_id, jr.node, j.queue, j.job_set, jr.pool, j.user_id, jm.groups, jm.submit_message, jr.pod_requirements_overlay FROM runs jr LEFT JOIN %s as tmp ON (tmp.run_id = jr.run_id) JOIN jobs j ON jr.job_id = j.job_id - %s + JOIN job_metadata jm ON j.job_id = jm.job_id WHERE jr.executor = $1 AND tmp.run_id IS NULL AND jr.terminated = false ORDER BY jr.serial LIMIT %d; - `, r.leaseGroupsExpr, r.leaseSubmitMessageExpr, tmpTable, r.leaseJoinClause, maxResults) + `, tmpTable, maxResults) rows, err := tx.Query(ctx, query, executor) if err != nil { diff --git a/internal/scheduler/database/job_repository_test.go b/internal/scheduler/database/job_repository_test.go index e6630f9196c..58cfc5be244 100644 --- a/internal/scheduler/database/job_repository_test.go +++ b/internal/scheduler/database/job_repository_test.go @@ -10,12 +10,14 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" "golang.org/x/exp/slices" v1 "k8s.io/api/core/v1" "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/common/compress" "github.com/armadaproject/armada/internal/common/database" + armadamaps "github.com/armadaproject/armada/internal/common/maps" protoutil "github.com/armadaproject/armada/internal/common/proto" "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -31,7 +33,6 @@ func TestFetchInitialJobs(t *testing.T) { Queue: "test-queue", QueuedVersion: 1, SchedulingInfo: []byte{byte(0)}, - SubmitMessage: []byte{}, } expectedLeasedJob := Job{ @@ -69,7 +70,6 @@ func TestFetchInitialJobs(t *testing.T) { Queued: true, QueuedVersion: 1, SchedulingInfo: []byte{byte(0)}, - SubmitMessage: []byte{}, } expectedQueuedJob := Job{ @@ -89,7 +89,6 @@ func TestFetchInitialJobs(t *testing.T) { QueuedVersion: 1, Cancelled: true, SchedulingInfo: []byte{byte(0)}, - SubmitMessage: []byte{}, } cancelledJobRun := Run{ @@ -108,7 +107,6 @@ func TestFetchInitialJobs(t *testing.T) { QueuedVersion: 1, Failed: true, SchedulingInfo: []byte{byte(0)}, - SubmitMessage: []byte{}, } failedJobRun := Run{ @@ -128,7 +126,6 @@ func TestFetchInitialJobs(t *testing.T) { QueuedVersion: 1, Succeeded: true, SchedulingInfo: []byte{byte(0)}, - SubmitMessage: []byte{}, } succeededJobRun := Run{ @@ -476,7 +473,6 @@ func createTestJobs(numJobs int) ([]Job, []Job) { Succeeded: true, Failed: true, SchedulingInfo: []byte{byte(i)}, - SubmitMessage: []byte{}, } } @@ -502,6 +498,20 @@ func createTestJobs(numJobs int) ([]Job, []Job) { return dbJobs, expectedJobs } +func createTestJobMetadata(jobs []Job) map[string]JobMetadatum { + return armadamaps.FromSlice( + jobs, + func(job Job) string { return job.JobID }, + func(job Job) JobMetadatum { + return JobMetadatum{ + JobID: job.JobID, + SubmitMessage: []byte("submit-" + job.JobID), + Groups: []byte("groups-" + job.JobID), + } + }, + ) +} + func TestFindInactiveRuns(t *testing.T) { runIds := make([]string, 3) for i := 0; i < len(runIds); i++ { @@ -569,6 +579,7 @@ func TestFindInactiveRuns(t *testing.T) { func TestFetchJobRunLeases(t *testing.T) { const executorName = "testExecutor" dbJobs, _ := createTestJobs(5) + jobMetadata := createTestJobMetadata(dbJobs) // first three runs can be picked up by executor // last three runs are not available @@ -623,14 +634,15 @@ func TestFetchJobRunLeases(t *testing.T) { } expectedLeases := make([]*JobRunLease, 4) for i := range expectedLeases { + jobID := dbRuns[i].JobID expectedLeases[i] = &JobRunLease{ RunID: dbRuns[i].RunID, Queue: dbJobs[i].Queue, Pool: dbRuns[i].Pool, JobSet: dbJobs[i].JobSet, UserID: dbJobs[i].UserID, - Groups: dbJobs[i].Groups, - SubmitMessage: dbJobs[i].SubmitMessage, + SubmitMessage: jobMetadata[jobID].SubmitMessage, + Groups: jobMetadata[jobID].Groups, PodRequirementsOverlay: dbRuns[i].PodRequirementsOverlay, } } @@ -691,6 +703,8 @@ func TestFetchJobRunLeases(t *testing.T) { // Set up db err := upsertJobs(ctx, repo.db, tc.dbJobs) require.NoError(t, err) + err = upsertJobMetadata(ctx, repo.db, maps.Values(jobMetadata)) + require.NoError(t, err) err = upsertRuns(ctx, repo.db, tc.dbRuns) require.NoError(t, err) @@ -761,7 +775,6 @@ func TestSelectJobsByExecutorAndQueues(t *testing.T) { Queue: "test-queue", QueuedVersion: 1, SchedulingInfo: []byte{byte(0)}, - SubmitMessage: []byte{}, } otherQueueJob := Job{ JobID: util.NewULID(), @@ -769,7 +782,6 @@ func TestSelectJobsByExecutorAndQueues(t *testing.T) { Queue: "other-queue", QueuedVersion: 1, SchedulingInfo: []byte{byte(0)}, - SubmitMessage: []byte{}, } activeRun := Run{ @@ -884,7 +896,6 @@ func TestSelectLeasedJobsByQueue(t *testing.T) { Queue: "test-queue", QueuedVersion: 1, SchedulingInfo: []byte{byte(0)}, - SubmitMessage: []byte{}, } leasedRun := Run{ @@ -980,7 +991,6 @@ func TestSelectPendingJobsByQueue(t *testing.T) { Queue: "test-queue", QueuedVersion: 1, SchedulingInfo: []byte{byte(0)}, - SubmitMessage: []byte{}, } pendingRun := Run{ @@ -1066,7 +1076,6 @@ func TestSelectRunningJobsByQueue(t *testing.T) { Queue: "test-queue", QueuedVersion: 1, SchedulingInfo: []byte{byte(0)}, - SubmitMessage: []byte{}, } runningRun := Run{ @@ -1160,7 +1169,7 @@ func TestSelectRunningJobsByQueue(t *testing.T) { func withJobRepository(action func(repository *PostgresJobRepository) error) error { return WithTestDb(func(_ *Queries, db *pgxpool.Pool) error { - repo := NewPostgresJobRepository(db, defaultBatchSize, JobMetadataMigrationPhaseLegacy) + repo := NewPostgresJobRepository(db, defaultBatchSize) return action(repo) }) } @@ -1169,6 +1178,10 @@ func upsertJobs(ctx *armadacontext.Context, db *pgxpool.Pool, jobs []Job) error return database.UpsertWithTransaction(ctx, db, "jobs", jobs, database.WithExcludeColumns("terminated")) } +func upsertJobMetadata(ctx *armadacontext.Context, db *pgxpool.Pool, metadata []JobMetadatum) error { + return database.UpsertWithTransaction(ctx, db, "job_metadata", metadata) +} + func upsertRuns(ctx *armadacontext.Context, db *pgxpool.Pool, runs []Run) error { return database.UpsertWithTransaction(ctx, db, "runs", runs, database.WithExcludeColumns("terminated")) } diff --git a/internal/scheduler/database/migrations/040_drop_jobs_legacy_metadata_columns.sql b/internal/scheduler/database/migrations/040_drop_jobs_legacy_metadata_columns.sql new file mode 100644 index 00000000000..af1f6aa5cb7 --- /dev/null +++ b/internal/scheduler/database/migrations/040_drop_jobs_legacy_metadata_columns.sql @@ -0,0 +1,3 @@ +-- These columns have been migrated to job_metadata +ALTER TABLE jobs DROP COLUMN IF EXISTS submit_message; +ALTER TABLE jobs DROP COLUMN IF EXISTS groups; diff --git a/internal/scheduler/database/models.go b/internal/scheduler/database/models.go index a8cb88ae8a2..65d288176c2 100644 --- a/internal/scheduler/database/models.go +++ b/internal/scheduler/database/models.go @@ -30,7 +30,6 @@ type Job struct { Queue string `db:"queue"` UserID string `db:"user_id"` Submitted int64 `db:"submitted"` - Groups []byte `db:"groups"` Priority int64 `db:"priority"` Queued bool `db:"queued"` QueuedVersion int32 `db:"queued_version"` @@ -39,7 +38,6 @@ type Job struct { CancelByJobsetRequested bool `db:"cancel_by_jobset_requested"` Succeeded bool `db:"succeeded"` Failed bool `db:"failed"` - SubmitMessage []byte `db:"submit_message"` SchedulingInfo []byte `db:"scheduling_info"` SchedulingInfoVersion int32 `db:"scheduling_info_version"` Serial int64 `db:"serial"` diff --git a/internal/scheduler/database/query.sql.go b/internal/scheduler/database/query.sql.go index 7e261b53b72..fbdcbf75593 100644 --- a/internal/scheduler/database/query.sql.go +++ b/internal/scheduler/database/query.sql.go @@ -539,8 +539,32 @@ func (q *Queries) SelectInitialRuns(ctx context.Context, arg SelectInitialRunsPa return items, nil } +const selectJobMetadata = `-- name: SelectJobMetadata :many +SELECT job_id, submit_message, groups FROM job_metadata WHERE job_id = ANY($1::text[]) +` + +func (q *Queries) SelectJobMetadata(ctx context.Context, jobIds []string) ([]JobMetadatum, error) { + rows, err := q.db.Query(ctx, selectJobMetadata, jobIds) + if err != nil { + return nil, err + } + defer rows.Close() + var items []JobMetadatum + for rows.Next() { + var i JobMetadatum + if err := rows.Scan(&i.JobID, &i.SubmitMessage, &i.Groups); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const selectJobsByExecutorAndQueues = `-- name: SelectJobsByExecutorAndQueues :many -SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.groups, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.submit_message, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools, j.bid_price, j.cancel_user, j.price_band, j.terminated +SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools, j.bid_price, j.cancel_user, j.price_band, j.terminated FROM runs jr JOIN jobs j ON jr.job_id = j.job_id @@ -572,7 +596,6 @@ func (q *Queries) SelectJobsByExecutorAndQueues(ctx context.Context, arg SelectJ &i.Queue, &i.UserID, &i.Submitted, - &i.Groups, &i.Priority, &i.Queued, &i.QueuedVersion, @@ -581,7 +604,6 @@ func (q *Queries) SelectJobsByExecutorAndQueues(ctx context.Context, arg SelectJ &i.CancelByJobsetRequested, &i.Succeeded, &i.Failed, - &i.SubmitMessage, &i.SchedulingInfo, &i.SchedulingInfoVersion, &i.Serial, @@ -604,7 +626,7 @@ func (q *Queries) SelectJobsByExecutorAndQueues(ctx context.Context, arg SelectJ } const selectJobsByNodeAndExecutorAndQueues = `-- name: SelectJobsByNodeAndExecutorAndQueues :many -SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.groups, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.submit_message, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools, j.bid_price, j.cancel_user, j.price_band, j.terminated +SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools, j.bid_price, j.cancel_user, j.price_band, j.terminated FROM runs jr JOIN jobs j ON jr.job_id = j.job_id @@ -636,7 +658,6 @@ func (q *Queries) SelectJobsByNodeAndExecutorAndQueues(ctx context.Context, arg &i.Queue, &i.UserID, &i.Submitted, - &i.Groups, &i.Priority, &i.Queued, &i.QueuedVersion, @@ -645,7 +666,6 @@ func (q *Queries) SelectJobsByNodeAndExecutorAndQueues(ctx context.Context, arg &i.CancelByJobsetRequested, &i.Succeeded, &i.Failed, - &i.SubmitMessage, &i.SchedulingInfo, &i.SchedulingInfoVersion, &i.Serial, @@ -690,7 +710,7 @@ func (q *Queries) SelectLatestJobSerial(ctx context.Context) (int64, error) { } const selectLeasedJobsByQueue = `-- name: SelectLeasedJobsByQueue :many -SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.groups, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.submit_message, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools, j.bid_price, j.cancel_user, j.price_band, j.terminated +SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools, j.bid_price, j.cancel_user, j.price_band, j.terminated FROM runs jr JOIN jobs j ON jr.job_id = j.job_id @@ -722,7 +742,6 @@ func (q *Queries) SelectLeasedJobsByQueue(ctx context.Context, arg SelectLeasedJ &i.Queue, &i.UserID, &i.Submitted, - &i.Groups, &i.Priority, &i.Queued, &i.QueuedVersion, @@ -731,7 +750,6 @@ func (q *Queries) SelectLeasedJobsByQueue(ctx context.Context, arg SelectLeasedJ &i.CancelByJobsetRequested, &i.Succeeded, &i.Failed, - &i.SubmitMessage, &i.SchedulingInfo, &i.SchedulingInfoVersion, &i.Serial, @@ -776,7 +794,7 @@ func (q *Queries) SelectMaxRunSerial(ctx context.Context) (int64, error) { } const selectNewJobs = `-- name: SelectNewJobs :many -SELECT job_id, job_set, queue, user_id, submitted, groups, priority, queued, queued_version, cancel_requested, cancelled, cancel_by_jobset_requested, succeeded, failed, submit_message, scheduling_info, scheduling_info_version, serial, last_modified, validated, pools, bid_price, cancel_user, price_band, terminated FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2 +SELECT job_id, job_set, queue, user_id, submitted, priority, queued, queued_version, cancel_requested, cancelled, cancel_by_jobset_requested, succeeded, failed, scheduling_info, scheduling_info_version, serial, last_modified, validated, pools, bid_price, cancel_user, price_band, terminated FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2 ` type SelectNewJobsParams struct { @@ -799,7 +817,6 @@ func (q *Queries) SelectNewJobs(ctx context.Context, arg SelectNewJobsParams) ([ &i.Queue, &i.UserID, &i.Submitted, - &i.Groups, &i.Priority, &i.Queued, &i.QueuedVersion, @@ -808,7 +825,6 @@ func (q *Queries) SelectNewJobs(ctx context.Context, arg SelectNewJobsParams) ([ &i.CancelByJobsetRequested, &i.Succeeded, &i.Failed, - &i.SubmitMessage, &i.SchedulingInfo, &i.SchedulingInfoVersion, &i.Serial, @@ -947,7 +963,7 @@ func (q *Queries) SelectNewRunsForJobs(ctx context.Context, arg SelectNewRunsFor } const selectPendingJobsByQueue = `-- name: SelectPendingJobsByQueue :many -SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.groups, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.submit_message, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools, j.bid_price, j.cancel_user, j.price_band, j.terminated +SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools, j.bid_price, j.cancel_user, j.price_band, j.terminated FROM runs jr JOIN jobs j ON jr.job_id = j.job_id @@ -979,7 +995,6 @@ func (q *Queries) SelectPendingJobsByQueue(ctx context.Context, arg SelectPendin &i.Queue, &i.UserID, &i.Submitted, - &i.Groups, &i.Priority, &i.Queued, &i.QueuedVersion, @@ -988,7 +1003,6 @@ func (q *Queries) SelectPendingJobsByQueue(ctx context.Context, arg SelectPendin &i.CancelByJobsetRequested, &i.Succeeded, &i.Failed, - &i.SubmitMessage, &i.SchedulingInfo, &i.SchedulingInfoVersion, &i.Serial, @@ -1011,7 +1025,7 @@ func (q *Queries) SelectPendingJobsByQueue(ctx context.Context, arg SelectPendin } const selectQueuedJobsByQueue = `-- name: SelectQueuedJobsByQueue :many -SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.groups, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.submit_message, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools, j.bid_price, j.cancel_user, j.price_band, j.terminated +SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools, j.bid_price, j.cancel_user, j.price_band, j.terminated FROM jobs j WHERE j.queue = ANY($1::text[]) AND j.queued = true @@ -1038,7 +1052,6 @@ func (q *Queries) SelectQueuedJobsByQueue(ctx context.Context, arg SelectQueuedJ &i.Queue, &i.UserID, &i.Submitted, - &i.Groups, &i.Priority, &i.Queued, &i.QueuedVersion, @@ -1047,7 +1060,6 @@ func (q *Queries) SelectQueuedJobsByQueue(ctx context.Context, arg SelectQueuedJ &i.CancelByJobsetRequested, &i.Succeeded, &i.Failed, - &i.SubmitMessage, &i.SchedulingInfo, &i.SchedulingInfoVersion, &i.Serial, @@ -1095,7 +1107,7 @@ func (q *Queries) SelectRunErrorsById(ctx context.Context, runIds []string) ([]J } const selectRunningJobsByQueue = `-- name: SelectRunningJobsByQueue :many -SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.groups, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.submit_message, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools, j.bid_price, j.cancel_user, j.price_band, j.terminated +SELECT j.job_id, j.job_set, j.queue, j.user_id, j.submitted, j.priority, j.queued, j.queued_version, j.cancel_requested, j.cancelled, j.cancel_by_jobset_requested, j.succeeded, j.failed, j.scheduling_info, j.scheduling_info_version, j.serial, j.last_modified, j.validated, j.pools, j.bid_price, j.cancel_user, j.price_band, j.terminated FROM runs jr JOIN jobs j ON jr.job_id = j.job_id @@ -1127,7 +1139,6 @@ func (q *Queries) SelectRunningJobsByQueue(ctx context.Context, arg SelectRunnin &i.Queue, &i.UserID, &i.Submitted, - &i.Groups, &i.Priority, &i.Queued, &i.QueuedVersion, @@ -1136,7 +1147,6 @@ func (q *Queries) SelectRunningJobsByQueue(ctx context.Context, arg SelectRunnin &i.CancelByJobsetRequested, &i.Succeeded, &i.Failed, - &i.SubmitMessage, &i.SchedulingInfo, &i.SchedulingInfoVersion, &i.Serial, diff --git a/internal/scheduler/database/query/query.sql b/internal/scheduler/database/query/query.sql index e19f6c91767..0b89aa39884 100644 --- a/internal/scheduler/database/query/query.sql +++ b/internal/scheduler/database/query/query.sql @@ -4,6 +4,9 @@ SELECT * FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2; -- name: SelectAllJobIds :many SELECT job_id FROM jobs; +-- name: SelectJobMetadata :many +SELECT * FROM job_metadata WHERE job_id = ANY(sqlc.arg(job_ids)::text[]); + -- name: SelectMaxJobSerial :one SELECT serial FROM jobs ORDER BY serial DESC LIMIT 1; diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index eb80b73adac..88f1add4b68 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -3058,7 +3058,7 @@ func TestCycleConsistency(t *testing.T) { newScheduler := func(db *pgxpool.Pool) *Scheduler { scheduler, err := NewScheduler( testfixtures.NewJobDb(resourceListFactory), - database.NewPostgresJobRepository(db, 1024, schedulerdb.JobMetadataMigrationPhaseDualWrite), + database.NewPostgresJobRepository(db, 1024), &testExecutorRepository{ updateTimes: map[string]time.Time{"test-executor": testClock.Now()}, }, @@ -3100,7 +3100,6 @@ func TestCycleConsistency(t *testing.T) { time.Second, time.Second, 10*time.Second, - schedulerdb.JobMetadataMigrationPhaseDualWrite, ) // Create two scheduler using the same db connection. @@ -3440,7 +3439,7 @@ func dbOpsFromDbObjects( // jobUpdatesByJobId := make(map[string]*database.Job) insertJobsDbOp := make(scheduleringester.InsertJobs, len(jobUpdates)) for _, dbJob := range jobUpdates { - insertJobsDbOp[dbJob.JobID] = dbJob + insertJobsDbOp[dbJob.JobID] = &scheduleringester.JobInsertion{Job: dbJob} // jobUpdatesByJobId[dbJob.JobID] = dbJob } dbOps = scheduleringester.AppendDbOperation(dbOps, fixInsertJobsDbOp(insertJobsDbOp)) @@ -3496,7 +3495,7 @@ func dbOpsFromDbObjects( func fixInsertJobsDbOp(dbOp scheduleringester.InsertJobs) scheduleringester.InsertJobs { for _, job := range dbOp { // This field must be non-null when written to postgres. - job.SubmitMessage = make([]byte, 0) + job.Metadata.SubmitMessage = make([]byte, 0) } return dbOp } diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go index eced47c831f..9520df8304d 100644 --- a/internal/scheduler/schedulerapp.go +++ b/internal/scheduler/schedulerapp.go @@ -110,11 +110,7 @@ func Run(config schedulerconfig.Configuration) error { return errors.WithMessage(err, "Error opening connection to postgres") } defer db.Close() - if err := config.JobMetadataMigrationPhase.Validate(); err != nil { - return errors.WithMessage(err, "invalid JobMetadataMigrationPhase") - } - ctx.Infof("Scheduler JobMetadataMigrationPhase: %q", config.JobMetadataMigrationPhase) - jobRepository := database.NewPostgresJobRepository(db, int32(config.DatabaseFetchSize), config.JobMetadataMigrationPhase) + jobRepository := database.NewPostgresJobRepository(db, int32(config.DatabaseFetchSize)) executorRepository := database.NewPostgresExecutorRepository(db) // //////////////////////////////////////////////////////////////////////// diff --git a/internal/scheduleringester/config.go b/internal/scheduleringester/config.go index 3dad1aee570..3a0f5beadcb 100644 --- a/internal/scheduleringester/config.go +++ b/internal/scheduleringester/config.go @@ -7,7 +7,6 @@ import ( commonconfig "github.com/armadaproject/armada/internal/common/config" profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration" - schedulerdb "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/server/configuration" ) @@ -26,10 +25,6 @@ type Configuration struct { BatchDuration time.Duration // If non-nil, configures pprof profiling Profiling *profilingconfig.ProfilingConfig - // JobMetadataMigrationPhase controls whether submit_message and groups are - // written to the jobs table, the job_metadata table, or both, during the - // migration. Required; default ("cutover") - JobMetadataMigrationPhase schedulerdb.JobMetadataMigrationPhase `validate:"required,oneof=legacy dualWrite cutover"` } func (c Configuration) Mutate() (commonconfig.Config, error) { diff --git a/internal/scheduleringester/dbops.go b/internal/scheduleringester/dbops.go index 5e879f3bd16..571a4c893fe 100644 --- a/internal/scheduleringester/dbops.go +++ b/internal/scheduleringester/dbops.go @@ -59,6 +59,16 @@ type JobRunDetails struct { DbRun *schedulerdb.Run } +type JobInsertion struct { + Job *schedulerdb.Job + Metadata JobInsertionMetadata +} + +type JobInsertionMetadata struct { + SubmitMessage []byte + Groups []byte +} + type JobQueuedStateUpdate struct { Queued bool QueuedStateVersion int32 @@ -185,7 +195,7 @@ func discardNilOps(ops []DbOperation) []DbOperation { } type ( - InsertJobs map[string]*schedulerdb.Job + InsertJobs map[string]*JobInsertion InsertRuns map[string]*JobRunDetails UpdateJobSetPriorities map[JobSetKey]int64 MarkJobSetsCancelRequested struct { @@ -454,7 +464,7 @@ func (a InsertJobs) CanBeAppliedBefore(b DbOperation) bool { switch op := b.(type) { case jobSetOperation: for _, job := range a { - if op.AffectsJobSet(job.Queue, job.JobSet) { + if op.AffectsJobSet(job.Job.Queue, job.Job.JobSet) { return false } } @@ -666,7 +676,7 @@ func (cq CancelQueue) CanBeAppliedBefore(b DbOperation) bool { func definesJobInSet[M ~map[JobSetKey]V, V any](a M, b DbOperation) bool { if op, ok := b.(InsertJobs); ok { for _, job := range op { - if _, ok := a[JobSetKey{queue: job.Queue, jobSet: job.JobSet}]; ok { + if _, ok := a[JobSetKey{queue: job.Job.Queue, jobSet: job.Job.JobSet}]; ok { return true } } @@ -691,7 +701,7 @@ func definesRunInSet[M ~map[JobSetKey]V, V any](a M, b DbOperation) bool { func definesJob[M ~map[string]V, V any](a M, b DbOperation) bool { if op, ok := b.(InsertJobs); ok { for _, job := range op { - if _, ok := a[job.JobID]; ok { + if _, ok := a[job.Job.JobID]; ok { return true } } diff --git a/internal/scheduleringester/dbops_test.go b/internal/scheduleringester/dbops_test.go index 6ed18f0d597..7f66cfc22ab 100644 --- a/internal/scheduleringester/dbops_test.go +++ b/internal/scheduleringester/dbops_test.go @@ -148,56 +148,56 @@ func TestDbOperationOptimisation(t *testing.T) { Ops []DbOperation // Ops sequence to optimise. }{ "InsertJobs": {N: 1, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, // 1 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2"}}, // 1 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}, // 1 - InsertJobs{jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}}, // 1 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}}, // 1 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2"}}}, // 1 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}}, // 1 + InsertJobs{jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}}}, // 1 }}, "InsertJobs, InsertRuns": {N: 2, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0]}}}, // 1 InsertRuns{runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}}, // 2 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 2 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1]}}}, // 2 InsertRuns{runIds[1]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[1]}}}, // 2 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 2 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2]}}}, // 2 InsertRuns{runIds[2]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[2], RunID: runIds[2]}}}, // 2 }}, "UpdateJobSetPriorities": {N: 3, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, // 1 - UpdateJobSetPriorities{JobSetKey{queue: testQueueName, jobSet: "set1"}: 1}, // 2 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}, // 3 - UpdateJobSetPriorities{JobSetKey{queue: testQueueName, jobSet: "set2"}: 2}, // 3 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}, // 3 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}}, // 1 + UpdateJobSetPriorities{JobSetKey{queue: testQueueName, jobSet: "set1"}: 1}, // 2 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}}, // 3 + UpdateJobSetPriorities{JobSetKey{queue: testQueueName, jobSet: "set2"}: 2}, // 3 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}}, // 3 }}, "UpdateJobSetPriorities, UpdateJobPriorities": {N: 5, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, // 1 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}, // 1 - &UpdateJobPriorities{JobReprioritiseKey{JobSetKey{queue: testQueueName, jobSet: "set1"}, 1}, []string{jobIds[0]}}, // 2 // 2 - UpdateJobSetPriorities{JobSetKey{queue: testQueueName, jobSet: "set1"}: 2}, // 3 - &UpdateJobPriorities{JobReprioritiseKey{JobSetKey{queue: testQueueName, jobSet: "set1"}, 3}, []string{jobIds[1]}}, // 4 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set2"}}, // 1 - &UpdateJobPriorities{JobReprioritiseKey{JobSetKey{queue: testQueueName, jobSet: "set1"}, 4}, []string{jobIds[1]}}, // 5 - &UpdateJobPriorities{JobReprioritiseKey{JobSetKey{queue: testQueueName, jobSet: "set1"}, 4}, []string{jobIds[2]}}, // 5 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}}, // 1 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}}, // 1 + &UpdateJobPriorities{JobReprioritiseKey{JobSetKey{queue: testQueueName, jobSet: "set1"}, 1}, []string{jobIds[0]}}, // 2 // 2 + UpdateJobSetPriorities{JobSetKey{queue: testQueueName, jobSet: "set1"}: 2}, // 3 + &UpdateJobPriorities{JobReprioritiseKey{JobSetKey{queue: testQueueName, jobSet: "set1"}, 3}, []string{jobIds[1]}}, // 4 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set2"}}}, // 1 + &UpdateJobPriorities{JobReprioritiseKey{JobSetKey{queue: testQueueName, jobSet: "set1"}, 4}, []string{jobIds[1]}}, // 5 + &UpdateJobPriorities{JobReprioritiseKey{JobSetKey{queue: testQueueName, jobSet: "set1"}, 4}, []string{jobIds[2]}}, // 5 }}, "MarkJobSetsCancelRequested": {N: 3, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, // 1 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}}, // 1 MarkJobSetsCancelRequested{ cancelUser: f.CancelUser, jobSets: map[JobSetKey]*JobSetCancelAction{ {queue: testQueueName, jobSet: "set1"}: {cancelQueued: true, cancelLeased: true}, }, }, // 2 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}, // 3 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}}, // 3 MarkJobSetsCancelRequested{ cancelUser: f.CancelUser, jobSets: map[JobSetKey]*JobSetCancelAction{ {queue: testQueueName, jobSet: "set2"}: {cancelQueued: true, cancelLeased: true}, }, }, // 3 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}, // 3 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}}, // 3 }}, "MarkJobSetsCancelRequested, MarkJobsCancelRequested": {N: 4, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, // 1 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}, // 1 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}}, // 1 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}}, // 1 MarkJobsCancelRequested{ cancelUser: f.CancelUser, jobIds: map[JobSetKey][]string{ @@ -210,7 +210,7 @@ func TestDbOperationOptimisation(t *testing.T) { {queue: testQueueName, jobSet: "set1"}: {cancelQueued: true, cancelLeased: true}, }, }, // 3 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}, // 4 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}}, // 4 MarkJobsCancelRequested{ cancelUser: f.CancelUser, jobIds: map[JobSetKey][]string{ @@ -225,80 +225,80 @@ func TestDbOperationOptimisation(t *testing.T) { }, // 4 }}, "MarkRunsForJobPreemptRequested": {N: 2, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, // 1 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}, // 1 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}}, // 1 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}}, // 1 MarkRunsForJobPreemptRequested{JobSetKey{queue: testQueueName, jobSet: "set1"}: map[string]string{jobIds[0]: "test-reason"}}, // 2 // 2 MarkRunsForJobPreemptRequested{JobSetKey{queue: testQueueName, jobSet: "set1"}: map[string]string{jobIds[1]: "test-reason"}}, // 2 // 2 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}, // 1 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}}, // 1 MarkRunsForJobPreemptRequested{JobSetKey{queue: testQueueName, jobSet: "set1"}: map[string]string{jobIds[2]: "test-reason"}}, // 2 }}, "MarkJobsSucceeded": {N: 2, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 - MarkJobsSucceeded{jobIds[0]: true}, // 2 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 2 - MarkJobsSucceeded{jobIds[1]: true}, // 2 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 2 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0]}}}, // 1 + MarkJobsSucceeded{jobIds[0]: true}, // 2 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1]}}}, // 2 + MarkJobsSucceeded{jobIds[1]: true}, // 2 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2]}}}, // 2 }}, "MarkJobsFailed": {N: 2, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 - MarkJobsFailed{jobIds[0]: true}, // 2 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 2 - MarkJobsFailed{jobIds[1]: true}, // 2 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 2 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0]}}}, // 1 + MarkJobsFailed{jobIds[0]: true}, // 2 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1]}}}, // 2 + MarkJobsFailed{jobIds[1]: true}, // 2 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2]}}}, // 2 }}, "MarkJobsCancelled": {N: 2, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 - MarkJobsCancelled{jobIds[0]: time.Time{}}, // 2 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 2 - MarkJobsCancelled{jobIds[1]: time.Time{}}, // 2 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 2 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0]}}}, // 1 + MarkJobsCancelled{jobIds[0]: time.Time{}}, // 2 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1]}}}, // 2 + MarkJobsCancelled{jobIds[1]: time.Time{}}, // 2 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2]}}}, // 2 }}, "MarkRunsSucceeded": {N: 3, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0]}}}, // 1 InsertRuns{runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}}, // 2 MarkRunsSucceeded{runIds[0]: time.Time{}}, // 3 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 3 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1]}}}, // 3 InsertRuns{runIds[1]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[1]}}}, // 3 MarkRunsSucceeded{runIds[1]: time.Time{}}, // 3 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 3 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2]}}}, // 3 }}, "MarkRunsFailed": {N: 3, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0]}}}, // 1 InsertRuns{runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}}, // 2 MarkRunsFailed{runIds[0]: &JobRunFailed{true, true, time.Time{}}}, // 3 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 3 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1]}}}, // 3 InsertRuns{runIds[1]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[1]}}}, // 3 MarkRunsFailed{runIds[1]: &JobRunFailed{true, true, time.Time{}}}, // 3 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 3 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2]}}}, // 3 }}, "MarkRunsRunning": {N: 3, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0]}}}, // 1 InsertRuns{runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}}, // 2 MarkRunsRunning{runIds[0]: time.Time{}}, // 3 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 3 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1]}}}, // 3 InsertRuns{runIds[1]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[1]}}}, // 3 MarkRunsRunning{runIds[1]: time.Time{}}, // 3 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 3 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2]}}}, // 3 }}, "InsertPartitionMarker": {N: 2, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 - &InsertPartitionMarker{markers: []*schedulerdb.Marker{}}, // 2 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 1 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 1 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0]}}}, // 1 + &InsertPartitionMarker{markers: []*schedulerdb.Marker{}}, // 2 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1]}}}, // 1 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2]}}}, // 1 }}, "UpdateJobSchedulingInfo": {N: 2, Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}}, // 1 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0]}}}, // 1 UpdateJobSchedulingInfo{jobIds[0]: &JobSchedulingInfoUpdate{[]byte("job 1"), 1}}, // 2 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 1 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1]}}}, // 1 UpdateJobSchedulingInfo{jobIds[1]: &JobSchedulingInfoUpdate{[]byte("job 2"), 1}}, // 2 - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2]}}, // 1 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2]}}}, // 1 UpdateJobSchedulingInfo{jobIds[2]: &JobSchedulingInfoUpdate{[]byte("job 3"), 1}}, // 2 }}, "UpdateJobQueuedState": {N: 2, Ops: []DbOperation{ - UpdateJobQueuedState{jobIds[0]: &JobQueuedStateUpdate{true, 1}}, // 2 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}}, // 1 - UpdateJobQueuedState{jobIds[1]: &JobQueuedStateUpdate{false, 1}}, // 2 - UpdateJobQueuedState{jobIds[2]: &JobQueuedStateUpdate{true, 3}}, // 2 + UpdateJobQueuedState{jobIds[0]: &JobQueuedStateUpdate{true, 1}}, // 2 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1]}}}, // 1 + UpdateJobQueuedState{jobIds[1]: &JobQueuedStateUpdate{false, 1}}, // 2 + UpdateJobQueuedState{jobIds[2]: &JobQueuedStateUpdate{true, 3}}, // 2 }}, // No merging will occur for the below operations, so len(Ops) == N "UpsertExecutorSettings": {N: 1, Ops: []DbOperation{ @@ -363,11 +363,11 @@ func TestInsertJobRequestCancel(t *testing.T) { for i := 0; i < 2; i++ { job := &schedulerdb.Job{JobID: util.NewULID(), Queue: testQueueName, JobSet: "set1"} expectedCancelledIds[job.JobID] = true - ops = append(ops, InsertJobs{job.JobID: job}) + ops = append(ops, InsertJobs{job.JobID: &JobInsertion{Job: job}}) } for i := 0; i < 2; i++ { job := &schedulerdb.Job{JobID: util.NewULID(), Queue: testQueueName, JobSet: "set2"} - ops = append(ops, InsertJobs{job.JobID: job}) + ops = append(ops, InsertJobs{job.JobID: &JobInsertion{Job: job}}) } // Cancel one job set. @@ -381,11 +381,11 @@ func TestInsertJobRequestCancel(t *testing.T) { // Submit some more jobs to both job sets. for i := 0; i < 2; i++ { job := &schedulerdb.Job{JobID: util.NewULID(), Queue: testQueueName, JobSet: "set2"} - ops = append(ops, InsertJobs{job.JobID: job}) + ops = append(ops, InsertJobs{job.JobID: &JobInsertion{Job: job}}) } for i := 0; i < 2; i++ { job := &schedulerdb.Job{JobID: util.NewULID(), Queue: testQueueName, JobSet: "set1"} - ops = append(ops, InsertJobs{job.JobID: job}) + ops = append(ops, InsertJobs{job.JobID: &JobInsertion{Job: job}}) } // Apply ops to a database. @@ -453,8 +453,8 @@ func (db *mockDb) apply(op DbOperation) error { case InsertJobs: n := len(db.Jobs) for _, job := range o { - job := *job // Copy primitive types - db.Jobs[job.JobID] = &job + j := *job.Job // Copy primitive types + db.Jobs[j.JobID] = &j } if len(db.Jobs) != n+len(o) { return errors.New("duplicate job id") diff --git a/internal/scheduleringester/ingester.go b/internal/scheduleringester/ingester.go index 93c61a20df0..fe0578f6028 100644 --- a/internal/scheduleringester/ingester.go +++ b/internal/scheduleringester/ingester.go @@ -24,11 +24,6 @@ import ( // Run will create a pipeline that will take Armada event messages from Pulsar and update the schedulerDb. // This pipeline will run until a SIGTERM is received. func Run(config Configuration) error { - if err := config.JobMetadataMigrationPhase.Validate(); err != nil { - return errors.WithMessage(err, "invalid JobMetadataMigrationPhase") - } - log.Infof("SchedulerIngester JobMetadataMigrationPhase: %q", config.JobMetadataMigrationPhase) - svcMetrics := metrics.NewMetrics(metrics.ArmadaEventIngesterMetricsPrefix + "armada_scheduler_ingester_") log.Infof("opening connection pool to postgres") @@ -36,7 +31,7 @@ func Run(config Configuration) error { if err != nil { panic(errors.WithMessage(err, "Error opening connection to postgres")) } - schedulerDb := NewSchedulerDb(db, svcMetrics, 100*time.Millisecond, 60*time.Second, 5*time.Second, config.JobMetadataMigrationPhase) + schedulerDb := NewSchedulerDb(db, svcMetrics, 100*time.Millisecond, 60*time.Second, 5*time.Second) jobSetEventsConverter, err := NewJobSetEventsInstructionConverter(svcMetrics) if err != nil { diff --git a/internal/scheduleringester/instructions.go b/internal/scheduleringester/instructions.go index 57f249c53b9..4c303de6dad 100644 --- a/internal/scheduleringester/instructions.go +++ b/internal/scheduleringester/instructions.go @@ -177,20 +177,24 @@ func (c *JobSetEventsInstructionConverter) handleSubmitJob(job *armadaevents.Sub } } - return []DbOperation{InsertJobs{jobId: &schedulerdb.Job{ - JobID: jobId, - JobSet: meta.jobset, - UserID: meta.user, - Groups: compressedGroups, - Queue: meta.queue, - Queued: true, - QueuedVersion: 0, - Submitted: submitTime.UnixNano(), - Priority: int64(job.Priority), - SubmitMessage: compressedSubmitJobBytes, - SchedulingInfo: schedulingInfoBytes, - SchedulingInfoVersion: int32(schedulingInfo.Version), - PriceBand: pricingBand, + return []DbOperation{InsertJobs{jobId: &JobInsertion{ + Job: &schedulerdb.Job{ + JobID: jobId, + JobSet: meta.jobset, + UserID: meta.user, + Queue: meta.queue, + Queued: true, + QueuedVersion: 0, + Submitted: submitTime.UnixNano(), + Priority: int64(job.Priority), + SchedulingInfo: schedulingInfoBytes, + SchedulingInfoVersion: int32(schedulingInfo.Version), + PriceBand: pricingBand, + }, + Metadata: JobInsertionMetadata{ + SubmitMessage: compressedSubmitJobBytes, + Groups: compressedGroups, + }, }}}, nil } diff --git a/internal/scheduleringester/instructions_test.go b/internal/scheduleringester/instructions_test.go index 14f70bedf95..a34025169b0 100644 --- a/internal/scheduleringester/instructions_test.go +++ b/internal/scheduleringester/instructions_test.go @@ -35,36 +35,44 @@ func TestConvertEventSequence(t *testing.T) { }{ "submit": { events: []*armadaevents.EventSequence_Event{f.Submit}, - expected: []DbOperation{InsertJobs{f.JobId: &schedulerdb.Job{ - JobID: f.JobId, - JobSet: f.JobsetName, - UserID: f.UserId, - Groups: compress.MustCompressStringArray(f.Groups, compressor), - Queue: f.Queue, - Queued: true, - QueuedVersion: 0, - Priority: int64(f.Priority), - Submitted: f.BaseTime.UnixNano(), - SubmitMessage: protoutil.MustMarshallAndCompress(f.Submit.GetSubmitJob(), compressor), - SchedulingInfo: protoutil.MustMarshall(getExpectedSubmitMessageSchedulingInfo(t)), - PriceBand: 1, + expected: []DbOperation{InsertJobs{f.JobId: &JobInsertion{ + Job: &schedulerdb.Job{ + JobID: f.JobId, + JobSet: f.JobsetName, + UserID: f.UserId, + Queue: f.Queue, + Queued: true, + QueuedVersion: 0, + Priority: int64(f.Priority), + Submitted: f.BaseTime.UnixNano(), + SchedulingInfo: protoutil.MustMarshall(getExpectedSubmitMessageSchedulingInfo(t)), + PriceBand: 1, + }, + Metadata: JobInsertionMetadata{ + Groups: compress.MustCompressStringArray(f.Groups, compressor), + SubmitMessage: protoutil.MustMarshallAndCompress(f.Submit.GetSubmitJob(), compressor), + }, }}}, }, "submit with annotations we want to filter": { events: []*armadaevents.EventSequence_Event{f.SubmitWithIrrelevantAnnotations}, - expected: []DbOperation{InsertJobs{f.JobId: &schedulerdb.Job{ - JobID: f.JobId, - JobSet: f.JobsetName, - UserID: f.UserId, - Groups: compress.MustCompressStringArray(f.Groups, compressor), - Queue: f.Queue, - Queued: true, - QueuedVersion: 0, - Priority: int64(f.Priority), - Submitted: f.BaseTime.UnixNano(), - SubmitMessage: protoutil.MustMarshallAndCompress(f.SubmitWithIrrelevantAnnotations.GetSubmitJob(), compressor), - SchedulingInfo: protoutil.MustMarshall(getExpectedSubmitMessageSchedulingInfo(t)), - PriceBand: 1, + expected: []DbOperation{InsertJobs{f.JobId: &JobInsertion{ + Job: &schedulerdb.Job{ + JobID: f.JobId, + JobSet: f.JobsetName, + UserID: f.UserId, + Queue: f.Queue, + Queued: true, + QueuedVersion: 0, + Priority: int64(f.Priority), + Submitted: f.BaseTime.UnixNano(), + SchedulingInfo: protoutil.MustMarshall(getExpectedSubmitMessageSchedulingInfo(t)), + PriceBand: 1, + }, + Metadata: JobInsertionMetadata{ + Groups: compress.MustCompressStringArray(f.Groups, compressor), + SubmitMessage: protoutil.MustMarshallAndCompress(f.SubmitWithIrrelevantAnnotations.GetSubmitJob(), compressor), + }, }}}, }, "job run leased": { @@ -482,13 +490,13 @@ func assertOperationsEqual(t *testing.T, expectedOps []DbOperation, actualOps [] for k, expectedSubmit := range expectedOp.(InsertJobs) { actualSubmit, ok := actualSubmits[k] assert.True(t, ok) - assertSubmitMessagesEqual(t, expectedSubmit.SubmitMessage, actualSubmit.SubmitMessage) - assertSchedulingInfoEqual(t, expectedSubmit.SchedulingInfo, actualSubmit.SchedulingInfo) + assertSubmitMessagesEqual(t, expectedSubmit.Metadata.SubmitMessage, actualSubmit.Metadata.SubmitMessage) + assertSchedulingInfoEqual(t, expectedSubmit.Job.SchedulingInfo, actualSubmit.Job.SchedulingInfo) // nil out the byte arrays - actualSubmit.SchedulingInfo = nil - actualSubmit.SubmitMessage = nil - expectedSubmit.SchedulingInfo = nil - expectedSubmit.SubmitMessage = nil + actualSubmit.Job.SchedulingInfo = nil + actualSubmit.Metadata.SubmitMessage = nil + expectedSubmit.Job.SchedulingInfo = nil + expectedSubmit.Metadata.SubmitMessage = nil assert.Equal(t, expectedSubmit, actualSubmit) } case InsertJobRunErrors: diff --git a/internal/scheduleringester/job_metadata_migration_test.go b/internal/scheduleringester/job_metadata_migration_test.go deleted file mode 100644 index 96f1ff67fc1..00000000000 --- a/internal/scheduleringester/job_metadata_migration_test.go +++ /dev/null @@ -1,142 +0,0 @@ -package scheduleringester - -import ( - "testing" - "time" - - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/armadaproject/armada/internal/common/armadacontext" - schedulerdb "github.com/armadaproject/armada/internal/scheduler/database" -) - -// TestInsertJobs_MigrationPhases verifies that an InsertJobs op routes -// submit_message and groups to the correct table(s) based on the configured -// migration phase of the SchedulerDb. -func TestInsertJobs_MigrationPhases(t *testing.T) { - cases := []struct { - name string - phase schedulerdb.JobMetadataMigrationPhase - wantJobsHasBlobs bool - wantMetadataRowWrote bool - }{ - {"legacy", schedulerdb.JobMetadataMigrationPhaseLegacy, true, false}, - {"dualWrite", schedulerdb.JobMetadataMigrationPhaseDualWrite, true, true}, - {"cutover", schedulerdb.JobMetadataMigrationPhaseCutover, false, true}, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - require.NoError(t, schedulerdb.WithTestDb(func(_ *schedulerdb.Queries, db *pgxpool.Pool) error { - ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 10*time.Second) - defer cancel() - - sdb := &SchedulerDb{db: db, migrationPhase: tc.phase} - - jobID := "job-" + string(tc.phase) - submitMessage := []byte("submit-" + string(tc.phase)) - groups := []byte("groups-" + string(tc.phase)) - - op := InsertJobs{jobID: &schedulerdb.Job{ - JobID: jobID, - JobSet: "set", - Queue: "queue", - SchedulingInfo: []byte{}, - SubmitMessage: submitMessage, - Groups: groups, - }} - - require.NoError(t, pgx.BeginTxFunc(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error { - return sdb.WriteDbOp(ctx, tx, op) - })) - - var jobSubmitMessage, jobGroups []byte - require.NoError(t, db.QueryRow(ctx, - "SELECT submit_message, groups FROM jobs WHERE job_id = $1", jobID, - ).Scan(&jobSubmitMessage, &jobGroups)) - - if tc.wantJobsHasBlobs { - assert.Equal(t, submitMessage, jobSubmitMessage) - assert.Equal(t, groups, jobGroups) - } else { - assert.Nil(t, jobSubmitMessage) - assert.Nil(t, jobGroups) - } - - var metadataCount int - require.NoError(t, db.QueryRow(ctx, - "SELECT COUNT(*) FROM job_metadata WHERE job_id = $1", jobID, - ).Scan(&metadataCount)) - - if tc.wantMetadataRowWrote { - require.Equal(t, 1, metadataCount) - var metadataSubmitMessage, metadataGroups []byte - require.NoError(t, db.QueryRow(ctx, - "SELECT submit_message, groups FROM job_metadata WHERE job_id = $1", jobID, - ).Scan(&metadataSubmitMessage, &metadataGroups)) - assert.Equal(t, submitMessage, metadataSubmitMessage) - assert.Equal(t, groups, metadataGroups) - } else { - assert.Equal(t, 0, metadataCount) - } - - return nil - })) - }) - } -} - -// TestInsertJobs_CutoverPreservesLegacyColumns verifies that upserting an -// existing jobs row in cutover phase does not overwrite its legacy -// submit_message/groups columns (they must stay readable for any not-yet- -// backfilled rows). -func TestInsertJobs_CutoverPreservesLegacyColumns(t *testing.T) { - require.NoError(t, schedulerdb.WithTestDb(func(_ *schedulerdb.Queries, db *pgxpool.Pool) error { - ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 10*time.Second) - defer cancel() - - legacySdb := &SchedulerDb{db: db, migrationPhase: schedulerdb.JobMetadataMigrationPhaseLegacy} - cutoverSdb := &SchedulerDb{db: db, migrationPhase: schedulerdb.JobMetadataMigrationPhaseCutover} - - jobID := "job-preserve" - legacySubmitMessage := []byte("legacy-sm") - legacyGroups := []byte("legacy-g") - - legacyOp := InsertJobs{jobID: &schedulerdb.Job{ - JobID: jobID, - JobSet: "set", - Queue: "queue", - SchedulingInfo: []byte{}, - SubmitMessage: legacySubmitMessage, - Groups: legacyGroups, - }} - require.NoError(t, pgx.BeginTxFunc(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error { - return legacySdb.WriteDbOp(ctx, tx, legacyOp) - })) - - // Re-upsert the same job in cutover phase. Legacy columns must be untouched. - cutoverOp := InsertJobs{jobID: &schedulerdb.Job{ - JobID: jobID, - JobSet: "set", - Queue: "queue", - SchedulingInfo: []byte{}, - SubmitMessage: []byte("should-not-be-written"), - Groups: []byte("should-not-be-written"), - }} - require.NoError(t, pgx.BeginTxFunc(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error { - return cutoverSdb.WriteDbOp(ctx, tx, cutoverOp) - })) - - var jobSubmitMessage, jobGroups []byte - require.NoError(t, db.QueryRow(ctx, - "SELECT submit_message, groups FROM jobs WHERE job_id = $1", jobID, - ).Scan(&jobSubmitMessage, &jobGroups)) - assert.Equal(t, legacySubmitMessage, jobSubmitMessage) - assert.Equal(t, legacyGroups, jobGroups) - - return nil - })) -} diff --git a/internal/scheduleringester/schedulerdb.go b/internal/scheduleringester/schedulerdb.go index 78733872e4e..e6ad653d54c 100644 --- a/internal/scheduleringester/schedulerdb.go +++ b/internal/scheduleringester/schedulerdb.go @@ -33,7 +33,6 @@ type SchedulerDb struct { initialBackOff time.Duration maxBackOff time.Duration lockTimeout time.Duration - migrationPhase schedulerdb.JobMetadataMigrationPhase } func NewSchedulerDb( @@ -42,7 +41,6 @@ func NewSchedulerDb( initialBackOff time.Duration, maxBackOff time.Duration, lockTimeout time.Duration, - migrationPhase schedulerdb.JobMetadataMigrationPhase, ) *SchedulerDb { return &SchedulerDb{ db: db, @@ -50,7 +48,6 @@ func NewSchedulerDb( initialBackOff: initialBackOff, maxBackOff: maxBackOff, lockTimeout: lockTimeout, - migrationPhase: migrationPhase, } } @@ -103,35 +100,20 @@ func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOper queries := schedulerdb.New(tx) switch o := op.(type) { case InsertJobs: - if s.migrationPhase.WritesJobMetadata() { - specs := make([]any, 0, len(o)) - for _, v := range o { - specs = append(specs, schedulerdb.JobMetadatum{ - JobID: v.JobID, - SubmitMessage: v.SubmitMessage, - Groups: v.Groups, - }) - } - if err := database.Upsert(ctx, tx, "job_metadata", specs); err != nil { - return err - } - } + metadata := make([]any, 0, len(o)) records := make([]any, 0, len(o)) for _, v := range o { - job := *v - if !s.migrationPhase.WritesJobs() { - // Cutover phase: submit_message and groups live only in job_metadata. - job.SubmitMessage = nil - job.Groups = nil - } - records = append(records, job) + metadata = append(metadata, schedulerdb.JobMetadatum{ + JobID: v.Job.JobID, + SubmitMessage: v.Metadata.SubmitMessage, + Groups: v.Metadata.Groups, + }) + records = append(records, *v.Job) } - excluded := []string{"terminated"} - if !s.migrationPhase.WritesJobs() { - excluded = append(excluded, "submit_message", "groups") + if err := database.Upsert(ctx, tx, "job_metadata", metadata); err != nil { + return err } - err := database.Upsert(ctx, tx, "jobs", records, database.WithExcludeColumns(excluded...)) - if err != nil { + if err := database.Upsert(ctx, tx, "jobs", records, database.WithExcludeColumns("terminated")); err != nil { return err } case InsertRuns: diff --git a/internal/scheduleringester/schedulerdb_test.go b/internal/scheduleringester/schedulerdb_test.go index 7611f139942..92465786780 100644 --- a/internal/scheduleringester/schedulerdb_test.go +++ b/internal/scheduleringester/schedulerdb_test.go @@ -43,18 +43,18 @@ func TestWriteOps(t *testing.T) { }{ "InsertJobs": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}}, }, InsertJobs{ - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}}, }, }}, "Submit Check": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}}, }, MarkJobsValidated{ jobIds[0]: []string{"cpu"}, @@ -63,10 +63,10 @@ func TestWriteOps(t *testing.T) { }}, "InsertRuns": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2"}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2"}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}}, }, InsertRuns{ runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}, @@ -85,20 +85,20 @@ func TestWriteOps(t *testing.T) { }}, "UpdateJobSetPriorities": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2"}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}, - jobIds[4]: &schedulerdb.Job{JobID: jobIds[4], Queue: "queue-2", JobSet: "set1"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2"}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}}, + jobIds[4]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[4], Queue: "queue-2", JobSet: "set1"}}, }, UpdateJobSetPriorities{JobSetKey{queue: testQueueName, jobSet: "set1"}: 1}, }}, "UpdateJobPriorities": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2"}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2"}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}}, }, &UpdateJobPriorities{ key: JobReprioritiseKey{ @@ -113,11 +113,11 @@ func TestWriteOps(t *testing.T) { }}, "MarkRunsForJobPreemptRequested": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set2"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], Queue: "queue-2", JobSet: "set1"}, - jobIds[4]: &schedulerdb.Job{JobID: jobIds[4], Queue: "queue-2", JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set2"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], Queue: "queue-2", JobSet: "set1"}}, + jobIds[4]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[4], Queue: "queue-2", JobSet: "set2"}}, }, InsertRuns{ runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0], Queue: testQueueName, JobSet: "set1"}}, @@ -130,10 +130,10 @@ func TestWriteOps(t *testing.T) { }}, "PreemptNode": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: "queue-2", JobSet: "set1"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: "queue-2", JobSet: "set1"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}}, }, InsertRuns{ runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0], Queue: testQueueName, JobSet: "set1", Executor: "executor-1", Node: "node-1"}}, @@ -147,8 +147,8 @@ func TestWriteOps(t *testing.T) { }}, "PreemptNode - PriorityClass": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1", SchedulingInfo: mustMarshalSchedulingInfo(t, "pc-1")}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1", SchedulingInfo: mustMarshalSchedulingInfo(t, "pc-2")}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1", SchedulingInfo: mustMarshalSchedulingInfo(t, "pc-1")}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1", SchedulingInfo: mustMarshalSchedulingInfo(t, "pc-2")}}, }, InsertRuns{ runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0], Queue: testQueueName, JobSet: "set1", Executor: "executor-1", Node: "node-1"}}, @@ -165,11 +165,11 @@ func TestWriteOps(t *testing.T) { }}, "MarkJobSetsCancelRequested": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2"}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}, - jobIds[4]: &schedulerdb.Job{JobID: jobIds[4], Queue: "queue-2", JobSet: "set1"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2"}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}}, + jobIds[4]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[4], Queue: "queue-2", JobSet: "set1"}}, }, MarkJobSetsCancelRequested{ cancelUser: testfixtures.CancelUser, @@ -180,10 +180,10 @@ func TestWriteOps(t *testing.T) { }}, "MarkJobSetsCancelRequested - Queued only": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1", Queued: true}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2", Queued: true}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1", Queued: false}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2", Queued: false}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1", Queued: true}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2", Queued: true}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1", Queued: false}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2", Queued: false}}, }, MarkJobSetsCancelRequested{ cancelUser: testfixtures.CancelUser, @@ -194,10 +194,10 @@ func TestWriteOps(t *testing.T) { }}, "MarkJobSetsCancelRequested - Leased only": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1", Queued: true}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2", Queued: true}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1", Queued: false}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2", Queued: false}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1", Queued: true}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2", Queued: true}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1", Queued: false}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2", Queued: false}}, }, MarkJobSetsCancelRequested{ cancelUser: testfixtures.CancelUser, @@ -208,10 +208,10 @@ func TestWriteOps(t *testing.T) { }}, "MarkJobsCancelRequested": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2"}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set2"}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], Queue: testQueueName, JobSet: "set2"}}, }, MarkJobsCancelRequested{ cancelUser: testfixtures.CancelUser, @@ -223,10 +223,10 @@ func TestWriteOps(t *testing.T) { }}, "MarkJobsCancelled": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}}, }, InsertRuns{ runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}, @@ -241,10 +241,10 @@ func TestWriteOps(t *testing.T) { }}, "MarkJobsSucceeded": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}}, }, MarkJobsSucceeded{ jobIds[0]: true, @@ -253,8 +253,8 @@ func TestWriteOps(t *testing.T) { }}, "MarkRunsPending": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0]}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1]}}, }, InsertRuns{ runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}, @@ -267,8 +267,8 @@ func TestWriteOps(t *testing.T) { }}, "MarkRunsPreempted": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0]}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1]}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0]}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1]}}, }, InsertRuns{ runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}, @@ -280,10 +280,10 @@ func TestWriteOps(t *testing.T) { }}, "MarkJobsFailed": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}}, }, MarkJobsFailed{ jobIds[0]: true, @@ -292,10 +292,10 @@ func TestWriteOps(t *testing.T) { }}, "MarkRunsSucceeded": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}}, }, InsertRuns{ runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}, @@ -310,8 +310,8 @@ func TestWriteOps(t *testing.T) { }}, "UpdateJobSchedulingInfo": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}}, }, UpdateJobSchedulingInfo{ jobIds[0]: &JobSchedulingInfoUpdate{ @@ -340,10 +340,10 @@ func TestWriteOps(t *testing.T) { }}, "MarkRunsFailed": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}}, }, InsertRuns{ runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}, @@ -359,10 +359,10 @@ func TestWriteOps(t *testing.T) { }}, "MarkRunsRunning": {Ops: []DbOperation{ InsertJobs{ - jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}, - jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}, - jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}, - jobIds[3]: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}, + jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], JobSet: "set1"}}, + jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], JobSet: "set2"}}, + jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], JobSet: "set1"}}, + jobIds[3]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[3], JobSet: "set2"}}, }, InsertRuns{ runIds[0]: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobIds[0], RunID: runIds[0]}}, @@ -462,7 +462,7 @@ func TestWriteOps(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { err := schedulerdb.WithTestDb(func(_ *schedulerdb.Queries, db *pgxpool.Pool) error { - schedulerDb := &SchedulerDb{db: db, migrationPhase: schedulerdb.JobMetadataMigrationPhaseLegacy} + schedulerDb := &SchedulerDb{db: db} serials := make(map[string]int64) for _, op := range tc.Ops { err := assertOpSuccess(t, schedulerDb, serials, addDefaultValues(op)) @@ -490,28 +490,28 @@ func TestScoping(t *testing.T) { }{ "Single DbOp jobSet scope": { Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}}, }, Scope: JobSetEventsLockKey, Error: false, }, "Multiple DbOp jobSet scope": { Ops: []DbOperation{ - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, // 1 + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}}, // 1 MarkJobSetsCancelRequested{ cancelUser: testfixtures.CancelUser, jobSets: map[JobSetKey]*JobSetCancelAction{ {queue: testQueueName, jobSet: "set1"}: {cancelQueued: true, cancelLeased: true}, }, }, // 2 - InsertJobs{jobIds[1]: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}, // 3 + InsertJobs{jobIds[1]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[1], Queue: testQueueName, JobSet: "set1"}}}, // 3 MarkJobSetsCancelRequested{ cancelUser: testfixtures.CancelUser, jobSets: map[JobSetKey]*JobSetCancelAction{ {queue: testQueueName, jobSet: "set2"}: {cancelQueued: true, cancelLeased: true}, }, // 3 }, - InsertJobs{jobIds[2]: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}, // 3 + InsertJobs{jobIds[2]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[2], Queue: testQueueName, JobSet: "set1"}}}, // 3 }, Scope: JobSetEventsLockKey, Error: false, @@ -552,7 +552,7 @@ func TestScoping(t *testing.T) { DeleteExecutorSettings{executorIds[0]: &ExecutorSettingsDelete{ ExecutorID: testfixtures.ExecutorId, }}, - InsertJobs{jobIds[0]: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}, + InsertJobs{jobIds[0]: &JobInsertion{Job: &schedulerdb.Job{JobID: jobIds[0], Queue: testQueueName, JobSet: "set1"}}}, }, Scope: JobSetEventsLockKey, Error: false, @@ -573,14 +573,14 @@ func addDefaultValues(op DbOperation) DbOperation { switch o := op.(type) { case InsertJobs: for _, job := range o { - if job.Groups == nil { - job.Groups = make([]byte, 0) + if job.Metadata.Groups == nil { + job.Metadata.Groups = make([]byte, 0) } - if job.SubmitMessage == nil { - job.SubmitMessage = make([]byte, 0) + if job.Metadata.SubmitMessage == nil { + job.Metadata.SubmitMessage = make([]byte, 0) } - if job.SchedulingInfo == nil { - job.SchedulingInfo = make([]byte, 0) + if job.Job.SchedulingInfo == nil { + job.Job.SchedulingInfo = make([]byte, 0) } } case InsertRuns: @@ -627,17 +627,19 @@ func assertOpSuccess(t *testing.T, schedulerDb *SchedulerDb, serials map[string] actual := make(InsertJobs) for _, job := range jobs { job := job - actual[job.JobID] = &job + actual[job.JobID] = &JobInsertion{Job: &job} serials["jobs"] = max(serials["jobs"], job.Serial) if v, ok := expected[job.JobID]; ok { - v.Serial = job.Serial - v.LastModified = job.LastModified + v.Job.Serial = job.Serial + v.Job.LastModified = job.LastModified } } for k, v := range expected { // Compute Terminated field to match database GENERATED column: terminated = cancelled OR succeeded OR failed - terminated := v.Cancelled || v.Succeeded || v.Failed - v.Terminated = &terminated + terminated := v.Job.Cancelled || v.Job.Succeeded || v.Job.Failed + v.Job.Terminated = &terminated + v.Metadata.SubmitMessage = nil + v.Metadata.Groups = nil assert.Equal(t, v, actual[k]) } case InsertRuns: @@ -1152,24 +1154,31 @@ func assertOpSuccess(t *testing.T, schedulerDb *SchedulerDb, serials map[string] func TestStore(t *testing.T) { jobId := util.ULID().String() runId := uuid.NewString() + groups := []byte("groups") + submitMessage := []byte("submit-message") ops := []DbOperation{ InsertJobs{ - jobId: &schedulerdb.Job{ - JobID: jobId, - JobSet: "set1", - Groups: make([]byte, 0), - SubmitMessage: make([]byte, 0), - SchedulingInfo: make([]byte, 0), + jobId: &JobInsertion{ + Job: &schedulerdb.Job{ + JobID: jobId, + Queue: testQueueName, + JobSet: "set1", + SchedulingInfo: make([]byte, 0), + }, + Metadata: JobInsertionMetadata{ + Groups: groups, + SubmitMessage: submitMessage, + }, }, }, InsertRuns{ - runId: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobId, RunID: runId}}, + runId: &JobRunDetails{Queue: testQueueName, DbRun: &schedulerdb.Run{JobID: jobId, RunID: runId, Executor: "executor", Pool: "pool"}}, }, } ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) defer cancel() err := schedulerdb.WithTestDb(func(q *schedulerdb.Queries, db *pgxpool.Pool) error { - schedulerDb := NewSchedulerDb(db, metrics.NewMetrics("test"), time.Second, time.Second, 10*time.Second, schedulerdb.JobMetadataMigrationPhaseLegacy) + schedulerDb := NewSchedulerDb(db, metrics.NewMetrics("test"), time.Second, time.Second, 10*time.Second) err := schedulerDb.Store(ctx, &DbOperationsWithMessageIds{Ops: ops}) require.NoError(t, err) @@ -1181,6 +1190,10 @@ func TestStore(t *testing.T) { require.NoError(t, err) require.Equal(t, []string{runId}, runIds) + metadata, err := q.SelectJobMetadata(ctx, []string{jobId}) + require.NoError(t, err) + require.Equal(t, []schedulerdb.JobMetadatum{{JobID: jobId, SubmitMessage: submitMessage, Groups: groups}}, metadata) + return nil }) require.NoError(t, err) @@ -1215,7 +1228,7 @@ func TestPoolFiltering(t *testing.T) { job := func(id string, queued bool, pools []string) *schedulerdb.Job { return &schedulerdb.Job{ JobID: id, Queue: q, JobSet: js, Queued: queued, Pools: pools, - SchedulingInfo: info, Groups: []byte{}, SubmitMessage: []byte{}, + SchedulingInfo: info, } } run := func(id, jobID, pool string) *schedulerdb.Run { @@ -1343,7 +1356,7 @@ func TestPoolFiltering(t *testing.T) { insertJobs := InsertJobs{} for _, j := range tc.jobs { - insertJobs[j.JobID] = j + insertJobs[j.JobID] = &JobInsertion{Job: j, Metadata: JobInsertionMetadata{SubmitMessage: []byte{}, Groups: []byte{}}} } execTx(insertJobs)