如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?

Posted

技术标签:

【中文标题】如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?【英文标题】:How to access Amazon kinesis streams files in S3 in my pyspark code? 【发布时间】:2016-07-11 01:45:07 【问题描述】:

我在 S3 上获得了一个存储桶,其中包含使用“snappy”压缩格式的 Kinesis 流文件。文件夹结构和文件格式为s3://<mybucket>/yyyy/mm/dd/*.snappy

我正在尝试将其读入pyspark 中的sqlContext。通常,我会将存储桶指定为:

df = sqlContext.read.json('s3://<mybucket>/inputfile.json')

如何将所有这些多部分压缩文件放入我的数据框中?

更新:似乎我使用相同的构造取得了更多进展。但是,遇到堆大小问题:

#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p
kill -9 %p"
#   Executing /bin/sh -c "kill 6128
kill -9 6128"...

数据量并没有那么大,但不知何故,这个解压缩步骤似乎让事情变得更糟。

【问题讨论】:

这属于服务器故障而不是堆栈溢出。 嗯...我不确定。对这个东西很陌生。你能不投票而不是投票吗? :) @CharlieFish 为什么这完全属于Server Fault?对我来说,这似乎是Stack Overflow 的问题。 已将其标记为需要进入服务器故障。至于否决票,不太清楚为什么 Stack Overflow 不让我删除它,给我一个错误,说我的投票被锁定,除非问题被编辑。 @MichaelHampton 我可能完全错了,但在我看来,这更像是一个与服务器相关的问题,而不是与编程相关的问题。 【参考方案1】:

如果您想获取所有天/月/年的所有 snappy 文件,请尝试以下操作:

s3://<mybucket>/*/*/*/*.snappy

前三个* 指的是/yyyy/mm/dd/ 子文件夹。

为了证明这是可行的,您可以执行以下测试:

创建了一个testDirectories 文件夹...并在其中嵌套了一些日期文件夹。

nestedDirectories/
-- 2016/
-- -- 12/
-- -- -- 15/
-- -- -- -- data.txt

data.txt里面:

hello
world
I
Have
Some
Words

然后我运行pyspark:

>>> rdd = sc.readText("/path/to/nestedDirectories/*/*/*/*.txt")
>>> rdd.count()
6

因此,星型模式适用于将文件导入 RDD。

因此,从这里开始,如果您在内存和其他方面遇到问题,可能是因为您的文件太多而文件大小太小。这被称为“小文件问题”https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html

【讨论】:

谢谢。这很有帮助。 我对此进行了测试,它与组织成分层文件夹结构的明文文件(忽略太多文件)完美配合。但是,当我有 snappy 压缩文件时,完全相同的代码无法将数据加载到 Spark 中(这是在 EMR 上,我相信默认软件中提供 snappy 编解码器)。对可能发生的事情有任何想法吗?我'这'接近。 :)

以上是关于如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?的主要内容,如果未能解决你的问题,请参考以下文章

从 pyspark 访问 S3 存储桶中的文件

在 pyspark 中从另一个数据库加载表

如何从数据帧列中的路径读取 AWS 上 pyspark 中的许多 Json 文件?

如何在 pyspark 中读取 s3 上的表格数据?

PYSPARK - 如何读取 S3 中所有子文件夹中的所有 csv 文件?

从 Google 的 dataproc 中读取 S3 数据