Elasticsearch的底层模块深入解析之node

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch的底层模块深入解析之node相关的知识,希望对你有一定的参考价值。

参考技术A node module,主要是用来处理各种不同类型的节点的,es有哪些类型的node,另外就是对这些类型的node有些什么特殊的参数,对于一个较大的集群来说,如何去规划和配置各种各样的node

1、node类型

如果我们启动es的一个实例,那么就是启动了一个es node,一些es node就可以组成一个es集群。如果仅仅运行了一个es node,那么也有一个es集群,只是节点数量就是1。

集群中的每个node都可以处理http和transport请求,其中transport层是用来处理节点间的通信的,http层是用来处理外部的客户端rest请求的。

所有的node都知道集群中的其他node,并且可以将客户端的请求转发到适当的节点上去。

节点的类型包含以下几种:

(1)master-eligible node:master候选节点,将node.master设置为true(默认),代表这个node就是master的候选节点,可以被选举为master node,然后控制整个集群。

(2)data node:将node.data设置为true(默认),data node可以存储数据,同时处理这些数据相关的操作,比如CRUD操作,搜索操作,聚合操作,等等。

(3)ingest node:将node.ingest设置为true(默认),ingest node是用来对document写入索引文件之前进行预处理的。可以对每个document都执行一条ingest pipeline,在document写入索引文件之前,先对其数据进行处理和转化。但是如果要执行的ingest操作太过繁重,那么可以规划单独的一批ingest node出来,然后将node.master和node.data都设置为false即可。

(4)tribe node:tribe node可以通过tribe.*相关参数来设置,它是一种特殊的coordinate node,可以连接到多个es集群上去,然后对多个集群执行搜索等操作。

(5)默认情况下,每个node的node.master,node.data,node.ingest都是true,都是master候选节点,也可以作为data node存储和操作数据,同时也可以作为ingest node对数据进行预处理。对于小于20个节点的小集群来说,这种架构是ok的,没问题的。但是如果对于大于20个物理机的集群来说,最好是单独规划出master node、data node和ingest node来。

(6)coordinate node

搜索和bulk等请求可能会涉及到多个节点上的不同shard里的数据,比如一个search请求,就需要两个阶段执行,首先第一个阶段就是一个coordinating node接收到这个客户端的search request。接着,coordinating node会将这个请求转发给存储相关数据的node,每个data node都会在自己本地执行这个请求操作,同时返回结果给coordinating node,接着coordinating node会将返回过来的所有的请求结果进行缩减和合并,合并为一个global结果。

每个node都是一个coordinating node。这就意味着如果一个node,将node.master,node.data,node.ingest全部设置为false,那么它就是一个纯粹的coordinating node,仅仅用于接收客户端的请求,同时进行请求的转发和合并。

如果真的是大集群的话,最好也是单独规划一批node出来,就作为coordinating node,然后让es client全部往这些node上去发送请求。

如果真的是一个大于20个节点的生产集群的话,建议将4种node,master node,data node,ingest node,cooridating node,全部分离开来

集群中有30台机器

master node:3个

ingest node:视具体情况而定,具体是看你的ingest预处理操作有多么的复杂,耗费多少资源,但是一般情况下来说,es ingest node用的比较少的,ingest node也可以不用单独规划一批出来

coordinate node:视具体情况而定,但是对于大集群来说,最好是单独拆几个节点出来,用于接收客户端的请求,3个节点。主要是看你的并发访问量有多大,比如说你的最大的QPS也就是10,或者是100,那么3个节点肯定够了。如果你的QPS是1000,或者是10000,那么可能就要规划,10个coordinate node,或者100个

data node:24个data node,data node肯定是分配的是最多的,主要用来存储数据,执行各种对数据的操作么,资源耗费肯定是最多的

2、master eligible node

(1)master-eligible node的介绍以及配置

master node负责轻量级的集群管理工作,比如创建和删除索引,追踪集群中的每个node,决定如何将shards分配给各个node。对于集群来说,有一个稳定的master node,是非常关键的。然后master-eligible node都有机会被选举为一个master node,同时master node必须有权限访问path.data指定的data目录,因为master node需要在data目录中存储cluster state。

对数据进行index和search操作,会耗费大量的cpu,内存,磁盘io,以及网络io,耗费的是每个node的资源。因此我们必须要确保master node是非常稳定的,而且是压力不大的,对于大集群来说,比较好的办法是划分出单独的master node和data node。如果不拆开的话,一个node又要是data node,要复杂存储数据,处理各种操作,同时又要负责管理集群,可能就会不稳定,出问题。

