CloudONS_MQ阿里消息队列

Posted 735882640

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CloudONS_MQ阿里消息队列相关的知识,希望对你有一定的参考价值。

参考文档

https://www.zybuluo.com/iamfox/note/239385 

环境准备

1.申请阿里云账号,开通消息队列服务,申请accessKey。

2.SDK导入(c++): ManagedONS.dll, ONSClient4CPP.exp(阿里后台下载)

3.添加引用(c#):ManagedONS.dll

 

using ons;

namespace MQTask
{
    class Program
    {
        static void Main(string[] args)
        {
            //申请账号
            string ConsumerId = GetConsumerId();
            string ProducerId = GetProducerId();
            string OrderTopic = ConfigurationSettings.AppSettings["OrderTopic"];
            string Topic = ConfigurationSettings.AppSettings["Topic"];
            string SecretKey = GetSecretKey();
            string AccessKey = GetAccessKey();

            //创建工厂
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();

            factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, AccessKey);
            factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, SecretKey);
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, ConsumerId);
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, ProducerId);
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, Topic);

            //创建producer
            Producer pProducer = ONSFactory.getInstance().createProducer(factoryInfo);
            //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。   
            pProducer.start();

            ////确定消费完成后,调用shutdown函数;在应用退出前,必须销毁Consumer 对象,否则会导致内存泄露等问题
            // pConsumer.shutdown();
        }

        public static SendResultONS sendmsg(Producer pProducer, string topic, string tag, string json, string keys)
        {

            string json1 = HttpUtility.UrlEncode(json);
            Message msg = new Message(
                //Message Topic
                        topic,
                //Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在ONS服务器过滤        
                        tag,
                //Message Body,任何二进制形式的数据,ONS不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式
                        json1
            );

            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            // 以方便您在无法正常收到消息情况下,可通过ONS Console查询消息并补发。
            // 注意:不设置也不会影响消息正常收发
            msg.setKey(keys);

            //发送消息,只要不抛异常就是成功    
            SendResultONS sendResult = null;
            try
            {
                sendResult = pProducer.send(msg);
            }
            catch
            {
                //异常处理
                return null;
            }

            return sendResult;
        }

        public static void getmsg(PushConsumer pConsumer)
        {
            MessageListener msgListener = new MyMsgListener();
            pConsumer.subscribe(Topic, "*", msgListener);
        }

        public class MyMsgListener : MessageListener
        {
            public MyMsgListener()
            {
            }

            ~MyMsgListener()
            {
            }

            public override ons.Action consume(Message value, ConsumeContext context)
            {
                string getTopic = value.getTopic();
                string getTag = value.getTag();
                string getKey = value.getKey();
                string getMsgID = value.getMsgID();
                string getBody = value.getBody();

                return ons.Action.CommitMessage;
            }
        }
    }
}

 

以上是关于CloudONS_MQ阿里消息队列的主要内容,如果未能解决你的问题,请参考以下文章

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

阿里云微消息队列(MQTT)的基本使用

阿里云消息队列RocketMQKafka及微消息队列优惠

阿里云-ONS-Help:消息队列 RocketMQ 版

阿里云消息队列 Kafka-消息检索实践