WCF基于MSMQ的事件代理服务

Posted For Best

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WCF基于MSMQ的事件代理服务相关的知识,希望对你有一定的参考价值。

前言

公司目前楼主负责的项目正在改版升级,对之前的服务也在作调整,项目里有个操作日志的模块,就决定把日志单独提取出来,做个日志服务,所以就有了这篇文章

正文

本文使用MSMQ作为消息队列:B/S项目调用日志服务,日志服务向消息队列发送消息,事件代理服务负责处理消息队列中的消息。下面贴出核心代码。

事件代理服务契约

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.Text;
using TestService.Contract.Faults;

namespace TestService.Contract
{
    /// <summary>
    /// Event broker service contract. The contract requires a session and declares
    /// <see cref="IEventBrokerCallback"/> as its callback (duplex) contract.
    /// </summary>
    [ServiceContract(Namespace = "http://TestService.Contract",
                     SessionMode = SessionMode.Required,
                     CallbackContract = typeof(IEventBrokerCallback))]
    public interface IEventBroker
    {
        /// <summary>
        /// Subscribes the caller to the given events. This is a request/reply operation
        /// (not one-way), so a failure can be reported back to the caller as a fault
        /// whose detail is an <see cref="EventBrokerException"/>.
        /// </summary>
        /// <param name="subscriptionId">Identifier the caller chooses for this subscription.</param>
        /// <param name="eventNames">Names of the events to subscribe to.</param>
        [OperationContract(IsOneWay = false)]
        [FaultContract(typeof(EventBrokerException))]
        void Subscribe(Guid subscriptionId, string[] eventNames);

        /// <summary>
        /// Ends the subscription identified by <paramref name="subscriptionId"/>.
        /// Declared one-way: the caller gets no reply and no fault.
        /// </summary>
        /// <param name="subscriptionId">Identifier previously passed to <see cref="Subscribe"/>.</param>
        [OperationContract(IsOneWay = true)]
        void EndSubscription(Guid subscriptionId);
    }
}

事件代理服务回调处理契约

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.Text;
using TestService.Contract.Data;

namespace TestService.Contract
{
    /// <summary>
    /// Event broker callback contract: the operations the service invokes on a
    /// subscribed client.
    /// </summary>
    public interface IEventBrokerCallback
    {
        /// <summary>
        /// Delivers one event message to the client. Declared one-way, so the
        /// service does not wait for a reply from the client.
        /// </summary>
        /// <param name="streamingResult">The event message being delivered.</param>
        [OperationContract(IsOneWay = true)]
        void ReceiveStreamingResult(RealTimeEventMessage streamingResult);
    }
}

事件代理服务异常实体

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;

namespace TestService.Contract.Faults
{
    /// <summary>
    /// Serializable fault detail for event broker failures. Despite its name this
    /// type does not derive from <see cref="System.Exception"/>; it is a plain data
    /// contract that carries an error message.
    /// </summary>
    [DataContract]
    public class EventBrokerException
    {
        /// <summary>
        /// Creates the fault detail with the given error text.
        /// </summary>
        /// <param name="message">Human-readable description of the failure.</param>
        public EventBrokerException(string message)
        {
            this.Message = message;
        }

        /// <summary>
        /// Description of the failure.
        /// </summary>
        [DataMember]
        public string Message { get; set; }
    }
}

消息处理实体

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;

namespace TestService.Contract.Data
{
    /// <summary>
    /// Data contract for one event message: the message payload plus descriptive
    /// metadata about the event.
    /// </summary>
    [DataContract]
    public class RealTimeEventMessage
    {
        /// <summary>The message payload.</summary>
        [DataMember]
        public MessageModel Msg { get; set; }

        /// <summary>Name of the event this message belongs to.</summary>
        [DataMember]
        public string EventName { get; set; }

        /// <summary>Entity id type associated with the event.</summary>
        [DataMember]
        public string EntityIdType { get; set; }

        /// <summary>Free-text description of the event.</summary>
        [DataMember]
        public string Description { get; set; }

        /// <summary>Additional data attached to the event.</summary>
        [DataMember]
        public string AdditionalData { get; set; }

