类比 RocketMq 和 淘宝消息服务:

Posted erlongxizhu-03

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了类比 RocketMq 和 淘宝消息服务:相关的知识,希望对你有一定的参考价值。

rocketMq建立监听:

  • 一个groupId下通常会挂载多个consumer实例。
  • 集群订阅方式 (默认):一个监听到之后,另一个consumer实例就不会再监听到(不管在不在一个服务器上)。
  • 由于默认集群订阅方式,只能有一个监听到,所以,本地测试和服务器上topic不能一致,否则会影响服务器上监听不到消息。而topic 的不一致导致本地测试和服务端测试,groupID也不一致,但多台服务器上需要多个groupId,

 


淘宝消息服务:

通过SDK接受消息,Java接口使用说明:

public interface MessageHandler 

    /**
     * 消息通道客户端收到消息后,会回调该方法处理具体的业务,处理结果可以通过以下两种方式来表述:
     * <ul>
     * <li>抛出异常或设置status.fail()表明消息处理失败,需要消息通道服务端重发
     * <li>不抛出异常,也没有设置status信息,则表明消息处理成功,消息通道服务端不会再投递此消息
     * 
     * @param message 消息内容
     * @param status 处理结果,如果调用status.fail(),消息通道将会择机重发消息;否则,消息通道认为消息处理成功
     * @throws Exception 消息处理失败,消息通道将会择机重发消息
     */
    public void onMessage(Message message, MessageStatus status) throws Exception;

JAVA使用代码示例:

TmcClient client = new TmcClient("app_key", "app_secret", "default"); // 关于default参考消息分组说明
client.setMessageHandler(new MessageHandler() 
    public void onMessage(Message message, MessageStatus status) 
        try 
            System.out.println(message.getContent());
            System.out.println(message.getTopic());
         catch (Exception e) 
            e.printStackTrace();
            status.fail(); // 消息处理失败回滚,服务端需要重发
          // 重试注意:不是所有的异常都需要系统重试。 
          // 对于字段不全、主键冲突问题,导致写DB异常,不可重试,否则消息会一直重发
          // 对于,由于网络问题,权限问题导致的失败,可重试。
          // 重试时间 5分钟不等,不要滥用,否则会引起雪崩
        
    
);
client.connect("ws://mc.api.taobao.com"); // 消息环境地址:ws://mc.api.tbsandbox.com/

 

说消息服务之前先熟悉一下消息服务常见问题:

 

消息服务常见问题

 

什么是分组,是否需要分组
消息分组是用户消息隔离的一种方式,组内用户的消息只会发送到相同组名的连接上。同一个组支持多个连接,同一组的消息,随机发送到组内的某一个连接。如果要用户的类型对消息区别对待,比如优先保证付费用户,然后再保证免费用户,就可以通过消息分组来接收不同用户的消息。每个应用最多创建50个分组,每个分组用户数不限。

 

什么是多连接接收消息,如何建立多个连接
多连接收消息是指同一分组内ISV服务器与TOP的消息服务器建立多个连接来收消息。多链接是对同一个分组而言,消息在下发时随机选择从分组内的多个连接中选择一个连接下发消息。多链接有的随机下发消息的功能,可以用同一分组多连接来实现集群,负载功能。

 

建立多链接只需要用相同的代码重新启动一个TmcClient实例。可在同一个ISV服务器上,也可在不同的ISV服务器上,建立同一个分组的多链接。

 

什么情况下使用多连接
消息服务的服务端有消息堆积的功能,它看的是你客户端的处理能力,只要能处理他就给你发,处理不了就堆积在服务端,一般情况下不需要建立多连接,单个连接就能把机器的网卡跑满。新消息服务的多连接,更多的是应用在用户分组,或者做集群部署的场景下。

 

消息重发逻辑是怎么样的
对 于断开连接(如应用挂了)情况,服务端会堆积消息,等应用重新连接进来后,再把堆积的消息顺序推送给客户端。一条消息从诞生开始,如果应用一直不接收,服 务端最长保留时间为 1 天,超过 1 天会自动清除。对于连接正常,但消息处理失败的情况,服务端会最快隔10分钟进行第一次重发,如果应用一直处理失败,服务 端会一直定时重发,直到消息被清理为止。

 

