Sqoop Basics
Overview
Unlike Flume, Sqoop is built on MapReduce (a framework that is gradually being phased out) and depends heavily on MapReduce and YARN. It is a purely offline data collection tool and can only be used for offline workloads. Offline processing often misses records that were modified in the source business data, so the collected data can be incomplete; this is a serious drawback, and as performance improves, offline processing of this kind will increasingly be replaced by real-time processing.
Function
Sqoop imports and exports data between RDBMSs such as MySQL and HDFS. The direction is named relative to HDFS: import means MySQL → HDFS, export means HDFS → MySQL.
Under the hood
The underlying engine is MapReduce. Since Sqoop essentially performs ETL-style data movement, most jobs are map-only, i.e. MapReduce with just three stages (Input, Map, Output).
Import phase:
- Input: DBInputFormat, reads from MySQL
- Output: TextOutputFormat, writes to HDFS
Export phase:
- Input: TextInputFormat, reads from HDFS
- Output: DBOutputFormat, writes to MySQL
Use cases
- Data synchronization: periodically collect offline data and sync it into the data warehouse
  - Full: collect all of the data every time
  - Incremental: collect only the newest data each run; most syncs are incremental
- Data migration: move historical data from RDBMSs (MySQL, Oracle, etc.) into HDFS
  - Full: the first migration is always a full load
Listing databases
sqoop list-databases --connect jdbc:mysql://node3:3306 --username root --password 123456
After running it:
[root@node3 ~]# sqoop list-databases --connect jdbc:mysql://node3:3306 --username root --password 123456
Warning: /export/server/sqoop-1.4.6-cdh5.14.0/../hbase does not exist! HBase imports will fail.
Please set $HBASE_HOME to the root of your HBase installation.
Warning: /export/server/sqoop-1.4.6-cdh5.14.0/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
Warning: /export/server/sqoop-1.4.6-cdh5.14.0/../zookeeper does not exist! Accumulo imports will fail.
Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation.
21/05/07 21:15:21 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.14.0
21/05/07 21:15:21 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
21/05/07 21:15:21 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
information_schema
hivemetadata
hue
mysql
nev
oozie
performance_schema
scrm
teach
Not bad at all.
Sqoop Import
Importing into HDFS
Preparing the MySQL data
create database sqoopTest;
use sqoopTest;
CREATE TABLE `tb_tohdfs` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(100) NOT NULL,
`age` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into tb_tohdfs values(null,"laoda",18);
insert into tb_tohdfs values(null,"laoer",19);
insert into tb_tohdfs values(null,"laosan",20);
insert into tb_tohdfs values(null,"laosi",21);
Command format
Check the help:
sqoop import --help
The help is comprehensive, but very long:
[root@node3 ~]# sqoop import --help
Warning: /export/server/sqoop-1.4.6-cdh5.14.0/../hbase does not exist! HBase imports will fail.
Please set $HBASE_HOME to the root of your HBase installation.
Warning: /export/server/sqoop-1.4.6-cdh5.14.0/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
Warning: /export/server/sqoop-1.4.6-cdh5.14.0/../zookeeper does not exist! Accumulo imports will fail.
Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation.
21/05/07 21:44:30 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.14.0
usage: sqoop import [GENERIC-ARGS] [TOOL-ARGS]
Common arguments:
--connect <jdbc-uri> Specify JDBC
connect
string
--connection-manager <class-name> Specify
connection
manager
class name
--connection-param-file <properties-file> Specify
connection
parameters
file
--driver <class-name> Manually
specify JDBC
driver class
to use
--hadoop-home <hdir> Override
$HADOOP_MAPR
ED_HOME_ARG
--hadoop-mapred-home <dir> Override
$HADOOP_MAPR
ED_HOME_ARG
--help Print usage
instructions
--metadata-transaction-isolation-level <isolationlevel> Defines the
transaction
isolation
level for
metadata
queries. For
more details
check
java.sql.Con
nection
javadoc or
the JDBC
specificaito
n
--oracle-escaping-disabled <boolean> Disable the
escaping
mechanism of
the
Oracle/OraOo
p connection
managers
-P Read
password
from console
--password <password> Set
authenticati
on password
--password-alias <password-alias> Credential
provider
password
alias
--password-file <password-file> Set
authenticati
on password
file path
--relaxed-isolation Use
read-uncommi
tted
isolation
for imports
--skip-dist-cache Skip copying
jars to
distributed
cache
--temporary-rootdir <rootdir> Defines the
temporary
root
directory
for the
import
--throw-on-error Rethrow a
RuntimeExcep
tion on
error
occurred
during the
job
--username <username> Set
authenticati
on username
--verbose Print more
information
while
working
Import control arguments:
--append Imports data
in append
mode
--as-avrodatafile Imports data
to Avro data
files
--as-parquetfile Imports data
to Parquet
files
--as-sequencefile Imports data
to
SequenceFile
s
--as-textfile Imports data
as plain
text
(default)
--autoreset-to-one-mapper Reset the
number of
mappers to
one mapper
if no split
key
available
--boundary-query <statement> Set boundary
query for
retrieving
max and min
value of the
primary key
--columns <col,col,col...> Columns to
import from
table
--compression-codec <codec> Compression
codec to use
for import
--delete-target-dir Imports data
in delete
mode
--direct Use direct
import fast
path
--direct-split-size <n> Split the
input stream
every 'n'
bytes when
importing in
direct mode
-e,--query <statement> Import
results of
SQL
'statement'
--fetch-size <n> Set number
'n' of rows
to fetch
from the
database
when more
rows are
needed
--inline-lob-limit <n> Set the
maximum size
for an
inline LOB
-m,--num-mappers <n> Use 'n' map
tasks to
import in
parallel
--mapreduce-job-name <name> Set name for
generated
mapreduce
job
--merge-key <column> Key column
to use to
join results
--split-by <column-name> Column of
the table
used to
split work
units
--split-limit <size> Upper Limit
of rows per
split for
split
columns of
Date/Time/Ti
mestamp and
integer
types. For
date or
timestamp
fields it is
calculated
in seconds.
split-limit
should be
greater than
0
--table <table-name> Table to
read
--target-dir <dir> HDFS plain
table
destination
--validate Validate the
copy using
the
configured
validator
--validation-failurehandler <validation-failurehandler> Fully
qualified
class name
for
ValidationFa
ilureHandler
--validation-threshold <validation-threshold> Fully
qualified
class name
for
ValidationTh
reshold
--validator <validator> Fully
qualified
class name
for the
Validator
--warehouse-dir <dir> HDFS parent
for table
destination
--where <where clause> WHERE clause
to use
during
import
-z,--compress Enable
compression
Incremental import arguments:
--check-column <column> Source column to check for incremental
change
--incremental <import-type> Define an incremental import of type
'append' or 'lastmodified'
--last-value <value> Last imported value in the incremental
check column
Output line formatting arguments:
--enclosed-by <char> Sets a required field enclosing
character
--escaped-by <char> Sets the escape character
--fields-terminated-by <char> Sets the field separator character
--lines-terminated-by <char> Sets the end-of-line character
--mysql-delimiters Uses MySQL's default delimiter set:
fields: , lines: \n escaped-by: \
optionally-enclosed-by: '
--optionally-enclosed-by <char> Sets a field enclosing character
Input parsing arguments:
--input-enclosed-by <char> Sets a required field encloser
--input-escaped-by <char> Sets the input escape
character
--input-fields-terminated-by <char> Sets the input field separator
--input-lines-terminated-by <char> Sets the input end-of-line
char
--input-optionally-enclosed-by <char> Sets a field enclosing
character
Hive arguments:
--create-hive-table Fail if the target hive
table exists
--hive-database <database-name> Sets the database name to
use when importing to hive
--hive-delims-replacement <arg> Replace Hive record \0x01
and row delimiters (\n\r)
from imported string fields
with user-defined string
--hive-drop-import-delims Drop Hive record \0x01 and
row delimiters (\n\r) from
imported string fields
--hive-home <dir> Override $HIVE_HOME
--hive-import Import tables into Hive
(Uses Hive's default
delimiters if none are
set.)
--hive-overwrite Overwrite existing data in
the Hive table
--hive-partition-key <partition-key> Sets the partition key to
use when importing to hive
--hive-partition-value <partition-value> Sets the partition value to
use when importing to hive
--hive-table <table-name> Sets the table name to use
when importing to hive
--map-column-hive <arg> Override mapping for
specific column to hive
types.
HBase arguments:
--column-family <family> Sets the target column family for the
import
--hbase-bulkload Enables HBase bulk loading
--hbase-create-table If specified, create missing HBase tables
--hbase-row-key <col> Specifies which input column to use as the
row key
--hbase-table <table> Import to <table> in HBase
HCatalog arguments:
--hcatalog-database <arg> HCatalog database name
--hcatalog-home <hdir> Override $HCAT_HOME
--hcatalog-partition-keys <partition-key> Sets the partition
keys to use when
importing to hive
--hcatalog-partition-values <partition-value> Sets the partition
values to use when
importing to hive
--hcatalog-table <arg> HCatalog table name
--hive-home <dir> Override $HIVE_HOME
--hive-partition-key <partition-key> Sets the partition key
to use when importing
to hive
--hive-partition-value <partition-value> Sets the partition
value to use when
importing to hive
--map-column-hive <arg> Override mapping for
specific column to
hive types.
HCatalog import specific options:
--create-hcatalog-table Create HCatalog before import
--drop-and-create-hcatalog-table Drop and Create HCatalog before
import
--hcatalog-storage-stanza <arg> HCatalog storage stanza for table
creation
Accumulo arguments:
--accumulo-batch-size <size> Batch size in bytes
--accumulo-column-family <family> Sets the target column family for
the import
--accumulo-create-table If specified, create missing
Accumulo tables
--accumulo-instance <instance> Accumulo instance name.
--accumulo-max-latency <latency> Max write latency in milliseconds
--accumulo-password <password> Accumulo password.
--accumulo-row-key <col> Specifies which input column to
use as the row key
--accumulo-table <table> Import to <table> in Accumulo
--accumulo-user <user> Accumulo user name.
--accumulo-visibility <vis> Visibility token to be applied to
all rows imported
--accumulo-zookeepers <zookeepers> Comma-separated list of
zookeepers (host:port)
Code generation arguments:
--bindir <dir> Output directory for
compiled objects
--class-name <name> Sets the generated class
name. This overrides
--package-name. When
combined with --jar-file,
sets the input class.
--escape-mapping-column-names <boolean> Disable special characters
escaping in column names
--input-null-non-string <null-str> Input null non-string
representation
--input-null-string <null-str> Input null string
representation
--jar-file <file> Disable code generation; use
specified jar
--map-column-java <arg> Override mapping for
specific columns to java
types
--null-non-string <null-str> Null non-string
representation
--null-string <null-str> Null string representation
--outdir <dir> Output directory for
generated code
--package-name <name> Put auto-generated classes
in this package
Generic Hadoop command-line arguments:
(must preceed any tool-specific arguments)
Generic options supported are
-conf <configuration file> specify an application configuration file
-D <property=value> use value for given property
-fs <local|namenode:port> specify a namenode
-jt <local|resourcemanager:port> specify a ResourceManager
-files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars> specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
At minimum, you must specify --connect and --table
Arguments to mysqldump and other subprograms may be supplied
after a '--' on the command line.
The basic format is:
usage: sqoop import [GENERIC-ARGS] [TOOL-ARGS]
So we need to supply the MySQL side (URL, username, password, table) and the HDFS side (the target directory to write to).
Import test
Import the tb_tohdfs table from MySQL into the /sqoop/import/test01 directory on HDFS.
On node3:
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test01
The trailing \ means the command is not finished and continues on the next line. Check the result:
[root@node3 ~]# hdfs dfs -cat /sqoop/import/test01/par*
1,laoda,18
2,laoer,19
3,laosan,20
4,laosi,21
A quick look at the job log from that run:
21/05/07 21:36:02 INFO db.IntegerSplitter: Split size: 0; Num splits: 4 from: 1 to: 4
Such a small table produced 4 splits, i.e. 4 map tasks, which is far more than necessary.
Tuning the parameters
Import only the id and name columns of tb_tohdfs into /sqoop/import/test01 on HDFS, tab-separated:
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--columns id,name \
--delete-target-dir \
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1
Where:
- -m: the number of map tasks
- --fields-terminated-by: the output field separator
- --columns: which columns to import
- --delete-target-dir: delete the target directory before the import
After setting the number of map tasks:
21/05/07 22:04:12 INFO mapreduce.JobSubmitter: number of splits:1
If anything, the job ran faster. Open 192.168.88.221:50070 in a browser and check the output directory: success.
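You can also verify from the shell instead of the NameNode web UI; with the paths used above, something like this should show the single output file:
# list and print the import output (paths as in the example above)
hdfs dfs -ls /sqoop/import/test01
hdfs dfs -cat /sqoop/import/test01/part*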
Conditional import
Import the rows of tb_tohdfs with id > 2 into /sqoop/import/test01 on HDFS:
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--where 'id > 2' \
--delete-target-dir \
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1
Where:
--where: the row filter condition applied during the import
After running it:
[root@node3 ~]# hdfs dfs -cat /sqoop/import/test01/par*
3 laosan 20
4 laosi 21
Conditional import of selected columns
Import only the id and name columns of the rows of tb_tohdfs with id > 2 into /sqoop/import/test01:
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--columns id,name \
--where 'id > 2' \
--delete-target-dir \
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1
Or do the same with a SQL query:
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
-e 'select id,name from tb_tohdfs where id > 2 and $CONDITIONS' \
--delete-target-dir \
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1
-e / --query: read the data with a SQL statement. Whenever a free-form SQL query is used, the WHERE clause must include $CONDITIONS.
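One note in passing: if a free-form query is run with more than one mapper, Sqoop also requires --split-by so it can fill $CONDITIONS with a range predicate for each mapper. A minimal sketch based on the example above (the choice of id as the split column and -m 2 are illustrative):
# parallel free-form query import; $CONDITIONS is replaced per split
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
-e 'select id,name from tb_tohdfs where id > 2 and $CONDITIONS' \
--split-by id \
--delete-target-dir \
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 2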
Importing into Hive
Preparing the Hive table
In beeline:
use default;
create table fromsqoop(
id int,
name string,
age int
);
Direct import
On node3:
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--hive-import \
--hive-database default \
--hive-table fromsqoop \
--fields-terminated-by '\001' \
-m 1
Where:
- --hive-import: import into a Hive table
- --hive-database default: the Hive database to import into
- --hive-table fromsqoop: the Hive table to import into
- --fields-terminated-by '\001': the field delimiter to use; it must match the Hive table's delimiter
Under the hood this actually runs in two steps: the MySQL data is first imported into HDFS by MapReduce (DBInputFormat reads from MySQL, TextOutputFormat writes to HDFS), and the imported file is then loaded into the Hive table with a LOAD command.
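Roughly the manual equivalent of those two steps, just to make the mechanics concrete (the staging path /user/root/tb_tohdfs is an assumption based on Sqoop's default of <user home>/<table name>; the Hive table is the one created above):
# step 1: stage the MySQL table on HDFS (hypothetical staging path)
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /user/root/tb_tohdfs \
--fields-terminated-by '\001' \
-m 1
# step 2: load the staged file into the Hive table (run in beeline)
# LOAD DATA INPATH '/user/root/tb_tohdfs' INTO TABLE default.fromsqoop;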
HCatalog import
On node3:
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--hcatalog-database default \
--hcatalog-table fromsqoop \
--fields-terminated-by '\001' \
-m 1
This approach also runs in two steps under the hood: it first fetches the Hive table's metadata, and then uses the Hive table's directory directly as the MapReduce output.
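To see the result, you can list the table's directory after the import; assuming the default Hive warehouse location /user/hive/warehouse, the data files land directly under the table directory:
# assumes the default Hive warehouse path; adjust if your warehouse differs
hdfs dfs -ls /user/hive/warehouse/fromsqoop
hdfs dfs -cat /user/hive/warehouse/fromsqoop/part*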
Sqoop incremental import
If you ran a full import like this every day:
sqoop import --connect jdbc:mysql://node3:3306/sqoopTest --username root --password 123456 --table tb_tohdfs --target-dir /sqoop/import/test02 -m 1
you would end up with a large amount of duplicate data (meaningless dirty data), waste time and I/O re-reading everything, and waste disk space on the redundant copies.
The standard approach
Sqoop checks one column and imports every row whose value is greater than the value from the previous run:
Incremental import arguments:
--check-column <column> Source column to check for incremental
change
--incremental <import-type> Define an incremental import of type
'append' or 'lastmodified'
--last-value <value> Last imported value in the incremental
check column
Where:
- --check-column: the column used to detect incremental changes
- --last-value: the value imported up to in the previous run
- --incremental: the incremental mode (append or lastmodified)
append
Append mode requires an auto-incrementing column and compares its integer value (auto_increment columns in MySQL and similar databases are integer columns). It can only pick up new rows added by INSERT; it cannot pick up rows changed by UPDATE, because an UPDATE does not change the auto-increment column.
On node3:
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--check-column id \
--incremental append \
--last-value 1 \
-m 1
After new rows are inserted:
insert into tb_tohdfs values(null,"laowu",22);
insert into tb_tohdfs values(null,"laoliu",23);
insert into tb_tohdfs values(null,"laoqi",24);
insert into tb_tohdfs values(null,"laoba",25);
an incremental import picks up only the new rows:
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--incremental append \
--check-column id \
--last-value 4 \
-m 1
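Tracking --last-value by hand gets error-prone over time. Sqoop 1.x also has a sqoop job tool that stores the last value in its metastore and updates it after each run; a minimal sketch (the job name tb_tohdfs_inc is made up):
# create a saved job; Sqoop records the new last value after every --exec
# (by default it prompts for the password at exec time; --password-file avoids that)
sqoop job --create tb_tohdfs_inc \
-- import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--incremental append \
--check-column id \
--last-value 0 \
-m 1
# run it (re-run as needed; only rows past the stored last value are imported)
sqoop job --exec tb_tohdfs_inc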
lastmodified
lastmodified mode requires a column that records when each row was last changed, and compares against that timestamp. It can pick up both newly inserted rows and updated rows.
Prepare the data in MySQL:
CREATE TABLE `tb_lastmode` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`word` varchar(200) NOT NULL,
`lastmode` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into tb_lastmode values(null,'hadoop',null);
insert into tb_lastmode values(null,'spark',null);
insert into tb_lastmode values(null,'hbase',null);
Collect it on node3:
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_lastmode \
--target-dir /sqoop/import/test03 \
--fields-terminated-by '\t' \
--incremental lastmodified \
--check-column lastmode \
--last-value '2021-05-06 16:09:32' \
-m 1
When the data changes:
insert into tb_lastmode values(null,'hive',null);
update tb_lastmode set word = 'sqoop' where id = 1;
another incremental import merges the changes:
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_lastmode \
--target-dir /sqoop/import/test03 \
--fields-terminated-by '\t' \
--merge-key id \
--incremental lastmodified \
--check-column lastmode \
--last-value '2021-05-07 16:10:38' \
-m 1
Where:
--merge-key: merge records on the id column (the newest version of each id is kept).
An alternative approach
Following the same pattern used earlier for partitioned tables:
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
-e 'select id,name from tb_tohdfs where id > 12 and $CONDITIONS' \
--delete-target-dir \
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1
With this approach the data from each run has to be stored in its own separate directory, and it still cannot pick up rows changed by UPDATE.
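A sketch of what a daily run might look like with this approach (the create_time column, the date filter, and the per-day directory layout are illustrative assumptions, not part of the original example):
# hypothetical daily pull into a per-day directory; $day could come from a scheduler
day=$(date -d yesterday +%Y-%m-%d)
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
-e "select id,name from tb_tohdfs where date(create_time) = '${day}' and \$CONDITIONS" \
--target-dir /sqoop/import/tb_tohdfs/daystr=${day} \
--fields-terminated-by '\t' \
-m 1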
Sqoop Export
Full export
Preparing the MySQL data
use sqoopTest;
CREATE TABLE `tb_url` (
`id` int(11) NOT NULL,
`url` varchar(200) NOT