        /// <summary>Date of the event; may be unset.</summary>
        [DataMember]
        public DateTime? Date { get; set; }

        /// <summary>
        /// Creates an empty message; all properties keep their default values.
        /// </summary>
        public RealTimeEventMessage()
        {
        }

        /// <summary>
        /// Creates a message with every property populated from the arguments.
        /// </summary>
        public RealTimeEventMessage(MessageModel msg, string eventName, string entityIdType,
            string description, string additionalData, DateTime date)
        {
            Msg = msg;
            EventName = eventName;
            EntityIdType = entityIdType;
            Description = description;
            AdditionalData = additionalData;
            Date = date;
        }
    }
}

以上是事件代理服务的契约部分,下面看看实现,先看EventBroker的实现

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.ServiceModel;
using System.Threading.Tasks;
using TestService.ApplicationService.Data;
using TestService.ApplicationService.Services.Interface;
using TestService.Common.IoC;
using TestService.Contract;
using TestService.Contract.Data;
using TestService.Contract.Faults;

namespace TestService.ApplicationService
{
    /// <summary>
    /// Event broker service. It is hosted as a single instance
    /// (<c>InstanceContextMode.Single</c>) that reads XML messages from the configured
    /// MSMQ queue on a background task, parses each one, and forwards it to every
    /// callback channel subscribed to the message's event name.
    /// </summary>
    [IocServiceBehavior]
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
    public class EventBroker : ApplicationBaseService, IEventBroker
    {
        // Event name -> subscribers of that event. Read and replaced only while holding syncObj.
        private Dictionary<string, List<UniqueCallbackHandle>> eventNameToCallbackLookups = new Dictionary<string, List<UniqueCallbackHandle>>();
        private static readonly Object syncObj = new Object();
        private static string inputQueueName = "";
        // Read by the queue reader task; volatile so a write from another thread is observed.
        private volatile bool shouldRun = true;
        private static readonly TimeSpan queueReadTimeOut = TimeSpan.FromSeconds(500);
        private static readonly TimeSpan queuePeekTimeOut = TimeSpan.FromSeconds(30);
        private readonly IXmlParserService _xmlParserService;

        /// <summary>
        /// Creates the broker and immediately starts the background queue reader.
        /// </summary>
        /// <param name="xmlParserService">Parser used to turn a raw queue message into an event message.</param>
        public EventBroker(IXmlParserService xmlParserService)
        {
            // The parser must be assigned BEFORE the reader task starts: the task may
            // receive a message right away, and a message processed with a null parser
            // would already have been removed from the queue and therefore lost.
            _xmlParserService = xmlParserService;
            inputQueueName = AppSettings.InputQueueName;
            StartCollectingMessage();
        }

        /// <summary>
        /// Starts the background task that reads messages from the queue.
        /// </summary>
        /// <exception cref="FaultException{EventBrokerException}">The reader task could not be started.</exception>
        public void StartCollectingMessage()
        {
            try
            {
                GetMessageFromQueue();
            }
            catch (Exception ex)
            {
                throw new FaultException<EventBrokerException>(new EventBrokerException(ex.Message), new FaultReason(ex.Message));
            }
        }

        /// <summary>
        /// Subscribes the calling client's callback channel to each of the given events.
        /// </summary>
        /// <param name="subscriptionId">Identifier of the subscription.</param>
        /// <param name="eventNames">Names of the events to subscribe to.</param>
        /// <exception cref="FaultException{EventBrokerException}">Any failure while creating the subscription.</exception>
        public void Subscribe(Guid subscriptionId, string[] eventNames)
        {
            try
            {
                CreateSubscription(subscriptionId, eventNames);
            }
            catch (Exception ex)
            {
                throw new FaultException<EventBrokerException>(new EventBrokerException(ex.Message), new FaultReason(ex.Message));
            }
        }

