[干货分享]Unity3D 深入解析Timeline编辑器

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[干货分享]Unity3D 深入解析Timeline编辑器相关的知识,希望对你有一定的参考价值。

参考技术A

这是Timeline系列的第三篇,前两篇分别是 Unity3D Timeline预览 和 Unity3D Timeline工作流 。本篇将会详细详解Timeline编辑器的具体功能,主要分为以下几个部分:

如果你还不熟悉timeline编辑器,强烈建议搜藏了以备不时之需。接下来还会有终篇,讲Timeline结合脚本在Runtime时候的一些进阶使用技巧。

在Timeline编辑器中,我们可以使用Timeline选择器来选中一个Timeline实例来进行查看、修改或者预览等操作。预览按钮(左边红框)用来开启\\关闭Timline在场景理的预览效果。

点击Timeline选择器,在弹出的菜单中会列出当前场景所有的Timeline实例,菜单中的每一项显示Timeline资源的名字以及他所关联的GameObject的名字。例如,GroundTimeline关联了Ground这个GameObject,因此显示的名字为:“GroundTimeline(Ground)”。

Timeline控制区域的按钮用与播放Timeline实例,以及控制Timeline的播放指针。

点击Timeline的Start按钮,或按下Shift+<来把Timeline的播放指针移动到Timeline的开始位置。

点击Timeline的Previous按钮,或按下<键来把Timeline的播放指针移动到前一帧的位置。

点击Timeline播放按钮或按下空格键来在Timeline播放模式中预览Timline的动画内容。Timeline播放模式涉及下列具体的细节:

点击Timeline的Next Frame按钮,或者按下>键来把Timeline的播放指针移动到下一帧的位置。

点击Timeline的End按钮,或者按住Shift然后再按下>键,Timeline的播放指针会移动到Timeline的末尾。

激活Play Range按钮后,把Timeline限制在指定的范围内播放(帧或者秒,具体取决于设置)。Timeline编辑器用白色的高亮区域来显示播放区域,拖动白色高亮标记的两端可以调整播放的范围。

此时,Playable Director组件中的Warp Mode属性用于确定播放指针抵达播放范围的末尾时的行为。需要注意的是:这里指定的播放范围,只在Timeline编辑器窗口中有效。在Play模式下,Unity会忽略这个限制。

Timeline播放指针( 右边的红框 )代表了Timeline在预览时精确时间点。Timeline播放指针的位置输入框( 左边红色框里的值 )表示了当前在整个Timeline中的哪一帧或哪一秒。

要把Timeline播放指针放在特定的时间,你可以通过点击Timeline的时间轴,也可以通过在播放指针位置的输入框内输入时间值然后按下回车键。当输入值后,输入的数字会根据Timeline的具体设置转化成秒或者帧数。例如,假如Timeline的时间轴表示的是秒且每秒有30帧,那么输入180的时候,输入框里面的值会转化成秒,同时把Timeline的播放指针移动到6:00的位置。

本小节内容主要描写Track视图的各种功能。

Track列表用来添加、选择、复制、删除、锁定、屏蔽、重新排列组成Timeline资源的track。同时你也可以把多个track编成一个track组。

track前面的竖线的颜色表示track的类型。默认情况下,Activation track是绿色的,Animation track是蓝色的,Audio track是橙色的,Control track是蓝绿色的,以及Playable track是白色的。
track的绑定信息存储在timeline实例中。而Playable Director 组件把GameObject和Timeline关联在一起,这种关联被认为是Timeline资源的实例化,具体请参考Timeline预览篇。

在Timeline编辑器为我们提供了多种方法来添加track。最简单的方法是点击Add按钮,然后在弹出的菜单里面选择要你想创建的track类型。也可以右键点击Track列表的空白区域,接下来的操作和点击Add按钮之后一样。

Timeline编辑器也支持把GameObject拖拽进Track列表。拖拽一个GameObject到Track列表的空白区域之后,从弹出的菜单里选择要添加的track类型。根据所选择的不同track类型,Timeline编辑器会根据对应track类型执行不同的操作:

通过点击来选择单个track,在选择一个track的同时会反选其他的track。选中track的属性显示在Inspector窗口中,由于选择的track类型不用,因此在Inspector中可以修改的属性也会有所差别。

如果要选中相邻的几个track,可以在选择第一个track之后,按住Shift然后点击最后一个track。例如,要选择3个相邻的tracks,点击第一个track,然后按住Shift后点击第三个track。这样就完成了3个track的选择。

按住Command或者Control,然后点击track来选择不相邻的多个track。同样的操作也可以用于反选不同的track。

Timeline编辑器提供了下列不同的方法来复制track:

需要注意的是:复制track会复制track里面的所有clip,blend,以及Inspector属性。如果被复制的track有绑定GameObject,复制出来的track,binding会设置成空值。

