Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子

Posted 你是小KS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子相关的知识,希望对你有一定的参考价值。

1. 声明

当前内容主要为本人学习Apache RocketMQ的使用,主要参考官方文档

主要包括:

  1. 批量发送Message
  2. 设置DelayTime(不能再批量发送的时候使用)
  3. 设置用户属性
  4. 开启SQL过滤

2. 开启SQL过滤

如果使用了SQL过滤但当前的broker开启SQL过滤就会导致消费者连接时报错:The broker does not support consumer to filter message by SQL92,经过查询找到需要在broker.conf中设置一个属性enablePropertyFilter=true

1. 手动关闭broker和namesrv(使用mqshutdown方式)
2.找到broker.conf文件(位于conf文件夹中)
3.添加以下内容并保存:

4.重新启动namesrv和broker,注意:这里的broker启动方式必须修改为该配置文件,否则采用默认属性(不会有enablePropertyFilter)

./mqbroker -c ../conf/broker.conf -n localhost:9876


其中-c指明当前broker使用的配置文件

此时使用带有SQL32的过滤的消费者就不会报错了

3. 基本demo

消费者:

public class ScheduledAndBatchFilterMessageConsumer 
    
    public static void main(String[] args) throws Exception 
        // Instantiate message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        consumer.setNamesrvAddr("192.168.1.102:9876");
        // Subscribe topics
        //	使用消息选择器方式进行过滤消息(按照手动设置的属性方式)
        // The broker does not support consumer to filter message by SQL92,需要在broker.conf文件中添加enablePropertyFilter = true
        consumer.subscribe("TestTopic", /*"*"*/MessageSelector.bySql("index > 50 and index <80"));
        // Register message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) 
                for (MessageExt message : messages) 
                    // Print approximate delay time period
                    try 
						System.out.println("Receive message[msg=" + new String(message.getBody(),RemotingHelper.DEFAULT_CHARSET).intern() + "] "
						        + (message.getStoreTimestamp() - message.getBornTimestamp() ) + "ms later"+(System.currentTimeMillis() - message.getStoreTimestamp()));
					 catch (UnsupportedEncodingException e) 
						// TODO Auto-generated catch block
						e.printStackTrace();
					
                    
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        // Launch consumer
        consumer.start();
        System.out.println("comsumer start");
    

生产者:

public class ScheduledAndBatchFilterMessageProducer 
    
    public static void main(String[] args) throws Exception 
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("192.168.1.102:9876");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        List<Message> messages=new ArrayList<>(totalMessagesToSend);
        
        for (int i = 0; i < totalMessagesToSend; i++) 
            Message message = new Message("TestTopic", ("Hello scheduled message index=" + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            // 	设置在10秒之后才发送给消费者
            // message.setDelayTimeLevel(3); 批量发送方式不支持DelayTime
            //	设置用户属性
            message.putUserProperty("index",String.valueOf(i));
            // Send the message
            messages.add(message);
            
        
        // 使用批量方式发送消息
        producer.send(messages);
   
        // Shutdown producer after use.
        producer.shutdown();
        System.out.println("send message finish");
    
       

测试结果:

测试成功

4. 总结

1.批量发送主要为使用producer的时候发送集合数据即可(不可和设置DelayTime一起使用)

2.设置用户属性为:message.putUserProperty("index",String.valueOf(i));,可以手动添加设置属性

3.SQL过滤,主要为消费者那边使用MessageSelector.bySql("index > 50 and index <80"),主要为使用用户设置的属性的表达式

以上是关于Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子的主要内容,如果未能解决你的问题,请参考以下文章

Apache RocketMQ:理解官方的Broadcasting的例子

Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子

Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子

Apache RocketMQ:使用官方demo测试rocketmq

RocketMQ部署以及调优

RocketMQ架构原理及名词概念