如何对 Pyspark spark.sql 数据框中的数据进行同质化

Posted

技术标签:

【中文标题】如何对 Pyspark spark.sql 数据框中的数据进行同质化【英文标题】:How to homogonize data in a Pyspark spark.sql dataframe 【发布时间】:2019-04-11 00:14:57 【问题描述】:

我下载了一个 1.9 GB 的 csv 文件,其中包含 AirBnB 数据。尽管所有列的数据类型都是“字符串”,但我有一些列不是“同质”的,例如“设施”的列,其中一些条目在该特定属性处有设施计数,而其他条目有便利设施清单。都是字符串格式。

所以,这是我目前所拥有的:

from pyspark import SparkContext, SparkConf
import pandas as pd
import numpy as np
conf = SparkConf().setAppName("app")
sc = SparkContext(conf=conf)

from pyspark.sql import SQLContext
SQLCtx = SQLContext(sc)

air =SQLCtx.read.load('/home/john/Downloads/airbnb-listings.csv',
                             format = "com.databricks.spark.csv",
                             header = "true",
                             sep = ";",
                             inferSchema = "true")

#check for missing values
from pyspark.sql.functions import col,sum
air.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in air.columns)).show()

所以在删除几列然后删除缺失值之后,我有这个:

Keep = ['Price', 'Bathrooms', 'Bedrooms', 'Beds', 'Bed Type', 'Amenities',
       'Security Deposit', 'Cleaning Fee', 'Guests Included', 'Extra People',
       'Review Scores Rating', 'Cancellation Policy','Host Response Rate', 
       'Country Code', 'Zipcode']

data = air.select(*Keep)
reduced2 = data.na.drop()

#final shape after dropping missing values.
print((reduced2.count(), len(reduced2.columns)))

我可以将几行转换为 pandas 数据框:

df3 = pd.DataFrame(reduced2.take(50), columns = reduced2.columns)

“设施”列表的一小部分:

Wireless Internet,Air conditioning,Kitchen,Fre...
2                                                    10
3     Internet,Wireless Internet,Air conditioning,Ki...
4     TV,Cable TV,Internet,Wireless Internet,Air con...
5     TV,Wireless Internet,Air conditioning,Pool,Kit...
6     TV,Wireless Internet,Air conditioning,Pool,Kit...
7     Internet,Wireless Internet,Kitchen,Free parkin...
8     TV,Wireless Internet,Air conditioning,Pool,Kit...
9     Wireless Internet,Air conditioning,Kitchen,Fre...
10    TV,Cable TV,Internet,Wireless Internet,Air con...
14                                                   10
16                                                   10
17    TV,Internet,Wireless Internet,Air conditioning...
18    TV,Cable TV,Internet,Wireless Internet,Air con...
19    TV,Internet,Wireless Internet,Air conditioning...
20    TV,Wireless Internet,Air conditioning,Pool,Kit...
23    TV,Cable TV,Internet,Wireless Internet,Air con...
28                                                    9
33                                                   10
34    Internet,Wireless Internet,Kitchen,Elevator in...
37                                                   10

如您所见,我将很难处理这个问题。 我可以很容易地在普通熊猫中做一些事情来修复它,就像这样:

for i in range(len(df3['Amenities'])):
    if len(df3["Amenities"][i])>2:
        df3['Amenities'][i] = str(len(df3['Amenities'][i].split(',')))

现在我意识到这可能不是最好的方法,但它会将列表中的所有内容都转换为数字。 如果可能的话,我需要一种对 pyspark SQL 数据框中的列执行类似操作的方法。

谢谢!

【问题讨论】:

【参考方案1】:

如果我理解正确,您想计算由, 分隔的项目数,但保留已经是数字的行。如果是这样,您可以尝试以下方法:

from pyspark.sql import functions as F

df.withColumn('Amenities'
    , F.when(df.Amenities.rlike('^\d+$'), df.Amenities) \
       .otherwise(F.size(F.split('Amenities', ","))) \
       .astype("string")
).show()  

因此,如果设施栏是整数df.Amenities.rlike('^\d+$'),我们将保持df.Amenities 不变,否则,使用F.size()F.split() 来计算项目数。然后将结果转换为“字符串”

【讨论】:

这似乎成功了。一个问题,这部分 rlike('^\d+$') 是做什么的?我以前没见过。 @Jabernet, rlike() 是正则匹配,查看文档:spark.apache.org/docs/2.4.0/api/python/…。模式^\d+$ 表示匹配的值仅包含1 个或多个数字[0-9],而不包含任何其他字符。【参考方案2】:

我不熟悉 PySpark SQL Dataframes,只熟悉 vanilla Pandas。

不确定您的任务是什么,但不妨考虑将该栏分为两栏。例如。 (假设这在 PySpark 中是可能的):

df['Amenities_count'] = pd.to_numeric(df['Amenities'], errors='coerce')
mask_entries_with_list = df['Amenities_count'].isna()
mask_entries_with_number = ~mask_entries_with_list
df.loc[mask_entries_with_number, 'Amenities'] = []
df.loc[mask_entries_with_list, 'Amenities_count'] = df['Amenities'].apply(len)

(未经测试)

【讨论】:

以上是关于如何对 Pyspark spark.sql 数据框中的数据进行同质化的主要内容,如果未能解决你的问题,请参考以下文章

在 spark sql--pyspark 中查找特定字符串

如何从pyspark中的数据框中选择一系列行

使用 sql 或 pandas 数据框获取前 5 行的 pyspark 数据框

如何在 Spark SQL 中以多列为中心?

org.apache.spark.sql.AnalysisException:给定pyspark中的输入列,无法解析'sub_tot`'

如何在 pyspark.sql.functions.when() 中使用多个条件?