Java楂樺苟鍙戠綉缁滅紪绋?鍥?Netty

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java楂樺苟鍙戠綉缁滅紪绋?鍥?Netty相关的知识,希望对你有一定的参考价值。

鏍囩锛?a href='http://www.mamicode.com/so/1/%e4%bb%a3%e7%a0%81%e7%a4%ba%e4%be%8b' title='浠g爜绀轰緥'>浠g爜绀轰緥   rtti   oba   nios   ota   OLE   display   erro   

鍦ㄧ綉缁滃簲鐢ㄥ紑鍙戠殑杩囩▼涓紝鐩存帴浣跨敤JDK鎻愪緵鐨凬IO鐨凙PI锛屾瘮杈冪箒鐞愶紝鑰屼笖鎯宠杩涜鎬ц兘鎻愬崌锛岃繕闇€瑕佺粨鍚堝绾跨▼鎶€鏈€?/p>

鐢变簬缃戠粶缂栫▼鏈韩鐨勫鏉傛€э紝浠ュ強JDK API寮€鍙戠殑浣跨敤闅惧害杈冮珮锛屾墍浠ュ湪寮€婧愮ぞ鍖轰腑锛屾秾鐜板嚭鏉ヤ簡寰堝瀵笿DK NIO杩涜灏佽銆佸寮虹殑缃戠粶缂栫▼妗嗘灦锛屾瘮濡侼etty銆丮ina绛夈€?/p>

 

涓€銆丯etty绠€浠?/h2>

https://netty.io/ 瀹樼綉

Netty鏄竴涓珮鎬ц兘銆侀珮鍙墿灞曟€х殑寮傛浜嬩欢椹卞姩鐨勭綉缁滃簲鐢ㄧ▼搴忔鏋讹紝瀹冩瀬澶х畝鍖栦簡TCP鍜孶DP瀹㈡埛绔拰鏈嶅姟鍣ㄥ紑鍙戠瓑缃戠粶缂栫▼銆?/p>

Netty閲嶈鐨勫洓涓唴瀹癸細

  • Reactor绾跨▼妯″瀷锛氫竴绉嶉珮鎬ц兘鐨勫绾跨▼绋嬪簭璁捐鎬濊矾
  • Netty涓嚜宸卞畾涔夌殑Channel姒傚康锛氬寮虹増鐨勯€氶亾姒傚康
  • ChannelPipeline鑱岃矗閾捐璁℃ā寮忥細浜嬩欢澶勭悊鏈哄埗
  • 鍐呭瓨绠$悊锛氬寮虹殑ByteBuf缂撳啿鍖?/li>

鏁翠綋缁撴瀯鍥?/p>

鎶€鏈浘鐗? src=

 

浜屻€丯etty绾跨▼妯″瀷

 涓轰簡璁㎞IO澶勭悊鏇村ソ鐨勫埄鐢ㄥ绾跨▼鐗规€э紝Netty瀹炵幇浜哛eactor绾跨▼妯″瀷銆?/p>

Reactor妯″瀷涓湁鍥涗釜鏍稿績姒傚康锛?/p>

  • Resources璧勬簮锛堣姹?浠诲姟锛?/li>
  • Synchronous Event Demultiplexer鍚屾浜嬩欢澶嶇敤鍣?/li>
  • Dispatcher鍒嗛厤鍣?/li>
  • Request Handler璇锋眰澶勭悊鍣?/li>

鎶€鏈浘鐗? src=

 

 Netty鍚姩鏃朵細鏋勫缓澶氫釜Reactor

EventLoopGroup鍒濆鍖栬繃绋?/p>

 鎶€鏈浘鐗? style=

 

 

 

 

鎶€鏈浘鐗? style=

涓ょ粍EventLoopGroup(Main&Sub)澶勭悊涓嶅悓閫氶亾涓嶅悓鐨勪簨浠?/p>

 

public final class EchoServer 
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception 
     // Configure the server.
// 鍒涘缓EventLoopGroup accept绾跨▼缁?NioEventLoop
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 鍒涘缓EventLoopGroup I/O绾跨▼缁? EventLoopGroup workerGroup
= new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try
// 鏈嶅姟绔惎鍔ㄥ紩瀵煎伐鍏风被 ServerBootstrap b
= new ServerBootstrap();
// 閰嶇疆鏈嶅姟绔鐞嗙殑reactor绾跨▼缁勪互鍙婃湇鍔$鐨勫叾浠栭厤缃? b.group(bossGroup, workerGroup2) .channel(NioserverSocketChannel.
class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler((ChannelInitializer)(ch)-> ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler); ); // 閫氳繃bind鍚姩鏈嶅姟鍣?/span> ChannelFuture f = b.bind(PORT).sync(); // 闃诲涓荤嚎绋嬶紝鐩村埌缃戠粶鏈嶅姟琚叧闂?/span> f.channel().closeFuture().sync(); finally // 鍏抽棴绾跨▼缁?/span> bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();

 

 

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

杩借釜NioEventLoopGroup婧愮爜锛屼細鍙戠幇鏄垱閫犲緢澶歂ioEventLoop

public class NioEventLoopGroup extends MultithreadEventLoopGroup 
    ...
    public NioEventLoopGroup(int nThreads) 
            this(nThreads, (Executor) null);
     
    ...

杩借釜鍒扮埗绫?/p>

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup 

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);

    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static 
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) 
            logger.debug("-Dio.netty.eventLoopThreads: ", DEFAULT_EVENT_LOOP_THREADS);
        
    

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) 
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); // 榛樿鏄痗pu鏍告暟*2
    
...

