Structured Data Processing with pyspark

Posted by 数融咖啡


As technology advances, data work keeps getting more efficient and convenient. A complete big-data system needs at least three layers: data modeling, technology selection, and the user-facing interface. Data modeling flows like water through the whole analytics system; the technology stack is the infrastructure that keeps it running efficiently; and the interface faces the user, letting the data speak, empowering business growth, and enabling data-driven decisions.

In complex analytics scenarios, you typically need free-form, multi-dimensional OLAP cross analysis over user profiles and user behavior. For products with millions of users or more, MySQL cannot support real-time OLAP analysis, so a new technology stack is needed.

On the storage side, today's semi-structured and unstructured data mostly live in HDFS and HBase; for the compute/query engine, Impala, ClickHouse, Druid, and Spark SQL are common choices. ClickHouse is fast at queries, but both Druid and ClickHouse are weak at multi-table joins, so the popular approach is to combine strengths: for example, Spark together with ClickHouse or Flink. In addition, Spark can read Hive table data in HDFS directly, with no re-import needed, which keeps things efficient. A typical layout is to keep the full history of user tags and behavior in HDFS and the recent data in ClickHouse.

That makes Spark SQL a very widely used data-processing tool, for example in user profiling and recommender systems.

Spark ships with a Python shell, pyspark, which lets you write Spark programs interactively in Python.
For an introduction to Spark's basic architecture, see http://blog.csdn.net/cymy001/article/details/78483614;
for setting up a pyspark environment, see http://blog.csdn.net/cymy001/article/details/78430892.

Comparing it with pandas is a good way to understand pyspark's mechanics and syntax:

| Item | Pandas | Spark |
| --- | --- | --- |
| Execution | Single machine, no parallelism | Distributed parallel computing framework with built-in parallelism; data and operations are automatically distributed across cluster nodes. Works with Hadoop and scales to massive data |
| Evaluation | Eager | Lazy |
| In-memory caching | Single-machine cache | persist() or cache() keeps transformed RDDs in memory |
| DataFrame mutability | Pandas DataFrames are mutable | Spark RDDs are immutable, so DataFrames are immutable too; updates are generally made by adding new data |
| Converting between the two | From Spark: pandas_df = spark_df.toPandas() | From pandas: spark_df = SQLContext.createDataFrame(pandas_df); createDataFrame also accepts lists whose elements are tuples or dicts, and RDDs |
| Other conversions | From list, dict, ndarray | From existing RDDs |
| Reading datasets | csv, hdf5, Excel | Structured data files, JSON, Hive tables, external databases, etc. |
| Index | Created automatically | No index; add such a column explicitly if needed |
| Row structure | Series, part of a pandas DataFrame | Row, part of a Spark DataFrame |
| Column structure | Series, part of a pandas DataFrame | Column, part of a Spark DataFrame, e.g. DataFrame[name: string] |
| Column names | Duplicates not allowed | Duplicates allowed; rename with alias |
| Adding a column | df["xx"] = 0 | df.withColumn("xx", 0).show() raises an error; use from pyspark.sql import functions and df.withColumn("xx", functions.lit(0)).show() |
| Modifying a column | Given an existing df["xx"], df["xx"] = 1 | Given an existing "xx" column, df.withColumn("xx", functions.lit(1)).show() |
| Displaying data | Printing df shows the contents; no tree-style schema output | Printing df does not show contents: use show() for rows, printSchema() for a tree-style schema, and collect() to fetch all rows |

There are many more structured-processing operations, such as joins and filtering.
A concrete case makes them easier to grasp; the example below uses the used-car price practice dataset from Alibaba's Tianchi competition:

Data preview

Data import

# Read the data
from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder.appName("dataframe").getOrCreate()
sc = SparkContext.getOrCreate()
data = spark.read.csv(r"input/used_car_train_20200313.csv", header=True, inferSchema=True, sep=' ')


