深入浅出Spring原理及实战「开发实战系列」采用protostuff和kryo高性能序列化框架实现RedisTemplate的序列化组件

Posted 洛神灬殇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入浅出Spring原理及实战「开发实战系列」采用protostuff和kryo高性能序列化框架实现RedisTemplate的序列化组件相关的知识,希望对你有一定的参考价值。

序列化

  • 序列化可以简单理解为对象–>字节的过程,同理,反序列化则是相反的过程。为什么需要序列化?因为网络传输只认字节。所以互信的过程依赖于序列化。

  • 网络传输的性能等诸多因素,通常会支持多种序列化方式以供使用者插拔使用,一些常用的序列化方案hessian,kryo,Protostuff、FST等,其中最快、效果最好的要数Kryo和Protostuff

RedisConfiguration的配置

  1. 创建Redis连接工厂对象(RedisConnectionFactory)

  2. 创建RestTemplate对象根据RedisConnectionFactory对象。

  3. 配置相关的RedisSerializaer组件

@Configuration
public class RedisConfiguration 

    @Bean("redisConnectionFactory")
    public RedisConnectionFactory redisConnectionFactory(RedisConfigMapper mapper) 
        List<RedisConfig> redisConfigs = mapper.getRedisConfig();
        List<String> clusterNodes = new ArrayList<>();
        for (RedisConfig rc : redisConfigs) 
            clusterNodes.add(rc.getUrl() + ":" + rc.getPort());
        
        // 获取Redis集群配置信息
        RedisClusterConfiguration rcf = new RedisClusterConfiguration(clusterNodes);
        return new JedisConnectionFactory(rcf);
    

    @Bean("redisTemplate")
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException 
        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        // redis value使用的序列化器
        template.setValueSerializer(new XXXRedisSerializer<>());
        // redis key使用的序列化器
        template.setKeySerializer(new XXXRedisSerializer<>());
        template.setHashKeySerializer(new XXXRedisSerializer<>());
        template.setHashValueSerializer(new XXXRedisSerializer<>());
        template.afterPropertiesSet();
        return template;
    

Kryo序列化实现

Maven配置文件

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>com.esotericsoftware</groupId>
            <artifactId>kryo</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>de.javakaffee</groupId>
            <artifactId>kryo-serializers</artifactId>
            <version>0.41</version>
        </dependency>
		<dependency>
        	<groupId>com.esotericsoftware</groupId>
        	<artifactId>kryo-shaded</artifactId>
        	<version>4.0.1</version>
    	</dependency>
    </dependencies>

由于其底层依赖于ASM技术,与Spring等框架可能会发生ASM依赖的版本冲突(文档中表示这个冲突还挺容易出现)所以提供了另外一个依赖以供解决此问题:kryo-shaded

Kryo三种读写方式

如果知道class字节码,并且对象不为空

  kryo.writeObject(output, classObject);
  RestClass restClass = kryo.readObject(input, RestClass.class);

快速入门中的序列化/反序列化的方式便是这一种。而Kryo考虑到someObject可能为null,也会导致返回的结果为null,所以提供了第二套读写方式。

如果知道class字节码,并且对象可能为空

kryo.writeObjectOrNull(output, classObject);
RestClass someObject = kryo.readObjectOrNull(input, RestClass.class);

但这两种方法似乎都不能满足我们的需求,在RPC调用中,序列化和反序列化分布在不同的端点,对象的类型确定,我们不想依赖于手动指定参数,最好将字节码的信息直接存放到序列化结果中,在反序列化时自行读取字节码信息。Kryo考虑到了这一点,于是提供了第三种方式。如果实现类的字节码未知,并且对象可能为null。

  kryo.writeClassAndObject(output, object);
Object object = kryo.readClassAndObject(input);
if (object instanceof RestClass) 

我们牺牲了一些空间一些性能去存放字节码信息

支持的序列化类型

上面表格中支持的类型一览无余,这都是其默认支持的。

Kryo kryo = new Kryo();
kryo.addDefaultSerializer(RestClass.class, RestSerializer.class);

这样的方式,也可以为一个Kryo实例扩展序列化器

Kryo支持类型:

  • 枚举
  • 集合、数组
  • 子类/多态
  • 循环引用
  • 内部类
  • 泛型

