使用 Spark 获取值超过某个阈值的所有列的名称

Posted

技术标签:

【中文标题】使用 Spark 获取值超过某个阈值的所有列的名称【英文标题】:Using Spark to get names of all columns that have a value over some threshold 【发布时间】:2017-04-25 14:46:40 【问题描述】:

烘焙地

我们正在将数据从 Redshift 卸载到 S3 中,然后将其加载到数据帧中,如下所示:

df = spark.read.csv(path, schema=schema, sep='|')

我们在 Spark 2.1.0 中使用 PySpark 和 AWS EMR(版本 5.4.0)。

问题

我有一个 Redshift 表,它以 CSV 格式读入 PySpark。记录采用这种格式:

url,category1,category2,category3,category4
http://example.com,0.6,0.0,0.9,0.3

url 是 VARCHAR,category 值是介于 0.0 和 1.0 之间的 FLOAT。

我想要做的是生成一个新的 DataFrame,每个类别有一行,其中原始数据集中的值高于某个阈值 X。例如,如果阈值设置为 0.5,那么我希望我的新数据集看起来像这样:

url,category
http://example.com,category1
http://example.com,category3

我是 Spark/PySpark 的新手,所以我不确定如何/是否可以这样做,任何帮助将不胜感激!

编辑:

想添加我的解决方案(基于 Pushkr 的代码)。我们要加载大量类别,因此为了避免对每个选择进行硬编码,我做了以下操作:

parsed_df = None
for column in column_list:
    if not parsed_df:
        parsed_df = df.select(df.url, when(df[column]>threshold,column).otherwise('').alias('cat'))
    else:
        parsed_df = parsed_df.union(df.select(df.url, when(df[column]>threshold,column).otherwise('')))
if parsed_df is not None:
    parsed_df = parsed_df.filter(col('cat') != '')

其中 column_list 是先前生成的类别列名称列表,threshold 是选择类别所需的最小值。

再次感谢!

【问题讨论】:

为什么不尝试在单个语句中卸载和创建数据帧的 databrick redshift 包.. 【参考方案1】:

这是我尝试过的东西 -

data = [('http://example.com',0.6,0.0,0.9,0.3),('http://example1.com',0.6,0.0,0.9,0.3)]

df = spark.createDataFrame(data)\
     .toDF('url','category1','category2','category3','category4')

from pyspark.sql.functions import *



df\
    .select(df.url,when(df.category1>0.5,'category1').otherwise('').alias('category'))\
    .union(\
    df.select(df.url,when(df.category2>0.5,'category2').otherwise('')))\
    .union(\
    df.select(df.url,when(df.category3>0.5,'category3').otherwise('')))\
    .union(\
    df.select(df.url,when(df.category4>0.5,'category4').otherwise('')))\
    .filter(col('category')!= '')\
    .show()

输出:

+-------------------+---------+
|                url| category|
+-------------------+---------+
| http://example.com|category1|
|http://example1.com|category1|
| http://example.com|category3|
|http://example1.com|category3|
+-------------------+---------+

【讨论】:

以上是关于使用 Spark 获取值超过某个阈值的所有列的名称的主要内容,如果未能解决你的问题,请参考以下文章

pandas筛选dataframe数据:指定一个数据列的值不等于某个固定值,而且另外一个数据列的值大于某一阈值

pandas筛选dataframe数据:指定字符串数据列的长度超过某一固定阈值的所有数据行

查找引用某个表中特定列的所有存储过程

pandas筛选dataframe数据:筛选一个数据列的内容等于某一固定值,而且另一数据列的数值大于固定阈值的所有数据行

pandas筛选dataframe数据:筛选一个数据列的内容等于某一固定值,而且另一数据列的数值大于固定阈值的所有数据行

pandas筛选dataframe数据:筛选一个数据列的内容等于某一固定值或者另一数据列的数值大于固定阈值的所有数据行(或关系or)