test = spark.read.csv(r"input/used_car_testA_20200313.csv",header=True,inferSchema=True,sep = ' ')

Data overview

# Check the row count and schema to make sure the inferred types are correct
print(data.count())
data.printSchema()

150000
root
|-- SaleID: integer (nullable = true)
|-- name: integer (nullable = true)
|-- regDate: integer (nullable = true)
|-- model: double (nullable = true)
|-- brand: integer (nullable = true)
|-- bodyType: double (nullable = true)
|-- fuelType: double (nullable = true)
|-- gearbox: double (nullable = true)
|-- power: integer (nullable = true)
|-- kilometer: double (nullable = true)
|-- notRepairedDamage: string (nullable = true)
|-- regionCode: integer (nullable = true)
|-- seller: integer (nullable = true)
|-- offerType: integer (nullable = true)
|-- creatDate: integer (nullable = true)
|-- price: integer (nullable = true)
|-- v_0: double (nullable = true)
|-- v_1: double (nullable = true)
|-- v_2: double (nullable = true)
|-- v_3: double (nullable = true)
|-- v_4: double (nullable = true)
|-- v_5: double (nullable = true)
|-- v_6: double (nullable = true)
|-- v_7: double (nullable = true)
|-- v_8: double (nullable = true)
|-- v_9: double (nullable = true)
|-- v_10: double (nullable = true)
|-- v_11: double (nullable = true)
|-- v_12: double (nullable = true)
|-- v_13: double (nullable = true)
|-- v_14: double (nullable = true)

Look at a few rows, similar to pandas' head:
data.take(10)