右键点击track,然后从弹出的窗口中选择 Delete .
删除track用来从timeline编辑器里面移除track,这个操作会删除track包含的所有clip,blend,以及属性。这是一个有风险的操作,它会修改Timeline资源和影响这个资源生成的所有Timeline实例。
删除一个Animation track同时也会从Timeline资源里面删除已经录制的Infinite clips。这里需要注意一点,在Save Project之前,在Project窗口中的Timeline资源下依然会显示已经被删除的Infinite clip.

锁定track用于锁定track以及其使用的所有clip。当一个track的动画制作完成后,为了避免误操作导致的修改,把这个track给锁定。锁定后的track无法编辑,它的clip无法被选中。track上显示的锁的图标,标志着当前track处于被锁定状态。

这里一定要注意:被锁定的track也是可以被删除的,所以删除track还是要仔细看清楚啦。

屏蔽track用于关闭track里所有的clip。当你制作的Timeline实例有很多个Track的时候,屏蔽track功能可以帮助你把注意力集中在你想重点关注的track上。track显示屏蔽图标的,标志着当前track处于屏蔽状态。

右键点击轨道,然后在弹出界面中选择Mute。也可以选中一个track之后按下M键。你可以同时屏蔽多个track,要解除屏蔽,直接点击屏蔽的图标就可以了。

track的渲染和动画优先级是从按从上至下的顺序来排列的。重新排列track可以改变渲染或者动画的优先级。例如,Track列表包含两个Animation track,用于使GameObjec产生位移动画,第二个Track将会覆盖第一个track的动画。动画优先级很好地解释了为什么Animation Override tracks是作为Animation track的child track来创建的,因为如果不是child track的话,它会完全覆盖第一个track的动画。

要重新排列tracks,你需要先选择1个或者多个track,然后拖动他们,直到track列表中间出现一条白线。白线标志着被拖拽的track在松开鼠标后的目标位置。这里要注意的的是Animation Override track和它的父Animation track绑定的是同一个GameObject,重新摆列Animation Override track会把它转换成Animation track,同时也会把绑定值设置为空。

当我们的Timeline中有很多track的时候,我们可以使用Track Group来管理它们。例如,一个Timeline资源包含和同一个GameObject交互的Animation track和一个Audio track,我们可以把这两个track移动到为它们创建的Track group里面。

点击Add按钮,然后选择Track Group来创建一个Group。我们也可以右键点击Track列表的空白区域,然后在弹出的菜单里面选择Track Group,一个新的Track group被添加在track列表的底部。

点击Track Group的名字,会出现一个工字型的光标,此时输入Track Group的新名字,然后按下回车。
选择一个或多个track,然后拖动到Track group的上面,此时Track group显示为高亮,松开鼠标后,被拖动的track就会被编组到Track Group里面去。
如果要把一个track放置到Track group中某个特定的track的前面,只需要拖动到那个特定的track上,等标志位置的白色插入线显示后,放开即可。

Track group同时也可以包含任意数量的子group。选择一个Track Group然后点击Add按钮,也可以点击Track group名字旁边的加号图标,然后选择Track Sub-Group。

点击Track group名字旁边的三角形图标来隐藏Track group内的所有track。这些track并没有被屏蔽,只是不在Timeline编辑器中显示。要显示它们的话,再次点击三角形图标就可以了。

Clip视图是我们用来操作clip的视图,Clip视图中的clip必须是和track关联的。

每一个clip下有一条线用来显示track的类型。默认情况下,Activation clip为绿色,Animation clip为蓝色,Audio clip为橙色,Control clip是蓝绿色的,Playable clip是白色的。

使用下面的方法来平移、缩放、以及让Clip的大小自适应Clips视图:

同样,Timeline编辑器根据track的类型,支持多种不同的创建Clip的方法。最快的方法是点击Track的空白区域,在菜单里选择“Add ……”来创建。根据不同的track类型。,“Add ……”的选项内容有所不同。

也可以直接拖一个Animation Clip到Timeline编辑器窗口的空白区域,这会自动创建一个track,并把这个clip添加到轨道里。

通过点击clip来进行选择操作,选中一个clip会反选之前选择的clip,被选中的clip会显示一个白色边框。选中一个clip之后,它的属性将显示在Inspector窗口中。你可以在Inspector窗口做一些细微的调整,具体可以调整的属性,取决于clip的类型。
按住Shift键然后点击clip用来选择连续相邻的clips。例如,我们要选择连续的3个clips,点击第一个clip,然后按住Shift键再点击第三个clip,这3个clips就全部被选中了。

按住Command/Control,再点击clip可以用选择/反选不相邻的clips。

点击Clip视图的空白区域然后拖动鼠标会在Clips视图里面绘制出一个选择矩形,和选择矩形相交的所有Clips都讲会被选中。如果在用矩形选择的过程中按住Shift键,在选择的时候不会反选之前选中的Clips。
还可以通过Timeline的播放指针来选择Clips。右键点击Timeline的播放指针,然后选择一个选择的选项。