        /// <summary>
        /// Removes every callback registered under <paramref name="subscriptionId"/>.
        /// Event names left without any subscriber are dropped from the lookup.
        /// </summary>
        /// <param name="subscriptionId">Identifier of the subscription to remove.</param>
        public void EndSubscription(Guid subscriptionId)
        {
            lock (syncObj)
            {
                // Build a fresh lookup holding only the subscribers that remain.
                Dictionary<string, List<UniqueCallbackHandle>> remainingEventNameToCallbackLookups =
                    new Dictionary<string, List<UniqueCallbackHandle>>();

                foreach (KeyValuePair<string, List<UniqueCallbackHandle>> kvp in eventNameToCallbackLookups)
                {
                    // Keep every subscriber whose id differs from the one being removed.
                    List<UniqueCallbackHandle> remainingMessageSubscriptions =
                        kvp.Value.Where(x => x.CallbackSessionId != subscriptionId).ToList();
                    if (remainingMessageSubscriptions.Any())
                    {
                        remainingEventNameToCallbackLookups.Add(kvp.Key, remainingMessageSubscriptions);
                    }
                }

                eventNameToCallbackLookups = remainingEventNameToCallbackLookups;
            }
        }

        #region Private methods

        /// <summary>
        /// Starts a long-running task that receives messages from the input queue and
        /// processes them until <c>shouldRun</c> becomes false.
        /// </summary>
        private void GetMessageFromQueue()
        {
            Task.Factory.StartNew(() =>
            {
                using (MessageQueue queue = new MessageQueue(inputQueueName, QueueAccessMode.Receive))
                {
                    queue.Formatter = new XmlMessageFormatter(new[] { typeof(string) });

                    while (shouldRun)
                    {
                        try
                        {
                            // Block until a message is available instead of polling the queue.
                            // When nothing arrives within the peek timeout, Peek throws a
                            // MessageQueueException with IOTimeout, which is ignored below, and
                            // the loop re-checks shouldRun.
                            queue.Peek(queuePeekTimeOut).Dispose();

                            //Logs.Debug("Receiving a message from the queue");
                            using (Message message = queue.Receive(queueReadTimeOut))
                            {
                                ProcessMessage(message);
                            }
                        }
                        catch (MessageQueueException e)
                        {
                            // A timeout only means the queue stayed empty; anything else is logged.
                            if (e.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                            {
                                Log.Warn("消息队列出现异常:", e);
                            }
                        }
                        catch (Exception e)
                        {
                            // Keep the reader alive: a failure on one message must not stop the loop.
                            Log.Warn("操作异常:", e);
                        }
                    }
                }
            }, TaskCreationOptions.LongRunning);
        }

        /// <summary>
        /// Parses one queue message and forwards it to every subscriber of its event.
        /// Subscribers whose channel fails during delivery are unsubscribed afterwards.
        /// </summary>
        /// <param name="msmqMessage">Message received from the queue; its body is read as a string.</param>
        private void ProcessMessage(Message msmqMessage)
        {
            string messageBody = (string)msmqMessage.Body;

#if DEBUG
            Log.Info(string.Format("接受消息 : {0}", messageBody));
#endif

            RealTimeEventMessage messageToSendToSubscribers = _xmlParserService.ParseRawMsmqXml(messageBody);
            if (messageToSendToSubscribers == null)
            {
                // The parser returns null for a body it could not parse; nothing to forward.
                return;
            }

            lock (syncObj)
            {
                if (messageToSendToSubscribers.Msg != null)
                {
                    // Save to the database (persistence code is not part of this listing).

                }

                List<Guid> deadSubscribers = new List<Guid>();

                List<UniqueCallbackHandle> uniqueCallbackHandles;
                string eventName = messageToSendToSubscribers.EventName;
                // A null key would make the dictionary lookup throw, so it is treated as "no subscribers".
                if (eventName != null && eventNameToCallbackLookups.TryGetValue(eventName, out uniqueCallbackHandles))
                {
                    foreach (UniqueCallbackHandle uniqueCallbackHandle in uniqueCallbackHandles)
                    {
                        // Any channel-level failure marks this subscriber as dead, and delivery
                        // continues with the remaining subscribers instead of being aborted.
                        try
                        {
                            uniqueCallbackHandle.Callback.ReceiveStreamingResult(messageToSendToSubscribers);
                        }
                        catch (CommunicationException)
                        {
                            // Covers aborted and faulted channels.
                            deadSubscribers.Add(uniqueCallbackHandle.CallbackSessionId);
                        }
                        catch (TimeoutException)
                        {
                            deadSubscribers.Add(uniqueCallbackHandle.CallbackSessionId);
                        }
                        catch (ObjectDisposedException)
                        {
                            deadSubscribers.Add(uniqueCallbackHandle.CallbackSessionId);
                        }
                    }
                }

                // End all subscriptions of dead subscribers. EndSubscription takes the same
                // lock again, which is fine because lock (Monitor) is re-entrant per thread.
                foreach (Guid deadSubscriberId in deadSubscribers)
                {
                    EndSubscription(deadSubscriberId);
                }
            }
        }

        /// <summary>
        /// Registers the current caller's callback channel under each requested event name.
        /// </summary>
        /// <param name="subscriptionId">Identifier of the subscription.</param>
        /// <param name="eventNames">Names of the events to subscribe to.</param>
        /// <exception cref="ArgumentNullException"><paramref name="eventNames"/> is null.</exception>
        private void CreateSubscription(Guid subscriptionId, string[] eventNames)
        {
            if (eventNames == null)
            {
                throw new ArgumentNullException("eventNames");
            }

            // The callback channel belongs to the current operation, so resolve it once.
            IEventBrokerCallback callback = OperationContext.Current.GetCallbackChannel<IEventBrokerCallback>();

            // Ensure that a subscription is created for each event the subscriber wants to receive.
            lock (syncObj)
            {
                foreach (string eventName in eventNames)
                {
                    List<UniqueCallbackHandle> currentCallbacks;
                    if (!eventNameToCallbackLookups.TryGetValue(eventName, out currentCallbacks))
                    {
                        currentCallbacks = new List<UniqueCallbackHandle>();
                        eventNameToCallbackLookups[eventName] = currentCallbacks;
                    }
                    currentCallbacks.Add(new UniqueCallbackHandle(subscriptionId, callback));
                }
            }
        }

        #endregion
    }
}

事件代理实现里的回调处理实体

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using TestService.Contract;

namespace TestService.ApplicationService.Data
{
    /// <summary>
    /// Immutable pairing of a subscription id with the callback channel that
    /// should receive events for that subscription.
    /// </summary>
    public class UniqueCallbackHandle
    {
        /// <summary>Identifier of the subscription this callback belongs to.</summary>
        public Guid CallbackSessionId { get; private set; }