[Row(SaleID=0, name=736, regDate=20040402, model=30.0, brand=6, bodyType=1.0, fuelType=0.0, gearbox=0.0, power=60, kilometer=12.5, notRepairedDamage='0.0', regionCode=1046, seller=0, offerType=0, creatDate=20160404, price=1850, v_0=43.357796312505826, v_1=3.9663441656784544, v_2=0.050257094214176436, v_3=2.15974409399339, v_4=1.143786186793559, v_5=0.23567590669911015, v_6=0.10198824077953883, v_7=0.129548661418789, v_8=0.02281636740006269, v_9=0.09746182870576199, v_10=-2.8818032385553165, v_11=2.8040967707208506, v_12=-2.4208207926122784, v_13=0.7952919433118377, v_14=0.9147624995703408), Row(SaleID=1, name=2262, regDate=20030301, model=40.0, brand=1, bodyType=2.0, fuelType=0.0, gearbox=0.0, power=0, kilometer=15.0, notRepairedDamage='-', regionCode=4366, seller=0, offerType=0, creatDate=20160309, price=3600, v_0=45.30527301812686, v_1=5.236111897708937, v_2=0.13792532384004388, v_3=1.3806574602893849, v_4=-1.4221649206603162, v_5=0.2647772555037097, v_6=0.12100359404116512, v_7=0.1357307068829055, v_8=0.026597448118262774, v_9=0.020581662632484482, v_10=-4.9004818817666775, v_11=2.0963376444273414, v_12=-1.0304828371563102, v_13=-1.7226737753851349, v_14=0.2455224109670493), Row(SaleID=2, name=14874, regDate=20040403, model=115.0, brand=15, bodyType=1.0, fuelType=0.0, gearbox=0.0, power=163, kilometer=12.5, notRepairedDamage='0.0', regionCode=2806, seller=0, offerType=0, creatDate=20160402, price=6222, v_0=45.97835906231524, v_1=4.823792215285537, v_2=1.3195241517895064, v_3=-0.9984672739518792, v_4=-0.996911034763586, v_5=0.25141014780875875, v_6=0.11491227654046415, v_7=0.16514749334496415, v_8=0.06217283730726245, v_9=0.02707482416830506, v_10=-4.846749260269903, v_11=1.803558941229932, v_12=1.5653296250457633, v_13=-0.8326873267265079, v_14=-0.22996285613259074), Row(SaleID=3, name=71865, regDate=19960908, model=109.0, brand=10, bodyType=0.0, fuelType=0.0, gearbox=1.0, power=193, kilometer=15.0, notRepairedDamage='0.0', regionCode=434, seller=0, offerType=0, 
creatDate=20160312, price=2400, v_0=45.687478202385684, v_1=4.492574133926967, v_2=-0.05061584257537274, v_3=0.8835996711505136, v_4=-2.228078725239773, v_5=0.2742931709082824, v_6=0.11030008468643802, v_7=0.12196374573186793, v_8=0.033394547122199615, v_9=0.0, v_10=-4.5095988235247955, v_11=1.2859397444845837, v_12=-0.5018679084368517, v_13=-2.4383527366881763, v_14=-0.4786993792688288), Row(SaleID=4, name=111080, regDate=20120103, model=110.0, brand=5, bodyType=1.0, fuelType=0.0, gearbox=0.0, power=68, kilometer=5.0, notRepairedDamage='0.0', regionCode=6977, seller=0, offerType=0, creatDate=20160313, price=5200, v_0=44.38351084286087, v_1=2.031433258227642, v_2=0.5721689478637533, v_3=-1.5712390275218755, v_4=2.246088325318186, v_5=0.2280356217997828, v_6=0.0732050535564685, v_7=0.09188047928262777, v_8=0.07881938473498606, v_9=0.12153424142524565, v_10=-1.8962402786050725, v_11=0.9107831337379366, v_12=0.9311095588151709, v_13=2.8345178203938377, v_14=1.9234819632780635), Row(SaleID=5, name=137642, regDate=20090602, model=24.0, brand=10, bodyType=0.0, fuelType=1.0, gearbox=0.0, power=109, kilometer=10.0, notRepairedDamage='0.0', regionCode=3690, seller=0, offerType=0, creatDate=20160319, price=8000, v_0=46.323165381774395, v_1=-3.229285171266851, v_2=0.15661492833900464, v_3=-1.7272169244954632, v_4=-0.3456895594045602, v_5=0.2602464497107163, v_6=0.000518240777448753, v_7=0.11983762684215195, v_8=0.09092173819180396, v_9=0.04876931318389327, v_10=1.8855255403890592, v_11=-2.7219432986174272, v_12=2.457659673092216, v_13=-0.28697345216385745, v_14=0.20657258424627856), Row(SaleID=6, name=2402, regDate=19990411, model=13.0, brand=4, bodyType=0.0, fuelType=0.0, gearbox=1.0, power=150, kilometer=15.0, notRepairedDamage='0.0', regionCode=3073, seller=0, offerType=0, creatDate=20160317, price=3500, v_0=46.10433461703255, v_1=4.926219444368321, v_2=0.11331143265396665, v_3=1.644606453037599, v_4=-1.2703813885986035, v_5=0.26799814828129104, v_6=0.11767528049788555, 
v_7=0.14233417845627075, v_8=0.025445733090970857, v_9=0.02817408894848062, v_10=-4.902199843903487, v_11=1.6106163868619114, v_12=-0.8346050900939878, v_13=-1.9961172184852607, v_14=-0.10318036156903608), Row(SaleID=7, name=165346, regDate=19990706, model=26.0, brand=14, bodyType=1.0, fuelType=0.0, gearbox=0.0, power=101, kilometer=15.0, notRepairedDamage='0.0', regionCode=4000, seller=0, offerType=0, creatDate=20160326, price=1000, v_0=42.255585819004814, v_1=-3.167771424456629, v_2=-0.6766928879278368, v_3=1.9426729837728287, v_4=0.5242058881986318, v_5=0.2395059297716484, v_6=0.0, v_7=0.12294313007355424, v_8=0.03983936215453336, v_9=0.0824125340955883, v_10=3.6938287580246114, v_11=-0.2450140381538229, v_12=-2.1928097189004454, v_13=0.23672797186677025, v_14=0.1955674507297452), Row(SaleID=8, name=2974, regDate=20030205, model=19.0, brand=1, bodyType=2.0, fuelType=1.0, gearbox=1.0, power=179, kilometer=15.0, notRepairedDamage='0.0', regionCode=4679, seller=0, offerType=0, creatDate=20160326, price=2850, v_0=46.08488801384834, v_1=4.893716550309196, v_2=0.4753331302828047, v_3=0.5565750102457186, v_4=-1.2624898685134558, v_5=0.26383278576210195, v_6=0.1165826806382554, v_7=0.1442547659281785, v_8=0.03985120044003428, v_9=0.024387577181693702, v_10=-4.925234337008594, v_11=1.5877956620094642, v_12=0.07534761622084843, v_13=-1.5510978738920809, v_14=0.06943296402435542), Row(SaleID=9, name=82021, regDate=19980101, model=7.0, brand=7, bodyType=5.0, fuelType=0.0, gearbox=0.0, power=88, kilometer=15.0, notRepairedDamage='0.0', regionCode=302, seller=0, offerType=0, creatDate=20160402, price=650, v_0=43.07462647776722, v_1=1.6663861955536914, v_2=-2.2015449141144696, v_3=3.096860713179949, v_4=0.8438521497767868, v_5=0.2624727278632309, v_6=0.06826711631143091, v_7=0.012175606025187658, v_8=0.010291331810831215, v_9=0.09872749127736392, v_10=-1.089583621016556, v_11=0.600682839511134, v_12=-4.186209599379301, v_13=0.1982730611708035, v_14=-1.0258224809501677)]

