From 3c2eeaed7dd57d1b8e157dc7545430e430667f05 Mon Sep 17 00:00:00 2001 From: ndr_brt Date: Mon, 26 Jan 2026 15:55:14 +0100 Subject: [PATCH] feat: permit async preparation --- .../java/org/eclipse/dataplane/Dataplane.java | 100 +++++++++++------- .../dataplane/domain/dataflow/DataFlow.java | 4 + ... => DataFlowNotifyControlPlaneFailed.java} | 10 +- .../DataFlowNotifyErroredFailed.java | 30 ------ .../org/eclipse/dataplane/ControlPlane.java | 8 ++ .../org/eclipse/dataplane/DataplaneTest.java | 4 +- .../dataplane/scenario/ProviderPushTest.java | 50 +++++++-- 7 files changed, 125 insertions(+), 81 deletions(-) rename src/main/java/org/eclipse/dataplane/port/exception/{DataFlowNotifyCompletedFailed.java => DataFlowNotifyControlPlaneFailed.java} (71%) delete mode 100644 src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyErroredFailed.java diff --git a/src/main/java/org/eclipse/dataplane/Dataplane.java b/src/main/java/org/eclipse/dataplane/Dataplane.java index cf470c1..020db3d 100644 --- a/src/main/java/org/eclipse/dataplane/Dataplane.java +++ b/src/main/java/org/eclipse/dataplane/Dataplane.java @@ -34,8 +34,7 @@ import org.eclipse.dataplane.logic.OnSuspend; import org.eclipse.dataplane.logic.OnTerminate; import org.eclipse.dataplane.port.DataPlaneSignalingApiController; -import org.eclipse.dataplane.port.exception.DataFlowNotifyCompletedFailed; -import org.eclipse.dataplane.port.exception.DataFlowNotifyErroredFailed; +import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed; import org.eclipse.dataplane.port.exception.DataplaneNotRegistered; import org.eclipse.dataplane.port.store.DataFlowStore; import org.eclipse.dataplane.port.store.InMemoryDataFlowStore; @@ -48,6 +47,8 @@ import java.util.Set; import java.util.UUID; +import static java.util.Collections.emptyMap; + public class Dataplane { private final DataFlowStore store = new InMemoryDataFlowStore(); @@ -175,33 +176,50 @@ public Result terminate(String dataFlowId, DataFlowTerminateMessage messag } /** - * Notify the control plane that the data flow has been completed. + * Notify the control plane that the data flow has been prepared. * - * @param dataFlowId id of the data flow + * @param dataFlowId the data flow id. */ - public Result notifyCompleted(String dataFlowId) { + public Result notifyPrepared(String dataFlowId, OnPrepare onPrepare) { return store.findById(dataFlowId) + .compose(onPrepare::action) .compose(dataFlow -> { - var endpoint = dataFlow.getCallbackAddress() + "/transfers/" + dataFlow.getId() + "/dataflow/completed"; + var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null); - var request = HttpRequest.newBuilder() - .uri(URI.create(endpoint)) - .header("content-type", "application/json") - .POST(HttpRequest.BodyPublishers.ofString("{}")) // TODO DataFlowCompletedMessage not defined - .build(); + return notifyControlPlane("prepared", dataFlow, message) + .compose(response -> { + var successful = response.statusCode() >= 200 && response.statusCode() < 300; + if (successful) { + dataFlow.transitionToPrepared(); + return save(dataFlow); + } - var response = httpClient.send(request, HttpResponse.BodyHandlers.discarding()); + return Result.failure(new DataFlowNotifyControlPlaneFailed("prepared", response)); + }); - var successful = response.statusCode() >= 200 && response.statusCode() < 300; - if (successful) { - dataFlow.transitionToCompleted(); - return save(dataFlow); - } - - return Result.failure(new DataFlowNotifyCompletedFailed(response)); }); } + + /** + * Notify the control plane that the data flow has been completed. + * + * @param dataFlowId id of the data flow + */ + public Result notifyCompleted(String dataFlowId) { + return store.findById(dataFlowId) + .compose(dataFlow -> notifyControlPlane("completed", dataFlow, emptyMap()) // TODO DataFlowCompletedMessage not defined + .compose(response -> { + var successful = response.statusCode() >= 200 && response.statusCode() < 300; + if (successful) { + dataFlow.transitionToCompleted(); + return save(dataFlow); + } + + return Result.failure(new DataFlowNotifyControlPlaneFailed("completed", response)); + })); + } + /** * Notify the control plane that the data flow failed for some reason * @@ -210,25 +228,16 @@ public Result notifyCompleted(String dataFlowId) { */ public Result notifyErrored(String dataFlowId, Throwable throwable) { return store.findById(dataFlowId) - .compose(dataFlow -> { - var endpoint = dataFlow.getCallbackAddress() + "/transfers/" + dataFlow.getId() + "/dataflow/errored"; - - var request = HttpRequest.newBuilder() - .uri(URI.create(endpoint)) - .header("content-type", "application/json") - .POST(HttpRequest.BodyPublishers.ofString("{}")) // TODO DataFlowErroredMessage not defined - .build(); - - var response = httpClient.send(request, HttpResponse.BodyHandlers.discarding()); - - var successful = response.statusCode() >= 200 && response.statusCode() < 300; - if (successful) { - dataFlow.transitionToTerminated(throwable.getMessage()); - return save(dataFlow); - } - - return Result.failure(new DataFlowNotifyErroredFailed(response)); - }); + .compose(dataFlow -> notifyControlPlane("errored", dataFlow, emptyMap()) // TODO DataFlowErroredMessage not defined + .compose(response -> { + var successful = response.statusCode() >= 200 && response.statusCode() < 300; + if (successful) { + dataFlow.transitionToTerminated(throwable.getMessage()); + return save(dataFlow); + } + + return Result.failure(new DataFlowNotifyControlPlaneFailed("errored", response)); + })); } public Result started(String flowId, DataFlowStartedNotificationMessage startedNotificationMessage) { @@ -279,7 +288,20 @@ public Result registerOn(String controlPlaneEndpoint) { }); } - private Result toJson(DataPlaneRegistrationMessage message) { + private Result> notifyControlPlane(String action, DataFlow dataFlow, Object message) { + return toJson(message).map(body -> { + var endpoint = dataFlow.callbackEndpointFor(action); + var request = HttpRequest.newBuilder() + .uri(URI.create(endpoint)) + .header("content-type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(body)) + .build(); + + return httpClient.send(request, HttpResponse.BodyHandlers.discarding()); + }); + } + + private Result toJson(Object message) { try { return Result.success(objectMapper.writeValueAsString(message)); } catch (JsonProcessingException e) { diff --git a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java index d51ea0c..9641c1a 100644 --- a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java +++ b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java @@ -153,6 +153,10 @@ public void setDataAddress(DataAddress dataAddress) { this.dataAddress = dataAddress; } + public String callbackEndpointFor(String action) { + return getCallbackAddress() + "/transfers/" + getId() + "/dataflow/" + action; + } + public static class Builder { private final DataFlow dataFlow = new DataFlow(); diff --git a/src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyCompletedFailed.java b/src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyControlPlaneFailed.java similarity index 71% rename from src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyCompletedFailed.java rename to src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyControlPlaneFailed.java index 5f9e43d..0a05fa4 100644 --- a/src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyCompletedFailed.java +++ b/src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyControlPlaneFailed.java @@ -16,15 +16,21 @@ import java.net.http.HttpResponse; -public class DataFlowNotifyCompletedFailed extends Exception { +public class DataFlowNotifyControlPlaneFailed extends Exception { + private final String action; private final HttpResponse response; - public DataFlowNotifyCompletedFailed(HttpResponse response) { + public DataFlowNotifyControlPlaneFailed(String action, HttpResponse response) { super("control-plane responded with %s".formatted(response.statusCode())); + this.action = action; this.response = response; } public HttpResponse getResponse() { return response; } + + public String getAction() { + return action; + } } diff --git a/src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyErroredFailed.java b/src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyErroredFailed.java deleted file mode 100644 index 5d243f8..0000000 --- a/src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyErroredFailed.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2025 Think-it GmbH - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Think-it GmbH - initial API and implementation - * - */ - -package org.eclipse.dataplane.port.exception; - -import java.net.http.HttpResponse; - -public class DataFlowNotifyErroredFailed extends Exception { - private final HttpResponse response; - - public DataFlowNotifyErroredFailed(HttpResponse response) { - super("control-plane responded with %s".formatted(response.statusCode())); - this.response = response; - } - - public HttpResponse getResponse() { - return response; - } -} diff --git a/src/test/java/org/eclipse/dataplane/ControlPlane.java b/src/test/java/org/eclipse/dataplane/ControlPlane.java index 4b43683..e112f12 100644 --- a/src/test/java/org/eclipse/dataplane/ControlPlane.java +++ b/src/test/java/org/eclipse/dataplane/ControlPlane.java @@ -20,6 +20,7 @@ import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; @@ -97,6 +98,13 @@ public ControlPlaneController(DataplaneClient counterPart) { this.counterPart = counterPart; } + @POST + @Path("/{transferId}/dataflow/prepared") + @Consumes(WILDCARD) + public void prepared(@PathParam("transferId") String transferId, DataFlowResponseMessage message) { + + } + @POST @Path("/{transferId}/dataflow/completed") @Consumes(WILDCARD) diff --git a/src/test/java/org/eclipse/dataplane/DataplaneTest.java b/src/test/java/org/eclipse/dataplane/DataplaneTest.java index 09faa33..ae4e90a 100644 --- a/src/test/java/org/eclipse/dataplane/DataplaneTest.java +++ b/src/test/java/org/eclipse/dataplane/DataplaneTest.java @@ -18,7 +18,7 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; import org.eclipse.dataplane.port.exception.DataFlowNotFoundException; -import org.eclipse.dataplane.port.exception.DataFlowNotifyCompletedFailed; +import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed; import org.eclipse.dataplane.port.exception.DataplaneNotRegistered; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -92,7 +92,7 @@ void shouldReturnFailedFuture_whenControlPlaneRespondWithError() { var result = dataplane.notifyCompleted("dataFlowId"); assertThat(result.failed()).isTrue(); - assertThatThrownBy(result::orElseThrow).isExactlyInstanceOf(DataFlowNotifyCompletedFailed.class); + assertThatThrownBy(result::orElseThrow).isExactlyInstanceOf(DataFlowNotifyControlPlaneFailed.class); assertThat(dataplane.status("dataFlowId").getContent().state()).isNotEqualTo(COMPLETED.name()); } diff --git a/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java b/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java index bc01d8c..c4a0f58 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java @@ -43,6 +43,7 @@ import static org.awaitility.Awaitility.await; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.COMPLETED; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARED; +import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARING; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.STARTED; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.TERMINATED; @@ -72,7 +73,7 @@ void shouldPushDataToEndpointPreparedByConsumer() { var transferType = "FileSystem-PUSH"; var processId = UUID.randomUUID().toString(); var consumerProcessId = "consumer_" + processId; - var prepareMessage = createPrepareMessage(consumerProcessId, transferType); + var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType); var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARED.name()); @@ -103,7 +104,7 @@ void shouldSendError_whenFlowFails() { var transferType = "FileSystem-PUSH"; var processId = UUID.randomUUID().toString(); var consumerProcessId = "consumer_" + processId; - var prepareMessage = createPrepareMessage(consumerProcessId, transferType); + var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType); controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); var invalidDataAddress = new DataAddress("FileSystem", "", emptyList()); @@ -121,15 +122,32 @@ void shouldSendError_whenFlowFails() { }); } + @Test + void shouldPermitAsyncPreparation() { + var transferType = "FileSystemAsync-PUSH"; + var processId = UUID.randomUUID().toString(); + var consumerProcessId = "consumer_" + processId; + var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType); + + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class); + assertThat(prepareResponse.state()).isEqualTo(PREPARING.name()); + assertThat(prepareResponse.dataAddress()).isNull(); + + consumerDataPlane.completePreparation(consumerProcessId); + + assertThat(controlPlane.consumerStatus(consumerProcessId).statusCode(200).extract().as(DataFlowStatusResponseMessage.class).state()) + .isEqualTo(PREPARED.name()); + } + private @NonNull DataFlowStartMessage createStartMessage(String providerProcessId, String callbackAddress, String transferType, DataAddress destinationDataAddress) { return new DataFlowStartMessage("theMessageId", "theParticipantId", "theCounterPartyId", "theDataspaceContext", providerProcessId, "theAgreementId", "theDatasetId", callbackAddress, transferType, destinationDataAddress, emptyList(), emptyMap()); } - private @NonNull DataFlowPrepareMessage createPrepareMessage(String consumerProcessId, String transferType) { + private @NonNull DataFlowPrepareMessage createPrepareMessage(String consumerProcessId, String callbackAddress, String transferType) { return new DataFlowPrepareMessage("theMessageId", "theParticipantId", "theCounterPartyId", - "theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", "theCallbackAddress", + "theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", callbackAddress, transferType, emptyList(), emptyMap()); } @@ -178,13 +196,24 @@ private static class ConsumerDataPlane { .onTerminate(Result::success) .build(); + public void completePreparation(String dataFlowId) { + sdk.getById(dataFlowId) + .compose(dataFlow -> sdk.notifyPrepared(dataFlowId, this::prepareDestinationDataAddress)) + .orElseThrow(f -> new RuntimeException(f.getCause())); + } + private Result onPrepare(DataFlow dataFlow) { - try { - var destinationFile = Files.createTempDirectory("consumer-dest").resolve(dataFlow.getId() + "-content"); - var dataAddress = new DataAddress("FileSystem", "file", destinationFile.toString(), emptyList()); + if (dataFlow.getTransferType().equals("FileSystemAsync-PUSH")) { + dataFlow.transitionToPreparing(); + return Result.success(dataFlow); + } - dataFlow.setDataAddress(dataAddress); + return prepareDestinationDataAddress(dataFlow); + } + private @NonNull Result prepareDestinationDataAddress(DataFlow dataFlow) { + try { + dataFlow.setDataAddress(destinationDataAddress(dataFlow)); return Result.success(dataFlow); } catch (IOException e) { return Result.failure(e); @@ -205,6 +234,11 @@ private Result onCompleted(DataFlow dataFlow) { } } + private @NonNull DataAddress destinationDataAddress(DataFlow dataFlow) throws IOException { + var destinationFile = Files.createTempDirectory("consumer-dest").resolve(dataFlow.getId() + "-content"); + return new DataAddress("file", destinationFile.toString(), emptyList()); + } + public Object controller() { return sdk.controller(); }