在 google-dataproc 的 Spark 集群中的 pyspark 作业中使用外部库

Posted

技术标签:

【中文标题】在 google-dataproc 的 Spark 集群中的 pyspark 作业中使用外部库【英文标题】:use an external library in pyspark job in a Spark cluster from google-dataproc 【发布时间】:2016-01-26 13:57:52 【问题描述】:

我有一个通过 google dataproc 创建的 spark 集群。我希望能够使用 databricks 中的 csv 库(请参阅 https://github.com/databricks/spark-csv)。所以我首先是这样测试的:

我与集群的主节点启动了一个 ssh 会话,然后我输入:

pyspark --packages com.databricks:spark-csv_2.11:1.2.0

然后它启动了一个 pyspark shell,我在其中输入:

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('gs:/xxxx/foo.csv')
df.show()

它奏效了。

我的下一步是使用以下命令从我的主机启动此作业:

gcloud beta dataproc jobs submit pyspark --cluster <my-dataproc-cluster> my_job.py

但在这里它不起作用,我得到一个错误。我想是因为我没有给出--packages com.databricks:spark-csv_2.11:1.2.0 作为论据,但我尝试了 10 种不同的方式来给出它,但我没有成功。

我的问题是:

    在我输入pyspark --packages com.databricks:spark-csv_2.11:1.2.0 后是否安装了databricks csv 库 我可以在job.py 中写一行以便导入吗? 或者我应该为我的 gcloud 命令提供哪些参数来导入或安装它?

【问题讨论】:

Dataproc 中存在一个错误,即没有为 Pyspark 作业提取 JARS。我正在寻找一种替代解决方案。我只是想让您知道我们正在研究更大的错误,并且我正在查看我们是否也可以为您确定临时修复程序。 :) 希望在这里也能找到解决方法和修复方法,谢谢@James!我们正在尝试将 dataproc 与 python 和 scala 中的 cassandra 连接器一起使用 【参考方案1】:

简答

如果--packages 出现在my_job.py 参数之后,则spark-submit 不接受参数的排序。要解决此问题,您可以在从 Dataproc 的 CLI 提交时执行以下操作:

gcloud beta dataproc jobs submit pyspark --cluster <my-dataproc-cluster> \
    --properties spark.jars.packages=com.databricks:spark-csv_2.11:1.2.0 my_job.py

基本上,只需在命令中的.py 文件之前添加--properties spark.jars.packages=com.databricks:spark-csv_2.11:1.2.0

长答案

因此,这实际上是一个与已知的 gcloud beta dataproc jobs submit pyspark 中缺乏对 --jars 的支持不同的问题;看来,如果 Dataproc 没有将 --packages 明确识别为特殊的 spark-submit 级别标志,它会尝试在应用程序参数之后传递它,以便 spark-submit 让 --packages 作为应用程序参数,而不是将其正确解析为提交级别选项。实际上,在 SSH 会话中,以下操作不起作用

# Doesn't work if job.py depends on that package.
spark-submit job.py --packages com.databricks:spark-csv_2.11:1.2.0

但是,即使在 pyspark 的情况下,两种顺序都有效,但切换参数的顺序确实可以再次工作:

# Works with dependencies on that package.
spark-submit --packages com.databricks:spark-csv_2.11:1.2.0 job.py
pyspark job.py --packages com.databricks:spark-csv_2.11:1.2.0
pyspark --packages com.databricks:spark-csv_2.11:1.2.0 job.py

因此,尽管 spark-submit job.py 应该是以前称为 pyspark job.py 的所有内容的直接替代品,但 --packages 之类的解析顺序的差异意味着它实际上并不是 100% 兼容的迁移。这可能是 Spark 方面需要跟进的事情。

无论如何,幸运的是有一个解决方法,因为--packages 只是 Spark 属性spark.jars.packages 的另一个别名,而且 Dataproc 的 CLI 支持属性很好。因此,您可以执行以下操作:

gcloud beta dataproc jobs submit pyspark --cluster <my-dataproc-cluster> \
    --properties spark.jars.packages=com.databricks:spark-csv_2.11:1.2.0 my_job.py

注意--properties 必须在my_job.py 之前,否则它会作为应用程序参数而不是配置标志发送。希望对你有用!请注意,SSH 会话中的等效项是 spark-submit --packages com.databricks:spark-csv_2.11:1.2.0 job.py

【讨论】:

这对我有帮助,但我现在正努力在我的包之外注册一个新的存储库。我尝试添加--properties spark.jars.packages=org.elasticsearch:elasticsearch-hadoop:2.4.0,spark.jars.ivy=http://conjars.org/repo,但不知何故,两个正斜杠被转换为一个,驱动程序通过下面的错误输出。您对此错误有任何想法/提供带有两个正斜杠的完全限定网址的正确方法:Exception in thread "main" java.lang.IllegalArgumentException: basedir must be absolute: http:/conjars.org/repo/local【参考方案2】:

除了@Dennis。

注意,如果你需要加载多个外部包,你需要像这样指定一个自定义的转义字符:

--properties ^#^spark.jars.packages=org.elasticsearch:elasticsearch-spark_2.10:2.3.2,com.data‌​bricks:spark-avro_2.10:2.0.1

请注意软件包列表之前的 ^#^。 有关详细信息,请参阅gcloud topic escaping

【讨论】:

以上是关于在 google-dataproc 的 Spark 集群中的 pyspark 作业中使用外部库的主要内容,如果未能解决你的问题,请参考以下文章

Spark学习笔记5:Spark集群架构

如何在 spark-shell (spark 2.1.1) 中启用对 spark 的 Hive 支持

《Spark 官方文档》在Mesos上运行Spark

spark日志收集

Spark 在 Yarn 上运行 Spark 应用程序

Spark-Unit1-spark概述与安装部署