FTP文件上传到HDFS上

Posted yy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FTP文件上传到HDFS上相关的知识,希望对你有一定的参考价值。

在做测试数据时,往往会有ftp数据上传到hdfs的需求,一般需要手动操作,这样做太费事,于是有了下边代码实现的方式:

ftp数据上传到hdfs函数:

import java.io.InputStream;

import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * Created by Administrator on 11/10/2017.
 */
public class FtpUtil {
    /**
     * loadFromFtpToHdfs:将数据从ftp上传到hdfs上. <br/>
     *
     * @param ip
     * @param username
     * @param password
     * @param filePath
     * @param outputPath
     * @param conf
     * @return
     * @author qiyongkang
     * @since JDK 1.8
     */
    public static boolean loadFromFtpToHdfs(String ip, String username, String password, String filePath, String outputPath, Configuration conf) {
        FTPClient ftp = new FTPClient();
        InputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        boolean flag = true;
        try {
            ftp.connect(ip);
            ftp.login(username, password);
            ftp.setFileType(FTP.BINARY_FILE_TYPE);
            ftp.setControlEncoding("UTF-8");
            int reply = ftp.getReplyCode();
            if (!FTPReply.isPositiveCompletion(reply)) {
                ftp.disconnect();
            }
            FTPFile[] files = ftp.listFiles(filePath);
            FileSystem hdfs = FileSystem.get(conf);
            for (FTPFile file : files) {
                if (!(file.getName().equals(".") || file.getName().equals(".."))) {
                    inputStream = ftp.retrieveFileStream(filePath + file.getName());
                    outputStream = hdfs.create(new Path(outputPath + file.getName()));
                    IOUtils.copyBytes(inputStream, outputStream, conf, false);
                    if (inputStream != null) {
                        inputStream.close();
                        ftp.completePendingCommand();
                    }
                }
            }
            ftp.disconnect();
        } catch (Exception e) {
            flag = false;
            e.printStackTrace();
        }
        return flag;
    }
}

main调用函数:

import org.apache.hadoop.conf.Configuration

/**
  * Created by Administrator on 11/10/2017.
  */
object FtpDownToHdfsMain {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    FtpUtil.loadFromFtpToHdfs("192.168.1.23", "test", "abc123", "/www/input/", "/user/jr/dt/fblib/", conf)
  }
}

使用yarn jar提交:

yarn jar myapp.jar

 

以上是关于FTP文件上传到HDFS上的主要内容,如果未能解决你的问题,请参考以下文章

Python-Socketserver实现FTP,文件上传下载

java Ftp上传创建多层文件的代码片段

从ftp上传文件到hadoop的一个坑

更新github上代码

hdfs-over-ftp使用说明

在批处理文件中将文本文件上传到FTP,该文件具有随机生成的文件名