Skip to content
This repository was archived by the owner on Jul 12, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ public RunState dequeue(WorkflowInstance workflowInstance, Set<String> resourceI
public RunState submit(WorkflowInstance workflowInstance, ExecutionDescription executionDescription,
String executionId) {
switch (state()) {
case QUEUED: // for backwards compatibility
case PREPARE:
return state(
SUBMITTING,
Expand Down Expand Up @@ -213,10 +212,6 @@ public RunState started(WorkflowInstance workflowInstance) {
switch (state()) {
case SUBMITTED:
return state(RUNNING);
case PREPARE:
return state(RUNNING, data().builder()
.tries(data().tries() + 1)
.build());

default:
throw illegalTransition("started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ public void testSetExecutionId() {

transitioner.receive(eventFactory.terminate(1));
transitioner.receive(eventFactory.retryAfter(999));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_2));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_2));
transitioner.receive(eventFactory.started());
Expand All @@ -225,7 +226,7 @@ public void testSetExecutionId() {
transitioner.get(WORKFLOW_INSTANCE).data().executionId(),
equalTo(Optional.of(TEST_EXECUTION_ID_2)));
assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, TERMINATED, QUEUED,
SUBMITTING, SUBMITTED, RUNNING));
PREPARE, SUBMITTING, SUBMITTED, RUNNING));
}

@Test
Expand Down Expand Up @@ -265,13 +266,16 @@ public void testSetsRetryDelay() {
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().retryDelayMillis(), isPresentAndIs(777L));

transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.terminate(1));
transitioner.receive(eventFactory.retryAfter(999));

assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(QUEUED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().retryDelayMillis(), isPresentAndIs(999L));
assertThat(outputs, contains(QUEUED, PREPARE, FAILED, QUEUED, PREPARE, RUNNING, TERMINATED, QUEUED));
assertThat(outputs, contains(QUEUED, PREPARE, FAILED, QUEUED,
PREPARE, SUBMITTING, SUBMITTED, RUNNING, TERMINATED, QUEUED));
}

@Test
Expand Down Expand Up @@ -316,17 +320,22 @@ public void testManyRetriesAfterFromRunError() {
transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE));
transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE));
transitioner.receive(eventFactory.retryAfter(0));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE));
transitioner.receive(eventFactory.retryAfter(0));

assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(QUEUED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(2));
assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, FAILED, QUEUED, PREPARE, RUNNING, FAILED, QUEUED));
assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, FAILED, QUEUED, PREPARE,
SUBMITTING, SUBMITTED, RUNNING, FAILED, QUEUED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(2));
}

Expand All @@ -335,16 +344,21 @@ public void testMissingDependenciesAddsToCost() {
transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE));
transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.terminate(RunState.MISSING_DEPS_EXIT_CODE));
transitioner.receive(eventFactory.retryAfter(0));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_2));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_2));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.terminate(RunState.MISSING_DEPS_EXIT_CODE));

assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(TERMINATED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().retryCost(), equalTo(0.2));
assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, TERMINATED, QUEUED, PREPARE, RUNNING, TERMINATED));
assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, TERMINATED, QUEUED, PREPARE,
SUBMITTING, SUBMITTED, RUNNING, TERMINATED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(0));
}

Expand All @@ -353,17 +367,21 @@ public void testMissingDependenciesIncrementsTries() {
transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE));
transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.terminate(RunState.MISSING_DEPS_EXIT_CODE));
transitioner.receive(eventFactory.retryAfter(0));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_2));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_2));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.terminate(RunState.MISSING_DEPS_EXIT_CODE));

assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(TERMINATED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(2));
assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, TERMINATED, QUEUED, PREPARE,
RUNNING, TERMINATED));
assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, TERMINATED, QUEUED, PREPARE,
SUBMITTING, SUBMITTED, RUNNING, TERMINATED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(0));
}

Expand All @@ -372,17 +390,21 @@ public void testErrorsAddsToCost() {
transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE));
transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.terminate(1));
transitioner.receive(eventFactory.retryAfter(0));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_2));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_2));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.terminate(1));

assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(TERMINATED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().retryCost(), equalTo(2.0));
assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, TERMINATED, QUEUED, PREPARE,
RUNNING, TERMINATED));
assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, TERMINATED, QUEUED, PREPARE,
SUBMITTING, SUBMITTED, RUNNING, TERMINATED));
}

@Test
Expand Down Expand Up @@ -420,14 +442,16 @@ public void testRetryAfterFromTerminated() {
transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE));
transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.terminate(1));
transitioner.receive(eventFactory.retryAfter(0));

assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(QUEUED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(1));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().lastExit(), isPresentAndIs(1));
assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, TERMINATED, QUEUED));
assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, TERMINATED, QUEUED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(1));
}

Expand All @@ -436,19 +460,23 @@ public void testManyRetriesAfterFromTerminated() {
transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE));
transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.terminate(1));
transitioner.receive(eventFactory.retryAfter(0));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_2));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_2));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.terminate(7));
transitioner.receive(eventFactory.retryAfter(0));

assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(QUEUED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(2));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().lastExit(), isPresentAndIs(7));
assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, TERMINATED, QUEUED, PREPARE,
RUNNING, TERMINATED, QUEUED));
assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, TERMINATED, QUEUED, PREPARE,
SUBMITTING, SUBMITTED, RUNNING, TERMINATED, QUEUED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(2));
}

Expand All @@ -457,27 +485,31 @@ public void testFatalFromTerminated() {
transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE));
transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.terminate(1));
transitioner.receive(eventFactory.stop());

assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(ERROR));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().lastExit(), isPresentAndIs(1));
assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, TERMINATED, ERROR));
assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, TERMINATED, ERROR));
}

@Test
public void testRetryAfterFromStartedThenRunError() {
transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE));
transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE));
transitioner.receive(eventFactory.retryAfter(0));

assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(QUEUED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(1));
assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, FAILED, QUEUED));
assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, FAILED, QUEUED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(1));
}

Expand All @@ -486,24 +518,28 @@ public void testFatalFromStartedThenRunError() {
transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE));
transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE));
transitioner.receive(eventFactory.stop());

assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(ERROR));
assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, FAILED, ERROR));
assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, FAILED, ERROR));
}

@Test
public void testFailedFromTimeout() {
transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE));
transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.timeout());

assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(FAILED));
assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, FAILED));
assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, FAILED));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(0));
}

Expand All @@ -512,18 +548,22 @@ public void testRetriggerOfPartition() {
transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE));
transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1));
transitioner.receive(eventFactory.started());
transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE));
transitioner.receive(eventFactory.stop());
transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE));
transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER));
transitioner.receive(eventFactory.dequeue(ImmutableSet.of()));
transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_2));
transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_2));
transitioner.receive(eventFactory.started());

assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(RUNNING));
assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(1));
assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, FAILED, ERROR,
QUEUED, PREPARE, RUNNING));
assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, FAILED, ERROR,
QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public void testGeneralExample() {
SequenceEvent.create(E.terminate(RunState.MISSING_DEPS_EXIT_CODE), c++, ts("07:58")),
SequenceEvent.create(E.retryAfter(10), c++, ts("07:59")),

SequenceEvent.create(E.dequeue(RESOURCE_IDS), c++, ts("08:54")),
SequenceEvent.create(E.submit(desc("img2"), "exec-id-01"), c++, ts("08:55")),
SequenceEvent.create(E.submitted("exec-id-01"), c++, ts("08:56")),
SequenceEvent.create(E.started(), c++, ts("08:57")),
Expand All @@ -173,6 +174,7 @@ public void testGeneralExample() {
SequenceEvent.create(E.terminate(1), c++, ts("09:58")),
SequenceEvent.create(E.retryAfter(10), c++, ts("09:59")),

SequenceEvent.create(E.dequeue(RESOURCE_IDS), c++, ts("10:54")),
SequenceEvent.create(E.submit(desc("img4", "sha4"), "exec-id-11"), c++, ts("10:55")),
SequenceEvent.create(E.submitted("exec-id-11"), c++, ts("10:56")),
SequenceEvent.create(E.started(), c++, ts("10:57"))
Expand Down Expand Up @@ -306,6 +308,7 @@ public void testTimeout() {
SequenceEvent.create(E.timeout(), c++, ts("07:58")),
SequenceEvent.create(E.retryAfter(10), c++, ts("07:59")),

SequenceEvent.create(E.dequeue(RESOURCE_IDS), c++, ts("08:54")),
SequenceEvent.create(E.submit(desc("img2", "sha2"), "exec-id-01"), c++, ts("08:55")),
SequenceEvent.create(E.submitted("exec-id-01"), c++, ts("08:56")),
SequenceEvent.create(E.started(), c++, ts("08:57"))
Expand Down Expand Up @@ -404,6 +407,7 @@ public void testRunError() {
SequenceEvent.create(E.runError("First failure"), c++, ts("07:58")),
SequenceEvent.create(E.retryAfter(10), c++, ts("07:59")),

SequenceEvent.create(E.dequeue(RESOURCE_IDS), c++, ts("08:54")),
SequenceEvent.create(E.submit(desc("img2", "sha2"), "exec-id-01"), c++, ts("08:55")),
SequenceEvent.create(E.submitted("exec-id-01"), c++, ts("08:56")),
SequenceEvent.create(E.started(), c++, ts("08:57")),
Expand Down Expand Up @@ -527,6 +531,7 @@ public void testStop() {
SequenceEvent.create(E.runError("First failure"), c++, ts("07:58")),
SequenceEvent.create(E.retryAfter(10), c++, ts("07:59")),

SequenceEvent.create(E.dequeue(RESOURCE_IDS), c++, ts("08:54")),
SequenceEvent.create(E.submit(desc("img2", "sha2"), "exec-id-01"), c++, ts("08:55")),
SequenceEvent.create(E.submitted("exec-id-01"), c++, ts("08:56")),
SequenceEvent.create(E.started(), c++, ts("08:57")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void restoreRunStateWhenMissingEvent() throws Exception {
events.add(SequenceEvent.create(
Event.triggerExecution(WORKFLOW_INSTANCE, Trigger.backfill("bf-1"), TRIGGER_PARAMETERS), 1L, 1L));
events.add(SequenceEvent.create(Event.dequeue(WORKFLOW_INSTANCE, RESOURCE_IDS), 2L, 2L));
// missing Event.submit(WORKFLOW_INSTANCE, EXECUTION_DESCRIPTION, "exec-1")
events.add(SequenceEvent.create(Event.submit(WORKFLOW_INSTANCE, EXECUTION_DESCRIPTION, "exec-1"), 3L, 3L));
Copy link
Member

@honnix honnix May 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done on purpose in #609, as reflected by the name of this test case.

events.add(SequenceEvent.create(Event.submitted(WORKFLOW_INSTANCE, "exec-1", "test"), 4L, 4L));
events.add(SequenceEvent.create(Event.started(WORKFLOW_INSTANCE), 5L, 5L));
events.add(SequenceEvent.create(Event.terminate(WORKFLOW_INSTANCE, Optional.of(0)), 6L, 6L));
Expand Down