Skip to content
This repository was archived by the owner on Jul 12, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import com.spotify.apollo.Environment;
import com.spotify.apollo.Response;
import com.spotify.apollo.Status;
Expand Down Expand Up @@ -132,7 +133,7 @@ public void testEventsRoundtrip() throws Exception {
sinceVersion(Api.Version.V3);

storage.writeEvent(SequenceEvent.create(Event.triggerExecution(WFI_1, TRIGGER, TriggerParameters.zero()), 0L, 0L));
storage.writeEvent(SequenceEvent.create(Event.created(WFI_1, "exec0", "img0"), 1L, 1L));
storage.writeEvent(SequenceEvent.create(Event.dequeue(WFI_1, ImmutableSet.of()), 1L, 1L));
storage.writeEvent(SequenceEvent.create(Event.started(WFI_1), 2L, 2L));

Response<ByteString> response =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static com.spotify.styx.model.SequenceEvent.create;
import static com.spotify.styx.serialization.Json.deserialize;
import static com.spotify.styx.serialization.Json.serialize;
import static com.spotify.styx.testdata.TestData.EXECUTION_DESCRIPTION;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -49,6 +50,7 @@

import com.google.api.client.googleapis.auth.oauth2.GoogleIdToken;
import com.google.cloud.datastore.Datastore;
import com.google.common.collect.ImmutableSet;
import com.spotify.apollo.Environment;
import com.spotify.apollo.Response;
import com.spotify.apollo.Status;
Expand Down Expand Up @@ -342,8 +344,10 @@ public void shouldReturnWorkflowInstancesData() throws Exception {

WorkflowInstance wfi = WorkflowInstance.create(WORKFLOW.id(), "2016-08-10");
storage.writeEvent(create(Event.triggerExecution(wfi, NATURAL_TRIGGER, TRIGGER_PARAMETERS), 0L, ms("07:00:00")));
storage.writeEvent(create(Event.created(wfi, "exec", "img"), 1L, ms("07:00:01")));
storage.writeEvent(create(Event.started(wfi), 2L, ms("07:00:02")));
storage.writeEvent(create(Event.dequeue(wfi, ImmutableSet.of()), 1L, ms("07:00:01")));
storage.writeEvent(create(Event.submit(wfi, EXECUTION_DESCRIPTION, "exec"), 2L, ms("07:00:02")));
storage.writeEvent(create(Event.submitted(wfi, "exec", "test"), 3L, ms("07:00:03")));
storage.writeEvent(create(Event.started(wfi), 4L, ms("07:00:04")));

Response<ByteString> response =
awaitResponse(serviceHelper.request("GET", path("/foo/bar/instances")));
Expand All @@ -359,7 +363,7 @@ public void shouldReturnWorkflowInstancesData() throws Exception {
assertJson(response, "[0].triggers.[0].complete", is(false));
assertJson(response, "[0].triggers.[0].executions", hasSize(1));
assertJson(response, "[0].triggers.[0].executions.[0].execution_id", is("exec"));
assertJson(response, "[0].triggers.[0].executions.[0].docker_image", is("img"));
assertJson(response, "[0].triggers.[0].executions.[0].docker_image", is("busybox:1.1"));
assertJson(response, "[0].triggers.[0].executions.[0].statuses", hasSize(2));
assertJson(response, "[0].triggers.[0].executions.[0].statuses.[0].status", is("SUBMITTED"));
assertJson(response, "[0].triggers.[0].executions.[0].statuses.[1].status", is("STARTED"));
Expand All @@ -383,8 +387,10 @@ public void shouldReturnWorkflowRangeOfInstancesData() throws Exception {

WorkflowInstance wfi = WorkflowInstance.create(WORKFLOW.id(), "2016-08-10");
storage.writeEvent(create(Event.triggerExecution(wfi, NATURAL_TRIGGER, TRIGGER_PARAMETERS), 0L, ms("07:00:00")));
storage.writeEvent(create(Event.created(wfi, "exec", "img"), 1L, ms("07:00:01")));
storage.writeEvent(create(Event.started(wfi), 2L, ms("07:00:02")));
storage.writeEvent(create(Event.dequeue(wfi, ImmutableSet.of()), 1L, ms("07:00:01")));
storage.writeEvent(create(Event.submit(wfi, EXECUTION_DESCRIPTION, "exec"), 2L, ms("07:00:02")));
storage.writeEvent(create(Event.submitted(wfi, "exec", "test"), 3L, ms("07:00:03")));
storage.writeEvent(create(Event.started(wfi), 4L, ms("07:00:04")));

Response<ByteString> response =
awaitResponse(serviceHelper.request("GET", path("/foo/bar/instances?start=2016-08-10")));
Expand All @@ -400,7 +406,7 @@ public void shouldReturnWorkflowRangeOfInstancesData() throws Exception {
assertJson(response, "[0].triggers.[0].complete", is(false));
assertJson(response, "[0].triggers.[0].executions", hasSize(1));
assertJson(response, "[0].triggers.[0].executions.[0].execution_id", is("exec"));
assertJson(response, "[0].triggers.[0].executions.[0].docker_image", is("img"));
assertJson(response, "[0].triggers.[0].executions.[0].docker_image", is("busybox:1.1"));
assertJson(response, "[0].triggers.[0].executions.[0].statuses", hasSize(2));
assertJson(response, "[0].triggers.[0].executions.[0].statuses.[0].status", is("SUBMITTED"));
assertJson(response, "[0].triggers.[0].executions.[0].statuses.[1].status", is("STARTED"));
Expand All @@ -412,8 +418,10 @@ public void shouldReturnWorkflowInstanceData() throws Exception {

WorkflowInstance wfi = WorkflowInstance.create(WORKFLOW.id(), "2016-08-10");
storage.writeEvent(create(Event.triggerExecution(wfi, NATURAL_TRIGGER, TRIGGER_PARAMETERS), 0L, ms("07:00:00")));
storage.writeEvent(create(Event.created(wfi, "exec", "img"), 1L, ms("07:00:01")));
storage.writeEvent(create(Event.started(wfi), 2L, ms("07:00:02")));
storage.writeEvent(create(Event.dequeue(wfi, ImmutableSet.of()), 1L, ms("07:00:01")));
storage.writeEvent(create(Event.submit(wfi, EXECUTION_DESCRIPTION, "exec"), 2L, ms("07:00:02")));
storage.writeEvent(create(Event.submitted(wfi, "exec", "test"), 3L, ms("07:00:03")));
storage.writeEvent(create(Event.started(wfi), 4L, ms("07:00:04")));

Response<ByteString> response =
awaitResponse(serviceHelper.request("GET", path("/foo/bar/instances/2016-08-10")));
Expand All @@ -429,14 +437,14 @@ public void shouldReturnWorkflowInstanceData() throws Exception {
assertJson(response, "triggers.[0].complete", is(false));
assertJson(response, "triggers.[0].executions", hasSize(1));
assertJson(response, "triggers.[0].executions.[0].execution_id", is("exec"));
assertJson(response, "triggers.[0].executions.[0].docker_image", is("img"));
assertJson(response, "triggers.[0].executions.[0].docker_image", is("busybox:1.1"));
assertJson(response, "triggers.[0].executions.[0].statuses", hasSize(2));
assertJson(response, "triggers.[0].executions.[0].statuses.[0].status", is("SUBMITTED"));
assertJson(response, "triggers.[0].executions.[0].statuses.[1].status", is("STARTED"));
assertJson(response, "triggers.[0].executions.[0].statuses.[0].timestamp",
is("2016-08-10T07:00:01Z"));
is("2016-08-10T07:00:03Z"));
assertJson(response, "triggers.[0].executions.[0].statuses.[1].timestamp",
is("2016-08-10T07:00:02Z"));
is("2016-08-10T07:00:04Z"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,4 @@ R submit(@Getter WorkflowInstance workflowInstance, ExecutionDescription executi
R timeout(@Getter WorkflowInstance workflowInstance);
R halt(@Getter WorkflowInstance workflowInstance);

// Note: Do not make changes to these deprecated event method signatures
@Deprecated
R timeTrigger(@Getter WorkflowInstance workflowInstance);
@Deprecated
R created(@Getter WorkflowInstance workflowInstance, String executionId, String dockerImage);
@Deprecated
R retry(@Getter WorkflowInstance workflowInstance);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@

@JsonTypeInfo(use = Id.NAME, visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = PersistentEvent.class, name = "timeTrigger"),
@JsonSubTypes.Type(value = PersistentEvent.TriggerExecution.class, name = "triggerExecution"),
@JsonSubTypes.Type(value = PersistentEvent.Info.class, name = "info"),
@JsonSubTypes.Type(value = PersistentEvent.Created.class, name = "created"),
@JsonSubTypes.Type(value = PersistentEvent.Dequeue.class, name = "dequeue"),
@JsonSubTypes.Type(value = PersistentEvent.Started.class, name = "started"),
@JsonSubTypes.Type(value = PersistentEvent.Terminate.class, name = "terminate"),
Expand All @@ -71,10 +69,6 @@ public class PersistentEvent {

public static class SerializerVisitor implements EventVisitor<PersistentEvent> {

@Override
public PersistentEvent timeTrigger(WorkflowInstance workflowInstance) {
return new PersistentEvent("timeTrigger", workflowInstance.toKey());
}

@Override
public PersistentEvent triggerExecution(WorkflowInstance workflowInstance, Trigger trigger,
Expand All @@ -87,11 +81,6 @@ public PersistentEvent info(WorkflowInstance workflowInstance, Message message)
return new Info(workflowInstance.toKey(), message);
}

@Override
public PersistentEvent created(WorkflowInstance workflowInstance, String executionId, String dockerImage) {
return new Created(workflowInstance.toKey(), executionId, Optional.of(dockerImage));
}

@Override
public PersistentEvent dequeue(WorkflowInstance workflowInstance, Set<String> resourceIds) {
return new Dequeue(workflowInstance.toKey(), Optional.of(resourceIds));
Expand Down Expand Up @@ -133,11 +122,6 @@ public PersistentEvent retryAfter(WorkflowInstance workflowInstance, long delayM
return new RetryAfter(workflowInstance.toKey(), delayMillis);
}

@Override
public PersistentEvent retry(WorkflowInstance workflowInstance) {
return new PersistentEvent("retry", workflowInstance.toKey());
}

@Override
public PersistentEvent stop(WorkflowInstance workflowInstance) {
return new PersistentEvent("stop", workflowInstance.toKey());
Expand Down Expand Up @@ -169,12 +153,8 @@ public static PersistentEvent wrap(Event event) {
public Event toEvent() {
final WorkflowInstance workflowInstance = WorkflowInstance.parseKey(this.workflowInstance);
switch (type) {
case "timeTrigger":
return Event.timeTrigger(workflowInstance);
case "success":
return Event.success(workflowInstance);
case "retry":
return Event.retry(workflowInstance);
case "stop":
return Event.stop(workflowInstance);
case "timeout":
Expand Down Expand Up @@ -239,28 +219,6 @@ public Event toEvent() {
}
}


public static class Created extends PersistentEvent {

public final String executionId;
public final String dockerImage;

@JsonCreator
public Created(
@JsonProperty("workflow_instance") String workflowInstance,
@JsonProperty("execution_id") String executionId,
@JsonProperty("docker_image") Optional<String> dockerImage) {
super("created", workflowInstance);
this.executionId = executionId;
this.dockerImage = dockerImage.orElse("UNKNOWN");
}

@Override
public Event toEvent() {
return Event.created(WorkflowInstance.parseKey(workflowInstance), executionId, dockerImage);
}
}

public static class Submitted extends PersistentEvent {

public final String executionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ public final class TestData {
.commitSha(VALID_SHA)
.build();

public static final ExecutionDescription EXECUTION_DESCRIPTION2 =
ExecutionDescription.builder()
.dockerImage("busybox:1.2")
.dockerArgs("foo", "bar")
.secret(WorkflowConfiguration.Secret.create("secret", "/dev/null"))
.commitSha(VALID_SHA)
.build();

public static final Workflow WORKFLOW_WITH_RESOURCES = Workflow.create(WORKFLOW_ID.componentId(),
HOURLY_WORKFLOW_CONFIGURATION_WITH_RESOURCES);

Expand Down
31 changes: 0 additions & 31 deletions styx-common/src/main/java/com/spotify/styx/util/EventUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ public static String info(Event event) {
private enum EventInfoVisitor implements EventVisitor<String> {
INSTANCE;

@Override
public String timeTrigger(WorkflowInstance workflowInstance) {
return "";
}

@Override
public String triggerExecution(WorkflowInstance workflowInstance, Trigger trigger,
TriggerParameters parameters) {
Expand All @@ -72,11 +67,6 @@ public String info(WorkflowInstance workflowInstance, Message message) {
return message.line();
}

@Override
public String created(WorkflowInstance workflowInstance, String executionId, String dockerImage) {
return String.format("Execution id: %s, Docker image: %s", executionId, dockerImage);
}

@Override
public String dequeue(WorkflowInstance workflowInstance, Set<String> resourceIds) {
return "";
Expand Down Expand Up @@ -107,11 +97,6 @@ public String retryAfter(WorkflowInstance workflowInstance, long delayMillis) {
return String.format("Delay (seconds): %d", TimeUnit.MILLISECONDS.toSeconds(delayMillis));
}

@Override
public String retry(WorkflowInstance workflowInstance) {
return "";
}

@Override
public String stop(WorkflowInstance workflowInstance) {
return "";
Expand Down Expand Up @@ -145,11 +130,6 @@ public String submitted(WorkflowInstance workflowInstance, String executionId, S
private enum EventNameVisitor implements EventVisitor<String> {
INSTANCE;

@Override
public String timeTrigger(WorkflowInstance workflowInstance) {
return "timeTrigger";
}

@Override
public String triggerExecution(WorkflowInstance workflowInstance, Trigger trigger,
TriggerParameters parameters) {
Expand All @@ -166,12 +146,6 @@ public String dequeue(WorkflowInstance workflowInstance, Set<String> resourceIds
return "dequeue";
}

@Override
public String created(WorkflowInstance workflowInstance, String executionId,
String dockerImage) {
return "created";
}

@Override
public String started(WorkflowInstance workflowInstance) {
return "started";
Expand All @@ -197,11 +171,6 @@ public String retryAfter(WorkflowInstance workflowInstance, long delayMillis) {
return "retryAfter";
}

@Override
public String retry(WorkflowInstance workflowInstance) {
return "retry";
}

@Override
public String stop(WorkflowInstance workflowInstance) {
return "stop";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

public class JsonTest {

private static final Event EVENT = Event.retry(TestData.WORKFLOW_INSTANCE);
private static final Event EVENT = Event.retryAfter(TestData.WORKFLOW_INSTANCE, 0);
private static final Trigger TRIGGER = Trigger.adhoc("foobar");

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,15 @@ public class PersistentEventTest {

@Test
public void testRoundtripAllEvents() throws Exception {
assertRoundtrip(Event.timeTrigger(INSTANCE1));
assertRoundtrip(Event.triggerExecution(INSTANCE1, UNKNOWN_TRIGGER, TRIGGER_PARAMETERS));
assertRoundtrip(Event.info(INSTANCE1, Message.info("InfoMessage")));
assertRoundtrip(Event.created(INSTANCE1, POD_NAME, DOCKER_IMAGE));
assertRoundtrip(Event.dequeue(INSTANCE1, ImmutableSet.of("some-resource")));
assertRoundtrip(Event.dequeue(INSTANCE1, ImmutableSet.of()));
assertRoundtrip(Event.started(INSTANCE1));
assertRoundtrip(Event.terminate(INSTANCE1, Optional.of(20)));
assertRoundtrip(Event.runError(INSTANCE1, "ErrorMessage"));
assertRoundtrip(Event.success(INSTANCE1));
assertRoundtrip(Event.retryAfter(INSTANCE1, 12345));
assertRoundtrip(Event.retry(INSTANCE1));
assertRoundtrip(Event.stop(INSTANCE1));
assertRoundtrip(Event.timeout(INSTANCE1));
assertRoundtrip(Event.halt(INSTANCE1));
Expand All @@ -87,14 +84,12 @@ public void testRoundtripAllEvents() throws Exception {

@Test
public void testDeserializeFromJson() throws Exception {
assertThat(deserializeEvent(json("timeTrigger")), is(Event.timeTrigger(INSTANCE1)));
assertThat(deserializeEvent(json("dequeue", "\"resource_ids\":[\"quux\"]")),
is(Event.dequeue(INSTANCE1, ImmutableSet.of("quux"))));
assertThat(deserializeEvent(json("dequeue")),
is(Event.dequeue(INSTANCE1, ImmutableSet.of())));
assertThat(deserializeEvent(json("started")), is(Event.started(INSTANCE1)));
assertThat(deserializeEvent(json("success")), is(Event.success(INSTANCE1)));
assertThat(deserializeEvent(json("retry")), is(Event.retry(INSTANCE1)));
assertThat(deserializeEvent(json("stop")), is(Event.stop(INSTANCE1)));
assertThat(deserializeEvent(json("timeout")), is(Event.timeout(INSTANCE1)));
assertThat(deserializeEvent(json("halt")), is(Event.halt(INSTANCE1)));
Expand Down Expand Up @@ -123,10 +118,6 @@ public void testDeserializeFromJson() throws Exception {
deserializeEvent(json("submitted", "\"execution_id\":\"" + POD_NAME + "\",\"runner_id\":\"" + RUNNER_ID
+ "\"")),
is(Event.submitted(INSTANCE1, POD_NAME, RUNNER_ID)));
assertThat(
deserializeEvent(json("created", "\"execution_id\":\"" + POD_NAME + "\",\"docker_image\":\"" + DOCKER_IMAGE
+ "\"")),
is(Event.created(INSTANCE1, POD_NAME, DOCKER_IMAGE)));
assertThat(
deserializeEvent(json("runError", "\"message\":\"ErrorMessage\"")),
is(Event.runError(INSTANCE1, "ErrorMessage")));
Expand Down Expand Up @@ -168,9 +159,6 @@ public void testDeserializeFromJsonWhenTransformationRequired() throws Exception
assertThat(
deserializeEvent(json("started", "\"pod_name\":\"" + POD_NAME + "\"")),
is(Event.started(INSTANCE1))); // for backwards compatibility
assertThat(
deserializeEvent(json("created", "\"execution_id\":\"" + POD_NAME + "\"")),
is(Event.created(INSTANCE1, POD_NAME, "UNKNOWN")));
assertThat(
deserializeEvent(json("triggerExecution")),
is(Event.triggerExecution(INSTANCE1, TRIGGER_UNKNOWN, TriggerParameters.zero())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,11 +788,6 @@ private void closeWatch() {
// fixme: add a Cause enum to the runError() event instead of this string matching
private static class PullImageErrorMatcher implements EventVisitor<Boolean> {

@Override
public Boolean timeTrigger(WorkflowInstance workflowInstance) {
return false;
}

@Override
public Boolean triggerExecution(WorkflowInstance workflowInstance, Trigger trigger,
TriggerParameters parameters) {
Expand All @@ -809,11 +804,6 @@ public Boolean dequeue(WorkflowInstance workflowInstance, Set<String> resourceId
return false;
}

@Override
public Boolean created(WorkflowInstance workflowInstance, String executionId, String dockerImage) {
return false;
}

@Override
public Boolean submit(WorkflowInstance workflowInstance, ExecutionDescription executionDescription,
String executionId) {
Expand Down Expand Up @@ -850,11 +840,6 @@ public Boolean retryAfter(WorkflowInstance workflowInstance, long delayMillis) {
return false;
}

@Override
public Boolean retry(WorkflowInstance workflowInstance) {
return false;
}

@Override
public Boolean stop(WorkflowInstance workflowInstance) {
return false;
Expand Down
Loading