杩借釜鐖剁被

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup 
// 浠g爜鐪佺暐
    
    // 澶氱嚎绋嬬殑浜嬩欢鎵ц鍣?/span>
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) 
        if (nThreads <= 0) 
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        

        if (executor == null) // Tony: 濡傛灉鎵ц鍣ㄤ负绌猴紝鍒欏垱寤轰竴涓?br />          
// EventLoop閮芥槸閫氳繃executor鍒涘缓绾跨▼骞舵墽琛屽畠鐨? executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); // 绾跨▼鍒涘缓鍣紝婧愮爜瑙佷笅闈? // EventLoop鏄疎ventExecutor鎺ュ彛鐨勫叿浣撳疄鐜? children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) boolean success = false; try
// 鏈夊涓疄鐜版柟娉曪紝瑙佷笅闈?杩斿洖NioEventLoop children[i]
= newChild(executor, args); success = true; catch (Exception e) // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); finally if (!success) for (int j = 0; j < i; j ++) children[j].shutdownGracefully(); for (int j = 0; j < i; j ++) EventExecutor e = children[j]; try while (!e.isTerminated()) e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); catch (InterruptedException interrupted) // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() @Override public void operationComplete(Future<Object> future) throws Exception if (terminatedChildren.incrementAndGet() == children.length) terminationFuture.setSuccess(null); ; for (EventExecutor e: children) e.terminationFuture().addListener(terminationListener); Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet);

ThreadPerTaskExecutor鍒涘缓绾跨▼

public final class ThreadPerTaskExecutor implements Executor 
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) 
        if (threadFactory == null) 
            throw new NullPointerException("threadFactory");
        
        this.threadFactory = threadFactory;
    

    @Override
    public void execute(Runnable command) 
        threadFactory.newThread(command).start();
    

鎶€鏈浘鐗? style=

NioEventLoopGroup

public class NioEventLoopGroup extends MultithreadEventLoopGroup 
  // 鐪佺暐浠g爜

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception 
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    

杩斿洖NioEventLoop锛屼篃浼犲叆浜唀xecutor锛岀敤鏉ュ府鍔╁垱寤虹嚎绋嬫墽琛屼换鍔?/p>

鐪婲ioEventLoop鐨勫叿浣撳疄鐜?/p>

public final class NioEventLoop extends SingleThreadEventLoop 
    // 浠g爜鐪佺暐
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) 
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) 
            throw new NullPointerException("selectorProvider");
        
        if (strategy == null) 
            throw new NullPointerException("selectStrategy");
        
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    
// 浠g爜鐪佺暐

selector鏄疦IO鐨剆elector

 NioEventLoop灏嗛€氶亾娉ㄥ唽鍒癊ventLoop鐨剆elector涓婏紝杩涜浜嬩欢杞

涓嶆柇杩借釜NioEventLoop

鏈€椤跺眰鏄?/p>

public interface Executor 

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the @code Executor implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);

execute鐢盨ingleThreadEventExecutor瀹炵幇

鎻愪氦浠诲姟

 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor 
    // 鐪佺暐浠g爜  
    @Override
    public void execute(Runnable task) 
        if (task == null) 
            throw new NullPointerException("task");
        
        //  鍒ゆ柇execute鏂规硶鐨勮皟鐢ㄨ€呮槸涓嶆槸EventLoop鍚屼竴涓嚎绋?/span>
        boolean inEventLoop = inEventLoop();
        addTask(task);// 澧炲姞鍒颁换鍔¢槦鍒?/span>
        if (!inEventLoop) // 涓嶆槸鍚屼竴涓嚎绋嬶紝鍒欒皟鐢ㄥ惎鍔ㄦ柟娉?/span>
            startThread();
            if (isShutdown()) 
                boolean reject = false;
                try 
                    if (removeTask(task)) 
                        reject = true;
                    
                 catch (UnsupportedOperationException e) 
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                
                if (reject) 
                    reject();
                
            
        

        if (!addTaskWakesUp && wakesUpForTask(task)) 
            wakeup(inEventLoop);
        
    

startThread

    private void startThread() 
        if (state == ST_NOT_STARTED) 
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) 
                try 
                    doStartThread();// Tony: 鏈惎鍔紝鍒欒Е鍙戝惎鍔?/span>
                 catch (Throwable cause) 
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    PlatformDependent.throwException(cause);
                
            
        
    

    private void doStartThread() 
        assert thread == null;
        executor.execute(new Runnable() // 杩欓噷鐨別xecutor鏄垵濮嬪寲EventLoop鐨勬椂鍊欎紶杩涙潵鐨?/span>
            @Override
            public void run() 
                thread = Thread.currentThread();
                if (interrupted) 
                    thread.interrupt();
                

                boolean success = false;
                updateLastExecutionTime();
                try // 鍒涘缓绾跨▼寮€濮嬫墽琛宺un鏂规硶锛屾墍浠ワ紝姣忎釜EventLoop閮芥槸鎵цrun
                    SingleThreadEventExecutor.this.run();
                    success = true;
                 catch (Throwable t) 
                    logger.warn("Unexpected exception from an event executor: ", t);
                 finally 
                    for (;;) 
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) 
                            break;
                        
                    

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) 
                        if (logger.isErrorEnabled()) 
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        
                    

                    try 
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) 
                            if (confirmShutdown()) 
                                break;
                            
                        
                     finally 
                        try 
                            cleanup();
                         finally 
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) 
                                if (logger.isWarnEnabled()) 
                                    logger.warn("An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + 鈥?鈥?span style="color: #000000;">);
                                
                            

                            terminationFuture.setSuccess(null);
                        
                    
                
            
        );
    

鎶€鏈浘鐗? style=

 

 

