Flink当中使用kafka Consumer

Posted 月疯

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink当中使用kafka Consumer相关的知识,希望对你有一定的参考价值。

Flink与kafka结合使用的三个优势:

第一:kafka可以作为Flink的Source和Sink来使用;
第二:Kafka的Partition机制和Flink的并行度机制可以深度结合,从而提高数据的读取率和写入效率
第三:当Flink任务失败的时候,可以通过设置kafka的Offset来恢复应用从而重新消费数据

添加依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>

含义:kafka的版本是0.10,scala的版本是2.11,如果你用的java语言编写,scala版本无所谓

1、kafka常用的API

2、Kafka Consumer的消费模式设置

1、setStartFromGroupOffsets
这是默认的消费策略,会从上次消费者保存的offset处继续开始消费,需要说明的是:如果任务是第一次启动,读取不到上次的offset信息,则会根据参数outo.offset
.reset的数值来消费数据
2、setStartFromEarliest
从最初的数据开始消费,忽略存储的Offset信息
3、setStartFromLatest
从最新的数据进行消费,忽略存储的Offset信息
4、setStartFromSpecificOffsets(Map<KafkaTopicPartition,Long> specificStartupOffsets)
从指定分区的具体位置开始消费 

3、Kafka consumer的容错机制
当Flink checkPoint机制开启的时候,Kafka Consumer会定期把kafka的Offset信息以及其他的Operator的状态信息保存起来,进行快照存储。当Job失败重启的时候,Flink会从最近一次的CheckPoint中回复数据,重新消费Kafka当中的数据

为了使用支持容错的kafka Consumer,Kafka Consumer需要开启Checkpoint机制,可以通过下面的代码进行设置:
默认情况下,Flink的checkPiont功能是disable的,想要使用的时候需要先开启:

env.enableCheckpointing(1000);

完整参考代码:

//开启flink的checkpoint功能:每隔1000ms启动一个检查点(设置checkpoint的声明周期)
env.enableCheckpointing(1000);
//checkpoint高级选项设置
//设置checkpoint的模式为exactly-once(这也是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//确保检查点之间至少有500ms间隔(即checkpoint的最小间隔)
env.getCheckPointConfig().setMinPauseBetweenCheckpoints(500);
//确保检查必须在1min之内完成,否则就会被丢弃掉(即checkpoint的超时时间)
env.getCheckpointConfig().setCheckpointTimeout(60000);
//同一时间只允许操作一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoint(1);
//程序即使被cancel后,也会保留checkpoint数据,以便根据实际需要恢复到指定的checkpoint
env.getCheckpointConfig().enableExternazedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//设置statebackend,指定state和checkpoint的数据存储位置(checkpoint的数据必须得有一个可以持久化存储的地方)
env.setStateBackend(new FsStateBackend("hdfs://s101:9000/flink/checkpoints"));

4、Kafka Consumer Offset自动提交行为的控制(到底谁决定了offset的自动提交)
kafka Consumer Offset自动提交的配置需要根据Job是否开启Checkpoint来区分
a、Checkpoint关闭时,可以通过下面2个参数进行配置:

enable.auto.commit
auto.commit.interval.ms

b、CheckPoint开启时,只有当执行Checkpoint的时候才会自动提交Offset,这样就保证了kafka的offset和checkpoint的状态偏移量保持一致,此时Kafka中的自动提交Offset机制就会被忽略。
这里说明一下:我的理解是Offset的存储在2个地方,Kafka存储一份,CheckPoint的时候在HDFS存储一份,一旦开启checkpoint机制,这俩个提交offset的时刻就会同步。

小列子:

package Flink_Kafka;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

//最简单的一个Kafka的代码
public class FlinkKafkaConsumer 
   public static void main(String[] args) throws Exception 
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       Properties consumerProperties = new Properties();
       //设置服务
       consumerProperties.setProperty("bootstrap.servers","s101:9092");
       //设置消费者组
       consumerProperties.setProperty("group.id","con56");
       //自动提交偏移量
       consumerProperties.setProperty("enable.auto.commit","true");
       consumerProperties.setProperty("auto.commit.interval.ms","2000");

       DataStream<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer010<>("browse_topic",new SimpleStringSchema(),consumerProperties));
       dataStreamSource.print();
       env.execute("FlinkKafkaConsumer");
   

以上是关于Flink当中使用kafka Consumer的主要内容,如果未能解决你的问题,请参考以下文章

Kafka-Consumer高CPU问题分析

flink源码分析之kafka consumer的执行流程

flink kafka consumer with avro schema. handling null

如何修复:java.lang.OutOfMemoryError: Direct buffer memory in flink kafka consumer

Kafka Consumer 获取键值对

kafka consumer重新连接后如何获取当前最新数据