消息队列简介

Posted xmh-sxh-1314

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列简介相关的知识,希望对你有一定的参考价值。

消息队列属于Windows系统组件服务。.net对其进行了封装.其可以为我们提供非常好的分布式应用服务

 

 

可以通过Visual Studio打开Message Queues并查看消息

 

image_2  

 

或者可以通过我的电脑右键管理菜单查看服务也可以.

 

image_4

 

以下是其特性,来自MSDN的解释

 

稳定性 — 组件失败对消息的影响程度远远小于组件间的直接调用,因为消息存储在队列中并一直留在那里,直到被适当地处理。消息处理同事务处理相似,因为消息处理是有保证的。

 

消息优先级 — 更紧急或更重要的消息可在相对不重要的消息之前接收,因此可以为关键的应用程序保证足够的响应时间。

 

脱机能力 — 发送消息时,它们可被发送到临时队列中并一直留在那里,直到被成功地传递。当因任何原因对所需队列的访问不可用时,用户可以继续执行操作。同时,其他操作可以继续进行,如同消息已经得到了处理一样,这是因为网络连接恢复时消息传递是有保证的。

 

事务性消息处理 — 将多个相关消息耦合为单个事务,确保消息按顺序传递、只传递一次并且可以从它们的目标队列中被成功地检索。如果出现任何错误,将取消整个事务。

 

安全性 - MessageQueue 组件基于的消息队列技术使用 Windows 安全来保护访问控制、提供审核并对组件发送和接收的消息进行加密和验证。

 

以下是我的理解

 

1.脱机能力 数据库难免有挂的时候,可以暂时把数据存在这里,然后处理.保证了相应的时间

 

2.数据批量处理 可以预先把数据存放此处,待一定时间批量进行处理,进一步提高了程序的吞吐能力

 

3.消息队列顾名思义,当然有消息队列优先级之分

 

4.最大的好处则是可用于分布式应用,用户则不需要关心是否是数据库。其实是一个非常好的数据缓存存储器了,而不会因为机器挂掉而资料丢失

 

来看一段基本的代码

 

static void Main(string[] args)

   

     String queuename = @".\\private$\\NOTIFICATIONS";

 

     if (!MessageQueue.Exists(queuename))

     

       MessageQueue.Create(queuename);

     

 

     MessageQueue que = new MessageQueue(queuename);

     que.Formatter = new BinaryMessageFormatter();

 

     while (true)

     

       using (Message msg = que.Receive())

       

         String str = (String) msg.Body;

         Console.WriteLine("Received: 0", str);

       

     

   

 

队列的几个概念

 

“公共队列”在整个“消息队列”网络中复制,并且有可能由网络连接的所有站点访问。

 

“专用队列”不在整个网络中发布。相反,它们仅在所驻留的本地计算机上可用。专用队列只能由知道队列的完整路径名或标签的应用程序访问。

 

“管理队列”包含确认在给定“消息队列”网络中发送的消息回执的消息。指定希望 MessageQueue 组件使用的管理队列(如果有的话)。

 

“响应队列”包含目标应用程序接收到消息时返回给发送应用程序的响应消息。指定希望 MessageQueue 组件使用的响应队列(如果有的话)。

 

系统生成的队列一般分为以下几类:

 

“日记队列”可选地存储发送消息的副本和从队列中移除的消息副本。每个“消息队列”客户端上的单个日记队列存储从该计算机发送的消息副本。在服务器上为每个队列创建了一个单独的日记队列。此日记跟踪从该队列中移除的消息。

 

“死信队列”存储无法传递或已过期的消息的副本。如果过期消息或无法传递的消息是事务性消息,则被存储在一种特殊的名为“事务性死信队列”的死信队列中。死信存储在过期消息所在的计算机上。有关超时期限和过期消息的更多信息,请参见默认消息属性。

 

“报告队列”包含指示消息到达目标所经过的路由的消息,还可以包含测试消息。每台计算机上只能有一个报告队列。

 

“专用系统队列”是一系列存储系统执行消息处理操作所需的管理和通知消息的专用队列。

Kafka 服务异步通信 -- 消息队列MQ简介Kafak简介Kafak环境搭建

1. 消息队列MQ简介


1.1 消息队列MQ简介

消息队列,英文名:Message Queue,经常缩写为MQ。从字面上来理解,消息队列是一种用来存储消息的队列。来看一下下面的代码:

// 1. 创建一个保存字符串的队列
Queue<String> stringQueue = new LinkedList<String>();

// 2. 往消息队列中放入消息
stringQueue.offer("hello");

// 3. 从消息队列中取出消息并打印
System.out.println(stringQueue.poll());

上述代码,创建了一个队列,先往队列中添加了一个消息,然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的。

我们可以简单理解消息队列就是将需要传输的数据存放在队列中


1.2 消息队列中间件(常见MQ的对比)

消息队列中间件就是用来存储消息的软件(组件)。举个例子来理解,为了分析网站的用户行为,我们需要记录用户的访问日志。这些一条条的日志,可以看成是一条条的消息,我们可以将它们保存到消息队列中。将来有一些应用程序需要处理这些日志,就可以随时将这些消息取出来处理。

目前市面上的消息队列有很多,例如:Kafka、RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ等。

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka


1.3 消息队列的应用场景


