Sqoop Basics

Posted by 杀智勇双全杀



Overview

Unlike Flume, Sqoop is built on MapReduce (which is on its way out) and depends heavily on MapReduce and YARN. It is a purely offline data collection tool and can only serve offline workloads. Offline processing often misses records that were modified in the source business data, so the collected data can be incomplete; this is a serious drawback, and as performance keeps improving, offline processing is bound to be replaced by real-time processing.

Function

It is used to import and export data between RDBMSs such as MySQL and HDFS. The direction is named relative to HDFS: import means MySQL → HDFS, export means HDFS → MySQL.
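
The export direction only appears at the very end of this article, so for contrast here is a rough sketch of its shape (a sketch only: the --export-dir path and the tab delimiter are assumptions, and tb_url is the table prepared in the export section below):

sqoop export \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_url \
--export-dir /sqoop/export/test \
--input-fields-terminated-by '\t' \
-m 1

An export reads the files under --export-dir and inserts each line as a row into the target table.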

Under the hood

The underlying engine is MapReduce. Since the work is mostly ETL-style data movement and cleaning, in most cases the job is a simple map-only MapReduce job with three phases: Input, Map, and Output.

Import phase:

  • Input: DBInputFormat (reads MySQL)
  • Output: TextOutputFormat (writes HDFS)

Export phase:

  • Input: TextInputFormat (reads HDFS)
  • Output: DBOutputFormat (writes MySQL)

Use cases

  • Data synchronization: periodically collect offline data and sync it into the data warehouse
    • Full: collect all of the data every time
    • Incremental: collect only the newest data each time; most syncs are incremental
  • Data migration: move historical data from MySQL, Oracle, and other RDBMSs into HDFS
    • Full: the first load is always a full load

Listing databases

sqoop list-databases --connect jdbc:mysql://node3:3306 --username root --password 123456

After running it:

[root@node3 ~]# sqoop list-databases --connect jdbc:mysql://node3:3306 --username root --password 123456
Warning: /export/server/sqoop-1.4.6-cdh5.14.0/../hbase does not exist! HBase imports will fail.
Please set $HBASE_HOME to the root of your HBase installation.
Warning: /export/server/sqoop-1.4.6-cdh5.14.0/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
Warning: /export/server/sqoop-1.4.6-cdh5.14.0/../zookeeper does not exist! Accumulo imports will fail.
Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation.
21/05/07 21:15:21 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.14.0
21/05/07 21:15:21 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
21/05/07 21:15:21 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
information_schema
hivemetadata
hue
mysql
nev
oozie
performance_schema
scrm
teach

Works nicely.
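
In the same spirit, sqoop list-tables lists the tables of a single database; using -P here so the password is typed at a prompt instead of on the command line, as the warning above suggests:

sqoop list-tables \
--connect jdbc:mysql://node3:3306/mysql \
--username root \
-P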

Sqoop Import

Importing into HDFS

Preparing data in MySQL

create database sqoopTest;
use sqoopTest;

CREATE TABLE `tb_tohdfs` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(100) NOT NULL,
  `age` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into tb_tohdfs values(null,"laoda",18);
insert into tb_tohdfs values(null,"laoer",19);
insert into tb_tohdfs values(null,"laosan",20);
insert into tb_tohdfs values(null,"laosi",21);

Syntax

Check the help:

sqoop import --help

It is comprehensive, but very long:

[root@node3 ~]# sqoop import --help
Warning: /export/server/sqoop-1.4.6-cdh5.14.0/../hbase does not exist! HBase imports will fail.
Please set $HBASE_HOME to the root of your HBase installation.
Warning: /export/server/sqoop-1.4.6-cdh5.14.0/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
Warning: /export/server/sqoop-1.4.6-cdh5.14.0/../zookeeper does not exist! Accumulo imports will fail.
Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation.
21/05/07 21:44:30 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.14.0
usage: sqoop import [GENERIC-ARGS] [TOOL-ARGS]

Common arguments:
   --connect <jdbc-uri>                                       Specify JDBC
                                                              connect
                                                              string
   --connection-manager <class-name>                          Specify
                                                              connection
                                                              manager
                                                              class name
   --connection-param-file <properties-file>                  Specify
                                                              connection
                                                              parameters
                                                              file
   --driver <class-name>                                      Manually
                                                              specify JDBC
                                                              driver class
                                                              to use
   --hadoop-home <hdir>                                       Override
                                                              $HADOOP_MAPR
                                                              ED_HOME_ARG
   --hadoop-mapred-home <dir>                                 Override
                                                              $HADOOP_MAPR
                                                              ED_HOME_ARG
   --help                                                     Print usage
                                                              instructions
   --metadata-transaction-isolation-level <isolationlevel>    Defines the
                                                              transaction
                                                              isolation
                                                              level for
                                                              metadata
                                                              queries. For
                                                              more details
                                                              check
                                                              java.sql.Con
                                                              nection
                                                              javadoc or
                                                              the JDBC
                                                              specificaito
                                                              n
   --oracle-escaping-disabled <boolean>                       Disable the
                                                              escaping
                                                              mechanism of
                                                              the
                                                              Oracle/OraOo
                                                              p connection
                                                              managers
-P                                                            Read
                                                              password
                                                              from console
   --password <password>                                      Set
                                                              authenticati
                                                              on password
   --password-alias <password-alias>                          Credential
                                                              provider
                                                              password
                                                              alias
   --password-file <password-file>                            Set
                                                              authenticati
                                                              on password
                                                              file path
   --relaxed-isolation                                        Use
                                                              read-uncommi
                                                              tted
                                                              isolation
                                                              for imports
   --skip-dist-cache                                          Skip copying
                                                              jars to
                                                              distributed
                                                              cache
   --temporary-rootdir <rootdir>                              Defines the
                                                              temporary
                                                              root
                                                              directory
                                                              for the
                                                              import
   --throw-on-error                                           Rethrow a
                                                              RuntimeExcep
                                                              tion on
                                                              error
                                                              occurred
                                                              during the
                                                              job
   --username <username>                                      Set
                                                              authenticati
                                                              on username
   --verbose                                                  Print more
                                                              information
                                                              while
                                                              working

Import control arguments:
   --append                                                   Imports data
                                                              in append
                                                              mode
   --as-avrodatafile                                          Imports data
                                                              to Avro data
                                                              files
   --as-parquetfile                                           Imports data
                                                              to Parquet
                                                              files
   --as-sequencefile                                          Imports data
                                                              to
                                                              SequenceFile
                                                              s
   --as-textfile                                              Imports data
                                                              as plain
                                                              text
                                                              (default)
   --autoreset-to-one-mapper                                  Reset the
                                                              number of
                                                              mappers to
                                                              one mapper
                                                              if no split
                                                              key
                                                              available
   --boundary-query <statement>                               Set boundary
                                                              query for
                                                              retrieving
                                                              max and min
                                                              value of the
                                                              primary key
   --columns <col,col,col...>                                 Columns to
                                                              import from
                                                              table
   --compression-codec <codec>                                Compression
                                                              codec to use
                                                              for import
   --delete-target-dir                                        Imports data
                                                              in delete
                                                              mode
   --direct                                                   Use direct
                                                              import fast
                                                              path
   --direct-split-size <n>                                    Split the
                                                              input stream
                                                              every 'n'
                                                              bytes when
                                                              importing in
                                                              direct mode
-e,--query <statement>                                        Import
                                                              results of
                                                              SQL
                                                              'statement'
   --fetch-size <n>                                           Set number
                                                              'n' of rows
                                                              to fetch
                                                              from the
                                                              database
                                                              when more
                                                              rows are
                                                              needed
   --inline-lob-limit <n>                                     Set the
                                                              maximum size
                                                              for an
                                                              inline LOB
-m,--num-mappers <n>                                          Use 'n' map
                                                              tasks to
                                                              import in
                                                              parallel
   --mapreduce-job-name <name>                                Set name for
                                                              generated
                                                              mapreduce
                                                              job
   --merge-key <column>                                       Key column
                                                              to use to
                                                              join results
   --split-by <column-name>                                   Column of
                                                              the table
                                                              used to
                                                              split work
                                                              units
   --split-limit <size>                                       Upper Limit
                                                              of rows per
                                                              split for
                                                              split
                                                              columns of
                                                              Date/Time/Ti
                                                              mestamp and
                                                              integer
                                                              types. For
                                                              date or
                                                              timestamp
                                                              fields it is
                                                              calculated
                                                              in seconds.
                                                              split-limit
                                                              should be
                                                              greater than
                                                              0
   --table <table-name>                                       Table to
                                                              read
   --target-dir <dir>                                         HDFS plain
                                                              table
                                                              destination
   --validate                                                 Validate the
                                                              copy using
                                                              the
                                                              configured
                                                              validator
   --validation-failurehandler <validation-failurehandler>    Fully
                                                              qualified
                                                              class name
                                                              for
                                                              ValidationFa
                                                              ilureHandler
   --validation-threshold <validation-threshold>              Fully
                                                              qualified
                                                              class name
                                                              for
                                                              ValidationTh
                                                              reshold
   --validator <validator>                                    Fully
                                                              qualified
                                                              class name
                                                              for the
                                                              Validator
   --warehouse-dir <dir>                                      HDFS parent
                                                              for table
                                                              destination
   --where <where clause>                                     WHERE clause
                                                              to use
                                                              during
                                                              import
-z,--compress                                                 Enable
                                                              compression

Incremental import arguments:
   --check-column <column>        Source column to check for incremental
                                  change
   --incremental <import-type>    Define an incremental import of type
                                  'append' or 'lastmodified'
   --last-value <value>           Last imported value in the incremental
                                  check column

Output line formatting arguments:
   --enclosed-by <char>               Sets a required field enclosing
                                      character
   --escaped-by <char>                Sets the escape character
   --fields-terminated-by <char>      Sets the field separator character
   --lines-terminated-by <char>       Sets the end-of-line character
   --mysql-delimiters                 Uses MySQL's default delimiter set:
                                       fields: ,  lines: \n  escaped-by: \
                                      optionally-enclosed-by: '
   --optionally-enclosed-by <char>    Sets a field enclosing character

Input parsing arguments:
   --input-enclosed-by <char>               Sets a required field encloser
   --input-escaped-by <char>                Sets the input escape
                                            character
   --input-fields-terminated-by <char>      Sets the input field separator
   --input-lines-terminated-by <char>       Sets the input end-of-line
                                            char
   --input-optionally-enclosed-by <char>    Sets a field enclosing
                                            character

Hive arguments:
   --create-hive-table                         Fail if the target hive
                                               table exists
   --hive-database <database-name>             Sets the database name to
                                               use when importing to hive
   --hive-delims-replacement <arg>             Replace Hive record \0x01
                                               and row delimiters (\n\r)
                                               from imported string fields
                                               with user-defined string
   --hive-drop-import-delims                   Drop Hive record \0x01 and
                                               row delimiters (\n\r) from
                                               imported string fields
   --hive-home <dir>                           Override $HIVE_HOME
   --hive-import                               Import tables into Hive
                                               (Uses Hive's default
                                               delimiters if none are
                                               set.)
   --hive-overwrite                            Overwrite existing data in
                                               the Hive table
   --hive-partition-key <partition-key>        Sets the partition key to
                                               use when importing to hive
   --hive-partition-value <partition-value>    Sets the partition value to
                                               use when importing to hive
   --hive-table <table-name>                   Sets the table name to use
                                               when importing to hive
   --map-column-hive <arg>                     Override mapping for
                                               specific column to hive
                                               types.

HBase arguments:
   --column-family <family>    Sets the target column family for the
                               import
   --hbase-bulkload            Enables HBase bulk loading
   --hbase-create-table        If specified, create missing HBase tables
   --hbase-row-key <col>       Specifies which input column to use as the
                               row key
   --hbase-table <table>       Import to <table> in HBase

HCatalog arguments:
   --hcatalog-database <arg>                        HCatalog database name
   --hcatalog-home <hdir>                           Override $HCAT_HOME
   --hcatalog-partition-keys <partition-key>        Sets the partition
                                                    keys to use when
                                                    importing to hive
   --hcatalog-partition-values <partition-value>    Sets the partition
                                                    values to use when
                                                    importing to hive
   --hcatalog-table <arg>                           HCatalog table name
   --hive-home <dir>                                Override $HIVE_HOME
   --hive-partition-key <partition-key>             Sets the partition key
                                                    to use when importing
                                                    to hive
   --hive-partition-value <partition-value>         Sets the partition
                                                    value to use when
                                                    importing to hive
   --map-column-hive <arg>                          Override mapping for
                                                    specific column to
                                                    hive types.

HCatalog import specific options:
   --create-hcatalog-table             Create HCatalog before import
   --drop-and-create-hcatalog-table    Drop and Create HCatalog before
                                       import
   --hcatalog-storage-stanza <arg>     HCatalog storage stanza for table
                                       creation

Accumulo arguments:
   --accumulo-batch-size <size>          Batch size in bytes
   --accumulo-column-family <family>     Sets the target column family for
                                         the import
   --accumulo-create-table               If specified, create missing
                                         Accumulo tables
   --accumulo-instance <instance>        Accumulo instance name.
   --accumulo-max-latency <latency>      Max write latency in milliseconds
   --accumulo-password <password>        Accumulo password.
   --accumulo-row-key <col>              Specifies which input column to
                                         use as the row key
   --accumulo-table <table>              Import to <table> in Accumulo
   --accumulo-user <user>                Accumulo user name.
   --accumulo-visibility <vis>           Visibility token to be applied to
                                         all rows imported
   --accumulo-zookeepers <zookeepers>    Comma-separated list of
                                         zookeepers (host:port)

Code generation arguments:
   --bindir <dir>                             Output directory for
                                              compiled objects
   --class-name <name>                        Sets the generated class
                                              name. This overrides
                                              --package-name. When
                                              combined with --jar-file,
                                              sets the input class.
   --escape-mapping-column-names <boolean>    Disable special characters
                                              escaping in column names
   --input-null-non-string <null-str>         Input null non-string
                                              representation
   --input-null-string <null-str>             Input null string
                                              representation
   --jar-file <file>                          Disable code generation; use
                                              specified jar
   --map-column-java <arg>                    Override mapping for
                                              specific columns to java
                                              types
   --null-non-string <null-str>               Null non-string
                                              representation
   --null-string <null-str>                   Null string representation
   --outdir <dir>                             Output directory for
                                              generated code
   --package-name <name>                      Put auto-generated classes
                                              in this package

Generic Hadoop command-line arguments:
(must preceed any tool-specific arguments)
Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|resourcemanager:port>    specify a ResourceManager
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]


At minimum, you must specify --connect and --table
Arguments to mysqldump and other subprograms may be supplied
after a '--' on the command line.

So the basic form is:

usage: sqoop import [GENERIC-ARGS] [TOOL-ARGS]

Clearly we need to specify the MySQL side (URL, username, password, table) and the HDFS side, i.e. where to write.

A first import

Import the MySQL table tb_tohdfs into the HDFS directory /sqoop/import/test01.

On node3:

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test01

The trailing \ means the statement is not finished and continues on the next line. Check the result:

[root@node3 ~]# hdfs dfs -cat /sqoop/import/test01/par*
1,laoda,18
2,laoer,19
3,laosan,20
4,laosi,21

Looking back at the job output:

21/05/07 21:36:02 INFO db.IntegerSplitter: Split size: 0; Num splits: 4 from: 1 to: 4

Such a tiny table produced 4 splits... that is a lot of MapTasks for so little data.

Tuning the parameters

Import only the id and name columns of tb_tohdfs into the HDFS directory /sqoop/import/test01, separated by tabs:

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--columns id,name \
--delete-target-dir \
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1

Where:

  • -m: the number of MapTasks
  • --fields-terminated-by: the output field separator
  • --columns: which columns to import
  • --delete-target-dir: delete the target directory before importing

After setting the number of MapTasks to 1:

21/05/07 22:04:12 INFO mapreduce.JobSubmitter: number of splits:1

It actually got faster. Open 192.168.88.221:50070 in a browser to check the files:
(screenshot of the HDFS NameNode web UI)
Success.

Importing with a filter

Import the rows of tb_tohdfs with id > 2 into the HDFS directory /sqoop/import/test01:

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--where 'id > 2' \
--delete-target-dir \
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1

Where:
--where: the row filter condition

After running it:

[root@node3 ~]# hdfs dfs -cat /sqoop/import/test01/par*                                                                       
3       laosan  20
4       laosi   21

Filtering rows and importing only some columns

Import only the id and name columns of the rows of tb_tohdfs with id > 2 into /sqoop/import/test01:

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--columns id,name \
--where 'id > 2' \
--delete-target-dir \
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1

Or do the same thing with a SQL query:

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
-e 'select id,name from tb_tohdfs where id > 2 and $CONDITIONS' \
--delete-target-dir \
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1

-e, --query: read data with a SQL statement. Whenever you use a SQL statement this way, the WHERE clause must include $CONDITIONS.
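
Sqoop replaces $CONDITIONS with the boundary conditions it generates for each map task, so that every task reads a different slice of the query result. If you want more than one map task together with --query, you also have to say which column to split on via --split-by; a sketch against the same table (not run here):

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
-e 'select id,name from tb_tohdfs where id > 2 and $CONDITIONS' \
--split-by id \
--delete-target-dir \
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 2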

Importing into Hive

Preparing the Hive table

In beeline:

use default;
create table fromsqoop(
id int,
name string,
age int
);

Direct import

On node3:

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--hive-import \
--hive-database default \
--hive-table fromsqoop \
--fields-terminated-by '\001' \
-m 1

Where:

  • --hive-import: import into a Hive table
  • --hive-database default: which Hive database to import into
  • --hive-table fromsqoop: which Hive table to import into
  • --fields-terminated-by '\001': the field delimiter to write; it must match the Hive table's delimiter

Under the hood this actually runs two steps: first a MapReduce job imports the MySQL data into HDFS (DBInputFormat reads from MySQL, TextOutputFormat writes to HDFS), and then the file it produced on HDFS is loaded into the Hive table with a load command.
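
A quick way to see the end result of those two steps is to look at the table's directory in the Hive warehouse (assuming the default warehouse location /user/hive/warehouse; adjust the path if your installation uses a different one):

hdfs dfs -ls /user/hive/warehouse/fromsqoop
hdfs dfs -cat /user/hive/warehouse/fromsqoop/part*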

HCatalog import

On node3:

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--hcatalog-database default \
--hcatalog-table fromsqoop \
--fields-terminated-by '\001' \
-m 1

This approach is also two steps under the hood: first it fetches the Hive table's metadata, and then it uses the Hive table's directory directly as the MapReduce output.
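
A handy extra on the HCatalog route: if the target Hive table does not exist yet, Sqoop can create it for you and you can choose the storage format. A sketch (the table name fromsqoop_orc and the ORC format are just examples, not part of the steps above):

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--hcatalog-database default \
--hcatalog-table fromsqoop_orc \
--create-hcatalog-table \
--hcatalog-storage-stanza 'stored as orcfile' \
-m 1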

Sqoop Incremental Import

If you ran this every day:

sqoop import --connect jdbc:mysql://node3:3306/sqoopTest --username root --password 123456 --table tb_tohdfs --target-dir /sqoop/import/test02 -m 1

you would pile up a lot of duplicate data (completely meaningless dirty data), waste time and performance re-reading everything, and waste disk space on the redundant copies.

The standard way

Sqoop checks the value of a chosen column and imports any row whose value is greater than the last recorded value:


Incremental import arguments:
   --check-column <column>        Source column to check for incremental
                                  change
   --incremental <import-type>    Define an incremental import of type
                                  'append' or 'lastmodified'
   --last-value <value>           Last imported value in the incremental
                                  check column

Where:

  • --check-column: the column to check for incremental changes
  • --last-value: the value imported last time
  • --incremental: the incremental mode, 'append' or 'lastmodified'

append

There must be a column with auto-incrementing values, and rows are judged by that auto-increment int value (auto_increment columns in MySQL and similar databases can only be integer types). It only picks up new rows added with INSERT; it cannot pick up rows changed with UPDATE, because an UPDATE does not change the auto-increment column.

On node3:

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--check-column id \
--incremental append \
--last-value 1 \
-m 1

After new data is produced:

insert into tb_tohdfs values(null,"laowu",22);
insert into tb_tohdfs values(null,"laoliu",23);
insert into tb_tohdfs values(null,"laoqi",24);
insert into tb_tohdfs values(null,"laoba",25);

An incremental import picks up only the new rows:

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--incremental append \
--check-column id \
--last-value 4 \
-m 1
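
Typing the new --last-value by hand before every run gets old quickly. Sqoop can remember it for you with a saved job; a sketch (the job name tohdfs_append is made up). After each --exec, the stored last value is updated automatically in Sqoop's job metastore:

sqoop job --create tohdfs_append -- import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--incremental append \
--check-column id \
--last-value 0 \
-m 1

sqoop job --exec tohdfs_append

Note that by default a saved job does not store the database password, so --exec prompts for it unless a password file is configured.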

lastmodified

The table must contain a column whose timestamp changes as the data changes, and rows are judged by that modification time. It picks up both newly inserted rows and updated rows.

Prepare the data in MySQL:

CREATE TABLE `tb_lastmode` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `word` varchar(200) NOT NULL,
  `lastmode` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP  ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into tb_lastmode values(null,'hadoop',null);
insert into tb_lastmode values(null,'spark',null);
insert into tb_lastmode values(null,'hbase',null);

Collect it on node3:

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_lastmode \
--target-dir /sqoop/import/test03 \
--fields-terminated-by '\t' \
--incremental lastmodified \
--check-column lastmode \
--last-value '2021-05-06 16:09:32' \
-m 1

When the data changes:

  insert into tb_lastmode values(null,'hive',null);
  update tb_lastmode set word = 'sqoop' where id = 1;

An incremental import captures the changes:

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_lastmode \
--target-dir /sqoop/import/test03 \
--fields-terminated-by '\t' \
--merge-key id \
--incremental lastmodified \
--check-column lastmode \
--last-value '2021-05-07 16:10:38' \
-m 1

Where:
--merge-key: merge the new results with the existing data on the id column.

A special approach

Following the same pattern used earlier for partitioned tables:

sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
-e 'select id,name from tb_tohdfs where id > 12 and $CONDITIONS' \
--delete-target-dir \
--target-dir /sqoop/import/test01 \
--fields-terminated-by '\t' \
-m 1

With this approach, the data imported in each run has to be stored in its own separate directory, and it still cannot incrementally pick up rows changed with UPDATE.
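
A sketch of what "one directory per run" can look like in a daily script (the biz_date variable and the daystr= directory naming are just one possible convention, not something fixed by Sqoop):

biz_date=$(date -d '-1 day' +%Y-%m-%d)
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
-e 'select id,name from tb_tohdfs where id > 12 and $CONDITIONS' \
--target-dir /sqoop/import/test01/daystr=${biz_date} \
--fields-terminated-by '\t' \
-m 1

Since every run writes to a fresh directory, --delete-target-dir is no longer needed.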

Sqoop Export

Sqoop Full Export

Preparing data in MySQL

use sqoopTest;
CREATE TABLE `tb_url` (
  `id` int(11) NOT NULL,
  `url` varchar(200) NOT
