diff --git a/src/main/java/com/nirmata/workflow/WorkflowManager.java b/src/main/java/com/nirmata/workflow/WorkflowManager.java index 2dde3e5..019f543 100644 --- a/src/main/java/com/nirmata/workflow/WorkflowManager.java +++ b/src/main/java/com/nirmata/workflow/WorkflowManager.java @@ -98,6 +98,14 @@ public interface WorkflowManager extends Closeable */ RunId submitSubTask(RunId runId, RunId parentRunId, Task task); + /** + * Submit a task directly to the executor. Only the top parent is executed, not children. + * The executor, in turn, will not attempt to send task results to the scheduler. + * + * @param task the task to execute + */ + void submitRootTaskDirect(Task task); + /** * Update task progress info. This method is meant to be used inside of {@link TaskExecutor} * for a running task to update its execution progress(0-100). diff --git a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java index a6ce0c6..9958786 100644 --- a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java +++ b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java @@ -601,4 +601,10 @@ private List makeTaskConsumers(QueueFactory queueFactory, List> taskQueues = new HashMap>(); + private Map> startedTasksCache = new HashMap>(); + private static final TaskType nullTaskType = new TaskType("", "", false); + private static final String DIRECT_SUBMIT_RUN_ID_PREFIX = "DIRECT_SUBMIT"; private enum State { LATENT, @@ -463,16 +468,19 @@ private void executeTask(TaskExecutor taskExecutor, ExecutableTask executableTas throw new RuntimeException(String.format("null returned from task executor for run: %s, task %s", executableTask.getRunId(), executableTask.getTaskId())); } - byte[] bytes = serializer.serialize(new WorkflowMessage(executableTask.getTaskId(), result)); - try { - // Send task result to scheduler to further advance the workflow - sendWorkflowToKafka(executableTask.getRunId(), bytes); - 
storageMgr.saveTaskResult(executableTask.getRunId(), executableTask.getTaskId(), - serializer.serialize(result)); - - } catch (Exception e) { - log.error("Could not set completed data for executable task: {}", executableTask, e); - throw e; + // Direct-submit runs (run ID starting with DIRECT_SUBMIT_RUN_ID_PREFIX) are not managed by the scheduler, so the task result is neither sent to Kafka nor saved through storageMgr. + if (!executableTask.getRunId().getId().startsWith(DIRECT_SUBMIT_RUN_ID_PREFIX)) { + byte[] bytes = serializer.serialize(new WorkflowMessage(executableTask.getTaskId(), result)); + try { + // Send task result to scheduler to further advance the workflow + sendWorkflowToKafka(executableTask.getRunId(), bytes); + storageMgr.saveTaskResult(executableTask.getRunId(), executableTask.getTaskId(), + serializer.serialize(result)); + + } catch (Exception e) { + log.error("Could not set completed data for executable task: {}", executableTask, e); + throw e; + } } } @@ -498,4 +506,47 @@ private List makeTaskConsumers(QueueFactory queueFactory, List producer = taskQueues.get(task.getTaskType()); + if (producer == null) { + this.getKafkaConf().createTaskTopicIfNeeded(task.getTaskType()); + producer = new KafkaProducer( + this.getKafkaConf().getProducerProps()); + taskQueues.put(task.getTaskType(), producer); + } + + producer.send(new ProducerRecord( + this.getKafkaConf().getTaskExecTopic(task.getTaskType()), runnableTaskBytes), + new Callback() { + @Override + public void onCompletion(RecordMetadata m, Exception e) { + if (e != null) { + log.error("Error creating record for Run {} to task type {}", runId, task.getTaskType(), + e); + } else { + log.debug("RunId {} produced record to topic {}, partition [{}] @ offset {}", runId, + m.topic(), m.partition(), m.offset()); + } + } + }); + startedTasksCache.get(runId.getId()).add(task.getTaskId().getId()); + log.debug("Sent task to queue: {}", task); + + } catch (Exception e) { + String message = "Could not start task " + task; + log.error(message, e); + throw new 
RuntimeException(e); + } + } }