Skip to content

Commit 59ab5a2

Browse files
committed
ensure abnormal jobs do not interfere with normal task scheduling
1 parent 2466d08 commit 59ab5a2

File tree

2 files changed

+27
-12
lines changed

2 files changed

+27
-12
lines changed

fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -371,18 +371,19 @@ protected void unprotectUpdateProgress() throws UserException {
371371
}
372372

373373
@Override
374-
protected void preCheckNeedSchedule() throws UserException {
374+
protected boolean preCheckNeedSchedule(boolean isNeedAutoSchedule) throws UserException {
375375
// If user does not specify kafka partition,
376376
// We will fetch partition from kafka server periodically
377-
if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) {
377+
if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE || isNeedAutoSchedule) {
378378
if (customKafkaPartitions != null && !customKafkaPartitions.isEmpty()) {
379-
return;
379+
return true;
380380
}
381-
updateKafkaPartitions();
381+
return updateKafkaPartitions();
382382
}
383+
return true;
383384
}
384385

385-
private void updateKafkaPartitions() throws UserException {
386+
private boolean updateKafkaPartitions() throws UserException {
386387
try {
387388
this.newCurrentKafkaPartition = getAllKafkaPartitions();
388389
} catch (Exception e) {
@@ -397,7 +398,9 @@ private void updateKafkaPartitions() throws UserException {
397398
new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg),
398399
false /* not replay */);
399400
}
401+
return false;
400402
}
403+
return true;
401404
}
402405

403406
// if customKafkaPartition is not null, then return false immediately
@@ -406,7 +409,7 @@ private void updateKafkaPartitions() throws UserException {
406409
// update current kafka partition at the same time
407410
// current kafka partitions = customKafkaPartitions == 0 ? all partition of kafka topic : customKafkaPartitions
408411
@Override
409-
protected boolean unprotectNeedReschedule() throws UserException {
412+
protected boolean unprotectNeedReschedule(boolean isNeedAutoSchedule) throws UserException {
410413
// only running and need_schedule job need to be changed current kafka partitions
411414
if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) {
412415
if (CollectionUtils.isNotEmpty(customKafkaPartitions)) {
@@ -447,11 +450,15 @@ protected boolean unprotectNeedReschedule() throws UserException {
447450
}
448451

449452
}
453+
return isNeedAutoSchedule;
454+
}
455+
456+
@Override
457+
protected boolean isNeedAutoSchedule() {
450458
if (this.state == JobState.PAUSED) {
451459
return ScheduleRule.isNeedAutoSchedule(this);
452460
}
453461
return false;
454-
455462
}
456463

457464
@Override

fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1510,11 +1510,15 @@ public void update() throws UserException {
15101510
}
15111511
}
15121512

1513-
preCheckNeedSchedule();
1513+
boolean isNeedAutoSchedule = isNeedAutoSchedule();
1514+
1515+
if (!preCheckNeedSchedule(isNeedAutoSchedule)) {
1516+
return;
1517+
}
15141518

15151519
writeLock();
15161520
try {
1517-
if (unprotectNeedReschedule()) {
1521+
if (unprotectNeedReschedule(isNeedAutoSchedule)) {
15181522
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
15191523
.add("msg", "Job need to be rescheduled")
15201524
.build());
@@ -1530,14 +1534,18 @@ public void update() throws UserException {
15301534
// Because unprotectUpdateProgress() is protected by writelock.
15311535
// So if there are time-consuming operations, they should be done in this method.
15321536
// (Such as getAllKafkaPartitions() in KafkaRoutineLoad)
1533-
protected void preCheckNeedSchedule() throws UserException {
1534-
1537+
protected boolean preCheckNeedSchedule(boolean isNeedAutoSchedule) throws UserException {
1538+
return false;
15351539
}
15361540

15371541
protected void unprotectUpdateProgress() throws UserException {
15381542
}
15391543

1540-
protected boolean unprotectNeedReschedule() throws UserException {
1544+
protected boolean unprotectNeedReschedule(boolean isNeedAutoSchedule) throws UserException {
1545+
return false;
1546+
}
1547+
1548+
protected boolean isNeedAutoSchedule() {
15411549
return false;
15421550
}
15431551

0 commit comments

Comments
 (0)