RocketMQ
Posted whtt
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ相关的知识,希望对你有一定的参考价值。
RocketMQ生产者和消费者
注:生产者在生产数据时,指定数据的key,然后消费者进行数据消费时,获取到key,与redis中保存的key做判断
如果不相同代表之前没有人进行消费,处理消费,保存到redis当中
当有第二个消费者时,如果拿到的消息与redis中相同代表之前已已经有人消费
就进行数据签收,防止后续消费者同样拿到重复消费数据
注:消费者的消费逻辑失败时,可以通过设置返回状态达到消息重试的结果。
消息重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
每次重试后,消息ID都不一致,所以不能使用消息ID判断幂等。
生产者
private static Map<String,Object> map=new HashMap<>(); public static void main(String[] args) throws Exception { //创建消费者 DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("mckz-group"); //设置NameServer地址 consumer.setNamesrvAddr("192.168.3.100:9876;192.168.3.200:9876"); //设置实例名称 consumer.setInstanceName("mckz"); //订阅Topic consumer.subscribe("mckz_topic","TagA"); //监听消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //获取消息 for(MessageExt ext:msgs){ //判断redis中有没有当前消息key if(map.containsKey(ext.getKeys())){ System.out.println("已经消费......."); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //RocketMQ由于是集群环境,所以产生的消息ID可能会重复 System.out.println(ext.getMsgId()+"----------"+new String(ext.getBody())); //将当前Key保存到redis当中 map.put(ext.getKeys(),ext); }
/*如果出现异常可进行try进行捕获*/
/*try { int a=5/0; }catch (Exception e){ //接受消息状态 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }*/ /*try { Thread.sleep(60000); }catch (Exception e){ e.printStackTrace(); }*/ //接受消息状态 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); }
消费者
public static void main(String[] args) throws Exception { //创建一个生产者 DefaultMQProducer producer=new DefaultMQProducer("mckz_group"); //设置NameServer地址 producer.setNamesrvAddr("192.168.3.100:9876;192.168.3.200:9876"); //设置生产者实例名称 producer.setInstanceName("mckz"); //启动生产者 producer.start(); //发送消息 for (int i = 1; i <=10 ; i++) { Thread.sleep(1000); //模拟网络延迟 //创建消息 topic代表主题名称 tags代表小分类 body代表消息体 Message message=new Message("mckz_topic","TagA",("wdksoft-"+i).getBytes()); //消息的唯一标识 message.setKeys("订单编号"+i); //发送消息 SendResult send = producer.send(message); System.out.println(send.toString()); } }
以上是关于RocketMQ的主要内容,如果未能解决你的问题,请参考以下文章