flink sink
Posted PacosonSWJTU
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink sink相关的知识,希望对你有一定的参考价值。
【README】
本文记录了flink sink操作,输出目的存储器(中间件)包括
- kafka;
- es;
- db;
- 等等有很多;
- 本文只给出了 sink2kafka的代码;
本文使用的flink为 1.14.4 版本;
本文部分内容参考了 flink 官方文档,如下:
【1】 flink sink2kafka
1)场景:
- 消费上游topic hello0415的数据,并把数据流输出到下游kafka topic hell0416;
- 如,我们在java框架中把数据库日志发送到 topic1 ,然后我想统计执行时间大于3秒的sql,则需要把筛选后的sql 发送到 下游 topic2, 就可以使用sink 来完成;
2)代码
/**
* @Description flink流输出到kafka(下沉)
* @author xiao tang
* @version 1.0.0
* @createTime 2022年04月16日
*/
public class SinkTest1_Kafka
public static void main(String[] args) throws Exception
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // 设置全局并行度
// 创建flink连接kafka
KafkaSource kafkaSource = KafkaSource.<String>builder()
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperties(KafkaConsumerProps._INS.getProps())
.setTopics("hello0415")
.setGroupId("flink")
.build();
DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");
// kafka生产者属性
Properties kafkaProducerProps = new Properties();
kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");
kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, 3);
kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 1 * KfkNumConst._1K);
kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);
// 把kafka数据流输出到(sink) topic hello0416
KafkaSink<String> sink = KafkaSink.<String>builder()
.setKafkaProducerConfig(kafkaProducerProps)
.setBootstrapServers("192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("hello0416")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
// 添加到sink
kafkaStream.sinkTo(sink);
// 打印stream
kafkaStream.print("kafkaStream");
// 执行
env.execute("kafkaSinkJob");
效果:
【补充】
kafka 消费者属性
public enum KafkaConsumerProps
_INS;
/* 1.创建kafka生产者的配置信息 */
Properties props = new Properties();
private KafkaConsumerProps()
/*2.指定连接的kafka集群, broker-list */
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
public Properties getProps()
return props;
以上是关于flink sink的主要内容,如果未能解决你的问题,请参考以下文章
FLINK实例(131):FLINK-SQL应用场景(22) CONNECTORS(22) sourcesink 原理
flink sql 知其所以然| sourcesink 原理