如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?
Posted
技术标签:
【中文标题】如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?【英文标题】:How to access Amazon kinesis streams files in S3 in my pyspark code? 【发布时间】:2016-07-11 01:45:07 【问题描述】:我在 S3 上获得了一个存储桶,其中包含使用“snappy”压缩格式的 Kinesis 流文件。文件夹结构和文件格式为s3://<mybucket>/yyyy/mm/dd/*.snappy
。
我正在尝试将其读入pyspark
中的sqlContext
。通常,我会将存储桶指定为:
df = sqlContext.read.json('s3://<mybucket>/inputfile.json')
如何将所有这些多部分压缩文件放入我的数据框中?
更新:似乎我使用相同的构造取得了更多进展。但是,遇到堆大小问题:
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p
kill -9 %p"
# Executing /bin/sh -c "kill 6128
kill -9 6128"...
数据量并没有那么大,但不知何故,这个解压缩步骤似乎让事情变得更糟。
【问题讨论】:
这属于服务器故障而不是堆栈溢出。 嗯...我不确定。对这个东西很陌生。你能不投票而不是投票吗? :) @CharlieFish 为什么这完全属于Server Fault?对我来说,这似乎是Stack Overflow 的问题。 已将其标记为需要进入服务器故障。至于否决票,不太清楚为什么 Stack Overflow 不让我删除它,给我一个错误,说我的投票被锁定,除非问题被编辑。 @MichaelHampton 我可能完全错了,但在我看来,这更像是一个与服务器相关的问题,而不是与编程相关的问题。 【参考方案1】:如果您想获取所有天/月/年的所有 snappy 文件,请尝试以下操作:
s3://<mybucket>/*/*/*/*.snappy
前三个*
指的是/yyyy/mm/dd/
子文件夹。
为了证明这是可行的,您可以执行以下测试:
创建了一个testDirectories
文件夹...并在其中嵌套了一些日期文件夹。
nestedDirectories/
-- 2016/
-- -- 12/
-- -- -- 15/
-- -- -- -- data.txt
在data.txt
里面:
hello
world
I
Have
Some
Words
然后我运行pyspark
:
>>> rdd = sc.readText("/path/to/nestedDirectories/*/*/*/*.txt")
>>> rdd.count()
6
因此,星型模式适用于将文件导入 RDD。
因此,从这里开始,如果您在内存和其他方面遇到问题,可能是因为您的文件太多而文件大小太小。这被称为“小文件问题”https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html
【讨论】:
谢谢。这很有帮助。 我对此进行了测试,它与组织成分层文件夹结构的明文文件(忽略太多文件)完美配合。但是,当我有 snappy 压缩文件时,完全相同的代码无法将数据加载到 Spark 中(这是在 EMR 上,我相信默认软件中提供 snappy 编解码器)。对可能发生的事情有任何想法吗?我'这'接近。 :)以上是关于如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?的主要内容,如果未能解决你的问题,请参考以下文章
如何从数据帧列中的路径读取 AWS 上 pyspark 中的许多 Json 文件?