Loading a local file into Spark using sc.textFile()



【Posted】2015-11-10 21:50:36

Question

How do I load a file from the local filesystem into Spark using sc.textFile? Do I need to change any -env variables? I also got the same error when I tried the same thing on Windows without Hadoop installed.

Code

scala> val inputFile = sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")
/17 22:28:18 INFO MemoryStore: ensureFreeSpace(63280) called with curMem=0, maxMem=278019440
/17 22:28:18 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 61.8 KB, free 265.1 MB)
/17 22:28:18 INFO MemoryStore: ensureFreeSpace(19750) called with curMem=63280, maxMem=278019440
/17 22:28:18 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.3 KB, free 265.1 MB)
/17 22:28:18 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:53659 (size: 19.3 KB, free: 265.1 MB)
/17 22:28:18 INFO SparkContext: Created broadcast 0 from textFile at <console>:21
inputFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

scala> val words = input.flatMap(line => line.split(" "))
<console>:19: error: not found: value input
  val words = input.flatMap(line => line.split(" "))
              ^

scala> val words = inputFile.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:23

scala> val counts = words.map(word => (word, 1)).reduceByKey{ case (x, y) => x + y }

Error

apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/c:/spark-1.4.1-bin-hadoop2.6/bin/file/C:/Users/swaapnika/Desktop/to do list
   at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
   at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
   at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
   at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
   at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:290)
   at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:290)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
   at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
   at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:289)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
   at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
   at $iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
   at $iwC$$iwC$$iwC.<init>(<console>:38)
   at $iwC$$iwC.<init>(<console>:40)
   at $iwC.<init>(<console>:42)
   at <init>(<console>:44)
   at .<init>(<console>:48)
   at .<clinit>(<console>)
   at .<init>(<console>:7)
   at .<clinit>(<console>)
   at $print(<console>)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:497)
   at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
   at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
   at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
   at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
   at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
   at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
   at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
   at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
   at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
   at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
   at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
   at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
   at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
   at org.apache.spark.repl.Main$.main(Main.scala:31)
   at org.apache.spark.repl.Main.main(Main.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:497)
   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



【Comments】:

You can read this similar question: ***.com/questions/27299923/…

【Answer 1】:

I checked all the dependencies and environment variables again. A path of the form "file:///home/..../.. .txt" reads data from the local filesystem, because the default filesystem is whatever fs.defaultFS is set to in Hadoop's configuration. If we leave spark-env.sh at its defaults without any changes, Spark uses the local filesystem whenever it sees "file://..." and HDFS whenever the path is "hdfs://...". If you need a particular filesystem, export HADOOP_CONF_DIR in spark-env.sh and Spark will then support any filesystem that Hadoop supports. This is my observation; corrections or suggestions are welcome. Thanks.
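For illustration, here is a minimal sketch of the URI schemes discussed above; the paths and host name are hypothetical placeholders, not files from the original post.

// Explicit local-filesystem URI: the scheme, three slashes, then the absolute path.
val localRdd = sc.textFile("file:///home/user/data/input.txt")

// Explicit HDFS URI: always goes to the cluster, regardless of fs.defaultFS.
val hdfsRdd = sc.textFile("hdfs://namenode:8020/user/data/input.txt")

// No scheme: Spark falls back to the default filesystem (fs.defaultFS),
// which is the local filesystem unless a Hadoop configuration says otherwise.
val defaultRdd = sc.textFile("/home/user/data/input.txt")

localRdd.take(5).foreach(println)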

【Comments】:

【Answer 2】:

Try changing

val inputFile = sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")

to this:

val inputFile = sc.textFile("file:///Users/swaapnika/Desktop/to do list")

I'm also quite new to Hadoop and Spark, but as far as I know, when running Spark locally on Windows, the file:/// prefix in the string passed to sc.textFile already refers to C:\.
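For reference, a sketch of the complete word count once the path resolves. The variant with an explicit drive letter (file:///C:/...) is also shown, since both forms are commonly seen; treat this as an illustration rather than a guaranteed fix.

val inputFile = sc.textFile("file:///Users/swaapnika/Desktop/to do list")
// Alternative form with an explicit drive letter, also widely used on Windows:
// val inputFile = sc.textFile("file:///C:/Users/swaapnika/Desktop/to do list")

val words  = inputFile.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1)).reduceByKey(_ + _)

counts.take(10).foreach(println)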

【Comments】:

Yes, I thought it might be a typo, but even the latter gave me the same error. What we can do, when there is already a running Hadoop installation, is set up the Hadoop and Spark dependencies and the default FS; otherwise set it via HADOOP_CONF_DIR.

【Answer 3】:

The file path you defined is incorrect.

Try changing

sc.textFile("file///C:/Users/swaapnika/Desktop/to do list")

to

sc.textFile("file://C:/Users/swaapnika/Desktop/to do list")

or

sc.textFile("C:/Users/swaapnika/Desktop/to do list")

【Comments】:

Yes, I thought it might be a typo, but even the latter gave me the same error. What we can do, when there is already a running Hadoop installation, is set up the Hadoop and Spark dependencies and the default FS; otherwise set it via HADOOP_CONF_DIR. Thanks Jem.

【Answer 4】:

This error occurs when running Spark on a cluster. When you submit a job to a Spark cluster, the cluster manager (YARN, Mesos, or whichever you use) hands it to the worker nodes. When a worker node tries to resolve the path of the file we want to load into Spark, it fails, because that file does not exist on the worker. So try running spark-shell in local mode and try again:

\bin\spark-shell --master local

sc.textFile("file:///C:/Users/swaapnika/Desktop/to do list")

Let me know if this helps.
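If the job really has to run on a cluster and the file exists only on the driver machine, one workaround (not part of the original answer) is to read a small file on the driver and parallelize its lines, so the executors never need access to the local path. A sketch, with a placeholder path and file name:

import scala.io.Source

// Read the file on the driver; only suitable for files small enough to fit in driver memory.
val driverLines = Source.fromFile("C:/Users/swaapnika/Desktop/todo.txt").getLines().toSeq

// Distribute the lines as an RDD and run the same word count as before.
val lines  = sc.parallelize(driverLines)
val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.take(10).foreach(println)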

【Comments】:

Can you send me the error you are facing, and the command you used to start the spark shell?
