Spark 将数据流式传输到 S3

Posted

技术标签:

【中文标题】Spark 将数据流式传输到 S3【英文标题】:Spark Streaming Data to S3 【发布时间】:2017-10-08 07:51:12 【问题描述】:

我正在 S3 中构建数据湖。因此,我想将原始数据流存储到 s3 中,下面是我的代码 sn-p,我尝试过使用本地存储。

val tweets = TwitterUtils.createStream(ssc, None)
val engtweets = tweets.filter(status => status.getLang() == "en").map(x => x.getText())
  import sql.implicits._
engtweets.foreachRDD  rdd =>
    val df = rdd.toDF()
    df.write.format("json").save("../Ramesh")
 

我想将原始数据(整个 JSON 对象)存储在 s3 中。

【问题讨论】:

【参考方案1】:

只需在 core-site.xml 中设置访问密钥和密钥如下:

<property>
    <name>fs.s3a.access.key</name>
    <value>...</value>
</property>
<property>
    <name>fs.s3a.secret.key</name>
    <value>...</value>
</property>

完成此操作后,您应该能够使用 s3 协议写入 s3,例如:s3a:///

希望这会有所帮助!

【讨论】:

【参考方案2】:

你可以简单地使用saveAsTextFile方法,路径前缀为

s3a://<file path>

必填,您的 Amazon s3 已正确设置(无论是否具有凭据)。

https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_s3.html

【讨论】:

感谢您的回复。我能够在本地存储数据。由于数据是流式传输的,我想将数据存储在 firehose 中并将其推送到 S3。因此,我在 Java 中编写了一个用于存储到 kinesis firehose 的方法,它运行良好。但是,我无法从 Dstreams 获取字符串。 val tweets = TwitterUtils.createStream(ssc, None) val engtweets = tweets.filter(status => status.getLang() == "en") val statuses = engtweets.map(engtweets => engtweets.toString()) 状态。 map(record => record.getBytes().toString()).print()

以上是关于Spark 将数据流式传输到 S3的主要内容,如果未能解决你的问题,请参考以下文章

定期将数据从 S3 存储桶流式传输到红移

Spark流式传输作业不会删除随机播放文件

如何阻止 Spark 结构化流每次都列出 S3 存储桶中的所有文件

带有广播连接的 Spark 流式传输

如何使用 boto 将文件从 Amazon S3 流式传输到 Rackspace Cloudfiles?

@aws-sdk/lib-storage 使用 JSONStream.stringify() 将 JSON 从 MongoDB 流式传输到 S3