Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions .deepsource.toml
Original file line number Diff line number Diff line change
@@ -1,17 +0,0 @@
version = 1

test_patterns = [
'*_test.go'
]

exclude_patterns = [

]

[[analyzers]]
name = 'go'
enabled = true


[analyzers.meta]
import_path = 'github.com/samuel/go-zookeeper/zk'
2 changes: 1 addition & 1 deletion cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestIntegration_NoQuorum(t *testing.T) {
DefaultLogger.Printf(" Retrying no luck...")
var firstDisconnect *Event
begin := time.Now()
for time.Since(begin) < 6*time.Second {
for time.Now().Sub(begin) < 6*time.Second {
disconnectedEvent := sl.NewWatcher(sessionStateMatcher(StateDisconnected)).Wait(4 * time.Second)
if disconnectedEvent == nil {
t.Fatalf("Disconnected event expected")
Expand Down
118 changes: 11 additions & 107 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,65 +394,7 @@ func (c *Conn) connect() error {
return nil
}

c.logger.Printf("Failed to connect to %s: %+v", c.Server(), err)
}
}

func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
shouldCancel := func() bool {
select {
case <-c.shouldQuit:
return true
case <-c.closeChan:
return true
default:
return false
}
}

c.credsMu.Lock()
defer c.credsMu.Unlock()

defer close(reauthReadyChan)

if c.logInfo {
c.logger.Printf("re-submitting `%d` credentials after reconnect", len(c.creds))
}

for _, cred := range c.creds {
if shouldCancel() {
return
}
resChan, err := c.sendRequest(
opSetAuth,
&setAuthRequest{Type: 0,
Scheme: cred.scheme,
Auth: cred.auth,
},
&setAuthResponse{},
nil)

if err != nil {
c.logger.Printf("call to sendRequest failed during credential resubmit: %s", err)
// FIXME(prozlach): lets ignore errors for now
continue
}

var res response
select {
case res = <-resChan:
case <-c.closeChan:
c.logger.Printf("recv closed, cancel re-submitting credentials")
return
case <-c.shouldQuit:
c.logger.Printf("should quit, cancel re-submitting credentials")
return
}
if res.err != nil {
c.logger.Printf("credential re-submit failed: %s", res.err)
// FIXME(prozlach): lets ignore errors for now
continue
}
c.logger.Printf("failed to connect to %s: %v", c.Server(), err)
}
}

Expand Down Expand Up @@ -730,28 +672,20 @@ func (c *Conn) authenticate() error {

binary.BigEndian.PutUint32(buf[:4], uint32(n))

if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10)); err != nil {
return err
}
c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10))
_, err = c.conn.Write(buf[:n+4])
c.conn.SetWriteDeadline(time.Time{})
if err != nil {
return err
}
if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
return err
}

// Receive and decode a connect response.
if err := c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10)); err != nil {
return err
}
c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10))
_, err = io.ReadFull(c.conn, buf[:4])
c.conn.SetReadDeadline(time.Time{})
if err != nil {
return err
}
if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
return err
}

blen := int(binary.BigEndian.Uint32(buf[:4]))
if cap(buf) < blen {
Expand Down Expand Up @@ -813,18 +747,14 @@ func (c *Conn) sendData(req *request) error {
c.requests[req.xid] = req
c.requestsLock.Unlock()

if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)); err != nil {
return err
}
c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
_, err = c.conn.Write(c.buf[:n+4])
c.conn.SetWriteDeadline(time.Time{})
if err != nil {
req.recvChan <- response{-1, err}
c.conn.Close()
return err
}
if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
return err
}

return nil
}
Expand All @@ -847,17 +777,13 @@ func (c *Conn) sendLoop() error {

binary.BigEndian.PutUint32(c.buf[:4], uint32(n))

if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)); err != nil {
return err
}
c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
_, err = c.conn.Write(c.buf[:n+4])
c.conn.SetWriteDeadline(time.Time{})
if err != nil {
c.conn.Close()
return err
}
if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
return err
}
case <-c.closeChan:
return nil
}
Expand Down Expand Up @@ -889,12 +815,10 @@ func (c *Conn) recvLoop(conn net.Conn) error {
}

_, err = io.ReadFull(conn, buf[:blen])
conn.SetReadDeadline(time.Time{})
if err != nil {
return err
}
if err := conn.SetReadDeadline(time.Time{}); err != nil {
return err
}

res := responseHeader{}
_, err = decodePacket(buf[:16], &res)
Expand All @@ -915,27 +839,7 @@ func (c *Conn) recvLoop(conn net.Conn) error {
Err: nil,
}
c.sendEvent(ev)
wTypes := make([]watchType, 0, 2)
switch res.Type {
case EventNodeCreated:
wTypes = append(wTypes, watchTypeExist)
case EventNodeDeleted, EventNodeDataChanged:
wTypes = append(wTypes, watchTypeExist, watchTypeData, watchTypeChild)
case EventNodeChildrenChanged:
wTypes = append(wTypes, watchTypeChild)
}
c.watchersLock.Lock()
for _, t := range wTypes {
wpt := watchPathType{res.Path, t}
if watchers, ok := c.watchers[wpt]; ok {
for _, ch := range watchers {
ch <- ev
close(ch)
}
delete(c.watchers, wpt)
}
}
c.watchersLock.Unlock()
c.notifyWatches(ev)
} else if res.Xid == -2 {
// Ping response. Ignore.
} else if res.Xid < 0 {
Expand Down
2 changes: 1 addition & 1 deletion constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s State) String() string {
if name := stateNames[s]; name != "" {
return name
}
return "unknown state"
return "Unknown"
}

