ActiveMQ消息过滤

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ消息过滤相关的知识,希望对你有一定的参考价值。

前言

ActiveMQ提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销。然而,它也使得处理选择标准的消息服务增加了一些额外开销。 

消息选择器是用于MessageConsumer的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。可以将消息选择器作为MessageConsumer 创建的一部分。

实现对MapMessage和TextMessage两种消息的过滤条件的设置和消费

Producer

在消息的属性中设置过滤条件

技术分享
package com.tgb.activemqFilter;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    // 单例模式
    // 1、连接工厂
    private ConnectionFactory connectionFactory;
    // 2、连接对象
    private Connection connection;
    // 3、Session对象
    private Session session;
    // 4、生产者
    private MessageProducer messageProducer;

    public Producer() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("admin",
                    "admin", "tcp://127.0.0.1:61616");
            this.connection = connectionFactory.createConnection();
            this.connection.start();
            // 设置自动签收模式
            this.session = this.connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            this.messageProducer = this.session.createProducer(null);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

    }

    public Session getSession() {
        return this.session;
    }

    public void send1(/* String QueueName, Message message */) {
        try {

            Destination destination = this.session.createQueue("first");
            MapMessage msg1 = this.session.createMapMessage();
            msg1.setString("name", "张三");
            msg1.setInt("age", 20);
            // 设置用于消息过滤器的条件
            msg1.setStringProperty("name", "张三");
            msg1.setIntProperty("age", 20);
            msg1.setStringProperty("color", "bule");

            MapMessage msg2 = this.session.createMapMessage();
            msg2.setString("name", "李四");
            msg2.setInt("age", 25);
            // 设置用于消息过滤器的条件
            msg2.setStringProperty("name", "李四");
            msg2.setIntProperty("age", 25);
            msg2.setStringProperty("color", "white");

            MapMessage msg3 = this.session.createMapMessage();
            msg3.setString("name", "赵六");
            msg3.setInt("age", 30);
            // 设置用于消息过滤器的条件
            msg3.setStringProperty("name", "赵六");
            msg3.setIntProperty("age", 30);
            msg3.setStringProperty("color", "black");
            // 发送消息
            this.messageProducer.send(destination, msg1,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
            this.messageProducer.send(destination, msg2,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
            this.messageProducer.send(destination, msg3,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    public void send2() {
        try {
            Destination destination = this.session.createQueue("first");
            TextMessage message = this.session.createTextMessage("我是一个字符串");
            message.setIntProperty("age", 25);
            // 发送消息
            this.messageProducer.send(destination, message,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.send1();
        // producer.send2();

    }
}
View Code

Conmuser

消费消息时,直接在session创建MessageConsumer时,将过滤条件作为参数传入(过滤条件的写法和SQL的写法是很像的)

技术分享
package com.tgb.activemqFilter;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Conmuser {
    // 单例模式
    // 1、连接工厂
    private ConnectionFactory connectionFactory;
    // 2、连接对象
    private Connection connection;
    // 3、Session对象
    private Session session;
    // 4、生产者
    private MessageConsumer messageConsumer;
    // 5、目的地址
    private Destination destination;
    // 消息选择器
    public final String SELECTOR_1 = "age > 25";
    public final String SELECTOR_2 = " age > 20 and color=‘black‘";

    public Conmuser() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("admin",
                    "admin", "tcp://127.0.0.1:61616");
            this.connection = connectionFactory.createConnection();
            this.connection.start();
            // 设置自动签收模式
            this.session = this.connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            this.destination = this.session.createQueue("first");
            // 在构造消费者的时候,指定了 消息选择器
            // 有选择性的消费消息
            this.messageConsumer = this.session.createConsumer(destination,
                    SELECTOR_1);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    public Session getSession() {
        return this.session;
    }

    // 用于监听消息队列的消息
    class MyLister implements MessageListener {

        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    TextMessage ret = (TextMessage) message;
                    System.out.println("results;" + ret.getText());
                }
                if (message instanceof MapMessage) {
                    MapMessage ret = (MapMessage) message;
                    System.out.println(ret.toString());
                    System.out.println(ret.getString("name"));
                    System.out.println(ret.getInt("age"));
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

    }

    // 用于异步监听消息
    public void receiver() {
        try {
            this.messageConsumer.setMessageListener(new MyLister());
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Conmuser conmuser = new Conmuser();
        conmuser.receiver();

    }
}
View Code

以上是关于ActiveMQ消息过滤的主要内容,如果未能解决你的问题,请参考以下文章

读取 ActiveMQ 消息而不删除

activemq部分消息无法取走

ActiveMQ

activeMQ消息中文乱码解决(控制台乱码及后台接收乱码)

JMS中间件ActiveMQ详解

消息队列如何利用标签实现消息过滤