Before diving in, get a rough sense of the business problem from the field descriptions:

  • name - car code

  • regDate - car registration date

  • model - model code

  • brand - brand

  • bodyType - body type

  • fuelType - fuel type

  • gearbox - gearbox

  • power - engine power

  • kilometer - kilometers driven

  • notRepairedDamage - whether the car has unrepaired damage

  • regionCode - region code where the car can be viewed

  • seller - seller type

  • offerType - offer type

  • creatDate - date the listing was posted

  • price - car price

  • v_0, v_1, ..., v_14 - 15 anonymized features

data.show(5)

+------+------+--------+-----+-----+--------+--------+-------+-----+---------+-----------------+----------+------+---------+---------+-----+------------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+------------------+-------------------+-------------------+--------------------+
|SaleID| name| regDate|model|brand|bodyType|fuelType|gearbox|power|kilometer|notRepairedDamage|regionCode|seller|offerType|creatDate|price| v_0| v_1| v_2| v_3| v_4| v_5| v_6| v_7| v_8| v_9| v_10| v_11| v_12| v_13| v_14|
+------+------+--------+-----+-----+--------+--------+-------+-----+---------+-----------------+----------+------+---------+---------+-----+------------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+------------------+-------------------+-------------------+--------------------+
| 0| 736|20040402| 30.0| 6| 1.0| 0.0| 0.0| 60| 12.5| 0.0| 1046| 0| 0| 20160404| 1850|43.357796312505826|3.9663441656784544|0.050257094214176436| 2.15974409399339| 1.143786186793559|0.23567590669911015|0.10198824077953883| 0.129548661418789| 0.02281636740006269| 0.09746182870576199|-2.8818032385553165|2.8040967707208506|-2.4208207926122784| 0.7952919433118377| 0.9147624995703408|
| 1| 2262|20030301| 40.0| 1| 2.0| 0.0| 0.0| 0| 15.0| -| 4366| 0| 0| 20160309| 3600| 45.30527301812686| 5.236111897708937| 0.13792532384004388| 1.3806574602893849|-1.4221649206603162| 0.2647772555037097|0.12100359404116512| 0.1357307068829055|0.026597448118262774|0.020581662632484482|-4.9004818817666775|2.0963376444273414|-1.0304828371563102|-1.7226737753851349| 0.2455224109670493|
| 2| 14874|20040403|115.0| 15| 1.0| 0.0| 0.0| 163| 12.5| 0.0| 2806| 0| 0| 20160402| 6222| 45.97835906231524| 4.823792215285537| 1.3195241517895064|-0.9984672739518792| -0.996911034763586|0.25141014780875875|0.11491227654046415|0.16514749334496415| 0.06217283730726245| 0.02707482416830506| -4.846749260269903| 1.803558941229932| 1.5653296250457633|-0.8326873267265079|-0.22996285613259074|
| 3| 71865|19960908|109.0| 10| 0.0| 0.0| 1.0| 193| 15.0| 0.0| 434| 0| 0| 20160312| 2400|45.687478202385684| 4.492574133926967|-0.05061584257537274| 0.8835996711505136| -2.228078725239773| 0.2742931709082824|0.11030008468643802|0.12196374573186793|0.033394547122199615| 0.0|-4.5095988235247955|1.2859397444845837|-0.5018679084368517|-2.4383527366881763| -0.4786993792688288|
| 4|111080|20120103|110.0| 5| 1.0| 0.0| 0.0| 68| 5.0| 0.0| 6977| 0| 0| 20160313| 5200| 44.38351084286087| 2.031433258227642| 0.5721689478637533|-1.5712390275218755| 2.246088325318186| 0.2280356217997828| 0.0732050535564685|0.09188047928262777| 0.07881938473498606| 0.12153424142524565|-1.8962402786050725|0.9107831337379366| 0.9311095588151709| 2.8345178203938377| 1.9234819632780635|
+------+------+--------+-----+-----+--------+--------+-------+-----+---------+-----------------+----------+------+---------+---------+-----+------------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+------------------+-------------------+-------------------+--------------------+
only showing top 5 rows

