Redis应用场景中布隆过滤及高并发限流及分布式锁等应用

Posted 踩踩踩从踩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis应用场景中布隆过滤及高并发限流及分布式锁等应用相关的知识,希望对你有一定的参考价值。

前言

本篇文章主要介绍Redis在一般通用的大场景下的应用,包括布隆过滤器的原理,利用布隆过滤器去重从而达到对空间进行节省;以及令牌桶、漏斗算法原理如何做到限流的;分布式锁,如何利用redis实现一个分布式锁,通过这几种应用场景去了解redis的应用

Redis实现去幂等性

例如在租房时,有一张房屋信息表,地址、房间号、房间其他字段... 在添加时候判断是重复? 这表里面数据达到100W+

但是数据库里面的数据是往上增的,如何进行判断是否已经重复添加了,利用exits 效果是非常不好的。如果利用redis中set也有误判率的
这里就要说说布隆过滤器

布隆过滤

不怎么精确 set 数据,但可以提供精度控制 ,在一万和两万会有一个两个的误判,误判hash算法冲突,对撞。

原理

bloom算法类似一个hash set,用来判断某个元素(key)是否在某个集合中。
和一般的hash set不同的是,这个算法无需存储key的值,对于每个key,只需要k个比特位,每个存储一个标志,用来判断key是否在集合中。

 通过数组上占坑更改位数,客户端 key1 判断 三个算法 对应的索引是否为1  如果 其中有一个算法 表示为占用,则表示不存在,要不然一个就容易碰撞的;其次三个算法得到数据 多个key 有可能相同,降低hash碰撞的情况,也是多个算法来计算的原因

提高精度的方法,只需要更大的空间和hash函数计算就行

安装步骤 对于布隆过滤器:

准备Redis

默认已经安装了Redis4.0以上的版本,4.0以上才支持插件模块

gitHub下载最新版本

Releases · RedisBloom/RedisBloom · GitHub

 

代码中 使用方式

Redis布隆过滤器插件安装手册 提取码:fpu6 

  • 首先添加用户

  •  判断  用户是否存在

  •  一次性添加多个用户 ,和 校验  多个用户是否存在也是添加m就可以 

  •  在代码中调用,首先引入依赖  依赖于jedis
     <!-- RedisBloomFilter client -->
     <dependency>
	    <groupId>com.redislabs</groupId>
	    <artifactId>jrebloom</artifactId>
	    <version>1.2.0</version>
	</dependency>
  • 而引用只需要  bloomfilter 的客户端
public class RedisBloomFilter {
	// redis连接信息
	@Value("${redis_host}")
	private String redisHost;
	@Value("${redis_port}")
	private int redisPort;
	// bloomfilter 客户端
	private Client client;
	
	@PostConstruct
	public void init() {
		// bloomfilter 客户端
		client = new Client(redisHost, redisPort);
	}
	
	/**
	 * 创建一个自定义的过滤器
	 * @param filterName
	 */
	public void createFilter(String filterName) {
		// 创建一个容量10万,误判率0.01%的布隆过滤器
		client.createFilter(filterName, 1000, 0.1);
	}
	
	/**
	 * 添加元素
	 * @param filterName
	 * @param value
	 * @return
	 */
	public boolean addElement(String filterName, String value) {
		return client.add(filterName, value);
	}
	
	/**
	 * 判断元素是否存在
	 * @param filterName
	 * @param value
	 * @return
	 */
	public boolean exists(String filterName, String value) {
		return client.exists(filterName, value);
	}

Redis实现高并发限流令牌桶

对于jdk中有Semaphore 这个信号量去实现对线程的限流,在juc包下利用aqs中共享锁,和互斥锁的特性对多线程进行限流;这是jdk中提供的。和这个是很像的,但不能做到均匀的获得资源

限流令牌桶 资源有限 ,但是不限速,用完就没有了

漏桶算法 资源有限 但是速度均匀。

令牌桶

有限的数量资源,进行桶装,消费完就没有了。 

