在 Python 中创建自定义 Spark RDD

Posted

技术标签:

【中文标题】在 Python 中创建自定义 Spark RDD【英文标题】:Creating a custom Spark RDD in Python 【发布时间】:2015-07-12 13:07:32 【问题描述】:

是否可以在 Python 中扩展 Spark 的 RDD 以添加自定义运算符?如果不可能,如何为扩展 RDD 的类包装 Scala 代码,例如这里的: http://blog.madhukaraphatak.com/extending-spark-api/

编辑:我正在尝试创建一个新的 RDD,比如 PersonRDD,并在 PersonRDD 上添加一组新的运算符,例如。 PersonRDD.computeMedianIncome()。根据下面的链接,在 Python 中做到这一点并非易事。但是,由于它是一个旧线程,我想知道是否有任何新的更新。如果没有,我想使用 Scala 来做,但我不知道如何使用 Py4J 从 Python 调用类(mail-archives.us.apache.org/mod_mbox/spark-user/201308.mbox/...)

任何建议或帮助将不胜感激。

曼迪

【问题讨论】:

【参考方案1】:

在分布式环境中计算精确的中位数需要付出一些努力,因此假设您想要对 RDD 中的所有值进行平方运算。让我们将此方法称为squares 并假设它应该如下工作:

assert rdd.squares().collect() == rdd.map(lambda x: x * x).collect()

1。修改pyspark.RDD定义:

from pyspark import RDD

def squares(self):
    return self.map(lambda x: x * x)

RDD.squares = squares
rdd = sc.parallelize([1, 2, 3])
assert rdd.squares().collect() == [1, 4, 9]

注意:如果您修改类定义,每个实例都可以访问squares

2。创建 RDD 子类:

class RDDWithSquares(RDD):
    def squares(self):
        return self.map(lambda x: x * x)

rdd = sc.parallelize([1, 2, 3])
rdd.__class__ = RDDWithSquares # WARNING: see a comment below

分配一个类是一种肮脏的技巧,因此在实践中您应该以适当的方式创建一个 RDD(例如参见 context.parallelize 实现)。

3。向实例添加方法

import types

rdd = sc.parallelize([1, 2, 3])
# Reusing squares function defined above
rdd.squares = types.MethodType(squares, rdd)

免责声明

首先,我没有测试任何这些足够长的时间来确保那里没有隐藏的问题。

此外,我认为这真的不值得大惊小怪。如果没有静态类型检查,真的很难找到任何好处,您可以使用函数、currying 和pipes 以更简洁的方式获得类似的结果。

from toolz import pipe
pipe(
    sc.parallelize([1, 2, 3]),
    squares,
    lambda rdd: rdd.collect())

【讨论】:

谢谢@zero323。我本来希望干净地继承 RDD 形式,就像在 Scala 或 Java 中可以做的那样,而不是破解解决方案。解决方案 1 不起作用,因为用户可以调用错误类型的新运算符。 2 不适用于 RDD 的子类,例如:newAPIHadoopFile,但可以为我工作...再次感谢您抽出宝贵时间提出解决方案 嗯,你必须记住几件事。 Python 中的键入规则与 Scala 完全不同,Python RDD 不是按类型参数化的。从 Scala 的角度来看,每个 Python RDD 看起来都像 RDD[Any]。因此,您有责任仅调用适用的方法。 sc.parallelize(range(3)).groupByKey() 之类的东西显然没有意义,并且在执行转换时会失败,但在类型级别上并没有错。 与 Scala 不同,您可以在运行时修改现有类。没有隐式转换地狱,我们知道“显式优于隐式”。如果添加方法,类型仍然没有问题。它唯一说的是,根据实例的状态,调用此方法可能是有效的。从概念的角度来看,它可能是最接近 Scala 隐式方法的东西。尽管如此,我认为管道函数调用更安全,更 Pythonic,并且如果想在 Spark 之上创建 DSL 也同样有效。 这完全有道理。你显然比我有更多的 Spark 经验,所以我也会考虑管道示例。【参考方案2】:

我遇到了类似的问题,虽然到目前为止我还没有在我的扩展版本上测试正常 RDD 的全部功能,但它按预期工作。它确实需要一些工作,我不确定这是否是最好的解决方案,但我正在做的只是扩展 RDD 类,通过在新类的构造函数中传递返回新 RDD 的方法并添加类的方法。下面是一小段代码:

from pyspark.rdd import RDD, PipelinedRDD

class CustomRDD(RDD):
    def __init__(self, rdd, first=True):
        if first:
            rdd = custom_parser(rdd)
        self._jrdd = rdd._jrdd
        self.is_cached = rdd.is_cached
        self.is_checkpointed = rdd.is_checkpointed
        self.ctx = rdd.ctx
        self._jrdd_deserializer = rdd._jrdd_deserializer
        self._id = rdd._id
        self.partitioner = rdd.partitioner

    def mapPartitionsWithIndex(self, f, preservesPartition=False):
        return CustomRDD(PipelinedRDD(self, f, preservesPartition), False)

    def union(self, other):
        return WebtrendsRDD(super(WebtrendsRDD, self).union(other), False)

    def custom_method(self):
        return CustomRDD(self.filter(lambda x: x.has_property()), False)

mapPartitionsWithIndex 方法被许多其他 RDD 功能调用,因此涵盖了很多功能,但是您必须使用自己的构造函数来包装很多其他方法,以像我对 union 所做的那样继续获取您自己的 CustomRDD .

【讨论】:

以上是关于在 Python 中创建自定义 Spark RDD的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark Azure Databricks 中创建自定义进度条指示器

如何在 PySpark ML 中创建自定义标记器

如何在 PySpark ML 中创建自定义 SQLTransformer 以透视数据

在 NLTK 和 Python 中创建自定义分类语料库

Django中创建自定义标签与过虑器

在 JavaScript 中创建自定义 UI 框架的最佳实践 [关闭]