SparkContext.textFile 可以与自定义接收器一起使用吗?
Posted
技术标签:
【中文标题】SparkContext.textFile 可以与自定义接收器一起使用吗?【英文标题】:Can SparkContext.textFile be used with a custom receiver? 【发布时间】:2017-06-26 22:51:13 【问题描述】:我正在尝试实现使用自定义接收器从 SQS 读取消息的流式传输作业。每条消息都包含一个对 S3 文件的引用,然后我想读取、解析并存储为 ORC。
这是我目前的代码:
val sc = new SparkContext(conf)
val streamContext = new StreamingContext(sc, Seconds(5))
val sqs = streamContext.receiverStream(new SQSReceiver("events-elb")
.credentials("accessKey", "secretKey")
.at(Regions.US_EAST_1)
.withTimeout(5))
val s3File = sqs.map(messages =>
val sqsMsg: JsValue = Json.parse(messages)
val s3Key = "s3://" +
Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "") + "/" +
Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
val rawLogs = sc.textFile(s3Key)
rawLogs
).saveAsTextFiles("/tmp/output")
很遗憾,此操作失败并出现以下错误:
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@52fc5eb1)
- field (class: SparrowOrc$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class SparrowOrc$$anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
这是使用sc.textFile
的不正确方式吗?如果是这样,我可以使用什么方法将从 SQS 收到的每个文件路径转发到文件阅读器进行处理?
FWIW,val s3File
最终成为 mappedDStream
类型。
为了进一步了解,我使用它作为我的接收器:https://github.com/imapi/spark-sqs-receiver。
【问题讨论】:
【参考方案1】:确实,我们不能在map
操作中使用sparkContext
,因为在一个阶段中转换的闭包是在没有定义SparkContext
的执行器中运行的。
解决这个问题的方法是将过程分成两部分:首先,我们使用现有的map
计算文件,但在transform
操作中使用textFile
:
val s3Keys = sqs.map(messages =>
val sqsMsg: JsValue = Json.parse(messages)
val s3Key = "s3://" +
Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "") + "/" +
Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
val files DStream = s3Keys.transformkeys =>
val fileKeys= keys.collect()
Val files = fileKeys.map(f=>
sparkContext.textFile(f))
sparkContext.union(files)
filesDStream.saveAsTextFiles(..)
【讨论】:
【参考方案2】:没有。这是不正确的,因为 SparkContext 是:
-
不可序列化(如您在日志中所见)
没有意义
我非常感谢 Spark 开发人员,他们照顾它,所以我们不会忘记它。
不允许这样使用的原因是SparkContext
存在于驱动程序上(或者可以说构成驱动程序)并负责编排任务(用于 Spark 作业)。
执行者很笨,因此只知道如何运行任务。
Spark 不是这样工作的,您越早接受该设计决策,您就越能熟练地正确开发 Spark 应用程序。
如果是这样,我可以使用什么方法将我从 SQS 收到的每个文件路径转发到文件阅读器进行处理?
这是我无法回答的,因为我从未开发过自定义接收器。
【讨论】:
以上是关于SparkContext.textFile 可以与自定义接收器一起使用吗?的主要内容,如果未能解决你的问题,请参考以下文章