Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,16 @@ var JobRunPreempted = &armadaevents.EventSequence_Event{
},
}

var JobRunTerminated = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRunTerminated{
JobRunTerminated: &armadaevents.JobRunTerminated{
JobId: JobId,
RunId: RunId,
},
},
}

var JobRunFailed = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRunErrors{
Expand Down
1 change: 1 addition & 0 deletions internal/executor/domain/pod_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ const (
IngressReported = "ingress_reported"
MarkedForDeletion = "deletion_requested"
JobPreemptedAnnotation = "reported_preempted"
JobRunTerminatedReported = "reported_terminated"
)
22 changes: 22 additions & 0 deletions internal/executor/reporter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,28 @@ func CreateJobIngressInfoEvent(pod *v1.Pod, clusterId string, associatedServices
return sequence, nil
}

func CreateJobRunTerminatedEvent(pod *v1.Pod) (*armadaevents.EventSequence, error) {
sequence := createEmptySequence(pod)
jobId, runId, err := extractIds(pod)
if err != nil {
return nil, err
}
finishedAt, err := types.TimestampProto(util.LatestContainerFinishedAt(pod))
if err != nil {
return nil, err
}
sequence.Events = append(sequence.Events, &armadaevents.EventSequence_Event{
Created: finishedAt,
Event: &armadaevents.EventSequence_Event_JobRunTerminated{
JobRunTerminated: &armadaevents.JobRunTerminated{
JobId: jobId,
RunId: runId,
},
},
})
Comment thread
greptile-apps[bot] marked this conversation as resolved.
return sequence, nil
}

func CreateSimpleJobPreemptedEvent(pod *v1.Pod) (*armadaevents.EventSequence, error) {
sequence := createEmptySequence(pod)
preemptedJobId, preemptedRunId, err := extractIds(pod)
Expand Down
76 changes: 74 additions & 2 deletions internal/executor/service/job_state_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ func (stateReporter *JobStateReporter) podEventHandler() cache.ResourceEventHand
log.Errorf("Failed to process pod event due to it being an unexpected type. Failed to process %+v", obj)
return
}
go stateReporter.reportCurrentStatus(pod)
go func() {
stateReporter.reportCurrentStatus(pod)
stateReporter.attemptToReportJobRunTerminatedEvent(pod)
}()
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldPod, ok := oldObj.(*v1.Pod)
Expand All @@ -64,7 +67,10 @@ func (stateReporter *JobStateReporter) podEventHandler() cache.ResourceEventHand
log.Errorf("Failed to process pod event due to it being an unexpected type. Failed to process %+v", newObj)
return
}
go stateReporter.reportStatusUpdate(oldPod, newPod)
go func() {
stateReporter.reportStatusUpdate(oldPod, newPod)
stateReporter.attemptToReportJobRunTerminatedEvent(newPod)
}()
},
}
}
Expand Down Expand Up @@ -142,6 +148,57 @@ func (stateReporter *JobStateReporter) reportCurrentStatus(pod *v1.Pod) {
}
}

// attemptToReportJobRunTerminatedEvent emits JobRunTerminated once per pod when the
// pod reaches a terminal phase (PodSucceeded or PodFailed). Called from two
// places:
//
// 1. Inline, from the pod-event handler, so we observe the terminal-phase
// transition the moment the informer reports it. This is required for
// pods armada itself deleted (cancel/preempt): once the containers exit,
// the Kubernetes API removes the pod object within a couple of seconds,
// and the executor's informer cache loses it. The reconciliation interval
// (default 15s) is wider than that window, so a reconcile-only design
// would race against pod deletion and never see the terminal phase -
// terminated_timestamp would stay NULL.
//
// 2. From the periodic reconciliation pass in ReportMissingJobEvents, as a
// safety net for inline emissions that did not happen (executor restart
// between phase transition and emission, informer watch hiccup).
//
// Idempotent: a successful emission annotates the pod with reported_terminated,
// and subsequent calls (inline or reconciliation) skip annotated pods.
func (stateReporter *JobStateReporter) attemptToReportJobRunTerminatedEvent(pod *v1.Pod) {
if !util.IsManagedPod(pod) {
return
}
if !util.IsInTerminalState(pod) {
return
}
if util.HasJobRunTerminatedBeenReported(pod) {
return
}
event, err := reporter.CreateJobRunTerminatedEvent(pod)
if err != nil {
log.Errorf("Failed to build JobRunTerminated event for pod %s: %v", pod.Name, err)
return
}
stateReporter.eventReporter.QueueEvent(reporter.EventMessage{Event: event, JobRunId: util.ExtractJobRunId(pod)}, func(err error) {
if err != nil {
log.Errorf("Failed to report JobRunTerminated for pod %s: %v", pod.Name, err)
return
}
if err := stateReporter.addAnnotationToMarkJobRunTerminatedReported(pod); err != nil {
log.Errorf("Failed to annotate pod %s as JobRunTerminated-reported: %v", pod.Name, err)
}
})
}

func (stateReporter *JobStateReporter) addAnnotationToMarkJobRunTerminatedReported(pod *v1.Pod) error {
return stateReporter.clusterContext.AddAnnotation(pod, map[string]string{
domain2.JobRunTerminatedReported: time.Now().String(),
})
}

func (stateReporter *JobStateReporter) addAnnotationToMarkStateReported(pod *v1.Pod) error {
annotations := make(map[string]string)
annotationName := string(pod.Status.Phase)
Expand Down Expand Up @@ -183,6 +240,21 @@ func (stateReporter *JobStateReporter) ReportMissingJobEvents() {
stateReporter.attemptToReportIngressInfoEvent(pod)
}
}

// The pod-event handler normally emits JobRunTerminated inline the moment the
// informer reports terminal phase. This loop catches pods where that inline
// emission did not happen: the executor restarted after the pod went terminal
// but before the handler ran, or the informer dropped the watch event.
// Idempotent via the reported_terminated annotation set on successful emission.
podsWithTerminatedNotReported := util.FilterPods(allBatchPods, func(pod *v1.Pod) bool {
return util.IsInTerminalState(pod) && !util.HasJobRunTerminatedBeenReported(pod)
})

for _, pod := range podsWithTerminatedNotReported {
if !stateReporter.eventReporter.HasPendingEvents(pod) {
stateReporter.attemptToReportJobRunTerminatedEvent(pod)
}
}
}

func (stateReporter *JobStateReporter) attemptToReportIngressInfoEvent(pod *v1.Pod) {
Expand Down
Loading
Loading