一个简单的java程序,用于RabbitMQ日志监控

Posted 袋鼠1989

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个简单的java程序,用于RabbitMQ日志监控相关的知识,希望对你有一定的参考价值。

RabbitMQ的所有日志,都会发给topic类型的exchange “amq.rabbitmq.log”  routingKey 有 debug,info,waring,error.  如果接收所有类型日志,可以用 #

package logs;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

import utils.ChannelUtils;

public class ListenerRabbitMQLogs {

    private static final String QUEUE_NAME_DEBUG = "queue_debug";
    private static final String QUEUE_NAME_INFO = "queue_info";
    private static final String QUEUE_NAME_WARNING = "queue_warning";
    private static final String QUEUE_NAME_ERROR = "queue_error";

    private static final String EXCHANGE_NAME_LOG = "amq.rabbitmq.log";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ChannelUtils.getConnection("ListenerLog");

        Channel channelDebug = connection.createChannel();
        Channel channelInfo = connection.createChannel();
        Channel channelWarning = connection.createChannel();
        Channel channelError = connection.createChannel();

        channelDebug.queueDelete(QUEUE_NAME_DEBUG);
        channelDebug.queueDeclare(QUEUE_NAME_DEBUG, false, false, false, null);

        channelInfo.queueDelete(QUEUE_NAME_INFO);
        channelInfo.queueDeclare(QUEUE_NAME_INFO, false, false, false, null);

        channelWarning.queueDelete(QUEUE_NAME_WARNING);
        channelWarning.queueDeclare(QUEUE_NAME_WARNING, false, false, false, null);

        channelError.queueDelete(QUEUE_NAME_ERROR);
        channelError.queueDeclare(QUEUE_NAME_ERROR, false, false, false, null);

        channelDebug.queueBind(QUEUE_NAME_DEBUG, EXCHANGE_NAME_LOG, "debug");
        channelInfo.queueBind(QUEUE_NAME_INFO, EXCHANGE_NAME_LOG, "info");
        channelWarning.queueBind(QUEUE_NAME_WARNING, EXCHANGE_NAME_LOG, "warning");
        channelError.queueBind(QUEUE_NAME_ERROR, EXCHANGE_NAME_LOG, "error");

        channelDebug.basicConsume(QUEUE_NAME_DEBUG, new LogsConsumer(channelDebug,"Debug"));
        channelInfo.basicConsume(QUEUE_NAME_INFO, new LogsConsumer(channelInfo,"Info"));
        channelWarning.basicConsume(QUEUE_NAME_WARNING, new LogsConsumer(channelWarning,"Warning"));
        channelError.basicConsume(QUEUE_NAME_ERROR, new LogsConsumer(channelError,"Error"));

    }

}

class LogsConsumer extends DefaultConsumer {
    
    private String logLevel;

    public LogsConsumer(Channel channel,String logLevel) {
        super(channel);
        this.logLevel = logLevel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {

        System.out.println(logLevel+":"+new String(body));

        // 使用手动确认模式,这里需要确认收到消息。
        getChannel().basicAck(envelope.getDeliveryTag(), false);
    }

}
package utils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ChannelUtils {

    public static Channel getChannelInstance(String ConnectionDescription) {
        try {
            return getConnection(ConnectionDescription).createChannel();
        } catch (Exception e) {
            throw new RuntimeException("获取Channel连接失败");
        }
    }

    public static Connection getConnection(String ConnectionDescription) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = getConnectionFactory();
        return connectionFactory.newConnection(ConnectionDescription);
    }

    public static ConnectionFactory getConnectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("192.168.1.111");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("drs");
        connectionFactory.setPassword("123456");

return connectionFactory; } }

 

以上是关于一个简单的java程序,用于RabbitMQ日志监控的主要内容,如果未能解决你的问题,请参考以下文章

rabbitmq系列三 之发布/订阅

RabbitMQ入门教程——发布/订阅

RabbitMQ 一二事 - 简单队列使用

RabbitMQ交换机

RabbitMQ交换机

rabbitMQ学习