布隆过滤器
Posted jwen1994
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了布隆过滤器相关的知识,希望对你有一定的参考价值。
试想一下这样的场景,当黑客故意访问不存在的数据,导致程序不断访问DB数据库的数据,数据库会不会挂掉?答案是会的。所以为了避免这种情况发生,当黑客访问不存在的缓存时能够迅速返回避免缓存及DB挂掉,引出了今天讲的布隆过滤器。
布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。
优点:相比于其它的数据结构,布隆过滤器在空间和时间方面都有巨大的优势。布隆过滤器存储空间和插入/查询时间都是常数。另外,散列函数相互之间没有关系,方便由硬件并行实现。布隆过滤器不需要存储元素本身,在某些对保密要求非常严格的场合有优势
缺点:布隆过滤器的缺点和优点一样明显。误算率是其中之一。随着存入的元素数量增加,误算率随之增加。但是如果元素数量太少,则使用散列表足矣
Spring Boot 实现谷歌布隆过滤器——以会员抽奖为例
步骤一:引入依赖
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>21.0</version> </dependency>
步骤二:将需要判断数据是否存在的key值
@Service public class BloomFilterService { @Resource private SysUserMapper sysUserMapper; private BloomFilter<Integer> bf; /*** * PostConstruct 程序启动时候加载此方法 */ @PostConstruct public void initBloomFilter() { SysUserExample sysUserExample = new SysUserExample(); List<SysUser> sysUserList = sysUserMapper.selectByExample(sysUserExample); if(CollectionUtils.isEmpty(sysUserList)){ return; } //创建布隆过滤器(默认3%误差) bf = BloomFilter.create(Funnels.integerFunnel(),sysUserList.size()); for (SysUser sysUser:sysUserList) { bf.put(sysUser.getId()); } } /*** * 判断id可能存在于布隆过滤器里面 * @param id * @return */ public boolean userIdExists(int id){ return bf.mightContain(id); } }
步骤三:进行测试
@RestController public class BloomFilterController { @Resource private BloomFilterService bloomFilterService; @RequestMapping("/bloom/idExists") public boolean ifExists(int id){ return bloomFilterService.userIdExists(id); } }
基于内存的 google 布隆过滤器的缺陷与思考
- 重启即失效
- 本地内存无法用在分布式场景
- 不支持大数据量存储
为了解决这些问题,我们可以使用 Redis 布隆过滤器,它的好处有:
- 可扩展性Bloom过滤器
- 一旦Bloom过滤器达到容量,就会在其上创建一个新的过滤器
- 不存在重启即失效或者定时任务维护的成本
- 基于goole实现的布隆过滤器需要启动之后初始化布隆过滤器
它的缺点:需要网络 IO,性能比基于内存的过滤器低
优先基于数据量进行考虑选择哪个布隆过滤器
基于 Lua 脚本实现 Spring Boot 和布隆过滤器的整合
步骤一:编写两个 Lua 脚本
bloomFilterAdd.lua
local bloomName = KEYS[1] local value = KEYS[2] -- bloomFilter local result_1 = redis.call(‘BF.ADD‘, bloomName, value) return result_1
bloomFilterExist.lua
local bloomName = KEYS[1] local value = KEYS[2] -- bloomFilter local result_1 = redis.call(‘BF.EXISTS‘, bloomName, value) return result_1
步骤二:新建两个方法
1)添加数据到指定名称的布隆过滤器(bloomFilterAdd)
2)从指定名称的布隆过滤器获取 key 是否存在的脚本(bloomFilterExists)
@Service public class RedisService { @Autowired private RedisTemplate redisTemplate; private static final String bloomFilterName = "isVipBloom"; public Boolean bloomFilterAdd(int value){ DefaultRedisScript<Boolean> bloomAdd = new DefaultRedisScript<>(); bloomAdd.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterAdd.lua"))); bloomAdd.setResultType(Boolean.class); List<Object> keyList= new ArrayList<>(); keyList.add(bloomFilterName); keyList.add(value+""); Boolean result = (Boolean) redisTemplate.execute(bloomAdd,keyList); return result; } public Boolean bloomFilterExists(int value){ DefaultRedisScript<Boolean> bloomExists= new DefaultRedisScript<>(); bloomExists.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterExist.lua"))); bloomExists.setResultType(Boolean.class); List<Object> keyList= new ArrayList<>(); keyList.add(bloomFilterName); keyList.add(value+""); Boolean result = (Boolean) redisTemplate.execute(bloomExists,keyList); return result; } }
步骤三:进行测试
@RestController public class BloomFilterController { @Resource private RedisService redisService; @RequestMapping("/bloom/redisIdExists") public boolean redisidExists(int id){ return redisService.bloomFilterExists(id); } @RequestMapping("/bloom/redisIdAdd") public boolean redisidAdd(int id){ return redisService.bloomFilterAdd(id); } }
实现一个秒杀业务
1)利用 Redis 缓存 incr 拦截流量
首先通过数据控制模块,提前将秒杀商品缓存到读写分离 Redis,并设置秒杀开始标记如下:
- skuId_start: 0 开始标记,0表示秒杀还没开始
- skuId_count: 10000 表示总数
- skuId_access: 12000 表示接受抢购数
秒杀开始前,服务集群读取 skuId_start 为 0,直接返回未开始。之所以设置这个值而不是根据时间判断是否开始,是因为服务时间可能不一致(相差几百毫秒)这样可能导致流量倾斜(其他服务没开始,会将大量的流量堆积到开始的服务上)
数据控制模块将 skuId_start 改为1,标志秒杀开始。
当接受下单数达到 skuId_count*1.2 后,继续拦截所有请求。
2)利用 Redis 缓存加速库存扣量
- skuId_booked: 0 表示没有抢购
3)将用户订单数据写入mq
4)监听mq入库
代码实现
@Service public class SeckillService { private static final String secStartPrefix = "skuId_start_"; private static final String secAccess = "skuId_access_"; private static final String secCount = "skuId_count_"; private static final String filterName = "skuId_bloomfilter_"; private static final String bookedName = "skuId_booked_"; @Resource private RedisService redisService; public String seckill(int uid, int skuId) { //流量拦截层 //1、判断秒杀是否开始 0_1554045087 开始标识_开始时间 String isStart = (String) redisService.get(secStartPrefix + skuId); if (StringUtils.isBlank(isStart)) { return "还未开始"; } if (isStart.contains("_")) { Integer isStartInt = Integer.parseInt(isStart.split("_")[0]); Integer startTime = Integer.parseInt(isStart.split("_")[1]); if (isStartInt == 0) { if (startTime > getNow()) { return "还未开始"; } else { //代表秒杀已经开始 redisService.set(secStartPrefix + skuId, 1 + ""); } } else { return "系统异常"; } } else { if (Integer.parseInt(isStart) != 1) { return "系统异常"; } } //2、流量拦截 String skuIdAccessName = secAccess + skuId; Integer accessNumInt = 0; String accessNum = (String) redisService.get(skuIdAccessName); if (StringUtils.isNotBlank(accessNum)) { accessNumInt = Integer.parseInt(accessNum); } String skuIdCountName = secCount + skuId; Integer countNumInt = Integer.parseInt((String) redisService.get(skuIdCountName)); if (countNumInt * 1.2 < accessNumInt) { return "抢购已经完成,欢迎下次参与"; } else { redisService.incr(skuIdAccessName); } //信息校验层 if (redisService.bloomFilterExists(filterName, uid)) { return "您已经抢购过该商品,请勿重复下发!"; } else { redisService.bloomFilterAdd(filterName, uid); } Boolean isSuccess = redisService.getAndIncrLua(bookedName + skuId); if (isSuccess) { return "恭喜您抢购成功!!!"; } else { return "抢购结束,欢迎下次参与"; } } private long getNow() { return System.currentTimeMillis() / 1000; } }
RedisService
@Service public class RedisService { @Autowired private RedisTemplate redisTemplate; private static double size = Math.pow(2, 32); /** * 写入缓存 * * @param key * @param offset 位 8Bit=1Byte * @return */ public boolean setBit(String key, long offset, boolean isShow) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.setBit(key, offset, isShow); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 写入缓存 * * @param key * @param offset * @return */ public boolean getBit(String key, long offset) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); result = operations.getBit(key, offset); } catch (Exception e) { e.printStackTrace(); } return result; } /** * 写入缓存 * * @param key * @param value * @return */ public boolean set(final String key, Object value) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); redisTemplate.opsForList(); operations.set(key, value); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 写入缓存 * * @param key * @return */ public Object get(final String key) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); return operations.get(key); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 写入缓存 * * @param key * @param value * @return */ public boolean decr(final String key, int value) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.increment(key, -value); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 写入缓存 * * @param key * @return */ public boolean incr(final String key) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.increment(key, 1); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 写入缓存设置时效时间 * * @param key * @param value * @return */ public boolean set(final String key, Object value, Long expireTime) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.set(key, value); redisTemplate.expire(key, expireTime, TimeUnit.SECONDS); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 批量删除对应的value * * @param keys */ public void remove(final String... keys) { for (String key : keys) { remove(key); } } /** * 删除对应的value * * @param key */ public void remove(final String key) { if (exists(key)) { redisTemplate.delete(key); } } /** * 判断缓存中是否有对应的value * * @param key * @return */ public boolean exists(final String key) { return redisTemplate.hasKey(key); } /** * 读取缓存 * * @param key * @return */ public Object genValue(final String key) { Object result = null; ValueOperations<String, String> operations = redisTemplate.opsForValue(); result = operations.get(key); return result; } /** * 哈希 添加 * * @param key * @param hashKey * @param value */ public void hmSet(String key, Object hashKey, Object value) { HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); hash.put(key, hashKey, value); } /** * 哈希获取数据 * * @param key * @param hashKey * @return */ public Object hmGet(String key, Object hashKey) { HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); return hash.get(key, hashKey); } /** * 列表添加 * * @param k * @param v */ public void lPush(String k, Object v) { ListOperations<String, Object> list = redisTemplate.opsForList(); list.rightPush(k, v); } /** * 列表获取 * * @param k * @param l * @param l1 * @return */ public List<Object> lRange(String k, long l, long l1) { ListOperations<String, Object> list = redisTemplate.opsForList(); return list.range(k, l, l1); } /** * 集合添加 * * @param key * @param value */ public void add(String key, Object value) { SetOperations<String, Object> set = redisTemplate.opsForSet(); set.add(key, value); } /** * 集合获取 * * @param key * @return */ public Set<Object> setMembers(String key) { SetOperations<String, Object> set = redisTemplate.opsForSet(); return set.members(key); } /** * 有序集合添加 * * @param key * @param value * @param scoure */ public void zAdd(String key, Object value, double scoure) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); zset.add(key, value, scoure); } /** * 有序集合获取 * * @param key * @param scoure * @param scoure1 * @return */ public Set<Object> rangeByScore(String key, double scoure, double scoure1) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); redisTemplate.opsForValue(); return zset.rangeByScore(key, scoure, scoure1); } //第一次加载的时候将数据加载到redis中 public void saveDataToRedis(String name) { double index = Math.abs(name.hashCode() % size); long indexLong = new Double(index).longValue(); boolean availableUsers = setBit("availableUsers", indexLong, true); } //第一次加载的时候将数据加载到redis中 public boolean getDataToRedis(String name) { double index = Math.abs(name.hashCode() % size); long indexLong = new Double(index).longValue(); return getBit("availableUsers", indexLong); } /** * 有序集合获取排名 * * @param key 集合名称 * @param value 值 */ public Long zRank(String key, Object value) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); return zset.rank(key, value); } /** * 有序集合获取排名 * * @param key */ public Set<ZSetOperations.TypedTuple<Object>> zRankWithScore(String key, long start, long end) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); Set<ZSetOperations.TypedTuple<Object>> ret = zset.rangeWithScores(key, start, end); return ret; } /** * 有序集合添加 * * @param key * @param value */ public Double zSetScore(String key, Object value) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); return zset.score(key, value); } /** * 有序集合添加分数 * * @param key * @param value * @param scoure */ public void incrementScore(String key, Object value, double scoure) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); zset.incrementScore(key, value, scoure); } /** * 有序集合获取排名 * * @param key */ public Set<ZSetOperations.TypedTuple<Object>> reverseZRankWithScore(String key, long start, long end) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); Set<ZSetOperations.TypedTuple<Object>> ret = zset.reverseRangeByScoreWithScores(key, start, end); return ret; } /** * 有序集合获取排名 * * @param key */ public Set<ZSetOperations.TypedTuple<Object>> reverseZRankWithRank(String key, long start, long end) { ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); Set<ZSetOperations.TypedTuple<Object>> ret = zset.reverseRangeWithScores(key, start, end); return ret; } public Boolean bloomFilterAdd(String filterName, int value) { DefaultRedisScript<Boolean> bloomAdd = new DefaultRedisScript<>(); bloomAdd.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterAdd.lua"))); bloomAdd.setResultType(Boolean.class); List<Object> keyList = new ArrayList<>(); keyList.add(filterName); keyList.add(value + ""); Boolean result = (Boolean) redisTemplate.execute(bloomAdd, keyList); return result; } public Boolean bloomFilterExists(String filterName, int value) { DefaultRedisScript<Boolean> bloomExists = new DefaultRedisScript<>(); bloomExists.setScriptSource(new ResourceScriptSource(new ClassPathResource("bloomFilterExist.lua"))); bloomExists.setResultType(Boolean.class); List<Object> keyList = new ArrayList<>(); keyList.add(filterName); keyList.add(value + ""); Boolean result = (Boolean) redisTemplate.execute(bloomExists, keyList); return result; } public Boolean getAndIncrLua(String key) { DefaultRedisScript<Boolean> bloomExists = new DefaultRedisScript<>(); bloomExists.setScriptSource(new ResourceScriptSource(new ClassPathResource("secKillIncr.lua"))); bloomExists.setResultType(Boolean.class); List<Object> keyList = new ArrayList<>(); keyList.add(key); Boolean result = (Boolean) redisTemplate.execute(bloomExists, keyList); return result; } }
secKillIncr.lua
local lockKey = KEYS[1] -- get info local result_1 = redis.call(‘GET‘, lockKey) if tonumber(result_1) <10000 then local result_2= redis.call(‘INCR‘, lockKey) return result_1 else return result_1 end
测试:
@RestController public class SeckillController { @Resource private SeckillService seckillService; @RequestMapping("/redis/seckill") public String secKill(int uid,int skuId){ return seckillService.seckill(uid,skuId); } }
以上是关于布隆过滤器的主要内容,如果未能解决你的问题,请参考以下文章