Netty对象重用:Recycler源码分析

Posted 程序员小潘

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty对象重用:Recycler源码分析相关的知识,希望对你有一定的参考价值。

Netty 作为一个高性能的网络 IO 框架,在代码层面做了大量的优化,为了减轻 GC 的压力,尽可能的使对象可以被重用,避免频繁的创建和销毁。

Recycler抽象类是 Netty 实现的,基于线程本地变量 Stack 实现的一个轻量级的对象重用池。调用get()方法时优先从对象池中获取可重用的对象,当池中没有对象可用时会自动触发newObject()创建新对象。io.netty.util.internal.ObjectPool.Handle.recycle()方法可以帮助对象进行回收,默认的实现类是DefaultHandle,回收时它会将对象push到线程绑定的 Stack 中。

1. 用法示例

如下是一个使用示例,完成对Person的回收与重用:

public class RecyclerDemo {

 static class Person {
  private ObjectPool.Handle<Person> handle;

  public Person(ObjectPool.Handle<Person> handle) {
   this.handle = handle;
  }

  public void recycle() {
   handle.recycle(this);
  }
 }

 public static void main(String[] args) {
  ObjectPool<Person> pool = ObjectPool.newPool(new ObjectPool.ObjectCreator<Person>() {
   @Override
   public Person newObject(ObjectPool.Handle<Person> handle) {
    return new Person(handle);
   }
  });

  Person person = pool.get();
  person.recycle();
 }
}


2. 源码分析

2.1 ObjectPool

首先看ObjectPool类,它定义了一个轻量级的对象池,get()方法可以从池中获取一个对象:

// 轻量级的对象池
public abstract class ObjectPool<T{

    ObjectPool() { }

    // 从对象池中获取一个对象,对象可能通过 ObjectPool.ObjectCreator.newObject()创建
    public abstract T get();
}

内部接口Handle负责定义对象的回收:

// 对象回收的处理
public interface Handle<T{

    // 重用对象
    void recycle(T self);
}

内部接口ObjectCreator负责定义新对象的创建:

// 对象创建器
public interface ObjectCreator<T{

    // 基于Handle创建一个新对象,对象在可重用时会调用 handle.recycle()
    newObject(Handle<T> handle);
}

静态方法newPool可以快速创建一个对象池实例:

 public static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator) {
     return new RecyclerObjectPool<T>(ObjectUtil.checkNotNull(creator, "creator"));
 }

Netty 提供了一个默认的实现类RecyclerObjectPool,它的实现也非常简单,主要还是依赖Recycler类,所以核心还是要把Recycler搞懂。

// 可重用的对象池
private static final class RecyclerObjectPool<Textends ObjectPool<T{
    private final Recycler<T> recycler;

    RecyclerObjectPool(final ObjectCreator<T> creator) {
        recycler = new Recycler<T>() {
            // 当池中没有可用对象时,调用此方法创建新对象
            @Override
            protected T newObject(Handle<T> handle) {
                return creator.newObject(handle);
            }
        };
    }

    @Override
    public T get() {
        return recycler.get();
    }
}

2.2 Recycler

Recycler 是对象回收的核心,先来看它的属性。它提供了大量的静态常量来定义一些默认值,这些值可以通过设置 JVM 参数调整,如下:

// 不需要回收对象的 Handle
@SuppressWarnings("rawtypes")
private static final Handle NOOP_HANDLE = new Handle() {
    @Override
    public void recycle(Object object) {
        // NOOP
    }
};
// ID生成器,WeakOrderQueue和recycleId需要用到
private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
// 对象被回收时,会将recycleId设为OWN_THREAD_ID,代表被回收了
private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
// 线程本地变量Stack的最大容量 4096
private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024// Use 4k instances as default.
// 线程本地变量Stack默认的最大容量
private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
// Stack初始容量,256~4096之间扩容
private static final int INITIAL_CAPACITY;
//最大可共享的容量因子,默认为2,非主线程可回收的容量,默认是一半
private static final int MAX_SHARED_CAPACITY_FACTOR;
// 最大延迟队列的数量 默认CPU核心数*2
private static final int MAX_DELAYED_QUEUES_PER_THREAD;
// Link节点的elements数组长度
private static final int LINK_CAPACITY;
// 对象回收的频率,默认每8个回收一个,避免WeakOrderQueue增长的过快,降低对快速回收的敏感程度
private static final int RATIO;
private static final int DELAYED_QUEUE_RATIO;

实例常量这里就不贴代码了,含义同静态常量,只是做简单的赋值操作。

使用 Recycler 的线程,会在线程的本地变量里存放一个Stack栈来存放可重用的对象:

