Spring Boot & Kafka, producer throws exception with key='null'
Posted: 2018-06-22 00:14:39

I'm trying to use Spring Boot with Kafka and ZooKeeper running in Docker:
docker-compose.yml:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    restart: always
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    restart: always
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
docker ps output:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
980e6b09f4e3 wurstmeister/kafka "start-kafka.sh" 29 minutes ago Up 29 minutes 0.0.0.0:9092->9092/tcp samplespringkafkaproducerconsumermaster_kafka_1
64519d4808aa wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 2 hours ago Up 29 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp samplespringkafkaproducerconsumermaster_zookeeper_1
docker-compose up output log:
kafka_1 | [2018-01-12 13:14:49,545] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
kafka_1 | [2018-01-12 13:14:49,546] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
kafka_1 | [2018-01-12 13:14:49,546] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
kafka_1 | [2018-01-12 13:14:49,547] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
kafka_1 | [2018-01-12 13:14:49,547] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
kafka_1 | [2018-01-12 13:14:49,548] INFO Client environment:os.version=4.9.60-linuxkit-aufs (org.apache.zookeeper.ZooKeeper)
kafka_1 | [2018-01-12 13:14:49,548] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
kafka_1 | [2018-01-12 13:14:49,549] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
kafka_1 | [2018-01-12 13:14:49,549] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
kafka_1 | [2018-01-12 13:14:49,552] INFO Initiating client connection, connectString=zookeeper:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1534f01b (org.apache.zookeeper.ZooKeeper)
kafka_1 | [2018-01-12 13:14:49,574] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
kafka_1 | [2018-01-12 13:14:49,578] INFO Opening socket connection to server samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
zookeeper_1 | 2018-01-12 13:14:49,591 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@192] - Accepted socket connection from /192.168.32.3:51466
kafka_1 | [2018-01-12 13:14:49,593] INFO Socket connection established to samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181, initiating session (org.apache.zookeeper.ClientCnxn)
zookeeper_1 | 2018-01-12 13:14:49,600 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@928] - Client attempting to establish new session at /192.168.32.3:51466
zookeeper_1 | 2018-01-12 13:14:49,603 [myid:] - INFO [SyncThread:0:FileTxnLog@203] - Creating new log file: log.fd
zookeeper_1 | 2018-01-12 13:14:49,613 [myid:] - INFO [SyncThread:0:ZooKeeperServer@673] - Established session 0x160ea8232b00000 with negotiated timeout 6000 for client /192.168.32.3:51466
kafka_1 | [2018-01-12 13:14:49,616] INFO Session establishment complete on server samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181, sessionid = 0x160ea8232b00000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
kafka_1 | [2018-01-12 13:14:49,619] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
kafka_1 | [2018-01-12 13:14:49,992] INFO Cluster ID = Fgy9ybPPQQ-QdLINzHpmVA (kafka.server.KafkaServer)
kafka_1 | [2018-01-12 13:14:50,003] WARN No meta.properties file under dir /kafka/kafka-logs-980e6b09f4e3/meta.properties (kafka.server.BrokerMetadataCheckpoint)
kafka_1 | [2018-01-12 13:14:50,065] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
kafka_1 | [2018-01-12 13:14:50,065] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
kafka_1 | [2018-01-12 13:14:50,067] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
kafka_1 | [2018-01-12 13:14:50,167] INFO Log directory '/kafka/kafka-logs-980e6b09f4e3' not found, creating it. (kafka.log.LogManager)
kafka_1 | [2018-01-12 13:14:50,183] INFO Loading logs. (kafka.log.LogManager)
kafka_1 | [2018-01-12 13:14:50,199] INFO Logs loading complete in 15 ms. (kafka.log.LogManager)
kafka_1 | [2018-01-12 13:14:50,283] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
kafka_1 | [2018-01-12 13:14:50,291] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
kafka_1 | [2018-01-12 13:14:50,633] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
kafka_1 | [2018-01-12 13:14:50,639] INFO [SocketServer brokerId=1005] Started 1 acceptor threads (kafka.network.SocketServer)
kafka_1 | [2018-01-12 13:14:50,673] INFO [ExpirationReaper-1005-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2018-01-12 13:14:50,674] INFO [ExpirationReaper-1005-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2018-01-12 13:14:50,675] INFO [ExpirationReaper-1005-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2018-01-12 13:14:50,691] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
kafka_1 | [2018-01-12 13:14:50,753] INFO [ExpirationReaper-1005-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2018-01-12 13:14:50,757] INFO [ExpirationReaper-1005-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2018-01-12 13:14:50,762] INFO [ExpirationReaper-1005-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1 | [2018-01-12 13:14:50,777] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
kafka_1 | [2018-01-12 13:14:50,791] INFO [GroupCoordinator 1005]: Starting up. (kafka.coordinator.group.GroupCoordinator)
kafka_1 | [2018-01-12 13:14:50,791] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
kafka_1 | [2018-01-12 13:14:50,793] INFO [GroupCoordinator 1005]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
kafka_1 | [2018-01-12 13:14:50,798] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 5 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
kafka_1 | [2018-01-12 13:14:50,811] INFO [ProducerId Manager 1005]: Acquired new producerId block (brokerId:1005,blockStartProducerId:5000,blockEndProducerId:5999) by writing to Zk with path version 6 (kafka.coordinator.transaction.ProducerIdManager)
kafka_1 | [2018-01-12 13:14:50,848] INFO [TransactionCoordinator id=1005] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1 | [2018-01-12 13:14:50,850] INFO [Transaction Marker Channel Manager 1005]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
kafka_1 | [2018-01-12 13:14:50,850] INFO [TransactionCoordinator id=1005] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1 | [2018-01-12 13:14:50,949] INFO Creating /brokers/ids/1005 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
zookeeper_1 | 2018-01-12 13:14:50,952 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:create cxid:0x70 zxid:0x102 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers
zookeeper_1 | 2018-01-12 13:14:50,952 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:create cxid:0x71 zxid:0x103 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids
kafka_1 | [2018-01-12 13:14:50,957] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
kafka_1 | [2018-01-12 13:14:50,959] INFO Registered broker 1005 at path /brokers/ids/1005 with addresses: EndPoint(localhost,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
kafka_1 | [2018-01-12 13:14:50,961] WARN No meta.properties file under dir /kafka/kafka-logs-980e6b09f4e3/meta.properties (kafka.server.BrokerMetadataCheckpoint)
kafka_1 | [2018-01-12 13:14:50,992] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2018-01-12 13:14:50,993] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2018-01-12 13:14:51,004] INFO [KafkaServer id=1005] started (kafka.server.KafkaServer)
zookeeper_1 | 2018-01-12 13:14:51,263 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:delete cxid:0xe3 zxid:0x105 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
kafka_1 | [2018-01-12 13:24:50,793] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
kafka_1 | [2018-01-12 13:34:50,795] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
Maven dependencies for the Kafka Producer and Consumer:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
    <relativePath/>
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
application.properties in the Producer:
spring.kafka.producer.bootstrap-servers=0.0.0.0:9092
spring.kafka.consumer.topic=kafka_topic
server.port=8080
application.properties in the Consumer:
spring.kafka.consumer.bootstrap-servers=0.0.0.0:9092
spring.kafka.consumer.group-id=WorkUnitApp
spring.kafka.consumer.topic=kafka_topic
server.port=8081
The Consumer:
@Component
public class Consumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    // topic name resolved from the spring.kafka.consumer.topic property
    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
    }
}
The Producer:
@Component
public class Producer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // sends with no key, so records arrive with key='null'
    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
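Note that KafkaTemplate.send() is asynchronous: with the code above a failure only surfaces later through Spring's LoggingProducerListener, and key='null' in the error simply reflects that the two-argument overload sends no key. Below is a minimal sketch (assuming Spring Kafka 1.x, where send() returns a ListenableFuture) of a variant that sends an explicit key and attaches a callback; sendWithKey is a made-up name:

// Extra imports needed: org.springframework.kafka.support.SendResult,
// org.springframework.util.concurrent.ListenableFutureCallback
public void sendWithKey(String topic, String key, String payload) {
    kafkaTemplate.send(topic, key, payload)
            .addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    LOGGER.info("sent to partition {}", result.getRecordMetadata().partition());
                }

                @Override
                public void onFailure(Throwable ex) {
                    // e.g. the TimeoutException shown below when no broker is reachable
                    LOGGER.error("send failed", ex);
                }
            });
}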
ConsumerConfig log:
2018-01-12 15:25:48.220 INFO 20919 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [0.0.0.0:9092]
check.crcs = true
client.id = consumer-1
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = WorkUnitApp
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ProducerConfig log:
2018-01-12 15:26:27.956 INFO 20924 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [0.0.0.0:9092]
buffer.memory = 33554432
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
An exception is thrown when I try to send a message:
producer.send("kafka_topic", "test")
Exception log:
2018-01-12 15:26:27.975 INFO 20924 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.10.1.1
2018-01-12 15:26:27.975 INFO 20924 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : f10ef2720b03b247
2018-01-12 15:26:58.152 ERROR 20924 --- [ad | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='test' to topic kafka_topic:
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka_topic-0 due to 30033 ms has passed since batch creation plus linger time
How can I fix this?
Comments:

In which class do you call producer.send("kafka_topic", "test")? Please post that class as well.
Try producer.send("kafka_topic", "key", "test").
@pvpkiran Class added; the exception is still thrown when sending a message with key='key' and payload='test' to topic kafka_topic.
Answer 1:

The problem is not that the key is sent as null; most likely a connection to the broker cannot be established.

Try a local Kafka installation. If you are on a Mac, Docker for Mac has some networking limitations: https://docs.docker.com/docker-for-mac/networking/#known-limitations-use-cases-and-workarounds
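A quick way to tell whether the broker is reachable at all is a standalone connectivity probe. Here is a minimal sketch (assuming a kafka-clients version of 0.11 or later on the classpath, which is where AdminClient was introduced; the class name BrokerCheck is made up):

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class BrokerCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
        // describeCluster() forces a metadata round-trip, so it fails fast
        // when no broker connection can be established
        try (AdminClient admin = AdminClient.create(props)) {
            String clusterId = admin.describeCluster().clusterId().get(10, TimeUnit.SECONDS);
            System.out.println("Connected, cluster id = " + clusterId);
        }
    }
}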
Discussion:

You are right. On Mac I used 0.0.0.0 instead of localhost (in the log: bootstrap.servers = [0.0.0.0:9092]). I want to keep using it with Docker, but I don't yet know where to fix the connection. — I solved it, thanks for pointing out the cause; the fix was the link entry: - zookeeper:zookeeper

Answer 2:

I had the same problem, and it was my docker-compose file. I'm not 100% sure, but I think both KAFKA_ADVERTISED_HOST_NAME and KAFKA_ADVERTISED_LISTENERS need to reference localhost. My working compose file:
version: '2'
networks:
  sb_net:
    driver: bridge
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    networks:
      - sb_net
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    networks:
      - sb_net
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
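The advertised listener matters because a Kafka client only uses bootstrap-servers for the first metadata request and then reconnects to whatever address the broker advertises, so that address must be reachable from the host. With this compose file the Spring Boot apps would point at it, for example (a sketch of the matching application.properties entries):

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.consumer.bootstrap-servers=localhost:9092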
Answer 3:

A careless mistake on my part: a link to the zookeeper container needs to be added:
links:
  - zookeeper:zookeeper
The complete docker-compose.yml:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - 2181:2181
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    restart: always
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    links:
      - zookeeper:zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
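For the changed compose file to take effect, the containers need to be recreated, for example (assuming the stack is already running):

docker-compose down
docker-compose up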
Answer 4:

I ran into the same problem as well: by default Kafka only accepts connections via 127.0.0.1/localhost.
My solution: add the following line to Kafka's server.properties and restart the Kafka service:

listeners=PLAINTEXT://192.168.31.72:9092
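Note that 192.168.31.72 is this answerer's own host IP. On current brokers the same idea is usually split into a bind address and an advertised address, roughly as follows (a sketch for server.properties; the IP is a placeholder to substitute):

# bind on all interfaces
listeners=PLAINTEXT://0.0.0.0:9092
# the address clients are told to reconnect to (placeholder IP)
advertised.listeners=PLAINTEXT://192.168.31.72:9092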