sqoop常用模板案例整理

Posted Z-hhhhh

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sqoop常用模板案例整理相关的知识,希望对你有一定的参考价值。

使用sqoop导入/导出数据

首先,导入和导出是相对与hdfs来说的,将数据添加到hdfs中,称之为导入import;反之为导出export。

导出(导出到mysql)

需要准备要数据和MySQL表

#数据
1	sunwukong	male
2	qiqi	female
3	beijita	male
4	buerma	female
5	kelin	male
6	bike	male
--建表
create table sqp_people(
id int,
name varchar(20),
gender varchar(5)
);

执行命令如下:

sqoop export \\
--connect jdbc:mysql://singlenick:3306/mypra \\
--username root \\
--password @ZWHzwh123 \\
--table sqp_people \\
-m 1 \\
--export-dir /study/sqoop/hdfstomysql \\
--fields-terminated-by '\\t' 

注意项:每一句需要加空格和\\,且\\后面不能再有空格

​ connect 需要连接到MySQL的数据库(这里是mypra)

​ 分别写MySQL的username和passwor

​ 接着是表名(这里是sqp_people)

​ 导出使用的是export-dir 后面跟的是hdfs的路径

​ 最后是\\t 是分隔符

​ 执行结束之后,查询表,验证结果

select * from sqp_people;

---------------结果如下--------
mysql> select *from sqp_people;
+------+-----------+--------+
| id   | name      | gender |
+------+-----------+--------+
|    1 | sunwukong | male   |
|    2 | qiqi      | female |
|    3 | beijita   | male   |
|    4 | buerma    | female |
|    5 | kelin     | male   |
|    6 | bike      | male   |
+------+-----------+--------+

导入(从MySQL到hdfs)

实现准备了一张teacherinfo的表,以它为例,导入到hdfs中,语句如下:

全量导入

sqoop import \\
--connect jdbc:mysql://singlenick:3306/schoolPra \\
--username root \\
--password @ZWHzwh123 \\
--table teacherinfo \\
--m 1 \\
--delete-target-dir \\
--target-dir /study/sqoop/m2h_all \\
--fields-terminated-by '\\t' \\
--lines-terminated-by '\\n'

delete上面和之前相同,不做赘述。

delete-target-dir代表,如果下面的目录已经有同名的会删除

target-dir代表会创建这个给文件

fields-terminated-by 指定列之间的分隔符

lines-terminated-by 指定行之间的分隔符

列裁剪导出

有时候并不需要MySQL里所有的数据,这时候就需要对数据进行裁剪,裁剪分为:列裁剪和行裁剪

以下是列裁剪

sqoop import \\
--connect jdbc:mysql://singlenick:3306/schoolPra \\
--username root \\
--password @ZWHzwh123 \\
--table studentinfo \\
--columns StuId,StuName,StuAge \\
-m 1 \\
--delete-target-dir \\
--target-dir /study/sqoop/m2h_colcut \\
--fields-terminated-by '\\t' \\
--lines-terminated-by '\\n'

columns 就是选择列名

行列裁剪+多个reducer

sqoop import \\
--connect jdbc:mysql://singlenick:3306/schoolPra \\
--username root \\
--password @ZWHzwh123 \\
--table studentinfo \\
--columns StuId,StuName,StuAge,StuSex \\
--where "StuId <=7" \\
--m 2 \\
--split-by StuSex \\
--delete-target-dir \\
--target-dir /study/sqoop/m2h_rowcut \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'


sqoop import \\
-connect jdbc:mysql://singlenick:3306/schoolPra \\
-username root \\
-password @ZWHzwh123 \\
-table studentinfo \\
-columns StuId,StuName,StuAge,StuSex \\
-where "StuId <=7" \\
-m 2 \\
-split-by StuSex \\
-delete-target-dir \\
-target-dir /study/sqoop/m2h_rowcut \\
-fields-terminated-by ',' \\
-lines-terminated-by '\\n'

代码写了两遍是因为我发现,每句话前面写一个中划线 和 写两个中划线 是一样的,那就不用特别去记忆了。但是习惯上都是写两个中划线的。

