07 从RDD创建DataFrame
Posted 只吃外卖
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了07 从RDD创建DataFrame相关的知识,希望对你有一定的参考价值。
1.pandas df 与 spark df的相互转换
df_s=spark.createDataFrame(df_p)
df_p=df_s.toPandas()
1
2
3
|
import pandas as pd import numpy as np arr = np.arange( 6 ).reshape( - 1 , 3 ) |
1
2
|
df_p = pd.DataFrame(arr) df_p |
1
2
|
df_p.columns = [ \'a\' , \'b\' , \'c\' ] df_p |
1
2
|
df_s = spark.createDataFrame(df_p) df_s.show() |
1
|
df_s.collect() |
1
|
df_s.toPandas() |
2. Spark与Pandas中DataFrame对比
http://www.lining0806.com/spark%E4%B8%8Epandas%E4%B8%ADdataframe%E5%AF%B9%E6%AF%94/
3.1 利用反射机制推断RDD模式
- sc创建RDD
- 转换成Row元素,列名=值
- spark.createDataFrame生成df
- df.show(), df.printSchema()
1
2
3
4
|
from pyspark.sql import Row people = spark.sparkContext.textFile( \'file:///usr/local/spark/examples/src/main/resources/people.txt\' ). map ( lambda line:line.split( \',\' )). map ( lambda w:Row(name = w[ 0 ],age = int (w[ 1 ]))) sPeople = spark.createDataFrame(people) sPeople.createOrReplaceTempView( \'people\' ) |
1
2
3
|
personDF = spark.sql( \'select name,age from people where age>20\' ) personRDD = personDF.rdd. map ( lambda p: "Name:" + p.name + "," + "Age:" + str (p.age)) personRDD.foreach( print ) |
1
|
sPeople.show() |
1
|
sPeople.printSchema() |
3.2 使用编程方式定义RDD模式
- 生成“表头”
- fields = [StructField(field_name, StringType(), True) ,...]
- schema = StructType(fields)
1
2
3
4
|
from pyspark.sql.types import * from pyspark.sql import Row schemaString = \'name age\' fields = [StructField(field_name,StringType(), True ) for field_name in schemaString.split( \' \' )]<br>schema = StructType(fields) |
- 生成“表中的记录”
- 创建RDD
- 转换成Row元素,列名=值
1
2
3
4
|
lines = spark.sparkContext.textFile( \'file:///usr/local/spark/examples/src/main/resources/people.txt\' ) part = lines. map ( lambda w:w.split( "," )) peoples = part. map ( lambda p:Row(p[ 0 ],p[ 1 ].strip())) peoples.collect() |
- 把“表头”和“表中的记录”拼装在一起
- = spark.createDataFrame(RDD, schema)
1
2
3
|
schemaPeople = spark.createDataFrame(people,schema) schemaPeople.show() schemaPeople.printSchema() |
4. DataFrame保存为文件
df.write.json(dir)
1
|
schemaPeople.write.json( \'file:///home/hadoop/schema_out\' ) |
以上是关于07 从RDD创建DataFrame的主要内容,如果未能解决你的问题,请参考以下文章