RabbitMQ 使用

Posted zhaogaojian

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 使用相关的知识,希望对你有一定的参考价值。

RabbitMQ能做啥

场景一:支付的通知

生产者:微信支付完成之后在其回调方法中调用一个服务接收消息,这个服务作为生产者。

消费者:消费者服务是一个不断从队列中获取支付结果的应用,然后在app或者页面展示。

场景二:注册的短信或者邮件通知

生产者:注册成功之后的回调中,发送注册成功信息到队列生产者。

消费者:应用程序不断的获取队列中的消息,获取到就发送短信后者邮件。

1、安装

sudo apt-get update
sudo apt-get install rabbitmq-server

2、启动服务、停止服务、查看服务状态

sudo service rabbitmq-server start
sudo service rabbitmq-server stop
sudo service rabbitmq-server status

3、修改打开文件句柄限制

打开/etc/default/rabbitmq-server,修改ulimit
浏览器中输入:127.0.0.1:15672,用户名密码是guest ,如果能登陆就说明安装成功。

4、支持的操作系统

  • Solaris
  • BSD
  • Linux
  • MacOSX
  • TRU64
  • Windows NT/2000/XP/Vista/Windows 7/Windows 8
  • Windows Server 2003/2008/2012
  • Windows 95, 98
  • VxWorks

5、支持的编程语言

  • C# (using .net/c# client)
  • clojure (using Langohr)
  • erlang (using erlang client)
  • java (using java client)
  • javascript/node.js (using amqp.node)
  • perl (using Net::RabbitFoot)
  • python (using pika)
  • python-puka (using puka)
  • ruby (using Bunny)
  • ruby (using amqp gem)

6、java示例-生产者

private final static String QUEUE_NAME = "hello";
    
    public static void main(String[] args) throws Exception
        // TODO Auto-generated method stub
        
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");//因为两个进程在同一个机器上
        Connection connection = null;
        Channel channel = null;
        
        connection = factory.newConnection();
        channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [Producer] Sent ‘" + message + "‘");
            
        channel.close();
        connection.close();
    

java示例-消费者

private final static String QUEUE_NAME = "hello";
    
    public static void main(String[] args) throws Exception
        
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) 
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
              throws IOException 
            String message = new String(body, "UTF-8");
            System.out.println(" [Consumer] Received ‘" + message + "‘");
          
        ;
        channel.basicConsume(QUEUE_NAME, true, consumer);
        
    

 7、C#示例生产者

using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
 
namespace Producer

    classProgram
    
        staticvoidMain(string[] args)
        
            Thread.Sleep(1000);
            var connectionFactory = new ConnectionFactory();
            IConnection connection = connectionFactory.CreateConnection();
            IModel channel = connection.CreateModel();
 
           channel.ExchangeDeclare("direct-exchange-example",ExchangeType.Direct);
            stringvalue = DoSomethingInteresting();
            stringlogMessage = string.Format("0:1",TraceEventType.Information,value);
 
            byte[]message = Encoding.UTF8.GetBytes(logMessage);
           channel.BasicPublish("direct-exchange-example","",null,message);
 
           channel.Close();
           connection.Close();
        
 
        staticstringDoSomethingInteresting()
        
            returnGuid.NewGuid().ToString();
        
    

C#示例消费者

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
 
namespace Consumer

    classProgram
    
        staticvoidMain(string[] args)
        
            var connectionFactory = new ConnectionFactory();
            IConnection connection = connectionFactory.CreateConnection();
            IModel channel = connection.CreateModel();
 
           channel.ExchangeDeclare("direct-exchange-example",ExchangeType.Direct);
           channel.QueueDeclare("logs",false,false,true,null);
           channel.QueueBind("logs","direct-exchange-example","");
 
            varconsumer = new QueueingBasicConsumer(channel);
           channel.BasicConsume("logs",true,consumer);
 
            vareventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
 
            stringmessage = Encoding.UTF8.GetString(eventArgs.Body);
            Console.WriteLine(message);
 
           channel.Close();
           connection.Close();
            Console.ReadLine();
        
    

8、Go语言生产者

package main
 
import (
    "github.com/streadway/amqp"
)
 
var conn *amqp.Connection
var ch *amqp.Channel
 
func main() 
    //设置连接地址和交换器队列名称
    addr := "amqp://test:test@192.168.1.104:5672/"
    exname := "amqp.ex"
    quname := "amqp.qu"
 
    //创建连接
    conn, _ = amqp.Dial(addr)
    //创建通道,注意,一个连接可以有多个通道
    ch, _ = conn.Channel()
    //创建一个交换器,参数fanout说明该交换器会将消息转发到所有与之绑定的队列中
    ch.ExchangeDeclare(exname, "fanout", true, false, false, true, nil)
    //创建一个队列,用来接受和消费信息,注意这里的队列名称,消费者获取数据需要该名称
    ch.QueueDeclare(quname, true, false, false, false, nil)
    //绑定交换器和队列
    ch.QueueBind(quname, "", exname, false, nil)
    //发送消息
    ch.Publish(exname, quname, false, false, amqp.Publishing
        ContentType: "text/plain",
        Body:        []byte("hello,world"),
    )
    //关闭连接
    conn.Close()
    ch.Close()
 
    select 

