FTP文件上传到HDFS上
Posted yy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FTP文件上传到HDFS上相关的知识,希望对你有一定的参考价值。
在做测试数据时,往往会有ftp数据上传到hdfs的需求,一般需要手动操作,这样做太费事,于是有了下边代码实现的方式:
ftp数据上传到hdfs函数:
import java.io.InputStream; import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPFile; import org.apache.commons.net.ftp.FTPReply; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; /** * Created by Administrator on 11/10/2017. */ public class FtpUtil { /** * loadFromFtpToHdfs:将数据从ftp上传到hdfs上. <br/> * * @param ip * @param username * @param password * @param filePath * @param outputPath * @param conf * @return * @author qiyongkang * @since JDK 1.8 */ public static boolean loadFromFtpToHdfs(String ip, String username, String password, String filePath, String outputPath, Configuration conf) { FTPClient ftp = new FTPClient(); InputStream inputStream = null; FSDataOutputStream outputStream = null; boolean flag = true; try { ftp.connect(ip); ftp.login(username, password); ftp.setFileType(FTP.BINARY_FILE_TYPE); ftp.setControlEncoding("UTF-8"); int reply = ftp.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftp.disconnect(); } FTPFile[] files = ftp.listFiles(filePath); FileSystem hdfs = FileSystem.get(conf); for (FTPFile file : files) { if (!(file.getName().equals(".") || file.getName().equals(".."))) { inputStream = ftp.retrieveFileStream(filePath + file.getName()); outputStream = hdfs.create(new Path(outputPath + file.getName())); IOUtils.copyBytes(inputStream, outputStream, conf, false); if (inputStream != null) { inputStream.close(); ftp.completePendingCommand(); } } } ftp.disconnect(); } catch (Exception e) { flag = false; e.printStackTrace(); } return flag; } }
main调用函数:
import org.apache.hadoop.conf.Configuration /** * Created by Administrator on 11/10/2017. */ object FtpDownToHdfsMain { def main(args: Array[String]): Unit = { val conf = new Configuration() FtpUtil.loadFromFtpToHdfs("192.168.1.23", "test", "abc123", "/www/input/", "/user/jr/dt/fblib/", conf) } }
使用yarn jar提交:
yarn jar myapp.jar
以上是关于FTP文件上传到HDFS上的主要内容,如果未能解决你的问题,请参考以下文章