        /// <summary>Callback channel used to deliver events.</summary>
        public IEventBrokerCallback Callback { get; private set; }

        /// <summary>
        /// Creates the handle; both values can only be set here.
        /// </summary>
        /// <param name="callbackSessionId">Identifier of the subscription.</param>
        /// <param name="callback">Callback channel of the subscriber.</param>
        public UniqueCallbackHandle(Guid callbackSessionId, IEventBrokerCallback callback)
        {
            CallbackSessionId = callbackSessionId;
            Callback = callback;
        }
    }
}

 

其中,事件代理服务构造方法里的AppSettings.InputQueueName用于读取配置文件中的专用队列名称;构造方法的参数后面会使用Autofac进行注入。另外还有个IXmlParserService,它是一个普通的业务服务,主要负责解析消息队列里存放的xml消息。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using TestService.Contract.Data;

namespace TestService.ApplicationService.Services.Interface
{
    /// <summary>
    /// Parses the raw XML text of a queue message into an event message.
    /// </summary>
    public interface IXmlParserService
    {
        /// <summary>
        /// Converts a raw message body into a <see cref="RealTimeEventMessage"/>.
        /// </summary>
        /// <param name="messageBody">XML text taken from the queue message.</param>
        /// <returns>
        /// The parsed message. NOTE(review): the implementation shown in this file
        /// returns null when the body cannot be parsed; callers should handle null.
        /// </returns>
        RealTimeEventMessage ParseRawMsmqXml(string messageBody);
    }
}

实现

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Xml.Linq;
using TestService.ApplicationService.Services.Interface;
using TestService.Contract.Data;

namespace TestService.ApplicationService.Services.Impl
{
    /// <summary>
    /// Parses the XML body of a queue message into a <see cref="RealTimeEventMessage"/>.
    /// Values are read from the direct children of the root element, matched by local
    /// name; a value counts as present only when exactly one child has that name.
    /// </summary>
    public class XmlParserService : IXmlParserService
    {
        // NOTE(review): stored by the constructor but not read by any member of this class.
        private readonly MessageModel msg;

