Eurake源码分析(十一) 增量获取

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Eurake源码分析(十一) 增量获取相关的知识,希望对你有一定的参考价值。

参考技术A 下面我们来说一下eureka的增量获取。
Applications.appsHashCode ,应用集合一致性哈希码。

增量获取注册的应用集合( Applications ) 时,Eureka-Client 会获取到:

Eureka-Server 近期变化( 注册、下线 )的应用集合
Eureka-Server 应用集合一致性哈希码
Eureka-Client 将变化的应用集合和本地缓存的应用集合进行合并后进行计算本地的应用集合一致性哈希码。若两个哈希码相等,意味着增量获取成功;若不相等,意味着增量获取失败,Eureka-Client 重新和 Eureka-Server 全量获取应用集合。
计算公式
appsHashCode = status+count

使用每个应用实例状态( status ) + 数量( count )拼接出一致性哈希码。若数量为 0 ,该应用实例状态不进行拼接。状态以字符串大小排序。

举个例子,8 个 UP ,0 个 DOWN ,则 appsHashCode = UP_8_ 。8 个 UP ,2 个 DOWN ,则 appsHashCode = DOWN_2_UP_8_ 。
看下Applications的getReconcileHashCode方法

调用 populateInstanceCountMap方法,计算每个应用实例状态的数量,看下具体的实现

调用 getReconcileHashCode方法,计算 hashcode,看下具体的实现

调用 DiscoveryClient的getAndUpdateDelta方法,增量获取注册信息,并刷新本地缓存,看下具体的实现

调用 updateDelta方法,将变化的应用集合和本地缓存的应用集合进行合并,看下具体的实现

ApplicationsResource,处理所有应用的请求操作的 Resource ( Controller )。

接收增量获取请求,映射 ApplicationsResource#getContainers() 方法。
AbstractInstanceRegistry.recentlyChangedQueue,最近租约变更记录队列。看下具体的实现

当应用实例注册、下线、状态变更时,创建最近租约变更记录( RecentlyChangedItem ) 到队列。

后台任务定时顺序扫描队列,当 lastUpdateTime 超过一定时长后进行移除。

配置 eureka.deltaRetentionTimerIntervalInMs, 移除队列里过期的租约变更记录的定时任务执行频率,单位:毫秒。默认值 :30 * 1000 毫秒。
配置 eureka.retentionTimeInMSInDeltaQueue,租约变更记录过期时长,单位:毫秒。默认值 : 3 * 60 * 1000 毫秒。
在 generatePayload方法里,调用 AbstractInstanceRegistry的getApplicationDeltas方法,获取近期变化的应用集合,看下具体的实现

eureka的增量获取过程就完成了。

介绍开源的.net通信框架NetworkComms框架 源码分析(二十一 )TCPConnectionListener

原文网址: http://www.cnblogs.com/csdev

Networkcomms 是一款C# 语言编写的TCP/UDP通信框架  作者是英国人  以前是收费的 目前作者已经开源  许可是:Apache License v2

开源地址是:https://github.com/MarcFletcher/NetworkComms.Net

