Spring Ecosystem Studies: Spring-kafka

Posted by SHIHUC


1. Basic information


A Kafka application on Spring is very easy to stand up, provided you already have a Kafka broker cluster. I introduced and built such a broker environment in an earlier post; see Kafka Research [I]: bringing up the environment.

Also note that version compatibility matters when building a Kafka application on the Spring framework. The official requirements are:

Apache Kafka Clients 1.0.0
Spring Framework 5.0.x
Minimum Java version: 8

The example presented here is built on Spring Boot, so the versions do not strictly follow the requirements above, but overall they satisfy the compatibility constraints.

 

2. Building a Kafka application on Spring Boot

2.1 Create a Maven project in IDEA

Configure the pom.xml; the complete pom.xml for the project is as follows:

 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.roomdis</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka</name>
    <description>kafka project with Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- log4j support. Note: Spring Boot 1.5 no longer manages
             spring-boot-starter-log4j, so a version must be pinned explicitly;
             1.3.8.RELEASE was its final release. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j</artifactId>
            <version>1.3.8.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>


Next comes the actual message producer and consumer. The topic here is fixed and the partition count is the default of 1; the goal of this post is to show how to build a Kafka application on Spring, and dynamically creating topics will be covered in the next post. The flow demonstrated is a basic send and receive: sending is asynchronous (async), and the receiving side uses application-level acknowledgment of consumption. Generally speaking, production-grade Kafka applications control the acknowledgment logic at the application level to guarantee safe message handling, so that messages are neither lost nor consumed twice.

 

2.2 Project configuration

I use a YAML configuration file here; it is quite simple, and actually more concise and readable than the equivalent properties format. The configuration is as follows:

server:
  port: 8899
  context-path: /kafka
spring:
  application:
    name: kafka
  kafka:
    bootstrap-servers: 10.90.7.2:9092,10.90.2.101:9092,10.90.2.102:9092
    consumer:
      group-id: kefu-logger
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      retries: 3
      buffer-memory: 20480
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      ack-mode: MANUAL_IMMEDIATE


A few points deserve emphasis:

A. The application listens on port 8899 and its context path is /kafka, i.e. every URL starts with /kafka.

B. Both the producer's serializers and the consumer's deserializers are the String implementations.

C. The consumer group id is set to kefu-logger. The group id is what scales consumption: consumers sharing a group id balance the topic's partitions among themselves, so the partitions' messages are spread across the group. (A group id applies only to consumers; Kafka producers do not belong to consumer groups.)

D. Most importantly, the listener's ack-mode is set to MANUAL_IMMEDIATE, i.e. manual, immediate acknowledgment. This requires enable-auto-commit to be false on the consumer, and the consuming logic must explicitly acknowledge each message it processes; otherwise, the next time the consumer starts it will consume the records at those offsets again, causing duplicate consumption. A programmatic sketch of the same listener setup follows below.
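For reference, the same listener setup can also be wired programmatically instead of relying on Boot's auto-configuration. This is a minimal sketch, assuming spring-kafka 1.x as managed by Boot 1.5, where AckMode is a nested enum of AbstractMessageListenerContainer:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "10.90.7.2:9092,10.90.2.101:9092,10.90.2.102:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kefu-logger");
        // Manual acknowledgment only works with auto-commit disabled.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Commit the offset as soon as Acknowledgment.acknowledge() is called.
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}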


2.3 Message definition

Since the longer-term plan is centralized log handling, the DTO is defined around a log message. It contains the following fields:

 

public class LogMessage {
    /*
     * Service type, e.g. IMS, BI, etc.
     */
    private String serviceType;
    /*
     * Server address as IP:PORT, e.g. 10.130.207.221:8080
     */
    private String serverAddr;
    /*
     * Fully qualified path of the class that produced the log entry
     */
    private String fullClassPath;
    /*
     * Time the message was produced
     */
    private String messageTime;
    /*
     * Message body. This matters: it is a JSON string, so it can carry
     * message formats from different services.
     */
    private String content;
    /*
     * Log level: INFO, WARN, ERROR, DEBUG, etc.
     */
    private String level;

    public String getServiceType() {
        return serviceType;
    }

    public void setServiceType(String serviceType) {
        this.serviceType = serviceType;
    }

    public String getServerAddr() {
        return serverAddr;
    }

    public void setServerAddr(String serverAddr) {
        this.serverAddr = serverAddr;
    }

    public String getFullClassPath() {
        return fullClassPath;
    }

    public void setFullClassPath(String fullClassPath) {
        this.fullClassPath = fullClassPath;
    }

    public String getMessageTime() {
        return messageTime;
    }

    public void setMessageTime(String messageTime) {
        this.messageTime = messageTime;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getLevel() {
        return level;
    }

    public void setLevel(String level) {
        this.level = level;
    }
}

 

Of course, the DTO's setters, getters, toString and other boilerplate methods could be generated with annotations; to keep the example explicit, I am not using the Lombok annotation package here. For comparison, a Lombok version is sketched below.
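With Lombok on the classpath (it is already in the pom above), the same DTO collapses to a few lines; a sketch using @Data, which generates the getters, setters, equals/hashCode, and toString:

import lombok.Data;

/*
 * Equivalent DTO with Lombok generating the accessor boilerplate.
 */
@Data
public class LogMessage {
    private String serviceType;   // service type, e.g. IMS, BI
    private String serverAddr;    // server address as IP:PORT
    private String fullClassPath; // fully qualified producing class
    private String messageTime;   // time the message was produced
    private String content;      // message body, a JSON string
    private String level;        // INFO, WARN, ERROR, DEBUG, ...
}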

 

2.4 Message producer

The point to focus on here is asynchronous production: the delivery of a message to the broker happens asynchronously, which is very valuable for concurrency.

 

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class MessageProducer {
    private Logger logger = Logger.getLogger(MessageProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    public void send(LogMessage logMessage) {
        String msg = gson.toJson(logMessage);
        // The send below is asynchronous: success and failure are both reported
        // through callbacks, which makes follow-up handling very convenient.
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(Config.TOPIC, msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                long offset = sendResult.getRecordMetadata().offset();
                String cont = sendResult.getProducerRecord().toString();
                logger.info("cont: " + cont + ", offset: " + offset);
            }

            @Override
            public void onFailure(Throwable throwable) {
                logger.error(throwable.getMessage());
            }
        });
    }
}
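To exercise the producer end to end over HTTP, a controller along the following lines can be added. This is a hypothetical sketch: the class name LogController and the /send path are illustrative, not part of the original project.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class LogController {

    @Autowired
    private MessageProducer messageProducer;

    // POST http://localhost:8899/kafka/send with a JSON LogMessage body.
    @PostMapping("/send")
    public String send(@RequestBody LogMessage logMessage) {
        messageProducer.send(logMessage);
        // send() is asynchronous, so "queued" only means the hand-off happened;
        // success or failure is reported in the producer's callback.
        return "queued";
    }
}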


2.5 Message consumer

In the consumer logic below, the onMessage signature must include an Acknowledgment parameter; without it there is no way to perform the MANUAL, application-level acknowledgment. Note that the two listener methods are alternative signatures for the same topic: since both join the same consumer group, the topic's partitions (here only one) are balanced between them, so in practice you would keep just one.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {

    private Logger logger = Logger.getLogger(MessageConsumer.class);

    // Variant 1: receive just the deserialized payload.
    @KafkaListener(topics = Config.TOPIC)
    public void onMessage(@Payload String msg, Acknowledgment ack) {
        logger.info(msg);
        ack.acknowledge();
    }

    // Variant 2: receive the full ConsumerRecord, including offset and partition.
    @KafkaListener(topics = Config.TOPIC)
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        logger.info(record);
        long offset = record.offset();
        int partition = record.partition();
        String content = record.value();
        logger.info("offset: " + offset + ", partition: " + partition + ", payload: " + content);
        // Manually confirm the message has been consumed. This is the key to
        // flexible, reliable control over consumption acknowledgment.
        ack.acknowledge();
    }
}

 

3. Running and verifying

The goal here is to verify the difference between executing ack.acknowledge() after consuming a message and not executing it, and to understand concretely how skipping the acknowledgment leads to duplicate consumption. (An offset-inspection sketch follows below.)
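One way to observe this independently of the application logs is to query the group's committed offset with a plain Kafka consumer: the committed offset only advances when acknowledge() has run. A minimal sketch, assuming the literal topic name stands in for Config.TOPIC:

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.90.7.2:9092");
        props.put("group.id", "kefu-logger");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Partition 0 is the only partition in this post's setup.
            TopicPartition tp = new TopicPartition("the-topic-name", 0);
            OffsetAndMetadata committed = consumer.committed(tp);
            // null means the group has never committed an offset for this partition.
            System.out.println("committed offset: " + (committed == null ? "none" : committed.offset()));
        }
    }
}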

 

3.1 With acknowledge

The effect is that after a restart the offset keeps incrementing from where it left off, and the corresponding payloads are all new messages. I will not reproduce the log output for this case.

3.2 Without acknowledge

For comparison, here is a log excerpt from just before the application was stopped:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.4.RELEASE)

2018-08-01 19:45:06.181  INFO 14264 --- [           main] c.roomdis.micros.kafka.KafkaApplication  : Starting KafkaApplication on 60-361-0008 with PID 14264 (D:\Knowledge\SOURCE\springboot-kafka\target\classes started by chengsh05 in D:\Knowledge\SOURCE\springboot-kafka)
2018-08-01 19:45:06.184  INFO 14264 --- [           main] c.roomdis.micros.kafka.KafkaApplication  : No active profile set, falling back to default profiles: default
2018-08-01 19:45:06.236  INFO 14264 --- [           main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@48fa0f47: startup date [Wed Aug 01 19:45:06 CST 2018]; root of context hierarchy
2018-08-01 19:45:07.194  INFO 14264 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$2d472f92] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2018-08-01 19:45:07.655  INFO 14264 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat initialized with port(s): 8899 (http)
2018-08-01 19:45:07.672  INFO 14264 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2018-08-01 19:45:07.673  INFO 14264 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/8.5.15
2018-08-01 19:45:07.786  INFO 14264 --- [ost-startStop-1] o.a.c.c.C.[Tomcat].[localhost].[/kafka]  : Initializing Spring embedded WebApplicationContext
2018-08-01 19:45:07.786  INFO 14264 --- [ost-startStop-1] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 1552 ms
2018-08-01 19:45:07.942  INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.ServletRegistrationBean  : Mapping servlet: 'dispatcherServlet' to [/]
2018-08-01 19:45:07.947  INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean   : Mapping filter: 'characterEncodingFilter' to: [/*]
2018-08-01 19:45:07.947  INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean   : Mapping filter: 'hiddenHttpMethodFilter' to: [/*]
2018-08-01 19:45:07.947  INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean   : Mapping filter: 'httpPutFormContentFilter' to: [/*]
2018-08-01 19:45:07.947  INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean   : Mapping filter: 'requestContextFilter' to: [/*]
2018-08-01 19:45:08.354  INFO 14264 --- [           main] s.w.s.m.m.a.RequestMappingHandlerAdapter : Looking for @ControllerAdvice: org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@48fa0f47: startup date [Wed Aug 01 19:45:06 CST 2018]; root of context hierarchy
2018-08-01 19:45:08.419  INFO 14264 --- [           main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/error]}" onto public org.springframework.http.ResponseEntity<java.util.Map<java.lang.String, java.lang.Object>> org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest)
2018-08-01 19:45:08.420  INFO 14264 --- [           main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/error],produces=[text/html]}" onto public org.springframework.web.servlet.ModelAndView org.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse)
2018-08-01 19:45:08.448  INFO 14264 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Mapped URL path [/webjars/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-08-01 19:45:08.448  INFO 14264 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Mapped URL path [/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-08-01 19:45:08.478  INFO 14264 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Mapped URL path [/**/favicon.ico] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-08-01 19:45:08.623  INFO 14264 --- [           main] o.s.ui.freemarker.SpringTemplateLoader   : SpringTemplateLoader for FreeMarker: using resource loader [org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@48fa0f47: startup date [Wed Aug 01 19:45:06 CST 2018]; root of context hierarchy] and template loader path [classpath:/templates/]
2018-08-01 19:45:08.624  INFO 14264 --- [           main] o.s.w.s.v.f.FreeMarkerConfigurer         : ClassTemplateLoader for Spring macros added to FreeMarker configuration
2018-08-01 19:45:08.644  WARN 14264 --- [           main] o.s.b.a.f.FreeMarkerAutoConfiguration    : Cannot find template location(s): [classpath:/templates/] (please add some templates, check your FreeMarker configuration, or set spring.freemarker.checkTemplateLocation=false)
2018-08-01 19:45:08.717  INFO 14264 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2018-08-01 19:45:08.734  INFO 14264 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2018-08-01 19:45:08.748  INFO 14264 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [10.90.7.2:9092, 10.90.2.101:9092, 10.90.2.102:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = kefu-logger
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2018-08-01 19:45:08.751  INFO 14264 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [10.90.7.2:9092, 10.90.2.101:9092, 10.90.2.102:9092]
    check.crcs = true
    client.id = consumer-1
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = kefu-logger
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2018-08-01 19:45:08.796  INFO 14264 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.1.1
2018-08-01 19:45:08.797  INFO 14264 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
2018-08-01 19:45:08.841  INFO 14264 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8899 (http)
2018-08-01 19:45:08.848  INFO 14264 --- [           main] c.roomdis.micros.kafka.KafkaApplication  : Started KafkaApplication in 3.079 seconds (JVM running for 3.484)
2018-08-01 19:45:08.859  INFO 14264 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [10.90.7.2:9092, 10.90.2.101:9092, 10.90.2.102:9092]
    buffer.memory = 20480
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 3
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2018-08-01 19:45:08.859  INFO 14264 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [10.90.7.2:9092, 10.90.2.101:9092, 10.90.2.102:9092]
    buffer.memory = 20480
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linge
