-
Notifications
You must be signed in to change notification settings - Fork 5k
[Improvement-17795][Master]Add dispatch timeout checking logic to handle cases where the worker group does not exist or no workers are available #17796
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Conversation
| * Maximum time allowed for a task to be successfully dispatched. | ||
| * Default: 5 minutes. | ||
| */ | ||
| private Duration dispatchTimeout = Duration.ofMinutes(5); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make it configurable by users in the configuration file and it can be turned off.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make it configurable by users in the configuration file and it can be turned off.
good idea!
| * Maximum time allowed for a task to be successfully dispatched. | ||
| * Default: 5 minutes. | ||
| */ | ||
| private Duration dispatchTimeout = Duration.ofMinutes(5); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| private Duration dispatchTimeout = Duration.ofMinutes(5); | |
| private Duration maxTaskDispatchDuration; |
dispatchTimeout might confuse with single RPC timeout when dispatch task, and the default value should be null or a large duration, should compatibility with the previous behavior.
And you need to add this in document.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maxTaskDispatchDuration
ok
| private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) { | ||
| final int taskId = taskExecutionRunnable.getId(); | ||
| final TaskExecutionContext taskExecutionContext = taskExecutionRunnable.getTaskExecutionContext(); | ||
| final long timeoutMs = this.dispatchTimeout.toMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to store the mills rather than store duration, then you can avoid execute dispatchTimeout.toMillis() here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to store the mills rather than store duration, then you can avoid execute
dispatchTimeout.toMillis()here.
Using a duration format like '5m' is more concise and readable. Most other configurations I’ve seen also avoid millisecond-level granularity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can translate the duration into milliseconds in the constructor.
| } | ||
|
|
||
| private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) { | ||
| final int taskId = taskExecutionRunnable.getId(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| final int taskId = taskExecutionRunnable.getId(); | |
| final int taskExecutionRunnableId = taskExecutionRunnable.getId(); |
Avoid confuse with task definition id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taskExecutionRunnableId
final int taskInstanceId = taskExecutionRunnable.getId(); is it better?
/**
* Get the task instance id.
*
Need to know the id might change since the task instance might be regenerated.
*/
default int getId() {
return getTaskInstance().getId();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to use taskExecutionRunnableId , since the outer layer is unaware that this is the taskInstanceId.
| * @param elapsed the time (in milliseconds) already spent attempting to dispatch the task | ||
| * @param timeoutMs the configured dispatch timeout threshold (in milliseconds) | ||
| */ | ||
| private void handleDispatchFailure(ITaskExecutionRunnable taskExecutionRunnable, TaskDispatchException exception, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't print the taskId and workflowId here, all ids should already be added by MDC. We should only need to print the exception here, the exception already contains failure message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't print the
taskIdandworkflowIdhere, all ids should already be added by MDC. We should only need to print the exception here, theexceptionalready contains failure message.
ok
|
|
||
| public WorkerGroupNotFoundException(String workerGroup) { | ||
| super("Cannot find worker group: " + workerGroup); | ||
| public WorkerGroupNotFoundException(String message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do this change?
restore, remove the task info
|
|
||
| public class WorkerNotFoundException extends TaskDispatchException { | ||
|
|
||
| public WorkerNotFoundException(String message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| public WorkerNotFoundException(String message) { | |
| public NoAvailableWorkerException(String workerGroup) { | |
| super("Cannot find available worker under worker group: " + workerGroup); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
| final MasterConfig masterConfig = new MasterConfig(); | ||
| dispatcher = new WorkerGroupDispatcher("TestGroup", taskExecutorClient, masterConfig.getDispatchTimeout()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should add a validated test case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should add a validated test case.
ok, add test for dispatch timeout checker
| /** | ||
| * Timestamp (ms) when the task was first enqueued for dispatch. | ||
| */ | ||
| private final long firstDispatchEnqueueTimeMs = System.currentTimeMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| /** | |
| * Timestamp (ms) when the task was first enqueued for dispatch. | |
| */ | |
| private final long firstDispatchEnqueueTimeMs = System.currentTimeMillis(); | |
| private final long firstDispatchTime = System.currentTimeMillis(); |
This is more like the creation time of the taskExecutionContext.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more like the creation time of the
taskExecutionContext.
ok
877db06 to
d7bf4fa
Compare
| /** | ||
| * Configuration for the master's task dispatch timeout check mechanism. | ||
| * This controls whether the system enforces a time limit for dispatching tasks to workers, | ||
| * and if so, how long to wait before marking a task as failed due to dispatch timeout. | ||
| */ | ||
| private MasterDispatchTimeoutCheckerConfig dispatchTimeoutChecker = new MasterDispatchTimeoutCheckerConfig(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| /** | |
| * Configuration for the master's task dispatch timeout check mechanism. | |
| * This controls whether the system enforces a time limit for dispatching tasks to workers, | |
| * and if so, how long to wait before marking a task as failed due to dispatch timeout. | |
| */ | |
| private MasterDispatchTimeoutCheckerConfig dispatchTimeoutChecker = new MasterDispatchTimeoutCheckerConfig(); | |
| private TaskDispatchPolicy taskDispatchPolicy = new TaskDispatchPolicy(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
| * If enabled, tasks that remain in the dispatch queue longer than {@link #maxTaskDispatchDuration} will be marked as failed to prevent indefinite queuing. | ||
| */ | ||
| @Data | ||
| public class MasterDispatchTimeoutCheckerConfig { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| public class MasterDispatchTimeoutCheckerConfig { | |
| public class TaskDispatchPolicy { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
| /** | ||
| * Whether to enable the dispatch timeout checking mechanism. | ||
| */ | ||
| private boolean enabled = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| private boolean enabled = false; | |
| private boolean dispatchTimeoutFailedEnabled = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
| * Tasks exceeding this duration in the dispatch queue will be failed. | ||
| * Examples: "2m", "5m", "30m". Defaults to 5 minutes. | ||
| */ | ||
| private Duration maxTaskDispatchDuration = Duration.ofMinutes(5); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| private Duration maxTaskDispatchDuration = Duration.ofMinutes(5); | |
| private Duration maxTaskDispatchDuration; |
When we don't have a good suggested value, don't set a default value, as this can mislead many users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we don't have a good suggested value, don't set a default value, as this can mislead many users.
ok
| } | ||
| taskExecutorClient.dispatch(taskExecutionRunnable); | ||
| } catch (Exception e) { | ||
| } catch (TaskDispatchException ex) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| } catch (TaskDispatchException ex) { | |
| } catch (Exception ex) { |
Currently, at least in this PR, you can't guarantee that other exceptions won't be thrown here, right?
| @Data | ||
| @Builder | ||
| @AllArgsConstructor | ||
| public class TaskFatalLifecycleEvent extends AbstractTaskLifecycleEvent { | ||
|
|
||
| private final ITaskExecutionRunnable taskExecutionRunnable; | ||
|
|
||
| private final Date endTime; | ||
|
|
||
| @Override | ||
| public ILifecycleEventType getEventType() { | ||
| return TaskLifecycleEventType.FATAL; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "TaskFatalLifecycleEvent{" + | ||
| "task=" + taskExecutionRunnable.getName() + | ||
| ", endTime=" + endTime + | ||
| '}'; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove this.
ok, will publish TaskFailedLifecycleEvent
| .untilAsserted(() -> verify(taskExecutorClient, times(2)).dispatch(taskExecutionRunnable)); | ||
| } | ||
|
|
||
| @Test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should add UT or IT test case to verify the time out check is enabled. Your core logic has not been tested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should add UT or IT test case to verify the time out check is enabled. Your core logic has not been tested.
OK. I need to modify the master's configuration. I'll submit the changes and test them — I can't run the integration tests locally.
…heduler into Improvement-17795
Purpose of the pull request
close #17795
Brief change log
Add dispatch timeout checking logic to handle cases where the worker group does not exist or no workers are available
Verify this pull request
Successfully tested and verified in a real-world environment.
Pull Request Notice
Pull Request Notice
If your pull request contains incompatible change, you should also add it to
docs/docs/en/guide/upgrade/incompatible.md