Java楂樺苟鍙戠綉缁滅紪绋?鍥?Netty
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java楂樺苟鍙戠綉缁滅紪绋?鍥?Netty相关的知识,希望对你有一定的参考价值。
鏍囩锛?a href='http://www.mamicode.com/so/1/%e4%bb%a3%e7%a0%81%e7%a4%ba%e4%be%8b' title='浠g爜绀轰緥'>浠g爜绀轰緥 rtti oba nios ota OLE display erro
鍦ㄧ綉缁滃簲鐢ㄥ紑鍙戠殑杩囩▼涓紝鐩存帴浣跨敤JDK鎻愪緵鐨凬IO鐨凙PI锛屾瘮杈冪箒鐞愶紝鑰屼笖鎯宠杩涜鎬ц兘鎻愬崌锛岃繕闇€瑕佺粨鍚堝绾跨▼鎶€鏈€?/p>
鐢变簬缃戠粶缂栫▼鏈韩鐨勫鏉傛€э紝浠ュ強JDK API寮€鍙戠殑浣跨敤闅惧害杈冮珮锛屾墍浠ュ湪寮€婧愮ぞ鍖轰腑锛屾秾鐜板嚭鏉ヤ簡寰堝瀵笿DK NIO杩涜灏佽銆佸寮虹殑缃戠粶缂栫▼妗嗘灦锛屾瘮濡侼etty銆丮ina绛夈€?/p>
涓€銆丯etty绠€浠?/h2>
Netty鏄竴涓珮鎬ц兘銆侀珮鍙墿灞曟€х殑寮傛浜嬩欢椹卞姩鐨勭綉缁滃簲鐢ㄧ▼搴忔鏋讹紝瀹冩瀬澶х畝鍖栦簡TCP鍜孶DP瀹㈡埛绔拰鏈嶅姟鍣ㄥ紑鍙戠瓑缃戠粶缂栫▼銆?/p>
Netty閲嶈鐨勫洓涓唴瀹癸細
- Reactor绾跨▼妯″瀷锛氫竴绉嶉珮鎬ц兘鐨勫绾跨▼绋嬪簭璁捐鎬濊矾
- Netty涓嚜宸卞畾涔夌殑Channel姒傚康锛氬寮虹増鐨勯€氶亾姒傚康
- ChannelPipeline鑱岃矗閾捐璁℃ā寮忥細浜嬩欢澶勭悊鏈哄埗
- 鍐呭瓨绠$悊锛氬寮虹殑ByteBuf缂撳啿鍖?/li>
鏁翠綋缁撴瀯鍥?/p>
浜屻€丯etty绾跨▼妯″瀷
涓轰簡璁㎞IO澶勭悊鏇村ソ鐨勫埄鐢ㄥ绾跨▼鐗规€э紝Netty瀹炵幇浜哛eactor绾跨▼妯″瀷銆?/p>
Reactor妯″瀷涓湁鍥涗釜鏍稿績姒傚康锛?/p>
- Resources璧勬簮锛堣姹?浠诲姟锛?/li>
- Synchronous Event Demultiplexer鍚屾浜嬩欢澶嶇敤鍣?/li>
- Dispatcher鍒嗛厤鍣?/li>
- Request Handler璇锋眰澶勭悊鍣?/li>
Netty鍚姩鏃朵細鏋勫缓澶氫釜Reactor
EventLoopGroup鍒濆鍖栬繃绋?/p>
涓ょ粍EventLoopGroup(Main&Sub)澶勭悊涓嶅悓閫氶亾涓嶅悓鐨勪簨浠?/p>
public final class EchoServer static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception // Configure the server.
// 鍒涘缓EventLoopGroup accept绾跨▼缁?NioEventLoop
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 鍒涘缓EventLoopGroup I/O绾跨▼缁? EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try
// 鏈嶅姟绔惎鍔ㄥ紩瀵煎伐鍏风被 ServerBootstrap b = new ServerBootstrap();
// 閰嶇疆鏈嶅姟绔鐞嗙殑reactor绾跨▼缁勪互鍙婃湇鍔$鐨勫叾浠栭厤缃? b.group(bossGroup, workerGroup2) .channel(NioserverSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler((ChannelInitializer)(ch)-> ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler); ); // 閫氳繃bind鍚姩鏈嶅姟鍣?/span> ChannelFuture f = b.bind(PORT).sync(); // 闃诲涓荤嚎绋嬶紝鐩村埌缃戠粶鏈嶅姟琚叧闂?/span> f.channel().closeFuture().sync(); finally // 鍏抽棴绾跨▼缁?/span> bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
杩借釜NioEventLoopGroup婧愮爜锛屼細鍙戠幇鏄垱閫犲緢澶歂ioEventLoop
public class NioEventLoopGroup extends MultithreadEventLoopGroup ... public NioEventLoopGroup(int nThreads) this(nThreads, (Executor) null); ...
杩借釜鍒扮埗绫?/p>
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class); private static final int DEFAULT_EVENT_LOOP_THREADS; static DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) logger.debug("-Dio.netty.eventLoopThreads: ", DEFAULT_EVENT_LOOP_THREADS); /** * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...) */ protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); // 榛樿鏄痗pu鏍告暟*2
...
杩借釜鐖剁被
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup // 浠g爜鐪佺暐 // 澶氱嚎绋嬬殑浜嬩欢鎵ц鍣?/span> protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); if (executor == null) // Tony: 濡傛灉鎵ц鍣ㄤ负绌猴紝鍒欏垱寤轰竴涓?br />
// EventLoop閮芥槸閫氳繃executor鍒涘缓绾跨▼骞舵墽琛屽畠鐨? executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); // 绾跨▼鍒涘缓鍣紝婧愮爜瑙佷笅闈? // EventLoop鏄疎ventExecutor鎺ュ彛鐨勫叿浣撳疄鐜? children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) boolean success = false; try
// 鏈夊涓疄鐜版柟娉曪紝瑙佷笅闈?杩斿洖NioEventLoop children[i] = newChild(executor, args); success = true; catch (Exception e) // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); finally if (!success) for (int j = 0; j < i; j ++) children[j].shutdownGracefully(); for (int j = 0; j < i; j ++) EventExecutor e = children[j]; try while (!e.isTerminated()) e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); catch (InterruptedException interrupted) // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() @Override public void operationComplete(Future<Object> future) throws Exception if (terminatedChildren.incrementAndGet() == children.length) terminationFuture.setSuccess(null); ; for (EventExecutor e: children) e.terminationFuture().addListener(terminationListener); Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet);
ThreadPerTaskExecutor鍒涘缓绾跨▼
public final class ThreadPerTaskExecutor implements Executor private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) if (threadFactory == null) throw new NullPointerException("threadFactory"); this.threadFactory = threadFactory; @Override public void execute(Runnable command) threadFactory.newThread(command).start();
NioEventLoopGroup
public class NioEventLoopGroup extends MultithreadEventLoopGroup // 鐪佺暐浠g爜 @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
杩斿洖NioEventLoop锛屼篃浼犲叆浜唀xecutor锛岀敤鏉ュ府鍔╁垱寤虹嚎绋嬫墽琛屼换鍔?/p>
鐪婲ioEventLoop鐨勫叿浣撳疄鐜?/p>
public final class NioEventLoop extends SingleThreadEventLoop // 浠g爜鐪佺暐 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) throw new NullPointerException("selectorProvider"); if (strategy == null) throw new NullPointerException("selectStrategy"); provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy;
// 浠g爜鐪佺暐
selector鏄疦IO鐨剆elector
NioEventLoop灏嗛€氶亾娉ㄥ唽鍒癊ventLoop鐨剆elector涓婏紝杩涜浜嬩欢杞
涓嶆柇杩借釜NioEventLoop
鏈€椤跺眰鏄?/p>
public interface Executor /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the @code Executor implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command);
execute鐢盨ingleThreadEventExecutor瀹炵幇
鎻愪氦浠诲姟
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor // 鐪佺暐浠g爜 @Override public void execute(Runnable task) if (task == null) throw new NullPointerException("task"); // 鍒ゆ柇execute鏂规硶鐨勮皟鐢ㄨ€呮槸涓嶆槸EventLoop鍚屼竴涓嚎绋?/span> boolean inEventLoop = inEventLoop(); addTask(task);// 澧炲姞鍒颁换鍔¢槦鍒?/span> if (!inEventLoop) // 涓嶆槸鍚屼竴涓嚎绋嬶紝鍒欒皟鐢ㄥ惎鍔ㄦ柟娉?/span> startThread(); if (isShutdown()) boolean reject = false; try if (removeTask(task)) reject = true; catch (UnsupportedOperationException e) // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. if (reject) reject(); if (!addTaskWakesUp && wakesUpForTask(task)) wakeup(inEventLoop);
startThread
private void startThread() if (state == ST_NOT_STARTED) if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) try doStartThread();// Tony: 鏈惎鍔紝鍒欒Е鍙戝惎鍔?/span> catch (Throwable cause) STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); private void doStartThread() assert thread == null; executor.execute(new Runnable() // 杩欓噷鐨別xecutor鏄垵濮嬪寲EventLoop鐨勬椂鍊欎紶杩涙潵鐨?/span> @Override public void run() thread = Thread.currentThread(); if (interrupted) thread.interrupt(); boolean success = false; updateLastExecutionTime(); try // 鍒涘缓绾跨▼寮€濮嬫墽琛宺un鏂规硶锛屾墍浠ワ紝姣忎釜EventLoop閮芥槸鎵цrun SingleThreadEventExecutor.this.run(); success = true; catch (Throwable t) logger.warn("Unexpected exception from an event executor: ", t); finally for (;;) int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) break; // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) if (logger.isErrorEnabled()) logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " + "be called before run() implementation terminates."); try // Run all remaining tasks and shutdown hooks. for (;;) if (confirmShutdown()) break; finally try cleanup(); finally STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) if (logger.isWarnEnabled()) logger.warn("An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + 鈥?鈥?span style="color: #000000;">); terminationFuture.setSuccess(null); );
@Override protected void run() // 鏈変换鍔℃彁浜ゅ悗锛岃瑙﹀彂鎵ц for (;;) // 鎵ц涓や欢浜媠elector,select鐨勪簨浠?鍜?taskQueue閲岄潰鐨勫唴瀹?/span> try try switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); // 鈥榳akenUp.compareAndSet(false, true)鈥?is always evaluated // before calling 鈥榮elector.wakeup()鈥?to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 鈥榳akenUp鈥?is set to // true too early. // // 鈥榳akenUp鈥?is set to true too early if: // 1) Selector is waken up between 鈥榳akenUp.set(false)鈥?and // 鈥榮elector.select(...)鈥? (BAD) // 2) Selector is waken up between 鈥榮elector.select(...)鈥?and // 鈥榠f (wakenUp.get()) ... 鈥? (OK) // // In the first case, 鈥榳akenUp鈥?is set to true and the // following 鈥榮elector.select(...)鈥?will wake up immediately. // Until 鈥榳akenUp鈥?is set to false again in the next round, // 鈥榳akenUp.compareAndSet(false, true)鈥?will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 鈥榮elector.select(...)鈥?call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). if (wakenUp.get()) selector.wakeup(); // fall through default: catch (IOException e) // If we receive an IOException here its because the Selector is messed up. Let鈥榮 rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 rebuildSelector0(); handleLoopException(e); continue; cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) try // 澶勭悊浜嬩欢 processSelectedKeys(); finally // Ensure we always run tasks. runAllTasks(); else final long ioStartTime = System.nanoTime(); try processSelectedKeys(); finally // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); catch (Throwable t) handleLoopException(t); // Always handle shutdown even if the loop processing threw an exception. try if (isShuttingDown()) closeAll(); if (confirmShutdown()) return; catch (Throwable t) handleLoopException(t); private static void handleLoopException(Throwable t) logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try Thread.sleep(1000); catch (InterruptedException e) // Ignore. private void processSelectedKeys() if (selectedKeys != null) processSelectedKeysOptimized(); else // 澶勭悊浜嬩欢 processSelectedKeysPlain(selector.selectedKeys());
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) // check if the set is empty and if so just return to not create garbage by // creating a new Iterator every time even if there is nothing to process. // See https://github.com/netty/netty/issues/597 if (selectedKeys.isEmpty()) return; // 鑾峰彇selector鎵€鏈夐€変腑鐨勪簨浠讹紙ServerSocketChannel涓昏鏄疧P_ACCEPT,SocketChannle涓昏鏄疧P_READ锛?/span> Iterator<SelectionKey> i = selectedKeys.iterator(); for (;;) final SelectionKey k = i.next(); final Object a = k.attachment(); i.remove(); if (a instanceof AbstractNioChannel) // 澶勭悊niochannel浜嬩欢 processSelectedKey(k, (AbstractNioChannel) a); else @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); if (!i.hasNext()) break; if (needsToSelectAgain) selectAgain(); selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) break; else i = selectedKeys.iterator();
EventLoop鑷韩瀹炵幇浜咵xecutor鎺ュ彛锛屽綋璋冪敤executor鏂规硶鎻愪氦浠诲姟鏃讹紝鍒欏垽鏂槸鍚﹀惎鍔紝鏈惎鍔ㄥ垯璋冪敤鍐呯疆鐨別xecutor鍒涘缓鏂扮嚎绋嬫潵瑙﹀彂run鏂规硶鎵ц
channel娉ㄥ唽鍒皊elector涓?/p>
璇锋眰
鏈嶅姟绔惎鍔ㄧ殑杩囩▼锛屾湇鍔$鐨勫惎鍔ㄥ氨鏄疊ind缁戝畾绔彛鐨勮繃绋?/p>
鍥炲埌EchoServer
杩借釜bind婧愮爜
// Start the server. ChannelFuture f = b.bind(PORT).sync();
bind缁戝畾绔彛骞跺垱寤洪€氶亾
public ChannelFuture bind(int inetPort) return bind(new InetSocketAddress(inetPort)); // 缁戝畾绔彛鐨勫叆鍙d唬鐮?/span> /** * Create a new @link Channel and bind it. */ public ChannelFuture bind(String inetHost, int inetPort) return bind(SocketUtils.socketAddress(inetHost, inetPort)); /** * Create a new @link Channel and bind it. */ public ChannelFuture bind(InetAddress inetHost, int inetPort) return bind(new InetSocketAddress(inetHost, inetPort)); /** * Create a new @link Channel and bind it. */ public ChannelFuture bind(SocketAddress localAddress) validate(); if (localAddress == null) throw new NullPointerException("localAddress"); return doBind(localAddress);// 鐪熸骞蹭簨鐨勪唬鐮?/span> private ChannelFuture doBind(final SocketAddress localAddress) final ChannelFuture regFuture = initAndRegister();// 鍒涘缓/鍒濆鍖朣erverSocketChannel瀵硅薄锛屽苟娉ㄥ唽鍒癝elector final Channel channel = regFuture.channel(); if (regFuture.cause() != null) return regFuture; // 绛夋敞鍐屽畬鎴愪箣鍚庯紝鍐嶇粦瀹氱鍙c€?闃叉绔彛寮€鏀句簡锛屽嵈涓嶈兘澶勭悊璇锋眰 if (regFuture.isDone()) // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise);// 瀹為檯鎿嶄綔缁戝畾绔彛鐨勪唬鐮?/span> return promise; else // Registration future is almost always fulfilled already, but just in case it鈥榮 not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() @Override public void operationComplete(ChannelFuture future) throws Exception Throwable cause = future.cause(); if (cause != null) // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); else // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); ); return promise; final ChannelFuture initAndRegister() Channel channel = null; try channel = channelFactory.newChannel(); // 閫氶亾 init(channel); // 鍒濆鍖栭€氶亾 catch (Throwable t) if (channel != null) // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); //锛堜竴寮€濮嬪垵濮嬪寲鐨刧roup锛塎ultithreadEventLoopGroup閲岄潰閫夋嫨涓€涓猠ventLoop杩涜缁戝畾 ChannelFuture regFuture = config().group().register(channel); // register瑙佷笅闈? if (regFuture.cause() != null) if (channel.isRegistered()) channel.close(); else channel.unsafe().closeForcibly(); // If we are here and the promise is not failed, it鈥榮 one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It鈥榮 safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop鈥榮 task queue for later execution. // i.e. It鈥榮 safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; abstract void init(Channel channel) throws Exception; private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() @Override public void run() //杩欓噷鍚慐ventLoop鎻愪氦浠诲姟锛屼竴鏃︽湁浠诲姟鎻愪氦鍒欎細瑙﹀彂EventLoop鐨勮疆璇?/span> if (regFuture.isSuccess()) // 鏈川鍙堢粫鍥炲埌channel鐨刡ind鏂规硶涓婇潰銆?/span> channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); else promise.setFailure(regFuture.cause()); );
@Override public ChannelFuture register(Channel channel) return next().register(channel); // 鏍规嵁閫夋嫨鍣紝閫夋嫨涓€涓悎閫傜殑NioEventLoop杩涜娉ㄥ唽(SingleEventLoop)
杩借釜register浠g爜...
netty涓殑Channel鏄竴涓娊璞$殑姒傚康锛屽彲浠ョ悊瑙d负瀵笿DK NIO Channel鐨勫寮哄拰鎷撳睍銆傚鍔犱簡寰堝灞炴€у拰鏂规硶锛屼笅闈㈢綏鍒楀嚑涓父瑙佺殑灞炴€у拰鏂规硶锛?/p>
涓夈€佽矗浠婚摼璁捐妯″紡
璐d换閾撅紙Chain of Responsibility Pattern锛?/strong>涓鸿姹傚垱寤轰簡涓€涓?span style="color: #0000ff;">澶勭悊瀵硅薄鐨勯摼銆?/p>
鍙戣捣璇锋眰鍜?span style="color: #0000ff;">鍏蜂綋澶勭悊璇锋眰鐨勮繃绋嬭繘琛岃В鑰︼細鑱岃矗閾句笂鐨勫鐞嗚€?/span>璐熻矗澶勭悊璇锋眰锛?span style="color: #0000ff;">瀹㈡埛鍙渶瑕佸皢璇锋眰鍙戦€佸埌鑱岃矗閾句笂鍗冲彲锛屾棤椤诲叧蹇冭姹傜殑澶勭悊缁嗚妭鍜岃姹傜殑浼犻€掋€?/p>
handler鏄叿浣撳鐞嗚姹傜殑绋嬪簭 瀹炵幇璐d换閾炬ā寮?/strong>4涓绱狅細澶勭悊鍣ㄦ娊璞$被銆佸叿浣撶殑澶勭悊鍣ㄥ疄鐜扮被銆佷繚瀛樺鐞嗗櫒淇℃伅銆佸鐞嗘墽琛?/p>
璐d换閾句唬鐮佺ず渚?/p>
Netty涓殑ChannelPipeline璐d换閾?/p>
Nettty涓畾涔変簡寰堝浜嬩欢 Pipeline涓殑handler鏄粈涔堬紵 ChannelPipeline鏄嚎绋嬪畨鍏ㄧ殑锛孋hannelHandler鍙互鍦ㄤ换浣曟椂鍊欐坊鍔犳垨鍒犻櫎銆?/p>
渚嬪锛屽彲浠ュ湪鍗冲皢浜ゆ崲鏁忔劅淇℃伅鏃舵彃鍏ュ姞瀵嗗鐞嗙▼搴忥紝骞跺湪浜ゆ崲鍚庡垹闄ゃ€?/p>
涓€鑸搷浣滐紝鍒濆鍖栫殑鏃跺€欏鍔犺繘鍘伙紝杈冨皯鍒犻櫎銆備笅闈㈡槸Pipeline涓鐞唄andler鐨凙PI锛?/p>
婧愮爜鏌ョ湅 handler鎵ц鍒嗘瀽 鍒嗘瀽registered鍏ョ珯浜嬩欢鐨勫鐞?/p>
婧愮爜鏌ョ湅 浠巄ind()杩涘叆 bind鍑虹珯浜嬩欢鍒嗘瀽 婧愮爜鏌ョ湅 鍒嗘瀽accept鍏ョ珯浜嬩欢鐨勫鐞?/p>
婧愮爜鏌ョ湅 read鍏ョ珯浜嬩欢鐨勫鐞?/p>
婧愮爜鏌ョ湅 JDK ByteBuffer瀛樺湪涓€浜?strong>缂虹偣 ByteBuf鏄负瑙e喅ByteBuffer鐨勯棶棰樺拰婊¤冻缃戠粶搴旂敤绋嬪簭寮€鍙戜汉鍛樼殑鏃ュ父闇€姹傝€岃璁$殑銆?/p>
ByteBuf鐨?strong>澧炲己// -----閾捐〃褰㈠紡璋冪敤------netty灏辨槸绫讳技鐨勮繖绉嶅舰寮?/span>
public class PipelineDemo
/**
* 鍒濆鍖栫殑鏃跺€欓€犱竴涓猦ead锛屼綔涓鸿矗浠婚摼鐨勫紑濮嬶紝浣嗘槸骞舵病鏈夊叿浣撶殑澶勭悊
*/
public HandlerChainContext head = new HandlerChainContext(new AbstractHandler()
@Override
void doHandler(HandlerChainContext handlerChainContext, Object arg0)
handlerChainContext.runNext(arg0);
);
public void requestProcess(Object arg0)
this.head.handler(arg0);
public void addLast(AbstractHandler handler)
HandlerChainContext context = head;
while (context.next != null)
context = context.next;
context.next = new HandlerChainContext(handler);
public static void main(String[] args)
PipelineDemo pipelineChainDemo = new PipelineDemo();
pipelineChainDemo.addLast(new Handler2());
pipelineChainDemo.addLast(new Handler1());
pipelineChainDemo.addLast(new Handler1());
pipelineChainDemo.addLast(new Handler2());
// 鍙戣捣璇锋眰
pipelineChainDemo.requestProcess("鐏溅鍛滃憸鍛渵~");
/**
* handler涓婁笅鏂囷紝鎴戜富瑕佽礋璐g淮鎶ら摼锛屽拰閾剧殑鎵ц
*/
class HandlerChainContext
HandlerChainContext next; // 涓嬩竴涓妭鐐?/span>
AbstractHandler handler;
public HandlerChainContext(AbstractHandler handler)
this.handler = handler;
void handler(Object arg0)
this.handler.doHandler(this, arg0);
/**
* 缁х画鎵ц涓嬩竴涓?
*/
void runNext(Object arg0)
if (this.next != null)
this.next.handler(arg0);
// 澶勭悊鍣ㄦ娊璞$被
abstract class AbstractHandler
/**
* 澶勭悊鍣紝杩欎釜澶勭悊鍣ㄥ氨鍋氫竴浠朵簨鎯咃紝鍦ㄤ紶鍏ョ殑瀛楃涓蹭腑澧炲姞涓€涓熬宸?.
*/
abstract void doHandler(HandlerChainContext handlerChainContext, Object arg0); // handler鏂规硶
// 澶勭悊鍣ㄥ叿浣撳疄鐜扮被
class Handler1 extends AbstractHandler
@Override
void doHandler(HandlerChainContext handlerChainContext, Object arg0)
arg0 = arg0.toString() + "..handler1鐨勫皬灏惧反.....";
System.out.println("鎴戞槸Handler1鐨勫疄渚嬶紝鎴戝湪澶勭悊锛? + arg0);
// 缁х画鎵ц涓嬩竴涓?/span>
handlerChainContext.runNext(arg0);
// 澶勭悊鍣ㄥ叿浣撳疄鐜扮被
class Handler2 extends AbstractHandler
@Override
void doHandler(HandlerChainContext handlerChainContext, Object arg0)
arg0 = arg0.toString() + "..handler2鐨勫皬灏惧反.....";
System.out.println("鎴戞槸Handler2鐨勫疄渚嬶紝鎴戝湪澶勭悊锛? + arg0);
// 缁х画鎵ц涓嬩竴涓?/span>
handlerChainContext.runNext(arg0);
鍥涖€侀浂鎷疯礉鏈哄埗
1.Netty鑷繁鐨凚yteBuf
- API鎿嶄綔渚挎嵎鎬?/li>
- 鍔ㄦ€佹墿瀹?/li>
- 澶氱ByteBuf瀹炵幇
- 楂樻晥鐨勯浂鎷疯礉鏈哄埗
2.ByteBuf鎿嶄綔
ByteBuf涓変釜閲嶈灞炴€э細capacity瀹归噺銆乺eaderIndex璇诲彇浣嶇疆銆亀riteIndex鍐欏叆浣嶇疆銆?/p>
鎻愪緵浜?span style="color: #0000ff;">涓や釜鎸囬拡鍙橀噺鏉ユ敮鎸侀『搴忚鍜屽啓鎿嶄綔锛屽垎鍒椂readerIndex鍜屽啓鎿嶄綔writeIndex
甯哥敤鏂规硶瀹氫箟锛?/p>
涓嬪浘鏄剧ず浜嗕竴涓紦鍐插尯鏄浣曡涓や釜鎸囬拡鍒嗗壊鎴愪笁涓尯鍩熺殑锛?/p>
绀轰緥浠g爜
/** * bytebuf鐨勫父瑙凙PI鎿嶄綔绀轰緥 */ public class ByteBufDemo @Test public void apiTest() // +-------------------+------------------+------------------+ // | discardable bytes | readable bytes | writable bytes | // | | (CONTENT) | | // +-------------------+------------------+------------------+ // | | | | // 0 <= readerIndex <= writerIndex <= capacity // 1.鍒涘缓涓€涓潪姹犲寲鐨凚yteBuf锛屽ぇ灏忎负10涓瓧鑺?/span> ByteBuf buf = Unpooled.buffer(10); System.out.println("鍘熷ByteBuf涓?===================>" + buf.toString()); System.out.println("1.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); // 2.鍐欏叆涓€娈靛唴瀹?/span> byte[] bytes = 1, 2, 3, 4, 5; buf.writeBytes(bytes); System.out.println("鍐欏叆鐨刡ytes涓?===================>" + Arrays.toString(bytes)); System.out.println("鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString()); System.out.println("2.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); // 3.璇诲彇涓€娈靛唴瀹?/span> byte b1 = buf.readByte(); byte b2 = buf.readByte(); System.out.println("璇诲彇鐨刡ytes涓?===================>" + Arrays.toString(new byte[]b1, b2)); System.out.println("璇诲彇涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString()); System.out.println("3.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); // 4.灏嗚鍙栫殑鍐呭涓㈠純 buf.discardReadBytes(); System.out.println("灏嗚鍙栫殑鍐呭涓㈠純鍚嶣yteBuf涓?=======>" + buf.toString()); System.out.println("4.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); // 5.娓呯┖璇诲啓鎸囬拡 buf.clear(); System.out.println("灏嗚鍐欐寚閽堟竻绌哄悗ByteBuf涓?=========>" + buf.toString()); System.out.println("5.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); // 6.鍐嶆鍐欏叆涓€娈靛唴瀹癸紝姣旂涓€娈靛唴瀹瑰皯 byte[] bytes2 = 1, 2, 3; buf.writeBytes(bytes2); System.out.println("鍐欏叆鐨刡ytes涓?===================>" + Arrays.toString(bytes2)); System.out.println("鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString()); System.out.println("6.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); // 7.灏咮yteBuf娓呴浂 buf.setZero(0, buf.capacity()); System.out.println("灏嗗唴瀹规竻闆跺悗ByteBuf涓?=============>" + buf.toString()); System.out.println("7.ByteBuf涓殑鍐呭涓?===============>" + Arrays.toString(buf.array()) + "\n"); // 8.鍐嶆鍐欏叆涓€娈佃秴杩囧閲忕殑鍐呭 byte[] bytes3 = 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11; buf.writeBytes(bytes3); System.out.println("鍐欏叆鐨刡ytes涓?===================>" + Arrays.toString(bytes3)); System.out.println("鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString()); System.out.println("8.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); // 闅忔満璁块棶绱㈠紩 getByte // 椤哄簭璇?read* // 椤哄簭鍐?write* // 娓呴櫎宸茶鍐呭 discardReadBytes // 娓呴櫎缂撳啿鍖?clear // 鎼滅储鎿嶄綔 // 鏍囪鍜岄噸缃? // 瀹屾暣浠g爜绀轰緥锛氬弬鑰? // 鎼滅储鎿嶄綔 璇诲彇鎸囧畾浣嶇疆 buf.getByte(1); //
Unpooled鎺ㄨ崘鐨勬柟寮忓垱寤篵uf
鍙互鍔ㄦ€佹墿瀹?/p>
杩愯缁撴灉
鍘熷ByteBuf涓?===================>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10) 1.ByteBuf涓殑鍐呭涓?==============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 鍐欏叆鐨刡ytes涓?/span>====================>[1, 2, 3, 4, 5] 鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?/span>===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 10) 2.ByteBuf涓殑鍐呭涓?==============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0] 璇诲彇鐨刡ytes涓?/span>====================>[1, 2] 璇诲彇涓€娈靛唴瀹瑰悗ByteBuf涓?/span>===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 2, widx: 5, cap: 10) 3.ByteBuf涓殑鍐呭涓?==============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0] 灏嗚鍙栫殑鍐呭涓㈠純鍚嶣yteBuf涓?/span>========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10) 4.ByteBuf涓殑鍐呭涓?==============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0] 灏嗚鍐欐寚閽堟竻绌哄悗ByteBuf涓?/span>==========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10) 5.ByteBuf涓殑鍐呭涓?==============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0] 鍐欏叆鐨刡ytes涓?/span>====================>[1, 2, 3] 鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?/span>===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10) 6.ByteBuf涓殑鍐呭涓?==============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0] 灏嗗唴瀹规竻闆跺悗ByteBuf涓?/span>==============>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10) 7.ByteBuf涓殑鍐呭涓?===============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 鍐欏叆鐨刡ytes涓?/span>====================>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] 鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?/span>===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 14, cap: 64) 8.ByteBuf涓殑鍐呭涓?==============>[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
鍔ㄦ€佹墿瀹?/p>
io.netty.buffer.AbstractByteBuf涓嬶細
final void ensureWritable0(int minWritableBytes) ensureAccessible(); if (minWritableBytes <= writableBytes()) return; final int writerIndex = writerIndex(); if (checkBounds) if (minWritableBytes > maxCapacity - writerIndex) throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); // Normalize the current capacity to the power of 2. int minNewCapacity = writerIndex + minWritableBytes; int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity); int fastCapacity = writerIndex + maxFastWritableBytes(); // Grow by a smaller amount if it will avoid reallocation if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) newCapacity = fastCapacity; // Adjust to the new capacity. capacity(newCapacity);
calculateNewCapacity
@Override public int calculateNewCapacity(int minNewCapacity, int maxCapacity) // minNewCapacity锛?4 maxCapacity锛?147483647 checkPositiveOrZero(minNewCapacity, "minNewCapacity"); if (minNewCapacity > maxCapacity) // minCapacity锛?4 throw new IllegalArgumentException(String.format( "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity)); // 闃堝€?M锛岃繖涓槇鍊肩殑鐢ㄦ剰锛氬閲忚姹?M浠ュ唴锛屾瘡娆℃墿瀹逛互2鐨勫€嶆暟杩涜璁$畻锛岃秴杩?M瀹归噺锛屽彟澶栫殑璁$畻鏂瑰紡銆? final int threshold = CALCULATE_THRESHOLD; // 4 MiB page if (minNewCapacity == threshold) // 鏂板閲忕殑鏈€灏忚姹傦紝濡傛灉绛変簬闃堝€硷紝鍒欑珛鍒昏繑鍥? return threshold; // If over threshold, do not double but just increase by threshold. if (minNewCapacity > threshold) int newCapacity = minNewCapacity / threshold * threshold; if (newCapacity > maxCapacity - threshold) newCapacity = maxCapacity; else newCapacity += threshold; return newCapacity; // 濡傛灉瀹归噺瑕佹眰娌¤秴杩囬槇鍊硷紝鍒欎粠64瀛楄妭寮€濮嬶紝涓嶆柇澧炲姞涓€鍊嶏紝鐩磋嚦婊¤冻鏂板閲忔渶灏忚姹? // Not over threshold. Double up to 4 MiB, starting from 64. int newCapacity = 64; while (newCapacity < minNewCapacity) newCapacity <<= 1; return Math.min(newCapacity, maxCapacity);
閫夋嫨鍚堥€傜殑ByteBuf瀹炵幇锛?/p>
浜嗚В鏍稿績鐨?涓淮搴︾殑鍒掑垎鏂瑰紡锛?绉嶅叿浣撳疄鐜?/p>
鍦ㄤ娇鐢ㄤ腑锛岄兘鏄娇鐢˙yteBufAllocator鍒嗛厤鍣ㄨ繘琛岀敵璇凤紝鍚屾椂鍒嗛厤鍣ㄥ叿鏈夊唴瀛樼鐞嗙殑鍔熻兘
鍫嗗鍐呭瓨绀轰緥
/** * 鍫嗗鍐呭瓨鐨勫父瑙凙PI鎿嶄綔绀轰緥 */ public class DirectByteBufDemo @Test public void apiTest() // +-------------------+------------------+------------------+ // | discardable bytes | readable bytes | writable bytes | // | | (CONTENT) | | // +-------------------+------------------+------------------+ // | | | | // 0 <= readerIndex <= writerIndex <= capacity // 1.鍒涘缓涓€涓潪姹犲寲鐨凚yteBuf锛屽ぇ灏忎负10涓瓧鑺?/span> ByteBuf buf = Unpooled.directBuffer(10); System.out.println("鍘熷ByteBuf涓?===================>" + buf.toString()); // System.out.println("1.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); 鍫嗗鍐呭瓨涓嶈兘鐢╞uf.array() // 2.鍐欏叆涓€娈靛唴瀹?/span> byte[] bytes = 1, 2, 3, 4, 5; buf.writeBytes(bytes); System.out.println("鍐欏叆鐨刡ytes涓?===================>" + Arrays.toString(bytes)); System.out.println("鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString()); //System.out.println("2.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); // 3.璇诲彇涓€娈靛唴瀹?/span> byte b1 = buf.readByte(); byte b2 = buf.readByte(); System.out.println("璇诲彇鐨刡ytes涓?===================>" + Arrays.toString(new byte[]b1, b2)); System.out.println("璇诲彇涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString()); //System.out.println("3.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); // 4.灏嗚鍙栫殑鍐呭涓㈠純 buf.discardReadBytes(); System.out.println("灏嗚鍙栫殑鍐呭涓㈠純鍚嶣yteBuf涓?=======>" + buf.toString()); //System.out.println("4.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); // 5.娓呯┖璇诲啓鎸囬拡 buf.clear(); System.out.println("灏嗚鍐欐寚閽堟竻绌哄悗ByteBuf涓?=========>" + buf.toString()); //System.out.println("5.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); // 6.鍐嶆鍐欏叆涓€娈靛唴瀹癸紝姣旂涓€娈靛唴瀹瑰皯 byte[] bytes2 = 1, 2, 3; buf.writeBytes(bytes2); System.out.println("鍐欏叆鐨刡ytes涓?===================>" + Arrays.toString(bytes2)); System.out.println("鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString()); // System.out.println("6.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); // 7.灏咮yteBuf娓呴浂 buf.setZero(0, buf.capacity()); System.out.println("灏嗗唴瀹规竻闆跺悗ByteBuf涓?=============>" + buf.toString()); // System.out.println("7.ByteBuf涓殑鍐呭涓?===============>" + Arrays.toString(buf.array()) + "\n"); // 8.鍐嶆鍐欏叆涓€娈佃秴杩囧閲忕殑鍐呭 byte[] bytes3 = 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11; buf.writeBytes(bytes3); System.out.println("鍐欏叆鐨刡ytes涓?===================>" + Arrays.toString(bytes3)); System.out.println("鍐欏叆涓€娈靛唴瀹瑰悗ByteBuf涓?==========>" + buf.toString()); // System.out.println("8.ByteBuf涓殑鍐呭涓?==============>" + Arrays.toString(buf.array()) + "\n"); // 闅忔満璁块棶绱㈠紩 getByte // 椤哄簭璇?read* // 椤哄簭鍐?write* // 娓呴櫎宸茶鍐呭 discardReadBytes // 娓呴櫎缂撳啿鍖?clear // 鎼滅储鎿嶄綔 // 鏍囪鍜岄噸缃? // 瀹屾暣浠g爜绀轰緥锛氬弬鑰? // 鎼滅储鎿嶄綔 璇诲彇鎸囧畾浣嶇疆 buf.getByte(1); //
Unsafe鐨勫疄鐜?/p>
鍐呭瓨澶嶇敤
PooledByteBuf瀵硅薄銆佸唴瀛樺鐢?/strong>
3.闆舵嫹璐濇満鍒?/h3>
Netty鐨勯浂鎷疯礉鏈哄埗锛屾槸涓€绉嶅簲鐢ㄥ眰鐨勫疄鐜般€傚拰搴曞眰鐨凧VM銆佹搷浣滅郴缁熷唴瀛樻満鍒跺苟鏃犺繃澶氬叧鑱斻€?/p>
浠g爜绀轰緥
/** * 闆舵嫹璐濈ず渚? */ public class ZeroCopyTest @org.junit.Test public void wrapTest() byte[] arr = 1, 2, 3, 4, 5; ByteBuf byteBuf = Unpooled.wrappedBuffer(arr); System.out.println(byteBuf.getByte(4)); arr[4] = 6; System.out.println(byteBuf.getByte(4)); // java鏁扮粍杞负buf 5 arr淇敼涓?鍚庯紝byteBuf涔熷彉涓?锛岃鏄庝袱鑰呯敤鐨勬槸鐩稿悓鐨勬暟鎹紝闆舵嫹璐? @org.junit.Test public void sliceTest() ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes()); ByteBuf newBuffer = buffer1.slice(1, 2); newBuffer.unwrap(); System.out.println(newBuffer.toString()); // 鎷嗗垎銆備笉浼氬姩鍘熸潵鐨刡uf锛岃繕淇濈暀鍘熸潵buf鐨勫湴鍧€ @org.junit.Test public void compositeTest() ByteBuf buffer1 = Unpooled.buffer(3); buffer1.writeByte(1); ByteBuf buffer2 = Unpooled.buffer(3); buffer2.writeByte(4); CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2); System.out.println(newBuffer); // 鍚堝苟銆傝繕淇濈暀鍘熸潵buf鐨勪俊鎭?
以上是关于Java楂樺苟鍙戠綉缁滅紪绋?鍥?Netty的主要内容,如果未能解决你的问题,请参考以下文章
2019-2020-1瀛︽湡20192429銆婄綉缁滅┖闂村畨鍏ㄤ笓涓氬璁恒€嬬涓夊懆瀛︿範鎬荤粨