如何使用 MongoDB 以编程方式预拆分基于 GUID 的分片键

Posted

技术标签:

【中文标题】如何使用 MongoDB 以编程方式预拆分基于 GUID 的分片键【英文标题】:How to Programmatically Pre-Split a GUID Based Shard Key with MongoDB 【发布时间】:2013-11-09 10:32:28 【问题描述】:

假设我使用的是相当标准的 32 字符十六进制 GUID,并且我已经确定,因为它是为我的用户随机生成的,所以它非常适合用作水平扩展写入 MongoDB 集合的分片键我将在其中存储用户信息(并且写入缩放是我的主要关注点)。

我也知道我需要从至少 4 个分片开始,因为流量预测和在测试环境中完成的一些基准测试工作。

最后,我对我的初始数据大小(平均文档大小 * 初始用户数)有了一个不错的了解——大约为 120GB。

我想让初始加载变得又快又好,并尽可能多地利用所有 4 个分片。如何预先拆分这些数据,以便利用 4 个分片并最大限度地减少初始数据加载期间需要在分片上发生的移动、拆分等次数?

【问题讨论】:

【参考方案1】:

我们知道初始数据大小 (120GB),并且我们知道 MongoDB is 64MB 中的默认最大块大小。如果我们将 64MB 分成 120GB,我们得到 1920 - 所以这是我们应该开始寻找的最小块数。碰巧 2048 恰好是 16 除以 2 的幂,并且鉴于 GUID(我们的分片键)是基于十六进制的,这比 1920 更容易处理(见下文)。

注意:此预拆分必须在任何数据添加到集合之前完成。如果您对包含数据的集合使用 enableSharding() 命令,MongoDB 将自行拆分数据,然后您将在块已经存在时运行此命令 - 这可能会导致非常奇怪的块分布,所以要小心。

为了回答这个问题,我们假设数据库将被称为users,而集合被称为userInfo。我们还假设 GUID 将被写入_id 字段。使用这些参数,我们将连接到 mongos 并运行以下命令:

// first switch to the users DB
use users;
// now enable sharding for the users DB
sh.enableSharding("users"); 
// enable sharding on the relevant collection
sh.shardCollection("users.userInfo", "_id" : 1);
// finally, disable the balancer (see below for options on a per-collection basis)
// this prevents migrations from kicking off and interfering with the splits by competing for meta data locks
sh.stopBalancer(); 

现在,根据上面的计算,我们需要将 GUID 范围分成 2048 个块。为此,我们至少需要 3 个十六进制数字 (16 ^ 3 = 4096),我们将把它们放在范围的最高有效数字(即最左边的 3 个)中。同样,这应该从 mongos shell 运行

// Simply use a for loop for each digit
for ( var x=0; x < 16; x++ )
  for( var y=0; y<16; y++ ) 
  // for the innermost loop we will increment by 2 to get 2048 total iterations
  // make this z++ for 4096 - that would give ~30MB chunks based on the original figures
    for ( var z=0; z<16; z+=2 ) 
    // now construct the GUID with zeroes for padding - handily the toString method takes an argument to specify the base
        var prefix = "" + x.toString(16) + y.toString(16) + z.toString(16) + "00000000000000000000000000000";
        // finally, use the split command to create the appropriate chunk
        db.adminCommand(  split : "users.userInfo" , middle :  _id : prefix   );
    
  

完成后,让我们使用 sh.status() 助手检查游戏状态:

mongos> sh.status()
--- Sharding Status ---
  sharding version: 
        "_id" : 1,
        "version" : 3,
        "minCompatibleVersion" : 3,
        "currentVersion" : 4,
        "clusterId" : ObjectId("527056b8f6985e1bcce4c4cb")

  shards:
          "_id" : "shard0000",  "host" : "localhost:30000" 
          "_id" : "shard0001",  "host" : "localhost:30001" 
          "_id" : "shard0002",  "host" : "localhost:30002" 
          "_id" : "shard0003",  "host" : "localhost:30003" 
  databases:
          "_id" : "admin",  "partitioned" : false,  "primary" : "config" 
          "_id" : "users",  "partitioned" : true,  "primary" : "shard0001" 
                users.userInfo
                        shard key:  "_id" : 1 
                        chunks:
                                shard0001       2049
                        too many chunks to print, use verbose if you want to force print

