From 0569e418f9f74d86a5f3e9a6af9244bfca87f371 Mon Sep 17 00:00:00 2001 From: ndr_brt Date: Tue, 27 Jan 2026 11:29:29 +0100 Subject: [PATCH] feat: permit async startup --- .../java/org/eclipse/dataplane/Dataplane.java | 99 ++++++++++--------- .../port/store/InMemoryDataFlowStore.java | 25 ++++- .../org/eclipse/dataplane/ControlPlane.java | 8 ++ .../dataplane/scenario/ConsumerPullTest.java | 60 +++++++++-- 4 files changed, 136 insertions(+), 56 deletions(-) diff --git a/src/main/java/org/eclipse/dataplane/Dataplane.java b/src/main/java/org/eclipse/dataplane/Dataplane.java index 020db3d..32f5953 100644 --- a/src/main/java/org/eclipse/dataplane/Dataplane.java +++ b/src/main/java/org/eclipse/dataplane/Dataplane.java @@ -47,11 +47,13 @@ import java.util.Set; import java.util.UUID; +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; import static java.util.Collections.emptyMap; public class Dataplane { - private final DataFlowStore store = new InMemoryDataFlowStore(); + private final ObjectMapper objectMapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + private final DataFlowStore store = new InMemoryDataFlowStore(objectMapper); private String id; private String name; private String description; @@ -67,7 +69,6 @@ public class Dataplane { private OnCompleted onCompleted = dataFlow -> Result.failure(new UnsupportedOperationException("onCompleted is not implemented")); private final HttpClient httpClient = HttpClient.newHttpClient(); - private final ObjectMapper objectMapper = new ObjectMapper(); public static Builder newInstance() { return new Builder(); @@ -184,22 +185,31 @@ public Result notifyPrepared(String dataFlowId, OnPrepare onPrepare) { return store.findById(dataFlowId) .compose(onPrepare::action) .compose(dataFlow -> { + dataFlow.transitionToPrepared(); var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null); - return notifyControlPlane("prepared", dataFlow, message) - .compose(response -> { - var successful = response.statusCode() >= 200 && response.statusCode() < 300; - if (successful) { - dataFlow.transitionToPrepared(); - return save(dataFlow); - } - - return Result.failure(new DataFlowNotifyControlPlaneFailed("prepared", response)); - }); + return notifyControlPlane("prepared", dataFlow, message); }); } + /** + * Notify the control plane that the data flow has been started. + * + * @param dataFlowId the data flow id. + */ + public Result notifyStarted(String dataFlowId, OnStart onStart) { + return store.findById(dataFlowId) + .compose(onStart::action) + .compose(dataFlow -> { + dataFlow.transitionToStarted(); + + var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null); + + return notifyControlPlane("started", dataFlow, message); + + }); + } /** * Notify the control plane that the data flow has been completed. @@ -208,36 +218,26 @@ public Result notifyPrepared(String dataFlowId, OnPrepare onPrepare) { */ public Result notifyCompleted(String dataFlowId) { return store.findById(dataFlowId) - .compose(dataFlow -> notifyControlPlane("completed", dataFlow, emptyMap()) // TODO DataFlowCompletedMessage not defined - .compose(response -> { - var successful = response.statusCode() >= 200 && response.statusCode() < 300; - if (successful) { - dataFlow.transitionToCompleted(); - return save(dataFlow); - } - - return Result.failure(new DataFlowNotifyControlPlaneFailed("completed", response)); - })); + .compose(dataFlow -> { + dataFlow.transitionToCompleted(); + + return notifyControlPlane("completed", dataFlow, emptyMap()); // TODO DataFlowCompletedMessage not defined + }); } /** * Notify the control plane that the data flow failed for some reason * * @param dataFlowId id of the data flow - * @param throwable the error + * @param throwable the error */ public Result notifyErrored(String dataFlowId, Throwable throwable) { return store.findById(dataFlowId) - .compose(dataFlow -> notifyControlPlane("errored", dataFlow, emptyMap()) // TODO DataFlowErroredMessage not defined - .compose(response -> { - var successful = response.statusCode() >= 200 && response.statusCode() < 300; - if (successful) { - dataFlow.transitionToTerminated(throwable.getMessage()); - return save(dataFlow); - } - - return Result.failure(new DataFlowNotifyControlPlaneFailed("errored", response)); - })); + .compose(dataFlow -> { + dataFlow.transitionToTerminated(throwable.getMessage()); + + return notifyControlPlane("errored", dataFlow, emptyMap()); // TODO DataFlowErroredMessage not defined + }); } public Result started(String flowId, DataFlowStartedNotificationMessage startedNotificationMessage) { @@ -256,7 +256,7 @@ public Result started(String flowId, DataFlowStartedNotificationMessage st /** * Received notification that the flow has been completed * - * @param flowId id of the data flow + * @param flowId id of the data flow * @return result indicating whether data flow was completed successfully */ public Result completed(String flowId) { @@ -288,17 +288,26 @@ public Result registerOn(String controlPlaneEndpoint) { }); } - private Result> notifyControlPlane(String action, DataFlow dataFlow, Object message) { - return toJson(message).map(body -> { - var endpoint = dataFlow.callbackEndpointFor(action); - var request = HttpRequest.newBuilder() - .uri(URI.create(endpoint)) - .header("content-type", "application/json") - .POST(HttpRequest.BodyPublishers.ofString(body)) - .build(); - - return httpClient.send(request, HttpResponse.BodyHandlers.discarding()); - }); + private Result notifyControlPlane(String action, DataFlow dataFlow, Object message) { + return toJson(message) + .map(body -> { + var endpoint = dataFlow.callbackEndpointFor(action); + var request = HttpRequest.newBuilder() + .uri(URI.create(endpoint)) + .header("content-type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(body)) + .build(); + + return httpClient.send(request, HttpResponse.BodyHandlers.discarding()); + }) + .compose(response -> { + var successful = response.statusCode() >= 200 && response.statusCode() < 300; + if (successful) { + return save(dataFlow); + } + + return Result.failure(new DataFlowNotifyControlPlaneFailed(action, response)); + }); } private Result toJson(Object message) { diff --git a/src/main/java/org/eclipse/dataplane/port/store/InMemoryDataFlowStore.java b/src/main/java/org/eclipse/dataplane/port/store/InMemoryDataFlowStore.java index 42f507f..314cd4f 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/InMemoryDataFlowStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/InMemoryDataFlowStore.java @@ -14,6 +14,8 @@ package org.eclipse.dataplane.port.store; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.port.exception.DataFlowNotFoundException; @@ -23,12 +25,21 @@ public class InMemoryDataFlowStore implements DataFlowStore { - private final Map store = new HashMap<>(); + private final Map store = new HashMap<>(); + private final ObjectMapper objectMapper; + + public InMemoryDataFlowStore(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } @Override public Result save(DataFlow dataFlow) { - store.put(dataFlow.getId(), dataFlow); - return Result.success(); + try { + store.put(dataFlow.getId(), objectMapper.writeValueAsString(dataFlow)); + return Result.success(); + } catch (JsonProcessingException e) { + return Result.failure(e); + } } @Override @@ -37,6 +48,12 @@ public Result findById(String flowId) { if (dataFlow == null) { return Result.failure(new DataFlowNotFoundException("DataFlow %s not found".formatted(flowId))); } - return Result.success(dataFlow); + + try { + var deserialized = objectMapper.readValue(dataFlow, DataFlow.class); + return Result.success(deserialized); + } catch (JsonProcessingException e) { + return Result.failure(e); + } } } diff --git a/src/test/java/org/eclipse/dataplane/ControlPlane.java b/src/test/java/org/eclipse/dataplane/ControlPlane.java index e112f12..0e9bd72 100644 --- a/src/test/java/org/eclipse/dataplane/ControlPlane.java +++ b/src/test/java/org/eclipse/dataplane/ControlPlane.java @@ -30,6 +30,7 @@ import java.util.concurrent.Executors; import static jakarta.ws.rs.core.MediaType.WILDCARD; +import static org.assertj.core.api.Assertions.assertThat; /** * This simulates control plane for both consumer and provider. @@ -102,7 +103,14 @@ public ControlPlaneController(DataplaneClient counterPart) { @Path("/{transferId}/dataflow/prepared") @Consumes(WILDCARD) public void prepared(@PathParam("transferId") String transferId, DataFlowResponseMessage message) { + assertThat(message.state()).isEqualTo("PREPARED"); + } + @POST + @Path("/{transferId}/dataflow/started") + @Consumes(WILDCARD) + public void started(@PathParam("transferId") String transferId, DataFlowResponseMessage message) { + assertThat(message.state()).isEqualTo("STARTED"); } @POST diff --git a/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java b/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java index ca0f68f..49db959 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java @@ -24,6 +24,8 @@ import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; +import org.jspecify.annotations.NonNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -41,6 +43,7 @@ import static org.awaitility.Awaitility.await; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARED; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.STARTED; +import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.STARTING; class ConsumerPullTest { @@ -69,18 +72,14 @@ void shouldPullDataFromProvider() { var transferType = "FileSystem-PULL"; var processId = UUID.randomUUID().toString(); var consumerProcessId = "consumer_" + processId; - var prepareMessage = new DataFlowPrepareMessage("theMessageId", "theParticipantId", "theCounterPartyId", - "theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", controlPlane.consumerCallbackAddress(), - transferType, emptyList(), emptyMap()); + var prepareMessage = createPrepareMessage(consumerProcessId, transferType); var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARED.name()); assertThat(prepareResponse.dataAddress()).isNull(); var providerProcessId = "provider_" + processId; - var startMessage = new DataFlowStartMessage("theMessageId", "theParticipantId", "theCounterPartyId", - "theDataspaceContext", providerProcessId, "theAgreementId", "theDatasetId", controlPlane.providerCallbackAddress(), - transferType, null, emptyList(), emptyMap()); + var startMessage = createStartMessage(providerProcessId, transferType); var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNotNull(); @@ -92,6 +91,39 @@ void shouldPullDataFromProvider() { }); } + @Test + void shouldPermitAsyncStartup() { + var transferType = "FileSystemAsync-PULL"; + var processId = UUID.randomUUID().toString(); + var consumerProcessId = "consumer_" + processId; + var prepareMessage = createPrepareMessage(consumerProcessId, transferType); + controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + + var providerProcessId = "provider_" + processId; + var startMessage = createStartMessage(providerProcessId, transferType); + var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowResponseMessage.class); + assertThat(startResponse.state()).isEqualTo(STARTING.name()); + assertThat(startResponse.dataAddress()).isNull(); + + providerDataPlane.completeStartup(providerProcessId); + + assertThat(controlPlane.providerStatus(providerProcessId).statusCode(200).extract().as(DataFlowStatusResponseMessage.class).state()) + .isEqualTo(STARTED.name()); + } + + private @NonNull DataFlowStartMessage createStartMessage(String providerProcessId, String transferType) { + return new DataFlowStartMessage("theMessageId", "theParticipantId", "theCounterPartyId", + "theDataspaceContext", providerProcessId, "theAgreementId", "theDatasetId", controlPlane.providerCallbackAddress(), + transferType, null, emptyList(), emptyMap()); + } + + private @NonNull DataFlowPrepareMessage createPrepareMessage(String consumerProcessId, String transferType) { + return new DataFlowPrepareMessage("theMessageId", "theParticipantId", "theCounterPartyId", + "theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", controlPlane.consumerCallbackAddress(), + transferType, emptyList(), emptyMap()); + } + + private class ConsumerDataPlane { private final Path storage; @@ -144,7 +176,22 @@ public Object controller() { return sdk.controller(); } + public void completeStartup(String dataFlowId) { + sdk.getById(dataFlowId) + .compose(dataFlow -> sdk.notifyStarted(dataFlowId, this::prepareSourceDataAddress)) + .orElseThrow(f -> new RuntimeException(f.getCause())); + } + private Result onStart(DataFlow dataFlow) { + if (dataFlow.getTransferType().equals("FileSystemAsync-PULL")) { + dataFlow.transitionToStarting(); + return Result.success(dataFlow); + } + + return prepareSourceDataAddress(dataFlow); + } + + private Result prepareSourceDataAddress(DataFlow dataFlow) { try { var destinationDirectory = Files.createTempDirectory(dataFlow.getId()); for (var i = 0; i < filesToBeCreated; i++) { @@ -154,7 +201,6 @@ private Result onStart(DataFlow dataFlow) { var dataAddress = new DataAddress("FileSystem", "directory", destinationDirectory.toString(), emptyList()); dataFlow.setDataAddress(dataAddress); - return Result.success(dataFlow); } catch (IOException e) { return Result.failure(e);