同步异步、阻塞非阻塞、Netty

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了同步异步、阻塞非阻塞、Netty相关的知识,希望对你有一定的参考价值。

参考技术A

老张爱喝茶,废话不说,煮开水。
出场人物:老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶)。
1 老张把水壶放到火上,立等水开。(同步阻塞)
老张觉得自己有点傻
2 老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞)
老张还是觉得自己有点傻,于是变高端了,买了把会响笛的那种水壶。水开之后,能大声发出嘀~~~~的噪音。
3 老张把响水壶放到火上,立等水开。(异步阻塞)
老张觉得这样傻等意义不大
4 老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞)
老张觉得自己聪明了。

所谓同步异步,只是对于水壶而言。
普通水壶,同步;响水壶,异步。
虽然都能干活,但响水壶可以在自己完工之后,提示老张水开了。这是普通水壶所不能及的。
同步只能让调用者去轮询自己(情况2中),造成老张效率的低下。
所谓阻塞非阻塞,仅仅对于老张而言。
立等的老张,阻塞;看电视的老张,非阻塞。
情况1和情况3中老张就是阻塞的,媳妇喊他都不知道。虽然3中响水壶是异步的,可对于立等的老张没有太大的意义。所以一般异步是配合非阻塞使用的,这样才能发挥异步的效用。

推荐:愚抄 银月游侠 https://www.zhihu.com/question/19732473

只要阻塞了,无论同步/异步,都会导致效率低下
IO多路复用(select,poll,epoll)是同步非阻塞(严格地来讲,是把阻塞点改变了位置)

BIO,同步阻塞IO
NIO,同步非阻塞IO (Netty,Netty是Reactor模式的实现)
异步IO,他的数据请求和数据处理都是异步的,数据请求一次返回一次,适用于长连接的业务场景。

https://www.jianshu.com/p/b9f3f6a16911

BIO的缺点:

Netty之非阻塞处理

Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。

一、异步模型

同步I/O : 需要进程去真正的去操作I/O;

异步I/O:内核在I/O操作完成后再通知应用进程操作结果。

怎么去理解同步和异步?

  • 同步:比如服务端发送数据给客户端,客户端中的处理器(继承一个入站处理器即可),可以去重写 channelRead0 方法,那么该方法触发的时候,其实必须得服务器有消息发过来,客户端才能去读写,两者必须是有先后顺序,这就是所谓的同步
  • 异步:客户端在服务端发送数据来之前就已经返回数据给了用户,但客户端已经告诉服务端数据到了要通过订阅的方式(大名鼎鼎的观察者模式),文章最后已经附上传送门,理解设计模式

比如上一篇关于NettyAttributeKeyAttributeMap的原理和使用,这里不妨讲讲它的缺点

二、异步模型存在的问题

使用流程

创建一个处理器 NettyClientHandler 继承 SimpleChannelInboundHandler<RpcResponse>,它已经实现了 入站处理器相关的功能,只要重写它的 channelRead0 方法即可

public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> 
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception 
        try 
            AttributeKey<RpcResponse> key = AttributeKey.valueOf(msg.getRequestId());
            ctx.channel().attr(key).set(msg);
            ctx.channel().close();
         finally 
            ReferenceCountUtil.release(msg);
        
    

记得将该 处理器 加入到 客户端 bootStraphandler()方法中,需要 通过默认的 初始化器new ChannelInitializer&lt;SocketChannel&gt;()(也是一个处理器)去初始化处理器链,我是通过匿名内部类去重写 initChannel 方法的,最后addLast() 刚刚自己写的处理器即可。

创建服务器和客户端,这里不再赘述,这篇文章对刚入门的帮助不大,可到文章最后取经拿服务端和客户端。

最后测试到,客户端拿不到值,总是为null

那怎么保持使用异步操作,并且可以顺利拿到值呢?

那么就得通过future来实现,就是先返回值,但值还是没有的,后面让用户自己用future的方法get阻塞拿值,说白了,还是要去同步,只是同步由CPU转到了用户自己手中,慢慢品

三、使用CompletableFuture 解决异步问题

CompletableFuture 使用方法

