Implement direct submission of tasks to Kafka. #47
base: initial-version-kafka
Changed file 1 of 2 (the Zookeeper-based workflow manager implementation):

```diff
@@ -601,4 +601,10 @@ private List<QueueConsumer> makeTaskConsumers(QueueFactory queueFactory, List<Ta
         return builder.build();
     }
 
+    @Override
+    public void submitRootTaskDirect(Task task)
+    {
+        throw(new UnsupportedOperationException("Direct submission to Kafka is unsupported in Zookeeper based workflow"));
+    }
+
 }
```

Review comment: Direct submission is ....
Changed file 2 of 2 (WorkflowManagerKafkaImpl):

```diff
@@ -20,7 +20,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.mongodb.MongoInterruptedException;
 import com.nirmata.workflow.WorkflowManager;
 import com.nirmata.workflow.admin.RunInfo;
 import com.nirmata.workflow.admin.TaskDetails;
@@ -53,6 +52,8 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Clock;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.HashMap;
```
```diff
@@ -85,7 +86,11 @@ public class WorkflowManagerKafkaImpl implements WorkflowManager, WorkflowAdmin
     private final Serializer serializer;
     private final Executor taskRunnerService;
 
+    private Map<TaskType, Producer<String, byte[]>> taskQueues = new HashMap<TaskType, Producer<String, byte[]>>();
+    private Map<String, Set<String>> startedTasksCache = new HashMap<String, Set<String>>();
+
     private static final TaskType nullTaskType = new TaskType("", "", false);
+    private static final String DIRECT_SUBMIT_RUN_ID_PREFIX = "DIRECT_SUBMIT";
 
     private enum State {
         LATENT,
```

Review comment (on startedTasksCache): This startedTasksCache has no use.
```diff
@@ -463,16 +468,19 @@ private void executeTask(TaskExecutor taskExecutor, ExecutableTask executableTas
                 throw new RuntimeException(String.format("null returned from task executor for run: %s, task %s",
                         executableTask.getRunId(), executableTask.getTaskId()));
             }
-            byte[] bytes = serializer.serialize(new WorkflowMessage(executableTask.getTaskId(), result));
-            try {
-                // Send task result to scheduler to further advance the workflow
-                sendWorkflowToKafka(executableTask.getRunId(), bytes);
-                storageMgr.saveTaskResult(executableTask.getRunId(), executableTask.getTaskId(),
-                        serializer.serialize(result));
-
-            } catch (Exception e) {
-                log.error("Could not set completed data for executable task: {}", executableTask, e);
-                throw e;
-            }
+            // No need to send task result if the RunID denotes that this is a directsubmit since scheduler does not manage it.
+            if (!executableTask.getRunId().getId().startsWith(DIRECT_SUBMIT_RUN_ID_PREFIX)) {
+                byte[] bytes = serializer.serialize(new WorkflowMessage(executableTask.getTaskId(), result));
+                try {
+                    // Send task result to scheduler to further advance the workflow
+                    sendWorkflowToKafka(executableTask.getRunId(), bytes);
+                    storageMgr.saveTaskResult(executableTask.getRunId(), executableTask.getTaskId(),
+                            serializer.serialize(result));
+
+                } catch (Exception e) {
+                    log.error("Could not set completed data for executable task: {}", executableTask, e);
+                    throw e;
+                }
+            }
 
         }
```

Review comment: Maybe, a DEBUG message in else, to say, executed the direct-submit task.
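A minimal sketch of what that suggestion could look like, written as a fragment that would extend the new `if` block above; `log` and `executableTask` come from the surrounding method, and the message wording is only illustrative:

```java
            } else {
                // Reviewer-suggested addition (illustrative): note at DEBUG level that a
                // direct-submit task was executed; no result is sent back to the scheduler
                // because no workflow run tracks it.
                log.debug("Executed direct-submit task, run: {}, task: {}",
                        executableTask.getRunId(), executableTask.getTaskId());
            }
```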
```diff
@@ -498,4 +506,47 @@ private List<QueueConsumer> makeTaskConsumers(QueueFactory queueFactory, List<Ta
         return builder.build();
     }
 
+    @Override
+    public void submitRootTaskDirect(Task task) {
+        try {
+            RunId runId = RunId.newRandomIdWithPrefix(DIRECT_SUBMIT_RUN_ID_PREFIX);
+
+            StartedTask startedTask = new StartedTask(this.getInstanceName(),
+                    LocalDateTime.now(Clock.systemUTC()), 0);
+            storageMgr.setStartedTask(runId, task.getTaskId(), this.getSerializer().serialize(startedTask));
+
+            byte[] runnableTaskBytes = this.getSerializer().serialize(task);
+
+            Producer<String, byte[]> producer = taskQueues.get(task.getTaskType());
+            if (producer == null) {
+                this.getKafkaConf().createTaskTopicIfNeeded(task.getTaskType());
+                producer = new KafkaProducer<String, byte[]>(
+                        this.getKafkaConf().getProducerProps());
+                taskQueues.put(task.getTaskType(), producer);
+            }
+
+            producer.send(new ProducerRecord<String, byte[]>(
+                    this.getKafkaConf().getTaskExecTopic(task.getTaskType()), runnableTaskBytes),
+                    new Callback() {
+                        @Override
+                        public void onCompletion(RecordMetadata m, Exception e) {
+                            if (e != null) {
+                                log.error("Error creating record for Run {} to task type {}", runId, task.getTaskType(),
+                                        e);
+                            } else {
+                                log.debug("RunId {} produced record to topic {}, partition [{}] @ offset {}", runId,
+                                        m.topic(), m.partition(), m.offset());
+                            }
+                        }
+                    });
+            startedTasksCache.get(runId.getId()).add(task.getTaskId().getId());
+
+            log.debug("Sent task to queue: {}", task);
+
+        } catch (Exception e) {
+            String message = "Could not start task " + task;
+            log.error(message, e);
+            throw new RuntimeException(e);
+        }
+    }
 }
```

Review comment (on the storageMgr.setStartedTask call): No storage manager here. There could be numerous directly submitted tasks with no relation between them.

Review comment (on the startedTasksCache line): Cache of no use and will keep hogging memory, ultimately stopping the program.
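For context, a hedged usage sketch of the new API as it stands in this patch: callers would go through `WorkflowManager`. The task-type values, the example class name, and the assumption that `Task`, `TaskId`, and `TaskType` are constructed as in the library's usual examples are mine, not part of this diff:

```java
import com.nirmata.workflow.WorkflowManager;
import com.nirmata.workflow.models.Task;
import com.nirmata.workflow.models.TaskId;
import com.nirmata.workflow.models.TaskType;

public class DirectSubmitExample
{
    public static void submit(WorkflowManager workflowManager)
    {
        // A single task with no children; "email" / version "1" are illustrative values.
        TaskType taskType = new TaskType("email", "1", true);
        Task task = new Task(new TaskId(), taskType);

        // Bypasses the scheduler: the task goes straight to the task-type topic under a
        // RunId prefixed with DIRECT_SUBMIT, so no run state is stored or advanced for it.
        workflowManager.submitRootTaskDirect(task);
    }
}
```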
Review comment: submitSimpleTaskDirect looks more relevant instead of RootTask.
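Taken together, the comments point to a leaner version of the method: no `storageMgr` bookkeeping, no `startedTasksCache`, and a name such as `submitSimpleTaskDirect` (assuming the interface method is renamed to match). A rough sketch of that shape, reusing only fields and helpers already visible in this diff; it illustrates the review feedback, not the merged code:

```java
    @Override
    public void submitSimpleTaskDirect(Task task) {
        try {
            // Throwaway RunId: nothing is persisted or cached for direct submissions.
            RunId runId = RunId.newRandomIdWithPrefix(DIRECT_SUBMIT_RUN_ID_PREFIX);

            byte[] runnableTaskBytes = this.getSerializer().serialize(task);

            // Lazily create one producer per task type, as in the original patch.
            Producer<String, byte[]> producer = taskQueues.get(task.getTaskType());
            if (producer == null) {
                this.getKafkaConf().createTaskTopicIfNeeded(task.getTaskType());
                producer = new KafkaProducer<String, byte[]>(this.getKafkaConf().getProducerProps());
                taskQueues.put(task.getTaskType(), producer);
            }

            producer.send(new ProducerRecord<String, byte[]>(
                    this.getKafkaConf().getTaskExecTopic(task.getTaskType()), runnableTaskBytes),
                    (metadata, exception) -> {
                        if (exception != null) {
                            log.error("Error creating record for Run {} to task type {}",
                                    runId, task.getTaskType(), exception);
                        } else {
                            log.debug("RunId {} produced record to topic {}, partition [{}] @ offset {}",
                                    runId, metadata.topic(), metadata.partition(), metadata.offset());
                        }
                    });

            log.debug("Sent task to queue: {}", task);
        } catch (Exception e) {
            log.error("Could not start task {}", task, e);
            throw new RuntimeException(e);
        }
    }
```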