Spark是不是支持melt和dcast [重复]
Posted
技术标签:
【中文标题】Spark是不是支持melt和dcast [重复]【英文标题】:Does Spark supports melt and dcast [duplicate]Spark是否支持melt和dcast [重复] 【发布时间】:2016-07-28 07:39:29 【问题描述】:我们使用melt 和dcast 将数据从wide->long 和long->wide 格式转换。 详情请参考http://seananderson.ca/2013/10/19/reshape.html。
scala 或 SparkR 都可以。
我经历过这个blog 和scala functions 和R API。 我没有看到做类似工作的函数。
Spark 中有没有等价的功能?如果没有,在 Spark 中有没有其他方法可以做到这一点?
【问题讨论】:
看起来不像。如果您可以将数据放入内存,请使用as.data.frame()
将 Spark DataFrame 转换为原生 data.frame,对其进行整形,然后将其写回 Spark。
因为没有。您需要自己编写。
【参考方案1】:
Reshaping Data with Pivot in Spark 支持使用pivot
进行整形。我知道melt
大致与枢轴相反,也称为unpivot
。我对Spark
比较陌生。据我所知,我试图实施熔化操作。
def melt(df: DataFrame, columns: List[String]): DataFrame =
val restOfTheColumns = df.columns.filterNot(columns.contains(_))
val baseDF = df.select(columns.head, columns.tail: _*)
val newStructure =StructType(baseDF.schema.fields ++ List(StructField("variable", StringType, true), StructField("value", StringType, true)))
var newdf = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], newStructure)
for(variableCol <- restOfTheColumns)
val colValues = df.select(variableCol).map(r=> r(0).toString)
val colRdd=baseDF.rdd.zip(colValues).map(tuple => Row.fromSeq(tuple._1.toSeq.:+(variableCol).:+(tuple._2.toString)))
var colDF =sqlContext.createDataFrame(colRdd, newStructure)
newdf =newdf.unionAll(colDF)
newdf
它完成了工作。但我不太确定效率。
+-----+---+---+----------+------+
| name|sex|age| street|weight|
+-----+---+---+----------+------+
|Alice| f| 34| somewhere| 70|
| Bob| m| 63| nowhere| -70|
|Alice| f|612|nextstreet| 23|
| Bob| m|612| moon| 8|
+-----+---+---+----------+------+
可以作为
melt(df, List("name", "sex"))
结果如下:
+-----+---+--------+----------+
| name|sex|variable| value|
+-----+---+--------+----------+
|Alice| f| age| 34|
| Bob| m| age| 63|
|Alice| f| age| 612|
| Bob| m| age| 612|
|Alice| f| street| somewhere|
| Bob| m| street| nowhere|
|Alice| f| street|nextstreet|
| Bob| m| street| moon|
|Alice| f| weight| 70|
| Bob| m| weight| -70|
|Alice| f| weight| 23|
| Bob| m| weight| 8|
+-----+---+--------+----------+
如果有改进的余地,我希望它有用并感谢您的 cmets。
【讨论】:
【参考方案2】:这是一个 spark.ml.Transformer
,它只使用数据集操作(没有 RDD 的东西)
case class Melt(meltColumns: String*) extends Transformer
override def transform(in: Dataset[_]): DataFrame =
val nonMeltColumns = in.columns.filterNot meltColumns.contains
val newDS = in
.select(nonMeltColumns.head,meltColumns:_*)
.withColumn("variable", functions.lit(nonMeltColumns.head))
.withColumnRenamed(nonMeltColumns.head,"value")
nonMeltColumns.tail
.foldLeft(newDS) case (acc,col) =>
in
.select(col,meltColumns:_*)
.withColumn("variable", functions.lit(col))
.withColumnRenamed(col,"value")
.union(acc)
.select(meltColumns.head,meltColumns.tail ++ List("variable","value") : _*)
override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
@DeveloperApi
override def transformSchema(schema: StructType): StructType = ???
override val uid: String = Identifiable.randomUID("Melt")
这是一个使用它的测试
"spark" should "melt a dataset" in
import spark.implicits._
val schema = StructType(
List(StructField("Melt1",StringType),StructField("Melt2",StringType)) ++
Range(3,10).map i => StructField("name_"+i,DoubleType).toList)
val ds = Range(1,11)
.map i => Row("a" :: "b" :: Range(3,10).map j => Math.random() .toList :_ *)
.|> rows => spark.sparkContext.parallelize(rows)
.|> rdd => spark.createDataFrame(rdd,schema)
val newDF = ds.transform df =>
Melt("Melt1","Melt2").transform(df)
assert(newDF.count() === 70)
.|> 是 scalaZ 管道运算符
【讨论】:
【参考方案3】:Spark DataFrame 具有 explode
方法,该方法提供 R melt
功能。
适用于 Spark 1.6.1 的示例:
// input df has columns (anyDim, n1, n2)
case class MNV(measureName: String, measureValue: Integer);
val dfExploded = df.explode(col("n1"), col("n2"))
case Row(n1: Int, n2: Int) =>
Array(MNV("n1", n1), MNV("n2", n2))
// dfExploded has columns (anyDim, n1, n2, measureName, measureValue)
【讨论】:
以上是关于Spark是不是支持melt和dcast [重复]的主要内容,如果未能解决你的问题,请参考以下文章
R语言使用reshape2包的melt函数进行dataframe变形将dataframe数据从宽表变换为长表dcast函数把melt函数处理后的数据基于一个自定义公式(formula)从长表到宽表