From 0 to 1: Growing with Flink (21) - End-to-End Exactly-Once with Flink + Kafka: Code Examples

Posted by 熊老二

End-to-End Exactly-Once with Flink + Kafka

https://ververica.cn/developers/flink-kafka-end-to-end-exactly-once-analysis/

package xx.xxxxxx.flink.exactly;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
/**
 * 1. Kafka producer fault tolerance: Kafka 0.9 and 0.10
 *    With checkpointing enabled, FlinkKafkaProducer09 and FlinkKafkaProducer010 can provide
 *    at-least-once semantics when configured as follows:
 *      setLogFailuresOnly(false)
 *      setFlushOnCheckpoint(true)
 *      retries (defaults to 0); raising the Kafka producer retry count is recommended
 *
 * 2. Kafka producer fault tolerance: Kafka 0.11+
 *    With checkpointing enabled, FlinkKafkaProducer011 and later can provide exactly-once
 *    semantics, but the semantic has to be chosen explicitly:
 *      Semantic.NONE
 *      Semantic.AT_LEAST_ONCE (default)
 *      Semantic.EXACTLY_ONCE
 */
public class StreamExactlyOnceKafkaDemo {
    /*
    List topics:
    /export/server/kafka/bin/kafka-topics.sh --list --bootstrap-server node1.itcast.cn:9092

    Source Kafka:
    /export/server/kafka/bin/kafka-topics.sh --create \
        --bootstrap-server node1.itcast.cn:9092 --replication-factor 1 --partitions 3 --topic flink-topic-source
    /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 \
        --topic flink-topic-source

    Sink Kafka:
    /export/server/kafka/bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 \
        --replication-factor 1 --partitions 3 --topic flink-topic-sink
    /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 \
        --topic flink-topic-sink --from-beginning
    */
    public static void main(String[] args) throws Exception {
        // 1. Execution environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // Checkpointing: every 5 s, exactly-once mode, at least 500 ms between checkpoints,
        // and keep externalized checkpoints when the job is cancelled
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        // 2. Source
        // 2.1. Kafka consumer properties
        Properties sourceProps = new Properties();
        sourceProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092");
        sourceProps.setProperty("group.id", "flink-1001");
        // Start from the committed offset if one exists; otherwise start from the latest records
        sourceProps.setProperty("auto.offset.reset", "latest");
        // Run a background thread that checks the Kafka partitions every 5 s
        sourceProps.setProperty("flink.partition-discovery.interval-millis", "5000");
        // 2.2. Create the FlinkKafkaConsumer
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
                "flink-topic-source", new SimpleStringSchema(), sourceProps
        );
        // Start from the group's committed offsets; if the group has none,
        // "auto.offset.reset" decides where consumption starts
        kafkaSource.setStartFromGroupOffsets();
        // Commit offsets when Flink checkpoints: one copy goes into the checkpoint and one into
        // Kafka's __consumer_offsets topic (handy for external monitoring tools)
        kafkaSource.setCommitOffsetsOnCheckpoints(true);
        // 2.3. Add the source
        DataStreamSource<String> kafkaDataStream = env.addSource(kafkaSource);
        // 3. Transformation (none in this demo)
        // 4. Sink
        // 4.1. Kafka producer properties
        Properties sinkProps = new Properties();
        sinkProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        // Transaction timeout (15 min). It must not exceed the broker's transaction.max.timeout.ms,
        // which can be raised in the Kafka broker configuration
        sinkProps.setProperty("transaction.timeout.ms", String.valueOf(60000 * 15));
        // 4.2. Serialization schema: write each element as UTF-8 bytes to the sink topic
        KafkaSerializationSchema<String> kafkaSchema = new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                return new ProducerRecord<>(
                        "flink-topic-sink", element.getBytes(StandardCharsets.UTF_8)
                );
            }
        };
        // 4.3. Create the FlinkKafkaProducer with exactly-once semantics
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
                "flink-topic-sink",
                kafkaSchema,
                sinkProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
        // 4.4. Add the sink
        kafkaDataStream.addSink(kafkaSink);
        // 5. Execute
        env.execute(StreamExactlyOnceKafkaDemo.class.getSimpleName());
    }
}
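
Two settings around this job are easy to miss. A downstream consumer only gets the exactly-once view of flink-topic-sink when it reads with isolation.level=read_committed; the Kafka consumer default, read_uncommitted, also returns records from open or aborted transactions. And the producer's transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms, which defaults to 15 minutes. The following is a minimal sketch of both checks using only java.util.Properties. The class name, method names, and the group id "downstream-reader" are hypothetical and not part of the original demo.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original demo.
class ExactlyOnceKafkaSettings {
    // Kafka broker default for transaction.max.timeout.ms: 15 minutes.
    static final long DEFAULT_BROKER_MAX_TXN_TIMEOUT_MS = 15 * 60 * 1000L;

    // Builds consumer properties that hide records of open or aborted transactions.
    static Properties readCommittedConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    // Returns true when the producer's transaction timeout fits within the broker limit.
    // Assumes "transaction.timeout.ms" is present in producerProps.
    static boolean transactionTimeoutAccepted(Properties producerProps, long brokerMaxMs) {
        long timeoutMs = Long.parseLong(producerProps.getProperty("transaction.timeout.ms"));
        return timeoutMs <= brokerMaxMs;
    }

    public static void main(String[] args) {
        Properties sinkProps = new Properties();
        sinkProps.setProperty("transaction.timeout.ms", String.valueOf(60000 * 15));
        System.out.println(
                transactionTimeoutAccepted(sinkProps, DEFAULT_BROKER_MAX_TXN_TIMEOUT_MS));

        Properties readerProps = readCommittedConsumerProps("node1.itcast.cn:9092", "downstream-reader");
        System.out.println(readerProps.getProperty("isolation.level"));
    }
}
```

The same isolation level can be passed to kafka-console-consumer.sh with --isolation-level read_committed when checking the sink topic by hand.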

End-to-End Exactly-Once with Flink + MySQL

https://www.jianshu.com/p/5bdd9a0d7d02

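Requirements
1. A checkpoint runs every 10 s while FlinkKafkaConsumer consumes messages from Kafka in real time.
2. After a message is consumed and processed, it is written to the database inside an open transaction. This is the pre-commit stage; nothing is visible to other readers yet.
3. If pre-commit succeeds, the checkpoint completes and Flink records the consumed offsets in it; the database transaction is then actually committed. The checkpoint data can be stored in HDFS.
4. If pre-commit fails, for example 5 s into the interval, the Flink job restarts repeatedly according to the restart strategy set in the configuration. The checkpoint still holds the last successfully consumed offsets, because the data in the current interval was consumed successfully but failed during pre-commit.
5. At this point no data has actually been inserted: because pre-commit (preCommit) failed, commit never runs. Once the bad records have been dealt with and the Flink job is started again, it resumes from the last successful checkpoint, which is how Kafka-to-MySQL exactly-once is achieved.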
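
The steps above can be traced with a small in-memory simulation before looking at the real job. This is only a sketch under simplifying assumptions, not Flink's implementation: a list stands in for the MySQL table, a checkpoint is taken after a fixed number of records instead of every 10 s, and the injected failure happens once, as if the bad record had been fixed before the restart. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the checkpoint / pre-commit / commit cycle.
class TwoPhaseCommitSketch {

    // Processes values 1..total, checkpointing every `checkpointEvery` records.
    // The record equal to `failOnValue` throws once. Returns the committed rows.
    static List<Integer> run(int total, int checkpointEvery, int failOnValue) {
        List<Integer> table = new ArrayList<>(); // committed rows only
        int checkpointedOffset = 0;              // offset stored in the last successful checkpoint
        boolean failureInjected = false;

        while (checkpointedOffset < total) {
            List<Integer> txn = new ArrayList<>(); // beginTransaction: open a new transaction
            int offset = checkpointedOffset;       // after a restart, resume from the checkpoint
            try {
                while (offset < total && offset - checkpointedOffset < checkpointEvery) {
                    int value = offset + 1;
                    txn.add(value);                // invoke: write inside the open transaction
                    if (value == failOnValue && !failureInjected) {
                        failureInjected = true;
                        throw new IllegalStateException("simulated failure at " + value);
                    }
                    offset++;
                }
                // Pre-commit succeeded and the checkpoint completed:
                // commit the transaction and advance the checkpointed offset.
                table.addAll(txn);
                checkpointedOffset = offset;
            } catch (IllegalStateException e) {
                // abort: discard the uncommitted writes; the checkpointed offset is unchanged,
                // so the next loop iteration replays the same records.
                txn.clear();
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Integer> rows = run(20, 5, 15);
        System.out.println(rows.size()); // prints 20
    }
}
```

With 20 records, a checkpoint every 5 records, and a failure on value 15, records 11 to 15 are written twice but committed once, so each value ends up in the table exactly once.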

1) Code: Kafka Producer

package xx.xxxxxx.exactly_once;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class DataProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // try-with-resources closes the producer once all records have been sent
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Send the values 1..20 as JSON, one record per second
            for (int i = 1; i <= 20; i++) {
                DataBean data = new DataBean(String.valueOf(i));
                String json = JSON.toJSONString(data);
                // No key and no explicit partition: only topic and value are set
                producer.send(new ProducerRecord<>("flink_kafka", json));
                System.out.println("Sent: " + json);
                Thread.sleep(1000);
            }
            producer.flush();
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class DataBean {
 private String value;
}

2) Code: Flink Streaming

package xx.xxxxxx.exactly_once;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.kafka.clients.CommonClientConfigs;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
public class ExactlyOnceDemo_FlinkMysql {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // keeps testing simple
        env.enableCheckpointing(10000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("file:///D:/data/ckp"));
        // 2. Source
        String topic = "flink_kafka"; // the topic DataProducer writes to
        Properties props = new Properties();
        props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.setProperty("group.id", "flink");
        // Start from the committed offset if one exists; otherwise start from the latest records
        props.setProperty("auto.offset.reset", "latest");
        // Run a background thread that checks the Kafka partitions every 5 s
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        FlinkKafkaConsumer<ObjectNode> kafkaSource = new FlinkKafkaConsumer<>(
                topic, new JSONKeyValueDeserializationSchema(true), props);
        // Start from the group's committed offsets; if the Kafka broker has no information for
        // this group, "auto.offset.reset" decides where consumption starts
        kafkaSource.setStartFromGroupOffsets();
        // Commit offsets when Flink checkpoints: one copy goes into the checkpoint and one into
        // Kafka's default __consumer_offsets topic (handy for external monitoring tools)
        kafkaSource.setCommitOffsetsOnCheckpoints(true);
        DataStreamSource<ObjectNode> kafkaDS = env.addSource(kafkaSource);
        // 3. Transformation (none in this demo)
        // 4. Sink
        kafkaDS.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");
        // 5. Execute
        env.execute(ExactlyOnceDemo_FlinkMysql.class.getName());
    }
}
/**
 * Custom Kafka-to-MySQL sink that extends TwoPhaseCommitSinkFunction to implement two-phase commit.
 * Purpose: exactly-once delivery from Kafka to MySQL.
 *
 * CREATE TABLE `t_test` (
 *   `id` bigint(20) NOT NULL AUTO_INCREMENT,
 *   `value` varchar(255) DEFAULT NULL,
 *   `insert_time` datetime DEFAULT NULL,
 *   PRIMARY KEY (`id`)
 * ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
 */
class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> {
    public MySqlTwoPhaseCommitSink() {
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }
    /**
     * Writes one record to the database inside the current transaction.
     */
    @Override
    protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {
        System.err.println("start invoke.......");
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.err.println("===>date:" + date + " " + objectNode);
        // JSONKeyValueDeserializationSchema wraps each record as {"key":..., "value":..., "metadata":...}.
        // The payload sent by DataProducer is {"value":"<n>"}, so read the inner "value" field as text.
        String value = objectNode.get("value").get("value").asText();
        String sql = "insert into `t_test` (`value`,`insert_time`) values (?,?)";
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setString(1, value);
            ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
            // Run the insert; it stays uncommitted until commit() is called
            ps.execute();
        }
        // Deliberately trigger an ArithmeticException for the record with value 15
        if (Integer.parseInt(value) == 15) System.out.println(1 / 0);
    }
    /**
     * Opens a connection with auto-commit disabled (done in getConnection).
     */
    @Override
    protected Connection beginTransaction() throws Exception {
        String url = "jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
        Connection connection = DBConnectUtil.getConnection(url, "root", "root");
        System.err.println("start beginTransaction......." + connection);
        return connection;
    }
    /**
     * Pre-commit. The pre-commit work here is already done in invoke.
     */
    @Override
    protected void preCommit(Connection connection) throws Exception {
        System.err.println("start preCommit......." + connection);
    }
    /**
     * Commits the transaction if invoke ran without errors.
     */
    @Override
    protected void commit(Connection connection) {
        System.err.println("start commit......." + connection);
        DBConnectUtil.commit(connection);
    }
    @Override
    protected void recoverAndCommit(Connection connection) {
        System.err.println("start recoverAndCommit......." + connection);
    }
    @Override
    protected void recoverAndAbort(Connection connection) {
        System.err.println("start abort recoverAndAbort......." + connection);
    }
    /**
     * Rolls back the transaction if invoke threw an exception; the pending checkpoint is not completed either.
     */
    @Override
    protected void abort(Connection connection) {
        System.err.println("start abort rollback......." + connection);
        DBConnectUtil.rollback(connection);
    }
}
class DBConnectUtil {
    /**
     * Opens a connection with auto-commit disabled.
     * @throws SQLException if the connection cannot be opened
     */
    public static Connection getConnection(String url, String user, String password) throws SQLException {
        Connection conn = DriverManager.getConnection(url, user, password);
        // Commit manually
        conn.setAutoCommit(false);
        return conn;
    }
    /**
     * Commits the transaction and closes the connection.
     */
    public static void commit(Connection conn) {
        if (conn != null) {
            try {
                conn.commit();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }
    /**
     * Rolls back the transaction and closes the connection.
     */
    public static void rollback(Connection conn) {
        if (conn != null) {
            try {
                conn.rollback();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }
    /**
     * Closes the connection.
     */
    public static void close(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
