使用 sparklyr 时无法在本地 Spark 连接中加载 .csv 数据
Posted
技术标签:
【中文标题】使用 sparklyr 时无法在本地 Spark 连接中加载 .csv 数据【英文标题】:Can't load .csv data in a local Spark connection when using sparklyr 【发布时间】:2019-01-29 07:57:11 【问题描述】:背景:我对整个 Spark 平台和概念完全陌生,我正在尝试通过R
和sparklyr
学习如何操作它。我开始学习有关该主题的在线课程,并尝试将其用于我自己的数据分析作为学习它的一种方式。
问题:我正在尝试加载 6.3gb 的 csv 数据集(约 30 百万行,约 20 列),但出现以下错误(据我所知,相同的块不断重复他们自己,我在这里给出前 3 个,否则我会达到帖子的字符数限制)。代码运行但 17 分钟后退出并出现以下错误(未加载数据):
Error: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
java.lang.reflect.Method.invoke(Unknown Source)
sparklyr.Invoke.invoke(invoke.scala:139)
sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
sparklyr.StreamHandler.read(stream.scala:66)
sparklyr.BackendHandler.channelRead0(handler.scala:51)
sparklyr.BackendHandler.channelRead0(handler.scala:4)
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
The currently active SparkContext was created at:
org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
java.lang.reflect.Method.invoke(Unknown Source)
sparklyr.Invoke.invoke(invoke.scala:139)
sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
sparklyr.StreamHandler.read(stream.scala:66)
sparklyr.BackendHandler.channelRead0(handler.scala:51)
sparklyr.BackendHandler.channelRead0(handler.scala:4)
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:100)
at org.apache.spark.SparkContext$$anonfun$parallelize$1.apply(SparkContext.scala:716)
at org.apache.spark.SparkContext$$anonfun$parallelize$1.apply(SparkContext.scala:715)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:701)
at org.apache.spark.SparkContext.parallelize(SparkContext.scala:715)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at sparklyr.Invoke.invoke(invoke.scala:139)
at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
at sparklyr.StreamHandler.read(stream.scala:66)
at sparklyr.BackendHandler.channelRead0(handler.scala:51)
at sparklyr.BackendHandler.channelRead0(handler.scala:4)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Unknown Source)
Error: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
这是我的 R 代码:
library(sparklyr)
spark_install(version = "2.1.0")
sc <- spark_connect(master = "local")
testdata = spark_read_csv(sc, name = "testdata", path = ...)
在 Spark 之外,我可以加载文件,例如使用read_csv
。我用谷歌搜索了这个问题,有人提到潜在的原因是 OutOfMemory 问题 - 我不太确定这是否是问题以及如何解决它。
如果有人能指出如何调试和修复它,我将不胜感激!
干杯
【问题讨论】:
【参考方案1】:一些事情:
除非您使用微小数据,否则您应该忘记local
模式。它主要是为测试和小规模实验而设计的,而不是用于处理中等大小的数据。
由于它仅对驱动程序和执行程序代码使用单个 JVM,因此存在严重故障恢复的可能性,并且如果处理中断,您可能会丢失整个会话(这里似乎就是这种情况)。
因此,如果您想在中等大小的数据上进行本地测试,请考虑使用standalone mode,否则只需缩减数据集。
附带说明local
模式仅使用一个处理线程。即使对于测试,使用local[n]
(用于n
线程)或local[*]
(用于所有可用内核)也更有意义。
准备好调整配置,因为默认值非常保守 - 例如 spark.driver.memory
默认为 1 GB - 在独立模式下您可能会侥幸成功,但当所有组件都嵌入单个 JVM。
不要相信 sparklyr 默认值。
sparklyr 开发人员做出了一个非常不幸的选择,即默认将数据急切地缓存在内存中。它不仅违反了 Spark 的默认设置(for a reason,使用MEMORY_AND_DISK
来表示Dataset
API)并且很少为实际大小的数据提供任何实际好处,而且还以一些丑陋的方式干扰了 Spark 优化器(最明显的是阻止了投影和选择下推)。
因此,请养成在适用时使用memory = FALSE
的习惯:
spark_read_csv(sc, name = "testdata", memory = FALSE, path = ...)
为读者提供模式,而不是使用模式推断。见SparklyR: Convert directly to parquet
【讨论】:
非常感谢,这很有帮助!我看到我必须投入更多时间来理解这一点 - 如果您对易于理解的指南或其他资源有建议,请告诉我。另一个问题:当涉及到您所描述的使用“本地”的内容时,实际上什么被认为是微小数据?在这种情况下,我的 3000 万行/12 列被认为是小/中还是大?干杯 我个人选择的路径是学习核心(即 Scala API,O'Reilly 有很多不错的书籍,由 Spark 贡献者提供,EPFL 在 Coursera 上有不错的 MOOC),然后转向 sparklyr,但我意识到这可能不是最吸引人的路径。除此之外,不要被熟悉的 API 所欺骗——它是你最大的敌人。就中等而言,它是主观和模糊的,但是您可以尝试将事物与可用内存进行比较——如果原始数据的大小与可用 RAM 的数量级相同,我将其称为“中等”。对于学习,我至少会少用几个数量级。以上是关于使用 sparklyr 时无法在本地 Spark 连接中加载 .csv 数据的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 sparklyr 为 Apache Spark 实现 Stanford CoreNLP 包装器?
Sparklyr:如何将列表列分解为Spark表中自己的列?
将 spark 数据帧转换为 sparklyR 表“tbl_spark”