        public XmlParserService(MessageModel msg)
        {
            this.msg = msg;
        }

        /// <summary>
        /// Builds an event message from the messageId, content, createTime and eventName
        /// children of the XML root. A missing createTime falls back to the current local time.
        /// </summary>
        /// <param name="messageBody">XML text taken from the queue message.</param>
        /// <returns>The parsed message, or null when the body cannot be parsed (the error is logged).</returns>
        public RealTimeEventMessage ParseRawMsmqXml(string messageBody)
        {
            try
            {
                XElement root = XElement.Parse(messageBody);

                MessageModel model = new MessageModel();
                model.MessageId = GetSafeString(root, "messageId");
                model.Content = GetSafeString(root, "content");
                model.CreateTime = GetSafeDate(root, "createTime") ?? DateTime.Now;

                // EntityIdType, Description, Date and AdditionalData are not populated by this parser.
                RealTimeEventMessage info = new RealTimeEventMessage();
                info.Msg = model;
                info.EventName = GetSafeString(root, "eventName");
                return info;
            }
            catch (Exception ex)
            {
                Log.Error("解析Xml消息出现异常:" + ex);
                return null;
            }
        }

        /// <summary>
        /// Reads the named child as an integer.
        /// </summary>
        /// <returns>The parsed value, or 0 when the child is missing, duplicated or not a valid integer.</returns>
        public static Int32 GetSafeInt32(XElement root, string elementName)
        {
            string text;
            int parsed;
            if (TryGetSingleValue(root, elementName, out text) && int.TryParse(text, out parsed))
            {
                return parsed;
            }
            return 0;
        }

        /// <summary>
        /// Reads the named child as a date, parsed with the current culture.
        /// </summary>
        /// <returns>The parsed value, or null when the child is missing, duplicated or not a valid date.</returns>
        private static DateTime? GetSafeDate(XElement root, string elementName)
        {
            string text;
            DateTime parsed;
            if (TryGetSingleValue(root, elementName, out text) && DateTime.TryParse(text, out parsed))
            {
                return parsed;
            }
            return null;
        }

        /// <summary>
        /// Reads the text of the named child.
        /// </summary>
        /// <returns>The child's text, or an empty string when the child is missing or duplicated.</returns>
        public static String GetSafeString(XElement root, string elementName)
        {
            string text;
            return TryGetSingleValue(root, elementName, out text) ? text : String.Empty;
        }

        /// <summary>
        /// Reads the named child as a boolean.
        /// </summary>
        /// <returns>The parsed value, or false when the child is missing, duplicated or not a valid boolean.</returns>
        public static bool GetSafeBool(XElement root, string elementName)
        {
            string text;
            bool parsed;
            return TryGetSingleValue(root, elementName, out text)
                && bool.TryParse(text, out parsed)
                && parsed;
        }

