数据集成工具的使用---FlinkX 从理论学习到熟练使用
Posted 北慕辰
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据集成工具的使用---FlinkX 从理论学习到熟练使用相关的知识,希望对你有一定的参考价值。
本期与大家分享的是,小北精心整理的大数据学习笔记,数据采集工具FlinkX 的详细介绍,希望对大家能有帮助,喜欢就给点鼓励吧,记得三连哦!欢迎各位大佬评论区指教讨论!
💜🧡💛制作不易,各位大佬们给点鼓励!
🧡💛💚点赞👍 ➕ 收藏⭐ ➕ 关注✅
💛💚💙欢迎各位大佬指教,一键三连走起!往期好文推荐:
🔶🔷数据集成工具的使用(一)—Sqoop 从理论学习到熟练使用
🔶🔷数据集成工具的使用(二)—DataX 从理论学习到熟练使用
🔶🔷数据集成工具的使用(四)—Flume 从理论学习到熟练使用
🔶🔷数据集成工具的使用(五)—Kettle 从理论学习到熟练使用
一、FlinkX 简介
FlinkX 是一个基于 Flink 的数据同步工具。 FlinkX 可以采集静态数据
,如 mysql、HDFS 等,也可以采集实时变化的数据
,如 MySQL binlog、Kafka 等。 同时 FlinkX 也是一个支持所有语法和特性的计算框架原生的 FlinkSql , 并提供了大量的 案例 。 FlinkX 目前包括以下功能
- 大多数插件支持并发读写数据,可以大大提高读写速
- 部分插件支持故障恢复功能,可以从故障位置恢复任务,节省运行时间;
- 关系数据库的源插件支持间隔轮询。 可不断采集变化的数据
- 部分数据库支持开启 Kerberos 安全认证;
- 限制源码插件的读取速度,减少对业务数据库的影响
- 写数据时保存脏数据
- 限制脏数据的最大数量
- 多种运行模式:Local、Standalone、Yarn Session、Yarn Per
- 同步任务支持执行flinksql语法的transformer操作
- sql 任务支持 共享 与 flinkSql 自己的连接器 ;
FlinkX 开源地址: https://github.com/DTStack/flinkx
二、FlinkX 安装
首先,需要安装unzip
yum install unzip
1、上传并解压
unzip flinkx-1.10.zip -d /usr/local/soft/
2、配置环境变量
vim /etc/profile
export FLINKX_HOME=/usr/local/soft/flinkx-1.10
export PATH=$PATH:$FLINKX_HOME/bin
source /etc/profile
3、给bin/flinkx这个文件加上执行权限
chmod a+x flinkx
4、修改配置文件,设置运行端口
vim flinkconf/flink-conf.yaml
web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888
三、FlinkX的简单使用
FlinkX的使用参考文档:
https://github.com/DTStack/flinkx#introduction
1、MySQLToHDFS
配置文件
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?characterEncoding=utf8"
],
"table": [
"student"
]
}
],
"column": [
"*"
],
"customSql": "",
"where": "clazz = '理科二班'",
"splitPk": "",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hdfswriter",
"parameter": {
"path": "hdfs://master:9000/data/flinkx/student",
"defaultFS": "hdfs://master:9000",
"column": [
{
"name": "col1",
"index": 0,
"type": "string"
},
{
"name": "col2",
"index": 1,
"type": "string"
},
{
"name": "col3",
"index": 2,
"type": "string"
},
{
"name": "col4",
"index": 3,
"type": "string"
},
{
"name": "col5",
"index": 4,
"type": "string"
},
{
"name": "col6",
"index": 5,
"type": "string"
}
],
"fieldDelimiter": ",",
"fileType": "text",
"writeMode": "overwrite"
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 1
}
}
}
}
启动任务
flinkx -mode local -job ./scripts/mysqlToHDFS.json -pluginRoot syncplugins/ -flinkconf flinkconf/
监听日志
flinkx 任务启动后,会在执行命令的目录下生成一个nohup.out文件
tail -f nohup.out
通过web界面查看任务运行情况
http://master:8888
2、MySQLToHive
配置文件
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?characterEncoding=utf8"
],
"table": [
"student"
]
}
],
"column": [
"*"
],
"customSql": "",
"where": "clazz = '文科二班'",
"splitPk": "id",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hivewriter",
"parameter": {
"jdbcUrl": "jdbc:hive2://master:10000/testflinkx",
"username": "",
"password": "",
"fileType": "text",
"fieldDelimiter": ",",
"writeMode": "overwrite",
"compress": "",
"charsetName": "UTF-8",
"maxFileSize": 1073741824,
"tablesColumn": "{\\"student\\":[{\\"key\\":\\"id\\",\\"type\\":\\"string\\"},{\\"key\\":\\"name\\",\\"type\\":\\"string\\"},{\\"key\\":\\"age\\",\\"type\\":\\"string\\"}]}",
"defaultFS": "hdfs://master:9000"
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 3
}
}
}
}
在hive中创建flinkx_test数据库,并创建student分区表
create database flinkx_test;
use flinkx_test;
CREATE TABLE `student`(
`id` string,
`name` string,
`age` string)
PARTITIONED BY (
`pt` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
启动hiveserver2
# 第一种方式:
hiveserver2
# 第二种方式:
hive --service hiveserver2
启动任务
flinkx -mode local -job ./scripts/mysqlToHive.json -pluginRoot syncplugins/ -flinkconf flinkconf/
查看日志及运行情况同上
3、 MySQLToHBase
配置文件
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?characterEncoding=utf8"
],
"table": [
"score"
]
}
],
"column": [
"*"
],
"customSql": "",
"splitPk": "student_id",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hbasewriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.property.clientPort": "2181",
"hbase.rootdir": "hdfs://master:9000/hbase",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "master,node1,node2",
"zookeeper.znode.parent": "/hbase"
},
"table": "testFlinkx",
"rowkeyColumn": "$(cf1:student_id)_$(cf1:course_id)",
"column": [
{
"name": "cf1:student_id",
"type": "string"
},
{
"name": "cf1:course_id",
"type": "string"
},
{
"name": "cf1:score",
"type": "string"
}
]
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 3
}
}
}
}
启动hbase 并创建testflinkx表
create 'testFlinkx','cf1'
启动任务
flinkx -mode local -job ./scripts/mysqlToHBase.json -pluginRootsyncplugins/ -flinkconf flinkconf/
查看日志及运行情况同上
4、 MySQLToMySQL
配置文件
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
}
],
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?useSSL=false"
],
"table": [
"student"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/student?useSSL=false",
"table": [
"student2"
]
}
],
"writeMode": "insert",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 1,
"bytes": 0
}
}
}
}
以上是关于数据集成工具的使用---FlinkX 从理论学习到熟练使用的主要内容,如果未能解决你的问题,请参考以下文章