Spring Boot & Kafka: Producer thrown exception with key='null'

【Posted】: 2018-06-22 00:14:39

【Question】:

I am trying to use Spring Boot, Kafka and ZooKeeper together with Docker:

docker-compose.yml:

version: '2'

services:

  zookeeper:
    image: wurstmeister/zookeeper
    restart: always
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    restart: always
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

docker ps output:

CONTAINER ID        IMAGE                    COMMAND                            CREATED             STATUS              PORTS                                                        NAMES
980e6b09f4e3        wurstmeister/kafka       "start-kafka.sh"         29 minutes ago      Up 29 minutes       0.0.0.0:9092->9092/tcp                               samplespringkafkaproducerconsumermaster_kafka_1
64519d4808aa        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   2 hours ago         Up 29 minutes       22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   samplespringkafkaproducerconsumermaster_zookeeper_1

docker-compose up log output:

kafka_1      | [2018-01-12 13:14:49,545] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,546] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,546] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,547] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,547] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,548] INFO Client environment:os.version=4.9.60-linuxkit-aufs (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,548] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,549] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,549] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,552] INFO Initiating client connection, connectString=zookeeper:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1534f01b (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,574] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
kafka_1      | [2018-01-12 13:14:49,578] INFO Opening socket connection to server samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
zookeeper_1  | 2018-01-12 13:14:49,591 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@192] - Accepted socket connection from /192.168.32.3:51466
kafka_1      | [2018-01-12 13:14:49,593] INFO Socket connection established to samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181, initiating session (org.apache.zookeeper.ClientCnxn)
zookeeper_1  | 2018-01-12 13:14:49,600 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@928] - Client attempting to establish new session at /192.168.32.3:51466
zookeeper_1  | 2018-01-12 13:14:49,603 [myid:] - INFO  [SyncThread:0:FileTxnLog@203] - Creating new log file: log.fd
zookeeper_1  | 2018-01-12 13:14:49,613 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@673] - Established session 0x160ea8232b00000 with negotiated timeout 6000 for client /192.168.32.3:51466
kafka_1      | [2018-01-12 13:14:49,616] INFO Session establishment complete on server samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181, sessionid = 0x160ea8232b00000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
kafka_1      | [2018-01-12 13:14:49,619] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
kafka_1      | [2018-01-12 13:14:49,992] INFO Cluster ID = Fgy9ybPPQQ-QdLINzHpmVA (kafka.server.KafkaServer)
kafka_1      | [2018-01-12 13:14:50,003] WARN No meta.properties file under dir /kafka/kafka-logs-980e6b09f4e3/meta.properties (kafka.server.BrokerMetadataCheckpoint)
kafka_1      | [2018-01-12 13:14:50,065] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
kafka_1      | [2018-01-12 13:14:50,065] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
kafka_1      | [2018-01-12 13:14:50,067] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
kafka_1      | [2018-01-12 13:14:50,167] INFO Log directory '/kafka/kafka-logs-980e6b09f4e3' not found, creating it. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,183] INFO Loading logs. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,199] INFO Logs loading complete in 15 ms. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,283] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,291] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,633] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
kafka_1      | [2018-01-12 13:14:50,639] INFO [SocketServer brokerId=1005] Started 1 acceptor threads (kafka.network.SocketServer)
kafka_1      | [2018-01-12 13:14:50,673] INFO [ExpirationReaper-1005-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,674] INFO [ExpirationReaper-1005-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,675] INFO [ExpirationReaper-1005-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,691] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
kafka_1      | [2018-01-12 13:14:50,753] INFO [ExpirationReaper-1005-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,757] INFO [ExpirationReaper-1005-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,762] INFO [ExpirationReaper-1005-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,777] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
kafka_1      | [2018-01-12 13:14:50,791] INFO [GroupCoordinator 1005]: Starting up. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2018-01-12 13:14:50,791] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
kafka_1      | [2018-01-12 13:14:50,793] INFO [GroupCoordinator 1005]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2018-01-12 13:14:50,798] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 5 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
kafka_1      | [2018-01-12 13:14:50,811] INFO [ProducerId Manager 1005]: Acquired new producerId block (brokerId:1005,blockStartProducerId:5000,blockEndProducerId:5999) by writing to Zk with path version 6 (kafka.coordinator.transaction.ProducerIdManager)
kafka_1      | [2018-01-12 13:14:50,848] INFO [TransactionCoordinator id=1005] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1      | [2018-01-12 13:14:50,850] INFO [Transaction Marker Channel Manager 1005]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
kafka_1      | [2018-01-12 13:14:50,850] INFO [TransactionCoordinator id=1005] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1      | [2018-01-12 13:14:50,949] INFO Creating /brokers/ids/1005 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
zookeeper_1  | 2018-01-12 13:14:50,952 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:create cxid:0x70 zxid:0x102 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers
zookeeper_1  | 2018-01-12 13:14:50,952 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:create cxid:0x71 zxid:0x103 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids
kafka_1      | [2018-01-12 13:14:50,957] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
kafka_1      | [2018-01-12 13:14:50,959] INFO Registered broker 1005 at path /brokers/ids/1005 with addresses: EndPoint(localhost,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
kafka_1      | [2018-01-12 13:14:50,961] WARN No meta.properties file under dir /kafka/kafka-logs-980e6b09f4e3/meta.properties (kafka.server.BrokerMetadataCheckpoint)
kafka_1      | [2018-01-12 13:14:50,992] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2018-01-12 13:14:50,993] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2018-01-12 13:14:51,004] INFO [KafkaServer id=1005] started (kafka.server.KafkaServer)
zookeeper_1  | 2018-01-12 13:14:51,263 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:delete cxid:0xe3 zxid:0x105 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
kafka_1      | [2018-01-12 13:24:50,793] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
kafka_1      | [2018-01-12 13:34:50,795] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Kafka Maven dependency in both Producer and Consumer:

 <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
     <version>1.5.9.RELEASE</version>
     <relativePath/>
 </parent>

 <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
 </dependency>

application.properties in Producer:

 spring.kafka.producer.bootstrap-servers=0.0.0.0:9092

 spring.kafka.consumer.topic=kafka_topic
 server.port=8080

application.properties in Consumer:

 spring.kafka.consumer.bootstrap-servers=0.0.0.0:9092
 spring.kafka.consumer.group-id=WorkUnitApp

 spring.kafka.consumer.topic=kafka_topic
 server.port=8081
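
Both properties files point the client at 0.0.0.0:9092. 0.0.0.0 is normally a wildcard bind address for servers (the broker log above shows it "Awaiting socket connections on 0.0.0.0:9092"), and how it behaves as a connect destination depends on the platform. As a minimal sketch, assuming a comma-separated host:port list, the following flags such entries. BootstrapServersCheck and wildcardEntries are hypothetical names introduced here; they are not part of Spring or Kafka.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

final class BootstrapServersCheck {

    /** Returns the entries of a comma-separated host:port list whose host is a wildcard address. */
    static List<String> wildcardEntries(String bootstrapServers) {
        List<String> flagged = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            String host = colon >= 0 ? trimmed.substring(0, colon) : trimmed;
            if (isWildcard(host)) {
                flagged.add(trimmed);
            }
        }
        return flagged;
    }

    /** True for 0.0.0.0 (or the IPv6 equivalent); unresolvable hosts are not flagged. */
    private static boolean isWildcard(String host) {
        try {
            return InetAddress.getByName(host).isAnyLocalAddress();
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Value taken from the application.properties in the question
        System.out.println(wildcardEntries("0.0.0.0:9092")); // prints [0.0.0.0:9092]
        System.out.println(wildcardEntries("localhost:9092"));
    }
}
```

A flagged entry is only a hint to try an explicit host such as localhost; it does not by itself prove that the wildcard address is the cause of a failure.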

Consumer:

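import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    // The topic name is resolved from application.properties
    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
    }
}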

Producer:

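import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        // Sends without a key, which is why the error log reports key='null'
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}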

ConsumerConfig log:

2018-01-12 15:25:48.220  INFO 20919 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [0.0.0.0:9092]
check.crcs = true
client.id = consumer-1
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = WorkUnitApp
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

ProducerConfig log:

2018-01-12 15:26:27.956  INFO 20924 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [0.0.0.0:9092]
buffer.memory = 33554432
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer

When I try to send a message, I get an exception:

producer.send("kafka_topic", "test")

Exception log:

2018-01-12 15:26:27.975  INFO 20924 --- [nio-8080-exec-1]    o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.1.1
2018-01-12 15:26:27.975  INFO 20924 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
2018-01-12 15:26:58.152 ERROR 20924 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='test' to topic kafka_topic:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka_topic-0 due to 30033 ms has passed since batch creation plus linger time

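How can I fix this?

【Comments】:

In which class are you calling producer.send("kafka_topic", "test")? Please add that class to the question as well.

Try producer.send("kafka_topic", "key", "test").

@pvpkiran I added the class. With a key I get: Exception thrown when sending a message with key='key' and payload='test' to topic kafka_topic:

【Answer 1】: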

The problem is not that the key is sent as null. Most likely the connection to the broker cannot be established.
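
The log in the question is consistent with this: the ProducerConfig shows request.timeout.ms = 30000 and linger.ms = 0, and the batch was expired after 30033 ms, just past the request timeout, without being delivered. A rough sketch of that arithmetic follows, assuming the simplified rule "a batch expires once the time since its creation plus linger exceeds request.timeout.ms". The real client has further conditions, and BatchExpiry and its methods are hypothetical names, not Kafka API.

```java
final class BatchExpiry {

    /** Time passed since batch creation plus linger time, as worded in the exception message. */
    static long elapsedSinceCreationPlusLinger(long createdMs, long lingerMs, long nowMs) {
        return nowMs - (createdMs + lingerMs);
    }

    /** Simplified expiry rule (assumption): expired once the elapsed time exceeds the request timeout. */
    static boolean isExpired(long createdMs, long lingerMs, long requestTimeoutMs, long nowMs) {
        return elapsedSinceCreationPlusLinger(createdMs, lingerMs, nowMs) > requestTimeoutMs;
    }

    public static void main(String[] args) {
        long lingerMs = 0;             // linger.ms from the ProducerConfig log
        long requestTimeoutMs = 30000; // request.timeout.ms from the ProducerConfig log

        // 30033 ms is the value reported in the TimeoutException
        System.out.println(isExpired(0, lingerMs, requestTimeoutMs, 30033)); // prints true
        // A batch delivered before the timeout would not be expired
        System.out.println(isExpired(0, lingerMs, requestTimeoutMs, 29999)); // prints false
    }
}
```

Under this model a different key would not change anything, because the key plays no part in the expiry decision.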

Try using a local Kafka installation.

If you are on a Mac, note that Docker for Mac has some networking limitations: https://docs.docker.com/docker-for-mac/networking/#known-limitations-use-cases-and-workarounds
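
One way to narrow this down is to check plain TCP reachability from the machine that runs the Spring Boot application. The sketch below uses only the JDK. BrokerReachability and canConnect are hypothetical names, and the two addresses in main are the bootstrap address and the advertised address that appear in the question.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerReachability {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolvable host names
            return false;
        }
    }

    public static void main(String[] args) {
        // Bootstrap address from application.properties
        System.out.println("0.0.0.0:9092 reachable: " + canConnect("0.0.0.0", 9092, 2000));
        // Address the broker registered in ZooKeeper according to its startup log
        System.out.println("localhost:9092 reachable: " + canConnect("localhost", 9092, 2000));
    }
}
```

A successful connect only shows that something accepts connections on that port (with Docker this can be the port proxy), so it is a necessary check rather than proof that the broker itself is usable.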

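【Comments】:

You are right. On the Mac I use 0.0.0.0 instead of localhost (in the log: bootstrap.servers = [0.0.0.0:9092]). I want to use it with Docker, but I don't yet know where to fix the connection.

I solved the problem, thanks for pointing out that this was the cause. The fix was adding links: - zookeeper:zookeeper

【Answer 2】: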

I ran into the same problem. The issue was in my docker-compose file. I am not 100% sure, but I think KAFKA_ADVERTISED_HOST_NAME and KAFKA_ADVERTISED_LISTENERS both need to refer to localhost. My working compose file:

version: '2'

networks:
  sb_net:
    driver: bridge

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    networks:
     - sb_net
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    networks:
     - sb_net
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
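
The advertised listener matters because it is the address the broker hands back to clients in metadata responses, so a client on the Docker host has to be able to reach exactly that host and port. As an illustration only, the following sketch extracts the host:port part from a value in the NAME://host:port form. AdvertisedListeners and clientTargets are hypothetical names introduced here, not Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

final class AdvertisedListeners {

    /** Returns the host:port part of every entry in a comma-separated NAME://host:port list. */
    static List<String> clientTargets(String advertisedListeners) {
        List<String> targets = new ArrayList<>();
        for (String listener : advertisedListeners.split(",")) {
            String trimmed = listener.trim();
            int separator = trimmed.indexOf("://");
            if (separator < 0) {
                throw new IllegalArgumentException("Expected NAME://host:port but got: " + trimmed);
            }
            targets.add(trimmed.substring(separator + 3));
        }
        return targets;
    }

    public static void main(String[] args) {
        // Value of KAFKA_ADVERTISED_LISTENERS from the compose file above
        System.out.println(clientTargets("PLAINTEXT://localhost:9092")); // prints [localhost:9092]
    }
}
```

If the printed address cannot be resolved or reached from where the producer runs, sends can time out even though the initial bootstrap connection succeeded.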

【Comments】:

【Answer 3】:

It was my own mistake: a link to ZooKeeper was missing and needs to be added:

links:
  - zookeeper:zookeeper

Full docker-compose.yml:

version: '2'

services:

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - 2181:2181

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    restart: always
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    links:
      - zookeeper:zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

【Comments】:

【Answer 4】:

I ran into the same problem. By default, Kafka only allows 127.0.0.1/localhost.

My solution:

Add this line to Kafka's server.properties and restart the Kafka service:

listeners=PLAINTEXT://192.168.31.72:9092
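
The address 192.168.31.72 is specific to that answerer's machine. To find candidate addresses on another machine, a sketch like the following lists the non-loopback IPv4 addresses of interfaces that are up. It uses only the JDK, and LocalAddresses and nonLoopbackIpv4 are hypothetical names.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

final class LocalAddresses {

    /** Lists the non-loopback IPv4 addresses of all network interfaces that are up. */
    static List<String> nonLoopbackIpv4() {
        List<String> result = new ArrayList<>();
        try {
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            if (interfaces == null) {
                return result; // no interfaces reported
            }
            for (NetworkInterface nic : Collections.list(interfaces)) {
                if (!nic.isUp() || nic.isLoopback()) {
                    continue;
                }
                for (InetAddress address : Collections.list(nic.getInetAddresses())) {
                    if (address instanceof Inet4Address && !address.isLoopbackAddress()) {
                        result.add(address.getHostAddress());
                    }
                }
            }
        } catch (SocketException e) {
            // Interface details could not be read; return what was collected so far
        }
        return result;
    }

    public static void main(String[] args) {
        // Each printed address is a candidate for listeners=PLAINTEXT://<address>:9092
        for (String address : nonLoopbackIpv4()) {
            System.out.println(address);
        }
    }
}
```

Which of the printed addresses is appropriate depends on the network the clients connect from, so the output is a list of candidates, not a recommendation.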

【Comments】:
