10. Connectors: JDBC, Kafka Consumer/Source, Kafka Producer/Sink, Redis
Posted by 涂作权的博客
10.Connectors
10.1.JDBC
10.2.Kafka Consumer/Source
10.3.Kafka Producer/Sink
10.4.Redis
10.Connectors
10.1.JDBC
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Desc: demonstrates the JdbcSink provided by Flink.
 *
 * @author tuzuoquan
 * @date 2022/4/22 9:44
 */
public class JDBCDemo {

    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<Student> studentDS = env.fromElements(new Student(null, "tony2", 18));

        //TODO 2.transformation

        //TODO 3.sink
        studentDS.addSink(JdbcSink.sink(
                "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
                (ps, value) -> {
                    ps.setString(1, value.getName());
                    ps.setInt(2, value.getAge());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/bigdata")
                        .withUsername("root")
                        .withPassword("root")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .build()));

        //TODO 4.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}
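The three-argument JdbcSink.sink used above relies on default batching. If you need to control batch size, flush interval, or retries, there is a four-argument overload that also takes JdbcExecutionOptions. A minimal sketch (the numbers are illustrative assumptions, not tuned values):

import org.apache.flink.connector.jdbc.JdbcExecutionOptions;

//same sink as above, with explicit batching/retry settings
studentDS.addSink(JdbcSink.sink(
        "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
        (ps, value) -> {
            ps.setString(1, value.getName());
            ps.setInt(2, value.getAge());
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(100)       //flush once 100 rows are buffered...
                .withBatchIntervalMs(200) //...or after 200 ms, whichever comes first
                .withMaxRetries(3)        //retry a failed batch up to 3 times
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/bigdata")
                .withUsername("root")
                .withPassword("root")
                .withDriverName("com.mysql.jdbc.Driver")
                .build()));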
10.2.Kafka Consumer/Source
Parameters
env.addSource(new FlinkKafkaConsumer<>(parameters))
Parameter settings:
1. the topic(s) to subscribe to
2. the deserialization schema
3. consumer property: cluster address (bootstrap.servers)
4. consumer property: consumer group id (a default is used if unset, but an explicit id is easier to manage)
5. consumer property: offset reset policy, e.g. earliest/latest…
6. dynamic partition discovery (Flink can detect when the number of Kafka partitions changes/grows!)
7. if no checkpointing is configured, auto-committing offsets can be enabled instead; once checkpointing is covered later, offsets are committed to both the checkpoint and the default offsets topic whenever a checkpoint is taken (see the sketch after this list)
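A minimal sketch of point 7, assuming kafkaSource is the FlinkKafkaConsumer built in the code below and a checkpoint interval of 5 s (an illustrative value). Once checkpointing is enabled, offsets are committed on checkpoint completion and enable.auto.commit is ignored:

//enable checkpointing so offsets are committed as part of each checkpoint
env.enableCheckpointing(5000); //5 s, illustrative
//commit the checkpointed offsets back to Kafka on completion (this is the default behavior)
kafkaSource.setCommitOffsetsOnCheckpoints(true);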
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
/**
 * Desc: demonstrates Flink Connectors - FlinkKafkaConsumer/Source.
 *
 * @author tuzuoquan
 * @date 2022/4/23 17:56
 */
public class KafkaConsumerDemo {

    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        //prepare the Kafka connection properties
        Properties props = new Properties();
        //cluster address
        props.setProperty("bootstrap.servers", "node1:9092");
        //consumer group id
        props.setProperty("group.id", "flink");
        //latest: resume from the committed offset if there is one, otherwise start from the newest/last messages
        //earliest: resume from the committed offset if there is one, otherwise start from the earliest/first messages
        props.setProperty("auto.offset.reset", "latest");
        //start a background thread that checks the Kafka partitions every 5 s, enabling dynamic partition discovery
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        //auto-commit (commits to the default offsets topic; once checkpointing is covered later, offsets are stored in the checkpoint and the default topic)
        props.setProperty("enable.auto.commit", "true");
        //auto-commit interval
        props.setProperty("auto.commit.interval.ms", "2000");
        //create the FlinkKafkaConsumer/kafkaSource from the connection properties
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        //use the kafkaSource
        DataStream<String> kafkaDS = env.addSource(kafkaSource);

        //TODO 2.transformation

        //TODO 3.sink
        kafkaDS.print();

        //TODO 4.execute
        env.execute();
    }

    //create the topic:         /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
    //start a console producer: /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
    //start this FlinkKafkaConsumer program
    //watch the console output
}
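For reference, Flink 1.14+ deprecates FlinkKafkaConsumer in favor of the KafkaSource builder API. A minimal sketch of an equivalent source, assuming a flink-connector-kafka dependency from such a version:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("node1:9092")
        .setTopics("flink_kafka")
        .setGroupId("flink")
        .setStartingOffsets(OffsetsInitializer.latest()) //like auto.offset.reset=latest
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");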
10.3.Kafka Producer/Sink
Console producer --> flink_kafka topic --> Flink --> ETL --> flink_kafka2 topic --> console consumer.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
/**
 * Desc: demonstrates Flink Connectors - FlinkKafkaProducer/Sink (Kafka -> Flink ETL -> Kafka).
 *
 * @author tuzuoquan
 * @date 2022/4/24 9:34
 */
public class KafkaSinkDemo {

    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        //prepare the Kafka connection properties
        Properties props = new Properties();
        //cluster address
        props.setProperty("bootstrap.servers", "node1:9092");
        //consumer group id
        props.setProperty("group.id", "flink");
        //latest: resume from the committed offset if there is one, otherwise start from the newest/last messages
        //earliest: resume from the committed offset if there is one, otherwise start from the earliest/first messages
        props.setProperty("auto.offset.reset", "latest");
        //start a background thread that checks the Kafka partitions every 5 s, enabling dynamic partition discovery
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        props.setProperty("enable.auto.commit", "true");
        //auto-commit interval
        props.setProperty("auto.commit.interval.ms", "2000");
        //create the FlinkKafkaConsumer/kafkaSource from the connection properties
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        //use the kafkaSource
        DataStream<String> kafkaDS = env.addSource(kafkaSource);

        //TODO 2.transformation
        SingleOutputStreamOperator<String> etlDS = kafkaDS.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.contains("success");
            }
        });

        //TODO 3.sink
        etlDS.print();

        //producer properties for the sink
        Properties props2 = new Properties();
        props2.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka2",
                new SimpleStringSchema(), props2);
        etlDS.addSink(kafkaSink);

        //TODO 4.execute
        env.execute();
    }

    //console producer ---> flink_kafka topic --> Flink --> ETL ---> flink_kafka2 topic ---> console consumer
    //create the topic: /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
    //create the topic: /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka2
    //start a console producer and send data: /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
    //log:2020-10-10 success xxx
    //log:2020-10-10 success xxx
    //log:2020-10-10 success xxx
    //log:2020-10-10 fail xxx
    //start a console consumer: /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka2 --from-beginning
    //start this program
    //watch the console output
}
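The kafkaSink above writes with the default AT_LEAST_ONCE semantic, so replays after a failure can produce duplicates in flink_kafka2. If exactly-once delivery is required, FlinkKafkaProducer can be built with Semantic.EXACTLY_ONCE, which uses Kafka transactions. A hedged sketch, assuming checkpointing is enabled and using an illustrative transaction timeout (it must exceed the checkpoint interval and stay within the broker's transaction.max.timeout.ms):

//Kafka transactions require a timeout larger than the checkpoint interval
props2.setProperty("transaction.timeout.ms", "900000"); //15 min, illustrative
FlinkKafkaProducer<String> exactlyOnceSink = new FlinkKafkaProducer<>(
        "flink_kafka2",
        new SimpleStringSchema(),
        props2,
        null, //no custom partitioner
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);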
10.4.Redis
https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
Requirement:
Receive a real-time stream over a socket, run a WordCount, and write the result to Redis.
Data structure options:
word:count (key: String, value: String)
wcresult -> word:count (key: String, value: Hash); the code below uses this Hash layout
Note: a Redis key is always a String, while the value can be a String/Hash/List/Set/Sorted Set.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
/**
 * Desc: demonstrates the Redis sink provided by Apache Bahir.
 *
 * @author tuzuoquan
 * @date 2022/4/26 9:36
 */
public class RedisDemo {

    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<String> lines = env.socketTextStream("node1", 9999);

        //TODO 2.transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy(t -> t.f0).sum(1);

        //TODO 3.sink
        result.print();

        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        result.addSink(redisSink);

        //TODO 4.execute
        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            //the chosen structure is key: String ("wcresult"), value: Hash (word -> count), so the command is HSET
            return new RedisCommandDescription(RedisCommand.HSET, "wcresult");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> t) {
            return t.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> t) {
            return t.f1.toString();
        }
    }
}
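To verify the sink, inspect the hash from redis-cli (assuming Redis runs locally, matching the 127.0.0.1 host configured above): HGETALL wcresult prints every word with its latest count, and HGET wcresult hello returns the count for a single word ("hello" being just an example input).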