编写自定义 Kafka 序列化器

Posted

技术标签:

【中文标题】编写自定义 Kafka 序列化器【英文标题】:Writing Custom Kafka Serializer 【发布时间】:2014-07-08 11:44:45 【问题描述】:

我在 Kafka 消息中使用我自己的类,它有一堆字符串数据类型。

因此,我不能使用默认的序列化程序类或 Kafka 库附带的 StringSerializer

我想我需要编写自己的序列化程序并将其提供给生产者属性?

【问题讨论】:

其实是自定义Encoder to kafkaMessage 是否可以在 i7 上运行 Kafka 应用程序? 【参考方案1】:

编辑

在较新的 Kafka 客户端中,实现 Serializer 而不是 Encoder


编写自定义序列化程序所需的东西是:

    使用为泛型指定的对象实现Encoder 需要提供VerifiableProperties 构造函数 重写 toBytes(...) 方法确保返回一个字节数组 将序列化程序类注入ProducerConfig

为生产者声明自定义序列化程序

正如您在问题中提到的,Kafka 提供了一种为生产者声明特定序列化程序的方法。序列化程序类设置在 ProducerConfig 实例中,该实例用于构造所需的 Producer 类。

如果您关注Kafka's Producer Example,您将通过Properties 对象构造ProducerConfig。在构建属性文件时,请务必包含:

props.put("serializer.class", "path.to.your.CustomSerializer");

包含您希望 Kafka 在将消息附加到日志之前用来序列化消息的类的路径。

创建 Kafka 理解的自定义序列化程序

编写 Kafka 可以正确解释的自定义序列化程序需要实现 Kafka 提供的 Encoder[T] scala 类。 Implementing traits in java is weird,但以下方法在我的项目中用于序列化 JSON:

public class JsonEncoder implements Encoder<Object> 
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);
    // instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) 
        /* This constructor must be present for successful compile. */
    

    @Override
    public byte[] toBytes(Object object) 
        try 
            return objectMapper.writeValueAsString(object).getBytes();
         catch (JsonProcessingException e) 
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        
        return "".getBytes();
    

您的问题听起来好像您正在使用一个对象(我们称之为CustomMessage)来处理附加到日志中的所有消息。如果是这种情况,您的序列化程序可能看起来更像这样:

package com.project.serializer;
    
public class CustomMessageEncoder implements Encoder<CustomMessage> 
    public CustomMessageEncoder(VerifiableProperties verifiableProperties) 
        /* This constructor must be present for successful compile. */
    

    @Override
    public byte[] toBytes(CustomMessage customMessage) 
        return customMessage.toBytes();
    

这将使您的属性配置看起来像这样:

props.put("serializer.class", "path.to.your.CustomSerializer");

【讨论】:

感谢 Sam B。非常有帮助。 既然我们已经序列化了对象,你如何反序列化 kafka 消费者中的字节数组? 一个建议:由于ObjectMapper 的构造是一个重量级的操作,并且由于它们在构造后是线程安全的,因此为编码器/解码器创建一个静态的最终实例是有意义的。否则构建时间将比实际读取/写入时间长 10 倍。 @StaxMan 好点。我确信对于大多数应用程序来说,最好的方法是注入 ObjectMapper,但就像你说的那样,对于这个例子来说,显示单个实例化是一种改进。 @SamB。是的,在许多情况下注入一个正确配置的实例是有意义的。只想提一下这一点,因为性能影响很重要,而且代码经常被逐字剪切和粘贴【参考方案2】:

您需要同时实现编码和解码器

public class JsonEncoder implements Encoder<Object> 
        private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);

        public JsonEncoder(VerifiableProperties verifiableProperties) 
            /* This constructor must be present for successful compile. */
        

        @Override
        public byte[] toBytes(Object object) 
            ObjectMapper objectMapper = new ObjectMapper();
            try 
                return objectMapper.writeValueAsString(object).getBytes();
             catch (JsonProcessingException e) 
                LOGGER.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
            
            return "".getBytes();
        
    

解码器代码

public class JsonDecoder  implements Decoder<Object> 
    private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);
    public JsonDecoder(VerifiableProperties verifiableProperties) 
        /* This constructor must be present for successful compile. */
    

    @Override
    public Object fromBytes(byte[] bytes) 
        ObjectMapper objectMapper = new ObjectMapper();
        try 
            return objectMapper.readValue(bytes, Map.class);
         catch (IOException e) 
            LOGGER.error(String.format("Json processing failed for object: %s", bytes.toString()), e);
        
        return null;
    

pom 入口

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.4.1.3</version>
</dependency>

在 Kafka 属性中设置默认编码器

properties.put("serializer.class","kafka.serializer.DefaultEncoder");

写入器和读取器代码如下

byte[] bytes = encoder.toBytes(map);
        KeyedMessage<String, byte[]> message =new KeyedMessage<String, byte[]>(this.topic, bytes);

JsonDecoder decoder = new JsonDecoder(null);
Map map = (Map) decoder.fromBytes(it.next().message());

【讨论】:

如何创建编码器实例? VerifiableProperties 可以为空吗?【参考方案3】:

因此,我不能使用默认的序列化程序类或 Kafka 库附带的 StringSerializer。

当然可以。

例如,使用 Jackson 或 Gson ;将您的实例转换为 JSON 字符串 or (preferrably) binary bytearray,然后使用内置的 Kafka 序列化程序之一。

其他选项

推荐

为您的课程使用 Avro 或 Protobuf 序列化程序的 Confluent 版本以及 Schema Registry。


如果您将课程写入ObjectOutputStream,也可以只使用ByteArraySerializer(但是,由于不支持跨语言,因此不建议这样做)。

【讨论】:

以上是关于编写自定义 Kafka 序列化器的主要内容,如果未能解决你的问题,请参考以下文章

LocalDateTime 的自定义 spring-kafka 反序列化器

如何在 kafka 中创建自定义序列化程序?

Flink实战系列Flink 1.14.0 消费 kafka 数据自定义反序列化器

kafka之消息生成者基本知识

编写半自定义反序列化器

如何为计时时间戳使用自定义 serde 反序列化器?