同时因为默认情况下,master node也能扮演coordinating node的角色,并且将search和index请求路由到对应的data node上去执行,最好是不要让master node来执行这些coordinate操作。因为msater node的稳定运行对于整个集群来说非常重要,比你利用master node资源来执行一些coordinate操作要重要的多。

如果要设置一个node为专门的master-eligible node,需要做如下的设置:

node.master: true
node.data: false
node.ingest: false

(2)通过minimum_master_nodes来避免脑裂问题

要预防数据的丢失,我们就必须设置discovery.zen.minimum_master_nodes参数为一个合理的值,这样的话,每个master-eligible node才知道至少需要多少个master-eligible node才能组成一个集群。

比如说,我们现在有一个集群,其中包含两个master-eligible nodes。然后一个网络故障发生了,这两个节点之间丢失了联络。每个节点都认为当前只有一个master-eligible node,就是它们自己。此时如果discovery.zen.minimum_master_nodes参数的默认值是1,那么每个node就可以让自己组成一个集群,选举自己为master node即可。结果就会导致出现了两个es集群,这就是脑裂现象。即使网络故障解决了,但是这两个master node是不可能重新组成一个集群了。除非某个master eligible node重启,然后自动加入另外一个集群,但是此时写入这个节点的数据就会彻底丢失。

那么如果现在我们有3个master-eligible node,同时将discovery.zen.minimum_master_nodes设置为2.如果网络故障发生了,此时一个网络分区有1个node,另外一个网络分区有2个node,只有一个node的那个网络分区,没法检测到足够数量的master-eligible node,那么此时它就不能选举一个master node出来组成一个新集群。但是有两个node的那个网络分区,它们会发现这里有足够数量的master-eligible node,那么就选举出一个新的master,然后组成一个集群。当网络故障解除之后,那个落单的node就会重新加入集群中。

discovery.zen.minimum_master_nodes,必须设置为master-eligible nodes的quorum,quorum的公式为:(master_eligible_nodes / 2) + 1。

换句话来说,如果有3个master-eligible nodes,那么那个参数就必须设置为(3 / 2) + 1 = 2,比如下面这样:

discovery.zen.minimum_master_nodes: 2

随着集群节点的上线和下限,这个参数都是要重新设置的,可以通过api来设置

此时将master node和data node分离的好处就出来了,一般如果单独规划一个master nodes的话,只要规划固定的3个node是master-eligible node就可以了,那么data node无论上线和下限多少个,都无所谓的。

3、data node

data node负责存储shard的数据,也就是那些document。data node可以处理各种操作,比如CRUD,搜索,聚合。这些操作全都是很耗费IO,内存和cpu资源的。因此监控这些资源的使用是很重要的,同时如果资源过载了,那么就要添加更多的data node。

如果要设置一个专门的data node,需要做出如下的设置:

node.master: false
node.data: true
node.ingest: false

4、ingest node

nigest node可以执行预处理pipeline,包含了多个ingest processors。不同的ingest processor执行的操作类型是不同的,那么对资源的需求也是不同的,不过还是最好是规划一批单独的ingest node出来,不要跟master node和data node混合在一起。

如果要配置一个单独的ingest node:

node.master: false
node.data: false
node.ingest: true
search.remote.connect: false

5、cooridnating only node

如果我们规划了一批专门的master node,data node以及ingest node,那么此时还遗留下来了一种node,那就是coordinating node,这些node专门用来接收客户端的请求,同时对请求进行路由和转发,并对请求的结果进行合并。

coordinating only nodes对于大集群来说,可以使用专门的node来负载coordinate操作,而不是让coordinate操作的工作负载集中到master node和data node上去。coordinating node也会加入cluster,同时可以获取到完整的cluster state,它们主要是用cluster state中包含的node info来进行请求转发。

如果在一个集群中规划太多的coordinating node可能会加重整个集群的负担,因为被选举出来的master node必须要从所有的node上得到cluster state update的ack,如果coordinating nodes过多,那么可能会加重master node的负担。

如果要设置coordinating only node的话:

node.master: false
node.data: false
node.ingest: false
search.remote.connect: false

6、node data path设置

(1)path.data

每个data和master-eligible node都需要能够访问data目录,在那里存储了每个shard的数据,包括cluster state也存储在那里。path.data默认是指向$ES_HOME/data目录的,但是在生产环境中,肯定是不能这样设置的,因为在升级es的时候,可能会导致数据被清空或者覆盖。