@Override
    protected void run() // 鏈変换鍔℃彁浜ゅ悗锛岃瑙﹀彂鎵ц
        for (;;) // 鎵ц涓や欢浜媠elector,select鐨勪簨浠?鍜?taskQueue閲岄潰鐨勫唴瀹?/span>
            try 
                try 
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) 
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        // 鈥榳akenUp.compareAndSet(false, true)鈥?is always evaluated
                        // before calling 鈥榮elector.wakeup()鈥?to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 鈥榳akenUp鈥?is set to
                        // true too early.
                        //
                        // 鈥榳akenUp鈥?is set to true too early if:
                        // 1) Selector is waken up between 鈥榳akenUp.set(false)鈥?and
                        //    鈥榮elector.select(...)鈥? (BAD)
                        // 2) Selector is waken up between 鈥榮elector.select(...)鈥?and
                        //    鈥榠f (wakenUp.get())  ... 鈥? (OK)
                        //
                        // In the first case, 鈥榳akenUp鈥?is set to true and the
                        // following 鈥榮elector.select(...)鈥?will wake up immediately.
                        // Until 鈥榳akenUp鈥?is set to false again in the next round,
                        // 鈥榳akenUp.compareAndSet(false, true)鈥?will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 鈥榮elector.select(...)鈥?call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).

                        if (wakenUp.get()) 
                            selector.wakeup();
                        
                        // fall through
                    default:
                    
                 catch (IOException e) 
                    // If we receive an IOException here its because the Selector is messed up. Let鈥榮 rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) 
                    try // 澶勭悊浜嬩欢
                        processSelectedKeys();
                     finally 
                        // Ensure we always run tasks.
                        runAllTasks();
                    
                 else 
                    final long ioStartTime = System.nanoTime();
                    try 
                        processSelectedKeys();
                     finally 
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    
                
             catch (Throwable t) 
                handleLoopException(t);
            
            // Always handle shutdown even if the loop processing threw an exception.
            try 
                if (isShuttingDown()) 
                    closeAll();
                    if (confirmShutdown()) 
                        return;
                    
                
             catch (Throwable t) 
                handleLoopException(t);
            
        
    

    private static void handleLoopException(Throwable t) 
        logger.warn("Unexpected exception in the selector loop.", t);

        // Prevent possible consecutive immediate failures that lead to
        // excessive CPU consumption.
        try 
            Thread.sleep(1000);
         catch (InterruptedException e) 
            // Ignore.
        
    

    private void processSelectedKeys() 
        if (selectedKeys != null) 
            processSelectedKeysOptimized();
         else // 澶勭悊浜嬩欢
            processSelectedKeysPlain(selector.selectedKeys());
        
    

 

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) 
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) 
            return;
        
        // 鑾峰彇selector鎵€鏈夐€変腑鐨勪簨浠讹紙ServerSocketChannel涓昏鏄疧P_ACCEPT,SocketChannle涓昏鏄疧P_READ锛?/span>
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) 
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) //  澶勭悊niochannel浜嬩欢
                processSelectedKey(k, (AbstractNioChannel) a);
             else 
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            

            if (!i.hasNext()) 
                break;
            

            if (needsToSelectAgain) 
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) 
                    break;
                 else 
                    i = selectedKeys.iterator();
                
            
        
    

 

EventLoop鑷韩瀹炵幇浜咵xecutor鎺ュ彛锛屽綋璋冪敤executor鏂规硶鎻愪氦浠诲姟鏃讹紝鍒欏垽鏂槸鍚﹀惎鍔紝鏈惎鍔ㄥ垯璋冪敤鍐呯疆鐨別xecutor鍒涘缓鏂扮嚎绋嬫潵瑙﹀彂run鏂规硶鎵ц

鎶€鏈浘鐗? style=

 

 

channel娉ㄥ唽鍒皊elector涓?/p>

璇锋眰

鏈嶅姟绔惎鍔ㄧ殑杩囩▼锛屾湇鍔$鐨勫惎鍔ㄥ氨鏄疊ind缁戝畾绔彛鐨勮繃绋?/p>

鎶€鏈浘鐗? style=

 

 

 

鍥炲埌EchoServer

杩借釜bind婧愮爜

 // Start the server.
ChannelFuture f = b.bind(PORT).sync();

