RocketMQ(08)——日志输出到RocketMQ
Posted elim168
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ(08)——日志输出到RocketMQ相关的知识,希望对你有一定的参考价值。
日志输出到RocketMQ
RocketMQ对常用的几种日志输出框架都定义了一个日志输出实现,使对应的日志信息作为一条消息发送到RocketMQ。要使日志输出信息能够发送到RocketMQ,需要添加rocketmq-logappender
依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-logappender</artifactId>
<version>4.5.0</version>
</dependency>
目前支持log4j、log4j2和logback。它们都有对应的属性用来指定生产者发送消息需要指定的相关信息,比如Name Server地址、生产者Group、消息发送的Topic、Tag。下面是一个使用logback的配置示例,其中定义了一个appender是RocketMQ实现的org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender
,指定了发送的Topic是topic1,Tag是logback。然后指定了需要使用appender的地方使用该appender。这样当需要输出日志时对应的日志信息将输出到RocketMQ。该Appender是同步的,如果需要使用异步发送,需要使用ch.qos.logback.classic.AsyncAppender
。
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
<tag>logback</tag>
<topic>topic1</topic>
<producerGroup>logback</producerGroup>
<nameServerAddress>localhost:9876</nameServerAddress>
<layout>
<pattern>%date %p %t - %m%n</pattern>
</layout>
</appender>
<appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>1024</queueSize>
<discardingThreshold>80</discardingThreshold>
<maxFlushTime>2000</maxFlushTime>
<neverBlock>true</neverBlock>
<appender-ref ref="mqAppender1"/>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%dHH:mm:ss.SSS-[%XtraceId-%XspanId-%Xappname-%Xexportable] [%thread] %-5level %logger36 -%msg%n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT" />
</root>
<logger name="com.elim" level="info">
<appender-ref ref="mqAppender1"/>
</logger>
</configuration>
基于上面配置,如果需要消费对应的日志消息,只需要订阅topic1和Tag为logback的组合,比如下面这样。
@Test
public void logAppenderConsume() throws Exception
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr(this.nameServer);
consumer.subscribe("topic1", "logback");
consumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
System.out.println("消费消息:" + new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
consumer.start();
TimeUnit.SECONDS.sleep(1200);
consumer.shutdown();
笔者目前日志输出使用的都是logback,所以log4j和log4j2的使用示例请参考官方文档http://rocketmq.apache.org/docs/logappender-example/。
(注:本文是基于RocketMQ4.5.0所写)
以上是关于RocketMQ(08)——日志输出到RocketMQ的主要内容,如果未能解决你的问题,请参考以下文章