Twitter使用Spark流式传输

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Twitter使用Spark流式传输相关的知识,希望对你有一定的参考价值。

我正在尝试使用spark scala代码来传输Twitter数据。我能够获取数据并创建数据帧并查看它。但是当尝试提取status.getPlace.getCountry()时,我得到了一个java.lang.NullPointerException。

Spark版本:2.0.0,Scala版本:2.11.8

尝试条件,检查价值等,但徒劳无功。

码:

val spark = SparkSession.builder().appName("Twitter Spark Example").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext,Seconds(5))

val filters:Seq[String] =  Seq("hadoop")
val cb = new ConfigurationBuilder()
      .setOAuthConsumerKey("******")
      .setOAuthConsumerSecret("******")
      .setOAuthAccessToken("********")
      .setOAuthAccessTokenSecret("******").build()

val twitter_auth = new TwitterFactory(cb)
val a = new OAuthAuthorization(cb)
val atwitter:Option[twitter4j.auth.Authorization] =  Some(twitter_auth.getInstance(a).getAuthorization())

val tweetsdstream = TwitterUtils.createStream(ssc, atwitter, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
val data = tweetsdstream.map {status => 
      val places = status.getPlace
      val id = status.getUser.getId
      val date = status.getUser.getCreatedAt.toString()
      val user = status.getUser.getName()
      val place = places.getCountry()

      (id,date,user,place)
      }
data.foreachRDD{rdd =>
      import spark.implicits._
      rdd.toDF("id","date","user","place").show()
    }

ssc.start()
ssc.awaitTermination()

从twitter访问位置信息有什么限制吗?任何的意见都将会有帮助。

谢谢

答案

你可以使用Option来处理nulls:

val data = tweetsdstream.map {
  status =>
    val place = Option(status.getPlace).map(_.getCountry).orNull
    val id = status.getUser.getId
    val user = status.getUser.getName
    val date = status.getUser.getCreatedAt.toString
    (id, date, user, place)
}

通过这种方式,您将能够可视化所有推文,无论它们是否具有某个国家/地区(并且在未定义国家/地区时它将为空)。

Option对于处理可能缺少的数据非常有用,可以随意将其用于其他可能空的字段。

另一答案

我想请换行*val tweetsdstream = TwitterUtils.createStream(ssc, atwitter, filters, StorageLevel.MEMORY_AND_DISK_SER_2)*

改变成这样,然后简单地锻炼它

val stream = TwitterUtils.createStream(scc, None, filters)

val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

如果您想了解更多信息,请访问:http://commandstech.com/spark-streaming-twitter-example/

以上是关于Twitter使用Spark流式传输的主要内容,如果未能解决你的问题,请参考以下文章

流式传输 Twitter 直接消息

如何使用 tweepy 流式传输 Twitter 提及?

Spark-Scala:另存为 csv 文件(RDD)[重复]

Spark 流式传输 Kafka 消息未使用

Twitter下一代流式计算框架Heron开源了

Apache Flume twitter 代理没有流式传输数据