Kryo反序列化的异常问题

  • Kryo不支持Bean中增删字段,如果使用Kryo序列化了一个类,存入了Redis,对类进行了修改,会导致反序列化的异常。

  • 另外需要注意的一点是使用反射创建的一些类序列化的支持。如使用Arrays.asList();创建的List对象,会引起序列化异常。

  • 不支持包含无参构造器类的反序列化,尝试反序列化一个不包含无参构造器的类将会得到以下的异常:

  • 保证每个类具有无参构造器是应当遵守的编程规范,但实际开发中一些第三库的相关类不包含无参构造,的确是有点麻烦。

Kryo是线程不安全的

借助ThreadLocal来维护以保证其线程安全。

private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() 
    protected Kryo initialValue() 
        Kryo kryo = new Kryo();
        // configure kryo instance, customize settings
        return kryo;
    ;
;

// Somewhere else, use Kryo
Kryo k = kryos.get();
...
Kryo相关配置参数

每个Kryo实例都可以拥有两个配置参数。

  • kryo.setRegistrationRequired(false);//关闭注册行为

Kryo支持对注册行为,如kryo.register(SomeClazz.class),这会赋予该Class一个从0开始的编号,但Kryo使用注册行为最大的问题在于,其不保证同一个Class每一次注册的号码相同,这与注册的顺序有关,也就意味着在不同的机器、同一个机器重启前后都有可能拥有不同的编号,这会导致序列化产生问题,所以在分布式项目中,一般关闭注册行为。

  • kryo.setReferences(true);//支持循环引用

循环引用,Kryo为了追求高性能,可以关闭循环引用的支持。不过我并不认为关闭它是一件好的选择,大多数情况下,请保持kryo.setReferences(true)。

常用Kryo工具类

public class KryoSerializer 

    public byte[] serialize(Object obj) 
        Kryo kryo = kryoLocal.get();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Output output = new Output(byteArrayOutputStream);//<1>
        kryo.writeClassAndObject(output, obj);//<2>
        output.close();
        return byteArrayOutputStream.toByteArray();
    

    public <T> T deserialize(byte[] bytes) 
        Kryo kryo = kryoLocal.get();
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        Input input = new Input(byteArrayInputStream);// <1>
        input.close();
        return (T) kryo.readClassAndObject(input);//<2>
    

    private static final ThreadLocal<Kryo> kryoLocal = new ThreadLocal<Kryo>() //<3>
        @Override
        protected Kryo initialValue() 
            Kryo kryo = new Kryo();
            kryo.setReferences(true);//默认值为true,强调作用
            kryo.setRegistrationRequired(false);//默认值为false,强调作用
            return kryo;
        
    ;

Kryo 的开发实现要点分析

  1. Kryo的Input和Output接收一个InputStream和OutputStream,Kryo通常完成字节数组和对象的转换,所以常用的输入输出流实现为ByteArrayInputStream/ByteArrayOutputStream。

  2. writeClassAndObject和readClassAndObject配对使用在分布式场景下是最常见的,序列化时将字节码存入序列化结果中,便可以在反序列化时不必要传入字节码信息。

  3. 使用ThreadLocal维护Kryo实例,这样减少了每次使用都实例化一次Kryo的开销又可以保证其线程安全。

KryoRedisSerializer

数据交换或数据持久化,比如使用kryo把对象序列化成字节数组发送给消息队列或者放到redis等等应用场景。

public class KryoRedisSerializer<T> implements RedisSerializer<T> 
    private static final String DEFAULT_ENCODING = "UTF-8";
    //每个线程的 Kryo 实例
    private static final ThreadLocal<Kryo> kryoLocal = new ThreadLocal<Kryo>() 
        @Override
        protected Kryo initialValue() 
            Kryo kryo = new Kryo();
            /**
             * 不要轻易改变这里的配置!更改之后,序列化的格式就会发生变化,
             * 上线的同时就必须清除 Redis 里的所有缓存,
             * 否则那些缓存再回来反序列化的时候,就会报错
             */
            //支持对象循环引用(否则会栈溢出)
            kryo.setReferences(true); //默认值就是 true,添加此行的目的是为了提醒维护者,不要改变这个配置
            //不强制要求注册类(注册行为无法保证多个 JVM 内同一个类的注册编号相同;而且业务系统中大量的 Class 也难以一一注册)
            kryo.setRegistrationRequired(false); //默认值就是 false,添加此行的目的是为了提醒维护者,不要改变这个配置
            //Fix the NPE bug when deserializing Collections.
            ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
                    .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
            return kryo;
        
    ;
    /**
     * 获得当前线程的 Kryo 实例
     *
     * @return 当前线程的 Kryo 实例
     */
    public static Kryo getInstance() 
        return kryoLocal.get();
    

     @Override
    public byte[] serialize(T t) throws SerializationException 
        byte[] buffer = new byte[2048];
        Output output = new Output(buffer);
        getInstance().writeClassAndObject(output, t);
        return output.toBytes();
    

    @Override
    public T deserialize(byte[] bytes) throws SerializationException 
        Input input = new Input(bytes);
        @SuppressWarnings("unchecked")
        T t = (T) getInstance().readClassAndObject(input);
        return t;
    


  • 为了提升性能,最好每个类都要提前注册 kryo.register (class),为什么?我不注册也能用。

  • 注册时间序列化的Serializer的两种方式

