Flink 与rabbitmq集成 并开启checkpoint

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 与rabbitmq集成 并开启checkpoint相关的知识,希望对你有一定的参考价值。

参考技术A 如果不开启checkpoint机制,flink job 在运行时如果遇到异常整个job就会停止。

如果开启了checkpoint机制,就会根据恢复点进行数据重试,这个是一个非常复杂的机制,需要单独的文章进行解析。

所以开启checkpoint是必然要做的配置。

在与rabbitmq集成时有个点必须要注意,就是mq 发送消息时候,必须要带上CorrelationId。我们看一下flink的官方文档。

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setCheckpointInterval(10000);

DataStream dataStreamSource = env.addSource(new RMQSource(connectionConfig,

" kgraph",true,new SimpleStringSchema()) );

dataStreamSource.print();

上面是构造RMQSource(…)的参数,如下

queueName: The RabbitMQ queue name.

usesCorrelationId : true when correlation ids should be used, false otherwise (default is false).

deserializationScehma : Deserialization schema to turn messages into Java objects.

根据参数不同,有如下三种模式

Exactly-once (when checkpointed) with RabbitMQ transactions and messages with unique correlation IDs.

At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism (correlation id is not set).

No strong delivery guarantees ( without checkpointing) with RabbitMQ auto-commit mode.

那么 开启 exactly-once 确保消费一次的特性,就必须在传递 mq消息的时候带上 correlationId。

correlation Id 是 mq 消息的一个基本属性,可以用来标识消息的唯一id,通常是mq实现rpc调用时使用,flink 利用其唯一id的特性来做 exactly once的消费。所以在发送mq消息时 加上 correlation_id 的properties 就不会有问题了。

如果使用 spring 结合 rabbitmq 作为客户端,需要对 correlationId 做一个特别的处理

就是需要自己手动设置correaltionId, rabbittemplate 没有自动的封装这个属性, convertAndSend这个方法非常让人confuse,

里面支持传入correlationData字段,但是这个是写入到消息头的,而不是correlation_id,flink那边永远无法读取到。

public void sendMsg(KgraphUpdateMessage kgraphUpdateMessage)



CorrelationData correlationId =new CorrelationData(UUID.randomUUID().toString());

ObjectMapper jsonReader =new ObjectMapper();

try

   

MessageProperties properties =new MessageProperties();

properties.setCorrelationId(correlationId.getId().getBytes());

Message message =new Message(jsonReader.writeValueAsBytes(kgraphUpdateMessage), properties);

rabbitTemplate. convertAndSend (KgraphMqConfig.KGRAPH_EXCHANGE, KgraphMqConfig.KGRAPH_TOPIC_EVENT, message,correlationId);

catch (JsonProcessingException e)



e.printStackTrace();



以上是关于Flink 与rabbitmq集成 并开启checkpoint的主要内容,如果未能解决你的问题,请参考以下文章

ldap集成rabbitmq

Spring boot集成RabbitMQ中Exchange与Queue参数详解

Kafka与Flink集成

Flink JDBC Connector:Flink 与数据库集成最佳实践

Flink JDBC Connector:Flink 与数据库集成最佳实践

Flink从入门到精通100篇(二十一)-Apache Flink 与 Apache Hive 的集成