选择track,通过拖动来调整Clip的位置,在拖拽的时候,黑色的参考线显示clip的范围,Timeline的时间轴上显示被调整位置的clip的开始时间和结束时间。

还可以选中clip后在Inspector窗口修改start或end的时间,也可以把一个clip移动到另外一个相同类型的track中去。
当调整一个clip的位置的时候,如果一个clip叠在了同一个track中的另外一个clip上,此时是产生blend还是overrie效果,取决于track的类型:

平铺Clips用于移除同一个track中的两个相邻clip之间的缝隙、融合,以及重叠。如果你想让每一个clip都紧接着它前一个clip开始的话,使用clip的平铺可以非常方便实现。在平铺clip的时候,需要在同一个track上至少选择2个 clip。

Timeline编辑器支持下面几种方法来复制Clips:

复制clip会拷贝选中的clip,然后把复制出的clip放在当前track的最后一个clip的后面。如果你复制的clip使用了融合,复制出的clip会消除融合,对齐放在track的末尾。
如果复制的Animation clip使用了recorded clip作为资源,那么在复制的时候,也会在TimelineAsset里面产生一个recorded clip的拷贝(这个拷贝在Save Project后出现在Timeline资源下)。

修剪操作非常有用,就好比是在影视后期剪辑中,只引用某些拍好的影片片段的一部分。

我们可以拖动clip的起点和终点来调整clip的长度,此时会自动选中clip,并且在Inspector 窗口显示它的属性。我们可以使用Inspector窗口中的来精确的设置Clip的开始、结束、clip的长度、以及偏移(Clip In)。

在Clip窗口中调整Animation Clip或者Audio clip的起点时,Start只有在clip关联了资源之后才能被拖动。

修剪动画的操作是安全的,可以反复修改动画的起点来调整clip包含的动画或者声音的波形。为了精确的调整clip的起点所在的帧或者时间,在Inspector窗口中直接设置Clip In的值有同样的效果。

和调整clip的起点一样,通过调整clip的终点也可以用来裁剪出原始资源在clip中使用的部分。

如果你设置的终点超过了clip所基于的原始资源的长度,那么clip的额外区域是处于保持还是循环,取决于资源的设置。举个例子,名为“End Move”的clip使用了动画文件“Recoreded(2)”。“Recorded(2)”的文件设置成的循环模式。当设置clip的终点超出了“Recorded(2)”的长度时,超出的部分,也会循环。下图中白色曲线表示是循环模式。

通过设置资源的循环模式来设置超出区域的是保持还是循环,如果你忘记当前clip是使用的哪一个clip,可以选中clip,然后点击右键,选择 Find Source Asset ,资源被高亮显示在Project窗口中。

timeline编辑器为循环模式的Animation clip或Audio clip提供了两种特殊的剪切方法:1、直接删除最后部分循环,2、补完最后的部分循环。例如run_away的长度超过了它基于的资源的三倍长度,由于资源文件的循环模式是设置成循环的,因此Animation Clip会循环播放动画。

如果想要补完动画结束时不完整的循环,选中Clip,然后右键点击并选择Editing->Complete Last Loop.如果要剪掉最后不完整的循环,则使用Trim Last Loop。

Timeline编辑器还支持通过播放指针的位置来剪切clip,把Timeline的编辑器指针拖到要裁剪的位置,右键点击clip,然后选择Editing > Trim Start或者Editing > Trim End。Trim Start用于把clip的起点调整到播放指针的位置。Trim End用于把Clip的终点调整到播放指针的位置。

[图片上传失败...(image-e111cc-1529590146702)]

Timeline编辑器支持把一个clip拆分成两个相同的clip,把播放指针放在Clip的中间,然后右键点击Clip选择Editing > Split(或者按下S),Clip就被分成两个分离的clip,它们可以各自的编辑,修剪,调整在Timline中的位置。这里要注意的是,拆分Clip的操作不会修改到Clip所引用的原始资源,因此可以通过reset a clip或者手动调整clip的开头和结尾来扩展成包含全部的动画。

可以通过reset a clip操作把一个clip调整到其原始的长度,右键点击clip然后选择Editing-> Reset Editing。需要注意的是重置Clip并不会重置下列属性的和设置:

修改clip的播放速度来实现动作、动画、音效、粒子效果等的加速或者减速播放,改变clip的播放速度会影响clip的长度。在Timeline中,我们可以修改Animation,Audio,Control这三种类型的clip的播放速度,其他类型的Speed选项是无法修改的。

在Inspector窗口中,Speed Multiplier属性显示的是clip的原始速度的倍率。例如,把一个长度为80帧的动画修改为两倍速播放的话,需要把Speed Multiplier设置成2,此时该动画的长度为40帧。

也可以通过右键点击clip,然后选择下面的方法来修改Clip的播放速度:

