Azure Event hub usage

Posted _iorilan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Azure Event hub usage相关的知识,希望对你有一定的参考价值。

1. create event hub on azure
技术分享


技术分享

2. create a prj , event hub sender, install nuget pkg - azure service bus
技术分享


3. check connection string
技术分享


4. sender sample code


static void Main(string[] args)
        {
            Console.WriteLine("Press Ctrl-C to stop the sender process");
            Console.WriteLine("Press Enter to start now");
            Console.ReadLine();
            SendingRandomMessages();
        }


        static string eventHubName = "get from event hub connection information";
        static string connectionString = "get from event hub connection information";


        static void SendingRandomMessages()
        {
            var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
            while (true)
            {
                try
                {
                    var message = Guid.NewGuid().ToString();
                    Console.WriteLine("{0} > Sending message: {1}", DateTime.Now, message);
                    eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(message)));
                }
                catch (Exception exception)
                {
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("{0} > Exception: {1}", DateTime.Now, exception.Message);
                    Console.ResetColor();
                }


                Thread.Sleep(200);
            }
        }



5. create a storage account
技术分享
6. install nuget for receiver prj
技术分享


7. check keys
技术分享


8. sample code for receiver


class SimpleEventProcessor : IEventProcessor
    {
        Stopwatch checkpointStopWatch;


        async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine("Processor Shutting Down. Partition ‘{0}‘, Reason: ‘{1}‘.", context.Lease.PartitionId, reason);
            if (reason == CloseReason.Shutdown)
            {
                await context.CheckpointAsync();
            }
        }


        Task IEventProcessor.OpenAsync(PartitionContext context)
        {
            Console.WriteLine("SimpleEventProcessor initialized.  Partition: ‘{0}‘, Offset: ‘{1}‘", context.Lease.PartitionId, context.Lease.Offset);
            this.checkpointStopWatch = new Stopwatch();
            this.checkpointStopWatch.Start();
            return Task.FromResult<object>(null);
        }


        async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (EventData eventData in messages)
            {
                string data = Encoding.UTF8.GetString(eventData.GetBytes());


                Console.WriteLine(string.Format("Message received.  Partition: ‘{0}‘, Data: ‘{1}‘",
                    context.Lease.PartitionId, data));
            }


            //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
            if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
            {
                await context.CheckpointAsync();
                this.checkpointStopWatch.Restart();
            }
        }
    }


static void Main(string[] args)
        {
            string eventHubConnectionString = "get from azure event hub connection information";
            string eventHubName = "get from azure event hub connection information";
            string storageAccountName = "get from azure storage keys";
            string storageAccountKey = "get from azure storage keys";
            string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", storageAccountName, storageAccountKey);


            string eventProcessorHostName = Guid.NewGuid().ToString();
            EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
            Console.WriteLine("Registering EventProcessor...");
            var options = new EventProcessorOptions();
            options.ExceptionReceived += (sender, e) => { Console.WriteLine(e.Exception); };
            eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options).Wait();


            Console.WriteLine("Receiving. Press enter key to stop worker.");
            Console.ReadLine();
            eventProcessorHost.UnregisterEventProcessorAsync().Wait();
        }


技术分享

以上是关于Azure Event hub usage的主要内容,如果未能解决你的问题,请参考以下文章

Azure 事件中心Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3

Azure IoT Hub和Event Hub相关的技术系列-索引篇

第一次将内容添加到azure event hubs

Azure 事件中心 Event Grid(事件网格)+Azure Functions处理IOT Hub中的消息

微软发布用于Kafka生态系统的Azure Event Hub公开预览版

Event Hub小白入门指南