FLINK Java Development on 1.15.2 - Reading a File and Sinking Its Contents to Redis
Posted by TGITCIC
Requirement
The requirement: split each line of the text file (the full data.txt is listed at the end of this article) on ",", then write the second column of each line as the key and the third column as the value into Redis. For example, the line sensor_1,1547718199,35.8 yields key 1547718199 and value 35.8.
To use Redis from Flink, the following pom dependency is required:
<!-- redis connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
When using the flink redis connector, always remember to exclude slf4j-log4j12; otherwise this jar will later conflict with the log4j2 setup in our project and no logs will be produced.
The complete project pom configuration
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aldi.flink.demo</groupId>
    <artifactId>FlinkKafka2Redis</artifactId>
    <version>0.0.1</version>
    <name>FlinkKafka2Redis</name>
    <properties>
        <jcuda.version>10.0.0</jcuda.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
        <curator-framework.version>4.0.1</curator-framework.version>
        <curator-recipes.version>2.8.0</curator-recipes.version>
        <!-- druid.version>1.1.20</druid.version -->
        <druid.version>1.2.6</druid.version>
        <guava.version>27.0.1-jre</guava.version>
        <fastjson.version>1.2.59</fastjson.version>
        <jackson.version>2.11.1</jackson.version>
        <gson.version>2.8.6</gson.version>
        <groovy.version>2.5.8</groovy.version>
        <hutool-crypto.version>5.0.0</hutool-crypto.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <compiler.plugin.version>3.8.1</compiler.plugin.version>
        <war.plugin.version>3.2.3</war.plugin.version>
        <jar.plugin.version>3.1.1</jar.plugin.version>
        <log4j2.version>2.17.1</log4j2.version>
        <redis-common.version>0.0.1</redis-common.version>
        <commons-lang3.version>3.4</commons-lang3.version>
        <flink.version>1.15.2</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>compile</scope>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>compile</scope>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>compile</scope>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!--
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        -->
        <!-- core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- test dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- redis connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aldi.flink.demo.SinkToRedis</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Main program
SinkToRedis.java
/**
 * Project com.aldi.flink.demo - SinkToRedis.java
 *
 * 2022-09-23 11:43:33 - Copyright 2022 XX Company, all rights reserved
 *
 */
package com.aldi.flink.demo;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;

/**
 * SinkToRedis
 *
 * 2022-09-23 11:43:33
 *
 * @version 1.0.0
 */
public class SinkToRedis {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the data
        String path = "/Users/chrishu123126.com/opt/datawarehouse/sample/sinktoredis/data.txt";
        DataStreamSource<String> textSource = env.readTextFile(path);
        // String outputPath = "/Users/chrishu123126.com/opt/datawarehouse/sample/wordcount/output";
        // 3. Split each line into a (key, value) tuple
        DataStream<Tuple2<String, String>> data = textSource.flatMap(new LineSplitter());
        // 4. Write the tuples to Redis
        FlinkJedisPoolConfig conf =
            new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(7002).setPassword("111111").build();
        data.addSink(new RedisSink<>(conf, new SinkRedisMapper()));
        env.execute();
    }
}
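As an aside, env.readTextFile is the legacy way to read a file; the FLIP-27 FileSource API is the newer alternative, and the flink-connector-files dependency it needs is already in the pom above. A minimal sketch of what would replace the readTextFile call, reusing the same path variable:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Inside main(), instead of env.readTextFile(path):
FileSource<String> fileSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path(path))
        .build();
// Bounded source: the job reads the file once and then finishes.
DataStreamSource<String> lines =
        env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source");

The rest of the pipeline stays the same; lines takes the place of textSource.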
The helper classes LineSplitter and SinkRedisMapper
LineSplitter.java
package com.aldi.flink.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * LineSplitter
 *
 * 2022-09-23 16:31:36
 *
 * @version 1.0.0
 */
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, String>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, String>> collector) throws Exception {
        String[] tokens = s.toLowerCase().split(",");
        // Only emit lines that actually contain a second and a third column.
        if (tokens.length >= 3) {
            collector.collect(new Tuple2<String, String>(tokens[1], tokens[2]));
            System.out.println(">>>>>>key->" + tokens[1] + " value->" + tokens[2] + " into redis...");
        }
    }
}
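To sanity-check LineSplitter without starting a job, you can drive its flatMap by hand with the ListCollector utility from flink-core (a sketch; LineSplitterTest is a hypothetical helper class, not part of the project):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.java.tuple.Tuple2;

public class LineSplitterTest {
    public static void main(String[] args) throws Exception {
        List<Tuple2<String, String>> out = new ArrayList<>();
        // Feed one sample line through the flatMap and collect its output.
        new LineSplitter().flatMap("sensor_1,1547718199,35.8", new ListCollector<>(out));
        System.out.println(out); // [(1547718199,35.8)]
    }
}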
SinkRedisMapper.java
/**
 * Project com.aldi.flink.demo - SinkRedisMapper.java
 *
 * 2022-09-23 15:52:49 - Copyright 2022 XX Company, all rights reserved
 *
 */
package com.aldi.flink.demo;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * SinkRedisMapper
 *
 * 2022-09-23 15:52:49
 *
 * @version 1.0.0
 */
public class SinkRedisMapper implements RedisMapper<Tuple2<String, String>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // hset: the additional key "flink" is the name of the Redis hash
        return new RedisCommandDescription(RedisCommand.HSET, "flink");
    }

    @Override
    public String getKeyFromData(Tuple2<String, String> tuple) {
        return tuple.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, String> tuple) {
        return tuple.f1;
    }
}
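With RedisCommand.HSET, the second argument of RedisCommandDescription ("flink") names the target hash; getKeyFromData supplies the hash field and getValueFromData the value, so each tuple effectively becomes HSET flink <timestamp> <reading>.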
Likewise, package the job and upload it to the Flink web UI (you can also run it directly in Eclipse). The previous post, FLINK Java Development on 1.15.2 - Getting Started, covered Flink's Maven packaging in detail. Flink officially recommends the maven-shade-plugin, which mainly means declaring the following build section:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>1.2.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.aldi.flink.demo.hello.WordCount</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
The mainClass entry here is important: it must be declared as the class whose main method starts your Flink job. The WordCount value above comes from the previous post; for this project it is com.aldi.flink.demo.SinkToRedis, as in the full pom earlier.
Running the project
After the job runs, Redis ends up with a hash named flink holding one field per distinct timestamp, each mapped to its reading.
Here is the content of data.txt:
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1
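To verify the result, the hash can be read back with the Jedis client that flink-connector-redis pulls in transitively (a sketch; VerifyRedis is a hypothetical helper class, and the connection settings simply mirror the FlinkJedisPoolConfig used above):

import java.util.Map;

import redis.clients.jedis.Jedis;

public class VerifyRedis {
    public static void main(String[] args) {
        // Same host/port/password as in SinkToRedis.
        try (Jedis jedis = new Jedis("127.0.0.1", 7002)) {
            jedis.auth("111111");
            Map<String, String> entries = jedis.hgetAll("flink");
            entries.forEach((k, v) -> System.out.println(k + " -> " + v));
        }
    }
}

Each distinct timestamp from data.txt should appear exactly once, since HSET overwrites a field that already exists.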