Kafka,RocketMQ,RabbitMQ部署与使用体验

Posted wangqiguo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka,RocketMQ,RabbitMQ部署与使用体验相关的知识,希望对你有一定的参考价值。

前言

近期在研究各种消息队列方案,为了有一个直观的使用体验,我把Kafka,RocketMQ,RabbitMQ各自部署了一遍,并使用了最基本的生产与消费消息功能。在部署过程中也遇到一些问题,特此记录。本文只适用于没有使用过消息队列,还停留在安装部署阶段的新手用户,要了解一个软件,最好的开始方法是开始使用他,这样才会有一个直观的印象。本篇文章的作用也在于此,至于需要了解更深入的架构与细节,则需要查询其他的文档资料,这也不是本文的目的。我这里使用的操作系统是Centos 6.x,硬件配置一般即可。

Kafka的部署与使用

Kafka的部署我是参考官网的步骤开始的,请直接参考其Quickstart章节。

Step1:下载安装包并解压

# wget https://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.12-2.4.1.tgz
# tar -xzf kafka_2.12-2.4.1.tgz
# cd kafka_2.12-2.4.1

问题:下载有可能报下面的错误(没有报错则忽略):
To connect to mirrors.tuna.tsinghua.edu.cn insecurely, use ‘--no-check-certificate’.
只需要添加报错提示的参数即可:
wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.1/kafka_2.12-2.4.1.tgz

Step2:启动服务器

在上述第一步执行完成之后,当前在目录kafka_2.12-2.4.1下,运行下面的命令启动zookeeper服务,Kafka依赖zookeeper服务,所以我们首先要启动zookeeper服务,由于zookeeper服务需要另外安装,Kafka的安装包中提供了一个简单的单节点的zookeeper实例,以方便快速运行Kafka服务,如果是在正式环境中使用则需要另外单独安装zookeeper服务(一般是分布式集群而非单节点)给Kafka使用。

# bin/zookeeper-server-start.sh config/zookeeper.properties

我在运行此命令时候报下面的错误:
Unrecognized VM option ‘+UseGCLogFileRotation‘
Could not create the Java virtual machine.

从提示上看是因为java虚拟机不支持参数UseGCLogFileRotation,我经过查资料说这个参数可以去掉,bin/zookeeper-server-start.sh 脚本中调用了 bin/kafka-run-class.sh ,真正起作用的代码在这个脚本中 kafka-run-class.sh,打开这个脚本找到上述参数所在的地方,去掉 -XX:+UseGCLogFileRotation , 如下图所示:
技术图片

继续运行上面的脚本,继续报错:
Unrecognized VM option ‘NumberOfGCLogFiles=10‘
Could not create the Java virtual machine.

像刚才一样,继续去掉 -XX:NumberOfGCLogFiles=10 ,如下图:
技术图片

继续运行,得到报错:
Unrecognized VM option ‘GCLogFileSize=100M‘
Could not create the Java virtual machine.

我们继续去掉脚本中的 -XX:GCLogFileSize=100M 如下图:
技术图片

继续运行,得到下面的报错:
技术图片

从提示信息可以看出应该是java字节码的兼容性问题,java.lang.UnsupportedClassVersionError,表示当前运行的zookeeper的字节码是高版本的java编译器编译的,而现在的运行时环境是低版本,所以不兼容。运行下面的命令查看java版本:
# java -version

java version "1.6.0_24"
OpenJDK Runtime Environment (IcedTea6 1.11.8) (rhel-1.56.1.11.8.el6_3-x86_64)
OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)

是jdk1.6,这个版本太低了,我们需要安装jdk1.8或者以上的版本,怎么安装呢,这里我首先搜索一下系统的安装源里面有没有,如果有我们直接从操作系统提供的源进行安装,如果没有则需要通过源码等其他办法进行安装了,运行下面命令查看:

# yum search openjdk

结果图如下:
技术图片

我们安装开发环境,也就是下面这个,区别就是开发环境中有javac等编译工具,而运行时环境中没有,运行时环境只包含运行java程序所需要的工具,因为后面我们需要使用javac编译,所以我们安装开发环境,使用下面命令安装完成即可:

# yum install java-1.8.0-openjdk-devel.x86_64

成功之后我们再次运行 bin/zookeeper-server-start.sh config/zookeeper.properties 以启动zookeeper服务,没有任何错误,最后停留在如下界面:
技术图片

启动Kafka服务:bin/kafka-server-start.sh config/server.properties 没有任何报错,中间会打印出Kafka的配置,输出比较多,我只截取了一部分,其中 INFO KafkaConfig values 后面就是kafka的配置:
技术图片

Step3:创建topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testtopic

我们可以列出kafka服务器上的topic列表:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
testtopic

Step4:发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic

>hello message
>
运行命令之后 > 提示符等待输入消息,我们输入字符串 hello message 并回车

Step5:启动消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic --from-beginning
hello message

此时在发送消息侧不断输入消息并回车,消费者会不断打印收到的消息,至此Kafka的简单收发消息运行完毕

RocketMQ的部署与使用