我们有 2048 个块(由于最小/最大块,再加上一个),但由于平衡器已关闭,它们都仍在原始分片上。所以,让我们重新启用平衡器:

sh.startBalancer();

这将立即开始平衡,并且会相对较快,因为所有块都是空的,但仍需要一点时间(如果与其他集合的迁移竞争,则要慢得多)。过了一段时间后,再次运行sh.status(),您(应该)就拥有它了 - 2048 个块都很好地分成了 4 个分片并准备好进行初始数据加载:

mongos> sh.status()
--- Sharding Status ---
  sharding version: 
        "_id" : 1,
        "version" : 3,
        "minCompatibleVersion" : 3,
        "currentVersion" : 4,
        "clusterId" : ObjectId("527056b8f6985e1bcce4c4cb")

  shards:
          "_id" : "shard0000",  "host" : "localhost:30000" 
          "_id" : "shard0001",  "host" : "localhost:30001" 
          "_id" : "shard0002",  "host" : "localhost:30002" 
          "_id" : "shard0003",  "host" : "localhost:30003" 
  databases:
          "_id" : "admin",  "partitioned" : false,  "primary" : "config" 
          "_id" : "users",  "partitioned" : true,  "primary" : "shard0001" 
                users.userInfo
                        shard key:  "_id" : 1 
                        chunks:
                                shard0000       512
                                shard0002       512
                                shard0003       512
                                shard0001       513
                        too many chunks to print, use verbose if you want to force print
          "_id" : "test",  "partitioned" : false,  "primary" : "shard0002" 

您现在已准备好开始加载数据,但为了绝对保证在数据加载完成之前不会发生拆分或迁移,您还需要做一件事 - 在导入期间关闭平衡器和自动拆分:

要禁用所有平衡,请从 mongos 运行此命令:sh.stopBalancer() 如果您想让其他平衡操作继续运行,您可以在特定集合上禁用。以上面的命名空间为例:sh.disableBalancing("users.userInfo") 要在加载过程中关闭自动拆分,您需要使用 --noAutoSplit 选项重新启动您将用于加载数据的每个 mongos

导入完成后,根据需要反转步骤(sh.startBalancer()sh.enableBalancing("users.userInfo"),并在没有--noAutoSplit 的情况下重新启动mongos)以将所有内容恢复为默认设置。

**

更新:优化速度

**

如果您不着急,上述方法很好。从目前的情况来看,如果您对此进行测试,您会发现,平衡器并不是很快 - 即使是空块。因此,当您增加创建的块数时,平衡所需的时间越长。我已经看到完成平衡 2048 个块需要 30 多分钟,但这会因部署而异。

这对于测试或相对安静的集群可能没问题,但在繁忙的集群上,关闭平衡器并且不需要其他更新干扰将更难确保。那么,我们如何加快速度呢?

答案是尽早进行一些手动操作,然后将块放在各自的分片上后将其拆分。请注意,这仅适用于某些分片键(如随机分布的 UUID)或某些数据访问模式,因此请注意不要导致数据分布不佳。

使用上面的示例,我们有 4 个分片,因此我们没有进行所有拆分,然后进行平衡,而是拆分为 4 个。然后我们通过手动移动它们在每个分片上放置一个块,最后我们将这些块分成所需的数量。

上例中的范围如下所示:

$min --> "40000000000000000000000000000000"
"40000000000000000000000000000000" --> "80000000000000000000000000000000"
"80000000000000000000000000000000" --> "c0000000000000000000000000000000"
"c0000000000000000000000000000000" --> $max     

创建这些只有 4 个命令,但既然我们有它,为什么不以简化/修改的形式重新使用上面的循环:

for ( var x=4; x < 16; x+=4)
    var prefix = "" + x.toString(16) + "0000000000000000000000000000000";
    db.adminCommand(  split : "users.userInfo" , middle :  _id : prefix   ); 
 