php中json_decode整形溢出问题
PHP 5.3版本以下,json_decode依赖于操作系统的位数来解释数字,在32位系统上最大只支持2^32的数字解释,在64位的系统上最大支持 2^64的数字的解释。由于消息服务的消息ID超过32位系统的最大值,如果没有升级到PHP 5.3版本以上,就会由于确认了错误的消息ID,导致消息重复投递。解决方案是:1. 升级PHP到5.3以上;2. 把应用部署到64位的系统上;3. 把JSON消息里面的数字通过正则等手段替换为字符串。

 

消息的断开和心跳测试
客户端要直接断开消息:TmcClient.close(); 心跳测试是否正常连接:TmcClient.isOnline();

 

天猫退款和淘宝退款的区别
天猫退款只包含天猫的订单,淘宝退款包含淘宝和天猫的订单,不过天猫退款的状态有丰富一点,多了一些过程。如果用不到,建议用淘宝退款消息就可以了,如果需要,需要申请天猫退款API权限,申请后即可开通。

 

消息服务会有延时吗
为用户开通消息服务taobao.tmc.user.permit后需要10秒才能生效。使用中消息基本没有延迟,都会在1秒内收到。如果有消息堆积或者程序处理不及时,就会有延时。延时时间与程序处理能力有关。为用户取消消息服务taobao.tmc.user.cancel后1秒内生效,取消后,堆积的消息会继续发送,新的消息不会发送。

 

商品消息message.getContent()中的nick为空正常吗?我怎么判断该消息属于哪个店铺?
商品消息中,是有nick为空的情况的。可以用个外层获取到message.getUserNick()或message.getUserId()。

 

消息服务,用户到期了,消息还会不会收到?
消息服务推送的判断有两个条件:1、是用户授权是否在有效期内;2、是用户有没有开通消息服务(toabao.tmc.user.permit)。只有二者同时满足才会推送。相反如果用户授权到期就不会推送。另外用户授权到期一个月以内,用户的开通关系还会保存,一个月以后会清除。如果在一个月以内用户重新授权,就不需要重新为用户开通消息服务。

 

获取消息后,如果不确认,消息服务会选择时机重发,重发次数由消息服务控制,目前会重发多少次?
消息服务每十分钟查一次未处理的消息,然后择机发送,如果消息 1 天内都没有被确认将会被删除

 

消息没有收到,如何确认是不是消息服务漏掉了?
通过日常反馈,未出现消息服务漏消息的情况,一般是ISV程序处理未收到消息或者程序处理能力导致消息阻塞。排查消息可以从以下方面确认:
* 首先确认授权(SessionKey)是否有效;
* 调用taobao.tmc.user.get确认当前用户以及开通的消息,返回参数传入topics;调用TmcClient.isOnline()测试心跳是否正常连接。若以上排查不出结果可以提交问题到支持中心附上:AppKey、用户nick、消息状态、消息大概时间、订单的tid、商品的num_iid。

 

客户端配置参数注意事项
.NET SDK:ReconnectIntervalSeconds重连时间,标识TmcClient断开时重连的时间间隔。此值必须>10s,如果此值太小会出现链接不上的情况,原因是服务端如果检测到500ms内重连,会断开新的链接。

 

商品库存变更注意事项

 

  • 当通过API(taobao.item.quantity.update,或taobao.item.sku.update更改数量) 修改商品库存时,会产生taobao_item_ItemStockChanged的消息。
  • 当通过API(taobao.item.update)更新商品数量时 或 通过页面修改商品库存时,只会产生消息商品变更消息(taobao_item_ItemUpdate),而不会发taobao_item_ItemStockChanged消息,消息只包含商品库存数量,无变化量。

 

下面的操作中,是直接返回商品的库存数量:

 

  • 当商品拍下(拍下减库存)或付款(付款减库存)(包含通过API创建交易)时,会产生上面的消息。
  • 当订单关闭或子订单关闭会产生此消息(包含通过API关闭交易)。
  • 当买家付完款后,卖家通过页面修改订单商品的SKU时,相应的商品的SKU库存也会变化,产生上面的消息(此时还会产生交易变更消息taobao_trade_TradeChanged)。

 

