RocketMQ(08)——日志输出到RocketMQ

Posted elim168

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ(08)——日志输出到RocketMQ相关的知识,希望对你有一定的参考价值。

日志输出到RocketMQ

RocketMQ对常用的几种日志输出框架都定义了一个日志输出实现,使对应的日志信息作为一条消息发送到RocketMQ。要使日志输出信息能够发送到RocketMQ,需要添加rocketmq-logappender依赖。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-logappender</artifactId>
    <version>4.5.0</version>
</dependency>

目前支持log4j、log4j2和logback。它们都有对应的属性用来指定生产者发送消息需要指定的相关信息,比如Name Server地址、生产者Group、消息发送的Topic、Tag。下面是一个使用logback的配置示例,其中定义了一个appender是RocketMQ实现的org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender,指定了发送的Topic是topic1,Tag是logback。然后指定了需要使用appender的地方使用该appender。这样当需要输出日志时对应的日志信息将输出到RocketMQ。该Appender是同步的,如果需要使用异步发送,需要使用ch.qos.logback.classic.AsyncAppender

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
        <tag>logback</tag>
        <topic>topic1</topic>
        <producerGroup>logback</producerGroup>
        <nameServerAddress>localhost:9876</nameServerAddress>
        <layout>
            <pattern>%date %p %t - %m%n</pattern>
        </layout>
    </appender>

    <appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
        <queueSize>1024</queueSize>
        <discardingThreshold>80</discardingThreshold>
        <maxFlushTime>2000</maxFlushTime>
        <neverBlock>true</neverBlock>
        <appender-ref ref="mqAppender1"/>
    </appender>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%dHH:mm:ss.SSS-[%XtraceId-%XspanId-%Xappname-%Xexportable] [%thread] %-5level %logger36 -%msg%n</pattern>
        </encoder>
    </appender>

    <root level="info">
        <appender-ref ref="STDOUT" />
    </root>

    <logger name="com.elim" level="info">
        <appender-ref ref="mqAppender1"/>
    </logger>

</configuration>

基于上面配置,如果需要消费对应的日志消息,只需要订阅topic1和Tag为logback的组合,比如下面这样。

@Test
public void logAppenderConsume() throws Exception 
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
  consumer.setNamesrvAddr(this.nameServer);
  consumer.subscribe("topic1", "logback");
  consumer.registerMessageListener(new MessageListenerConcurrently() 
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
      System.out.println("消费消息:" + new String(msgs.get(0).getBody()));
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    
  );
  consumer.start();
  TimeUnit.SECONDS.sleep(1200);
  consumer.shutdown();

笔者目前日志输出使用的都是logback,所以log4j和log4j2的使用示例请参考官方文档http://rocketmq.apache.org/docs/logappender-example/

(注:本文是基于RocketMQ4.5.0所写)

以上是关于RocketMQ(08)——日志输出到RocketMQ的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ(08)——日志输出到RocketMQ

rocket linux 部署

Rocket MQ 问题排查命令

Rocket 安装 win 10

rocket MQ

rocketmq配置