bind缁戝畾绔彛骞跺垱寤洪€氶亾

 public ChannelFuture bind(int inetPort) 
        return bind(new InetSocketAddress(inetPort)); // 缁戝畾绔彛鐨勫叆鍙d唬鐮?/span>
    

    /**
     * Create a new @link Channel and bind it.
     */
    public ChannelFuture bind(String inetHost, int inetPort) 
        return bind(SocketUtils.socketAddress(inetHost, inetPort));
    

    /**
     * Create a new @link Channel and bind it.
     */
    public ChannelFuture bind(InetAddress inetHost, int inetPort) 
        return bind(new InetSocketAddress(inetHost, inetPort));
    

    /**
     * Create a new @link Channel and bind it.
     */
    public ChannelFuture bind(SocketAddress localAddress) 
        validate();
        if (localAddress == null) 
            throw new NullPointerException("localAddress");
        
        return doBind(localAddress);// 鐪熸骞蹭簨鐨勪唬鐮?/span>
    

    private ChannelFuture doBind(final SocketAddress localAddress) 
        final ChannelFuture regFuture = initAndRegister();// 鍒涘缓/鍒濆鍖朣erverSocketChannel瀵硅薄锛屽苟娉ㄥ唽鍒癝elector
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) 
            return regFuture;
        
        // 绛夋敞鍐屽畬鎴愪箣鍚庯紝鍐嶇粦瀹氱鍙c€?闃叉绔彛寮€鏀句簡锛屽嵈涓嶈兘澶勭悊璇锋眰
        if (regFuture.isDone()) 
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);// 瀹為檯鎿嶄綔缁戝畾绔彛鐨勪唬鐮?/span>
            return promise;
         else 
            // Registration future is almost always fulfilled already, but just in case it鈥榮 not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() 
                @Override
                public void operationComplete(ChannelFuture future) throws Exception 
                    Throwable cause = future.cause();
                    if (cause != null) 
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                     else 
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    
                
            );
            return promise;
        
    

    final ChannelFuture initAndRegister() 
        Channel channel = null;
        try 
            channel = channelFactory.newChannel(); // 閫氶亾
            init(channel); // 鍒濆鍖栭€氶亾
         catch (Throwable t) 
            if (channel != null) 
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        
        //锛堜竴寮€濮嬪垵濮嬪寲鐨刧roup锛塎ultithreadEventLoopGroup閲岄潰閫夋嫨涓€涓猠ventLoop杩涜缁戝畾
        ChannelFuture regFuture = config().group().register(channel); // register瑙佷笅闈?
        if (regFuture.cause() != null) 
            if (channel.isRegistered()) 
                channel.close();
             else 
                channel.unsafe().closeForcibly();
            
        

        // If we are here and the promise is not failed, it鈥榮 one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It鈥榮 safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop鈥榮 task queue for later execution.
        //    i.e. It鈥榮 safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    

    abstract void init(Channel channel) throws Exception;

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) 

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() 
            @Override
            public void run() //杩欓噷鍚慐ventLoop鎻愪氦浠诲姟锛屼竴鏃︽湁浠诲姟鎻愪氦鍒欎細瑙﹀彂EventLoop鐨勮疆璇?/span>
                if (regFuture.isSuccess()) // 鏈川鍙堢粫鍥炲埌channel鐨刡ind鏂规硶涓婇潰銆?/span>
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                 else 
                    promise.setFailure(regFuture.cause());
                
            
        );
    

 

@Override
public ChannelFuture register(Channel channel)
    return next().register(channel); // 鏍规嵁閫夋嫨鍣紝閫夋嫨涓€涓悎閫傜殑NioEventLoop杩涜娉ㄥ唽(SingleEventLoop)

杩借釜register浠g爜...

 

netty涓殑Channel鏄竴涓娊璞$殑姒傚康锛屽彲浠ョ悊瑙d负瀵笿DK NIO Channel鐨勫寮哄拰鎷撳睍銆傚鍔犱簡寰堝灞炴€у拰鏂规硶锛屼笅闈㈢綏鍒楀嚑涓父瑙佺殑灞炴€у拰鏂规硶锛?/p>

鎶€鏈浘鐗? style=

 

涓夈€佽矗浠婚摼璁捐妯″紡

 璐d换閾撅紙Chain of Responsibility Pattern锛?/strong>涓鸿姹傚垱寤轰簡涓€涓?span style="color: #0000ff;">澶勭悊瀵硅薄鐨勯摼銆?/p>

鍙戣捣璇锋眰鍜?span style="color: #0000ff;">鍏蜂綋澶勭悊璇锋眰鐨勮繃绋嬭繘琛岃В鑰︼細鑱岃矗閾句笂鐨勫鐞嗚€?/span>璐熻矗澶勭悊璇锋眰锛?span style="color: #0000ff;">瀹㈡埛鍙渶瑕佸皢璇锋眰鍙戦€佸埌鑱岃矗閾句笂鍗冲彲锛屾棤椤诲叧蹇冭姹傜殑澶勭悊缁嗚妭鍜岃姹傜殑浼犻€掋€?/p>

 

鎶€鏈浘鐗? style=

 handler鏄叿浣撳鐞嗚姹傜殑绋嬪簭

 

 瀹炵幇璐d换閾炬ā寮?/strong>4涓绱狅細澶勭悊鍣ㄦ娊璞$被銆佸叿浣撶殑澶勭悊鍣ㄥ疄鐜扮被銆佷繚瀛樺鐞嗗櫒淇℃伅銆佸鐞嗘墽琛?/p>

 鎶€鏈浘鐗? src=鎶€鏈浘鐗? src=

 

 

 璐d换閾句唬鐮佺ず渚?/p>

// -----閾捐〃褰㈠紡璋冪敤------netty灏辨槸绫讳技鐨勮繖绉嶅舰寮?/span>
public class PipelineDemo 
    /**
     * 鍒濆鍖栫殑鏃跺€欓€犱竴涓猦ead锛屼綔涓鸿矗浠婚摼鐨勫紑濮嬶紝浣嗘槸骞舵病鏈夊叿浣撶殑澶勭悊
     */
    public HandlerChainContext head = new HandlerChainContext(new AbstractHandler() 
        @Override
        void doHandler(HandlerChainContext handlerChainContext, Object arg0) 
            handlerChainContext.runNext(arg0);
        
    );

    public void requestProcess(Object arg0) 
        this.head.handler(arg0);
    

    public void addLast(AbstractHandler handler) 
        HandlerChainContext context = head;
        while (context.next != null) 
            context = context.next;
        
        context.next = new HandlerChainContext(handler);
    


    public static void main(String[] args) 
        PipelineDemo pipelineChainDemo = new PipelineDemo();
        pipelineChainDemo.addLast(new Handler2());
        pipelineChainDemo.addLast(new Handler1());
        pipelineChainDemo.addLast(new Handler1());
        pipelineChainDemo.addLast(new Handler2());

        // 鍙戣捣璇锋眰
        pipelineChainDemo.requestProcess("鐏溅鍛滃憸鍛渵~");

    