TCP连接监听器

 /// <summary>
    /// A TCP connection listener
    /// TCP连接监听器
    /// </summary>
    public class TCPConnectionListener : ConnectionListenerBase
    {
#if WINDOWS_PHONE || NETFX_CORE
        /// <summary>
        /// The equivalent TCPListener class in windows phone
        /// WP系统中与TCPListener类相对应的类
        /// </summary>
        StreamSocketListener listenerInstance;
#else
        /// <summary>
        /// The .net TCPListener class.
        /// .net中TCPListener类
        /// </summary>
        TcpListener listenerInstance;

        /// <summary>
        /// SSL options that are associated with this listener
        /// 监听器相对应的SSL选项
        /// </summary>
        public SSLOptions SSLOptions { get; private set; }
#endif

        /// <summary>
        /// Create a new instance of a TCP listener
        /// 创建一个新的TCP监听器实例
        /// </summary>
        /// <param name="sendReceiveOptions">The SendReceiveOptions to use with incoming data on this listener  此监听器上接收进入的数据所使用的收发参数 </param>
        /// <param name="applicationLayerProtocol">If enabled NetworkComms.Net uses a custom 
        /// application layer protocol to provide useful features such as inline serialisation, 
        /// transparent packet transmission, remote peer handshake and information etc. We strongly 
        /// recommend you enable the NetworkComms.Net application layer protocol.</param>
        /// applicationLayerProtocol 应用层协议状态  我们强烈建议您启用此项,使得networkcomms能为您提供内部序列化,透明数据包传送,远程端点握手等功能。
        /// 只有与其他语言进行通信时,此项才设置为禁用
        /// <param name="allowDiscoverable">Determines if the newly created <see cref="ConnectionListenerBase"/> will be discoverable if <see cref="Tools.PeerDiscovery"/> is enabled.</param>
        /// allowDiscoverable  是否允许被发现  此项与networkcomms中一项端点自动扫描功能有关
        public TCPConnectionListener(SendReceiveOptions sendReceiveOptions,
            ApplicationLayerProtocolStatus applicationLayerProtocol, bool allowDiscoverable = false)
            :base(ConnectionType.TCP, sendReceiveOptions, applicationLayerProtocol, allowDiscoverable)
        {
#if !WINDOWS_PHONE && !NETFX_CORE
            SSLOptions = new SSLOptions();
#endif
        }

#if !WINDOWS_PHONE && !NETFX_CORE
        /// <summary>
        /// Create a new instance of a TCP listener
        /// 创建一个新的TCP监听器实例
        /// </summary>
        /// <param name="sendReceiveOptions">The SendReceiveOptions to use with incoming data on this listener  此监听器上接收进入的数据所使用的收发参数 </param>
        /// <param name="applicationLayerProtocol">If enabled NetworkComms.Net uses a custom 
        /// application layer protocol to provide useful features such as inline serialisation, 
        /// transparent packet transmission, remote peer handshake and information etc. We strongly 
        /// recommend you enable the NetworkComms.Net application layer protocol.</param>
        /// applicationLayerProtocol 应用层协议状态  我们强烈建议您启用此项,使得networkcomms能为您提供内部序列化,透明数据包传送,远程端点握手等功能。
        /// 只有与其他语言进行通信时,此项才设置为禁用
        /// <param name="allowDiscoverable">Determines if the newly created <see cref="ConnectionListenerBase"/> will be discoverable if <see cref="Tools.PeerDiscovery"/> is enabled.</param>
        /// allowDiscoverable  是否允许被发现  此项与networkcomms中一项端点自动扫描功能有关
        public TCPConnectionListener(SendReceiveOptions sendReceiveOptions,
            ApplicationLayerProtocolStatus applicationLayerProtocol, SSLOptions sslOptions, bool allowDiscoverable = false)
            : base(ConnectionType.TCP, sendReceiveOptions, applicationLayerProtocol, allowDiscoverable)
        {
            this.SSLOptions = sslOptions;
        }
#endif

        /// <inheritdoc />
        internal override void StartListening(EndPoint desiredLocalListenEndPoint, bool useRandomPortFailOver)
        {
            if (desiredLocalListenEndPoint.GetType() != typeof(IPEndPoint)) throw new ArgumentException("Invalid desiredLocalListenEndPoint type provided.", "desiredLocalListenEndPoint");
            if (IsListening) throw new InvalidOperationException("Attempted to call StartListening when already listening.");

            IPEndPoint desiredLocalListenIPEndPoint = (IPEndPoint)desiredLocalListenEndPoint;

            try
            {
#if WINDOWS_PHONE || NETFX_CORE
                listenerInstance = new StreamSocketListener();
                listenerInstance.ConnectionReceived += newListenerInstance_ConnectionReceived;
                listenerInstance.BindEndpointAsync(new Windows.Networking.HostName(desiredLocalListenIPEndPoint.Address.ToString()), desiredLocalListenIPEndPoint.Port.ToString()).AsTask().Wait();
#else
                listenerInstance = new TcpListener(desiredLocalListenIPEndPoint);
                listenerInstance.Start();
                listenerInstance.BeginAcceptTcpClient(TCPConnectionReceivedAsync, null);
#endif
            }
            catch (SocketException)
            {
                //If the port we wanted is not available
                //如果我们希望监听的端口已经被占用  useRandomPortFailOver(是否随机指定一个新端口)
                if (useRandomPortFailOver)
                {
                    try
                    {
#if WINDOWS_PHONE || NETFX_CORE
                        listenerInstance.BindEndpointAsync(new Windows.Networking.HostName(desiredLocalListenIPEndPoint.Address.ToString()), "").AsTask().Wait(); 
#else
                        listenerInstance = new TcpListener(desiredLocalListenIPEndPoint.Address, 0);
                        listenerInstance.Start();
                        listenerInstance.BeginAcceptTcpClient(TCPConnectionReceivedAsync, null);
#endif
                    }
                    catch (SocketException)
                    {
                        //If we get another socket exception this appears to be a bad IP. We will just ignore this IP
                        //如果我们重新指定端口,还是没有监听成功,则可能是IP地址的问题
                        if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Error("It was not possible to open a random port on " + desiredLocalListenIPEndPoint.Address + ". This endPoint may not support listening or possibly try again using a different port.");
                        throw new CommsSetupShutdownException("It was not possible to open a random port on " + desiredLocalListenIPEndPoint.Address + ". This endPoint may not support listening or possibly try again using a different port.");
                    }
                }
                else
                {
                    if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Error("It was not possible to open port #" + desiredLocalListenIPEndPoint.Port.ToString() + " on " + desiredLocalListenIPEndPoint.Address + ". This endPoint may not support listening or possibly try again using a different port.");
                    throw new CommsSetupShutdownException("It was not possible to open port #" + desiredLocalListenIPEndPoint.Port.ToString() + " on " + desiredLocalListenIPEndPoint.Address + ". This endPoint may not support listening or possibly try again using a different port.");
                }
            }

#if WINDOWS_PHONE || NETFX_CORE
            this.LocalListenEndPoint = new IPEndPoint(desiredLocalListenIPEndPoint.Address, int.Parse(listenerInstance.Information.LocalPort));  
#else
            this.LocalListenEndPoint = (IPEndPoint)listenerInstance.LocalEndpoint;
#endif
            this.IsListening = true;
        }

        /// <inheritdoc />
        internal override void StopListening()
        {
            IsListening = false;

            try
            {
#if WINDOWS_PHONE || NETFX_CORE
                listenerInstance.Dispose();
#else
                listenerInstance.Stop();
#endif
            }
            catch (Exception) { }
        }

#if WINDOWS_PHONE || NETFX_CORE
        private void newListenerInstance_ConnectionReceived(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args)
        {
            try
            {
                IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Parse(args.Socket.Information.LocalAddress.DisplayName.ToString()), int.Parse(args.Socket.Information.LocalPort));
                IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Parse(args.Socket.Information.RemoteAddress.DisplayName.ToString()), int.Parse(args.Socket.Information.RemotePort));

                ConnectionInfo newConnectionInfo = new ConnectionInfo(ConnectionType.TCP, remoteEndPoint, localEndPoint, ApplicationLayerProtocol, this);
                TCPConnection.GetConnection(newConnectionInfo, NetworkComms.DefaultSendReceiveOptions, args.Socket, true);
            }
            catch (ConfirmationTimeoutException)
            {
                //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
            }
            catch (CommunicationException)
            {
                //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
            }
            catch (ConnectionSetupException)
            {
                //If we are the server end and we did not pick the incoming connection up then tooo bad!
            }
            catch (SocketException)
            {
                //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
            }
            catch (Exception ex)
            {
                //For some odd reason SocketExceptions don‘t always get caught above, so another check
                if (ex.GetBaseException().GetType() != typeof(SocketException))
                {
                    //Can we catch the socketException by looking at the string error text?
                    if (ex.ToString().StartsWith("System.Net.Sockets.SocketException"))
                        LogTools.LogException(ex, "ConnectionSetupError_SE");
                    else
                        LogTools.LogException(ex, "ConnectionSetupError");
                }
            }
        }
