分布式系列之ActiveMqActiveMq入门示例

Posted 霓裳梦竹

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式系列之ActiveMqActiveMq入门示例相关的知识,希望对你有一定的参考价值。

前言

github地址:https://github.com/AndyFlower/web-back/tree/master/ActiveMq01

下载ActiveMQ :http://activemq.apache.org/download.html

放到自己的目录,大致目录如下:

  • bin存放的是脚本文件
  • conf存放的是基本配置文件
  • data存放的是日志文件
  • docs存放的是说明文档
  • examples存放的是简单的实例
  • lib存放的是activemq所需jar包
  • webapps用于存放项目的目录

然后启动ActiveMQ:比如我的目录是:D:\\develop tools\\apache-activemq-5.15.2\\bin\\win64下的activemq.bat

出现如下消息则说明启动成功了。

登录上述启动成功的地址:http://127.0.0.1:8161用户名和密码是admin:admin

一、创建一个java项目,加入maven依赖

<dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.2</version>
        </dependency>
    </dependencies>

二、项目目录如下

三、编写具体的生产者和消费者

package com.slp.activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author sanglp
 * @create 2017-12-05 11:30
 * @desc 生产者
 **/
public class Producer {
    //ActiveMQ默认用户名
    private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;
    //ActiveMQ默认登陆密码
    private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ链接地址
    private static final String BROKEN_URL= ActiveMQConnection.DEFAULT_BROKER_URL;

    AtomicInteger count = new AtomicInteger(0);
    //链接工厂
    ConnectionFactory connectionFactory;
    //链接对象
    Connection connection;
    //事务管理
    Session session;
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<MessageProducer>();

    public void init(){
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //从工厂中创建一个链接
            connection = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //创建一个事务(通过参数设置事务的级别)
            session = connection.createSession(true,Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(String disname){
        try {
            //创建一个消息队列
            Queue queue = session.createQueue(disname);
            //消息生产者
            MessageProducer messageProducer = null;
            if (threadLocal.get()!=null){
                messageProducer = threadLocal.get();
            }else {
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }
            while (true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //创建一条消息
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+"productor:我正在生产东西!,count:"+count);
                System.out.println(Thread.currentThread().getName()+"productor:我正在生产东西!,count:"+count);
                //发送消息
                messageProducer.send(msg);
                //提交事务
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }
}

  

package com.slp.activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author sanglp
 * @create 2017-12-05 11:30
 * @desc 消费者
 **/
public class Consumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<MessageConsumer>();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer ;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    msg.acknowledge();
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 四、运行测试

package com.slp.activemq;

/**
 * @author sanglp
 * @create 2017-12-05 11:31
 * @desc mq测试
 **/
public class TestMq {
    public static void main(String[] args){
        Producer producer = new Producer();
        producer.init();
        TestMq testMq = new TestMq();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //Thread 1
        new Thread(testMq.new ProducorMq(producer)).start();
        //Thread 2
        new Thread(testMq.new ProducorMq(producer)).start();
        //Thread 3
        new Thread(testMq.new ProducorMq(producer)).start();
        //Thread 4
        new Thread(testMq.new ProducorMq(producer)).start();
        //Thread 5
        new Thread(testMq.new ProducorMq(producer)).start();
    }


    private  class  ProducorMq implements Runnable{
        Producer producer;
        public ProducorMq(Producer producer){
            this.producer = producer;
        }
        public void run() {
            while(true){
                try {
                    producer.sendMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  

 运行消费者

package com.slp.activemq;

/**
 * @author sanglp
 * @create 2017-12-05 11:31
 * @desc 消费者测试
 **/
public class TestConcumer {
    public static void main(String[] args){
        Consumer consumer = new Consumer();
        consumer.init();
        TestConcumer testConsumer = new TestConcumer();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
    }

    private class ConsumerMq implements Runnable{
        Consumer consumer;
        public ConsumerMq(Consumer consumer){
            this.consumer = consumer;
        }


        public void run() {
            while(true){
                try {
                    consumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  

以上是关于分布式系列之ActiveMqActiveMq入门示例的主要内容,如果未能解决你的问题,请参考以下文章

原创 | 大数据入门基础系列之ZooKeeper如何实现分布式锁

「数据挖掘入门系列」数据探索之数据特征分析

Spark快速入门之RDD编程模型

分布式--ActiveMQ 消息中间件

《撸轮子系列》之LoadPE

14.spark mllib之快速入门