我们依然根据官网文档中的 Quick Start节的提示开始操作,官网建议使用64位Linux/Unix/Mac操作系统,我们使用的是64位的Centos 6.x,建议使用JDK1.8+,我们上面部署Kafka的时候已经安装了jdk1.8,需要maven3.2.x,我的系统上没有,所以首先需要安装maven:

安装maven:

1. 下载maven安装包,我这里使用的是更高一点的版本:
# wget http://mirrors.cnnic.cn/apache/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz

2. 解压maven安装包:
# tar -zxvf apache-maven-3.5.4-bin.tar.gz

3. 配置maven到profile文件中以便启动的时候自动设置环境变量 vim /etc/profile 添加下面两行:
export MAVEN_HOME=/data/rocket_mq/apache-maven-3.5.4
export PATH=$MAVEN_HOME/bin:$PATH

注意这里你应该换成你自己的对应的目录

4. 使环境变量生效:
# source /etc/profile

5. 检验是否正常:
# mvn -version

Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /data/rocket_mq/apache-maven-3.5.4
Java version: 1.8.0_212, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.212.b04-0.el6_10.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.1.0-32.el6.ucloud.x86_64", arch: "amd64", family: "unix"

下载RocketMQ的源码并编译:

# wget https://archive.apache.org/dist/rocketmq/4.7.0/rocketmq-all-4.7.0-source-release.zip
# unzip rocketmq-all-4.7.0-source-release.zip
# cd rocketmq-all-4.7.0/
# mvn -Prelease-all -DskipTests clean install -U
# cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0

编译成功之后的截图如下:
技术图片

运行nameserver服务:

# sh bin/mqnamesrv
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!

首先我们在编译RocketMQ的时候我们已经将目录定位到 cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0  在此目录下运行nameserver服务报错,根据提示我们需要设置 JAVA_HOME 环境变量,如下(修改为你自己的地址):

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.212.b04-0.el6_10.x86_64/
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar 
export PATH=$PATH:$JAVA_HOME/bin

之后再次运行:

# sh bin/mqnamesrv 

OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

可以看到Name Server运行成功,这里提一下,Name Server是为RocketMQ提供名字服务的模块,其角色与zookeeper在Kafka中的作用类似

启动Broker服务:

在启动Broker这里有一点要注意的是,我们这里运行命令所在的目录跟上面运行Name Server的目录一样,都是在 distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 这个目录下运行,并且上面需要导入的 JAVA_HOME 等环境变量这里都需要,除此之外,运行Broker我们还需要修改一下配置文件,在当前目录下(distribution/target/rocketmq-4.7.0/rocketmq-4.7.0)  修改文件 vim conf/broker.conf 在文件后面添加如下一行配置:

brokerIP1 = 127.0.0.1

设置Broker的ip,如果不设置的话,当机器存在多网卡的时候,例如一个内外ip和一个外网ip的情况,Broker可能会配置第一个ip地址,导致我们后面的生产者启动的时候写入消息失败报告下面的错误:
技术图片

为了测试方便,我们直接设置BrokerIP1为本机回环ip地址127.0.0.1,然后通过配置文件启动Broker:

# bin/mqbroker -n localhost:9876 -c conf/broker.conf
The broker[broker-a, 127.0.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876

启动生产者发送消息:

Broker启动完成之后,我们接下来启动生产者发送消息(同样要定位到 distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 并导入 JAVA_HOME 等环境变量):

# export NAMESRV_ADDR=localhost:9876
# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

03:08:05.769 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
SendResult [sendStatus=SEND_OK, msgId=20020AB3FD9D000000000000000000017E931540E19D439EE28E0000, offsetMsgId=7F00000100002A9F00000000000638E4, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=500]
SendResult [sendStatus=SEND_OK, msgId=20020AB3FD9D000000000000000000017E931540E19D439EE2B00001, offsetMsgId=7F00000100002A9F00000000000639AE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=500]
......

03:08:07.668 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
03:08:07.669 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:10911] result: true
可以看到中间我省略了很多日志,这些日志表示生产者向服务器写入了大量的消息,由于输出太多,我只列出2条

启动消费者消费消息:

接着我们启动消费者消费消息(同样要定位到 distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 并导入 JAVA_HOME 等环境变量):

# export NAMESRV_ADDR=localhost:9876
# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

03:11:37.229 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=202, queueOffset=500, sysFlag=0, bornTimestamp=1586804886200, bornHost=/127.0.0.1:42523, storeTimestamp=1586804886203, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F0000000000063B42, commitLogOffset=408386, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest‘, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1586805097731, UNIQ_KEY=20020AB3FD9D000000000000000000017E931540E19D439EE2B80003, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId=‘null‘}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=202, queueOffset=500, sysFlag=0, bornTimestamp=1586804886195, bornHost=/127.0.0.1:42523, storeTimestamp=1586804886196, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F0000000000063A78, commitLogOffset=408184, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest‘, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1586805097731, UNIQ_KEY=20020AB3FD9D000000000000000000017E931540E19D439EE2B30002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId=‘null‘}]]

