diff --git a/styx-api-service/src/test/java/com/spotify/styx/api/StatusResourceTest.java b/styx-api-service/src/test/java/com/spotify/styx/api/StatusResourceTest.java index 3b927acfe8..eb26ff2569 100644 --- a/styx-api-service/src/test/java/com/spotify/styx/api/StatusResourceTest.java +++ b/styx-api-service/src/test/java/com/spotify/styx/api/StatusResourceTest.java @@ -34,6 +34,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableSet; import com.spotify.apollo.Environment; import com.spotify.apollo.Response; import com.spotify.apollo.Status; @@ -132,7 +133,7 @@ public void testEventsRoundtrip() throws Exception { sinceVersion(Api.Version.V3); storage.writeEvent(SequenceEvent.create(Event.triggerExecution(WFI_1, TRIGGER, TriggerParameters.zero()), 0L, 0L)); - storage.writeEvent(SequenceEvent.create(Event.created(WFI_1, "exec0", "img0"), 1L, 1L)); + storage.writeEvent(SequenceEvent.create(Event.dequeue(WFI_1, ImmutableSet.of()), 1L, 1L)); storage.writeEvent(SequenceEvent.create(Event.started(WFI_1), 2L, 2L)); Response response = diff --git a/styx-api-service/src/test/java/com/spotify/styx/api/WorkflowResourceTest.java b/styx-api-service/src/test/java/com/spotify/styx/api/WorkflowResourceTest.java index 56bbaf8e1c..3f1f185614 100644 --- a/styx-api-service/src/test/java/com/spotify/styx/api/WorkflowResourceTest.java +++ b/styx-api-service/src/test/java/com/spotify/styx/api/WorkflowResourceTest.java @@ -30,6 +30,7 @@ import static com.spotify.styx.model.SequenceEvent.create; import static com.spotify.styx.serialization.Json.deserialize; import static com.spotify.styx.serialization.Json.serialize; +import static com.spotify.styx.testdata.TestData.EXECUTION_DESCRIPTION; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -49,6 +50,7 @@ import com.google.api.client.googleapis.auth.oauth2.GoogleIdToken; import com.google.cloud.datastore.Datastore; +import com.google.common.collect.ImmutableSet; import com.spotify.apollo.Environment; import com.spotify.apollo.Response; import com.spotify.apollo.Status; @@ -342,8 +344,10 @@ public void shouldReturnWorkflowInstancesData() throws Exception { WorkflowInstance wfi = WorkflowInstance.create(WORKFLOW.id(), "2016-08-10"); storage.writeEvent(create(Event.triggerExecution(wfi, NATURAL_TRIGGER, TRIGGER_PARAMETERS), 0L, ms("07:00:00"))); - storage.writeEvent(create(Event.created(wfi, "exec", "img"), 1L, ms("07:00:01"))); - storage.writeEvent(create(Event.started(wfi), 2L, ms("07:00:02"))); + storage.writeEvent(create(Event.dequeue(wfi, ImmutableSet.of()), 1L, ms("07:00:01"))); + storage.writeEvent(create(Event.submit(wfi, EXECUTION_DESCRIPTION, "exec"), 2L, ms("07:00:02"))); + storage.writeEvent(create(Event.submitted(wfi, "exec", "test"), 3L, ms("07:00:03"))); + storage.writeEvent(create(Event.started(wfi), 4L, ms("07:00:04"))); Response response = awaitResponse(serviceHelper.request("GET", path("/foo/bar/instances"))); @@ -359,7 +363,7 @@ public void shouldReturnWorkflowInstancesData() throws Exception { assertJson(response, "[0].triggers.[0].complete", is(false)); assertJson(response, "[0].triggers.[0].executions", hasSize(1)); assertJson(response, "[0].triggers.[0].executions.[0].execution_id", is("exec")); - assertJson(response, "[0].triggers.[0].executions.[0].docker_image", is("img")); + assertJson(response, "[0].triggers.[0].executions.[0].docker_image", is("busybox:1.1")); assertJson(response, "[0].triggers.[0].executions.[0].statuses", hasSize(2)); assertJson(response, "[0].triggers.[0].executions.[0].statuses.[0].status", is("SUBMITTED")); assertJson(response, "[0].triggers.[0].executions.[0].statuses.[1].status", is("STARTED")); @@ -383,8 +387,10 @@ public void shouldReturnWorkflowRangeOfInstancesData() throws Exception { WorkflowInstance wfi = WorkflowInstance.create(WORKFLOW.id(), "2016-08-10"); storage.writeEvent(create(Event.triggerExecution(wfi, NATURAL_TRIGGER, TRIGGER_PARAMETERS), 0L, ms("07:00:00"))); - storage.writeEvent(create(Event.created(wfi, "exec", "img"), 1L, ms("07:00:01"))); - storage.writeEvent(create(Event.started(wfi), 2L, ms("07:00:02"))); + storage.writeEvent(create(Event.dequeue(wfi, ImmutableSet.of()), 1L, ms("07:00:01"))); + storage.writeEvent(create(Event.submit(wfi, EXECUTION_DESCRIPTION, "exec"), 2L, ms("07:00:02"))); + storage.writeEvent(create(Event.submitted(wfi, "exec", "test"), 3L, ms("07:00:03"))); + storage.writeEvent(create(Event.started(wfi), 4L, ms("07:00:04"))); Response response = awaitResponse(serviceHelper.request("GET", path("/foo/bar/instances?start=2016-08-10"))); @@ -400,7 +406,7 @@ public void shouldReturnWorkflowRangeOfInstancesData() throws Exception { assertJson(response, "[0].triggers.[0].complete", is(false)); assertJson(response, "[0].triggers.[0].executions", hasSize(1)); assertJson(response, "[0].triggers.[0].executions.[0].execution_id", is("exec")); - assertJson(response, "[0].triggers.[0].executions.[0].docker_image", is("img")); + assertJson(response, "[0].triggers.[0].executions.[0].docker_image", is("busybox:1.1")); assertJson(response, "[0].triggers.[0].executions.[0].statuses", hasSize(2)); assertJson(response, "[0].triggers.[0].executions.[0].statuses.[0].status", is("SUBMITTED")); assertJson(response, "[0].triggers.[0].executions.[0].statuses.[1].status", is("STARTED")); @@ -412,8 +418,10 @@ public void shouldReturnWorkflowInstanceData() throws Exception { WorkflowInstance wfi = WorkflowInstance.create(WORKFLOW.id(), "2016-08-10"); storage.writeEvent(create(Event.triggerExecution(wfi, NATURAL_TRIGGER, TRIGGER_PARAMETERS), 0L, ms("07:00:00"))); - storage.writeEvent(create(Event.created(wfi, "exec", "img"), 1L, ms("07:00:01"))); - storage.writeEvent(create(Event.started(wfi), 2L, ms("07:00:02"))); + storage.writeEvent(create(Event.dequeue(wfi, ImmutableSet.of()), 1L, ms("07:00:01"))); + storage.writeEvent(create(Event.submit(wfi, EXECUTION_DESCRIPTION, "exec"), 2L, ms("07:00:02"))); + storage.writeEvent(create(Event.submitted(wfi, "exec", "test"), 3L, ms("07:00:03"))); + storage.writeEvent(create(Event.started(wfi), 4L, ms("07:00:04"))); Response response = awaitResponse(serviceHelper.request("GET", path("/foo/bar/instances/2016-08-10"))); @@ -429,14 +437,14 @@ public void shouldReturnWorkflowInstanceData() throws Exception { assertJson(response, "triggers.[0].complete", is(false)); assertJson(response, "triggers.[0].executions", hasSize(1)); assertJson(response, "triggers.[0].executions.[0].execution_id", is("exec")); - assertJson(response, "triggers.[0].executions.[0].docker_image", is("img")); + assertJson(response, "triggers.[0].executions.[0].docker_image", is("busybox:1.1")); assertJson(response, "triggers.[0].executions.[0].statuses", hasSize(2)); assertJson(response, "triggers.[0].executions.[0].statuses.[0].status", is("SUBMITTED")); assertJson(response, "triggers.[0].executions.[0].statuses.[1].status", is("STARTED")); assertJson(response, "triggers.[0].executions.[0].statuses.[0].timestamp", - is("2016-08-10T07:00:01Z")); + is("2016-08-10T07:00:03Z")); assertJson(response, "triggers.[0].executions.[0].statuses.[1].timestamp", - is("2016-08-10T07:00:02Z")); + is("2016-08-10T07:00:04Z")); } @Test diff --git a/styx-common/src/main/java/com/spotify/styx/model/EventVisitor.java b/styx-common/src/main/java/com/spotify/styx/model/EventVisitor.java index ee263ed92f..98fbad39b7 100644 --- a/styx-common/src/main/java/com/spotify/styx/model/EventVisitor.java +++ b/styx-common/src/main/java/com/spotify/styx/model/EventVisitor.java @@ -51,11 +51,4 @@ R submit(@Getter WorkflowInstance workflowInstance, ExecutionDescription executi R timeout(@Getter WorkflowInstance workflowInstance); R halt(@Getter WorkflowInstance workflowInstance); - // Note: Do not make changes to these deprecated event method signatures - @Deprecated - R timeTrigger(@Getter WorkflowInstance workflowInstance); - @Deprecated - R created(@Getter WorkflowInstance workflowInstance, String executionId, String dockerImage); - @Deprecated - R retry(@Getter WorkflowInstance workflowInstance); } diff --git a/styx-common/src/main/java/com/spotify/styx/serialization/PersistentEvent.java b/styx-common/src/main/java/com/spotify/styx/serialization/PersistentEvent.java index 05ef84722d..ea379dd517 100644 --- a/styx-common/src/main/java/com/spotify/styx/serialization/PersistentEvent.java +++ b/styx-common/src/main/java/com/spotify/styx/serialization/PersistentEvent.java @@ -41,10 +41,8 @@ @JsonTypeInfo(use = Id.NAME, visible = true) @JsonSubTypes({ - @JsonSubTypes.Type(value = PersistentEvent.class, name = "timeTrigger"), @JsonSubTypes.Type(value = PersistentEvent.TriggerExecution.class, name = "triggerExecution"), @JsonSubTypes.Type(value = PersistentEvent.Info.class, name = "info"), - @JsonSubTypes.Type(value = PersistentEvent.Created.class, name = "created"), @JsonSubTypes.Type(value = PersistentEvent.Dequeue.class, name = "dequeue"), @JsonSubTypes.Type(value = PersistentEvent.Started.class, name = "started"), @JsonSubTypes.Type(value = PersistentEvent.Terminate.class, name = "terminate"), @@ -71,10 +69,6 @@ public class PersistentEvent { public static class SerializerVisitor implements EventVisitor { - @Override - public PersistentEvent timeTrigger(WorkflowInstance workflowInstance) { - return new PersistentEvent("timeTrigger", workflowInstance.toKey()); - } @Override public PersistentEvent triggerExecution(WorkflowInstance workflowInstance, Trigger trigger, @@ -87,11 +81,6 @@ public PersistentEvent info(WorkflowInstance workflowInstance, Message message) return new Info(workflowInstance.toKey(), message); } - @Override - public PersistentEvent created(WorkflowInstance workflowInstance, String executionId, String dockerImage) { - return new Created(workflowInstance.toKey(), executionId, Optional.of(dockerImage)); - } - @Override public PersistentEvent dequeue(WorkflowInstance workflowInstance, Set resourceIds) { return new Dequeue(workflowInstance.toKey(), Optional.of(resourceIds)); @@ -133,11 +122,6 @@ public PersistentEvent retryAfter(WorkflowInstance workflowInstance, long delayM return new RetryAfter(workflowInstance.toKey(), delayMillis); } - @Override - public PersistentEvent retry(WorkflowInstance workflowInstance) { - return new PersistentEvent("retry", workflowInstance.toKey()); - } - @Override public PersistentEvent stop(WorkflowInstance workflowInstance) { return new PersistentEvent("stop", workflowInstance.toKey()); @@ -169,12 +153,8 @@ public static PersistentEvent wrap(Event event) { public Event toEvent() { final WorkflowInstance workflowInstance = WorkflowInstance.parseKey(this.workflowInstance); switch (type) { - case "timeTrigger": - return Event.timeTrigger(workflowInstance); case "success": return Event.success(workflowInstance); - case "retry": - return Event.retry(workflowInstance); case "stop": return Event.stop(workflowInstance); case "timeout": @@ -239,28 +219,6 @@ public Event toEvent() { } } - - public static class Created extends PersistentEvent { - - public final String executionId; - public final String dockerImage; - - @JsonCreator - public Created( - @JsonProperty("workflow_instance") String workflowInstance, - @JsonProperty("execution_id") String executionId, - @JsonProperty("docker_image") Optional dockerImage) { - super("created", workflowInstance); - this.executionId = executionId; - this.dockerImage = dockerImage.orElse("UNKNOWN"); - } - - @Override - public Event toEvent() { - return Event.created(WorkflowInstance.parseKey(workflowInstance), executionId, dockerImage); - } - } - public static class Submitted extends PersistentEvent { public final String executionId; diff --git a/styx-common/src/main/java/com/spotify/styx/testdata/TestData.java b/styx-common/src/main/java/com/spotify/styx/testdata/TestData.java index ec8366b67d..5064ce1c57 100644 --- a/styx-common/src/main/java/com/spotify/styx/testdata/TestData.java +++ b/styx-common/src/main/java/com/spotify/styx/testdata/TestData.java @@ -158,6 +158,14 @@ public final class TestData { .commitSha(VALID_SHA) .build(); + public static final ExecutionDescription EXECUTION_DESCRIPTION2 = + ExecutionDescription.builder() + .dockerImage("busybox:1.2") + .dockerArgs("foo", "bar") + .secret(WorkflowConfiguration.Secret.create("secret", "/dev/null")) + .commitSha(VALID_SHA) + .build(); + public static final Workflow WORKFLOW_WITH_RESOURCES = Workflow.create(WORKFLOW_ID.componentId(), HOURLY_WORKFLOW_CONFIGURATION_WITH_RESOURCES); diff --git a/styx-common/src/main/java/com/spotify/styx/util/EventUtil.java b/styx-common/src/main/java/com/spotify/styx/util/EventUtil.java index 7629cf0727..821f99b34c 100644 --- a/styx-common/src/main/java/com/spotify/styx/util/EventUtil.java +++ b/styx-common/src/main/java/com/spotify/styx/util/EventUtil.java @@ -55,11 +55,6 @@ public static String info(Event event) { private enum EventInfoVisitor implements EventVisitor { INSTANCE; - @Override - public String timeTrigger(WorkflowInstance workflowInstance) { - return ""; - } - @Override public String triggerExecution(WorkflowInstance workflowInstance, Trigger trigger, TriggerParameters parameters) { @@ -72,11 +67,6 @@ public String info(WorkflowInstance workflowInstance, Message message) { return message.line(); } - @Override - public String created(WorkflowInstance workflowInstance, String executionId, String dockerImage) { - return String.format("Execution id: %s, Docker image: %s", executionId, dockerImage); - } - @Override public String dequeue(WorkflowInstance workflowInstance, Set resourceIds) { return ""; @@ -107,11 +97,6 @@ public String retryAfter(WorkflowInstance workflowInstance, long delayMillis) { return String.format("Delay (seconds): %d", TimeUnit.MILLISECONDS.toSeconds(delayMillis)); } - @Override - public String retry(WorkflowInstance workflowInstance) { - return ""; - } - @Override public String stop(WorkflowInstance workflowInstance) { return ""; @@ -145,11 +130,6 @@ public String submitted(WorkflowInstance workflowInstance, String executionId, S private enum EventNameVisitor implements EventVisitor { INSTANCE; - @Override - public String timeTrigger(WorkflowInstance workflowInstance) { - return "timeTrigger"; - } - @Override public String triggerExecution(WorkflowInstance workflowInstance, Trigger trigger, TriggerParameters parameters) { @@ -166,12 +146,6 @@ public String dequeue(WorkflowInstance workflowInstance, Set resourceIds return "dequeue"; } - @Override - public String created(WorkflowInstance workflowInstance, String executionId, - String dockerImage) { - return "created"; - } - @Override public String started(WorkflowInstance workflowInstance) { return "started"; @@ -197,11 +171,6 @@ public String retryAfter(WorkflowInstance workflowInstance, long delayMillis) { return "retryAfter"; } - @Override - public String retry(WorkflowInstance workflowInstance) { - return "retry"; - } - @Override public String stop(WorkflowInstance workflowInstance) { return "stop"; diff --git a/styx-common/src/test/java/com/spotify/styx/serialization/JsonTest.java b/styx-common/src/test/java/com/spotify/styx/serialization/JsonTest.java index 6158de41a2..c1779cd59e 100644 --- a/styx-common/src/test/java/com/spotify/styx/serialization/JsonTest.java +++ b/styx-common/src/test/java/com/spotify/styx/serialization/JsonTest.java @@ -32,7 +32,7 @@ public class JsonTest { - private static final Event EVENT = Event.retry(TestData.WORKFLOW_INSTANCE); + private static final Event EVENT = Event.retryAfter(TestData.WORKFLOW_INSTANCE, 0); private static final Trigger TRIGGER = Trigger.adhoc("foobar"); @Test diff --git a/styx-common/src/test/java/com/spotify/styx/serialization/PersistentEventTest.java b/styx-common/src/test/java/com/spotify/styx/serialization/PersistentEventTest.java index 688c018fdc..52e8cb40b8 100644 --- a/styx-common/src/test/java/com/spotify/styx/serialization/PersistentEventTest.java +++ b/styx-common/src/test/java/com/spotify/styx/serialization/PersistentEventTest.java @@ -66,10 +66,8 @@ public class PersistentEventTest { @Test public void testRoundtripAllEvents() throws Exception { - assertRoundtrip(Event.timeTrigger(INSTANCE1)); assertRoundtrip(Event.triggerExecution(INSTANCE1, UNKNOWN_TRIGGER, TRIGGER_PARAMETERS)); assertRoundtrip(Event.info(INSTANCE1, Message.info("InfoMessage"))); - assertRoundtrip(Event.created(INSTANCE1, POD_NAME, DOCKER_IMAGE)); assertRoundtrip(Event.dequeue(INSTANCE1, ImmutableSet.of("some-resource"))); assertRoundtrip(Event.dequeue(INSTANCE1, ImmutableSet.of())); assertRoundtrip(Event.started(INSTANCE1)); @@ -77,7 +75,6 @@ public void testRoundtripAllEvents() throws Exception { assertRoundtrip(Event.runError(INSTANCE1, "ErrorMessage")); assertRoundtrip(Event.success(INSTANCE1)); assertRoundtrip(Event.retryAfter(INSTANCE1, 12345)); - assertRoundtrip(Event.retry(INSTANCE1)); assertRoundtrip(Event.stop(INSTANCE1)); assertRoundtrip(Event.timeout(INSTANCE1)); assertRoundtrip(Event.halt(INSTANCE1)); @@ -87,14 +84,12 @@ public void testRoundtripAllEvents() throws Exception { @Test public void testDeserializeFromJson() throws Exception { - assertThat(deserializeEvent(json("timeTrigger")), is(Event.timeTrigger(INSTANCE1))); assertThat(deserializeEvent(json("dequeue", "\"resource_ids\":[\"quux\"]")), is(Event.dequeue(INSTANCE1, ImmutableSet.of("quux")))); assertThat(deserializeEvent(json("dequeue")), is(Event.dequeue(INSTANCE1, ImmutableSet.of()))); assertThat(deserializeEvent(json("started")), is(Event.started(INSTANCE1))); assertThat(deserializeEvent(json("success")), is(Event.success(INSTANCE1))); - assertThat(deserializeEvent(json("retry")), is(Event.retry(INSTANCE1))); assertThat(deserializeEvent(json("stop")), is(Event.stop(INSTANCE1))); assertThat(deserializeEvent(json("timeout")), is(Event.timeout(INSTANCE1))); assertThat(deserializeEvent(json("halt")), is(Event.halt(INSTANCE1))); @@ -123,10 +118,6 @@ public void testDeserializeFromJson() throws Exception { deserializeEvent(json("submitted", "\"execution_id\":\"" + POD_NAME + "\",\"runner_id\":\"" + RUNNER_ID + "\"")), is(Event.submitted(INSTANCE1, POD_NAME, RUNNER_ID))); - assertThat( - deserializeEvent(json("created", "\"execution_id\":\"" + POD_NAME + "\",\"docker_image\":\"" + DOCKER_IMAGE - + "\"")), - is(Event.created(INSTANCE1, POD_NAME, DOCKER_IMAGE))); assertThat( deserializeEvent(json("runError", "\"message\":\"ErrorMessage\"")), is(Event.runError(INSTANCE1, "ErrorMessage"))); @@ -168,9 +159,6 @@ public void testDeserializeFromJsonWhenTransformationRequired() throws Exception assertThat( deserializeEvent(json("started", "\"pod_name\":\"" + POD_NAME + "\"")), is(Event.started(INSTANCE1))); // for backwards compatibility - assertThat( - deserializeEvent(json("created", "\"execution_id\":\"" + POD_NAME + "\"")), - is(Event.created(INSTANCE1, POD_NAME, "UNKNOWN"))); assertThat( deserializeEvent(json("triggerExecution")), is(Event.triggerExecution(INSTANCE1, TRIGGER_UNKNOWN, TriggerParameters.zero()))); diff --git a/styx-scheduler-service/src/main/java/com/spotify/styx/docker/KubernetesDockerRunner.java b/styx-scheduler-service/src/main/java/com/spotify/styx/docker/KubernetesDockerRunner.java index ca3ebb14f5..111971fe07 100644 --- a/styx-scheduler-service/src/main/java/com/spotify/styx/docker/KubernetesDockerRunner.java +++ b/styx-scheduler-service/src/main/java/com/spotify/styx/docker/KubernetesDockerRunner.java @@ -788,11 +788,6 @@ private void closeWatch() { // fixme: add a Cause enum to the runError() event instead of this string matching private static class PullImageErrorMatcher implements EventVisitor { - @Override - public Boolean timeTrigger(WorkflowInstance workflowInstance) { - return false; - } - @Override public Boolean triggerExecution(WorkflowInstance workflowInstance, Trigger trigger, TriggerParameters parameters) { @@ -809,11 +804,6 @@ public Boolean dequeue(WorkflowInstance workflowInstance, Set resourceId return false; } - @Override - public Boolean created(WorkflowInstance workflowInstance, String executionId, String dockerImage) { - return false; - } - @Override public Boolean submit(WorkflowInstance workflowInstance, ExecutionDescription executionDescription, String executionId) { @@ -850,11 +840,6 @@ public Boolean retryAfter(WorkflowInstance workflowInstance, long delayMillis) { return false; } - @Override - public Boolean retry(WorkflowInstance workflowInstance) { - return false; - } - @Override public Boolean stop(WorkflowInstance workflowInstance) { return false; diff --git a/styx-scheduler-service/src/test/java/com/spotify/styx/api/SchedulerResourceTest.java b/styx-scheduler-service/src/test/java/com/spotify/styx/api/SchedulerResourceTest.java index 603c5e05a1..6f0e10f764 100644 --- a/styx-scheduler-service/src/test/java/com/spotify/styx/api/SchedulerResourceTest.java +++ b/styx-scheduler-service/src/test/java/com/spotify/styx/api/SchedulerResourceTest.java @@ -276,8 +276,8 @@ public void testInjectTimeoutEvent() throws Exception { } @Test - public void shouldFailOnInjectRetryEvent() throws Exception { - Event injectedEvent = Event.retry(WFI); + public void shouldFailOnInjectRetryAfterEvent() throws Exception { + Event injectedEvent = Event.retryAfter(WFI, 0); ByteString eventPayload = serialize(injectedEvent); CompletionStage> post = serviceHelper.request("POST", BASE + "/events", eventPayload); diff --git a/styx-scheduler-service/src/test/java/com/spotify/styx/state/PersistentStateManagerTest.java b/styx-scheduler-service/src/test/java/com/spotify/styx/state/PersistentStateManagerTest.java index dd8bd9cff7..c50ccf9a37 100644 --- a/styx-scheduler-service/src/test/java/com/spotify/styx/state/PersistentStateManagerTest.java +++ b/styx-scheduler-service/src/test/java/com/spotify/styx/state/PersistentStateManagerTest.java @@ -363,7 +363,7 @@ public void shouldRejectTriggerIfIsClosed() throws Exception { public void shouldRejectEventIfClosed() throws Exception { stateManager.close(); exception.expect(IsClosedException.class); - stateManager.receive(Event.timeTrigger(INSTANCE)); + stateManager.receive(Event.triggerExecution(INSTANCE, Trigger.natural(), TriggerParameters.zero())); } @Test diff --git a/styx-scheduler-service/src/test/java/com/spotify/styx/state/handlers/TimeoutHandlerTest.java b/styx-scheduler-service/src/test/java/com/spotify/styx/state/handlers/TimeoutHandlerTest.java index 58ea5ab715..db326ea608 100644 --- a/styx-scheduler-service/src/test/java/com/spotify/styx/state/handlers/TimeoutHandlerTest.java +++ b/styx-scheduler-service/src/test/java/com/spotify/styx/state/handlers/TimeoutHandlerTest.java @@ -53,7 +53,7 @@ import org.mockito.MockitoAnnotations; @RunWith(JUnitParamsRunner.class) -public class TimeoutHandlerTest { +public class TimeoutHandlerTest { private Instant now = Instant.parse("2016-12-02T22:00:00Z"); private Time time = () -> now; diff --git a/styx-service-common/src/main/java/com/spotify/styx/state/RunState.java b/styx-service-common/src/main/java/com/spotify/styx/state/RunState.java index d232bf03dc..b33a5bd579 100644 --- a/styx-service-common/src/main/java/com/spotify/styx/state/RunState.java +++ b/styx-service-common/src/main/java/com/spotify/styx/state/RunState.java @@ -123,23 +123,6 @@ private RunState transitionUpdates(Instant instant) { private class TransitionVisitor implements EventVisitor { - @Deprecated - @Override - public RunState timeTrigger(WorkflowInstance workflowInstance) { - switch (state()) { - case NEW: - return state( // for backwards compatibility - SUBMITTED, - data().builder() - .trigger(Trigger.unknown("UNKNOWN")) - .triggerId("UNKNOWN") // for backwards compatibility - .build()); - - default: - throw illegalTransition("timeTrigger"); - } - } - @Override public RunState triggerExecution(WorkflowInstance workflowInstance, Trigger trigger, TriggerParameters parameters) { @@ -158,26 +141,6 @@ public RunState triggerExecution(WorkflowInstance workflowInstance, Trigger trig } } - @Deprecated - @Override - public RunState created(WorkflowInstance workflowInstance, String executionId, - String dockerImage) { - switch (state()) { - case PREPARE: - case QUEUED: - return state( - SUBMITTED, // for backwards compatibility - data().builder() - .executionId(executionId) - .executionDescription(ExecutionDescription.forImage(dockerImage)) - .tries(data().tries() + 1) - .build()); - - default: - throw illegalTransition("created"); - } - } - @Override public RunState info(WorkflowInstance workflowInstance, Message message) { switch (state()) { @@ -249,8 +212,11 @@ public RunState submitted(WorkflowInstance workflowInstance, String executionId, public RunState started(WorkflowInstance workflowInstance) { switch (state()) { case SUBMITTED: - case PREPARE: return state(RUNNING); + case PREPARE: + return state(RUNNING, data().builder() + .tries(data().tries() + 1) + .build()); default: throw illegalTransition("started"); @@ -364,20 +330,6 @@ public RunState retryAfter(WorkflowInstance workflowInstance, long delayMillis) } } - @Deprecated - @Override - public RunState retry(WorkflowInstance workflowInstance) { - switch (state()) { - case TERMINATED: - case FAILED: - case QUEUED: - return state(PREPARE); - - default: - throw illegalTransition("retry"); - } - } - @Override public RunState stop(WorkflowInstance workflowInstance) { switch (state()) { diff --git a/styx-service-common/src/main/java/com/spotify/styx/storage/WFIExecutionBuilder.java b/styx-service-common/src/main/java/com/spotify/styx/storage/WFIExecutionBuilder.java index 4e9223d37e..b87c1c89f9 100644 --- a/styx-service-common/src/main/java/com/spotify/styx/storage/WFIExecutionBuilder.java +++ b/styx-service-common/src/main/java/com/spotify/styx/storage/WFIExecutionBuilder.java @@ -100,15 +100,6 @@ private void closeTrigger() { private class Reducer implements EventVisitor { - @Override - public Void timeTrigger(WorkflowInstance workflowInstance) { - currWorkflowInstance = workflowInstance; - completed = false; - - triggerTs = eventTs; - return null; - } - @Override public Void triggerExecution(WorkflowInstance workflowInstance, com.spotify.styx.state.Trigger trigger, TriggerParameters parameters) { @@ -133,17 +124,6 @@ public Void dequeue(WorkflowInstance workflowInstance, Set resourceIds) return null; } - @Override - public Void created(WorkflowInstance workflowInstance, String executionId, String dockerImage) { - currWorkflowInstance = workflowInstance; - currExecutionId = executionId; - currDockerImg = dockerImage; - - executionStatusList.add(ExecStatus.create(eventTs, Status.SUBMITTED.toString(), - Optional.empty())); - return null; - } - @Override public Void submit(WorkflowInstance workflowInstance, ExecutionDescription executionDescription, String executionId) { @@ -232,12 +212,6 @@ public Void retryAfter(WorkflowInstance workflowInstance, long delayMillis) { return null; } - @Override - public Void retry(WorkflowInstance workflowInstance) { - currWorkflowInstance = workflowInstance; - return null; - } - @Override public Void stop(WorkflowInstance workflowInstance) { currWorkflowInstance = workflowInstance; diff --git a/styx-service-common/src/test/java/com/spotify/styx/WorkflowInstanceEventFactory.java b/styx-service-common/src/test/java/com/spotify/styx/WorkflowInstanceEventFactory.java index cad1a2b261..ec3d4a375a 100644 --- a/styx-service-common/src/test/java/com/spotify/styx/WorkflowInstanceEventFactory.java +++ b/styx-service-common/src/test/java/com/spotify/styx/WorkflowInstanceEventFactory.java @@ -39,10 +39,6 @@ public WorkflowInstanceEventFactory(WorkflowInstance workflowInstance) { this.workflowInstance = workflowInstance; } - public Event timeTrigger() { - return Event.timeTrigger(workflowInstance); - } - public Event triggerExecution(Trigger trigger) { return triggerExecution(trigger, TriggerParameters.zero()); } @@ -55,10 +51,6 @@ public Event info(Message message) { return Event.info(workflowInstance, message); } - public Event created(String executionId, String dockerImage) { - return Event.created(workflowInstance, executionId, dockerImage); - } - public Event dequeue(Set resourceIds) { return Event.dequeue(workflowInstance, resourceIds); } @@ -95,10 +87,6 @@ public Event retryAfter(int delayMillis) { return Event.retryAfter(workflowInstance, delayMillis); } - public Event retry() { - return Event.retry(workflowInstance); - } - public Event stop() { return Event.stop(workflowInstance); } diff --git a/styx-service-common/src/test/java/com/spotify/styx/state/RunStateTest.java b/styx-service-common/src/test/java/com/spotify/styx/state/RunStateTest.java index dd6a5ecb39..0e4bf14760 100644 --- a/styx-service-common/src/test/java/com/spotify/styx/state/RunStateTest.java +++ b/styx-service-common/src/test/java/com/spotify/styx/state/RunStateTest.java @@ -32,6 +32,7 @@ import static com.spotify.styx.state.RunState.State.SUBMITTED; import static com.spotify.styx.state.RunState.State.SUBMITTING; import static com.spotify.styx.state.RunState.State.TERMINATED; +import static com.spotify.styx.testdata.TestData.EXECUTION_DESCRIPTION2; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -141,25 +142,8 @@ public void testTransitionUpdates() { assertThat(transitioner.get(WORKFLOW_INSTANCE).counter(), is(0L)); } - @Test // for backwards compatibility - public void testTimeTriggerAndRetry() { - transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); - transitioner.receive(eventFactory.timeTrigger()); - transitioner.receive(eventFactory.started()); - transitioner.receive(eventFactory.terminate(1)); - transitioner.receive(eventFactory.retryAfter(777)); - - assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(QUEUED)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).data().retryDelayMillis(), isPresentAndIs(777L)); - - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.started()); - - assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(RUNNING)); - } - @Test - public void testTimeTriggerAndRetry2() { + public void testTriggerAndRetryAfter() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); @@ -183,7 +167,10 @@ public void testTimeTriggerAndRetry2() { @Test public void testRunErrorOnCreating() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); - transitioner.receive(eventFactory.timeTrigger()); + transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); + transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, "exec1")); + transitioner.receive(eventFactory.submitted("exec1")); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(1)); transitioner.receive(eventFactory.retryAfter(777)); @@ -192,7 +179,6 @@ public void testRunErrorOnCreating() { assertThat(transitioner.get(WORKFLOW_INSTANCE).data().retryDelayMillis(), isPresentAndIs(777L)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(1)); - transitioner.receive(eventFactory.retry()); transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE)); transitioner.receive(eventFactory.retryAfter(999)); @@ -218,7 +204,9 @@ public void testSetTrigger() { public void testSetExecutionId() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); + transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1)); + transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1)); transitioner.receive(eventFactory.started()); assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(RUNNING)); @@ -228,16 +216,16 @@ public void testSetExecutionId() { transitioner.receive(eventFactory.terminate(1)); transitioner.receive(eventFactory.retryAfter(999)); - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_2, DOCKER_IMAGE)); + transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_2)); + transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_2)); transitioner.receive(eventFactory.started()); assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(RUNNING)); assertThat( transitioner.get(WORKFLOW_INSTANCE).data().executionId(), equalTo(Optional.of(TEST_EXECUTION_ID_2))); - assertThat(outputs, contains(QUEUED, SUBMITTED, RUNNING, TERMINATED, QUEUED, - PREPARE, SUBMITTED, RUNNING)); + assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, TERMINATED, QUEUED, + SUBMITTING, SUBMITTED, RUNNING)); } @Test @@ -269,23 +257,21 @@ public void testSubmitSetsExecutionId() { public void testSetsRetryDelay() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE)); transitioner.receive(eventFactory.retryAfter(777)); assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(QUEUED)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().retryDelayMillis(), isPresentAndIs(777L)); - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(1)); transitioner.receive(eventFactory.retryAfter(999)); assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(QUEUED)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().retryDelayMillis(), isPresentAndIs(999L)); - assertThat(outputs, contains(QUEUED, SUBMITTED, FAILED, QUEUED, PREPARE, SUBMITTED, - RUNNING, TERMINATED, QUEUED)); + assertThat(outputs, contains(QUEUED, PREPARE, FAILED, QUEUED, PREPARE, RUNNING, TERMINATED, QUEUED)); } @Test @@ -310,37 +296,37 @@ public void testRetryDelayFromQueued() { } @Test - public void testRetryFromRunError() { + public void testRetryAfterFromRunError() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); + transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1)); + transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1)); transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE)); - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); - + transitioner.receive(eventFactory.retryAfter(0)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(SUBMITTED)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(2)); - assertThat(outputs, contains(QUEUED, SUBMITTED, FAILED, PREPARE, SUBMITTED)); + assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(QUEUED)); + assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(1)); + assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, FAILED, QUEUED)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(1)); } @Test - public void testManyRetriesFromRunError() { + public void testManyRetriesAfterFromRunError() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); + transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE)); - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.retryAfter(0)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); + transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE)); - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.retryAfter(0)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(SUBMITTED)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(3)); - assertThat(outputs, contains(QUEUED, SUBMITTED, FAILED, PREPARE, SUBMITTED, FAILED, PREPARE, - SUBMITTED)); + assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(QUEUED)); + assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(2)); + assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, FAILED, QUEUED, PREPARE, RUNNING, FAILED, QUEUED)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(2)); } @@ -348,19 +334,17 @@ public void testManyRetriesFromRunError() { public void testMissingDependenciesAddsToCost() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(RunState.MISSING_DEPS_EXIT_CODE)); transitioner.receive(eventFactory.retryAfter(0)); - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(RunState.MISSING_DEPS_EXIT_CODE)); assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(TERMINATED)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().retryCost(), equalTo(0.2)); - assertThat(outputs, contains(QUEUED, SUBMITTED, RUNNING, TERMINATED, QUEUED, PREPARE, - SUBMITTED, RUNNING, TERMINATED)); + assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, TERMINATED, QUEUED, PREPARE, RUNNING, TERMINATED)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(0)); } @@ -368,19 +352,18 @@ public void testMissingDependenciesAddsToCost() { public void testMissingDependenciesIncrementsTries() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(RunState.MISSING_DEPS_EXIT_CODE)); transitioner.receive(eventFactory.retryAfter(0)); - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(RunState.MISSING_DEPS_EXIT_CODE)); assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(TERMINATED)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(2)); - assertThat(outputs, contains(QUEUED, SUBMITTED, RUNNING, TERMINATED, QUEUED, PREPARE, - SUBMITTED, RUNNING, TERMINATED)); + assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, TERMINATED, QUEUED, PREPARE, + RUNNING, TERMINATED)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(0)); } @@ -388,39 +371,39 @@ public void testMissingDependenciesIncrementsTries() { public void testErrorsAddsToCost() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(1)); transitioner.receive(eventFactory.retryAfter(0)); - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(1)); assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(TERMINATED)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().retryCost(), equalTo(2.0)); - assertThat(outputs, contains(QUEUED, SUBMITTED, RUNNING, TERMINATED, QUEUED, PREPARE, - SUBMITTED, RUNNING, TERMINATED)); + assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, TERMINATED, QUEUED, PREPARE, + RUNNING, TERMINATED)); } @Test public void testFatalFromRunError() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE)); transitioner.receive(eventFactory.stop()); assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(ERROR)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(1)); - assertThat(outputs, contains(QUEUED, SUBMITTED, FAILED, ERROR)); + assertThat(outputs, contains(QUEUED, PREPARE, FAILED, ERROR)); } @Test - public void testSuccessFromTerm() { + public void testSuccessFromTerminated() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); + transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1)); + transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1)); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(0)); transitioner.receive(eventFactory.success()); @@ -428,77 +411,73 @@ public void testSuccessFromTerm() { assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(DONE)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(1)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().lastExit(), isPresentAndIs(0)); - assertThat(outputs, contains(QUEUED, SUBMITTED, RUNNING, TERMINATED, DONE)); + assertThat(outputs, contains(QUEUED, PREPARE, SUBMITTING, SUBMITTED, RUNNING, TERMINATED, DONE)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(0)); } @Test - public void testRetryFromTerm() { + public void testRetryAfterFromTerminated() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(1)); - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.retryAfter(0)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(SUBMITTED)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(2)); + assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(QUEUED)); + assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(1)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().lastExit(), isPresentAndIs(1)); - assertThat(outputs, contains(QUEUED, SUBMITTED, RUNNING, TERMINATED, PREPARE, SUBMITTED)); + assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, TERMINATED, QUEUED)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(1)); } @Test - public void testManyRetriesFromTerm() { + public void testManyRetriesAfterFromTerminated() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(1)); - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.retryAfter(0)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(7)); - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.retryAfter(0)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(SUBMITTED)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(3)); + assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(QUEUED)); + assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(2)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().lastExit(), isPresentAndIs(7)); - assertThat(outputs, contains(QUEUED, SUBMITTED, RUNNING, TERMINATED, PREPARE, SUBMITTED, - RUNNING, TERMINATED, PREPARE, SUBMITTED)); + assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, TERMINATED, QUEUED, PREPARE, + RUNNING, TERMINATED, QUEUED)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(2)); } @Test - public void testFatalFromTerm() { + public void testFatalFromTerminated() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(1)); transitioner.receive(eventFactory.stop()); assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(ERROR)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(1)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().lastExit(), isPresentAndIs(1)); - assertThat(outputs, contains(QUEUED, SUBMITTED, RUNNING, TERMINATED, ERROR)); + assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, TERMINATED, ERROR)); } @Test - public void testRetryFromStartedThenRunError() { + public void testRetryAfterFromStartedThenRunError() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE)); - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.retryAfter(0)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(SUBMITTED)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(2)); - assertThat(outputs, contains(QUEUED, SUBMITTED, RUNNING, FAILED, PREPARE, SUBMITTED)); + assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(QUEUED)); + assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(1)); + assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, FAILED, QUEUED)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(1)); } @@ -506,27 +485,25 @@ public void testRetryFromStartedThenRunError() { public void testFatalFromStartedThenRunError() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE)); transitioner.receive(eventFactory.stop()); assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(ERROR)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(1)); - assertThat(outputs, contains(QUEUED, SUBMITTED, RUNNING, FAILED, ERROR)); + assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, FAILED, ERROR)); } @Test public void testFailedFromTimeout() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.timeout()); assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(FAILED)); - assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(1)); - assertThat(outputs, contains(QUEUED, SUBMITTED, RUNNING, FAILED)); + assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, FAILED)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().consecutiveFailures(), equalTo(0)); } @@ -534,19 +511,19 @@ public void testFailedFromTimeout() { public void testRetriggerOfPartition() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.runError(TEST_ERROR_MESSAGE)); transitioner.receive(eventFactory.stop()); transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_2, DOCKER_IMAGE)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); transitioner.receive(eventFactory.started()); assertThat(transitioner.get(WORKFLOW_INSTANCE).state(), equalTo(RUNNING)); assertThat(transitioner.get(WORKFLOW_INSTANCE).data().tries(), equalTo(1)); - assertThat(outputs, contains(QUEUED, SUBMITTED, RUNNING, FAILED, ERROR, - QUEUED, SUBMITTED, RUNNING)); + assertThat(outputs, contains(QUEUED, PREPARE, RUNNING, FAILED, ERROR, + QUEUED, PREPARE, RUNNING)); } @Test @@ -605,26 +582,31 @@ public void testRunErrorFromQueuedState() { public void testStoresExecutedDockerImage() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE + "1")); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); + transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1)); assertThat( transitioner.get(WORKFLOW_INSTANCE).data().executionDescription().orElseThrow().dockerImage(), - equalTo(DOCKER_IMAGE + "1")); + equalTo(DOCKER_IMAGE)); } @Test public void testStoresLastExecutedDockerImage() { transitioner.initialize(RunState.fresh(WORKFLOW_INSTANCE)); transitioner.receive(eventFactory.triggerExecution(UNKNOWN_TRIGGER)); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE + "1")); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); + transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION, TEST_EXECUTION_ID_1)); + transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_1)); transitioner.receive(eventFactory.started()); transitioner.receive(eventFactory.terminate(1)); - transitioner.receive(eventFactory.retry()); - transitioner.receive(eventFactory.created(TEST_EXECUTION_ID_1, DOCKER_IMAGE + "2")); + transitioner.receive(eventFactory.retryAfter(0)); + transitioner.receive(eventFactory.dequeue(ImmutableSet.of())); + transitioner.receive(eventFactory.submit(EXECUTION_DESCRIPTION2, TEST_EXECUTION_ID_2)); + transitioner.receive(eventFactory.submitted(TEST_EXECUTION_ID_2)); assertThat( transitioner.get(WORKFLOW_INSTANCE).data().executionDescription().orElseThrow().dockerImage(), - equalTo(DOCKER_IMAGE + "2")); + equalTo(EXECUTION_DESCRIPTION2.dockerImage())); } @Test diff --git a/styx-service-common/src/test/java/com/spotify/styx/storage/BigTableStorageTest.java b/styx-service-common/src/test/java/com/spotify/styx/storage/BigTableStorageTest.java index b5edd0f431..04f219c9e7 100644 --- a/styx-service-common/src/test/java/com/spotify/styx/storage/BigTableStorageTest.java +++ b/styx-service-common/src/test/java/com/spotify/styx/storage/BigTableStorageTest.java @@ -20,6 +20,8 @@ package com.spotify.styx.storage; +import static com.spotify.styx.testdata.TestData.EXECUTION_DESCRIPTION; +import static com.spotify.styx.testdata.TestData.EXECUTION_DESCRIPTION2; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; @@ -29,6 +31,7 @@ import com.github.rholder.retry.StopStrategies; import com.github.rholder.retry.WaitStrategies; +import com.google.common.collect.ImmutableSet; import com.spotify.styx.model.Event; import com.spotify.styx.model.SequenceEvent; import com.spotify.styx.model.TriggerParameters; @@ -100,29 +103,36 @@ private Connection setupBigTableMockTable(int numFailures) throws IOException { public void shouldReturnExecutionDataForWorkflowInstance() throws Exception { setUp(0); storage.writeEvent(SequenceEvent.create(Event.triggerExecution(WFI1, TRIGGER, TRIGGER_PARAMETERS), 0L, 0L)); - storage.writeEvent(SequenceEvent.create(Event.created(WFI1, "execId", "img"), 1L, 1L)); - storage.writeEvent(SequenceEvent.create(Event.started(WFI1), 2L, 2L)); + storage.writeEvent(SequenceEvent.create(Event.dequeue(WFI1, ImmutableSet.of()), 1L, 1L)); + storage.writeEvent(SequenceEvent.create(Event.submit(WFI1, EXECUTION_DESCRIPTION, "execId"), 2L, 2L)); + storage.writeEvent(SequenceEvent.create(Event.submitted(WFI1, "execId", "test"), 3L, 3L)); + storage.writeEvent(SequenceEvent.create(Event.started(WFI1), 4L, 4L)); WorkflowInstanceExecutionData workflowInstanceExecutionData = storage.executionData(WFI1); assertThat(workflowInstanceExecutionData.triggers().get(0).triggerId(), is("triggerId")); assertThat(workflowInstanceExecutionData.triggers().get(0).executions().get(0).executionId(), is(Optional.of("execId"))); - assertThat(workflowInstanceExecutionData.triggers().get(0).executions().get(0).dockerImage(), is(Optional.of("img"))); + assertThat(workflowInstanceExecutionData.triggers().get(0).executions().get(0).dockerImage(), + is(Optional.of("busybox:1.1"))); assertThat(workflowInstanceExecutionData.triggers().get(0).executions().get(0).statuses().get(0), is( - ExecStatus.create(Instant.ofEpochMilli(1L), "SUBMITTED", Optional.empty()))); + ExecStatus.create(Instant.ofEpochMilli(3L), "SUBMITTED", Optional.empty()))); assertThat(workflowInstanceExecutionData.triggers().get(0).executions().get(0).statuses().get(1), is( - ExecStatus.create(Instant.ofEpochMilli(2L), "STARTED", Optional.empty()))); + ExecStatus.create(Instant.ofEpochMilli(4L), "STARTED", Optional.empty()))); } @Test public void shouldReturnExecutionDataForWorkflow() throws Exception { setUp(0); storage.writeEvent(SequenceEvent.create(Event.triggerExecution(WFI1, TRIGGER1, TRIGGER_PARAMETERS), 0L, 0L)); - storage.writeEvent(SequenceEvent.create(Event.created(WFI1, "execId1", "img1"), 1L, 1L)); - storage.writeEvent(SequenceEvent.create(Event.started(WFI1), 2L, 2L)); + storage.writeEvent(SequenceEvent.create(Event.dequeue(WFI1, ImmutableSet.of()), 1L, 1L)); + storage.writeEvent(SequenceEvent.create(Event.submit(WFI1, EXECUTION_DESCRIPTION, "execId1"), 2L, 2L)); + storage.writeEvent(SequenceEvent.create(Event.submitted(WFI1, "execId1", "test"), 3L, 3L)); + storage.writeEvent(SequenceEvent.create(Event.started(WFI1), 4L, 4L)); storage.writeEvent(SequenceEvent.create(Event.triggerExecution(WFI2, TRIGGER2, TRIGGER_PARAMETERS), 0L, 3L)); - storage.writeEvent(SequenceEvent.create(Event.created(WFI2, "execId2", "img2"), 1L, 4L)); - storage.writeEvent(SequenceEvent.create(Event.started(WFI2), 2L, 5L)); + storage.writeEvent(SequenceEvent.create(Event.dequeue(WFI2, ImmutableSet.of()), 1L, 4L)); + storage.writeEvent(SequenceEvent.create(Event.submit(WFI2, EXECUTION_DESCRIPTION2, "execId2"), 2L, 5L)); + storage.writeEvent(SequenceEvent.create(Event.submitted(WFI2, "execId2", "test"), 3L, 6L)); + storage.writeEvent(SequenceEvent.create(Event.started(WFI2), 4L, 7L)); List workflowInstanceExecutionData = storage.executionData(WORKFLOW_ID1, "", 100); @@ -131,18 +141,20 @@ public void shouldReturnExecutionDataForWorkflow() throws Exception { assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).triggerId(), is("triggerId1")); assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).executionId(), is(Optional.of("execId1"))); - assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).dockerImage(), is(Optional.of("img1"))); + assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).dockerImage(), + is(Optional.of("busybox:1.1"))); assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).statuses() - .get(0), is(ExecStatus.create(Instant.ofEpochMilli(1L), "SUBMITTED", Optional.empty()))); + .get(0), is(ExecStatus.create(Instant.ofEpochMilli(3L), "SUBMITTED", Optional.empty()))); assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).statuses() - .get(1), is(ExecStatus.create(Instant.ofEpochMilli(2L), "STARTED", Optional.empty()))); + .get(1), is(ExecStatus.create(Instant.ofEpochMilli(4L), "STARTED", Optional.empty()))); assertThat(workflowInstanceExecutionData.get(1).triggers().get(0).triggerId(), is("triggerId2")); assertThat(workflowInstanceExecutionData.get(1).triggers().get(0).executions().get(0).executionId(), is(Optional.of("execId2"))); - assertThat(workflowInstanceExecutionData.get(1).triggers().get(0).executions().get(0).dockerImage(), is(Optional.of("img2"))); + assertThat(workflowInstanceExecutionData.get(1).triggers().get(0).executions().get(0).dockerImage(), + is(Optional.of("busybox:1.2"))); assertThat(workflowInstanceExecutionData.get(1).triggers().get(0).executions().get(0).statuses() - .get(0), is(ExecStatus.create(Instant.ofEpochMilli(4L), "SUBMITTED", Optional.empty()))); + .get(0), is(ExecStatus.create(Instant.ofEpochMilli(6L), "SUBMITTED", Optional.empty()))); assertThat(workflowInstanceExecutionData.get(1).triggers().get(0).executions().get(0).statuses() - .get(1), is(ExecStatus.create(Instant.ofEpochMilli(5L), "STARTED", Optional.empty()))); + .get(1), is(ExecStatus.create(Instant.ofEpochMilli(7L), "STARTED", Optional.empty()))); } @Test @@ -177,12 +189,16 @@ public void shouldLimitExecutionDataForWorkflow() throws Exception { public void shouldReturnRangeOfExecutionDataForWorkflow() throws Exception { setUp(0); storage.writeEvent(SequenceEvent.create(Event.triggerExecution(WFI1, TRIGGER1, TRIGGER_PARAMETERS), 0L, 0L)); - storage.writeEvent(SequenceEvent.create(Event.created(WFI1, "execId1", "img1"), 1L, 1L)); - storage.writeEvent(SequenceEvent.create(Event.started(WFI1), 2L, 2L)); + storage.writeEvent(SequenceEvent.create(Event.dequeue(WFI1, ImmutableSet.of()), 1L, 1L)); + storage.writeEvent(SequenceEvent.create(Event.submit(WFI1, EXECUTION_DESCRIPTION, "execId1"), 2L, 2L)); + storage.writeEvent(SequenceEvent.create(Event.submitted(WFI1, "execId1", "test"), 3L, 3L)); + storage.writeEvent(SequenceEvent.create(Event.started(WFI1), 4L, 4L)); storage.writeEvent(SequenceEvent.create(Event.triggerExecution(WFI2, TRIGGER2, TRIGGER_PARAMETERS), 0L, 3L)); - storage.writeEvent(SequenceEvent.create(Event.created(WFI2, "execId2", "img2"), 1L, 4L)); - storage.writeEvent(SequenceEvent.create(Event.started(WFI2), 2L, 5L)); + storage.writeEvent(SequenceEvent.create(Event.dequeue(WFI2, ImmutableSet.of()), 1L, 4L)); + storage.writeEvent(SequenceEvent.create(Event.submit(WFI2, EXECUTION_DESCRIPTION2, "execId2"), 2L, 5L)); + storage.writeEvent(SequenceEvent.create(Event.submitted(WFI2, "execId2", "test"), 3L, 6L)); + storage.writeEvent(SequenceEvent.create(Event.started(WFI2), 4L, 7L)); List workflowInstanceExecutionData = storage.executionData(WORKFLOW_ID1, WFI1.parameter(), ""); @@ -191,30 +207,36 @@ public void shouldReturnRangeOfExecutionDataForWorkflow() throws Exception { assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).triggerId(), is("triggerId1")); assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).executionId(), is(Optional.of("execId1"))); - assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).dockerImage(), is(Optional.of("img1"))); + assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).dockerImage(), + is(Optional.of("busybox:1.1"))); assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).statuses() - .get(0), is(ExecStatus.create(Instant.ofEpochMilli(1L), "SUBMITTED", Optional.empty()))); + .get(0), is(ExecStatus.create(Instant.ofEpochMilli(3L), "SUBMITTED", Optional.empty()))); assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).statuses() - .get(1), is(ExecStatus.create(Instant.ofEpochMilli(2L), "STARTED", Optional.empty()))); + .get(1), is(ExecStatus.create(Instant.ofEpochMilli(4L), "STARTED", Optional.empty()))); assertThat(workflowInstanceExecutionData.get(1).triggers().get(0).triggerId(), is("triggerId2")); assertThat(workflowInstanceExecutionData.get(1).triggers().get(0).executions().get(0).executionId(), is(Optional.of("execId2"))); - assertThat(workflowInstanceExecutionData.get(1).triggers().get(0).executions().get(0).dockerImage(), is(Optional.of("img2"))); + assertThat(workflowInstanceExecutionData.get(1).triggers().get(0).executions().get(0).dockerImage(), + is(Optional.of("busybox:1.2"))); assertThat(workflowInstanceExecutionData.get(1).triggers().get(0).executions().get(0).statuses() - .get(0), is(ExecStatus.create(Instant.ofEpochMilli(4L), "SUBMITTED", Optional.empty()))); + .get(0), is(ExecStatus.create(Instant.ofEpochMilli(6L), "SUBMITTED", Optional.empty()))); assertThat(workflowInstanceExecutionData.get(1).triggers().get(0).executions().get(0).statuses() - .get(1), is(ExecStatus.create(Instant.ofEpochMilli(5L), "STARTED", Optional.empty()))); + .get(1), is(ExecStatus.create(Instant.ofEpochMilli(7L), "STARTED", Optional.empty()))); } @Test - public void shouldReturnExecutionDataForOneWorkflow() throws Exception { + public void shouldReturnRangeOfExecutionDataExcludingStopValue() throws Exception { setUp(0); storage.writeEvent(SequenceEvent.create(Event.triggerExecution(WFI1, TRIGGER1, TRIGGER_PARAMETERS), 0L, 0L)); - storage.writeEvent(SequenceEvent.create(Event.created(WFI1, "execId1", "img1"), 1L, 1L)); - storage.writeEvent(SequenceEvent.create(Event.started(WFI1), 2L, 2L)); + storage.writeEvent(SequenceEvent.create(Event.dequeue(WFI1, ImmutableSet.of()), 1L, 1L)); + storage.writeEvent(SequenceEvent.create(Event.submit(WFI1, EXECUTION_DESCRIPTION, "execId1"), 2L, 2L)); + storage.writeEvent(SequenceEvent.create(Event.submitted(WFI1, "execId1", "test"), 3L, 3L)); + storage.writeEvent(SequenceEvent.create(Event.started(WFI1), 4L, 4L)); storage.writeEvent(SequenceEvent.create(Event.triggerExecution(WFI2, TRIGGER2, TRIGGER_PARAMETERS), 0L, 3L)); - storage.writeEvent(SequenceEvent.create(Event.created(WFI2, "execId2", "img2"), 1L, 4L)); - storage.writeEvent(SequenceEvent.create(Event.started(WFI2), 2L, 5L)); + storage.writeEvent(SequenceEvent.create(Event.dequeue(WFI2, ImmutableSet.of()), 1L, 4L)); + storage.writeEvent(SequenceEvent.create(Event.submit(WFI2, EXECUTION_DESCRIPTION2, "execId2"), 2L, 5L)); + storage.writeEvent(SequenceEvent.create(Event.submitted(WFI2, "execId2", "test"), 3L, 6L)); + storage.writeEvent(SequenceEvent.create(Event.started(WFI2), 4L, 7L)); List workflowInstanceExecutionData = storage.executionData(WORKFLOW_ID1, WFI1.parameter(), WFI2.parameter()); @@ -223,11 +245,12 @@ public void shouldReturnExecutionDataForOneWorkflow() throws Exception { assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).triggerId(), is("triggerId1")); assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).executionId(), is(Optional.of("execId1"))); - assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).dockerImage(), is(Optional.of("img1"))); + assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).dockerImage(), + is(Optional.of("busybox:1.1"))); assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).statuses() - .get(0), is(ExecStatus.create(Instant.ofEpochMilli(1L), "SUBMITTED", Optional.empty()))); + .get(0), is(ExecStatus.create(Instant.ofEpochMilli(3L), "SUBMITTED", Optional.empty()))); assertThat(workflowInstanceExecutionData.get(0).triggers().get(0).executions().get(0).statuses() - .get(1), is(ExecStatus.create(Instant.ofEpochMilli(2L), "STARTED", Optional.empty()))); + .get(1), is(ExecStatus.create(Instant.ofEpochMilli(4L), "STARTED", Optional.empty()))); } @Test diff --git a/styx-service-common/src/test/java/com/spotify/styx/storage/WFIExecutionBuilderTest.java b/styx-service-common/src/test/java/com/spotify/styx/storage/WFIExecutionBuilderTest.java index d780c153f4..7796109529 100644 --- a/styx-service-common/src/test/java/com/spotify/styx/storage/WFIExecutionBuilderTest.java +++ b/styx-service-common/src/test/java/com/spotify/styx/storage/WFIExecutionBuilderTest.java @@ -159,7 +159,6 @@ public void testGeneralExample() { SequenceEvent.create(E.terminate(RunState.MISSING_DEPS_EXIT_CODE), c++, ts("07:58")), SequenceEvent.create(E.retryAfter(10), c++, ts("07:59")), - SequenceEvent.create(E.retry(), c++, ts("08:56")), SequenceEvent.create(E.submit(desc("img2"), "exec-id-01"), c++, ts("08:55")), SequenceEvent.create(E.submitted("exec-id-01"), c++, ts("08:56")), SequenceEvent.create(E.started(), c++, ts("08:57")), @@ -174,7 +173,6 @@ public void testGeneralExample() { SequenceEvent.create(E.terminate(1), c++, ts("09:58")), SequenceEvent.create(E.retryAfter(10), c++, ts("09:59")), - SequenceEvent.create(E.retry(), c++, ts("10:56")), SequenceEvent.create(E.submit(desc("img4", "sha4"), "exec-id-11"), c++, ts("10:55")), SequenceEvent.create(E.submitted("exec-id-11"), c++, ts("10:56")), SequenceEvent.create(E.started(), c++, ts("10:57")) @@ -308,7 +306,6 @@ public void testTimeout() { SequenceEvent.create(E.timeout(), c++, ts("07:58")), SequenceEvent.create(E.retryAfter(10), c++, ts("07:59")), - SequenceEvent.create(E.retry(), c++, ts("08:56")), SequenceEvent.create(E.submit(desc("img2", "sha2"), "exec-id-01"), c++, ts("08:55")), SequenceEvent.create(E.submitted("exec-id-01"), c++, ts("08:56")), SequenceEvent.create(E.started(), c++, ts("08:57")) @@ -407,7 +404,6 @@ public void testRunError() { SequenceEvent.create(E.runError("First failure"), c++, ts("07:58")), SequenceEvent.create(E.retryAfter(10), c++, ts("07:59")), - SequenceEvent.create(E.retry(), c++, ts("08:56")), SequenceEvent.create(E.submit(desc("img2", "sha2"), "exec-id-01"), c++, ts("08:55")), SequenceEvent.create(E.submitted("exec-id-01"), c++, ts("08:56")), SequenceEvent.create(E.started(), c++, ts("08:57")), @@ -531,7 +527,6 @@ public void testStop() { SequenceEvent.create(E.runError("First failure"), c++, ts("07:58")), SequenceEvent.create(E.retryAfter(10), c++, ts("07:59")), - SequenceEvent.create(E.retry(), c++, ts("08:56")), SequenceEvent.create(E.submit(desc("img2", "sha2"), "exec-id-01"), c++, ts("08:55")), SequenceEvent.create(E.submitted("exec-id-01"), c++, ts("08:56")), SequenceEvent.create(E.started(), c++, ts("08:57")),