如何使用StreamSets实现MySQL中变化数据实时写入Kudu

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用StreamSets实现MySQL中变化数据实时写入Kudu相关的知识,希望对你有一定的参考价值。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。


Fayson的github:https://github.com/fayson/cdhproject


提示:代码块部分可以左右滑动查看噢


1.文档编写目的




在前面Fayson介绍了《​​如何在CDH中安装和使用StreamSets​​​》和《​​如何使用StreamSets从MySQL增量更新数据到Hive​​​》,通过StreamSets实现数据采集,在实际生产中需要实时捕获mysql、Oracle等其他数据源的变化数据(简称CDC)将变化数据实时的写入大数据平台的Hive、HDFS、HBase、Solr、Elasticserach等。在《​​如何使用StreamSets从MySQL增量更新数据到Hive​​》中,使用受限于表需要主键或者更新字段,我们在本篇文章主要介绍如何将MySQL Binary Log作为StreamSets的源,来实时捕获MySQL变化数据并将变化数据存入Kudu。


StreamSets实现的流程如下:


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据


  • 内容概述

1.环境准备

2.创建StreamSets的Pipeline流程

3.Pipeline流程测试

4.总结


  • 测试环境

1.StreamSets版本为3.1.2.0

2.CM和CDH版本为5.13.1

3.MariaDB版本为5.5.56


2.环境准备




1.开启MariaDB的Binlog日志

修改/etc/my.conf文件,在配置文件[mysqld]下增加如下配置



server-id=1
log-bin=mysql-bin
binlog_format=ROW

(可左右滑动)


注意:MySQL Binlog支持多种数据更新格式包括RowStatementmixRowStatement的混合),这里建议使用Row模式的Binlog格式,可以更加方便实时的反应行级别的数据变化。


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_02


修改完MariaDB的配置后重启服务。


[root@ip-172-31-16-68 ~]# systemctl restart mariadb
[root@ip-172-31-16-68 ~]# systemctl status mariadb

(可左右滑动)


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_03


登录MariaDB创建同步账号


GRANT ALL on maxwell.* to maxwell@% identified by 123456;
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to maxwell@%;
FLUSH PRIVILEGES;

(可左右滑动)


2.StreamSets安装MySQL驱动

将MySQL的JDBC驱动拷贝至

/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib目录


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_04


3.在MariaDB数据库中创建测试表


create database test;
create table cdc_test (
id int,
name varchar(32)
);

(可左右滑动)


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_05


4.使用Hue创建Kudu表


create table cdc_test (
id int,
name String,
primary key(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;

(可左右滑动)


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_06


3.创建StreamSets的Pipline




1.登录StreamSets,创建一个新的Pipline


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_07


2.选择Origins类别,搜索MySQL Binary Log


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_08


配置MySQL Binary Log


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_09


配置MySQL信息


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_10


配置同步账号信息


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_11


高级配置,根据自己的需要进行配置


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_12


到此MySQL Binary Log的配置完成。


3.添加表过滤的Stream Selector


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_13


Stream Selector基本配置


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_14


配置分流条件


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_15


4.添加插入类型分流的Stream Selector


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_16


Stream Selector基本配置


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_17


配置分流条件


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_18


5.添加处理Delete类型日志的javascript Evaluator

该JavaScript Evaluator主要用于解析DELETE类型的Binary Log 日志


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_19


配置基本属性


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_20


配置JavaScript脚本,脚本如下:


for(var i = 0; i < records.length; i++) 
try
var newRecord = sdcFunctions.createRecord(true);
newRecord.value = records[i].value[OldData];
newRecord.value.Type = records[i].value[Type];
newRecord.value.Database = records[i].value[Database];
newRecord.value.Table = records[i].value[Table];
log.info(records[i].value[Type])
output.write(newRecord);
catch (e)
// Send record to error
error.write(records[i], e);

(可左右滑动)


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_21


6.添加处理INSRET和UPDATE类型日志的JavaScript Evaluator

该JavaScript Evaluator主要用于解析INSERT和UPDATE类型的日志


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_22


配置基本属性


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_23


配置JavaScript脚本,脚本如下:


for(var i = 0; i < records.length; i++) 
try
var newRecord = sdcFunctions.createRecord(true);
newRecord.value = records[i].value[Data];
newRecord.value.Type = records[i].value[Type];
newRecord.value.Database = records[i].value[Database];
newRecord.value.Table = records[i].value[Table];
log.info(records[i].value[Type])
output.write(newRecord);
catch (e)
// Send record to error
error.write(records[i], e);

(可左右滑动)


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_24


7.为JavaScript Evaluator-DELETE添加Kudu


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_25


配置Kudu基本属性


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_26


配置Kudu环境


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_27


Kudu的高级配置,Fayson这里使用的是默认配置


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_28


8.为JavaScript Evaluator-UPSERT添加Kudu


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_29


配置基础属性


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_30


配置Kudu环境


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_31


Kudu高级配置


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_32


9.流程创建完成后,启动该Pipelines


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_33

如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_34


4.Pipeline流程测试




1.登录MariaDB数据库,向cdc_test表中插入数据


insert into cdc_test values(1, fayson);

(可左右滑动)


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_35


查看StreamSets的Pipeline实时状态


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_36


可以看到Kudu-Upsert成功的处理了一条数据


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_37


使用Hue查看Kudu表数据


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_38


数据成功的插入到Kudu的cdc_test表中。

2.登录MariaDB数据库修改cdc_test表中数据


update cdc_test set name=fayson-update where id=1;

(可左右滑动)


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_39


查看StreamSets的Pipeline实时状态


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_40


可以看到Kudu-Upsert成功处理了两条数据,这两条数据分别是INSERT和UPDATE


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_41


使用Hue查看Kudu的cdc_test表


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_42


3.登录MariaDB数据,删除cdc_test表中数据


delete from cdc_test where id=1;

(可左右滑动)


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_43


查看StreamSets的Pipeline实时状态


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_44


可以看到Kudu-Delete成功处理一条日志


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_45


使用Hue查看Kudu的cdc_test表,id为1的数据已不存在


如何使用StreamSets实现MySQL中变化数据实时写入Kudu_数据_46


5.总结




  • 实现MySQL CDC的前提是需要开启MySQL的Binary Log日志,并且需要创建复制账号,SreamSets中MySQL-Binary Log实际充当的为MySQL的一个Slave。
  • 向Kudu实时写入数据的前提是Kudu的表已存在,否则无法正常写入数据。
  • JavaScript脚本需要注意在解析每一条Record是需要使用其内置的Function,在示例中Fayson将MySQL Binary Log复杂的JSON数据解析重组为简单的Map对象,这里就省去了Kudu入库时“Field to Column Mapping”的映射,需要去确保组装的Map数据中Key与Kudu表中的column字段一致。
  • 在Kudu插入数据时指定Kudu表名需要注意,如果使用Impala创建的表,则需要加上impala的前缀格式impala:<database>:<table>。



提示:代码块部分可以左右滑动查看噢


为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。



推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

如何使用StreamSets实现MySQL中变化数据实时写入Kudu_mysql_47

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操


以上是关于如何使用StreamSets实现MySQL中变化数据实时写入Kudu的主要内容,如果未能解决你的问题,请参考以下文章

使用StreamSets提供接口 实现零代码微服务

使用StreamSets提供接口 实现零代码微服务

使用StreamSets提供接口 实现零代码微服务

使用StreamSets提供接口 实现零代码微服务

使用StreamSets提供接口 实现零代码微服务

如何在 Jython Evaluator 中获取 StreamSets 记录字段类型