如何在 kafka 中创建自定义序列化程序?

Posted

技术标签:

【中文标题】如何在 kafka 中创建自定义序列化程序?【英文标题】:How to create Custom serializer in kafka? 【发布时间】:2017-03-02 10:24:06 【问题描述】:

只有少数可用的序列化程序,例如,

org.apache.kafka.common.serialization.StringSerializer

我们如何创建自己的自定义序列化程序?

【问题讨论】:

【参考方案1】:

这里有一个示例,您可以将自己的序列化器/反序列化器用于 Kafka 消息值。对于 Kafka 消息密钥是一样的。

我们希望将 MyMessage 的序列化版本作为 Kafka 值发送,然后在消费者端将其再次反序列化为 MyMessage 对象。

在生产者端序列化 MyMessage。

您应该创建一个实现 org.apache.kafka.common.serialization.Serializer 的序列化程序类

serialize() 方法完成这项工作,接收您的对象并将序列化版本作为字节数组返回。

public class MyValueSerializer implements Serializer<MyMessage>

    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    
        this.isKey = isKey;
    

    @Override
    public byte[] serialize(String topic, MyMessage message)
    
        if (message == null) 
            return null;
        

        try 

            (serialize your MyMessage object into bytes)

            return bytes;

         catch (IOException | RuntimeException e) 
            throw new SerializationException("Error serializing value", e);
        
    

    @Override
    public void close()
    

    


final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);

int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));

在消费者端反序列化 MyMessage。

您应该创建一个实现 org.apache.kafka.common.serialization.Deserializer 的反序列化器类

deserialize() 方法完成工作,接收序列化值作为字节数组并返回您的对象。

public class MyValueDeserializer implements Deserializer<MyMessage>

    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    
        this.isKey = isKey;
    

    @Override
    public MyMessage deserialize(String s, byte[] value)
    
        if (value == null) 
            return null;
        

        try 

            (deserialize value into your MyMessage object)

            MyMessage message = new MyMessage();
            return message;

         catch (IOException | RuntimeException e) 
            throw new SerializationException("Error deserializing value", e);
        
    

    @Override
    public void close()
    

    

然后像这样使用它:

final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);

ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records) 

    int kafkaKey = record.key();
    MyMessage kafkaValue = record.value();

    ...

【讨论】:

final KafkaConsumer consumer = new KafkaConsumer(props, keyDeserializer, myValueDeserializer); 上面说的不是语法,那kafka怎么知道反序列化器 Deserializer 是构造函数的第三个参数:myValueDeserializer。所有这些代码都取自工作代码,只是更改了一些名称。 为什么在configure()中保存“isKey”?你能解释一下 configure() 和 close() 什么时候不应该是空方法吗? @user1879313 对于这段代码,没有理由,但是例如 Confluent Serializers 使用布尔字段在 Schema Registry 客户端上执行不同的逻辑,随后在 close( ) 方法。【参考方案2】:

没有文字,只有代码

    您发送给 Kafka 的一些对象

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.ToString;
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public class TestDto 
    
        private String name;
        private String version;
    
    
    

    创建序列化器,生产者将使用该序列化器

    @Slf4j
    public class KafkaValueSerializer implements Serializer<TestDto> 
    
        private ObjectMapper objectMapper = new ObjectMapper();
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) 
        
    
        @Override
        public byte[] serialize(String topic, TestDto data) 
            try 
                return objectMapper.writeValueAsBytes(data);
             catch (JsonProcessingException e) 
                log.error("Unable to serialize object ", data, e);
                return null;
            
        
    
        @Override
        public void close() 
        
    
    

    当然,不要忘记 Deserializer for Consumer

    @Slf4j
    public class KafkaValueDeserializer implements Deserializer<TestDto> 
    
        private ObjectMapper objectMapper = new ObjectMapper();
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) 
        
    
        @Override
        public TestDto deserialize(String topic, byte[] data) 
            try 
                return objectMapper.readValue(new String(data, "UTF-8"), TestDto.class);
             catch (Exception e) 
                log.error("Unable to deserialize message ", data, e);
                return null;
            
        
    
        @Override
        public void close() 
        
    
    

    最后一刻,将序列化器/反序列化器添加到 application.yml

    spring:
        kafka:
          bootstrap-servers:  192.168.192.168:9092
          producer:
              value-serializer: com.package.service.kafka.KafkaValueSerializer
          consumer:
              group-id: groupId
              value-deserializer: com.package.service.kafka.KafkaValueDeserializer
    

就是这样。不需要任何配置文件或手鼓跳舞:)

    发送

    KafkaTemplate<String, TestDto> kafkaTemplate;
    
    TestDto test = new TestDto("test name", "test-version");
    kafkaTemplate.send(topic, testDto);
    

    @KafkaListener(topics = "$ktp-agent.kafka.request-topic", groupId = "$spring.kafka.consumer.group-id")
    public void listen(TestDto message) 
    
        log.info("Received message '' from Kafka.", message.toString());
    
    

【讨论】:

Spring Kafka 已经有一个 JSON 序列化器和反序列化器,所以可能可以从这个答案中删除它们 是否可以通过 kafka 发送 Map 对象? 可以将Map转换成Json,发送json作为例子 有一个问题:如果我们没有传入消息对象结构怎么办?那么我们如何在这种情况下编写反序列化器。 ? 如果您不知道消息的外观,则无法反序列化。【参考方案3】:

您必须创建自己的序列化程序来实现接口Serializer (org.apache.kafka.common.serialization.Serializer),然后将生产者选项key.serializer / value.serializer 设置为它。

【讨论】:

以上是关于如何在 kafka 中创建自定义序列化程序?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Django 中创建自定义 403 页面?

如何在 Microsoft Azure IoT 中创建自定义的流数据仪表板?

如何在 JSF 2.0 中创建自定义 404 消息?

如何在 Blend 中创建自定义列表

如何在 React Native 中创建自定义顶部导航栏

调用 window.open 时如何在 WPF/WebView2 应用程序中创建自定义窗口?