07 从RDD创建DataFrame
Posted 隔壁老尤
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了07 从RDD创建DataFrame相关的知识,希望对你有一定的参考价值。
0.前次作业:从文件创建DataFrame
1.pandas df 与 spark df的相互转换
df_s=spark.createDataFrame(df_p)
df_p=df_s.toPandas()
>>> import pandas as pd >>> import numpy as np >>> arr = np.arange(6).reshape(-1,3) >>> df_p = pd.DataFrame(arr) >>> df_p >>> arr >>> df_p.columns=[\'a\',\'b\',\'c\'] >>> df_p
2. Spark与Pandas中DataFrame对比
http://www.lining0806.com/spark%E4%B8%8Epandas%E4%B8%ADdataframe%E5%AF%B9%E6%AF%94/
3.1 利用反射机制推断RDD模式
- sc创建RDD
- 转换成Row元素,列名=值
- spark.createDataFrame生成df
- df.show(), df.printSchema()
-
>>> from pyspark.sql import Row >>>people=spark.sparkContext.textFile(\'file:///home/hadoop/chapter4-data01.txt\') >>>people=spark.sparkContext.textFile(\'file:///home/hadoop/chapter4-data01.txt\').map(lambda line:line.split(\',\')).map(lambda p:Row(name=p[0],course=p[1],score=int(p[2]))) >>> df=spark.createDataFrame(people) >>> df.first() Row(course=\'OperatingSystem\', name=\'Aaron\', score=100) >>> df.printSchema()
3.2 使用编程方式定义RDD模式
- 生成“表头”
- fields = [StructField(field_name, StringType(), True) ,...]
- schema = StructType(fields)
-
>>> from pyspark.sql.types import * >>> from pyspark.sql import Row >>> schemaString=\'name course score\' >>> fields=[StructField(field_name,StringType(),True) for field_name in schemaString.split(\' \')] >>> schema=StructType(fields)
- 生成“表中的记录”
- 创建RDD
- 转换成Row元素,列名=值
-
>>>lines=spark.sparkContext.textFile(\'file:///home/hadoop/chapter4-data01.txt\') >>> parts=lines.map(lambda x:x.split(\',\')) >>> people=parts.map(lambda p:Row(p[0],p[1],p[2].strip()))
- 把“表头”和“表中的记录”拼装在一起
- = spark.createDataFrame(RDD, schema)
-
>>> schemaPeople=spark.createDataFrame(people,schema) >>> schemaPeople.show()
4. DataFrame保存为文件
df.write.json(dir)
>>> dir=\'file:///home/hadoop/sqlrdd\' >>> df.write.json(dir)
以上是关于07 从RDD创建DataFrame的主要内容,如果未能解决你的问题,请参考以下文章