#else
        /// <summary>
        /// Async method for handling up new incoming TCP connections
        /// 处理新的TCP连接的异步方法
        /// </summary>
        private void TCPConnectionReceivedAsync(IAsyncResult ar)
        {
            if (!IsListening)
                return;

            try
            {
                TcpClient newTCPClient = listenerInstance.EndAcceptTcpClient(ar);
                ConnectionInfo newConnectionInfo = new ConnectionInfo(ConnectionType.TCP, (IPEndPoint)newTCPClient.Client.RemoteEndPoint, (IPEndPoint)newTCPClient.Client.LocalEndPoint, ApplicationLayerProtocol, this);

                if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Info("New incoming TCP connection from " + newConnectionInfo);

                //We have to use our own thread pool here as the performance of the .Net one is awful
                //此处我们使用了networkcomms自带的自定义线程池  其性能超越了.net自带的线程池
                NetworkComms.IncomingConnectionEstablishThreadPool.EnqueueItem(QueueItemPriority.Normal, new WaitCallback((obj) =>
                {
                    #region Pickup The New Connection
                    try
                    {
                        TCPConnection.GetConnection(newConnectionInfo, ListenerDefaultSendReceiveOptions, newTCPClient, true, SSLOptions);
                    }
                    catch (ConfirmationTimeoutException)
                    {
                        //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
                        //如果此处抛出异常  一般为客户端创建连接后立即关闭了连接
                    }
                    catch (CommunicationException)
                    {
                        //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
                        // //如果此处抛出异常  一般为客户端创建连接后立即关闭了连接
                    }
                    catch (ConnectionSetupException)
                    {
                        //If we are the server end and we did not pick the incoming connection up then tooo bad!
                        //如果我们是服务器端 我们没有接收到进入的连接
                    }
                    catch (SocketException)
                    {
                        //If this exception gets thrown its generally just a client closing a connection almost immediately after creation
                        // //如果此处抛出异常  一般为客户端创建连接后立即关闭了连接
                    }
                    catch (Exception ex)
                    {
                        //For some odd reason SocketExceptions don‘t always get caught above, so another check
                        //由于一些未知的原因  上面的代码不能捕捉到所有的异常  所以我们还要尽行一些检测
                        if (ex.GetBaseException().GetType() != typeof(SocketException))
                        {
                            //Can we catch the socketException by looking at the string error text?
                            //通过检测字符型错误信息来捕捉 socket异常
                            if (ex.ToString().StartsWith("System.Net.Sockets.SocketException"))
                                LogTools.LogException(ex, "ConnectionSetupError_SE");
                            else
                                LogTools.LogException(ex, "ConnectionSetupError");
                        }
                    }
                    #endregion
                }), null);
            }
            catch (SocketException)
            {
                //如果此处抛出异常  一般为客户端创建连接后立即关闭了连接
            }
            catch (Exception ex)
            {
                //For some odd reason SocketExceptions don‘t always get caught above, so another check
                //由于一些未知的原因  上面的代码不能捕捉到所有的异常   所以我们还要尽行一些检测
                if (ex.GetBaseException().GetType() != typeof(SocketException))
                {
                    //Can we catch the socketException by looking at the string error text?
                    if (ex.ToString().StartsWith("System.Net.Sockets.SocketException"))
                        LogTools.LogException(ex, "ConnectionSetupError_SE");
                    else
                        LogTools.LogException(ex, "ConnectionSetupError");
                }
            }
            finally
            {
                listenerInstance.BeginAcceptTcpClient(TCPConnectionReceivedAsync, null);
            }
        }
#endif
    }

 

以上是关于Eurake源码分析(十一) 增量获取的主要内容,如果未能解决你的问题,请参考以下文章

motan源码分析十一:部分特性

ChunJun-JDBC轮询增量更新-源码分析

k8s client-go源码分析 informer源码分析-概要分析

《SLAM机器人基础教程》第十一章 ROS Navigation源码解析(章节目录)

第十一篇:Spark SQL 源码分析之 External DataSource外部数据源

第十一篇:转载-mysql源码分析书籍_MySQL8的代码分析方法