Gap extrapolation指的是Animation track在Animation clip的前后如何接近动画数据。它的主要作用是在两个Animation clip之间的间歇中间进行插值来避免动画的异常。这种异常包括GameObject在两个坐标间抖动或者类人形动画在不同的两个姿态之间抖动。每一个Animation clip 都有两个gap extrapolation设置:pre-extrapolate和post-extrapolate.前者控制在Animation clip之前的间隙如何接近动画数据,后者控制Animation clip后面的缝隙如何接近动画数据。默认情况下,两个都设置成的Hold,指的是在Animation clip之前,保持动画数据的第一帧,在Animation clip之后保持动画数据的最后一帧。Animation clip前后的图标表示所选的gap extrapolation的模式。

在Clip视图中选择Clip,然后修改Inspector窗口中的Animation Extrapolation来修改pre-extrapolate和post-extrapolate模式。

如果被选中的Animation clip所在的track只有这个clip,那么对于Pre-Extrapolate模式有以下可选项:

如果被选中的Animation clip所在的track只有这个clip,那么对于Post-Extrapolate模式有以下可选项:

Ease-in和Ease-Out用于创建clip和包围它的间隙之间的平滑过渡,选中clip,然后在Inspector 窗口中设置Ease In Duration或Ease Out Duration来创建。(在Unity的官方用户手册中还提到了按住Control/Command, 然后把clip的起点往右拖来创建ease-in。但是我在Unity2017.4中实这个功能,发现没有效果,只能猜测是Unity的Bug了。)

ease-in和ease-out的具体效果取决于track的类型:

可以通过点击下拉菜单把值从Auto修改成Manual来自定义ease-in或ease-out过渡的曲线。设置成Manual后,Inspector窗口中出曲线的预览。点击曲线预览,在Inspector窗口下方会打开曲线编辑器窗口。

干货分享RocketMQ命名服务和路由组件——namesrv解析

写在前面的话


【干货分享】RocketMQ命名服务和路由组件——namesrv解析

本文的代码分析基于RocketMQ的4.3.0版本。


【干货分享】RocketMQ命名服务和路由组件——namesrv解析

1

namesrv在RocketMQ集群中扮演的角色

先允许我从Apache RocketMQ官网盗一张图。RocketMQ集群部署架构图(多Master-多Slave集群)。

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

RocketMQ集群分为四部分:生产者、消费者、broker、namesrv。生产者和消费者很好理解,就是消息的生产方和使用方。broker负责消息的接收、存储、投递等。


1.namesrv一般部署为集群模式,每台broker服务器启动时都会向所有namesrv节点注册自己,随后会定时(默认30s)向namesrv集群的每一个节点上报自己。这使得namesrv集群是一个热备份模式。注册和上报的数据包括

  • broker中topic的信息,封装在请求体中:每个topic的名字,以及该topic下读写队列的个数,该topic在此broker下的读写权限等。

2.客户端(producer/consumer)随机的和namesrv集群中的其中一个节点建立长连接,定时(默认30s)拉取最新的topic路由信息。

3.namesrv集群各节点无状态,具体体现在所有broker上报的数据都存储在内存中,不进行任何的持久化。

4.namesrv集群各节点之间没有通信(其实这么说是不太严谨的,如果namesrv采用ClusterTestRequestProcessor作为请求的处理器,是可能会和其他节点发生通信的),集群内部没有机制保证信息的一致

2

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

namesrv 代码结构

先认识下namesrv模块的代码结构和各个类的作用,能使后面源码的阅读分析事半功倍。

1.代码结构

namesrv只有8个java文件,9个类。整个工程代码不超过1000行。

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

2.各个类的介绍

1.KVConfigManager: key-value配置信息的操作类。提供以下操作方法:

  • load: 从指定文件(json格式)加载k-v配置到本地缓存configTable。

  • putKVConfig: 将指定的k-v配置加入到configTable中。

  • deleteKVConfig:将指定key值、指定namespace的配置从configTable中移除。

  • getKVConfig:获取指定key值、指定namespace的配置。

  • getKVListByNamespace:获取指定namespace的所有k-v配置信息。

  • persist: 持久化configTable到文件。

  • printAllPeriodically:打印configTable中所有的配置信息。

2.KVConfigSerializeWrapper:是为了方便序列化/反序列化的包装类。它和文件中的json格式数据对应。json字符串和对象间的转化使用了fastjson。

3.DefaultRequestProcessor:默认请求处理器类。用来处理客户端或者broker发来的RPC请求。其中就包括处理broker发来的注册请求。以下为几个常用的处理请求:

  • 注册Broker

  • 注销Broker

  • 根据指定topic获取路由信息

  • 获取集群的全量信息(包括每个集群内所有broker,每个broker的master/slave信息)。

  • 去除broker的写入权限。

  • 获取所有topic名字列表。

  • 从namesrv删除指定topic。

  • 获取指定cluster的所有topic信息。

  • 更新namesrv配置。

  • 获取namesrv配置。

