From 3af6b9fdfdc090e6459931dab88b73027b636b85 Mon Sep 17 00:00:00 2001 From: Dejan Zele Pejchev Date: Thu, 25 Jun 2026 14:55:06 +0200 Subject: [PATCH] Source scheduler failure metric labels from Error.FailureCategory/FailureSubcategory and remove TrackedErrorRegexes Signed-off-by: Dejan Zele Pejchev --- .../scheduler/configuration/configuration.go | 4 -- internal/scheduler/metrics/metrics.go | 16 ++------ internal/scheduler/metrics/state_metrics.go | 38 +------------------ .../scheduler/metrics/state_metrics_test.go | 24 +++++------- internal/scheduler/scheduler_test.go | 2 +- internal/scheduler/schedulerapp.go | 6 +-- 6 files changed, 17 insertions(+), 73 deletions(-) diff --git a/internal/scheduler/configuration/configuration.go b/internal/scheduler/configuration/configuration.go index 665f074aec1..42fe76c2877 100644 --- a/internal/scheduler/configuration/configuration.go +++ b/internal/scheduler/configuration/configuration.go @@ -147,10 +147,6 @@ type MetricsConfig struct { // Used to calculate job seconds lost to preemption // Calculate as if the job checkpoints at these different intervals JobCheckpointIntervals []time.Duration - // Regexes used for job error categorisation. - // Specifically, the subCategory label for job failure counters is the first regex that matches the job error. - // If no regex matches, the subCategory label is the empty string. - TrackedErrorRegexes []string // Metrics are exported for these resources. TrackedResourceNames []v1.ResourceName // Node label key used to identify which scalable unit it belongs to. diff --git a/internal/scheduler/metrics/metrics.go b/internal/scheduler/metrics/metrics.go index d96959d47b1..99613a3b0ba 100644 --- a/internal/scheduler/metrics/metrics.go +++ b/internal/scheduler/metrics/metrics.go @@ -1,10 +1,8 @@ package metrics import ( - "regexp" "time" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" v1 "k8s.io/api/core/v1" @@ -24,19 +22,11 @@ type resettableMetric interface { Reset() } -func New(errorRegexes []string, trackedResourceNames []v1.ResourceName, jobCheckpointIntervals []time.Duration, jobStateMetricsResetInterval time.Duration, publisher pulsarutils.Publisher[*metricevents.Event], scalableUnitLabelKey string) (*Metrics, error) { - compiledErrorRegexes := make([]*regexp.Regexp, len(errorRegexes)) - for i, errorRegex := range errorRegexes { - if r, err := regexp.Compile(errorRegex); err != nil { - return nil, errors.WithStack(err) - } else { - compiledErrorRegexes[i] = r - } - } +func New(trackedResourceNames []v1.ResourceName, jobCheckpointIntervals []time.Duration, jobStateMetricsResetInterval time.Duration, publisher pulsarutils.Publisher[*metricevents.Event], scalableUnitLabelKey string) *Metrics { return &Metrics{ cycleMetrics: newCycleMetrics(publisher, scalableUnitLabelKey), - jobStateMetrics: newJobStateMetrics(compiledErrorRegexes, trackedResourceNames, jobCheckpointIntervals, jobStateMetricsResetInterval), - }, nil + jobStateMetrics: newJobStateMetrics(trackedResourceNames, jobCheckpointIntervals, jobStateMetricsResetInterval), + } } // DisableLeaderMetrics stops leader metrics from being produced. This is necessary because we only produce diff --git a/internal/scheduler/metrics/state_metrics.go b/internal/scheduler/metrics/state_metrics.go index 774b7a2cd89..3b135ccf5df 100644 --- a/internal/scheduler/metrics/state_metrics.go +++ b/internal/scheduler/metrics/state_metrics.go @@ -2,7 +2,6 @@ package metrics import ( "math" - "regexp" "strings" "time" @@ -14,7 +13,6 @@ import ( ) type jobStateMetrics struct { - errorRegexes []*regexp.Regexp resetInterval time.Duration jobCheckpointIntervals []time.Duration lastResetTime time.Time @@ -35,7 +33,6 @@ type jobStateMetrics struct { } func newJobStateMetrics( - errorRegexes []*regexp.Regexp, trackedResourceNames []v1.ResourceName, jobCheckpointIntervals []time.Duration, resetInterval time.Duration, @@ -100,7 +97,7 @@ func newJobStateMetrics( jobErrorsByNode := prometheus.NewCounterVec( prometheus.CounterOpts{ Name: ArmadaSchedulerMetricsPrefix + "job_error_classification_by_node", - Help: "Failed jobs ey error classification at the node level", + Help: "Failed jobs by error classification at the node level", }, []string{nodeLabel, poolLabel, clusterLabel, errorCategoryLabel, errorSubcategoryLabel}, ) @@ -112,7 +109,6 @@ func newJobStateMetrics( []string{queueLabel, poolLabel, checkpointLabel, resourceLabel}, ) return &jobStateMetrics{ - errorRegexes: errorRegexes, trackedResourceNames: trackedResourceNames, jobCheckpointIntervals: jobCheckpointIntervals, resetInterval: resetInterval, @@ -234,7 +230,7 @@ func (m *jobStateMetrics) ReportStateTransitions( m.updateStateDuration(job, failed, priorState, duration) m.completedRunDurations.WithLabelValues(job.Queue(), run.Pool()).Observe(duration) jobRunError := jobRunErrorsByRunId[run.Id()] - category, subCategory := m.failedCategoryAndSubCategoryFromJob(jobRunError) + category, subCategory := jobRunError.GetFailureCategory(), jobRunError.GetFailureSubcategory() m.jobErrorsByQueue.WithLabelValues(job.Queue(), run.Pool(), category, subCategory).Inc() m.jobErrorsByNode.WithLabelValues(run.NodeName(), run.Pool(), run.Executor(), category, subCategory).Inc() } @@ -294,16 +290,6 @@ func (m *jobStateMetrics) updateStateDuration(job *jobdb.Job, state string, prio } } -func (m *jobStateMetrics) failedCategoryAndSubCategoryFromJob(err *armadaevents.Error) (string, string) { - category, message := errorTypeAndMessageFromError(err) - for _, r := range m.errorRegexes { - if r.MatchString(message) { - return category, r.String() - } - } - return category, "" -} - func (m *jobStateMetrics) reset() { m.jobStateCounterByNode.Reset() for _, metric := range m.allMetrics { @@ -357,23 +343,3 @@ func stateDuration(job *jobdb.Job, run *jobdb.JobRun, stateTime *time.Time) (flo // succeeded, failed, cancelled, preempted are not prior states return stateTime.Sub(*priorTime).Seconds(), prior } - -func errorTypeAndMessageFromError(err *armadaevents.Error) (string, string) { - if err == nil { - return "", "" - } - // The following errors relate to job run failures. - // We do not process JobRunPreemptedError as there is separate metric for preemption. - switch reason := err.Reason.(type) { - case *armadaevents.Error_LeaseExpired: - return "leaseExpired", "" - case *armadaevents.Error_PodError: - return "podError", reason.PodError.Message - case *armadaevents.Error_PodLeaseReturned: - return "podLeaseReturned", reason.PodLeaseReturned.Message - case *armadaevents.Error_JobRunPreemptedError: - return "jobRunPreempted", "" - default: - return "", "" - } -} diff --git a/internal/scheduler/metrics/state_metrics_test.go b/internal/scheduler/metrics/state_metrics_test.go index 8bf836ca717..7557459a647 100644 --- a/internal/scheduler/metrics/state_metrics_test.go +++ b/internal/scheduler/metrics/state_metrics_test.go @@ -1,7 +1,6 @@ package metrics import ( - "regexp" "strings" "testing" "time" @@ -10,7 +9,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" "github.com/armadaproject/armada/internal/scheduler/jobdb" @@ -44,7 +42,6 @@ func TestReportJobStateTransitions(t *testing.T) { } tests := map[string]struct { - errorRegexes []*regexp.Regexp trackedResourceNames []v1.ResourceName jsts []jobdb.JobStateTransitions jobRunErrorsByRunId map[string]*armadaevents.Error @@ -296,7 +293,7 @@ func TestReportJobStateTransitions(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { - metrics := newJobStateMetrics(tc.errorRegexes, tc.trackedResourceNames, []time.Duration{}, 12*time.Hour) + metrics := newJobStateMetrics(tc.trackedResourceNames, []time.Duration{}, 12*time.Hour) metrics.ReportStateTransitions(tc.jsts, tc.jobRunErrorsByRunId) // jobStateCounterByQueue @@ -361,7 +358,7 @@ func TestReportJobPreempted(t *testing.T) { jobCheckpointIntervals := []time.Duration{time.Second * 5, time.Second * 30, time.Minute * 5} - metrics := newJobStateMetrics(nil, []v1.ResourceName{"cpu"}, jobCheckpointIntervals, 12*time.Hour) + metrics := newJobStateMetrics([]v1.ResourceName{"cpu"}, jobCheckpointIntervals, 12*time.Hour) metrics.ReportJobPreempted(job) expectedJobResourceSecondsLostToPreemptionByQueue := map[[4]string]float64{ @@ -384,12 +381,11 @@ func TestCategoriseErrors(t *testing.T) { job := baseJob.WithUpdatedRun(run) - r, err := regexp.Compile("generic pod error") - require.NoError(t, err) - jobRunErrorsByRunId := map[string]*armadaevents.Error{ run.Id(): { - Terminal: true, + Terminal: true, + FailureCategory: "infrastructure", + FailureSubcategory: "oom", Reason: &armadaevents.Error_PodError{ PodError: &armadaevents.PodError{ Message: "generic pod error", @@ -405,13 +401,13 @@ func TestCategoriseErrors(t *testing.T) { }, } - metrics := newJobStateMetrics([]*regexp.Regexp{r}, []v1.ResourceName{"cpu"}, []time.Duration{}, 12*time.Hour) + metrics := newJobStateMetrics([]v1.ResourceName{"cpu"}, []time.Duration{}, 12*time.Hour) metrics.ReportStateTransitions(jsts, jobRunErrorsByRunId) - actualjobErrorsByQueue := testutil.ToFloat64(metrics.jobErrorsByQueue.WithLabelValues(testQueue, testPool, "podError", "generic pod error")) + actualjobErrorsByQueue := testutil.ToFloat64(metrics.jobErrorsByQueue.WithLabelValues(testQueue, testPool, "infrastructure", "oom")) assert.InDelta(t, 1, actualjobErrorsByQueue, epsilon) - actualjobErrorsByNode := testutil.ToFloat64(metrics.jobErrorsByNode.WithLabelValues(testNode, testPool, testCluster, "podError", "generic pod error")) + actualjobErrorsByNode := testutil.ToFloat64(metrics.jobErrorsByNode.WithLabelValues(testNode, testPool, testCluster, "infrastructure", "oom")) assert.InDelta(t, 1, actualjobErrorsByNode, epsilon) } @@ -422,7 +418,7 @@ func TestReset(t *testing.T) { byQueueResourceLabels := append(byQueueAndStateLabels, "cpu") byNodeResourceLabels := append(byNodeLabels, "cpu") resourceSecondsLostToPreemptionLabels := append(byQueueLabels, "none", "cpu") - m := newJobStateMetrics(nil, nil, []time.Duration{}, 12*time.Hour) + m := newJobStateMetrics(nil, []time.Duration{}, 12*time.Hour) testReset := func(vec *prometheus.CounterVec, labels []string) { vec.WithLabelValues(labels...).Inc() @@ -472,7 +468,7 @@ func TestDisable(t *testing.T) { return collected } - m := newJobStateMetrics(nil, nil, []time.Duration{}, 12*time.Hour) + m := newJobStateMetrics(nil, []time.Duration{}, 12*time.Hour) // Enabled assert.NotZero(t, len(collect(m))) diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index ae2a1f9f5a2..96e142f60d3 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -143,7 +143,7 @@ var ( } schedulingInfoWithUpdatedPriorityBytes = protoutil.MustMarshall(schedulingInfoWithUpdatedPriority) - schedulerMetrics, _ = metrics.New(nil, nil, []time.Duration{}, 12*time.Hour, pulsarutils.NoOpPublisher[*metricevents.Event]{}, "") + schedulerMetrics = metrics.New(nil, []time.Duration{}, 12*time.Hour, pulsarutils.NoOpPublisher[*metricevents.Event]{}, "") ) var queuedJob = testfixtures.NewJob( diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go index daaf793a0d8..c12f4420006 100644 --- a/internal/scheduler/schedulerapp.go +++ b/internal/scheduler/schedulerapp.go @@ -379,17 +379,13 @@ func Run(config schedulerconfig.Configuration) error { return err } - schedulerMetrics, err := metrics.New( - config.Metrics.TrackedErrorRegexes, + schedulerMetrics := metrics.New( config.Metrics.TrackedResourceNames, config.Metrics.JobCheckpointIntervals, config.Metrics.JobStateMetricsResetInterval, metricPublisher, config.Metrics.ScalableUnitLabel, ) - if err != nil { - return err - } if err := prometheus.Register(schedulerMetrics); err != nil { return errors.WithStack(err) }