Flink检查点配置一致性
Posted 后季暖
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink检查点配置一致性相关的知识,希望对你有一定的参考价值。
目录
上一篇我详细介绍了检查点的一些概念:Flink检查点详解_后季暖的博客-CSDN博客
这一篇主要来讲如何配置检查点,和保障数据一致性的几种模式
检查点配置基本概念
一致性详解
说白了就是输入端数据可重用 输出端幂等写入或事务写入(最好是能两阶段提交)
Flink和Kafka连接的精确一次
你的写入事务什么时候关闭?
不是在下一个检查点的数据流来的时候 而是当你事务都写完毕以后 因为是可以开启多个事务写入的 互不干扰
flink整合kafka 开启检查点 代码:
public class RestartStrategyDemo
public static void main(String[] args) throws Exception
/**1.创建流运行环境**/
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**请注意此处:**/
//1.只有开启了CheckPointing,才会有重启策略
env.enableCheckpointing(5000);
//2.默认的重启策略是:固定延迟无限重启
//此处设置重启策略为:出现异常重启3次,隔5秒一次
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(2)));
//系统异常退出或人为 Cancel 掉,不删除checkpoint数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//设置Checkpoint模式(与Kafka整合,一定要设置Checkpoint模式为Exactly_Once)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
/**2.Source:读取 Kafka 中的消息**/
//Kafka props
Properties properties = new Properties();
//指定Kafka的Broker地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.210:9092,192.168.204.211:9092,192.168.204.212:9092");
//指定组ID
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
//如果没有记录偏移量,第一次从最开始消费
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//Kafka的消费者,不自动提交偏移量
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer("testTopic", new SimpleStringSchema(), properties);
//Checkpoint成功后,还要向Kafka特殊的topic中写偏移量(此处不建议改为false )
//设置为false后,则不会向特殊topic中写偏移量。
//kafkaSource.setCommitOffsetsOnCheckpoints(false);
//通过addSource()方式,创建 Kafka DataStream
DataStreamSource<String> kafkaDataStream = env.addSource(kafkaSource);
/**3.Transformation过程**/
SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = kafkaDataStream.map(str -> Tuple2.of(str, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
/**此部分读取Socket数据,只是用来人为出现异常,触发重启策略。验证重启后是否会再次去读之前已读过的数据(Exactly-Once)*/
/*************** start **************/
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<String> streamOperator1 = socketTextStream.map(new MapFunction<String, String>()
@Override
public String map(String word) throws Exception
if ("error".equals(word))
throw new RuntimeException("Throw Exception");
return word;
);
/************* end **************/
//对元组 Tuple2 分组求和
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = streamOperator.keyBy(0).sum(1);
/**4.Sink过程**/
sum.print();
/**5.任务执行**/
env.execute("RestartStrategyDemo");
总结:
以上是关于Flink检查点配置一致性的主要内容,如果未能解决你的问题,请参考以下文章