PySpark - 运行进程

Posted

技术标签:

【中文标题】PySpark - 运行进程【英文标题】:PySpark - running processes 【发布时间】:2017-07-03 11:51:48 【问题描述】:

在我的 Spark Streaming (Spark 2.1.0) 应用程序中,我需要从一个文件构建一个图表,并且初始化该图表需要 5 到 10 秒。

所以我尝试为每个执行程序初始化一次,这样它只会被初始化一次。

在运行应用程序后,我注意到每个执行程序启动了不止一次,每次都使用不同的进程 ID(每个进程都有自己的记录器)。

不是每个执行者都有自己的 JVM 和自己的进程吗?还是仅当我使用 Scala/Java 等 JVM 语言开发时才相关? PySpark 中的执行程序会为新任务生成新进程吗?

如果他们这样做了,我如何确保我的图形对象真的只会被启动一次?

我初始化对象的方式:

class MySingletons(object):
    kafka_producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    tagger = _init_tagger()  # This returns an object with a graph inside of it
    @classmethod
    def handle_batch(cls, records_batch):
        analyzed = cls.tagger.tag(records_batch)
        return analyzed

然后我在驱动内部调用它:

def handle_partition(records: Sequence):
    records_lst = list(records)
    if len(records_lst) > 0:
        MySingletons.handle_batch(records_lst)


def handle_rdd(rdd: RDD):
    rdd_values = rdd.map(lambda x: x[1])
    rdd_values.foreachPartition(handle_partition)


ssc.union(*streams).filter(lambda x: x[1] is not None).foreachRDD(handle_rdd)
ssc.start()
ssc.awaitTermination()

谢谢:)

【问题讨论】:

请发布您正在使用的软件组件的版本,您的代码 sn-p 的详细信息以及您如何初始化图形,如果您有任何屏幕截图.....没有它,无法回答正确。 如果您需要这种级别的控制,直接在 Scala 中编码将是您的最佳选择。 【参考方案1】:

简而言之,JVM 语言和 Python 有很大的区别:

在 JVM 上,执行程序使用单独的线程。 在 Python 中,执行程序使用单独的进程。

Python 对象的生命周期将被限制在一个特定的子进程中,并且每个进程都有自己的“单例”。

如果spark.python.worker.reuse false 或长时间空闲,则 Python 工作人员可能会被杀死。

最后,动态分配会使事情变得更加复杂。

根据逻辑的细节,有很多可能的解决方法,但通常涉及很多,而且不通用。

【讨论】:

以上是关于PySpark - 运行进程的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:异常:Java 网关进程在向驱动程序发送其端口号之前退出

Pyspark 数据分布

Pyspark - 使用累加器检查 Json 格式

在 PySpark 中涉及带有管道的子进程的映射步骤失败

在pyspark中替换循环到并行进程

PySpark python错误:异常:Java网关进程在向驱动程序发送其端口号之前退出