Skip to content
This repository was archived by the owner on Jul 12, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import com.spotify.apollo.Environment;
import com.spotify.apollo.Response;
import com.spotify.apollo.Status;
Expand Down Expand Up @@ -132,7 +133,7 @@ public void testEventsRoundtrip() throws Exception {
sinceVersion(Api.Version.V3);

storage.writeEvent(SequenceEvent.create(Event.triggerExecution(WFI_1, TRIGGER, TriggerParameters.zero()), 0L, 0L));
storage.writeEvent(SequenceEvent.create(Event.created(WFI_1, "exec0", "img0"), 1L, 1L));
storage.writeEvent(SequenceEvent.create(Event.dequeue(WFI_1, ImmutableSet.of()), 1L, 1L));
storage.writeEvent(SequenceEvent.create(Event.started(WFI_1), 2L, 2L));

Response<ByteString> response =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static com.spotify.styx.model.SequenceEvent.create;
import static com.spotify.styx.serialization.Json.deserialize;
import static com.spotify.styx.serialization.Json.serialize;
import static com.spotify.styx.testdata.TestData.EXECUTION_DESCRIPTION;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -49,6 +50,7 @@

import com.google.api.client.googleapis.auth.oauth2.GoogleIdToken;
import com.google.cloud.datastore.Datastore;
import com.google.common.collect.ImmutableSet;
import com.spotify.apollo.Environment;
import com.spotify.apollo.Response;
import com.spotify.apollo.Status;
Expand Down Expand Up @@ -342,8 +344,10 @@ public void shouldReturnWorkflowInstancesData() throws Exception {

WorkflowInstance wfi = WorkflowInstance.create(WORKFLOW.id(), "2016-08-10");
storage.writeEvent(create(Event.triggerExecution(wfi, NATURAL_TRIGGER, TRIGGER_PARAMETERS), 0L, ms("07:00:00")));
storage.writeEvent(create(Event.created(wfi, "exec", "img"), 1L, ms("07:00:01")));
storage.writeEvent(create(Event.started(wfi), 2L, ms("07:00:02")));
storage.writeEvent(create(Event.dequeue(wfi, ImmutableSet.of()), 1L, ms("07:00:01")));
storage.writeEvent(create(Event.submit(wfi, EXECUTION_DESCRIPTION, "exec"), 2L, ms("07:00:02")));
storage.writeEvent(create(Event.submitted(wfi, "exec", "test"), 3L, ms("07:00:03")));
storage.writeEvent(create(Event.started(wfi), 4L, ms("07:00:04")));

Response<ByteString> response =
awaitResponse(serviceHelper.request("GET", path("/foo/bar/instances")));
Expand All @@ -359,7 +363,7 @@ public void shouldReturnWorkflowInstancesData() throws Exception {
assertJson(response, "[0].triggers.[0].complete", is(false));
assertJson(response, "[0].triggers.[0].executions", hasSize(1));
assertJson(response, "[0].triggers.[0].executions.[0].execution_id", is("exec"));
assertJson(response, "[0].triggers.[0].executions.[0].docker_image", is("img"));
assertJson(response, "[0].triggers.[0].executions.[0].docker_image", is("busybox:1.1"));
assertJson(response, "[0].triggers.[0].executions.[0].statuses", hasSize(2));
assertJson(response, "[0].triggers.[0].executions.[0].statuses.[0].status", is("SUBMITTED"));
assertJson(response, "[0].triggers.[0].executions.[0].statuses.[1].status", is("STARTED"));
Expand All @@ -383,8 +387,10 @@ public void shouldReturnWorkflowRangeOfInstancesData() throws Exception {

WorkflowInstance wfi = WorkflowInstance.create(WORKFLOW.id(), "2016-08-10");
storage.writeEvent(create(Event.triggerExecution(wfi, NATURAL_TRIGGER, TRIGGER_PARAMETERS), 0L, ms("07:00:00")));
storage.writeEvent(create(Event.created(wfi, "exec", "img"), 1L, ms("07:00:01")));
storage.writeEvent(create(Event.started(wfi), 2L, ms("07:00:02")));
storage.writeEvent(create(Event.dequeue(wfi, ImmutableSet.of()), 1L, ms("07:00:01")));
storage.writeEvent(create(Event.submit(wfi, EXECUTION_DESCRIPTION, "exec"), 2L, ms("07:00:02")));
storage.writeEvent(create(Event.submitted(wfi, "exec", "test"), 3L, ms("07:00:03")));
storage.writeEvent(create(Event.started(wfi), 4L, ms("07:00:04")));

Response<ByteString> response =
awaitResponse(serviceHelper.request("GET", path("/foo/bar/instances?start=2016-08-10")));
Expand All @@ -400,7 +406,7 @@ public void shouldReturnWorkflowRangeOfInstancesData() throws Exception {
assertJson(response, "[0].triggers.[0].complete", is(false));
assertJson(response, "[0].triggers.[0].executions", hasSize(1));
assertJson(response, "[0].triggers.[0].executions.[0].execution_id", is("exec"));
assertJson(response, "[0].triggers.[0].executions.[0].docker_image", is("img"));
assertJson(response, "[0].triggers.[0].executions.[0].docker_image", is("busybox:1.1"));
assertJson(response, "[0].triggers.[0].executions.[0].statuses", hasSize(2));
assertJson(response, "[0].triggers.[0].executions.[0].statuses.[0].status", is("SUBMITTED"));
assertJson(response, "[0].triggers.[0].executions.[0].statuses.[1].status", is("STARTED"));
Expand All @@ -412,8 +418,10 @@ public void shouldReturnWorkflowInstanceData() throws Exception {

WorkflowInstance wfi = WorkflowInstance.create(WORKFLOW.id(), "2016-08-10");
storage.writeEvent(create(Event.triggerExecution(wfi, NATURAL_TRIGGER, TRIGGER_PARAMETERS), 0L, ms("07:00:00")));
storage.writeEvent(create(Event.created(wfi, "exec", "img"), 1L, ms("07:00:01")));
storage.writeEvent(create(Event.started(wfi), 2L, ms("07:00:02")));
storage.writeEvent(create(Event.dequeue(wfi, ImmutableSet.of()), 1L, ms("07:00:01")));
storage.writeEvent(create(Event.submit(wfi, EXECUTION_DESCRIPTION, "exec"), 2L, ms("07:00:02")));
storage.writeEvent(create(Event.submitted(wfi, "exec", "test"), 3L, ms("07:00:03")));
storage.writeEvent(create(Event.started(wfi), 4L, ms("07:00:04")));

Response<ByteString> response =
awaitResponse(serviceHelper.request("GET", path("/foo/bar/instances/2016-08-10")));
Expand All @@ -429,14 +437,14 @@ public void shouldReturnWorkflowInstanceData() throws Exception {
assertJson(response, "triggers.[0].complete", is(false));
assertJson(response, "triggers.[0].executions", hasSize(1));
assertJson(response, "triggers.[0].executions.[0].execution_id", is("exec"));
assertJson(response, "triggers.[0].executions.[0].docker_image", is("img"));
assertJson(response, "triggers.[0].executions.[0].docker_image", is("busybox:1.1"));
assertJson(response, "triggers.[0].executions.[0].statuses", hasSize(2));
assertJson(response, "triggers.[0].executions.[0].statuses.[0].status", is("SUBMITTED"));
assertJson(response, "triggers.[0].executions.[0].statuses.[1].status", is("STARTED"));
assertJson(response, "triggers.[0].executions.[0].statuses.[0].timestamp",
is("2016-08-10T07:00:01Z"));
is("2016-08-10T07:00:03Z"));
assertJson(response, "triggers.[0].executions.[0].statuses.[1].timestamp",
is("2016-08-10T07:00:02Z"));
is("2016-08-10T07:00:04Z"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,5 @@ R submit(@Getter WorkflowInstance workflowInstance, ExecutionDescription executi

// Note: Do not make changes to these deprecated event method signatures
@Deprecated
R timeTrigger(@Getter WorkflowInstance workflowInstance);
@Deprecated
R created(@Getter WorkflowInstance workflowInstance, String executionId, String dockerImage);
@Deprecated
R retry(@Getter WorkflowInstance workflowInstance);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@

@JsonTypeInfo(use = Id.NAME, visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = PersistentEvent.class, name = "timeTrigger"),
@JsonSubTypes.Type(value = PersistentEvent.TriggerExecution.class, name = "triggerExecution"),
@JsonSubTypes.Type(value = PersistentEvent.Info.class, name = "info"),
@JsonSubTypes.Type(value = PersistentEvent.Created.class, name = "created"),
@JsonSubTypes.Type(value = PersistentEvent.Dequeue.class, name = "dequeue"),
@JsonSubTypes.Type(value = PersistentEvent.Started.class, name = "started"),
@JsonSubTypes.Type(value = PersistentEvent.Terminate.class, name = "terminate"),
Expand All @@ -71,10 +69,6 @@ public class PersistentEvent {

public static class SerializerVisitor implements EventVisitor<PersistentEvent> {

@Override
public PersistentEvent timeTrigger(WorkflowInstance workflowInstance) {
return new PersistentEvent("timeTrigger", workflowInstance.toKey());
}

@Override
public PersistentEvent triggerExecution(WorkflowInstance workflowInstance, Trigger trigger,
Expand All @@ -87,11 +81,6 @@ public PersistentEvent info(WorkflowInstance workflowInstance, Message message)
return new Info(workflowInstance.toKey(), message);
}

@Override
public PersistentEvent created(WorkflowInstance workflowInstance, String executionId, String dockerImage) {
return new Created(workflowInstance.toKey(), executionId, Optional.of(dockerImage));
}

@Override
public PersistentEvent dequeue(WorkflowInstance workflowInstance, Set<String> resourceIds) {
return new Dequeue(workflowInstance.toKey(), Optional.of(resourceIds));
Expand Down Expand Up @@ -169,8 +158,6 @@ public static PersistentEvent wrap(Event event) {
public Event toEvent() {
final WorkflowInstance workflowInstance = WorkflowInstance.parseKey(this.workflowInstance);
switch (type) {
case "timeTrigger":
return Event.timeTrigger(workflowInstance);
case "success":
return Event.success(workflowInstance);
case "retry":
Expand Down Expand Up @@ -239,28 +226,6 @@ public Event toEvent() {
}
}


public static class Created extends PersistentEvent {

public final String executionId;
public final String dockerImage;

@JsonCreator
public Created(
@JsonProperty("workflow_instance") String workflowInstance,
@JsonProperty("execution_id") String executionId,
@JsonProperty("docker_image") Optional<String> dockerImage) {
super("created", workflowInstance);
this.executionId = executionId;
this.dockerImage = dockerImage.orElse("UNKNOWN");
}

@Override
public Event toEvent() {
return Event.created(WorkflowInstance.parseKey(workflowInstance), executionId, dockerImage);
}
}

public static class Submitted extends PersistentEvent {

public final String executionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ public final class TestData {
.commitSha(VALID_SHA)
.build();

public static final ExecutionDescription EXECUTION_DESCRIPTION2 =
ExecutionDescription.builder()
.dockerImage("busybox:1.2")
.dockerArgs("foo", "bar")
.secret(WorkflowConfiguration.Secret.create("secret", "/dev/null"))
.commitSha(VALID_SHA)
.build();

public static final Workflow WORKFLOW_WITH_RESOURCES = Workflow.create(WORKFLOW_ID.componentId(),
HOURLY_WORKFLOW_CONFIGURATION_WITH_RESOURCES);

Expand Down
21 changes: 0 additions & 21 deletions styx-common/src/main/java/com/spotify/styx/util/EventUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ public static String info(Event event) {
private enum EventInfoVisitor implements EventVisitor<String> {
INSTANCE;

@Override
public String timeTrigger(WorkflowInstance workflowInstance) {
return "";
}

@Override
public String triggerExecution(WorkflowInstance workflowInstance, Trigger trigger,
TriggerParameters parameters) {
Expand All @@ -72,11 +67,6 @@ public String info(WorkflowInstance workflowInstance, Message message) {
return message.line();
}

@Override
public String created(WorkflowInstance workflowInstance, String executionId, String dockerImage) {
return String.format("Execution id: %s, Docker image: %s", executionId, dockerImage);
}

@Override
public String dequeue(WorkflowInstance workflowInstance, Set<String> resourceIds) {
return "";
Expand Down Expand Up @@ -145,11 +135,6 @@ public String submitted(WorkflowInstance workflowInstance, String executionId, S
private enum EventNameVisitor implements EventVisitor<String> {
INSTANCE;

@Override
public String timeTrigger(WorkflowInstance workflowInstance) {
return "timeTrigger";
}

@Override
public String triggerExecution(WorkflowInstance workflowInstance, Trigger trigger,
TriggerParameters parameters) {
Expand All @@ -166,12 +151,6 @@ public String dequeue(WorkflowInstance workflowInstance, Set<String> resourceIds
return "dequeue";
}

@Override
public String created(WorkflowInstance workflowInstance, String executionId,
String dockerImage) {
return "created";
}

@Override
public String started(WorkflowInstance workflowInstance) {
return "started";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,8 @@ public class PersistentEventTest {

@Test
public void testRoundtripAllEvents() throws Exception {
assertRoundtrip(Event.timeTrigger(INSTANCE1));
assertRoundtrip(Event.triggerExecution(INSTANCE1, UNKNOWN_TRIGGER, TRIGGER_PARAMETERS));
assertRoundtrip(Event.info(INSTANCE1, Message.info("InfoMessage")));
assertRoundtrip(Event.created(INSTANCE1, POD_NAME, DOCKER_IMAGE));
assertRoundtrip(Event.dequeue(INSTANCE1, ImmutableSet.of("some-resource")));
assertRoundtrip(Event.dequeue(INSTANCE1, ImmutableSet.of()));
assertRoundtrip(Event.started(INSTANCE1));
Expand All @@ -87,7 +85,6 @@ public void testRoundtripAllEvents() throws Exception {

@Test
public void testDeserializeFromJson() throws Exception {
assertThat(deserializeEvent(json("timeTrigger")), is(Event.timeTrigger(INSTANCE1)));
assertThat(deserializeEvent(json("dequeue", "\"resource_ids\":[\"quux\"]")),
is(Event.dequeue(INSTANCE1, ImmutableSet.of("quux"))));
assertThat(deserializeEvent(json("dequeue")),
Expand Down Expand Up @@ -123,10 +120,6 @@ public void testDeserializeFromJson() throws Exception {
deserializeEvent(json("submitted", "\"execution_id\":\"" + POD_NAME + "\",\"runner_id\":\"" + RUNNER_ID
+ "\"")),
is(Event.submitted(INSTANCE1, POD_NAME, RUNNER_ID)));
assertThat(
deserializeEvent(json("created", "\"execution_id\":\"" + POD_NAME + "\",\"docker_image\":\"" + DOCKER_IMAGE
+ "\"")),
is(Event.created(INSTANCE1, POD_NAME, DOCKER_IMAGE)));
assertThat(
deserializeEvent(json("runError", "\"message\":\"ErrorMessage\"")),
is(Event.runError(INSTANCE1, "ErrorMessage")));
Expand Down Expand Up @@ -168,9 +161,6 @@ public void testDeserializeFromJsonWhenTransformationRequired() throws Exception
assertThat(
deserializeEvent(json("started", "\"pod_name\":\"" + POD_NAME + "\"")),
is(Event.started(INSTANCE1))); // for backwards compatibility
assertThat(
deserializeEvent(json("created", "\"execution_id\":\"" + POD_NAME + "\"")),
is(Event.created(INSTANCE1, POD_NAME, "UNKNOWN")));
assertThat(
deserializeEvent(json("triggerExecution")),
is(Event.triggerExecution(INSTANCE1, TRIGGER_UNKNOWN, TriggerParameters.zero())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,11 +788,6 @@ private void closeWatch() {
// fixme: add a Cause enum to the runError() event instead of this string matching
private static class PullImageErrorMatcher implements EventVisitor<Boolean> {

@Override
public Boolean timeTrigger(WorkflowInstance workflowInstance) {
return false;
}

@Override
public Boolean triggerExecution(WorkflowInstance workflowInstance, Trigger trigger,
TriggerParameters parameters) {
Expand All @@ -809,11 +804,6 @@ public Boolean dequeue(WorkflowInstance workflowInstance, Set<String> resourceId
return false;
}

@Override
public Boolean created(WorkflowInstance workflowInstance, String executionId, String dockerImage) {
return false;
}

@Override
public Boolean submit(WorkflowInstance workflowInstance, ExecutionDescription executionDescription,
String executionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public void shouldRejectTriggerIfIsClosed() throws Exception {
public void shouldRejectEventIfClosed() throws Exception {
stateManager.close();
exception.expect(IsClosedException.class);
stateManager.receive(Event.timeTrigger(INSTANCE));
stateManager.receive(Event.triggerExecution(INSTANCE, Trigger.natural(), TriggerParameters.zero()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.mockito.MockitoAnnotations;

@RunWith(JUnitParamsRunner.class)
public class TimeoutHandlerTest {
public class TimeoutHandlerTest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be unintended change.


private Instant now = Instant.parse("2016-12-02T22:00:00Z");
private Time time = () -> now;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,23 +123,6 @@ private RunState transitionUpdates(Instant instant) {

private class TransitionVisitor implements EventVisitor<RunState> {

@Deprecated
@Override
public RunState timeTrigger(WorkflowInstance workflowInstance) {
switch (state()) {
case NEW:
return state( // for backwards compatibility
SUBMITTED,
data().builder()
.trigger(Trigger.unknown("UNKNOWN"))
.triggerId("UNKNOWN") // for backwards compatibility
.build());

default:
throw illegalTransition("timeTrigger");
}
}

@Override
public RunState triggerExecution(WorkflowInstance workflowInstance, Trigger trigger,
TriggerParameters parameters) {
Expand All @@ -158,26 +141,6 @@ public RunState triggerExecution(WorkflowInstance workflowInstance, Trigger trig
}
}

@Deprecated
@Override
public RunState created(WorkflowInstance workflowInstance, String executionId,
String dockerImage) {
switch (state()) {
case PREPARE:
case QUEUED:
return state(
SUBMITTED, // for backwards compatibility
data().builder()
.executionId(executionId)
.executionDescription(ExecutionDescription.forImage(dockerImage))
.tries(data().tries() + 1)
.build());

default:
throw illegalTransition("created");
}
}

@Override
public RunState info(WorkflowInstance workflowInstance, Message message) {
switch (state()) {
Expand Down Expand Up @@ -249,8 +212,11 @@ public RunState submitted(WorkflowInstance workflowInstance, String executionId,
public RunState started(WorkflowInstance workflowInstance) {
switch (state()) {
case SUBMITTED:
case PREPARE:
return state(RUNNING);
case PREPARE:
return state(RUNNING, data().builder()
.tries(data().tries() + 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@honnix do you know why we allow this transition from PREPARE directly to RUNNING? Is it in case we lose the events in between?

fyi, we added this change to keep track of the number of tries here because we found that this would be a path in the state machine that doesn't increment tries at any other transition. Not sure it's the right thing to do though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what the reason was. Missing tries increase on that transition branch was introduced by #33.

I doubt bypassing submitting and submitted states would actually work when executing a workflow instance because that would mean missing many things. During replay, this tries is not important I think.

My suggestion is we can remove support of this transition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW there are a few things in this file marked as backward compatibility, e.g. https://github.com/spotify/styx/pull/826/files#diff-05de7b680cea03b57e3a1df7bbbc1258R179 . I think we can also kill those.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, thanks for the input. I think we'll do that as part of a following PR just to keep this PR limited in scope.

.build());

default:
throw illegalTransition("started");
Expand Down
Loading