sqoop常用模板案例整理
Posted Z-hhhhh
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sqoop常用模板案例整理相关的知识,希望对你有一定的参考价值。
使用sqoop导入/导出数据
首先,导入和导出是相对与hdfs来说的,将数据添加到hdfs中,称之为导入import;反之为导出export。
导出(导出到mysql)
需要准备要数据和MySQL表
#数据
1 sunwukong male
2 qiqi female
3 beijita male
4 buerma female
5 kelin male
6 bike male
--建表
create table sqp_people(
id int,
name varchar(20),
gender varchar(5)
);
执行命令如下:
sqoop export \\
--connect jdbc:mysql://singlenick:3306/mypra \\
--username root \\
--password @ZWHzwh123 \\
--table sqp_people \\
-m 1 \\
--export-dir /study/sqoop/hdfstomysql \\
--fields-terminated-by '\\t'
注意项:每一句需要加空格和\\,且\\后面不能再有空格
connect 需要连接到MySQL的数据库(这里是mypra)
分别写MySQL的username和passwor
接着是表名(这里是sqp_people)
导出使用的是export-dir 后面跟的是hdfs的路径
最后是\\t 是分隔符
执行结束之后,查询表,验证结果
select * from sqp_people;
---------------结果如下--------
mysql> select *from sqp_people;
+------+-----------+--------+
| id | name | gender |
+------+-----------+--------+
| 1 | sunwukong | male |
| 2 | qiqi | female |
| 3 | beijita | male |
| 4 | buerma | female |
| 5 | kelin | male |
| 6 | bike | male |
+------+-----------+--------+
导入(从MySQL到hdfs)
实现准备了一张teacherinfo的表,以它为例,导入到hdfs中,语句如下:
全量导入
sqoop import \\
--connect jdbc:mysql://singlenick:3306/schoolPra \\
--username root \\
--password @ZWHzwh123 \\
--table teacherinfo \\
--m 1 \\
--delete-target-dir \\
--target-dir /study/sqoop/m2h_all \\
--fields-terminated-by '\\t' \\
--lines-terminated-by '\\n'
delete上面和之前相同,不做赘述。
delete-target-dir代表,如果下面的目录已经有同名的会删除
target-dir代表会创建这个给文件
fields-terminated-by 指定列之间的分隔符
lines-terminated-by 指定行之间的分隔符
列裁剪导出
有时候并不需要MySQL里所有的数据,这时候就需要对数据进行裁剪,裁剪分为:列裁剪和行裁剪
以下是列裁剪
sqoop import \\
--connect jdbc:mysql://singlenick:3306/schoolPra \\
--username root \\
--password @ZWHzwh123 \\
--table studentinfo \\
--columns StuId,StuName,StuAge \\
-m 1 \\
--delete-target-dir \\
--target-dir /study/sqoop/m2h_colcut \\
--fields-terminated-by '\\t' \\
--lines-terminated-by '\\n'
columns 就是选择列名
行列裁剪+多个reducer
sqoop import \\
--connect jdbc:mysql://singlenick:3306/schoolPra \\
--username root \\
--password @ZWHzwh123 \\
--table studentinfo \\
--columns StuId,StuName,StuAge,StuSex \\
--where "StuId <=7" \\
--m 2 \\
--split-by StuSex \\
--delete-target-dir \\
--target-dir /study/sqoop/m2h_rowcut \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'
sqoop import \\
-connect jdbc:mysql://singlenick:3306/schoolPra \\
-username root \\
-password @ZWHzwh123 \\
-table studentinfo \\
-columns StuId,StuName,StuAge,StuSex \\
-where "StuId <=7" \\
-m 2 \\
-split-by StuSex \\
-delete-target-dir \\
-target-dir /study/sqoop/m2h_rowcut \\
-fields-terminated-by ',' \\
-lines-terminated-by '\\n'
代码写了两遍是因为我发现,每句话前面写一个中划线 和 写两个中划线 是一样的,那就不用特别去记忆了。但是习惯上都是写两个中划线的。
where 是对行裁剪
m 是启动多个reducer,这需要写split-by,就像分区一样,这里就根据StuSex分为两个区,也就是两个文件。
sql语句方式导入
通过sqL语句的方式,可以同时完成行列的裁剪,语句稍微简单一些
sqoop import \\
--connect jdbc:mysql://singlenick:3306/schoolPra \\
--username root \\
--password @ZWHzwh123 \\
--query "select StuId,StuName,StuAge from studentinfo where \\$CONDITIONS " \\
--m 1 \\
--delete-target-dir \\
--target-dir /study/sqoop/m2h_sqlcut \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'
需要注意的是:sql 语句的最后一定要写where(可以加条件,也可以不加,但是where必须写),后面的\\$CONDITIONS是必须要写的,不然会报错
增量导入append
追加,增量导入,常用于数据更新的时候
sqoop import \\
--connect jdbc:mysql://singlenick:3306/mypra \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_people where \\$CONDITIONS " \\
--m 1 \\
--check-column id \\
--incremental append \\
--last-value 0 \\
--target-dir /study/sqoop/sqp_append \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'
这时候把mysql里的数据更新一下,
sqoop import \\
--connect jdbc:mysql://singlenick:3306/mypra \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_people where \\$CONDITIONS " \\
--m 1 \\
--check-column id \\
--incremental append \\
--last-value 6 \\
--target-dir /study/sqoop/sqp_append \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'
check-column是用于检查的字段
last-value是上一次的最后一个值,如果之前没使用过,那么会创建一个文件,如果第二次使用append 会根据last-value增加,并放在新的文件里,(类似于分区,由于合并文件的代价比较大,因此直接放在不同的文件)。
增量导入merge
融入,当信息更新改动之后使用
new_staff
1 AAA male
2 BBB male
3 CCC male
4 DDD male
old_staff
1 AAA female
2 CCC female
3 BBB female
6 DDD female
经过merge之后
结果:
1 AAA MALE
2 BBB MALE
3 CCC MALE
4 DDD MALE
6 DDD FEMALE
具体操作如下:
sqoop merge \\
--connect jdbc:mysql://singlenick:3306/mypra \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_people where \\$CONDITIONS " \\
--check-column id \\
--incremental append \\
--last-value 11 \\
--target-dir /study/sqoop/sqp_append \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'
sqoop merge \\
--connect jdbc:mysql://singlenick:3306/mypra \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_people where \\$CONDITIONS " \\
--check-column id \\
--incremental append \\
--last-value 11 \\
--target-dir /study/sqoop/sqp_append \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'
增量导入lastmodified
为了模拟场景,现在mysql里建一个表,然后插入数据
create table sqp_incr_time(
incrName varchar(20),
incrTime timestamp
);
insert into sqp_incr_time(incrName) values('wangwang'),('huihui'),('tongtong'),('jingjing');
然后使用sqoop导入到hdfs中
sqoop import \\
--connect jdbc:mysql://singlenick:3306/test \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_incr_time where \\$CONDITIONS" \\
-m 1 \\
--target-dir /study/sqoop/m2h_incr_lastm \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n' \\
--check-column incrTime \\
--incremental lastmodified \\
--last-value '0000-00-00 00:00:00'
可以在hdfs中查到已经导入的数据,这时候更新mysql的数据
需要注意的是,时间是不能超过现在的时间的,这也符合工作场景,到了一定的时间才会去更新导入数据
insert into sqp_incr_time(incrName,incrTime) values('zwh','2021-06-29 13:25:22'),('zwj','2021-06-29 13:34:22'),('mei','2021-06-29 13:27:22'),('ju','2021-06-29 13:16:22');
插入数据后,再次导入,这时候的last_value 就可以根据需要来写了
sqoop import \\
--connect jdbc:mysql://singlenick:3306/test \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_incr_time where \\$CONDITIONS" \\
-m 1 \\
--target-dir /study/sqoop/m2h_incr_lastmodified \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n' \\
--check-column incrTime \\
--incremental lastmodified \\
--append \\
--last-value '2021-06-29 13:34:22'
分区单表分区导入
原来Mysql中的表,导到Hive中,并建立分区表
先新建Mysql中的表,并导入数据
create table sqp_partition(
id int,
name varchar(20),
dotime datetime
);
insert into sqp_partition(id,name,dotime) values
(1,'sunwukong','2021-06-30 12:13:12'),
(2,'beijita','2021-06-30 12:15:12'),
(3,'bike','2021-06-30 12:10:12'),
(4,'kelin','2021-06-30 12:11:12'),
(5,'sunwufan','2021-06-30 12:09:12'),
(6,'guixianren','2021-06-30 12:01:12');
insert into sqp_partition(id,name,dotime) values
(7,'sunwukong','2021-06-29 12:13:12'),
(8,'beijita','2021-06-29 12:15:12'),
(9,'bike','2021-06-29 12:10:12'),
(10,'kelin','2021-06-29 12:11:12'),
(11,'sunwufan','2021-06-29 12:09:12'),
(12,'guixianren','2021-06-29 12:01:12');
在hive中建分区表
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
create table sqp_partition(
id int,
name string,
dotime timestamp
)
partitioned by (dodate date)
row format delimited
fields terminated by ','
lines terminated by '\\n'
stored as textfile;
然后执行sqoop命令
sqoop import \\
--connect jdbc:mysql://singlenick:3306/test \\
--username root \\
--password @ZWHzwh123 \\
--table sqp_partition \\
--where "cast(dotime as date)='2021-06-30'" \\
-m 1 \\
--delete-target-dir \\
--target-dir /user/hive/warehouse/test.db/sqp_partition/dodate=2021-06-30 \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'
#然后这hive中添加分区
alter table sqp_partition add partition(dodate='2021-06-30');
# 把日期更换就可以根据日期导表
sqoop import \\
--connect jdbc:mysql://singlenick:3306/test \\
--username root \\
--password @ZWHzwh123 \\
--table sqp_partition \\
--where "cast(dotime as date)='2021-06-29'" \\
-m 1 \\
--delete-target-dir \\
--target-dir /user/hive/warehouse/test.db/sqp_partition/dodate=2021-06-29 \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'
# 然后在hive中添加分区
alter table sqp_partition add partition(dodate='2021-06-29');
就可以在/user/hive/warehouse/test.db/sqp_partition目录里查到所有数据
hive中可以查看到所有的数据
基于以上可以模拟成一个案例
写一个自动脚本,每天建一个分区,自动导数据
#!/bin/bash
source /etc/profile
DATE=`date -d '-1 day' +%F`
sqoop import \\
--connect jdbc:mysql://singlenick:3306/test \\
--username root \\
--password @ZWHzwh123 \\
--table sqp_partition \\
--where "cast(dotime as date)='$DATE'" \\
-m 1 \\
--delete-target-dir \\
--target-dir /user/hive/warehouse/test.db/sqp_partition/dodate=$DATE \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n'
hive -e "alter table test.sqp_partition add partition(dodate='$DATE')"
只要执行以上的脚本,就可以完成需求
sqoop job
创建sqoop job
模板如下
sqoop job \\
--create job_m2hv_par \\
-- import \\
--connect jdbc:mysql://singlenick:3306/test \\
--username root \\
--password @ZWHzwh123 \\
--query "select * from sqp_incr_time where \\$CONDITIONS" \\
-m 1 \\
--target-dir /study/sqoop/m2h_incr_lastmodified \\
--fields-terminated-by ',' \\
--lines-terminated-by '\\n' \\
--check-column incrTime \\
--incremental lastmodified \\
--append \\
--last-value '2021-06-29 13:34:22'
查看job
sqoop job --list
删除Job
sqoop job --delete job_name
查看job定义
sqoop job --show job_name
执行job
sqoop job --exec job_name
以上是关于sqoop常用模板案例整理的主要内容,如果未能解决你的问题,请参考以下文章