Commit 2b1657a

Rearchitect status handling as per issue 605
1 parent: bc0e78b · commit: 2b1657a

8 files changed: +952 -182 lines changed


client.go

Lines changed: 190 additions & 146 deletions
Large diffs are not rendered by default.
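
client.go carries the bulk of the rearchitecture and its diff is not rendered above. The test change below (cli.status.forceConnectionStatus(connected)) suggests the bare status field has been replaced by a type that owns its own locking and transitions. Purely as a hypothetical sketch of that idea (every name here except forceConnectionStatus is an assumption, not taken from client.go):

package mqtt

import "sync"

// status and connectionStatus are illustrative stand-ins for whatever
// client.go actually defines; only forceConnectionStatus appears in the tests below.
type status uint32

const (
	disconnected status = iota
	connecting
	connected
	disconnecting
)

// connectionStatus serialises reads and writes of the client state so the
// keepalive, comms and user-facing goroutines all observe a consistent value.
type connectionStatus struct {
	mu      sync.Mutex
	current status
}

// ConnectionStatus returns the current state.
func (c *connectionStatus) ConnectionStatus() status {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.current
}

// forceConnectionStatus sets the state unconditionally; the tests use this to
// simulate a client that believes it is connected.
func (c *connectionStatus) forceConnectionStatus(s status) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.current = s
}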

fvt_client_test.go

Lines changed: 40 additions & 22 deletions
@@ -126,7 +126,7 @@ func Test_Disconnect(t *testing.T) {
 	go func() {
 		c.Disconnect(250)
 		cli := c.(*client)
-		cli.status = connected
+		cli.status.forceConnectionStatus(connected)
 		c.Disconnect(250)
 		close(disconnectC)
 	}()
@@ -1191,29 +1191,36 @@ func Test_cleanUpMids_2(t *testing.T) {
 	ops.SetKeepAlive(10 * time.Second)
 
 	c := NewClient(ops)
+	cl := c.(*client)
 
 	if token := c.Connect(); token.Wait() && token.Error() != nil {
 		t.Fatalf("Error on Client.Connect(): %v", token.Error())
 	}
 
 	token := c.Publish("/test/cleanUP", 2, false, "cleanup test 2")
-	if len(c.(*client).messageIds.index) == 0 {
+	cl.messageIds.mu.Lock()
+	mq := len(c.(*client).messageIds.index)
+	cl.messageIds.mu.Unlock()
+	if mq == 0 {
 		t.Fatalf("Should be a token in the messageIDs, none found")
 	}
-	fmt.Println("Disconnecting", len(c.(*client).messageIds.index))
+	// fmt.Println("Disconnecting", len(cl.messageIds.index))
 	c.Disconnect(0)
 
 	fmt.Println("Wait on Token")
 	// We should be able to wait on this token without any issue
 	token.Wait()
 
-	if len(c.(*client).messageIds.index) > 0 {
+	cl.messageIds.mu.Lock()
+	mq = len(c.(*client).messageIds.index)
+	cl.messageIds.mu.Unlock()
+	if mq > 0 {
 		t.Fatalf("Should have cleaned up messageIDs, have %d left", len(c.(*client).messageIds.index))
 	}
 	if token.Error() == nil {
 		t.Fatal("token should have received an error on connection loss")
 	}
-	fmt.Println(token.Error())
+	// fmt.Println(token.Error())
 }
 
 func Test_ConnectRetry(t *testing.T) {
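
The Test_cleanUpMids_2 changes above take messageIds.mu before every read of len(...); reading the map's length while another goroutine mutates it is a data race that go test -race reports. A minimal sketch of the guarded-read pattern (messageIDs and pending are hypothetical names, not the library's):

package mqtt

import "sync"

// messageIDs is an illustrative stand-in for the client's messageIds store:
// a set of in-flight MQTT message IDs protected by a mutex.
type messageIDs struct {
	mu    sync.Mutex
	index map[uint16]struct{}
}

// pending returns the number of in-flight IDs; the lock is held for the read
// so callers never race with goroutines adding or removing entries.
func (m *messageIDs) pending() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.index)
}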
@@ -1339,7 +1346,6 @@ func Test_ResumeSubs(t *testing.T) {
 		t.Fatalf("Expected 1 packet to be in store")
 	}
 	packet := subMemStore.Get(ids[0])
-	fmt.Println("packet", packet)
 	if packet == nil {
 		t.Fatal("Failed to retrieve packet from store")
 	}
@@ -1471,11 +1477,12 @@ func Test_ResumeSubsWithReconnect(t *testing.T) {
 	c.Disconnect(250)
 }
 
-// Issue 209 - occasional deadlock when connections are lost unexpectedly
+// Issue 509 - occasional deadlock when connections are lost unexpectedly
 // This was quite a nasty deadlock which occurred in very rare circumstances; I could not come up with a reliable way of
 // replicating this but the below would cause it to happen fairly consistently (when the test was run a decent number
 // of times). Following the fix it ran 10,000 times without issue.
-// go test -count 10000 -run DisconnectWhileProcessingIncomingPublish
+//
+//	go test -count 10000 -run DisconnectWhileProcessingIncomingPublish
 func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
 	topic := "/test/DisconnectWhileProcessingIncomingPublish"
 
@@ -1487,11 +1494,11 @@ func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
 
 	sops := NewClientOptions()
 	sops.AddBroker(FVTTCP)
-	sops.SetAutoReconnect(false) // We dont want the connection to be re-established
+	sops.SetAutoReconnect(false) // We don't want the connection to be re-established
 	sops.SetWriteTimeout(500 * time.Millisecond) // We will be sending a lot of publish messages and want go routines to clear...
 	// sops.SetOrderMatters(false)
 	sops.SetClientID("dwpip-sub")
-	// We need to know when the subscriber has lost its connection (this indicates that the deadlock has not occured)
+	// We need to know when the subscriber has lost its connection (this indicates that the deadlock has not occurred)
 	sDisconnected := make(chan struct{})
 	sops.SetConnectionLostHandler(func(Client, error) { close(sDisconnected) })
 
@@ -1523,20 +1530,23 @@ func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
 		i := 0
 		for {
 			p.Publish(topic, 1, false, fmt.Sprintf("test message: %d", i))
-			// After the connection goes down s.Publish will start blocking (this is not ideal but fixing its a problem for another time)
-			go func() { s.Publish(topic+"IGNORE", 1, false, fmt.Sprintf("test message: %d", i)) }()
+			// After the connection goes down s.Publish will start blocking (this is not ideal but fixing it's a problem for another time)
+			go func(i int) { s.Publish(topic+"IGNORE", 1, false, fmt.Sprintf("test message: %d", i)) }(i)
 			i++
-
 			if ctx.Err() != nil {
 				return
 			}
 		}
 	}()
 
 	// Wait until we have received a message (ensuring that the stream of messages has started)
+	delay := time.NewTimer(time.Second) // Be careful with timers as this will be run in a tight loop!
 	select {
 	case <-msgReceived: // All good
-	case <-time.After(time.Second):
+		if !delay.Stop() { // Cleanly close timer as this may be run in a tight loop!
+			<-delay.C
+		}
+	case <-delay.C:
 		t.Errorf("no messages received")
 	}
 
@@ -1545,34 +1555,42 @@ func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
 	dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
 	err := dm.Write(s.conn)
 	if err != nil {
-		t.Fatalf("error dending disconnect packet: %s", err)
+		t.Fatalf("error sending disconnect packet: %s", err)
 	}
 
 	// Lets give the library up to a second to shutdown (indicated by the status changing)
+	delay = time.NewTimer(time.Second) // Be careful with timers as this will be run in a tight loop!
 	select {
 	case <-sDisconnected: // All good
-	case <-time.After(time.Second):
-		cancel() // no point leaving publisher running
-		time.Sleep(time.Second) // Allow publish calls to timeout (otherwise there will be tons of go routines running!)
+		if !delay.Stop() {
+			<-delay.C
+		}
+	case <-delay.C:
+		cancel() // no point leaving publisher running
+		time.Sleep(10 * time.Second) // Allow publish calls to timeout (otherwise there will be tons of go routines running!)
 		buf := make([]byte, 1<<20)
 		stacklen := runtime.Stack(buf, true)
 		t.Fatalf("connection was not lost as expected - probable deadlock. Stacktrace follows: %s", buf[:stacklen])
 	}
 
 	cancel() // no point leaving publisher running
 
+	delay = time.NewTimer(time.Second) // Be careful with timers as this will be run in a tight loop!
 	select {
 	case <-pubDone:
-	case <-time.After(time.Second):
-		t.Errorf("pubdone not closed within a second")
+		if !delay.Stop() {
+			<-delay.C
+		}
+	case <-delay.C:
+		t.Errorf("pubdone not closed within two seconds (probably due to load on system but may be an issue)")
 	}
 	p.Disconnect(250) // Close publisher
 }
 
 // Test_ResumeSubsMaxInflight - Check the MaxResumePubInFlight option.
 // This is difficult to test without control of the broker (because we will be communicating via the broker not
-// directly. However due to the way resume works when there is no limit to inflight messages message ordering is not
-// guaranteed. However with SetMaxResumePubInFlight(1) it is guaranteed so we use that to test.
+// directly. However, due to the way resume works when there is no limit to inflight messages message ordering is not
+// guaranteed. However, with SetMaxResumePubInFlight(1) it is guaranteed so we use that to test.
 // On my PC (using mosquitto under docker) running this without SetMaxResumePubInFlight(1) will fail with 1000 messages
 // (generally passes if only 100 are sent). With the option set it always passes.
 func Test_ResumeSubsMaxInflight(t *testing.T) {
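
Several hunks above swap time.After for an explicit time.NewTimer: time.After's underlying timer is not released until it fires (at least in the Go versions this commit targets), which adds up when the surrounding select sits in a tight loop or the test is run thousands of times. The replacement pattern stops the timer on the happy path and drains its channel if it already expired; in isolation it looks like this:

package main

import (
	"fmt"
	"time"
)

func main() {
	msgReceived := make(chan struct{}, 1)
	msgReceived <- struct{}{} // simulate the event arriving in time

	delay := time.NewTimer(time.Second)
	select {
	case <-msgReceived:
		// Stop the timer; if it already expired, drain the channel so the
		// timer's resources are released promptly.
		if !delay.Stop() {
			<-delay.C
		}
		fmt.Println("event received before the deadline")
	case <-delay.C:
		fmt.Println("timed out waiting for the event")
	}
}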

net.go

Lines changed: 6 additions & 5 deletions
@@ -150,7 +150,7 @@ type incomingComms struct {
 
 // startIncomingComms initiates incoming communications; this includes starting a goroutine to process incoming
 // messages.
-// Accepts a channel of inbound messages from the store (persisted messages); note this must be closed as soon as the
+// Accepts a channel of inbound messages from the store (persisted messages); note this must be closed as soon as
 // everything in the store has been sent.
 // Returns a channel that will be passed any received packets; this will be closed on a network error (and inboundFromStore closed)
 func startIncomingComms(conn io.Reader,
@@ -332,7 +332,7 @@ func startOutgoingComms(conn net.Conn,
 			DEBUG.Println(NET, "outbound wrote disconnect, closing connection")
 			// As per the MQTT spec "After sending a DISCONNECT Packet the Client MUST close the Network Connection"
 			// Closing the connection will cause the goroutines to end in sequence (starting with incoming comms)
-			conn.Close()
+			_ = conn.Close()
 		}
 	case msg, ok := <-oboundFromIncoming: // message triggered by an inbound message (PubrecPacket or PubrelPacket)
 		if !ok {
@@ -370,9 +370,10 @@ type commsFns interface {
 // startComms initiates goroutines that handles communications over the network connection
 // Messages will be stored (via commsFns) and deleted from the store as necessary
 // It returns two channels:
-// packets.PublishPacket - Will receive publish packets received over the network.
-// Closed when incoming comms routines exit (on shutdown or if network link closed)
-// error - Any errors will be sent on this channel. The channel is closed when all comms routines have shut down
+//
+//	packets.PublishPacket - Will receive publish packets received over the network.
+//	Closed when incoming comms routines exit (on shutdown or if network link closed)
+//	error - Any errors will be sent on this channel. The channel is closed when all comms routines have shut down
 //
 // Note: The comms routines monitoring oboundp and obound will not shutdown until those channels are both closed. Any messages received between the
 // connection being closed and those channels being closed will generate errors (and nothing will be sent). That way the chance of a deadlock is
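
The reflowed comment documents the two channels startComms returns and when each is closed. Purely as a hypothetical caller-side sketch (the real startComms signature lives in net.go, not in this diff), a consumer that honours those close semantics might look like:

package mqtt

import (
	"fmt"

	"github.com/eclipse/paho.mqtt.golang/packets"
)

// consumeComms is illustrative only: it drains both channels until each is
// closed, matching the semantics described in the startComms comment.
func consumeComms(incoming <-chan *packets.PublishPacket, errs <-chan error) {
	for incoming != nil || errs != nil {
		select {
		case pub, ok := <-incoming:
			if !ok { // closed when the incoming comms routines exit
				incoming = nil
				continue
			}
			fmt.Println("publish received on", pub.TopicName)
		case err, ok := <-errs:
			if !ok { // closed when all comms routines have shut down
				errs = nil
				continue
			}
			fmt.Println("comms error:", err)
		}
	}
}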

ping.go

Lines changed: 2 additions & 2 deletions
@@ -58,8 +58,8 @@ func keepalive(c *client, conn io.Writer) {
 		if atomic.LoadInt32(&c.pingOutstanding) == 0 {
 			DEBUG.Println(PNG, "keepalive sending ping")
 			ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
-			// We don't want to wait behind large messages being sent, the Write call
-			// will block until it it able to send the packet.
+			// We don't want to wait behind large messages being sent, the `Write` call
+			// will block until it is able to send the packet.
 			atomic.StoreInt32(&c.pingOutstanding, 1)
 			if err := ping.Write(conn); err != nil {
 				ERROR.Println(PNG, err)
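
The corrected comment sits next to the detail that matters here: pingOutstanding is set before ping.Write because the write can block behind a large in-flight message, and the flag stops the keepalive loop from queuing further PINGREQs while one is pending. A stripped-down sketch of that ordering (everything apart from the pingOutstanding idea is hypothetical):

package mqtt

import (
	"io"
	"sync/atomic"
)

// trySendPing is an illustrative reduction of the keepalive send path, not
// the library's actual keepalive function.
func trySendPing(pingOutstanding *int32, conn io.Writer, pingreq []byte) error {
	if atomic.LoadInt32(pingOutstanding) != 0 {
		return nil // a ping is already in flight; wait for its PINGRESP
	}
	// Mark the ping outstanding *before* writing: the write may block behind a
	// large message, and the flag prevents another ping being queued meanwhile.
	atomic.StoreInt32(pingOutstanding, 1)
	_, err := conn.Write(pingreq)
	return err // the flag is cleared elsewhere when the PINGRESP arrives
}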
