Spark中创建DataFrame方法总结
Posted 超爱思考
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark中创建DataFrame方法总结相关的知识,希望对你有一定的参考价值。
春节假期,难得有时间,对Spark进行了复习,捡重点总结下:
建立DF方式一:导入外部文件
对应开发场景:将提前在离线环境计算好的内容相似表 item_sim_table 导入到sparkstreaming中进行实时召回(Real-Match)计算
item_sim_table_sim_DF= spark.sparkContext.textFile("filepath",partitions).map(itr->{
val arr = itr.split("\\|")
var action_itemid = "" //用户行为itemid
var sim_itemid = "" //与行为item相似的itemid
var sim_item_authorid = "" //相似item的作者id
var sim_score = "" //相似item的相似得分
if(arr.length==4){
var action_itemid = arr(0)
var sim_itemid = arr(1)
var sim_item_authorid = arr(2)
var_sim_score = arr(3).toLong}}
(action_itemid, sim_itemid, sim_item_authorid, sim_score)
).toDF("action_itemid", "sim_itemid", "sim_item_authorid", "sim_score")
建立DF方式二:接入客户端用户实时行为
对应开发场景:用户在客户端上实时收藏(点红心)的内容,通过kafka接入到sparkstreaming中,进行流式处理
var stream:InputDstream[ConsumerRecord
[String,String]] = KafkaUtils.createDirectStream
[String, String](ssc,
LocationStrageties.PreferConsistant,
ConsumerStrategies.Subscribe[String,String](topics, kafkaconf, offsets))
stream.foreachRDD{rdd=>{
val inputData = rdd.map(r={utils.getjson(r.value())}).filter(_!="")
val rowData = InputData.map(p=>Row(p.userid,p.collect_itemid))
val shema = StuctType(Array(
StructField("userid", StringType, true)
StructField("collect_itemid", StringType, true)))
val user_real_collect_DF=spark.creatDataFrame(rowData,shema)
.where($"user_id"=!="")
.where($"collect_itemid"=!="")}}
以上是关于Spark中创建DataFrame方法总结的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark 中使用 Python 查找 DataFrame 中的分区数以及如何在 Spark 中使用 Python 在 DataFrame 中创建分区
使用具有常量值的 var 在 Spark DataFrame 中创建一个新列
使用带有 Spark 版本 2.2 的 row_number() 函数在 PySpark DataFrame 中创建每一行的行号