读取驱动程序使用 spark-submit 发送的文件
Posted
技术标签:
【中文标题】读取驱动程序使用 spark-submit 发送的文件【英文标题】:Read files sent with spark-submit by the driver 【发布时间】:2016-01-20 12:25:07 【问题描述】:我通过运行发送 Spark 作业以在远程集群上运行
spark-submit ... --deploy-mode cluster --files some.properties ...
我想通过 driver 代码读取some.properties
文件的内容,即在创建 Spark 上下文和启动 RDD 任务之前。该文件会复制到远程驱动程序,但不会复制到驱动程序的工作目录。
我知道的解决这个问题的方法是:
-
将文件上传到 HDFS
将文件存储在应用 jar 中
两者都不方便,因为这个文件在提交的开发机器上经常更改。
有没有办法读取在驱动程序代码主方法期间使用--files
标志上传的文件?
【问题讨论】:
您可以将文件放在集群上所有节点都可以访问的网络挂载上。这样,您就可以在驱动程序中从该挂载中读取。您可以公开一个返回文件的简单端点。这样你的驱动程序就可以进行 http 调用了。 确实如此,@AlexNaspo,但多余。该文件与驱动程序 jar 一起传输,并存储在服务器上的同一文件系统中,只是不在同一文件夹中。通过 HTTP再次发送它似乎很浪费。 如果你接受@AlexNaspo 的建议,那么你就不需要通过--files
传递属性文件,这样就不会多余了?
【参考方案1】:
是的,您可以访问通过 --files
参数上传的文件。
这就是我能够访问通过--files
传入的文件的方式:
./bin/spark-submit \
--class com.MyClass \
--master yarn-cluster \
--files /path/to/some/file.ext \
--jars lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-rdbms-3.2.9.jar,lib/datanucleus-core-3.2.10.jar \
/path/to/app.jar file.ext
在我的 Spark 代码中:
val filename = args(0)
val linecount = Source.fromFile(filename).getLines.size
我确实相信这些文件会下载到与放置 jar 的同一目录中的工作程序中,这就是为什么只需传递文件名而不是 Source.fromFile
的绝对路径即可。
【讨论】:
这是我试过的。该文件被复制到工人的工作目录,而不是驱动程序的目录。也许在 YARN 模式下会有所不同。 哦,抱歉,我忽略了您没有在 YARN 模式下运行。有一个名为SparkFiles
的助手类。 SparkFiles.get(filename)
将返回 filename
下载到的路径,但在初始化 Spark 上下文之前您将无法使用它。在初始化 Spark 上下文之前,是否有任何特殊原因需要读取文件?
我使用其中一些属性来设置 Spark 参数
docs 说"every executor pulls the file from the driver HTTP server"
。这是否意味着一个简单的/path/to/some/file
本地存储在master
(或任何你调用的系统中执行spark-submit
)就足够了?还是我需要预先提供hdfs://
路径或手动将文件放在每个executor
中的指定路径?
要传递多个文件,是,
还是文件名之间有空格?【参考方案2】:
经过调查,我找到了解决上述问题的一种方法。在 spark-submit 期间发送 any.properties 配置,并在 SparkSession 初始化前后由 spark 驱动程序使用。希望对你有帮助。
any.properties
spark.key=value
spark.app.name=MyApp
SparkTest.java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
public class SparkTest
public Static void main(String[] args)
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
Config conf = loadConf();
System.out.println(conf.getString("spark.key"));
// Initialize SparkContext and use configuration from properties
SparkConf sparkConf = new SparkConf(true).setAppName(conf.getString("spark.app.name"));
SparkSession sparkSession =
SparkSession.builder().config(sparkConf).config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport().getOrCreate();
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
public static Config loadConf()
String configFileName = "any.properties";
System.out.println(configFileName);
Config configs = ConfigFactory.load(ConfigFactory.parseFile(new java.io.File(configFileName)));
System.out.println(configs.getString("spark.key")); // get value from properties file
return configs;
Spark 提交:
spark-submit --class SparkTest --master yarn --deploy-mode client --files any.properties,yy-site.xml --jars ...........
【讨论】:
【参考方案3】:--files
和 --archives
选项支持使用 #
指定文件名,就像 Hadoop。
例如,您可以指定:--files localtest.txt#appSees.txt
,这会将您在本地命名为 localtest.txt
的文件上传到 Spark 工作目录,但这将通过名称链接到 appSees.txt
,并且您的应用程序应使用名称作为appSees.txt
在YARN 上运行时引用它。
这适用于我在 yarn/client 和 yarn/cluster 模式下的 Spark Streaming 应用程序。
【讨论】:
嘿 - 这对我不起作用。你能看看here,看看你能不能告诉我我做错了什么?【参考方案4】:使用spark-submit --help
,会发现这个选项只适用于executor的工作目录而不是driver。
--files FILES: Comma-separated list of files to be placed in the working directory of each executor.
【讨论】:
【参考方案5】:这是我在 Python Spark 中开发的一个很好的解决方案,用于将任何数据作为文件从外部集成到您的大数据平台。
玩得开心。
# Load from the Spark driver any local text file and return a RDD (really useful in YARN mode to integrate new data at the fly)
# (See https://community.hortonworks.com/questions/38482/loading-local-file-to-apache-spark.html)
def parallelizeTextFileToRDD(sparkContext, localTextFilePath, splitChar):
localTextFilePath = localTextFilePath.strip(' ')
if (localTextFilePath.startswith("file://")):
localTextFilePath = localTextFilePath[7:]
import subprocess
dataBytes = subprocess.check_output("cat " + localTextFilePath, shell=True)
textRDD = sparkContext.parallelize(dataBytes.split(splitChar))
return textRDD
# Usage example
myRDD = parallelizeTextFileToRDD(sc, '~/myTextFile.txt', '\n') # Load my local file as a RDD
myRDD.saveAsTextFile('/user/foo/myTextFile') # Store my data to HDFS
【讨论】:
当你可以从 Python 中open()
时,为什么要 cat
一个文件?此外,这似乎与实际的--files
/--archives
支持相悖【参考方案6】:
解决此问题的一种方法是,您可以通过调用SparkContext.getOrCreate()
创建一个临时的SparkContext
,然后在SparkFiles.get('FILE')
的帮助下读取您在--files
中传递的文件。
读取文件后,在SparkConf()
变量中检索您需要的所有必要配置。
之后调用这个函数:
SparkContext.stop(SparkContext.getOrCreate())
这将破坏现有的SparkContext
,然后在下一行中简单地使用这样的必要配置初始化一个新的SparkContext
。
sc = SparkContext(conf=conf).getOrCreate()
你得到了一个SparkContext
并具有所需的设置
【讨论】:
【参考方案7】:在 pyspark 中,我发现轻松实现这一点非常有趣,首先像这样安排你的工作目录:
/path/to/your/workdir/
|--code.py
|--file.txt
然后在你的 code.py 主函数中,像往常一样读取文件:
if __name__ == "__main__":
content = open("./file.txt").read()
然后不加任何具体配置就提交如下:
spark-submit code.py
它运行正确,这让我很惊讶。我想提交过程将所有文件和子目录文件完全存档,并将它们发送到 pyspark 中的驱动程序,而您应该自己将它们存档在 scala 版本中。顺便说一句,--files 和 --archives 选项都在 worker 而不是驱动程序中工作,这意味着您只能在 RDD 转换或操作中访问这些文件。
【讨论】:
open("./file.txt")
不是 Spark 代码。您在这里所做的只是直接在驱动程序节点上运行 Python。上次查了一下,“当前工作目录”没有绑定spark-submit
以上是关于读取驱动程序使用 spark-submit 发送的文件的主要内容,如果未能解决你的问题,请参考以下文章