diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 6b9313e5edb97..8198b06046d45 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -107,6 +107,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private val slaveOfferConstraints = parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) + // Sort offers by CPUs in descending order, so that the offers with the + // most available CPUs are considered first. Disabled by default. + private val sortOffersDescending = conf.getBoolean("spark.mesos.sortOffersDescending", false) + + // Assign tasks to offers in a round-robin manner (at most one task per offer per pass). If set to false, each offer is filled with as many tasks as it can hold before moving to the next offer. + private val assignTasksRoundRobin = conf.getBoolean("spark.mesos.assignTaskRoundRobin", true) + // Reject offers with mismatched constraints in seconds private val rejectOfferDurationForUnmetConstraints = getRejectOfferDurationForUnmetConstraints(sc) @@ -324,8 +331,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( */ private def handleMatchedOffers( d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = { - val tasks = buildMesosTasks(offers) - for (offer <- offers) { + val offersMaybeSorted = if (sortOffersDescending) { + offers.sortWith { (l, r) => + getResource(l.getResourcesList, "cpus") > getResource(r.getResourcesList, "cpus") + } + } else offers + val tasks = buildMesosTasks(offersMaybeSorted) + + for (offer <- offersMaybeSorted) { val offerAttributes = toAttributeMap(offer.getAttributesList) val offerMem = getResource(offer.getResourcesList, "mem") val offerCpus = getResource(offer.getResourcesList, "cpus") @@ -388,46 +401,64 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend( for (offer <- offers) { val slaveId = offer.getSlaveId.getValue val offerId = offer.getId.getValue - val resources = remainingResources(offerId) - - if (canLaunchTask(slaveId, resources)) { - // Create a task - launchTasks = true - val taskId = newMesosTaskId() - val offerCPUs = getResource(resources, "cpus").toInt + var resources = remainingResources(offerId) + + // Round-robin assignment is not always desired; when disabled, pack as many tasks as possible onto this offer before moving to the next one. + if (assignTasksRoundRobin) { + if (canLaunchTask(slaveId, resources)) { + launchTasks = true + // Create a task + createTask(offer, tasks, resources, remainingResources) + } + } else { + while (canLaunchTask(slaveId, resources)) { + launchTasks = true + createTask(offer, tasks, resources, remainingResources) + resources = remainingResources(offerId) + } + } + } + } + tasks.toMap + } - val taskCPUs = executorCores(offerCPUs) - val taskMemory = executorMemory(sc) + private def createTask( + offer: Offer, + tasks: mutable.Map[OfferID, List[MesosTaskInfo]], + resources: JList[Resource], + remainingResources: mutable.Map[String, JList[Resource]]): Unit = { + val slaveId = offer.getSlaveId.getValue + val taskId = newMesosTaskId() + val offerCPUs = getResource(resources, "cpus").toInt - slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId) + val taskCPUs = executorCores(offerCPUs) + val taskMemory = executorMemory(sc) - val (resourcesLeft, resourcesToUse) = - partitionTaskResources(resources, taskCPUs, taskMemory) + slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId) - val taskBuilder = MesosTaskInfo.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) - .setSlaveId(offer.getSlaveId) - .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId)) - .setName("Task " + taskId) + val (resourcesLeft, resourcesToUse) = + partitionTaskResources(resources, taskCPUs, taskMemory) - taskBuilder.addAllResources(resourcesToUse.asJava) + 
val taskBuilder = MesosTaskInfo.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) + .setSlaveId(offer.getSlaveId) + .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId)) + .setName("Task " + taskId) - sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( - image, - sc.conf, - taskBuilder.getContainerBuilder - ) - } + taskBuilder.addAllResources(resourcesToUse.asJava) - tasks(offer.getId) ::= taskBuilder.build() - remainingResources(offerId) = resourcesLeft.asJava - totalCoresAcquired += taskCPUs - coresByTaskId(taskId) = taskCPUs - } - } + sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => + MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( + image, + sc.conf, + taskBuilder.getContainerBuilder + ) } - tasks.toMap + + tasks(offer.getId) ::= taskBuilder.build() + remainingResources(offer.getId.getValue) = resourcesLeft.asJava + totalCoresAcquired += taskCPUs + coresByTaskId(taskId) = taskCPUs } /** Extracts task needed resources from a list of available resources. 
*/ diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index c06379707a69a..0ba5c528a6600 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -176,6 +176,24 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite verifyTaskLaunched(driver, "o2") } + test("mesos does not assign tasks round-robin on offers") { + val executorCores = 4 + val maxCores = executorCores * 2 + setBackend(Map("spark.executor.cores" -> executorCores.toString, + "spark.cores.max" -> maxCores.toString, + "spark.mesos.assignTaskRoundRobin" -> "false", + "spark.mesos.sortOffersDescending" -> "false" + )) + + val executorMemory = backend.executorMemory(sc) + offerResources(List( + (executorMemory * 2, executorCores * 2), + (executorMemory * 2, executorCores * 2))) + + verifyTaskLaunched(driver, "o1") + verifyTaskLaunched(driver, "o1") + } + test("mesos creates multiple executors on a single slave") { val executorCores = 4 setBackend(Map("spark.executor.cores" -> executorCores.toString))