RocketMQ系列4-长轮询模式实现推送消息

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ系列4-长轮询模式实现推送消息相关的知识,希望对你有一定的参考价值。

参考技术A       RocketMQ提供两种方式进行消费消息pull vs push,这也是很多涉及到Client和Server之间的交互模型

     主要是Consumer主动从Broker获取消息,可以设置多久拉取一次、可以设置一次拉取多少条消息等参数。

     Push模式服务端主动向客户端发送消息,Push方式下,消息队列RocketMQ版还支持批量消费功能,可以将批量消息统一推送至Consumer进行消费。

     在现实中更多根据实际场景进行选择,大多场景更喜欢使用Push模式进行消费消息,那么Push是真正Broker端发送给Consumer的吗?答案肯定不是的,现实场景会有成百上千的Consumer对应的消息队列,Broker不会主动发送消息请求的。所以消息队列如何进行设计消息推送的呢?答案是长轮询。

     长轮询本质上也是客户端发起定时轮训请求,会保持请求到服务端,直到设置的时长(该hold时长要小于HTTP超时时间)到期或者服务端收到消息,进行返回数据。consumer收到响应后根据状态判断是否有消息。

     首先是Consumer启动,启动过程会执行各种定时任务和守护线程。其中一个pullMessageService 定时发起请求拉取消息服务,一个MQClientInstance 只会启动一个消息拉取线程,就是push模式使用pull封装一下。

     可以看到启动后Consumer则不断轮询 Broker 获取消息。 Rocketmq将每次请求参数放入pullRequestQueue进行缓冲。这样做的好处:consumer可能对应很多topic。当拉取到消息或者长轮询请求到期后进行回调PullCallback进行下一轮拉取消息。

Consumer处理的逻辑包括:

     PullCallback则根据pullStatus状态判断是否有消息。不管何种状态最终会调用 executePullRequestImmediately 将拉取请求放入队列中进行下一轮消息请求。

可以思考一下Broker端需要面临哪些设计?

     如果broker没有获取到新消息,并不会马上返回pullRequest,会在suspendPullRequest方法中,把当前的请求信息(主要是offset,group,topic,requestId这几个值)放到PullRequestHoldService.pullRequestTable中,而在ReputMessageService的doReput--->messageArrivingListener.arriving--->pullRequestHoldService.notifyMessageArriving--->mpr = this.pullRequestTable.get(key)--->requestList = mpr.cloneListAndClear() 把刚才存进去的所有pullRequest取出来,取到消息再返回。这样就避免了不停的轮询。
     hold的请求存放在 ConcurrentHashMap<String, ManyPullRequest> 中,key 为 topic@queueId ,value 是 ManyPullRequest 实际是List<PullRequest> 可以理解对应的多个相同的topic客户端。

     Broker端启动线程 PullRequestHoldService 不断轮训检测hold请求是否超时,然后唤醒请求并返回给consumer端。其中轮训时间设置可以是5s一次或者设定时长,进行定期检测。

     Producer写入消息,Broker端有消息通知Consumer端。
     当 Broker 是主节点 && Broker 开启的是长轮询,通知消费队列有新的消息。当拉取消息请求获取不到消息则进行阻塞。当有消息或者或者阻塞超时,重新执行获取消息逻辑,主要是NotifyMessageArrivingListener 会 调用 PullRequestHoldService#notifyMessageArriving(…) 方法通知消费端有消息到达。这时候克隆hold的请求列表,从挂起的请求列表中找到当前新的消息的匹配的,匹配到然后在reput这个操作中顺带激活了长轮询休眠的PullRequest。

     当生产者发送最新消息过来后,首先持久化到commitLog文件,通过异步方式同时持久化consumerQueue和index。然后激活consumer发送来hold的请求,立即将消息通过channel写入consumer客户。
     如果没有消息到达且客户端拉取的偏移量是最新的,会hold住请求。其中hold请求超时时间 < 请求设定的超时时间。同时Broker端也定时检测是否请求超时,超时则立即将请求返回,状态code为NO_NEW_MESSAGE。

如何使用 PHP 和 Javascript 实现服务器推送/长轮询/彗星

【中文标题】如何使用 PHP 和 Javascript 实现服务器推送/长轮询/彗星【英文标题】:How to implement Server push / long polling / comet using PHP and Javascript 【发布时间】:2009-12-01 22:03:00 【问题描述】:

如何使用 PHP 和 Javascript 实现彗星/服务器推送设计模式? 基本上我想要的是这样的:

用户点击“提交”按钮 Javascript 显示类似“正在处理 0 / 100 个项目”的消息 每当处理一个新项目时,php / 服务器会推送更新,并且 javascript 将消息更改为“正在处理 2/100 个项目、3/100 个项目等”。

我该怎么做?请问有什么链接/信息吗?

【问题讨论】:

【参考方案1】:

Comet with PHP 在缩放方面存在一些棘手的问题。另一种选择是 SaaS 解决方案,例如 WebSync On-Demand。 (免责声明:我在那里工作)。这样一来,您就不必担心任何幕后的事情,您只需获得服务器推送功能。

【讨论】:

不适用于按需版本 - 它是 SaaS,因此它适用于您使用的任何版本。例如,我在运行 Apache (jerodandangela.com/fm-xd.htm) 的个人页面上进行了一些测试【参考方案2】:

有几个很好的例子:How to implement comet with PHP

【讨论】:

【参考方案3】:

大部分服务使用客户端发起的请求,但也有Comet。上面有一个few articles。

【讨论】:

以上是关于RocketMQ系列4-长轮询模式实现推送消息的主要内容,如果未能解决你的问题,请参考以下文章

长轮询,iframe和sse三种web消息实时推送demo实践

长轮询,iframe和sse三种web消息实时推送demo实践

atitit.web 推送实现方案集合

基于ajax与msmq技术的消息推送功能实现

服务端向客户端推送消息:轮询,长轮询(兼容性好),以及websocket(主流浏览器都支持)

基于ajax与msmq技术的消息推送功能如何实现?