redis教程-----redis事务记录日志到redis分布式锁

Posted alimayun

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis教程-----redis事务记录日志到redis分布式锁相关的知识,希望对你有一定的参考价值。

redis事务

Redis 事务可以一次执行多个命令, 并且带有以下两个重要的保证:

  • 批量操作在发送 EXEC 命令前被放入队列缓存。
  • 收到 EXEC 命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行。
  • 在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中。

一个事务从开始到执行会经历以下三个阶段:

  • 开始事务。
  • 命令入队。
  • 执行事务。

示例:

//根据multi开启事务
redis 127.0.0.1:6379> MULTI OK redis 127.0.0.1:6379> SET book-name "Mastering C++ in 21 days" QUEUED redis 127.0.0.1:6379> GET book-name QUEUED redis 127.0.0.1:6379> SADD tag "C++" "Programming" "Mastering Series" QUEUED redis 127.0.0.1:6379> SMEMBERS tag QUEUED
//触发事务 redis
127.0.0.1:6379> EXEC 1) OK 2) "Mastering C++ in 21 days" 3) (integer) 3 4) 1) "Mastering Series" 2) "C++" 3) "Programming"

注:

单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增加任何维持原子性的机制,所以 Redis 事务的执行并不是原子性的。
事务可以理解为一个打包的批量执行脚本,但批量指令并非原子化的操作,中间某条指令的失败不会导致前面已做指令的回滚,也不会造成后续的指令不做。

redis 127.0.0.1:7000> multi
OK
redis 127.0.0.1:7000> set a aaa
QUEUED
redis 127.0.0.1:7000> set b bbb
QUEUED
redis 127.0.0.1:7000> set c ccc
QUEUED
redis 127.0.0.1:7000> exec
1) OK
2) OK
3) OK

如果在 set b bbb 处失败,set a 已成功不会回滚,set c 还会继续执行。

 

redis事务命令

multi:标记一个事务块的开始。
exec:执行所有事务块内的命令。
discard:取消事务,放弃执行事务块内的所有命令。
watch:监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。
unwatch:取消 WATCH 命令对所有 key 的监视。

 

将日志记录到redis

1、需求

  • 获取日志的产生的线程名称,记录器名称,上下文产生时间,日志发生时间,自定义日志的信息
  • 将获取的信息以json的形式保存到redis中

 

2、思路

  • 配置logback使用自定义Appender实现,来获取对应的日志信息
  • 配置一个单列的redis工具类(不影响其他业务),将获取的日志信息保存起来

 

3、配置依赖

<!-- 日志:slf4j是接口,log4j是具体实现 -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>1.7.12</version>
</dependency>
<dependency>
  <groupId>ch.qos.logback</groupId>
  <artifactId>logback-core</artifactId>
  <version>1.1.1</version>
</dependency>
<!-- 实现slf4j接口并整合 -->
<dependency>
  <groupId>ch.qos.logback</groupId>
  <artifactId>logback-classic</artifactId>
  <version>1.1.1</version>
</dependency>
<!--redis-->
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>2.7.3</version>
</dependency>

 

4、配置redis以及logback

a、logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!--自定义日志-->
    <appender name="MyAppender" class="log.RedisAppender" />
    <root level="info">
        <appender-ref ref="MyAppender" />
    </root>
</configuration>

b、redis.properties

maxIdle=100
maxWait=3000
testOnBorrow=true
host=127.0.0.1
port=6379
timeout=3000
pass=你的密码

 

5、MyAppender

package log;

import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.core.AppenderBase;
import dto.log.LogData;
import redis.clients.jedis.Jedis;
import tool.JsonBuilder;
import tool.RedisBuilder;

/**
 * 自定义日志处理
 */
public class RedisAppender extends AppenderBase<LoggingEvent> {

    @Override
    protected void append(LoggingEvent loggingEvent) {
        //获取日志数据
        LogData logData=new LogData(loggingEvent);
        //设置日志保存数据库
        Jedis jedis=RedisBuilder.getSingleJedis(2);
        //设置日志的key
        String key="logData";
        //获取日志条数
        Integer index=RedisBuilder.getRedisIndexByName(key);
        //保存日志
        jedis.set(key+index, JsonBuilder.getString(logData));
        //关闭链接
        jedis.close();

    }
}

 

6、LogData

package dto.log;

import ch.qos.logback.classic.spi.LoggingEvent;
import tool.DataFormatBuilder;

/**
 * Created by yuyu on 2018/3/15.
 * 用于保存日志数据
 */
public class LogData {

    private String message;//日志的信息
    private String loggerTime;//上下文产生时间
    private String loggerName;//记录器名称
    private String threadName;//线程名称
    private String happenStamp;//日志发生时间

    public LogData() {
    }

    public LogData(LoggingEvent loggingEvent) {

        this.message=loggingEvent.toString();
        this.loggerTime=DataFormatBuilder.getTimeStampFormat(null, loggingEvent.getContextBirthTime());
        this.loggerName=loggingEvent.getLoggerName();
        this.threadName=loggingEvent.getThreadName();
        this.happenStamp=DataFormatBuilder.getTimeStampFormat(null, loggingEvent.getTimeStamp());
    }
    //getter,setter略
}

 

7、RedisBuilder

package tool;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Properties;

/**
 * Created by yuyu on 2018/3/15.
 * redis相关的操作,获取一个单例的连接池
 */
public class RedisBuilder {

    private static JedisPool jedisPool;

    private static RedisBuilder singleRedisBuilder=new RedisBuilder();

