FLINK Java development on 1.15.2 - a custom Redis Sink for connecting to Redis Sentinel

Posted by TGITCIC


Preface

The Redis we use in production is almost always in sentinel or cluster mode. So if we simply drop Flink's bundled redis sink into our code, it is not usable in our production environment at all.

Moreover, look at the source code of the jedis connection that ships with flink:

public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
        Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null");

        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());

        JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
            jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
            jedisPoolConfig.getDatabase());
        return new RedisContainer(jedisPool);
}

It implements none of the essentials of production-grade use:

  • idle-connection testing;
  • idle-connection eviction;
  • TCP keep-alive.

These advanced features are simply missing. A rough sketch of what enabling them looks like follows.
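As an illustration only (a hedged sketch, not flink's code; the class name ProductionPoolSketch is ours, and the values are borrowed from the config.properties used later in this post), the first two bullets map onto JedisPoolConfig like this. TCP keep-alive is a socket-level concern rather than a pool setting, so it is not shown here:

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class ProductionPoolSketch {
    public static JedisPool build(String host, int port, String password) {
        JedisPoolConfig cfg = new JedisPoolConfig();
        cfg.setMaxTotal(25);
        cfg.setMinIdle(5);
        cfg.setMaxIdle(5);
        cfg.setTestWhileIdle(true);                   // idle-connection testing
        cfg.setTimeBetweenEvictionRunsMillis(48000);  // how often the evictor thread runs
        cfg.setMinEvictableIdleTimeMillis(50000);     // idle-connection eviction threshold
        cfg.setTestOnBorrow(true);                    // validate a connection before lending it out
        return new JedisPool(cfg, host, port, 5000, password, 0);
    }
}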

We also know from the previous post, "FLINK Java development on 1.15.2 - reading a file and sinking its content to redis" (TGITCIC's blog on CSDN), that the bundled redis sink is used like this:

public class SinkRedisMapper implements RedisMapper<Tuple2<String, String>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // hset
        return new RedisCommandDescription(RedisCommand.HSET, "flink");
    }

    @Override
    public String getKeyFromData(Tuple2<String, String> stringIntegerTuple2) {
        return stringIntegerTuple2.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, String> stringIntegerTuple2) {
        return stringIntegerTuple2.f1.toString();
    }
}

Now suppose our requirement becomes:

  1. first delete the existing key in redis each time;
  2. then insert the content of the new time window.

Or, going one step further: what if we want to maintain a scored (sorted-set) structure in redis?

Only when we can meet requirements like these is our code fit for production. That is why we need to build our own Sink component, which we can then use to:

  • connect to the redis sentinel setup used in production;
  • implement our own redis business logic (see the sorted-set sketch below).
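As a taste of such custom logic, here is a minimal, hedged sketch (the class and helper names LeaderboardSketch/refreshLeaderboard are hypothetical, and the Jedis handle is assumed to come from a connection pool) that atomically replaces a sorted-set leaderboard, covering both the delete-then-insert requirement and the score requirement:

import java.util.Map;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class LeaderboardSketch {
    // Replace the whole ranking inside one MULTI/EXEC block so readers never see a half-written board.
    public static void refreshLeaderboard(Jedis jedis, String key, Map<String, Integer> counts) {
        Transaction tx = jedis.multi();
        tx.del(key);                                // 1. drop the previous ranking
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            tx.zadd(key, e.getValue(), e.getKey()); // 2. member = productId, score = count
        }
        tx.exec();
    }
}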

Requirements

Business requirement

Every time a product is sold, a record in the following format arrives via kafka; only productIds with status=101 are to be counted:

"productId":"a1004","status":"101"
"productId":"a1002","status":"101"
"productId":"a1001","status":"101"
"productId":"a1001","status":"101"

Suppose a batch like the one above arrives every 60s. Flink should then build the following kind of leaderboard inside Redis, and keep it updating in real time as data flows in from kafka:
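(The original post shows the leaderboard as a screenshot. For the four sample records above, with a1001 sold twice, the stored ranking would look roughly like this, assuming the hash key used later in this post:)

flinkdemo:kafka:simplekafka
  a1001 -> 2
  a1002 -> 1
  a1004 -> 1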

Technical requirements

  1. Use log4j2;
  2. Keep all middleware connection settings in an external config.properties file;
  3. Use a jedis connection pool to connect to redis sentinel. For the team's convenience, and because not every developer runs a redis sentinel setup locally, the connection mode is derived from the configuration: if a sentinel-style address list is configured, sentinel mode is used automatically; if a standalone address is configured, the connection automatically falls back to standalone mode, so the job "adapts" to different environments;
  4. The kafka source pushes data over in json format;
  5. Each time the leaderboard is generated in redis, the existing leaderboard must be cleared first, so that redis always holds the latest ranking.

Technical analysis

Here I will break this requirement down using the analysis techniques architects commonly use.

Break the big into the small, and the small into nothing

The requirement looks complex, and frankly, only once you have implemented both the business and the technical requirements above can you say you have really gotten started with flink and can build production features. Knowing WordCount alone does not make you an initiate; nothing in this world is that easy!

But as we have also seen, it took well over ten days of step-by-step study to get here. To take this final step we need to break the whole thing into a series of small, workable steps instead of trying to grab everything at once.

From the previous days up to today we have acquired the following skills:

  1. what flink is and what it can do;
  2. where flink sits in an architecture;
  3. setting up kafka, single-node and clustered;
  4. connecting flink to kafka;
  5. sinking from flink to redis;
  6. reading external configuration files in flink;
  7. using log4j inside flink;
  8. using flink's sliding windows;
  9. deserialization techniques in flink.

The AS-IS and TO-BE analysis methodology

We list everything we have in hand as of today, then compare it with the technology this example requires; for that we draw two timelines.

One timeline is called AS-IS, the other TO-BE. This is a common design technique in TOGAF enterprise architecture.

  1. AS-IS: the situation as it stands;
  2. TO-BE: the target situation to be reached.

Depending on the size of the diagrams, AS-IS and TO-BE usually go on one slide, either side by side or top and bottom, in two different colors to tell them apart.

Visually this creates a strong "impact": one look and the viewer goes "...oh, so the differences are this, this and that".

Our brains never arrive at rational, logical thinking in one step; rational thinking is always grounded in perception, and in architecture and engineering that perception comes through the eyes.

That is the point of the AS-IS/TO-BE analysis technique. It is a methodology we often use to persuade business-minded leadership, or to sort out our own thinking.

Top-down analysis, bottom-up knowledge building

With the TO-BE and AS-IS analysis above we know which links are still missing between where we are and where we want to be. The requirement analysis proceeds "top-down", while accumulating the knowledge to satisfy it is a "bottom-up" process.

Reading the TO-BE diagram: the leftmost items correspond to the "bottom" of the knowledge-building process, the rightmost to the "top".

We then notice that the further left (lower) a knowledge point sits, the closer it is to the abilities we already have. So when learning, split the knowledge points up horizontally and, vertically, pick the one closest to your current level. Only that way do you avoid "grabbing everything at once", or "it works, but most of the code was copied off the internet, and after running in production for a while something breaks here, then there".

What we need is to move toward the goal one solid step at a time. That is the standard for an engineer: know not only that something works, but why it works.

A custom Redis Sink in Flink

Back to the main thread: let's now nail down how to write a custom Redis Sink in Flink.

public class MyRedisSink extends RichSinkFunction<Tuple2<String, Integer>> {

Because this class extends RichSinkFunction, we need to "override" three extra methods (a minimal skeleton follows this list):

  1. public void open(Configuration config) - executed once when flink starts up;
  2. public void invoke(Tuple2<String, Integer> value, Context context) throws Exception - called once every time flink hands the sink a record to output;
  3. public void close() throws Exception - executed once when a job submitted to the flink cluster is "stopped", or when the "container" or "process" is torn down.
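Put together, before any redis logic is added, the skeleton of the class looks like this (a bare-bones sketch; the comments note when each hook runs):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MyRedisSink extends RichSinkFunction<Tuple2<String, Integer>> {

    @Override
    public void open(Configuration config) throws Exception {
        // runs once per parallel sink instance, at startup: acquire resources here
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // runs once per record handed to the sink: write to redis here
    }

    @Override
    public void close() throws Exception {
        // runs once at stop/teardown: release resources here
    }
}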

With that settled, the next step is easier: connecting to redis sentinel.

Connecting to Redis Sentinel inside the custom Redis Sink

Here we need to:

  • in open(), initialize the redis sentinel connection through a jedis connection pool, keeping a pool-backed Jedis handle available;
  • in invoke(), run our custom clear-redis-then-write-redis business logic; do not forget a finally block that closes the connection after each use, otherwise the project leaks resources;
  • in close(), "destroy" the jedis connection pool.

Now let's look at how the jedis connection pool is used.

The custom jedis connection pool

The connection pool below decides whether to run against a single redis instance (standalone) or against sentinel, based on which of these two lines in config.properties is commented out or enabled:

#redis config

#redis sentinel connection
redis.host=192.168.0.106:27001,192.168.0.106:27002,192.168.0.106:27003
redis.sentinel.master=master1

#redis standalone connection 
#redis.host=192.168.0.106:7002

#redis.host=localhost:7003
redis.password=111111
jedis.pool.min-idle=5
jedis.pool.max-active=25
jedis.pool.max-idle=5
jedis.pool.max-wait=-1
jedis.pool.minEvictableIdleTimeMillis=50000
jedis.pool.timeBetweenEvictionRunsMillis=48000
jedis.pool.numTestsPerEvictionRun=-1
jedis.pool.testOnBorrow=true
jedis.pool.testWhileIdle=true
connection.timeout=5000
redis.selected.database=0
#kafka config
kafka.host=192.168.0.102
kafka.port=9092
kafka.bootstrapservers=192.168.0.102:9092
kafka.topic=test

It works like this: a comma-separated redis.host is treated as a list of sentinel nodes, while a single host:port is treated as a standalone instance.

Never forget to set redis.sentinel.master, which the redis sentinel connection requires.

The custom JedisConfig.java

package com.aldi.cn.flink.demo.util;

import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.*;

import java.io.Closeable;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

/**
 * Jedis configuration wrapper (supports both a single-node pool and a sentinel pool)
 *
 * @author mk
 * @createDate 2021-02-08
 * @since 2.x
 */
public class JedisConfig {

	// private static final Logger logger =
	// LoggerFactory.getLogger(JedisConfig.class);
	private final static Logger logger = LoggerFactory.getLogger(JedisConfig.class);
	private static volatile JedisConfig redisConfig;
	private Object jedisPool;
	// current mode: standalone vs. cluster
	private boolean singleton;

	// jedis pool - deprecated by mk
	// private JedisPool jedisPool;

	// jedis cluster - deprecated by mk
	// private JedisCluster jedisCluster;

	// namespace, isolates redis keys between environments;
	// defaults to the current environment's namespace;
	// the value "forbidden" disables key isolation
	private String namespace;

	private JedisConfig(Configuration config) {
		Properties redisProp = new Properties();
		redisProp.setProperty("redis.host",
				config.getString(ConfigOptions.key("redis.host").stringType().noDefaultValue()));
		redisProp.setProperty("redis.sentinel.master",
				config.getString(ConfigOptions.key("redis.sentinel.master").stringType().noDefaultValue()));
		redisProp.setProperty("redis.password",
				config.getString(ConfigOptions.key("redis.password").stringType().noDefaultValue()));
		redisProp.setProperty("jedis.pool.min-idle",
				config.getString(ConfigOptions.key("jedis.pool.min-idle").stringType().noDefaultValue()));
		redisProp.setProperty("jedis.pool.max-active",
				config.getString(ConfigOptions.key("jedis.pool.max-active").stringType().noDefaultValue()));
		redisProp.setProperty("jedis.pool.max-idle",
				config.getString(ConfigOptions.key("jedis.pool.max-idle").stringType().noDefaultValue()));
		redisProp.setProperty("jedis.pool.max-wait",
				config.getString(ConfigOptions.key("jedis.pool.max-wait").stringType().noDefaultValue()));
		redisProp.setProperty("jedis.pool.timeBetweenEvictionRunsMillis", config.getString(
				ConfigOptions.key("jedis.pool.timeBetweenEvictionRunsMillis").stringType().noDefaultValue()));
		// copy the eviction settings too, otherwise the pool silently falls back to defaults
		redisProp.setProperty("jedis.pool.minEvictableIdleTimeMillis", config.getString(
				ConfigOptions.key("jedis.pool.minEvictableIdleTimeMillis").stringType().noDefaultValue()));
		redisProp.setProperty("jedis.pool.numTestsPerEvictionRun", config.getString(
				ConfigOptions.key("jedis.pool.numTestsPerEvictionRun").stringType().noDefaultValue()));
		redisProp.setProperty("jedis.pool.testOnBorrow",
				config.getString(ConfigOptions.key("jedis.pool.testOnBorrow").stringType().noDefaultValue()));
		redisProp.setProperty("jedis.pool.testWhileIdle",
				config.getString(ConfigOptions.key("jedis.pool.testWhileIdle").stringType().noDefaultValue()));
		redisProp.setProperty("connection.timeout",
				config.getString(ConfigOptions.key("connection.timeout").stringType().noDefaultValue()));
		redisProp.setProperty("redis.selected.database",
				config.getString(ConfigOptions.key("redis.selected.database").stringType().noDefaultValue()));

		String hostConf = config.getString(ConfigOptions.key("redis.host").stringType().noDefaultValue());
		// a comma-separated host list means sentinel nodes; a single host:port means standalone
		this.singleton = !hostConf.contains(",");
		logger.info(">>>>>>singleton->{}", singleton);
		if (singleton) {
			logger.info(">>>>>>init single connection");
			initJedisSinglePool(redisProp);
		} else {
			logger.info(">>>>>>init sentinel connection");
			initJedisSentinelPool(redisProp);
		}
	}

	// enhanced JedisPoolConfig by mk
	private JedisPoolConfig getJedisConnPoolConfig(Properties redisProp) {
		// int timeout = Integer.valueOf(redisProp.getProperty("connection.timeout",
		// "1000"));
		int maxActive = Integer.valueOf(redisProp.getProperty("jedis.pool.max-active", "5"));
		int maxIdle = Integer.valueOf(redisProp.getProperty("jedis.pool.max-idle", "5"));
		long maxWait = Integer.valueOf(redisProp.getProperty("jedis.pool.max-wait", "-1"));
		int minIdle = Integer.valueOf(redisProp.getProperty("jedis.pool.min-idle", "5"));
		long timeBetweenEvictionRunsMillis = Long
				.valueOf(redisProp.getProperty("jedis.pool.timeBetweenEvictionRunsMillis", "5000"));
		long minEvictableIdleTimeMillis = Long
				.valueOf(redisProp.getProperty("jedis.pool.minEvictableIdleTimeMillis", "5000"));
		int numTestsPerEvictionRun = Integer
				.valueOf(redisProp.getProperty("jedis.pool.numTestsPerEvictionRun", "5000"));

		boolean testOnBorrow = Boolean.valueOf(redisProp.getProperty("jedis.pool.testOnBorrow", "false"));
		boolean testWhileIdle = Boolean.valueOf(redisProp.getProperty("jedis.pool.testWhileIdle", "false"));
		JedisPoolConfig config = new JedisPoolConfig();
		config.setMaxTotal(maxActive);
		logger.info(">>>>>>[enhanced jedis conn]setMaxTotal->{}", maxActive);
		config.setMaxWaitMillis(maxWait);
		logger.info(">>>>>>[enhanced jedis conn]setMaxWaitMillis->{}", maxWait);
		config.setMaxIdle(maxIdle);
		logger.info(">>>>>>[enhanced jedis conn]setMaxIdle->{}", maxIdle);
		config.setMinIdle(minIdle);
		logger.info(">>>>>>[enhanced jedis conn]setMinIdle->{}", minIdle);
		config.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
		logger.info(">>>>>>[enhanced jedis conn]setTimeBetweenEvictionRunsMillis->{}", timeBetweenEvictionRunsMillis);
		config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
		logger.info(">>>>>>[enhanced jedis conn]setMinEvictableIdleTimeMillis->{}", minEvictableIdleTimeMillis);
		config.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
		logger.info(">>>>>>[enhanced jedis conn]setNumTestsPerEvictionRun->{}", numTestsPerEvictionRun);

		config.setTestOnBorrow(testOnBorrow);
		logger.info(">>>>>>[enhanced jedis conn]setTestOnBorrow->{}", testOnBorrow);
		config.setTestWhileIdle(testWhileIdle);
		logger.info(">>>>>>[enhanced jedis conn]setTestWhileIdle->{}", testWhileIdle);
		return config;
	}

	private void initJedisSinglePool(Properties redisProp) {
		JedisPool pool = null;
		String[] hostConf = redisProp.getProperty("redis.host").split(":");
		logger.info(">>>>>>set single redis pool connection");
		pool = new JedisPool(getJedisConnPoolConfig(redisProp), hostConf[0], Integer.valueOf(hostConf[1]).intValue(),
				Integer.valueOf(redisProp.getProperty("connection.timeout", "5000")),
				redisProp.getProperty("redis.password"),
				Integer.valueOf(redisProp.getProperty("redis.selected.database", "0")));
		this.jedisPool = pool;
	}

	private void initJedisSentinelPool(Properties redisProp) {
		JedisSentinelPool pool = null;
		String[] hostConfList = redisProp.getProperty("redis.host").split(",");
		String[] hostConf;
		Set<String> sentinels = new HashSet<String>();
		for (String hc : hostConfList) {
			hostConf = hc.split(":");
			sentinels.add(new HostAndPort(hostConf[0], Integer.parseInt(hostConf[1])).toString());
			logger.info(">>>>>>node->" + hostConf[0] + ":" + hostConf[1]);
		}
		String sentinelMasterName = redisProp.getProperty("redis.sentinel.master");
		logger.info(">>>>>>set sentinel master->{}", sentinelMasterName);
		pool = new JedisSentinelPool(sentinelMasterName, sentinels, getJedisConnPoolConfig(redisProp),
				Integer.valueOf(redisProp.getProperty("connection.timeout", "5000")),
				redisProp.getProperty("redis.password"),
				Integer.valueOf(redisProp.getProperty("redis.selected.database", "0")));
		this.jedisPool = pool;
	}

	public static JedisConfig getInstance(Configuration config) {
		if (redisConfig == null) {
			synchronized (JedisConfig.class) {
				if (redisConfig == null) {
					logger.info(">>>>>>init jedis connection pool connected");
					redisConfig = new JedisConfig(config);
				}
			}
		}
		return redisConfig;
	}

	public JedisConn getConn() {
		if (jedisPool instanceof JedisSentinelPool) {
			return new JedisConn(((JedisSentinelPool) jedisPool).getResource(), this.namespace);
		} else {
			return new JedisConn(((JedisPool) jedisPool).getResource(), this.namespace);
		}
	}

	/**
	 * redis connection wrapper: supports standalone and cluster, common operations, and distributed locks
	 */
	public class JedisConn implements Closeable {

		private Jedis jedis;

		public JedisConn(Jedis jedis, String namespace) {
			this.jedis = jedis;
		}

		public Jedis getJedis() {
			return this.jedis;
		}

		@Override
		public void close() {
			try {
				logger.info(">>>>>>redis operation stop, close redis");
				if (jedis != null) {
					jedis.close();
				}
			} catch (Exception e) {
				// ignore: the connection is already unusable at this point
			}
		}
	}
}

How to use JedisConfig in the custom redis sink

/* Project com.aldi.cn.flink.demo MyRedisSink.java
 *
 * 2022-09-27 15:53:04  (c) 2022 XX Co., all rights reserved
 *
 */

package com.aldi.cn.flink.demo;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.aldi.cn.flink.demo.util.JedisConfig;

import redis.clients.jedis.Jedis;

/**
 *
 * MyRedisSink
 *
 * 2022-09-27 15:53:04
 *
 * @version 1.0.0
 *
 */
public class MyRedisSink extends RichSinkFunction<Tuple2<String, Integer>> {
	private final static Logger logger = LoggerFactory.getLogger(MyRedisSink.class);

	private transient JedisConfig jedisConfig;
	private transient Jedis jedis;
	private transient JedisConfig.JedisConn jedisConn = null;
	private String keyName = "";

	public MyRedisSink(String keyName) {
		this.keyName = keyName;
	}

	@Override
	public void open(Configuration config) {
		logger.info(">>>>>>MyRedisSink -> open");
		ExecutionConfig.GlobalJobParameters parameters = getRuntimeContext().getExecutionConfig()
				.getGlobalJobParameters();
		Configuration globConf = (Configuration) parameters;
		// initialize the singleton pool once; a connection is borrowed per invoke,
		// so a single closed-and-reused handle never leaks across records
		jedisConfig = JedisConfig.getInstance(globConf);
	}

	@Override
	public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
		logger.info(">>>>>>MyRedisSink -> invoke");
		// write to redis
		try {
			// borrow a fresh connection from the pool for this record
			jedisConn = jedisConfig.getConn();
			jedis = jedisConn.getJedis();
			logger.info(">>>>>>del redis key->{}", keyName);
			if (jedis.exists(keyName)) {
				jedis.del(this.keyName);
			}
			jedis.hset(keyName, value.f0, String.valueOf(value.f1));
		} catch (Exception e) {
			logger.error(">>>>>>invoke redis operation to sink data into redis->" + this.keyName + " error: "
					+ e.getMessage(), e);
		} finally {
			try {
				// for a pooled Jedis, close() returns the connection to the pool
				jedis.close();
			} catch (Exception e) {
				// ignore
			}
		}
	}

	@Override
	public void close() throws Exception {
		logger.info(">>>>>>MyRedisSink -> close");
		try {
			if (jedis != null) {
				jedis.close();
			}
		} catch (Exception e) {
			// ignore
		}
		try {
			if (jedisConn != null) {
				jedisConn.close();
			}
		} catch (Exception e) {
			// ignore
		}
	}
}

The complete code

At this point we have filled in every "link in the chain" needed to reach the final goal, so here is all of the code in one go.

Project structure
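(The original post shows the structure as a screenshot; reconstructed from the listings below, it is roughly:

  src/main/java/com/aldi/cn/flink/demo/ProductJsonTypeStatistics.java
  src/main/java/com/aldi/cn/flink/demo/ProductBean.java
  src/main/java/com/aldi/cn/flink/demo/MyRedisSink.java
  src/main/java/com/aldi/cn/flink/demo/util/JedisConfig.java
  src/main/resources/log4j2.xml
  config.properties        (external, passed via --config_path)
  pom.xml)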

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.aldi.flink.demo</groupId>
	<artifactId>ProductJsonTypeStatistics</artifactId>
	<version>0.0.1</version>
	<properties>
		<jcuda.version>10.0.0</jcuda.version>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<java.version>11</java.version>
		<guava.version>27.0.1-jre</guava.version>
		<fastjson.version>1.2.59</fastjson.version>
		<jackson-databind.version>2.11.1</jackson-databind.version>
		<hutool-crypto.version>5.0.0</hutool-crypto.version>
		<maven.compiler.source>${java.version}</maven.compiler.source>
		<maven.compiler.target>${java.version}</maven.compiler.target>
		<compiler.plugin.version>3.8.1</compiler.plugin.version>
		<war.plugin.version>3.2.3</war.plugin.version>
		<jar.plugin.version>3.1.1</jar.plugin.version>
		<dom4j.version>1.6.1</dom4j.version>
		<log4j2.version>2.17.1</log4j2.version>
		<commons-lang3.version>3.4</commons-lang3.version>
		<flink.version>1.15.2</flink.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>${fastjson.version}</version>
		</dependency>
		<!-- core dependencies -->

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-scala_2.12</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_2.12</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-files</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- test dependencies -->

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-test-utils</artifactId>
			<version>${flink.version}</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-redis_2.11</artifactId>
			<version>1.1.5</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<scope>compile</scope>
			<version>2.17.1</version>
		</dependency>

		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<scope>compile</scope>
			<version>2.17.1</version>
		</dependency>

		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<scope>compile</scope>
			<version>2.17.1</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>1.2.1</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<transformers>
								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>com.aldi.cn.flink.demo.ProductJsonTypeStatistics</mainClass>
								</transformer>
								<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
									<resource>reference.conf</resource>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

The main class flink runs: ProductJsonTypeStatistics.java

/* Project com.aldi.cn.flink.demo ProductJsonTypeStatistics.java
 *
 * 2022-09-27 10:42:14  (c) 2022 XX Co., all rights reserved
 *
 */
package com.aldi.cn.flink.demo;
 
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
 
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;

import org.apache.flink.api.common.functions.FlatMapFunction;
 
/**
 *
 * ProductJsonTypeStatistics
 *
 * 2022-09-27 10:42:14
 *
 * @version 1.0.0
 *
 */
public class ProductJsonTypeStatistics {
    private final static Logger logger = LoggerFactory.getLogger(ProductJsonTypeStatistics.class);
 
    // Deserializer: turns the kafka json payload into a ProductBean via fastjson
    public static class ProductBeanJSONDeSerializer implements KafkaDeserializationSchema<ProductBean> {
 
        private final String encoding = "UTF8";
        private boolean includeTopic;
        private boolean includeTimestamp;

        public ProductBeanJSONDeSerializer(boolean includeTopic, boolean includeTimestamp) {
            this.includeTopic = includeTopic;
            this.includeTimestamp = includeTimestamp;
        }

        @Override
        public TypeInformation<ProductBean> getProducedType() {
            return TypeInformation.of(ProductBean.class);
        }

        @Override
        public boolean isEndOfStream(ProductBean nextElement) {
            return false;
        }

        @Override
        public ProductBean deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            if (consumerRecord != null) {
                try {
                    String value = new String(consumerRecord.value(), encoding);
                    ProductBean product = JSON.parseObject(value, ProductBean.class);
                    return product;
                } catch (Exception e) {
                    logger.error(">>>>>>deserialize failed : " + e.getMessage(), e);
                }
            }
            return null;
        }
    }
 
    public static void main(String[] args) throws Exception {

        // FlinkJedisPoolConfig conf =
        // new
        // FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(7002).setPassword("111111").build();
        // RedisSink redisSink = new RedisSink<>(conf, new SumReport2RedisMapper());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool argParas = ParameterTool.fromArgs(args);
        String propertiesFilePath = argParas.get("config_path");
        logger.info(">>>>>>start to load properties from {}", propertiesFilePath);
        ParameterTool paras = ParameterTool.fromPropertiesFile(propertiesFilePath);
        logger.info(">>>>>>load from external config_path redis.server=" + paras.get("redis.host"));
        logger.info(">>>>>>load from external config_path redis.password=" + paras.get("redis.password"));
        logger.info(
                ">>>>>>load from external config_path redis.selected.database=" + paras.get("redis.selected.database"));
        logger.info(">>>>>>load from external config_path redis.sentinel.master=" + paras.get("redis.sentinel.master"));
        logger.info(">>>>>>load from external config_path jedis.pool.min-idle=" + paras.get("jedis.pool.min-idle"));
        logger.info(">>>>>>load from external config_path jedis.pool.max-active=" + paras.get("jedis.pool.max-active"));
        logger.info(">>>>>>load from external config_path jedis.pool.max-idle=" + paras.get("jedis.pool.max-idle"));
        logger.info(">>>>>>load from external config_path jedis.pool.max-wait=" + paras.get("jedis.pool.max-wait"));
        logger.info(">>>>>>load from external config_path jedis.pool.timeBetweenEvictionRunsMillis="
                + paras.get("jedis.pool.timeBetweenEvictionRunsMillis"));
        logger.info(">>>>>>load from external config_path jedis.pool.minEvictableIdleTimeMillis="
                + paras.get("jedis.pool.minEvictableIdleTimeMillis"));
        logger.info(">>>>>>load from external config_path jedis.pool.numTestsPerEvictionRun="
                + paras.get("jedis.pool.numTestsPerEvictionRun"));
        logger.info(
                ">>>>>>load from external config_path jedis.pool.testOnBorrow=" + paras.get("jedis.pool.testOnBorrow"));
        logger.info(">>>>>>load from external config_path jedis.pool.testWhileIdle="
                + paras.get("jedis.pool.testWhileIdle"));
        logger.info(">>>>>>load from external config_path connection.timeout=" + paras.get("connection.timeout"));

        // ParameterTool configname = ParameterTool.fromPropertiesFile(path);
        // env.getConfig().setGlobalJobParameters(paras);
        Configuration conf = new Configuration();
        conf.setString("redis.host", paras.get("redis.host"));
        conf.setString("redis.password", paras.get("redis.password"));
        conf.setInteger("redis.selected.database", paras.getInt("redis.selected.database"));
        conf.setString("redis.sentinel.master", paras.get("redis.sentinel.master"));
        conf.setString("jedis.pool.min-idle", paras.get("jedis.pool.min-idle"));
        conf.setString("jedis.pool.max-active", paras.get("jedis.pool.max-active"));
        conf.setString("jedis.pool.max-idle", paras.get("jedis.pool.max-idle"));
        conf.setString("jedis.pool.max-wait", paras.get("jedis.pool.max-wait"));
        conf.setString("jedis.pool.timeBetweenEvictionRunsMillis",
                paras.get("jedis.pool.timeBetweenEvictionRunsMillis"));
        conf.setString("jedis.pool.minEvictableIdleTimeMillis",
                paras.get("jedis.pool.minEvictableIdleTimeMillis"));
        conf.setString("jedis.pool.numTestsPerEvictionRun",
                paras.get("jedis.pool.numTestsPerEvictionRun"));
        conf.setString("jedis.pool.testOnBorrow", paras.get("jedis.pool.testOnBorrow"));
        conf.setString("jedis.pool.testWhileIdle", paras.get("jedis.pool.testWhileIdle"));
        conf.setString("connection.timeout", paras.get("connection.timeout"));
        env.getConfig().setGlobalJobParameters(conf);
        KafkaSource<ProductBean> source = KafkaSource.<ProductBean>builder()
                .setBootstrapServers(paras.get("kafka.bootstrapservers")).setTopics(paras.get("kafka.topic"))
                .setGroupId("test01").setStartingOffsets(OffsetsInitializer.latest())
                .setDeserializer(KafkaRecordDeserializationSchema.of(new ProductBeanJSONDeSerializer(true, true)))
                .build();
        DataStream<ProductBean> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        DataStream<Tuple2<String, Integer>> ds = kafkaDS
                .flatMap(new FlatMapFunction<ProductBean, Tuple2<String, Integer>>() {
                    public void flatMap(ProductBean product, Collector<Tuple2<String, Integer>> collector)
                            throws Exception {
                        if (product.getStatus() == 101) {
                            // System.out.println(">>>>>>productId->" + product.getProductId());
                            logger.info(">>>>>>productId->" + product.getProductId());
                            collector.collect(new Tuple2<String, Integer>(product.getProductId(), 1));
                        }
                    }
                });
        DataStream<Tuple2<String, Integer>> prodCount = ds.keyBy(value -> value.f0)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(60), Time.seconds(5)))
                // keyed elements enter a 60s-long window that slides forward every 5s
                .sum(1);// sum the count field (f1) of elements sharing the same key
        prodCount.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))// (shu1, xx) (shu2,xx)....
                // all keyed results enter one 5s window (5s because the upstream window fires every 5s,
                // so one topN evaluation covers exactly one round of upstream output)
                .process(new TopNAllFunction(5)).addSink(new MyRedisSink("flinkdemo:kafka:simplekafka"));// 5 = top 5

        // prodCount.addSink(new RedisSink<>(conf, new SumReport2RedisMapper()));
        env.execute();
    }
 
    private static class TopNAllFunction
            extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {

        private int topSize = 5;

        public TopNAllFunction(int topSize) {
            this.topSize = topSize;
        }

        @Override
        public void process(
                ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context arg0,
                Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {

            TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
                    new Comparator<Integer>() {

                        @Override
                        public int compare(Integer y, Integer x) {
                            return (x < y) ? -1 : 1;
                        }

                    }); // TreeMap sorted by key (count) in descending order; equal counts never overwrite each other

            for (Tuple2<String, Integer> element : input) {
                treemap.put(element.f1, element);
                if (treemap.size() > topSize) { // keep only the top N entries
                    treemap.pollLastEntry();
                }
            }
            for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap.entrySet()) {
                Tuple2<String, Integer> prodInfo = entry.getValue();
                logger.info(">>>>>>prodid->" + prodInfo.f0 + "  purchase count->" + prodInfo.f1);
                out.collect(prodInfo);
            }
        }
    }
}

ProductBean.java, the class kafka payloads are deserialized into

package com.aldi.cn.flink.demo;

import java.io.Serializable;

public class ProductBean implements Serializable {
	private String productId = "";
	private int status = 0;

	public String getProductId() {
		return productId;
	}

	public void setProductId(String productId) {
		this.productId = productId;
	}

	public int getStatus() {
		return status;
	}

	public void setStatus(int status) {
		this.status = status;
	}
}
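As a quick sanity check of the mapping (a hypothetical snippet, not part of the job itself: the field names match the kafka JSON, and fastjson is expected to coerce the "101" string into the int status field):

// Hypothetical check of the fastjson mapping used by the deserializer above:
ProductBean p = com.alibaba.fastjson.JSON.parseObject(
        "{\"productId\":\"a1004\",\"status\":\"101\"}", ProductBean.class);
System.out.println(p.getProductId() + " / " + p.getStatus()); // expected: a1004 / 101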
	

JedisConfig.java

(identical to the JedisConfig.java listing shown in full earlier in this post)

MyRedisSink.java

(identical to the MyRedisSink.java listing shown in full earlier in this post)

config.properties

(identical to the config.properties shown in full earlier in this post)

log4j2.xml

<!-- The "status" attribute on Configuration controls log4j2's own internal logging; it can be omitted, and setting it to trace shows log4j2's detailed internal output -->
<!-- monitorInterval: log4j2 can detect changes to this file and reconfigure itself; the value is the check interval in seconds -->
<configuration status="ALL">
    <!-- Log levels by priority: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->

    <!-- Variables -->
    <Properties>
        <!-- Output format: %date is the timestamp, %thread the thread name, %-5level pads the level to 5 characters, %msg the message, %n a newline -->
        <!-- %logger{36}: at most 36 characters of the logger name -->
        <property name="LOG_PATTERN"
            value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
        <!-- Where log files are stored -->
        <property name="FILE_PATH" value="../logs" />
        <property name="FILE_NAME" value="FlinkKafka2Redis" />
    </Properties>

    <appenders>

        <console name="Console" target="SYSTEM_OUT">
            <!-- Output format -->
            <PatternLayout pattern="${LOG_PATTERN}" />
            <!-- The console prints messages at this level and above (onMatch) and rejects the rest (onMismatch) -->
            <ThresholdFilter level="DEBUG" onMatch="ACCEPT"
                onMismatch="DENY" />
        </console>

        <!-- This file receives everything and, because of append="false", is cleared on every run; handy for ad-hoc testing -->
        <File name="Filelog" fileName="${FILE_PATH}/FlinkKafka2Redis.log"
            append="false">
            <PatternLayout pattern="${LOG_PATTERN}" />
        </File>

        <!-- Prints everything at info level and above; whenever the size limit is exceeded, that chunk of log is archived and compressed under a year-month folder -->
        <RollingFile name="RollingFileInfo"
            fileName="${FILE_PATH}/info.log"
            filePattern="${FILE_PATH}/${FILE_NAME}-INFO-%d{yyyy-MM-dd}_%i.log.gz">
            <!-- Accept messages at this level and above (onMatch), reject the rest (onMismatch) -->
            <ThresholdFilter level="info" onMatch="ACCEPT"
                onMismatch="DENY" />
            <PatternLayout pattern="${LOG_PATTERN}" />
            <Policies>
                <!-- interval controls how often to roll over; the default is 1 hour -->
                <TimeBasedTriggeringPolicy interval="1" />
                <SizeBasedTriggeringPolicy size="10MB" />
            </Policies>
            <!-- Without DefaultRolloverStrategy, at most 7 files are kept per folder before overwriting begins -->
            <DefaultRolloverStrategy max="15" />
        </RollingFile>
        <RollingFile name="RollingFileDebug"
            fileName="${FILE_PATH}/FlinkKafka2Redis-debug.log"
            filePattern="${FILE_PATH}/${FILE_NAME}-DEBUG-%d{yyyy-MM-dd}_%i.log.gz">
            <ThresholdFilter level="debug" onMatch="ACCEPT"
                onMismatch="DENY" />
            <PatternLayout pattern="${LOG_PATTERN}" />
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" />
                <SizeBasedTriggeringPolicy size="10MB" />
            </Policies>
            <DefaultRolloverStrategy max="15" />
        </RollingFile>

        <!-- Warn level and above, same rollover and compression scheme -->
        <RollingFile name="RollingFileWarn"
            fileName="${FILE_PATH}/FlinkKafka2Redis-warn.log"
            filePattern="${FILE_PATH}/${FILE_NAME}-WARN-%d{yyyy-MM-dd}_%i.log.gz">
            <ThresholdFilter level="warn" onMatch="ACCEPT"
                onMismatch="DENY" />
            <PatternLayout pattern="${LOG_PATTERN}" />
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" />
                <SizeBasedTriggeringPolicy size="10MB" />
            </Policies>
            <DefaultRolloverStrategy max="15" />
        </RollingFile>

        <!-- Error level and above, same rollover and compression scheme -->
        <RollingFile name="RollingFileError"
            fileName="${FILE_PATH}/FlinkKafka2Redis-error.log"
            filePattern="${FILE_PATH}/${FILE_NAME}-ERROR-%d{yyyy-MM-dd}_%i.log.gz">
            <ThresholdFilter level="error" onMatch="ACCEPT"
                onMismatch="DENY" />
            <PatternLayout pattern="${LOG_PATTERN}" />
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" />
                <SizeBasedTriggeringPolicy size="10MB" />
            </Policies>
            <DefaultRolloverStrategy max="15" />
        </RollingFile>

    </appenders>

    <!-- Logger nodes assign individual settings, e.g. a different log level for classes under a given package -->
    <!-- Appenders only take effect once a logger referencing them is defined -->
    <loggers>

        <!-- Filter out noisy DEBUG output from spring and mybatis -->
        <logger name="org.mybatis" level="info" additivity="false">
            <AppenderRef ref="Console" />
        </logger>
        <!-- Framework output -->
        <!-- With additivity="false", a child Logger writes only to its own appenders, never to its parent's -->
        <Logger name="org.springframework" level="info"
            additivity="false">
            <AppenderRef ref="Console" />
        </Logger>

        <root level="INFO">
            <appender-ref ref="Console" />
            <appender-ref ref="Filelog" />
            <appender-ref ref="RollingFileInfo" />
            <appender-ref ref="RollingFileWarn" />
            <appender-ref ref="RollingFileDebug" />
            <appender-ref ref="RollingFileError" />
        </root>
    </loggers>

</configuration>

Running the project

Then we use the producer bundled with kafka to send over a few messages.
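For example (a hedged sketch: the path depends on your kafka installation, and older kafka versions use --broker-list instead of --bootstrap-server):

bin/kafka-console-producer.sh --bootstrap-server 192.168.0.102:9092 --topic test
>{"productId":"a1004","status":"101"}
>{"productId":"a1002","status":"101"}
>{"productId":"a1001","status":"101"}
>{"productId":"a1001","status":"101"}

Within a window cycle or two, the sink should start writing the ranking under the redis key flinkdemo:kafka:simplekafka.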

Related posts in this series:

FLINK Java development on 1.15.2 - how to use external configuration files

FLINK Java development on 1.15.2 - getting started

FLINK Java development on 1.15.2 - how to use log4j inside flink

FLINK Java development on 1.15.2 - building a 2-master 3-worker production cluster

FLINK Java development on 1.15.2 - a real-time product sales leaderboard

FLINK Java development on 1.15.2 - two ways to sink to MYSQL