Kafka+Log4j2日志

Posted 小LUA

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka+Log4j2日志相关的知识,希望对你有一定的参考价值。

默认你已经安装配置了Zookeeper和Kafka。

 

为了目录清晰,我的Kafka配置文件的Zookeeper部分是:加上了节点用来存放Kafka信息

启动Zookeeper,然后启动Kafka。

Zookeeper的节点树:根目录下有专门的Kafka存放节点【以前没有配这个,结果Kafka的一大堆东西全部跑到根节点上了,很乱】

接下来是代码部分了。

依赖包:

Log4j2配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration monitorInterval="1800">

    <Filter type="ThresholdFilter" level="trace" />

    <Appenders>
        <Console name="console" target="SYSTEM_OUT">
            <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch) -->
            <ThresholdFilter level="trace" onMatch="ACCEPT"
                onMismatch="DENY" />
            <PatternLayout pattern="%d %-5p [%t] %C{2} (%F:%L) - %m%n" />
        </Console>

        <Kafka name="Kafka" topic="my-topic" syncSend="false">
            <PatternLayout pattern="%date %message" />
            <Property name="bootstrap.servers">192.168.127.129:9092,192.168.127.130:9092,192.168.127.131:9092</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="WARN">
            <!-- TRACE < DEBUG < INFO < WARN < ERROR < FATAL -->
            <AppenderRef ref="console" />
        </Root>
        <Logger name="kafkaLog" level="trace">
            <AppenderRef ref="Kafka" />
        </Logger>
    </Loggers>
</Configuration>

生产者:

package learn.kafka.log4j;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class SimpleProducer {

    private static Logger log = LogManager.getLogger("kafkaLog");
    
    public static void main(String[] args) {
        for (int i = 10; i < 20; i++) {
            log.info("Hello---" + i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
}

消费者:

package learn.kafka.log4j;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.127.129:9092,192.168.127.130:9092,192.168.127.131:9092");
        props.put("group.id", "testGroup");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅的topic,多个用逗号隔开
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        //consumer.close();
    }
}

先运行消费者,让它监听等待。

在运行生产者,让它生产消息。

你会发现每隔一秒输出一行信息。信息在value后面。

 

 

 

以上是关于Kafka+Log4j2日志的主要内容,如果未能解决你的问题,请参考以下文章

log4j使用kafka作为输出源时死锁

kafka运维:kafka操作日志设置

log4j2日志发给kafka

如何使用log4j在日志中打印spring kafka配置

011- Kafka应用之Kafka与Log4j的整合

ELK日志系统设计方案-Log4j日志直推Kafka