1.3.1 异步处理

电商网站中,新的用户注册时,需要将用户的信息保存到数据库中,同时还需要额外发送注册的邮件通知、以及短信注册码给用户。但因为发送邮件、发送注册短信需要连接外部的服务器,需要额外等待一段时间,此时,就可以使用消息队列来进行异步处理,从而实现快速响应。


1.3.2 系统解耦


1.3.3 流量削锋


1.3.4 日志处理(大数据领域常见)

大型电商网站(淘宝、京东、国美、苏宁…)、App(抖音、美团、滴滴等)等需要分析用户行为,要根据用户的访问行为来发现用户的喜好以及活跃情况,需要在页面上收集大量的用户访问信息。


1.4 消息队列的 生产者 / 消费者 模型

我们之前学习过Java的服务器开发,Java服务器端开发的交互模型是这样的:


我们之前也学习过使用Java JDBC来访问操作MySQL数据库,它的交互模型是这样的:

它也是一种请求响应模型,只不过它不再是基于http协议,而是基于MySQL数据库的通信协议。

它也是一种请求响应模型,只不过它不再是基于http协议,而是基于MySQL数据库的通信协议。


1.5 消息队列的2种模式


1.5.1 点对点模式

消息发送者生产消息发送到消息队列中,然后消息接收者从消息队列中取出并且消费消息。消息被消费以后,消息队列中不再有存储,所以消息接收者不可能消费到已经被消费的消息。

点对点模式特点

  • 每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中);
  • 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
  • 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;

1.5.2 发布订阅模式

发布 / 订阅模式特点

  • 每个消息可以有多个订阅者;
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;

2. Kafak简介


2.1 Kafak简介


Kafka是由Apache软件基金会开发的一个开源的分布式流平台,由Scala和Java编写。Kafka的Apache官网是这样介绍Kakfa的。

Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:

  1. 发布和订阅流数据流,类似于消息队列或者是企业消息传递系统。
  2. 以容错的持久化方式存储数据流。
  3. 处理数据流。

2.2 Kafak的应用场景

我们通常将Apache Kafka用在两类程序:

  1. 建立实时数据管道,以可靠地在系统或应用程序之间获取数据
  2. 构建实时流应用程序,以转换或响应数据流


上图,我们可以看到:

  1. Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。
  2. Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。
  3. Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到
    数据库中。
  4. Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。

2.3 Kafak生态圈介绍

Apache Kafka这么多年的发展,目前也有一个较庞大的生态圈。
Kafka生态圈官网地址:https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem


2.4 Kafak版本介绍

kafka_2.12-2.4.1 版本为例,因为kafka主要是使用scala语言开发的,2.12为scala的版本号。http://kafka.apache.org/downloads可以查看到每个版本的发布时间。


3. Kafak的安装(单机安装)

注意需要安装JDK

Kafak下载地址

# 进入 /opt 目录
cd /opt  
# 下载 kafka_2.11-2.2.2 安装包
wget https://archive.apache.org/dist/kafka/2.2.2/kafka_2.11-2.2.2.tgz
tar zxvf kafka_2.11-2.3.0.tgz  
# 创建 /usr/local/apps/  目录
mdkir -p /usr/local/apps/  
# 把 kafka_2.11-2.2.2 移动到 /usr/local/apps/ 目录
mv kafka_2.11-2.2.2 /usr/local/apps/  
# 进入 /usr/local/apps/  目录
cd /usr/local/apps/  
# 创建软链接
ln -s kafka_2.11-2.3.0 kafka

4. 启动测试


4.1 启动Zookeeper

没有安装Zookeeper请看这里。

# 进入 zooKeeper 的bin 目录
cd /opt/zooKeeper/apache-zookeeper-3.5.6-bin/bin
# 启动zookeeper
./zkServer.sh  start

4.2 启动单机Kafka服务

# 进入 kafka 安装目录
cd /usr/local/apps/kafka
#执行脚本  
./bin/kafka-server-start.sh config/server.properties  
#查看进程  
jps


jps无法执行的请查看


4.3 测试


4.3.1 创建topic进行测试

# 进入 kafka 安装目录
cd /usr/local/apps/kafka
# 创建名为test的topic  
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

4.3.2 查看topic列表

# 进入 kafka 安装目录
cd /usr/local/apps/kafka
# 查看topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181  


4.3.3 生产者消息测试

# 进入 kafka 安装目录
cd /usr/local/apps/kafka
#执行脚本(使用kafka-console-producer.sh 发送消息)  
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

说明:

这里的^H是我不小心按了Backspace键。


4.3.4 消费者消息测试(建议另外打开一个终端测试)

# 进入 kafka 安装目录
cd /usr/local/apps/kafka
# 执行脚本(使用kafka-console-consumer.sh 接收消息并在终端打印)  
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning


4.3.5 删除topic

# 进入 kafka 安装目录
cd /usr/local/apps/kafka
# 删除名为test的topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test


以上是关于消息队列简介的主要内容,如果未能解决你的问题,请参考以下文章

消息队列简介

消息队列简介

Kafka 服务异步通信 -- 消息队列MQ简介Kafak简介Kafak环境搭建

分布式消息队列之JMS规范(总结篇)

必学消息队列-RabbitMQ(下集)

消息队列简介