未保存事件中心检查点数据

Posted

技术标签:

【中文标题】未保存事件中心检查点数据【英文标题】:Event Hub Checkpoint Data is Not Saved 【发布时间】:2021-06-07 13:32:08 【问题描述】:

我正在从 https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-python-get-started-send#create-a-python-script-to-receive-events 运行事件中心接收器实现,除了连接字符串等之外我没有更改任何内容。

创建数百个事件后,我可以看到接收器在存储帐户内创建了检查点文件夹,但当我再次运行接收器时,我看到它处理相同的事件。

每个分区创建的文件也是空的。

存储在consumerClient中提供:

checkpoint_store = BlobCheckpointStore.from_connection_string("...", "eventhubcontainer")


client = EventHubConsumerClient.from_connection_string("...", consumer_group="$Default", eventhub_name="eventhub1", checkpoint_store=checkpoint_store)

阅读事件后还有保存检查点的方法:

await partition_context.update_checkpoint(event)

我错过了什么吗?


整个代码:

import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

async def on_event(partition_context, event):
    print("Received the event: \"\" from the partition with ID: \"\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")
    client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
    async with client:
        await client.receive(on_event=on_event,  starting_position="-1")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

【问题讨论】:

我可以成功运行代码,它只接收新事件。能否请您重新启动接收器,然后再次检查。并确保使用最新版本的 sdk。 检查点信息存储为blob文件的元数据(不是blob文件的内容),其中包含两个键值对:offset和sequencenumber,你能检查一下blob的元数据吗调用 update_checkpoint 后是否更新? @IvanYang 是的,更新 SDK 解决了多次处理相同元素的问题。 AdamLing 感谢您提供的信息,我希望将数据存储在 blob 文件中。 @Lubu 我为此添加了一个答案。如果它有帮助,你能接受它作为答案吗?谢谢。 【参考方案1】:

这可能是旧 sdk 中的错误。

请尝试安装最新版本的sdk:azure-eventhub 5.3.1和azure-eventhub-checkpointstoreblob-aio 1.1.3。

我用这些最新的 sdk 测试了你的代码,效果很好。

【讨论】:

【参考方案2】:

更新 SDK 解决了这个问题。 @AdamLing 还在评论中为我澄清了元数据的位置。

【讨论】:

以上是关于未保存事件中心检查点数据的主要内容,如果未能解决你的问题,请参考以下文章

如何从 JavaScript 中的无线电输入中获取“未检查”事件?

检查 angularjs 中未保存的表单数据(使用 ui-router 的多个表单)

TensorFlow 分布式 master worker 静默保存失败;未创建检查点文件,但未引发异常

Azure 事件中心偏移

TF2.0:翻译模型:恢复保存的模型时出错:检查点(根)中未解析的对象.optimizer.iter:属性

具有输入绑定的 Azure 函数的 Azure 事件中心存储容器配置