此时一般需要在elasticsearch.yml中设置path.data:

path.data: /var/elasticsearch/data

(2)node.max_local_storage_nodes

data目录可以被多个node共享,即使是不同集群中的es node,也许他们在一个物理机上启动了。这个共享的方式对于我们测试failover是很有用的,以及在开发机上测试不同的配置。但是在生产环境下,绝对不用这么做,一个data目录就给一个es node使用即可。默认情况下,es被配置成阻止超过一个node共享data目录中的数据,如果要允许多个node共享一个data目录,需要设置node.max_local_storage_nodes为一个超过1的数字。

RxSwift之深入解析核心逻辑Observable的底层原理

一、前言

  • 现有一段 RxSwift 使用序列并监听序列消息发送的示例代码,如下所示:
	// 创建序列
	let ob = Observable<Any>.create { (observer) -> Disposable in
		// 发送信号
		observer.onNext("发送信号")
		observer.onError(NSError.init(domain: "EpisodeError", code: 10086, userInfo: nil))
		return Disposables.create()
	}
	// 订阅信号
	let _ = ob.subscribe(onNext: { (text) in
		print("订阅到:\\(text)")
	}, onError: { (error) in
		print("error:\\(error)")
	}, onCompleted: {
		print("完成")
	}) {
		print("销毁")
	}.disposed(by: disposeBag)
  • 代码分析:
    • 通过 Observable 的 create 创建序列,在 create 闭包内调用 onNext 方法实现信号发送;
    • 调用 subscribe 方法订阅序列,并实现 subscribe 的参数闭包 onNext,在闭包内监听信号;
    • 最后通过 disposed 对序列打包等待销毁。
  • 运行程序,结果如下:
	订阅到:发送信号
	完成
	销毁
  • 那么,发送的信号是如何被订阅到的呢?按正常逻辑,订阅后才能收到信息,我们可以猜测,在成为订阅者并布置好监听后,订阅者向序列发送了一条消息,通知可观察序列可以发送信号。大致流程如下所示:

二、核心逻辑分析

① 创建序列
  • 创建序列的源码如下所示:
	extension ObservableType {
	    // MARK: create
	    /**
	     Creates an observable sequence from a specified subscribe method implementation.
	     - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
	     - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
	     - returns: The observable sequence with the specified implementation for the `subscribe` method.
	     */
	    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
	        return AnonymousObservable(subscribe)
	    }
	}
  • 该方法是对 ObservableType 协议的扩展,最外层实现的闭包 subscribe 则作为参数传入 AnonymousObservable,并返回 AnonymousObservable 对象。可观察序列的创建是利用协议拓展功能的 create 方法实现的,里面创建了一个 AnonymousObservable(匿名可观察序列) 。
  • 继续执行追踪到 AnonymousObservable 类,如下:
	final private class AnonymousObservable<Element>: Producer<Element> {
	    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
	
	    let _subscribeHandler: SubscribeHandler
	
	    init(_ subscribeHandler: @escaping SubscribeHandler) {
	        self._subscribeHandler = subscribeHandler
	    }
	
	    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
	        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
	        let subscription = sink.run(self)
	        return (sink: sink, subscription: subscription)
	    }
	}
  • AnonymousObservable 的继承链如下:
	AnonymousObservable -> Product -> ObservableType -> ObservableConvertible
  • 至此为止,序列的创建完成:
    • create 方法的时候创建了一个内部对象 AnonymousObservable;
    • AnonymousObservable 保存了外界的闭包;
    • AnonymousObservable 继承了 Producer 具有非常重要的方法 subscribe。
  • AnonymousObservable 的继承关系如下所示:

