PySpark - 运行进程
Posted
技术标签:
【中文标题】PySpark - 运行进程【英文标题】:PySpark - running processes 【发布时间】:2017-07-03 11:51:48 【问题描述】:在我的 Spark Streaming (Spark 2.1.0) 应用程序中,我需要从一个文件构建一个图表,并且初始化该图表需要 5 到 10 秒。
所以我尝试为每个执行程序初始化一次,这样它只会被初始化一次。
在运行应用程序后,我注意到每个执行程序启动了不止一次,每次都使用不同的进程 ID(每个进程都有自己的记录器)。
不是每个执行者都有自己的 JVM 和自己的进程吗?还是仅当我使用 Scala/Java 等 JVM 语言开发时才相关? PySpark 中的执行程序会为新任务生成新进程吗?
如果他们这样做了,我如何确保我的图形对象真的只会被启动一次?
我初始化对象的方式:
class MySingletons(object):
kafka_producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
tagger = _init_tagger() # This returns an object with a graph inside of it
@classmethod
def handle_batch(cls, records_batch):
analyzed = cls.tagger.tag(records_batch)
return analyzed
然后我在驱动内部调用它:
def handle_partition(records: Sequence):
records_lst = list(records)
if len(records_lst) > 0:
MySingletons.handle_batch(records_lst)
def handle_rdd(rdd: RDD):
rdd_values = rdd.map(lambda x: x[1])
rdd_values.foreachPartition(handle_partition)
ssc.union(*streams).filter(lambda x: x[1] is not None).foreachRDD(handle_rdd)
ssc.start()
ssc.awaitTermination()
谢谢:)
【问题讨论】:
请发布您正在使用的软件组件的版本,您的代码 sn-p 的详细信息以及您如何初始化图形,如果您有任何屏幕截图.....没有它,无法回答正确。 如果您需要这种级别的控制,直接在 Scala 中编码将是您的最佳选择。 【参考方案1】:简而言之,JVM 语言和 Python 有很大的区别:
在 JVM 上,执行程序使用单独的线程。 在 Python 中,执行程序使用单独的进程。Python 对象的生命周期将被限制在一个特定的子进程中,并且每个进程都有自己的“单例”。
如果spark.python.worker.reuse
false
或长时间空闲,则 Python 工作人员可能会被杀死。
最后,动态分配会使事情变得更加复杂。
根据逻辑的细节,有很多可能的解决方法,但通常涉及很多,而且不通用。
【讨论】:
以上是关于PySpark - 运行进程的主要内容,如果未能解决你的问题,请参考以下文章