FlinkToMySql两阶段提交
Posted 今天好好洗头了嘛
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkToMySql两阶段提交相关的知识,希望对你有一定的参考价值。
版本
Flink1.10
实现
- 继承TwoPhaseCommitSinkFunction
- 实现invoke()、beginTransaction()、preCommit()、commit()、abort()等方法
流程
beginTransaction---->preCommit,同时开启下一次beginTransaction---->commit---->preCommit,同时开启下一次beginTransaction---->commit。。。。。。
大概理解是下边这个流程
beginTransaction,invoke,preCommit,commit(第一次checkpoint)
---------------------------------------------------->beginTransaction,invoke,preCommit,commit(第二次checkpoint)
---------------------------------------------------------------------------------------------------------->beginTransaction,invoke,preCommit,commit(第三次checkpoint)
注意:
1、如果commit期间有异常,则重试commit方法。
2、如果invoke期间有异常,则回滚到上一个checkpoint,执行一下上一个checkpoint的commit方法(很奇怪,没理解这里),然后进行重试后续的数据。
代码测试
public abstract class MyTwoPhaseCommitSinkFunction<IN> extends TwoPhaseCommitSinkFunction<IN, List<IN>, Void>
public MyTwoPhaseCommitSinkFunction()
super((TypeSerializer<List<IN>>) new KryoSerializer<IN>((Class<IN>) List.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
@Override
protected void invoke(List<IN> list, IN value, Context context) throws Exception
System.err.println("invoke开始......"+list);
list.add(value);
System.err.println("invoke结束......"+list);
@Override
protected List<IN> beginTransaction() throws Exception
System.err.println("beginTransaction......");
return new ArrayList<>();
@Override
protected void preCommit(List<IN> list) throws Exception
System.err.println("preCommit......"+list);
@Override
protected void abort(List<IN> list)
System.err.println("abort......"+list);
public class App
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing( 5000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsStateBackend("file:///C:/Jian_Coding/Flink/Study/checkpoint"));
SingleOutputStreamOperator<String> sourceDS = env.addSource(MyKafkaUtil.getKafkaSource()).name("kafkaSourceFun");
sourceDS.addSink(new MyTwoPhaseCommitSinkFunction<String>()
@Override
protected void commit(List<String> list)
/* long l = System.currentTimeMillis();
if(l%2==0)
int i = 1/0;
*/
System.err.println("commit......"+list);
);
env.execute();
以上是关于FlinkToMySql两阶段提交的主要内容,如果未能解决你的问题,请参考以下文章