Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions examples/streaming-pull-dataplane/consumer/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@ type ConsumerDataPlane struct {

func NewDataPlane(eventSubscriber *natsservices.EventSubscriber) (*ConsumerDataPlane, error) {
dataplane := &ConsumerDataPlane{eventSubscriber: eventSubscriber}
sdk, err := dsdk.NewDataPlaneSDKBuilder().
Store(memory.NewInMemoryStore()).
TransactionContext(memory.InMemoryTrxContext{}).
OnPrepare(dataplane.prepareProcessor).
OnStart(dataplane.startProcessor).
OnTerminate(dataplane.terminateProcessor).
OnSuspend(dataplane.noopHandler).
Build()

sdk, err := dsdk.NewDataPlaneSDK(
dsdk.WithStore(memory.NewInMemoryStore()),
dsdk.WithTransactionContext(memory.InMemoryTrxContext{}),
dsdk.WithPrepareProcessor(dataplane.prepareProcessor),
dsdk.WithStartProcessor(dataplane.startProcessor),
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,10 +113,6 @@ func (d *ConsumerDataPlane) suspendProcessor(_ context.Context, flow *dsdk.DataF
return nil
}

func (d *ConsumerDataPlane) noopHandler(context.Context, *dsdk.DataFlow) error {
return nil
}

func parseToken(keyValue string, da *dsdk.DataAddress) (string, bool) {
rawProps, ok := da.Properties[dsdk.EndpointProperties].([]any)
if !ok {
Expand Down
17 changes: 8 additions & 9 deletions examples/streaming-pull-dataplane/provider/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,14 @@ func NewDataPlane(authService *natsservices.AuthService,
publisherService *EventPublisherService) (*ProviderDataPlane, error) {
providerDataPlane := &ProviderDataPlane{authService: authService, connectionInvalidator: invalidator, publisherService: publisherService}

builder := dsdk.NewDataPlaneSDKBuilder()
store := memory.NewInMemoryStore()
sdk, err := builder.Store(store).
TransactionContext(memory.InMemoryTrxContext{}).
OnPrepare(providerDataPlane.prepareProcessor).
OnStart(providerDataPlane.startProcessor).
OnSuspend(providerDataPlane.suspendProcessor).
OnTerminate(providerDataPlane.terminateProcessor).
Build()
sdk, err := dsdk.NewDataPlaneSDK(
dsdk.WithStore(memory.NewInMemoryStore()),
dsdk.WithTransactionContext(memory.InMemoryTrxContext{}),
dsdk.WithPrepareProcessor(providerDataPlane.prepareProcessor),
dsdk.WithStartProcessor(providerDataPlane.startProcessor),
dsdk.WithSuspendProcessor(providerDataPlane.suspendProcessor),
dsdk.WithTerminateProcessor(providerDataPlane.terminateProcessor),
)
if err != nil {
return nil, err
}
Expand Down
23 changes: 11 additions & 12 deletions examples/streaming-push-dataplane/consumer/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,27 @@ func NewDataPlane(authService *natsservices.AuthService,
natsUrl string,
eventSubscriber *natsservices.EventSubscriber) (*ConsumerDataPlane, error) {

providerDataPlane := &ConsumerDataPlane{
dataPlane := &ConsumerDataPlane{
authService: authService,
connectionInvalidator: invalidator,
natsUrl: natsUrl,
eventSubscriber: eventSubscriber}

builder := dsdk.NewDataPlaneSDKBuilder()
store := memory.NewInMemoryStore()
sdk, err := builder.Store(store).
TransactionContext(memory.InMemoryTrxContext{}).
OnPrepare(providerDataPlane.prepareProcessor).
OnStart(providerDataPlane.startProcessor).
OnSuspend(providerDataPlane.suspendProcessor).
OnTerminate(providerDataPlane.terminateProcessor).
Build()
sdk, err := dsdk.NewDataPlaneSDK(
dsdk.WithStore(memory.NewInMemoryStore()),
dsdk.WithTransactionContext(memory.InMemoryTrxContext{}),
dsdk.WithPrepareProcessor(dataPlane.prepareProcessor),
dsdk.WithStartProcessor(dataPlane.startProcessor),
dsdk.WithSuspendProcessor(dataPlane.suspendProcessor),
dsdk.WithTerminateProcessor(dataPlane.terminateProcessor),
)
if err != nil {
return nil, err
}

providerDataPlane.api = dsdk.NewDataPlaneApi(sdk)
dataPlane.api = dsdk.NewDataPlaneApi(sdk)

return providerDataPlane, nil
return dataPlane, nil
}

func (d *ConsumerDataPlane) Init() {
Expand Down
21 changes: 9 additions & 12 deletions examples/streaming-push-dataplane/provider/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ type ProviderDataPlane struct {

func NewDataPlane(publisherService *EventPublisherService) (*ProviderDataPlane, error) {
dataplane := &ProviderDataPlane{publisherService: publisherService}
sdk, err := dsdk.NewDataPlaneSDKBuilder().
Store(memory.NewInMemoryStore()).
TransactionContext(memory.InMemoryTrxContext{}).
OnPrepare(dataplane.prepareProcessor).
OnStart(dataplane.startProcessor).
OnTerminate(dataplane.terminateProcessor).
OnSuspend(dataplane.noopHandler).
Build()
sdk, err := dsdk.NewDataPlaneSDK(
dsdk.WithStore(memory.NewInMemoryStore()),
dsdk.WithTransactionContext(memory.InMemoryTrxContext{}),
dsdk.WithPrepareProcessor(dataplane.prepareProcessor),
dsdk.WithStartProcessor(dataplane.startProcessor),
dsdk.WithSuspendProcessor(dataplane.suspendProcessor),
dsdk.WithTerminateProcessor(dataplane.terminateProcessor),
)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -112,10 +113,6 @@ func (d *ProviderDataPlane) suspendProcessor(_ context.Context, flow *dsdk.DataF
return nil
}

func (d *ProviderDataPlane) noopHandler(context.Context, *dsdk.DataFlow) error {
return nil
}

func parseToken(keyValue string, da *dsdk.DataAddress) (string, bool) {
rawProps, ok := da.Properties[dsdk.EndpointProperties].([]any)
if !ok {
Expand Down
19 changes: 7 additions & 12 deletions examples/sync-pull-dataplane/consumer/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,13 @@ type ConsumerDataPlane struct {

func NewDataPlane() (*ConsumerDataPlane, error) {
dataplane := &ConsumerDataPlane{tokenStore: common.NewStore[tokenEntry]()}
sdk, err := dsdk.NewDataPlaneSDKBuilder().
Store(memory.NewInMemoryStore()).
TransactionContext(memory.InMemoryTrxContext{}).
OnPrepare(dataplane.prepareProcessor).
OnStart(dataplane.startProcessor).
OnTerminate(dataplane.noopHandler).
OnSuspend(dataplane.noopHandler).
Build()

sdk, err := dsdk.NewDataPlaneSDK(
dsdk.WithStore(memory.NewInMemoryStore()),
dsdk.WithTransactionContext(memory.InMemoryTrxContext{}),
dsdk.WithPrepareProcessor(dataplane.prepareProcessor),
dsdk.WithStartProcessor(dataplane.startProcessor),
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -109,10 +108,6 @@ func (d *ConsumerDataPlane) startProcessor(_ context.Context,
return &dsdk.DataFlowResponseMessage{State: dsdk.Started}, nil
}

func (d *ConsumerDataPlane) noopHandler(context.Context, *dsdk.DataFlow) error {
return nil
}

func (d *ConsumerDataPlane) getEndpointToken(w http.ResponseWriter, r *http.Request) {
// Check if it's a GET request
if r.Method != http.MethodGet {
Expand Down
17 changes: 8 additions & 9 deletions examples/sync-pull-dataplane/provider/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,14 @@ func NewDataPlane() (*ProviderDataPlane, error) {
tokenStore: common.NewStore[tokenEntry](),
}

builder := dsdk.NewDataPlaneSDKBuilder()
store := memory.NewInMemoryStore()
sdk, err := builder.Store(store).
TransactionContext(memory.InMemoryTrxContext{}).
OnPrepare(providerDataPlane.prepareProcessor).
OnStart(providerDataPlane.startProcessor).
OnSuspend(providerDataPlane.suspendProcessor).
OnTerminate(providerDataPlane.terminateProcessor).
Build()
sdk, err := dsdk.NewDataPlaneSDK(
dsdk.WithStore(memory.NewInMemoryStore()),
dsdk.WithTransactionContext(memory.InMemoryTrxContext{}),
dsdk.WithPrepareProcessor(providerDataPlane.prepareProcessor),
dsdk.WithStartProcessor(providerDataPlane.startProcessor),
dsdk.WithSuspendProcessor(providerDataPlane.suspendProcessor),
dsdk.WithTerminateProcessor(providerDataPlane.terminateProcessor),
)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/tests/api_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,9 +470,9 @@ func newCallback() dsdk.CallbackURL {
}

func newSdk(db *sql.DB) (*dsdk.DataPlaneSDK, error) {
sdk, err := dsdk.NewDataPlaneSDKBuilder().
Store(postgres.NewStore(db)).
TransactionContext(postgres.NewDBTransactionContext(db)).
Build()
sdk, err := dsdk.NewDataPlaneSDK(
dsdk.WithStore(postgres.NewStore(db)),
dsdk.WithTransactionContext(postgres.NewDBTransactionContext(db)),
)
return sdk, err
}
94 changes: 55 additions & 39 deletions pkg/dsdk/dsdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,85 +325,101 @@ func (dsdk *DataPlaneSDK) execute(ctx context.Context, callback func(ctx2 contex
}
}

type DataPlaneSDKBuilder struct {
sdk *DataPlaneSDK
}
// DataPlaneSDKOption configures a DataPlaneSDK instance
type DataPlaneSDKOption func(*DataPlaneSDK)

func NewDataPlaneSDKBuilder() *DataPlaneSDKBuilder {
return &DataPlaneSDKBuilder{
sdk: &DataPlaneSDK{},
func WithStore(store DataplaneStore) DataPlaneSDKOption {
return func(sdk *DataPlaneSDK) {
sdk.Store = store
}
}

func (b *DataPlaneSDKBuilder) Store(store DataplaneStore) *DataPlaneSDKBuilder {
b.sdk.Store = store
return b
func WithTransactionContext(trxContext TransactionContext) DataPlaneSDKOption {
return func(sdk *DataPlaneSDK) {
sdk.TrxContext = trxContext
}
}

func (b *DataPlaneSDKBuilder) TransactionContext(trxContext TransactionContext) *DataPlaneSDKBuilder {
b.sdk.TrxContext = trxContext
return b
func WithMonitor(monitor LogMonitor) DataPlaneSDKOption {
return func(sdk *DataPlaneSDK) {
sdk.Monitor = monitor
}
}

func (b *DataPlaneSDKBuilder) OnPrepare(processor DataFlowProcessor) *DataPlaneSDKBuilder {
b.sdk.onPrepare = processor
return b
func WithPrepareProcessor(processor DataFlowProcessor) DataPlaneSDKOption {
return func(sdk *DataPlaneSDK) {
sdk.onPrepare = processor
}
}

func (b *DataPlaneSDKBuilder) OnStart(processor DataFlowProcessor) *DataPlaneSDKBuilder {
b.sdk.onStart = processor
return b
func WithStartProcessor(processor DataFlowProcessor) DataPlaneSDKOption {
return func(sdk *DataPlaneSDK) {
sdk.onStart = processor
}
}

func (b *DataPlaneSDKBuilder) OnTerminate(handler DataFlowHandler) *DataPlaneSDKBuilder {
b.sdk.onTerminate = handler
return b
func WithTerminateProcessor(handler DataFlowHandler) DataPlaneSDKOption {
return func(sdk *DataPlaneSDK) {
sdk.onTerminate = handler
}
}

func (b *DataPlaneSDKBuilder) OnSuspend(handler DataFlowHandler) *DataPlaneSDKBuilder {
b.sdk.onSuspend = handler
return b
func WithSuspendProcessor(handler DataFlowHandler) DataPlaneSDKOption {
return func(sdk *DataPlaneSDK) {
sdk.onSuspend = handler
}
}

func (b *DataPlaneSDKBuilder) Build() (*DataPlaneSDK, error) {
if b.sdk.Store == nil {
func NewDataPlaneSDK(options ...DataPlaneSDKOption) (*DataPlaneSDK, error) {
sdk := &DataPlaneSDK{}

// Apply all options
for _, opt := range options {
opt(sdk)
}

// Validate required fields
if sdk.Store == nil {
return nil, errors.New("store is required")
}
if b.sdk.TrxContext == nil {
if sdk.TrxContext == nil {
return nil, errors.New("transaction context is required")
}
if b.sdk.onPrepare == nil {
b.sdk.onPrepare = func(context context.Context, flow *DataFlow, sdk *DataPlaneSDK, options *ProcessorOptions) (*DataFlowResponseMessage, error) {

// Set defaults for optional fields
if sdk.Monitor == nil {
sdk.Monitor = defaultLogMonitor{}
}
if sdk.onPrepare == nil {
sdk.onPrepare = func(context context.Context, flow *DataFlow, sdk *DataPlaneSDK, options *ProcessorOptions) (*DataFlowResponseMessage, error) {
return &DataFlowResponseMessage{
DataplaneID: "TODO_REPLACE_ME",
DataAddress: &flow.DestinationDataAddress,
State: Prepared,
Error: ""}, nil
}
}
if b.sdk.onStart == nil {
b.sdk.onStart = func(context context.Context, flow *DataFlow, sdk *DataPlaneSDK, options *ProcessorOptions) (*DataFlowResponseMessage, error) {
if sdk.onStart == nil {
sdk.onStart = func(context context.Context, flow *DataFlow, sdk *DataPlaneSDK, options *ProcessorOptions) (*DataFlowResponseMessage, error) {
return &DataFlowResponseMessage{
State: Started,
DataplaneID: "TODO_REPLACE_ME",
DataAddress: &flow.DestinationDataAddress,
Error: ""}, nil
}
}
if b.sdk.onTerminate == nil {
b.sdk.onTerminate = func(context context.Context, flow *DataFlow) error {
if sdk.onTerminate == nil {
sdk.onTerminate = func(context context.Context, flow *DataFlow) error {
return nil
}
}
if b.sdk.onSuspend == nil {
b.sdk.onSuspend = func(context context.Context, flow *DataFlow) error {
if sdk.onSuspend == nil {
sdk.onSuspend = func(context context.Context, flow *DataFlow) error {
return nil
}
}
if b.sdk.Monitor == nil {
b.sdk.Monitor = defaultLogMonitor{}
}
return b.sdk, nil

return sdk, nil
}

type defaultLogMonitor struct {
Expand Down
Loading