Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pulsarSendTimeout: 5s
internedStringsCacheSize: 100000
queueRefreshPeriod: 10s
publishMetricsToPulsar: false
jobMetadataMigrationPhase: cutover
metrics:
port: 9000
jobStateMetricsResetInterval: 12h
Expand Down
1 change: 0 additions & 1 deletion config/scheduleringester/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,3 @@ pulsar:
subscriptionName: "scheduler-ingester"
batchSize: 10000
batchDuration: 500ms
jobMetadataMigrationPhase: cutover
9 changes: 0 additions & 9 deletions internal/scheduler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration"
armadaresource "github.com/armadaproject/armada/internal/common/resource"
"github.com/armadaproject/armada/internal/common/types"
schedulerdb "github.com/armadaproject/armada/internal/scheduler/database"
"github.com/armadaproject/armada/internal/server/configuration"
"github.com/armadaproject/armada/pkg/client"
)
Expand Down Expand Up @@ -91,14 +90,6 @@ type Configuration struct {
PricingApi PricingApiConfig
// Whether to publish metrics To Pulsar. This is currently experimental
PublishMetricsToPulsar bool
// JobMetadataMigrationPhase controls whether submit_message and groups are
// read from the jobs table, the job_metadata table, or both (coalesced),
// during the migration. Required; default ("cutover").
// Operators must coordinate this value with the scheduleringester's JobMetadataMigrationPhase:
// a mismatch can cause new submissions to be unreadable
// (e.g. ingester=cutover with scheduler=legacy omits submit_message/groups from leases)
// or to fail entirely (e.g. scheduler=cutover while rows without a job_metadata counterpart still exist).
JobMetadataMigrationPhase schedulerdb.JobMetadataMigrationPhase `validate:"required,oneof=legacy dualWrite cutover"`
}

type SubmitCheckConfig struct {
Expand Down
2 changes: 0 additions & 2 deletions internal/scheduler/configuration/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

commonconfig "github.com/armadaproject/armada/internal/common/config"
"github.com/armadaproject/armada/internal/common/types"
schedulerdb "github.com/armadaproject/armada/internal/scheduler/database"
)

func TestMutate(t *testing.T) {
Expand Down Expand Up @@ -160,7 +159,6 @@ func createValidMinimalConfig() Configuration {
Pulsar: commonconfig.PulsarConfig{
URL: "pulsar",
},
JobMetadataMigrationPhase: schedulerdb.JobMetadataMigrationPhaseLegacy,
}
}

Expand Down
1 change: 0 additions & 1 deletion internal/scheduler/database/db_pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ func populateRequiredJobFields(job Job) Job {
job.Queue = "test-queue"
job.UserID = ""
job.Submitted = 0
job.SubmitMessage = []byte{}
job.SchedulingInfo = []byte{}
return job
}
Expand Down
46 changes: 0 additions & 46 deletions internal/scheduler/database/job_metadata_migration.go

This file was deleted.

181 changes: 0 additions & 181 deletions internal/scheduler/database/job_metadata_migration_test.go

This file was deleted.

39 changes: 7 additions & 32 deletions internal/scheduler/database/job_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,38 +68,13 @@ type PostgresJobRepository struct {
db *pgxpool.Pool
// maximum number of rows to fetch from postgres in a single query
batchSize int32
// migrationPhase controls how FetchJobRunLeases resolves the submit_message
// and groups columns during the job_metadata lazy migration.
migrationPhase JobMetadataMigrationPhase
// Pre-built expressions computed from migrationPhase, used by FetchJobRunLeases
leaseGroupsExpr string
leaseSubmitMessageExpr string
leaseJoinClause string
}

func NewPostgresJobRepository(db *pgxpool.Pool, batchSize int32, migrationPhase JobMetadataMigrationPhase) *PostgresJobRepository {
r := &PostgresJobRepository{
db: db,
batchSize: batchSize,
migrationPhase: migrationPhase,
func NewPostgresJobRepository(db *pgxpool.Pool, batchSize int32) *PostgresJobRepository {
return &PostgresJobRepository{
db: db,
batchSize: batchSize,
}

switch migrationPhase {
case JobMetadataMigrationPhaseLegacy:
r.leaseGroupsExpr = "j.groups"
r.leaseSubmitMessageExpr = "j.submit_message"
r.leaseJoinClause = ""
case JobMetadataMigrationPhaseCutover:
r.leaseGroupsExpr = "jm.groups"
r.leaseSubmitMessageExpr = "jm.submit_message"
r.leaseJoinClause = "JOIN job_metadata jm ON j.job_id = jm.job_id"
default:
r.leaseGroupsExpr = "COALESCE(jm.groups, j.groups)"
r.leaseSubmitMessageExpr = "COALESCE(jm.submit_message, j.submit_message)"
r.leaseJoinClause = "LEFT JOIN job_metadata jm ON j.job_id = jm.job_id"
}

return r
}

func (r *PostgresJobRepository) FetchInitialJobs(ctx *armadacontext.Context) ([]Job, []Run, *int64, *int64, error) {
Expand Down Expand Up @@ -380,18 +355,18 @@ func (r *PostgresJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, ex
}

query := fmt.Sprintf(`
SELECT jr.run_id, jr.node, j.queue, j.job_set, jr.pool, j.user_id, %s, %s, jr.pod_requirements_overlay
SELECT jr.run_id, jr.node, j.queue, j.job_set, jr.pool, j.user_id, jm.groups, jm.submit_message, jr.pod_requirements_overlay
FROM runs jr
LEFT JOIN %s as tmp ON (tmp.run_id = jr.run_id)
JOIN jobs j
ON jr.job_id = j.job_id
%s
JOIN job_metadata jm ON j.job_id = jm.job_id
WHERE jr.executor = $1
AND tmp.run_id IS NULL
AND jr.terminated = false
ORDER BY jr.serial
LIMIT %d;
`, r.leaseGroupsExpr, r.leaseSubmitMessageExpr, tmpTable, r.leaseJoinClause, maxResults)
`, tmpTable, maxResults)

rows, err := tx.Query(ctx, query, executor)
if err != nil {
Expand Down
Loading
Loading