// 在线程的本地变量Stack中存放已回收的对象
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
    @Override
    protected Stack<T> initialValue() {
        return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                            interval, maxDelayedQueuesPerThread, delayedQueueInterval);
    }

    @Override
    protected void onRemoval(Stack<T> value) {
        if (value.threadRef.get() == Thread.currentThread()) {
            if (DELAYED_RECYCLED.isSet()) {
                DELAYED_RECYCLED.get().remove(value);
            }
        }
    }
};

2.2.1 get()获取对象

直接看核心流程吧,先看get()获取对象的流程,它首先会判断maxCapacityPerThread是否为 0,0 代表不回收对象,每次都创建新对象。否则从线程的本地变量中获取 Stack,调用pop()方法试图从栈中弹出一个可用对象,如果没有对象可用,则调用newObject()创建新对象。

// 获取一个对象,如果池中没有,则创建新对象
@SuppressWarnings("unchecked")
public final T get() {
    if (maxCapacityPerThread == 0) {
        // 线程本地变量Stack容量为0,代表不重用对象,NOOP_HANDLE不会回收
        return newObject((Handle<T>) NOOP_HANDLE);
    }
    // 获取线程本地变量Stack
    Stack<T> stack = threadLocal.get();
    // 对象经过DefaultHandle包装,从栈中弹出一个
    DefaultHandle<T> handle = stack.pop();
    if (handle == null) {
        // 没有可用对象了,基于handle创建一个新对象
        handle = stack.newHandle();
        handle.value = newObject(handle);
    }
    return (T) handle.value;
}

stack.pop()方法非常重要,需要重点关注!!!

Stack 是 Recycler 的内部类,它使用数组实现了一个简单的栈结构,调用pop()时,它首先会判断当前栈中是否有可用元素,如果有则直接弹出,否则会调用scavenge()将 WeakOrderQueue 中的对象转移到栈中。

为啥还需要 WeakOrderQueue 队列呢???这是因为对象除了可以被主线程回收外,还可以被其他线程回收。当被主线程回收时,直接入栈,如果是非主线程回收,则将对象放入 Stack 绑定的 WeakOrderQueue 中,待pop()时再从 Queue 转移到 Stack。


直接看pop()代码:

DefaultHandle<T> pop() {
    // 可用的对象数量,recycle()时会增加
    int size = this.size;
    if (size == 0) {
        // 没有可用对象,尝试清理
        if (!scavenge()) {
            return null;
        }
        size = this.size;
        if (size <= 0) {
            // double check, avoid races
            return null;
        }
    }
    // 存在可重用的对象,直接从数组中取
    size--;
    DefaultHandle ret = elements[size];
    elements[size] = null;
    this.size = size;

    if (ret.lastRecycledId != ret.recycleId) {
        throw new IllegalStateException("recycled multiple times");
    }
    // 对象取出后,重置recycleId和lastRecycledId
    ret.recycleId = 0;
    ret.lastRecycledId = 0;
    return ret;
}

Stack 有可用对象时处理很简单,只是简单的弹出。无可用对象时,尝试将其他线程帮忙回收的对象从 WeakOrderQueue 转移到 Stack。

// 尝试将WeakOrderQueue中的对象转移到Stack
private boolean scavenge() {
    // continue an existing scavenge, if any
    if (scavengeSome()) {
        return true;
    }

    // 没有对象可以转移,重置指针
    prev = null;
    cursor = head;
    return false;
}

scavengeSome()会遍历 WeakOrderQueue,将所有 Link 中的元素转移到 Stack。

WeakOrderQueue 是一个单向链表,通过一系列 Link 节点连接而成,每个 Link 节点内部存储了一个 elements 数组用来存储已回收的对象。

// 尝试将Queue中的对象转移到Stack
private boolean scavengeSome() {
    WeakOrderQueue prev;
    WeakOrderQueue cursor = this.cursor;
    if (cursor == null) {
        prev = null;
        cursor = head;
        if (cursor == null) {// 还没有回收过对象,无法清理
            return false;
        }
    } else {
        prev = this.prev;
    }

    boolean success = false;
    do {
        if (cursor.transfer(this)) {
            success = true;
            break;
        }
        // 单向链表结构,获取下一个节点
        WeakOrderQueue next = cursor.getNext();
        if (cursor.get() == null) {
            // 使用了弱引用,检测到Queue绑定的线程销毁了,因此对象转移后需要断开此节点
            if (cursor.hasFinalData()) {
                for (;;) {
                    if (cursor.transfer(this)) {
                        success = true;
                    } else {
                        break;
                    }
                }
            }

            if (prev != null) {
                cursor.reclaimAllSpaceAndUnlink();
                prev.setNext(next);
            }
        } else {
            prev = cursor;
        }

        cursor = next;

    } while (cursor != null && !success);

    this.prev = prev;
    this.cursor = cursor;
    return success;
}

io.netty.util.Recycler.WeakOrderQueue.transfer()方法会将 Link 节点中的对象转移到 Stack 中,readIndex代表转移的进度指针,达到LINK_CAPACITY说明说明节点转移完毕,会继续转移 next 节点,代码这里就不贴了,感兴趣的同学自己去看下。

