c# RabbitMQ 和 ActiveMQ windows环境的配置和使用

Posted _iorilan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了c# RabbitMQ 和 ActiveMQ windows环境的配置和使用相关的知识,希望对你有一定的参考价值。

c# RabbitMQ 和 ActiveMQ windows环境的配置和使用

RabbitMQ
1. 下载配置 erlang

确保 'ERLANG_HOME' 在环境变量中配置

2. 下载安装 rabbit mq

3. 激活 management portal 插件:
cd C:\\Program Files (x86)\\RabbitMQ Server\\rabbitmq_server-3.3.4\\sbin

rabbitmq-plugins.bat enable rabbitmq_management
rabbitmq-service.bat stop  
rabbitmq-service.bat install  
rabbitmq-service.bat start  

4. 打开url登陆  http://localhost:15672/mgmt.
guest/guest

4.1 创建user和队列
4.2 设置user权限
辅助类:

public class RabbitMqClient
    
        public RabbitConnectionParam ConnectionParam  get; 

        public RabbitMqClient(RabbitConnectionParam connectionParam)
        
            ConnectionParam = connectionParam;
        

        public void Publish(string json)
        
            try
            
                var factory = new ConnectionFactory()  HostName = ConnectionParam.HostName, UserName = ConnectionParam.UserName, Password = ConnectionParam.Password ;
                using (var connection = factory.CreateConnection())
                
                    using (var channel = connection.CreateModel())
                    
                        string message = json;
                        var body = Encoding.UTF8.GetBytes(message);

                        channel.BasicPublish(exchange: "",
                            routingKey: ConnectionParam.QueueName,
                            basicProperties: null,
                            body: body);
                    
                
            
            catch (Exception ex)
            

            
        
        public void BlockConsume(Action<string> onMessage)
        
            IConnection conn = null;
            IModel channel = null;
            var manualEventSet = new ManualResetEvent(false);

            try
            
                ConnectionFactory factory = new ConnectionFactory
                
                    UserName = ConnectionParam.UserName,
                    Password = ConnectionParam.Password,
                    VirtualHost = ConnectionParam.Vhost,
                    HostName = ConnectionParam.HostName
                ;
                // "guest"/"guest" by default, limited to localhost connections

                conn = factory.CreateConnection();
                channel = conn.CreateModel();
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (ch, ea) =>
                
                    var body = ea.Body;
                    var str = Encoding.UTF8.GetString(body);
                    onMessage(str);
                ;

                var consumerTag = channel.BasicConsume(ConnectionParam.QueueName, true, consumer);
                manualEventSet.WaitOne();
            
            catch (Exception ex)
            
                Task.Delay(10 * 1000);
            
            finally
            
                if (channel != null)
                
                    channel.Close();
                
                if (conn != null)
                
                    conn.Close();
                
            
        
    


ActiveMQ
0. java JRE

1. download 

2. 解压 
3. 管理员身份打开cmd ,进入目录 : activeMQfolder\\bin\\activemq\\    
4. 打开 : activemq/conf/activemq.xml
 

 <managementContext>
     <managementContext createConnector="true"/>
   </managementContext>


5. 使用以下命令来操作 Apache ActiveMQ, 开关 brokers的命令.

   activemq/bin/activemq-admin start
   activemq/bin/activemq-admin stop
   activemq/bin/activemq-admin list

6.打开 :http://localhost:8161/admin/   admin/admin

7. 修改端口:
activemq.xml ->

配置节点:

...
  <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>
...

配置例子

base class :

public class AMQBaseClass
    
        public const string URI = "activemq:tcp://localhost:61616";
        public IConnectionFactory connectionFactory;
        public IConnection _connection;
        public ISession _session;

        public AMQBaseClass()
        
            connectionFactory = new ConnectionFactory(URI);
            if (_connection == null)
            
                _connection = connectionFactory.CreateConnection();
                _connection.Start();
                _session = _connection.CreateSession();
            
        
    


publisher :

public class AMQPublisher : AMQBaseClass
    
        public const string DESTINATION = "queue://videoReq";
        public AMQPublisher()
        
        

        public string SendMessage(string message)
        
            string result = string.Empty;
            try
            
                IDestination destination = _session.GetDestination(DESTINATION);
                using (IMessageProducer producer = _session.CreateProducer(destination))
                
                    var textMessage = producer.CreateTextMessage(message);
                    producer.Send(textMessage);
                
                result = "Message sent successfully.";
            
            catch (Exception ex)
            
                result = ex.Message;
                Console.WriteLine(ex.ToString());
            
            return result;
        

    

consumer :

public class AMQConsumer : AMQBaseClass
    
        public const string DESTINATION = "queue://videoReq";
        
        public void Consume(Action<string> onMsg)
        
            try
            
                IConnectionFactory connectionFactory = new ConnectionFactory(URI);
                IConnection _connection = connectionFactory.CreateConnection();
                _connection.Start();
                ISession _session = _connection.CreateSession();
                IDestination dest = _session.GetDestination(DESTINATION);
                using (IMessageConsumer consumer = _session.CreateConsumer(dest))
                
                    Console.WriteLine("Listener started.");
                    Console.WriteLine("Listener created.rn");
                    IMessage message;
                    while (true)
                    
                        message = consumer.Receive();
                        if (message != null)
                        
                            ITextMessage textMessage = message as ITextMessage;
                            if (!string.IsNullOrEmpty(textMessage.Text))
                            
                                Console.WriteLine(textMessage.Text);
                                onMsg(textMessage.Text);
                            
                        
                    
                

            
            catch (Exception ex)
            
                Console.WriteLine(ex);
                Console.WriteLine("Press <ENTER> to exit.");
                Console.Read();
            
        
    


 

以上是关于c# RabbitMQ 和 ActiveMQ windows环境的配置和使用的主要内容,如果未能解决你的问题,请参考以下文章

使用 RabbitMQ 访问 ActiveMQ 队列

rabbitmq使用

ActiveMQ RabbitMQ RokcetMQ Kafka实战 消息队列中间件视频教程

activemq rabbitmq

SpringBoot中使用rabbitmq,activemq消息队列和rest服务的调用

消息中间件面试题31道RabbitMQ+ActiveMQ+Kafka