where 是对行裁剪

m 是启动多个reducer,这需要写split-by,就像分区一样,这里就根据StuSex分为两个区,也就是两个文件。

sql语句方式导入

通过sqL语句的方式,可以同时完成行列的裁剪,语句稍微简单一些

sqoop import \\
--connect jdbc:mysql://singlenick:3306/schoolPra \\
--username root \\
--password @ZWHzwh123 \\
--query "select StuId,StuName,StuAge from studentinfo where \\$CONDITIONS " \\
--m 1 \\
--delete-target-dir \\
--target-dir /study/sqoop/m2h_sqlcut \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'

需要注意的是:sql 语句的最后一定要写where(可以加条件,也可以不加,但是where必须写),后面的\\$CONDITIONS是必须要写的,不然会报错

增量导入append

追加,增量导入,常用于数据更新的时候

sqoop import \\
--connect jdbc:mysql://singlenick:3306/mypra \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_people where \\$CONDITIONS " \\
--m 1 \\
--check-column id \\
--incremental append \\
--last-value 0 \\
--target-dir /study/sqoop/sqp_append \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'

这时候把mysql里的数据更新一下,

sqoop import \\
--connect jdbc:mysql://singlenick:3306/mypra \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_people where \\$CONDITIONS " \\
--m 1 \\
--check-column id \\
--incremental append \\
--last-value 6 \\
--target-dir /study/sqoop/sqp_append \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'

check-column是用于检查的字段

last-value是上一次的最后一个值,如果之前没使用过,那么会创建一个文件,如果第二次使用append 会根据last-value增加,并放在新的文件里,(类似于分区,由于合并文件的代价比较大,因此直接放在不同的文件)。

增量导入merge

融入,当信息更新改动之后使用

new_staff
1       AAA     male
2       BBB     male
3       CCC     male
4       DDD     male

old_staff
1       AAA     female
2       CCC     female
3       BBB     female
6       DDD     female

经过merge之后
结果:
1     AAA      MALE
2     BBB       MALE
3     CCC       MALE
4     DDD      MALE
6     DDD      FEMALE

具体操作如下:

sqoop merge \\
--connect jdbc:mysql://singlenick:3306/mypra \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_people where \\$CONDITIONS " \\
--check-column id \\
--incremental append \\
--last-value 11 \\
--target-dir /study/sqoop/sqp_append \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'

sqoop merge \\
--connect jdbc:mysql://singlenick:3306/mypra \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_people where \\$CONDITIONS " \\
--check-column id \\
--incremental append \\
--last-value 11 \\
--target-dir /study/sqoop/sqp_append \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'

增量导入lastmodified

为了模拟场景,现在mysql里建一个表,然后插入数据

create table sqp_incr_time(
incrName varchar(20),
incrTime timestamp
);

insert into sqp_incr_time(incrName) values('wangwang'),('huihui'),('tongtong'),('jingjing');

然后使用sqoop导入到hdfs中

sqoop import \\
--connect jdbc:mysql://singlenick:3306/test \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_incr_time where \\$CONDITIONS"  \\
-m 1 \\
--target-dir /study/sqoop/m2h_incr_lastm \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n' \\
--check-column incrTime \\
--incremental lastmodified \\
--last-value '0000-00-00 00:00:00'

可以在hdfs中查到已经导入的数据,这时候更新mysql的数据

​ 需要注意的是,时间是不能超过现在的时间的,这也符合工作场景,到了一定的时间才会去更新导入数据

insert into sqp_incr_time(incrName,incrTime) values('zwh','2021-06-29 13:25:22'),('zwj','2021-06-29 13:34:22'),('mei','2021-06-29 13:27:22'),('ju','2021-06-29 13:16:22');

​ 插入数据后,再次导入,这时候的last_value 就可以根据需要来写了

sqoop import \\
--connect jdbc:mysql://singlenick:3306/test \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_incr_time where \\$CONDITIONS"  \\
-m 1 \\
--target-dir /study/sqoop/m2h_incr_lastmodified \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n' \\
--check-column incrTime \\
--incremental lastmodified \\
--append \\
--last-value '2021-06-29 13:34:22'

