logback KafkaAppender 写入Kafka队列,集中日志输出.
Posted 高因咖啡
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了logback KafkaAppender 写入Kafka队列,集中日志输出.相关的知识,希望对你有一定的参考价值。
为了减少应用服务器对磁盘的读写,以及可以集中日志在一台机器上,方便使用ELK收集日志信息,所以考虑做一个jar包,让应用集中输出日志
Redis 自定义 RedisAppender 插件, 实现日志缓冲队列,集中日志输出.
网上搜了一圈,只发现有人写了个程序在github
地址:https://github.com/johnmpage/logback-kafka
Redis 自定义 RedisAppender 插件, 实现日志缓冲队列,集中日志输出.
本来打算引用一下这个jar就完事了,没想到在pom里下不下来,只好把源码下了,拷贝了代码过来,自己修改一下.
首先,安装一个Kafka。作为一个懒得出神入化的程序员,我选择的安装方式是Docker:
启动zookeeper容器
docker run -d --name zookeeper --net=host -t wurstmeister/zookeeper
启动kafka容器
docker run --name kafka -d -e HOST_IP=192.168.1.7 --net=host -v /usr/local/docker/kafka/conf/server.properties:/opt/kafka_2.12-1.0.0/config/server.properties -v /etc/localtime:/etc/localtime:ro -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_BROKER_ID=1 -t wurstmeister/kafka
要修改Kafka的server.properties 中zookeeper配置
配置文件如下
# Kafka broker configuration (server.properties), one property per line.
# Comments must sit on their own line: a '#' in the middle of a collapsed line
# would otherwise swallow every setting that follows it.
listeners=PLAINTEXT://192.168.1.7:9092
delete.topic.enable=true
advertised.listeners=PLAINTEXT://192.168.1.7:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-92cfb0bbd88c
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.bytes=10737418240
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.7:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
# NOTE(review): 'version' does not look like a standard broker setting - confirm it is intended here.
version=1.0.0
Kafka启动好了,接下来新建SpringBoot项目,首先是消费队列的项目。
pom文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.lzw</groupId> <artifactId>kafkalog</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>kafkalog</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.M6</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> <repositories> <repository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/snapshot</url> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/snapshot</url> <snapshots> <enabled>true</enabled> 
</snapshots> </pluginRepository> <pluginRepository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </pluginRepository> </pluginRepositories> </project>
程序结构
KafkaConfig
package com.lzw.kafkalog.config; /** * Created by laizhenwei on 2017/11/28 */ @Configuration @EnableKafka public class KafkaConfig { @Value("${spring.kafka.consumer.bootstrap-servers}") private String consumerBootstrapServers; @Value("${spring.kafka.producer.bootstrap-servers}") private String producerBootstrapServers; @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; }
@Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public Areceiver areceiver() { return new Areceiver(); } @Bean public Breceiver breceiver(){ return new Breceiver(); } }
KafkaAdminConfig
package com.lzw.kafkalog.config;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the {@link KafkaAdmin} bean and the two topics ("A" and "B") used by this demo.
 *
 * Created by laizhenwei on 2017/11/28
 */
@Configuration
public class KafkaAdminConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String producerBootstrapServers;

    /**
     * @return a {@link KafkaAdmin} pointed at the configured bootstrap servers
     */
    @Bean
    public KafkaAdmin admin() {
        final Map<String, Object> adminProperties = new HashMap<>();
        adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        return new KafkaAdmin(adminProperties);
    }

    /**
     * Declares topic "A" with 1 partition.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic a() {
        return singlePartitionTopic("A");
    }

    /**
     * Declares topic "B" with 1 partition.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic b() {
        return singlePartitionTopic("B");
    }

    /** Builds a topic definition with one partition and a replication factor of 1. */
    private static NewTopic singlePartitionTopic(String topicName) {
        return new NewTopic(topicName, 1, (short) 1);
    }
}
B队列消费者
package com.lzw.kafkalog.b;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * Consumer for topic "B": writes the value of every received record to this class's logger.
 *
 * Created by laizhenwei on 2017/11/28
 */
public class Breceiver {

    Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Logs the value of one record at INFO level.
     *
     * @param data the record received from topic "B"; a record whose value is {@code null}
     *             is skipped instead of failing with a {@link NullPointerException}
     */
    @KafkaListener(topics = {"B"})
    public void listen(ConsumerRecord<?, ?> data) {
        final Object value = data.value();
        if (value == null) {
            // Nothing to write for a null-valued record.
            return;
        }
        logger.info(value.toString());
    }
}
application.yml
# Kafka bootstrap servers; the nesting must yield the property names
# spring.kafka.consumer.bootstrap-servers and spring.kafka.producer.bootstrap-servers,
# which the configuration classes read via @Value.
spring:
  kafka:
    consumer:
      bootstrap-servers: 192.168.1.7:9092
    producer:
      bootstrap-servers: 192.168.1.7:9092