Go语言消费者

package main
 
import (
    "bytes"
    "fmt"
    "github.com/streadway/amqp"
)
 
var conn *amqp.Connection
var ch *amqp.Channel
 
func main() 
    addr := "amqp://test:test@192.168.1.104:5672/"
    quname := "amqp.qu"
    conn, _ = amqp.Dial(addr)
    ch, _ = conn.Channel()
 
    //接受消息
    value, _ := ch.Consume(quname, "", true, false, false, false, nil)
    go func() 
        for v := range value 
            s := bytesToString(&(v.Body))
            fmt.Println(*s)
        
    ()
 
    conn.Close()
    ch.Close()
 
    select 

 
func bytesToString(b *[]byte) *string 
    s := bytes.NewBuffer(*b)
    r := s.String()
    return &r
 

9、php生产者

<?php
//配置信息 
$conn_args = array( 
    ‘host‘ => ‘127.0.0.1‘,  
    ‘port‘ => ‘5672‘,  
    ‘login‘ => ‘guest‘,  
    ‘password‘ => ‘guest‘, 
    ‘vhost‘=>‘/‘ 
);   
$e_name = ‘e_linvo‘; //交换机名 
//$q_name = ‘q_linvo‘; //无需队列名 
$k_route = ‘key_1‘; //路由key 
 
//创建连接和channel 
$conn = new AMQPConnection($conn_args);   
if (!$conn->connect())    
    die("Cannot connect to the broker!\n");   
   
$channel = new AMQPChannel($conn);   
 

 
//创建交换机对象    
$ex = new AMQPExchange($channel);   
$ex->setName($e_name);   
date_default_timezone_set("Asia/Shanghai");
//发送消息 
//$channel->startTransaction(); //开始事务  
for($i=0; $i<5; ++$i) 
    sleep(1);//休眠1秒
    //消息内容 
    $message = "TEST MESSAGE!".date("h:i:sa");   
    echo "Send Message:".$ex->publish($message, $k_route)."\n";  
 
//$channel->commitTransaction(); //提交事务 
 
$conn->disconnect();
?>

PHP消费者

<?php 
//配置信息 
$conn_args = array( 
    ‘host‘ => ‘127.0.0.1‘,  
    ‘port‘ => ‘5672‘,  
    ‘login‘ => ‘guest‘,  
    ‘password‘ => ‘guest‘, 
    ‘vhost‘=>‘/‘ 
);   
$e_name = ‘e_linvo‘; //交换机名 
$q_name = ‘q_linvo‘; //队列名 
$k_route = ‘key_1‘; //路由key 
 
//创建连接和channel 
$conn = new AMQPConnection($conn_args);   
if (!$conn->connect())    
    die("Cannot connect to the broker!\n");   
   
$channel = new AMQPChannel($conn);   
 
//创建交换机    
$ex = new AMQPExchange($channel);   
$ex->setName($e_name); 
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型  
$ex->setFlags(AMQP_DURABLE); //持久化 
echo "Exchange Status:".$ex->declare()."\n";   
   
//创建队列    
$q = new AMQPQueue($channel); 
$q->setName($q_name);   
$q->setFlags(AMQP_DURABLE); //持久化  
echo "Message Total:".$q->declare()."\n";   
 
//绑定交换机与队列,并指定路由键 
echo ‘Queue Bind: ‘.$q->bind($e_name, $k_route)."\n"; 
 
//阻塞模式接收消息 
echo "Message:\n";   
while(True) 
    $q->consume(‘processMessage‘);   
    //$q->consume(‘processMessage‘, AMQP_AUTOACK); //自动ACK应答  
 
$conn->disconnect();   
 
/**
 * 消费回调函数
 * 处理消息
 */ 
function processMessage($envelope, $queue)  
    $msg = $envelope->getBody(); 
    echo $msg."\n"; //处理消息 
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 

?>

 


参考:
https://blog.csdn.net/calm_85/article/details/80848664
https://www.shiyanlou.com/courses/630/learning/
https://blog.csdn.net/pony_maggie/article/details/69781478
https://blog.csdn.net/luoye4321/article/details/83722437

以上是关于RabbitMQ 使用的主要内容,如果未能解决你的问题,请参考以下文章

rabbitMq使用笔记一:Window下安装使用RabbitMQ

深入了解RabbitMQ工作原理及简单使用

RabbitMQ手册之rabbitmq-env.conf

RabbitMQ的使用- RabbitMQ服务安装

RabbitMQ的安装和使用Python连接RabbitMQ

学相伴狂神说 RabbitMQ笔记(简单使用RabbitMQ)