Spark中创建DataFrame方法总结

Posted 超爱思考

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark中创建DataFrame方法总结相关的知识,希望对你有一定的参考价值。

春节假期,难得有时间,对Spark进行了复习,捡重点总结下:


建立DF方式一:导入外部文件

对应开发场景:将提前在离线环境计算好的内容相似表 item_sim_table 导入到sparkstreaming中进行实时召回(Real-Match)计算

item_sim_table_sim_DF= spark.sparkContext.textFile("filepath",partitions).map(itr->{

val arr = itr.split("\\|")

var action_itemid = "" //用户行为itemid

var sim_itemid = "" //与行为item相似的itemid

var sim_item_authorid = "" //相似item的作者id

var sim_score = ""  //相似item的相似得分 

if(arr.length==4){

var action_itemid = arr(0)

var sim_itemid = arr(1)

var sim_item_authorid = arr(2)

var_sim_score = arr(3).toLong}}

(action_itemid, sim_itemid, sim_item_authorid, sim_score)

).toDF("action_itemid", "sim_itemid", "sim_item_authorid", "sim_score")


建立DF方式二:接入客户端用户实时行为

对应开发场景:用户在客户端上实时收藏(点红心)的内容,通过kafka接入到sparkstreaming中,进行流式处理

var stream:InputDstream[ConsumerRecord

[String,String]] = KafkaUtils.createDirectStream

[String, String](ssc,

LocationStrageties.PreferConsistant,

ConsumerStrategies.Subscribe[String,String](topics, kafkaconf, offsets))

stream.foreachRDD{rdd=>{

val inputData = rdd.map(r={utils.getjson(r.value())}).filter(_!="")

val rowData = InputData.map(p=>Row(p.userid,p.collect_itemid))

val shema = StuctType(Array(

StructField("userid", StringType, true)

StructField("collect_itemid", StringType, true)))

val user_real_collect_DF=spark.creatDataFrame(rowData,shema)

.where($"user_id"=!="")

.where($"collect_itemid"=!="")}}


以上是关于Spark中创建DataFrame方法总结的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 中使用 Python 查找 DataFrame 中的分区数以及如何在 Spark 中使用 Python 在 DataFrame 中创建分区

使用Scala在Spark中创建DataFrame时出错

使用具有常量值的 var 在 Spark DataFrame 中创建一个新列

Spark中将RDD转换成DataFrame的两种方法

使用带有 Spark 版本 2.2 的 row_number() 函数在 PySpark DataFrame 中创建每一行的行号

python 在Pandas中创建DataFrame的不同方法