② 订阅序列
  • 根据方法名找到 subscribe 的实现(订阅方法 subscribe 和上面所说的 subscribe 不是同一个方法),该方法是对 ObservableType 的拓展。在方法内部已经出现对观察者的定义,AnonymousObserver 类型的闭包 observer,源码如下:
	extension ObservableType {
	    public func subscribe(onNext: ((E) -> Void)? = nil, ...) -> Disposable {
	           // 此处省略不影响探索的代码
	           ......
	            let observer = AnonymousObserver<E> { event in                
	                switch event {
	                case .next(let value):
	                    onNext?(value)
	                case .error(let error):
	                    if let onError = onError {
	                        onError(error)
	                    }
	                    else {
	                        Hooks.defaultErrorHandler(callStack, error)
	                    }
	                    disposable.dispose()
	                case .completed:
	                    onCompleted?()
	                    disposable.dispose()
	                }
	            }
	            return Disposables.create(
	                self.asObservable().subscribe(observer),
	                disposable
	            )
	    }
	}
  • 说明:
    • E 是 Swift 的关联类型,仔细观察可观察序列的继承链源码,应该不难得出:E 就是序列类型,这里就是 String:
	public class Observable<Element> : ObservableType {
	    /// Type of elements in sequence.
	    public typealias E = Element
    • observer 内部调用的外部(应用层)实现的闭包,由此看出所有信号是由此发出,event 是 observer 的参数,不难看出,observer 闭包也是在其他地方调用,传入带有信号值的 event 参数。
    • observer 的继承链关系如下:

  • observer 被当做参数传入到 subscribe 中,而 observer 的调用必然是在 subscribe 中实现:
	self.asObservable().subscribe(observer)
  • self.asObservable() 该方法返回本身,保证协议的一致性,方法如下:
	public class Observable<Element> : ObservableType {
	    // 省去代码若干
	    ......
	    public func asObservable() -> Observable<E> {
	        return self
	    }
	}
  • self.asObservable().subscribe(observer) 其实本质就是 self.subscribe(observer),继续断点执行找到 subscribe 方法,通过可观察序列的继承关系,可以非常快速的定位 Producer 订阅代码,正是上面所提到的 Producer 中的方法,方法如下:
	override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
	    // 省去代码若干
	    ......
	    return CurrentThreadScheduler.instance.schedule(()) { _ in
	            let disposer = SinkDisposer()
	            let sinkAndSubscription = self.run(observer, cancel: disposer)
	            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
	
	            return disposer
	        }
	}
  • 接着,observer 观察者被传入到 run 中,上面说到该观察者一定会被调用,继续深入:
	let sinkAndSubscription = self.run(observer, cancel: disposer)
  • 继续断点执行,发现 self.run 的调用,调用的是 AnonymousObservable 中的 run 方法,代码如下:
	final private class AnonymousObservable<Element>: Producer<Element> {
	    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
	
	    let _subscribeHandler: SubscribeHandler
	
	    init(_ subscribeHandler: @escaping SubscribeHandler) {
	        self._subscribeHandler = subscribeHandler
	    }
	
	    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
	        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
	        let subscription = sink.run(self)
	        return (sink: sink, subscription: subscription)
	    }
	}
  • 此处就是创建序列时的 AnonymousObservable 类,在 run 方法类创建了 sink 对象,在初始化时传入了上面所说的观察者,记住 sink 保存了观察者 observer 闭包,并且调用了 sink.run(self) 方法,传入的是创建时产生的可观察序列 observable 闭包对象,深入 run:
	final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
	    typealias E = O.E
	    typealias Parent = AnonymousObservable<E>
	    // 省去代码若干
	    // 此处向父类Sink初始化了observer对象
	    override init(observer: O, cancel: Cancelable) {
	        super.init(observer: observer, cancel: cancel)
	    }
	    func run(_ parent: Parent) -> Disposable {
	        return parent._subscribeHandler(AnyObserver(self))
	    }
	}
  • 此处 parent 由 let subscription = sink.run(self) 传入,self 即为创建序列 create 方法返回的 observable 对象,而 _subscribeHandler 是创建序列所保存的闭包,此时闭包就被调用了,被调用闭包如下:
    let obs = Observable<Any>.create { (observer) -> Disposable in
    // 发送消息
    observer.onNext("我是一条消息")
    return Disposables.create()
	}
  • 发送信号的闭包被调用,接下来就是信号发送。订阅的内部流程如下:

