PySpark (Python):通过 SparkContext.newAPIHadoopFile 加载多行记录

Posted

技术标签:

【中文标题】PySpark (Python):通过 SparkContext.newAPIHadoopFile 加载多行记录【英文标题】:PySpark (Python): loading multiline records via SparkContext.newAPIHadoopFile 【发布时间】:2016-07-02 15:55:09 【问题描述】:

我正在加载一个文本文件,它采用 TSV(表格分隔值)表示法,但每行中没有键。因此,一行表示一个特定的变量,随后的所有行都是该变量的值,直到出现新变量。

因此,我使用自定义分隔符加载文件(在 Jupyter Notebook Python 2.7 - Pyspark 中):

sheet = sc.newAPIHadoopFile(
    'sample.txt',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf='textinputformat.record.delimiter': 'var::'
)

我的问题是,这样的多行记录的大小如何?一个变量的值可能是数千行。 Spark 是在一台机器上一次加载文件还是将该块拆分为较小的块(块)然后进行处理?

只是想确保内存没有在处理节点上爆炸。感谢您的任何解释。

【问题讨论】:

【参考方案1】:

o.a.h.mapreduce.lib.input.TextInputFormat 返回的每个 (key, value) 对都是包含偏移量 (long) 和字符串的单个本地数据结构。没有机制可以在不创建自定义 Hadoop InputFormat 的情况下在多个记录之间拆分值。

“千行”不是很精确的描述,但作为一个经验法则:

如果磁盘上的大小小于几兆字节,那么您很可能会这样做。 否则您将不得不跟踪内存使用情况和 GC 并调整配置。

还请记住,大型记录可能会导致资源利用率不佳。在最坏的情况下,您最终可能会得到每个任务的单个记录,其中记账成本可能远高于实际执行成本。

【讨论】:

一个 170MB 的文件包含 50 条以“时间”开头的多行记录,总共 5.252.874 行。因此,一条记录大约有 106.000 行。将其作为单个块阅读可能不是一个好主意。另一种选择可能是读取所有文件并记住“时间”发生的行号,然后使用该信息重新读取文件以构造键-时间戳-值三元组。 和以前一样的应用吗? 确实如此。但我正在用一个小样本进行测试。每个文件的真实数据约为 70 到 170MB。 这应该不是什么大问题。它给出了什么?每条记录 3.5MB?稍大但没什么特别的。 我会在星期一检查一下,然后通知你。其他选项可能是上面提到的选项,将“时间”的出现保存在全局变量中并在重新读取时使用它。或者写一个自己的converter as mentioned here。但确实,3.5MB 应该没问题。

以上是关于PySpark (Python):通过 SparkContext.newAPIHadoopFile 加载多行记录的主要内容,如果未能解决你的问题,请参考以下文章

在 python (pyspark) 中使用 combinebykey spark rdd 计算组上的聚合

独家 | PySpark和SparkSQL基础:如何利用Python编程执行Spark(附代码)

在非 Spark 环境中加载 pyspark ML 模型

python连接spark(pyspark)

Apache Spark:如何在Python 3中使用pyspark

可从 PySpark/Python 调用的 Spark(2.3+)Java 函数 [重复]