SpringBoot集成Sqoop1.4.7实现从Mysql导入数据到HDFS

Posted 程序员超时空

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot集成Sqoop1.4.7实现从Mysql导入数据到HDFS相关的知识,希望对你有一定的参考价值。

这里写自定义目录标题

背景介绍

网上关于SpringBoot集成Sqoop1.4.7的教程很多,譬如:
文章1
文章2
文章3

看过后基本都有个大概的了解,但是实操过程中,我可能遇到了所有可能出现的问题。虽然每个问题网上基本都能找到答案,但是实在过于分散,所以我这里整个汇总一下,也记录一下我的心酸历程。

问题1:Sqoop1和Sqoop2有什么区别

Sqoop2的版本是1.99.x,引入了Sqoop server,以及多种访问方式:CLI,Web UI,REST API,方便编程进行远程调用处理。但是需要搭建Server,比较复杂,而且Cloudera明确不再支持Sqoop2了。
Sqoop1的版本是1.4.x,使用CM安装CDH后貌似是默认自带的。他就是一个本地的工具,可以直接使用命令行进行数据迁移(mysql,HDFS,HBase,Hive等多种数据源)操作,并且定制任务。
介绍Sqoop1的文章也是大把,随便搜,譬如:
文章4

问题2:可以在Windows下运行吗?需要在Windows下安装Haddop吗

前面的贴的教程里有几个就是直接在Windows上跑的。不需要在Windows下安装Haddop,但是需要下载一些必要的文件(只需要hadoop.dll和winutils.exe)。参考下面的教程:
文章5
文章6

我是从这里下载的https://github.com/cdarlint/winutils

问题3:无法下载org.apache.sqoop:sqoop:1.4.7

        <dependency>
            <groupId>org.apache.sqoop</groupId>
            <artifactId>sqoop</artifactId>
            <version>1.4.7</version>
        </dependency>

浏览Maven仓库,https://repo1.maven.org/maven2/org/apache/sqoop/sqoop/1.4.7/,发现jar包名字中包含hadoop260字样,所以我直接手动下载,然后重命名了一下,去掉了hadoop260

或者使用<classifier>hadoop260</classifier>来描述这个依赖,不知道为什么网上的教程都没有提到这个。难道他们都可以直接下载成功

问题4:log4j的依赖冲突

按照教程写的pom.xml会产生依赖冲突,如下:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/administrator/.m2/repository/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/administrator/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.springframework.test.context.junit4.SpringJUnit4ClassRunner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

java.lang.IllegalStateException: Failed to load ApplicationContext

使用Maven Helper可以搜索查看发生冲突的依赖,然后右键选中一个Jar包将其排除掉,比较方便。网上也有各种教程实现全局排除依赖。

问题5:执行本地MapReduce时遇到NoClassDefFoundError

警告: Can't initialize javac processor due to (most likely) a class loader problem: java.lang.NoClassDefFoundError: com/sun/tools/javac/processing/JavacProcessingEnvironment

使用Idea + JDK1.8的时候,默认是没有加载tools.jar的,需要手动配置一下
参考教程:
文章7

问题6: 程序包org.apache.hadoop.io不存在

2020-10-09 17:18:16.204  INFO 2072 --- [           main] org.apache.sqoop.orm.CompilationManager  : $HADOOP_MAPRED_HOME is not set
	mpsqoop-administratorcompile455f3e126b635a71d8e1990478638bccid_type.java:37: 错误: 无法访问Writable
public class id_type extends SqoopRecord  implements DBWritable, Writable 
       ^
  找不到org.apache.hadoop.io.Writable的类文件
	mpsqoop-administratorcompile455f3e126b635a71d8e1990478638bccid_type.java:7: 错误: 程序包org.apache.hadoop.io不存在
import org.apache.hadoop.io.BytesWritable;
                           ^
	mpsqoop-administratorcompile455f3e126b635a71d8e1990478638bccid_type.java:8: 错误: 程序包org.apache.hadoop.io不存在
import org.apache.hadoop.io.Text;
                           ^
	mpsqoop-administratorcompile455f3e126b635a71d8e1990478638bccid_type.java:9: 错误: 程序包org.apache.hadoop.io不存在
import org.apache.hadoop.io.Writable;
                           ^
	mpsqoop-administratorcompile455f3e126b635a71d8e1990478638bccid_type.java:205: 错误: 找不到符号
  public void parse(Text __record) throws RecordParser.ParseError 
                    ^
  符号:   类 Text
  位置: 类 id_type
4 个错误
2020-10-09 17:18:20.382 ERROR 2072 --- [           main] org.apache.sqoop.tool.ImportTool         : Import failed: java.io.IOException: Error returned by javac
	at org.apache.sqoop.orm.CompilationManager.compile(CompilationManager.java:226)
	at org.apache.sqoop.tool.CodeGenTool.generateORM(CodeGenTool.java:107)
	at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:501)

参考教程:
文章8

