由于WCF的机制,连接池会在连接建立一定时间后超时,即使设置了超时时间非常长,也可能被服务端系统主动回收。之前做项目时碰到了这个问题,所以项目上考虑采用长连接,自动管理连接池,当连接超时后,自动重建,保持会话,这样在业务层就不需要再去处理连接超时的问题。具体的思路是,在程序启动时,先将需要使用长连接的连接放到长连接容器中,并设置连接的最大数量,在使用时,轮询使用连接,当使用时捕获到异常时,自动切换到下一个连接,并重建上一个连接。代码如下:
AliveConnection类功能是保持连接,具体执行WCF调用,在检测到连接不可用时进行重建。
class AliveConnection<T> where T : System.ServiceModel.ICommunicationObject, new() { private string _endpointConfigName = string.Empty; private T _instance = default(T); /// <summary> /// 正在执行其他过程时,设置为正忙。执行完成后闲置 /// 连接出错后,正在重新连接创建时设置为正忙,解除正忙状态有俩种情况: /// 1.第一次重建连接成功后; /// 2.在线程中重试成功后; /// </summary> public bool IsBusy { get; set; } internal AliveConnection(string endpointConfigName) { if (string.IsNullOrEmpty(endpointConfigName.Trim())) throw new ArgumentException("终结点不能配置为空。"); _endpointConfigName = endpointConfigName; //_instance = CreateConnection(); } internal bool Execute(Action<T> expression) { try { Open(); expression(_instance); return true; } catch (System.ServiceModel.CommunicationException e) { return false; } } internal bool Execute<TResult>(Func<T, TResult> expression,out TResult result) { result = default(TResult); try { Open(); result = expression(_instance); return true; } catch (System.ServiceModel.CommunicationException e) { return false; } } private void Open() { if (_instance == null)//使用时才创建 { _instance = CreateConnection(); _instance.Faulted += Faulted; } if (_instance.State != System.ServiceModel.CommunicationState.Opened) _instance.Open(); } private void Faulted(object sender, EventArgs e) { lock (_instance) { IsBusy = true; //失败后锁住并重新建立连接 _instance = CreateConnection(); } } private T CreateConnection() { try { var instance = (T)Activator.CreateInstance(typeof(T), _endpointConfigName); IsBusy = false; return instance; } catch (Exception e) { IsBusy = true; RetryWhenFailed(); throw new Exception("创建连接失败,请检测终结点配置和服务。", e); } } //创建一个线程来不间断重试创建连接 private void RetryWhenFailed() { int retryTimes = 0; Task.Factory.StartNew(() => { while (true) { //如果抛出异常,表示创建失败,继续重试,如果睡眠时间大于60秒,则睡眠时间不再增加 try { retryTimes++; _instance = (T)Activator.CreateInstance(typeof(T), _endpointConfigName); IsBusy = false; break; } catch { int sleepMillionSeconds = retryTimes * 5 * 1000; sleepMillionSeconds = sleepMillionSeconds > 60 * 1000 ? 60 * 1000 : sleepMillionSeconds; System.Threading.Thread.Sleep(sleepMillionSeconds); continue; } } }); } }
另外我们需要一个类,来做连接的初始化,并做轮询,并且暴露执行的函数。
/// <summary> /// WCF长连接容器 /// </summary> /// <typeparam name="T">待创建的WCF服务类型</typeparam> public class AliveConnectionContainer<T> where T : System.ServiceModel.ICommunicationObject, new() { #region fields private List<AliveConnection<T>> _connections = null;//所有连接 private int _currentConnectionIndex = 0;//当前使用的连接的索引 #endregion #region Octor /// <summary> /// 通过终结点配置名称,创建长连接。如果终结点数不等于连接数,则轮询跟节点配置列表,最终创建达到连接数的连接。 /// </summary> /// <param name="maxConnection">需要创建的长连接数</param> /// <param name="endpointConfigNames">所有的终结点配置名称,对应配置节点里bind的name</param> /// <exception>如果终结点配置为空,则抛出异常,如果创建失败,也会抛出异常</exception> public AliveConnectionContainer(int maxConnection, params string[] endpointConfigNames) { _connections = new List<AliveConnection<T>>(maxConnection); int tmpIndex = 0; for (int index = 0; index < maxConnection; index++) { if (tmpIndex >= endpointConfigNames.Count()) tmpIndex = 0; _connections.Add(new AliveConnection<T>(endpointConfigNames[tmpIndex])); } } #endregion #region public method /// <summary> /// 执行服务调用,会一直轮询执行直到执行成功 /// </summary> /// <param name="expression">需要调用的处理方法</param> public void Execute(Action<T> expression) { Func<bool> executeExpression = () => Instance.Execute(expression); Execute(executeExpression); } /// <summary> /// 执行服务调用,会一直轮询执行直到执行成功 /// </summary> /// <param name="expression">需要调用的处理方法</param> public TResult Execute<TResult>(Func<T, TResult> expression) { TResult result = default(TResult); Func<bool> executeExpression = () => Instance.Execute(expression,out result); Execute(executeExpression); return result; } private void Execute(Func<bool> expression) { bool success = false; int failedCount = 0; try { while (true) { success = expression(); if (!success) failedCount++; else break; if (failedCount >= _connections.Count) throw new Exception("没有可用的服务,请检测服务运行状态。"); } } catch (Exception e) { throw new Exception("执行WCF服务调用失败。", e); } } #endregion #region private method private AliveConnection<T> Instance { get { if (_connections == null || _connections.Count == 0) throw new Exception("没有可用的连接,请先设置连接。"); AliveConnection<T> result; while (!(result = GetInstance()).IsBusy) break;//轮询直到找到空闲的连接 return result; } } private AliveConnection<T> GetInstance() { if (_currentConnectionIndex >= _connections.Count) _currentConnectionIndex = 0; return _connections[_currentConnectionIndex++]; } #endregion }
使用静态类,做全局的WCF连接注册和检索使用
/// <summary> /// 长连接服务的管理类 /// </summary> /// <example> /// AliveConnectionManager.Register<Service>(endpoints,10); /// var client = AliveConnectionManager.Resolve<Service>(); /// List<Movie> movies; /// client.Execute(service => movies = service.GetMovies()); /// </example> public static class AliveConnectionManager { private static Dictionary<Type, object> _container = new Dictionary<Type, object>(); /// <summary> /// 根据输入的终结点列表,在app.config文件中查找对应的终结点,并建立连接 /// </summary> /// <typeparam name="T">要注册的WCF的服务类型</typeparam> /// <param name="maxConnection">连接数</param> /// <param name="endpointConfigNames">配置的终结点列表</param> public static void Register<T>(int maxConnection, params string[] endpointConfigNames) where T : System.ServiceModel.ICommunicationObject, new() { if (_container.ContainsKey(typeof(T))) throw new ArgumentException(string.Format("容器中已经添加过{0}的长连接服务。", typeof(T))); _container.Add(typeof(T), new AliveConnectionContainer<T>(maxConnection, endpointConfigNames)); } /// <summary> /// 获取类型为T的长连接服务 /// </summary> /// <typeparam name="T">待获取的WCF的服务类型</typeparam> /// <returns>对应类型为T的服务</returns> public static AliveConnectionContainer<T> Resolve<T>() where T : System.ServiceModel.ICommunicationObject, new() { Type type = typeof(T); if (!_container.ContainsKey(type)) throw new ArgumentException(string.Format("没有找到类型为{0}的长连接服务。", type)); return _container[type] as AliveConnectionContainer<T>; } }
服务注册代码
AliveConnectionManager.Register<Service>(endpoints,10);
调用服务
var client = AliveConnectionManager.Resolve<Service>(); List<Movie> movies; client.Execute(service => movies = service.GetMovies());
Service即我们在客户端引入的WCF服务