Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/java/com/nirmata/workflow/WorkflowManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ public interface WorkflowManager extends Closeable
*/
RunId submitSubTask(RunId runId, RunId parentRunId, Task task);

    /**
     * Submit a task directly to the executor, bypassing the workflow scheduler.
     * Only the top parent task is executed, not its children. The executor, in
     * turn, will not attempt to send the task result back to the scheduler, so
     * the run is fire-and-forget from the caller's perspective.
     *
     * @param task task to execute
     */
    void submitRootTaskDirect(Task task);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

submitSimpleTaskDirect looks more relevant instead of RootTask


/**
* Update task progress info. This method is meant to be used inside of {@link TaskExecutor}
* for a running task to update its execution progress(0-100).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,4 +601,10 @@ private List<QueueConsumer> makeTaskConsumers(QueueFactory queueFactory, List<Ta

return builder.build();
}

@Override
public void submitRootTaskDirect(Task task)
{
throw(new UnsupportedOperationException("Direct submission to Kafka is unsupported in Zookeeper based workflow"));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Direct submission is ....
"To Kafka" is not needed

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.mongodb.MongoInterruptedException;
import com.nirmata.workflow.WorkflowManager;
import com.nirmata.workflow.admin.RunInfo;
import com.nirmata.workflow.admin.TaskDetails;
Expand Down Expand Up @@ -53,6 +52,8 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
Expand Down Expand Up @@ -85,7 +86,11 @@ public class WorkflowManagerKafkaImpl implements WorkflowManager, WorkflowAdmin
private final Serializer serializer;
private final Executor taskRunnerService;

    // Lazily-created, cached Kafka producer per task type (see submitRootTaskDirect).
    private Map<TaskType, Producer<String, byte[]>> taskQueues = new HashMap<TaskType, Producer<String, byte[]>>();
    // NOTE(review): never populated anywhere visible in this diff — entries are only
    // read via get(), which therefore returns null; candidate for removal. TODO confirm.
    private Map<String, Set<String>> startedTasksCache = new HashMap<String, Set<String>>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This startedTasksCache has no use.


private static final TaskType nullTaskType = new TaskType("", "", false);
private static final String DIRECT_SUBMIT_RUN_ID_PREFIX = "DIRECT_SUBMIT";

private enum State {
LATENT,
Expand Down Expand Up @@ -463,16 +468,19 @@ private void executeTask(TaskExecutor taskExecutor, ExecutableTask executableTas
throw new RuntimeException(String.format("null returned from task executor for run: %s, task %s",
executableTask.getRunId(), executableTask.getTaskId()));
}
byte[] bytes = serializer.serialize(new WorkflowMessage(executableTask.getTaskId(), result));
try {
// Send task result to scheduler to further advance the workflow
sendWorkflowToKafka(executableTask.getRunId(), bytes);
storageMgr.saveTaskResult(executableTask.getRunId(), executableTask.getTaskId(),
serializer.serialize(result));

} catch (Exception e) {
log.error("Could not set completed data for executable task: {}", executableTask, e);
throw e;
// No need to send task result if the RunID denotes that this is a directsubmit since scheduler does not manage it.
if (!executableTask.getRunId().getId().startsWith(DIRECT_SUBMIT_RUN_ID_PREFIX)) {
byte[] bytes = serializer.serialize(new WorkflowMessage(executableTask.getTaskId(), result));
try {
// Send task result to scheduler to further advance the workflow
sendWorkflowToKafka(executableTask.getRunId(), bytes);
storageMgr.saveTaskResult(executableTask.getRunId(), executableTask.getTaskId(),
serializer.serialize(result));

} catch (Exception e) {
log.error("Could not set completed data for executable task: {}", executableTask, e);
throw e;
}
}
Copy link

@pns-nirmata pns-nirmata May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, a DEBUG message in else, to say, executed the direct-submit task.

}

Expand All @@ -498,4 +506,47 @@ private List<QueueConsumer> makeTaskConsumers(QueueFactory queueFactory, List<Ta

return builder.build();
}

@Override
public void submitRootTaskDirect(Task task) {
try {
RunId runId = RunId.newRandomIdWithPrefix(DIRECT_SUBMIT_RUN_ID_PREFIX);

StartedTask startedTask = new StartedTask(this.getInstanceName(),
LocalDateTime.now(Clock.systemUTC()), 0);
storageMgr.setStartedTask(runId, task.getTaskId(), this.getSerializer().serialize(startedTask));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No storage manager here. There could be numerous directly submitted tasks with no relation between them.


byte[] runnableTaskBytes = this.getSerializer().serialize(task);

Producer<String, byte[]> producer = taskQueues.get(task.getTaskType());
if (producer == null) {
this.getKafkaConf().createTaskTopicIfNeeded(task.getTaskType());
producer = new KafkaProducer<String, byte[]>(
this.getKafkaConf().getProducerProps());
taskQueues.put(task.getTaskType(), producer);
}

producer.send(new ProducerRecord<String, byte[]>(
this.getKafkaConf().getTaskExecTopic(task.getTaskType()), runnableTaskBytes),
new Callback() {
@Override
public void onCompletion(RecordMetadata m, Exception e) {
if (e != null) {
log.error("Error creating record for Run {} to task type {}", runId, task.getTaskType(),
e);
} else {
log.debug("RunId {} produced record to topic {}, partition [{}] @ offset {}", runId,
m.topic(), m.partition(), m.offset());
}
}
});
startedTasksCache.get(runId.getId()).add(task.getTaskId().getId());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cache of no use and will keep hogging memory, ultimately stopping the program.

log.debug("Sent task to queue: {}", task);

} catch (Exception e) {
String message = "Could not start task " + task;
log.error(message, e);
throw new RuntimeException(e);
}
}
}