PySpark 2.x:以编程方式将 Maven JAR 坐标添加到 Spark

Posted

技术标签:

【中文标题】PySpark 2.x:以编程方式将 Maven JAR 坐标添加到 Spark【英文标题】:PySpark 2.x: Programmatically adding Maven JAR Coordinates to Spark 【发布时间】:2019-06-11 04:05:02 【问题描述】:

以下是我的PySpark启动sn-p,相当靠谱(我用了很久)。今天我添加了spark.jars.packages 选项中显示的两个 Maven 坐标(有效地“插入”了 Kafka 支持)。现在通常会触发依赖项下载(由 Spark 自动执行):

import sys, os, multiprocessing
from pyspark.sql import DataFrame, DataFrameStatFunctions, DataFrameNaFunctions
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as sFn
from pyspark.sql.types import *
from pyspark.sql.types import Row
  # ------------------------------------------
  # Note: Row() in .../pyspark/sql/types.py
  # isn't included in '__all__' list(), so
  # we must import it by name here.
  # ------------------------------------------
 
num_cpus = multiprocessing.cpu_count()        # Number of CPUs for SPARK Local mode.
os.environ.pop('SPARK_MASTER_HOST', None)     # Since we're using pip/pySpark these three ENVs
os.environ.pop('SPARK_MASTER_POST', None)     # aren't needed; and we ensure pySpark doesn't
os.environ.pop('SPARK_HOME',        None)     # get confused by them, should they be set.
os.environ.pop('PYTHONSTARTUP',     None)     # Just in case pySpark 2.x attempts to read this.
os.environ['PYSPARK_PYTHON'] = sys.executable # Make SPARK Workers use same Python as Master.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/jre'  # Oracle JAVA for our pip/python3/pySpark 2.4 (CDH's JRE won't work).
JARS_IVY_REPO = '/home/jdoe/SPARK.JARS.REPO.d/'

# ======================================================================
# Maven Coordinates for JARs (and their dependencies) needed to plug
# extra functionality into Spark 2.x (e.g. Kafka SQL and Streaming)
# A one-time internet connection is necessary for Spark to autimatically
# download JARs specified by the coordinates (and dependencies).
# ======================================================================
spark_jars_packages = ','.join(['org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0',
                                'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',])
# ======================================================================
spark_conf = SparkConf()
spark_conf.setAll([('spark.master', 'local[]'.format(num_cpus)),
                   ('spark.app.name', 'myApp'),
                   ('spark.submit.deployMode', 'client'),
                   ('spark.ui.showConsoleProgress', 'true'),
                   ('spark.eventLog.enabled', 'false'),
                   ('spark.logConf', 'false'),
                   ('spark.jars.repositories', 'file:/' + JARS_IVY_REPO),
                   ('spark.jars.ivy', JARS_IVY_REPO),
                   ('spark.jars.packages', spark_jars_packages), ])

spark_sesn            = SparkSession.builder.config(conf = spark_conf).getOrCreate()
spark_ctxt            = spark_sesn.sparkContext
spark_reader          = spark_sesn.read
spark_streamReader    = spark_sesn.readStream
spark_ctxt.setLogLevel("WARN")

但是,当我运行 sn-p(例如 ./python -i init_spark.py)时,插件没有下载和/或加载,因为它们应该这样做。

这种机制曾经有效,但后来停止了。我错过了什么?

提前谢谢你!

【问题讨论】:

您可以尝试将--jars spark-sql-kafka-0-10_2.11-2.4.0.jar 添加到 spark 提交中。这样做很容易让它发挥作用。 谢谢。我不是通过 CLI 启动 pysparkpyspark 而是在 Python 交互式会话(或 Python 应用程序)中导入,如图所示。我明白你在说什么,但需要让你提到的机制在这种情况下工作。我在虚拟机中工作,但发生了一些变化,我不知道是什么。 无论您如何开始,JAR 都需要在某个时候提供给 Spark 驱动程序。请参阅您的spark.jars.packages 配置 当你指定 Maven 坐标时,正如我上面所说的,Spark 将下载 jars 和所有依赖项。引用手册:spark.jars.packages: Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths. The coordinates should be groupId:artifactId:version. If spark.jars.ivySettings is given artifacts will be resolved according to the configuration in the file, otherwise artifacts will be searched for in the local maven repo, then maven central and finally any additional remote repositories given by the command-line option --repositories. 【参考方案1】:

在这种帖子中,问题比答案更有价值,因为上面的代码有效,但在 Spark 2.x 文档或示例中找不到。

以上是我通过 Maven 坐标以编程方式向 Spark 2.x 添加功能的方式。我有这个工作,但后来它停止工作。为什么?

当我在jupyter notebook 中运行上述代码时,笔记本电脑已经——在幕后——已经通过我的PYTHONSTARTUP 脚本运行了相同的代码sn-p。 PYTHONSTARTUP 脚本与上面的代码相同,但省略了 maven 坐标(有意)。

那么,这个微妙的问题是如何出现的:

spark_sesn = SparkSession.builder.config(conf = spark_conf).getOrCreate()

因为 Spark Session 已经存在,所以上面的语句只是重用了现有的 session (.getOrCreate()),它没有加载 jars/libraries(同样,因为我的 PYTHONSTARTUP 脚本故意省略了它们)。这就是为什么最好将打印语句放在 PYTHONSTARTUP 脚本中(否则它们是静默的)。

最后,我只是忘了这样做:$ unset PYTHONSTARTUP 在启动 JupyterLab / Notebook 守护进程之前。

我希望这个问题对其他人有所帮助,因为这就是如何以编程方式向 Spark 2.x(在本例中为 Kafka)添加功能。请注意,您需要互联网连接才能从 Maven Central 一次性下载指定的 jar 和递归依赖项。

【讨论】:

以上是关于PySpark 2.x:以编程方式将 Maven JAR 坐标添加到 Spark的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Pyspark 中以编程方式使用“计数”?

aws glue / pyspark - 如何使用 Glue 以编程方式创建 Athena 表

使用 udf 以编程方式从数据框中选择列

在 SWIFT 2.x 中以编程方式使用 NavigationController 调用 UIViewController

以编程方式终止 Spark 作业

映射 dict(来自 rdd)以递归方式更改 Python/PySpark 中的列名