First, roughly check what fraction of values are null, just to get a baseline. This is not the whole story, though: anomalous placeholder values such as null, nan, -1, -, 999, 9999, ?, 00, or timestamps later than the current date are effectively missing values too, e.g. notRepairedDamage in this dataset. For now, only a few columns have nulls, so we handle them further below. Note that some algorithms, such as LightGBM, handle nulls on their own.

# Fraction of missing values per column
import pyspark.sql.functions as fn
data.agg(*[(1-(fn.count(c) /fn.count('*'))).alias(c+'_missing') for c in data.columns]).show()

+--------------+------------+---------------+--------------------+-------------+--------------------+-------------------+--------------------+-------------+-----------------+-------------------------+------------------+--------------+-----------------+-----------------+-------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+
|SaleID_missing|name_missing|regDate_missing| model_missing|brand_missing| bodyType_missing| fuelType_missing| gearbox_missing|power_missing|kilometer_missing|notRepairedDamage_missing|regionCode_missing|seller_missing|offerType_missing|creatDate_missing|price_missing|v_0_missing|v_1_missing|v_2_missing|v_3_missing|v_4_missing|v_5_missing|v_6_missing|v_7_missing|v_8_missing|v_9_missing|v_10_missing|v_11_missing|v_12_missing|v_13_missing|v_14_missing|
+--------------+------------+---------------+--------------------+-------------+--------------------+-------------------+--------------------+-------------+-----------------+-------------------------+------------------+--------------+-----------------+-----------------+-------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+
| 0.0| 0.0| 0.0|6.666666666710341E-6| 0.0|0.030039999999999956|0.05786666666666662|0.039873333333333316| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|
+--------------+------------+---------------+--------------------+-------------+--------------------+-------------------+--------------------+-------------+-----------------+-------------------------+------------------+--------------+-----------------+-----------------+-------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+

# name - car code
# It is an integer code; count how many distinct values there are
data.select('name').distinct().count()

99662

data.select('name').distinct().show()