分区单表分区导入

原来Mysql中的表,导到Hive中,并建立分区表

先新建Mysql中的表,并导入数据

create table sqp_partition(
id int,
name varchar(20),
dotime datetime
);

insert into sqp_partition(id,name,dotime) values
(1,'sunwukong','2021-06-30 12:13:12'),
(2,'beijita','2021-06-30 12:15:12'),
(3,'bike','2021-06-30 12:10:12'),
(4,'kelin','2021-06-30 12:11:12'),
(5,'sunwufan','2021-06-30 12:09:12'),
(6,'guixianren','2021-06-30 12:01:12');

insert into sqp_partition(id,name,dotime) values
(7,'sunwukong','2021-06-29 12:13:12'),
(8,'beijita','2021-06-29 12:15:12'),
(9,'bike','2021-06-29 12:10:12'),
(10,'kelin','2021-06-29 12:11:12'),
(11,'sunwufan','2021-06-29 12:09:12'),
(12,'guixianren','2021-06-29 12:01:12');

在hive中建分区表

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

create table sqp_partition(
id int,
name string,
dotime timestamp
)
partitioned by (dodate date)
row format delimited
fields terminated by ','
lines terminated by '\\n'
stored as textfile;

然后执行sqoop命令

sqoop import \\
--connect jdbc:mysql://singlenick:3306/test \\
--username root \\
--password @ZWHzwh123 \\
--table sqp_partition \\
--where "cast(dotime as date)='2021-06-30'" \\
-m 1 \\
--delete-target-dir \\
--target-dir /user/hive/warehouse/test.db/sqp_partition/dodate=2021-06-30 \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'

#然后这hive中添加分区
alter table sqp_partition add partition(dodate='2021-06-30');
# 把日期更换就可以根据日期导表

sqoop import \\
--connect jdbc:mysql://singlenick:3306/test \\
--username root \\
--password @ZWHzwh123 \\
--table sqp_partition \\
--where "cast(dotime as date)='2021-06-29'" \\
-m 1 \\
--delete-target-dir \\
--target-dir /user/hive/warehouse/test.db/sqp_partition/dodate=2021-06-29 \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'

# 然后在hive中添加分区
alter table sqp_partition add partition(dodate='2021-06-29');

就可以在/user/hive/warehouse/test.db/sqp_partition目录里查到所有数据

hive中可以查看到所有的数据

基于以上可以模拟成一个案例

写一个自动脚本,每天建一个分区,自动导数据

#!/bin/bash
source /etc/profile
DATE=`date -d '-1 day' +%F`

sqoop import \\
--connect jdbc:mysql://singlenick:3306/test \\
--username root \\
--password @ZWHzwh123 \\
--table sqp_partition \\
--where "cast(dotime as date)='$DATE'" \\
-m 1 \\
--delete-target-dir \\
--target-dir /user/hive/warehouse/test.db/sqp_partition/dodate=$DATE \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'

hive -e "alter table test.sqp_partition add partition(dodate='$DATE')"

只要执行以上的脚本,就可以完成需求

sqoop job

创建sqoop job

模板如下

sqoop job \\
--create job_m2hv_par \\
-- import \\
--connect jdbc:mysql://singlenick:3306/test \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_incr_time where \\$CONDITIONS"  \\
-m 1 \\
--target-dir /study/sqoop/m2h_incr_lastmodified \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n' \\
--check-column incrTime \\
--incremental lastmodified \\
--append \\
--last-value '2021-06-29 13:34:22'

查看job

sqoop job --list

删除Job

sqoop job --delete job_name

查看job定义

sqoop job --show job_name

执行job

sqoop job --exec job_name

以上是关于sqoop常用模板案例整理的主要内容,如果未能解决你的问题,请参考以下文章

吴恩达《机器学习》课程总结单变量线性回归

ETL工具sqoop

Sqoop的简单使用案例

sqoop example

ATP应用测试平台——使用EasyExcel实现excel导入导出多sheet填充模板下载等功能案例实战

sqoop连接MySQL失败解决案例