redis应用场景--实现布隆过滤器
Posted 99kol
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis应用场景--实现布隆过滤器相关的知识,希望对你有一定的参考价值。
简述布隆过滤器的实现思路:
假设有一个长度为n的比特数组,bit_array,数组里的每一位都是0,对于一个url或者是其他数据,使用hash算法计算出url的散列值,这个散列值当然是一个整数,暂且命名为index,index=index%n,确保index的值小于n,查看bit_array[index]是否等于1,如果等于1,表示该url已被抓取过了,如果等于0,则表示还没有被抓取,让爬虫程序取抓取这个url,同时设置bit_array[index]=1.
当url非常多了以后,必然会发生碰撞,即两个不同的url经过hash处理后得到相同的散列值,这就麻烦了,两个url,有一个被误判,明明没有被抓取过,但比特数组里已经记录了它。
如何解决碰撞,布隆过滤器的方法很暴力,用多个hash,这样就会产生多个散列值,这些散列值所对应的索引位置均设置为1,虽然依然会发生碰撞,但是这些位置都发生碰撞的概率就降低了。
影响效果的三个因素:
比特数组的长度
错误率
hash的次数
布隆过滤器可以视为一个特殊的集合,它不能存储具体的值,它只能表示某个值是否在集合中,而且,它有一定的错误率,布隆过滤器说某个值不存在,那么就一定不存在,说某个值存在,则有一定的概率是错的。
查阅资料,有一个库可以来实现布隆过滤器,mmh3
安装方法:pip install mmh3
Redis应用场景中布隆过滤及高并发限流及分布式锁等应用
前言
本篇文章主要介绍Redis在一般通用的大场景下的应用,包括布隆过滤器的原理,利用布隆过滤器去重从而达到对空间进行节省;以及令牌桶、漏斗算法原理如何做到限流的;分布式锁,如何利用redis实现一个分布式锁,通过这几种应用场景去了解redis的应用
Redis实现去幂等性
例如在租房时,有一张房屋信息表,地址、房间号、房间其他字段... 在添加时候判断是重复? 这表里面数据达到100W+
布隆过滤
原理
bloom算法类似一个hash set,用来判断某个元素(key)是否在某个集合中。
和一般的hash set不同的是,这个算法无需存储key的值,对于每个key,只需要k个比特位,每个存储一个标志,用来判断key是否在集合中。
通过数组上占坑更改位数,客户端 key1 判断 三个算法 对应的索引是否为1 如果 其中有一个算法 表示为占用,则表示不存在,要不然一个就容易碰撞的;其次三个算法得到数据 多个key 有可能相同,降低hash碰撞的情况,也是多个算法来计算的原因
提高精度的方法,只需要更大的空间和hash函数计算就行
安装步骤 对于布隆过滤器:
准备Redis
默认已经安装了Redis4.0以上的版本,4.0以上才支持插件模块
gitHub下载最新版本
Releases · RedisBloom/RedisBloom · GitHub
代码中 使用方式
- 首先添加用户
- 判断 用户是否存在
- 一次性添加多个用户 ,和 校验 多个用户是否存在也是添加m就可以
- 在代码中调用,首先引入依赖 依赖于jedis
<!-- RedisBloomFilter client -->
<dependency>
<groupId>com.redislabs</groupId>
<artifactId>jrebloom</artifactId>
<version>1.2.0</version>
</dependency>
- 而引用只需要 bloomfilter 的客户端
public class RedisBloomFilter {
// redis连接信息
@Value("${redis_host}")
private String redisHost;
@Value("${redis_port}")
private int redisPort;
// bloomfilter 客户端
private Client client;
@PostConstruct
public void init() {
// bloomfilter 客户端
client = new Client(redisHost, redisPort);
}
/**
* 创建一个自定义的过滤器
* @param filterName
*/
public void createFilter(String filterName) {
// 创建一个容量10万,误判率0.01%的布隆过滤器
client.createFilter(filterName, 1000, 0.1);
}
/**
* 添加元素
* @param filterName
* @param value
* @return
*/
public boolean addElement(String filterName, String value) {
return client.add(filterName, value);
}
/**
* 判断元素是否存在
* @param filterName
* @param value
* @return
*/
public boolean exists(String filterName, String value) {
return client.exists(filterName, value);
}
Redis实现高并发限流令牌桶
对于jdk中有Semaphore 这个信号量去实现对线程的限流,在juc包下利用aqs中共享锁,和互斥锁的特性对多线程进行限流;这是jdk中提供的。和这个是很像的,但不能做到均匀的获得资源
限流令牌桶 资源有限 ,但是不限速,用完就没有了
漏桶算法 资源有限 但是速度均匀。
令牌桶
基于redis的令牌桶代码实现 使用list 就行,这里不需要考虑并发情况。redis本身就是单线程的,是不用考虑的。
public class TokenBukect {
@Autowired
StringRedisTemplate stringRedisTemplate;
/**
* 创建令牌桶
* @param tokenName
* @param tokenSize
*/
public void createToken(String tokenName, int tokenSize){
String name = "token_list:"+tokenName;
for(int i =0; i < tokenSize; i++){
stringRedisTemplate.opsForList().leftPush(name, "token_"+i);
}
System.out.println(tokenSize+"个令牌,生成完毕!");
}
/**
* 使用令牌
* @return
*/
public String useToken(String tokenName) {
String name = "token_list:"+tokenName;
String token = stringRedisTemplate.opsForList().leftPop(name);
return token;
}
}
限流场景
可以使用redis 中 zset score key value, 通过score进行排序
只取10分钟以内的,一次操作就是一条zset 记录,十分钟外的数据删除,并且整个Key进行10分钟过期。
代码实现
- 等待多种操作一起执行
- 往zset中添加记录,当前时间作为score,value只要唯一就好
- 根据score排序,删除过期的记录 注意在这里 因为排好了序,所以可以利用pipe.zremrangeByScore(key, 0, nowTs - period * 1000); 拍好序
- 获得当前用户时间窗口中的操作次数
- 设置Key的过期时间,过了时间窗口自动删除 这里会将过期的给清理掉
- 执行上面的动作
public class SimpleLimiter {
// redis连接信息
@Value("${redis_host}")
private String redisHost;
@Value("${redis_port}")
private int redisPort;
Jedis jedis;
@PostConstruct
public void init() {
jedis = new Jedis(redisHost, redisPort);
}
// 判断当前动作是否超过了限制
/**
* 判断是否超出限制
* @param userId 操作用户
* @param actionKey 操作内容
* @param period 时间窗口,单位秒
* @param maxCount 允许的最大次数
* @return 是否超出限制
*/
public boolean isActionAllowed(String userId, String actionKey,
int period, int maxCount) {
String key = String.format("hist::%s::%s", userId, actionKey);
long nowTs = System.currentTimeMillis();
Pipeline pipe = jedis.pipelined();
pipe.multi(); // 等待多种操作一起执行
// 往zset中添加记录,当前时间作为score,value只要唯一就好
pipe.zadd(key, nowTs, "" + nowTs);
// 根据score排序,删除过期的记录
pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
// 获得当前用户时间窗口中的操作次数
Response<Long> count = pipe.zcard(key);
// 设置Key的过期时间,过了时间窗口自动删除
pipe.expire(key, period + 1);
// 执行上面的动作
pipe.exec();
pipe.close();
return count.get() <= maxCount;
}
}
public class JavaFunnelLimiter {
// 这是一个漏斗
static class Funnel {
int capacity; // 漏斗容量,初始化大小
float leakingRate; // 流水速率
int leftQuota; // 剩余空间
long lastLeakingTime; // 上一次漏水时间
public Funnel(int capacity, float leakingRate) {
this.capacity = capacity;
this.leakingRate = leakingRate;
this.leftQuota = capacity;
this.lastLeakingTime = System.currentTimeMillis();
}
// 计算容量
void countSpace() {
long nowMill = System.currentTimeMillis();
long deltaMill = nowMill - lastLeakingTime; // 距离上一次漏水过去了多久
// 计算这段时间腾出了多少配额
long deltaQuota = (long) (deltaMill * leakingRate);
// 间隔时间太长,数字过大溢出了,恢复初始容量
if (deltaQuota < 0) {
this.leftQuota = capacity;
this.lastLeakingTime = nowMill;
return;
}
// 腾出空间太小,最小单位是 1,等下次,操作频率太快,
if (deltaQuota < 1) {
return;
}
// 容量足够
this.leftQuota += deltaQuota; // 增加剩余空间
this.lastLeakingTime = nowMill; // 记录漏水时间
// 剩余空间不得高于容量
if (this.leftQuota > this.capacity) {
this.leftQuota = this.capacity;
}
}
/**
* 获取配额
* @param quota
* @return
*/
boolean watering(int quota) {
// 计算配额
countSpace();
// 减去此次配额
if (this.leftQuota >= quota) {
this.leftQuota -= quota;
return true;
}
// 获得配额失败
return false;
}
}
// 管理所有的漏斗
private Map<String, Funnel> funnels = new HashMap<>();
// 是否允许访问
public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
String key = String.format("%s:%s", userId, actionKey);
// 获取已经创建过的漏斗
Funnel funnel = funnels.get(key);
// 创建新的漏斗
if (funnel == null) {
funnel = new Funnel(capacity, leakingRate);
funnels.put(key, funnel);
}
// 需要 1 个流量
return funnel.watering(1);
}
}
基于redis实现分布式锁
多个服务请求同一个资源的时候,数据库压力,如何转移压力。,需要一把分布式锁的应用场景。
如何转移压力,受保护的公共资源,需要协调。缓存Key,失效以后,多个请求更新,只需要一个请求更新就可以了,需要一把分布式锁。
set lock true ex 5 nx OK... do something critical ... # 10秒 del lock
public class RedisDistributedLock implements Lock {
StringRedisTemplate stringRedisTemplate; // redis客户端
String resourceName = null;
int timeout = 10;
ThreadLocal<String> lockRandomValue = new ThreadLocal<>();
/**
* 构建一把锁
*
* @param resourceName 资源唯一标识
* @param timeout 资源锁定超时时间~防止资源死锁,单位秒
*/
public RedisDistributedLock(String resourceName, int timeout, StringRedisTemplate stringRedisTemplate) {
this.resourceName = "lock_" + resourceName;
this.timeout = timeout;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock() {
Boolean lockResult = stringRedisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
// 随机设置一个值
lockRandomValue.set(UUID.randomUUID().toString());
Boolean status = connection.set(resourceName.getBytes(), lockRandomValue.get().getBytes(),
Expiration.seconds(timeout), RedisStringCommands.SetOption.SET_IF_ABSENT);
return status;
}
});
return lockResult;
}
Lock lock = new ReentrantLock();
// 多台机器的情况下,会出现大量的等待,加重redis的压力。
// 在lock方法上,加入同步关键字。单机同步,多机用redis
@Override
public void lock() {
lock.lock();
try {
while (!tryLock()) {
// 监听,锁删除的通知
// try {
// Thread.sleep(1000L); // 定时等待后续尝试
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
stringRedisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
try {
CountDownLatch waiter = new CountDownLatch(1);
// 等待通知结果
connection.subscribe((message, pattern) -> {
// 收到通知,不管结果,立刻再次抢锁
waiter.countDown();
}, (resourceName + "_unlock_channel").getBytes());
// 等待一段时间,超过这个时间都没收到消息,肯定有问题
waiter.await(timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return true; //随便返回一个值都没问题
}
});
}
} finally {
lock.unlock();
}
}
@Override
public void unlock() {
// 释放锁。(底层框架开发者要防止被API的调用者误调用)
// 错误示范 stringRedisTemplate.delete(resourceName);
// 1、 要比对内部的值,同一个线程,才能够去释放锁。
// 2、 同时发出释放锁的通知
if (lockRandomValue.get().equals(stringRedisTemplate.opsForValue().get(resourceName))) {
// 普通实现
//normalUnlock();
// 通过lua脚本实现
unlockUseLua();
}
System.out.println("释放锁:"+resourceName);
}
// 通过redis接口的通常操作
private void normalUnlock() {
// 有两个过程,不能成为原子操作
stringRedisTemplate.delete(resourceName); // 删除资源
stringRedisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
// 发送通知
Long received = connection.publish((resourceName + "_unlock_channel").getBytes(), "".getBytes());
return received;
}
});
}
@SuppressWarnings({ "unchecked", "unused" })
private void unlockUseLua() {
/* 脚本文件的方式
ScriptSource scriptSource =
new ResourceScriptSource(new ClassPathResource("unlock.lua"));
DefaultRedisScript<Boolean> defaultScript = new DefaultRedisScript<Boolean>();
defaultScript.setScriptSource(scriptSource);
*/
String scriptText =
"if redis.call('get',KEYS[1]) == ARGV[1] then\\r\\n" +
"local result = redis.call('del',KEYS[1])\\r\\n" +
"redis.call('publish',KEYS[2], ARGV[2])\\r\\n" +
"return result\\r\\n" +
"else\\r\\n" +
"return 0\\r\\n" +
"end";
RedisScript<Boolean> script = RedisScript.of(scriptText, Boolean.class);
Boolean result = stringRedisTemplate.execute(script,
Arrays.asList(resourceName, resourceName + "_unlock_channel"),
new Object[] {lockRandomValue.get(), lockRandomValue.get()});
System.out.println("lua脚本执行结果:"+result);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
在实际业务场景,至少我在项目中很少用到redis来做分布式锁。
集群下主从切换,Key转移的问题
产生原因 :异步进行主从切换,而不采用同步的,但是主服务器,出现问题,存在故障转移,进行故障转移,锁丢失了
以上是关于redis应用场景--实现布隆过滤器的主要内容,如果未能解决你的问题,请参考以下文章