A Brief Look at the keepAlive Source Code in paho.mqtt.golang


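After reading this article, you will understand:

1. The MQTT protocol's KeepAlive mechanism

2. How the MQTT KeepAlive mechanism is implemented in Go

3. Suggestions for choosing a KeepAlive duration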


Preface

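While load-testing MQTT recently, I found that a small number of MQTT devices had their connection actively dropped at the protocol layer while executing publish. The only error the client caught was the one defined as var ErrNotConnected = errors.New("Not Connected"). The network was known to be healthy, and the developers reported that the MQTT broker was not saturated. So I started reading the relevant source of the paho.mqtt.golang package to extract some useful information, which led to this article.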

The MQTT Keep Alive Mechanism

MQTT Keep Alive


MQTT includes a keep alive function that provides a workaround for the issue of half-open connections (or at least makes it possible to assess if the connection is still open).


Keep alive ensures that the connection between the broker and client is still open and that the broker and the client are aware of being connected. When the client establishes a connection to the broker, the client communicates a time interval in seconds to the broker. This interval defines the maximum length of time that the broker and client may not communicate with each other.



The MQTT specification says the following:

"The Keep Alive ... is the maximum time interval that is permitted to elapse between the point at which the Client finishes transmitting one Control Packet and the point it starts sending the next. It is the responsibility of the Client to ensure that the interval between Control Packets being sent does not exceed the Keep Alive value. In the absence of sending any other Control Packets, the Client MUST send a PINGREQ Packet."

As long as messages are exchanged frequently and the keep-alive interval is not exceeded, there is no need to send an extra message to establish whether the connection is still open.


If the client does not send a message during the keep-alive period, it must send a PINGREQ packet to the broker to confirm that it is available and to make sure that the broker is also still available.


The broker must disconnect a client that does not send a message or a PINGREQ packet in one and a half times the keep alive interval. Likewise, the client is expected to close the connection if it does not receive a response from the broker in a reasonable amount of time.


The content above is taken from: https://www.hivemq.com/blog/mqtt-essentials-part-10-alive-client-take-over/

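From the above we can see that the MQTT protocol layer supports a keepAlive mechanism that lets the client and broker maintain a long-lived connection. Keeping that connection alive requires the client and broker to exchange traffic at intervals, either ordinary packets that carry a payload or PINGREQ/PINGRESP packets. Since MQTT runs on top of TCP, the keepAlive exchange between client and broker is not hard to understand.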
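To make the exchange concrete, here is a small sketch of what the two keep-alive packets look like on the wire. The byte values come from the MQTT 3.1.1 fixed-header layout (packet type in the high nibble, remaining length of zero); the helper names packetType and isPingresp are made up for this illustration and do not come from paho.mqtt.golang.

```go
package main

import (
	"bytes"
	"fmt"
)

// Fixed headers as defined by MQTT 3.1.1: the control packet type sits in
// the high nibble of the first byte, and the second byte is the remaining
// length, which is zero for both ping packets.
var (
	pingreq  = []byte{0xC0, 0x00} // type 12
	pingresp = []byte{0xD0, 0x00} // type 13
)

// packetType (hypothetical helper) extracts the control packet type.
func packetType(b []byte) byte {
	return b[0] >> 4
}

// isPingresp (hypothetical helper) reports whether b is exactly a PINGRESP.
func isPingresp(b []byte) bool {
	return bytes.Equal(b, pingresp)
}

func main() {
	fmt.Printf("PINGREQ  bytes=% X type=%d\n", pingreq, packetType(pingreq))
	fmt.Printf("PINGRESP bytes=% X type=%d\n", pingresp, packetType(pingresp))
	fmt.Println("reply is PINGRESP:", isPingresp([]byte{0xD0, 0x00}))
}
```

Each ping costs two bytes in each direction on top of TCP/IP overhead, which is why the mechanism is cheap enough to run on constrained devices.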

A Brief Look at the MQTT keepAlive Go Source

Without further ado, here is the source:

    // This is the ping source from the [paho.mqtt.golang](https://github.com/eclipse/paho.mqtt.golang) package
    func keepalive(c *client) {
        defer c.workers.Done()
        DEBUG.Println(PNG, "keepalive starting")
        var checkInterval int64
        var pingSent time.Time

        // Read the keepAlive value configured on the client, i.e. the one set with
        // opts.SetKeepAlive(time.Duration(ktime) * time.Second)
        if c.options.KeepAlive > 10 {
            checkInterval = 5
        } else {
            checkInterval = c.options.KeepAlive / 2
        }

        // Create a ticker that periodically checks the client against its keepAlive value.
        // Note: the ticker therefore fires every 1 to 5 s (for a KeepAlive of at least 2 s).
        intervalTicker := time.NewTicker(time.Duration(checkInterval * int64(time.Second)))
        defer intervalTicker.Stop()

        for {
            select {
            case <-c.stop:
                DEBUG.Println(PNG, "keepalive stopped")
                return
            case <-intervalTicker.C:
                lastSent := c.lastSent.Load().(time.Time)         // time of the client's last send (stored below after each PINGREQ)
                lastReceived := c.lastReceived.Load().(time.Time) // time of the client's last receive, e.g. a PINGRESP
                DEBUG.Println(PNG, "ping check", time.Since(lastSent).Seconds())
                // If the time since the last send >= keepAlive, or the time since the
                // last receive >= keepAlive, enter the logic that sends a PINGREQ.
                if time.Since(lastSent) >= time.Duration(c.options.KeepAlive*int64(time.Second)) || time.Since(lastReceived) >= time.Duration(c.options.KeepAlive*int64(time.Second)) {
                    if atomic.LoadInt32(&c.pingOutstanding) == 0 {
                        DEBUG.Println(PNG, "keepalive sending ping")
                        ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
                        //We don't want to wait behind large messages being sent, the Write call
                        //will block until it is able to send the packet.
                        atomic.StoreInt32(&c.pingOutstanding, 1) // mark a PINGREQ as outstanding
                        ping.Write(c.conn)                       // send the PINGREQ to the broker
                        c.lastSent.Store(time.Now())             // record the time of this send
                        pingSent = time.Now()                    // record when the PINGREQ went out
                    }
                }
                // Check whether the client has received the PINGRESP.
                // Note: the default PingTimeout is 10 * time.Second.
                // If c.pingOutstanding > 0 and the time since pingSent >= PingTimeout,
                // the client has not received the broker's PINGRESP.
                if atomic.LoadInt32(&c.pingOutstanding) > 0 && time.Now().Sub(pingSent) >= c.options.PingTimeout {
                    CRITICAL.Println(PNG, "pingresp not received, disconnecting")
                    c.errors <- errors.New("pingresp not received, disconnecting")
                    return
                }
            }
        }
    }


Summary

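Reading the ping source shows that the keepalive mechanism is fairly simple. It is a polling loop driven by time.NewTicker() that periodically checks whether the client has sent a PINGREQ and whether the PINGRESP has been received.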

The neat handling of the ticker's polling interval

I believe that too short a ticker period makes the CPU switch contexts frequently. My personal suggestion is therefore to set the keepAlive interval to between 30 and 60 s.

    if c.options.KeepAlive > 10 {
        checkInterval = 5
    } else {
        checkInterval = c.options.KeepAlive / 2
    }
    intervalTicker := time.NewTicker(time.Duration(checkInterval * int64(time.Second)))
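To see what this rule produces for different settings, the same arithmetic can be put in a standalone function and tabulated. This is a sketch: the free function checkInterval is a hypothetical stand-in that takes the keepAlive value in seconds as an int64, matching the field used in the snippet.

```go
package main

import "fmt"

// checkInterval reproduces the rule from the snippet: the polling period in
// seconds is capped at 5 for any keepAlive above 10, otherwise it is half
// the keepAlive (integer division).
func checkInterval(keepAlive int64) int64 {
	if keepAlive > 10 {
		return 5
	}
	return keepAlive / 2
}

func main() {
	for _, ka := range []int64{1, 2, 10, 11, 30, 60} {
		fmt.Printf("keepAlive=%2ds -> checkInterval=%ds\n", ka, checkInterval(ka))
	}
	// Note: for keepAlive=1 the integer division yields 0, and
	// time.NewTicker panics on a non-positive interval, so very small
	// keepAlive values are best avoided with this rule.
}
```

For the suggested 30 to 60 s range the check always runs every 5 s, so raising keepAlive within that range changes how often pings are sent, not how often the loop wakes up.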