flink sink

Posted PacosonSWJTU

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink sink相关的知识,希望对你有一定的参考价值。

【README】

本文记录了flink sink操作,输出目的存储器(中间件)包括

  • kafka;
  • es;
  • db;
  • 等等有很多;
  • 本文只给出了 sink2kafka的代码

本文使用的flink为 1.14.4 版本;

本文部分内容参考了 flink 官方文档,如下:

Kafka | Apache Flinkhttps://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/kafka/


【1】 flink sink2kafka

1)场景:

  • 消费上游topic hello0415的数据,并把数据流输出到下游kafka topic hell0416;
  • 如,我们在java框架中把数据库日志发送到 topic1 ,然后我想统计执行时间大于3秒的sql,则需要把筛选后的sql 发送到 下游 topic2, 就可以使用sink 来完成;

2)代码

/**
 * @Description flink流输出到kafka(下沉)
 * @author xiao tang
 * @version 1.0.0
 * @createTime 2022年04月16日
 */
public class SinkTest1_Kafka 
    public static void main(String[] args) throws Exception 
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // 设置全局并行度
        // 创建flink连接kafka
        KafkaSource kafkaSource = KafkaSource.<String>builder()
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperties(KafkaConsumerProps._INS.getProps())
                .setTopics("hello0415")
                .setGroupId("flink")
                .build();
        DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");

        // kafka生产者属性
        Properties kafkaProducerProps = new Properties();
        kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 1 * KfkNumConst._1K);
        kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);
        // 把kafka数据流输出到(sink) topic hello0416
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setKafkaProducerConfig(kafkaProducerProps)
                .setBootstrapServers("192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("hello0416")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        // 添加到sink
        kafkaStream.sinkTo(sink);
        // 打印stream
        kafkaStream.print("kafkaStream");
        // 执行
        env.execute("kafkaSinkJob");
    

效果:


 【补充】

kafka 消费者属性

public enum KafkaConsumerProps 
    _INS;

    /* 1.创建kafka生产者的配置信息 */
    Properties props = new Properties();

    private KafkaConsumerProps() 
        /*2.指定连接的kafka集群, broker-list */
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    

    public Properties getProps() 
        return props;
    

以上是关于flink sink的主要内容,如果未能解决你的问题,请参考以下文章

Flink的sink实战之一:初探

FLINK实例(131):FLINK-SQL应用场景(22) CONNECTORS(22) sourcesink 原理

记一次 Flink 反压问题排查过程

flink sql 知其所以然| sourcesink 原理

2021年大数据Flink(四十二):​​​​​​​BroadcastState

flink - sink - hive