Commit 0e44c69

wuxinqiang authored and CodeCooker17 committed
[KYLIN-5590] spark cube job supports priority, add job execution limit
1 parent 0e0d8a1 commit 0e44c69

14 files changed: +79 −25 lines changed


core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java

Lines changed: 12 additions & 0 deletions
@@ -805,6 +805,14 @@ public int getMaxConcurrentJobLimit() {
         return Integer.parseInt(getOptional("kylin.job.max-concurrent-jobs", "10"));
     }
 
+    public int getLowPriorityBar() {
+        return Integer.parseInt(getOptional("kylin.job.low-priority-bar", "0"));
+    }
+
+    public int getLowPriorityJobLimit() {
+        return Integer.parseInt(getOptional("kylin.job.low-priority-limit", "10"));
+    }
+
     public String getHiveDependencyFilterList() {
         return this.getOptional("kylin.job.dependency-filter-list", "[^,]*hive-exec[^,]*?\\.jar" + "|"
                 + "[^,]*hive-metastore[^,]*?\\.jar" + "|" + "[^,]*hive-hcatalog-core[^,]*?\\.jar");
@@ -876,6 +884,10 @@ public boolean getSchedulerPriorityConsidered() {
         return Boolean.parseBoolean(getOptional("kylin.job.scheduler.priority-considered", FALSE));
     }
 
+    public boolean IsJobPreemptiveExecution() {
+        return Boolean.parseBoolean(getOptional("kylin.job.scheduler.priority-preemptive-execution", TRUE));
+    }
+
     public Integer getSchedulerPriorityBarFetchFromQueue() {
         return Integer.parseInt(getOptional("kylin.job.scheduler.priority-bar-fetch-from-queue", "20"));
     }
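
For context, a minimal sketch (not part of the commit; the class is hypothetical) of how the three new knobs surface at runtime. Keys and defaults mirror the getters above; getInstanceFromEnv() assumes a configured Kylin environment.

import org.apache.kylin.common.KylinConfig;

public class PriorityConfigProbe {
    public static void main(String[] args) {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        // kylin.job.low-priority-bar (default 0): priority at or below which
        // a job is treated as low priority by the scheduler
        System.out.println("low-priority bar:   " + config.getLowPriorityBar());
        // kylin.job.low-priority-limit (default 10): running-job count at
        // which low-priority jobs stop being admitted
        System.out.println("low-priority limit: " + config.getLowPriorityJobLimit());
        // kylin.job.scheduler.priority-preemptive-execution (default true):
        // whether high-priority jobs are fetched from the queue preemptively
        System.out.println("preemptive:         " + config.IsJobPreemptiveExecution());
    }
}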

core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java

Lines changed: 5 additions & 1 deletion
@@ -74,7 +74,11 @@ public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegme
 
     /** Merge multiple small segments into a big one. */
     public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter);
+        return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter, 0);
+    }
+
+    public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter, Integer priorityOffset) {
+        return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter, priorityOffset);
     }
 
     /** Optimize a segment based on the cuboid recommend list produced by the cube planner. */
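
The two-argument overload now delegates with a priority offset of 0, so existing callers keep their old behavior. A hypothetical call-site sketch (the class, submitter name, and offset value are illustrative, not from the commit):

import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.job.execution.DefaultChainedExecutable;

class MergeSubmissionSketch {
    static DefaultChainedExecutable submit(CubeSegment mergeSegment, boolean urgent) {
        return urgent
                // new overload: raise the job's priority by an explicit offset
                ? EngineFactory.createBatchMergeJob(mergeSegment, "ADMIN", 20)
                // legacy form: unchanged signature, offset defaults to 0
                : EngineFactory.createBatchMergeJob(mergeSegment, "ADMIN");
    }
}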

core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ public interface IBatchCubingEngine {
     public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset);
 
     /** Merge multiple small segments into a big one. */
-    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter, Integer priorityOffset);
 
     /** Optimize a segment based on the cuboid recommend list produced by the cube planner. */
     public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter);

core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java

Lines changed: 12 additions & 0 deletions
@@ -114,6 +114,18 @@ public boolean getJobPriorityConsidered() {
         return config.getSchedulerPriorityConsidered();
     }
 
+    public boolean IsJobPreemptiveExecution() {
+        return config.IsJobPreemptiveExecution();
+    }
+
+    public int getLowPriorityBar() {
+        return config.getLowPriorityBar();
+    }
+
+    public int getLowPriorityJobLimit() {
+        return config.getLowPriorityJobLimit();
+    }
+
     /**
      * @return the priority bar for fetching jobs from job priority queue
      */

core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java

Lines changed: 0 additions & 1 deletion
@@ -20,7 +20,6 @@
 
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;

core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java

Lines changed: 6 additions & 2 deletions
@@ -61,7 +61,9 @@ synchronized public void run() {
         // fetch job from jobPriorityQueue first to reduce chance to scan job list
         Map<String, Integer> leftJobPriorities = Maps.newHashMap();
         Pair<AbstractExecutable, Integer> executableWithPriority;
-        while ((executableWithPriority = jobPriorityQueue.peek()) != null
+
+        while (jobEngineConfig.IsJobPreemptiveExecution()
+                && (executableWithPriority = jobPriorityQueue.peek()) != null
                 // the priority of jobs in pendingJobPriorities should be above a threshold
                 && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) {
             executableWithPriority = jobPriorityQueue.poll();
@@ -147,7 +149,9 @@ synchronized public void run() {
             jobPriorityQueue.add(new Pair<>(executable, priority));
         }
 
-        while ((executableWithPriority = jobPriorityQueue.poll()) != null && !isJobPoolFull()) {
+        while ((executableWithPriority = jobPriorityQueue.poll()) != null && !isJobPoolFull()
+                && (executableWithPriority.getSecond() > jobEngineConfig.getLowPriorityBar()
+                        || runningJobs.size() < jobEngineConfig.getLowPriorityJobLimit())) {
             addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond());
         }
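
The new guard on the second loop is the job execution limit named in the commit title: a job polled from the priority queue is admitted only while the pool has room, and a low-priority job additionally requires the running-job count to stay under the limit. A self-contained paraphrase (the helper class is hypothetical; the names mirror the committed code):

class AdmissionRuleSketch {
    // Mirrors the while-condition above: admit when the pool has room AND the
    // job is either above the low-priority bar or few enough jobs are running.
    static boolean admits(int priority, int runningJobCount, boolean jobPoolFull,
                          int lowPriorityBar, int lowPriorityJobLimit) {
        return !jobPoolFull
                && (priority > lowPriorityBar
                        || runningJobCount < lowPriorityJobLimit);
    }
}

With the defaults (bar 0, limit 10), a job whose priority is 0 or lower is only scheduled while fewer than 10 jobs are running; higher-priority jobs are limited only by the pool itself.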

kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java

Lines changed: 3 additions & 3 deletions
@@ -43,12 +43,12 @@ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
 
     @Override
     public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
-        return NSparkCubingJob.create(Sets.newHashSet(newSegment), submitter);
+        return NSparkCubingJob.create(Sets.newHashSet(newSegment), submitter, priorityOffset);
     }
 
     @Override
-    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return NSparkMergingJob.merge(mergeSegment, submitter);
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter, Integer priorityOffset) {
+        return NSparkMergingJob.merge(mergeSegment, submitter, priorityOffset);
    }
 
     @Override

kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java

Lines changed: 4 additions & 4 deletions
@@ -26,7 +26,6 @@
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.stream.Collectors;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -54,12 +53,12 @@ public class NSparkCubingJob extends CubingJob {
     private CubeInstance cube;
 
     // for test use only
-    public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter) {
-        return create(segments, submitter, CubingJobTypeEnum.BUILD, UUID.randomUUID().toString());
+    public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter, Integer priorityOffset) {
+        return create(segments, submitter, CubingJobTypeEnum.BUILD, UUID.randomUUID().toString(), priorityOffset);
     }
 
     public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter, CubingJobTypeEnum jobType,
-            String jobId) {
+            String jobId, Integer priorityOffset) {
         Preconditions.checkArgument(!segments.isEmpty());
         Preconditions.checkArgument(submitter != null);
         NSparkCubingJob job = new NSparkCubingJob();
@@ -79,6 +78,7 @@ public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter
         }
         builder.append(format.format(new Date(System.currentTimeMillis())));
         job.setId(jobId);
+        job.setPriority(priorityOffset);
         job.setName(builder.toString());
         job.setProjectName(job.cube.getProject());
         job.setTargetSubject(job.cube.getModel().getId());
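
A usage sketch (hypothetical class; the submitter is illustrative): the three-argument factory, which the source marks "for test use only", now threads the priority offset through to setPriority during job construction, as the last hunk shows.

import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.spark.job.NSparkCubingJob;
import org.apache.kylin.shaded.com.google.common.collect.Sets;

class CubingJobSketch {
    static NSparkCubingJob buildWithPriority(CubeSegment segment, int priorityOffset) {
        // delegates to the five-argument factory with BUILD and a random
        // job id, per the diff above
        return NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN", priorityOffset);
    }
}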

kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java

Lines changed: 4 additions & 3 deletions
@@ -42,16 +42,16 @@ public class NSparkMergingJob extends CubingJob {
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(NSparkMergingJob.class);
 
-    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter) {
-        return NSparkMergingJob.merge(mergedSegment, submitter, CubingJobTypeEnum.MERGE, UUID.randomUUID().toString());
+    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, Integer priorityOffset) {
+        return NSparkMergingJob.merge(mergedSegment, submitter, CubingJobTypeEnum.MERGE, UUID.randomUUID().toString(), priorityOffset);
     }
 
     /**
      * Merge the segments that are contained in the given mergedSegment
      *
      * @param mergedSegment, new segment that expect to merge, which should contains a couple of ready segments.
      */
-    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, CubingJobTypeEnum jobType, String jobId) {
+    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, CubingJobTypeEnum jobType, String jobId, Integer priorityOffset) {
         CubeInstance cube = mergedSegment.getCubeInstance();
 
         NSparkMergingJob job = new NSparkMergingJob();
@@ -66,6 +66,7 @@ public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter
         builder.append(format.format(new Date(System.currentTimeMillis())));
         job.setName(builder.toString());
         job.setId(jobId);
+        job.setPriority(priorityOffset);
         job.setTargetSubject(mergedSegment.getModel().getUuid());
         job.setTargetSegments(Lists.newArrayList(String.valueOf(mergedSegment.getUuid())));
         job.setProject(mergedSegment.getProject());

kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java

Lines changed: 2 additions & 2 deletions
@@ -177,7 +177,7 @@ public ExecutableState buildCuboid(String cubeName, SegmentRange.TSRange tsRange
         DataModelManager.getInstance(config).getModels();
         // ready cube, segment, cuboid layout
         CubeSegment oneSeg = cubeMgr.appendSegment(cube, tsRange);
-        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(oneSeg), "ADMIN");
+        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(oneSeg), "ADMIN", 0);
         NSparkCubingStep sparkStep = job.getSparkCubingStep();
         StorageURL distMetaUrl = StorageURL.valueOf(sparkStep.getDistMetaUrl());
         Assert.assertEquals("hdfs", distMetaUrl.getScheme());
@@ -199,7 +199,7 @@ protected ExecutableState mergeSegments(String cubeName, long start, long end, b
         ExecutableManager execMgr = ExecutableManager.getInstance(config);
         CubeInstance cube = cubeMgr.reloadCube(cubeName);
         CubeSegment mergeSegment = cubeMgr.mergeSegments(cube, new SegmentRange.TSRange(start, end), null, force);
-        NSparkMergingJob mergeJob = NSparkMergingJob.merge(mergeSegment, "ADMIN");
+        NSparkMergingJob mergeJob = NSparkMergingJob.merge(mergeSegment, "ADMIN", 0);
         execMgr.addJob(mergeJob);
         ExecutableState result = wait(mergeJob);
         if (config.cleanStorageAfterDelOperation()) {
