在 pyspark 地图逻辑中使用 sparksql 不起作用

Posted

技术标签:

【中文标题】在 pyspark 地图逻辑中使用 sparksql 不起作用【英文标题】:Using sparksql inside pyspark map logic not working 【发布时间】:2017-10-02 11:31:21 【问题描述】:

我有许多小文件。我想将它们加载到 RDD 中。然后映射它们以在这些文件上并行执行算法。该算法将需要从 HDFS/Hive 表中获取数据。当我使用 SparkSQL 获取数据时,出现以下错误:

pickle.PicklingError:无法序列化对象:异常:它 似乎您正试图从 广播变量、动作或转换。 SparkContext 只能 用于驱动程序,而不是在工作人员上运行的代码中。更多 信息,请参阅 SPARK-5063。

SparkSQL 使用 SQLContext,它是 SparkContext 的包装器。这是否意味着我不能在对工作人员执行的代码中使用 SparkSQL?但是那样的话,就太局限了。

有人可以分享一些关于如何在 PySpark 中编写我的逻辑的知识吗?

这是我正在使用的示例 PySpark 代码:

def apply_algorithm(filename):
    /* SparkSQL logic goes here */ 
    /* some more logic */
    return someResult


def main(argv):
    print "Entered main method"
    input_dir = sys.argv[1]
    output_dir = sys.argv[2]

    fileNameContentMapRDD = sc.wholeTextFiles(input_dir)
    print "fileNameContentMapRDD = " , fileNameContentMapRDD.collect()

    resultRDD = fileNameContentMapRDD.map(lambda x : apply_algorithm(x[0]))

    print resultRDD.collect()
    print "end of main."

【问题讨论】:

【参考方案1】:

这是否意味着我不能在工作人员执行的代码中使用 SparkSQL?

是的,就是这个意思。在并行上下文中,您既不能使用 RDDs 也不能使用 DataFrames

【讨论】:

感谢您的及时回复。我想并行处理每个小文件(因为它们是完全独立的)。当我无法使用 SparkSQL 时,哪些选项适合在工作人员中查询 HDFS/HiveTables? worker 内应该不需要查询 HDFS/Hive 表。您可以在驱动程序中查询相关数据。当您调用 collect 时,Spark 会将查询的评估结果分发到不同的工作节点并将结果返回给驱动程序。 然后我将不得不像这样重组我的代码: 然后我将不得不像这样重构我的代码: def apply_algorithm(filename): /* SparkSQL 逻辑在这里 / / 更多逻辑 / / 为每个输入文件将输出写入单独的文件*/ def main(argv): print "Entered main method" input_dir = sys.argv[1] output_dir = sys.argv[2] /* 循环遍历每个文件输入文件目录并为每个文件调用 apply_algorithm(filename) */ 但是如果我如上所示循环输入文件,我将按顺序处理它们 - 似乎违背了我为此查看 Spark 的目的。此外,在独立处理每个输入文件后,我需要生成单独的输出文件。我只能利用 Spark 的分布式计算在文件内而不是跨文件进行处理。

以上是关于在 pyspark 地图逻辑中使用 sparksql 不起作用的主要内容,如果未能解决你的问题,请参考以下文章

如何将 pyspark 输出写入 impala 表?

如何在 pyspark 的结构化流作业中运行地图转换

如何在pyspark地图中添加增量数字

在脚本外部编写时,具有用户定义功能的 pyspark 地图不起作用

哪个选项使用 pyspark 提供最佳性能?使用地图进行 UDF 或 RDD 处理?

在 pyspark RDD 上应用地图功能