/**
 * handler涓婁笅鏂囷紝鎴戜富瑕佽礋璐g淮鎶ら摼锛屽拰閾剧殑鎵ц
 */
class HandlerChainContext 
    HandlerChainContext next; // 涓嬩竴涓妭鐐?/span>
    AbstractHandler handler;

    public HandlerChainContext(AbstractHandler handler) 
        this.handler = handler;
    

    void handler(Object arg0) 
        this.handler.doHandler(this, arg0);
    

    /**
     * 缁х画鎵ц涓嬩竴涓?
     */
    void runNext(Object arg0) 
        if (this.next != null) 
            this.next.handler(arg0);
        
    


// 澶勭悊鍣ㄦ娊璞$被
abstract class AbstractHandler 
    /**
     * 澶勭悊鍣紝杩欎釜澶勭悊鍣ㄥ氨鍋氫竴浠朵簨鎯咃紝鍦ㄤ紶鍏ョ殑瀛楃涓蹭腑澧炲姞涓€涓熬宸?.
     */
    abstract void doHandler(HandlerChainContext handlerChainContext, Object arg0); // handler鏂规硶


// 澶勭悊鍣ㄥ叿浣撳疄鐜扮被
class Handler1 extends AbstractHandler 
    @Override
    void doHandler(HandlerChainContext handlerChainContext, Object arg0) 
        arg0 = arg0.toString() + "..handler1鐨勫皬灏惧反.....";
        System.out.println("鎴戞槸Handler1鐨勫疄渚嬶紝鎴戝湪澶勭悊锛? + arg0);
        // 缁х画鎵ц涓嬩竴涓?/span>
        handlerChainContext.runNext(arg0);
    


// 澶勭悊鍣ㄥ叿浣撳疄鐜扮被
class Handler2 extends AbstractHandler 
    @Override
    void doHandler(HandlerChainContext handlerChainContext, Object arg0) 
        arg0 = arg0.toString() + "..handler2鐨勫皬灏惧反.....";
        System.out.println("鎴戞槸Handler2鐨勫疄渚嬶紝鎴戝湪澶勭悊锛? + arg0);
        // 缁х画鎵ц涓嬩竴涓?/span>
        handlerChainContext.runNext(arg0);
    

 

 Netty涓殑ChannelPipeline璐d换閾?/p>

鎶€鏈浘鐗? style=

鎶€鏈浘鐗? style=

 

 Nettty涓畾涔変簡寰堝浜嬩欢

鎶€鏈浘鐗? style=

 

 Pipeline涓殑handler鏄粈涔堬紵

鎶€鏈浘鐗? style=

 

 ChannelPipeline鏄嚎绋嬪畨鍏ㄧ殑锛孋hannelHandler鍙互鍦ㄤ换浣曟椂鍊欐坊鍔犳垨鍒犻櫎銆?/p>

渚嬪锛屽彲浠ュ湪鍗冲皢浜ゆ崲鏁忔劅淇℃伅鏃舵彃鍏ュ姞瀵嗗鐞嗙▼搴忥紝骞跺湪浜ゆ崲鍚庡垹闄ゃ€?/p>

涓€鑸搷浣滐紝鍒濆鍖栫殑鏃跺€欏鍔犺繘鍘伙紝杈冨皯鍒犻櫎銆備笅闈㈡槸Pipeline涓鐞唄andler鐨凙PI锛?/p>

鎶€鏈浘鐗? style=

 

 婧愮爜鏌ョ湅

 

 

handler鎵ц鍒嗘瀽

鎶€鏈浘鐗? style=

 

 

鍒嗘瀽registered鍏ョ珯浜嬩欢鐨勫鐞?/p>

鎶€鏈浘鐗? style=

 

 婧愮爜鏌ョ湅 浠巄ind()杩涘叆

 

 鎶€鏈浘鐗? style=

 

 

bind鍑虹珯浜嬩欢鍒嗘瀽

鎶€鏈浘鐗? style=

 

 婧愮爜鏌ョ湅

 

鍒嗘瀽accept鍏ョ珯浜嬩欢鐨勫鐞?/p>

鎶€鏈浘鐗? style=

 婧愮爜鏌ョ湅

 

read鍏ョ珯浜嬩欢鐨勫鐞?/p>

鎶€鏈浘鐗? style=

 

 婧愮爜鏌ョ湅

 

 

鎶€鏈浘鐗? style=

 

 

鍥涖€侀浂鎷疯礉鏈哄埗

JDK ByteBuffer瀛樺湪涓€浜?strong>缂虹偣锛?/p>

  • 鏃犳硶鍔ㄦ€佹墿瀹广€傞暱搴︽槸鍥哄畾鐨勶紝涓嶈兘鍔ㄦ€佹墿灞曞拰鏀剁缉锛屽綋鏁版嵁澶т簬ByteBuffer瀹归噺鏃讹紝浼氬彂鐢熺储寮曡秺鐣屽紓甯搞€?/li>
  • API浣跨敤澶嶆潅銆傝鍐欑殑鏃跺€欓渶瑕佹墜宸ヨ皟鐢╢lip()鍜宺ewind()绛夋柟娉曪紝浣跨敤鏃堕渶瑕侀潪甯歌皑鎱庣殑浣跨敤杩欎簺api锛屽惁鍒欏鏄撳嚭閿欍€?/li>

1.Netty鑷繁鐨凚yteBuf

ByteBuf鏄负瑙e喅ByteBuffer鐨勯棶棰樺拰婊¤冻缃戠粶搴旂敤绋嬪簭寮€鍙戜汉鍛樼殑鏃ュ父闇€姹傝€岃璁$殑銆?/p>

ByteBuf鐨?strong>澧炲己锛?/p>

  •  API鎿嶄綔渚挎嵎鎬?/li>
  • 鍔ㄦ€佹墿瀹?/li>
  • 澶氱ByteBuf瀹炵幇
  • 楂樻晥鐨勯浂鎷疯礉鏈哄埗

 

 2.ByteBuf鎿嶄綔

ByteBuf涓変釜閲嶈灞炴€э細capacity瀹归噺銆乺eaderIndex璇诲彇浣嶇疆銆亀riteIndex鍐欏叆浣嶇疆銆?/p>

鎻愪緵浜?span style="color: #0000ff;">涓や釜鎸囬拡鍙橀噺鏉ユ敮鎸侀『搴忚鍜屽啓鎿嶄綔锛屽垎鍒椂readerIndex鍜屽啓鎿嶄綔writeIndex

甯哥敤鏂规硶瀹氫箟锛?/p>

   鎶€鏈浘鐗? style=

 

 

 

 

 

 

 

涓嬪浘鏄剧ず浜嗕竴涓紦鍐插尯鏄浣曡涓や釜鎸囬拡鍒嗗壊鎴愪笁涓尯鍩熺殑锛?/p>

鎶€鏈浘鐗? style=

 

 

绀轰緥浠g爜

/**
 * bytebuf鐨勫父瑙凙PI鎿嶄綔绀轰緥
 */
public class ByteBufDemo 
    @Test
    public void apiTest() 
        //  +-------------------+------------------+------------------+
        //  | discardable bytes |  readable bytes  |  writable bytes  |
        //  |                   |     (CONTENT)    |                  |
        //  +-------------------+------------------+------------------+
        //  |                   |                  |                  |
        //  0      <=       readerIndex   <=   writerIndex    <=    capacity

        // 1.鍒涘缓涓€涓潪姹犲寲鐨凚yteBuf锛屽ぇ灏忎负10涓瓧鑺?/span>
        ByteBuf buf = Unpooled.buffer(10);
        System.out.println("鍘熷ByteBuf涓?===================>" + buf.toString());
        System.out.println("1.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n");

        // 2.鍐欏叆涓€娈靛唴瀹?/span>
        byte[] bytes = 1, 2, 3, 4, 5;
        buf.writeBytes(bytes);
        System.out.println("鍐欏叆鐨刡ytes涓?===================>" + Arrays.toString(bytes));
        System.out.println("鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString());
        System.out.println("2.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n");

        // 3.璇诲彇涓€娈靛唴瀹?/span>
        byte b1 = buf.readByte();
        byte b2 = buf.readByte();
        System.out.println("璇诲彇鐨刡ytes涓?===================>" + Arrays.toString(new byte[]b1, b2));
        System.out.println("璇诲彇涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString());
        System.out.println("3.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n");

        // 4.灏嗚鍙栫殑鍐呭涓㈠純
        buf.discardReadBytes();
        System.out.println("灏嗚鍙栫殑鍐呭涓㈠純鍚嶣yteBuf涓?=======>" + buf.toString());
        System.out.println("4.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n");

        // 5.娓呯┖璇诲啓鎸囬拡
        buf.clear();
        System.out.println("灏嗚鍐欐寚閽堟竻绌哄悗ByteBuf涓?=========>" + buf.toString());
        System.out.println("5.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n");

        // 6.鍐嶆鍐欏叆涓€娈靛唴瀹癸紝姣旂涓€娈靛唴瀹瑰皯
        byte[] bytes2 = 1, 2, 3;
        buf.writeBytes(bytes2);
        System.out.println("鍐欏叆鐨刡ytes涓?===================>" + Arrays.toString(bytes2));
        System.out.println("鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString());
        System.out.println("6.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n");

        // 7.灏咮yteBuf娓呴浂
        buf.setZero(0, buf.capacity());
        System.out.println("灏嗗唴瀹规竻闆跺悗ByteBuf涓?=============>" + buf.toString());
        System.out.println("7.ByteBuf涓殑鍐呭涓?===============>" + Arrays.toString(buf.array()) + "\n");

        // 8.鍐嶆鍐欏叆涓€娈佃秴杩囧閲忕殑鍐呭
        byte[] bytes3 = 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11;
        buf.writeBytes(bytes3);
        System.out.println("鍐欏叆鐨刡ytes涓?===================>" + Arrays.toString(bytes3));
        System.out.println("鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString());
        System.out.println("8.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n");
        //  闅忔満璁块棶绱㈠紩 getByte
        //  椤哄簭璇?read*
        //  椤哄簭鍐?write*
        //  娓呴櫎宸茶鍐呭 discardReadBytes
        //  娓呴櫎缂撳啿鍖?clear
        //  鎼滅储鎿嶄綔
        //  鏍囪鍜岄噸缃?
        //  瀹屾暣浠g爜绀轰緥锛氬弬鑰?
        // 鎼滅储鎿嶄綔 璇诲彇鎸囧畾浣嶇疆 buf.getByte(1);
        //
    

Unpooled鎺ㄨ崘鐨勬柟寮忓垱寤篵uf

鍙互鍔ㄦ€佹墿瀹?/p>

杩愯缁撴灉

鍘熷ByteBuf涓?===================>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
1.ByteBuf涓殑鍐呭涓?==============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

鍐欏叆鐨刡ytes涓?/span>====================>[1, 2, 3, 4, 5]
鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?/span>===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 10)
2.ByteBuf涓殑鍐呭涓?==============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]

璇诲彇鐨刡ytes涓?/span>====================>[1, 2]
璇诲彇涓€娈靛唴瀹瑰悗ByteBuf涓?/span>===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 2, widx: 5, cap: 10)
3.ByteBuf涓殑鍐呭涓?==============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]

灏嗚鍙栫殑鍐呭涓㈠純鍚嶣yteBuf涓?/span>========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
4.ByteBuf涓殑鍐呭涓?==============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]

灏嗚鍐欐寚閽堟竻绌哄悗ByteBuf涓?/span>==========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
5.ByteBuf涓殑鍐呭涓?==============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]