 基于redis的令牌桶代码实现 使用list  就行,这里不需要考虑并发情况。redis本身就是单线程的,是不用考虑的。

public class TokenBukect {
	@Autowired
	StringRedisTemplate stringRedisTemplate;
	
	/**
	 * 创建令牌桶
	 * @param tokenName
	 * @param tokenSize
	 */
	public void createToken(String tokenName, int tokenSize){
		String name = "token_list:"+tokenName;
		for(int i =0; i < tokenSize; i++){
			stringRedisTemplate.opsForList().leftPush(name, "token_"+i);
		}
		System.out.println(tokenSize+"个令牌,生成完毕!");
	}
	
	/**
	 * 使用令牌
	 * @return
	 */
	public String useToken(String tokenName) {
		String name = "token_list:"+tokenName;
		String token = stringRedisTemplate.opsForList().leftPop(name);
		return token;
	}

}

限流场景

针对同一个用户,同一个功能要求 10 分钟,只能访 60 次。
这里不用redis,可以在list中存储 10 分钟过期 key ,每次操作,新 key,并每次操作,就判断是否过期,并记录上次数。
这里时间窗口是在不断移动的,对时间进行排序

 可以使用redis 中 zset score key value, 通过score进行排序   

只取10分钟以内的,一次操作就是一条zset 记录,十分钟外的数据删除,并且整个Key进行10分钟过期。

代码实现

  • 等待多种操作一起执行  
  •  往zset中添加记录,当前时间作为score,value只要唯一就好
  • 根据score排序,删除过期的记录  注意在这里 因为排好了序,所以可以利用pipe.zremrangeByScore(key, 0, nowTs - period * 1000); 拍好序
  • 获得当前用户时间窗口中的操作次数
  • 设置Key的过期时间,过了时间窗口自动删除  这里会将过期的给清理掉
  • 执行上面的动作  
public class SimpleLimiter {
	// redis连接信息
	@Value("${redis_host}")
	private String redisHost;
	@Value("${redis_port}")
	private int redisPort;

	Jedis jedis;
	
	@PostConstruct
	public void init() {
		jedis = new Jedis(redisHost, redisPort);
	}
	
	// 判断当前动作是否超过了限制
	/**
	 * 判断是否超出限制
	 * @param userId 操作用户
	 * @param actionKey 操作内容
	 * @param period 时间窗口,单位秒
	 * @param maxCount 允许的最大次数
	 * @return 是否超出限制
	 */
	public boolean isActionAllowed(String userId, String actionKey, 
			int period, int maxCount) {
		String key = String.format("hist::%s::%s", userId, actionKey);
		long nowTs = System.currentTimeMillis();

		Pipeline pipe = jedis.pipelined();
		pipe.multi();	// 等待多种操作一起执行
		// 往zset中添加记录,当前时间作为score,value只要唯一就好
		pipe.zadd(key, nowTs, "" + nowTs);	
		// 根据score排序,删除过期的记录
		pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
		// 获得当前用户时间窗口中的操作次数
		Response<Long> count = pipe.zcard(key);
		// 设置Key的过期时间,过了时间窗口自动删除
		pipe.expire(key, period + 1);
		// 执行上面的动作
		pipe.exec();
		pipe.close();
		
		return count.get() <= maxCount;
	}
}
漏斗算法限流
这里就不用redis直接使用空间匹配即可
public class JavaFunnelLimiter {
	// 这是一个漏斗
	static class Funnel {
		int capacity;		// 漏斗容量,初始化大小
		float leakingRate;		// 流水速率
		int leftQuota;		// 剩余空间
		long lastLeakingTime;		// 上一次漏水时间
		
		public Funnel(int capacity, float leakingRate) {
			this.capacity = capacity;
			this.leakingRate = leakingRate;
			this.leftQuota = capacity;
			this.lastLeakingTime = System.currentTimeMillis();
		}
		
