Spring Kafka Producer 不发送到 Kafka 1.0.0(Magic v1 不支持记录头)
Posted
技术标签:
【中文标题】Spring Kafka Producer 不发送到 Kafka 1.0.0(Magic v1 不支持记录头)【英文标题】:Spring Kafka Producer not sending to Kafka 1.0.0 (Magic v1 does not support record headers) 【发布时间】:2018-06-05 20:08:23 【问题描述】:我正在使用这个 docker-compose 设置在本地设置 Kafka:https://github.com/wurstmeister/kafka-docker/
docker-compose up
工作正常,通过 shell 创建主题工作正常。
现在我尝试通过spring-kafka:2.1.0.RELEASE
连接到 Kafka
当启动 Spring 应用程序时,它会打印正确版本的 Kafka:
o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d
我尝试发送这样的消息
kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");
在客户端发送失败
UnknownServerException: The server experienced an unexpected error when processing the request
在服务器控制台中,我收到消息 Magic v1 不支持记录标头
Error when handling request replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[topic=test-topic,partitions=[partition=0,fetch_offset=39,max_bytes=1048576]] (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
谷歌搜索提示版本冲突,但版本似乎适合(org.apache.kafka:kafka-clients:1.0.0
在类路径中)。
有什么线索吗?谢谢!
编辑: 我缩小了问题的根源。发送纯字符串有效,但通过 JsonSerializer 发送 Json 会导致给定问题。这是我的生产者配置的内容:
@Value("\$kafka.bootstrap-servers")
lateinit var bootstrapServers: String
@Bean
fun producerConfigs(): Map<String, Any> =
HashMap<String, Any>().apply
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
DefaultKafkaProducerFactory(producerConfigs())
@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
KafkaTemplate(producerFactory())
【问题讨论】:
这没有意义; (在服务器端获取该消息)。如果客户端版本较旧,它不会发送任何标头,所以一切都应该很好(我已经使用带有 0.10 代理的 1.0.0 客户端进行了测试,只要您不尝试发送标头,它就可以工作)。对于 1.0.0 客户端,当模板不发送任何标头时(由客户端)发送“空”RecordHeaders
。
映像名称似乎没有指定版本,因此您可能使用的是较旧的缓存 docker 映像和较新的客户端。向 0.10 代理发送标头的 1.0 客户端会收到此错误。尝试检查 docker 镜像版本和 docker pull 最新的 1.0 代理镜像。
将 Kafka 版本更新到最新版本后,我没有收到“Magic v1 不支持记录标头”异常,并且代码运行良好。
【参考方案1】:
我遇到了类似的问题。如果我们使用JsonSerializer
或JsonSerde
作为值,Kafka 默认会添加标题。
为了防止这个问题,我们需要禁用添加信息头。
如果你对默认的 json 序列化没问题,那么使用下面的(这里的重点是ADD_TYPE_INFO_HEADERS
):
Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
但是如果您需要带有特定ObjectMapper
的自定义JsonSerializer
(例如PropertyNamingStrategy.SNAKE_CASE
),则应禁用在JsonSerializer
上显式添加信息标头,因为spring kafka 会忽略DefaultKafkaProducerFactory
的属性ADD_TYPE_INFO_HEADERS
(对我来说这是spring kafka的糟糕设计)
JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);
或者如果我们使用JsonSerde
,那么:
Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false);
【讨论】:
添加这一行解决了我的问题props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
如果我确实需要类型标头怎么办?我该如何克服这个问题?【参考方案2】:
解决了。问题既不是代理,也不是一些 docker 缓存,也不是 Spring 应用程序。
问题是我并行使用的控制台使用者进行调试。这是一个以kafka-console-consumer.sh --topic=topic --zookeeper=...
开头的“旧”消费者
它实际上在启动时会打印一个警告:Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
应该使用带有--bootstrap-server
选项的“新”消费者(尤其是在使用带有 JsonSerializer 的 Kafka 1.0 时)。
注意:这里使用老消费者确实会影响生产者。
【讨论】:
我没有并行运行任何消费者,但这个问题仍然存在。我也尝试添加 props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);但它仍然给出了同样的错误,magicv1 不支持添加记录头。你能在这里帮忙吗【参考方案3】:我刚刚对那个 docker 映像进行了测试,没有任何问题...
$docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
f093b3f2475c kafkadocker_kafka "start-kafka.sh" 33 minutes ago Up 2 minutes 0.0.0.0:32768->9092/tcp kafkadocker_kafka_1
319365849e48 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 33 minutes ago Up 2 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp kafkadocker_zookeeper_1
.
@SpringBootApplication
public class So47953901Application
public static void main(String[] args)
SpringApplication.run(So47953901Application.class, args);
@Bean
public ApplicationRunner runner(KafkaTemplate<Object, Object> template)
return args -> template.send("foo", "bar", "baz");
@KafkaListener(id = "foo", topics = "foo")
public void listen(String in)
System.out.println(in);
.
spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
.
2017-12-23 13:27:27.990 INFO 21305 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [foo-0]
baz
编辑
仍然对我有用...
spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
.
2017-12-23 15:27:59.997 INFO 44079 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
...
value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
...
2017-12-23 15:28:00.071 INFO 44079 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [foo-0]
baz
【讨论】:
对我仍然有效 - 请参阅我的编辑。JsonSerializer
默认添加标头,因此看起来您的 docker 映像中的代理绝对小于 0.11。您可以通过将生产者属性JsonSerializer.ADD_TYPE_INFO_HEADERS
设置为false
来确认。该属性阻止它在标题中添加类型信息。
试试我的docker logs <containerId> > server.log
,我看到了这个:[2017-12-23 18:07:52,905] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
。
奇怪 - 更奇怪的是,如果我从 SK 2.1.0 和 JsonSerializer
与 0.10.2.0 服务器交谈,我会在 client 上得到异常我希望Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers
。正如我在对您的问题的原始评论中所说,老经纪人对标头一无所知,所以我看不出如何在服务器上出现此错误,除非客户端是旧的。您是否在 AWS 上运行您的客户端?我之前听说过 AWS 上运行旧的 kafka-clients
的代码有些奇怪,即使应用程序是用正确的打包的。
所以.. 另一个更新。我认为问题既不是代理也不是 Spring 应用程序。我使用控制台使用者与 Spring 应用程序并行进行调试(基于 this tutorial )。我很确定在使用带有参数 zookeper 而不是 bootstrap-server 的“旧”消费者 (kafka-console-consumer.sh --topic=topic --zookeeper=$ZK
) 时会出现问题。我发现有趣的是,这个消费者在(Spring)生产者中导致了一个 UnknownServerException。
尝试使用--bootstrap-server
选项而不是--zookeeper
;在这种情况下,console-consumer
将使用新的消费者(但无论如何,您都看不到控制台消费者的标题)。但我同意消费者可以影响生产者这很奇怪。也许有一些逻辑说“我们有一个老消费者附加到这个主题,所以你不能发送标题”。【参考方案4】:
您使用的是 kafka 版本
Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
为您的生产者工厂属性。
如果您使用的是 kafka 版本 > 0.10.x.x,它应该可以正常工作
【讨论】:
以上是关于Spring Kafka Producer 不发送到 Kafka 1.0.0(Magic v1 不支持记录头)的主要内容,如果未能解决你的问题,请参考以下文章
spring boot kafka 通用 JSON 模板发送方