diff --git a/go.mod b/go.mod index ea95d8e32..c7e8221f1 100644 --- a/go.mod +++ b/go.mod @@ -82,6 +82,7 @@ require ( github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect go.uber.org/atomic v1.10.0 // indirect diff --git a/go.sum b/go.sum index 6d4cc8fb5..cfaad41cb 100644 --- a/go.sum +++ b/go.sum @@ -418,6 +418,7 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v0.0.0-20161117074351-18a02ba4a312/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v0.0.0-20170130113145-4d4bfba8f1d1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= diff --git a/pkg/model/zookeeper/connection.go b/pkg/model/zookeeper/connection.go index f309db088..3c9d45c64 100644 --- a/pkg/model/zookeeper/connection.go +++ b/pkg/model/zookeeper/connection.go @@ -18,6 +18,7 @@ import ( "context" "crypto/tls" "crypto/x509" + "errors" "fmt" "math/rand" "net" @@ -33,14 +34,32 @@ import ( api "github.com/altinity/clickhouse-operator/pkg/apis/clickhouse.altinity.com/v1" ) +// ZKClient abstracts the subset of zk.Conn methods used by Connection for testability. +type ZKClient interface { + Get(path string) ([]byte, *zk.Stat, error) + Set(path string, data []byte, version int32) (*zk.Stat, error) + Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error) + Delete(path string, version int32) error + Exists(path string) (bool, *zk.Stat, error) + AddAuth(scheme string, auth []byte) error + Close() +} + +// Assert that zk.Conn implements ZKClient +var _ ZKClient = (*zk.Conn)(nil) + type Connection struct { nodes api.ZookeeperNodes ConnectionParams sema *semaphore.Weighted mu sync.Mutex - connection *zk.Conn + connection ZKClient + + // retryDelayFn is configurable for testing + retryDelayFn func(i int) } +// NewConnection creates a new Zookeeper connection with the provided nodes and parameters. func NewConnection(nodes api.ZookeeperNodes, _params ...*ConnectionParams) *Connection { var params *ConnectionParams if len(_params) > 0 { @@ -51,52 +70,62 @@ func NewConnection(nodes api.ZookeeperNodes, _params ...*ConnectionParams) *Conn nodes: nodes, sema: semaphore.NewWeighted(params.MaxConcurrentRequests), ConnectionParams: *params, + retryDelayFn: func(i int) { + time.Sleep(time.Duration(i)*time.Second + time.Duration(rand.Int63n(int64(1*time.Second)))) + }, } } +// Get retrieves data from the specified path in Zookeeper. func (c *Connection) Get(ctx context.Context, path string) (data []byte, stat *zk.Stat, err error) { - err = c.retry(ctx, func(connection *zk.Conn) error { + err = c.retry(ctx, func(connection ZKClient) error { data, stat, err = connection.Get(path) return err }) return } -func (c *Connection) Exists(ctx context.Context, path string) bool { - exists, _, _ := c.Details(ctx, path) - return exists +// Exists checks if the specified path exists in Zookeeper. +func (c *Connection) Exists(ctx context.Context, path string) (bool, error) { + exists, _, err := c.Details(ctx, path) + return exists, err } +// Details retrieves existence and stat information for the specified path in Zookeeper. func (c *Connection) Details(ctx context.Context, path string) (exists bool, stat *zk.Stat, err error) { - err = c.retry(ctx, func(connection *zk.Conn) error { + err = c.retry(ctx, func(connection ZKClient) error { exists, stat, err = connection.Exists(path) return err }) return } +// Create creates a new node at the specified path with the given value, flags, and ACL. func (c *Connection) Create(ctx context.Context, path string, value []byte, flags int32, acl []zk.ACL) (pathCreated string, err error) { - err = c.retry(ctx, func(connection *zk.Conn) error { + err = c.retry(ctx, func(connection ZKClient) error { pathCreated, err = connection.Create(path, value, flags, acl) return err }) return } +// Set updates the value of the node at the specified path with the given version. func (c *Connection) Set(ctx context.Context, path string, value []byte, version int32) (stat *zk.Stat, err error) { - err = c.retry(ctx, func(connection *zk.Conn) error { + err = c.retry(ctx, func(connection ZKClient) error { stat, err = connection.Set(path, value, version) return err }) return } +// Delete removes the node at the specified path with the given version. func (c *Connection) Delete(ctx context.Context, path string, version int32) error { - return c.retry(ctx, func(connection *zk.Conn) error { + return c.retry(ctx, func(connection ZKClient) error { return connection.Delete(path, version) }) } +// Close closes the Zookeeper connection if it exists. If the connection is nil, it does nothing. func (c *Connection) Close() error { if c == nil { return nil @@ -109,41 +138,55 @@ func (c *Connection) Close() error { return nil } -func (c *Connection) retry(ctx context.Context, fn func(*zk.Conn) error) error { +func (c *Connection) retry(ctx context.Context, fn func(connection ZKClient) error) error { if err := c.sema.Acquire(ctx, 1); err != nil { return err } defer c.sema.Release(1) + var errs []error for i := 0; i < c.MaxRetriesNum; i++ { if i > 0 { // Progressive delay before each retry - time.Sleep(time.Duration(i)*time.Second + time.Duration(rand.Int63n(int64(1*time.Second)))) + c.retryDelayFn(i) } connection, err := c.ensureConnection(ctx) if err != nil { + errs = append(errs, fmt.Errorf("retry %d: connection error: %w", i+1, err)) continue // Retry } err = fn(connection) + if err == nil { + // Success - return nil, no need for caller to know about errors + return nil + } + + // Handle specific error cases if err == zk.ErrConnectionClosed { c.mu.Lock() if c.connection == connection { c.connection = nil } c.mu.Unlock() + errs = append(errs, fmt.Errorf("retry %d: connection closed: %w", i+1, err)) continue // Retry } - // Got result - return err + // Collect the errors + errs = append(errs, fmt.Errorf("retry %d: %w", i+1, err)) + } + + // All retries failed - wrap all errors + if len(errs) == 0 { + return fmt.Errorf("max retries number reached: %d", c.MaxRetriesNum) } - return fmt.Errorf("max retries number reached") + return fmt.Errorf("all retries (%d) failed: %w", c.MaxRetriesNum, errors.Join(errs...)) } -func (c *Connection) ensureConnection(ctx context.Context) (*zk.Conn, error) { +func (c *Connection) ensureConnection(ctx context.Context) (ZKClient, error) { c.mu.Lock() defer c.mu.Unlock() @@ -181,7 +224,7 @@ func (c *Connection) connectionAddAuth(ctx context.Context) { } } -func (c *Connection) connectionEventsProcessor(connection *zk.Conn, events <-chan zk.Event) { +func (c *Connection) connectionEventsProcessor(connection ZKClient, events <-chan zk.Event) { for event := range events { shouldCloseConnection := false switch event.State { @@ -206,7 +249,7 @@ func (c *Connection) connectionEventsProcessor(connection *zk.Conn, events <-cha } } -func (c *Connection) dial(ctx context.Context) (*zk.Conn, <-chan zk.Event, error) { +func (c *Connection) dial(ctx context.Context) (ZKClient, <-chan zk.Event, error) { ctx, cancel := context.WithTimeout(ctx, c.TimeoutConnect) defer cancel() @@ -232,7 +275,7 @@ func (c *Connection) dial(ctx context.Context) (*zk.Conn, <-chan zk.Event, error } } -func (c *Connection) connect(servers []string) (*zk.Conn, <-chan zk.Event, error) { +func (c *Connection) connect(servers []string) (ZKClient, <-chan zk.Event, error) { optionsDialer := zk.WithDialer(net.DialTimeout) if c.CertFile != "" && c.KeyFile != "" { if len(servers) > 1 { diff --git a/pkg/model/zookeeper/connection_test.go b/pkg/model/zookeeper/connection_test.go new file mode 100644 index 000000000..46a15164f --- /dev/null +++ b/pkg/model/zookeeper/connection_test.go @@ -0,0 +1,748 @@ +package zookeeper + +import ( + "context" + "testing" + + api "github.com/altinity/clickhouse-operator/pkg/apis/clickhouse.altinity.com/v1" + "github.com/go-zookeeper/zk" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestConnection_Get(t *testing.T) { + // Sample stat for successful responses + sampleStat := &zk.Stat{ + Czxid: 123, + Mzxid: 124, + Ctime: 1234567890, + Mtime: 1234567891, + Version: 1, + Cversion: 0, + Aversion: 0, + EphemeralOwner: 0, + DataLength: 4, + NumChildren: 0, + Pzxid: 125, + } + + tests := []struct { + name string + path string + setupMock func(*MockZKClient) + expectedData []byte + expectedStat *zk.Stat + expectedErrors []error + retries int + }{ + { + name: "success: successful get operation", + path: "/test/path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Get", "/test/path").Return([]byte("test data"), sampleStat, nil).Once() + }, + expectedData: []byte("test data"), + expectedStat: sampleStat, + retries: 1, // 1 means a single attempt + }, + { + name: "success: retry scenario - 2 errors then success", + path: "/test/retry-path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Get", "/test/retry-path").Return([]byte(nil), (*zk.Stat)(nil), zk.ErrAPIError).Once() + mockClient.On("Get", "/test/retry-path").Return([]byte(nil), (*zk.Stat)(nil), zk.ErrNoAuth).Once() + mockClient.On("Get", "/test/retry-path").Return([]byte("retry success"), sampleStat, nil).Once() + }, + expectedData: []byte("retry success"), + expectedStat: sampleStat, + retries: 3, + }, + { + name: "error: persistent connection error, no retries", + path: "/nonexistent/path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Get", "/nonexistent/path").Return([]byte(nil), (*zk.Stat)(nil), zk.ErrNoNode).Once() + }, + expectedData: nil, + expectedStat: nil, + expectedErrors: []error{zk.ErrNoNode}, + retries: 1, + }, + + { + name: "error: persistent connection error, with retries", + path: "/nonexistent/path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Get", "/nonexistent/path").Return([]byte(nil), (*zk.Stat)(nil), zk.ErrNoNode).Once() + mockClient.On("Get", "/nonexistent/path").Return([]byte(nil), (*zk.Stat)(nil), zk.ErrAPIError).Once() + mockClient.On("Get", "/nonexistent/path").Return([]byte(nil), (*zk.Stat)(nil), zk.ErrNoAuth).Once() + }, + expectedData: nil, + expectedStat: nil, + expectedErrors: []error{zk.ErrNoNode, zk.ErrAPIError, zk.ErrNoAuth}, + retries: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create mock client + mockClient := new(MockZKClient) + + // Setup mock expectations + tt.setupMock(mockClient) + + // Create connection with mock client + conn := newTestConnection(api.ZookeeperNodes{}, mockClient, &ConnectionParams{MaxRetriesNum: tt.retries}) + + // Call the method under test + ctx := context.Background() + data, stat, err := conn.Get(ctx, tt.path) + + // Assert expectations + if len(tt.expectedErrors) > 0 { + assert.Error(t, err) + // Verify that all expected errors are present in the aggregated error + for _, expectedError := range tt.expectedErrors { + assert.ErrorIs(t, err, expectedError, "Expected error %v should be present in aggregated error", expectedError) + } + assert.Nil(t, data) + assert.Nil(t, stat) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expectedData, data) + assert.Equal(t, tt.expectedStat, stat) + } + + // Verify all mock expectations were met + mockClient.AssertExpectations(t) + }) + } +} + +func TestConnection_Exists(t *testing.T) { + // Sample stat for successful responses + sampleStat := &zk.Stat{ + Czxid: 123, + Mzxid: 124, + Ctime: 1234567890, + Mtime: 1234567891, + Version: 1, + Cversion: 0, + Aversion: 0, + EphemeralOwner: 0, + DataLength: 4, + NumChildren: 0, + Pzxid: 125, + } + + tests := []struct { + name string + path string + setupMock func(*MockZKClient) + expectedExists bool + expectedErrors []error + retries int + }{ + { + name: "success: node exists", + path: "/test/path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Exists", "/test/path").Return(true, sampleStat, nil).Once() + }, + expectedExists: true, + retries: 1, + }, + { + name: "success: node does not exist", + path: "/nonexistent/path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Exists", "/nonexistent/path").Return(false, (*zk.Stat)(nil), nil).Once() + }, + expectedExists: false, + retries: 1, + }, + { + name: "success: retry scenario - 2 errors then success", + path: "/test/retry-path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Exists", "/test/retry-path").Return(false, (*zk.Stat)(nil), zk.ErrAPIError).Once() + mockClient.On("Exists", "/test/retry-path").Return(false, (*zk.Stat)(nil), zk.ErrNoAuth).Once() + mockClient.On("Exists", "/test/retry-path").Return(true, sampleStat, nil).Once() + }, + expectedExists: true, + retries: 3, + }, + { + name: "error: persistent connection error, no retries", + path: "/test/path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Exists", "/test/path").Return(false, (*zk.Stat)(nil), zk.ErrConnectionClosed).Once() + }, + expectedExists: false, + expectedErrors: []error{zk.ErrConnectionClosed}, + retries: 1, + }, + { + name: "error: all retries fail with different errors", + path: "/test/path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Exists", "/test/path").Return(false, (*zk.Stat)(nil), zk.ErrAPIError).Once() + mockClient.On("Exists", "/test/path").Return(false, (*zk.Stat)(nil), zk.ErrNoAuth).Once() + mockClient.On("Exists", "/test/path").Return(false, (*zk.Stat)(nil), zk.ErrBadVersion).Once() + }, + expectedExists: false, + expectedErrors: []error{zk.ErrAPIError, zk.ErrNoAuth, zk.ErrBadVersion}, + retries: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create mock client + mockClient := new(MockZKClient) + + // Setup mock expectations + tt.setupMock(mockClient) + + // Create connection with mock client + conn := newTestConnection(api.ZookeeperNodes{}, mockClient, &ConnectionParams{MaxRetriesNum: tt.retries}) + + // Call the method under test + ctx := context.Background() + exists, err := conn.Exists(ctx, tt.path) + + // Assert expectations + if len(tt.expectedErrors) > 0 { + assert.Error(t, err) + // Verify that all expected errors are present in the aggregated error + for _, expectedError := range tt.expectedErrors { + assert.ErrorIs(t, err, expectedError, "Expected error %v should be present in aggregated error", expectedError) + } + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expectedExists, exists) + } + + // Verify all mock expectations were met + mockClient.AssertExpectations(t) + }) + } +} + +func TestConnection_Details(t *testing.T) { + // Sample stat for successful responses + sampleStat := &zk.Stat{ + Czxid: 123, + Mzxid: 124, + Ctime: 1234567890, + Mtime: 1234567891, + Version: 1, + Cversion: 0, + Aversion: 0, + EphemeralOwner: 0, + DataLength: 4, + NumChildren: 0, + Pzxid: 125, + } + + tests := []struct { + name string + path string + setupMock func(*MockZKClient) + expectedExists bool + expectedStat *zk.Stat + expectedErrors []error + retries int + }{ + { + name: "success: node exists with stat", + path: "/test/path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Exists", "/test/path").Return(true, sampleStat, nil).Once() + }, + expectedExists: true, + expectedStat: sampleStat, + retries: 1, + }, + { + name: "success: node does not exist", + path: "/nonexistent/path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Exists", "/nonexistent/path").Return(false, (*zk.Stat)(nil), nil).Once() + }, + expectedExists: false, + expectedStat: nil, + retries: 1, + }, + { + name: "success: retry scenario - 2 errors then success", + path: "/test/retry-path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Exists", "/test/retry-path").Return(false, (*zk.Stat)(nil), zk.ErrAPIError).Once() + mockClient.On("Exists", "/test/retry-path").Return(false, (*zk.Stat)(nil), zk.ErrNoAuth).Once() + mockClient.On("Exists", "/test/retry-path").Return(true, sampleStat, nil).Once() + }, + expectedExists: true, + expectedStat: sampleStat, + retries: 3, + }, + { + name: "error: persistent error, no retries", + path: "/test/path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Exists", "/test/path").Return(false, (*zk.Stat)(nil), zk.ErrConnectionClosed).Once() + }, + expectedExists: false, + expectedStat: nil, + expectedErrors: []error{zk.ErrConnectionClosed}, + retries: 1, + }, + { + name: "error: all retries fail with different errors", + path: "/test/path", + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Exists", "/test/path").Return(false, (*zk.Stat)(nil), zk.ErrAPIError).Once() + mockClient.On("Exists", "/test/path").Return(false, (*zk.Stat)(nil), zk.ErrNoAuth).Once() + mockClient.On("Exists", "/test/path").Return(false, (*zk.Stat)(nil), zk.ErrBadVersion).Once() + }, + expectedExists: false, + expectedStat: nil, + expectedErrors: []error{zk.ErrAPIError, zk.ErrNoAuth, zk.ErrBadVersion}, + retries: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create mock client + mockClient := new(MockZKClient) + + // Setup mock expectations + tt.setupMock(mockClient) + + // Create connection with mock client + conn := newTestConnection(api.ZookeeperNodes{}, mockClient, &ConnectionParams{MaxRetriesNum: tt.retries}) + + // Call the method under test + ctx := context.Background() + exists, stat, err := conn.Details(ctx, tt.path) + + // Assert expectations + if len(tt.expectedErrors) > 0 { + assert.Error(t, err) + // Verify that all expected errors are present in the aggregated error + for _, expectedError := range tt.expectedErrors { + assert.ErrorIs(t, err, expectedError, "Expected error %v should be present in aggregated error", expectedError) + } + assert.False(t, exists) + assert.Nil(t, stat) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expectedExists, exists) + assert.Equal(t, tt.expectedStat, stat) + } + + // Verify all mock expectations were met + mockClient.AssertExpectations(t) + }) + } +} + +func TestConnection_Create(t *testing.T) { + tests := []struct { + name string + path string + value []byte + flags int32 + acl []zk.ACL + setupMock func(*MockZKClient) + expectedPath string + expectedErrors []error + retries int + }{ + { + name: "success: create node", + path: "/test/path", + value: []byte("test data"), + flags: 0, + acl: zk.WorldACL(zk.PermAll), + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Create", "/test/path", []byte("test data"), int32(0), zk.WorldACL(zk.PermAll)).Return("/test/path", nil).Once() + }, + expectedPath: "/test/path", + retries: 1, + }, + { + name: "success: retry scenario - 2 errors then success", + path: "/test/retry-path", + value: []byte("retry data"), + flags: 0, + acl: zk.WorldACL(zk.PermAll), + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Create", "/test/retry-path", []byte("retry data"), int32(0), zk.WorldACL(zk.PermAll)).Return("", zk.ErrAPIError).Once() + mockClient.On("Create", "/test/retry-path", []byte("retry data"), int32(0), zk.WorldACL(zk.PermAll)).Return("", zk.ErrNoAuth).Once() + mockClient.On("Create", "/test/retry-path", []byte("retry data"), int32(0), zk.WorldACL(zk.PermAll)).Return("/test/retry-path", nil).Once() + }, + expectedPath: "/test/retry-path", + retries: 3, + }, + { + name: "error: persistent error", + path: "/test/error", + value: []byte("data"), + flags: 0, + acl: zk.WorldACL(zk.PermAll), + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Create", "/test/error", []byte("data"), int32(0), zk.WorldACL(zk.PermAll)).Return("", zk.ErrNoNode).Once() + }, + expectedPath: "", + expectedErrors: []error{zk.ErrNoNode}, + retries: 1, + }, + { + name: "error: all retries fail with different errors", + path: "/test/error-path", + value: []byte("data"), + flags: 0, + acl: zk.WorldACL(zk.PermAll), + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Create", "/test/error-path", []byte("data"), int32(0), zk.WorldACL(zk.PermAll)).Return("", zk.ErrNodeExists).Once() + mockClient.On("Create", "/test/error-path", []byte("data"), int32(0), zk.WorldACL(zk.PermAll)).Return("", zk.ErrAPIError).Once() + mockClient.On("Create", "/test/error-path", []byte("data"), int32(0), zk.WorldACL(zk.PermAll)).Return("", zk.ErrNoAuth).Once() + }, + expectedPath: "", + expectedErrors: []error{zk.ErrNodeExists, zk.ErrAPIError, zk.ErrNoAuth}, + retries: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create mock client + mockClient := new(MockZKClient) + + // Setup mock expectations + tt.setupMock(mockClient) + + // Create connection with mock client + conn := newTestConnection(api.ZookeeperNodes{}, mockClient, &ConnectionParams{MaxRetriesNum: tt.retries}) + + // Call the method under test + ctx := context.Background() + pathCreated, err := conn.Create(ctx, tt.path, tt.value, tt.flags, tt.acl) + + // Assert expectations + if len(tt.expectedErrors) > 0 { + assert.Error(t, err) + // Verify that all expected errors are present in the aggregated error + for _, expectedError := range tt.expectedErrors { + assert.ErrorIs(t, err, expectedError, "Expected error %v should be present in aggregated error", expectedError) + } + assert.Empty(t, pathCreated) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expectedPath, pathCreated) + } + + // Verify all mock expectations were met + mockClient.AssertExpectations(t) + }) + } +} + +func TestConnection_Set(t *testing.T) { + // Sample stat for successful responses + sampleStat := &zk.Stat{ + Czxid: 123, + Mzxid: 124, + Ctime: 1234567890, + Mtime: 1234567891, + Version: 2, + Cversion: 0, + Aversion: 0, + EphemeralOwner: 0, + DataLength: 9, + NumChildren: 0, + Pzxid: 125, + } + + tests := []struct { + name string + path string + value []byte + version int32 + setupMock func(*MockZKClient) + expectedStat *zk.Stat + expectedErrors []error + retries int + }{ + { + name: "success: set data", + path: "/test/path", + value: []byte("new data"), + version: 1, + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Set", "/test/path", []byte("new data"), int32(1)).Return(sampleStat, nil).Once() + }, + expectedStat: sampleStat, + retries: 1, + }, + { + name: "success: retry scenario - 2 errors then success", + path: "/test/retry-path", + value: []byte("retry data"), + version: 1, + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Set", "/test/retry-path", []byte("retry data"), int32(1)).Return((*zk.Stat)(nil), zk.ErrAPIError).Once() + mockClient.On("Set", "/test/retry-path", []byte("retry data"), int32(1)).Return((*zk.Stat)(nil), zk.ErrNoAuth).Once() + mockClient.On("Set", "/test/retry-path", []byte("retry data"), int32(1)).Return(sampleStat, nil).Once() + }, + expectedStat: sampleStat, + retries: 3, + }, + { + name: "error: node not found", + path: "/nonexistent/path", + value: []byte("data"), + version: -1, + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Set", "/nonexistent/path", []byte("data"), int32(-1)).Return((*zk.Stat)(nil), zk.ErrNoNode).Once() + }, + expectedStat: nil, + expectedErrors: []error{zk.ErrNoNode}, + retries: 1, + }, + { + name: "error: persistent connection error", + path: "/test/path", + value: []byte("data"), + version: 1, + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Set", "/test/path", []byte("data"), int32(1)).Return((*zk.Stat)(nil), zk.ErrConnectionClosed).Once() + }, + expectedStat: nil, + expectedErrors: []error{zk.ErrConnectionClosed}, + retries: 1, + }, + { + name: "error: all retries fail with different errors", + path: "/test/error-path", + value: []byte("data"), + version: 1, + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Set", "/test/error-path", []byte("data"), int32(1)).Return((*zk.Stat)(nil), zk.ErrBadVersion).Once() + mockClient.On("Set", "/test/error-path", []byte("data"), int32(1)).Return((*zk.Stat)(nil), zk.ErrNoNode).Once() + mockClient.On("Set", "/test/error-path", []byte("data"), int32(1)).Return((*zk.Stat)(nil), zk.ErrAPIError).Once() + }, + expectedStat: nil, + expectedErrors: []error{zk.ErrBadVersion, zk.ErrNoNode, zk.ErrAPIError}, + retries: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create mock client + mockClient := new(MockZKClient) + + // Setup mock expectations + tt.setupMock(mockClient) + + // Create connection with mock client + conn := newTestConnection(api.ZookeeperNodes{}, mockClient, &ConnectionParams{MaxRetriesNum: tt.retries}) + + // Call the method under test + ctx := context.Background() + stat, err := conn.Set(ctx, tt.path, tt.value, tt.version) + + // Assert expectations + if len(tt.expectedErrors) > 0 { + assert.Error(t, err) + // Verify that all expected errors are present in the aggregated error + for _, expectedError := range tt.expectedErrors { + assert.ErrorIs(t, err, expectedError, "Expected error %v should be present in aggregated error", expectedError) + } + assert.Nil(t, stat) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expectedStat, stat) + } + + // Verify all mock expectations were met + mockClient.AssertExpectations(t) + }) + } +} + +func TestConnection_Delete(t *testing.T) { + tests := []struct { + name string + path string + version int32 + setupMock func(*MockZKClient) + expectedErrors []error + retries int + }{ + { + name: "success: delete", + path: "/test/path", + version: 1, + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Delete", "/test/path", int32(1)).Return(nil).Once() + }, + retries: 1, + }, + { + name: "success: retry scenario - 2 errors then success", + path: "/test/retry-path", + version: 1, + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Delete", "/test/retry-path", int32(1)).Return(zk.ErrAPIError).Once() + mockClient.On("Delete", "/test/retry-path", int32(1)).Return(zk.ErrNoAuth).Once() + mockClient.On("Delete", "/test/retry-path", int32(1)).Return(nil).Once() + }, + retries: 3, + }, + { + name: "error: persistent connection error", + path: "/test/path", + version: 1, + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Delete", "/test/path", int32(1)).Return(zk.ErrConnectionClosed).Once() + }, + expectedErrors: []error{zk.ErrConnectionClosed}, + retries: 1, + }, + { + name: "error: all retries fail with different errors", + path: "/test/error-path", + version: 1, + setupMock: func(mockClient *MockZKClient) { + mockClient.On("Delete", "/test/error-path", int32(1)).Return(zk.ErrNotEmpty).Once() + mockClient.On("Delete", "/test/error-path", int32(1)).Return(zk.ErrBadVersion).Once() + mockClient.On("Delete", "/test/error-path", int32(1)).Return(zk.ErrAPIError).Once() + }, + expectedErrors: []error{zk.ErrNotEmpty, zk.ErrBadVersion, zk.ErrAPIError}, + retries: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create mock client + mockClient := new(MockZKClient) + + // Setup mock expectations + tt.setupMock(mockClient) + + // Create connection with mock client + conn := newTestConnection(api.ZookeeperNodes{}, mockClient, &ConnectionParams{MaxRetriesNum: tt.retries}) + + // Call the method under test + ctx := context.Background() + err := conn.Delete(ctx, tt.path, tt.version) + + // Assert expectations + if len(tt.expectedErrors) > 0 { + assert.Error(t, err) + // Verify that all expected errors are present in the aggregated error + for _, expectedError := range tt.expectedErrors { + assert.ErrorIs(t, err, expectedError, "Expected error %v should be present in aggregated error", expectedError) + } + } else { + assert.NoError(t, err) + } + + // Verify all mock expectations were met + mockClient.AssertExpectations(t) + }) + } +} + +func TestConnection_Close(t *testing.T) { + tests := []struct { + name string + setup func(*MockZKClient) *Connection + }{ + { + name: "success: close connection", + setup: func(mockClient *MockZKClient) *Connection { + mockClient.On("Close").Return().Once() + return newTestConnection(api.ZookeeperNodes{}, mockClient, &ConnectionParams{MaxRetriesNum: 1}) + }, + }, + { + name: "success: handle empty connection", + setup: func(mockClient *MockZKClient) *Connection { + return nil + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create mock client + mockClient := new(MockZKClient) + + // Setup mock expectations + conn := tt.setup(mockClient) + + // Call the method under test + err := conn.Close() + + // Assert expectations - Close should always succeed + assert.NoError(t, err) + + // Verify all mock expectations were met + mockClient.AssertExpectations(t) + }) + } +} + +// newTestConnection creates a Connection with a custom ZKClient for testing +func newTestConnection(nodes api.ZookeeperNodes, client ZKClient, _params ...*ConnectionParams) *Connection { + conn := NewConnection(nodes, _params...) + conn.connection = client + conn.retryDelayFn = func(i int) {} // Disable retry delay for tests + return conn +} + +// MockZKClient is a mock implementation of the ZKClient interface for testing +type MockZKClient struct { + mock.Mock +} + +func (m *MockZKClient) Get(path string) ([]byte, *zk.Stat, error) { + args := m.Called(path) + return args.Get(0).([]byte), args.Get(1).(*zk.Stat), args.Error(2) +} + +func (m *MockZKClient) Set(path string, data []byte, version int32) (*zk.Stat, error) { + args := m.Called(path, data, version) + return args.Get(0).(*zk.Stat), args.Error(1) +} + +func (m *MockZKClient) Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error) { + args := m.Called(path, data, flags, acl) + return args.String(0), args.Error(1) +} + +func (m *MockZKClient) Delete(path string, version int32) error { + args := m.Called(path, version) + return args.Error(0) +} + +func (m *MockZKClient) Exists(path string) (bool, *zk.Stat, error) { + args := m.Called(path) + return args.Bool(0), args.Get(1).(*zk.Stat), args.Error(2) +} + +func (m *MockZKClient) AddAuth(scheme string, auth []byte) error { + args := m.Called(scheme, auth) + return args.Error(0) +} + +func (m *MockZKClient) Close() { + m.Called() +} diff --git a/pkg/model/zookeeper/path_manager.go b/pkg/model/zookeeper/path_manager.go index d2ed3a44c..f456296c5 100644 --- a/pkg/model/zookeeper/path_manager.go +++ b/pkg/model/zookeeper/path_manager.go @@ -61,7 +61,11 @@ func (p *PathManager) Ensure(path string) { subPath := "" for _, folder := range pathParts { subPath += "/" + folder - if p.Connection.Exists(ctx, subPath) { + if ok, err := p.Connection.Exists(ctx, subPath); !ok { + if err != nil { + log.Warning("received error while checking zk path: %s err: %v", subPath, err) + } + } else { log.Info("zk path already exists: %s", subPath) continue // for }