10.Connectors JDBC Kafka Consumer/Source Kafka Producer/Sink Redis

Posted by 涂作权的博客


10.Connectors
10.1.JDBC
10.2.Kafka Consumer/Source
10.3.Kafka Producer/Sink
10.4.Redis

10.Connectors

10.1.JDBC

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Desc: Demonstrates the JdbcSink provided by Flink
 *
 * @author tuzuoquan
 * @date 2022/4/22 9:44
 */
public class JDBCDemo {

    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<Student> studentDS = env.fromElements(new Student(null, "tony2", 18));

        //TODO 2.transformation

        //TODO 3.sink
        studentDS.addSink(JdbcSink.sink(
                "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
                (ps, value) -> {
                    ps.setString(1, value.getName());
                    ps.setInt(2, value.getAge());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/bigdata")
                        .withUsername("root")
                        .withPassword("root")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .build()));

        //TODO 4.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}
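The sink above runs with default execution options. As a hedged, minimal sketch (the batch size, flush interval, and retry count below are illustrative values, not from the original demo), the four-argument JdbcSink.sink overload accepts JdbcExecutionOptions so that rows are written in batches instead of one round trip per record:

import org.apache.flink.connector.jdbc.JdbcExecutionOptions;

//Illustrative values: batch up to 100 rows, flush at least every 200 ms, retry a failed batch up to 3 times
JdbcExecutionOptions execOptions = JdbcExecutionOptions.builder()
        .withBatchSize(100)
        .withBatchIntervalMs(200)
        .withMaxRetries(3)
        .build();

studentDS.addSink(JdbcSink.sink(
        "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
        (ps, value) -> {
            ps.setString(1, value.getName());
            ps.setInt(2, value.getAge());
        },
        execOptions,
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/bigdata")
                .withUsername("root")
                .withPassword("root")
                .withDriverName("com.mysql.jdbc.Driver")
                .build()));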

10.2.Kafka Consumer/Source

Parameters

env.addSource(new FlinkKafkaConsumer<>(parameters))

Parameter settings:
1. The topic(s) to subscribe to
2. The deserialization schema
3. Consumer property: cluster address (bootstrap.servers)
4. Consumer property: consumer group id (a default is used if not set, but defaults are inconvenient to manage)
5. Consumer property: offset reset policy, e.g. earliest/latest...
6. Dynamic partition discovery (Flink can detect when the number of Kafka partitions changes or increases; a topic-pattern variant is sketched after this list)
7. If checkpointing is not configured, auto-commit of offsets can be enabled; once checkpointing is covered later, offsets are committed into the checkpoint and to the default offsets topic whenever a checkpoint is taken (see the sketch after the demo below).
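The demo below subscribes to one fixed topic. For items 1 and 6, FlinkKafkaConsumer can also be given a topic pattern, so that newly created topics matching the pattern are picked up by the same discovery thread. A minimal sketch, reusing the env and props prepared in the demo below (the pattern itself is illustrative):

import java.util.regex.Pattern;

//Hedged sketch: subscribe to every topic matching flink_kafka.*; together with
//flink.partition-discovery.interval-millis this also discovers matching topics created later
FlinkKafkaConsumer<String> patternSource = new FlinkKafkaConsumer<>(
        Pattern.compile("flink_kafka.*"),
        new SimpleStringSchema(),
        props);
DataStream<String> patternDS = env.addSource(patternSource);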

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * Desc: Demonstrates Flink-Connectors-KafkaConsumer/Source
 *
 * @author tuzuoquan
 * @date 2022/4/23 17:56
 */
public class KafkaConsumerDemo {

    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        //Prepare the Kafka connection properties
        Properties props = new Properties();
        //Cluster address
        props.setProperty("bootstrap.servers", "node1:9092");
        //Consumer group id
        props.setProperty("group.id", "flink");
        //latest: resume from the committed offset if there is one, otherwise start from the latest/last message
        //earliest: resume from the committed offset if there is one, otherwise start from the earliest/first message
        props.setProperty("auto.offset.reset", "latest");
        //Start a background thread that checks the Kafka partitions every 5s (dynamic partition discovery)
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        //Auto-commit offsets (to the default offsets topic; once checkpointing is enabled, offsets are also stored with each checkpoint)
        props.setProperty("enable.auto.commit", "true");
        //Auto-commit interval
        props.setProperty("auto.commit.interval.ms", "2000");
        //Create the FlinkKafkaConsumer/kafkaSource from the connection properties
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("flink_kafka", new SimpleStringSchema(), props);
        //Use the kafkaSource
        DataStream<String> kafkaDS = env.addSource(kafkaSource);

        //TODO 2.transformation

        //TODO 3.sink
        kafkaDS.print();

        //TODO 4.execute
        env.execute();
    }

    //Create the topic: /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
    //Start a console producer to send data: /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
    //Start the FlinkKafkaConsumer program
    //Observe the console output
}
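Item 7 of the parameter list deserves a concrete illustration: with checkpointing enabled, FlinkKafkaConsumer stores the offsets in the checkpoint and commits them back to Kafka when the checkpoint completes, so enable.auto.commit no longer drives the commit. A minimal sketch reusing the env and props from the demo above (the 5000 ms interval is an illustrative value):

//Hedged sketch: let offsets ride along with checkpoints instead of relying on auto-commit
env.enableCheckpointing(5000);
FlinkKafkaConsumer<String> checkpointedSource =
        new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
//true is the default once checkpointing is on; shown here to make the behaviour explicit
checkpointedSource.setCommitOffsetsOnCheckpoints(true);
DataStream<String> checkpointedDS = env.addSource(checkpointedSource);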


10.3.Kafka Producer/Sink

Console producer --> flink_kafka topic --> Flink --> ETL --> flink_kafka2 topic --> console consumer.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * Desc: Demonstrates Flink-Connectors-KafkaProducer/Sink
 *
 * @author tuzuoquan
 * @date 2022/4/24 9:34
 */
public class KafkaSinkDemo {

    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        //Prepare the Kafka connection properties
        Properties props = new Properties();
        //Cluster address
        props.setProperty("bootstrap.servers", "node1:9092");
        //Consumer group id
        props.setProperty("group.id", "flink");
        //latest: resume from the committed offset if there is one, otherwise start from the latest/last message
        //earliest: resume from the committed offset if there is one, otherwise start from the earliest/first message
        props.setProperty("auto.offset.reset", "latest");
        //Start a background thread that checks the Kafka partitions every 5s (dynamic partition discovery)
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        //Auto-commit offsets
        props.setProperty("enable.auto.commit", "true");
        //Auto-commit interval
        props.setProperty("auto.commit.interval.ms", "2000");
        //Create the FlinkKafkaConsumer/kafkaSource from the connection properties
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("flink_kafka", new SimpleStringSchema(), props);
        //Use the kafkaSource
        DataStream<String> kafkaDS = env.addSource(kafkaSource);

        //TODO 2.transformation
        SingleOutputStreamOperator<String> etlDS = kafkaDS.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.contains("success");
            }
        });

        //TODO 3.sink
        etlDS.print();

        Properties props2 = new Properties();
        props2.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<String>("flink_kafka2",
                new SimpleStringSchema(), props2);
        etlDS.addSink(kafkaSink);

        //TODO 4.execute
        env.execute();
    }

    //Console producer --> flink_kafka topic --> Flink --> ETL --> flink_kafka2 topic --> console consumer
    //Create the topic: /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
    //Create the topic: /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka2
    //Start a console producer to send data: /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
    //log:2020-10-10 success xxx
    //log:2020-10-10 success xxx
    //log:2020-10-10 success xxx
    //log:2020-10-10 fail xxx
    //Start a console consumer to read the results: /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka2 --from-beginning
    //Start the FlinkKafkaConsumer program
    //Observe the console output
}
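The producer constructor used above gives at-least-once delivery. A hedged sketch of an exactly-once variant, reusing etlDS from the demo above (checkpointing must be enabled for this to work; the KafkaSerializationSchema, the variable names, and the transaction.timeout.ms value are illustrative):

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;

Properties sinkProps = new Properties();
sinkProps.setProperty("bootstrap.servers", "node1:9092");
//Keep the transaction timeout below the broker's transaction.max.timeout.ms (15 min by default)
sinkProps.setProperty("transaction.timeout.ms", "300000");

FlinkKafkaProducer<String> exactlyOnceSink = new FlinkKafkaProducer<>(
        "flink_kafka2",
        new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                return new ProducerRecord<>("flink_kafka2", element.getBytes(StandardCharsets.UTF_8));
            }
        },
        sinkProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
etlDS.addSink(exactlyOnceSink);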


10.4.Redis

https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

Requirement:
Receive a real-time stream from a socket, compute a WordCount, and write the result to Redis.

Data structure options:
word:count (key: String, value: String)
wcresult -> {word: count} (key: String, value: Hash)
Note: a Redis key is always a String; the value can be a String, Hash, List, Set, or sorted Set.
The demo below uses the Hash structure; a mapper for the plain String structure is sketched after the demo.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

/**
 * @author tuzuoquan
 * @date 2022/4/26 9:36
 */
public class RedisDemo {

    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<String> lines = env.socketTextStream("node1", 9999);

        //TODO 2.transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy(t -> t.f0).sum(1);

        //TODO 3.sink
        result.print();

        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<Tuple2<String, Integer>>(conf, new MyRedisMapper());
        result.addSink(redisSink);

        //TODO 4.execute
        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            //The chosen structure is key: String ("wcresult"), value: Hash (word -> count), so the command is HSET
            return new RedisCommandDescription(RedisCommand.HSET, "wcresult");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> t) {
            return t.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> t) {
            return t.f1.toString();
        }
    }
}
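For the plain String structure listed above (one Redis key per word), only the RedisMapper changes: the RedisCommandDescription takes SET and no additional key. A minimal sketch (the class name is illustrative):

public static class MyStringRedisMapper implements RedisMapper<Tuple2<String, Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        //Plain String structure: each word becomes its own key, written with SET
        return new RedisCommandDescription(RedisCommand.SET);
    }

    @Override
    public String getKeyFromData(Tuple2<String, Integer> t) {
        return t.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, Integer> t) {
        return t.f1.toString();
    }
}

//Usage: result.addSink(new RedisSink<>(conf, new MyStringRedisMapper()));

With the Hash variant from the demo, the result can be inspected in redis-cli with HGETALL wcresult; with this String variant, GET followed by a word returns its current count.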