在 PySpark 中涉及带有管道的子进程的映射步骤失败

Posted

技术标签:

【中文标题】在 PySpark 中涉及带有管道的子进程的映射步骤失败【英文标题】:Map step that involves subprocess with pipe fails in PySpark 【发布时间】:2015-06-16 21:25:55 【问题描述】:

我的目标是读取包含 csv 数据的 hdfs 上的二进制(gpg 加密)文件。我的方法 - 遵循 this answer - 一直是定义一个 Python 函数来读取和解密 gpg 文件,产生每一行,并将此函数作为 flatMap 应用于文件的并行列表。

本质上,Python 函数会生成一个子进程,该子进程使用hadoop 读取文件并将结果通过管道传输到gpg 以进行解密。这在本地模式下运行 Spark 时工作得很好。然而,分布式运行它 (yarn-client),一个简单的行数返回 0,本质上是因为 Python 认为 stdout 管道总是关闭的。

问题似乎是子进程涉及两个命令之间的管道。当我删除后者(只是加密文件的行数)时,行数与我在命令行上得到的相匹配。我尝试了多种方法,结果都一样。

这是 Python 函数:

import subprocess as sp

def read_gpg_file_on_hdfs(filename):
    # Method 1:
    p = sp.Popen('hadoop fs -cat  | gpg -d'.format(filename), shell=True,
                 stdout=sp.PIPE)
    # Method 2:
    p1 = sp.Popen(['hadoop', 'fs', '-cat', filename], stdout=sp.PIPE)
    p = sp.Popen(['gpg', '-d'], stdin=p1.stdout, stdout=sp.PIPE)
    p1.stdout.close()

    # Method 3:
    p = sp.Ppen('gpg -d <(hadoop fs -cat )'.format(filename), shell=True,
                stdout=sp.PIPE, stderr=sp.PIPE)

    for line in p.stdout:
        yield line.strip()

这里是 Spark 命令:

sc.parallelize(['/path/to/file.gpg']).flatMap(read_gpg_file_on_hdfs).count()

现在我知道 PySpark 使用管道与 Spark 进行通信,但我没有关注细节,我不知道这是否会影响我正在尝试做的事情。我的问题是是否有办法完成我想做的事情。

请注意,我使用的是 Spark 1.2.1 分布式(MapR 的最新版本)。另外,我考虑过使用binaryFiles,但这对于我有时会遇到的大型 gpg 文件会失败。

提前致谢!

【问题讨论】:

【参考方案1】:

事实证明,gpg 命令实际上是问题所在。大概它与子进程在本地模式与分布式模式下如何启动的细节有关,但在本地模式下,gpghomedir 设置正确。但是在分布式模式下启动时,homedir 指向了一个错误的目录,第二个子进程立即失败。此错误消息似乎没有记录在任何地方,因此 stdout 只是作为空字符串返回。

【讨论】:

以上是关于在 PySpark 中涉及带有管道的子进程的映射步骤失败的主要内容,如果未能解决你的问题,请参考以下文章

使用带有 python3 的子进程模块管道两个命令时遇到问题

python子进程中的子shell

使用 pySpark 读取分号数据的管道

Perl:关闭信号处理程序中的子进程管道挂起?

Pyspark 将数组列分解为带有滑动窗口的子列表

持久的子进程管道 - 没有读取标准输出