kryo.register(OffsetDateTime.class, new MyOffsetDateTimeSerializer());
kryo.addDefaultSerializer(OffsetDateTime.class, MyOffsetDateTimeSerializer.class);
  • 注册对象模型的两种方式
kryo.writeObject(output, t);
kryo.writeClassAndObject(output, t);

KryoUtils

public class KryoUtils 
    private static Logger LOGGER = LoggerFactory.getLogger(KryoUtils.class);
    private static ThreadLocal<Kryo> kryoLocal = new ThreadLocal<Kryo>() 
        @Override
        protected Kryo initialValue() 
            Kryo kryo = new Kryo();
            kryo.addDefaultSerializer(OffsetDateTime.class, KryoOffsetDateTimeSerializer.class);
            return kryo;
        
    ;

    @SuppressWarnings(value = "unchecked")
    public static <T> T deepClone(T t) 
        Kryo kryo = kryoLocal.get();
        try (Output output = new Output(new ByteArrayOutputStream())) 
            kryo.writeObject(output, t);
            try (Input input = new Input(output.getBuffer())) 
                return kryo.readObject(input, (Class<T>) t.getClass());
            
         catch (Exception e) 
            LOGGER.error("深度克隆失败", e);
        
        return null;
    

protostuff序列化实现

Maven配置文件

<!-- 序列化 -->
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.1.3</version>
</dependency>

定义一个ProtoStuffUtil工具类

@Slf4j
public class ProtoStuffUtil 
    /**
     * 序列化对象
     *
     * @param obj
     * @return
     */
    public static <T> byte[] serialize(T obj) 
        if (obj == null) 
            log.error("Failed to serializer, obj is null");
            throw new RuntimeException("Failed to serializer");
        
 
        @SuppressWarnings("unchecked") Schema<T> schema = (Schema<T>) RuntimeSchema.getSchema(obj.getClass());
        LinkedBuffer buffer = LinkedBuffer.allocate(1024 * 1024);
        byte[] protoStuff;
        try 
            protoStuff = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
         catch (Exception e) 
            log.error("Failed to serializer, obj:", obj, e);
            throw new RuntimeException("Failed to serializer");
         finally 
            buffer.clear();
        
        return protoStuff;
    
 
    /**
     * 反序列化对象
     *
     * @param paramArrayOfByte
     * @param targetClass
     * @return
     */
    public static <T> T deserialize(byte[] paramArrayOfByte, Class<T> targetClass) 
        if (paramArrayOfByte == null || paramArrayOfByte.length == 0) 
            log.error("Failed to deserialize, byte is empty");
            throw new RuntimeException("Failed to deserialize");
        
 
        T instance;
        try 
            instance = targetClass.newInstance();
         catch (InstantiationException | IllegalAccessException e) 
            log.error("Failed to deserialize", e);
            throw new RuntimeException("Failed to deserialize");
        
 
        Schema<T> schema = RuntimeSchema.getSchema(targetClass);
        深入浅出Spring原理及实战「开发实战系列」手把手教你将@Schedule任务调度升级为分布式调度@DistributeSchedule

深入浅出Spring原理及实战「开发实战系列」SpringSecurity原理以及实战认证分析开发指南

深入浅出Spring原理及实战「开发实战系列」Spring-Cache扩展自定义(注解失效时间+主动刷新缓存)

深入浅出Spring原理及实战「开发实战系列」SpringSecurity技术实战之通过注解表达式控制方法权限

深入浅出Spring原理及实战「开发实战系列」OAuth2的技术体系架构和开发概览

深入浅出Spring原理及实战「开发实战系列」重新回顾一下异常重试框架Spring Retry的功能指南和实战