Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions internal/common/errormatch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,26 @@ var KnownConditions = map[string]bool{
ConditionEvicted: true,
ConditionDeadlineExceeded: true,
}

// CategoryInternal is the static failure category stamped on Armada-generated
// and structural failures. Unlike operator-configured categorizer categories,
// it and its subcategories are hard-coded at the Error-construction site and
// never pass through the executor categorizer, so they bypass its length
// validation. Keep subcategory values within the lookout failure_subcategory
// varchar(63) bound, i.e. at most 63 characters (lookout migration 032).
// Preemption is deliberately not categorized as internal: it is a scheduling
// action, metered separately and excluded from failure metrics.
const CategoryInternal = "internal"

// Subcategories of CategoryInternal, each naming the structural cause stamped at
// its producing site. These values are hard-coded rather than validated by the
// executor categorizer, so each one must fit the lookout failure_subcategory
// varchar(63) bound; TestInternalCategoryConstants_WithinLengthBound guards
// this. Add any new subcategory to that test as well.
const (
SubcategoryJobCreationFailed = "job-creation-failed" // executor failed to build the runnable job from the lease
SubcategoryPodMissing = "pod-missing" // reconciliation: pod vanished from Kubernetes for an active run
SubcategoryLeaseExpired = "lease-expired" // run cancelled because its executor went stale or was lost
SubcategoryMaxRunsExceeded = "max-runs-exceeded" // job exhausted its run attempts
SubcategoryJobRejected = "job-rejected" // submitcheck or validation rejected the job
SubcategoryStuckTerminating = "stuck-terminating" // pod stuck terminating, indicates a node issue
SubcategoryExternallyDeleted = "externally-deleted" // pod deleted out-of-band (not by Armada)
SubcategoryIssueHandlerError = "issue-handler-error" // pod unexpectedly changed state mid issue-handling
SubcategoryActiveDeadline = "active-deadline" // pod exceeded its active deadline seconds
)
32 changes: 32 additions & 0 deletions internal/common/errormatch/types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package errormatch

import (
"testing"

"github.com/stretchr/testify/assert"
)

// maxFailureSubcategoryLen mirrors the lookout job_run.failure_subcategory
// varchar(63) bound (lookout migration 032). The internal category and
// subcategory constants checked in this file are stamped directly at the
// Error-construction site and never pass through the executor categorizer's own
// length validation, so guard them here.
const maxFailureSubcategoryLen = 63

// TestInternalCategoryConstants_WithinLengthBound verifies that every hard-coded
// internal category and subcategory constant is non-empty and no longer than
// maxFailureSubcategoryLen.
func TestInternalCategoryConstants_WithinLengthBound(t *testing.T) {
	type namedConstant struct {
		name  string
		value string
	}

	// Listed explicitly: Go offers no way to enumerate constants, so a new
	// subcategory is only covered once it is added here.
	constants := []namedConstant{
		{name: "CategoryInternal", value: CategoryInternal},
		{name: "SubcategoryJobCreationFailed", value: SubcategoryJobCreationFailed},
		{name: "SubcategoryPodMissing", value: SubcategoryPodMissing},
		{name: "SubcategoryLeaseExpired", value: SubcategoryLeaseExpired},
		{name: "SubcategoryMaxRunsExceeded", value: SubcategoryMaxRunsExceeded},
		{name: "SubcategoryJobRejected", value: SubcategoryJobRejected},
		{name: "SubcategoryStuckTerminating", value: SubcategoryStuckTerminating},
		{name: "SubcategoryExternallyDeleted", value: SubcategoryExternallyDeleted},
		{name: "SubcategoryIssueHandlerError", value: SubcategoryIssueHandlerError},
		{name: "SubcategoryActiveDeadline", value: SubcategoryActiveDeadline},
	}

	for _, c := range constants {
		assert.NotEmpty(t, c.value, "%s must not be empty", c.name)
		assert.LessOrEqual(t, len(c.value), maxFailureSubcategoryLen, "%s = %q exceeds %d chars", c.name, c.value, maxFailureSubcategoryLen)
	}
}
15 changes: 9 additions & 6 deletions internal/executor/reporter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,11 @@ func CreateSimpleJobPreemptedEvent(pod *v1.Pod) (*armadaevents.EventSequence, er
return sequence, nil
}

