java Redis的订阅发布模式使用
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java Redis的订阅发布模式使用相关的知识,希望对你有一定的参考价值。
import java.lang.annotation.*;
/**
* @Description: 缓存变更注解
*/
@Documented
@Inherited
@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface CacheChangeAnn {
CacheNameEnum value();//自定义的
CacheSignEnum sign();//自定义的
}
import org.apache.commons.lang3.builder.ToStringBuilder;
import java.io.Serializable;
/**
* @Description: 缓存传输对象
*/
public class CacheSend implements Serializable {
private CacheSignEnum sign;
private CacheNameEnum name;
private Long id;
public CacheSend() {
}
public CacheSend(CacheSignEnum sign, CacheNameEnum name, Long id) {
this.sign = sign;
this.name = name;
this.id = id;
}
public CacheSignEnum getSign() {
return sign;
}
public void setSign(CacheSignEnum sign) {
this.sign = sign;
}
public CacheNameEnum getName() {
return name;
}
public void setName(CacheNameEnum name) {
this.name = name;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("sign", sign)
.append("name", name)
.append("id", id)
.toString();
}
}
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
/**
* @Description: 缓存变更拦截器
*/
@Aspect
@Component
public class CacheChangeInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(CacheChangeInterceptor.class);
@Autowired
private RedisTemplate redisTemplate;
/**
* 添加业务逻辑方法切入点
*/
@Pointcut("@annotation(com.skyon.lily.util.cache.CacheChangeAnn)")
public void cacheChangeAspect() {
}
/**
* 缓存变更(后置通知)
*
* @param joinPoint
* @throws Throwable
*/
@AfterReturning(value = "cacheChangeAspect()&&@annotation(cacheChangeAnn)", returning = "returnValue")
public void addAdminLog(JoinPoint joinPoint, CacheChangeAnn cacheChangeAnn, Object returnValue) {
//判断参数
if (joinPoint.getArgs() == null || joinPoint.getArgs()[0] == null) {
return;
}
Long id;
if (CacheSignEnum.SAVE_OR_UPDATE == cacheChangeAnn.sign()) {
id = getIdWithReflect(returnValue);
switch (cacheChangeAnn.value()) {
//todo
default:
return;
}
} else {
id = (Long) joinPoint.getArgs()[0];
redisTemplate.delete(cacheChangeAnn.value().getCacheKey() + "::" + id);
}
CacheSend cacheSend = new CacheSend(cacheChangeAnn.sign(), cacheChangeAnn.value(), id);
redisTemplate.convertAndSend("cacheChange", cacheSend);
}
private Long getIdWithReflect(Object o) {
try {
String getter = "getId";
Method method = o.getClass().getMethod(getter, new Class[]{});
Object id = method.invoke(o);
return (Long) id;
} catch (Exception e) {
LOGGER.error("{}通过反射获取ID失败", o, e);
}
return null;
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
/**
* @Description: 缓存变更消息接收
*/
@Component
public class CacheChangeReceiver implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(CacheChangeReceiver.class);
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private CacheService cacheService;
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(message.getChannel());
CacheSend cacheSend = (CacheSend) redisTemplate.getValueSerializer().deserialize(message.getBody());
LOGGER.info("收到缓存变更channel:{},消息:{}", cacheSend, channel);
if (cacheSend.getId() == null) {
return;
}
cacheService.mapChange(cacheSend);
}
}
public interface FieldRepository extends BaseRepository<Field, Serializable> {
/**
* 保存
*/
@Override
@CacheChangeAnn(value = CacheNameEnum.FIELD, sign = CacheSignEnum.SAVE_OR_UPDATE)
Field save(Field field);
@Override
@CacheChangeAnn(value = CacheNameEnum.FIELD, sign = CacheSignEnum.DELETE)
void deleteById(Serializable id);
}
以上是关于java Redis的订阅发布模式使用的主要内容,如果未能解决你的问题,请参考以下文章