Spark 将数据流式传输到 S3
Posted
技术标签:
【中文标题】Spark 将数据流式传输到 S3【英文标题】:Spark Streaming Data to S3 【发布时间】:2017-10-08 07:51:12 【问题描述】:我正在 S3 中构建数据湖。因此,我想将原始数据流存储到 s3 中,下面是我的代码 sn-p,我尝试过使用本地存储。
val tweets = TwitterUtils.createStream(ssc, None)
val engtweets = tweets.filter(status => status.getLang() == "en").map(x => x.getText())
import sql.implicits._
engtweets.foreachRDD rdd =>
val df = rdd.toDF()
df.write.format("json").save("../Ramesh")
我想将原始数据(整个 JSON 对象)存储在 s3 中。
【问题讨论】:
【参考方案1】:只需在 core-site.xml 中设置访问密钥和密钥如下:
<property>
<name>fs.s3a.access.key</name>
<value>...</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>...</value>
</property>
完成此操作后,您应该能够使用 s3 协议写入 s3,例如:s3a:///
希望这会有所帮助!
【讨论】:
【参考方案2】:你可以简单地使用saveAsTextFile
方法,路径前缀为
s3a://<file path>
必填,您的 Amazon s3 已正确设置(无论是否具有凭据)。
https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_s3.html
【讨论】:
感谢您的回复。我能够在本地存储数据。由于数据是流式传输的,我想将数据存储在 firehose 中并将其推送到 S3。因此,我在 Java 中编写了一个用于存储到 kinesis firehose 的方法,它运行良好。但是,我无法从 Dstreams 获取字符串。 val tweets = TwitterUtils.createStream(ssc, None) val engtweets = tweets.filter(status => status.getLang() == "en") val statuses = engtweets.map(engtweets => engtweets.toString()) 状态。 map(record => record.getBytes().toString()).print()以上是关于Spark 将数据流式传输到 S3的主要内容,如果未能解决你的问题,请参考以下文章
如何阻止 Spark 结构化流每次都列出 S3 存储桶中的所有文件
如何使用 boto 将文件从 Amazon S3 流式传输到 Rackspace Cloudfiles?
@aws-sdk/lib-storage 使用 JSONStream.stringify() 将 JSON 从 MongoDB 流式传输到 S3