使用 Spark 将列转置为行
Posted
技术标签:
【中文标题】使用 Spark 将列转置为行【英文标题】:Transpose column to row with Spark 【发布时间】:2016-10-18 06:25:40 【问题描述】:我正在尝试将表格的某些列转换为行。 我正在使用 Python 和 Spark 1.5.0。这是我的初始表格:
+-----+-----+-----+-------+
| A |col_1|col_2|col_...|
+-----+-------------------+
| 1 | 0.0| 0.6| ... |
| 2 | 0.6| 0.7| ... |
| 3 | 0.5| 0.9| ... |
| ...| ...| ...| ... |
我想要这样的东西:
+-----+--------+-----------+
| A | col_id | col_value |
+-----+--------+-----------+
| 1 | col_1| 0.0|
| 1 | col_2| 0.6|
| ...| ...| ...|
| 2 | col_1| 0.6|
| 2 | col_2| 0.7|
| ...| ...| ...|
| 3 | col_1| 0.5|
| 3 | col_2| 0.9|
| ...| ...| ...|
有人知道我能做到吗?感谢您的帮助。
【问题讨论】:
另见unpivot in spark-sql/pyspark和How to melt Spark DataFrame? 【参考方案1】:使用基本的 Spark SQL 函数相对简单。
Python
from pyspark.sql.functions import array, col, explode, struct, lit
df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])
def to_long(df, by):
# Filter dtypes and split into column names and type description
cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
# Spark SQL supports only homogeneous columns
assert len(set(dtypes)) == 1, "All columns have to be of the same type"
# Create and explode an array of (column_name, column_value) structs
kvs = explode(array([
struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
])).alias("kvs")
return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])
to_long(df, ["A"])
斯卡拉:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.array, col, explode, lit, struct
val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")
def toLong(df: DataFrame, by: Seq[String]): DataFrame =
val (cols, types) = df.dtypes.filter case (c, _) => !by.contains(c).unzip
require(types.distinct.size == 1, s"$types.distinct.toString.length != 1")
val kvs = explode(array(
cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
))
val byExprs = by.map(col(_))
df
.select(byExprs :+ kvs.alias("_kvs"): _*)
.select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
toLong(df, Seq("A"))
【讨论】:
我不认为这“相对”简单:) 我收到错误AssertionError: All columns have to be of the same type
如何用 Java 写这个?
如何反其道而行之。如何从第二个数据帧制作第一个数据帧?
@ShekharKoirala 这是因为你的数据框中的列是不同的数据类型,在函数代码中明确提到了。【参考方案2】:
使用函数create_map
和explode
解决pyspark sql
的一种方法。
from pyspark.sql import functions as func
#Use `create_map` to create the map of columns with constant
df = df.withColumn('mapCol', \
func.create_map(func.lit('col_1'),df.col_1,
func.lit('col_2'),df.col_2,
func.lit('col_3'),df.col_3
)
)
#Use explode function to explode the map
res = df.select('*',func.explode(df.mapCol).alias('col_id','col_value'))
res.show()
【讨论】:
很好的答案,你能解释一下吗? ***.com/questions/67374048/…你能看看这里吗?【参考方案3】:Spark 局部线性代数库目前非常薄弱:它们不包括上述基本操作。
有一个 JIRA 可以为 Spark 2.1 解决此问题 - 但这对您今天没有帮助。
需要考虑的一点:执行转置可能需要完全打乱数据。
现在您需要直接编写 RDD 代码。我在 scala 中写过 transpose
- 但不是在 python 中。这是scala
版本:
def transpose(mat: DMatrix) =
val nCols = mat(0).length
val matT = mat
.flatten
.zipWithIndex
.groupBy
_._2 % nCols
.toSeq.sortBy
_._1
.map(_._2)
.map(_.map(_._1))
.toArray
matT
因此您可以将其转换为 python 以供您使用。在这个特定时刻,我没有足够的带宽来编写/测试它:如果您无法进行该转换,请告诉我。
至少 - 以下内容很容易转换为python
。
zipWithIndex
--> enumerate()
(python 等效项 - 归功于 @zero323)
map
--> [someOperation(x) for x in ..]
groupBy
--> itertools.groupBy()
这是flatten
的实现,它没有等效的python:
def flatten(L):
for item in L:
try:
for i in flatten(item):
yield i
except TypeError:
yield item
因此,您应该能够将它们放在一起以获得解决方案。
【讨论】:
感谢您的回答。我不知道 scala,但我会尝试理解您的代码。我会及时通知您。 @Raouf 上面的代码在 python 中都有等价物。如果你很了解python,应该不会有问题。我展示了flatten
,这是python中唯一缺少的。让我知道;)
zipWithIndex
--> enumerate()
(Python 等效项)?
@zero323 好眼睛!顺便说一句,我要为你的好答案投票。
谢谢。它稍微有点冗长,但不会移动太多数据。【参考方案4】:
您可以使用 stack 函数:
例如:
df.selectExpr("stack(2, 'col_1', col_1, 'col_2', col_2) as (key, value)")
地点:
2 是要堆叠的列数(col_1 和 col_2) 'col_1' 是键的字符串 col_1 是从中获取值的列如果您有多个列,您可以构建整个 stack 字符串迭代列名并将其传递给 selectExpr
【讨论】:
df.selectExpr('column_names_to_keep', 'column_names_to_keep', "stack(2, 'col_1', col_1, 'col_2', col_2) as (key, value)") 你能看看这里吗? ***.com/questions/67374048/… 我想我因为列名而面临一个问题 我正在使用这个函数,但是遇到了不同数据类型的列。 IE。有些是字符串,有些是十进制。如何使用堆栈将十进制类型转换为字符串?【参考方案5】:使用平面图。像下面这样的东西应该可以工作
from pyspark.sql import Row
def rowExpander(row):
rowDict = row.asDict()
valA = rowDict.pop('A')
for k in rowDict:
yield Row(**'A': valA , 'colID': k, 'colValue': row[k])
newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))
【讨论】:
感谢您的回答。但它不起作用。这是我收到的错误消息:TypeError: tuple indices must be integers, not str【参考方案6】:我采用了 @javadba 编写的 Scala 答案,并创建了一个 Python 版本,用于转置 DataFrame
中的所有列。这可能与 OP 所要求的有点不同......
from itertools import chain
from pyspark.sql import DataFrame
def _sort_transpose_tuple(tup):
x, y = tup
return x, tuple(zip(*sorted(y, key=lambda v_k: v_k[1], reverse=False)))[0]
def transpose(X):
"""Transpose a PySpark DataFrame.
Parameters
----------
X : PySpark ``DataFrame``
The ``DataFrame`` that should be tranposed.
"""
# validate
if not isinstance(X, DataFrame):
raise TypeError('X should be a DataFrame, not a %s'
% type(X))
cols = X.columns
n_features = len(cols)
# Sorry for this unreadability...
return X.rdd.flatMap( # make into an RDD
lambda xs: chain(xs)).zipWithIndex().groupBy( # zip index
lambda val_idx: val_idx[1] % n_features).sortBy( # group by index % n_features as key
lambda grp_res: grp_res[0]).map( # sort by index % n_features key
lambda grp_res: _sort_transpose_tuple(grp_res)).map( # maintain order
lambda key_col: key_col[1]).toDF() # return to DF
例如:
>>> X = sc.parallelize([(1,2,3), (4,5,6), (7,8,9)]).toDF()
>>> X.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| 2| 3|
| 4| 5| 6|
| 7| 8| 9|
+---+---+---+
>>> transpose(X).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| 4| 7|
| 2| 5| 8|
| 3| 6| 9|
+---+---+---+
【讨论】:
你能看看这里吗? ***.com/questions/67374048/…【参考方案7】:一种非常方便的实现方式:
from pyspark.sql import Row
def rowExpander(row):
rowDict = row.asDict()
valA = rowDict.pop('A')
for k in rowDict:
yield Row(**'A': valA , 'colID' : k, 'colValue' : row[k])
newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander)
【讨论】:
【参考方案8】:为了在pySpark
中转置Dataframe,我在临时创建的列上使用pivot
,我在操作结束时删除了该列。
说,我们有一张这样的桌子。我们要做的是找到每个listed_days_bin
值的所有用户。
+------------------+-------------+
| listed_days_bin | users_count |
+------------------+-------------+
|1 | 5|
|0 | 2|
|0 | 1|
|1 | 3|
|1 | 4|
|2 | 5|
|2 | 7|
|2 | 2|
|1 | 1|
+------------------+-------------+
创建新的临时列 - 'pvt_value'
,对其进行聚合并透视结果
import pyspark.sql.functions as F
agg_df = df.withColumn('pvt_value', lit(1))\
.groupby('pvt_value')\
.pivot('listed_days_bin')\
.agg(F.sum('users_count')).drop('pvt_value')
新数据框应如下所示:
+----+---+---+
| 0 | 1 | 2 | # Columns
+----+---+---+
| 3| 13| 14| # Users over the bin
+----+---+---+
【讨论】:
你能看看这里吗? ***.com/questions/67374048/… 从 OPs 问题来看,这是相反的情况以上是关于使用 Spark 将列转置为行的主要内容,如果未能解决你的问题,请参考以下文章