如何将每一列映射到pyspark数据框中的其他列?

Posted

技术标签:

【中文标题】如何将每一列映射到pyspark数据框中的其他列?【英文标题】:how to map each column to other column in pyspark dataframe? 【发布时间】:2018-10-16 07:30:31 【问题描述】:

我通过执行以下代码创建了数据框。

from pyspark.sql import Row
l = [('Ankit',25,'Ankit','Ankit'),('Jalfaizy',22,'Jalfaizy',"aa"),('saurabh',20,'saurabh',"bb"),('Bala',26,"aa","bb")]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1]),lname=x[2],mname=x[3]))
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.show()

执行上述代码后,我的结果如下所示。

+---+--------+-----+--------+
|age|   lname|mname|    name|
+---+--------+-----+--------+
| 25|   Ankit|Ankit|   Ankit|
| 22|Jalfaizy|   aa|Jalfaizy|
| 20| saurabh|   bb| saurabh|
| 26|      aa|   bb|    Bala|
+---+--------+-----+--------+

但我想映射每一行中的每一列值,并根据年龄列,哪些列是相同的,我的预期结果如下所示。

+---+----------------+-------------------+------------------+
|age| lname_map_same | mname_map_same    |    name_map_same |
+---+----------------+-------------------+------------------+
| 25|  mname,name    |   lname,name      |   lname,mname    |
| 22|    name        |  none             |   lname          |
| 20|    name        |  none             |   lname          |
| 26|    none        |  none             |   none           |
+---+----------------+-------------------+------------------+

【问题讨论】:

受人尊敬的输出对我来说看起来很可疑。你确定这应该是输出吗? 是的,我在问题中提到的任何输出都是正确的,以便于理解在给定行中哪些列是相等的。 【参考方案1】:

您可以使用地图功能解决您的问题。看看下面的代码:

df_new = spark.createDataFrame([
( 25,"Ankit","Ankit","Ankit"),( 22,"Jalfaizy","aa","Jalfaizy"),( 26,"aa","bb","Bala")
], ("age", "lname","mname","name"))
#only 3 records added to dataset

def find_identical(row):
    labels = ["lname","mname","name"]
    result = [row[0],]                 #save the age for final result
    row = row[1:]                      #drop the age from row
    for i in range(3):
        s = []
        field = row[i]
        if field == row[(i+1)%3]:     #check whether field is identical with next field
            s.append(labels[(i+1)%3])
        if field == row[(i-1)%3]:     #check whether field is identical with previous field
            s.append(labels[(i-1)%3])
        if not s:                     #if no identical values found return None
            s = None     
        result.append(s)
    return result

df_new.rdd.map(find_identical).toDF(["age","lname_map_same","mname_map_same","name_map_same"]).show()

输出:

+---+--------------+--------------+--------------+
|age|lname_map_same|mname_map_same| name_map_same|
+---+--------------+--------------+--------------+
| 25| [mname, name]| [name, lname]|[lname, mname]|
| 22|        [name]|          null|       [lname]|
| 26|          null|          null|          null|
+---+--------------+--------------+--------------+

如果您想要考虑 5 列,您可以按照评论中的说明进行操作。因此,您必须修改标签列表并添加额外的 if 语句。此外,必须调整所有模运算以匹配 5,并且 for 循环应该迭代 5 个元素。然后你会得到如下代码:

df_new = spark.createDataFrame([
( 25,"Ankit","Ankit","Ankit","Ankit","Ankit"),( 22,"Jalfaizy","aa","Jalfaizy","Jalfaizy","aa"),( 26,"aa","bb","Bala","cc","dd")
], ("age", "lname","mname","name","n1","n2"))

def find_identical(row):
    labels = ["lname","mname","name","n1","n2"]
    result = [row[0],]
    row = row[1:]
        for i in range(5):
            s = []
            field = row[i]
            if field == row[(i+1)%5]:
                s.append(labels[(i+1)%5])
            if field == row[(i-1)%5]:
                s.append(labels[(i-1)%5])
            if field == row[(i+2)%5]:
                s.append(labels[(i+2)%5])
            if field == row[(i+3)%5]:
                s.append(labels[(i+3)%5])
            if not s:
                s = None
            result.append(s)
        return result