+------+
| name|
+------+
| 2866|
| 148|
| 93486|
| 79220|
| 24663|
| 16386|
| 24347|
| 83693|
| 53565|
|167071|
|188614|
| 1342|
|141533|
| 22373|
| 17420|
|189903|
|137501|
|178199|
| 1238|
| 9376|
+------+
only showing top 20 rows


# Look at the columns that have nulls; model has very few
from pyspark.sql.functions import isnull
df = data.filter(isnull("model"))
df.show(20)

+------+------+--------+-----+-----+--------+--------+-------+-----+---------+-----------------+----------+------+---------+---------+-----+-----------------+-------------------+-------------------+------------------+-----------------+------------------+--------------------+---+-------------------+------------------+----------------+------------------+------------------+------------------+-----------------+
|SaleID| name| regDate|model|brand|bodyType|fuelType|gearbox|power|kilometer|notRepairedDamage|regionCode|seller|offerType|creatDate|price| v_0| v_1| v_2| v_3| v_4| v_5| v_6|v_7| v_8| v_9| v_10| v_11| v_12| v_13| v_14|
+------+------+--------+-----+-----+--------+--------+-------+-----+---------+-----------------+----------+------+---------+---------+-----+-----------------+-------------------+-------------------+------------------+-----------------+------------------+--------------------+---+-------------------+------------------+----------------+------------------+------------------+------------------+-----------------+
| 38424|148730|20150809| null| 37| 6.0| 1.0| 1.0| 190| 2.0| 0.0| 1425| 0| 0| 20160320|47950|41.13936510747499|-2.1670885331942804|-3.4360082603978594|-7.275036706722688|6.829351640037848|0.1815618395494292|0.002091782809291...|0.0|0.14848682171008498|0.2227874876178012|1.67570041936766|-3.250560440486322|0.8760013150740789|11.147668612774803|8.658417876941384|
+------+------+--------+-----+-----+--------+--------+-------+-----+---------+-----------------+----------+------+---------+---------+-----+-----------------+-------------------+-------------------+------------------+-----------------+------------------+--------------------+---+-------------------+------------------+----------------+------------------+------------------+------------------+-----------------+

for col in data.columns:
    print(col, data.filter(isnull(col)).count())

SaleID 0
name 0
regDate 0
model 1
brand 0
bodyType 4506
fuelType 8680
gearbox 5981
power 0
kilometer 0
notRepairedDamage 0
regionCode 0
seller 0
offerType 0
creatDate 0
price 0
v_0 0
v_1 0
v_2 0
v_3 0
v_4 0
v_5 0
v_6 0
v_7 0
v_8 0
v_9 0
v_10 0
v_11 0
v_12 0
v_13 0
v_14 0

regDate is the registration date, i.e. when the car could first be driven, while creatDate is when the listing was posted, i.e. when the car went on sale, so the usage duration can be computed from the two.
model is the model code; it plausibly affects the price too. Let's see which values it takes:

data.select('model').distinct().show()

+-----+
|model|
+-----+
|170.0|
|147.0|
|184.0|
|160.0|
|169.0|
| 8.0|
| 70.0|
| 67.0|
|168.0|
| 0.0|
| 69.0|
|206.0|
| 7.0|
|142.0|
|191.0|
|112.0|
|154.0|
|232.0|
|124.0|
|201.0|
+-----+
only showing top 20 rows

data.groupBy('model').count().show()

+-----+-----+
|model|count|
+-----+-----+
|170.0| 111|
|147.0| 89|
|184.0| 164|
|160.0| 467|
|169.0| 161|
| 8.0| 4391|
| 70.0| 157|
| 67.0| 1084|
|168.0| 372|
| 0.0|11762|
| 69.0| 1522|
|206.0| 62|
| 7.0| 1460|
|142.0| 168|
|191.0| 87|
|112.0| 120|
|154.0| 122|
|232.0| 10|
|124.0| 55|
|201.0| 76|
+-----+-----+
only showing top 20 rows

