春季启动生产者在kafka重启后无法发送任何消息

Posted

技术标签:

【中文标题】春季启动生产者在kafka重启后无法发送任何消息【英文标题】:Spring boot producer fail to send any message after kafka restart 【发布时间】:2019-09-26 06:57:36 【问题描述】:

spring-boot消费者微服务在kafka重启后无法向topic发送消息。

使用 docker swarm 配置,我设置了带有 1 个 kafka 代理和 2 个 Spring Boot 微服务(一个生产者和一个消费者)的单节点集群。 我正在使用 Spring Boot 2.0.3

consumer 和 producer(spring boot 微服务)在同一个覆盖网络“net-broker”上,因此他们使用服务名称“kafka:9092”访问 kafka

第一次启动时一切正常。

然后 kafka ONLY 重新启动,之后消费者无法再从 kafka 主题发送消息。

由于 docker-compose.yml 中的微小变化(例如 max_attempts: 3 => max_attempts: 4),kafka 服务重新启动

docker-compose.yml 文件

kafka:
    image: wurstmeister/kafka:2.12-2.2.0
    depends_on:
      - zookeeper
    networks:
      - net-broker
    deploy:
      replicas: 1
      update_config:
        parallelism: 1
        delay: 10s
      restart_policy:
        condition: on-failure
        max_attempts: 3
    # ports:
    #   - target: 9094
    #     published: 9094
    #     protocol: tcp
    #     mode: host
    environment:
      HOSTNAME_COMMAND: "echo $HOST_IP:-192.168.99.100"
      KAFKA_CREATE_TOPICS: "gnss-topic-$GNSS_TAG:3:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_HOSTNAME_COMMAND:9094
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      BROKER_ID_COMMAND: "echo 101"
      KAFKA_LOG_DIRS: "/kafka/kafka-logs"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - kafka-data:/kafka

KafkaProducerConfig 类

@Bean
  public ProducerFactory<String, GNSSPortHolderDTO> producerFactory() 
    Map<String, Object> configProps = new HashMap<>();

    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, gnssConfig.getKafkaBootstapServers());

    // high throughput producer (at the expense of a bit of latency and CPU usage)
    configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    configProps.put(ProducerConfig.LINGER_MS_CONFIG, "20");
    configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); // 32 KB batch size

    // serializers
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new DefaultKafkaProducerFactory<>(configProps);
  

kafka重启后的spring boot producer日志:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for gnss-topic-11.2.1-B5607-1: 30030 ms has passed since batch creation plus linger time

kafka 重启后的 spring boot 消费者日志:

gnss_data-access.1.ll948jogpqil@GDN-S-GNSS2    | 2019-05-08 09:42:33.984  INFO 1 --- [ gnss-view-data] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=gnss-view-data] Marking the coordinator fe7091944126:9092 (id: 2147483546 rack: null) dead

我正在为生产者/消费者微服务使用“spring-kafka-2.1.7.RELEASE.jar”库

使用远程调试模式,我了解到消费者正在尝试向旧的“已终止”容器 ID 发送消息,而不是使用服务名称“kafka:9092”。我不知道为什么。

【问题讨论】:

在:` configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, gnssConfig.getKafkaBootstapServers());` gnssConfig.getKafkaBootstapServers() 是 IP、Docker 容器 ID 还是符号名称? 您好 Pablo,首先非常感谢您考虑我的问题。 gnssConfig.getKafkaBootstapServers() 将返回 kafka:9092 即服务名称 【参考方案1】:

我找到了解决这个问题的方法。

提醒一下:

Kafka 应该只能从 Docker net-broker 的覆盖虚拟网络访问。 出于安全原因,不应从 Docker 主机 IP 访问 Kafka

KAFKA_ADVERTISED_LISTENERS 环境变量中的以下更改修复了问题

旧值(重启后不工作): KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_HOSTNAME_COMMAND:9094

新值(重启后工作): KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://_HOSTNAME_COMMAND:9094

因此解决方法是为 INSIDE 广告侦听器指定 kafka 服务名称 kafka:9092

问题在于,即使 spring boot 生产者被配置为使用kafka:9092

configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka:9092);  

生产者实际上是使用kafka容器ID而不是服务名称kafka:9092进行通信,因此在kafka重启后创建了一个新容器(新容器ID)但生产者仍然指向旧容器ID

【讨论】:

以上是关于春季启动生产者在kafka重启后无法发送任何消息的主要内容,如果未能解决你的问题,请参考以下文章

一段时间后,Kafka 生产者无法更新元数据

Java API中kafka生产者发送消息没有成功

kafka常用的shell命令

无法使用 kafka-node 向 kafka Producer 发送消息

(04)使用kafka脚本发送消息和接收消息

Vagrant中的Kafka Cluster(ZK,BR,BR,BR)无法建立连接