// CreateSimpleJobFailedEvent creates a failed event with no container details and no classification.
// Use for failures where pod container statuses are unavailable (preemption, submit failures).
// failure_category/failure_subcategory are left empty, resulting in NULL in the DB.
// This is intentional: these failures are not classifiable pod errors.
// CreateSimpleJobFailedEvent creates a failed event with no container details and
// leaves failure_category/failure_subcategory empty (NULL). Its callers are
// preemption (a scheduling action, deliberately uncategorized) and pod-submission
// failures, which relay the Kubernetes API error rather than an Armada-authored
// one and so are not internal.
func CreateSimpleJobFailedEvent(pod *v1.Pod, reason string, clusterId string, cause armadaevents.KubernetesReason) (*armadaevents.EventSequence, error) {
	// Name the deliberately empty arguments so the positional call below is
	// readable: no debug message, no container errors, and no classification.
	const (
		noDebugMessage       = ""
		noFailureCategory    = ""
		noFailureSubcategory = ""
	)
	noContainerErrors := []*armadaevents.ContainerError{}

	return CreateJobFailedEvent(
		pod,
		reason,
		cause,
		noDebugMessage,
		noContainerErrors,
		clusterId,
		noFailureCategory,
		noFailureSubcategory,
	)
}
Expand Down Expand Up @@ -261,7 +262,7 @@ func CreateJobFailedEvent(pod *v1.Pod, reason string, cause armadaevents.Kuberne
return sequence, nil
}

