一个简单的java程序,用于RabbitMQ日志监控
Posted 袋鼠1989
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个简单的java程序,用于RabbitMQ日志监控相关的知识,希望对你有一定的参考价值。
RabbitMQ 的所有日志都会发送到 topic 类型的 exchange “amq.rabbitmq.log”,routingKey 为日志级别:debug、info、warning、error。如果要接收所有级别的日志,可以使用 # 作为 routingKey。
package logs; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import utils.ChannelUtils; public class ListenerRabbitMQLogs { private static final String QUEUE_NAME_DEBUG = "queue_debug"; private static final String QUEUE_NAME_INFO = "queue_info"; private static final String QUEUE_NAME_WARNING = "queue_warning"; private static final String QUEUE_NAME_ERROR = "queue_error"; private static final String EXCHANGE_NAME_LOG = "amq.rabbitmq.log"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ChannelUtils.getConnection("ListenerLog"); Channel channelDebug = connection.createChannel(); Channel channelInfo = connection.createChannel(); Channel channelWarning = connection.createChannel(); Channel channelError = connection.createChannel(); channelDebug.queueDelete(QUEUE_NAME_DEBUG); channelDebug.queueDeclare(QUEUE_NAME_DEBUG, false, false, false, null); channelInfo.queueDelete(QUEUE_NAME_INFO); channelInfo.queueDeclare(QUEUE_NAME_INFO, false, false, false, null); channelWarning.queueDelete(QUEUE_NAME_WARNING); channelWarning.queueDeclare(QUEUE_NAME_WARNING, false, false, false, null); channelError.queueDelete(QUEUE_NAME_ERROR); channelError.queueDeclare(QUEUE_NAME_ERROR, false, false, false, null); channelDebug.queueBind(QUEUE_NAME_DEBUG, EXCHANGE_NAME_LOG, "debug"); channelInfo.queueBind(QUEUE_NAME_INFO, EXCHANGE_NAME_LOG, "info"); channelWarning.queueBind(QUEUE_NAME_WARNING, EXCHANGE_NAME_LOG, "warning"); channelError.queueBind(QUEUE_NAME_ERROR, EXCHANGE_NAME_LOG, "error"); channelDebug.basicConsume(QUEUE_NAME_DEBUG, new LogsConsumer(channelDebug,"Debug")); channelInfo.basicConsume(QUEUE_NAME_INFO, new LogsConsumer(channelInfo,"Info")); channelWarning.basicConsume(QUEUE_NAME_WARNING, new 
LogsConsumer(channelWarning,"Warning")); channelError.basicConsume(QUEUE_NAME_ERROR, new LogsConsumer(channelError,"Error")); } } class LogsConsumer extends DefaultConsumer { private String logLevel; public LogsConsumer(Channel channel,String logLevel) { super(channel); this.logLevel = logLevel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println(logLevel+":"+new String(body)); // 使用手动确认模式,这里需要确认收到消息。 getChannel().basicAck(envelope.getDeliveryTag(), false); } }
package utils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Static helpers for opening RabbitMQ connections and channels against a
 * fixed broker configuration.
 */
public class ChannelUtils {

    /**
     * Opens a new connection and creates a channel on it.
     *
     * @param connectionDescription client-provided name passed to the new connection
     * @return the channel created on the new connection
     * @throws RuntimeException if the connection or the channel cannot be
     *         created; the underlying failure is attached as the cause
     */
    public static Channel getChannelInstance(String connectionDescription) {
        Connection connection = null;
        try {
            connection = getConnection(connectionDescription);
            return connection.createChannel();
        } catch (Exception e) {
            // The connection was opened only for this channel; if the channel
            // could not be created, close it so it is not leaked.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            // Keep the original failure as the cause so it can be diagnosed.
            throw new RuntimeException("获取Channel连接失败", e);
        }
    }

    /**
     * Opens a new connection using the settings from {@link #getConnectionFactory()}.
     *
     * @param connectionDescription client-provided name passed to the new connection
     * @return the newly opened connection
     * @throws IOException if the connection cannot be established
     * @throws TimeoutException if establishing the connection times out
     */
    public static Connection getConnection(String connectionDescription) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = getConnectionFactory();
        return connectionFactory.newConnection(connectionDescription);
    }

    /**
     * Builds a connection factory configured with the broker host, port,
     * virtual host and credentials.
     *
     * @return a new, configured factory on every call
     */
    public static ConnectionFactory getConnectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // NOTE(review): host and credentials are hard-coded; consider loading
        // them from configuration or the environment instead of source code.
        connectionFactory.setHost("192.168.1.111");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("drs");
        connectionFactory.setPassword("123456");
        return connectionFactory;
    }
}
以上是关于一个简单的java程序,用于RabbitMQ日志监控的主要内容,如果未能解决你的问题,请参考以下文章