@@ -24,6 +24,8 @@
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.engine.exceptions.WorkflowEventFireException;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext;
import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
@@ -32,6 +34,7 @@
import org.apache.commons.collections4.MapUtils;

import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -128,6 +131,19 @@ private void doFireSingleWorkflowEventBus(final IWorkflowExecutionRunnable workf
ThreadUtils.sleep(5_000);
return;
}

// If initializeTaskExecutionContext fails before dispatch,
// construct and publish a dedicated TaskFatalLifecycleEvent
// so that the event will be handled by TaskFatalLifecycleEventHandler.
if (ExceptionUtils.isTaskExecutionContextCreateException(ex)) {
AbstractTaskLifecycleEvent taskLifecycleEvent = (AbstractTaskLifecycleEvent) lifecycleEvent;
final TaskFatalLifecycleEvent taskFatalEvent = TaskFatalLifecycleEvent.builder()
.taskExecutionRunnable(taskLifecycleEvent.getTaskExecutionRunnable())
.endTime(new Date())
.build();
workflowEventBus.publish(taskFatalEvent);
Copilot AI (Dec 29, 2025), commenting on lines +139 to +144:
Unsafe type casting without verification. While the TaskExecutionContextCreateException is currently only thrown when handling TaskDispatchLifecycleEvent (which extends AbstractTaskLifecycleEvent), it would be safer to check the type before casting to prevent potential ClassCastException if the code structure changes. Consider adding an instanceof check before the cast.

Suggested change (the current code, followed by the proposed replacement):
AbstractTaskLifecycleEvent taskLifecycleEvent = (AbstractTaskLifecycleEvent) lifecycleEvent;
final TaskFatalLifecycleEvent taskFatalEvent = TaskFatalLifecycleEvent.builder()
.taskExecutionRunnable(taskLifecycleEvent.getTaskExecutionRunnable())
.endTime(new Date())
.build();
workflowEventBus.publish(taskFatalEvent);
if (lifecycleEvent instanceof AbstractTaskLifecycleEvent) {
AbstractTaskLifecycleEvent taskLifecycleEvent = (AbstractTaskLifecycleEvent) lifecycleEvent;
final TaskFatalLifecycleEvent taskFatalEvent = TaskFatalLifecycleEvent.builder()
.taskExecutionRunnable(taskLifecycleEvent.getTaskExecutionRunnable())
.endTime(new Date())
.build();
workflowEventBus.publish(taskFatalEvent);
} else {
log.warn(
"TaskExecutionContextCreateException occurred for non-task lifecycle event: {}",
lifecycleEvent.getClass().getName(),
ex);
}

}

workflowEventBus.getWorkflowEventBusSummary().decreaseFireSuccessEventCount();
workflowEventBus.getWorkflowEventBusSummary().increaseFireFailedEventCount();
throw new WorkflowEventFireException(lifecycleEvent, ex);
@@ -29,6 +29,10 @@ public enum TaskLifecycleEventType implements ILifecycleEventType {
* Dispatch the task instance to target.
*/
DISPATCH,
/**
* The task instance encounters a catastrophic failure (such as an initialization failure) and will enter a failed state.
*/
FATAL,
/**
* The task instance is dispatched to the target executor server.
*/
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event;

import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;

import java.util.Date;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

@Data
@Builder
@AllArgsConstructor
public class TaskFatalLifecycleEvent extends AbstractTaskLifecycleEvent {

private final ITaskExecutionRunnable taskExecutionRunnable;

Check notice (Code scanning / CodeQL): Missing Override annotation

This method overrides AbstractTaskLifecycleEvent.getTaskExecutionRunnable; it is advisable to add an Override annotation.

private final Date endTime;

@Override
public ILifecycleEventType getEventType() {
return TaskLifecycleEventType.FATAL;
}

@Override
public String toString() {
return "TaskFatalLifecycleEvent{" +
"task=" + taskExecutionRunnable.getName() +
", endTime=" + endTime +
'}';
}
}
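One way to address the CodeQL notice above (a sketch only, not part of this PR) is to declare the getter explicitly so it can carry the @Override annotation, instead of relying on the accessor generated by Lombok's @Data:

// Explicit getter so the @Override annotation applies; Lombok skips generating
// an accessor when a method with this signature is already present.
@Override
public ITaskExecutionRunnable getTaskExecutionRunnable() {
    return taskExecutionRunnable;
}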
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.handler;

import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.task.statemachine.ITaskStateAction;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;

import org.springframework.stereotype.Component;

@Component
public class TaskFatalLifecycleEventHandler extends AbstractTaskLifecycleEventHandler<TaskFatalLifecycleEvent> {

@Override
public void handle(final ITaskStateAction taskStateAction,
final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskFatalLifecycleEvent taskFatalEvent) {
taskStateAction.onFatalEvent(workflowExecutionRunnable, taskExecutionRunnable, taskFatalEvent);
}

@Override
public ILifecycleEventType matchEventType() {
return TaskLifecycleEventType.FATAL;
}
}
@@ -36,6 +36,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPausedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRetryLifecycleEvent;
@@ -99,6 +100,38 @@ protected void releaseTaskInstanceResourcesIfNeeded(final ITaskExecutionRunnable
}
}

@Override
public void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskFatalLifecycleEvent taskFatalEvent) {
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
persistentTaskInstanceFatalEventToDB(taskExecutionRunnable, taskFatalEvent);

if (taskExecutionRunnable.isTaskInstanceCanRetry()) {
taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable));
return;
}

// If all successors are condition tasks, the task will not be marked as a failure,
// and the DAG will continue to execute.
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
if (workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) {
mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, taskExecutionRunnable);
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
return;
}
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable);
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
}

private void persistentTaskInstanceFatalEventToDB(final ITaskExecutionRunnable taskExecutionRunnable,
final TaskFatalLifecycleEvent taskFatalEvent) {
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
taskInstance.setState(TaskExecutionStatus.FAILURE);
taskInstance.setEndTime(taskFatalEvent.getEndTime());
taskInstanceDao.updateById(taskInstance);
}

@Override
public void onDispatchedEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
@@ -22,6 +22,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
@@ -91,6 +92,14 @@ void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskDispatchLifecycleEvent taskDispatchEvent);

/**
* Perform the necessary actions when the task in a certain state receives a {@link TaskFatalLifecycleEvent}.
* <p> This method is called when the task encounters a catastrophic failure (e.g., an initialization failure).
*/
void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskFatalLifecycleEvent taskFatalEvent);

/**
* Perform the necessary actions when the task in a certain state receives a {@link TaskDispatchedLifecycleEvent}.
* <p> This method is called when the task has been dispatched to executor.
@@ -36,6 +36,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskSuccessLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;

import java.util.concurrent.TimeUnit;

@@ -109,7 +110,14 @@ public void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRu
taskInstance.getDelayTime(),
remainTimeMills);
}
taskExecutionRunnable.initializeTaskExecutionContext();

try {
taskExecutionRunnable.initializeTaskExecutionContext();
} catch (Exception ex) {
log.error("Failed to initialize task execution context, taskName: {}", taskInstance.getName(), ex);
throw new TaskExecutionContextCreateException(ex.getMessage());

Copilot AI (Dec 29, 2025):
Unnecessary blank line after the throw statement. This extra blank line should be removed to maintain consistent code formatting.

}
workerGroupDispatcherCoordinator.dispatchTask(taskExecutionRunnable, remainTimeMills);
}
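With the flagged blank line removed, the dispatch path would read as follows (a sketch of the same code for readability, not an additional change):

try {
    taskExecutionRunnable.initializeTaskExecutionContext();
} catch (Exception ex) {
    // Wrap the failure so the event bus can recognize it and publish a TaskFatalLifecycleEvent.
    log.error("Failed to initialize task execution context, taskName: {}", taskInstance.getName(), ex);
    throw new TaskExecutionContextCreateException(ex.getMessage());
}
workerGroupDispatcherCoordinator.dispatchTask(taskExecutionRunnable, remainTimeMills);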

@@ -17,7 +17,7 @@

package org.apache.dolphinscheduler.server.master.exception;

public class TaskExecutionContextCreateException extends MasterException {
public class TaskExecutionContextCreateException extends RuntimeException {

public TaskExecutionContextCreateException(String message) {
super(message);
@@ -17,6 +17,8 @@

package org.apache.dolphinscheduler.server.master.utils;

import org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;

import org.springframework.dao.DataAccessResourceFailureException;

public class ExceptionUtils {
@@ -25,4 +27,7 @@ public static boolean isDatabaseConnectedFailedException(Throwable e) {
return e instanceof DataAccessResourceFailureException;
}

public static boolean isTaskExecutionContextCreateException(Throwable e) {
return e instanceof TaskExecutionContextCreateException;
}
}
@@ -849,6 +849,36 @@ public void testStartWorkflow_with_oneFailedTask() {
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one fake task(A) fatal")
public void testStartWorkflow_with_oneFatalTask() {
final String yaml = "/it/start/workflow_with_one_fake_task_fatal.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getOneWorkflow();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(workflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflow))
.satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
.isEqualTo(WorkflowExecutionStatus.FAILURE));
Assertions
.assertThat(repository.queryTaskInstance(workflow))
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
});
});
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one fake task(A) failed")
public void testStartWorkflow_with_oneFailedTaskWithRetry() {
@@ -1403,6 +1433,46 @@ void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFailed() {
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one condition task(B) when one fake predecessor task(A) run fatal")
void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFatal() {
final String yaml = "/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_fatal.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(parentWorkflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);

Assertions
.assertThat(repository.queryTaskInstance(workflowInstanceId))
.hasSize(3)
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
})
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("B");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
})
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("D");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
});
});
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one condition task(B) which is forbidden when one fake predecessor task(A) run failed")
void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFailed() {
@@ -1435,4 +1505,37 @@ void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runF
});
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one condition task(B) which is forbidden when one fake predecessor task(A) run fatal")
void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFatal() {
final String yaml =
"/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_fatal.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(parentWorkflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE);

Assertions
.assertThat(repository.queryTaskInstance(workflowInstanceId))
.hasSize(1)
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
});
});
masterContainer.assertAllResourceReleased();
}
}