Elasticsearch的底层模块深入解析之node
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch的底层模块深入解析之node相关的知识,希望对你有一定的参考价值。
参考技术A node module,主要是用来处理各种不同类型的节点的,es有哪些类型的node,另外就是对这些类型的node有些什么特殊的参数,对于一个较大的集群来说,如何去规划和配置各种各样的node1、node类型
如果我们启动es的一个实例,那么就是启动了一个es node,一些es node就可以组成一个es集群。如果仅仅运行了一个es node,那么也有一个es集群,只是节点数量就是1。
集群中的每个node都可以处理http和transport请求,其中transport层是用来处理节点间的通信的,http层是用来处理外部的客户端rest请求的。
所有的node都知道集群中的其他node,并且可以将客户端的请求转发到适当的节点上去。
节点的类型包含以下几种:
(1)master-eligible node:master候选节点,将node.master设置为true(默认),代表这个node就是master的候选节点,可以被选举为master node,然后控制整个集群。
(2)data node:将node.data设置为true(默认),data node可以存储数据,同时处理这些数据相关的操作,比如CRUD操作,搜索操作,聚合操作,等等。
(3)ingest node:将node.ingest设置为true(默认),ingest node是用来对document写入索引文件之前进行预处理的。可以对每个document都执行一条ingest pipeline,在document写入索引文件之前,先对其数据进行处理和转化。但是如果要执行的ingest操作太过繁重,那么可以规划单独的一批ingest node出来,然后将node.master和node.data都设置为false即可。
(4)tribe node:tribe node可以通过tribe.*相关参数来设置,它是一种特殊的coordinate node,可以连接到多个es集群上去,然后对多个集群执行搜索等操作。
(5)默认情况下,每个node的node.master,node.data,node.ingest都是true,都是master候选节点,也可以作为data node存储和操作数据,同时也可以作为ingest node对数据进行预处理。对于小于20个节点的小集群来说,这种架构是ok的,没问题的。但是如果对于大于20个物理机的集群来说,最好是单独规划出master node、data node和ingest node来。
(6)coordinate node
搜索和bulk等请求可能会涉及到多个节点上的不同shard里的数据,比如一个search请求,就需要两个阶段执行,首先第一个阶段就是一个coordinating node接收到这个客户端的search request。接着,coordinating node会将这个请求转发给存储相关数据的node,每个data node都会在自己本地执行这个请求操作,同时返回结果给coordinating node,接着coordinating node会将返回过来的所有的请求结果进行缩减和合并,合并为一个global结果。
每个node都是一个coordinating node。这就意味着如果一个node,将node.master,node.data,node.ingest全部设置为false,那么它就是一个纯粹的coordinating node,仅仅用于接收客户端的请求,同时进行请求的转发和合并。
如果真的是大集群的话,最好也是单独规划一批node出来,就作为coordinating node,然后让es client全部往这些node上去发送请求。
如果真的是一个大于20个节点的生产集群的话,建议将4种node,master node,data node,ingest node,cooridating node,全部分离开来
集群中有30台机器
master node:3个
ingest node:视具体情况而定,具体是看你的ingest预处理操作有多么的复杂,耗费多少资源,但是一般情况下来说,es ingest node用的比较少的,ingest node也可以不用单独规划一批出来
coordinate node:视具体情况而定,但是对于大集群来说,最好是单独拆几个节点出来,用于接收客户端的请求,3个节点。主要是看你的并发访问量有多大,比如说你的最大的QPS也就是10,或者是100,那么3个节点肯定够了。如果你的QPS是1000,或者是10000,那么可能就要规划,10个coordinate node,或者100个
data node:24个data node,data node肯定是分配的是最多的,主要用来存储数据,执行各种对数据的操作么,资源耗费肯定是最多的
2、master eligible node
(1)master-eligible node的介绍以及配置
master node负责轻量级的集群管理工作,比如创建和删除索引,追踪集群中的每个node,决定如何将shards分配给各个node。对于集群来说,有一个稳定的master node,是非常关键的。然后master-eligible node都有机会被选举为一个master node,同时master node必须有权限访问path.data指定的data目录,因为master node需要在data目录中存储cluster state。
对数据进行index和search操作,会耗费大量的cpu,内存,磁盘io,以及网络io,耗费的是每个node的资源。因此我们必须要确保master node是非常稳定的,而且是压力不大的,对于大集群来说,比较好的办法是划分出单独的master node和data node。如果不拆开的话,一个node又要是data node,要复杂存储数据,处理各种操作,同时又要负责管理集群,可能就会不稳定,出问题。
同时因为默认情况下,master node也能扮演coordinating node的角色,并且将search和index请求路由到对应的data node上去执行,最好是不要让master node来执行这些coordinate操作。因为msater node的稳定运行对于整个集群来说非常重要,比你利用master node资源来执行一些coordinate操作要重要的多。
如果要设置一个node为专门的master-eligible node,需要做如下的设置:
node.master: true
node.data: false
node.ingest: false
(2)通过minimum_master_nodes来避免脑裂问题
要预防数据的丢失,我们就必须设置discovery.zen.minimum_master_nodes参数为一个合理的值,这样的话,每个master-eligible node才知道至少需要多少个master-eligible node才能组成一个集群。
比如说,我们现在有一个集群,其中包含两个master-eligible nodes。然后一个网络故障发生了,这两个节点之间丢失了联络。每个节点都认为当前只有一个master-eligible node,就是它们自己。此时如果discovery.zen.minimum_master_nodes参数的默认值是1,那么每个node就可以让自己组成一个集群,选举自己为master node即可。结果就会导致出现了两个es集群,这就是脑裂现象。即使网络故障解决了,但是这两个master node是不可能重新组成一个集群了。除非某个master eligible node重启,然后自动加入另外一个集群,但是此时写入这个节点的数据就会彻底丢失。
那么如果现在我们有3个master-eligible node,同时将discovery.zen.minimum_master_nodes设置为2.如果网络故障发生了,此时一个网络分区有1个node,另外一个网络分区有2个node,只有一个node的那个网络分区,没法检测到足够数量的master-eligible node,那么此时它就不能选举一个master node出来组成一个新集群。但是有两个node的那个网络分区,它们会发现这里有足够数量的master-eligible node,那么就选举出一个新的master,然后组成一个集群。当网络故障解除之后,那个落单的node就会重新加入集群中。
discovery.zen.minimum_master_nodes,必须设置为master-eligible nodes的quorum,quorum的公式为:(master_eligible_nodes / 2) + 1。
换句话来说,如果有3个master-eligible nodes,那么那个参数就必须设置为(3 / 2) + 1 = 2,比如下面这样:
discovery.zen.minimum_master_nodes: 2
随着集群节点的上线和下限,这个参数都是要重新设置的,可以通过api来设置
此时将master node和data node分离的好处就出来了,一般如果单独规划一个master nodes的话,只要规划固定的3个node是master-eligible node就可以了,那么data node无论上线和下限多少个,都无所谓的。
3、data node
data node负责存储shard的数据,也就是那些document。data node可以处理各种操作,比如CRUD,搜索,聚合。这些操作全都是很耗费IO,内存和cpu资源的。因此监控这些资源的使用是很重要的,同时如果资源过载了,那么就要添加更多的data node。
如果要设置一个专门的data node,需要做出如下的设置:
node.master: false
node.data: true
node.ingest: false
4、ingest node
nigest node可以执行预处理pipeline,包含了多个ingest processors。不同的ingest processor执行的操作类型是不同的,那么对资源的需求也是不同的,不过还是最好是规划一批单独的ingest node出来,不要跟master node和data node混合在一起。
如果要配置一个单独的ingest node:
node.master: false
node.data: false
node.ingest: true
search.remote.connect: false
5、cooridnating only node
如果我们规划了一批专门的master node,data node以及ingest node,那么此时还遗留下来了一种node,那就是coordinating node,这些node专门用来接收客户端的请求,同时对请求进行路由和转发,并对请求的结果进行合并。
coordinating only nodes对于大集群来说,可以使用专门的node来负载coordinate操作,而不是让coordinate操作的工作负载集中到master node和data node上去。coordinating node也会加入cluster,同时可以获取到完整的cluster state,它们主要是用cluster state中包含的node info来进行请求转发。
如果在一个集群中规划太多的coordinating node可能会加重整个集群的负担,因为被选举出来的master node必须要从所有的node上得到cluster state update的ack,如果coordinating nodes过多,那么可能会加重master node的负担。
如果要设置coordinating only node的话:
node.master: false
node.data: false
node.ingest: false
search.remote.connect: false
6、node data path设置
(1)path.data
每个data和master-eligible node都需要能够访问data目录,在那里存储了每个shard的数据,包括cluster state也存储在那里。path.data默认是指向$ES_HOME/data目录的,但是在生产环境中,肯定是不能这样设置的,因为在升级es的时候,可能会导致数据被清空或者覆盖。
此时一般需要在elasticsearch.yml中设置path.data:
path.data: /var/elasticsearch/data
(2)node.max_local_storage_nodes
data目录可以被多个node共享,即使是不同集群中的es node,也许他们在一个物理机上启动了。这个共享的方式对于我们测试failover是很有用的,以及在开发机上测试不同的配置。但是在生产环境下,绝对不用这么做,一个data目录就给一个es node使用即可。默认情况下,es被配置成阻止超过一个node共享data目录中的数据,如果要允许多个node共享一个data目录,需要设置node.max_local_storage_nodes为一个超过1的数字。
RxSwift之深入解析核心逻辑Observable的底层原理
一、前言
- 现有一段 RxSwift 使用序列并监听序列消息发送的示例代码,如下所示:
// 创建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
// 发送信号
observer.onNext("发送信号")
observer.onError(NSError.init(domain: "EpisodeError", code: 10086, userInfo: nil))
return Disposables.create()
}
// 订阅信号
let _ = ob.subscribe(onNext: { (text) in
print("订阅到:\\(text)")
}, onError: { (error) in
print("error:\\(error)")
}, onCompleted: {
print("完成")
}) {
print("销毁")
}.disposed(by: disposeBag)
- 代码分析:
-
- 通过 Observable 的 create 创建序列,在 create 闭包内调用 onNext 方法实现信号发送;
-
- 调用 subscribe 方法订阅序列,并实现 subscribe 的参数闭包 onNext,在闭包内监听信号;
-
- 最后通过 disposed 对序列打包等待销毁。
- 运行程序,结果如下:
订阅到:发送信号
完成
销毁
- 那么,发送的信号是如何被订阅到的呢?按正常逻辑,订阅后才能收到信息,我们可以猜测,在成为订阅者并布置好监听后,订阅者向序列发送了一条消息,通知可观察序列可以发送信号。大致流程如下所示:
二、核心逻辑分析
① 创建序列
- 创建序列的源码如下所示:
extension ObservableType {
// MARK: create
/**
Creates an observable sequence from a specified subscribe method implementation.
- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}
- 该方法是对 ObservableType 协议的扩展,最外层实现的闭包 subscribe 则作为参数传入 AnonymousObservable,并返回 AnonymousObservable 对象。可观察序列的创建是利用协议拓展功能的 create 方法实现的,里面创建了一个 AnonymousObservable(匿名可观察序列) 。
- 继续执行追踪到 AnonymousObservable 类,如下:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
- AnonymousObservable 的继承链如下:
AnonymousObservable -> Product -> ObservableType -> ObservableConvertible
- 至此为止,序列的创建完成:
-
- create 方法的时候创建了一个内部对象 AnonymousObservable;
-
- AnonymousObservable 保存了外界的闭包;
-
- AnonymousObservable 继承了 Producer 具有非常重要的方法 subscribe。
- AnonymousObservable 的继承关系如下所示:
② 订阅序列
- 根据方法名找到 subscribe 的实现(订阅方法 subscribe 和上面所说的 subscribe 不是同一个方法),该方法是对 ObservableType 的拓展。在方法内部已经出现对观察者的定义,AnonymousObserver 类型的闭包 observer,源码如下:
extension ObservableType {
public func subscribe(onNext: ((E) -> Void)? = nil, ...) -> Disposable {
// 此处省略不影响探索的代码
......
let observer = AnonymousObserver<E> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
- 说明:
-
- E 是 Swift 的关联类型,仔细观察可观察序列的继承链源码,应该不难得出:E 就是序列类型,这里就是 String:
public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element
-
- observer 内部调用的外部(应用层)实现的闭包,由此看出所有信号是由此发出,event 是 observer 的参数,不难看出,observer 闭包也是在其他地方调用,传入带有信号值的 event 参数。
-
- observer 的继承链关系如下:
- observer 被当做参数传入到 subscribe 中,而 observer 的调用必然是在 subscribe 中实现:
self.asObservable().subscribe(observer)
- self.asObservable() 该方法返回本身,保证协议的一致性,方法如下:
public class Observable<Element> : ObservableType {
// 省去代码若干
......
public func asObservable() -> Observable<E> {
return self
}
}
- self.asObservable().subscribe(observer) 其实本质就是 self.subscribe(observer),继续断点执行找到 subscribe 方法,通过可观察序列的继承关系,可以非常快速的定位 Producer 订阅代码,正是上面所提到的 Producer 中的方法,方法如下:
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
// 省去代码若干
......
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
- 接着,observer 观察者被传入到 run 中,上面说到该观察者一定会被调用,继续深入:
let sinkAndSubscription = self.run(observer, cancel: disposer)
- 继续断点执行,发现 self.run 的调用,调用的是 AnonymousObservable 中的 run 方法,代码如下:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
- 此处就是创建序列时的 AnonymousObservable 类,在 run 方法类创建了 sink 对象,在初始化时传入了上面所说的观察者,记住 sink 保存了观察者 observer 闭包,并且调用了 sink.run(self) 方法,传入的是创建时产生的可观察序列 observable 闭包对象,深入 run:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// 省去代码若干
// 此处向父类Sink初始化了observer对象
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
- 此处 parent 由 let subscription = sink.run(self) 传入,self 即为创建序列 create 方法返回的 observable 对象,而 _subscribeHandler 是创建序列所保存的闭包,此时闭包就被调用了,被调用闭包如下:
let obs = Observable<Any>.create { (observer) -> Disposable in
// 发送消息
observer.onNext("我是一条消息")
return Disposables.create()
}
- 发送信号的闭包被调用,接下来就是信号发送。订阅的内部流程如下:
③ 发送信号
- 继续探索,代码已经执行到业务层,在信号发送闭包中通常调用一下三种方法,用来发送信号,如下:
-
- observer.onNext(“发送消息”) 信号发送;
-
- observer.onCompleted() 序列完成,完成后序列将被释放;
-
- observer.onError(error) 序列出错中断,序列不可继续使用,被释放。
- 以上三个方法为 ObserverType 的拓展方法(E 表示一个泛型信号量,可表示任意类型的信号):
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: E))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: E) {
self.on(.next(element))
}
/// Convenience method equivalent to `on(.completed)`
public func onCompleted() {
self.on(.completed)
}
/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
- .next(element) 是一个带泛型参数的枚举,管理了三种类型事件的消息传递,如下:
public enum Event<Element> {
/// Next element is produced.
case next(Element)
/// Sequence terminated with an error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
- on 是 AnonymousObservableSink 中的方法,代码如下:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// 代码省略若干行
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(&self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(&self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
}
- 内部根据 Event 枚举不同成员变量做不同的信号发送,信号发送调用了 forwardOn 方法。方法实现如下:
class Sink<O : ObserverType> : Disposable {
init(observer: O, cancel: Cancelable) {
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
if isFlagSet(&self._disposed, 1) {
return
}
self._observer.on(event)
}
}
- 代码有些长只保留了核心部分,Sink 即 AnonymousObservableSink 的父类,_observer 即是订阅中在内部产生的 AnonymousObserver 对象,而该对象调用了 on 方法并传递了信号,on 方法所在位置如下:AnonymousObserver -> ObserverBase -> on()
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
func on(_ event: Event<E>) {
switch event {
case .next:
if load(&self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(&self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
}
- 在方法内部又掉用了 self.onCore(event),此时该方法在 AnonymousObserver 中实现,代码如下:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
- 说明:
-
- 此处通过 _eventHandler 来发送信号,_eventHandler 是从哪来的呢?逆推 onCore 调用者是 observer,而 observer 是订阅时在内部创建的,被一层层传入到此;
-
- 而在 observer 初始化时即被保存为 _eventHandler,_eventHandler 调用即调用了订阅时创建的 observer 闭包,进而信号又通过闭包内的闭包传出到业务层。
// 订阅序列
obs.subscribe(onNext: { (val) in
print("onNext:\\(val)")
}).disposed(by: disposeBag)
- 发送信息的内部流程如下:
- 响应式编程的创建、订阅、发送、接收等流程就已完成,整个流程会觉得很复杂,但它统一了所有事件的创建与监听,统一思想快速开发,今后的开发流程就是:
创建序列 -> 订阅序列 -> 发送序列 -> 响应序列
- sink 在 Rx 中充当管理者,管理序列,观察者和销毁者,将序列发送至观察者,并管理销毁者适时消耗序列,回收资源。
三、总结
- 分析思维导图:
- 核心逻辑流程图:
以上是关于Elasticsearch的底层模块深入解析之node的主要内容,如果未能解决你的问题,请参考以下文章