Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 54 additions & 45 deletions src/main/java/org/eclipse/dataplane/Dataplane.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@
import java.util.Set;
import java.util.UUID;

import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static java.util.Collections.emptyMap;

public class Dataplane {

private final DataFlowStore store = new InMemoryDataFlowStore();
private final ObjectMapper objectMapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
private final DataFlowStore store = new InMemoryDataFlowStore(objectMapper);
private String id;
private String name;
private String description;
Expand All @@ -67,7 +69,6 @@ public class Dataplane {
private OnCompleted onCompleted = dataFlow -> Result.failure(new UnsupportedOperationException("onCompleted is not implemented"));

private final HttpClient httpClient = HttpClient.newHttpClient();
private final ObjectMapper objectMapper = new ObjectMapper();

public static Builder newInstance() {
return new Builder();
Expand Down Expand Up @@ -184,22 +185,31 @@ public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
return store.findById(dataFlowId)
.compose(onPrepare::action)
.compose(dataFlow -> {
dataFlow.transitionToPrepared();
var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);

return notifyControlPlane("prepared", dataFlow, message)
.compose(response -> {
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
if (successful) {
dataFlow.transitionToPrepared();
return save(dataFlow);
}

return Result.failure(new DataFlowNotifyControlPlaneFailed("prepared", response));
});
return notifyControlPlane("prepared", dataFlow, message);

});
}

/**
* Notify the control plane that the data flow has been started.
*
* @param dataFlowId the data flow id.
*/
public Result<Void> notifyStarted(String dataFlowId, OnStart onStart) {
return store.findById(dataFlowId)
.compose(onStart::action)
.compose(dataFlow -> {
dataFlow.transitionToStarted();

var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);

return notifyControlPlane("started", dataFlow, message);

});
}

/**
* Notify the control plane that the data flow has been completed.
Expand All @@ -208,36 +218,26 @@ public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
*/
public Result<Void> notifyCompleted(String dataFlowId) {
return store.findById(dataFlowId)
.compose(dataFlow -> notifyControlPlane("completed", dataFlow, emptyMap()) // TODO DataFlowCompletedMessage not defined
.compose(response -> {
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
if (successful) {
dataFlow.transitionToCompleted();
return save(dataFlow);
}

return Result.failure(new DataFlowNotifyControlPlaneFailed("completed", response));
}));
.compose(dataFlow -> {
dataFlow.transitionToCompleted();

return notifyControlPlane("completed", dataFlow, emptyMap()); // TODO DataFlowCompletedMessage not defined
});
}

/**
* Notify the control plane that the data flow failed for some reason
*
* @param dataFlowId id of the data flow
* @param throwable the error
* @param throwable the error
*/
public Result<Void> notifyErrored(String dataFlowId, Throwable throwable) {
return store.findById(dataFlowId)
.compose(dataFlow -> notifyControlPlane("errored", dataFlow, emptyMap()) // TODO DataFlowErroredMessage not defined
.compose(response -> {
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
if (successful) {
dataFlow.transitionToTerminated(throwable.getMessage());
return save(dataFlow);
}

return Result.failure(new DataFlowNotifyControlPlaneFailed("errored", response));
}));
.compose(dataFlow -> {
dataFlow.transitionToTerminated(throwable.getMessage());

return notifyControlPlane("errored", dataFlow, emptyMap()); // TODO DataFlowErroredMessage not defined
});
}