        /// <summary>
        /// Finds the one direct child of <paramref name="root"/> whose local name equals
        /// <paramref name="elementName"/> and returns its text.
        /// </summary>
        /// <returns>
        /// True when exactly one such child exists; false when root is null or the
        /// child is missing or occurs more than once.
        /// </returns>
        private static bool TryGetSingleValue(XElement root, string elementName, out string value)
        {
            value = null;
            if (root == null)
            {
                return false;
            }

            XElement match = null;
            foreach (XElement child in root.Elements())
            {
                if (child.Name.LocalName != elementName)
                {
                    continue;
                }
                if (match != null)
                {
                    // A second match makes the value ambiguous; treat it as absent.
                    return false;
                }
                match = child;
            }

            if (match == null)
            {
                return false;
            }

            value = match.Value;
            return true;
        }
    }
}

这里的xml节点是根据消息服务发送消息时使用的xml节点来定的。事件代理服务的内容就是上面这么多,下面看看消息服务。

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.Text;
using TestService.Contract.Data;

namespace TestService.Contract
{
    /// <summary>
    /// Service contract for sending messages.
    /// </summary>
    [ServiceContract]
    public interface IMessageService
    {
        /// <summary>
        /// Sends the given message.
        /// </summary>
        /// <param name="model">The message to send.</param>
        /// <returns>
        /// NOTE(review): in the implementation shown in this file, true means the
        /// message was placed on the queue and false means sending failed.
        /// </returns>
        [OperationContract]
        bool Send(MessageModel model);
    }
}

实现

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Text;
using TestService.Common;
using TestService.Common.IoC;
using TestService.Contract;
using TestService.Contract.Data;

namespace TestService.ApplicationService
{
    /// <summary>
    /// Message service: serializes a <see cref="MessageModel"/> to a small XML document
    /// and places it on the configured MSMQ queue.
    /// </summary>
    [IocServiceBehavior]
    public class MessageService : ApplicationBaseService, IMessageService
    {
        private static string inputQueueName = "";

        public MessageService()
        {
            inputQueueName = AppSettings.InputQueueName;
        }

        /// <summary>
        /// Sends the message to the input queue.
        /// </summary>
        /// <param name="model">The message to send.</param>
        /// <returns>True when the message was queued; false when any error occurred (errors are logged, not thrown).</returns>
        public bool Send(MessageModel model)
        {
            using (MessageQueue queue = new MessageQueue(inputQueueName, QueueAccessMode.Send))
            {
                queue.Formatter = new XmlMessageFormatter(new[] { typeof(string) });
                try
                {
                    using (Message message = new Message(GetXmlData(model)))
                    {
                        Log.Info(string.Format("发送消息: {0}", message.Body.ToString()));
                        queue.Send(message);
                    }

                    return true;
                }
                catch (MessageQueueException mex)
                {
                    // A timeout is reported to the caller through the return value only.
                    if (mex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                    {
                        // The format string needs a placeholder, otherwise the exception is dropped from the log.
                        Log.Error(string.Format("Message queue exception occured: {0}", mex));
                    }
                    return false;
                }
                catch (Exception ex)
                {
                    Log.Error(ex);
                    return false;
                }
            }
        }

        /// <summary>
        /// Builds the XML payload for the message. The event name is fixed to
        /// "ClientDealEvent".
        /// </summary>
        /// <param name="model">The message to serialize.</param>
        /// <returns>The XML text, terminated by a line break.</returns>
        private string GetXmlData(MessageModel model)
        {
            StringBuilder sb = new StringBuilder("<realtimeEvent>");
            sb.Append("<eventName>ClientDealEvent</eventName>");
            // Text values are XML-escaped: an unescaped '<' or '&' would make the payload
            // malformed, so the receiving side could not parse the message.
            sb.AppendFormat("<messageId>{0}</messageId>", System.Security.SecurityElement.Escape(model.MessageId));
            // NOTE(review): the timestamp is formatted with the current culture; the reader
            // must parse it under a compatible culture - confirm both sides agree.
            sb.AppendFormat("<createTime>{0}</createTime>", model.CreateTime);
            sb.AppendFormat("<content>{0}</content>", System.Security.SecurityElement.Escape(model.Content));
            sb.AppendLine("</realtimeEvent>");
            return sb.ToString();
        }
    }
}

消息服务比较简单,就是往消息队列里发送消息。细心的人会发现,我在每个服务实现上都加了个IocServiceBehavior特性,这个特性主要用来标识该服务启用了注入。

核心代码就是上面介绍的那么多,有些操作没将代码贴出来,后面会将代码开源到码云上,今天就先写到这儿了

以上是关于WCF基于MSMQ的事件代理服务的主要内容,如果未能解决你的问题,请参考以下文章

IIS 中的 WCF,在工作组模式下使用 MSMQ

WCF net.msmq 服务自动激活

为啥 MSMQ 比 WCF QueueService 快?

如何处理 WCF 的 MSMQ 绑定中的消息失败

私有或公共 MSMQ

WCF msmq 事务和工作单元