How RabbitMQ heartbeats work
Posted by gj4990
Preface: this article was compiled by the editors of 小常识网 (cha138.com) and covers how RabbitMQ heartbeats work.
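RabbitMQ's heartbeat detects whether the connection between a client and RabbitMQ is still alive, much like TCP keepalives. This article explains when the heartbeat is set up and how it checks whether a connection is alive.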
1. Protocol flow for establishing a RabbitMQ connection
This article focuses on the connection.tune and connection.tune-ok steps of that flow.
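For reference, the tune and tune-ok steps belong to a larger connection handshake. That handshake can be written out as the ordered list of AMQP 0-9-1 methods exchanged on channel 0. The list below is a plain-data sketch based on the AMQP 0-9-1 specification. `HANDSHAKE` and `sender_of` are illustrative names, and the optional connection.secure / connection.secure-ok challenge step is left out.

```python
# AMQP 0-9-1 connection handshake on channel 0, in order.
# The optional connection.secure / connection.secure-ok round trip is omitted.
HANDSHAKE = [
    ("client", "protocol-header"),      # the bytes b"AMQP\x00\x00\x09\x01"
    ("server", "connection.start"),
    ("client", "connection.start-ok"),  # carries the client's credentials
    ("server", "connection.tune"),      # server's channel_max / frame_max / heartbeat
    ("client", "connection.tune-ok"),   # the negotiated values
    ("client", "connection.open"),      # selects the virtual host
    ("server", "connection.open-ok"),
]


def sender_of(method):
    """Return which side ("client" or "server") sends `method`."""
    for sender, name in HANDSHAKE:
        if name == method:
            return sender
    raise KeyError(method)


for step, (sender, method) in enumerate(HANDSHAKE, start=1):
    print(f"{step}. {sender:>6} -> {method}")
```

The list makes the direction of the negotiation explicit. The server proposes its limits in tune, and the client answers with the final values in tune-ok.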
2. The channel_max, frame_max and heartbeat parameters
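In the connection setup flow described above, the protocol exchange happens on channel 0. A client opens only one TCP connection to RabbitMQ and multiplexes multiple channels over it. The maximum number of channels on that one TCP connection is negotiated between the client and RabbitMQ.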
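This negotiation happens during connection.tune and connection.tune-ok. When RabbitMQ receives connection.start-ok, it processes the message and authenticates the user. Once authentication succeeds, RabbitMQ builds a connection.tune message and sends it to the client. The message carries channel_max, frame_max and heartbeat, taken from RabbitMQ's own configuration. The code is: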
```erlang
%% rabbit_reader.erl
%% Authentication phase (check with the system that the user and password are valid)
auth_phase(Response,
           State = #v1{connection = Connection =
                           #connection{protocol       = Protocol,
                                       auth_mechanism = {Name, AuthMechanism},
                                       auth_state     = AuthState},
                       sock = Sock}) ->
    %% Let the auth mechanism module resolve the user name and password
    case AuthMechanism:handle_response(Response, AuthState) of
        {refused, Username, Msg, Args} ->
            %% Authentication refused: terminate the current rabbit_reader process
            auth_fail(Username, Msg, Args, Name, State);
        {protocol_error, Msg, Args} ->
            %% Publish the authentication result to rabbit_event
            notify_auth_result(none, user_authentication_failure,
                               [{error, rabbit_misc:format(Msg, Args)}],
                               State),
            rabbit_misc:protocol_error(syntax_error, Msg, Args);
        ....
            %% Build the connection.tune message
            Tune = #'connection.tune'{frame_max   = get_env(frame_max),
                                      channel_max = get_env(channel_max),
                                      heartbeat   = get_env(heartbeat)},
            %% Send connection.tune to the client
            ok = send_on_channel0(Sock, Tune, Protocol),
            %% Store the authenticated user, then set connection_state to tuning
            %% and wait for connection.tune_ok to come back
            State#v1{connection_state = tuning,
                     connection = Connection#connection{user       = User,
                                                        auth_state = none}}
    end.
```
The code that builds Tune reads frame_max, channel_max and heartbeat from RabbitMQ's application environment via get_env. You can inspect these values with:
```shell
bash-4.4# rabbitmqctl environment|grep -E "channel_max|frame_max|heartbeat"
{channel_max,2047},
{frame_max,131072},
{heartbeat,60},
```
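A small parser can pull the three limits out of text like the rabbitmqctl environment output above. This is a sketch, assuming each setting appears as a `name,value` pair of integers, with or without the surrounding braces of the Erlang term. `parse_limits` is a hypothetical helper, not part of any RabbitMQ tooling.

```python
import re

# Matches "channel_max,2047", "{frame_max,131072}", "  {heartbeat,60}," and so on.
_LIMIT = re.compile(r"\b(channel_max|frame_max|heartbeat),\s*(\d+)")


def parse_limits(output):
    """Return {name: int} for channel_max, frame_max and heartbeat found in `output`."""
    return {name: int(value) for name, value in _LIMIT.findall(output)}


sample = """{channel_max,2047},
{frame_max,131072},
{heartbeat,60},"""
print(parse_limits(sample))
# {'channel_max': 2047, 'frame_max': 131072, 'heartbeat': 60}
```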
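These values are sent to the client for negotiation. Once negotiation is finished, the client replies with connection.tune-ok, which carries the negotiated frame_max, channel_max and heartbeat. The client-side code in py-amqp's amqp module is: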
```python
# amqp/connection.py:Connection
def _setup_listeners(self):
    self._callbacks.update({
        spec.Connection.Start: self._on_start,
        spec.Connection.OpenOk: self._on_open_ok,
        spec.Connection.Secure: self._on_secure,
        spec.Connection.Tune: self._on_tune,
        spec.Connection.Close: self._on_close,
        spec.Connection.Blocked: self._on_blocked,
        spec.Connection.Unblocked: self._on_unblocked,
        spec.Connection.CloseOk: self._on_close_ok,
    })

# amqp/connection.py:Connection
def _on_tune(self, channel_max, frame_max, server_heartbeat, argsig='BlB'):
    client_heartbeat = self.client_heartbeat or 0
    self.channel_max = channel_max or self.channel_max
    self.frame_max = frame_max or self.frame_max
    self.server_heartbeat = server_heartbeat or 0

    # negotiate the heartbeat interval to the smaller of the
    # specified values
    if self.server_heartbeat == 0 or client_heartbeat == 0:
        self.heartbeat = max(self.server_heartbeat, client_heartbeat)
    else:
        self.heartbeat = min(self.server_heartbeat, client_heartbeat)

    # Ignore server heartbeat if client_heartbeat is disabled
    if not self.client_heartbeat:
        self.heartbeat = 0

    self.send_method(
        spec.Connection.TuneOk, argsig,
        (self.channel_max, self.frame_max, self.heartbeat),
        callback=self._on_tune_sent,
    )
```
_setup_listeners registers the amqp module's callbacks. When the client uses the amqp module and RabbitMQ sends connection.tune, amqp dispatches the message to _on_tune.
_on_tune compares the frame_max, channel_max and heartbeat values received from RabbitMQ with the client's own values and negotiates the final ones.
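_on_tune sends its result with argsig='BlB'. In py-amqp's argument-signature codes, 'B' denotes a 16-bit short and 'l' a 32-bit long. This matches the AMQP 0-9-1 field types of connection.tune-ok: channel-max is a short, frame-max a long and heartbeat a short. The sketch below packs a tune-ok method frame by hand to illustrate the resulting wire format. It is not py-amqp's own serializer, and `build_tune_ok_frame` is a hypothetical helper.

```python
import struct

FRAME_METHOD = 1        # AMQP 0-9-1 frame type for method frames
FRAME_END = 0xCE        # frame-end octet
CONNECTION_CLASS = 10   # class-id of "connection"
TUNE_OK_METHOD = 31     # method-id of connection.tune-ok


def build_tune_ok_frame(channel_max, frame_max, heartbeat):
    """Pack a connection.tune-ok method frame on channel 0."""
    # class-id, method-id, channel-max (short), frame-max (long), heartbeat (short)
    payload = struct.pack(">HHHIH", CONNECTION_CLASS, TUNE_OK_METHOD,
                          channel_max, frame_max, heartbeat)
    # frame type (octet), channel (short), payload size (long)
    header = struct.pack(">BHI", FRAME_METHOD, 0, len(payload))
    return header + payload + bytes([FRAME_END])


frame = build_tune_ok_frame(2047, 131072, 60)
# header: type 1, channel 0, size 12 | class 10, method 31 | 2047, 131072, 60 | frame-end
assert frame == bytes.fromhex("01 0000 0000000c 000a 001f 07ff 00020000 003c ce")
print(len(frame))  # 20
```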
On the client side, the defaults are frame_max = 131072, channel_max = 65535 and heartbeat = 60 s.
On the RabbitMQ side, the defaults are the ones printed by rabbitmqctl environment above: channel_max = 2047, frame_max = 131072 and heartbeat = 60.
2.1 frame_max and channel_max
The _on_tune code shows how the final frame_max and channel_max are chosen. The client takes the value RabbitMQ sent in connection.tune. If RabbitMQ did not set one (it sent 0), the client falls back to its own frame_max or channel_max.
In our example, RabbitMQ does send both values in connection.tune, so RabbitMQ's settings are used: frame_max = 131072 and channel_max = 2047.
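py-amqp writes `channel_max or self.channel_max`. So the server's value is taken whenever it is non-zero, even if the client's own value is smaller; the client does not take the minimum. The following is a minimal sketch of that rule. `negotiate_limits` is a hypothetical helper, not part of py-amqp.

```python
def negotiate_limits(server, client):
    """Apply py-amqp's `server_value or client_value` rule to each field."""
    return {
        field: server.get(field) or client[field]
        for field in ("channel_max", "frame_max")
    }


server = {"channel_max": 2047, "frame_max": 131072}
client = {"channel_max": 65535, "frame_max": 131072}
print(negotiate_limits(server, client))
# {'channel_max': 2047, 'frame_max': 131072}
print(negotiate_limits({"channel_max": 0, "frame_max": 131072}, client))
# {'channel_max': 65535, 'frame_max': 131072}
```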
2.2 heartbeat
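The RabbitMQ documentation and the _on_tune code both show that heartbeat is also negotiated between the client and RabbitMQ. The client's setting takes precedence. If the client sets heartbeat to 0, heartbeats are disabled for the connection, whatever RabbitMQ sends. If both sides set a non-zero value, the smaller one is used. If exactly one side is 0, the larger value is used. Combined with the previous rule, this case only applies when RabbitMQ sends 0, so the client's value wins. If both are 0, the result is 0.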
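To tabulate the heartbeat rule, we can lift the branch logic out of _on_tune into a standalone function. `negotiate_heartbeat` is a hypothetical name; its body follows the _on_tune code shown earlier.

```python
def negotiate_heartbeat(server_heartbeat, client_heartbeat):
    """Heartbeat negotiation as done in py-amqp's _on_tune."""
    server = server_heartbeat or 0
    client = client_heartbeat or 0
    # One side disabled: take the larger; both enabled: take the smaller.
    if server == 0 or client == 0:
        result = max(server, client)
    else:
        result = min(server, client)
    # A client heartbeat of 0 disables heartbeats regardless of the server.
    if not client:
        result = 0
    return result


for server, client in [(60, 60), (60, 30), (0, 30), (60, 0), (0, 0)]:
    print(server, client, "->", negotiate_heartbeat(server, client))
# 60 60 -> 60
# 60 30 -> 30
# 0 30 -> 30
# 60 0 -> 0
# 0 0 -> 0
```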
2.3 How RabbitMQ handles the negotiated values
So far, the negotiation happens mainly on the client. RabbitMQ sends connection.tune, and the client parses RabbitMQ's frame_max, channel_max and heartbeat from it. The client then compares these with its own values to derive the final parameters.
Once _on_tune has the final frame_max, channel_max and heartbeat, it builds a connection.tune-ok message and sends it to RabbitMQ. RabbitMQ handles it as follows:
```erlang
%% Handle the connection.tune_ok message
%% frame_max: the largest frame size allowed when talking to the client. Defaults to
%% 131072; raising it helps throughput, lowering it helps latency
%% channel_max: the maximum number of channels per connection
handle_method0(#'connection.tune_ok'{frame_max   = FrameMax,
                                     channel_max = ChannelMax,
                                     heartbeat   = ClientHeartbeat},
               State = #v1{connection_state = tuning, %% set to tuning when connection.tune was sent to the client
                           connection = Connection,
                           helper_sup = SupPid,
                           sock = Sock}) ->
    %% Validate the client's negotiated frame_max against the server's setting
    ok = validate_negotiated_integer_value(
           frame_max, ?FRAME_MIN_SIZE, FrameMax),
    %% Validate the client's negotiated channel_max against the server's setting
    ok = validate_negotiated_integer_value(
           channel_max, ?CHANNEL_MIN, ChannelMax),
    %% Start the queue_collector process under the rabbit_connection_helper_sup supervisor
    {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector(
                        SupPid, Connection#connection.name),
    %% Build the heartbeat frame
    Frame = rabbit_binary_generator:build_heartbeat_frame(),
    %% Function that sends a heartbeat frame
    SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
    Parent = self(),
    %% Function that sends a heartbeat_timeout message to this process
    ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
    %% Start two heartbeat processes under the rabbit_connection_helper_sup supervisor:
    %% one checks every ClientHeartbeat/2 whether RabbitMQ has sent data to the client,
    %% the other checks every ClientHeartbeat whether this rabbit_reader's socket has received data
    Heartbeater = rabbit_heartbeat:start(
                    SupPid, Sock, Connection#connection.name,
                    ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun),
    State#v1{connection_state = opening, %% set to opening once connection.tune_ok has been received
             connection = Connection#connection{
                            frame_max       = FrameMax,
                            channel_max     = ChannelMax,
                            timeout_sec     = ClientHeartbeat,
                            queue_collector = Collector,
                            heartbeater     = Heartbeater}};
```
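rabbit_binary_generator:build_heartbeat_frame() produces an AMQP 0-9-1 heartbeat frame. That frame is frame type 8 on channel 0 with an empty payload, followed by the frame-end octet 0xCE, 8 bytes in total. The Python sketch below builds and recognizes the same bytes with struct. `heartbeat_frame_bytes` and `is_heartbeat_frame` are illustrative stand-ins, not RabbitMQ functions.

```python
import struct

FRAME_HEARTBEAT = 8  # AMQP 0-9-1 frame type for heartbeats
FRAME_END = 0xCE     # frame-end octet


def heartbeat_frame_bytes():
    """Type 8, channel 0, payload size 0, then the frame-end octet."""
    return struct.pack(">BHIB", FRAME_HEARTBEAT, 0, 0, FRAME_END)


def is_heartbeat_frame(data):
    """True if `data` is exactly one heartbeat frame."""
    if len(data) != 8:
        return False
    frame_type, channel, size, end = struct.unpack(">BHIB", data)
    return (frame_type, channel, size, end) == (FRAME_HEARTBEAT, 0, 0, FRAME_END)


frame = heartbeat_frame_bytes()
assert frame == bytes.fromhex("08 0000 00000000 ce")
print(len(frame), is_heartbeat_frame(frame))  # 8 True
```

This 8-byte frame is what SendFun writes to the socket each time heartbeat_sender decides a heartbeat is due.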
After RabbitMQ receives connection.tune-ok from the client, it also validates the negotiated frame_max and channel_max:
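```erlang
%% "negotiated": agreed on by the client and the server
%% Validate a negotiated integer sent by the client against the server's setting
validate_negotiated_integer_value(Field, Min, ClientValue) ->
    %% Read the configured value for Field from the rabbit application
    ServerValue = get_env(Field),
    if ClientValue /= 0 andalso ClientValue < Min ->
            %% Client value below the minimum: terminate the current rabbit_reader process
            fail_negotiation(Field, min, ServerValue, ClientValue);
       ServerValue /= 0 andalso (ClientValue =:= 0 orelse
                                 ClientValue > ServerValue) ->
            %% Client value above the server's maximum (or 0 while the server has a limit):
            %% terminate the current rabbit_reader process
            fail_negotiation(Field, max, ServerValue, ClientValue);
       true ->
            ok
    end.
```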
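The negotiated frame_max and channel_max must therefore lie between a fixed minimum and RabbitMQ's configured value, which acts as the maximum. A client value of 0 (no limit) is also rejected whenever the server's value is non-zero. The minimums are defined in RabbitMQ as: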
```erlang
-define(FRAME_MIN_SIZE, 4096).
-define(CHANNEL_MIN, 1).
```
The maximums are RabbitMQ's own configured values. In this environment they are channel_max = 2047 and frame_max = 131072, the same rabbitmqctl environment output as above.
In the example environment, frame_max and channel_max are therefore 131072 and 2047. heartbeat takes the value negotiated on the client side, 60 s.
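The Erlang check can be ported to Python to see which client values RabbitMQ accepts. This is an illustrative port, not RabbitMQ code. The server value is passed in rather than read with get_env. The hypothetical `NegotiationError` stands in for fail_negotiation, which terminates the rabbit_reader process.

```python
FRAME_MIN_SIZE = 4096  # -define(FRAME_MIN_SIZE, 4096).
CHANNEL_MIN = 1        # -define(CHANNEL_MIN, 1).


class NegotiationError(Exception):
    """Hypothetical stand-in for fail_negotiation/4."""


def validate_negotiated_integer_value(field, minimum, server_value, client_value):
    """Return "ok" or raise NegotiationError, following the Erlang branches."""
    # A non-zero client value below the minimum is rejected.
    if client_value != 0 and client_value < minimum:
        raise NegotiationError(f"{field}={client_value} is below min {minimum}")
    # The server has a limit and the client asked for none (0) or for more.
    if server_value != 0 and (client_value == 0 or client_value > server_value):
        raise NegotiationError(f"{field}={client_value} exceeds max {server_value}")
    return "ok"


print(validate_negotiated_integer_value("frame_max", FRAME_MIN_SIZE, 131072, 131072))  # ok
print(validate_negotiated_integer_value("channel_max", CHANNEL_MIN, 2047, 2047))       # ok
try:
    validate_negotiated_integer_value("channel_max", CHANNEL_MIN, 2047, 65535)
except NegotiationError as exc:
    print(exc)  # channel_max=65535 exceeds max 2047
```

The last call shows why the client cannot simply keep its own default channel_max of 65535: RabbitMQ would reject it. Adopting the server's non-zero value in _on_tune avoids that.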
3. How RabbitMQ heartbeats work
3.1 RabbitMQ side
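As section 2 showed, the heartbeat value is negotiated between the client and RabbitMQ through the connection.tune and connection.tune-ok messages. In our environment the negotiated value is 60 s. Once the value is settled, while handling connection.tune-ok, RabbitMQ starts two heartbeat processes. One checks every ClientHeartbeat/2 whether RabbitMQ has sent data to the client. The other checks every ClientHeartbeat whether the socket of the current rabbit_reader process has received data. The code is: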
```erlang
%% rabbit_reader.erl
handle_method0(#'connection.tune_ok'{frame_max   = FrameMax,
                                     channel_max = ChannelMax,
                                     heartbeat   = ClientHeartbeat},
               State = #v1{connection_state = tuning, %% set to tuning when connection.tune was sent to the client
                           connection = Connection,
                           helper_sup = SupPid,
                           sock = Sock}) ->
    ......
    SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
    Parent = self(),
    %% Function that sends a heartbeat_timeout message to this process
    ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
    %% Start two heartbeat processes under the rabbit_connection_helper_sup supervisor:
    %% one checks every ClientHeartbeat/2 whether RabbitMQ has sent data to the client,
    %% the other checks every ClientHeartbeat whether this rabbit_reader's socket has received data
    Heartbeater = rabbit_heartbeat:start(
                    SupPid, Sock, Connection#connection.name,
                    ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun),
    State#v1{connection_state = opening, %% set to opening once connection.tune_ok has been received
             connection = Connection#connection{
                            frame_max       = FrameMax,
                            channel_max     = ChannelMax,
                            timeout_sec     = ClientHeartbeat,
                            queue_collector = Collector,
                            heartbeater     = Heartbeater}};
```
The rabbit_heartbeat code is:
```erlang
start(SupPid, Sock, Identity,
      SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
    %% Start the process that monitors data sent on Sock
    {ok, Sender} =
        start_heartbeater(SendTimeoutSec, SupPid, Sock,
                          SendFun, heartbeat_sender,
                          start_heartbeat_sender, Identity),
    %% Start the process that monitors data received on Sock
    {ok, Receiver} =
        start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
                          ReceiveFun, heartbeat_receiver,
                          start_heartbeat_receiver, Identity),
    {Sender, Receiver}.

%% Actually start the send-side heartbeat process
start_heartbeat_sender(Sock, TimeoutSec, SendFun, Identity) ->
    %% the 'div 2' is there so that we don't end up waiting for nearly
    %% 2 * TimeoutSec before sending a heartbeat in the boundary case
    %% where the last message was sent just after a heartbeat.
    %% send_oct: number of bytes sent on the socket
    %% The process periodically checks whether RabbitMQ has sent any data to the
    %% client on the TCP connection; if nothing was sent during the interval, it
    %% sends a heartbeat to the client and loops to the next check.
    %% After sending a heartbeat, the sender keeps monitoring (continue).
    heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
                 fun () -> SendFun(), continue end}, Identity).

%% Actually start the receive-side heartbeat process
start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun, Identity) ->
    %% we check for incoming data every interval, and time out after
    %% two checks with no change. As a result we will time out between
    %% 2 and 3 intervals after the last data has been received.
    %% recv_oct: number of bytes received on the socket
    %% The process periodically checks whether any data has arrived on the TCP
    %% connection; if nothing arrives for a while, it declares a heartbeat timeout,
    %% which eventually closes the TCP connection. RabbitMQ's flow control
    %% mechanism can also pause heartbeat checking.
    %% After notifying rabbit_reader of the timeout, the receiver stops itself (stop).
    heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
                 fun () -> ReceiveFun(), stop end}, Identity).
```
The code above shows that rabbit_heartbeat starts two processes: heartbeat_sender and heartbeat_receiver. These are Erlang processes, not OS processes.
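heartbeat_sender checks every heartbeat/2 whether any data has been sent on the connection. If nothing has been sent since the previous check, it sends a heartbeat frame to the client.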
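heartbeat_receiver checks every heartbeat interval whether any data has been received. After two consecutive checks with no new data, it sends heartbeat_timeout to its parent, the rabbit_reader process. Per the code comment, this happens between 2 and 3 heartbeat intervals after the last data. rabbit_reader then closes the TCP connection between the client and RabbitMQ, and RabbitMQ logs something like: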
```
=ERROR REPORT==== 9-Sep-2019::16:22:55 ===
closing AMQP connection <0.27892.1951> (10.101.71.0:49814 -> 10.101.63.7:5672 - neutron-server:323:29f77387-90c6-42d0-bf62-44780368c7f7):
missed heartbeats from client, timeout: 60s
```
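A small discrete-time simulation can check the timing of heartbeat_receiver: it times out after two checks with no change, that is 2 to 3 intervals after the last data. This is a sketch, not RabbitMQ code. recv_oct is replaced by a count of hypothetical data-arrival times. Following the code comment, the sketch assumes the counter resets whenever new data has arrived. It also assumes that Threshold = 1 makes the second consecutive unchanged check fire ReceiveFun. `receiver_timeout_at` is a hypothetical name.

```python
def receiver_timeout_at(timeout_sec, arrivals, horizon):
    """Simulate heartbeat_receiver and return when it would fire.

    timeout_sec: interval between recv_oct checks (TimeoutSec).
    arrivals:    times (seconds) at which data arrives from the client.
    horizon:     stop simulating after this time.
    Returns the check time at which heartbeat_timeout is sent, or None.
    """
    arrivals = list(arrivals)
    last_stat = 0    # stand-in for the recv_oct value seen at the last check
    same_count = 0   # consecutive checks without new data
    threshold = 1    # the receiver passes Threshold = 1
    t = 0
    while t + timeout_sec <= horizon:
        t += timeout_sec
        stat = sum(1 for a in arrivals if a <= t)  # "bytes" received so far
        if stat != last_stat:
            last_stat, same_count = stat, 0        # new data: reset
        elif same_count < threshold:
            same_count += 1                        # first silent check
        else:
            return t                               # second silent check: fire
    return None


print(receiver_timeout_at(60, [], 600))    # 120
print(receiver_timeout_at(60, [59], 600))  # 180
print(receiver_timeout_at(60, range(0, 1000, 30), 900))  # None
```

With a 60 s heartbeat, a silent client is therefore disconnected somewhere between 120 s and 180 s after its last frame. So "timeout: 60s" in the log is the configured interval, not the length of the silence.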
3.2 Client side
```python
# oslo_messaging/_drivers/impl_rabbit.py:Connection
def _heartbeat_start(self):
    if self._heartbeat_supported_and_enabled():
        self._heartbeat_exit_event = eventletutils.Event()
        self._heartbeat_thread = threading.Thread(
            target=self._heartbeat_thread_job)
        self._heartbeat_thread.daemon = True
        self._heartbeat_thread.start()
    else:
        self._heartbeat_thread = None

def _heartbeat_thread_job(self):
    """Thread that maintains inactive connections
    """
    while not self._heartbeat_exit_event.is_set():
        with self._connection_lock.for_heartbeat():
            try:
                try:
                    self._heartbeat_check()
                    # NOTE(sileht): We need to drain event to receive
                    # heartbeat from the broker but don't hold the
                    # connection too much times. In amqpdriver a connection
                    # is used exclusively for read or for write, so we have
                    # to do this for connection used for write drain_events
                    # already do that for other connection
                    try:
                        self.connection.drain_events(timeout=0.001)
                    except socket.timeout:
                        pass
                except kombu.exceptions.OperationalError as exc:
                    LOG.info(_LI("A recoverable connection/channel error "
                                 "occurred, trying to reconnect: %s"), exc)
                    self.ensure_connection()
            except Exception:
                LOG.warning(_LW("Unexpected error during heartbeart "
                                "thread processing, retrying..."))
                LOG.debug('Exception', exc_info=True)

        self._heartbeat_exit_event.wait(
            timeout=self._heartbeat_wait_timeout)
    self._heartbeat_exit_event.clear()
```
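After the client connects to RabbitMQ, it starts a thread by default to exchange heartbeats with RabbitMQ. In _heartbeat_thread_job, _heartbeat_check sends heartbeat data from the client to RabbitMQ. drain_events lets the client receive the heartbeats that RabbitMQ sends. The thread then waits _heartbeat_wait_timeout seconds before the next round.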
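The loop above can be reduced to a small runnable model. `FakeConnection` is a hypothetical stand-in for the kombu connection that only counts calls. `heartbeat_loop` keeps the shape of _heartbeat_thread_job: one heartbeat check and one very short drain per iteration, then an Event wait. It drops the locking, logging and reconnect handling, so treat it as a sketch of the control flow only.

```python
import socket
import threading
import time


class FakeConnection:
    """Hypothetical stand-in for a kombu connection; it only counts calls."""

    def __init__(self):
        self.heartbeat_checks = 0
        self.drains = 0

    def heartbeat_check(self):
        # A real connection would send a heartbeat frame here if one is due.
        self.heartbeat_checks += 1

    def drain_events(self, timeout):
        # A real connection would read pending frames (including broker
        # heartbeats); here nothing ever arrives, so it times out.
        self.drains += 1
        raise socket.timeout()


def heartbeat_loop(conn, exit_event, wait_timeout):
    """Simplified shape of _heartbeat_thread_job (no lock, no reconnect)."""
    while not exit_event.is_set():
        conn.heartbeat_check()
        try:
            conn.drain_events(timeout=0.001)
        except socket.timeout:
            pass
        exit_event.wait(timeout=wait_timeout)


conn = FakeConnection()
stop = threading.Event()
worker = threading.Thread(target=heartbeat_loop, args=(conn, stop, 0.01),
                          daemon=True)
worker.start()
time.sleep(0.1)
stop.set()
worker.join()
print(conn.heartbeat_checks, conn.drains)  # equal counts, one pair per iteration
```

Using an Event for the wait lets the thread be stopped promptly. Setting the event wakes `wait()` immediately instead of sleeping out the full interval.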
4. Summary
1. channel_max, frame_max and heartbeat are negotiated during the connection.tune and connection.tune-ok exchange of connection setup. First, RabbitMQ builds and sends a connection.tune frame that tells the client its own channel_max, frame_max and heartbeat. The client then compares these three values with its own settings to arrive at the negotiated values. Finally, it sends them back to RabbitMQ in a connection.tune-ok frame.
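2. The negotiated heartbeat value sets the heartbeat timing on both the RabbitMQ side and the client side. When RabbitMQ receives connection.tune-ok, it starts two heartbeat processes. One checks every ClientHeartbeat/2 whether RabbitMQ has sent data to the client. The other checks every ClientHeartbeat whether the socket of the current rabbit_reader process has received data.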