鍐欏叆鐨刡ytes涓?/span>====================>[1, 2, 3]
鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?/span>===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
6.ByteBuf涓殑鍐呭涓?==============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]

灏嗗唴瀹规竻闆跺悗ByteBuf涓?/span>==============>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
7.ByteBuf涓殑鍐呭涓?===============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

鍐欏叆鐨刡ytes涓?/span>====================>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?/span>===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 14, cap: 64)
8.ByteBuf涓殑鍐呭涓?==============>[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

 

鍔ㄦ€佹墿瀹?/p>

io.netty.buffer.AbstractByteBuf涓嬶細

  final void ensureWritable0(int minWritableBytes) 
        ensureAccessible();
        if (minWritableBytes <= writableBytes()) 
            return;
        
        final int writerIndex = writerIndex();
        if (checkBounds) 
            if (minWritableBytes > maxCapacity - writerIndex) 
                throw new IndexOutOfBoundsException(String.format(
                        "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                        writerIndex, minWritableBytes, maxCapacity, this));
            
        

        // Normalize the current capacity to the power of 2.
        int minNewCapacity = writerIndex + minWritableBytes;
        int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);

        int fastCapacity = writerIndex + maxFastWritableBytes();
        // Grow by a smaller amount if it will avoid reallocation
        if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) 
            newCapacity = fastCapacity;
        

        // Adjust to the new capacity.
        capacity(newCapacity);
    

calculateNewCapacity

    @Override
    public int calculateNewCapacity(int minNewCapacity, int maxCapacity)  // minNewCapacity锛?4 maxCapacity锛?147483647
        checkPositiveOrZero(minNewCapacity, "minNewCapacity");
        if (minNewCapacity > maxCapacity)  // minCapacity锛?4
            throw new IllegalArgumentException(String.format(
                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                    minNewCapacity, maxCapacity));
         // 闃堝€?M锛岃繖涓槇鍊肩殑鐢ㄦ剰锛氬閲忚姹?M浠ュ唴锛屾瘡娆℃墿瀹逛互2鐨勫€嶆暟杩涜璁$畻锛岃秴杩?M瀹归噺锛屽彟澶栫殑璁$畻鏂瑰紡銆?
        final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

        if (minNewCapacity == threshold)  // 鏂板閲忕殑鏈€灏忚姹傦紝濡傛灉绛変簬闃堝€硷紝鍒欑珛鍒昏繑鍥?
            return threshold;
        

        // If over threshold, do not double but just increase by threshold.
        if (minNewCapacity > threshold) 
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) 
                newCapacity = maxCapacity;
             else 
                newCapacity += threshold;
            
            return newCapacity;
        
        // 濡傛灉瀹归噺瑕佹眰娌¤秴杩囬槇鍊硷紝鍒欎粠64瀛楄妭寮€濮嬶紝涓嶆柇澧炲姞涓€鍊嶏紝鐩磋嚦婊¤冻鏂板閲忔渶灏忚姹?
        // Not over threshold. Double up to 4 MiB, starting from 64.
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) 
            newCapacity <<= 1;
        

        return Math.min(newCapacity, maxCapacity);
    