2.2.2 recycle()回收对象

另一个核心主流程是handle.recycle(),它负责回收对象。对象的回收会转交给Handle处理,默认的实现是io.netty.util.Recycler.DefaultHandle。它首先会获取到 Stack 容器,做一些校验工作,然后将对象 push 到 Stack 中进行回收重用。

// 回收对象
@Override
public void recycle(Object object) {
    if (object != value) {// 判空
        throw new IllegalArgumentException("object does not belong to handle");
    }

    // 获取绑定的Stack
    Stack<?> stack = this.stack;
    if (lastRecycledId != recycleId || stack == null) {
        // 对象已经被回收
        throw new IllegalStateException("recycled already");
    }

    stack.push(this);
}

核心逻辑依然在stack.push()中,这里它会判断回收的线程是否是 Stack 绑定的主线程,如果是则直接入栈,否则放入 Stack 绑定的 WeakOrderQueue 中,等待主线程自己去转移。

void push(DefaultHandle<?> item) {
    Thread currentThread = Thread.currentThread();
    if (threadRef.get() == currentThread) {
        // 回收线程是主线程,直接添加到数组
        pushNow(item);
    } else {
        /*
        回收线程非主线程,添加到WeakOrderQueue中,
        后续pop()时会将Queue中对象转移到Stack
        */

        pushLater(item, currentThread);
    }
}

先看简单的pushNow(),对于主线程回收,对象入栈即可。

// 主线程回收,对象入栈
private void pushNow(DefaultHandle<?> item) {
    // 避免重复回收,第一次回收时 recycleId=0,于是会设置lastRecycleId=OWN_THREAD_ID
    if (item.recycleId != 0 || !item.compareAndSetLastRecycledId(0, OWN_THREAD_ID)) {
        throw new IllegalStateException("recycled already");
    }
    // 对象回收将 recycleId设为OWN_THREAD_ID
    item.recycleId = OWN_THREAD_ID;

    int size = this.size;
    if (size >= maxCapacity || dropHandle(item)) {
        return;
    }
    if (size == elements.length) {
        // 扩容
        elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
    }

    elements[size] = item;
    this.size = size + 1;
}

对于非主线程回收,会复杂一些,对象会被添加到 Stack 绑定的 WeakOrderQueue 中,如果 WeakOrderQueue 的数量达到上限,则不再回收对象。

// 非主线程回收对象
private void pushLater(DefaultHandle<?> item, Thread thread) {
    if (maxDelayedQueues == 0) {
        // 不支持线程之间的回收
        return;
    }

    // 获取Stack和Queue的映射关系
    Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
    // 当前Stack绑定的Queue
    WeakOrderQueue queue = delayedRecycled.get(this);
    if (queue == null) {// 如果Queue是null
        if (delayedRecycled.size() >= maxDelayedQueues) {
            // 延迟队列的数量已达上限,put一个占位符
            delayedRecycled.put(this, WeakOrderQueue.DUMMY);
            return;
        }
        // 创建一个和Stack绑定的新Queue
        if ((queue = newWeakOrderQueue(thread)) == null) {
            return;
        }
        delayedRecycled.put(this, queue);
    } else if (queue == WeakOrderQueue.DUMMY) {
        // Queue的数量已达上限
        return;
    }

    // 添加到Queue中
    queue.add(item);
}

到这里对象回收的流程也就基本结束了。

3. 总结

Recycler 是 Netty 基于本地线程变量 Stack 实现的一套轻量级的重用对象池,它的目的是减轻 GC 的压力,避免对象的频繁创建和销毁,像 ByteBuf、Entry 等对象都使用它来进行回收和重用。

它利用 FastThreadLocal 在每个线程的本地变量中存放一个 Stack 栈结构来保存已回收的对象,哪个线程创建的对象只有该线程自己可以重用,因此 Stack 绑定了一个主线程。但是对象的回收可以是任意线程,如果是主线程回收则直接将对象入栈即可,对于非主线程的回收,Recycler 会将对象暂时放入 Stack 绑定的延迟队列 WeakOrderQueue 中,为了避免对象回收的速度过快导致 WeakOrderQueue 增长的过快,允许通过ratio属性来设置回收的频率。回收对象被放入 WeakOrderQueue 后,主线程在取对象时如果栈中无可用对象了,会尝试将 WeakOrderQueue 中的对象迁移到 Stack 中,这就是 Recycler 回收的大致流程总结。


以上是关于Netty对象重用:Recycler源码分析的主要内容,如果未能解决你的问题,请参考以下文章

netty源码之内存池

netty源码之内存池

netty源码之内存池

netty源码之内存池

Netty源码分析 ----- write过程 源码分析

netty里的ByteBuf扩容源码分析