Jms消费者模式

Posted Liang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Jms消费者模式相关的知识,希望对你有一定的参考价值。

JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。

 

Apache官网上下载activeMQ(http://activemq.apache.org/download.html),解压运行bin目录下activemq.bat文件启动activeMQ

 

对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应;另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

 

 

1.      点对点的消息模式(Point to Point Messaging)

 

下面的JMS对象在点对点消息模式中是必须的:

a.      队列(Queue) – 一个提供者命名的队列对象,客户端将会使用这个命名的队列对象

b.     队列链接工厂(QueueConnectionFactory) – 客户端使用队列链接工厂创建链接队列

        ConnectionQueue来取得与JMS点对点消息提供者的链接。

c.      链接队列(ConnectionQueue) – 一个活动的链接队列存在在客户端与点对点消息提供者之

        间,客户用它创建一个或者多个JMS队列会话(QueueSession)

d.     队列会话(QueueSession) – 用来创建队列消息的发送者与接受者(QueueSenderand

         QueueReceiver)

e.     消息发送者(QueueSender 或者MessageProducer)– 发送消息到已经声明的队列

f.       消息接受者(QueueReceiver或者MessageConsumer) – 接受已经被发送到指定队列的消息

 

2.      发布订阅模式(publish – subscribe Mode)

 

a.      主题Topic(Destination) – 一个提供者命名的主题对象,客户端将会使用这个命名的主题对象

b.     主题链接工厂(TopciConnectionFactory) – 客户端使用主题链接工厂创建链接主题

         ConnectionTopic来取得与JMS消息Pub/Sub提供者的链接。

c.      链接主题(ConnectionTopic) – 一个活动的链接主题存在发布者与订阅者之间

d.     会话(TopicSession) – 用来创建主题消息的发布者与订阅者 (TopicPublisher  and

         TopicSubscribers)

e.     消息发送者MessageProducer) – 发送消息到已经声明的主题

f.       消息接受者(MessageConsumer) – 接受已经被发送到指定主题的消息

 

以感知数据为例子

 

activemq.properties配置文件:

topic=csp.jxmessages
ipaddress=10.100.70.102
#ipaddress=localhost
port=61616
username=user
password=user

 

消费者模式:

package com.ship;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.stereotype.Controller;
import com.common.utils.PropertyLoader;

@Controller
public class JmsConsumer implements MessageListener {
    private java.sql.Connection con = null;
    private static JmsConsumer instance = null;

    public JmsConsumer() {
        if (instance != null)
            throw new RuntimeException();
        instance = this;
    }

