火花模式rdd到RDD
Posted
技术标签:
【中文标题】火花模式rdd到RDD【英文标题】:spark schema rdd to RDD 【发布时间】:2015-05-23 12:10:29 【问题描述】:我想在 spark 中进行字数统计,我使用 spark sql 创建了一个 rdd,以从数据集中提取不同的推文。 我想在 RDD 之上使用 split 功能,但它不允许我这样做。
错误:- 值拆分不是 org.apache.spark.sql.SchemaRdd 的成员
无法计算字数的 Spark 代码:-
val disitnct_tweets=hiveCtx.sql("select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_List=sc.parallelize(List(distinct_tweets))
//tried split on both the rdd disnt worked
distinct_tweets.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
distinct_tweets_List.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
但是当我将数据从 sparksql 输出到文件并再次加载并运行拆分时,它可以工作。
有效的示例代码:-
val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val rdd=sc.parallelize(distinct_tweets_op)
rdd.saveAsTextFile("/home/cloudera/bdp/op")
val textFile=sc.textFile("/home/cloudera/bdp/op/part-00000")
val counts=textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.SaveAsTextFile("/home/cloudera/bdp/wordcount")
我需要一个答案,而不是写入文件并再次加载来执行我的拆分功能是否有解决方法来使拆分功能工作
谢谢
【问题讨论】:
【参考方案1】:首先,我们不应该做 collect() 然后并行化创建 RDD;这将使驱动程序忙碌/停机。
相反,
val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.map(x => x.mkstring)
[考虑到这一点,您在查询中仅选择单个列 - distinct(text)
]
现在 distinct_tweets_op 只是一个 RDD。
所以,循环这个 RDD;并且您最好在该 RDD 中的每个字符串上应用 split("") 函数。
【讨论】:
【参考方案2】:找到答案,将数据框或 spark.sql.row.RDD 转换为普通 RDD 的三个步骤。
sc.parallelize(List()) 映射到字符串
val distinct_tweets=hiveCtx.sql(" select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val distinct_tweets_list=sc.parallelize(List(distinct_tweets_op))
val distinct_tweets_string=distinct_tweets.map(x=>x.toString)
val test_kali=distinct_tweets_string.flatMap(line =>line.split(" ")).map(word => (word,1)).reduceByKey(_+_).sortBy case (key,value) => -value.map case (key,value) => Array(key,value).mkString(",")
test_kali.collect().foreach(println)
case class kali_test(text: String)
val test_kali_op=test_kali.map(_.split(" ")).map(p => kali_test(p(0)))
test_kali_op.registerTempTable("kali_test")
hiveCtx.sql(" select * from kali_test limit 10 ").collect().foreach(println)
这样我不需要加载文件,我可以在飞行中进行操作。
谢谢 斯里
【讨论】:
虽然这可能适用于小块数据,但这不是一个真正的答案。hiveCtx.sql
操作检索到的 RDD 对驱动程序是 collect
,失去了它的分布式和并行性。【参考方案3】:
您的第一个失败的主要原因是这一行:
val distinct_tweets_List=sc.parallelize(List(distinct_tweets))
这在 Spark 中完全没用,而且比无用更糟糕——正如您所看到的那样,它会破坏您的系统。
您希望避免执行collect()
,它会创建一个Array
并将其返回给驱动程序应用程序。相反,您希望尽可能长时间地将对象保留为 RDD,并将尽可能少的数据返回给 Driver(如减少后的键和计数)。
但要回答您的基本问题,以下将采用由单个 StringType 列组成的 DataFrame 并将其转换为 RDD[String]:
val myRdd = myDf.rdd.map(_.getString(0))
虽然 SchemaRDD 已不复存在,但我相信以下内容会将带有单个 String 列的 SchemaRDD 转换为普通的 RDD[String]:
val myRdd = mySchemaRdd.map(_.getString(0))
【讨论】:
以上是关于火花模式rdd到RDD的主要内容,如果未能解决你的问题,请参考以下文章