FLINK 基于1.15.2的Java开发-连接kafka并把内容sink到redis

Posted TGITCIC

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FLINK 基于1.15.2的Java开发-连接kafka并把内容sink到redis相关的知识,希望对你有一定的参考价值。

需求

在kafka发送plainText消息,以逗号分割。逗号前的作为key,逗号后的作为value。

然后把kafka发过来的东西以Redis的HashMap结构存入flink这个主Key中去。

进入开发

为了解决这个问题,我们需要在前两个的范围内解决掉以下三个问题:

  1.  flink如何接入kafka
  2. flink如何不作统计(前两课我们用的是烂网上的wordcount例子)只接入流和折分
  3. flink如何sink到Redis

flink如何接入kafka

pom.xml

<!-- redis特性-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
 
<!-- kafka特性-->
<!--kafka-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.15.2</version>
</dependency>

kafka在flink内核心API的用法

KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092")
          .setTopics("test").setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
          .setValueOnlyDeserializer(new SimpleStringSchema()).build();
      DataStream<String> testDataStreamSource =
          env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

以上代码相当的简单。

有一处需要注意,如果我把以上代码改成了如下那么它的效果就是每次这个flink应用重启,都会把kafka从test这个topic发过来的第一条消息全部重新读一遍,区别就在于这个“OffsetsInitializer.earliest()”。我们取的是最近一条kafka消息,因此我们才用了:OffsetsInitializer.latest()。

KafkaSource<String> source =
  KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("test")
  .setGroupId("test01").setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema()).build();

flink如何不作统计(前两几篇我们用的是烂网上的wordcount例子)只接入流和折分数据

DataStream<Tuple2<String, String>> data = testDataStreamSource.flatMap(new LineSplitter());

我们接着来看LineSplitter这个类

LineSplitter.java

* 系统项目名称 com.aldi.flink.demo LineSplitter.java
 *
 * 2022年9月23日-下午4:31:36 2022XX公司-版权所有
 *
 */
package com.aldi.com.cnflink.demo;
 
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
 
/**
 *
 * LineSplitter
 *
 *
 * 2022年9月23日 下午4:31:36
 *
 * @version 1.0.0
 *
 */
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, String>> 
    public void flatMap(String s, Collector<Tuple2<String, String>> collector) throws Exception 
        String[] tokens = s.toLowerCase().split(",");
        if (tokens != null && tokens.length > 0) 
            collector.collect(new Tuple2<String, String>(tokens[0], tokens[1]));
            //System.out.println(">>>>>>key->" + tokens[0] + " value->" + tokens[1]+"  into redis...");
        
    

非常简单,只读入流核心起作用的就是这个collector.collect,看,它按照逗号对读入的流进行折分。

flink如何sink到Redis

DataStream<Tuple2<String, String>> data = testDataStreamSource.flatMap(new LineSplitter());
FlinkJedisPoolConfig conf =
            new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(7002).setPassword("111111").build();
        data.addSink(new RedisSink<>(conf, new SinkRedisMapper()));
        env.execute();

我们来看这边的SinkRedisMapper这个类

SinkRedisMapper.java

 * 系统项目名称 com.aldi.flink.demo SinkRedisMapper.java
 * 
 * 2022年9月23日-下午3:52:49 2022XX公司-版权所有
 * 
 */
package com.aldi.com.cnflink.demo;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * 
 * SinkRedisMapper
 * 
 * 
 * 2022年9月23日 下午3:52:49
 * 
 * @version 1.0.0
 * 
 */
public class SinkRedisMapper implements RedisMapper<Tuple2<String, String>> 
    @Override
    public RedisCommandDescription getCommandDescription() 
        // hset
        return new RedisCommandDescription(RedisCommand.HSET, "flink");
    

    @Override
    public String getKeyFromData(Tuple2<String, String> stringIntegerTuple2) 
        return stringIntegerTuple2.f0;
    

    @Override
    public String getValueFromData(Tuple2<String, String> stringIntegerTuple2) 
        return stringIntegerTuple2.f1.toString();
    

它的作用就是使用Redis HashMap结构,把读入的流Sink到Redis里以flink这个key开头的内容中去。

所以整个SimpleKafka内容如下

完整SimpleKafka.java

* 系统项目名称 com.aldi.com.cnflink.demo SimpleKafka.java
 *
 * 2022年9月26日-下午12:28:31 2022XX公司-版权所有
 *
 */
package com.aldi.com.cnflink.demo;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
 
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
 
/**
 *
 * SimpleKafka
 *
 *
 * 2022年9月26日 下午12:28:31
 *
 * @version 1.0.0
 *
 */
public class SimpleKafka 
 
    /**
     * main(这里用一句话描述这个方法的作用) (这里描述这个方法适用条件 – 可选)
     *
     * @param args
     *            void
     * @exception
     * @since 1.0.0
     */
    public static void main(String[] args) throws Exception 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        /**
         * OffsetsInitializer.earliest()会导至每次启动这个应用进行全量刷新kafka最早一条消息生成起至今
         */
        // KafkaSource<String> source =
        // KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("test")
        // .setGroupId("test01").setStartingOffsets(OffsetsInitializer.earliest())
        // .setValueOnlyDeserializer(new SimpleStringSchema()).build();
 
        /**
         * 因此以下使用OffsetsInitializer.latest(),这样只消息最近一条消息不会每次启动进行全量刷新
         */
        KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092")
            .setTopics("test").setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema()).build();
        DataStream<String> testDataStreamSource =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
 
        DataStream<Tuple2<String, String>> data = testDataStreamSource.flatMap(new LineSplitter());
        FlinkJedisPoolConfig conf =
            new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(7002).setPassword("111111").build();
        data.addSink(new RedisSink<>(conf, new SinkRedisMapper()));
        env.execute();
    
 

项目运行

第一步:把zk运行起来

第二步:把kafka运行起来

第三步:在kafka上创建一条command窗口的producer

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

第四步:把SimpleKafka运行起来

 第五步:在kafka的producer内输入点东西如下:

然后在eclipse工程中我们看到了这样的内容

我们来看我们的Redis里 

 

看,sink成功。 

 

以上是关于FLINK 基于1.15.2的Java开发-连接kafka并把内容sink到redis的主要内容,如果未能解决你的问题,请参考以下文章

FLINK 基于1.15.2的Java开发-如何使用外部配置文件

FLINK 基于1.15.2的Java开发-入门

FLINK 基于1.15.2的Java开发-自定义Source端

FLINK 基于1.15.2的Java开发-在flink内如何使用log4j

FLINK 基于1.15.2的Java开发-搭建2主3从的生产集群环境

FLINK 基于1.15.2的Java开发-实时流计算商品销售热榜