kafka2.5.0自定义数据序列化类

Posted zhuwenjoyce

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka2.5.0自定义数据序列化类相关的知识,希望对你有一定的参考价值。

kafka只接收bytes字节数组,所以自定义序列化器内部实现需按照bytes字节数组转换为标准。

重点:本例子只是提供参考怎样写自定义序列化器,因为关系到性能,一般默认使用StringSerializer即可,效率很高。

小知识:Kafka支持Avro序列化器,比较适用于生产者和消费者在版本升级差距拉大时使用,但同时要注意性能。参考文章《使用kafka中提供的Avro序列化框架实现序列化

1) 自定义序列化类,转换成bytes字节数组:

import cn.enjoyedu.vo.DemoUser;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.util.Map;

/**
 * @author King老师*/
public class MySerializer implements Serializer<DemoUser> {
    public void configure(Map<String, ?> configs, boolean isKey) {
        //do nothing
    }

    public byte[] serialize(String topic, DemoUser data) {
        try {
            byte[] name;
            int nameSize;
            if(data==null){
                return null;
            }
            if(data.getName()!=null){
                name = data.getName().getBytes("UTF-8");
                //字符串的长度
                nameSize = data.getName().length();
            }else{
                name = new byte[0];
                nameSize = 0;
            }
            /*id的长度4个字节,字符串的长度描述4个字节,
            字符串本身的长度nameSize个字节*/
            ByteBuffer buffer = ByteBuffer.allocate(4+4+nameSize);
            buffer.putInt(data.getId());//4
            buffer.putInt(nameSize);//4
            buffer.put(name);//nameSize
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error serialize DemoUser:"+e);
        }
    }

    public void close() {
        //do nothing
    }
}

2) 自定义反序列化类,从bytes字节数组转换成自定义对象:

import cn.enjoyedu.vo.DemoUser;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.util.Map;

/**
 * @author King老师 
 */
public class MyDeserializer implements Deserializer<DemoUser> {


    public void configure(Map<String, ?> configs, boolean isKey) {
        //do nothing
    }

    public DemoUser deserialize(String topic, byte[] data) {
        try {
            if(data==null){
                return null;
            }
            if(data.length<8){
                throw new SerializationException("Error data size.");
            }
            ByteBuffer buffer = ByteBuffer.wrap(data);
            int id;
            String name;
            int nameSize;
            id = buffer.getInt();
            nameSize = buffer.getInt();
            byte[] nameByte = new byte[nameSize];
            buffer.get(nameByte);
            name = new String(nameByte,"UTF-8");
            return new DemoUser(id,name);
        } catch (Exception e) {
            throw new SerializationException("Error Deserializer DemoUser."+e);
        }

    }

    public void close() {
        //do nothing
    }
}

3) 配置序列化类

定义好自定义数据序列化类,需配置到kafka的配置里(参考《kafka2.5.0生产者与消费者配置详解》):

ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, MySerializer.class

ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MySerializer.class

end.

以上是关于kafka2.5.0自定义数据序列化类的主要内容,如果未能解决你的问题,请参考以下文章

kafka2.5.0分区再均衡监听器java例子(待续)

Flink实战系列Flink 1.14.0 消费 kafka 数据自定义反序列化器

kafka2.5.0架构硬件选择参考因素

kafka2.5.0 主题Topic

kafka2.5.0基本命令

kafka2.5.0硬件集群架构图Topic主题与Partitions分区架构图