Pyspark - Error while copying files from local (Edge node) to HDFS location

Posted: 2020-03-09 21:51:43

【Question】:

I am trying to copy a file from the local filesystem to an HDFS location in PySpark using the code below.

sc = SparkSession.builder.appName("HDFSPySparkRead").getOrCreate()
sparkCont = sc.sparkContext
URI           = sparkCont._jvm.java.net.URI
Path          = sparkCont._jvm.org.apache.hadoop.fs.Path
FileSystem    = sparkCont._jvm.org.apache.hadoop.fs.FileSystem
Configuration = sparkCont._jvm.org.apache.hadoop.conf.Configuration


fs = FileSystem.get(URI("hdfs://gcgmwdcuat:8020"), Configuration())
fs.copyFromLocalFile('/home/ak18191/ar_new.txt', 'hdfs://gcgmwdcuat:8020/user/ak18191/')

I get this error:

py4j.Py4JException: Method copyFromLocalFile([class java.lang.String, class java.lang.String]) does not exist

I also tried the code below:

import subprocess
cmd = 'hdfs dfs -put -f /home/ak18191/ar_new.txt hdfs://gcgmwdcuat:8020/user/ak18191/'
subprocess.call(cmd)

But I get a file-not-found error. The same command runs successfully in spark-shell. Please help!

【Comments】:

【Solution 1】:

Solution 1 - subprocess


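def copy_from_local(local_file, hdfs_file, logger):
    """Copy a local file to HDFS with the hdfs CLI; return True on success."""
    import subprocess
    # An argument list avoids shell parsing; -f overwrites an existing destination.
    proc = subprocess.Popen(["hdfs", "dfs", "-copyFromLocal", "-f", local_file, hdfs_file])
    proc.communicate()  # wait for the command to finish

    if proc.returncode != 0:
        logger.info("copyFromLocal {} to {} error".format(local_file, hdfs_file))
        return False
    else:
        logger.info("copyFromLocal {} to {} success".format(local_file, hdfs_file))
        return True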
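
One likely reason the list form above works where a single command string raised a file-not-found error: on POSIX systems, when subprocess receives a string and shell=True is not set, it treats the whole string as the name of the program to run. The following is a minimal sketch under that assumption, using only the Python standard library. run_command is a hypothetical helper, not part of the original answer; it also captures stderr so the message printed by hdfs can be logged.

```python
import shlex
import subprocess


def run_command(cmd):
    """Run cmd without a shell and return (returncode, stdout, stderr).

    cmd may be a list of arguments or a single string; a string is split
    into arguments with shlex.split so that subprocess does not look for a
    program literally named after the whole command line.
    """
    args = shlex.split(cmd) if isinstance(cmd, str) else list(cmd)
    proc = subprocess.run(args, capture_output=True, text=True)
    return proc.returncode, proc.stdout, proc.stderr
```

Called with the command string from the question, this would start hdfs with dfs, -put, -f and the two paths as separate arguments. If the hdfs executable itself is not on the PATH of the Python process, subprocess.run still raises FileNotFoundError, which is worth ruling out separately.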

Solution 2 - py4j


def copy_from_local_file(sc, logger, local_file, hdfs_file, delSrc=True, overwrite=True):
    # Java signature: copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
    # Note: delSrc=True deletes the local source file after the copy.
    Path = sc._jvm.org.apache.hadoop.fs.Path
    try:
        # Both locations must be wrapped in Path; plain strings do not match any overload.
        getFileSystem(sc).copyFromLocalFile(delSrc, overwrite, Path(local_file), Path(hdfs_file))
        logger.info("copyFromLocal {} to {} success".format(local_file, hdfs_file))
    except Exception as e:
        logger.error(e)
        logger.info("copyFromLocal {} to {} error".format(local_file, hdfs_file))


def getFileSystem(sc):
    # Get the FileSystem for the default filesystem in Spark's Hadoop configuration
    FileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem
    fs = FileSystem.get(sc._jsc.hadoopConfiguration())
    return fs

You can get the py4j JVM FileSystem object and perform file operations on it. Its members are:

getFileSystem(sc) = JavaObject DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_15601292_17, ugi=jrsyb@HADOOP.COM (auth:KERBEROS)]]
 access = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e7d0>
 addCacheDirective = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432610>
 addCachePool = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435610>
 addDelegationTokens = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44323d0>
 allowSnapshot = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432990>
 append = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435c90>
 areSymlinksEnabled = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442cf10>
 cancelDeleteOnExit = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432850>
 clearStatistics = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e410>
 close = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442eb90>
 closeAll = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432c10>
 closeAllForUGI = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432150>
 completeLocalOutput = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435190>
 concat = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e450>
 copyFromLocalFile = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e650>
 copyToLocalFile = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e990>
 create = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432790>
 createEncryptionZone = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432310>
 createNewFile = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44321d0>
 createNonRecursive = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432f90>
 createSnapshot = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e290>
 createSymlink = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432a50>
 delete = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435390>
 deleteOnExit = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ec10>
 deleteSnapshot = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e110>
 disallowSnapshot = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ee50>
 enableSymlinks = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ed90>
 equals = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ef90>
 exists = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432050>
 finalizeUpgrade = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432890>
 get = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ea10>
 getAclStatus = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432a90>
 getAllStatistics = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e8d0>
 getBlockSize = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e1d0>
 getCanonicalServiceName = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442cdd0>
 getChildFileSystems = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435410>
 getClass = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432290>
 getClient = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44358d0>
 getConf = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435750>
 getContentSummary = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44357d0>
 getCorruptBlocksCount = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432550>
 getDataNodeStats = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432950>
 getDefaultBlockSize = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432d50>
 getDefaultReplication = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e210>
 getDefaultUri = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435490>
 getDelegationToken = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435c50>
 getDiskStatus = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ee10>
 getEZForPath = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435710>
 getFileBlockLocations = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e2d0>
 getFileBlockStorageLocations = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e890>
 getFileChecksum = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432d10>
 getFileLinkStatus = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435a90>
 getFileStatus = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e250>
 getFileSystemClass = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e850>
 getHomeDirectory = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44326d0>
 getInotifyEventStream = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432690>
 getLength = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435b90>
 getLinkTarget = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442cfd0>
 getLocal = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ef10>
 getMissingBlocksCount = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435450>
 getName = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432350>
 getNamed = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ea90>
 getRawCapacity = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435950>
 getRawUsed = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e690>
 getReplication = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e150>
 getScheme = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ec90>
 getServerDefaults = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432650>
 getSnapshotDiffReport = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e0d0>
 getSnapshottableDirListing = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435a50>
 getStatistics = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432490>
 getStatus = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442cf50>
 getStoragePolicies = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432e90>
 getUnderReplicatedBlocksCount = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ced0>
 getUri = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44350d0>
 getUsed = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432f10>
 getWorkingDirectory = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44329d0>
 getXAttr = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432410>
 getXAttrs = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432b10>
 globStatus = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435810>
 hashCode = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435510>
 initialize = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44320d0>
 isDirectory = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435150>
 isFile = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ed10>
 isFileClosed = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435650>
 isInSafeMode = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e4d0>
 listCacheDirectives = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442edd0>
 listCachePools = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435210>
 listCorruptFileBlocks = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432ad0>
 listEncryptionZones = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432750>
 listFiles = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435690>
 listLocatedStatus = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442eb50>
 listStatus = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432590>
 listXAttrs = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e550>
 makeQualified = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435b50>
 metaSave = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442cd50>
 mkdir = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e310>
 mkdirs = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e5d0>
 modifyAclEntries = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435b10>
 modifyCacheDirective = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435590>
 modifyCachePool = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435350>
 moveFromLocalFile = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432510>
 moveToLocalFile = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e510>
 newInstance = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432bd0>
 newInstanceLocal = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432710>
 notify = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e910>
 notifyAll = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435250>
 open = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432210>
 printStatistics = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e190>
 recoverLease = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432fd0>
 refreshNodes = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44355d0>
 removeAcl = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ce50>
 removeAclEntries = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e390>
 removeCacheDirective = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432a10>
 removeCachePool = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e710>
 removeDefaultAcl = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ce10>
 removeXAttr = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ec50>
 rename = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442ee90>
 renameSnapshot = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432d90>
 resolvePath = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e9d0>
 restoreFailedStorage = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432810>
 rollEdits = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432dd0>
 rollingUpgrade = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44324d0>
 saveNamespace = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435c10>
 setAcl = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44352d0>
 setBalancerBandwidth = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e3d0>
 setConf = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44359d0>
 setDefaultUri = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432ed0>
 setOwner = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e050>
 setPermission = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442eb10>
 setQuota = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432c90>
 setReplication = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435990>
 setSafeMode = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435890>
 setStoragePolicy = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44354d0>
 setTimes = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435050>
 setVerifyChecksum = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432b90>
 setWorkingDirectory = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432e50>
 setWriteChecksum = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44351d0>
 setXAttr = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b44328d0>
 startLocalOutput = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4432190>
 supportsSymlinks = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b4435ad0>
 toString = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4dbcbdc10>
 wait = JavaMember <py4j.java_gateway.JavaMember object at 0x7ff4b442e750>
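
Several of the members listed above (exists, mkdirs, copyFromLocalFile, listStatus) can be combined into one small routine. The following is a sketch only, assuming sc is a SparkContext as in the code above. upload_and_list is a hypothetical helper name, and the choice of delSrc=False with overwrite=True is an assumption made for the illustration, not something the answer prescribes.

```python
def upload_and_list(sc, local_file, hdfs_dir):
    """Upload local_file into hdfs_dir and return the file names now in hdfs_dir."""
    Path = sc._jvm.org.apache.hadoop.fs.Path
    FileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem
    fs = FileSystem.get(sc._jsc.hadoopConfiguration())

    dest_dir = Path(hdfs_dir)
    # Create the destination directory first if it does not exist yet.
    if not fs.exists(dest_dir):
        fs.mkdirs(dest_dir)

    # Assumption: keep the local source (delSrc=False), overwrite the target.
    fs.copyFromLocalFile(False, True, Path(local_file), dest_dir)

    # listStatus returns FileStatus objects; take the last path component of each.
    return [status.getPath().getName() for status in fs.listStatus(dest_dir)]
```

Returning the directory listing gives the caller a quick way to confirm that the uploaded file is actually present.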

【Discussion】:

【Solution 2】:

copyFromLocalFile takes two Path objects, not strings.

https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/FileSystem.html
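
Applied to the code in the question, this means wrapping both locations in org.apache.hadoop.fs.Path before the call, since py4j does not convert a Python str into a Path. The following is a minimal sketch, assuming sc is the SparkContext. copy_with_paths is a hypothetical helper name, and it takes the Hadoop configuration from the SparkContext rather than creating a new Configuration().

```python
def copy_with_paths(sc, local_file, hdfs_uri, hdfs_dest):
    """Copy local_file to hdfs_dest on the filesystem identified by hdfs_uri."""
    jvm = sc._jvm
    URI = jvm.java.net.URI
    Path = jvm.org.apache.hadoop.fs.Path
    FileSystem = jvm.org.apache.hadoop.fs.FileSystem

    # FileSystem.get(URI, Configuration) selects the filesystem for the given URI.
    fs = FileSystem.get(URI(hdfs_uri), sc._jsc.hadoopConfiguration())

    # copyFromLocalFile(Path src, Path dst): both arguments are Path objects.
    fs.copyFromLocalFile(Path(local_file), Path(hdfs_dest))
```

With the values from the question, the call would be copy_with_paths(sparkCont, '/home/ak18191/ar_new.txt', 'hdfs://gcgmwdcuat:8020', '/user/ak18191/').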

【Discussion】:

【Solution 3】:

Remove the -f argument:

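import shlex
import subprocess
cmd = 'hdfs dfs -put /home/ak18191/ar_new.txt hdfs://gcgmwdcuat:8020/user/ak18191/'
# Split into an argument list: without shell=True, a plain string is taken as one program name
subprocess.call(shlex.split(cmd))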

If the file is still not found, make sure the file name is typed correctly by running the following command on your machine: hdfs dfs -ls /home/ak18191/ar_new.txt

https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/FileSystemShell.html#put
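
Because the source file lives on the local (edge node) filesystem, the same check can also be made from Python before the put command is run. The following is a small sketch using only the standard library. check_local_file is a hypothetical helper, and it only inspects the filesystem of the machine where the Python process runs.

```python
import os


def check_local_file(path):
    """Return a short diagnosis string for a local path before uploading it."""
    if not os.path.exists(path):
        return "missing"
    if not os.path.isfile(path):
        return "not a regular file"
    if not os.access(path, os.R_OK):
        return "not readable"
    return "ok"
```

If this reports "ok" for /home/ak18191/ar_new.txt and the upload still fails with a file-not-found error, the problem is more likely in how the command is launched than in the file name.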

【Discussion】:
