Skip to content

Commit 42e706f

Browse files
committed
Make ZMQ msg_id threadsafe to avoid ZMQ drops in ntopng
Cleanup the use of ZmqDriver.lock in two ways: * Use defer d.lock.Unlock() like someone who actually knows golang * Lock earlier in ZmqDriver.Send() to ensure that the msg_id in the header, the header send, and the message send are all done together.
1 parent 965ff5a commit 42e706f

File tree

1 file changed

+4
-7
lines changed

1 file changed

+4
-7
lines changed

transport/zmq.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,25 +66,27 @@ func (d *ZmqDriver) Prepare() error {
6666

6767
func (d *ZmqDriver) Init() error {
6868
d.lock.Lock()
69+
defer d.lock.Unlock()
6970
d.context, _ = zmq.NewContext()
7071
d.publisher, _ = d.context.NewSocket(zmq.PUB)
7172
if err := d.publisher.Bind(d.listenAddress); err != nil {
72-
d.lock.Unlock()
7373
log.Fatalf("Unable to bind: %s", err.Error())
7474
}
7575

7676
log.Infof("Started ZMQ listener on: %s", d.listenAddress)
7777

7878
// Ensure subscriber connection has time to complete
7979
time.Sleep(time.Second)
80-
d.lock.Unlock()
8180
return nil
8281
}
8382

8483
func (d *ZmqDriver) Send(key, data []byte) error {
8584
var err error
8685

8786
msg_len := uint16(len(data))
87+
// Lock before creating zmq header to ensure messageId is unique
88+
d.lock.Lock()
89+
defer d.lock.Unlock()
8890
header := d.newZmqHeader(msg_len)
8991

9092
// send our header with the topic first as a multi-part message
@@ -94,26 +96,21 @@ func (d *ZmqDriver) Send(key, data []byte) error {
9496
return err
9597
}
9698

97-
d.lock.Lock()
9899
bytes, err := d.publisher.SendBytes(hbytes, zmq.SNDMORE)
99100
if err != nil {
100101
log.Errorf("Unable to send header: %s", err.Error())
101-
d.lock.Unlock()
102102
return err
103103
}
104104
if bytes != len(hbytes) {
105105
log.Errorf("Wrote the wrong number of header bytes: %d", bytes)
106-
d.lock.Unlock()
107106
return err
108107
}
109108

110109
// now send the actual payload
111110
if _, err = d.publisher.SendBytes(data, 0); err != nil {
112111
log.Error(err)
113-
d.lock.Unlock()
114112
return err
115113
}
116-
d.lock.Unlock()
117114

118115
switch d.msgType {
119116
case PBUF:

0 commit comments

Comments
 (0)