Pyspark - Error while copying files from local (edge node) to HDFS location
Question (posted 2020-03-09 21:51:43): I am trying to copy a file from local to an HDFS location in PySpark using the following code:
from pyspark.sql import SparkSession
sc = SparkSession.builder.appName("HDFSPySparkRead").getOrCreate()
sparkCont = sc.sparkContext
URI = sparkCont._jvm.java.net.URI
Path = sparkCont._jvm.org.apache.hadoop.fs.Path
FileSystem = sparkCont._jvm.org.apache.hadoop.fs.FileSystem
Configuration = sparkCont._jvm.org.apache.hadoop.conf.Configuration
fs = FileSystem.get(URI("hdfs://gcgmwdcuat:8020"), Configuration())
fs.copyFromLocalFile('/home/ak18191/ar_new.txt', 'hdfs://gcgmwdcuat:8020/user/ak18191/')
I get the error:
py4j.Py4JException: Method copyFromLocalFile([class java.lang.String, class java.lang.String]) does not exist
I also tried the following code:
import subprocess
cmd = 'hdfs dfs -put -f /home/ak18191/ar_new.txt hdfs://gcgmwdcuat:8020/user/ak18191/'
subprocess.call(cmd)
but I get a file-not-found error. The same command runs successfully in spark-shell. Please help!
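One plausible cause of the file-not-found error is standard subprocess behaviour rather than anything HDFS-specific: on POSIX, when `subprocess.call` receives a single string without `shell=True`, the whole string is treated as the name of the program, so Python looks for an executable literally called `hdfs dfs -put ...` and raises `FileNotFoundError`. The sketch below demonstrates this with the harmless `echo` command (assuming a POSIX system, not the edge node itself) and shows two working alternatives.

```python
import shlex
import subprocess

cmd = "echo hello"

# One string with shell=False (the default): on POSIX the entire string is
# taken as the executable name, so this raises FileNotFoundError.
try:
    subprocess.call(cmd)
    string_form_failed = False
except FileNotFoundError:
    string_form_failed = True

# Splitting into an argument list runs "echo" with "hello" as its argument.
list_rc = subprocess.call(shlex.split(cmd), stdout=subprocess.DEVNULL)

# Alternatively, let a shell parse the string (avoid with untrusted input).
shell_rc = subprocess.call(cmd, shell=True, stdout=subprocess.DEVNULL)

print(string_form_failed, list_rc, shell_rc)
```

Applied to the question, `subprocess.call(shlex.split(cmd))` or `subprocess.call(cmd, shell=True)` should reach the hdfs CLI as intended.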
Answer 1: Solution 1 - subprocess
import subprocess

def copy_from_local(local_file, hdfs_file, logger):
    # Pass the command as an argument list, so no shell parsing is involved
    proc = subprocess.Popen(["hdfs", "dfs", "-copyFromLocal", "-f", local_file, hdfs_file])
    proc.communicate()  # wait for the hdfs CLI to finish
    if proc.returncode != 0:
        logger.error("copyFromLocal {} to {} failed".format(local_file, hdfs_file))
        return False
    else:
        logger.info("copyFromLocal {} to {} succeeded".format(local_file, hdfs_file))
        return True
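A variant of Solution 1, sketched under the assumption that Python 3.7+ is available and `hdfs` is on the PATH of the edge node: `subprocess.run` with `capture_output=True` keeps the CLI's stderr, so the log records why a copy failed rather than only that it failed. The `hdfs_put` name and its injectable `runner` parameter are hypothetical, added only so the logic can be exercised without a cluster.

```python
import logging
import subprocess

logger = logging.getLogger(__name__)


def hdfs_put(local_file, hdfs_path, overwrite=True, runner=subprocess.run):
    """Copy local_file to hdfs_path with `hdfs dfs -put`; return True on success.

    `runner` defaults to subprocess.run; it is a parameter only so the
    function can be tested without an HDFS client installed.
    """
    cmd = ["hdfs", "dfs", "-put"]
    if overwrite:
        cmd.append("-f")  # overwrite the destination if it already exists
    cmd += [local_file, hdfs_path]

    # An argument list (not a single string) avoids shell parsing entirely.
    result = runner(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        logger.error("put %s -> %s failed: %s",
                     local_file, hdfs_path, result.stderr.strip())
        return False
    logger.info("put %s -> %s succeeded", local_file, hdfs_path)
    return True
```

For the question's file this would be `hdfs_put("/home/ak18191/ar_new.txt", "hdfs://gcgmwdcuat:8020/user/ak18191/")`.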
Solution 2 - py4j
def copy_from_local_file(sc, logger, local_file, hdfs_file, delSrc=True, overwrite=True):
    # Java signature: copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
    # Note: delSrc=True deletes the local source file after a successful copy
    Path = sc._jvm.org.apache.hadoop.fs.Path
    try:
        getFileSystem(sc).copyFromLocalFile(delSrc, overwrite, Path(local_file), Path(hdfs_file))
        logger.info("copyFromLocal {} to {} succeeded".format(local_file, hdfs_file))
    except Exception as e:
        logger.error(e)
        logger.error("copyFromLocal {} to {} failed".format(local_file, hdfs_file))

def getFileSystem(sc):
    # Get the Hadoop FileSystem for the default filesystem (fs.defaultFS)
    FileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem
    fs = FileSystem.get(sc._jsc.hadoopConfiguration())
    return fs
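A sketch of the same py4j approach that derives the FileSystem from the destination Path (Hadoop's `Path.getFileSystem(Configuration)`), so a fully qualified `hdfs://...` destination is handled even if it differs from `fs.defaultFS`. It assumes an existing SparkSession passed in as `spark`; the function name is hypothetical, and `del_src` defaults to `False` here because `delSrc=True` deletes the local source after copying. py4j picks the Java overload from the Python argument types, so the two flags must be real `bool`s and both paths must be Path objects.

```python
def copy_local_to_hdfs(spark, local_file, hdfs_dst, del_src=False, overwrite=True):
    """Copy a local (driver/edge-node) file to HDFS via the JVM FileSystem API.

    `spark` is an existing SparkSession. `_jvm` and `_jsc` are internal
    py4j handles, not public PySpark API.
    """
    jvm = spark.sparkContext._jvm
    conf = spark.sparkContext._jsc.hadoopConfiguration()
    Path = jvm.org.apache.hadoop.fs.Path

    src = Path(local_file)
    dst = Path(hdfs_dst)
    # Resolve the FileSystem that owns the destination (e.g. hdfs://host:8020).
    fs = dst.getFileSystem(conf)
    # Matches copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst).
    fs.copyFromLocalFile(bool(del_src), bool(overwrite), src, dst)
```

Usage would look like `copy_local_to_hdfs(spark, "/home/ak18191/ar_new.txt", "hdfs://gcgmwdcuat:8020/user/ak18191/")`.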
You can obtain the py4j JVM FileSystem object and perform file operations on it directly. Inspecting it shows the available methods:
getFileSystem(sc) = JavaObject DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_15601292_17, ugi=jrsyb@HADOOP.COM (auth:KERBEROS)]]
Its members (each a py4j.java_gateway.JavaMember) are:
access, addCacheDirective, addCachePool, addDelegationTokens, allowSnapshot, append, areSymlinksEnabled,
cancelDeleteOnExit, clearStatistics, close, closeAll, closeAllForUGI, completeLocalOutput, concat,
copyFromLocalFile, copyToLocalFile, create, createEncryptionZone, createNewFile, createNonRecursive, createSnapshot, createSymlink,
delete, deleteOnExit, deleteSnapshot, disallowSnapshot, enableSymlinks, equals, exists, finalizeUpgrade,
get, getAclStatus, getAllStatistics, getBlockSize, getCanonicalServiceName, getChildFileSystems, getClass, getClient, getConf,
getContentSummary, getCorruptBlocksCount, getDataNodeStats, getDefaultBlockSize, getDefaultReplication, getDefaultUri,
getDelegationToken, getDiskStatus, getEZForPath, getFileBlockLocations, getFileBlockStorageLocations, getFileChecksum,
getFileLinkStatus, getFileStatus, getFileSystemClass, getHomeDirectory, getInotifyEventStream, getLength, getLinkTarget,
getLocal, getMissingBlocksCount, getName, getNamed, getRawCapacity, getRawUsed, getReplication, getScheme, getServerDefaults,
getSnapshotDiffReport, getSnapshottableDirListing, getStatistics, getStatus, getStoragePolicies, getUnderReplicatedBlocksCount,
getUri, getUsed, getWorkingDirectory, getXAttr, getXAttrs, globStatus, hashCode, initialize,
isDirectory, isFile, isFileClosed, isInSafeMode, listCacheDirectives, listCachePools, listCorruptFileBlocks,
listEncryptionZones, listFiles, listLocatedStatus, listStatus, listXAttrs, makeQualified, metaSave, mkdir, mkdirs,
modifyAclEntries, modifyCacheDirective, modifyCachePool, moveFromLocalFile, moveToLocalFile, newInstance, newInstanceLocal,
notify, notifyAll, open, printStatistics, recoverLease, refreshNodes, removeAcl, removeAclEntries, removeCacheDirective,
removeCachePool, removeDefaultAcl, removeXAttr, rename, renameSnapshot, resolvePath, restoreFailedStorage, rollEdits,
rollingUpgrade, saveNamespace, setAcl, setBalancerBandwidth, setConf, setDefaultUri, setOwner, setPermission, setQuota,
setReplication, setSafeMode, setStoragePolicy, setTimes, setVerifyChecksum, setWorkingDirectory, setWriteChecksum, setXAttr,
startLocalOutput, supportsSymlinks, toString, wait
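Any of these members can be called the same way as copyFromLocalFile. As a hedged sketch (assuming `fs` comes from this answer's `getFileSystem(sc)` helper and `Path` is `sc._jvm.org.apache.hadoop.fs.Path`), the following uses `exists`, `mkdirs` and `listStatus` to make sure a target directory exists and then list its entries; `ensure_dir_and_list` is a hypothetical name.

```python
def ensure_dir_and_list(fs, Path, hdfs_dir):
    """Create hdfs_dir if it is missing, then return the names of its entries.

    `fs` is a py4j proxy for org.apache.hadoop.fs.FileSystem and `Path` is
    the JVM class org.apache.hadoop.fs.Path.
    """
    d = Path(hdfs_dir)
    if not fs.exists(d):
        fs.mkdirs(d)  # creates parent directories too, like `hdfs dfs -mkdir -p`
    # listStatus returns a Java FileStatus[]; py4j lets Python iterate over it.
    return [status.getPath().getName() for status in fs.listStatus(d)]
```

For example: `ensure_dir_and_list(getFileSystem(sc), sc._jvm.org.apache.hadoop.fs.Path, "/user/ak18191")`.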
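Answer 2: The two-argument copyFromLocalFile(Path src, Path dst) takes two org.apache.hadoop.fs.Path objects, not strings, which is why Py4J reports that no method accepting (String, String) exists. See the FileSystem API documentation: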
https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/FileSystem.html
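Applied to the question's code, a minimal sketch of the fix is to wrap both arguments in Path. The namenode URI and both file paths are the question's own; the function wrapper and its name are hypothetical, and it assumes an existing SparkSession passed in as `spark`. Reusing `spark.sparkContext._jsc.hadoopConfiguration()` instead of a fresh `Configuration()` is a choice of this sketch, so any `spark.hadoop.*` settings given to Spark also apply.

```python
def copy_with_paths(spark, local_file, hdfs_dst, namenode_uri):
    """The question's code, with both copyFromLocalFile arguments as Path objects."""
    jvm = spark.sparkContext._jvm
    URI = jvm.java.net.URI
    Path = jvm.org.apache.hadoop.fs.Path
    FileSystem = jvm.org.apache.hadoop.fs.FileSystem

    conf = spark.sparkContext._jsc.hadoopConfiguration()
    fs = FileSystem.get(URI(namenode_uri), conf)
    # copyFromLocalFile(Path src, Path dst): plain Python strings arrive as
    # java.lang.String and match no overload, hence the Py4JException.
    fs.copyFromLocalFile(Path(local_file), Path(hdfs_dst))
```

Called as `copy_with_paths(spark, "/home/ak18191/ar_new.txt", "hdfs://gcgmwdcuat:8020/user/ak18191/", "hdfs://gcgmwdcuat:8020")`.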
Answer 3: Remove the -f argument:
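import subprocess
cmd = 'hdfs dfs -put /home/ak18191/ar_new.txt hdfs://gcgmwdcuat:8020/user/ak18191/'
# Split into an argument list: a single string without shell=True makes
# Python look for a program named after the whole string
subprocess.call(cmd.split())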
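If the file still cannot be found, check that the local file name is typed correctly by running the following on the edge node: hdfs dfs -ls file:///home/ak18191/ar_new.txt (without the file:// prefix the path is resolved against the default filesystem, i.e. HDFS, rather than the local disk).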
https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/FileSystemShell.html#put
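Since -put reads its source from the edge node's local disk, a cheap pre-flight check in Python can separate a mistyped local path from an HDFS-side problem. A hedged sketch using only the standard library; `require_local_file` is a hypothetical helper name.

```python
import os


def require_local_file(path):
    """Return the absolute path if `path` is a readable regular file on this machine.

    Raises FileNotFoundError or PermissionError with the resolved path in the
    message, so a typo shows up before the hdfs CLI is ever invoked.
    """
    abs_path = os.path.abspath(os.path.expanduser(path))
    if not os.path.isfile(abs_path):
        raise FileNotFoundError(f"local source not found: {abs_path}")
    if not os.access(abs_path, os.R_OK):
        raise PermissionError(f"local source not readable: {abs_path}")
    return abs_path
```

For example, call `require_local_file("/home/ak18191/ar_new.txt")` before running the put command.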