func CreateMinimalJobFailedEvent(jobId string, runId string, jobSet string, queue string, clusterId string, message string) (*armadaevents.EventSequence, error) {
func CreateMinimalJobFailedEvent(jobId string, runId string, jobSet string, queue string, clusterId string, message string, failureCategory string, failureSubcategory string) (*armadaevents.EventSequence, error) {
sequence := &armadaevents.EventSequence{}
sequence.Queue = queue
sequence.JobSetName = jobSet
Expand All @@ -274,7 +275,9 @@ func CreateMinimalJobFailedEvent(jobId string, runId string, jobSet string, queu
JobId: jobId,
Errors: []*armadaevents.Error{
{
Terminal: true,
Terminal: true,
FailureCategory: failureCategory,
FailureSubcategory: failureSubcategory,
Reason: &armadaevents.Error_PodError{
PodError: &armadaevents.PodError{
ObjectMeta: &armadaevents.ObjectMeta{
Expand Down
13 changes: 13 additions & 0 deletions internal/executor/reporter/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,16 @@ func createService(serviceType v1.ServiceType, port int32, nodePort int32) *v1.S
},
}
}

// TestCreateMinimalJobFailedEvent_SetsInternalCategory checks that the failure
// category and subcategory passed to CreateMinimalJobFailedEvent are stamped on
// the single error of the resulting JobRunErrors event.
func TestCreateMinimalJobFailedEvent_SetsInternalCategory(t *testing.T) {
	sequence, err := CreateMinimalJobFailedEvent(
		"job1", "run1", "jobset1", "queue1", "cluster1", "pod missing",
		errormatch.CategoryInternal, errormatch.SubcategoryPodMissing,
	)
	require.NoError(t, err)
	require.Len(t, sequence.Events, 1)

	runErrors, isRunErrors := sequence.Events[0].Event.(*armadaevents.EventSequence_Event_JobRunErrors)
	require.True(t, isRunErrors)
	require.Len(t, runErrors.JobRunErrors.Errors, 1)

	// The one reported error must still be a pod error and carry the
	// caller-supplied classification unchanged.
	failure := runErrors.JobRunErrors.Errors[0]
	assert.NotNil(t, failure.GetPodError())
	assert.Equal(t, errormatch.CategoryInternal, failure.GetFailureCategory())
	assert.Equal(t, errormatch.SubcategoryPodMissing, failure.GetFailureSubcategory())
}
3 changes: 3 additions & 0 deletions internal/executor/service/job_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"golang.org/x/exp/maps"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/errormatch"
log "github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/executor/configuration"
Expand Down Expand Up @@ -187,6 +188,8 @@ func (r *JobRequester) sendFailedEvent(details *failedJobCreationDetails) error
details.JobRunMeta.Queue,
r.clusterId.GetClusterId(),
details.Error.Error(),
errormatch.CategoryInternal,
errormatch.SubcategoryJobCreationFailed,
)
if err != nil {
return fmt.Errorf("failed to create job failed events because %s", err)
Expand Down
45 changes: 38 additions & 7 deletions internal/executor/service/pod_issue_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"k8s.io/utils/clock"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/errormatch"
log "github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/executor/categorizer"
"github.com/armadaproject/armada/internal/executor/configuration"
Expand Down Expand Up @@ -424,11 +425,20 @@ func (p *PodIssueHandler) handleNonRetryableJobIssue(issue *issue) {
if !issue.RunIssue.Reported {
log.Infof("Handling non-retryable issue detected for job %s run %s", issue.RunIssue.JobId, issue.RunIssue.RunId)
podIssue := issue.RunIssue.PodIssue

result := p.classifier.ClassifyPodError(podIssue.OriginalPodState, podIssue.Message)
clusterId := p.clusterContext.GetClusterId()

message := result.AppendHint(podIssue.Message)
// Armada-detected structural pod issues are always internal failures.
// External causes (e.g. image pull, scheduling) are left to the operator
// categorizer, which applies its own defaultCategory when no rule matches.
var failureCategory, failureSubcategory, message string
if sub := internalSubcategoryForPodIssueType(podIssue.Type); sub != "" {
failureCategory, failureSubcategory = errormatch.CategoryInternal, sub
message = podIssue.Message
} else {
result := p.classifier.ClassifyPodError(podIssue.OriginalPodState, podIssue.Message)
failureCategory, failureSubcategory = result.Category, result.Subcategory
message = result.AppendHint(podIssue.Message)
}

failedEvent, err := reporter.CreateJobFailedEvent(
podIssue.OriginalPodState,
Expand All @@ -437,8 +447,8 @@ func (p *PodIssueHandler) handleNonRetryableJobIssue(issue *issue) {
podIssue.DebugMessage,
util.ExtractFailedPodContainerStatuses(podIssue.OriginalPodState, clusterId),
clusterId,
result.Category,
result.Subcategory)
failureCategory,
failureSubcategory)
if err != nil {
log.Errorf("Failed to create failed event for job %s because %s", issue.RunIssue.JobId, err)
return
Expand All @@ -450,7 +460,7 @@ func (p *PodIssueHandler) handleNonRetryableJobIssue(issue *issue) {
}
// Increment only after successful Report so failed sends do not inflate the counter.
// RecordJobFailure is a no-op when classification didn't run (empty category).
metrics.RecordJobFailure(result.Category, result.Subcategory)
metrics.RecordJobFailure(failureCategory, failureSubcategory)
p.markIssueReported(issue.RunIssue)
}

Expand All @@ -462,6 +472,25 @@ func (p *PodIssueHandler) handleNonRetryableJobIssue(issue *issue) {
}
}

// internalSubcategoryForPodIssueType maps an Armada-detected structural pod
// issue type to its internal failure subcategory. Every other issue type,
// including StuckStartingUp, UnableToSchedule, and FailedStartingUp, yields ""
// so that the operator categorizer attributes the cause (e.g. image pull,
// scheduling) instead.
func internalSubcategoryForPodIssueType(t podIssueType) string {
	// The zero value "" marks the issue type as not internal.
	var subcategory string

	switch t {
	case StuckTerminating:
		subcategory = errormatch.SubcategoryStuckTerminating
	case ExternallyDeleted:
		subcategory = errormatch.SubcategoryExternallyDeleted
	case ErrorDuringIssueHandling:
		subcategory = errormatch.SubcategoryIssueHandlerError
	case ActiveDeadlineExceeded:
		subcategory = errormatch.SubcategoryActiveDeadline
	}

	return subcategory
}

// For retryable issues we must:
// - Report JobReturnLeaseEvent
//
Expand Down Expand Up @@ -611,7 +640,9 @@ func (p *PodIssueHandler) handleReconciliationIssue(issue *issue) {
currentRunState.Meta.JobSet,
currentRunState.Meta.Queue,
p.clusterContext.GetClusterId(),
fmt.Sprintf("Pod is unexpectedly missing in Kubernetes"),
"Pod is unexpectedly missing in Kubernetes",
errormatch.CategoryInternal,
errormatch.SubcategoryPodMissing,
)
if err != nil {
log.Errorf("failed to create job failed event because %s", err)
Expand Down
41 changes: 29 additions & 12 deletions internal/executor/service/pod_issue_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,12 @@ func TestPodIssueService_DeletesPodAndReportsFailed_IfStuckAndUnretryable(t *tes
assert.Len(t, failedEvent.JobRunErrors.Errors, 1)
assert.Contains(t, failedEvent.JobRunErrors.Errors[0].GetPodError().Message, "unrecoverable problem")
assert.Contains(t, failedEvent.JobRunErrors.Errors[0].GetPodError().DebugMessage, "Image pull has failed")
// StuckStartingUp is left to the operator categorizer (not forced internal), so
// its cause (e.g. image pull) can be attributed by classifier rules.
assert.NotEqual(t, errormatch.CategoryInternal, failedEvent.JobRunErrors.Errors[0].GetFailureCategory())
}

func TestPodIssueService_FailureCategorySet_WhenClassifierConfigured(t *testing.T) {
func TestPodIssueService_StructuralIssueIsInternal_RegardlessOfClassifier(t *testing.T) {
classifier, err := categorizer.NewClassifier(categorizer.ErrorCategoriesConfig{
Categories: []categorizer.CategoryConfig{
{
Expand Down Expand Up @@ -130,7 +133,9 @@ func TestPodIssueService_FailureCategorySet_WhenClassifierConfigured(t *testing.
)
require.NoError(t, err)

// Stuck terminating pod with an OOMKilled container - the classifier should match
// Stuck terminating pod that also has an OOMKilled container which the classifier
// would match. StuckTerminating is an Armada-detected structural issue, so it is
// always categorized as internal and the classifier never overrides it.
pod := makeTerminatingPod()
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{
Expand All @@ -148,8 +153,8 @@ func TestPodIssueService_FailureCategorySet_WhenClassifierConfigured(t *testing.
failedEvent, ok := eventReporter.ReceivedEvents[0].Event.Events[0].Event.(*armadaevents.EventSequence_Event_JobRunErrors)
require.True(t, ok)

assert.Equal(t, "oom-failure", failedEvent.JobRunErrors.Errors[0].GetFailureCategory())
assert.Equal(t, "kernel-oom", failedEvent.JobRunErrors.Errors[0].GetFailureSubcategory())
assert.Equal(t, errormatch.CategoryInternal, failedEvent.JobRunErrors.Errors[0].GetFailureCategory())
assert.Equal(t, errormatch.SubcategoryStuckTerminating, failedEvent.JobRunErrors.Errors[0].GetFailureSubcategory())
// Verify ContainerErrors are populated (not empty) so retry engine has fallback data
assert.NotEmpty(t, failedEvent.JobRunErrors.Errors[0].GetPodError().ContainerErrors)
}
Expand All @@ -175,14 +180,6 @@ func TestPodIssueService_OnPodErrorClassifies(t *testing.T) {
},
expectMessageContains: "no match for platform in manifest",
},
"active deadline exceeded from executor-side detection": {
category: "user_error",
subcategory: "deadline_exceeded",
pattern: "exceeded active deadline",
// 10 minutes old with 5 minute deadline -> exceeded.
pod: func() *v1.Pod { return makePodWithDeadline(time.Now().Add(-time.Minute*10), 300, 0) },
expectMessageContains: "exceeded active deadline",
},
}

for name, tc := range tests {
Expand Down Expand Up @@ -243,6 +240,8 @@ func TestPodIssueService_DeletesPodAndReportsFailed_IfStuckTerminating(t *testin
assert.True(t, ok)
assert.Len(t, failedEvent.JobRunErrors.Errors, 1)
assert.Contains(t, failedEvent.JobRunErrors.Errors[0].GetPodError().Message, "terminating")
assert.Equal(t, errormatch.CategoryInternal, failedEvent.JobRunErrors.Errors[0].GetFailureCategory())
assert.Equal(t, errormatch.SubcategoryStuckTerminating, failedEvent.JobRunErrors.Errors[0].GetFailureSubcategory())
}

func TestPodIssueService_HasIssue(t *testing.T) {
Expand Down Expand Up @@ -401,6 +400,8 @@ func TestPodIssueService_DeletesPodAndReportsFailed_IfExceedsActiveDeadline(t *t
assert.True(t, ok)
assert.Len(t, failedEvent.JobRunErrors.Errors, 1)
assert.Contains(t, failedEvent.JobRunErrors.Errors[0].GetPodError().Message, "exceeded active deadline")
assert.Equal(t, errormatch.CategoryInternal, failedEvent.JobRunErrors.Errors[0].GetFailureCategory())
assert.Equal(t, errormatch.SubcategoryActiveDeadline, failedEvent.JobRunErrors.Errors[0].GetFailureSubcategory())
} else {
assert.Equal(t, []*v1.Pod{tc.pod}, remainingActivePods)
assert.Len(t, eventsReporter.ReceivedEvents, 0)
Expand Down Expand Up @@ -870,3 +871,19 @@ func TestCreateDebugMessage(t *testing.T) {
})
}
}

// TestInternalSubcategoryForPodIssueType pins the mapping from pod issue type to
// internal failure subcategory, including the issue types that must stay
// uncategorized ("") so the operator categorizer can attribute them.
func TestInternalSubcategoryForPodIssueType(t *testing.T) {
	tests := map[podIssueType]string{
		StuckTerminating:         errormatch.SubcategoryStuckTerminating,
		ExternallyDeleted:        errormatch.SubcategoryExternallyDeleted,
		ErrorDuringIssueHandling: errormatch.SubcategoryIssueHandlerError,
		ActiveDeadlineExceeded:   errormatch.SubcategoryActiveDeadline,
		// Left to the operator categorizer, so no internal subcategory.
		StuckStartingUp:  "",
		UnableToSchedule: "",
		FailedStartingUp: "",
	}
	for issueType, want := range tests {
		// Several cases share the expected value "" and map iteration order is
		// random, so name the issue type to make a failure attributable.
		assert.Equal(t, want, internalSubcategoryForPodIssueType(issueType), "unexpected subcategory for pod issue type %v", issueType)
	}
}
13 changes: 10 additions & 3 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/constants"
"github.com/armadaproject/armada/internal/common/errormatch"
protoutil "github.com/armadaproject/armada/internal/common/proto"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/leaderelection"
Expand Down Expand Up @@ -1038,7 +1039,9 @@ func (s *Scheduler) generateUpdateMessagesFromJob(ctx *armadacontext.Context, jo
}

runError = &armadaevents.Error{
Terminal: true,
Terminal: true,
FailureCategory: errormatch.CategoryInternal,
FailureSubcategory: errormatch.SubcategoryMaxRunsExceeded,
Reason: &armadaevents.Error_MaxRunsExceeded{
MaxRunsExceeded: &armadaevents.MaxRunsExceeded{
Message: errorMessage,
Expand Down Expand Up @@ -1131,7 +1134,9 @@ func (s *Scheduler) expireJobsIfNecessary(ctx *armadacontext.Context, txn *jobdb
jobsToUpdate = append(jobsToUpdate, job.WithQueued(false).WithFailed(true).WithUpdatedRun(run.WithFailed(true)))

leaseExpiredError := &armadaevents.Error{
Terminal: true,
Terminal: true,
FailureCategory: errormatch.CategoryInternal,
FailureSubcategory: errormatch.SubcategoryLeaseExpired,
Reason: &armadaevents.Error_LeaseExpired{
LeaseExpired: &armadaevents.LeaseExpired{},
},
Expand Down Expand Up @@ -1241,7 +1246,9 @@ func (s *Scheduler) submitCheck(ctx *armadacontext.Context, txn *jobdb.Txn) ([]*
JobId: job.Id(),
Errors: []*armadaevents.Error{
{
Terminal: true,
Terminal: true,
FailureCategory: errormatch.CategoryInternal,
FailureSubcategory: errormatch.SubcategoryJobRejected,
Reason: &armadaevents.Error_JobRejected{
JobRejected: &armadaevents.JobRejected{
Message: result.reason,
Expand Down
Loading
Loading