Spark - Divide a dataframe into n number of records

Posted: 2020-04-15 01:23:57

Question:

I have a dataframe with 2 or more columns and 1000 records. I want to split the data randomly, without any condition, into chunks of 100 records each.

So, in terms of record counts, the expected output should look like this:

[(1,2....100),(101,102,103...200),.....,(900,901...1000)]

After trying different approaches, this is the solution that worked for my use case:

https://***.com/a/61276734/12322995

Question comments:

Does this answer your question? Spark Data Frame Random Splitting

Thanks for suggesting a possible answer, but unfortunately it does not answer my question. I have updated my question with the solution, and with why randomSplit does not work in this case.

No problem. I would suggest you follow this site's Q&A pattern and move the solution part into a new answer instead of keeping it in the question (self-answered questions are fine).

Answer 1:

As @Shaido said, randomSplit is the popular way to split a dataframe.

Here is a different take, using repartitionByRange (available since Spark 2.3):

repartitionByRange
public Dataset<T> repartitionByRange(int numPartitions, scala.collection.Seq<Column> partitionExprs)
Returns a new Dataset partitioned by the given partitioning expressions into numPartitions. The resulting Dataset is range partitioned. At least one partition-by expression must be specified. When no explicit sort order is specified, "ascending nulls first" is assumed.
Parameters: numPartitions - (undocumented) partitionExprs - (undocumented)
Returns: (undocumented)
Since: 2.3.0

package examples

import org.apache.log4j.Level
import org.apache.spark.sql.{Dataset, SparkSession}

object RepartitionByRange extends App {

  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  val spark = SparkSession.builder().appName(getClass.getName).master("local").getOrCreate()
  val sc = spark.sparkContext

  import spark.implicits._

  val t1 = sc.parallelize(0 until 1000).toDF("id")

  // Range-partition by "id" into 10 partitions, then collapse each partition's ids into one CSV string.
  val repartitionedOrders: Dataset[String] = t1.repartitionByRange(10, $"id")
    .mapPartitions(rows => {
      val idsInPartition = rows.map(row => row.getAs[Int]("id")).toSeq.sorted.mkString(",")
      Iterator(idsInPartition)
    })

  repartitionedOrders.show(false)
  println("number of chunks or partitions : " + repartitionedOrders.rdd.getNumPartitions)
}


Result:

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                                                                          |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99                                                                                                              |
|100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,193,194,195,196,197,198,199|
|200,201,202,203,204,205,206,207,208,209,210,211,212,213,214,215,216,217,218,219,220,221,222,223,224,225,226,227,228,229,230,231,232,233,234,235,236,237,238,239,240,241,242,243,244,245,246,247,248,249,250,251,252,253,254,255,256,257,258,259,260,261,262,263,264,265,266,267,268,269,270,271,272,273,274,275,276,277,278,279,280,281,282,283,284,285,286,287,288,289,290,291,292,293,294,295,296,297,298,299|
|300,301,302,303,304,305,306,307,308,309,310,311,312,313,314,315,316,317,318,319,320,321,322,323,324,325,326,327,328,329,330,331,332,333,334,335,336,337,338,339,340,341,342,343,344,345,346,347,348,349,350,351,352,353,354,355,356,357,358,359,360,361,362,363,364,365,366,367,368,369,370,371,372,373,374,375,376,377,378,379,380,381,382,383,384,385,386,387,388,389,390,391,392,393,394,395,396,397,398,399|
|400,401,402,403,404,405,406,407,408,409,410,411,412,413,414,415,416,417,418,419,420,421,422,423,424,425,426,427,428,429,430,431,432,433,434,435,436,437,438,439,440,441,442,443,444,445,446,447,448,449,450,451,452,453,454,455,456,457,458,459,460,461,462,463,464,465,466,467,468,469,470,471,472,473,474,475,476,477,478,479,480,481,482,483,484,485,486,487,488,489,490,491,492,493,494,495,496,497,498,499|
|500,501,502,503,504,505,506,507,508,509,510,511,512,513,514,515,516,517,518,519,520,521,522,523,524,525,526,527,528,529,530,531,532,533,534,535,536,537,538,539,540,541,542,543,544,545,546,547,548,549,550,551,552,553,554,555,556,557,558,559,560,561,562,563,564,565,566,567,568,569,570,571,572,573,574,575,576,577,578,579,580,581,582,583,584,585,586,587,588,589,590,591,592,593,594,595,596,597,598,599|
|600,601,602,603,604,605,606,607,608,609,610,611,612,613,614,615,616,617,618,619,620,621,622,623,624,625,626,627,628,629,630,631,632,633,634,635,636,637,638,639,640,641,642,643,644,645,646,647,648,649,650,651,652,653,654,655,656,657,658,659,660,661,662,663,664,665,666,667,668,669,670,671,672,673,674,675,676,677,678,679,680,681,682,683,684,685,686,687,688,689,690,691,692,693,694,695,696,697,698,699|
|700,701,702,703,704,705,706,707,708,709,710,711,712,713,714,715,716,717,718,719,720,721,722,723,724,725,726,727,728,729,730,731,732,733,734,735,736,737,738,739,740,741,742,743,744,745,746,747,748,749,750,751,752,753,754,755,756,757,758,759,760,761,762,763,764,765,766,767,768,769,770,771,772,773,774,775,776,777,778,779,780,781,782,783,784,785,786,787,788,789,790,791,792,793,794,795,796,797,798,799|
|800,801,802,803,804,805,806,807,808,809,810,811,812,813,814,815,816,817,818,819,820,821,822,823,824,825,826,827,828,829,830,831,832,833,834,835,836,837,838,839,840,841,842,843,844,845,846,847,848,849,850,851,852,853,854,855,856,857,858,859,860,861,862,863,864,865,866,867,868,869,870,871,872,873,874,875,876,877,878,879,880,881,882,883,884,885,886,887,888,889,890,891,892,893,894,895,896,897,898,899|
|900,901,902,903,904,905,906,907,908,909,910,911,912,913,914,915,916,917,918,919,920,921,922,923,924,925,926,927,928,929,930,931,932,933,934,935,936,937,938,939,940,941,942,943,944,945,946,947,948,949,950,951,952,953,954,955,956,957,958,959,960,961,962,963,964,965,966,967,968,969,970,971,972,973,974,975,976,977,978,979,980,981,982,983,984,985,986,987,988,989,990,991,992,993,994,995,996,997,998,999|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

number of chunks or partitions : 10
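If the total number of records is not known up front (as raised in the comments below), one possible variation, not part of the original answer, is to derive the partition count from the data before calling repartitionByRange. A minimal sketch, assuming the same t1 dataframe; chunkSize is a hypothetical parameter, and range partitioning gives approximately (not exactly) equal chunks:

  // Sketch: derive the number of chunks from a full count instead of hard-coding 10.
  // chunkSize is illustrative; each chunk will hold roughly, not exactly, this many records.
  val chunkSize = 100
  val numChunks = Math.max(Math.ceil(t1.count().toDouble / chunkSize).toInt, 1)
  val dynamicChunks = t1.repartitionByRange(numChunks, $"id")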


UPDATE: randomSplit example:

  import spark.implicits._

  val t1 = sc.parallelize(0 until 1000).toDF("id")

  println("With Random Split ")
  // ten equal weights => ten dataframes of roughly (not exactly) equal size
  val dfarray = t1.randomSplit(Array(1, 1, 1, 1, 1, 1, 1, 1, 1, 1))
  println("number of dataframes " + dfarray.length + "element order is not guaranteed ")
  dfarray.foreach { df =>
    df.show()
  }

Result: the data is split into 10 dataframes, and element order is not guaranteed.

With Random Split 
number of dataframes 10element order is not guaranteed 
+---+
| id|
+---+
|  2|
| 10|
| 16|
| 30|
| 36|
| 46|
| 51|
| 91|
|100|
|121|
|136|
|138|
|149|
|152|
|159|
|169|
|198|
|199|
|220|
|248|
+---+
only showing top 20 rows

+---+
| id|
+---+
| 26|
| 40|
| 45|
| 54|
| 63|
| 72|
| 76|
|107|
|129|
|137|
|142|
|145|
|153|
|162|
|173|
|179|
|196|
|208|
|214|
|232|
+---+
only showing top 20 rows

+---+
| id|
+---+
|  7|
| 12|
| 31|
| 32|
| 38|
| 42|
| 53|
| 61|
| 68|
| 73|
| 80|
| 89|
| 96|
|115|
|117|
|118|
|131|
|132|
|139|
|146|
+---+
only showing top 20 rows

+---+
| id|
+---+
|  0|
| 24|
| 35|
| 57|
| 58|
| 65|
| 77|
| 78|
| 84|
| 86|
| 90|
| 97|
|156|
|158|
|168|
|174|
|182|
|197|
|218|
|242|
+---+
only showing top 20 rows

+---+
| id|
+---+
|  1|
|  3|
| 17|
| 18|
| 19|
| 33|
| 70|
| 71|
| 74|
| 83|
|102|
|104|
|108|
|109|
|122|
|128|
|143|
|150|
|154|
|157|
+---+
only showing top 20 rows

+---+
| id|
+---+
| 14|
| 15|
| 29|
| 44|
| 64|
| 75|
| 88|
|103|
|110|
|113|
|116|
|120|
|124|
|135|
|155|
|213|
|221|
|238|
|241|
|251|
+---+
only showing top 20 rows

+---+
| id|
+---+
|  5|
|  9|
| 21|
| 22|
| 23|
| 25|
| 27|
| 47|
| 52|
| 55|
| 60|
| 62|
| 69|
| 93|
|111|
|114|
|141|
|144|
|161|
|164|
+---+
only showing top 20 rows

+---+
| id|
+---+
| 13|
| 20|
| 39|
| 41|
| 49|
| 56|
| 67|
| 85|
| 87|
| 92|
|105|
|106|
|126|
|127|
|160|
|165|
|166|
|171|
|175|
|184|
+---+
only showing top 20 rows

+---+
| id|
+---+
|  4|
| 34|
| 50|
| 79|
| 81|
|101|
|119|
|123|
|133|
|147|
|163|
|170|
|180|
|181|
|193|
|202|
|207|
|222|
|226|
|233|
+---+
only showing top 20 rows

+---+
| id|
+---+
|  6|
|  8|
| 11|
| 28|
| 37|
| 43|
| 48|
| 59|
| 66|
| 82|
| 94|
| 95|
| 98|
| 99|
|112|
|125|
|130|
|134|
|140|
|183|
+---+
only showing top 20 rows

Comments:

If it was useful, please accept the answer as owner and upvote.

Thanks for your answer. But what if I want it to be dynamic? I don't know the total number of records, but I still want chunks of 100. I think repartitionByRange only helps if you know exactly how many records you have; here we split into 10 partitions because we want 100 per partition.

Also updated with a randomSplit example. AFAIK there is no other way to achieve this.

@RamGhandiyaram thanks for sharing your thoughts on possible solutions. randomSplit is definitely not an option, and repartitionByRange can work if we change the partition count dynamically. But I have edited my question with the solution that worked for me ***.com/q/61219832/12322995, thanks.

df.collect.grouped(10) is wrong, since you are collecting big data and it may lead to OOM. You can apply the approximation logic to repartitionByRange: to know how many partitions you want, do an approximate count, divide it by 100, and pass that to repartitionByRange; that should work.

Answer 2:

Since I want the data evenly distributed and to be able to use the chunks separately or in an iterative manner, randomSplit does not work: it may leave empty dataframes or an unequal distribution.

Therefore, if you don't mind calling collect on the dataframe, using grouped is probably one of the most viable solutions.

Eg: val newdf = df.collect.grouped(10)

This gives Iterator[List[org.apache.spark.sql.Row]] = non-empty iterator. It can also be converted to a List by adding .toList at the end.
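A minimal sketch of this grouped approach (assuming the data fits comfortably in driver memory; the chunked helper name and the chunk size of 100 are illustrative only):

import org.apache.spark.sql.{DataFrame, Row}

// Collect the whole dataframe to the driver and split it into fixed-size chunks.
// Only safe when the data comfortably fits in driver memory.
def chunked(df: DataFrame, chunkSize: Int = 100): Iterator[Seq[Row]] =
  df.collect().grouped(chunkSize).map(_.toSeq)

// Usage: iterate over the 100-record chunks one by one.
// chunked(df).foreach(chunk => println(chunk.size))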

Another possible solution: if we don't want an Array of chunks of the dataframe, but still want the data partitioned with a roughly equal number of records per partition, we can try countApprox, tuning timeout and confidence as needed. We then divide that count by the number of records we want per partition, and later use the result as the number of partitions with repartition or coalesce.

countApprox is used instead of count because it is a cheaper operation, and you can feel the difference when the data size is huge.

// Approximate the total record count (cheaper than a full count), then derive the
// number of partitions so that each partition holds roughly 100 records.
val approxCount = df.rdd.countApprox(timeout = 1000L, confidence = 0.95).getFinalValue().high

val numOfPartitions = Math.max(Math.round(approxCount / 100), 1).toInt

df.repartition(numOfPartitions)
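As a possible follow-up (a sketch, not from the original answer), each of the resulting partitions, holding roughly 100 records, could then be consumed independently, for example with foreachPartition:

// Each partition now holds roughly approxCount / numOfPartitions records;
// process every partition as one independent chunk on the executors.
df.repartition(numOfPartitions).rdd.foreachPartition { rows =>
  val chunk = rows.toSeq
  println(s"processing a chunk of ${chunk.size} records")
}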

