从数据存储区查询大量 ndb 实体的最佳实践
Posted
技术标签:
【中文标题】从数据存储区查询大量 ndb 实体的最佳实践【英文标题】:Best practice to query large number of ndb entities from datastore 【发布时间】:2012-07-15 14:23:05 【问题描述】:我在使用 App Engine 数据存储时遇到了一个有趣的限制。我正在创建一个处理程序来帮助我们分析我们的一个生产服务器上的一些使用数据。要执行分析,我需要查询和汇总从数据存储中提取的 10,000 多个实体。计算并不难,它只是通过使用样本的特定过滤器的项目的直方图。我遇到的问题是,我无法以足够快的速度从数据存储中取回数据,无法在达到查询截止日期之前进行任何处理。
我已经尝试了所有我能想到的将查询分块为并行 RPC 调用以提高性能的方法,但根据 appstats,我似乎无法让查询实际并行执行。无论我尝试哪种方法(见下文),RPC 似乎总是会退回到顺序下一个查询的瀑布。
注意:查询和分析代码确实有效,只是运行缓慢,因为我无法足够快地从数据存储中获取数据。
背景
我没有可以分享的实时版本,但这里是我正在谈论的系统部分的基本模型:
class Session(ndb.Model):
""" A tracked user session. (customer account (company), version, OS, etc) """
data = ndb.JsonProperty(required = False, indexed = False)
class Sample(ndb.Model):
name = ndb.StringProperty (required = True, indexed = True)
session = ndb.KeyProperty (required = True, kind = Session)
timestamp = ndb.DateTimeProperty(required = True, indexed = True)
tags = ndb.StringProperty (repeated = True, indexed = True)
您可以将示例视为用户使用给定名称的功能的时间。 (例如:'systemA.feature_x')。标签基于客户详细信息、系统信息和功能。例如:['winxp'、'2.5.1'、'systemA'、'feature_x'、'premium_account'])。因此,这些标签形成了一组非规范化的标记,可用于查找感兴趣的样本。
我正在尝试进行的分析包括获取一个日期范围并询问每个客户帐户(公司,而不是每个用户)每天(或每小时)使用的一组功能(可能是所有功能)的次数)。
所以处理程序的输入类似于:
开始日期 结束日期 标签输出将是:
[
'company_account': <string>,
'counts': [
'timeperiod': <iso8601 date>, 'count': <int>, ...
]
, ...
]
查询通用代码
以下是所有查询的一些通用代码。处理程序的一般结构是一个使用 webapp2 的简单 get 处理程序,它设置查询参数、运行查询、处理结果、创建要返回的数据。
# -- Build Query Object --- #
query_opts =
query_opts['batch_size'] = 500 # Bring in large groups of entities
q = Sample.query()
q = q.order(Sample.timestamp)
# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))
def handle_sample(sample):
session_obj = sample.session.get() # Usually found in local or memcache thanks to ndb
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
尝试的方法
我尝试了多种方法来尝试尽快并行地从数据存储中提取数据。目前我尝试过的方法包括:
A.单次迭代
这更像是与其他方法进行比较的简单基本情况。我只是构建查询并遍历所有项目,让 ndb 做它做的事情,一个接一个地拉它们。
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)
for sample in q_iter:
handle_sample(sample)
B.大获取
这里的想法是看看我是否可以进行一次非常大的提取。
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)
for sample in samples:
handle_sample(sample)
C.跨时间范围异步获取
这里的想法是认识到样本在时间上的间隔相当好,因此我可以创建一组独立的查询,将整个时间区域分成多个块,并尝试使用异步并行运行每个查询:
# split up timestamp space into 20 equal parts and async query each of them
ts_delta = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []
for x in range(ts_intervals):
cur_end_time = (cur_start_time + ts_delta)
if x == (ts_intervals-1): # Last one has to cover full range
cur_end_time = end_time
f = q.filter(Sample.timestamp >= cur_start_time,
Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
q_futures.append(f)
cur_start_time = cur_end_time
# Now loop through and collect results
for f in q_futures:
samples = f.get_result()
for sample in samples:
handle_sample(sample)
D.异步映射
我尝试了这种方法,因为文档听起来像是 ndb 在使用 Query.map_async 方法时可能会自动利用一些并行性。
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
@ndb.tasklet
def process_sample(sample):
period_ts = getPeriodTimestamp(sample.timestamp)
session_obj = yield sample.session.get_async() # Lookup the session object from cache
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
raise ndb.Return(None)
q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()
结果
我测试了一个示例查询来收集总体响应时间和 appstats 跟踪。结果是:
A.单次迭代
真实:15.645s
这个过程一个接一个地依次获取批次,然后从 memcache 中检索每个会话。
B.大获取
真实:12.12s
实际上与选项 A 相同,但由于某种原因要快一些。
C.跨时间范围异步获取
真实:15.251 秒
似乎在开始时提供了更多的并行性,但似乎在结果迭代期间因一系列对 next 的调用而减慢了速度。似乎也无法将会话 memcache 查找与挂起的查询重叠。
D.异步映射
真实:13.752s
这个是我最难理解的。看起来它有很多重叠,但一切似乎都在瀑布中延伸而不是平行。
建议
基于这一切,我错过了什么?我只是达到了 App Engine 的限制,还是有更好的方法来并行拉下大量实体?
我不知道下一步该尝试什么。我考虑过重写客户端以并行向应用程序引擎发出多个请求,但这似乎很暴力。我真的希望应用引擎应该能够处理这个用例,所以我猜我缺少一些东西。
更新
最后我发现选项 C 最适合我的情况。我能够优化它以在 6.1 秒内完成。仍然不完美,但要好得多。
在听取了几个人的建议后,我发现以下几点是理解和牢记的关键:
多个查询可以并行运行 一次只能有 10 个 RPC 飞行 尝试去规范化到没有辅助查询的程度 这种类型的任务最好留给映射 reduce 和任务队列,而不是实时查询所以我做了什么让它更快:
我从一开始就根据时间对查询空间进行了分区。 (注意:分区在返回的实体方面越相等越好) 我进一步对数据进行了非规范化以消除对辅助会话查询的需要 我利用 ndb 异步操作和 wait_any() 将查询与处理重叠我仍然没有得到我期望或喜欢的性能,但它现在是可行的。我只是希望它们是一种更好的方法,可以在处理程序中将大量顺序实体快速拉入内存。
【问题讨论】:
我已经取得了一些进展,并在不到 9 秒的时间内让选项 C 起作用。我想我可以进一步优化它。我发现,如果我将初始查询分成 40 个部分,并且同时发送对所有会话实体的查询,那么大部分 RPC 时间可以重叠。我目前的最大努力是在 9 秒的实时时间内完成 245 秒的 RPC 总时间。我会尝试更多的选择,然后回复最有效的方法。同时,如果有人有更多想法,请告诉我。 嗨,我知道这个问题很老了,但是关于 D. 异步映射,您的 addCountForPeriod 方法是否写入数据存储区?如果是,那么我认为这可能会导致级联,因为异步数据存储操作和同步数据存储操作的混合。 感谢您的精彩帖子。我在这里发布了类似的问题后遇到了这个问题:***.com/questions/25796142/…。和你一样,我很沮丧我无法提高异步查询的性能。我至少想了解他们为什么这么慢。 我遇到了同样的性能问题,试图找到更通用的解决方案here #26759950 这个问题应该在 *** 的一般问答部分中作为正确问题的示例***.com/help/how-to-ask 【参考方案1】:我遇到了类似的问题,在与 Google 支持人员合作了几周后,我可以确认至少在 2017 年 12 月没有神奇的解决方案。
tl;dr: 对于在 B1 实例上运行的标准 SDK,可以预期每秒 220 个实体/秒的吞吐量高达 900 个实体/秒对于在 B8 实例上运行的已修补 SDK。
限制与 CPU 相关,更改实例类型直接影响性能。在 B4 和 B4_1G 实例上获得的类似结果证实了这一点
对于具有大约 30 个字段的 Expando 实体,我获得的最佳吞吐量是:
标准 GAE SDK
B1 实例:~220 个实体/秒 B2 实例:~250 个实体/秒 B4 实例:~560 个实体/秒 B4_1G 实例:~560 个实体/秒 B8 实例:~650 个实体/秒已修补的 GAE SDK
B1 实例:~420 个实体/秒 B8 实例:~900 个实体/秒对于标准 GAE SDK,我尝试了各种方法,包括多线程,但最好的方法是 fetch_async
和 wait_any
。当前的 NDB 库已经在后台使用异步和期货方面做得很好,所以任何使用线程来推动它的尝试只会让情况变得更糟。
我发现了两种有趣的方法来优化它:
马特·福斯 - Speeding up GAE Datastore Reads with Protobuf Projection 埃文·琼斯 - Tracing a Python performance bug on App EngineMatt Faus 很好地解释了这个问题:
GAE SDK 提供了一个 API 用于读取和写入从 您的类到数据存储区。这样可以省去你无聊的工作 验证从数据存储返回的原始数据并重新打包 变成一个易于使用的对象。特别是,GAE 使用协议缓冲区 将原始数据从存储传输到需要的前端机器 它。然后 SDK 负责解码此格式并返回 你的代码的一个干净的对象。这个实用程序很棒,但有时它 做的工作比你想要的多一点。 [...] 使用我们的分析 工具,我发现 50% 的时间花在获取这些 实体处于 protobuf-to-python-object 解码阶段。这 意味着前端服务器上的 CPU 是这些中的瓶颈 数据存储读取!
这两种方法都试图通过减少解码字段的数量来减少从 protobuf 到 Python 解码所花费的时间。
我尝试了这两种方法,但我只用马特的方法成功了。自 Evan 发布他的解决方案以来,SDK 内部结构发生了变化。我不得不对 Matt here 发布的代码稍作修改,但这很容易——如果有兴趣我可以发布最终代码。
对于具有大约 30 个字段的常规 Expando 实体,我使用 Matt 的解决方案仅解码几个字段并获得了显着改进。
总之,需要相应地进行计划,并且不要期望能够在“实时”GAE 请求中处理超过数百个实体。
【讨论】:
【参考方案2】:新的实验性Data Processing 功能(MapReduce 的 AppEngine API)看起来非常适合解决这个问题。它会自动分片以执行多个并行工作进程。
【讨论】:
【参考方案3】:这样的大型处理不应该在有 60 秒时间限制的用户请求中进行。相反,它应该在支持长时间运行请求的上下文中完成。 task queue 支持最长 10 分钟的请求,并且(我相信)正常的内存限制(F1 实例,默认情况下,有 128MB of memory)。对于更高的限制(无请求超时,1GB+ 内存),请使用backends。
这里有一些尝试:设置一个 URL,当访问该 URL 时,会触发一个任务队列任务。如果任务队列任务尚未完成,它会返回一个网页,该网页每隔约 5 秒轮询另一个以 true/false 响应的 URL。任务队列处理数据,这可能需要大约 10 秒,并将结果作为计算数据或呈现的网页保存到数据存储区。一旦初始页面检测到它已经完成,用户就会被重定向到该页面,该页面从数据存储中获取现在计算的结果。
【讨论】:
我也一直在考虑使用后端。我仍然希望让查询在正常的截止日期内工作,但如果这不起作用,我将回退到使用后端来运行它,就像你描述的那样。由于我的瓶颈之一是将所有会话对象加载到本地缓存中,因此如果我可以始终将所有会话保留在内存中,那么还有一种方法可以使用后端来提高性能。 这没有任何答案。这个问题是特定于数据存储应该如何工作的,但事实并非如此。当必须获取 100,000 或 1M 个实体时,同样的问题也适用于任务队列和后端。狗慢,昂贵的数据存储 看看下面 Martin Berends 的 MapReduce andwer。后端已被弃用。有一个很好的指南描述了迁移过程:cloud.google.com/appengine/docs/python/modules/converting【参考方案4】:App Engine 上的大数据操作最好使用某种 mapreduce 操作来实现。
这是一个描述该过程的视频,但包括 BigQuery https://developers.google.com/events/io/sessions/gooio2012/307/
听起来您不需要 BigQuery,但您可能希望同时使用管道的 Map 和 Reduce 部分。
您正在执行的操作与 mapreduce 情况之间的主要区别在于,您要启动一个实例并遍历查询,而在 mapreduce 上,您将为每个查询并行运行一个单独的实例。您将需要一个 reduce 操作来“总结”所有数据,并将结果写入某个地方。
您遇到的另一个问题是您应该使用游标进行迭代。 https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors
如果迭代器使用查询偏移量,它的效率会很低,因为偏移量会发出相同的查询,跳过一些结果,然后给你下一个集合,而光标直接跳转到下一个集合。
【讨论】:
您能否展示一个简单的示例来说明如何使用您的方法来并行获取实体?我认为一个 tasklet 会处理这个问题,但它看起来不像。 我没有使用游标,因为稍后没有任何查询在中间重新启动。它们都立即无偏移地抓取所有实体。至于 map reduce,我想过,但这不是离线分析,它是一个实时查询,内部用户在探索数据时会动态更改。我对 map reduce 的理解是它不适合这种实时交互用例。 我可能做了一个错误的假设,我认为 C 中的 datastore_v3.Next 调用是由于使用了一些基于偏移的迭代器。根据我的经验,Mapreduce 不适合交互式用例,因为 a)您无法预测操作将花费多长时间,并且 b)您通常必须将结果写入数据存储区,而不是接收可以放入的简单结果在模板上。它在客户端有点难看,我认为您需要一种轮询方法来查看结果是否准备好。但是,由于并行性质,它确实比序列化查询要快。 同意 map reduce 可以并行化。我只是希望 ndb 和 async 操作也可以为我的用例进行足够的并行化。我不需要并行计算,只需数据检索。我还考虑过使用 urlfetch 编写一个多级处理程序,该处理程序将产生对子处理程序的请求以获取数据,然后在父处理程序中收集和处理它。似乎必须有一种更简单的方法。 我不认为您将能够可靠地作为实时查询执行此操作,尤其是如果您的数据集(返回的结果变得更大)。以上是关于从数据存储区查询大量 ndb 实体的最佳实践的主要内容,如果未能解决你的问题,请参考以下文章
ndb.query.count() 失败,大型实体的查询截止日期为 60 秒