flink连接redis工具类-简单好用
Posted Z-hhhhh
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink连接redis工具类-简单好用相关的知识,希望对你有一定的参考价值。
本文包含redis非集群 和 集群两种方式
有些大佬写的实在太牛,恕我能力不足看不太懂,所以自己尝试了一下
非集群方式
非集群方式
工具类如下:
object RedisUtil
private val jedisPoolConfig: JedisPoolConfig = new JedisPoolConfig()
jedisPoolConfig.setMaxTotal(200) //最大连接数
jedisPoolConfig.setMaxIdle(20) //连接池中最大空闲的连接数
jedisPoolConfig.setMinIdle(20) //最小空闲
jedisPoolConfig.setBlockWhenExhausted(true) //忙碌时是否等待
jedisPoolConfig.setMaxWaitMillis(2000) //忙碌时等待时长 毫秒
jedisPoolConfig.setTestOnBorrow(false) //每次获得连接的进行测试
private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, "xxx.xxx.xxx.xxx", xxx)
// 直接得到一个 Redis 的连接
def getJedisClient: Jedis =
jedisPool.getResource
//测试通过
def main(args: Array[String]): Unit =
// println(getJedisClient.hget("flink_redis", "1"))
getJedisClient.hset("flink_redis","20","苏州")
println(getJedisClient.exists("flink_redis"))
println(getJedisClient.exists("flink_redis_false"))
getJedisClient.hkeys("flink_redis")
测试如下
def main(args: Array[String]): Unit =
getJedisClient.hset("key","field","value")
println(getJedisClient.exists("key"))
getJedisClient.hkeys("key")
直接使用getJedisClient 就可以获取所有redis的方法
由于我是集群,使用的时候,直接改 会报错
redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 1539 xxx.xxx.xxx.xxx:xxx
所以要修改一些
集群方式
工具类
object RedisUtil
private val jedisPoolConfig: JedisPoolConfig = new JedisPoolConfig()
jedisPoolConfig.setMaxTotal(200) //最大连接数
jedisPoolConfig.setMaxIdle(20) //连接池中最大空闲的连接数
jedisPoolConfig.setMinIdle(20) //最小空闲
jedisPoolConfig.setBlockWhenExhausted(true) //忙碌时是否等待
jedisPoolConfig.setMaxWaitMillis(2000) //忙碌时等待时长 毫秒
jedisPoolConfig.setTestOnBorrow(false) //每次获得连接的进行测试
val nodes: util.Set[HostAndPort] = new util.LinkedHashSet[HostAndPort]
//这样写方便后面读取配置文件使用
val nodesList:String = "172.8.10.105:7000,172.8.10.105:7001,172.8.10.106:7002,172.8.10.106:7003,172.8.10.107:7004,172.8.10.107:7005"
nodesList.split(",").foreach(s=>
val sp: Array[String] = s.split(":")
nodes.add(new HostAndPort(sp.apply(0),sp.apply(1).toInt))
)
private val cluster: JedisCluster = new JedisCluster(nodes, jedisPoolConfig)
//hashGet方法,以下 以此重写其他方法
def hashGet(key:String, field:String):String =
cluster.hget(key,field)
其他的方法可以手动重写
测试如下
println(RedisUtil.hashGet("key","field"))
最后补充一下需要的依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
</exclusion>
</exclusions>
</dependency>
以上是关于flink连接redis工具类-简单好用的主要内容,如果未能解决你的问题,请参考以下文章
每期一个小窍门(001): 一个简单优雅的redis 工具类 (最简单版本)
每期一个小窍门(001): 一个简单优雅的redis 工具类 (最简单版本)