如何将天蓝色事件网格中的事件发布到python中的主题?

Posted

技术标签:

【中文标题】如何将天蓝色事件网格中的事件发布到python中的主题?【英文标题】:How to publish events in azure event grid to a topic in python? 【发布时间】:2022-01-04 08:53:24 【问题描述】:

无法在我的 VS Code 中导入 eventgrid 模块,我已在 require.txt 中添加了所有模块,并从我的 cmd 安装了 pip。

另外,下面是我正在寻找和尝试的 python 函数:

def publish_event(): 
    # authenticate client 
    credential = AzureKeyCredential(key) 
    client = EventGridPublisherClient(endpoint, credential) 
 
    custom_schema_event =  
        "customSubject": "sample", 
        "customEventType": "sample.event", 
        "customDataVersion": "2.0", 
        "customId": uuid.uuid4(), 
        "customEventTime": dt.datetime.now(UTC()).isoformat(), 
        "customData": "sample data" 
     
 
    # publish events 
    for _  in range(3): 
 
        event_list = []     # list of events to publish 
        # create events and append to list 
        for j in range(randint(1, 3)): 
            event_list.append(custom_schema_event) 
 
        # publish list of events 
        client.send(event_list) 
        print("Batch of size  published".format(len(event_list))) 
        time.sleep(randint(1, 5)) 
 
 
if name == '__main__': 
    publish_event() 

我不确定这是否是实现此目的的正确方法,正在寻找解决模块问题和发布事件的更好方法。

我们将不胜感激!

【问题讨论】:

【参考方案1】:

从你的问题我们可以看出有两点需要回答:

    Python 包安装问题 Eventgrid 主题问题

使用pip install安装Azure Python包,需要在终端开启虚拟环境,命令如下:

   python -m venv .venv
  .venv\Scripts\activate

启用venv后,在requirements.txt中添加所有eventgrid、storage等azure相关包,然后在cmd中运行如下命令:

pip install -r requirements.txt

现在,查看您的代码,函数看起来不错,因为它将事件发送到一般主题。以类似的方式,我们有一个示例,它为我们提供了有关将 eventgrid 事件发布到自定义主题的一些见解。 Azure-SDK-Python-GitHub 存储库中对此进行了解释。 Python代码如下:

  import os
  from random import randint, sample
  import time

  from azure.core.credentials import AzureKeyCredential
  from azure.eventgrid import EventGridPublisherClient, EventGridEvent

  key = os.environ["EG_ACCESS_KEY"]
  endpoint = os.environ["EG_TOPIC_HOSTNAME"]

  #authenticate client
  credential = AzureKeyCredential(key)
  client = EventGridPublisherClient(endpoint, credential)
  services = ["EventGrid", "ServiceBus", "EventHubs", "Storage"]    #possible values for data field

  def publish_event():
      #publish events
      for _ in range(3):

          event_list = []     #list of events to publish
          #create events and append to list
          for j in range(randint(1, 3)):
              sample_members = sample(services, k=randint(1, 4))      #select random subset of team members
              event = EventGridEvent(
                      subject="Door1",
                      data="team": sample_members,
                      event_type="Azure.Sdk.Demo",
                      data_version="2.0"
                      )
              event_list.append(event)

          #publish list of events
          client.send(event_list)
          print("Batch of size  published".format(len(event_list)))
          time.sleep(randint(1, 5))

  if __name__ == '__main__':
      publish_event()

【讨论】:

以上是关于如何将天蓝色事件网格中的事件发布到python中的主题?的主要内容,如果未能解决你的问题,请参考以下文章

诊断 Azure 事件网格中的故障?

Azure 事件中心 Event Grid(事件网格)+Azure Functions处理IOT Hub中的消息

如何使用 Azure 事件网格发布覆盖默认过期时间到队列?

如何在 Python 中侦听 Linux 中的“插入 USB 设备”事件?

如何从 EXTJS 中的委托事件中获取记录?

网格中的 Keypress 或 keydown 事件处理