logback-test.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration debug="true"> <contextName>logback</contextName> <property name="LOG_HOME" value="F:/log" /> <appender name="aAppender" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOG_HOME}/a/a.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> <fileNamePattern>${LOG_HOME}/a/a-%d{yyyy-MM-dd}.%i.log</fileNamePattern> <!--<fileNamePattern>${LOG_HOME}/a/a-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>--> <!-- Number of days of log files to keep --> <MaxHistory>30</MaxHistory> <!-- Roll over to a new file once the current file reaches this size --> <MaxFileSize>100MB</MaxFileSize> <totalSizeCap>10GB</totalSizeCap> </rollingPolicy> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <appender name="bAppender" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOG_HOME}/b/b.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> <fileNamePattern>${LOG_HOME}/b/b-%d{yyyy-MM-dd}.%i.log</fileNamePattern> <!--<fileNamePattern>${LOG_HOME}/b/b-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern>--> <!-- Number of days of log files to keep --> <MaxHistory>30</MaxHistory> <!-- Roll over to a new file once the current file reaches this size --> <MaxFileSize>100MB</MaxFileSize> <totalSizeCap>10GB</totalSizeCap> </rollingPolicy> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <!-- Asynchronous output --> <appender name="aAsyncFile" class="ch.qos.logback.classic.AsyncAppender"> <discardingThreshold>0</discardingThreshold> <queueSize>2048</queueSize> <appender-ref ref="aAppender" /> </appender> <logger name="com.lzw.kafkalog.a" level="INFO" additivity="false"> <appender-ref ref="aAsyncFile" /> </logger> <!-- Asynchronous output --> <appender name="bAsyncFile" class="ch.qos.logback.classic.AsyncAppender"> <discardingThreshold>0</discardingThreshold> <queueSize>2048</queueSize> <appender-ref ref="bAppender" /> </appender> <logger name="com.lzw.kafkalog.b" level="INFO" additivity="false"> <appender-ref ref="bAsyncFile" /> </logger> </configuration>
生产者程序,重点是红框部分(即下面的 KafkaAppender)
红框源码,本来想做个容错,后来发现不行,原因等下再说
package com.lzw.project_b.kafka; import ch.qos.logback.core.AppenderBase; import ch.qos.logback.core.Layout; import ch.qos.logback.core.status.ErrorStatus; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.StringReader; import java.util.Properties; public class KafkaAppender<E> extends AppenderBase<E> { protected Layout<E> layout; private static final Logger LOGGER = LoggerFactory.getLogger("local"); private boolean logToLocal = false; private String kafkaProducerProperties; private String topic; private KafkaProducer producer; public void start() { super.start(); int errors = 0; if (this.layout == null) { this.addStatus(new ErrorStatus("No layout set for the appender named \\"" + this.name + "\\".", this)); ++errors; } if (errors == 0) { super.start(); } LOGGER.info("Starting KafkaAppender..."); final Properties properties = new Properties(); try { properties.load(new StringReader(kafkaProducerProperties)); producer = new KafkaProducer<>(properties); } catch (Exception exception) { System.out.println("KafkaAppender: Exception initializing Producer. " + exception + " : " + exception.getMessage()); } System.out.println("KafkaAppender: Producer initialized: " + producer); if (topic == null) { System.out.println("KafkaAppender requires a topic. 
Add this to the appender configuration."); } else { System.out.println("KafkaAppender will publish messages to the \'" + topic + "\' topic."); } LOGGER.info("kafkaProducerProperties = {}", kafkaProducerProperties); LOGGER.info("Kafka Producer Properties = {}", properties); if (logToLocal) { LOGGER.info("KafkaAppender: kafkaProducerProperties = \'" + kafkaProducerProperties + "\'."); LOGGER.info("KafkaAppender: properties = \'" + properties + "\'."); } } @Override public void stop() { super.stop(); LOGGER.info("Stopping KafkaAppender..."); producer.close(); } @Override protected void append(E event) { /** * 源码这里是用Formatter类转为JSON */ String msg = layout.doLayout(event); ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, msg); producer.send(producerRecord); } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public boolean getLogToLocal() { return logToLocal; } public void setLogToLocal(String logToLocal) { if (Boolean.valueOf(logToLocal)) { this.logToLocal = true; } } public void setLayout(Layout<E> layout) { this.layout = layout; } public String getKafkaProducerProperties() { return kafkaProducerProperties; } public void setKafkaProducerProperties(String kafkaProducerProperties) { this.kafkaProducerProperties = kafkaProducerProperties; } }
LogService就记录一段长的垃圾日志
package com.lzw.project_b.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Emits one long filler log entry per call; used to generate logging load.
 *
 * Created by laizhenwei on 2017/12/1
 */
@Component
public class LogService {

    Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Long filler text used both as the log message and as the exception message. */
    private static final String JUNK_MESSAGE =
            "asdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfa"
            + "sdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdf"
            + "sadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfa"
            + "sdfsadfasdfsadfasdfsaasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsa"
            + "dfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadf"
            + "asdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfas"
            + "dfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsa"
            + "dfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfas"
            + "dfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadf"
            + "sdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfa"
            + "sdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsadfasdfsa";

    /**
     * Logs the filler text at INFO level together with a {@link RuntimeException} carrying the
     * same text.
     */
    public void dolog() {
        final RuntimeException attached = new RuntimeException(JUNK_MESSAGE);
        logger.info(JUNK_MESSAGE, attached);
    }
}
KafkaLogController就一个很无聊的输出日志请求,并记录入队时间
package com.lzw.project_b.controller;

import com.lzw.project_b.service.LogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test endpoint that triggers a burst of log calls and prints how long the burst took.
 *
 * Created by laizhenwei on 2017/11/29
 */
@RestController
@RequestMapping(path = "/kafka")
public class KafkaLogController {

    @Autowired
    private LogService logService;

    /**
     * Calls {@link LogService#dolog()} 100000 times and prints the elapsed time, converted from
     * nanoseconds to milliseconds, to standard output.
     */
    @GetMapping(path = "/aa")
    public void aa() {
        final long startedAt = System.nanoTime();
        int remaining = 100000;
        while (remaining-- > 0) {
            logService.dolog();
        }
        final long elapsedNanos = System.nanoTime() - startedAt;
        System.out.println(elapsedNanos / 1000000);
    }
}
启动两个程序,来一个请求
查看耗时
生产者的 logback-test.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration debug="true"> <appender name="KAFKA" class="com.lzw.project_b.kafka.KafkaAppender"> <topic>B</topic> <kafkaProducerProperties> bootstrap.servers=192.168.1.7:9092 retries=0 value.serializer=org.apache.kafka.common.serialization.StringSerializer key.serializer=org.apache.kafka.common.serialization.StringSerializer <!--reconnect.backoff.ms=1--> producer.type=async request.required.acks=0 <!--acks=0--> <!--producer.type=async --> <!--request.required.acks=1 --> <!--queue.buffering.max.ms=20000 --> <!--queue.buffering.max.messages=1000--> <!--queue.enqueue.timeout.ms = -1 --> <!--batch.num.messages=8--> <!--metadata.fetch.timeout.ms=3000--> <!--producer.type=sync--> <!--request.required.acks=1--> <!--reconnect.backoff.ms=3000--> <!--retry.backoff.ms=3000--> </kafkaProducerProperties> <logToLocal>true</logToLocal> <layout class="ch.qos.logback.classic.PatternLayout"> <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern> </layout> </appender> 时间滚动输出 level为 monitor 日志 <appender name="localAppender" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>F:/localLog/b/b.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> <fileNamePattern>F:/localLog/b/b-%d{yyyy-MM-dd}.%i.tar.gz</fileNamePattern> <!-- 日志文件保留天数 --> <MaxHistory>30</MaxHistory> <!-- 文件大小触发重写新文件 --> <MaxFileSize>200MB</MaxFileSize> <totalSizeCap>10GB</totalSizeCap> </rollingPolicy> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern> <charset>UTF-8</charset> </encoder> </appender> <appender name="asyncLocal" class="ch.qos.logback.classic.AsyncAppender"> <!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 --> <discardingThreshold>0</discardingThreshold> <queueSize>2048</queueSize> <appender-ref ref="localAppender"/> </appender> <!--万一kafka队列不通,记录到本地--> <logger name="local" additivity="false"> 
<appender-ref ref="asyncLocal"/> </logger> <!--<appender name="asyncKafka" class="ch.qos.logback.classic.AsyncAppender">--><以上是关于logback KafkaAppender 写入Kafka队列,集中日志输出.的主要内容,如果未能解决你的问题,请参考以下文章