C#: Configuring and Using RabbitMQ and ActiveMQ on Windows
Posted _iorilan
RabbitMQ
1. Download and install Erlang.
Make sure the ERLANG_HOME environment variable is set.
3. Enable the management portal plugin:
cd "C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.3.4\sbin"
rabbitmq-plugins.bat enable rabbitmq_management
rabbitmq-service.bat stop
rabbitmq-service.bat install
rabbitmq-service.bat start
4. Open http://localhost:15672/mgmt in a browser and log in with the default account:
guest/guest
4.1 Create a user and a queue.
4.2 Set the user's permissions.
Helper class:
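using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class RabbitMqClient
{
    public RabbitConnectionParam ConnectionParam { get; private set; }

    public RabbitMqClient(RabbitConnectionParam connectionParam)
    {
        ConnectionParam = connectionParam;
    }

    // Publishes a JSON string to the configured queue through the default exchange.
    public void Publish(string json)
    {
        try
        {
            var factory = new ConnectionFactory
            {
                HostName = ConnectionParam.HostName,
                UserName = ConnectionParam.UserName,
                Password = ConnectionParam.Password
            };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var body = Encoding.UTF8.GetBytes(json);
                channel.BasicPublish(exchange: "",
                                     routingKey: ConnectionParam.QueueName,
                                     basicProperties: null,
                                     body: body);
            }
        }
        catch (Exception ex)
        {
            // Log the failure so it is not silently swallowed.
            Console.WriteLine(ex);
        }
    }

    // Subscribes to the configured queue and blocks the calling thread,
    // passing each message body (decoded as UTF-8) to onMessage.
    public void BlockConsume(Action<string> onMessage)
    {
        IConnection conn = null;
        IModel channel = null;
        var manualEventSet = new ManualResetEvent(false);
        try
        {
            // "guest"/"guest" by default, limited to localhost connections
            var factory = new ConnectionFactory
            {
                UserName = ConnectionParam.UserName,
                Password = ConnectionParam.Password,
                VirtualHost = ConnectionParam.Vhost,
                HostName = ConnectionParam.HostName
            };
            conn = factory.CreateConnection();
            channel = conn.CreateModel();
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (ch, ea) =>
            {
                // ea.Body is a byte[] in RabbitMQ.Client 5.x and earlier;
                // in 6.x and later it is a ReadOnlyMemory<byte>, so use ea.Body.ToArray().
                var body = ea.Body;
                var str = Encoding.UTF8.GetString(body);
                onMessage(str);
            };
            // The second argument enables automatic acknowledgement.
            channel.BasicConsume(ConnectionParam.QueueName, true, consumer);
            // Block until the event is signalled, which this class never does.
            manualEventSet.WaitOne();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
            // Pause for 10 seconds before returning, for example so that a retrying caller does not spin.
            Thread.Sleep(10 * 1000);
        }
        finally
        {
            if (channel != null)
                channel.Close();
            if (conn != null)
                conn.Close();
        }
    }
}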
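The helper class reads its settings from a RabbitConnectionParam type that the post does not show. A minimal sketch of what such a type might look like, assuming it needs only the five properties the helper reads (HostName, UserName, Password, Vhost, QueueName). The class body and its default values are assumptions for illustration, not the author's code:

```csharp
using System;

// Hypothetical parameter object: the original post does not include its definition.
// Property names are taken from how RabbitMqClient uses them.
public class RabbitConnectionParam
{
    public string HostName { get; set; }
    public string UserName { get; set; }
    public string Password { get; set; }
    public string Vhost { get; set; }
    public string QueueName { get; set; }

    public RabbitConnectionParam()
    {
        // Assumed defaults: a local broker, the default guest account,
        // and RabbitMQ's default virtual host "/".
        HostName = "localhost";
        UserName = "guest";
        Password = "guest";
        Vhost = "/";
        QueueName = string.Empty;
    }
}
```

With a type like this, a client could be created as new RabbitMqClient(new RabbitConnectionParam { QueueName = "myQueue" }), where myQueue is a placeholder for the queue created in step 4.1.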
ActiveMQ
0. Install a Java JRE.
1. Download ActiveMQ.
2. Extract the archive.
3. Open cmd as administrator and change to the directory: activeMQfolder\bin\activemq\
4. Open activemq/conf/activemq.xml:
<managementContext>
<managementContext createConnector="true"/>
</managementContext>
5. Use the following commands to start, stop, and list Apache ActiveMQ brokers:
activemq/bin/activemq-admin start
activemq/bin/activemq-admin stop
activemq/bin/activemq-admin list
6. Open http://localhost:8161/admin/ and log in as admin/admin.
7. To change the ports, edit the transportConnectors node in activemq.xml:
...
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
...
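If the openwire port in the node above is changed, the broker URI used by the C# client has to change with it. A minimal sketch of building that URI from a host and port rather than hard-coding it. The BrokerUri helper and its method name are hypothetical and not part of Apache.NMS; only the "activemq:tcp://host:port" form comes from this post:

```csharp
using System;

// Hypothetical helper, not part of Apache.NMS.
public static class BrokerUri
{
    // Builds a URI such as "activemq:tcp://localhost:61616" for the openwire connector.
    public static string ForOpenWire(string host, int port)
    {
        if (string.IsNullOrEmpty(host))
            throw new ArgumentException("host must not be empty", "host");
        if (port < 1 || port > 65535)
            throw new ArgumentOutOfRangeException("port");
        return string.Format("activemq:tcp://{0}:{1}", host, port);
    }
}
```

For the default configuration, BrokerUri.ForOpenWire("localhost", 61616) returns "activemq:tcp://localhost:61616".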
Example code
Base class:
using Apache.NMS;
using Apache.NMS.ActiveMQ;

public class AMQBaseClass
{
    public const string URI = "activemq:tcp://localhost:61616";
    public IConnectionFactory connectionFactory;
    public IConnection _connection;
    public ISession _session;

    // Opens one connection and one session that derived classes share.
    public AMQBaseClass()
    {
        connectionFactory = new ConnectionFactory(URI);
        if (_connection == null)
        {
            _connection = connectionFactory.CreateConnection();
            _connection.Start();
            _session = _connection.CreateSession();
        }
    }
}
Publisher:
using System;
using Apache.NMS;
using Apache.NMS.Util; // provides the GetDestination extension method on ISession

public class AMQPublisher : AMQBaseClass
{
    public const string DESTINATION = "queue://videoReq";

    public AMQPublisher()
    {
    }

    // Sends a text message to the queue and returns a status string.
    public string SendMessage(string message)
    {
        string result = string.Empty;
        try
        {
            IDestination destination = _session.GetDestination(DESTINATION);
            using (IMessageProducer producer = _session.CreateProducer(destination))
            {
                var textMessage = producer.CreateTextMessage(message);
                producer.Send(textMessage);
            }
            result = "Message sent successfully.";
        }
        catch (Exception ex)
        {
            result = ex.Message;
            Console.WriteLine(ex.ToString());
        }
        return result;
    }
}
Consumer:
using System;
using Apache.NMS;
using Apache.NMS.Util; // provides the GetDestination extension method on ISession

public class AMQConsumer : AMQBaseClass
{
    public const string DESTINATION = "queue://videoReq";

    // Blocks the calling thread and passes each non-empty text message to onMsg.
    public void Consume(Action<string> onMsg)
    {
        try
        {
            // Reuse the connection and session already opened by AMQBaseClass
            // instead of opening a second connection.
            IDestination dest = _session.GetDestination(DESTINATION);
            using (IMessageConsumer consumer = _session.CreateConsumer(dest))
            {
                Console.WriteLine("Listener started.");
                Console.WriteLine("Listener created.");
                while (true)
                {
                    IMessage message = consumer.Receive();
                    // Skip anything that is not a text message.
                    ITextMessage textMessage = message as ITextMessage;
                    if (textMessage != null && !string.IsNullOrEmpty(textMessage.Text))
                    {
                        Console.WriteLine(textMessage.Text);
                        onMsg(textMessage.Text);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        Console.WriteLine("Press <ENTER> to exit.");
        Console.Read();
    }
}