4.ClusterTestRequestProcessor:请求处理类。继承自DefaultRequestProcessor。只重写了一个请求处理方法:根据指定topic获取路由信息。里面只增加了一行关键代码。在本namesrv中找不到请求的topic时,向其他namesrv节点发送请求获取。这里就发生了namesrv节点间通信。只是从其他namesrv拿来的topic路由信息直接返回给原始请求方,并不会更新到本地namesrv。

5.BrokerHousekeepingService:连接的监听器。实现了ChannelEventListener接口。监听与Broker建立的通道的状态(空闲、关闭、异常三个状态),并调用相应onChannel****方法。通道的空闲、关闭、异常状态均调用RouteInfoManager.onChannelDestory方法,来更新RouteInfoManager类中的5个HashMap对象。在构建NettyRemotingServer时作为参数传入。

6.RouteInfoManager:namesrv中最重要的类。维护着生产者、消费者活动所需要的所有信息。这些信息存储在5个HashMap里,不会进行持久化处理。

private final HashMap<string >topicQueueTable;

private final HashMap brokerAddrTable;

private final HashMap> lusterAddrTable;

private final HashMap brokerLiveTable;

private final HashMap/* Filter Server */> filterServerTable;

8.NamesrvController:namesrv控制类,控制着namesrv的启动、初始化、停止等生命周期。

9.NamesrvStart:namesrv启动类,也即main函数所在类。用于构建NamesrvController,随后初始化和启动namesrv。


【干货分享】RocketMQ命名服务和路由组件——namesrv解析

3

namesrv源码分析

1.启动流程

namesrv的启动流程并不复杂,参考下图:

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

根据该时序图重点关注几个关键的步骤:

1.构建NamesrvController。时序图表述比较简单,构建NamesrvController详细逻辑为:

(1)根据启动namesrv时的系统参数,构建命令行。

(2)构建NamesrvConfig和NettyServerConfig。主要是将命令行中的参数解析到NamesrvConfig/ NettyServerConfig中。如果命令行中指定了参数文件,也会将文件中的所有配置都解析进去。

(3)最后用上一步中构建的NamesrvConfig/NettyServerConfig 创建NamesrvContorller。

2.NamesrvController的初始化。

(1)调用this.kvConfigManager.load(); 将文件中的配置load到内存中。继续看代码可以看到: 拿到配置文件的路径,通常为xxx/xxx/kvConfig.json,然后读到本地缓存configTable中。

     
       
       
     
  1. public void load() {

  2.    String content = null;

  3.    try {

  4.        content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());

  5.    } catch (IOException e) {

  6.        log.warn("Load KV config table exception", e);

  7.    }

  8.    if (content != null) {

  9.        KVConfigSerializeWrapper kvConfigSerializeWrapper =

  10.            KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);

  11.        if (null != kvConfigSerializeWrapper) {

  12.            this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());

  13.            log.info("load KV config table OK");

  14.        }

  15.    }

  16. }

(2) 构建NettyRemotingServer。NettyRemotingServer是RocektMQ对Netty的再一次封装。它负责和客户端进行RPC通信。

(3) 构建一个默认8个线程的线程池 remotingExecutor。这个线程池就是最终处理业务的线程所在的线程池。 也就是运行DefaultRequestProcessor的processRequest方法的线程池。暂且称它为M2。

(4) 注册处理器。namesrv定义了2种处理器DefaultRequestProcessor和ClusterTestRequestProcessor,后者继承了前者,重写了一个方法getRouteInfoByTopic。即在当前namesrv中拿不到指定topic的路由信息时会到别的namesrv去拿。注册处理器就是在NettyRemotingServer中构建一个 protected PairdefaultRequestProcessor;其中NettyRequestProcessor就是DefaultRequestProcessor,ExecutorService就是remotingExecutor。

     
       
       
     
  1. private void registerProcessor() {

  2.    if (namesrvConfig.isClusterTest()) {

  3.        this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),

  4.        this.remotingExecutor);

  5.    } else {

  6.        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);

  7.    }

  8. }  

(5)下面是启动2个定时任务,分别来扫描notActive的broker并移除,打印所有的configTable(前面构建NamesrvController时从内存 加载的)。 需要注意的是这两个定时任务共用一个单线程的线程池scheduledExecutorService。

3.NamesrvController的启动(start方法)

其实就是调用RemotingService的start方法,逻辑如下:

(1) 构建一个线程数为8的线程池defaultEventExecutorGroup(内有8个执行器EventExecutor,每个执行器都只有一个线程)。 这个线程池组暂且称它为M1。后面会详细讲到M1是做什么的。

(2) 使用构建器模式构建netty server端引导类。首先是指明要用的Reactor线程模型,这里的eventLoopGroupBoss为只有一个线程的线程池(暂且叫它1),eventLoopGroupSelector是有 3个线程的线程池(暂且叫它n)。然后指定了Channel类型。下面的几个参数是对端口、TCP机制的设定,这里就不作分析了。然后是指定感兴趣的ip(对连接来的客户端做限制)以及监听端口。

