Skip to content

Commit 0c0cf25

Browse files
Added Workflows (#359)
* Implemented Workflows for OpenDC
1 parent 089c449 commit 0c0cf25

File tree

18 files changed

+388
-116
lines changed

18 files changed

+388
-116
lines changed

opendc-compute/opendc-compute-api/src/main/kotlin/org/opendc/compute/api/Flavor.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,19 @@ public interface Flavor : Resource {
4141
* The amount of gpu cores available to the task.
4242
*/
4343
public val gpuCoreCount: Int
44+
45+
/**
46+
* Set of Tasks that need to be finished before this can startAdd commentMore actions
47+
*/
48+
public val dependencies: Set<String>
49+
50+
/**
51+
* Set of Tasks that need to be finished before this can startAdd commentMore actions
52+
*/
53+
public val parents: Set<String>
54+
55+
/**
56+
* Set of Tasks that need to be finished before this can startAdd commentMore actions
57+
*/
58+
public val children: Set<String>
4459
}

opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java

Lines changed: 86 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,16 +119,19 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver {
119119
*/
120120
private final Deque<SchedulingRequest> taskQueue = new ArrayDeque<>();
121121

122+
private final List<SchedulingRequest> blockedTasks = new ArrayList<>();
123+
122124
/**
123125
* The active tasks in the system.
124126
*/
125127
private final Map<ServiceTask, SimHost> activeTasks = new HashMap<>();
126128

127129
/**
128130
* The active tasks in the system.
129-
* TODO: this is not doing anything, maybe delete it?
130131
*/
131-
private final Map<ServiceTask, SimHost> completedTasks = new HashMap<>();
132+
private final List<String> completedTasks = new ArrayList<>();
133+
134+
private final List<String> terminatedTasks = new ArrayList<>();
132135

133136
/**
134137
* The registered flavors for this compute service.
@@ -209,9 +212,11 @@ public void onStateChanged(@NotNull SimHost host, @NotNull ServiceTask task, @No
209212

210213
if (newState == TaskState.COMPLETED) {
211214
tasksCompleted++;
215+
addCompletedTask(task);
212216
}
213217
if (newState == TaskState.TERMINATED) {
214218
tasksTerminated++;
219+
addTerminatedTask(task);
215220
}
216221

217222
if (task.getState() == TaskState.COMPLETED || task.getState() == TaskState.TERMINATED) {
@@ -430,17 +435,83 @@ SchedulingRequest schedule(ServiceTask task, boolean atFront) {
430435
long now = clock.millis();
431436
SchedulingRequest request = new SchedulingRequest(task, now);
432437

433-
if (atFront) {
434-
taskQueue.addFirst(request);
435-
} else {
436-
taskQueue.add(request);
438+
ServiceFlavor flavor = task.getFlavor();
439+
for (String taskName : this.terminatedTasks) {
440+
if (flavor.isInDependencies(taskName)) {
441+
// Terminate task
442+
task.setState(TaskState.TERMINATED);
443+
}
437444
}
445+
446+
// Remove all completed tasks from the pending dependencies
447+
flavor.updatePendingDependencies(this.completedTasks);
448+
449+
// If there are still pending dependencies, we cannot schedule the task yet
450+
Set<String> pendingDependencies = flavor.getDependencies();
451+
if (!pendingDependencies.isEmpty()) {
452+
// If the task has pending dependencies, we cannot schedule it yet
453+
LOGGER.debug("Task {} has pending dependencies: {}", task.getUid(), pendingDependencies);
454+
blockedTasks.add(request);
455+
return null;
456+
}
457+
458+
// Add the request at the front or the back of the queue
459+
if (atFront) taskQueue.addFirst(request);
460+
else taskQueue.add(request);
461+
438462
tasksPending++;
439463

440464
requestSchedulingCycle();
441465
return request;
442466
}
443467

468+
void addCompletedTask(ServiceTask task) {
469+
String taskName = task.getName();
470+
471+
if (!this.completedTasks.contains(taskName)) {
472+
this.completedTasks.add(taskName);
473+
}
474+
475+
List<SchedulingRequest> requestsToRemove = new ArrayList<>();
476+
477+
for (SchedulingRequest request : blockedTasks) {
478+
request.getTask().getFlavor().updatePendingDependencies(taskName);
479+
480+
Set<String> pendingDependencies = request.getTask().getFlavor().getDependencies();
481+
482+
if (pendingDependencies.isEmpty()) {
483+
requestsToRemove.add(request);
484+
taskQueue.add(request);
485+
tasksPending++;
486+
}
487+
}
488+
489+
for (SchedulingRequest request : requestsToRemove) {
490+
blockedTasks.remove(request);
491+
}
492+
}
493+
494+
void addTerminatedTask(ServiceTask task) {
495+
String taskName = task.getName();
496+
497+
List<SchedulingRequest> requestsToRemove = new ArrayList<>();
498+
499+
if (!this.terminatedTasks.contains(taskName)) {
500+
this.terminatedTasks.add(taskName);
501+
}
502+
503+
for (SchedulingRequest request : blockedTasks) {
504+
if (request.getTask().getFlavor().isInDependencies(taskName)) {
505+
requestsToRemove.add(request);
506+
request.getTask().setState(TaskState.TERMINATED);
507+
}
508+
}
509+
510+
for (SchedulingRequest request : requestsToRemove) {
511+
blockedTasks.remove(request);
512+
}
513+
}
514+
444515
void delete(ServiceFlavor flavor) {
445516
flavorById.remove(flavor.getUid());
446517
flavors.remove(flavor);
@@ -612,12 +683,19 @@ public Flavor findFlavor(@NotNull UUID id) {
612683

613684
@NotNull
614685
public ServiceFlavor newFlavor(
615-
@NotNull String name, int cpuCount, long memorySize, int gpuCoreCount, @NotNull Map<String, ?> meta) {
686+
@NotNull String name,
687+
int cpuCount,
688+
long memorySize,
689+
int gpuCoreCount,
690+
@NotNull Set<String> parents,
691+
@NotNull Set<String> children,
692+
@NotNull Map<String, ?> meta) {
616693
checkOpen();
617694

618695
final ComputeService service = this.service;
619696
UUID uid = new UUID(service.clock.millis(), service.random.nextLong());
620-
ServiceFlavor flavor = new ServiceFlavor(service, uid, name, cpuCount, memorySize, gpuCoreCount, meta);
697+
ServiceFlavor flavor =
698+
new ServiceFlavor(service, uid, name, cpuCount, memorySize, gpuCoreCount, parents, children, meta);
621699

622700
// service.flavorById.put(uid, flavor);
623701
// service.flavors.add(flavor);

opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
package org.opendc.compute.simulator.service;
2424

2525
import java.util.Collections;
26+
import java.util.HashSet;
27+
import java.util.List;
2628
import java.util.Map;
2729
import java.util.Objects;
30+
import java.util.Set;
2831
import java.util.UUID;
2932
import org.jetbrains.annotations.NotNull;
3033
import org.opendc.compute.api.Flavor;
@@ -39,6 +42,9 @@ public final class ServiceFlavor implements Flavor {
3942
private final int cpuCoreCount;
4043
private final long memorySize;
4144
private final int gpuCoreCount;
45+
private final Set<String> parents;
46+
private final Set<String> children;
47+
private final Set<String> dependencies;
4248
private final Map<String, ?> meta;
4349

4450
ServiceFlavor(
@@ -48,13 +54,18 @@ public final class ServiceFlavor implements Flavor {
4854
int cpuCoreCount,
4955
long memorySize,
5056
int gpuCoreCount,
57+
Set<String> parents,
58+
Set<String> children,
5159
Map<String, ?> meta) {
5260
this.service = service;
5361
this.uid = uid;
5462
this.name = name;
5563
this.cpuCoreCount = cpuCoreCount;
5664
this.memorySize = memorySize;
5765
this.gpuCoreCount = gpuCoreCount;
66+
this.parents = parents;
67+
this.dependencies = new HashSet<>(parents);
68+
this.children = children;
5869
this.meta = meta;
5970
}
6071

@@ -118,4 +129,33 @@ public int hashCode() {
118129
public String toString() {
119130
return "Flavor[uid=" + uid + ",name=" + name + "]";
120131
}
132+
133+
@Override
134+
public @NotNull Set<String> getDependencies() {
135+
return dependencies;
136+
}
137+
138+
public void updatePendingDependencies(List<String> completedTasks) {
139+
for (String task : completedTasks) {
140+
this.updatePendingDependencies(task);
141+
}
142+
}
143+
144+
public void updatePendingDependencies(String completedTask) {
145+
this.dependencies.remove(completedTask);
146+
}
147+
148+
public boolean isInDependencies(String task) {
149+
return this.dependencies.contains(task);
150+
}
151+
152+
@Override
153+
public @NotNull Set<@NotNull String> getParents() {
154+
return parents;
155+
}
156+
157+
@Override
158+
public @NotNull Set<@NotNull String> getChildren() {
159+
return children;
160+
}
121161
}

opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,17 @@ import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy
2929
import org.opendc.trace.Trace
3030
import org.opendc.trace.conv.TABLE_RESOURCES
3131
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
32+
import org.opendc.trace.conv.resourceChildren
3233
import org.opendc.trace.conv.resourceCpuCapacity
3334
import org.opendc.trace.conv.resourceCpuCount
3435
import org.opendc.trace.conv.resourceDeadline
3536
import org.opendc.trace.conv.resourceDuration
3637
import org.opendc.trace.conv.resourceGpuCapacity
3738
import org.opendc.trace.conv.resourceGpuCount
38-
import org.opendc.trace.conv.resourceGpuMemCapacity
3939
import org.opendc.trace.conv.resourceID
4040
import org.opendc.trace.conv.resourceMemCapacity
4141
import org.opendc.trace.conv.resourceNature
42+
import org.opendc.trace.conv.resourceParents
4243
import org.opendc.trace.conv.resourceStateCpuUsage
4344
import org.opendc.trace.conv.resourceStateDuration
4445
import org.opendc.trace.conv.resourceStateGpuUsage
@@ -136,7 +137,8 @@ public class ComputeWorkloadLoader(
136137
val memCol = reader.resolve(resourceMemCapacity)
137138
val gpuCapacityCol = reader.resolve(resourceGpuCapacity) // Assuming GPU capacity is also present
138139
val gpuCoreCountCol = reader.resolve(resourceGpuCount) // Assuming GPU cores are also present
139-
val gpuMemoryCol = reader.resolve(resourceGpuMemCapacity) // Assuming GPU memory is also present
140+
val parentsCol = reader.resolve(resourceParents)
141+
val childrenCol = reader.resolve(resourceChildren)
140142
val natureCol = reader.resolve(resourceNature)
141143
val deadlineCol = reader.resolve(resourceDeadline)
142144

@@ -166,6 +168,10 @@ public class ComputeWorkloadLoader(
166168
}
167169
val gpuCoreCount = reader.getInt(gpuCoreCountCol) // Default to 0 if not present
168170
val gpuMemory = 0L // currently not implemented
171+
172+
val parents = reader.getSet(parentsCol, String::class.java) // No dependencies in the trace
173+
val children = reader.getSet(childrenCol, String::class.java) // No dependencies in the trace
174+
169175
val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray())
170176
var nature = reader.getString(natureCol)
171177
var deadline = reader.getLong(deadlineCol)
@@ -181,15 +187,17 @@ public class ComputeWorkloadLoader(
181187
Task(
182188
uid,
183189
id,
190+
submissionTime,
191+
duration,
192+
parents!!,
193+
children!!,
184194
cpuCount,
185195
cpuCapacity,
196+
totalLoad,
186197
memCapacity.roundToLong(),
187198
gpuCoreCount,
188199
gpuUsage,
189200
gpuMemory,
190-
totalLoad,
191-
submissionTime,
192-
duration,
193201
nature,
194202
deadline,
195203
builder.build(),

opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,17 @@ import java.util.UUID
4040
public data class Task(
4141
val uid: UUID,
4242
val name: String,
43+
var submissionTime: Long,
44+
val duration: Long,
45+
val parents: Set<String> = emptySet(),
46+
val children: Set<String> = emptySet(),
4347
val cpuCount: Int,
4448
val cpuCapacity: Double,
49+
val totalCpuLoad: Double,
4550
val memCapacity: Long,
4651
val gpuCount: Int = 0,
4752
val gpuCapacity: Double = 0.0,
4853
val gpuMemCapacity: Long = 0L,
49-
val totalLoad: Double,
50-
var submissionTime: Long,
51-
val duration: Long,
5254
val nature: String?,
5355
var deadline: Long,
5456
val trace: TraceWorkload,

opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,15 @@ public abstract class WorkloadLoader(private val submissionTime: String? = null)
6464

6565
val res = mutableListOf<Task>()
6666

67-
val totalLoad = workload.sumOf { it.totalLoad }
67+
val totalLoad = workload.sumOf { it.totalCpuLoad }
6868
val desiredLoad = totalLoad * fraction
6969
var currentLoad = 0.0
7070

7171
while (currentLoad < desiredLoad) {
7272
val entry = workload.random()
7373
res += entry
7474

75-
currentLoad += entry.totalLoad
75+
currentLoad += entry.totalCpuLoad
7676
}
7777

7878
logger.info { "Sampled ${workload.size} VMs (fraction $fraction) into subset of ${res.size} VMs" }

opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ public suspend fun ComputeService.replay(
150150
entry.cpuCount,
151151
entry.memCapacity,
152152
entry.gpuCount,
153+
entry.parents,
154+
entry.children,
153155
flavorMeta,
154156
),
155157
workload,

opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ import java.time.LocalDateTime
5454
import java.time.ZoneOffset
5555
import java.util.UUID
5656
import kotlin.collections.ArrayList
57-
import kotlin.compareTo
5857

5958
/**
6059
* Obtain the topology factory for the test.
@@ -86,15 +85,17 @@ fun createTestTask(
8685
return Task(
8786
UUID.nameUUIDFromBytes(name.toByteArray()),
8887
name,
88+
LocalDateTime.parse(submissionTime).toInstant(ZoneOffset.UTC).toEpochMilli(),
89+
duration,
90+
emptySet(),
91+
emptySet(),
8992
fragments.maxOf { it.cpuCoreCount() },
9093
fragments.maxOf { it.cpuUsage },
94+
1800000.0,
9195
memCapacity,
9296
gpuCount = fragments.maxOfOrNull { it.gpuCoreCount() } ?: 0,
9397
gpuCapacity = fragments.maxOfOrNull { it.gpuUsage } ?: 0.0,
9498
gpuMemCapacity = fragments.maxOfOrNull { it.gpuMemoryUsage } ?: 0L,
95-
1800000.0,
96-
LocalDateTime.parse(submissionTime).toInstant(ZoneOffset.UTC).toEpochMilli(),
97-
duration,
9899
"",
99100
-1,
100101
TraceWorkload(

opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,18 @@ public val resourceGpuCapacity: String = "gpu_capacity"
9090
@JvmField
9191
public val resourceGpuMemCapacity: String = "gpu_mem_capacity"
9292

93+
/**
94+
* The parents of the resource that need to be completed before this resource can be used.
95+
*/
96+
@JvmField
97+
public val resourceParents: String = "parents"
98+
99+
/**
100+
* The children of the resource that cannot be started before this is completed.
101+
*/
102+
@JvmField
103+
public val resourceChildren: String = "children"
104+
93105
/**
94106
* Nature of the task. Delayable, interruptible, etc.
95107
*/

0 commit comments

Comments
 (0)