读取驱动程序使用 spark-submit 发送的文件

Posted

技术标签:

【中文标题】读取驱动程序使用 spark-submit 发送的文件【英文标题】:Read files sent with spark-submit by the driver 【发布时间】:2016-01-20 12:25:07 【问题描述】:

我通过运行发送 Spark 作业以在远程集群上运行

spark-submit ... --deploy-mode cluster --files some.properties ...

我想通过 driver 代码读取some.properties 文件的内容,即在创建 Spark 上下文和启动 RDD 任务之前。该文件会复制到远程驱动程序,但不会复制到驱动程序的工作目录。

我知道的解决这个问题的方法是:

    将文件上传到 HDFS 将文件存储在应用 jar 中

两者都不方便,因为这个文件在提交的开发机器上经常更改。

有没有办法读取在驱动程序代码主方法期间使用--files 标志上传的文件?

【问题讨论】:

您可以将文件放在集群上所有节点都可以访问的网络挂载上。这样,您就可以在驱动程序中从该挂载中读取。您可以公开一个返回文件的简单端点。这样你的驱动程序就可以进行 http 调用了。 确实如此,@AlexNaspo,但多余。该文件与驱动程序 jar 一起传输,并存储在服务器上的同一文件系统中,只是不在同一文件夹中。通过 HTTP再次发送它似乎很浪费。 如果你接受@AlexNaspo 的建议,那么你就不需要通过--files 传递属性文件,这样就不会多余了? 【参考方案1】:

是的,您可以访问通过 --files 参数上传的文件。

这就是我能够访问通过--files 传入的文件的方式:

./bin/spark-submit \
--class com.MyClass \
--master yarn-cluster \
--files /path/to/some/file.ext \
--jars lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-rdbms-3.2.9.jar,lib/datanucleus-core-3.2.10.jar \
/path/to/app.jar file.ext

在我的 Spark 代码中:

val filename = args(0)
val linecount = Source.fromFile(filename).getLines.size

我确实相信这些文件会下载到与放置 jar 的同一目录中的工作程序中,这就是为什么只需传递文件名而不是 Source.fromFile 的绝对路径即可。

【讨论】:

这是我试过的。该文件被复制到工人的工作目录,而不是驱动程序的目录。也许在 YARN 模式下会有所不同。 哦,抱歉,我忽略了您没有在 YARN 模式下运行。有一个名为SparkFiles 的助手类。 SparkFiles.get(filename) 将返回 filename 下载到的路径,但在初始化 Spark 上下文之前您将无法使用它。在初始化 Spark 上下文之前,是否有任何特殊原因需要读取文件? 我使用其中一些属性来设置 Spark 参数 docs 说"every executor pulls the file from the driver HTTP server"。这是否意味着一个简单的/path/to/some/file 本地存储在master(或任何你调用的系统中执行spark-submit)就足够了?还是我需要预先提供hdfs:// 路径或手动将文件放在每个executor 中的指定路径? 要传递多个文件,是,还是文件名之间有空格?【参考方案2】:

经过调查,我找到了解决上述问题的一种方法。在 spark-submit 期间发送 any.properties 配置,并在 SparkSession 初始化前后由 spark 驱动程序使用。希望对你有帮助。

any.properties

spark.key=value
spark.app.name=MyApp

SparkTest.java

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class SparkTest

  public Static void main(String[] args)

    String warehouseLocation = new File("spark-warehouse").getAbsolutePath();

    Config conf = loadConf();
    System.out.println(conf.getString("spark.key"));

    // Initialize SparkContext and use configuration from properties
    SparkConf sparkConf = new SparkConf(true).setAppName(conf.getString("spark.app.name"));

    SparkSession sparkSession = 
    SparkSession.builder().config(sparkConf).config("spark.sql.warehouse.dir", warehouseLocation)
                .enableHiveSupport().getOrCreate();

    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

  


  public static Config loadConf() 

      String configFileName = "any.properties";
      System.out.println(configFileName);
      Config configs = ConfigFactory.load(ConfigFactory.parseFile(new java.io.File(configFileName)));
      System.out.println(configs.getString("spark.key")); // get value from properties file
      return configs;
   

Spark 提交:

