如何在 spark sql 2.1.0 中的 Dataset<Row> 上获取 groupby 之后的所有列
Posted
技术标签:
【中文标题】如何在 spark sql 2.1.0 中的 Dataset<Row> 上获取 groupby 之后的所有列【英文标题】:How to get all columns after groupby on Dataset<Row> in spark sql 2.1.0 【发布时间】:2017-01-05 07:05:24 【问题描述】:首先,我对 SPARK 很陌生
我的数据集中有数百万条记录,我想用名称列分组并查找具有最大年龄的名称。我得到了正确的结果,但我需要结果集中的所有列。
Dataset<Row> resultset = studentDataSet.select("*").groupBy("name").max("age");
resultset.show(1000,false);
我的结果集数据集中只有姓名和最大(年龄)。
【问题讨论】:
【参考方案1】:对于您的解决方案,您必须尝试不同的方法。您几乎可以找到解决方案,但让我帮助您理解。
Dataset<Row> resultset = studentDataSet.groupBy("name").max("age");
现在你可以做的是加入resultset
和studentDataSet
Dataset<Row> joinedDS = studentDataset.join(resultset, "name");
groupBy
的问题在于,在应用 groupBy 后,您会得到 RelationalGroupedDataset
,因此这取决于您执行的下一个操作,例如 sum, min, mean, max
等,然后这些操作的结果与 groupBy
结合使用
在您的情况下,name
列与 age
的 max
连接,因此它将仅返回两列,但如果在 age
上使用应用 groupBy
,然后在“年龄”上应用 max
列您将得到两列,第一列是age
,第二列是max(age)
。
注意 :- 代码未经测试,如有需要请进行更改 希望这可以清除您的查询
【讨论】:
嘿,谢谢! studentDataset.join(resultset, expression) 解决了我的问题。 我知道这是 1 年前的帖子,但仍然是嘿阿努普!很高兴知道您找到了解决方案,您能否通过发布您的代码来帮助我们? 嘿 Akash 和 Anup 这对我不起作用。即使在加入之后,我也只得到两列而不是其他列。它是否也与火花版本有关。我正在使用火花 2.1 我也认为加入可以产生第二轮洗牌,所以我添加了另一个解决方案,在最后不加入并保持使用严格的数据集使用(无数据帧)。【参考方案2】:接受的答案并不理想,因为它需要加入。加入大数据帧可能会导致执行缓慢的大洗牌。
让我们创建一个示例数据集并测试代码:
val df = Seq(
("bob", 20, "blah"),
("bob", 40, "blah"),
("karen", 21, "hi"),
("monica", 43, "candy"),
("monica", 99, "water")
).toDF("name", "age", "another_column")
此代码在大型 DataFrame 上运行速度应该更快。
df
.groupBy("name")
.agg(
max("name").as("name1_dup"),
max("another_column").as("another_column"),
max("age").as("age")
).drop(
"name1_dup"
).show()
+------+--------------+---+
| name|another_column|age|
+------+--------------+---+
|monica| water| 99|
| karen| hi| 21|
| bob| blah| 40|
+------+--------------+---+
【讨论】:
.agg( max("name").as("name1_dup"), max("another_column").as("another_column"), max("age").as("age ") ) 给出编译时异常 如果您将行("monica", 43, "candy")
更改为 ("monica", 43, "zebra")
,此示例不工作。 @AnujMehra - 代码在 REPL 中为我解释。【参考方案3】:
你想要达到的目标是
-
按年龄分组行
将每组减少到 1 行,最大年龄
此替代方案无需使用聚合即可实现此输出
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
object TestJob5
def main (args: Array[String]): Unit =
val sparkSession = SparkSession
.builder()
.appName(this.getClass.getName.replace("$", ""))
.master("local")
.getOrCreate()
val sc = sparkSession.sparkContext
sc.setLogLevel("ERROR")
import sparkSession.sqlContext.implicits._
val rawDf = Seq(
("Moe", "Slap", 7.9, 118),
("Larry", "Spank", 8.0, 115),
("Curly", "Twist", 6.0, 113),
("Laurel", "Whimper", 7.53, 119),
("Hardy", "Laugh", 6.0, 118),
("Charley", "Ignore", 9.7, 115),
("Moe", "Spank", 6.8, 118),
("Larry", "Twist", 6.0, 115),
("Charley", "fall", 9.0, 115)
).toDF("name", "requisite", "funniness_of_requisite", "age")
rawDf.show(false)
rawDf.printSchema
val nameWindow = Window
.partitionBy("name")
val aggDf = rawDf
.withColumn("id", monotonically_increasing_id)
.withColumn("maxFun", max("funniness_of_requisite").over(nameWindow))
.withColumn("count", count("name").over(nameWindow))
.withColumn("minId", min("id").over(nameWindow))
.where(col("maxFun") === col("funniness_of_requisite") && col("minId") === col("id") )
.drop("maxFun")
.drop("minId")
.drop("id")
aggDf.printSchema
aggDf.show(false)
请记住,一个组可能有超过 1 行的最大年龄,因此您需要通过某种逻辑选择一个。在示例中,我认为这无关紧要,所以我只分配一个唯一的数字来选择
【讨论】:
【参考方案4】:注意到随后的连接是额外的洗牌,并且其他一些解决方案在返回中似乎不准确,甚至将数据集转换为数据帧,我寻求更好的解决方案。这是我的:
case class People(name: String, age: Int, other: String)
val df = Seq(
People("Rob", 20, "cherry"),
People("Rob", 55, "banana"),
People("Rob", 40, "apple"),
People("Ariel", 55, "fox"),
People("Vera", 43, "zebra"),
People("Vera", 99, "horse")
).toDS
val oldestResults = df
.groupByKey(_.name)
.mapGroups
case (nameKey, peopleIter) =>
var oldestPerson = peopleIter.next
while(peopleIter.hasNext)
val nextPerson = peopleIter.next
if(nextPerson.age > oldestPerson.age) oldestPerson = nextPerson
oldestPerson
oldestResults.show
以下产生:
+-----+---+------+
| name|age| other|
+-----+---+------+
|Ariel| 55| fox|
| Rob| 55|banana|
| Vera| 99| horse|
+-----+---+------+
【讨论】:
【参考方案5】:您需要记住聚合函数会减少行数,因此您需要使用减少函数指定您想要哪些行年龄。如果您想保留组的所有行(警告!这可能导致爆炸或分区倾斜),您可以将它们收集为一个列表。然后,您可以使用 UDF(用户定义的函数)按照您的标准减少它们,在这个例子中是 funniness_of_requisite。然后使用另一个 UDF 从单个缩减行扩展属于缩减行的列。 出于此答案的目的,我假设您希望保留具有最大 funniness_of_requisite 的人的年龄。
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType, StringType
import scala.collection.mutable
object TestJob4
def main (args: Array[String]): Unit =
val sparkSession = SparkSession
.builder()
.appName(this.getClass.getName.replace("$", ""))
.master("local")
.getOrCreate()
val sc = sparkSession.sparkContext
import sparkSession.sqlContext.implicits._
val rawDf = Seq(
(1, "Moe", "Slap", 7.9, 118),
(2, "Larry", "Spank", 8.0, 115),
(3, "Curly", "Twist", 6.0, 113),
(4, "Laurel", "Whimper", 7.53, 119),
(5, "Hardy", "Laugh", 6.0, 18),
(6, "Charley", "Ignore", 9.7, 115),
(2, "Moe", "Spank", 6.8, 118),
(3, "Larry", "Twist", 6.0, 115),
(3, "Charley", "fall", 9.0, 115)
).toDF("id", "name", "requisite", "funniness_of_requisite", "age")
rawDf.show(false)
rawDf.printSchema
val rawSchema = rawDf.schema
val fUdf = udf(reduceByFunniness, rawSchema)
val nameUdf = udf(extractAge, IntegerType)
val aggDf = rawDf
.groupBy("name")
.agg(
count(struct("*")).as("count"),
max(col("funniness_of_requisite")),
collect_list(struct("*")).as("horizontal")
)
.withColumn("short", fUdf($"horizontal"))
.withColumn("age", nameUdf($"short"))
.drop("horizontal")
aggDf.printSchema
aggDf.show(false)
def reduceByFunniness= (x: Any) =>
val d = x.asInstanceOf[mutable.WrappedArray[GenericRowWithSchema]]
val red = d.reduce((r1, r2) =>
val funniness1 = r1.getAs[Double]("funniness_of_requisite")
val funniness2 = r2.getAs[Double]("funniness_of_requisite")
val r3 = funniness1 match
case a if a >= funniness2 =>
r1
case _ =>
r2
r3
)
red
def extractAge = (x: Any) =>
val d = x.asInstanceOf[GenericRowWithSchema]
d.getAs[Int]("age")
d.getAs[String]("name")
这是输出
+-------+-----+---------------------------+-------------------------------+---+
|name |count|max(funniness_of_requisite)|short
|age|
+-------+-----+---------------------------+-------------------------------+---+
|Hardy |1 |6.0 |[5, Hardy, Laugh, 6.0, 18]
|18 |
|Moe |2 |7.9 |[1, Moe, Slap, 7.9, 118]
|118|
|Curly |1 |6.0 |[3, Curly, Twist, 6.0, 113]
|113|
|Larry |2 |8.0 |[2, Larry, Spank, 8.0, 115]
|115|
|Laurel |1 |7.53 |[4, Laurel, Whimper, 7.53, 119]|119|
|Charley|2 |9.7 |[6, Charley, Ignore, 9.7, 115] |115|
+-------+-----+---------------------------+-------------------------------+---+
【讨论】:
以上是关于如何在 spark sql 2.1.0 中的 Dataset<Row> 上获取 groupby 之后的所有列的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark 2.1.0 中使用 SparkSQL 将“.txt”转换为“.parquet”?
在 HDP 的 spark2 sql 中无法访问 Hive 表