SignalR注册成Windows后台服务,并实现web前端断线重连
Posted lee576
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SignalR注册成Windows后台服务,并实现web前端断线重连相关的知识,希望对你有一定的参考价值。
注意下文里面的 SignalR 不是 Core 版本,而是 Framework 下的
本文使用的方式是把 SignalR 写在控制台项目里,再用 Topshelf 注册成 Windows 服务
这样做有两点好处
- 传统 Window 服务项目调试时需要“附加到进程”,开发体验比较差,影响效率
- 使用控制台不仅可以随时打断点调试,还可以随时打印调试信息,非常方便
Topshelf 的使用方法这里不再阐述,在控制台里使用 Topshelf 三个步骤 :
- 定义一个 Owin 自托管作为 SignalR的宿主,里面设置允许跨域,起名为 Startup
using Microsoft.AspNet.SignalR; using Microsoft.Owin.Cors; using Owin; using System; using System.Diagnostics; namespace HenryMes.SignalR.Hosting /// <summary> /// 配置跨域请求、SignalR Server /// </summary> class Startup public void Configuration(IAppBuilder app) //app.UseErrorPage(); app.UseCors(CorsOptions.AllowAll); // 有关如何配置应用程序的详细信息,请访问 http://go.microsoft.com/fwlink/?LinkID=316888 //Hub Mode app.MapSignalR("/lcc", new HubConfiguration()); app.Map("/signalr", map => var config = new HubConfiguration // You can enable JSONP by uncommenting this line // JSONP requests are insecure but some older browsers (and some // versions of IE) require JSONP to work cross domain EnableJSONP = true ; //config.EnableCrossDomain = true; // Turns cors support on allowing everything // In real applications, the origins should be locked down map.UseCors(CorsOptions.AllowAll) .RunSignalR(config); ); Make long polling connections wait a maximum of 110 seconds for a response. When that time expires, trigger a timeout command and make the client reconnect. //GlobalHost.Configuration.ConnectionTimeout = TimeSpan.FromSeconds(110); Wait a maximum of 30 seconds after a transport connection is lost before raising the Disconnected event to terminate the SignalR connection. //GlobalHost.Configuration.DisconnectTimeout = TimeSpan.FromSeconds(30); For transports other than long polling, send a keepalive packet every 10 seconds. This value must be no more than 1/3 of the DisconnectTimeout value. //GlobalHost.Configuration.KeepAlive = TimeSpan.FromSeconds(10); // Turn tracing on programmatically GlobalHost.TraceManager.Switch.Level = SourceLevels.Information;
- 建立可在服务里运行的服务类,使用了上面的Startup配置实例化宿主对象,里面定义了服务的启动,暂停,关闭等触发时的一些动作,本文就建立一个 JobManager 类来完成这些工作
using HenryMes.Utils; using Microsoft.Owin.Hosting; using System; using System.Threading.Tasks; namespace HenryMes.SignalR.Hosting public class JobManager private const string displayName = "SignalR 状态监控"; IDisposable SignalR get; set; public bool Start() try //signalr server地址,端口可以更换,确保不被占用,否则服务启动不了 #if DEBUG var url = $"http://JsonConfig.Instance.Root()?.Debug?.Ip:JsonConfig.Instance.Root()?.Debug?.Port"; var Port = $"JsonConfig.Instance.Root()?.Debug?.Port"; #else var url = $"http://JsonConfig.Instance.Root()?.Release?.Ip:JsonConfig.Instance.Root()?.Release?.Port"; var Port = $"JsonConfig.Instance.Root()?.Release?.Port"; #endif StartOptions options = new StartOptions(); options.Urls.Add(url); options.Urls.Add($"http://+:Port"); //此处需要用一个全局变量来保存WebApp,否则在发布为后台服务的时候生命周期会提前结束,被系统回收掉 SignalR = WebApp.Start<Startup>(options); Task.Delay(TimeSpan.FromSeconds(1)).Wait(); Console.WriteLine("Server running on 0", url); Console.WriteLine($"displayName服务开始"); Console.ReadLine(); LogHelper.GetInstance().Information($"displayName服务开始,地址 url"); return true; catch (Exception ex) LogHelper.GetInstance().Error(ex); return false; public bool Stop() SignalR.Dispose(); LogHelper.GetInstance().Information($"displayName服务停止"); System.Threading.Thread.Sleep(1500); return true; public bool Shutdown() SignalR.Dispose(); LogHelper.GetInstance().Information($"displayName服务停止"); System.Threading.Thread.Sleep(1500); return true;
- 在 Program.cs 文件,也就是入口函数 main 调用 Topshelf 对服务进行配置
using Topshelf;
namespace HenryMes.SignalR.Hosting
internal class Program
private const string displayName = "HenryMes.SignalR.Hosting";
static void Main(string[] args)
HostFactory.Run(x =>
x.Service<JobManager>(s =>
s.ConstructUsing(name => new JobManager());
s.WhenStarted(tc => tc.Start());
s.WhenShutdown(tc => tc.Shutdown());
s.WhenStopped(tc => tc.Stop());
);
x.RunAsLocalSystem();
x.StartAutomatically();
x.SetDescription(displayName);
x.SetDisplayName(displayName);
x.SetServiceName(displayName);
);
下面定义一个 SignalR 的 Hub 基类,里面管理了SignalR 的连接和断开,一个线程管理一个连接,连接断开,线程自动取消,建立一个抽象类 BaseHub
using HenryMes.Utils;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace HenryMes.SignalR.Hosting
/// <summary>
///
/// </summary>
[HubName(nameof(T))]
public abstract class BaseHub<T> : Hub where T : IHub
/// <summary>
/// 线程安全版本的字典,管理SignalR的连接
/// </summary>
protected static readonly ConcurrentDictionary<string, CancellationTokenSource> Connections =
new ConcurrentDictionary<string, CancellationTokenSource>();
/// <summary>
/// 异步锁
/// </summary>
public static readonly object AsyncObj = new object();
/// <summary>
/// 设置超时时间
/// </summary>
abstract protected int millisecondsTimeout get;
/// <summary>
/// 设置线程轮询时间
/// </summary>
abstract protected int intervalTime get;
/// <summary>
/// 跑单个Task
/// </summary>
abstract protected Func<object> RunMethod get;
/// <summary>
/// 跑多个Task, 返回是否超时
/// </summary>
abstract protected Func<CancellationTokenSource, (bool, object)> RunMultiTaskMethod get;
/// <summary>
/// 是否跑多任务
/// </summary>
abstract protected bool runMultiTask get;
//当客户端与服务器建立连接后执行的方法
public override Task OnConnected()
//获取客户端ID
Console.WriteLine("0已连接", Context.ConnectionId);
LogHelper.GetInstance().Information($"服务端与客户端:【typeof(T).Name】Context.ConnectionId 成功建立连接!");
return base.OnConnected();
public override Task OnReconnected()
Console.WriteLine("0已重连", Context.ConnectionId);
LogHelper.GetInstance().Information($"服务端与客户端:【typeof(T).Name】Context.ConnectionId 已重连!");
Send(Context.ConnectionId);
return base.OnReconnected();
/// <summary>
/// 所有任务执行完是否超时
/// </summary>
/// <param name="tokenSource"></param>
/// <param name="allTasks"></param>
/// <returns></returns>
public bool IsCompletedAllTasks(CancellationTokenSource tokenSource, Task[] allTasks)
try
return Task.WaitAll(allTasks, millisecondsTimeout, tokenSource.Token);
catch (AggregateException ex)
LogHelper.GetInstance().Error($"系统错误:this.GetType().Name,ex.Flatten().InnerException.Message");
tokenSource.Cancel();
return false;
/// <summary>
/// 向客户端发送消息
/// </summary>
/// <param name="connectId"></param>
public void Send(string connectId)
lock (AsyncObj)
var tokenSource = new CancellationTokenSource();
Connections.TryAdd(connectId, tokenSource);
Task.Run(() =>
while (!tokenSource.Token.IsCancellationRequested)
try
// 是否是多任务
if (runMultiTask == false)
var result = RunMethod();
var message = $"【typeof(T).Name】 connectId 正在回传数据!";
LogHelper.GetInstance().Information(message);
// 把组装好的数据推送到前端
BaseNotifer<T>.Refresh(connectId, JsonConvert.SerializeObject(result));
tokenSource.Token.WaitHandle.WaitOne(intervalTime);
else
// 是否超时
var (isCompleted, result) = RunMultiTaskMethod(tokenSource);
if (isCompleted)
var message = $"【typeof(T).Name】 connectId 正在回传数据!";
LogHelper.GetInstance().Information(message);
// 把组装好的数据推送到前端
BaseNotifer<T>.Refresh(connectId, JsonConvert.SerializeObject(result));
// 下一次推送等待N秒后进行
tokenSource.Token.WaitHandle.WaitOne(intervalTime);
else
// 等待超时
tokenSource.Cancel();
// 打印超时错误日志
LogHelper.GetInstance().Error($@"this.GetType().Name 推送超时! 当前超时时间设置为millisecondsTimeout毫秒!");
// 重新执行
Connections.TryRemove(connectId, out tokenSource);
Send(connectId);
catch(AggregateException ex)
LogHelper.GetInstance().Error($"系统错误:this.GetType().Name,ex.Flatten().InnerException.Message");
tokenSource.Token.WaitHandle.WaitOne(intervalTime);
, tokenSource.Token);
/// <summary>
/// 连接断开事件
/// </summary>
/// <param name="stopCalled"></param>
/// <returns></returns>
public override Task OnDisconnected(bool stopCalled)
lock (AsyncObj)
try
var tokenSource = Connections[Context.ConnectionId];
Connections.TryRemove(Context.ConnectionId, out tokenSource);
tokenSource.Cancel();
LogHelper.GetInstance().Information($"服务端与客户端:【typeof(T).Name】Context.ConnectionId 连接已断开!");
catch (Exception ex)
if (Connections.ContainsKey(Context.ConnectionId))
var tokenSource = Connections[Context.ConnectionId];
Connections.TryRemove(Context.ConnectionId, out tokenSource);
// 打印错误日志
LogHelper.GetInstance().Error($@"this.GetType().Name 已断开! ex.Message!");
return base.OnDisconnected(stopCalled);
以一个具体的 Hub 为例,继承上面的 BaseHub, 建立一个 具体实现的 Hub 名为 OperationKanBanHub ,使用 RunMultiTaskMethod 并行执行一些任务,这是项目里的一个真实案例,不必关心细节
using HenryMes.Entitys;
using HenryMes.WebApi.Controllers;
using HenryMes.WebApi.Controllers.Other;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace HenryMes.SignalR.Hosting.Hubs
/// <summary>
/// 运营看板
/// </summary>
public class OperationKanBanHub : BaseHub<OperationKanBanHub>
/// <summary>
/// 设置超时时间
/// </summary>
protected override int millisecondsTimeout => 10000;
/// <summary>
/// 设置线程轮询时间
/// </summary>
protected override int intervalTime => 5000;
/// <summary>
/// 是否跑多任务
/// </summary>
protected override bool runMultiTask => true;
/// <summary>
/// 跑单个Task
/// </summary>
protected override Func<object> RunMethod => throw new NotImplementedException();
/// <summary>
/// 跑多个Task
/// </summary>
protected override Func<CancellationTokenSource, (bool, object)> RunMultiTaskMethod => (TokenSource) =>
#region Task取数
// 菜籽收购
var taskSum4Rapeseed = Task.Run(() =>
KanbanController controller = new KanbanController();
dynamic data = controller.ControlCenter_Center_Sum4Rapeseed();
return data.Content;
);
// 油品生产,销售
var taskSum4Oil = Task.Run(() =>
KanbanController controller = new KanbanController();
dynamic data = controller.ControlCenter_Center_Sum4Oil();
return data.Content;
);
// 库存量前10的存货
var taskSum4Top = Task.Run(() =>
KanbanController controller = new KanbanController();
dynamic data = controller.ControlCenter_Center_Sum4Top();
return data.Content;
);
// 罐区存油,读取 mongodb
var taskTankOilQuantity = Task.Run(() =>
return new List<dynamic>
new tank = "Tank1001",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
new tank = "Tank1002",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
new tank = "Tank1003",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
new tank = "Tank1004",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
new tank = "Tank1005",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
new tank = "Tank1006",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
new tank = "Tank1007",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
new tank = "Tank1008",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
new tank = "Tank1009",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
new tank = "Tank1010",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
new tank = "Tank1011",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
new tank = "Tank1012",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
new tank = "Tank1013",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
new tank = "Tank1014",temperature = "21.4℃",pressure = "21PA", quantity = "100" ,
;
);
// 近一年产出销售 1-12月
var taskSaleDispatch4Month = Task.Run(() =>
KanbanController controller = new KanbanController();
dynamic data = controller.ControlCenter_Center_SaleDispatch4Month();
return data.Content;
);
// 最近10条采购信息
var taskPreInStore4Lately = Task.Run(() =>
KanbanController controller = new KanbanController();
dynamic data = controller.ControlCenter_Center_PreInStore4Lately();
return data.Content;
);
// 最近10条生产计划
var taskProductionTask = Task.Run(() =>
ProductionTaskController controller = new ProductionTaskController();
dynamic data = controller.QueryTake(new SqlSugarPageRequest
PageIndex = 1,
PageSize = 10,
Filter = new List<SqlSugar.ConditionalModel>()
);
return data.Content;
);
#endregion
#region 同步阻塞等待所有Task执行完
// 所有线程任务是否完成 默认false
var isCompleted = IsCompletedAllTasks(TokenSource, new Task[]
taskSum4Rapeseed,
taskSum4Oil,
taskSum4Top,
taskTankOilQuantity,
taskSaleDispatch4Month,
taskPreInStore4Lately,
taskProductionTask
);
#endregion
if (isCompleted)
#region 所有Task已完成
// 菜籽
var RapeseedResult = taskSum4Rapeseed.Result;
var Rapeseed = new
GYS = new
PurchaseReceiveQuantity = RapeseedResult?.Data?.Rapeseed_GYS?.PurchaseReceiveQuantity,
BalanceQuantity = RapeseedResult?.Data?.Rapeseed_GYS?.BalanceQuantity,
,
SD = new
PurchaseReceiveQuantity = RapeseedResult?.Data?.Rapeseed_SD?.PurchaseReceiveQuantity,
BalanceQuantity = RapeseedResult?.Data?.Rapeseed_SD?.BalanceQuantity,
;
// 油品生产,销售
dynamic Sum4OilResult = taskSum4Oil.Result;
var Sum4Oil = new
// 产出成品油
TankOilQuantity = Sum4OilResult?.Data?.TankOil?.ProductReceiveQuantity,
// 产出包装油
PackageOilQuantity = Sum4OilResult?.Data?.PackageOil?.ProductReceiveQuantity,
// 销售包装油
SaleOilQuantity = Sum4OilResult?.Data?.PackageOil?.SaleDispatchQuantity
;
// 存货中库存量前10的存货
var Sum4TopResult = taskSum4Top.Result;
var Sum4Top = new
Sum4TopResult?.Data?.DataSource
;
// 罐区存油
var TankOilQuantity = taskTankOilQuantity.Result;
// 近一年产出销售 1-12月
var TaskSaleDispatch4MonthResult = taskSaleDispatch4Month.Result;
var SaleDispatch4Month = new
Sale = TaskSaleDispatch4MonthResult?.Data?.SaleDispatch.Details,
Product = TaskSaleDispatch4MonthResult?.Data?.ProductReceive.Details
;
// 最近10条采购信息
var TaskPreInStore4LatelyResult = taskPreInStore4Lately.Result;
var PreInStore4Lately = TaskPreInStore4LatelyResult?.Data?.DataSource;
// 最近10条生产计划
var taskProductionTaskResult = taskProductionTask.Result;
var ProductionTask = taskProductionTaskResult?.Data;
return (isCompleted, new
Rapeseed,
Sum4Oil,
Sum4Top,
TankOilQuantity,
SaleDispatch4Month,
PreInStore4Lately,
ProductionTask
);
#endregion
return (isCompleted, new );
;
此时 SignalR 的后台推送基本就完成了,再来就是web前端的接收推送和断线下的自动重新连接(比如说后台服务程序做了更新,此时需要关闭服务再启动服务,这个时候要求web端不断尝试重新连接,直到后台服务启动并重新连接上为止)
前端使用 Vue 2.0 + jQuery.signalR 2.4.2 , 只列一下关键代码
import $ from "jquery";
import "signalr";
import echarts from "../../pages/kanban/OperationKanBanEcharts.vue";
export default
components: echarts ,
data()
return
connection: null,
proxy: null,
// 是否需要断线重连的标记,当页面关闭时是不需要继续推送的
tryReconnect : true
,
methods:
// 从SignalR推送过来的数据,刷新看板
refreshKanban(message)
// 刷新时间
this.getDateTime()
let obj = JSON.parse(message)
// 省略无关代码......
,
,
mounted()
this.$nextTick(() =>
this.connection = $.hubConnection(process.env.SignalR);
// 定义服务器端SignalR推送过来的消息接收代理
this.proxy = this.connection.createHubProxy("OperationKanBanHub");
this.proxy.on("Refresh", (message) =>
console.log(`接收到来自服务端 $this.connection.id 的数据!`)
this.refreshKanban(message)
);
// 创建连接到服务器端SignalR的连接
this.connection
.start()
.done(() =>
// 客户端发送信息到服务器
this.proxy.invoke("Send", this.connection.id);
)
.fail((err) =>
console.log(err);
);
this.connection.disconnected(() =>
if(this.tryReconnect)
setTimeout(() =>
console.log('连接已断开,正尝试重新连接!')
this.connection
.start()
.done(() =>
this.proxy.invoke("Send", this.connection.id); // 客户端发送信息到服务器
)
.fail((err) =>
console.log(err);
);
, 5000); // Restart connection after 5 seconds.
);
);
,
deactivated()
if (this.connection)
// 关闭SignalR连接
this.tryReconnect = false
this.connection.stop();
// 清除缓存
this.$vnode.parent.componentInstance.cache = ;
this.$vnode.parent.componentInstance.keys = [];
,
;
最后的一个步骤,怎么把后台的控制台SignalR宿主程序安装成 Windows 服务?在项目里建立两个批处理文件,Install.bat 安装服务,UnInstall.bat 卸载服务,点击右键点文件属性,把他们的编码改为 ansi(不要问我为什么......因为不改的话,打开批处理命令窗口的时候中文会显示成乱码)
Install.bat
@echo on
rem 设置DOS窗口的背景颜色及字体颜色
color 2f
rem 设置DOS窗口大小
mode con: cols=80 lines=25
@echo off
echo 请按任意键开始安装 HenryMes.SignalR.Hosting 服务
rem 以管理员身份运行
%1 mshta vbscript:CreateObject("Shell.Application").ShellExecute("cmd.exe","/c %~s0 ::","","runas",1)(window.close)&&exit
:Admin
rem 输出空行
echo.
pause
cd /d %~dp0
HenryMes.SignalR.Hosting install --autostart start
net start HenryMes.SignalR.Hosting
pause
UnInstall.bat
@echo on
rem 设置DOS窗口的背景颜色及字体颜色
color 2f
rem 设置DOS窗口大小
mode con: cols=80 lines=25
@echo off
echo 请按任意键开始卸载 HenryMes.SignalR.Hosting 服务
rem 以管理员身份运行
%1 mshta vbscript:CreateObject("Shell.Application").ShellExecute("cmd.exe","/c %~s0 ::","","runas",1)(window.close)&&exit
:Admin
rem 输出空行
echo.
pause
cd /d %~dp0
net stop HenryMes.SignalR.Hosting
HenryMes.SignalR.Hosting uninstall
pause
以上是关于SignalR注册成Windows后台服务,并实现web前端断线重连的主要内容,如果未能解决你的问题,请参考以下文章
基于SignalR实现B/S系统对windows服务运行状态的监测
使用nssm将命令行启动的应用程序(.exe,.bat等)注册成windows后台服务