如何从 pyspark 访问 org.apache.hadoop.fs.FileUtil?

Posted

技术标签:

【中文标题】如何从 pyspark 访问 org.apache.hadoop.fs.FileUtil?【英文标题】:How to access org.apache.hadoop.fs.FileUtil from pyspark? 【发布时间】:2016-08-18 19:37:00 【问题描述】:

我正在尝试直接从 pyspark shell 访问 org.apache.hadoop.fs.FileUtil.unTar

我知道我可以访问底层虚拟机(通过 py4j)sc._jvm 来执行此操作,但我很难真正连接到 hdfs(尽管我的 pyspark 会话完全可以正常工作,并且能够跨集群运行作业针对集群内的作业)。

例如:

hdpUntar = sc._jvm.org.apache.hadoop.fs.FileUtil.unTar
hdpFile = sc._jvm.java.io.File

root    = hdpFile("hdfs://<url>/user/<file>")
target  = hdpFile("hdfs://<url>/user/myuser/untar")

hdpUntar(root, target)

很遗憾,这不起作用:

Py4JJavaError: An error occurred while calling z:org.apache.hadoop.fs.FileUtil.unTar.

: ExitCodeException exitCode=128: tar: Cannot connect to hdfs: resolve failed
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
    at org.apache.hadoop.util.Shell.run(Shell.java:455)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
    at org.apache.hadoop.fs.FileUtil.unTarUsingTar(FileUtil.java:675)
    at org.apache.hadoop.fs.FileUtil.unTar(FileUtil.java:651)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

【问题讨论】:

后来,从 scala 中尝试过——看起来代码只是在本地通过管道输出。 【参考方案1】:

这应该适用于数据块:

URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration


fs = FileSystem.get(URI("dbfs:///"), Configuration())

status = fs.listStatus(Path('dbfs:/mnt/gobidatagen/tpch/sf100_parquet/'))

for fileStatus in status:
    print(fileStatus.getPath())

【讨论】:

以上是关于如何从 pyspark 访问 org.apache.hadoop.fs.FileUtil?的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 错误实例化 'org.apache.spark.sql.hive.HiveSessionStateBuilder':"

如何将 pyspark-dataframe 写入红移?

NoSuchMethodException:Pyspark 模型加载中的 org.apache.spark.ml.classification.GBTClassificationModel

PySpark 中的 org.apache.spark.ml.feature.Tokenizer NPE

pyspark.sql 无法实例化 HiveMetaStoreClient - noclassfound from org.apache.commons.dbcp.connectionfactory

org.apache.spark.sql.AnalysisException:给定pyspark中的输入列,无法解析'sub_tot`'