17-flink-1.10.1-flink 状态一致性
Posted 逃跑的沙丁鱼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了17-flink-1.10.1-flink 状态一致性相关的知识,希望对你有一定的参考价值。
1 状态一致性
1.1 什么是状态一致性
1.2 状态一致性的级别
2 一致性检查点(checkpoint)
回顾
3 端到端(end-to-end)状态一致性
4 端到端的精确一次(exactly-once)保证
三个策略兜底
4.1 幂等写入
比如写入elasticsearch 是可以做到幂等写入的,同一个_id 的值多次写入还是同一个id的值,
mysql redis 都可做到幂等写入
4.2 事务写入
5 flink+kafka端到端状态一致性的保证
5.1 如何保证
package com.study.liucf.unbounded.sink
import java.util.Properties
import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._
/**
* @Author liucf
* @Date 2021/9/18
* 本示例演示,数据从kafka的一个topic:sensor_input_csv读入然
* 后写出到kafka的另一个topic:sensor_out
*/
object KafkaSink
def main(args: Array[String]): Unit =
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置kafak配置项
val props = new Properties()
props.setProperty("bootstrap.servers","192.168.109.151:9092")
props.setProperty("topic","sensor_input_csv")
//添加kafka数据源
val inputDs: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("sensor_input_csv",new SimpleStringSchema(),props))
val transDs: DataStream[String] = inputDs.map(d=>
val arr = d.split(",")
LiucfSensorReding(arr(0),arr(1).toLong,arr(2).toDouble).toString
)
//输出结果到标准控制台
transDs.print()
//输出到kafka的另一个topic里
val outProps = new Properties()
outProps.setProperty("bootstrap.servers","192.168.109.151:9092")
transDs.addSink(new FlinkKafkaProducer[String](
"sensor_out",new SimpleStringSchema(),outProps))
//启动执行flink
env.execute("liucf kafka sink api test")
可以看到使用了两阶段提交
transDs.addSink(new FlinkKafkaProducer[String]( "sensor_out",new SimpleStringSchema(),outProps))
5.2 两阶段事务提交具体过程
以上是关于17-flink-1.10.1-flink 状态一致性的主要内容,如果未能解决你的问题,请参考以下文章