diff --git a/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs b/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs index 0dc6a40..c60a7b1 100644 --- a/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs +++ b/DataPlane.Sdk.Api.Test/DataPlaneSignalingApiControllerTest.cs @@ -143,7 +143,7 @@ public async Task Prepare_Success() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -174,7 +174,7 @@ public async Task Prepare_WhenReturnsSync_Success() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -205,7 +205,7 @@ public async Task Prepare_WhenReturnsAsync_Success() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -238,7 +238,7 @@ public async Task Prepare_WhenSdkReturnsInvalidState_Expect400() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -263,7 +263,7 @@ public async Task Prepare_WhenDataflowExists_Expect409() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -359,11 +359,7 @@ public async Task Start_Success() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - SourceDataAddress = new DataAddress("test-type") - { - Properties = { ["test-key"] = "test-value" } - }, - DestinationDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } }, @@ -396,11 +392,7 @@ public async Task Start_WhenAlreadyExists_ExpectConflict() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - SourceDataAddress = new DataAddress("test-type") - { - Properties = { ["test-key"] = "test-value" } - }, - DestinationDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } }, @@ -432,11 +424,7 @@ public async Task Start_SdkHandlerWrongState_ExpectBadRequest() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - SourceDataAddress = new DataAddress("test-type") - { - Properties = { ["test-key"] = "test-value" } - }, - DestinationDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } }, @@ -470,8 +458,7 @@ public async Task Start_WhenSdkReturnsStarted_Success() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - SourceDataAddress = new DataAddress("test-type"), - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -501,8 +488,7 @@ public async Task Start_WhenSdkReturnsStarting_Success() DatasetId = "test-asset", ParticipantId = TestUser, AgreementId = "test-agreement", - SourceDataAddress = new DataAddress("test-type"), - DestinationDataAddress = new DataAddress("test-type"), + DataAddress = new DataAddress("test-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -523,24 +509,25 @@ public async Task Start_WhenSdkReturnsStarting_Success() #endregion - #region StartById + #region Notify-Started [Fact] public async Task StartById_Success() { Sdk.OnStart = null; var flow = CreateDataFlow(); + flow.IsConsumer = true; DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); - var startMsg = new DataFlowStartByIdMessage + var startMsg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } } }; - var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/start", startMsg); + var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/started", startMsg); response.StatusCode.ShouldBe(HttpStatusCode.OK); var body = await response.Content.ReadFromJsonAsync(); body.ShouldNotBeNull(); @@ -553,14 +540,14 @@ public async Task StartById_WhenNotFound_ExpectError() { Sdk.OnStart = null; - var startMsg = new DataFlowStartByIdMessage + var startMsg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } } }; - var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/not-exist/start", startMsg); + var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/not-exist/started", startMsg); response.StatusCode.ShouldBe(HttpStatusCode.NotFound); } @@ -573,14 +560,14 @@ public async Task StartById_InvalidState_ExpectConflict() DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); - var startMsg = new DataFlowStartByIdMessage + var startMsg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } } }; - var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/start", startMsg); + var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/started", startMsg); response.StatusCode.ShouldBe(HttpStatusCode.Conflict); } @@ -593,17 +580,18 @@ public async Task StartById_WhenSdkReturnsStarting_Success() return StatusResult.Success(df); }; var flow = CreateDataFlow(); + flow.IsConsumer = true; DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); - var startMsg = new DataFlowStartByIdMessage + var startMsg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } } }; - var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/start", startMsg); + var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/started", startMsg); response.StatusCode.ShouldBe(HttpStatusCode.Accepted); response.Headers.Location.ShouldNotBeNull() .ToString().ShouldEndWith($"/api/v1/{TestUser}/dataflows/{flow.Id}"); @@ -622,17 +610,18 @@ public async Task StartById_WhenSdkReturnsStarted_Success() return StatusResult.Success(df); }; var flow = CreateDataFlow(); + flow.IsConsumer = true; DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); - var startMsg = new DataFlowStartByIdMessage + var startMsg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } } }; - var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/start", startMsg); + var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/started", startMsg); response.StatusCode.ShouldBe(HttpStatusCode.OK); var body = await response.Content.ReadFromJsonAsync(); body.ShouldNotBeNull(); @@ -652,14 +641,79 @@ public async Task StartById_SdkHandlerWrongState_ExpectBadRequest() DataFlowContext.DataFlows.Add(flow); await DataFlowContext.SaveChangesAsync(); - var startMsg = new DataFlowStartByIdMessage + var startMsg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["test-key"] = "test-value" } } }; - var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/start", startMsg); + var response = await HttpClient.PostAsJsonAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/started", startMsg); + response.StatusCode.ShouldBe(HttpStatusCode.Conflict); + } + + #endregion + + #region Complete + + [Fact] + public async Task Complete_Success() + { + Sdk.OnComplete = null; + var flow = CreateDataFlow(); + flow.State = DataFlowState.Started; + DataFlowContext.DataFlows.Add(flow); + await DataFlowContext.SaveChangesAsync(); + + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/completed", null); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + } + + [Fact] + public async Task Complete_WhenNotFound_ExpectNotFound() + { + Sdk.OnComplete = null; + + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/not-exist/completed", null); + response.StatusCode.ShouldBe(HttpStatusCode.NotFound); + } + + [Fact] + public async Task Complete_WithBody_ExpectBadRequest() + { + Sdk.OnComplete = null; + + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/some-flow/completed", new StringContent("{\"foo\": \"bar\"}")); + response.StatusCode.ShouldBe(HttpStatusCode.BadRequest); + } + + [Fact] + public async Task Complete_WhenSdkReportsError_ExpectBadRequest() + { + Sdk.OnComplete = df => StatusResult.Failed(new StatusFailure + { + Message = "test error", + Reason = FailureReason.ServiceUnavailable + }); + + var flow = CreateDataFlow(); + flow.State = DataFlowState.Started; + DataFlowContext.DataFlows.Add(flow); + await DataFlowContext.SaveChangesAsync(); + + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/completed", null); + response.StatusCode.ShouldBe(HttpStatusCode.ServiceUnavailable); + } + + [Fact] + public async Task Complete_WhenFlowInWrongState_ExpectBadRequest() + { + var flow = CreateDataFlow(); + flow.State = DataFlowState.Terminated; + DataFlowContext.DataFlows.Add(flow); + await DataFlowContext.SaveChangesAsync(); + + var response = await HttpClient.PostAsync($"/api/v1/{TestUser}/dataflows/{flow.Id}/completed", null); response.StatusCode.ShouldBe(HttpStatusCode.Conflict); } diff --git a/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs b/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs index 56675c8..1f92d4c 100644 --- a/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs +++ b/DataPlane.Sdk.Api/Controllers/DataPlaneSignalingApiController.cs @@ -82,8 +82,9 @@ public async Task Start([FromRoute] string participantContextId, } [Authorize] - [HttpPost("{dataFlowId}/start")] - public async Task StartById([FromRoute] string participantContextId, [FromRoute] string dataFlowId, DataFlowStartByIdMessage startMessage) + [HttpPost("{dataFlowId}/started")] + public async Task StartById([FromRoute] string participantContextId, [FromRoute] string dataFlowId, + DataFlowStartedNotificationMessage startMessage) { if (!(await authorizationService.AuthorizeAsync(User, new ResourceTuple(participantContextId, dataFlowId), "DataFlowAccess")).Succeeded) { @@ -144,6 +145,24 @@ public async Task Terminate([FromRoute] string participantContext return statusResult.IsSucceeded ? Ok() : StatusCode((int)statusResult.Failure!.Reason, statusResult); } + [Authorize] + [HttpPost("{dataFlowId}/completed")] + public async Task Complete([FromRoute] string participantContextId, [FromRoute] string dataFlowId) + { + if (!(await authorizationService.AuthorizeAsync(User, new ResourceTuple(participantContextId, dataFlowId), "DataFlowAccess")).Succeeded) + { + return Forbid(); + } + + if (Request.ContentLength > 0) + { + return BadRequest("Request body is not allowed for this endpoint"); + } + + var result = await signalingService.CompleteAsync(dataFlowId); + return result.IsSucceeded ? Ok() : StatusCode((int)result.Failure!.Reason, result); + } + [Authorize] [HttpGet("{dataFlowId}/status")] public async Task GetStatus([FromRoute] string dataFlowId, [FromRoute] string participantContextId) diff --git a/DataPlane.Sdk.Core.Test/Data/DataFlowContextTest.cs b/DataPlane.Sdk.Core.Test/Data/DataFlowContextTest.cs index 1bf821f..a869f20 100644 --- a/DataPlane.Sdk.Core.Test/Data/DataFlowContextTest.cs +++ b/DataPlane.Sdk.Core.Test/Data/DataFlowContextTest.cs @@ -22,7 +22,7 @@ public async Task SaveAsync_ShouldAddNewDataFlow() entry.Entity.Id.ShouldBe(dataFlow.Id); entry.Entity.ShouldBeEquivalentTo(dataFlow); - entry.Entity.Destination.Properties["test-key"].ShouldBeEquivalentTo("test-value"); + entry.Entity.Destination?.Properties["test-key"].ShouldBeEquivalentTo("test-value"); } [Fact] diff --git a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowPrepareMessageSerializationTest.cs b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowPrepareMessageSerializationTest.cs index 70fea73..d67e846 100644 --- a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowPrepareMessageSerializationTest.cs +++ b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowPrepareMessageSerializationTest.cs @@ -21,7 +21,7 @@ public void Serialize_WithAllProperties_Success() DataspaceContext = "dataspace-context", AgreementId = "agreement-def", CallbackAddress = new Uri("https://callback.example.com"), - DestinationDataAddress = new DataAddress("AzureBlob") + DataAddress = new DataAddress("AzureBlob") { Properties = { ["container"] = "dest-container", ["account"] = "myaccount" } }, @@ -43,7 +43,7 @@ public void Serialize_WithAllProperties_Success() json.ShouldContain("\"participantID\":\"participant-abc\""); json.ShouldContain("\"counterPartyID\":\"counterparty-xyz\""); json.ShouldContain("\"agreementID\":\"agreement-def\""); - json.ShouldContain("\"destinationDataAddress\""); + json.ShouldContain("\"dataAddress\""); json.ShouldContain("\"transferType\""); } @@ -61,7 +61,7 @@ public void Deserialize_WithAllProperties_Success() "dataspaceContext": "dataspace-context", "agreementID": "agreement-def", "callbackAddress": "https://callback.example.com", - "destinationDataAddress": { + "dataAddress": { "@type": "AzureBlob", "properties": { "container": "dest-container", @@ -89,8 +89,8 @@ public void Deserialize_WithAllProperties_Success() message.AgreementId.ShouldBe("agreement-def"); message.CallbackAddress.ShouldNotBeNull(); message.CallbackAddress.ToString().ShouldBe("https://callback.example.com/"); - message.DestinationDataAddress.ShouldNotBeNull(); - message.DestinationDataAddress.Type.ShouldBe("AzureBlob"); + message.DataAddress.ShouldNotBeNull(); + message.DataAddress.Type.ShouldBe("AzureBlob"); message.TransferType.ShouldNotBeNull(); message.TransferType.DestinationType.ShouldBe("AzureBlob"); message.TransferType.FlowType.ShouldBe(FlowType.Push); @@ -106,7 +106,7 @@ public void Deserialize_WithMinimalProperties_Success() "datasetID": "dataset-789", "participantID": "participant-abc", "agreementID": "agreement-def", - "destinationDataAddress": { + "dataAddress": { "type": "HttpData" }, "transferType": { @@ -125,7 +125,7 @@ public void Deserialize_WithMinimalProperties_Success() message.DatasetId.ShouldBe("dataset-789"); message.ParticipantId.ShouldBe("participant-abc"); message.AgreementId.ShouldBe("agreement-def"); - message.DestinationDataAddress.ShouldNotBeNull(); + message.DataAddress.ShouldNotBeNull(); message.TransferType.ShouldNotBeNull(); message.TransferType.FlowType.ShouldBe(FlowType.Pull); } @@ -141,7 +141,7 @@ public void SerializeDeserialize_RoundTrip_Success() DatasetId = "dataset-roundtrip", ParticipantId = "participant-roundtrip", AgreementId = "agreement-roundtrip", - DestinationDataAddress = new DataAddress("Database") + DataAddress = new DataAddress("Database") { Properties = { ["connectionString"] = "Server=localhost", ["table"] = "dest_table" } }, @@ -163,7 +163,7 @@ public void SerializeDeserialize_RoundTrip_Success() deserialized.DatasetId.ShouldBe(original.DatasetId); deserialized.ParticipantId.ShouldBe(original.ParticipantId); deserialized.AgreementId.ShouldBe(original.AgreementId); - deserialized.DestinationDataAddress.Type.ShouldBe(original.DestinationDataAddress.Type); + deserialized.DataAddress?.Type.ShouldBe(original.DataAddress.Type); deserialized.TransferType.DestinationType.ShouldBe(original.TransferType.DestinationType); deserialized.TransferType.FlowType.ShouldBe(original.TransferType.FlowType); } diff --git a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartByIdMessageTest.cs b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartByIdMessageTest.cs index 2a6faa7..09b57d9 100644 --- a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartByIdMessageTest.cs +++ b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartByIdMessageTest.cs @@ -6,7 +6,7 @@ namespace DataPlane.Sdk.Core.Test.Domain.Messages; /// -/// Tests for JSON serialization and deserialization of DataFlowStartByIdMessage +/// Tests for JSON serialization and deserialization of DataFlowStartedNotificationMessage /// public class DataFlowStartByIdMessageTest { @@ -14,9 +14,9 @@ public class DataFlowStartByIdMessageTest public void SerDes_WithSourceDataAddress_Success() { // Arrange - var message = new DataFlowStartByIdMessage + var message = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("S3") + DataAddress = new DataAddress("S3") { Properties = { ["bucketName"] = "source-bucket", ["region"] = "us-east-1" } } @@ -27,20 +27,20 @@ public void SerDes_WithSourceDataAddress_Success() // Assert json.ShouldNotBeNullOrWhiteSpace(); - json.ShouldContain("\"sourceDataAddress\""); + json.ShouldContain("\"dataAddress\""); json.ShouldContain("\"@type\":\"S3\""); json.ShouldContain("\"bucketName\":\"source-bucket\""); json.ShouldContain("\"region\":\"us-east-1\""); // Act - var deserialized = JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions); + var deserialized = JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions); // Assert deserialized.ShouldNotBeNull(); - deserialized.SourceDataAddress.ShouldNotBeNull(); - deserialized.SourceDataAddress.Type.ShouldBe("S3"); - deserialized.SourceDataAddress.Properties["bucketName"].ShouldBeEquivalentTo("source-bucket"); - deserialized.SourceDataAddress.Properties["region"].ShouldBeEquivalentTo("us-east-1"); + deserialized.DataAddress.ShouldNotBeNull(); + deserialized.DataAddress.Type.ShouldBe("S3"); + deserialized.DataAddress.Properties["bucketName"].ShouldBeEquivalentTo("source-bucket"); + deserialized.DataAddress.Properties["region"].ShouldBeEquivalentTo("us-east-1"); deserialized.ShouldBeEquivalentTo(message); } @@ -51,24 +51,24 @@ public void SerDes_WithEmptySourceDataAddressProperties_Success() // Arrange var json = """ { - "sourceDataAddress": { + "dataAddress": { "@type": "HttpData" } } """; // Act - var message = JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions); + var message = JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions); // Assert message.ShouldNotBeNull(); - message.SourceDataAddress.ShouldNotBeNull(); - message.SourceDataAddress.Type.ShouldBe("HttpData"); - message.SourceDataAddress.Properties.ShouldBeEmpty(); + message.DataAddress.ShouldNotBeNull(); + message.DataAddress.Type.ShouldBe("HttpData"); + message.DataAddress.Properties.ShouldBeEmpty(); } [Fact] - public void SerializeDeserialize_NoSourceDataAddress_Failure() + public void SerializeDeserialize_NoDataAddress_Success() { // Arrange var json = """ @@ -77,6 +77,6 @@ public void SerializeDeserialize_NoSourceDataAddress_Failure() """; // Act - Should.Throw(() => JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions)); + Should.NotThrow(() => JsonSerializer.Deserialize(json, TestJsonDeserializerConfig.DefaultOptions)); } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartMessageSerializationTest.cs b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartMessageSerializationTest.cs index 4e08181..dc950af 100644 --- a/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartMessageSerializationTest.cs +++ b/DataPlane.Sdk.Core.Test/Domain/Messages/DataFlowStartMessageSerializationTest.cs @@ -21,11 +21,7 @@ public void Serialize_WithAllProperties_Success() DatasetId = "dataset-789", ParticipantId = "participant-abc", AgreementId = "agreement-def", - SourceDataAddress = new DataAddress("S3") - { - Properties = { ["bucketName"] = "source-bucket", ["region"] = "us-east-1" } - }, - DestinationDataAddress = new DataAddress("AzureBlob") + DataAddress = new DataAddress("AzureBlob") { Properties = { ["container"] = "dest-container", ["account"] = "myaccount" } }, @@ -46,8 +42,7 @@ public void Serialize_WithAllProperties_Success() json.ShouldContain("\"datasetId\":\"dataset-789\""); json.ShouldContain("\"participantId\":\"participant-abc\""); json.ShouldContain("\"agreementId\":\"agreement-def\""); - json.ShouldContain("\"sourceDataAddress\""); - json.ShouldContain("\"destinationDataAddress\""); + json.ShouldContain("\"dataAddress\""); } [Fact] @@ -61,14 +56,7 @@ public void Deserialize_WithAllProperties_Success() "datasetID": "dataset-789", "participantID": "participant-abc", "agreementID": "agreement-def", - "sourceDataAddress": { - "@type": "S3", - "properties": { - "bucketName": "source-bucket", - "region": "us-east-1" - } - }, - "destinationDataAddress": { + "dataAddress": { "@type": "AzureBlob", "properties": { "container": "dest-container", @@ -92,14 +80,10 @@ public void Deserialize_WithAllProperties_Success() message.DatasetId.ShouldBe("dataset-789"); message.ParticipantId.ShouldBe("participant-abc"); message.AgreementId.ShouldBe("agreement-def"); - message.SourceDataAddress.ShouldNotBeNull(); - message.SourceDataAddress.Type.ShouldBe("S3"); - message.SourceDataAddress.Properties["bucketName"].ShouldBeEquivalentTo("source-bucket"); - message.SourceDataAddress.Properties["region"].ShouldBeEquivalentTo("us-east-1"); - message.DestinationDataAddress.ShouldNotBeNull(); - message.DestinationDataAddress.Type.ShouldBe("AzureBlob"); - message.DestinationDataAddress.Properties["container"].ShouldBeEquivalentTo("dest-container"); - message.DestinationDataAddress.Properties["account"].ShouldBeEquivalentTo("myaccount"); + message.DataAddress.ShouldNotBeNull(); + message.DataAddress.Type.ShouldBe("AzureBlob"); + message.DataAddress.Properties["container"].ShouldBeEquivalentTo("dest-container"); + message.DataAddress.Properties["account"].ShouldBeEquivalentTo("myaccount"); message.TransferType.FlowType.ShouldBe(FlowType.Push); } @@ -113,10 +97,7 @@ public void Deserialize_WithMinimalProperties_Success() "datasetID": "dataset-789", "participantID": "participant-abc", "agreementID": "agreement-def", - "sourceDataAddress": { - "@type": "HttpData" - }, - "destinationDataAddress": { + "dataAddress": { "@type": "HttpData" }, "transferType": { @@ -135,10 +116,8 @@ public void Deserialize_WithMinimalProperties_Success() message.DatasetId.ShouldBe("dataset-789"); message.ParticipantId.ShouldBe("participant-abc"); message.AgreementId.ShouldBe("agreement-def"); - message.SourceDataAddress.ShouldNotBeNull(); - message.SourceDataAddress.Type.ShouldBe("HttpData"); - message.DestinationDataAddress.ShouldNotBeNull(); - message.DestinationDataAddress.Type.ShouldBe("HttpData"); + message.DataAddress.ShouldNotBeNull(); + message.DataAddress.Type.ShouldBe("HttpData"); message.TransferType.FlowType.ShouldBe(FlowType.Pull); } @@ -153,11 +132,7 @@ public void SerializeDeserialize_RoundTrip_Success() DatasetId = "dataset-roundtrip", ParticipantId = "participant-roundtrip", AgreementId = "agreement-roundtrip", - SourceDataAddress = new DataAddress("FileSystem") - { - Properties = { ["path"] = "/data/source", ["format"] = "csv" } - }, - DestinationDataAddress = new DataAddress("Database") + DataAddress = new DataAddress("Database") { Properties = { ["connectionString"] = "Server=localhost", ["table"] = "dest_table" } }, @@ -179,12 +154,9 @@ public void SerializeDeserialize_RoundTrip_Success() deserialized.DatasetId.ShouldBe(original.DatasetId); deserialized.ParticipantId.ShouldBe(original.ParticipantId); deserialized.AgreementId.ShouldBe(original.AgreementId); - deserialized.SourceDataAddress?.Type.ShouldBe(original.SourceDataAddress.Type); - deserialized.SourceDataAddress?.Properties["path"].ShouldBeEquivalentTo("/data/source"); - deserialized.SourceDataAddress?.Properties["format"].ShouldBeEquivalentTo("csv"); - deserialized.DestinationDataAddress.Type.ShouldBe(original.DestinationDataAddress.Type); - deserialized.DestinationDataAddress.Properties["connectionString"].ShouldBeEquivalentTo("Server=localhost"); - deserialized.DestinationDataAddress.Properties["table"].ShouldBeEquivalentTo("dest_table"); + deserialized.DataAddress?.Type.ShouldBe(original.DataAddress.Type); + deserialized.DataAddress?.Properties["connectionString"].ShouldBeEquivalentTo("Server=localhost"); + deserialized.DataAddress?.Properties["table"].ShouldBeEquivalentTo("dest_table"); deserialized.TransferType.ShouldBeEquivalentTo(original.TransferType); } @@ -198,11 +170,7 @@ public void Deserialize_WithEmptyDataAddressProperties_Success() "datasetID": "dataset-123", "participantID": "participant-123", "agreementID": "agreement-123", - "sourceDataAddress": { - "type": "Custom", - "properties": {} - }, - "destinationDataAddress": { + "dataAddress": { "type": "Custom", "properties": {} }, @@ -218,10 +186,8 @@ public void Deserialize_WithEmptyDataAddressProperties_Success() // Assert message.ShouldNotBeNull(); - message.SourceDataAddress.ShouldNotBeNull(); - message.SourceDataAddress.Properties.ShouldBeEmpty(); - message.DestinationDataAddress.ShouldNotBeNull(); - message.DestinationDataAddress.Properties.ShouldBeEmpty(); + message.DataAddress.ShouldNotBeNull(); + message.DataAddress.Properties.ShouldBeEmpty(); } [Fact] @@ -234,8 +200,7 @@ public void Serialize_WithCamelCasePropertyNames_Success() DatasetId = "dataset-123", ParticipantId = "participant-123", AgreementId = "agreement-123", - SourceDataAddress = new DataAddress("TestType"), - DestinationDataAddress = new DataAddress("TestType"), + DataAddress = new DataAddress("TestType"), TransferType = new TransferType { DestinationType = "test-type", @@ -251,8 +216,7 @@ public void Serialize_WithCamelCasePropertyNames_Success() json.ShouldContain("\"datasetId\""); json.ShouldContain("\"participantId\""); json.ShouldContain("\"agreementId\""); - json.ShouldContain("\"sourceDataAddress\""); - json.ShouldContain("\"destinationDataAddress\""); + json.ShouldContain("\"dataAddress\""); json.ShouldContain("\"transferType\""); } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core.Test/DataPlaneSignalingServiceTest.cs b/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs similarity index 86% rename from DataPlane.Sdk.Core.Test/DataPlaneSignalingServiceTest.cs rename to DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs index ede234f..6048886 100644 --- a/DataPlane.Sdk.Core.Test/DataPlaneSignalingServiceTest.cs +++ b/DataPlane.Sdk.Core.Test/Infrastructure/DataPlaneSignalingServiceTest.cs @@ -12,7 +12,7 @@ [assembly: CollectionBehavior(MaxParallelThreads = 1)] -namespace DataPlane.Sdk.Core.Test; +namespace DataPlane.Sdk.Core.Test.Infrastructure; public abstract class DataPlaneSignalingServiceTest : IDisposable { @@ -92,11 +92,6 @@ public async Task StartAsync_WhenDataFlowExists_ShouldReturnConflict() result.Failure.ShouldNotBeNull(); result.Failure.Reason.ShouldBe(Conflict); - // result.Content.ShouldSatisfyAllConditions(() => result.Content!.Source.ShouldNotBeNull()); - // result.Content.ShouldSatisfyAllConditions(() => result.Content!.Destination.ShouldNotBeNull()); - - // _dataFlowContext.ChangeTracker.HasChanges().ShouldBeFalse(); - // _dataFlowContext.DataFlows.ShouldContain(x => x.Id == message.ProcessId && x.State == Started); } [Fact] @@ -149,12 +144,13 @@ public async Task StartAsync_WhenDataFlowIsLeased_ShouldReturnFailure() public async Task StartByIdAsync_WhenExists_ShouldReturnSuccess() { var flow = CreateDataFlow(Guid.NewGuid().ToString(), Uninitialized); + flow.IsConsumer = true; await _dataFlowContext.AddAsync(flow); await _dataFlowContext.SaveChangesAsync(); - var msg = new DataFlowStartByIdMessage + var msg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["key1"] = "value1" } } @@ -167,9 +163,9 @@ public async Task StartByIdAsync_WhenExists_ShouldReturnSuccess() [Fact] public async Task StartByIdAsync_WhenNotExists_ShouldReturnFailure() { - var msg = new DataFlowStartByIdMessage + var msg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["key1"] = "value1" } } @@ -181,6 +177,28 @@ public async Task StartByIdAsync_WhenNotExists_ShouldReturnFailure() result.Failure.Reason.ShouldBe(NotFound); } + [Fact] + public async Task StartByIdAsync_WhenNotConsumer_ShouldReturnFailure() + { + var flow = CreateDataFlow(Guid.NewGuid().ToString(), Uninitialized); + flow.IsConsumer = false; + await _dataFlowContext.AddAsync(flow); + await _dataFlowContext.SaveChangesAsync(); + + var msg = new DataFlowStartedNotificationMessage + { + DataAddress = new DataAddress("test-type") + { + Properties = { ["key1"] = "value1" } + } + }; + + var result = await _service.StartByIdAsync(flow.Id, msg); + result.IsFailed.ShouldBeTrue(); + result.Failure.ShouldNotBeNull(); + result.Failure.Reason.ShouldBe(Conflict); + } + [Fact] public async Task StartByIdAsync_WhenWrongState_ShouldReturnFailure() { @@ -188,9 +206,9 @@ public async Task StartByIdAsync_WhenWrongState_ShouldReturnFailure() await _dataFlowContext.AddAsync(flow); await _dataFlowContext.SaveChangesAsync(); - var msg = new DataFlowStartByIdMessage + var msg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["key1"] = "value1" } } @@ -209,9 +227,9 @@ public async Task StartByIdAsync_WhenAlreadyStarted_ShouldReturnSuccess() await _dataFlowContext.AddAsync(flow); await _dataFlowContext.SaveChangesAsync(); - var msg = new DataFlowStartByIdMessage + var msg = new DataFlowStartedNotificationMessage { - SourceDataAddress = new DataAddress("test-type") + DataAddress = new DataAddress("test-type") { Properties = { ["key1"] = "value1" } } @@ -522,6 +540,57 @@ public async Task SuspendAsync_ShouldReturnSuccess_WhenAlreadySuspended() result.IsSucceeded.ShouldBeTrue(); eventMock.Verify(ev => ev.Invoke(dataFlow), Times.Never); } + + [Fact] + public async Task CompleteAsync_WhenFound_ShouldReturnSuccess() + { + var flow = CreateDataFlow(Guid.NewGuid().ToString(), Started); + await _dataFlowContext.AddAsync(flow); + await _dataFlowContext.SaveChangesAsync(); + + var res = await _service.CompleteAsync(flow.Id); + res.IsSucceeded.ShouldBeTrue(); + } + + [Fact] + public async Task CompleteAsync_WhenWrongState_ShouldReturnConflict() + { + var flow = CreateDataFlow(Guid.NewGuid().ToString(), Started); + flow.State = Terminated; + + await _dataFlowContext.AddAsync(flow); + await _dataFlowContext.SaveChangesAsync(); + + var res = await _service.CompleteAsync(flow.Id); + res.IsSucceeded.ShouldBeFalse(); + res.Failure.ShouldNotBeNull(); + res.Failure.Reason.ShouldBe(Conflict); + } + + [Fact] + public async Task CompleteAsync_WhenSdkReportsError_ShouldReturnError() + { + var flow = CreateDataFlow(Guid.NewGuid().ToString(), Started); + await _dataFlowContext.AddAsync(flow); + await _dataFlowContext.SaveChangesAsync(); + + _sdk.OnComplete = _ => StatusResult.Failed(new StatusFailure { Message = "test error", Reason = InternalError }); + + var res = await _service.CompleteAsync(flow.Id); + res.IsSucceeded.ShouldBeFalse(); + res.Failure.ShouldNotBeNull(); + res.Failure.Reason.ShouldBe(InternalError); + res.Failure.Message.ShouldBe("test error"); + } + + [Fact] + public async Task CompleteAsync_WhenNotFound_ShouldReturnNotFound() + { + var res = await _service.CompleteAsync("not-exist"); + res.IsSucceeded.ShouldBeFalse(); + res.Failure.ShouldNotBeNull(); + res.Failure.Reason.ShouldBe(NotFound); + } } [CollectionDefinition("SignalingService")] //parallelize tests in this collection diff --git a/DataPlane.Sdk.Core.Test/TestMethods.cs b/DataPlane.Sdk.Core.Test/TestMethods.cs index b59a7b8..0bb23fb 100644 --- a/DataPlane.Sdk.Core.Test/TestMethods.cs +++ b/DataPlane.Sdk.Core.Test/TestMethods.cs @@ -32,8 +32,7 @@ public static DataFlowStartMessage CreateStartMessage() return new DataFlowStartMessage { ProcessId = "test-process-id", - SourceDataAddress = new DataAddress("test-source-type"), - DestinationDataAddress = new DataAddress("test-destination-type"), + DataAddress = new DataAddress("test-destination-type"), TransferType = new TransferType { DestinationType = "test-type", @@ -50,7 +49,7 @@ public static DataFlowPrepareMessage CreatePrepareMessage() return new DataFlowPrepareMessage { ProcessId = "test-process-id", - DestinationDataAddress = new DataAddress("test-destination-type"), + DataAddress = new DataAddress("test-destination-type"), TransferType = new TransferType { DestinationType = "test-type", diff --git a/DataPlane.Sdk.Core/DataPlaneSdk.cs b/DataPlane.Sdk.Core/DataPlaneSdk.cs index b01699c..9d15f30 100644 --- a/DataPlane.Sdk.Core/DataPlaneSdk.cs +++ b/DataPlane.Sdk.Core/DataPlaneSdk.cs @@ -8,6 +8,7 @@ namespace DataPlane.Sdk.Core; public class DataPlaneSdk { + public Func? OnComplete; public Func>? OnPrepare; public Func>? OnStart; public Func? OnSuspend; @@ -50,6 +51,11 @@ internal StatusResult InvokeOnPrepare(DataFlow flow) return StatusResult.Success(flow); } + internal StatusResult InvokeOnComplete(DataFlow flow) + { + return OnComplete != null ? OnComplete(flow) : StatusResult.Success(); + } + public class SdkBuilder { private readonly DataPlaneSdk _dataPlaneSdk = new() @@ -86,6 +92,12 @@ public SdkBuilder OnSuspend(Func processor) return this; } + public SdkBuilder OnComplete(Func processor) + { + _dataPlaneSdk.OnComplete = processor; + return this; + } + public SdkBuilder RuntimeId(string runtimeId) { _dataPlaneSdk.RuntimeId = runtimeId; diff --git a/DataPlane.Sdk.Core/Domain/Interfaces/IDataPlaneSignalingService.cs b/DataPlane.Sdk.Core/Domain/Interfaces/IDataPlaneSignalingService.cs index a844371..51f1105 100644 --- a/DataPlane.Sdk.Core/Domain/Interfaces/IDataPlaneSignalingService.cs +++ b/DataPlane.Sdk.Core/Domain/Interfaces/IDataPlaneSignalingService.cs @@ -17,7 +17,7 @@ public interface IDataPlaneSignalingService /// /// /// The start message - Task> StartByIdAsync(string id, DataFlowStartByIdMessage message); + Task> StartByIdAsync(string id, DataFlowStartedNotificationMessage message); /// /// Suspends (pauses) a data flow by its ID. @@ -46,4 +46,14 @@ public interface IDataPlaneSignalingService /// preparation to happen asynchronously. If the state is PREPARED, then the caller can proceed normally. /// Task> PrepareAsync(DataFlowPrepareMessage prepareMessage); + + /// + /// Marks a data flow as completed. + /// + /// The ID of the data flow to complete + /// + /// A status result indicating success or failure. Failure may include specific details + /// such as wrong state, not found, or other error conditions. + /// + Task CompleteAsync(string dataFlowId); } \ No newline at end of file diff --git a/DataPlane.Sdk.Core/Domain/Messages/DataFlowBaseMessage.cs b/DataPlane.Sdk.Core/Domain/Messages/DataFlowBaseMessage.cs index 543c978..5f76753 100644 --- a/DataPlane.Sdk.Core/Domain/Messages/DataFlowBaseMessage.cs +++ b/DataPlane.Sdk.Core/Domain/Messages/DataFlowBaseMessage.cs @@ -32,6 +32,6 @@ public abstract class DataFlowBaseMessage : JsonLdDto [JsonPropertyName("transferType")] public required TransferType TransferType { get; init; } - [JsonPropertyName("destinationDataAddress")] - public required DataAddress DestinationDataAddress { get; init; } + [JsonPropertyName("dataAddress")] + public DataAddress? DataAddress { get; init; } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartMessage.cs b/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartMessage.cs index 10f6699..04ea02d 100644 --- a/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartMessage.cs +++ b/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartMessage.cs @@ -1,6 +1,3 @@ -using System.Text.Json.Serialization; -using DataPlane.Sdk.Core.Domain.Model; - namespace DataPlane.Sdk.Core.Domain.Messages; /// @@ -10,6 +7,4 @@ namespace DataPlane.Sdk.Core.Domain.Messages; /// public class DataFlowStartMessage : DataFlowBaseMessage { - [JsonPropertyName("sourceDataAddress")] - public DataAddress? SourceDataAddress { get; init; } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartByIdMessage.cs b/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartedNotificationMessage.cs similarity index 71% rename from DataPlane.Sdk.Core/Domain/Messages/DataFlowStartByIdMessage.cs rename to DataPlane.Sdk.Core/Domain/Messages/DataFlowStartedNotificationMessage.cs index 9dc91a8..4770239 100644 --- a/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartByIdMessage.cs +++ b/DataPlane.Sdk.Core/Domain/Messages/DataFlowStartedNotificationMessage.cs @@ -7,8 +7,8 @@ namespace DataPlane.Sdk.Core.Domain.Messages; /// Represents a data flow start message from the Dataplane Signaling API protocol. It is used to initiate a data /// transfer between a consumer and the provider. This message is sent by the control plane to the data plane. /// -public class DataFlowStartByIdMessage : JsonLdDto +public class DataFlowStartedNotificationMessage : JsonLdDto { - [JsonPropertyName("sourceDataAddress")] - public required DataAddress SourceDataAddress { get; init; } + [JsonPropertyName("dataAddress")] + public DataAddress? DataAddress { get; init; } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs b/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs index bc7e8f5..571c235 100644 --- a/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs +++ b/DataPlane.Sdk.Core/Domain/Model/DataFlow.cs @@ -3,7 +3,7 @@ namespace DataPlane.Sdk.Core.Domain.Model; public class DataFlow(string id) : StatefulEntity(id) { public DataAddress? Source { get; set; } - public required DataAddress Destination { get; set; } + public required DataAddress? Destination { get; set; } public Uri? CallbackAddress { get; init; } public IDictionary Properties { get; init; } = new Dictionary(); @@ -13,7 +13,7 @@ public class DataFlow(string id) : StatefulEntity(id) public bool IsProvisionRequested { get; init; } public bool IsDeprovisionComplete { get; init; } public bool IsDeprovisionRequested { get; init; } - public bool IsConsumer { get; init; } + public bool IsConsumer { get; set; } public required string ParticipantId { get; init; } public required string AssetId { get; init; } public required string AgreementId { get; init; } @@ -46,4 +46,9 @@ public void Starting() { Transition(DataFlowState.Starting); } + + public void Complete() + { + Transition(DataFlowState.Completed); + } } \ No newline at end of file diff --git a/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs b/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs index 5c19bab..ce7317b 100644 --- a/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs +++ b/DataPlane.Sdk.Core/Infrastructure/DataPlaneSignalingService.cs @@ -31,7 +31,7 @@ public async Task> StartAsync(DataFlowStartMessage messag } - public async Task> StartByIdAsync(string id, DataFlowStartByIdMessage message) + public async Task> StartByIdAsync(string id, DataFlowStartedNotificationMessage message) { var existing = await dataFlowContext.FindByIdAsync(id); @@ -45,6 +45,11 @@ public async Task> StartByIdAsync(string id, DataFlowStar return StatusResult.Success(existing); } + if (!existing.IsConsumer) + { + return StatusResult.Conflict("This request is only valid for DataFlows on the consumer side."); + } + // check the correct state of the existing DF if (existing.State is DataFlowState.Starting or DataFlowState.Prepared or DataFlowState.Uninitialized) { @@ -144,6 +149,35 @@ public async Task> PrepareAsync(DataFlowPrepareMessage pr return StatusResult.Success(updatedFlow); } + public async Task CompleteAsync(string dataFlowId) + { + var existingFlowResult = await dataFlowContext.FindByIdAsync(dataFlowId); + if (existingFlowResult == null) + { + return StatusResult.NotFound(); + } + + if (existingFlowResult.State == DataFlowState.Completed) // de-duplication check + { + return StatusResult.Success(); + } + + if (existingFlowResult.State is DataFlowState.Started) + { + var res = sdk.InvokeOnComplete(existingFlowResult); + if (res.IsFailed) + { + return res; + } + + existingFlowResult.Complete(); + await dataFlowContext.UpsertAsync(existingFlowResult, true); + return StatusResult.Success(); + } + + return StatusResult.Conflict("DataFlow is not in started state, cannot complete."); + } + private async Task> StartExistingFlow(DataFlow existingFlow, Func> sdkHandler) { // invoke SDK handler @@ -171,7 +205,7 @@ private DataFlow CreateDataFlow(DataFlowPrepareMessage message) { return new DataFlow(message.ProcessId) { - Destination = message.DestinationDataAddress, + Destination = message.DataAddress, TransferType = message.TransferType, RuntimeId = _runtimeId, ParticipantId = message.ParticipantId, @@ -186,8 +220,9 @@ private DataFlow CreateDataFlow(DataFlowStartMessage message) { return new DataFlow(message.ProcessId) { - Source = message.SourceDataAddress, - Destination = message.DestinationDataAddress, + Source = new DataAddress( + "TODO: CHANGE"), //todo: this is incorrect: the source address must be resolved externally from the asset-to-source mapping + Destination = message.DataAddress, TransferType = message.TransferType, RuntimeId = _runtimeId, ParticipantId = message.ParticipantId,