FLINK Java Development on 1.15.2 - Reading a File and Sinking Its Contents to Redis

Posted TGITCIC


Requirement

The requirement: take the text below (data.txt, listed in full at the end of this article), split each line on ",", then use the second column as the key and the third column as the value and write them into Redis. For example, the line sensor_1,1547718199,35.8 yields key 1547718199 and value 35.8.

To talk to Redis, flink needs the following pom dependency:

<!-- redis support -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
    <exclusions>
       <exclusion>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-log4j12</artifactId>
       </exclusion>
    </exclusions>
</dependency>

When using the flink redis connector, always remember to exclude slf4j-log4j12; otherwise that jar will later clash with the log4j2 setup in our project and no logs will be written.
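
If you want to double-check that the exclusion really took effect, one quick sanity check (a hypothetical stand-alone class, not part of the project above) is to print which SLF4J binding was picked up at runtime:

import org.slf4j.LoggerFactory;

public class CheckSlf4jBinding {
    public static void main(String[] args) {
        // With log4j-slf4j-impl winning, this prints a Log4j 2 factory such as
        // org.apache.logging.slf4j.Log4jLoggerFactory; a log4j 1.x class here
        // means slf4j-log4j12 has leaked back onto the classpath.
        System.out.println(LoggerFactory.getILoggerFactory().getClass().getName());
    }
}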

The complete project pom configuration

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aldi.flink.demo</groupId>
    <artifactId>FlinkKafka2Redis</artifactId>
    <version>0.0.1</version>
    <name>FlinkKafka2Redis</name>
    <properties>
        <jcuda.version>10.0.0</jcuda.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version> 
        <curator-framework.version>4.0.1</curator-framework.version>
        <curator-recipes.version>2.8.0</curator-recipes.version>
        <!-- druid.version>1.1.20</druid.version -->
        <druid.version>1.2.6</druid.version>
        <guava.version>27.0.1-jre</guava.version>
        <fastjson.version>1.2.59</fastjson.version>  
        <jackson.version>2.11.1</jackson.version>
        <gson.version>2.8.6</gson.version>
        <groovy.version>2.5.8</groovy.version>
        <hutool-crypto.version>5.0.0</hutool-crypto.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <compiler.plugin.version>3.8.1</compiler.plugin.version>
        <war.plugin.version>3.2.3</war.plugin.version>
        <jar.plugin.version>3.1.1</jar.plugin.version>      
        <log4j2.version>2.17.1</log4j2.version>
        <redis-common.version>0.0.1</redis-common.version>
        <commons-lang3.version>3.4</commons-lang3.version>
        <flink.version>1.15.2</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>compile</scope>
            <version>2.17.1</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>compile</scope>
            <version>2.17.1</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>compile</scope>
            <version>2.17.1</version>
        </dependency>
 
 
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!--
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        -->
        <!-- core dependencies -->
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
 
        <!-- test dependencies -->
 
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- redis support -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
 
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aldi.flink.demo.SinkToRedis</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Main program

SinkToRedis.java

/**
 * System project name com.aldi.flink.demo SinkToRedis.java
 *
 * 2022-09-23 11:43:33 AM  Copyright (c) 2022, XX Company. All rights reserved.
 *
 */
package com.aldi.flink.demo;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
 
 
/**
 *
 * SinkToRedis
 *
 *
 * 2022-09-23 11:43:33 AM
 *
 * @version 1.0.0
 *
 */
public class SinkToRedis {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the data
        String path = "/Users/chrishu123126.com/opt/datawarehouse/sample/sinktoredis/data.txt";
        DataStreamSource<String> txtSink = env.readTextFile(path);

        // String outputPath = "/Users/chrishu123126.com/opt/datawarehouse/sample/wordcount/output";
        DataStream<Tuple2<String, String>> data = txtSink.flatMap(new LineSplitter());

        // 3. Build the Redis connection config and attach the sink
        FlinkJedisPoolConfig conf =
            new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(7002).setPassword("111111").build();
        data.addSink(new RedisSink<>(conf, new SinkRedisMapper()));
        env.execute();
    }
}
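
A side note: readTextFile is deprecated as of Flink 1.15, and the pom above already includes flink-connector-files, so the same read can alternatively be expressed with the newer FileSource API. A minimal sketch under that assumption (replacing the readTextFile call inside main):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// ... inside main(), instead of env.readTextFile(path):
FileSource<String> source = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path(path))
    .build();
DataStream<String> lines =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "text-file-source");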

The LineSplitter and SinkRedisMapper helper classes

LineSplitter.java

package com.aldi.flink.demo;
 
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
 
/**
 *
 * LineSplitter
 *
 *
 * 2022-09-23 4:31:36 PM
 *
 * @version 1.0.0
 *
 */
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, String>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, String>> collector) throws Exception {
        String[] tokens = s.toLowerCase().split(",");
        // We read columns 1 and 2, so guard against lines with fewer than 3 tokens
        if (tokens.length >= 3) {
            collector.collect(new Tuple2<String, String>(tokens[1], tokens[2]));
            System.out.println(">>>>>>key->" + tokens[1] + " value->" + tokens[2] + "  into redis...");
        }
    }
}

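To sanity-check LineSplitter outside of a running flink job, a small throwaway test (hypothetical, using the ListCollector helper from flink-core) could look like this:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.java.tuple.Tuple2;

public class LineSplitterTest {
    public static void main(String[] args) throws Exception {
        List<Tuple2<String, String>> out = new ArrayList<>();
        // Feed one sample line through flatMap and collect into a plain list
        new LineSplitter().flatMap("sensor_1,1547718199,35.8", new ListCollector<>(out));
        System.out.println(out); // expect [(1547718199,35.8)]
    }
}
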
SinkRedisMapper.java

/**
 * System project name com.aldi.flink.demo SinkRedisMapper.java
 *
 * 2022-09-23 3:52:49 PM  Copyright (c) 2022, XX Company. All rights reserved.
 *
 */
package com.aldi.flink.demo;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
 
/**
 *
 * SinkRedisMapper
 *
 *
 * 2022-09-23 3:52:49 PM
 *
 * @version 1.0.0
 *
 */
public class SinkRedisMapper implements RedisMapper<Tuple2<String, String>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // Write every record via HSET into a single Redis hash named "flink"
        return new RedisCommandDescription(RedisCommand.HSET, "flink");
    }

    @Override
    public String getKeyFromData(Tuple2<String, String> tuple) {
        return tuple.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, String> tuple) {
        return tuple.f1;
    }
}
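
Since the mapper issues HSET with the additional key "flink", every tuple ends up as a field of a single Redis hash named flink. To peek at the result without redis-cli, a quick hypothetical check with the plain Jedis client (assuming redis.clients:jedis is on the classpath) would be:

import redis.clients.jedis.Jedis;

public class VerifyRedis {
    public static void main(String[] args) {
        // Connection details mirror the FlinkJedisPoolConfig in SinkToRedis
        try (Jedis jedis = new Jedis("127.0.0.1", 7002)) {
            jedis.auth("111111");
            // Everything the job wrote lives in the single hash "flink"
            jedis.hgetAll("flink").forEach((k, v) -> System.out.println(k + " -> " + v));
        }
    }
}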

As before, package the jar and upload it through our flink web UI (you can also run it directly from eclipse).

In the previous article, FLINK Java Development on 1.15.2 - Getting Started, we covered flink's maven packaging in detail. flink officially recommends packaging with maven-shade-plugin, which mainly comes down to declaring the following build section:

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aldi.flink.demo.hello.WordCount</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

The mainClass entry here is important: it must be declared as the main class of the flink job you actually want to run, i.e. com.aldi.flink.demo.SinkToRedis for this article (the snippet above still shows the WordCount class from the earlier post).

Running the project

After the job finishes, Redis holds the following values: the hash flink contains one field per distinct timestamp (the second column of the input), each mapped to its temperature reading. Since HSET overwrites, repeated lines in the input simply rewrite the same fields.

The contents of the data.txt file are given below:

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1

That wraps up FLINK Java Development on 1.15.2 - Reading a File and Sinking Its Contents to Redis. For more in this series, see the following articles:

FLINK Java Development on 1.15.2 - Getting Started

FLINK Java Development on 1.15.2 - Writing a Custom Source

FLINK Java Development on 1.15.2 - Using log4j inside flink

FLINK Java Development on 1.15.2 - Building a Production Cluster with 2 Masters and 3 Workers

FLINK Java Development on 1.15.2 - A Real-Time Hot Product Sales Leaderboard

FLINK Java Development on 1.15.2 - Two Ways to Sink to MYSQL