Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 61 additions & 39 deletions src/main/java/org/eclipse/dataplane/Dataplane.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@
import org.eclipse.dataplane.logic.OnSuspend;
import org.eclipse.dataplane.logic.OnTerminate;
import org.eclipse.dataplane.port.DataPlaneSignalingApiController;
import org.eclipse.dataplane.port.exception.DataFlowNotifyCompletedFailed;
import org.eclipse.dataplane.port.exception.DataFlowNotifyErroredFailed;
import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed;
import org.eclipse.dataplane.port.exception.DataplaneNotRegistered;
import org.eclipse.dataplane.port.store.DataFlowStore;
import org.eclipse.dataplane.port.store.InMemoryDataFlowStore;
Expand All @@ -48,6 +47,8 @@
import java.util.Set;
import java.util.UUID;

import static java.util.Collections.emptyMap;

public class Dataplane {

private final DataFlowStore store = new InMemoryDataFlowStore();
Expand Down Expand Up @@ -175,33 +176,50 @@ public Result<Void> terminate(String dataFlowId, DataFlowTerminateMessage messag
}

/**
* Notify the control plane that the data flow has been completed.
* Notify the control plane that the data flow has been prepared.
*
* @param dataFlowId id of the data flow
* @param dataFlowId the data flow id.
*/
public Result<Void> notifyCompleted(String dataFlowId) {
public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
return store.findById(dataFlowId)
.compose(onPrepare::action)
.compose(dataFlow -> {
var endpoint = dataFlow.getCallbackAddress() + "/transfers/" + dataFlow.getId() + "/dataflow/completed";
var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);

var request = HttpRequest.newBuilder()
.uri(URI.create(endpoint))
.header("content-type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString("{}")) // TODO DataFlowCompletedMessage not defined
.build();
return notifyControlPlane("prepared", dataFlow, message)
.compose(response -> {
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
if (successful) {
dataFlow.transitionToPrepared();
return save(dataFlow);
}

var response = httpClient.send(request, HttpResponse.BodyHandlers.discarding());
return Result.failure(new DataFlowNotifyControlPlaneFailed("prepared", response));
});

var successful = response.statusCode() >= 200 && response.statusCode() < 300;
if (successful) {
dataFlow.transitionToCompleted();
return save(dataFlow);
}

return Result.failure(new DataFlowNotifyCompletedFailed(response));
});
}


/**
* Notify the control plane that the data flow has been completed.
*
* @param dataFlowId id of the data flow
*/
public Result<Void> notifyCompleted(String dataFlowId) {
return store.findById(dataFlowId)
.compose(dataFlow -> notifyControlPlane("completed", dataFlow, emptyMap()) // TODO DataFlowCompletedMessage not defined
.compose(response -> {
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
if (successful) {
dataFlow.transitionToCompleted();
return save(dataFlow);
}

return Result.failure(new DataFlowNotifyControlPlaneFailed("completed", response));
}));
}

/**
* Notify the control plane that the data flow failed for some reason
*
Expand All @@ -210,25 +228,16 @@ public Result<Void> notifyCompleted(String dataFlowId) {
*/
public Result<Void> notifyErrored(String dataFlowId, Throwable throwable) {
return store.findById(dataFlowId)
.compose(dataFlow -> {
var endpoint = dataFlow.getCallbackAddress() + "/transfers/" + dataFlow.getId() + "/dataflow/errored";

var request = HttpRequest.newBuilder()
.uri(URI.create(endpoint))
.header("content-type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString("{}")) // TODO DataFlowErroredMessage not defined
.build();

var response = httpClient.send(request, HttpResponse.BodyHandlers.discarding());

var successful = response.statusCode() >= 200 && response.statusCode() < 300;
if (successful) {
dataFlow.transitionToTerminated(throwable.getMessage());
return save(dataFlow);
}

return Result.failure(new DataFlowNotifyErroredFailed(response));
});
.compose(dataFlow -> notifyControlPlane("errored", dataFlow, emptyMap()) // TODO DataFlowErroredMessage not defined
.compose(response -> {
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
if (successful) {
dataFlow.transitionToTerminated(throwable.getMessage());
return save(dataFlow);
}

return Result.failure(new DataFlowNotifyControlPlaneFailed("errored", response));
}));
}

public Result<Void> started(String flowId, DataFlowStartedNotificationMessage startedNotificationMessage) {
Expand Down Expand Up @@ -279,7 +288,20 @@ public Result<Void> registerOn(String controlPlaneEndpoint) {
});
}

private Result<String> toJson(DataPlaneRegistrationMessage message) {
private Result<HttpResponse<Void>> notifyControlPlane(String action, DataFlow dataFlow, Object message) {
return toJson(message).map(body -> {
var endpoint = dataFlow.callbackEndpointFor(action);
var request = HttpRequest.newBuilder()
.uri(URI.create(endpoint))
.header("content-type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();

return httpClient.send(request, HttpResponse.BodyHandlers.discarding());
});
}

private Result<String> toJson(Object message) {
try {
return Result.success(objectMapper.writeValueAsString(message));
} catch (JsonProcessingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ public void setDataAddress(DataAddress dataAddress) {
this.dataAddress = dataAddress;
}

public String callbackEndpointFor(String action) {
return getCallbackAddress() + "/transfers/" + getId() + "/dataflow/" + action;
}

public static class Builder {
private final DataFlow dataFlow = new DataFlow();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@

import java.net.http.HttpResponse;

public class DataFlowNotifyCompletedFailed extends Exception {
public class DataFlowNotifyControlPlaneFailed extends Exception {
private final String action;
private final HttpResponse<Void> response;

public DataFlowNotifyCompletedFailed(HttpResponse<Void> response) {
public DataFlowNotifyControlPlaneFailed(String action, HttpResponse<Void> response) {
super("control-plane responded with %s".formatted(response.statusCode()));
this.action = action;
this.response = response;
}

public HttpResponse<Void> getResponse() {
return response;
}

public String getAction() {
return action;
}
}

This file was deleted.

8 changes: 8 additions & 0 deletions src/test/java/org/eclipse/dataplane/ControlPlane.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
Expand Down Expand Up @@ -97,6 +98,13 @@ public ControlPlaneController(DataplaneClient counterPart) {
this.counterPart = counterPart;
}

@POST
@Path("/{transferId}/dataflow/prepared")
@Consumes(WILDCARD)
public void prepared(@PathParam("transferId") String transferId, DataFlowResponseMessage message) {

}

@POST
@Path("/{transferId}/dataflow/completed")
@Consumes(WILDCARD)
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/org/eclipse/dataplane/DataplaneTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.port.exception.DataFlowNotFoundException;
import org.eclipse.dataplane.port.exception.DataFlowNotifyCompletedFailed;
import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed;
import org.eclipse.dataplane.port.exception.DataplaneNotRegistered;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -92,7 +92,7 @@ void shouldReturnFailedFuture_whenControlPlaneRespondWithError() {
var result = dataplane.notifyCompleted("dataFlowId");

assertThat(result.failed()).isTrue();
assertThatThrownBy(result::orElseThrow).isExactlyInstanceOf(DataFlowNotifyCompletedFailed.class);
assertThatThrownBy(result::orElseThrow).isExactlyInstanceOf(DataFlowNotifyControlPlaneFailed.class);
assertThat(dataplane.status("dataFlowId").getContent().state()).isNotEqualTo(COMPLETED.name());
}

Expand Down
50 changes: 42 additions & 8 deletions src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.awaitility.Awaitility.await;
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.COMPLETED;
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARED;
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARING;
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.STARTED;
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.TERMINATED;

Expand Down Expand Up @@ -72,7 +73,7 @@ void shouldPushDataToEndpointPreparedByConsumer() {
var transferType = "FileSystem-PUSH";
var processId = UUID.randomUUID().toString();
var consumerProcessId = "consumer_" + processId;
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);

var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
Expand Down Expand Up @@ -103,7 +104,7 @@ void shouldSendError_whenFlowFails() {
var transferType = "FileSystem-PUSH";
var processId = UUID.randomUUID().toString();
var consumerProcessId = "consumer_" + processId;
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);

controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
var invalidDataAddress = new DataAddress("FileSystem", "", emptyList());
Expand All @@ -121,15 +122,32 @@ void shouldSendError_whenFlowFails() {
});
}

@Test
void shouldPermitAsyncPreparation() {
var transferType = "FileSystemAsync-PUSH";
var processId = UUID.randomUUID().toString();
var consumerProcessId = "consumer_" + processId;
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);

var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
assertThat(prepareResponse.state()).isEqualTo(PREPARING.name());
assertThat(prepareResponse.dataAddress()).isNull();

consumerDataPlane.completePreparation(consumerProcessId);

assertThat(controlPlane.consumerStatus(consumerProcessId).statusCode(200).extract().as(DataFlowStatusResponseMessage.class).state())
.isEqualTo(PREPARED.name());
}

private @NonNull DataFlowStartMessage createStartMessage(String providerProcessId, String callbackAddress, String transferType, DataAddress destinationDataAddress) {
return new DataFlowStartMessage("theMessageId", "theParticipantId", "theCounterPartyId",
"theDataspaceContext", providerProcessId, "theAgreementId", "theDatasetId", callbackAddress,
transferType, destinationDataAddress, emptyList(), emptyMap());
}

private @NonNull DataFlowPrepareMessage createPrepareMessage(String consumerProcessId, String transferType) {
private @NonNull DataFlowPrepareMessage createPrepareMessage(String consumerProcessId, String callbackAddress, String transferType) {
return new DataFlowPrepareMessage("theMessageId", "theParticipantId", "theCounterPartyId",
"theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", "theCallbackAddress",
"theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", callbackAddress,
transferType, emptyList(), emptyMap());
}

Expand Down Expand Up @@ -178,13 +196,24 @@ private static class ConsumerDataPlane {
.onTerminate(Result::success)
.build();

public void completePreparation(String dataFlowId) {
sdk.getById(dataFlowId)
.compose(dataFlow -> sdk.notifyPrepared(dataFlowId, this::prepareDestinationDataAddress))
.orElseThrow(f -> new RuntimeException(f.getCause()));
}

private Result<DataFlow> onPrepare(DataFlow dataFlow) {
try {
var destinationFile = Files.createTempDirectory("consumer-dest").resolve(dataFlow.getId() + "-content");
var dataAddress = new DataAddress("FileSystem", "file", destinationFile.toString(), emptyList());
if (dataFlow.getTransferType().equals("FileSystemAsync-PUSH")) {
dataFlow.transitionToPreparing();
return Result.success(dataFlow);
}

dataFlow.setDataAddress(dataAddress);
return prepareDestinationDataAddress(dataFlow);
}

private @NonNull Result<DataFlow> prepareDestinationDataAddress(DataFlow dataFlow) {
try {
dataFlow.setDataAddress(destinationDataAddress(dataFlow));
return Result.success(dataFlow);
} catch (IOException e) {
return Result.failure(e);
Expand All @@ -205,6 +234,11 @@ private Result<DataFlow> onCompleted(DataFlow dataFlow) {
}
}

private @NonNull DataAddress destinationDataAddress(DataFlow dataFlow) throws IOException {
var destinationFile = Files.createTempDirectory("consumer-dest").resolve(dataFlow.getId() + "-content");
return new DataAddress("file", destinationFile.toString(), emptyList());
}

public Object controller() {
return sdk.controller();
}
Expand Down