    //单利模式
    private RedisBuilder(){
        //获取配置信息
        Properties properties=PropertiesGetter.get("redis");
        //设置连接池参数
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxIdle(Integer.parseInt(properties.getProperty("maxIdle")));//最大的jedis实例
        config.setMaxWaitMillis(Integer.parseInt(properties.getProperty("maxWait")));//最大等待时间(毫秒)
        config.setTestOnBorrow(Boolean.parseBoolean(properties.getProperty("testOnBorrow")));//借用时测试
        //redis链接
        jedisPool=new JedisPool(config, properties.getProperty("host")
                , Integer.parseInt(properties.getProperty("port"))
                , Integer.parseInt(properties.getProperty("timeout"))
                ,  properties.getProperty("pass"));
    }

    /**
     * 从连接池中获取一个Jedis对象
     * @param db 数据库[0,15]
     * @return
     */
    public static Jedis getSingleJedis(Integer db) {
        if (db==null||(db<0&&db>15)){
            return null;
        }
        Jedis back=jedisPool.getResource();
        back.select(db);
        return back;
    }

    /**
     *传入名称获取保存在redis中的Index号码
     * @param name
     * @return
     */
    public static Integer getRedisIndexByName(String name){
        if (name==null){
            return null;
        }
        Jedis jedis=RedisBuilder.getSingleJedis(15);
        Integer index;
        String value=jedis.get(name);
        //获取保存的index数据,没有的时候取0
        if (null==value){
            index=0;
        }else{
            index =Integer.parseInt(value)+1;
        }
        //将index保存
        jedis.set(name,index.toString());
        jedis.close();
        return index;
    }
}

 

8、DateFormatBuilder

package tool;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Created by yuyu on 2018/3/15.
 * 用于日期处理相关函数
 */
public class DataFormatBuilder {

    /**
     * 根据传进来的时间戳 获取对应的时间格式
     * @param format 时间格式
     * @param stamp 时间戳
     * @return
     */
    public static  String getTimeStampFormat(String format,Long stamp){
        if (stamp==null){
            return null;
        }
        if (format==null){
            format="yyyy-MM-dd HH:mm:ss/SSS";
        }
        SimpleDateFormat df = new SimpleDateFormat(format);//设置日期格式
        return df.format(stamp);//传进来的时间戳为获取当前系统时间
    }
}

 

9、PropertiesGetter

package tool;

import exception.DobeoneException;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Created by yuyu on 2018/3/12.
 * 用于参数配置文件中的数据获取
 */
public class PropertiesGetter {
    /**
     * 获取配置文件的配置信息
     * @param name
     * @return
     */
    public synchronized static Properties get(String name){
        String file="/properties/"+name+".properties";
        Properties prop = new Properties();
        InputStream in = PropertiesGetter.class.getResourceAsStream(file);
        try {
            prop.load(in);
        } catch (IOException e) {
            throw new DobeoneException("获取配置文件异常!-file-"+file,e);
        }
        return prop;
    }
}

 

10、JsonBuilder 

package tool;

import com.fasterxml.jackson.databind.ObjectMapper;
import exception.DobeoneException;

/**
 * Created by yuyu on 2018/3/15.
 * json相关的函数
 */
public class JsonBuilder {
    /**
     * 将一个实体类转换成json字符串
     * @param object
     * @return
     */
    public static String getString(Object object){
        //安全判断
        if (object==null){
            return null;
        }
        ObjectMapper mapper = new ObjectMapper();
        String back;
        try{
            back=mapper.writeValueAsString(object);
        }catch (Exception e){
            //抛出一个自定义异常
            throw new DobeoneException("json字符转换失败!-object-"+object,e);
        }
        return back;
    }
}

测试:

package tool;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by yuyu on 2018/3/15.
 * 测试日志到redis
 */
public class TestLogger {

    private Logger logger= LoggerFactory.getLogger(this.getClass());

    @Test
    public void testLogger(){
        logger.info("hahha");
    }
}

 

redis分布式锁

一般生产环境都是服务器集群,那么如果希望某个操作只能由一台服务器去运行,那么如何实现呢?例如本人之前做过的一个需求,通过kettle将数据同步到我们的数据库中,这个同步是每天凌晨3点,每天一次,数据到达我们数据库之后我们要进行分发,并做后续的处理。因此这个分发就只能由一台服务器去操作,如果同时有多台服务器分发,就会出现任务重复的问题。

1、分布式锁要求

  • 互斥性。在任意时刻,只有一个客户端能持有锁。
  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  • 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
  • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

 

2、分布式锁

a、添加依赖

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

b、代码实现

public class RedisTool {

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * 尝试获取分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
        //第一个为key,我们使用key来当锁,因为key是唯一的;        
    
//第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,
      通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
//第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作; //第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。 //第五个为time,与第四个参数相呼应,代表key的过期时间。 //总结:利用redis单个操作的原子性,将加锁操作放在一条命令中 //1、String set(String key, String value) //2、String set(String key, String value, String nxxx) //3、String set(String key, String value, String nxxx, String expx, int time) //4、String set(String key, String value, String nxxx, String expx, long time) String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } }

关于分布式锁的各种问题可以参考这篇文章:Redis分布式锁的正确实现方式(Java版)

 

以上是关于redis教程-----redis事务记录日志到redis分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

Redis学习笔记 [事务和锁机制持久化机制模拟主从复制集群搭建]

Redis教程:事务详解

Redis学习与总结

第218天学习打卡(知识点复习 Mysql隔离级别 主从复制 Redis 知识点复习 事务三特性)

redis学习教程三《发送订阅事务连接》

图解Redis,谈谈Redis的持久化,RDB快照与AOF日志