		// 计算容量
		void countSpace() {
			long nowMill = System.currentTimeMillis();
			long deltaMill = nowMill - lastLeakingTime;		// 距离上一次漏水过去了多久
			
			// 计算这段时间腾出了多少配额
			long deltaQuota = (long) (deltaMill * leakingRate);
			
			// 间隔时间太长,数字过大溢出了,恢复初始容量
			if (deltaQuota < 0) { 
				this.leftQuota = capacity;
				this.lastLeakingTime = nowMill;	
				return;
			}
			// 腾出空间太小,最小单位是 1,等下次,操作频率太快, 
			if (deltaQuota < 1) {
				return;
			}
			
			// 容量足够
			this.leftQuota += deltaQuota;	// 增加剩余空间
			this.lastLeakingTime = nowMill;		// 记录漏水时间
			
			// 剩余空间不得高于容量
			if (this.leftQuota > this.capacity) {		
				this.leftQuota = this.capacity;
			}
		}
		
		/**
		 * 获取配额
		 * @param quota
		 * @return
		 */
		boolean watering(int quota) {
			// 计算配额
			countSpace();
			
			// 减去此次配额
			if (this.leftQuota >= quota) {
				this.leftQuota -= quota;
				return true;
			}
			// 获得配额失败
			return false;
		}
	}
	
	// 管理所有的漏斗
	private Map<String, Funnel> funnels = new HashMap<>();
	
	// 是否允许访问
	public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
		String key = String.format("%s:%s", userId, actionKey);
		// 获取已经创建过的漏斗
		Funnel funnel = funnels.get(key);
		
		// 创建新的漏斗
		if (funnel == null) {
			funnel = new Funnel(capacity, leakingRate);
			funnels.put(key, funnel);
		}
		
		// 需要 1 个流量
		return funnel.watering(1); 
	}
}

基于redis实现分布式锁

多个服务请求同一个资源的时候,数据库压力,如何转移压力。,需要一把分布式锁的应用场景。

如何转移压力,受保护的公共资源,需要协调。缓存Key,失效以后,多个请求更新,只需要一个请求更新就可以了,需要一把分布式锁。 

原子问题
Redis 实现锁
Redis2.8 前, setNx + expire , set ex nx 这是两个命令,两个命令间存在原子操作的问题
Redis2.8 以后, set ex nx
set lock true ex 5 nx OK... do something critical ... # 10秒 del lock
超时问题
获得锁过后,这是网络原因导致,创建锁,可能会出现 客户端1耗时太长,本来第5秒redis数据就失效了,然后客户端1得到锁,然后导致拿锁时,时间太长,导致反而是客户端3拿到锁
redis分布式锁只适用于
1. 程序控制,业务逻辑时间确定的情况下,使用,程序能够控制执行时间的。
2. Value ,一个随机数,解锁去比较,类似版本号,只能删除自己的 Key-value
一个简单的代码实现:
public class RedisDistributedLock implements Lock {
    StringRedisTemplate stringRedisTemplate; // redis客户端
    
    String resourceName = null;
    
    int timeout = 10;

    ThreadLocal<String> lockRandomValue = new ThreadLocal<>();

    /**
     * 构建一把锁
     *
     * @param resourceName 资源唯一标识
     * @param timeout      资源锁定超时时间~防止资源死锁,单位秒
     */
    public RedisDistributedLock(String resourceName, int timeout, StringRedisTemplate stringRedisTemplate) {
        this.resourceName = "lock_" + resourceName;
        this.timeout = timeout;
        this.stringRedisTemplate = stringRedisTemplate;
    }