鎶€鏈浘鐗? style=

鎶€鏈浘鐗? style=

 

 

閫夋嫨鍚堥€傜殑ByteBuf瀹炵幇锛?/p>

浜嗚В鏍稿績鐨?涓淮搴︾殑鍒掑垎鏂瑰紡锛?绉嶅叿浣撳疄鐜?/p>

鎶€鏈浘鐗? style=

 

 鍦ㄤ娇鐢ㄤ腑锛岄兘鏄娇鐢˙yteBufAllocator鍒嗛厤鍣ㄨ繘琛岀敵璇凤紝鍚屾椂鍒嗛厤鍣ㄥ叿鏈夊唴瀛樼鐞嗙殑鍔熻兘

 

鍫嗗鍐呭瓨绀轰緥

/**
 * 鍫嗗鍐呭瓨鐨勫父瑙凙PI鎿嶄綔绀轰緥
 */
public class DirectByteBufDemo 
    @Test
    public void apiTest() 
        //  +-------------------+------------------+------------------+
        //  | discardable bytes |  readable bytes  |  writable bytes  |
        //  |                   |     (CONTENT)    |                  |
        //  +-------------------+------------------+------------------+
        //  |                   |                  |                  |
        //  0      <=       readerIndex   <=   writerIndex    <=    capacity

        // 1.鍒涘缓涓€涓潪姹犲寲鐨凚yteBuf锛屽ぇ灏忎负10涓瓧鑺?/span>
        ByteBuf buf = Unpooled.directBuffer(10);
        System.out.println("鍘熷ByteBuf涓?===================>" + buf.toString());
        // System.out.println("1.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); 鍫嗗鍐呭瓨涓嶈兘鐢╞uf.array()

        // 2.鍐欏叆涓€娈靛唴瀹?/span>
        byte[] bytes = 1, 2, 3, 4, 5;
        buf.writeBytes(bytes);
        System.out.println("鍐欏叆鐨刡ytes涓?===================>" + Arrays.toString(bytes));
        System.out.println("鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString());
        //System.out.println("2.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n");

        // 3.璇诲彇涓€娈靛唴瀹?/span>
        byte b1 = buf.readByte();
        byte b2 = buf.readByte();
        System.out.println("璇诲彇鐨刡ytes涓?===================>" + Arrays.toString(new byte[]b1, b2));
        System.out.println("璇诲彇涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString());
       //System.out.println("3.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n");

        // 4.灏嗚鍙栫殑鍐呭涓㈠純
        buf.discardReadBytes();
        System.out.println("灏嗚鍙栫殑鍐呭涓㈠純鍚嶣yteBuf涓?=======>" + buf.toString());
        //System.out.println("4.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n");

        // 5.娓呯┖璇诲啓鎸囬拡
        buf.clear();
        System.out.println("灏嗚鍐欐寚閽堟竻绌哄悗ByteBuf涓?=========>" + buf.toString());
        //System.out.println("5.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n");

        // 6.鍐嶆鍐欏叆涓€娈靛唴瀹癸紝姣旂涓€娈靛唴瀹瑰皯
        byte[] bytes2 = 1, 2, 3;
        buf.writeBytes(bytes2);
        System.out.println("鍐欏叆鐨刡ytes涓?===================>" + Arrays.toString(bytes2));
        System.out.println("鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString());
       // System.out.println("6.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n");

        // 7.灏咮yteBuf娓呴浂
        buf.setZero(0, buf.capacity());
        System.out.println("灏嗗唴瀹规竻闆跺悗ByteBuf涓?=============>" + buf.toString());
       // System.out.println("7.ByteBuf涓殑鍐呭涓?===============>" + Arrays.toString(buf.array()) + "\n");

        // 8.鍐嶆鍐欏叆涓€娈佃秴杩囧閲忕殑鍐呭
        byte[] bytes3 = 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11;
        buf.writeBytes(bytes3);
        System.out.println("鍐欏叆鐨刡ytes涓?===================>" + Arrays.toString(bytes3));
        System.out.println("鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString());
       // System.out.println("8.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n");
        //  闅忔満璁块棶绱㈠紩 getByte
        //  椤哄簭璇?read*
        //  椤哄簭鍐?write*
        //  娓呴櫎宸茶鍐呭 discardReadBytes
        //  娓呴櫎缂撳啿鍖?clear
        //  鎼滅储鎿嶄綔
        //  鏍囪鍜岄噸缃?
        //  瀹屾暣浠g爜绀轰緥锛氬弬鑰?
        // 鎼滅储鎿嶄綔 璇诲彇鎸囧畾浣嶇疆 buf.getByte(1);
        //
    

Unsafe鐨勫疄鐜?/p>

鎶€鏈浘鐗? style=

鎶€鏈浘鐗? style=

 

 

 

鍐呭瓨澶嶇敤

 

PooledByteBuf瀵硅薄銆佸唴瀛樺鐢?/strong>

鎶€鏈浘鐗? style=

鎶€鏈浘鐗? style=

3.闆舵嫹璐濇満鍒?/h3>

Netty鐨勯浂鎷疯礉鏈哄埗锛屾槸涓€绉嶅簲鐢ㄥ眰鐨勫疄鐜般€傚拰搴曞眰鐨凧VM銆佹搷浣滅郴缁熷唴瀛樻満鍒跺苟鏃犺繃澶氬叧鑱斻€?/p>

鎶€鏈浘鐗? style=

 

 浠g爜绀轰緥

/**
 * 闆舵嫹璐濈ず渚?
 */
public class ZeroCopyTest 
    @org.junit.Test
    public void wrapTest() 
        byte[] arr = 1, 2, 3, 4, 5;
        ByteBuf byteBuf = Unpooled.wrappedBuffer(arr);
        System.out.println(byteBuf.getByte(4));
        arr[4] = 6;
        System.out.println(byteBuf.getByte(4));
     // java鏁扮粍杞负buf 5 arr淇敼涓?鍚庯紝byteBuf涔熷彉涓?锛岃鏄庝袱鑰呯敤鐨勬槸鐩稿悓鐨勬暟鎹紝闆舵嫹璐?

    @org.junit.Test
    public void sliceTest() 
        ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
        ByteBuf newBuffer = buffer1.slice(1, 2);
        newBuffer.unwrap();
        System.out.println(newBuffer.toString());
     // 鎷嗗垎銆備笉浼氬姩鍘熸潵鐨刡uf锛岃繕淇濈暀鍘熸潵buf鐨勫湴鍧€

    @org.junit.Test
    public void compositeTest() 
        ByteBuf buffer1 = Unpooled.buffer(3);
        buffer1.writeByte(1);
        ByteBuf buffer2 = Unpooled.buffer(3);
        buffer2.writeByte(4);
        CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
        CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
        System.out.println(newBuffer);
     // 鍚堝苟銆傝繕淇濈暀鍘熸潵buf鐨勪俊鎭?

 

以上是关于Java楂樺苟鍙戠綉缁滅紪绋?鍥?Netty的主要内容,如果未能解决你的问题,请参考以下文章

骞跺彂鐩稿叧闂浠ュ強java鍩虹鐭ヨ瘑

鏁版嵁搴撳垎鐗?Sharding)

EF Core 寮傛缂栫▼娉ㄦ剰瑕佺偣

2019-2020-1瀛︽湡20192429銆婄綉缁滅┖闂村畨鍏ㄤ笓涓氬璁恒€嬬涓夊懆瀛︿範鎬荤粨

2019-2020-1瀛︽湡20192402銆婄綉缁滅┖闂村畨鍏ㄤ笓涓氬璁恒€嬬鍗佷竴鍛ㄥ涔犳€荤粨

浜窐椤圭洰Day01馃毄鈽呪槄鈽咅煉?