As you can see, notRepairedDamage is a string while all other columns are numeric; look at its actual values:
data.groupBy('notRepairedDamage').count().show()

+-----------------+------+
|notRepairedDamage| count|
+-----------------+------+
| 1.0| 14315|
| 0.0|111361|
| -| 24324|
+-----------------+------+

Custom function (UDF)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def fc(a):
    if a == '-':
        return '-1'  # return a string, since the UDF is typed StringType
    else:
        return a

fc = udf(fc, StringType())
data1 = data.withColumn('notRepairedDamage', fc('notRepairedDamage'))

data1.groupBy('notRepairedDamage').count().show()

+-----------------+------+
|notRepairedDamage| count|
+-----------------+------+
| 1.0| 14315|
| -1| 24324|
| 0.0|111361|
+-----------------+------+

Here 1.0 means the car has unrepaired damage, which should push the price down. '-' marks a missing value; re-encoding it as something else, say -1, works better.

brand is the car brand. It affects the price, but has many categories:


data.groupBy('brand').count().show()

+-----+-----+
|brand|count|
+-----+-----+
| 31| 318|
| 34| 227|
| 28| 649|
| 27| 2053|
| 26| 966|
| 12| 1109|
| 22| 1085|
| 1|13794|
| 13| 3817|
| 6|10217|
| 16| 2223|
| 3| 2461|
| 20| 1236|
| 5| 4665|
| 19| 1388|
| 15| 1458|
| 37| 333|
| 9| 7306|
| 17| 913|
| 35| 180|
+-----+-----+
only showing top 20 rows

Body type, fuel type, gearbox, engine power, and kilometers driven are the main drivers of price, while the last two fields, seller and offerType, are so heavily skewed that they carry no information; we treat them as irrelevant to price.

data.groupBy('seller').count().show()

+------+------+
|seller| count|
+------+------+
| 1| 1|
| 0|149999|
+------+------+

data.groupBy('offerType').count().show()

+---------+------+
|offerType| count|
+---------+------+
| 0|150000|
+---------+------+

data1.describe().show()

+-------+------------------+-----------------+------------------+------------------+-----------------+------------------+-------------------+-------------------+------------------+------------------+--------------------+------------------+--------------------+---------+------------------+-----------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary| SaleID| name| regDate| model| brand| bodyType| fuelType| gearbox| power| kilometer| notRepairedDamage| regionCode| seller|offerType| creatDate| price| v_0| v_1| v_2| v_3| v_4| v_5| v_6| v_7| v_8| v_9| v_10| v_11| v_12| v_13| v_14|
+-------+------------------+-----------------+------------------+------------------+-----------------+------------------+-------------------+-------------------+------------------+------------------+--------------------+------------------+--------------------+---------+------------------+-----------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| count| 150000| 150000| 150000| 149999| 150000| 145494| 141320| 144019| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000| 150000|
| mean| 74999.5|68349.17287333333| 2.003417051218E7|47.129020860139065|8.052733333333334|1.7923694447881011|0.37584206057175207|0.22494254230344607|119.31654666666667| 12.59716|-0.06672666666666667|2583.0772666666667|6.666666666666667E-6| 0.0| 2.016033079182E7|5923.327333333334| 44.40626753145158|-0.04480912301386435|0.08076505844810129|0.07883342346011384|0.01787461474853547| 0.2482035284008884|0.04492300430596517|0.12469246111865868|0.058143854762190444|0.061995894701729046|-0.00100023880864...|0.009034543472760136|0.004812595252074109|3.126119439056227...|-6.88231437719025...|
| stddev|43301.414526548666|61103.87509485772|53649.879255418615| 49.53603965449437|7.864956340998353|1.7606395033065771| 0.5486766226436979|0.41754593224243897| 177.1684191643629|3.9195755324885053| 0.5031327587287863|1885.3632181287917|0.002581988897471611| 0.0|106.73280882848444|7501.998476579888|2.4575479062272008| 3.641893017577527| 2.929617945039715| 2.026514035918904| 1.1936613868541974|0.04580397101668277|0.05174278749055652| 0.2014095303447802|0.029185755680003405| 0.03569197873442054| 3.7723863943161264| 3.286071221267933| 2.517477676187196| 1.288987639416899| 1.0386851513009645|
| min| 0| 0| 19910001| 0.0| 0| 0.0| 0.0| 0.0| 0| 0.5| -1| 0| 0| 0| 20150618| 11|30.451976492574097| -4.295588902998176| -4.470671429633041| -7.275036706722688| -4.364565242136745| 0.0| 0.0| 0.0| 0.0| 0.0| -9.16819241041124| -5.558206704301587| -9.639552113720727| -4.153898795903344| -6.5465559654597785|
| max| 149999| 196812| 20151212| 247.0| 39| 7.0| 6.0| 1.0| 19312| 15.0| 1.0| 8120| 1| 0| 20160407| 99999|52.304178264130876| 7.320308374832438| 19.035496499120867| 9.854701534498924| 6.829351640037848| 0.2918381130785164|0.15141959589716678| 1.4049363754803494| 0.16079098534254374| 0.2227874876178012| 12.357010623760555| 18.81904246676505| 13.847791524358339| 11.147668612774803| 8.658417876941384|
+-------+------------------+-----------------+------------------+------------------+-----------------+------------------+-------------------+-------------------+------------------+------------------+--------------------+------------------+--------------------+---------+------------------+-----------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+