    @Override
    public boolean tryLock() {
        Boolean lockResult = stringRedisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                // 随机设置一个值
                lockRandomValue.set(UUID.randomUUID().toString());
                Boolean status = connection.set(resourceName.getBytes(), lockRandomValue.get().getBytes(),
                        Expiration.seconds(timeout), RedisStringCommands.SetOption.SET_IF_ABSENT);
                return status;
            }
        });
        return lockResult;
    }

    Lock lock = new ReentrantLock();

    // 多台机器的情况下,会出现大量的等待,加重redis的压力。 
    // 在lock方法上,加入同步关键字。单机同步,多机用redis
    @Override
    public void lock() {
        lock.lock();
        try {

            while (!tryLock()) {
                // 监听,锁删除的通知
                // try {
                //    Thread.sleep(1000L); // 定时等待后续尝试
                // } catch (InterruptedException e) {
                //  e.printStackTrace();
                // }
                stringRedisTemplate.execute(new RedisCallback<Boolean>() {
                    @Override
                    public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                        try {
                            CountDownLatch waiter = new CountDownLatch(1);
                            // 等待通知结果
                            connection.subscribe((message, pattern) -> {
                                // 收到通知,不管结果,立刻再次抢锁
                                waiter.countDown();
                            }, (resourceName + "_unlock_channel").getBytes());
                            // 等待一段时间,超过这个时间都没收到消息,肯定有问题
                            waiter.await(timeout, TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return true; //随便返回一个值都没问题
                    }
                });
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void unlock() {
        // 释放锁。(底层框架开发者要防止被API的调用者误调用)
        // 错误示范 stringRedisTemplate.delete(resourceName);
        // 1、 要比对内部的值,同一个线程,才能够去释放锁。 
    	// 2、 同时发出释放锁的通知
        if (lockRandomValue.get().equals(stringRedisTemplate.opsForValue().get(resourceName))) {
        	// 普通实现
        	//normalUnlock();
        	
        	// 通过lua脚本实现
        	unlockUseLua();
        }
        System.out.println("释放锁:"+resourceName);
    }
    
    // 通过redis接口的通常操作
    private void normalUnlock() {
    	// 有两个过程,不能成为原子操作
    	stringRedisTemplate.delete(resourceName); // 删除资源
        stringRedisTemplate.execute(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {
                // 发送通知
                Long received = connection.publish((resourceName + "_unlock_channel").getBytes(), "".getBytes());
                return received;
            }
        });
    }
    
    @SuppressWarnings({ "unchecked", "unused" })
	private void unlockUseLua() {
    	/* 脚本文件的方式
    	ScriptSource scriptSource = 
    			new ResourceScriptSource(new ClassPathResource("unlock.lua"));
    	DefaultRedisScript<Boolean> defaultScript = new DefaultRedisScript<Boolean>();
    	defaultScript.setScriptSource(scriptSource);
    	*/
    	
    	String scriptText = 
    			"if redis.call('get',KEYS[1]) == ARGV[1] then\\r\\n" + 
    			"local result = redis.call('del',KEYS[1])\\r\\n" + 
    			"redis.call('publish',KEYS[2], ARGV[2])\\r\\n" + 
    			"return result\\r\\n" + 
    			"else\\r\\n" + 
    			"return 0\\r\\n" + 
    			"end";
    	RedisScript<Boolean> script = RedisScript.of(scriptText, Boolean.class);
    	
    	Boolean result = stringRedisTemplate.execute(script, 
    			Arrays.asList(resourceName, resourceName + "_unlock_channel"), 
    			new Object[] {lockRandomValue.get(), lockRandomValue.get()});
    	
    	System.out.println("lua脚本执行结果:"+result);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

在实际业务场景,至少我在项目中很少用到redis来做分布式锁。

集群下主从切换,Key转移的问题

产生原因 :异步进行主从切换,而不采用同步的,但是主服务器,出现问题,存在故障转移,进行故障转移,锁丢失了

用到大多数来解决这个问题,第三方库实现,比如Redlock;操作三个 主服务器来解决。

以上是关于Redis应用场景中布隆过滤及高并发限流及分布式锁等应用的主要内容,如果未能解决你的问题,请参考以下文章

Redis应用

分布式爬虫-基于redis的布隆过滤器设计大规模网址去重

Redis缓存雪崩缓存穿透缓存击穿

Redis深度历险:核心原理和技术实现(基础及应用篇)

Redis深度历险:核心原理和技术实现(基础及应用篇)

Java并发:分布式应用限流 Redis + Lua 实践