同样是大量消费消息的日志,这里也只列出前面2条输出,至此,RocketMQ的部署与简单消息收发使用完毕

RabbitMQ的部署与使用

RabbitMQ的部署我们同样参照官网给出的步骤,找到官网的 Get Started 点开按钮 “Download + Installation” ,RabbitMQ有多种运行方式,可以基于docker镜像运行,我们本次不使用docker这种方式部署。我们使用常规的部署方法,RabbitMQ使用Erlang语言编写,运行需要Erlang的运行时环境,所以我们首先需要安装Erlang环境,RabbitMQ团队为我们提供了一个Erlang安装包,其仅包含运行RabbitMQ所需要的全部组件,为了方便我们就使用此Erlang安装包,安装起来很容易:

安装Erlang环境:

# wget https://github.com/rabbitmq/erlang-rpm/releases/download/v22.3.1/erlang-22.3.1-1.el6.x86_64.rpm
# yum install erlang-22.3.1-1.el6.x86_64.rpm

没有报错,我这里是下载的Centos版本的二进制安装包,其他系统可以自行去RabbitMQ官网下载,Erlang安装好之后,我们就可以安装RabbitMQ服务了:

安装RabbitMq:

# wget https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
# rpm --import rabbitmq-release-signing-key.asc 
# wget http://wangqiguoy.cn-bj.ufileos.com/rabbitmq-server-3.8.3-1.el6.noarch.rpm
# yum install rabbitmq-server-3.8.3-1.el6.noarch.rpm

我安装的时候没有任何报错,直接安装完成,比Kafka和RocketMQ要顺利多了,RabbitMQ提供了一个web界面可以对其进行管理,可以在上面看到当前的队列,通道信息,消息堆积情况等等,通过下面的命令开启插件:

开启web管理插件:

# rabbitmq-plugins enable rabbitmq_management

Enabling plugins on node rabbit@10-179-253-157:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@10-179-253-157...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch

started 3 plugins.

开启成功之后,可以通过一个http的地址访问web界面管理RabbitMQ,地址形式是:http://ip:15672/ 将ip地址换成自己的服务器ip地址即可,登陆界面如下:
技术图片

修改RabbitMQ的配置文件以登陆其web界面:

# vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.3/ebin/rabbit.app
{loopback_users, [<<"guest">>]},
改成
{loopback_users, [guest]}

修改好之后,重启服务生效,可以通过上面的web界面登录,用户名和密码都是guest,登陆进去之后的界面如下图:
技术图片

功能还是很丰富的,可以查看connection、channel、exchange、queue等信息,还可以管理用户权限,除了通过web界面管理RabbitMQ之外,Rabbit还提供一个命令行工具rabbitmqctl来管理RabbitMQ,该工具的具体使用可以查看其官网文档,另外对RabbitMQ常用的启动停止操作可以参考如下命令:

service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
service rabbitmq-server status

发送消息与消费消息:

RabbitMQ没有提供命令行的测试方式,不过RabbitMQ提供了很多客户端的demo可供使用,这里我选择的是使用nodejs,首先要准备node的环境如下:
# yum install nodejs
# yum install npm
# npm install amqplib
最后一步安装 amqplib 即是nodejs中要使用的与RabbitMQ相关的客户端库,我们的生产者 send.js 代码如下:

//send.js
var amqp = require(‘amqplib/callback_api‘);
amqp.connect(‘amqp://localhost‘, function(error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function(error1, channel) {
        if (error1) {
            throw error1;
        }
        var queue = ‘hello‘;
        var msg = ‘{"name":"Hello World!"}‘;
        channel.assertQueue(queue, {
            durable: false
        });
        channel.sendToQueue(queue, new Buffer(msg));
        console.log(" [x] Sent %s", msg);
    });
    setTimeout(function() {
        connection.close();
        process.exit(0);
    }, 500);
});

消费者 receive.js 的代码如下:

//receive.js
var amqp = require(‘amqplib/callback_api‘);
amqp.connect(‘amqp://localhost‘, function(error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function(error1, channel) {
        if (error1) {
            throw error1;
        }
        var queue = ‘hello‘;
        channel.assertQueue(queue, {
            durable: false
        });
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
        channel.consume(queue, function(msg) {
            console.log(" [x] Received %s", msg.content.toString());
        }, {
            noAck: true
        });
    });
});

分别运行 send.js 与 receive.js 得到如下输出:
# node send.js
[x] Sent {"name":"Hello World!"}

# node receive.js
[*] Waiting for messages in hello. To exit press CTRL+C
[x] Received {"name":"Hello World!"}

至此RabbitMQ的部署以及简单消息收发使用完成

以上是关于Kafka,RocketMQ,RabbitMQ部署与使用体验的主要内容,如果未能解决你的问题,请参考以下文章

为啥说rabbitmq 比kafka可靠

rabbitmq与kafka到底用哪个好

kafka,activemq rabbitmq.rocketmq的优点和缺点

MQ选型对比ActiveMQ,RabbitMQ,RocketMQ,Kafka 消息队列框架选哪个?

Kafka、RocketMQ、RabbitMQ的对比

关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