childHandler(new ChannelInitializer() 是重点。在新的连接被接受时,新的Channel被创建,这里的ChannelInitializer就是对这个Channel进行初始化。可以看到在这个Channel的pipeline里添加了几个ChannelHandler,并且这些ChannelHandler共用一个线程池defaultEventExecutorGroup来执行逻辑,也就是在(1)中创建的M1。这些ChannelHandler分别关注不同的I/O事件,当对应的I/O事件被触发后会调用执行里面的方法。对于最后一个NettyServerHandler,最终会被defaultEventExecutorGroup线程池再委托给remotingExecutor来执行(还记得在做NamesrvController的初始化时构建了一个8个线程的线程池M2吗),最终会执行DefaultRequestProcessor的processRequest方法,对请求做真正的处理。

(3)netty server端启动:this.serverBootstrap.bind().sync()。

以上逻辑分析对应的代码如下:

     
       
       
     
  1. @Override

  2. public void start() {

  3.    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(

  4.    nettyServerConfig.getServerWorkerThreads(),

  5.    new ThreadFactory() {

  6.        private AtomicInteger threadIndex = new AtomicInteger(0);

  7.        @Override

  8.        public Thread newThread(Runnable r) {

  9.            return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());

  10.        }

  11.    });

  12. ServerBootstrap childHandler =

  13.    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)

  14.        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)

  15.        .option(ChannelOption.SO_BACKLOG, 1024)

  16.        .option(ChannelOption.SO_REUSEADDR, true)

  17.        .option(ChannelOption.SO_KEEPALIVE, false)

  18.        .childOption(ChannelOption.TCP_NODELAY, true)

  19.        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())

  20.        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())

  21.        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))

  22.        .childHandler(new ChannelInitializer<SocketChannel>() {

  23.            @Override

  24.            public void initChannel(SocketChannel ch) throws Exception {

  25.                ch.pipeline()

  26.                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,

  27.                        new HandshakeHandler(TlsSystemConfig.tlsMode))

  28.                    .addLast(defaultEventExecutorGroup,

  29.                        new NettyEncoder(),

  30.                        new NettyDecoder(),

  31.                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),

  32.                        new NettyConnectManageHandler(),

  33.                        new NettyServerHandler()

  34.                    );

  35.            }

  36.        });

  37. if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {

  38.    childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

  39. }

  40. try {

  41.    ChannelFuture sync = this.serverBootstrap.bind().sync();

  42.    InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();

  43.    this.port = addr.getPort();

  44. } catch (InterruptedException e1) {

  45.    throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);

  46. }

  47. if (this.channelEventListener != null) {

  48. this.nettyEventExecutor.start();

  49. }

  50. this.timer.scheduleAtFixedRate(new TimerTask() {

  51.    @Override

  52.    public void run() {

  53.        try {

  54.            NettyRemotingServer.this.scanResponseTable();

  55.        } catch (Throwable e) {

  56.            log.error("scanResponseTable exception", e);

  57.        }

  58.    }

  59.    }, 1000 * 3, 1000);

  60. }

2.namesrv使用的线程模型:

Reactor多线程模型(1+n+M1+M2)

通过namesrv的启动流程,可以看到其中创建了许多的线程池(1,n,M1,M2),来执行客户端请求或者是执行定时任务。这就是我们下面要讨论的namesrv的RPC通信+业务处理的线程模型:1+n+M1+M2线程模型。

namesrv(RocektMQ的其他组件也是)的PRC通信采用Netty, Netty的设计就是Reactor线程模型(Reactor的三种线程模型感兴趣的同学可以学习下),但Netty不是采用的传统的Reactor多线程模型(1+n,即一个线程接收连接、一组线程处理IO事件),而是在此基础上加以扩展,发展成了1+n+M1+M2。

下面的图描述了线程之间的关系以及分工。

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

1.其中的1是eventLoopGroupBoss,他的任务是负责监听并接受来自客户端的TCP连接请求。连接建立好以后马上扔给eventLoopGroupSelector,也就是n。

2.eventLoopGroupSelector工作就是在前一步的基础上将建立好的Channel注冊到EventLoop上(这里是NioEventLoop),对应到java底层就是调用NIO接口将SocketChannel注册到selector上。然后监听网络数据,拿到网络数据后,丢给defaultEventExecutorGroup(M1)线程池。

