Background:
The MQTT keepalive goroutine sends a PINGREQ when no packet has been sent, or no inbound packet has arrived, for the keepalive interval. The code currently does this with a plain `ping.Write(conn)` call. If the TCP send buffer is backed up because an earlier PUBLISH is unacknowledged (half-open or stuck connection), `ping.Write` can block indefinitely. That prevents the keepalive from progressing, delays detection of a dead connection, and defers recovery until kernel-level retransmission limits (`tcp_retries2`) trigger or other higher-level timeouts fire.
Problem:
- `ping.Write(conn)` can hang if the underlying TCP socket is under backpressure (e.g., because of an unacknowledged in-flight PUBLISH).
- Keepalive logic relies on being able to send a PINGREQ and then observe a PINGRESP; if the write blocks forever, neither happens, so the connection liveness check is effectively stalled.
- This results in late disconnection detection in half-open scenarios, increasing the time to failover/reconnect.
Reproduction steps (high level):
- Establish a connection with a PUBLISH in flight that does not get acknowledged (simulate a silent peer or dropped ACKs).
- Let the PUBLISH saturate the send buffer and create backpressure.
- Wait for the keepalive to trigger; observe that `ping.Write` blocks because the socket is clogged.
- No PINGRESP is received; the connection isn't torn down until much later (e.g., on `tcp_retries2` expiry), even though the peer is effectively dead.
Proposal:
- Before calling `ping.Write(conn)`, set a bounded write deadline so the write cannot block indefinitely.
- Example change:
```go
import (
	"errors"
	"fmt"
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/eclipse/paho.mqtt.golang/packets"
)

// deadlineWriter serializes deadline+write+clear so goroutines sharing it don't race.
// Note: a write deadline applies to the whole net.Conn, so it also affects Write
// calls made (or already blocked) in goroutines that bypass this wrapper.
type deadlineWriter struct {
	conn net.Conn
	mu   sync.Mutex
}

func newDeadlineWriter(conn net.Conn) *deadlineWriter {
	return &deadlineWriter{conn: conn}
}

// WriteWithTimeout sets a per-write deadline, does the write, then clears it.
func (w *deadlineWriter) WriteWithTimeout(packetsWriter func(io.Writer) error, timeout time.Duration) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if timeout > 0 {
		if err := w.conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
			return err
		}
		// clear the deadline (runs before Unlock) so it does not linger
		defer func() { _ = w.conn.SetWriteDeadline(time.Time{}) }()
	}
	// perform the actual write via callback so callers can pass arbitrary packet logic
	return packetsWriter(w.conn)
}

// Inside keepalive(), once before the loop: wrap the raw conn and reuse it
// (conn must be a net.Conn here, since io.Writer has no SetWriteDeadline).
safeWriter := newDeadlineWriter(conn)

// When sending a ping:
if atomic.LoadInt32(&c.pingOutstanding) == 0 {
	DEBUG.Println(PNG, "keepalive sending ping")
	ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
	atomic.StoreInt32(&c.pingOutstanding, 1)
	// bounded write to avoid blocking forever behind stuck publishes
	err := safeWriter.WriteWithTimeout(ping.Write, 2*time.Second) // make 2s configurable if desired
	var ne net.Error
	switch {
	case err == nil:
		c.lastSent.Store(time.Now())
		pingSent = time.Now()
	case errors.As(err, &ne) && ne.Timeout():
		CRITICAL.Println(PNG, "keepalive ping write timed out, disconnecting:", err)
		c.internalConnLost(fmt.Errorf("keepalive ping write timeout: %w", err))
		return
	default:
		ERROR.Println(PNG, "keepalive ping write error:", err)
		// optionally treat other errors as fatal too
	}
}
```