Pyspark Dataframe:无法保存为 Hive 表
Posted
技术标签:
【中文标题】Pyspark Dataframe:无法保存为 Hive 表【英文标题】:Pyspark Dataframe: Unable to Save As Hive Table 【发布时间】:2017-01-30 21:17:09 【问题描述】:我正在尝试在数据帧上使用“SaveAsTable” - 我们的配置单元元存储在外部 RDS 中,我正在尝试将数据存储在 S3 中 - 但失败并出现以下错误:
py4j.protocol.Py4JJavaError:调用 o87.saveAsTable 时出错。 :java.net.NoRouteToHostException:没有路由到主机从 ip-10-174-20-142/10.174.20.142 到 ip-10-174-26-239.ec2.internal:8020 套接字超时异常失败:java.net .NoRouteToHostException:没有到主机的路由;更多详情见:http://wiki.apache.org/hadoop/NoRouteToHost
这是完整的代码和错误:
[hbohra@ip-10-174-20-142 ~]$ pyspark --files /etc/hive/conf/hive-site.xml
Python 2.7.12 (default, Sep 1 2016, 22:14:00)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
17/01/30 21:05:39 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.0.2
/_/
Using Python version 2.7.12 (default, Sep 1 2016 22:14:00)
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import lit
>>> df = sqlContext.read.json('s3://dl-rawdata-dev/reporting/impressions/platform/module/context/2017-01-27T00-00')
>>> asset_df = df.withColumn('cust_id', df['key']['cust_id']).withColumn('platform', lit('platform')).withColumn('context', lit('context')).withColumn('module', lit('context')).withColumn('impressions',df['metric']['impressions']).withColumn('orders', df['metric']['orders']).withColumn('subtotal', df['metric']['subtotal']).withColumn('adfee', df['metric']['adfee']).withColumn('clicks', df['metric']['clicks']).withColumn('views', df['metric']['views']).withColumn('report_date', lit('2017-01-27')).drop('key').drop('metric')
>>> asset_df.write.format('orc').partitionBy('report_date', 'platform').saveAsTable('hbohra.reporting', path='s3://dl-data-assets-dev/hbohra.db/reporting/', mode='overwrite')
17/01/30 21:06:07 WARN command.CreateDataSourceTableUtils: Persisting partitioned data source relation `hbohra`.`reporting` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. Input path(s):
s3://dl-data-assets-dev/hbohra.db/reporting
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 585, in saveAsTable
self._jwrite.saveAsTable(name)
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o87.saveAsTable.
: java.net.NoRouteToHostException: No Route to Host from ip-10-174-20-142/10.174.20.142 to ip-10-174-26-239.ec2.internal:8020 failed on socket timeout exception: java.net.NoRouteToHostException: No route to host; For more details see: http://wiki.apache.org/hadoop/NoRouteToHost
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:758)
at org.apache.hadoop.ipc.Client.call(Client.java:1479)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy12.delete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:540)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy13.delete(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2044)
at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:707)
at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:703)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:703)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createTable$1.apply$mcV$sp(HiveExternalCatalog.scala:185)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createTable$1.apply(HiveExternalCatalog.scala:152)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createTable$1.apply(HiveExternalCatalog.scala:152)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:72)
at org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:152)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:226)
at org.apache.spark.sql.execution.command.CreateDataSourceTableUtils$.createDataSourceTable(createDataSourceTables.scala:504)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:259)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:378)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:354)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.NoRouteToHostException: No route to host
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:614)
at org.apache.hadoop.ipc.Client$Connection.setupiostreams(Client.java:712)
at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)
at org.apache.hadoop.ipc.Client.call(Client.java:1451)
... 48 more
【问题讨论】:
【参考方案1】:我认为这里的问题是 Hive 元存储中的表是使用不同的集群创建的。
我们遇到了同样的问题,在我们的 EC2 仪表板中找不到堆栈跟踪中提到的主机。
这个管理(非外部)表的link explains 保留了对文件系统 uri(其中包含不再存在的主机名)的引用。
【讨论】:
【参考方案2】:您是否关注了 wiki 条目?有帮助吗?
更新给任何投票否决的人
Hadoop 网络堆栈错误消息在几年前进行了重新设计,因此每个低级网络故障都是caught and wrapped to add the bit Sun left out: src & dest 主机名和端口,同时,[我们放入了一个 wiki 链接。这样不熟悉NoRouteToHost 之类的东西的人就有了一个很好的常见原因列表。当我们找到获取堆栈的新方法时,像我这样的人会添加新的。
作为该补丁的共同作者,当人们提交错误报告等关于某些不工作的问题时,我真的很绝望,实际上没有遵循 wiki 链接。他们正在跳过我们添加到 OSS 项目中的自助诊断和故障排除的核心部分。
正如 wiki 条目所述,“您的网络,您的问题”。您将不得不弄清楚发生了什么,(主机名,端口)对是找出服务不工作的好线索,所以是时候使用低级工具,如 telnet 和 ping。
然后关闭:异常包括故障诊断信息和指向 hadoop wiki 条目的链接。点击链接
【讨论】:
鉴于堆栈跟踪通过我相信我编写的代码行,我必须恭敬地不同意。添加了更多细节以准确解释我相信如此。以上是关于Pyspark Dataframe:无法保存为 Hive 表的主要内容,如果未能解决你的问题,请参考以下文章
如何以 xml 格式保存 pyspark sql DataFrame
Pyspark:由于数据类型 str 而不是 StringType,无法将 RDD 转换为 DataFrame
如何在pyspark中更改DataFrame的hdfs块大小
当我在 AWS EMR Studio 中使用 saveAsTable 保存 PySpark DataFrame 时,它会保存在哪里?