Skip to content

Commit 2c7d3f3

Browse files
authored
Merge pull request #112 from kellybyrd/better_thread_safety
Make ZMQ msg_id threadsafe to avoid ZMQ drops in ntopng
2 parents 4521f9a + 42e706f commit 2c7d3f3

File tree

1 file changed

+4
-7
lines changed

1 file changed

+4
-7
lines changed

transport/zmq.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,17 @@ func (d *ZmqDriver) Prepare() error {
6666

6767
func (d *ZmqDriver) Init() error {
6868
d.lock.Lock()
69+
defer d.lock.Unlock()
6970
d.context, _ = zmq.NewContext()
7071
d.publisher, _ = d.context.NewSocket(zmq.PUB)
7172
if err := d.publisher.Bind(d.listenAddress); err != nil {
72-
d.lock.Unlock()
7373
log.Fatalf("Unable to bind: %s", err.Error())
7474
}
7575

7676
log.Infof("Started ZMQ listener on: %s", d.listenAddress)
7777

7878
// Ensure subscriber connection has time to complete
7979
time.Sleep(time.Second)
80-
d.lock.Unlock()
8180
return nil
8281
}
8382

@@ -91,6 +90,9 @@ func (d *ZmqDriver) Send(key, data []byte) error {
9190
}
9291

9392
msg_len := uint16(len(data))
93+
// Lock before creating zmq header to ensure messageId is unique
94+
d.lock.Lock()
95+
defer d.lock.Unlock()
9496
header := d.newZmqHeader(msg_len)
9597

9698
// send our header with the topic first as a multi-part message
@@ -100,26 +102,21 @@ func (d *ZmqDriver) Send(key, data []byte) error {
100102
return err
101103
}
102104

103-
d.lock.Lock()
104105
bytes, err := d.publisher.SendBytes(hbytes, zmq.SNDMORE)
105106
if err != nil {
106107
log.Errorf("Unable to send header: %s", err.Error())
107-
d.lock.Unlock()
108108
return err
109109
}
110110
if bytes != len(hbytes) {
111111
log.Errorf("Wrote the wrong number of header bytes: %d", bytes)
112-
d.lock.Unlock()
113112
return err
114113
}
115114

116115
// now send the actual payload
117116
if _, err = d.publisher.SendBytes(data, 0); err != nil {
118117
log.Error(err)
119-
d.lock.Unlock()
120118
return err
121119
}
122-
d.lock.Unlock()
123120

124121
switch d.msgType {
125122
case PBUF:

0 commit comments

Comments
 (0)