java Redis的订阅发布模式使用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java Redis的订阅发布模式使用相关的知识,希望对你有一定的参考价值。

import java.lang.annotation.*;

/**
 * @Description: 缓存变更注解
 */
@Documented
@Inherited
@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface CacheChangeAnn {
    CacheNameEnum value();//自定义的

    CacheSignEnum sign();//自定义的
}
import org.apache.commons.lang3.builder.ToStringBuilder;

import java.io.Serializable;

/**
 * @Description: 缓存传输对象
 */
public class CacheSend implements Serializable {

    private CacheSignEnum sign;
    private CacheNameEnum name;
    private Long id;

    public CacheSend() {
    }

    public CacheSend(CacheSignEnum sign, CacheNameEnum name, Long id) {
        this.sign = sign;
        this.name = name;
        this.id = id;
    }

    public CacheSignEnum getSign() {
        return sign;
    }

    public void setSign(CacheSignEnum sign) {
        this.sign = sign;
    }

    public CacheNameEnum getName() {
        return name;
    }

    public void setName(CacheNameEnum name) {
        this.name = name;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return new ToStringBuilder(this)
                .append("sign", sign)
                .append("name", name)
                .append("id", id)
                .toString();
    }
}
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/**
 * @Description: 缓存变更拦截器
 */
@Aspect
@Component
public class CacheChangeInterceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(CacheChangeInterceptor.class);
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 添加业务逻辑方法切入点
     */
    @Pointcut("@annotation(com.skyon.lily.util.cache.CacheChangeAnn)")
    public void cacheChangeAspect() {
    }

    /**
     * 缓存变更(后置通知)
     *
     * @param joinPoint
     * @throws Throwable
     */
    @AfterReturning(value = "cacheChangeAspect()&&@annotation(cacheChangeAnn)", returning = "returnValue")
    public void addAdminLog(JoinPoint joinPoint, CacheChangeAnn cacheChangeAnn, Object returnValue) {
        //判断参数
        if (joinPoint.getArgs() == null || joinPoint.getArgs()[0] == null) {
            return;
        }
        Long id;
        if (CacheSignEnum.SAVE_OR_UPDATE == cacheChangeAnn.sign()) {
            id = getIdWithReflect(returnValue);
            switch (cacheChangeAnn.value()) {
               //todo 
                default:
                    return;
            }

        } else {
            id = (Long) joinPoint.getArgs()[0];
            redisTemplate.delete(cacheChangeAnn.value().getCacheKey() + "::" + id);
        }
        CacheSend cacheSend = new CacheSend(cacheChangeAnn.sign(), cacheChangeAnn.value(), id);
        redisTemplate.convertAndSend("cacheChange", cacheSend);
    }

    private Long getIdWithReflect(Object o) {
        try {
            String getter = "getId";
            Method method = o.getClass().getMethod(getter, new Class[]{});
            Object id = method.invoke(o);
            return (Long) id;
        } catch (Exception e) {
            LOGGER.error("{}通过反射获取ID失败", o, e);
        }
        return null;
    }

}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

/**
 * @Description: 缓存变更消息接收
 */
@Component
public class CacheChangeReceiver implements MessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(CacheChangeReceiver.class);

    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private CacheService cacheService;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        CacheSend cacheSend = (CacheSend) redisTemplate.getValueSerializer().deserialize(message.getBody());
        LOGGER.info("收到缓存变更channel:{},消息:{}", cacheSend, channel);
        if (cacheSend.getId() == null) {
            return;
        }
        cacheService.mapChange(cacheSend);
    }

}
public interface FieldRepository extends BaseRepository<Field, Serializable> {

    /**
     * 保存
     */
    @Override
    @CacheChangeAnn(value = CacheNameEnum.FIELD, sign = CacheSignEnum.SAVE_OR_UPDATE)
    Field save(Field field);
    
    @Override
    @CacheChangeAnn(value = CacheNameEnum.FIELD, sign = CacheSignEnum.DELETE)
    void deleteById(Serializable id);
}

以上是关于java Redis的订阅发布模式使用的主要内容,如果未能解决你的问题,请参考以下文章

Redis发布订阅功能介绍,生产场景使用及性能测试

SSH如何去发布订阅监听

Redis实现不可靠发布/订阅功能

Redis订阅和发布模式和Redis事务

redis 发布/订阅 模式

ServiceStack.Redis订阅发布服务的调用(Z)