如何将天蓝色事件网格中的事件发布到python中的主题?
Posted
技术标签:
【中文标题】如何将天蓝色事件网格中的事件发布到python中的主题?【英文标题】:How to publish events in azure event grid to a topic in python? 【发布时间】:2022-01-04 08:53:24 【问题描述】:无法在我的 VS Code 中导入 eventgrid 模块,我已在 require.txt 中添加了所有模块,并从我的 cmd 安装了 pip。
另外,下面是我正在寻找和尝试的 python 函数:
def publish_event():
# authenticate client
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
custom_schema_event =
"customSubject": "sample",
"customEventType": "sample.event",
"customDataVersion": "2.0",
"customId": uuid.uuid4(),
"customEventTime": dt.datetime.now(UTC()).isoformat(),
"customData": "sample data"
# publish events
for _ in range(3):
event_list = [] # list of events to publish
# create events and append to list
for j in range(randint(1, 3)):
event_list.append(custom_schema_event)
# publish list of events
client.send(event_list)
print("Batch of size published".format(len(event_list)))
time.sleep(randint(1, 5))
if name == '__main__':
publish_event()
我不确定这是否是实现此目的的正确方法,正在寻找解决模块问题和发布事件的更好方法。
我们将不胜感激!
【问题讨论】:
【参考方案1】:从你的问题我们可以看出有两点需要回答:
-
Python 包安装问题
Eventgrid 主题问题
使用pip install安装Azure Python包,需要在终端开启虚拟环境,命令如下:
python -m venv .venv
.venv\Scripts\activate
启用venv后,在requirements.txt中添加所有eventgrid、storage等azure相关包,然后在cmd中运行如下命令:
pip install -r requirements.txt
现在,查看您的代码,函数看起来不错,因为它将事件发送到一般主题。以类似的方式,我们有一个示例,它为我们提供了有关将 eventgrid 事件发布到自定义主题的一些见解。 Azure-SDK-Python-GitHub 存储库中对此进行了解释。 Python代码如下:
import os
from random import randint, sample
import time
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent
key = os.environ["EG_ACCESS_KEY"]
endpoint = os.environ["EG_TOPIC_HOSTNAME"]
#authenticate client
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
services = ["EventGrid", "ServiceBus", "EventHubs", "Storage"] #possible values for data field
def publish_event():
#publish events
for _ in range(3):
event_list = [] #list of events to publish
#create events and append to list
for j in range(randint(1, 3)):
sample_members = sample(services, k=randint(1, 4)) #select random subset of team members
event = EventGridEvent(
subject="Door1",
data="team": sample_members,
event_type="Azure.Sdk.Demo",
data_version="2.0"
)
event_list.append(event)
#publish list of events
client.send(event_list)
print("Batch of size published".format(len(event_list)))
time.sleep(randint(1, 5))
if __name__ == '__main__':
publish_event()
【讨论】:
以上是关于如何将天蓝色事件网格中的事件发布到python中的主题?的主要内容,如果未能解决你的问题,请参考以下文章
Azure 事件中心 Event Grid(事件网格)+Azure Functions处理IOT Hub中的消息