在 Spark 中使用 Dataframe 获取平均值

Posted

技术标签:

【中文标题】在 Spark 中使用 Dataframe 获取平均值【英文标题】:Fetch Avg using Dataframe in Spark 【发布时间】:2019-11-25 14:30:49 【问题描述】:

我想在 Spark 中使用 scala 代码查找“rate”列的平均值。为此,我创建了 Dataframe 并查看,然后使用 Spark SQL 进行查询。当我使用视图运行选择查询时,它会给出正确的输出但是当我使用视图执行平均和分组时,它不会给出任何记录。

data.txt 是一个制表符分隔文件。

data.txt :
abandon     -2
abandoned   -2
abandons    -2

我想执行推特情绪分析,所以我将推文存储在 dstream 中,并从 dstream 生成数据帧,然后将数据帧与 AFINN.txt 文件数据帧连接但是当我执行下面的代码时,它会在 AFINN 的 DF 上执行分组和平均时获取空记录。

val consumerKey="xxxxxxxxx"
val consumerSecret="xxxxxxxxxx"
val accessToken="xxxxx-xxxxxxx"
val accessTokenSecret="xxxxxxxx"
val args = Array(consumerKey, consumerSecret, accessToken, accessTokenSecret)
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
val sparkConf = new SparkConf().setAppName("twitterSentiment").setMaster("local[4]").set("spark.driver.allowMultipleContexts","true")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val tweets = TwitterUtils.createStream(ssc, None, Array("#India","#Sports"),StorageLevel.MEMORY_AND_DISK)
val englishTweets = tweets.filter(_.getLang() == "en") 
val textMsg = englishTweets.map( status => (status.getId(), status.getText(),status.getText().split(" ")))
val AFINN = sc.textFile("hdfs://sandbox-hdp.hortonworks.com:8020/Input/AFINN1.txt").map(x=> x.split("\t")).map(x=>(x(0).toString,x(1).toInt))
val AFINNDF = AFINN.toDF("word","rate") 
AFINNDF.createOrReplaceTempView("temp")
val DF = spark.sql("select * from temp")
DF.show()

输出:

+----------+----+
|      word|rate|
+----------+----+
|   abandon|  -2|
| abandoned|  -2|
|  abandons|  -2|
+----------+----+
val DF = spark.sql("select word,avg(rate) as rating from temp group by word")
//DF: org.apache.spark.sql.DataFrame = [word: string, rating: double]

输出:

+----+------+
|word|rating|
+----+------+
+----+------+

如何在 scala 中使用 Spark SQL 查询求平均值?

谢谢,

【问题讨论】:

【参考方案1】:

我已经执行了相同的查询,并且我得到了 avg 作为输出。 enter image description here

【讨论】:

或者,您可以跳过创建临时表并执行此操作。 val df= AFINNDF.groupBy("word").agg(avg("rate")) 我尝试使用 val df= DF1.groupBy("word").agg(avg("rate")) 但未获取任何记录。请给出您为 groupby 和平均。 能否请您展示您的逻辑您一直在做什么。尝试在使用 groupby 时分配不同的 DF。 我想在我的问题中提到的数据帧上执行 groupby 和 avg。我按照我在我的问题中写的步骤进行操作,并尝试使用 val AFINNDF = AFINN.toDF("word", "rate") val df= AFINNDF.groupBy("word").agg(avg("rate")) 但它对我不起作用,我仍然得到空记录。 是的,你是对的。当我运行我之前在帖子中提到的简单流程时,我能够在 Dataframe 上执行分组和平均。我编辑了我正在做的帖子。

以上是关于在 Spark 中使用 Dataframe 获取平均值的主要内容,如果未能解决你的问题,请参考以下文章

如何在 apache spark 中读取最新的分区

如何在 Spark 中使用 Python 查找 DataFrame 中的分区数以及如何在 Spark 中使用 Python 在 DataFrame 中创建分区

使用均值合并 Pandas 中的 DataFrame

在 Spark 中使用 Dataframe 获取平均值

Spark 1.6:在 DataFrame 中使用转义的列名删除列

哪个更快? Spark SQL with Where 子句或在 Spark SQL 之后在 Dataframe 中使用过滤器