Spring Kafka Producer 不发送到 Kafka 1.0.0(Magic v1 不支持记录头)

Posted

技术标签:

【中文标题】Spring Kafka Producer 不发送到 Kafka 1.0.0(Magic v1 不支持记录头)【英文标题】:Spring Kafka Producer not sending to Kafka 1.0.0 (Magic v1 does not support record headers) 【发布时间】:2018-06-05 20:08:23 【问题描述】:

我正在使用这个 docker-compose 设置在本地设置 Kafka:https://github.com/wurstmeister/kafka-docker/

docker-compose up 工作正常,通过 shell 创建主题工作正常。

现在我尝试通过spring-kafka:2.1.0.RELEASE 连接到 Kafka

当启动 Spring 应用程序时,它会打印正确版本的 Kafka:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

我尝试发送这样的消息

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

在客户端发送失败

UnknownServerException: The server experienced an unexpected error when processing the request

在服务器控制台中,我收到消息 Magic v1 不支持记录标头

Error when handling request replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[topic=test-topic,partitions=[partition=0,fetch_offset=39,max_bytes=1048576]] (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

谷歌搜索提示版本冲突,但版本似乎适合(org.apache.kafka:kafka-clients:1.0.0 在类路径中)。

有什么线索吗?谢谢!

编辑: 我缩小了问题的根源。发送纯字符串有效,但通过 JsonSerializer 发送 Json 会导致给定问题。这是我的生产者配置的内容:

@Value("\$kafka.bootstrap-servers")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply 
            // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())

【问题讨论】:

这没有意义; (在服务器端获取该消息)。如果客户端版本较旧,它不会发送任何标头,所以一切都应该很好(我已经使用带有 0.10 代理的 1.0.0 客户端进行了测试,只要您不尝试发送标头,它就可以工作)。对于 1.0.0 客户端,当模板不发送任何标头时(由客户端)发送“空”RecordHeaders 映像名称似乎没有指定版本,因此您可能使用的是较旧的缓存 docker 映像和较新的客户端。向 0.10 代理发送标头的 1.0 客户端会收到此错误。尝试检查 docker 镜像版本和 docker pull 最新的 1.0 代理镜像。 将 Kafka 版本更新到最新版本后,我没有收到“Magic v1 不支持记录标头”异常,并且代码运行良好。 【参考方案1】:

我遇到了类似的问题。如果我们使用JsonSerializerJsonSerde 作为值,Kafka 默认会添加标题。 为了防止这个问题,我们需要禁用添加信息头。

如果你对默认的 json 序列化没问题,那么使用下面的(这里的重点是ADD_TYPE_INFO_HEADERS):

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);

但是如果您需要带有特定ObjectMapper 的自定义JsonSerializer(例如PropertyNamingStrategy.SNAKE_CASE),则应禁用在JsonSerializer 上显式添加信息标头,因为spring kafka 会忽略DefaultKafkaProducerFactory 的属性ADD_TYPE_INFO_HEADERS (对我来说这是spring kafka的糟糕设计)

JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);

或者如果我们使用JsonSerde,那么:

Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false);

【讨论】:

添加这一行解决了我的问题props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); 如果我确实需要类型标头怎么办?我该如何克服这个问题?【参考方案2】:

解决了。问题既不是代理,也不是一些 docker 缓存,也不是 Spring 应用程序。

问题是我并行使用的控制台使用者进行调试。这是一个以kafka-console-consumer.sh --topic=topic --zookeeper=... 开头的“旧”消费者

它实际上在启动时会打印一个警告:Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

应该使用带有--bootstrap-server 选项的“新”消费者(尤其是在使用带有 JsonSerializer 的 Kafka 1.0 时)。 注意:这里使用老消费者确实会影响生产者。

【讨论】:

我没有并行运行任何消费者,但这个问题仍然存在。我也尝试添加 props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);但它仍然给出了同样的错误,magicv1 不支持添加记录头。你能在这里帮忙吗【参考方案3】:

我刚刚对那个 docker 映像进行了测试,没有任何问题...

$docker ps

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
f093b3f2475c        kafkadocker_kafka        "start-kafka.sh"         33 minutes ago      Up 2 minutes        0.0.0.0:32768->9092/tcp                              kafkadocker_kafka_1
319365849e48        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   33 minutes ago      Up 2 minutes        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1

.

@SpringBootApplication
public class So47953901Application 

    public static void main(String[] args) 
        SpringApplication.run(So47953901Application.class, args);
    

    @Bean
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) 
        return args -> template.send("foo", "bar", "baz");
    

    @KafkaListener(id = "foo", topics = "foo")
    public void listen(String in) 
        System.out.println(in);
    


.

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

.

2017-12-23 13:27:27.990  INFO 21305 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz

编辑

仍然对我有用...

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

.

2017-12-23 15:27:59.997  INFO 44079 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    ...
    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

...

2017-12-23 15:28:00.071  INFO 44079 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz

【讨论】:

对我仍然有效 - 请参阅我的编辑。 JsonSerializer 默认添加标头,因此看起来您的 docker 映像中的代理绝对小于 0.11。您可以通过将生产者属性JsonSerializer.ADD_TYPE_INFO_HEADERS 设置为false 来确认。该属性阻止它在标题中添加类型信息。 试试我的docker logs &lt;containerId&gt; &gt; server.log,我看到了这个:[2017-12-23 18:07:52,905] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser) 奇怪 - 更奇怪的是,如果我从 SK 2.1.0 和 JsonSerializer 与 0.10.2.0 服务器交谈,我会在 client 上得到异常我希望Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers。正如我在对您的问题的原始评论中所说,老经纪人对标头一无所知,所以我看不出如何在服务器上出现此错误,除非客户端是旧的。您是否在 AWS 上运行您的客户端?我之前听说过 AWS 上运行旧的 kafka-clients 的代码有些奇怪,即使应用程序是用正确的打包的。 所以.. 另一个更新。我认为问题既不是代理也不是 Spring 应用程序。我使用控制台使用者与 Spring 应用程序并行进行调试(基于 this tutorial )。我很确定在使用带有参数 zookeper 而不是 bootstrap-server 的“旧”消费者 (kafka-console-consumer.sh --topic=topic --zookeeper=$ZK) 时会出现问题。我发现有趣的是,这个消费者在(Spring)生产者中导致了一个 UnknownServerException。 尝试使用--bootstrap-server 选项而不是--zookeeper;在这种情况下,console-consumer 将使用新的消费者(但无论如何,您都看不到控制台消费者的标题)。但我同意消费者可以影响生产者这很奇怪。也许有一些逻辑说“我们有一个老消费者附加到这个主题,所以你不能发送标题”。【参考方案4】:

您使用的是 kafka 版本

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);

为您的生产者工厂属性。

如果您使用的是 kafka 版本 > 0.10.x.x,它应该可以正常工作

【讨论】:

以上是关于Spring Kafka Producer 不发送到 Kafka 1.0.0(Magic v1 不支持记录头)的主要内容,如果未能解决你的问题,请参考以下文章

spring kafka producer 生产者

spring boot kafka 通用 JSON 模板发送方

kafka producer batch 发送消息

Kafka生产者producer简要总结

Spring Boot 和 Kafka,Producer 抛出异常,key='null'

kafka发送消息与消费消息