3.defaultEventExecutorGroup主要负责处理网络通信相关的,具体是编码/解码、空闲链接管理、网络连接管理以及网络请求处理,分别对应如下几个ChannelHandler:NettyEncoder/NettyDecoder、IdleStateHandler、NettyConnectManageHandler、NettyServerHandler。M1在处理好这些任务后,将最终处理业务逻辑的工作交给M2,remotingExecutor。

     
       
       
     
  1. ServerBootstrap childHandler =

  2.    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)

  3.        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)

  4.        .option(ChannelOption.SO_BACKLOG, 1024)

  5.        .option(ChannelOption.SO_REUSEADDR, true)

  6.        .option(ChannelOption.SO_KEEPALIVE, false)

  7.        .childOption(ChannelOption.TCP_NODELAY, true)

  8.        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())

  9.        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())

  10.        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))

  11.        .childHandler(new ChannelInitializer<SocketChannel>() {

  12.            @Override

  13.            public void initChannel(SocketChannel ch) throws Exception {

  14.                ch.pipeline()

  15.                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,

  16.                        new HandshakeHandler(TlsSystemConfig.tlsMode))

  17.                    .addLast(defaultEventExecutorGroup,

  18.                        new NettyEncoder(), //编码

  19.                        new NettyDecoder(), //解码

  20.                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //空闲连接管理

  21.                        new NettyConnectManageHandler(), //网络连接管理

  22.                        new NettyServerHandler() //网络请求处理

  23.                    );

  24.            }

  25.        });

其中最后一个NettyServerHandler是进行网络请求处理的ChannelHandler。代码如下:

     
       
       
     
  1. class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

  2.    @Override

  3.    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {

  4.        processMessageReceived(ctx, msg);

  5.    }

  6. }

再追进去:

     
       
       
     
  1. public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {

  2.    final RemotingCommand cmd = msg;

  3.    if (cmd != null) {

  4.        switch (cmd.getType()) {

  5.            case REQUEST_COMMAND:

  6.                processRequestCommand(ctx, cmd);

  7.                break;

  8.            case RESPONSE_COMMAND:

  9.                processResponseCommand(ctx, cmd);

  10.                break;

  11.            default:

  12.                break;

  13.        }

  14.    }

  15. }

可以看到根据当前是Client端(接收到的是回复)还是Server端(接收到的是请求),而进行不同处理。以Server端为例:

     
       
       
     
  1. public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {

  2.    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());

  3.    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;

  4.    final int opaque = cmd.getOpaque();

  5.    if (pair != null) {

  6.        //将请求封装成可执行任务

  7.        Runnable run = new Runnable() {

  8.            @Override

  9.            public void run() {

  10.                try {

  11.                    //调用DefaultRequestProcessorprocessRequest方法执行业务逻辑

  12.                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);

  13.                    //省略代码

  14.                } catch (Throwable e) {

  15.                    //省略代码

  16.                }

  17.            }

  18.        };

  19.        //省略代码

  20.        try {

  21.            //将上面的可执行任务再封装为RequestTask,放到线程池remotingExecutor中。

  22.            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);

  23.            pair.getObject2().submit(requestTask);

  24.        } catch (RejectedExecutionException e) {

  25.            //省略代码

  26.        }

  27.    }

  28. }

上面的代码中getObject2就是remotingExecutor(M2),也就是此时将后续逻辑交给了它处理。

4.remotingExecutor处理业务逻辑。根据上一步的分析,最终执行的其实就是DefaultRequestProcessor的processRequest方法。


3.namesrv中服务线程总结

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

4

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

1.编程式设定

     
       
       
     
  1. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

  2. producer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");

  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

  4. consumer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");

     
       
       
     
  1. sh mqadmin command-name -n name-server-ip1:port;name-server-ip2:port -X OTHER-OPTION

2.JVM/Java启动参数中指定

     
       
       
     
  1. -Drocketmq.namesrv.addr=name-server1-ip:port;name-server2-ip:port

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

3.环境变量中指定

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

前面2和3主要用于在本地通过源码启动RocketMQ时采用。

4.http方式获取

     
       
       
     
  1. http://jmenv.tbsite.net:8080/rocketmq/nsaddr

producer/consumer客户端启动时会从配置的namesrv列表中随机选择一个建立TCP连接。broker端则会和所有的namesrv建立长连接。 注意,以上4种方式的优先级是:

1.编程式设定 > 2.JVM/Java启动参数中指定 > 3.环境变量中指定 > 4.http方式获取

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

5

客户端/broker端如何保持与nameserver集群稳定连接

我们知道broker与所有nameserver都保持一个长连接,broker侧topic信息如有变化,则向所有nameserver都发送消息。而客户端(生产者和消费者)只是跟某一台nameserver节点保持联系。

假定一个场景,如果某个broker的topic配置发生了变化,它向所有nameserver发布通知,但是此时如果某一台nameserver推送失败(超时或者挂掉了),则nameserver集群之间的信息是不一致的,这种情况下RocketMQ如何保证数据的一致性?

再假定一个场景,如果客户端从namesrv拉取topic路由信息时,namesrv挂掉导致原先一直保持的长连接断开,客户端如何保证能获取到最新的路由信息?

