FlinkX快速开始(一)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkX快速开始(一)相关的知识,希望对你有一定的参考价值。

参考技术A 链接地址: FlinkX

FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,比如mysql,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等

1、前置
需要安装maven、java8、配置好github相关参数

2、Fork FlinX项目到自己的仓库中

2、Clone项目到本地
git clone https://github.com/liukunyuan/flinkx.git

3、安装额外的jar包
1)、cd flinkx/bin
2)、执行sh ./install_jars.sh(windows执行install_jars.bat脚本)

4、打包
1)、回到flinkx目录:cd ..
2)、执行打包命令:mvn clean package -Dmaven.test.skip=true

1、配置flink conf文件(暂时不需要安装flink)
1)、进入flinkconf目录
cd flinkconf
2)、修改flink-conf.yaml文件添加一行
rest.bind-port: 8888

2、配置mysqltomysql的json文件,路径:/Users/jack/Documents/jack-project/flinkx/flinkconf/mysql2mysql.json

3、运行任务

4、查看监控网页和log.txt文件: http://localhost:8888/

1.34.FlinkX工作原理快速起步|1.35.Flink资料

1.34.FlinkX
1.34.1.什么是FlinkX
1.34.2.工作原理
1.34.3.快速起步
1.34.3.1.运行模式
1.34.3.2.执行环境
1.34.3.3.打包
1.34.3.4.启动
1.34.3.4.1.命令行参数选项
1.34.3.4.2.启动数据同步
1.34.4.数据同步任务模板
1.34.4.1.setting
1.34.4.1.1.speed
1.34.4.1.2.errorLimit
1.34.4.1.3.dirty
1.34.4.1.4.restore
1.34.4.2.content
1.34.4.3.数据同步任务例子
1.35.Flink资料

1.34.FlinkX

1.34.1.什么是FlinkX

FlinkX是在是袋鼠云内部广泛使用的基于flink的分布式离线数据同步框架,实现了多种异构数据源之间高效的数据迁移。

不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

1.34.2.工作原理

在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行,工作原理如下图:

1.34.3.快速起步

1.34.3.1.运行模式

单击模式:对应Flink集群的单机模式
standalone模式:对应Flink集群的分布式模式
yarn模式:对应Flink集群的yarn模式

1.34.3.2.执行环境

Java: JDK8及以上
Flink集群: 1.4及以上(单机模式不需要安装Flink集群)
操作系统:理论上不限,但是目前只编写了shell启动脚本,用户可以可以参考shell脚本编写适合特定操作系统的启动脚本。

1.34.3.3.打包

进入项目根目录,使用maven打包:

mvn clean package -Dmaven.test.skip

打包结束后,项目根目录下会产生bin目录和plugins目录,其中bin目录包含FlinkX的启动脚本,plugins目录下存放编译好的数据同步插件包。

1.34.3.4.启动

1.34.3.4.1.命令行参数选项


1.34.3.4.2.启动数据同步

以本地模式启动数据同步任务

bin/flinkx -mode local -job /Users/softfly/company/flink-data-transfer/jobs/task_to_run.json -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -confProp ""flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*

以standalone模式启动数据同步任务

bin/flinkx -mode standalone -job /Users/softfly/company/flink-data-transfer/jobs/oracle_to_oracle.json  -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -flinkconf /hadoop/flink-1.4.0/conf -confProp ""flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*

以yarn模式启动数据同步任务

bin/flinkx -mode yarn -job /Users/softfly/company/flinkx/jobs/mysql_to_mysql.json  -pluginRoot /opt/dtstack/flinkplugin/syncplugin -flinkconf /opt/dtstack/myconf/conf -yarnconf /opt/dtstack/myconf/hadoop -confProp ""flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*

1.34.4.数据同步任务模板

从最高空俯视,一个数据同步的构成很简单,如下:


    "job": 
        "setting": ...,
        "content": [...]
    

数据同步任务包括一个job元素,而这个元素包括setting和content两部分。
setting: 用于配置限速、错误控制和脏数据管理。
content: 用于配置具体任务信息,包括从哪里来(Reader插件信息),到哪里去(Writer插件信息)

1.34.4.1.setting

"setting": 
    "speed": ...,
    "errorLimit": ...,
    "dirty": ...

settings包括speed、errorLimit和dirty部分,分别描述限速、错误控制和脏数据管理的配置信息。

1.34.4.1.1.speed
"speed": 
    "channel": 3,
    "bytes": 0

channel: 任务并发数
bytes: 每秒字节数,默认为 Long.MAX_VALUE

1.34.4.1.2.errorLimit
"errorLimit": 
     "record": 10000,
     "percentage": 100

record: 出错记录数超过record设置的条数时,任务标记为失败。
percentage: 当出错记录超过percentage百分数时,任务标记为失败

1.34.4.1.3.dirty
"dirty": 
	"path": "/tmp",
	"hadoopConfig": 
		"fs.default.name": "hdfs://ns1",
		"dfs.nameservices": "ns1",
		"dfs.ha.namenodes.ns1": "nn1,nn2",
		"dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
		"dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
		"dfs.ha.automatic-failover.enabled": "true",
		"dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
		"fs.hdfs.impl.disable.cache": "true"
	

path: 脏数据存放数据
hadoopConfig: 脏数据存放路径对应hdfs的配置信息(hdfs高可用配置)

1.34.4.1.4.restore
"restore": 
	"isRestore": false,
	"restoreColumnName": "",
	"restoreColumnIndex": 0

restore配置请参考”断点续传”(https://gitee.com/tuzuoquan/flinkx/blob/master/docs/restore.md)

1.34.4.2.content

"content": [
	
	   "reader": 
			"name": "...",
			"parameter": 
				...
			
		,
	   "writer": 
			"name": "...",
			"parameter": 
				 ...
			 
		
	
]

reader: 用于读取数据的插件信息.
writer: 用于写入数据的插件的信息。

reader和writer包括name和parameter,分别表示插件名称和插件参数。

1.34.4.3.数据同步任务例子

具体可以查看:
https://gitee.com/tuzuoquan/flinkx

1.35.Flink资料

https://www.ucloud.cn/yun/74764.html

https://github.com/apache/rocketmq-externals

https://github.com/zhisheng17/flink-learning

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink

https://www.dazhuanlan.com/2019/11/15/5dcd9f1999f57/?cf_chl_jschl_tk=2b92cebd5bcf222f98c03e2941998e9123b63576-1600045745-0-AfjF13iSAepCCX5phJ7zu3dpwwpsc3ynAiWlvDjZgtZMJcQGQt5pXsS4p8FWKwQRA7gG0q9Zitp1CHHZCcSrUsNhQ9a20rvXKPj5VsQKfWwTdnqGNMpRBzB19PwwnPWMInWITKEn6QA_dZaBtUcrXnlEMIDjZ7fecSFQnCPcvHw87KWHrQ3qDDNjZhOWGnHZww9zwJ20lZcbFi1HucoDBtJlAKPPMrXVF8zpHxKOJwLR5DMGz3UL3PSwHSCoX1HzmvbBymDkrlY2_cwHBU2rlYIdZQsg_Cy87XH8tzudmZVwD5-11hVBs7PQlS8CHt4NGA

以上是关于FlinkX快速开始(一)的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot:快速开始

Hbase快速开始——shell操作

Dubbo快速开始与依赖

react中文API解读一(快速开始)

Flask快速开始

关于制作Docker镜像?| Dockerfile快速开始