如何使用StreamSets从MySQL增量更新数据到Hive

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用StreamSets从MySQL增量更新数据到Hive相关的知识,希望对你有一定的参考价值。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。


Fayson的github:https://github.com/fayson/cdhproject


提示:代码块部分可以左右滑动查看噢


1.文档编写目的




在前面Fayson介绍了《​​如何在CDH中安装和使用StreamSets​​》,通过StreamSets实现数据采集,在实际生产中需要实时捕获mysql、Oracle等其他数据源的变化数据(简称CDC)将变化数据实时的写入大数据平台的Hive、HDFS、HBase、Solr、Elasticserach等。本篇文章主要介绍如何使用使用StreamSets通过JDBC的方式实时抽取增量数据到Hive。


StreamSets实现的流程如下:


如何使用StreamSets从MySQL增量更新数据到Hive_数据


  • 内容概述

1.环境准备

2.创建StreamSets的Pipeline流程

3.Pipeline流程测试


  • 测试环境

1.StreamSets版本为3.1.2.0

2.CM和CDH版本为5.13.1

3.MariaDB版本为5.5.44


2.环境准备




1.准备测试表和数据


[root@cdh4 ~]# mysql -uroot -p
Enter password:
MariaDB [(none)]> use test;
MariaDB [test]> create table test_to_hive (s1 varchar(20),s2 varchar(20));
MariaDB [test]> insert into test_to_hive values ("1","aaa");
MariaDB [test]> insert into test_to_hive values ("2","bbb");
MariaDB [test]> select * from test_to_hive;

(可左右滑动)


如何使用StreamSets从MySQL增量更新数据到Hive_数据_02


3.创建StreamSets的Pipline




1.创建新的管道流


如何使用StreamSets从MySQL增量更新数据到Hive_数据_03


配置错误日志输入路径,这里配置到本地的/tmp/sdctest(需要自己创建)目录下


如何使用StreamSets从MySQL增量更新数据到Hive_hive_04

如何使用StreamSets从MySQL增量更新数据到Hive_hive_05

2.添加JDBC查询者


如何使用StreamSets从MySQL增量更新数据到Hive_数据_06


如何使用StreamSets从MySQL增量更新数据到Hive_cloudera_07

如何使用StreamSets从MySQL增量更新数据到Hive_cloudera_08

如何使用StreamSets从MySQL增量更新数据到Hive_hive_09


3.执行预览检查


如何使用StreamSets从MySQL增量更新数据到Hive_cloudera_10


查看结果如下


如何使用StreamSets从MySQL增量更新数据到Hive_cloudera_11


4.添加Hive Metadata 

将JDBC 链接到 Hive Metadata 配置hive 的JDBC URL


如何使用StreamSets从MySQL增量更新数据到Hive_cloudera_12


如何使用StreamSets从MySQL增量更新数据到Hive_cloudera_13


配置数据库和要生成的表名,这里我们没有分区,删掉分区


如何使用StreamSets从MySQL增量更新数据到Hive_数据_14


选择Avro 格式


如何使用StreamSets从MySQL增量更新数据到Hive_hive_15


5.将Hive Metadata 输出到 HiveMetastore

将Hive Metadata的 Metadata 链接到Hive Metastore


如何使用StreamSets从MySQL增量更新数据到Hive_hive_16


修改配置


如何使用StreamSets从MySQL增量更新数据到Hive_cloudera_17


6.将Hive Metadata的data 输出到HDFS 上

将Hive Metadata的 data链接到Hadoop FS 1


如何使用StreamSets从MySQL增量更新数据到Hive_cloudera_18


如何使用StreamSets从MySQL增量更新数据到Hive_数据_19

如何使用StreamSets从MySQL增量更新数据到Hive_hive_20

如何使用StreamSets从MySQL增量更新数据到Hive_数据_21


7.统一选择包版本

这里测试环境是CDH 5.13

HiveMetadata


如何使用StreamSets从MySQL增量更新数据到Hive_hive_22


Hadoop FS


如何使用StreamSets从MySQL增量更新数据到Hive_数据_23


Hive Metastore


如何使用StreamSets从MySQL增量更新数据到Hive_hive_24


8.校验并执行

点击校验,返回成功后点击执行


如何使用StreamSets从MySQL增量更新数据到Hive_hive_25


执行后可以看到有2条数据输入和输出,这与我们测试数据库的数据相符合


如何使用StreamSets从MySQL增量更新数据到Hive_数据_26


去HUE 页面查看hive 表中的数据,发现已经更新进来


如何使用StreamSets从MySQL增量更新数据到Hive_数据_27


4.Pipeline流程测试




1.去mysql 中增加数据并查看


如何使用StreamSets从MySQL增量更新数据到Hive_hive_28


查看管道流信息发现输入输出数量变成了4


如何使用StreamSets从MySQL增量更新数据到Hive_cloudera_29


去HUE 中查看hive 表的数据,跟mysql 中同步,说明增量更新成功


如何使用StreamSets从MySQL增量更新数据到Hive_hive_30


5.常见问题




1.校验的时候找不到 hive.conf.HiveConf

出现如下异常,说明包不对,参考 2.8步骤重置配置


如何使用StreamSets从MySQL增量更新数据到Hive_数据_31


2.JDBC 链接不通

新建文件夹/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-3.0.0.0/sdc-extras,

并赋予给用户sdc


mkdir /opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-3.0.0.0/sdc-extras
chown sdc:sdc /opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-3.0.0.0/sdc-extras

(可左右滑动)


如何使用StreamSets从MySQL增量更新数据到Hive_数据_32


上传JDBC驱动


如何使用StreamSets从MySQL增量更新数据到Hive_hive_33


上传成功后如下:


如何使用StreamSets从MySQL增量更新数据到Hive_数据_34


在CM中配置StreamSets包的路径


export STREAMSETS_LIBRARIES_EXTRA_DIR="/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR/sdc-extras/"

(可左右滑动)


如何使用StreamSets从MySQL增量更新数据到Hive_数据_35


如果存在权限问题还需要配置安全策略给该目录授权:


grant codebase "file:///opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR/sdc-extras/-" 
permission java.security.AllPermission;
;

(可左右滑动)


如何使用StreamSets从MySQL增量更新数据到Hive_cloudera_36



提示:代码块部分可以左右滑动查看噢


为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。



推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

如何使用StreamSets从MySQL增量更新数据到Hive_cloudera_37

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操


以上是关于如何使用StreamSets从MySQL增量更新数据到Hive的主要内容,如果未能解决你的问题,请参考以下文章

StreamSet的环境的初始化

Hive 中的增量更新

MySQL 从分配自动增量 ID 的表插入并更新第三个表中的 FK

如何使用 Sqoop 从 MySQL 增量导入到 Hive?

关于streamsets的相关问题总结

如何在 Jython Evaluator 中获取 StreamSets 记录字段类型