这是 thinks 现在的样子 - 我们有 4 个块,都在 shard0001 上:

mongos> sh.status()
--- Sharding Status --- 
  sharding version: 
    "_id" : 1,
    "version" : 4,
    "minCompatibleVersion" : 4,
    "currentVersion" : 5,
    "clusterId" : ObjectId("53467e59aea36af7b82a75c1")

  shards:
      "_id" : "shard0000",  "host" : "localhost:30000" 
      "_id" : "shard0001",  "host" : "localhost:30001" 
      "_id" : "shard0002",  "host" : "localhost:30002" 
      "_id" : "shard0003",  "host" : "localhost:30003" 
  databases:
      "_id" : "admin",  "partitioned" : false,  "primary" : "config" 
      "_id" : "test",  "partitioned" : false,  "primary" : "shard0001" 
      "_id" : "users",  "partitioned" : true,  "primary" : "shard0001" 
        users.userInfo
            shard key:  "_id" : 1 
            chunks:
                shard0001   4
             "_id" :  "$minKey" : 1   -->>  "_id" : "40000000000000000000000000000000"  on : shard0001 Timestamp(1, 1) 
             "_id" : "40000000000000000000000000000000"  -->>  "_id" : "80000000000000000000000000000000"  on : shard0001 Timestamp(1, 3) 
             "_id" : "80000000000000000000000000000000"  -->>  "_id" : "c0000000000000000000000000000000"  on : shard0001 Timestamp(1, 5) 
             "_id" : "c0000000000000000000000000000000"  -->>  "_id" :  "$maxKey" : 1   on : shard0001 Timestamp(1, 6)                    

我们将把$min 块留在原处,并移动其他三个。您可以通过编程方式执行此操作,但这确实取决于块最初所在的位置、您如何命名分片等。所以我现在将离开本手册,它不会太繁琐 - 只需 3 个moveChunk 命令:

mongos> sh.moveChunk("users.userInfo", "_id" : "40000000000000000000000000000000", "shard0000")
 "millis" : 1091, "ok" : 1 
mongos> sh.moveChunk("users.userInfo", "_id" : "80000000000000000000000000000000", "shard0002")
 "millis" : 1078, "ok" : 1 
mongos> sh.moveChunk("users.userInfo", "_id" : "c0000000000000000000000000000000", "shard0003")
 "millis" : 1083, "ok" : 1           

让我们仔细检查一下,确保块在我们预期的位置:

mongos> sh.status()
--- Sharding Status --- 
  sharding version: 
    "_id" : 1,
    "version" : 4,
    "minCompatibleVersion" : 4,
    "currentVersion" : 5,
    "clusterId" : ObjectId("53467e59aea36af7b82a75c1")

  shards:
      "_id" : "shard0000",  "host" : "localhost:30000" 
      "_id" : "shard0001",  "host" : "localhost:30001" 
      "_id" : "shard0002",  "host" : "localhost:30002" 
      "_id" : "shard0003",  "host" : "localhost:30003" 
  databases:
      "_id" : "admin",  "partitioned" : false,  "primary" : "config" 
      "_id" : "test",  "partitioned" : false,  "primary" : "shard0001" 
      "_id" : "users",  "partitioned" : true,  "primary" : "shard0001" 
        users.userInfo
            shard key:  "_id" : 1 
            chunks:
                shard0001   1
                shard0000   1
                shard0002   1
                shard0003   1
             "_id" :  "$minKey" : 1   -->>  "_id" : "40000000000000000000000000000000"  on : shard0001 Timestamp(4, 1) 
             "_id" : "40000000000000000000000000000000"  -->>  "_id" : "80000000000000000000000000000000"  on : shard0000 Timestamp(2, 0) 
             "_id" : "80000000000000000000000000000000"  -->>  "_id" : "c0000000000000000000000000000000"  on : shard0002 Timestamp(3, 0) 
             "_id" : "c0000000000000000000000000000000"  -->>  "_id" :  "$maxKey" : 1   on : shard0003 Timestamp(4, 0)  

