RocketMQ

Posted whtt

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ相关的知识,希望对你有一定的参考价值。

RocketMQ生产者和消费者

  注:生产者在生产数据时,指定数据的key,然后消费者进行数据消费时,获取到key,与redis中保存的key做判断

  如果不相同代表之前没有人进行消费,处理消费,保存到redis当中

  当有第二个消费者时,如果拿到的消息与redis中相同代表之前已已经有人消费

  就进行数据签收,防止后续消费者同样拿到重复消费数据

 

  :消费者的消费逻辑失败时,可以通过设置返回状态达到消息重试的结果。

  消息重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

  每次重试后,消息ID都不一致,所以不能使用消息ID判断幂等。

 

生产者

 

private static Map<String,Object> map=new HashMap<>();
    public static void main(String[] args) throws Exception {
        //创建消费者
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("mckz-group");
        //设置NameServer地址
        consumer.setNamesrvAddr("192.168.3.100:9876;192.168.3.200:9876");
        //设置实例名称
        consumer.setInstanceName("mckz");
        //订阅Topic
        consumer.subscribe("mckz_topic","TagA");
        //监听消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                //获取消息
                for(MessageExt ext:msgs){
                    //判断redis中有没有当前消息key
                    if(map.containsKey(ext.getKeys())){
                        System.out.println("已经消费.......");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    //RocketMQ由于是集群环境,所以产生的消息ID可能会重复
                    System.out.println(ext.getMsgId()+"----------"+new String(ext.getBody()));
                    //将当前Key保存到redis当中
                    map.put(ext.getKeys(),ext);
                }
          /*如果出现异常可进行try进行捕获*/
/*try {
                    int a=5/0;
                }catch (Exception e){
                    //接受消息状态 
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }*/

                /*try {
                    Thread.sleep(60000);
                }catch (Exception e){
                    e.printStackTrace();
                }*/

                //接受消息状态 
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }

 

 

消费者

public static void main(String[] args) throws Exception {
        //创建一个生产者
        DefaultMQProducer producer=new DefaultMQProducer("mckz_group");
        //设置NameServer地址
        producer.setNamesrvAddr("192.168.3.100:9876;192.168.3.200:9876");
        //设置生产者实例名称
        producer.setInstanceName("mckz");
        //启动生产者
        producer.start();
        //发送消息
        for (int i = 1; i <=10 ; i++) {
            Thread.sleep(1000); //模拟网络延迟
            //创建消息  topic代表主题名称     tags代表小分类     body代表消息体
            Message message=new Message("mckz_topic","TagA",("wdksoft-"+i).getBytes());
            //消息的唯一标识
            message.setKeys("订单编号"+i);
            //发送消息
            SendResult send = producer.send(message);
            System.out.println(send.toString());
        }
    }

以上是关于RocketMQ的主要内容,如果未能解决你的问题,请参考以下文章

配置 kafka 同步刷盘

3RocketMQ 源码解析之 源代码环境搭建

3RocketMQ 源码解析之 源代码环境搭建

3RocketMQ 源码解析之 源代码环境搭建

RocketMq 修改日志保存目录 无需修改代码

30 RocketMQ事务消息的代码实现细节