CompletableFuture<RpcResponse> resultFuture = new CompletableFuture<>();
/**complete 执行结束后,状态发生改变,则 说明 值已经传到了,complete 是 (被观察者)
通知类的通知方法,通知 观察者 ,get 方法将 不再阻塞,可以获取到值
*/
resultFuture .complete(msg);
/**获取 正确结果,get 是阻塞操作,所以 先把 resultFuture 作为 返回值 返回,再 get 
获取值
*/
RpcResponse rpcResponse = resultFuture.get();
// 获取 错误结果, 抛 异常 处理
resultFuture.completeExceptionally(future.cause());

所以我们要做的就是在channelRead0()中 做 complete(),最后 用户直接 get得到数据即可,只要把sendRequest()方法的返回类型改为CompletableFuture 就可以了。

简单来说就是通过使用这个CompletableFuture,让 response不至于返回后是null,因为我们自己new了一个CompletableFuture类,这个类会被通知,并把结果告知给它

需要注意的是,在 客户端的sendRequest()方法拿到的 CompletableFuture&lt;RpcResponse&gt; 和在channelRead0()拿到的必须为同一个,可以设计成单例模式,这里是很泛化的单例,通用

public class SingleFactory 

    private static Map<Class, Object> objectMap = new HashMap<>();

    private SingleFactory() 

    /**
     * 使用 双重 校验锁 实现 单例模式
     * @param clazz
     * @param <T>
     * @return
     */
    public static <T> T getInstance(Class<T> clazz) 
        Object instance = objectMap.get(clazz);
        if (instance == null) 
            synchronized (clazz) 
                if (instance == null) 
                    try 
                        instance = clazz.newInstance();
                     catch (InstantiationException | IllegalAccessException e) 
                        throw new RuntimeException(e.getMessage(), e);
                    
                
            
        
        return clazz.cast(instance);
    


下面这样实现是因为涉及到多个客户端并发访问同一个服务器,设计的原因如下:

  • 如果是同一个客户端要采用发起多个线程去请求服务端,设计时如果多个线程的rpcRequest请求id一样,那么要考虑线程安全
  • 如果是不同客户端发起请求服务端,又要保证线程之间对CompleteFuture是线程安全的,确保性能,不能用让所有线程共享同一个 CompleteFuture,这样通知会变为不定向,不可用,因此考虑使用map暂时缓存所有CompleteFuture,更加高效
public class UnprocessedRequests 

   /**
    * k - request id
    * v - 可将来获取 的 response
    */
   private static ConcurrentMap<String, CompletableFuture<RpcResponse>> unprocessedResponseFutures = new ConcurrentHashMap<>();

   /**
    * @param requestId 请求体的 requestId 字段
    * @param future 经过 CompletableFuture 包装过的 响应体
    */
   public void put(String requestId, CompletableFuture<RpcResponse> future) 
      System.out.println("put" + future);
      unprocessedResponseFutures.put(requestId, future);
   

   /**
    * 移除 CompletableFuture<RpcResponse>
    * @param requestId 请求体的 requestId 字段
    */
   public void remove(String requestId) 
      unprocessedResponseFutures.remove(requestId);
   

   public void complete(RpcResponse rpcResponse) 
      CompletableFuture<RpcResponse> completableFuture = unprocessedResponseFutures.remove(rpcResponse.getRequestId());
      completableFuture.complete(rpcResponse);
      System.out.println("remove" + completableFuture);
   

传送门:

设计模式:https://gitee.com/fyphome/git-res/tree/master/design-patterns
或者:https://github.com/Fyupeng/java/tree/main/design_patterns
服务端和客户端的实现:https://github.com/Fyupeng/java/tree/main/NettyPro/src/main/java/com/fyp/netty/groupchat

四、结束语

评论区可留言,可私信,可互相交流学习,共同进步,欢迎各位给出意见或评价,本人致力于做到优质文章,希望能有幸拜读各位的建议!
个人博客园:https://www.cnblogs.com/fyphome/

以上是关于同步异步、阻塞非阻塞、Netty的主要内容,如果未能解决你的问题,请参考以下文章

聊聊Netty那些事儿之从内核角度看IO模型

Netty之非阻塞处理

NIO高级编程与Netty入门

netty为什么非阻塞

IO阻塞非阻塞同步异步同步阻塞同步非阻塞异步阻塞异步非阻塞

同步阻塞同步非阻塞异步阻塞异步非阻塞--简明介绍