这符合我们上面建议的范围,所以看起来都不错。现在运行上面的原始循环以在每个分片上“就地”拆分它们,一旦循环完成,我们应该有一个平衡的分布。另外一位sh.status() 应该确认一下:

mongos> for ( var x=0; x < 16; x++ )
...   for( var y=0; y<16; y++ ) 
...   // for the innermost loop we will increment by 2 to get 2048 total iterations
...   // make this z++ for 4096 - that would give ~30MB chunks based on the original figures
...     for ( var z=0; z<16; z+=2 ) 
...     // now construct the GUID with zeroes for padding - handily the toString method takes an argument to specify the base
...         var prefix = "" + x.toString(16) + y.toString(16) + z.toString(16) + "00000000000000000000000000000";
...         // finally, use the split command to create the appropriate chunk
...         db.adminCommand(  split : "users.userInfo" , middle :  _id : prefix   );
...     
...   
...           
 "ok" : 1 
mongos> sh.status()
--- Sharding Status --- 
  sharding version: 
    "_id" : 1,
    "version" : 4,
    "minCompatibleVersion" : 4,
    "currentVersion" : 5,
    "clusterId" : ObjectId("53467e59aea36af7b82a75c1")

  shards:
      "_id" : "shard0000",  "host" : "localhost:30000" 
      "_id" : "shard0001",  "host" : "localhost:30001" 
      "_id" : "shard0002",  "host" : "localhost:30002" 
      "_id" : "shard0003",  "host" : "localhost:30003" 
  databases:
      "_id" : "admin",  "partitioned" : false,  "primary" : "config" 
      "_id" : "test",  "partitioned" : false,  "primary" : "shard0001" 
      "_id" : "users",  "partitioned" : true,  "primary" : "shard0001" 
        users.userInfo
            shard key:  "_id" : 1 
            chunks:
                shard0001   513
                shard0000   512
                shard0002   512
                shard0003   512
            too many chunks to print, use verbose if you want to force print    

你有它 - 不用等待平衡器,分配已经均匀。

【讨论】:

为什么不根据分片的数量进行拆分,并让拆分随着数据的进入而发生? 拆分现在很便宜,以后会更贵(尽管总体上是轻量级的)。这避免了拆分的需要,除非您增加您拥有的数据量(或以某种方式搞砸数据的分布) - 拆分成本低,但不是免费的,特别是从配置服务器的角度来看,它们可能由于各种原因而失败(下降配置服务器,网络等)-另外,如果您有很多 mongos 实例并且流量分布均匀(边缘情况,允许),它可能会特别糟糕。可能还有其他原因,那为什么要冒险呢? 嗨,亚当,我遇到了同样的问题,我也没有做任何预拆分突然之间它开始制作 1 KB 的块,而不是指定的 250 MB 限制。 dba.stackexchange.com/questions/87407/… @AdamComerford 我已经以更通用的方式将您的答案放在一起,可用作gist 不知何故,它创建的块总是比我要求的要少,也许您会在代码...(即:示例应该产生 256 个块,但不知何故创建了一个 201 和 211 之后的一个) 嘿,这是一个非常有用的答案@AdamComerford——谢谢。对于将来遇到此问题的任何人,我需要将约 4TB 和约 10 亿个文档加载到 4 分片集群上。最终,我发现性能完全没问题,只需创建 256 个块,让平衡器将它们分散开,然后在我开始加载数据时让平衡器保持开启状态。 (在进入这个 SO 帖子之前,我尝试了 1 个块,它甚至从未分裂,似乎负载对那个或其他东西来说太压倒了)

以上是关于如何使用 MongoDB 以编程方式预拆分基于 GUID 的分片键的主要内容,如果未能解决你的问题,请参考以下文章

如何以编程方式禁用硬件预取?

如何以编程方式强制 mongoose 查询并返回 mongodb 集合中的某个字段?

如何在 MongoDB 中以异步方式检索基于推送的通知的信息

以编程方式设置 MongoDb 转换器

如何通过 .NET 在 MongoDB 中创建索引

C#:以编程方式创建拆分 zip 文件