我采用的方式是增加--hadoop-mapred-home,如下

	String[] argument = new String[]
			"--connect", "jdbc:mysql://x.x.x.x:8306/db?useSSL=false",
			"--driver", "com.mysql.jdbc.Driver",
			"--username", "root",
			"--password", "root",
			"--table", "id_type",
			"--columns", "id,name",
			"-m", "1",
			"--target-dir", "hdfs://x.x.x.x:8021/user/test_import"
			"--hadoop-mapred-home", "c:\\Users\\administrator\\.m2\\repository\\org\\apache\\hadoop"
	;

问题7:无权访问HDFS目录

2020-10-09 18:41:39.068 ERROR 8508 --- [           main] org.apache.sqoop.tool.ImportTool         : Import failed: org.apache.hadoop.security.AccessControlException: Permission denied: user=administrator, access=EXECUTE, inode="/user":mapred:supergroup:drwxrwx---
	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:279)
	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:260)

问题显而易见,因为程序是在windows上跑的,取得的是windows用户,对于HDFS指定的目录并没有访问权限。继续上网上教程,这个作者的思路和我很一致,我们不喜欢简单粗暴的直接修改目录权限,或者禁用权限检查,还是寻求指定用户的方式来解决问题。
文章9

所以直接在代码里加入下面的代码来切换任务执行的用户,mapred是有权限的用户。顺便把本地生成的临时的java文件的目录名字也一起改了。

    System.setProperty("user.name", "mapred"); //java临时文件的目录名字中的用户名
    System.setProperty("HADOOP_USER_NAME", "mapred"); //Import任务使用的用户

感兴趣的童鞋可以去d: mpsqoop-mapredcompile看看里面都是啥。
注:这个目录在d盘是因为我的工程是在d盘。

问题8:createBlockOutputStream错误

2020-10-10 01:11:07.508  INFO 15916 --- [      Thread-15] org.apache.hadoop.hdfs.DataStreamer      : Exception in createBlockOutputStream

java.net.ConnectException: Connection timed out: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
	at org.apache.hadoop.hdfs.DataStreamer.createSocketForPipeline(DataStreamer.java:259)
	at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1699)
	at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1655)
	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:710)

2020-10-10 01:11:07.509  WARN 15916 --- [      Thread-15] org.apache.hadoop.hdfs.DataStreamer      : Abandoning BP-1349808031-x.x.x.214-1601445395722:blk_1073756551_15727
2020-10-10 01:11:07.664  WARN 15916 --- [      Thread-15] org.apache.hadoop.hdfs.DataStreamer      : Excluding datanode DatanodeInfoWithStorage[x.x.x.216:50010,DS-01f93abb-0cf2-4d7d-8b81-e316e8b640eb,DISK]

Sqoop在上传一个文件的多个副本的时候超时,我的环境总共有3个DataNode,都没有成功,上面的异常信息也打印了三遍。读了一下代码后知道,Sqoop是把MySQL数据拉取到本地,并本地执行MapReduce任务对数据进行处理,然后从参数中指定的HDFS的NameNode中读取各种配置信息,包括都有哪些DataNode,需要多少个副本等,然后在一一从本地往DataNode中传输。从日志里看到他用的是内网IP,x.x.x.214x.x.x.216:50010,那自然是不会成功的了,因为运行这个程序的是我的windows本机,和hadoop集群完全不在一起。
继续上网上教程:
文章10
不过我只是在client端的java代码中做了如下配置。

conf.set("dfs.client.use.datanode.hostname", "true");

服务器端的hdfs-site.xmldfs.client.use.datanode.hostname仍然是false,但是所有xml中的配置已然都是使用的hostname了,譬如

  <property>
    <name>dfs.client.use.datanode.hostname</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address</name>
    <value>hadoop1:8021</value>
  </property>
  <property>
    <name>dfs.https.address</name>
    <value>hadoop1:50470</value>
  </property>

这个地方我没有去深究,可能是因为我用CM安装的吧。

问题9:sqoop无法连接MySQL

这个问题和java调用无关,是我在hadoop集群上使用命令行时遇到的,这个一般童鞋应该是遇不到的。

sqoop list-databases --connect 'jdbc:mysql://x.x.x.x:8306/wsbsdb?useSSL=false' --username sqoop --password "xxxx$xxxx"

无法成功连接,提示Access denied for user 'sqoop'@'x.x.x.x' (using password: YES)。经过几个小时对比测试,最后发现是密码里包含了$符号,sqoop不认识了。要是我早点把这个命令贴在Markdown里就好了,一眼就看到这个着色问题了。

解惑

  1. SpringBoot中集成的Sqoop是直接和Hadoop集群交互,和Hadoop集群中是否安装Sqoop没有关系。如前面提到的,Sqoop是把MySQL数据拉取到本地(这个本地就是跑这个SpringBoot程序的,和hadoop集群可以是完全独立的),然后本地执行MapReduce任务对数据进行处理,然后连接参数中指定的HDFS的Namenode服务,从中读取各种配置信息,包括都有哪些Datanode,需要多少个副本等,然后在一一从本地往Datanode中传输。

  2. conf中设置fs.default.name的作用

    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://x.x.x.x:8021"); 
    Sqoop sqoop = new Sqoop(sqoopTool, SqoopTool.loadPlugins(conf));
    