spark-submit --class SparkTest --master yarn --deploy-mode client --files any.properties,yy-site.xml --jars ...........

【讨论】:

【参考方案3】:

--files--archives 选项支持使用 # 指定文件名,就像 Hadoop。

例如,您可以指定:--files localtest.txt#appSees.txt,这会将您在本地命名为 localtest.txt 的文件上传到 Spark 工作目录,但这将通过名称链接到 appSees.txt,并且您的应用程序应使用名称作为appSees.txt 在YARN 上运行时引用它。

这适用于我在 yarn/client 和 yarn/cluster 模式下的 Spark Streaming 应用程序。

【讨论】:

嘿 - 这对我不起作用。你能看看here,看看你能不能告诉我我做错了什么?【参考方案4】:

使用spark-submit --help,会发现这个选项只适用于executor的工作目录而不是driver。

--files FILES: Comma-separated list of files to be placed in the working directory of each executor.

【讨论】:

【参考方案5】:

这是我在 Python Spark 中开发的一个很好的解决方案,用于将任何数据作为文件从外部集成到您的大数据平台。

玩得开心。

# Load from the Spark driver any local text file and return a RDD (really useful in YARN mode to integrate new data at the fly)
# (See https://community.hortonworks.com/questions/38482/loading-local-file-to-apache-spark.html)
def parallelizeTextFileToRDD(sparkContext, localTextFilePath, splitChar):
    localTextFilePath = localTextFilePath.strip(' ')
    if (localTextFilePath.startswith("file://")):
        localTextFilePath = localTextFilePath[7:]
    import subprocess
    dataBytes = subprocess.check_output("cat " + localTextFilePath, shell=True)
    textRDD = sparkContext.parallelize(dataBytes.split(splitChar))
    return textRDD

# Usage example
myRDD = parallelizeTextFileToRDD(sc, '~/myTextFile.txt', '\n') # Load my local file as a RDD
myRDD.saveAsTextFile('/user/foo/myTextFile') # Store my data to HDFS

【讨论】:

当你可以从 Python 中 open() 时,为什么要 cat 一个文件?此外,这似乎与实际的--files/--archives 支持相悖【参考方案6】:

解决此问题的一种方法是,您可以通过调用SparkContext.getOrCreate() 创建一个临时的SparkContext,然后在SparkFiles.get('FILE') 的帮助下读取您在--files 中传递的文件。

读取文件后,在SparkConf() 变量中检索您需要的所有必要配置。

之后调用这个函数:

SparkContext.stop(SparkContext.getOrCreate())

这将破坏现有的SparkContext,然后在下一行中简单地使用这样的必要配置初始化一个新的SparkContext

sc = SparkContext(conf=conf).getOrCreate()

你得到了一个SparkContext 并具有所需的设置

【讨论】:

【参考方案7】:

在 pyspark 中,我发现轻松实现这一点非常有趣,首先像这样安排你的工作目录:

/path/to/your/workdir/
|--code.py
|--file.txt

然后在你的 code.py 主函数中,像往常一样读取文件:

if __name__ == "__main__":
    content = open("./file.txt").read()

然后不加任何具体配置就提交如下:

spark-submit code.py

它运行正确,这让我很惊讶。我想提交过程将所有文件和子目录文件完全存档,并将它们发送到 pyspark 中的驱动程序,而您应该自己将它们存档在 scala 版本中。顺便说一句,--files 和 --archives 选项都在 worker 而不是驱动程序中工作,这意味着您只能在 RDD 转换或操作中访问这些文件。

【讨论】:

open("./file.txt") 不是 Spark 代码。您在这里所做的只是直接在驱动程序节点上运行 Python。上次查了一下,“当前工作目录”没有绑定spark-submit

以上是关于读取驱动程序使用 spark-submit 发送的文件的主要内容,如果未能解决你的问题,请参考以下文章

如何从 spark-shell/spark-submit 运行交互式 Spark 应用程序

使用 Spark-Submit 运行烧瓶应用程序

抛开spark-submit脚本提交spark程序

抛开spark-submit脚本提交spark程序

spark-submit 应用程序第三方jar文件

在 AWS EMR 中使用 spark-submit 启动 Python 应用程序