SparkContext.textFile 可以与自定义接收器一起使用吗?

Posted

技术标签:

【中文标题】SparkContext.textFile 可以与自定义接收器一起使用吗?【英文标题】:Can SparkContext.textFile be used with a custom receiver? 【发布时间】:2017-06-26 22:51:13 【问题描述】:

我正在尝试实现使用自定义接收器从 SQS 读取消息的流式传输作业。每条消息都包含一个对 S3 文件的引用,然后我想读取、解析并存储为 ORC。

这是我目前的代码:

val sc = new SparkContext(conf)
val streamContext = new StreamingContext(sc, Seconds(5))

val sqs = streamContext.receiverStream(new SQSReceiver("events-elb")
  .credentials("accessKey", "secretKey")
  .at(Regions.US_EAST_1)
  .withTimeout(5))

val s3File = sqs.map(messages => 
  val sqsMsg: JsValue = Json.parse(messages)
  val s3Key = "s3://" +
    Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "") + "/" +
    Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
  val rawLogs = sc.textFile(s3Key)

  rawLogs
).saveAsTextFiles("/tmp/output")

很遗憾,此操作失败并出现以下错误:

Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@52fc5eb1)
    - field (class: SparrowOrc$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class SparrowOrc$$anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)

这是使用sc.textFile不正确方式吗?如果是这样,我可以使用什么方法将从 SQS 收到的每个文件路径转发到文件阅读器进行处理?

FWIW,val s3File 最终成为 mappedDStream 类型。

为了进一步了解,我使用它作为我的接收器:https://github.com/imapi/spark-sqs-receiver。

【问题讨论】:

【参考方案1】:

确实,我们不能在map 操作中使用sparkContext,因为在一个阶段中转换的闭包是在没有定义SparkContext 的执行器中运行的。

解决这个问题的方法是将过程分成两部分:首先,我们使用现有的map 计算文件,但在transform 操作中使用textFile

val s3Keys = sqs.map(messages => 
  val sqsMsg: JsValue = Json.parse(messages)
  val s3Key = "s3://" +
  Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "") + "/" +
  Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")

val files DStream = s3Keys.transformkeys => 
    val fileKeys= keys.collect()
    Val files = fileKeys.map(f=>
      sparkContext.textFile(f))
    sparkContext.union(files)

filesDStream.saveAsTextFiles(..)

【讨论】:

【参考方案2】:

没有。这是不正确的,因为 SparkContext 是:

    不可序列化(如您在日志中所见) 没有意义

我非常感谢 Spark 开发人员,他们照顾它,所以我们不会忘记它。

不允许这样使用的原因是SparkContext 存在于驱动程序上(或者可以说构成驱动程序)并负责编排任务(用于 Spark 作业)。

执行者很笨,因此只知道如何运行任务。

Spark 不是这样工作的,您越早接受该设计决策,您就越能熟练地正确开发 Spark 应用程序。

如果是这样,我可以使用什么方法将我从 SQS 收到的每个文件路径转发到文件阅读器进行处理?

这是我无法回答的,因为我从未开发过自定义接收器。

【讨论】:

以上是关于SparkContext.textFile 可以与自定义接收器一起使用吗?的主要内容,如果未能解决你的问题,请参考以下文章

spark textFile读取多个文件

Spark学习:RDD编程

Spark - 如何将 Bz2 文件解压缩为 parquet 文件

spark学习---打印RDD内容

RDD 分区问题

Spark 基础 —— 创建 DataFrame 的三种方式