df_new.rdd.map(find_identical).toDF(["age","lname_map_same","mname_map_same","name_map_same","n1_map_same","n2_map_same"]).show(truncate=False)

输出:

    +---+---------------------+---------------------+----------------------+------------------------+------------------------+
|age|lname_map_same       |mname_map_same       |name_map_same         |n1_map_same             |n2_map_same             |
+---+---------------------+---------------------+----------------------+------------------------+------------------------+
|25 |[mname, n2, name, n1]|[name, lname, n1, n2]|[n1, mname, n2, lname]|[n2, name, lname, mname]|[lname, n1, mname, name]|
|22 |[name, n1]           |[n2]                 |[n1, lname]           |[name, lname]           |[mname]                 |
|26 |null                 |null                 |null                  |null                    |null                    |
+---+---------------------+---------------------+----------------------+------------------------+------------------------+

动态方法将列数作为参数。但在我的例子中,这个数字应该在 1 到 5 之间,因为数据集是用最多 5 个属性创建的。它可能看起来像这样:

df_new = spark.createDataFrame([
( 25,"Ankit","Ankit","Ankit","Ankit","Ankit"),( 22,"Jalfaizy","aa","Jalfaizy","Jalfaizy","aa"),( 26,"aa","bb","Bala","cc","dd")
], ("age", "n1","n2","n3","n4","n5"))


def find_identical(row,number):
    labels = []
    for n in range(1,number+1):
        labels.append("n"+str(n))   #create labels dynamically
    result = [row[0],]
    row = row[1:]
    for i in range(number):
        s = []
        field = row[i]
        for x in range(1,number):
            if field == row[(i+x)%number]:
                s.append(labels[(i+x)%number]) #check for similarity in all the other fields
        if not s:
            s = None
        result.append(s)
    return result

number=4
colNames=["age",]
for x in range(1,number+1):
    colNames.append("n"+str(x)+"_same") #create the 'nX_same' column names
df_new.rdd.map(lambda r: find_identical(r,number)).toDF(colNames).show(truncate=False)

根据数字参数,输出会有所不同,我将年龄列静态保留为第一列。

输出:

+---+------------+------------+------------+------------+
|age|n1_same     |n2_same     |n3_same     |n4_same     |
+---+------------+------------+------------+------------+
|25 |[n2, n3, n4]|[n3, n4, n1]|[n4, n1, n2]|[n1, n2, n3]|
|22 |[n3, n4]    |null        |[n4, n1]    |[n1, n3]    |
|26 |null        |null        |null        |null        |
+---+------------+------------+------------+------------+

【讨论】:

嗨,gaw,感谢您的快速回复,现在它被硬编码为 3 值,但如果它是 4 列,则无法按预期工作,请您帮帮我。 如果它是 4 列(所以年龄 + 4 个附加列),您必须在标签数组中添加另一个标签并将 range(3) 替换为 range(4) 和所有模运算符(例如所有 %3使用%4),它应该可以工作。你必须添加一个额外的 if 语句:if field == row[(i+2)%4]: s.append(labels[(i+2)%4]) 我添加了以下代码 " labels = ["lname","mname","name","n1","n2"] result = [row[0],] #save the age for final result row = row[1:] #drop the age from row for i in range(5): " but not working 它给出了错误的结果,例如 [mname, n2] 而不是 5 个匹配的列标题。 感谢更新,这仅适用于 5 列。因为我们是硬编码范围值。请动态建议而不是硬编码值。

以上是关于如何将每一列映射到pyspark数据框中的其他列?的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 数据框中所有列的总计数为零

减去 Pandas 或 Pyspark 数据框中的连续列

如何创建一列数组,其值来自一列并且它们的长度来自pyspark数据帧中的另一列?

如何根据 PySpark 数据框的另一列中的值修改列? F.当边缘情况

pyspark 中的 UDF 能否返回与列不同的对象?

如何遍历熊猫数据框中的每一列和每个单元格