让我们来看看RocektMQ是如何应对这些场景的。首先RocketMQ在进行任何RPC请求之前都会检查长连接的状态,如果channel的状态不OK,会尝试进行重连。

对以上2种场景分来来讲。第一个场景,broker与namesrv。如果namesrv没有真的挂掉,只是瞬间网络不稳定没有响应,经过重连后长连接恢复,则不会产生数据不一致。如果网络不稳定时间比较长,导致本次topic上报没有成功,在下一次上报(broker会定期上报给namesrv)之前长连接恢复,namesrv会接受到上次没接收到的信息。这期间会出现短暂的数据不一致。所以namesrv之间确实是弱一致性,这是namesrv提供高性能所牺牲的地方。即使namesrv真的挂掉,在恢复后broker与namesrv的连接也会自动重连。

第二个场景,客户端与namesrv。如果与该客户端一直保持长连接状态的namesrv挂掉。客户端会有failover(故障转移)机制。当检查到长连接无效后则会自动连接其他namesrv(轮询机制)。

6

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

为何弃zookeeper重新开发namesrv

rocketmq设计的早期阶段是很大程度参考借鉴了kafka的,比如早期的rocketmq也是依赖zookeeper来进行集群管理,提供服务发现和服务治理能力。但现在的版本去掉了zookeeper,重新设计开发了更加轻量的namesrv。rocketmq为什么要重新造轮子呢?要理解这一点,我们还要对zookeeper和kafka都有一定的认识。

1.Kafka分区原理

首先我们通过一张图来了解一下kafka。 

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

登场人物介绍:

  • broker:一台kafka服务器就是一个broker。和RocektMQ中的broker概念类似,不同的是RocektMQ中的一个broker可能对应于多个服务器(一个Master复数个Slave)。

  • zookeeper:对应RocketMQ中的namesrv概念,是服务发现和路由角色。除了保存broker/topic/partition等路由信息外,还负责为每个partition选举产生leader。

  • topic:标识一类消息的逻辑名字。

  • partition:分区。kafka引入这个概念是为了解决2个问题,性能和高可用。首先,如果某个topic的消息只存在一个broker中,这个broker就会成为并发瓶颈,无法水平扩展。自然而然就会想到将消息分布到整个集群,提高并发。由此引入分区的概念。这是一个物理概念,每个分区对应一个文件夹,文件夹中存储具体的消息。每个分区可以指定多个副本,其中一个为leader partition,其他为follower partition,这就是为了解决高可用问题了,当leader partition所在的broker宕机时,分布于其他broker上的follower partition同样拥有完整的消息信息。

  • producer:生产者,和RocektMQ中的生产者没有分别。

在上图中,topicA共有3个partition,每个partition有3个副本,分别分布在broker1~broker3中。对于partition1,生产者发消息时自然不是向3个副本都发送,而是向其中一个leader partition发送,然后leader partition采用同步或异步的方式将消息复制给其他的follower partition。那这个leader partition是如何产生的?又是在哪里维护的?生产者是从哪儿得知leader partition是在哪个broker上,从而准确的将消息发送给该broker?

这就是zookeeper所做的了。zookeeper会保存每个partition的信息,并且会选举产生一个leader partition。当生产者发送消息时,首先会根据策略计算出发往哪一个partition,然后根据从zookeeper获取的分区信息,找到该partition的leader partition所在的broker并将消息发往该broker。当某个有leader partition的broker宕机时,由zookeeper负责选出新的leader partition并通知给生产者。

2.RocketMQ主从架构

RocketMQ的架构在文章的开始已经叙述。broker一般采用Master-Slave方式。即在部署阶段已经明确主从节点,主从节点也不会发生切换。生产者只会和主节点通信。没有选举动作。

通过以上分析,zookeeper除了提供命名服务外还承担着选举leader的责任。不仅如此,zookeeper在处理数据的逻辑关系上比较繁琐。而RocektMQ不会用到zookeeper中的选举功能,与其使用笨重的zookeeper,短短1000多行代码的namesrv稳定性和性能肯定会更好。

7

总结

文章阐述了namesrv在集群中的作用以及和其他组件的关系,然后以namesrv的启动流程为引分析了关键代码,重点分析了namesrv中使用的1+n+M1+M2线程模型并总结了启动的线程服务,最后对为什么使用namesrv而非zookeeper进行了分析、阐述。文章中的观点如果有理解不到位、表述不清晰的地方,欢迎指出。


END

往期精选

1.

2. 

3. 

以上是关于[干货分享]Unity3D 深入解析Timeline编辑器的主要内容,如果未能解决你的问题,请参考以下文章

腾讯Bugly干货分享深入源码探索 ReactNative 通信机制

干货分享深入理解高可用之限流

干货分享深入理解高可用之限流

干货!OpenStack云主机网络异常丢包问题深入解析

干货分享通用 api 封装实战,带你深入理解 PO!建议收藏

深入理解JVM虚拟机4:Java class介绍与解析实践