import pyspark.sql.functions as fn
data1.select(fn.mean('price').alias('mean'),fn.stddev('price').alias('stddev'), fn.skewness('price').alias('skewness'), fn.kurtosis('price').alias('kurtosis')).show()

+-----------------+-----------------+-----------------+----------------+
| mean| stddev| skewness| kurtosis|
+-----------------+-----------------+-----------------+----------------+
|5923.327333333334|7501.998476579888|3.346453297676422|18.9945101890304|
+-----------------+-----------------+-----------------+----------------+

# Smooth with log(x + 1)
data2 = data1.withColumn('price', fn.log1p('price'))

data2.select(fn.mean('price').alias('mean'),fn.stddev('price').alias('stddev'), fn.skewness('price').alias('skewness'), fn.kurtosis('price').alias('kurtosis')).show()

+-----------------+----------------+--------------------+--------------------+
| mean| stddev| skewness| kurtosis|
+-----------------+----------------+--------------------+--------------------+
|8.035270518212545|1.21822244267324|-0.26172474171752863|-0.18216114634776304|
+-----------------+----------------+--------------------+--------------------+

# -*- coding: utf-8 -*-
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkContext

# Initialize the data

# Initialize a pandas DataFrame
df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], index=['row1', 'row2'], columns=['c1', 'c2', 'c3'])

# Print it
print(df)

# Initialize a Spark DataFrame
sc = SparkContext()
if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("testDataFrame")\
        .getOrCreate()

    sentenceData = spark.createDataFrame([
        (0.0, "I like Spark"),
        (1.0, "Pandas is useful"),
        (2.0, "They are coded by Python ")
    ], ["label", "sentence"])

    # Show the data
    sentenceData.select("label").show()

    # pandas.DataFrame -> Spark DataFrame
    sqlContext = SQLContext(sc)
    spark_df = sqlContext.createDataFrame(df)

    # Show the data
    spark_df.select("c1").show()

    # Spark DataFrame -> pandas.DataFrame
    pandas_df = sentenceData.toPandas()

    # Print it
    print(pandas_df)

Correlation analysis

data2.agg(fn.corr("model", "price").alias('a')).collect()

data2.agg(fn.corr("creatDate", "price").alias('a')).collect()

Data source: https://tianchi.aliyun.com/competition/entrance/231784/introduction?spm=5176.12281973.1005.7.30981f54IlDO6x


