技术干货|如何利用 ChunJun 实现数据实时同步?
Posted 数栈DTinsight
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了技术干货|如何利用 ChunJun 实现数据实时同步?相关的知识,希望对你有一定的参考价值。
实时同步是 ChunJun 的⼀个重要特性,指在数据同步过程中,数据源与⽬标系统之间的数据传输和更新⼏乎在同⼀时间进⾏。
在实时同步场景中我们更加关注源端,当源系统中的数据发⽣变化时,这些变化会⽴即传输并应⽤到⽬标系统,以保证两个系统中的数据保持⼀致。这个特性需要作业运⾏过程中 source 插件不间断地频繁访问源端。在⽣产场景下,对于这类⻓时间运⾏、资源可预估、需要稳定性的作业,我们推荐使⽤ perjob 模式部署。
插件⽀持 JSON 脚本和 SQL 脚本两种配置⽅式,具体的参数配置请参考「ChunJun连接器文档」:https://sourl.cn/vxq6Zp
本文将为大家介绍如何使用 ChunJun 实时同步,以及 ChunJun ⽀持的 RDB 实时采集插件的特性、采集逻辑及其原理,帮助大家更好地理解 ChunJun 与实时同步。
如何使用 ChunJun 实时同步
为了让⼤家能更深⼊了解如何使⽤ ChunJun 做实时同步,我们假设有这样⼀个场景:⼀个电商⽹站希望将其订单数据从 MySQL 数据库实时同步到 HBase 数据库,以便于后续的数据分析和处理。
在这个场景中,我们将使⽤ Kafka 作为中间消息队列,以实现 MySQL 和 HBase 之间的数据同步。这样做的好处是 MySQL 表中变更可以实时同步到 HBase 结果表中,⽽不⽤担⼼历史数据被修改后 HBase 表未被同步。
如果在⼤家的实际应用场景中,不关⼼历史数据是否变更(或者历史数据根本不会变更),且业务表有⼀个递增的主键,那么可以参考本⽂之后的 JDBC-Polling 模式⼀节的内容。
· 数据源组件的部署以及 ChunJun 的部署这⾥不做详细描述
· 案例中的脚本均以 SQL 脚本为例,JSON 脚本也能实现相同功能,但在参数名上可能存在出⼊,使⽤ JSON 的同学可以参考上文 「ChunJun 连接器」⽂档中的参数介绍
采集 MySQL 数据到 Kafka
● 数据准备
⾸先,我们在 Kafka 中创建⼀个名为 order_dml 的 topic,然后在 MySQL 中创建⼀个订单表,并插⼊⼀些测试数据。创建表的 SQL 语句如下:
-- 创建⼀个名为ecommerce_db的数据库,⽤于存储电商⽹站的数据
CREATE DATABASE IF NOT EXISTS ecommerce_db;
USE ecommerce_db;
-- 创建⼀个名为orders的表,⽤于存储订单信息
CREATE TABLE IF NOT EXISTS orders (
id INT AUTO_INCREMENT PRIMARY KEY, -- ⾃增主键
order_id VARCHAR(50) NOT NULL, -- 订单编号,不能为空
user_id INT NOT NULL, -- ⽤户ID,不能为空
product_id INT NOT NULL, -- 产品ID,不能为空
quantity INT NOT NULL, -- 订购数量,不能为空
order_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP -- 订单⽇期,默认值为当前时间
戳,不能为空
);
-- 插⼊⼀些测试数据到orders表
INSERT INTO orders (order_id, user_id, product_id, quantity)
VALUES (\'ORD123\', 1, 101, 2),
(\'ORD124\', 2, 102, 1),
(\'ORD125\', 3, 103, 3),
(\'ORD126\', 1, 104, 1),
(\'ORD127\', 2, 105, 5);
● 使用 Binlog 插件采集数据到 Kafka
为了表示数据的变化类型和更好地处理数据变化,实时采集插件一般会用 RowData(Flink 内部数据结构)中的 RowKind 记录⽇志中的数据事件(insert、delete 等)类型,binlog 插件也⼀样。而当数据被打到 Kafka 中时,RowKind 信息应该怎么处理呢?
这⾥我们就需要⽤到 upsert-kafka-x,upsert-kafka-x 会识别 RowKind。对各类时间的处理逻辑如下:
• insert 数据:序列化后直接打⼊
• delete 数据:只写 key,value 置为 null
• update 数据:分为⼀条 delete 数据和 insert 数据处理,即先根据主键删除原本的数据,再写⼊ update 后的数据
在下⼀步中我们再解释如何将 Kafka 中的数据还原到 HBase 或者其他⽀持 upsert 语义的数据库中,接下来我们来编写 SQL 脚本,实现 MySQL 数据实时采集到 Kafka 中的功能,示例如下:
CREATE TABLE binlog_source (
id int,
order_id STRING,
user_id INT,
product_id int,
quantity int,
order_date TIMESTAMP(3)
) WITH (
\'connector\' = \'binlog-x\',
\'username\' = \'root\',
\'password\' = \'root\',
\'cat\' = \'insert,delete,update\',
\'url\' = \'jdbc:mysql://localhost:3306/ecommerce_db?useSSL=false\',
\'host\' = \'localhost\',
\'port\' = \'3306\',
\'table\' = \'ecommerce_db.orders\',
\'timestamp-format.standard\' = \'SQL\',
\'scan.parallelism\' = \'1\'
);
CREATE TABLE kafka_sink (
id int,
order_id STRING,
user_id INT,
product_id int,
quantity int,
order_date TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
) WITH (
\'connector\' = \'upsert-kafka-x\',
\'topic\' = \'orders\',
\'properties.bootstrap.servers\' = \'localhost:9092\',
\'key.format\' = \'json\',
\'value.format\' = \'json\',
\'value.fields-include\' = \'ALL\',
\'sink.parallelism\' = \'1\'
);
insert into
kafka_sink
select
*
from
binlog_source u;
还原 Kafka 中的数据到 HBase
上述步骤中,我们通过 binlog-x 和 upsert-kafka-x,将 MySQL 中的数据实时采集到了 Kafka 中。解铃还须系铃⼈,我们可以通过 upsert-kafka-x 再去将 Kafka 中的数据解析成带有 upsert 语义的数据。
upsert-kafka-x 作为 source 插件时,会判断 Kafka 中数据的 value 是否为 null,如果 value 为 null 则标记这条数据的 RowKind 为 DELETE,否则将数据的 ROWKIND 标记为 INSERT。
ChunJun的 hbase-x 插件⽬前已经具备了 upsert 语句的能⼒,使⽤ hbase-x 即可将 Kafka 中的数据还原到 hbase中。接下来是 SQL 脚本示例,为了⽅便在 HBase 中查看数据结果,我们将 int 数据 cast 为 string 类型:
CREATE TABLE kafka_source (
id int,
order_id STRING,
user_id INT,
product_id INT,
quantity INT,
order_date TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
\'connector\' = \'upsert-kafka-x\',
\'topic\' = \'orders\',
\'properties.bootstrap.servers\' = \'localhost:9092\',
\'properties.group.id\' = \'test_group\',
\'key.format\' = \'json\',
\'value.format\' = \'json\',
\'scan.parallelism\' = \'1\'
);
CREATE TABLE hbase_sink(
rowkey STRING, order_info ROW < order_id STRING,
user_id STRING,
product_id STRING,
quantity STRING,
order_date STRING >,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH(
-- 这⾥以hbase14为例,如果hbase版本是2.x,我们可以使⽤hbase2-x插件代替
\'connector\' = \'hbase14-x\',
\'zookeeper.quorum\' = \'localhost:2181\',
\'zookeeper.znode.parent\' = \'/hbase\',
\'table-name\' = \'ecommerce_db:orders\',
\'sink.parallelism\' = \'1\'
);
INSERT INTO
hbase_sink
SELECT
cast(id as STRING),
ROW(
cast(order_id as STRING),
cast(user_id as STRING),
cast(product_id as STRING),
cast(quantity as STRING),
cast(order_date as STRING)
)
FROM
kafka_source
Tips:如果我们不需要 Kafka 中间件,也可以使⽤ binlog-x 插件直接对接 hbase-x 插件。
ChunJun 支持的 RDB 实时采集插件
本节主要介绍 ChunJun 的 RDB 实时采集插件的特性、采集逻辑及其原理。
ChunJun 的 RDB 实时采集可以实时监视数据库中的更改,并在发⽣更改时读取数据变化,例如插⼊、更新和删除操作。使⽤ ChunJun 实时采集,我们可以实时获取有关数据库中更改的信息,从⽽能够及时响应这些更改,如此便可以帮助我们更好地管理和利⽤ RDB 数据库中的数据。
并且 ChunJun 提供了故障恢复和断点续传功能来确保数据的完整性。ChunJun 实时采集类插件的⼤致实现步骤如下:
· 连接数据库,确认读取点位,读取点位可以理解为⼀个 offset,如 Binlog 中,指⽇志的⽂件名和⽂件的 position 信息
· 根据读取点位开始读取 redolog,获取其中关于数据变更相关的操作记录
· 根据 tableName、操作事件(如insert、delete、update)等过滤信息过滤出需要的 log ⽇志
· 解析 log ⽇志,解析后的事件信息包括表名、数据库名、操作类型(插⼊、更新或删除)和变更的数据⾏等
· 将解析出来的数据会加⼯为 ChunJun 内部统⼀的 DdlRowData 供下游使⽤
ChunJun ⽬前已⽀持的实时采集 Connector 有:binlog(mysql)、oceanbasecdc、oraclelogminer、sqlservercdc。
Binlog 简介
ChunJun binlog 插件的主要功能是读取 MySQL 的⼆进制⽇志(binlog)⽂件。这些⽂件记录了所有对数据的更改操作,如插⼊、更新和删除等。⽬前,该插件依赖 Canal 组件来读取 MySQL 的 binlog ⽂件。
核⼼操作步骤如下:
• 确认读取点位:在 binlog 插件中,我们可以在脚本的 start 字段中直接指定 journal-name(binlog ⽂件名)和 position(⽂件的特定位置)
• 读取binlog:binlog 插件将⾃身伪装成 MySQL 的 Slave 节点,向 MySQL Master 发送请求,要求将 binlog ⽂件的数据流发送给它
• 故障恢复和断点续传:故障时,插件会记录当前的 binlog 位置信息,从 checkpoint/savepoint 恢复后,我们可以从上次记录的位置继续读取 binlog ⽂件,确保数据变化的完整性
使⽤ binlog 所需的权限在「binlog插件使⽤⽂档」中有详细说明,链接如下:
OracleLogminer 简介
Logminer 插件借助 Oracle 提供的 Logminer ⼯具通过读取视图的⽅式获取 Oracle redolog 中的信息。
核⼼操作步骤如下:
01 定位需读取起始点位(start_scn)
⽬前 logminer ⽀持四种策略指定 StartScn:
· all:从 Oracle 数据库中最早的归档⽇志组开始采集(不建议使⽤)
· current:任务运⾏时的 SCN 号
· time:指定时间点对应的 SCN 号
· scn:直接指定 SCN 号
02 定位需要读取的结束点位(end_scn)
插件根据 start_scn 和 maxLogFileSize(默认5G)获取可加载的 redolog ⽂件列表,end_scn 取这个⽂件列表中最⼤的 scn 值。
03 加载 redo ⽇志到 Logminer
通过⼀个存储过程,将 scn 区间范围内的 redolog 加载到 Logminer ⾥。
04 从视图中读取数据
以 scn > ? 作为 where 条件直接查询 v$logmnr_contents 视图内的信息即可获取 redolog 中的数据。
05 重复1-4步骤,实现不断的读取
如标题。
06 故障恢复和断点续传
在发⽣故障时,插件会保存当前消费的 scn 号,重启时从上次的 scn 号开始读取,确保数据完整性。
• 关于该插件原理的详细介绍请参⻅「Oracle Logminer 实现原理说明⽂档」:
• 使⽤lominer插件的前提条件详⻅「Oracle配置LogMiner」:
SqlServerCDC 简介
SqlServerCDC 插件依赖 SQL Server 的 CDC Agent 服务提供的视图获取 redolog 中的信息。
核⼼操作步骤如下:
01 定位需读取起始点位(from_lsn)
⽬前 SqlserverCDC 仅⽀持直接配置 lsn 号,如果 lsn 号未配置,则取数据库中当前最⼤的 lsn 号为 from_lsn。
02 定位需要读取的结束点位(to_lsn)
SqlserverCDC 插件定期地(可通过 pollInterval 参数指定)获取数据库中的最⼤ lsn 为 end_lsn。
03 从视图中读取数据
查询 Agent 服务提供的视图中 lsn 区间范围内的数据,过滤出需要监听的表及事件类型。
04 重复1-3步骤,实现不断的读取
如标题。
05 故障恢复和断点续传
在发⽣故障时,插件会保存当前消费的 lsn 号。重启时从上次的 lsn 号开始读取,确保数据完整性。
• 关于该插件原理的详细介绍请参⻅「Sqlserver CDC 实现原理说明⽂档」:
• 配置 SqlServer CDC Agent 服务详⻅「Sqlserver 配置 CDC ⽂档」:
OceanBaseCDC 简介
OceanBase 是蚂蚁集团开源的⼀款分布式关系型数据库,它使⽤⼆进制⽇志(binlog)记录数据变更。OceanBaseCDC 的实现依赖于 OceanBase 提供的 LogProxy 服务,LogProxy 提供了基于发布-订阅模型的服务,允许使⽤ OceanBase 的 logclient 订阅特定的 binlog 数据流。
OceanBaseCDC 启动⼀个 Listener 线程。当 logclient 连接到 LogProxy 后,Listener 会订阅经过数据过滤的 binlog,然后将其添加到内部维护的列表中。当收到 COMMIT 信息后,Listener 会将⽇志变更信息传递给⼀个阻塞队列,由主线程消费并将其转换为 ChunJun 内部的 DdlRowData,最终发送到下游。
JDBC-Polling 模式读
JDBC 插件的 polling 读取模式是基于 SQL 语句做数据读取的,相对于基于重做⽇志的实时采集成本更低,但 jdbc 插件做实时同步对业务场景有更⾼的要求:
· 有⼀个数值类型或者时间类型的递增主键
· 不更新历史数据或者不关⼼历史数据是否更新,仅关⼼新数据的获取
实现原理简介
• 设置递增的业务主键作为 polling 模式依赖的增量键
• 在增量读取的过程中,实时记录 increColumn 对应的值(state),作为下⼀次数据读取的起始点位
• 当⼀批数据读取完后,间隔⼀段时间之后依据 state 读取下⼀批数据
polling 依赖部分增量同步的逻辑,关于增量同步的更多介绍可以点击:
如何配置⼀个 jdbc-polling 作业
先介绍⼀下开启 polling 模式需要关注的配置项:
以 MySQL 为例,假设我们有⼀个存储订单信息的历史表,且订单的 order_id 是递增的,我们希望定期地获取这张表的新增数据。
CREATE TABLE order.realtime_order_archive (
order_id INT PRIMARY KEY COMMENT \'订单唯⼀标识\',
customer_id INT COMMENT \'客户唯⼀标识\',
product_id INT COMMENT \'产品唯⼀标识\',
order_date TIMESTAMP COMMENT \'订单⽇期和时间\',
payment_method VARCHAR(255) COMMENT \'⽀付⽅式(信⽤卡、⽀付宝、微信⽀付等)\',
shipping_method VARCHAR(255) COMMENT \'配送⽅式(顺丰速运、圆通速递等)\',
shipping_address VARCHAR(255) COMMENT \'配送地址\',
order_total DECIMAL(10,2) COMMENT \'订单总⾦额\',
discount DECIMAL(10,2) COMMENT \'折扣⾦额\',
order_status VARCHAR(255) COMMENT \'订单状态(已完成、已取消等)\'
);
我们可以这样配置 json 脚本的 reader 信息。
"name": "mysqlreader",
"parameter":
"column" : [
"*" //这⾥假设我们读取所有字段,可以填写‘*’
],
"increColumn": "id",
"polling": true,
"pollingInterval": 3000,
"username": "username",
"password": "password",
"connection": [
"jdbcUrl": [
"jdbc:mysql://ip:3306/liuliu?useSSL=false"
],
"schema":"order",
"table": [
"realtime_order_archive" ]
]
《数栈产品白皮书》:https://fs80.cn/cw0iw1
《数据治理行业实践白皮书》下载地址:https://fs80.cn/380a4b
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szbky
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术qun」,交流最新开源技术信息,qun号码:30537511,项目地址:https://github.com/DTStack
干货 | 快速实现数据导入及简单DCS的实现
干货 | 快速实现数据导入及简单DCS的实现
原创:?赵琦?京东云开发者社区??4月18日
对于多数用户而言,在利用云计算的大数据服务时首先要面临的一个问题就是如何将已有存量数据快捷的导入到大数据仓库中。本文将演示如何基于京东云数据计算服务平台,简单、快速地将数据导入数据计算服务。
我们通常说的大数据平台主要包括三部分:数据相关的产品和技术、数据资产、数据管理。京东云数据计算服务(Data Computing Service,简称:DCS)是一个全托管、低使用成本的云上数据仓库服务。通过数据工厂,可轻松实现云上各数据源(包括对象存储、云数据库、数据仓库等)间、本地数据源与云数据源间的多种不同数据源的数据同步,实现多源数据分析与管理。
在数据工厂服务中可以创建同步任务来搬运数据,并按照指定的调度策略(每天、每周、每月)运行。该模块提供任务监控和告警功能,用户可以通过任务执行的明细日志和告警历史,轻松查明问题所在;同时,提供全面托管的工作流服务,支持图形化设计数据分析。以工作流任务的方式实现对数据的处理和相互依赖,帮助用户快速构建数据处理分析作业并周期性地执行。
下面会以MySQL数据库为例说明如何利用京东云数据工厂进行数据采集和DTS数据库之间的数据同步。数据工厂支持常见RDS数据库,如MySQL、SQLServer、Oracle、DB2和NoSQL数据库,也支持从OSS、FTP站点及Elastic Search等。
下图是数据工厂支持的数据源种类:
下面会演示以MySQL数据库为例,如何利用数据工厂进行数据采集,以及如何利用数据工厂作为DTS在两个数据源之间进行数据迁移。
为了方便测试,首先我们要创建数据库表单并灌入测试数据。为了测试方便,提前创建了一台CentOS 7.4云主机作为模拟客户端对数据库进行访问。
一、核心概念
数据集
在数据工厂服务中的数据集,是指由同步任务时需指定的数据源端或数据目标端的不同数据存储实例。因此,在创建同步任务之前,你必须先连接数据集。同一个数据集,可以是多个同步任务的数据源端或者数据目标端。
数据集的连通性
在用户创建数据集连接时,为检验数据集成服务能否连接成功,需要用户更具根据不同的数据集类型,填入相应的值用于连通性校验。数据集连接成功,是数据同步任务成功的前提。
同步任务
同步任务,是用户使用数据集成服务的最小单元。每一个同步任务需要用户配置数据源端、数据目标端以及相应的同步策略(如脏数据的处理等)。
工作流
工作流以图形化设计任务的方式实现对数据的处理和相互依赖。
开始实战
一、准备测试数据源
首先创建模拟数据环境。本例采用京东云的RDS MySQL 8.0数据库服务作为数据源,创建时可指定数据库名字为Testdb。数据库创建完成后要开启外网访问,数据工厂可以通过公网IP或域名对数据库进行访问,详细域名可以在数据库的详情页中找到。
创建MySQL数据库可以通过图形界面按提示填写必要信息开通,这里不赘述。需要提醒的是数据库开通后默认不允许外网访问,要点击开启外网访问,并记住默认端口3306。
对MySQL访问可以通过图形界面访问或通过客户端访问。当然也可以通过其他支持MySQL的图形化客户端进行访问。
本例使用CentOS 系统作为客户端访问,如未安装客户端可以使用Yum命令安装 MySQL。顺利安装可以看到如下提示。
1[[email protected]?~]#?yum?install?mysql 2Loaded?plugins:?fastestmirror,?langpacks 3Loading?mirror?speeds?from?cached?hostfile 4base????????????????????????????????|?3.6?kB?????00:00????? 5epel????????????????????????????????|?4.7?kB?????00:00????? 6extras??????????????????????????????|?3.4?kB?????00:00????? 7updates?????????????????????????????|?3.4?kB?????00:00????? 8(1/2):?epel/x86_64/updateinfo?????????|?986?kB???00:00????? 9(2/2):?epel/x86_64/primary_db?????????|?6.7?MB???00:00????? 10Resolving?Dependencies 11-->?Running?transaction?check 12--->?Package?mariadb.x86_64?1:5.5.60-1.el7_5?will?be?installed 13-->?Processing?Dependency:?mariadb-libs(x86-64)?=?1:5.5.60-1.el7_5?for?package:?1:mariadb-5.5.60-1.el7_5.x86_64 14-->?Running?transaction?check 15--->?Package?mariadb-libs.x86_64?1:5.5.56-2.el7?will?be?updated 16--->?Package?mariadb-libs.x86_64?1:5.5.60-1.el7_5?will?be?an?update 17-->?Finished?Dependency?Resolution 18 19Dependencies?Resolved 20 21=========================================================== 22?Package????????Arch?????Version??????????????Repository 23??????????????????????????????????????????????????????Size 24=========================================================== 25Installing: 26?mariadb????????x86_64???1:5.5.60-1.el7_5?????base???8.9?M 27Updating?for?dependencies: 28?mariadb-libs???x86_64???1:5.5.60-1.el7_5?????base???758?k 29 30Transaction?Summary 31=========================================================== 32Install??1?Package 33Upgrade?????????????(?1?Dependent?package) 34 35Total?download?size:?9.6?M 36Is?this?ok?[y/d/N]:?y 37Downloading?packages: 38Delta?RPMs?disabled?because?/usr/bin/applydeltarpm?not?installed. 39(1/2):?mariadb-libs-5.5.60-1.el7_5.x8?|?758?kB???00:00????? 40(2/2):?mariadb-5.5.60-1.el7_5.x86_64.?|?8.9?MB???00:00????? 41----------------------------------------------------------- 42Total??????????????????????????11?MB/s?|?9.6?MB??00:00????? 43Running?transaction?check 44Running?transaction?test 45Transaction?test?succeeded 46Running?transaction 47??Updating???:?1:mariadb-libs-5.5.60-1.el7_5.x86_64????1/3? 48??Installing?:?1:mariadb-5.5.60-1.el7_5.x86_64?????????2/3? 49??Cleanup????:?1:mariadb-libs-5.5.56-2.el7.x86_64??????3/3? 50??Verifying??:?1:mariadb-libs-5.5.60-1.el7_5.x86_64????1/3? 51??Verifying??:?1:mariadb-5.5.60-1.el7_5.x86_64?????????2/3? 52??Verifying??:?1:mariadb-libs-5.5.56-2.el7.x86_64??????3/3? 53 54Installed: 55??mariadb.x86_64?1:5.5.60-1.el7_5?????????????????????????? 56 57Dependency?Updated: 58??mariadb-libs.x86_64?1:5.5.60-1.el7_5????????????????????? 59 60Complete!
安装后执行MySQL命令,测试一下是否可以链接数据库,客户端访问命令格式是MySQL?-h主机地址 -u用户名 -p用户密码
主机地址使用MySQL数据库的外部域名。从连接数据库到创建表单的详细执行过程如下:
1、验证是否可以正常连接数据库
1[[email protected]?~]#?mysql?-h?mysql-cn-north-1-aed0e558da5e4877.public.jcloud.com?-P3306?-umysqlxxx?–pPasswordxxx
如可以正常连接,可以新开一个窗口创建SQL脚本用于数据库的创建和测试数据插入操作,也可以提前制作好并上传到客户端。
创建数据表格,创建一个测试数据库和测试表便于测试。选择合适目录创建SQL脚本文件,可以用vi ctable.sql
?创建,也可以用其他文本编辑工具制作,脚本内容如下:
1[[email protected]?~]#?cat?ctable.sql? 2USE?testdb; 3DROP?TABLE?IF?EXISTS?`sqltest`; 4CREATE?TABLE?`sqltest`?( 5????`id`?int(10)?unsigned?NOT?NULL?AUTO_INCREMENT, 6????`user_id`?varchar(20)?NOT?NULL?DEFAULT?‘‘, 7????`vote_num`?int(10)?unsigned?NOT?NULL?DEFAULT?‘0‘, 8????`group_id`?int(10)?unsigned?NOT?NULL?DEFAULT?‘0‘, 9????`status`?tinyint(2)?unsigned?NOT?NULL?DEFAULT?‘1‘, 10????`create_time`?datetime?NOT?NULL?DEFAULT?‘0000-00-00?00:00:00‘, 11????PRIMARY?KEY?(`id`), 12????KEY?`index_user_id`?(`user_id`)?USING?HASH 13)?ENGINE=InnoDB?AUTO_INCREMENT=1?DEFAULT?CHARSET=utf8;
2、创建临时数据产生脚本文件,vi adddb.sql
1[[email protected]?~]#?cat?adddb.sql 2DELIMITER?// -- 修改MySQL delimiter:‘//‘ 3DROP?FUNCTION?IF?EXISTS?`rand_string`?// 4SET?NAMES?utf8?// 5CREATE?FUNCTION?`rand_string`?(n?INT)?RETURNS?VARCHAR(255)?CHARSET?‘utf8‘ 6BEGIN? 7????DECLARE?char_str?varchar(100)?DEFAULT?‘abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789‘; 8????DECLARE?return_str?varchar(255)?DEFAULT?‘‘; 9????DECLARE?i?INT?DEFAULT?0; 10????WHILE?i?-- 创建插入数据的存储过程
1DROP?PROCEDURE?IF?EXISTS?`adddb`?// 2CREATE?PROCEDURE?`adddb`(IN?n?INT) 3BEGIN 4????DECLARE?i?INT?DEFAULT?1; 5????DECLARE?vote_num?INT?DEFAULT?0; 6????DECLARE?group_id?INT?DEFAULT?0; 7????DECLARE?status?TINYINT?DEFAULT?1; 8????WHILE?i?-- 改回默认的?
MySQL delimiter:‘;‘
3、登陆数据库执行ctable.sql和adddb.sql脚本
执行命令:
1MySQL?[testdb]>?source?/root/ctable.sql; 2MySQL?[testdb]>?source?/root/adddb.sql;增加100条数据
1MySQL?[testdb]>?call?adddb(100);通过调整adddb(要增加数字)参数的数字,也可以增加1000条如adddb(1000)。
至此我们有了数据源的测试环境,接下来可以开始利用数据工厂进行数据同步。
二、利用数据工厂进行源数据采集
选择大数据与分析的数据工厂菜单。在连接管理中添加连接,如下:
建立连接时建议点击连接测试按钮先进行测试数据库连接,不能连接时请检查域名、端口、用户名密码是否正确,数据库是否允许外网访问。
建立好数据库连接后,就可以进行数据同步工作了。数据同步工作可以在数据同步中单独建立任务设置,也可以在工作流中通过数据集成选项进行设置。数据集成设置后会自动生成数据同步任务。
调度策略可以选择手工执行、周期调度和单次运行三种模式,也可以直接选择单次运行。
执行完毕后在数据计算服务中就同步产生了数据。
上述操作可以用工作流的形式实现,衔接更为复杂的Spark计算脚本。
成功执行后可以在运维中心查看执行情况,在实例列表中的画图试布中看到执行节点变为绿色。
通过以上建立数据同步任务和工作流两种形式都能实现数据源的数据获取,数据获取后就可以直接使用大数据服务进行数据处理了。在大数据与分析菜单下选择数据计算服务管理。默认用自己用户名/PIN(本例用户名是jdc-14)为实例名,建立了Default HIVE INSTANCE。
数据的库表管理下可以看到刚刚新建的库MySQLdb和表SQLTest,点击进入SQLTest表名可以看到更详细的表信息如图。
可以基于获取的大数据信息在数据计算服务中进行任务开发,任务开发可以使用SQL或开发脚本对数据进行计算。
三、DCS大数据导出
可以利用这个能力把数据工厂当作简单DTS工具,把数据传给目的数据库。本例在京东云建立一个MySQL目的数据库,把SQLTest同步给目的数据库,实现两个数据库的数据同步。
准备好或新建立目的MySQL数据库?
destmysqldb
,将数据计算服务的MySQLdb同步给目的数据库destmysqldb
。在数据工厂菜单下,选择连接管理,新建到
destmysqldb
的连接。新建同步任务,将数据计算服务的数据同步给
destmysqldb
数据库,任务名称synctodest。选择数据源端要选择大数据的数据计算服务,数据库名为mysqldb数据表名为sqltest,表的数据可以预览,避免出错。在传输数据前要在数据库中事先建立空表结构,执行文章开头的
ctable.sql
建立表SQLTest,数据插入时要选择目的表名。执行完毕后可以在destmysqldb中确认结果,通过select count(*) from sqltest 可以确认数据已经成功导入。
可以确认利用数据工厂作为简单DTS工具进行源数据库数据同步到目的数据库,实战成功!
以上是关于技术干货|如何利用 ChunJun 实现数据实时同步?的主要内容,如果未能解决你的问题,请参考以下文章
袋鼠云批流一体分布式同步引擎ChunJun(原FlinkX)的前世今生
DTMO直播预告丨ChunJun 2022年开源规划&支持异构数据源