对于conf.set("fs.default.name", "hdfs://x.x.x.x:8021");这句,有人说是要指定本机地址,有人说是远程hdfs地址,而且必须设置,否则无法成功。经过测试验证,这个应该是远程hdfs地址,如果导入参数里的--target-dir设置的是/user/test_import,并没有携带hdfs地址信息,就需要使用fs.default.name来指定hdfs地址。如果--target-dir设置的是hdfs://y.y.y.y:8021/user/test_import,就不需要fs.default.name了。因为导入hive和hbase时,导入参数里不能像hdfs那样携带地址信息,所以应该是一定要设置fs.default.name了的,有待验证。

最后总结,还是要自己看代码,看了代码后才能使用更加准确的关键字去搜索相关问题,否则搜都搜不到。我就是有几个问题是在自己看了代码后,才搜到相关教程。

附上完整的代码:
pom.xml

    <!-- SqoopTest need start -->
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.42</version>
	</dependency>

	<!--hadoop-->
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-common</artifactId>
		<version>2.10.0</version>
		<exclusions>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-hdfs</artifactId>
		<version>2.10.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-mapreduce-client-core</artifactId>
		<version>2.10.0</version>
		<exclusions>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-mapreduce-client-common</artifactId>
		<version>2.10.0</version>
		<exclusions>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
		<version>2.10.0</version>
		<scope>test</scope>
		<exclusions>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.apache.avro</groupId>
		<artifactId>avro-mapred</artifactId>
		<version>1.8.1</version>
	</dependency>
	<dependency>
		<groupId>org.apache.avro</groupId>
		<artifactId>avro</artifactId>
		<version>1.8.1</version>
	</dependency>

	<!-- scoop -->
	<dependency>
		<groupId>org.apache.sqoop</groupId>
		<artifactId>sqoop</artifactId>
		<version>1.4.7</version>
		<classifier>hadoop260</classifier>
	</dependency>
	<!-- SqoopTest need end -->

SqoopTest.java

/*
 * <p>
 *
 * @author       zdw
 * @since        2020/10/8 17:33
 */

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.tool.SqoopTool;

import java.io.IOException;

public class SqoopTest 

    public void testImport() throws IOException 
        System.out.println("begin test sqoop");

        // 设置用于拼接存放MapReduce临时java文件的目录的名字的用户名称
        System.setProperty("user.name", "mapred");
        // 设置Job执行的用户身份
        System.setProperty("HADOOP_USER_NAME", "mapred");

        String[] argument = new String[]
                "--connect", "jdbc:mysql://x.x.x.x:8306/resident_info?useSSL=false",
                "--driver", "com.mysql.jdbc.Driver",
                "--username", "root",
                "--password", "xxx",
                "--table", "id_type",
                "--columns", "id,name",
                "--null-string", "na",
                "--null-non-string", "na",
                "-m", "1",
                "--target-dir", "hdfs://y.y.y.y:8021/user/test_import",
                // 指向依赖的各种jar包的位置
                "--hadoop-mapred-home", "c:\\Users\\administrator\\.m2\\repository\\org\\apache\\hadoop"
        ;
        com.cloudera.sqoop.tool.SqoopTool sqoopTool = (com.cloudera.sqoop.tool.SqoopTool) SqoopTool.getTool("import");
        Configuration conf = new Configuration();
        // HDFS地址,用于删除已经存在的文件夹
        conf.set("fs.default.name", "hdfs://y.y.y.y:8021");
        // 使用hostname来连接DataNode
        conf.set("dfs.client.use.datanode.hostname", "true");
        // 读取插件
        Sqoop sqoop = new Sqoop(sqoopTool, SqoopTool.loadPlugins(conf));
        // 访问HDFS文件系统,并删除指定文件夹
        FileSystem fileSystem = FileSystem.get(conf);
        Path path = new Path("/user/test_import");
        if (fileSystem.exists(path)) 
            fileSystem.delete(path, true); // true的意思是,就算output有东西,也一带删除
        
        int res = Sqoop.runSqoop(sqoop, argument);
        System.out.println("--------" + res);
        System.out.println("执行sqoop结束");
    

以上是关于SpringBoot集成Sqoop1.4.7实现从Mysql导入数据到HDFS的主要内容,如果未能解决你的问题,请参考以下文章

Sqoop配置

四十centos安装sqoop(使用Sqoop完成MySQL和HDFS之间的数据互导)

SpringBoot从入门到精通(三十四)如何集成JWT实现Token验证

springboot集成shiro实现用户登录认证

Springboot zuul2集成

springboot集成schedule(深度理解)