有没有办法修改此代码以让火花流从 json 中读取?

Posted

技术标签:

【中文标题】有没有办法修改此代码以让火花流从 json 中读取?【英文标题】:Is there a way to modify this code to let spark streaming read from json? 【发布时间】:2021-09-11 02:39:00 【问题描述】:

我正在开发一个 spark 流应用程序/代码,它不断地从 localhost 9098 读取数据。有没有办法将 localhost 修改为 以便自动从文件夹路径或 json 读取数据?

import org.apache.spark.streaming.Seconds, StreamingContext
import org.apache.spark.SparkConf, SparkContext
import org.apache.log4j.Logger
import org.apache.log4j.Level

object StreamingApplication extends App 

  Logger.getLogger("Org").setLevel(Level.ERROR)

  //creating spark streaming context
  val sc = new SparkContext("local[*]", "wordCount")
  val ssc = new StreamingContext(sc, Seconds(5))

  // lines is a Dstream
  val lines = ssc.socketTextStream("localhost", 9098)

  // words is a transformed Dstream
  val words = lines.flatMap(x => x.split(" "))

  // bunch of transformations
  val pairs = words.map(x=> (x,1))
  val wordsCount = pairs.reduceByKey((x,y) => x+y)

  // print is an action
  wordsCount.print()

  // start the streaming context
  ssc.start()

ssc.awaitTermination()



基本上,我需要帮助来修改以下代码:

val lines = ssc.socketTextStream("localhost", 9098)

到这里:

val lines = ssc.socketTextStream("<folder path>")

仅供参考,我正在使用 IntelliJ Idea 来构建它。

【问题讨论】:

搜索“spark流文件”的第一个结果是:sparkbyexamples.com/spark/… 【参考方案1】:

我建议阅读 Spark 文档,尤其是 scaladoc。

似乎存在一个方法fileStream

https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/streaming/StreamingContext.html

【讨论】:

也试过 textFileStream 但不知何故它不起作用。我错过了什么吗? val lines = ssc.textFileStream("Users/Desktop/raw.json") 它侦听目录中的新文件,它不会读取 1 个现有文件。 好的,但是有没有办法读取文件内的数据?

以上是关于有没有办法修改此代码以让火花流从 json 中读取?的主要内容,如果未能解决你的问题,请参考以下文章

net.jpounz.lz4 使用火花流从 kafka 读取时出现异常

有没有办法在火花流中展平嵌套的 JSON?

使用 json 文件触发流式传输

在火花中读取 json [重复]

当 AMQ 主题中没有数据可读取时如何停止流式传输

使用 spark 结构化流从 s3 读取 avro 文件