    public JmsConsumer getInstance() {
        return instance;
    }

    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage txtMsg = (TextMessage) message;
            try {
                String content = txtMsg.getText();
                System.out.println("数据是:\\n" + content);
                String[] arrcontent = content.split("\\n");
                // con.setAutoCommit(false);
                // Statement statement = con.createStatement();
                for (int i = 0; i < arrcontent.length; i++) {
                    String[] shipinfo = arrcontent[i].split(",");
                    // System.out.println(shipinfo[0]); //测试船名
                    // 存储过程
                    // String sql = "call ganzhi()";
                    // 添加批处理
                    // statement.addBatch(sql);
                }
                // 批处理 因为存在读写延迟
                // statement.executeBatch();
                // con.commit();
                // statement.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @PostConstruct
    public void init() {
        try {
            // 数据库连接
            buildSqlCon();
            // jms处理
            buildJmsCon();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void buildSqlCon() {
        // TODO Auto-generated method stub
        Properties jdbcprop = PropertyLoader.getPropertiesFromClassPath(
                "jdbc.properties", "UTF-8");
        String driverClassName = jdbcprop.getProperty("jdbc.driverClassName");
        String jdbcurl = jdbcprop.getProperty("jdbc.url");
        String uname = jdbcprop.getProperty("jdbc.username");
        String pwd = jdbcprop.getProperty("jdbc.password");
        try {
            Class.forName(driverClassName);
            con = DriverManager.getConnection(jdbcurl + "&user=" + uname
                    + "&password=" + pwd);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    public void buildJmsCon() {
        // 消费者的主要流程
        Connection connection = null;
        Properties properties = PropertyLoader.getPropertiesFromClassPath(
                "activemq.properties", "UTF-8");
        String topic = properties.getProperty("topic");
        String username = properties.getProperty("username");
        String password = properties.getProperty("password");
        String ipaddress = properties.getProperty("ipaddress");
        String port = properties.getProperty("port");
        String brokerURL = "failover://tcp://" + ipaddress + ":" + port;
        try {
            // 1.初始化connection工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    username, password, brokerURL);

            // 2.创建Connection
            connection = connectionFactory.createConnection();

            // 3.打开连接
            connection.start();

            System.out.println("连接成功..................");

            // 4.创建session
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);

            // 5.创建消息目标
            Destination destination = session.createTopic(topic);

            // 6.创建消费者
            MessageConsumer consumer = session.createConsumer(destination);

            // 7.配置监听
            consumer.setMessageListener(getInstance());

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void destroy() {
        if (con != null)
            try {
                con.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
    }

}

 

测试:

数据是:
浙余杭货01803,120.16185,30.098183,0.0,140,140,2016-06-02 09:40:13:000,,WM2HZ803244,1,0
浙富阳货00759,119.844083,29.932533,null,0,0,2016-06-02 09:40:13:000,,WM2HZ804658,1,0
浙长兴货1952,120.320117,30.630767,0.0,0,0,2016-06-02 09:40:13:000,,WM2HZ820795,1,0
浙富阳货00759,119.844083,29.932533,0.0,0,0,2016-06-02 09:40:13:000,,WM2HZ804658,1,0
浙越城货0677,118.859033,32.171567,14.0,50,50,2016-06-02 09:40:13:000,,WM2HZ814170,1,0
浙钱江货35077,120.122567,30.400967,0.0,340,340,2016-06-02 09:40:13:000,,WM2HZ802275,1,0
浙越城货0677,118.859033,32.171567,14.0,50,50,2016-06-02 09:40:13:000,,WM2HZ814170,1,0
浙余杭货01667,120.12385,30.402033,0.0,20,20,2016-06-02 09:40:13:000,,WM2HZ801640,1,0
浙越城货0670,120.736333,32.040183,0.0,210,210,2016-06-02 09:40:14:000,,WM2HZ817006,1,0
浙安吉货2389,120.149133,30.636817,0.0,220,220,2016-06-02 09:40:14:000,,WM2HZ811204,1,0
浙钱江货00628,120.053833,30.047217,8.0,210,210,2016-06-02 09:40:14:000,,WM2HZ800153,1,0
浙绍运6-25,120.473033,30.6393,6.0,230,230,2016-06-02 09:40:14:000,,WM2HZ801842,1,0
浙上虞货0553,119.705217,29.82325,null,330,330,2016-06-02 09:40:14:000,,WM2HZ801018,1,0
浙余杭货01782,120.07065,30.399117,0.0,20,20,2016-06-02 09:40:14:000,,WM2HZ801764,1,0
浙桐庐货00483,120.217017,30.269617,0.0,20,20,2016-06-02 09:40:14:000,,WM2HZ803645,1,0
汇海集026,120.1237,30.401283,0.0,0,0,2016-06-02 09:40:14:000,,WM2HZ811220,1,0
海源186,121.394967,31.485683,0.0,0,0,2016-06-02 09:40:14:000,,WM2HZ815369,1,0
任强1号,120.085533,30.0672,0.0,30,30,2016-06-02 09:40:14:000,,WM2HZ818227,1,0
金顺668,120.166217,30.125183,0.0,0,0,2016-06-02 09:40:14:000,,WM2HZ819091,1,0
浙富阳货00636,119.705533,29.823683,null,200,200,2016-06-02 09:40:14:000,,WM2HZ804624,1,0
浙临安游026,119.769083,30.230167,0.0,0,0,2016-06-02 09:40:14:000,,WM2HZ817016,1,0
浙萧山货23922,120.29145,30.501783,7.0,80,80,2016-06-02 09:40:14:000,,WM2HZ818998,1,0
浙萧山货23922,120.29145,30.501783,null,80,80,2016-06-02 09:40:14:000,,WM2HZ818998,1,0
浙嘉善货03216,120.141717,30.355,0.0,280,280,2016-06-02 09:40:14:000,,WM2HZ800918,1,0
浙萧山货03166,119.835467,29.907917,0.0,310,310,2016-06-02 09:40:14:000,,WM2HZ801393,1,0
浙萧山货23751,120.156167,30.100917,0.0,100,100,2016-06-02 09:40:14:000,,WM2HZ802551,1,0
合肥武运628,120.16455,30.125717,null,0,0,2016-06-02 09:40:14:000,,WM2HZ802141,1,0

以上是关于Jms消费者模式的主要内容,如果未能解决你的问题,请参考以下文章

kafka

Kafka知识总结

JMS消息队列ActiveMQ(发布/订阅模式)

kafka相关知识点总结

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

JMS-ActiveMq-点对点模式