退款相关消息说明
通过接口taobao.trade.fastrefund(快速退款)退款时不会产生退款相关的消息,要有退款流程的退款才会产生退款相关的消息。快速退款接口(taobao.trade.fastrefund)直接打款给买家,然后关闭交易,不会创 建退款流程,所以不会产生退款消息。目前只有虚拟类目才支持taobao.trade.fastrrefund接口。

 

time 、outtime、localtime消息相关字段说明

 

  • time 是消息产生时间
  • outtime 是消息的本次推送时间
  • localtime 是本机的时间
  • outtime - time 表示服务端的处理或重发延迟时长
  • localtime - outtime 表示本机的时间与TOP时间差,或网络延迟,或收到消息后处理的延迟

 

消息服务报错isp.system-error: unknown errors,isv.tmc-switch-off: appkey,the app do not enable messaging-channel feature
应用未订阅(开通)消息服务,就使用TmcClient来接收消息。

好了,已了解完常见问题,下面介绍与消息队列类比地方:

rocketMq建立监听:

  • 三个参数 和一个groupID 建立连接(生产者或消费者):1、rocketmq.accessKey=LTAId2o..... 2、rocketmq.secretKey=abHR.... 3、 rocketmq.namesrvAddr=http:/   # 公网、杭州公有云生产,TCP协议接入点,建立长连接用到   
  • 一个groupId下通常会挂载多个consumer实例。生产者生产消息或消费者消费消息:通过topic 和tag,groupID——一对多——实例,实例——一对多——topic,topic——一对多——tag。
  • 订阅关系一致:一个groupID下的示例订阅的

一、同一个 Group ID 下的两个实例订阅的 Topic 不一致。

 

二、同一个 Group ID 下订阅 Topic 的 Tag 不一致。 Consumer 实例2-1 订阅了 TagA,而 Consumer 实例2-2 未指定 Tag。

    1. 同一个 Group ID 下订阅 Topic 个数不一致。
    2. 同一个 Group ID 下订阅 Topic 的 Tag 不一致。

 

 

  • 集群订阅方式 (默认):一个监听到之后,另一个consumer实例就不会再监听到(不管在不在一个服务器上)。
  • 由于默认集群订阅方式,只能有一个监听到,所以,本地测试和服务器上topic不能一致,否则会影响服务器上监听不到消息。而topic 的不一致导致本地测试和服务端测试,groupID也不一致,但多台服务器上需要多个groupId,

 

  • 三个参数1、"app_key", 2、"app_secret",3、消息环境地址:ws://mc.api.tbsandbox.com/和一个分组组名(默认default)建立连接(参看上边消息服务Java示例
  • 一个分组下边用户数不限。分组—— 1对多—— 连接,不同组必是不同连接,不同连接是可以不同程序不同处理。

什么是分组,是否需要分组
消息分组是用户消息隔离的一种方式,组内用户的消息只会发送到相同组名的连接上。同一个组支持多个连接,同一组的消息,随机发送到组内的某一个连接。如果要用户的类型对消息区别对待,比如优先保证付费用户,然后再保证免费用户,就可以通过消息分组来接收不同用户的消息。每个应用最多创建50个分组,每个分组用户数不限。
什么是多连接接收消息,如何建立多个连接
多链接是对同一个分组而言,消息在下发时随机选择从分组内的多个连接中选择一个连接下发消息。多链接有的随机下发消息(只有一个收到)的功能,可以用同一分组多连接来实现集群,负载功能。
建立多链接只需要用相同的代码重新启动一个TmcClient实例。可在同一个ISV服务器上,也可在不同的ISV服务器上,建立同一个分组的多链接。

以上是关于类比 RocketMq 和 淘宝消息服务:的主要内容,如果未能解决你的问题,请参考以下文章

RocketMq Tag

RocketMQ笔记:普通消息

RocketMQ事务消息机制

滴滴出行基于RocketMQ构建企业级消息队列服务的实践

滴滴出行基于RocketMQ构建企业级消息队列服务的实践

消息中间件—RocketMQ的RPC通信