// ErrCode is the error code defined by server. Refer to ZK documentations for more specifics.
Expand Down
8 changes: 2 additions & 6 deletions flw.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,12 @@ func fourLetterWord(server, command string, timeout time.Duration) ([]byte, erro
// once the command has been processed, but better safe than sorry
defer conn.Close()

if err := conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
return nil, err
}
conn.SetWriteDeadline(time.Now().Add(timeout))
_, err = conn.Write([]byte(command))
if err != nil {
return nil, err
}

if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
return nil, err
}
conn.SetReadDeadline(time.Now().Add(timeout))
return ioutil.ReadAll(conn)
}
78 changes: 10 additions & 68 deletions lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ var (

// Lock is a mutual exclusion lock.
type Lock struct {
c *Conn
path string
acl []ACL
lockPath string
seq int
attemptedLockPath string
c *Conn
path string
acl []ACL
lockPath string
seq int
}

// NewLock creates a new lock instance using the provided connection, path, and acl.
Expand Down Expand Up @@ -58,29 +57,13 @@ func (l *Lock) LockWithData(data []byte) error {
return ErrDeadlock
}

if l.attemptedLockPath != "" {
// Check whether lock has been acquired previously and it still exists
if lockExists(l.c, l.path, l.attemptedLockPath) {
l.lockPath = l.attemptedLockPath
return nil
}
}

prefix := fmt.Sprintf("%s/lock-", l.path)

path := ""
var err error
tryLock:
for i := 0; i < 3; i++ {
path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl)

if path != "" {
// Store the path of newly created sequential ephemeral znode
l.attemptedLockPath = path
}

switch err {
case ErrNoNode:
path, err = l.c.CreateProtectedEphemeralSequential(prefix, data, l.acl)
if err == ErrNoNode {
// Create parent node.
parts := strings.Split(l.path, "/")
pth := ""
Expand All @@ -99,9 +82,9 @@ tryLock:
return err
}
}
case nil:
break tryLock
default:
} else if err == nil {
break
} else {
return err
}
}
Expand Down Expand Up @@ -171,48 +154,7 @@ func (l *Lock) Unlock() error {
if err := l.c.Delete(l.lockPath, -1); err != nil {
return err
}
// Perform clean up
l.lockPath = ""
l.seq = 0
l.attemptedLockPath = ""

return nil
}

// Check whether lock got created and response was lost because of network partition failure.
// It queries zookeeper and scans existing sequential ephemeral znodes under the parent path
// It finds out that previously requested sequence number corresponds to child having lowest sequence number
func lockExists(c *Conn, rootPath string, znodePath string) bool {
seq, err := parseSeq(znodePath)
if err != nil {
return false
}

//Scan the existing znodes if there are any
children, _, err := c.Children(rootPath)
if err != nil {
return false
}

lowestSeq := seq
prevSeq := -1
for _, p := range children {
s, err := parseSeq(p)
if err != nil {
return false
}
if s < lowestSeq {
lowestSeq = s
}
if s < seq && s > prevSeq {
prevSeq = s
}
}

if seq == lowestSeq {
// Acquired the lock
return true
}

return false
}
6 changes: 4 additions & 2 deletions server_java_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewIntegrationTestServer(t *testing.T, configPath string, stdout, stderr io
zkPath := os.Getenv("ZOOKEEPER_BIN_PATH")
if zkPath == "" {
// default to a static reletive path that can be setup with a build system
zkPath = "../zookeeper/bin"
zkPath = "zookeeper/bin"
}
if _, err := os.Stat(zkPath); err != nil {
if os.IsNotExist(err) {
Expand All @@ -49,6 +49,8 @@ func NewIntegrationTestServer(t *testing.T, configPath string, stdout, stderr io
}
// password is 'test'
superString := `SERVER_JVMFLAGS=-Dzookeeper.DigestAuthenticationProvider.superDigest=super:D/InIHSb7yEEbrWz8b9l71RjZJU=`
// enable TTL
superString += ` -Dzookeeper.extendedTypesEnabled=true -Dzookeeper.emulate353TTLNodes=true`

return &server{
cmdString: filepath.Join(zkPath, "zkServer.sh"),
Expand All @@ -65,8 +67,8 @@ func (srv *server) Start() error {
srv.cmd = exec.CommandContext(ctx, srv.cmdString, srv.cmdArgs...)
srv.cmd.Stdout = srv.stdout
srv.cmd.Stderr = srv.stderr
srv.cmd.Env = append(os.Environ(), srv.cmdEnv...)

srv.cmd.Env = srv.cmdEnv
return srv.cmd.Start()
}

Expand Down
2 changes: 1 addition & 1 deletion throttle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *conn) start() {

func (c *conn) writeLoop() {
for req := range c.wchan {
time.Sleep(time.Until(req.writeAt))
time.Sleep(req.writeAt.Sub(time.Now()))
var res nErr
for len(req.p) > 0 && res.err == nil {
writep := req.p
Expand Down
Loading