public Result<Void> started(String flowId, DataFlowStartedNotificationMessage startedNotificationMessage) {
Expand All @@ -256,7 +256,7 @@ public Result<Void> started(String flowId, DataFlowStartedNotificationMessage st
/**
* Received notification that the flow has been completed
*
* @param flowId id of the data flow
* @param flowId id of the data flow
* @return result indicating whether data flow was completed successfully
*/
public Result<Void> completed(String flowId) {
Expand Down Expand Up @@ -288,17 +288,26 @@ public Result<Void> registerOn(String controlPlaneEndpoint) {
});
}

private Result<HttpResponse<Void>> notifyControlPlane(String action, DataFlow dataFlow, Object message) {
return toJson(message).map(body -> {
var endpoint = dataFlow.callbackEndpointFor(action);
var request = HttpRequest.newBuilder()
.uri(URI.create(endpoint))
.header("content-type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();

return httpClient.send(request, HttpResponse.BodyHandlers.discarding());
});
private Result<Void> notifyControlPlane(String action, DataFlow dataFlow, Object message) {
return toJson(message)
.map(body -> {
var endpoint = dataFlow.callbackEndpointFor(action);
var request = HttpRequest.newBuilder()
.uri(URI.create(endpoint))
.header("content-type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();

return httpClient.send(request, HttpResponse.BodyHandlers.discarding());
})
.compose(response -> {
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
if (successful) {
return save(dataFlow);
}

return Result.failure(new DataFlowNotifyControlPlaneFailed(action, response));
});
}

private Result<String> toJson(Object message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.eclipse.dataplane.port.store;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.dataflow.DataFlow;
import org.eclipse.dataplane.port.exception.DataFlowNotFoundException;
Expand All @@ -23,12 +25,21 @@

public class InMemoryDataFlowStore implements DataFlowStore {

private final Map<String, DataFlow> store = new HashMap<>();
private final Map<String, String> store = new HashMap<>();
private final ObjectMapper objectMapper;

public InMemoryDataFlowStore(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

@Override
public Result<Void> save(DataFlow dataFlow) {
store.put(dataFlow.getId(), dataFlow);
return Result.success();
try {
store.put(dataFlow.getId(), objectMapper.writeValueAsString(dataFlow));
return Result.success();
} catch (JsonProcessingException e) {
return Result.failure(e);
}
}

@Override
Expand All @@ -37,6 +48,12 @@ public Result<DataFlow> findById(String flowId) {
if (dataFlow == null) {
return Result.failure(new DataFlowNotFoundException("DataFlow %s not found".formatted(flowId)));
}
return Result.success(dataFlow);

try {
var deserialized = objectMapper.readValue(dataFlow, DataFlow.class);
return Result.success(deserialized);
} catch (JsonProcessingException e) {
return Result.failure(e);
}
}
}
8 changes: 8 additions & 0 deletions src/test/java/org/eclipse/dataplane/ControlPlane.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.Executors;

import static jakarta.ws.rs.core.MediaType.WILDCARD;
import static org.assertj.core.api.Assertions.assertThat;

/**
* This simulates control plane for both consumer and provider.
Expand Down Expand Up @@ -102,7 +103,14 @@ public ControlPlaneController(DataplaneClient counterPart) {
@Path("/{transferId}/dataflow/prepared")
@Consumes(WILDCARD)
public void prepared(@PathParam("transferId") String transferId, DataFlowResponseMessage message) {
assertThat(message.state()).isEqualTo("PREPARED");
}

@POST
@Path("/{transferId}/dataflow/started")
@Consumes(WILDCARD)
public void started(@PathParam("transferId") String transferId, DataFlowResponseMessage message) {
assertThat(message.state()).isEqualTo("STARTED");
}

@POST
Expand Down
60 changes: 53 additions & 7 deletions src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
import org.jspecify.annotations.NonNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -41,6 +43,7 @@
import static org.awaitility.Awaitility.await;
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARED;
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.STARTED;
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.STARTING;

class ConsumerPullTest {

Expand Down Expand Up @@ -69,18 +72,14 @@ void shouldPullDataFromProvider() {
var transferType = "FileSystem-PULL";
var processId = UUID.randomUUID().toString();
var consumerProcessId = "consumer_" + processId;
var prepareMessage = new DataFlowPrepareMessage("theMessageId", "theParticipantId", "theCounterPartyId",
"theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", controlPlane.consumerCallbackAddress(),
transferType, emptyList(), emptyMap());
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);

var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
assertThat(prepareResponse.dataAddress()).isNull();

var providerProcessId = "provider_" + processId;
var startMessage = new DataFlowStartMessage("theMessageId", "theParticipantId", "theCounterPartyId",
"theDataspaceContext", providerProcessId, "theAgreementId", "theDatasetId", controlPlane.providerCallbackAddress(),
transferType, null, emptyList(), emptyMap());
var startMessage = createStartMessage(providerProcessId, transferType);
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
assertThat(startResponse.state()).isEqualTo(STARTED.name());
assertThat(startResponse.dataAddress()).isNotNull();
Expand All @@ -92,6 +91,39 @@ void shouldPullDataFromProvider() {
});
}

@Test
void shouldPermitAsyncStartup() {
var transferType = "FileSystemAsync-PULL";
var processId = UUID.randomUUID().toString();
var consumerProcessId = "consumer_" + processId;
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);

var providerProcessId = "provider_" + processId;
var startMessage = createStartMessage(providerProcessId, transferType);
var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
assertThat(startResponse.state()).isEqualTo(STARTING.name());
assertThat(startResponse.dataAddress()).isNull();

providerDataPlane.completeStartup(providerProcessId);

assertThat(controlPlane.providerStatus(providerProcessId).statusCode(200).extract().as(DataFlowStatusResponseMessage.class).state())
.isEqualTo(STARTED.name());
}

private @NonNull DataFlowStartMessage createStartMessage(String providerProcessId, String transferType) {
return new DataFlowStartMessage("theMessageId", "theParticipantId", "theCounterPartyId",
"theDataspaceContext", providerProcessId, "theAgreementId", "theDatasetId", controlPlane.providerCallbackAddress(),
transferType, null, emptyList(), emptyMap());
}

private @NonNull DataFlowPrepareMessage createPrepareMessage(String consumerProcessId, String transferType) {
return new DataFlowPrepareMessage("theMessageId", "theParticipantId", "theCounterPartyId",
"theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", controlPlane.consumerCallbackAddress(),
transferType, emptyList(), emptyMap());
}


private class ConsumerDataPlane {

private final Path storage;
Expand Down Expand Up @@ -144,7 +176,22 @@ public Object controller() {
return sdk.controller();
}

public void completeStartup(String dataFlowId) {
sdk.getById(dataFlowId)
.compose(dataFlow -> sdk.notifyStarted(dataFlowId, this::prepareSourceDataAddress))
.orElseThrow(f -> new RuntimeException(f.getCause()));
}

private Result<DataFlow> onStart(DataFlow dataFlow) {
if (dataFlow.getTransferType().equals("FileSystemAsync-PULL")) {
dataFlow.transitionToStarting();
return Result.success(dataFlow);
}

return prepareSourceDataAddress(dataFlow);
}

private Result<DataFlow> prepareSourceDataAddress(DataFlow dataFlow) {
try {
var destinationDirectory = Files.createTempDirectory(dataFlow.getId());
for (var i = 0; i < filesToBeCreated; i++) {
Expand All @@ -154,7 +201,6 @@ private Result<DataFlow> onStart(DataFlow dataFlow) {

var dataAddress = new DataAddress("FileSystem", "directory", destinationDirectory.toString(), emptyList());
dataFlow.setDataAddress(dataAddress);

return Result.success(dataFlow);
} catch (IOException e) {
return Result.failure(e);
Expand Down