③ 发送信号
  • 继续探索,代码已经执行到业务层,在信号发送闭包中通常调用一下三种方法,用来发送信号,如下:
    • observer.onNext(“发送消息”) 信号发送;
    • observer.onCompleted() 序列完成,完成后序列将被释放;
    • observer.onError(error) 序列出错中断,序列不可继续使用,被释放。
  • 以上三个方法为 ObserverType 的拓展方法(E 表示一个泛型信号量,可表示任意类型的信号):
	extension ObserverType {
	    
	    /// Convenience method equivalent to `on(.next(element: E))`
	    ///
	    /// - parameter element: Next element to send to observer(s)
	    public func onNext(_ element: E) {
	        self.on(.next(element))
	    }
	    
	    /// Convenience method equivalent to `on(.completed)`
	    public func onCompleted() {
	        self.on(.completed)
	    }
	    
	    /// Convenience method equivalent to `on(.error(Swift.Error))`
	    /// - parameter error: Swift.Error to send to observer(s)
	    public func onError(_ error: Swift.Error) {
	        self.on(.error(error))
	    }
	}
  • .next(element) 是一个带泛型参数的枚举,管理了三种类型事件的消息传递,如下:
	public enum Event<Element> {
	    /// Next element is produced.
	    case next(Element)
	
	    /// Sequence terminated with an error.
	    case error(Swift.Error)
	
	    /// Sequence completed successfully.
	    case completed
	}
  • on 是 AnonymousObservableSink 中的方法,代码如下:
	final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
	    typealias E = O.E
	    typealias Parent = AnonymousObservable<E>
	    // 代码省略若干行
	    func on(_ event: Event<E>) {
	        #if DEBUG
	            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
	            defer { self._synchronizationTracker.unregister() }
	        #endif
	        switch event {
	        case .next:
	            if load(&self._isStopped) == 1 {
	                return
	            }
	            self.forwardOn(event)
	        case .error, .completed:
	            if fetchOr(&self._isStopped, 1) == 0 {
	                self.forwardOn(event)
	                self.dispose()
	            }
	        }
	    }
	}
  • 内部根据 Event 枚举不同成员变量做不同的信号发送,信号发送调用了 forwardOn 方法。方法实现如下:
	class Sink<O : ObserverType> : Disposable {
	    init(observer: O, cancel: Cancelable) {
	        self._observer = observer
	        self._cancel = cancel
	    }
	
	    final func forwardOn(_ event: Event<O.E>) {
	        if isFlagSet(&self._disposed, 1) {
	            return
	        }
	        self._observer.on(event)
	    }
	}
  • 代码有些长只保留了核心部分,Sink 即 AnonymousObservableSink 的父类,_observer 即是订阅中在内部产生的 AnonymousObserver 对象,而该对象调用了 on 方法并传递了信号,on 方法所在位置如下:AnonymousObserver -> ObserverBase -> on()
	class ObserverBase<ElementType> : Disposable, ObserverType {
	    typealias E = ElementType
	    func on(_ event: Event<E>) {
	        switch event {
	        case .next:
	            if load(&self._isStopped) == 0 {
	                self.onCore(event)
	            }
	        case .error, .completed:
	            if fetchOr(&self._isStopped, 1) == 0 {
	                self.onCore(event)
	            }
	        }
	    }
	}
  • 在方法内部又掉用了 self.onCore(event),此时该方法在 AnonymousObserver 中实现,代码如下:
	final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
	    typealias Element = ElementType
	    typealias EventHandler = (Event<Element>) -> Void
	    private let _eventHandler : EventHandler
	    init(_ eventHandler: @escaping EventHandler) {
	        self._eventHandler = eventHandler
	    }
	
	    override func onCore(_ event: Event<Element>) {
	        return self._eventHandler(event)
	    }
	}
  • 说明:
    • 此处通过 _eventHandler 来发送信号,_eventHandler 是从哪来的呢?逆推 onCore 调用者是 observer,而 observer 是订阅时在内部创建的,被一层层传入到此;
    • 而在 observer 初始化时即被保存为 _eventHandler,_eventHandler 调用即调用了订阅时创建的 observer 闭包,进而信号又通过闭包内的闭包传出到业务层。
	// 订阅序列
	obs.subscribe(onNext: { (val) in
	    print("onNext:\\(val)")
	}).disposed(by: disposeBag)
  • 发送信息的内部流程如下:

  • 响应式编程的创建、订阅、发送、接收等流程就已完成,整个流程会觉得很复杂,但它统一了所有事件的创建与监听,统一思想快速开发,今后的开发流程就是:
	创建序列 -> 订阅序列 -> 发送序列 -> 响应序列
  • sink 在 Rx 中充当管理者,管理序列,观察者和销毁者,将序列发送至观察者,并管理销毁者适时消耗序列,回收资源。

三、总结

  • 分析思维导图:

  • 核心逻辑流程图:

以上是关于Elasticsearch的底层模块深入解析之node的主要内容,如果未能解决你的问题,请参考以下文章

iOS之深入解析分类Category的底层原理

iOS之深入解析通知NSNotification的底层原理

iOS之深入解析渲染的底层原理

RxSwift之深入解析核心逻辑Observable的底层原理

iOS之深入解析YYModel的底层原理

iOS之深入解析malloc的底层原理