5. Spring Cloud Alibaba: Message-Driven Microservices with RocketMQ
Posted by 程序猿Knight
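Ways to Implement Asynchrony in Spring
Architecture Evolution After Introducing MQ
MQ Use Cases
- Asynchronous processing
- Traffic peak shaving and valley filling
- Decoupling microservices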
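The three use cases above can be illustrated with a small in-memory model. This is only a sketch in plain Java, not RocketMQ code: the class `PeakShavingSketch` and its methods are hypothetical names invented for illustration. The producer enqueues and returns immediately (asynchronous, decoupled), while the consumer drains the backlog at its own fixed rate (peak shaving).

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Plain-Java model of a queue absorbing a burst; hypothetical, not RocketMQ code. */
final class PeakShavingSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private final int consumeRatePerTick;
    private int processed = 0;

    PeakShavingSketch(int consumeRatePerTick) {
        this.consumeRatePerTick = consumeRatePerTick;
    }

    /** Producer side: enqueue and return at once, without waiting for the consumer. */
    void publish(String message) {
        queue.add(message);
    }

    /** Consumer side: handle at most consumeRatePerTick messages per tick. */
    void tick() {
        for (int i = 0; i < consumeRatePerTick && !queue.isEmpty(); i++) {
            queue.poll();
            processed++;
        }
    }

    int backlog() {
        return queue.size();
    }

    int processed() {
        return processed;
    }

    /** Runs ticks until the backlog is empty and returns how many were needed. */
    int ticksToDrain() {
        int ticks = 0;
        while (!queue.isEmpty()) {
            tick();
            ticks++;
        }
        return ticks;
    }
}
```

A burst of ten messages against a consumer that handles three per tick is never rejected; it simply takes several ticks to work through, which is the point of putting a queue between the two services.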
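Choosing an MQ
Detailed MQ comparison
Setting Up MQ
Setting Up the RocketMQ Console
RocketMQ Terminology and Concepts
Advanced RocketMQ
==See the official RocketMQ guide==
Message Programming Model 01: Writing the Producer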
pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
application.yml
Code
private final RocketMQTemplate rocketMQTemplate;
Message Programming Model 02: Writing the Consumer
pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
application.yml
Code
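Distributed Transactions 01: Flow Analysis, Terminology, and Transactional Message States
Distributed Transactions 02: Implementation
Table creation
CREATE TABLE `rocketmq_transaction_log` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `transaction_Id` varchar(45) NOT NULL COMMENT 'transaction id',
  `log` varchar(45) NOT NULL COMMENT 'log',
  PRIMARY KEY (`id`)
);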
ShareService
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
    // 1. Check whether the share exists; if it does not, or its audit_status != NOT_YET, throw an exception
    Share share = this.shareMapper.selectByPrimaryKey(id);
    if (share == null) {
        throw new IllegalArgumentException("Invalid argument: the share does not exist!");
    }
    if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
        throw new IllegalArgumentException("Invalid argument: the share has already been approved or rejected!");
    }
    // 3. If the result is PASS, send a message to RocketMQ so that the user center consumes it and adds bonus points for the publisher
    if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
        // Send a half message; the audit itself runs in the transaction listener
        String transactionId = UUID.randomUUID().toString();
        this.rocketMQTemplate.sendMessageInTransaction(
            // transactional producer group
            "tx-add-bonus-group",
            // topic
            "add-bonus",
            MessageBuilder
                .withPayload(
                    UserAddBonusMsgDTO.builder()
                        .userId(share.getUserId())
                        .bonus(50)
                        .build()
                )
                // Headers are useful too: they carry the data the transaction listener needs
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                .setHeader("share_id", id)
                .setHeader("dto", JSON.toJSONString(auditDTO))
                .build(),
            // arg, passed on to executeLocalTransaction
            auditDTO
        );
    }
    else {
        this.auditByIdInDB(id, auditDTO);
    }
    return share;
}
@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
    Share share = Share.builder()
        .id(id)
        .auditStatus(auditDTO.getAuditStatusEnum().toString())
        .reason(auditDTO.getReason())
        .build();
    this.shareMapper.updateByPrimaryKeySelective(share);
    // 4. Write the share to the cache
}
@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
    // The audit update and the transaction log row are written in the same local transaction
    this.auditByIdInDB(id, auditDTO);
    this.rocketmqTransactionLogMapper.insertSelective(
        RocketmqTransactionLog.builder()
            .transactionId(transactionId)
            .log("Audit share...")
            .build()
    );
}
AddBonusTransactionListener
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
    // Runs the local transaction after the half message has been sent
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.valueOf((String) headers.get("share_id"));
        ShareAuditDTO auditDTO = (ShareAuditDTO) arg;
        try {
            this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    // Check-back: called when the state of the local transaction is still undecided
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        // select * from xxx where transaction_id = xxx
        RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
            RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .build()
        );
        // A log row exists only if the local transaction committed
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
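The commit, rollback, and check-back decisions made by the listener above can be modeled without a broker. The following is a rough sketch in plain Java under simplifying assumptions: `TxMessageSketch`, its `State` enum, and its fields are hypothetical names, the set stands in for the `rocketmq_transaction_log` table, and the check-back happens immediately instead of after a broker-side delay.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/** Plain-Java model of the half-message flow; it does not use the RocketMQ client. */
final class TxMessageSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    /** Stand-in for the rocketmq_transaction_log table, keyed by transaction id. */
    final Set<String> transactionLog = new HashSet<>();
    /** Payloads that have become visible to consumers. */
    final List<String> delivered = new ArrayList<>();

    /**
     * Holds the payload as a half message, runs the local transaction, then resolves it.
     * The supplier plays the role of executeLocalTransaction.
     */
    State send(String transactionId, String payload, Supplier<State> localTransaction) {
        State state;
        try {
            state = localTransaction.get();
        } catch (RuntimeException e) {
            // A failed local transaction means the half message must be discarded
            state = State.ROLLBACK;
        }
        if (state == State.UNKNOWN) {
            // No definite answer arrived, so the producer is asked again
            state = check(transactionId);
        }
        if (state == State.COMMIT) {
            delivered.add(payload);
        }
        return state;
    }

    /** Plays the role of checkLocalTransaction: commit only if the log row exists. */
    State check(String transactionId) {
        return transactionLog.contains(transactionId) ? State.COMMIT : State.ROLLBACK;
    }
}
```

The model shows why the log row is written in the same local transaction as the business update: when the first answer is lost, the presence of that row is the only evidence the check-back has.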
Spring Cloud Stream
Spring Cloud Stream: Writing the Producer
pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
application.yml
Code
==@EnableBinding({Source.class})==
// Packages to scan for MyBatis mapper interfaces
@MapperScan("com.itmuch.contentcenter.dao")
@SpringBootApplication
@EnableFeignClients // (defaultConfiguration = GlobalFeignConfiguration.class)
@EnableBinding({Source.class})
public class ContentCenterApplication {
    public static void main(String[] args) {
        SpringApplication.run(ContentCenterApplication.class, args);
    }
}
==Change the com.alibaba.nacos log level==
logging:
  level:
    com.itmuch.contentcenter.feignclient.UserCenterFeignClient: debug
    com.alibaba.nacos: error
Spring Cloud Stream: Writing the Consumer
pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
application.yml
Code
==@EnableBinding({Sink.class})==
// Packages to scan for MyBatis mapper interfaces
@MapperScan("com.itmuch.usercenter.dao")
@SpringBootApplication
//@EnableDiscoveryClient
@EnableBinding({Sink.class})
public class UserCenterApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserCenterApplication.class, args);
    }
}
Spring Cloud Stream Custom Interfaces: Sending Messages
Spring Cloud Stream Custom Interfaces: Consuming Messages
==Change the com.alibaba.nacos log level==
logging:
  level:
    com.alibaba.nacos: error
Message Filtering
- condition
- Tags
- SQL92
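Of the three filtering options above, tag filtering is the easiest to picture. A RocketMQ tag subscription is written as `*` (all tags) or as alternatives joined by `||`, for example `tag1 || tag2`. The sketch below models only that matching rule in plain Java; `TagFilterSketch` is a hypothetical helper, and in a real deployment the matching is done by RocketMQ, not by application code.

```java
import java.util.Arrays;

/** Plain-Java model of tag subscription matching; hypothetical, for illustration only. */
final class TagFilterSketch {
    private TagFilterSketch() {
    }

    /** Returns true if a message carrying messageTag matches the subscription expression. */
    static boolean matches(String expression, String messageTag) {
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true; // no restriction: subscribe to every tag
        }
        if (messageTag == null) {
            return false; // an untagged message cannot match a specific tag
        }
        // Split on "||" and compare each alternative exactly
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(messageTag::equals);
    }
}
```

With this rule, a consumer subscribed to `tag1 || tag2` sees a message tagged `tag2` but not one tagged `tag3`.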
Monitoring Spring Cloud Stream
Use the following ==three links to view Spring Cloud Stream== monitoring information
application.yml
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always
Exception Handling in Spring Cloud Stream
Global handling (generic)
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
    // Simulate a failure while consuming a message
    throw new RuntimeException("x");
}
// Global handler: receives an ErrorMessage for each failed message
@StreamListener("errorChannel")
public void error(Message<?> message) {
    ErrorMessage errorMessage = (ErrorMessage) message;
    System.out.println("Handling ERROR: " + errorMessage);
}
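The idea behind the global handler above is that the listener itself does no error handling: whatever it throws is caught by the framework and forwarded to one shared place. A heavily simplified model in plain Java might look like the following. `ErrorChannelSketch` and its methods are hypothetical names, and the sketch leaves out retries and everything else Spring Cloud Stream does before a failure reaches `errorChannel`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of a global error handler; hypothetical, not Spring Cloud Stream code. */
final class ErrorChannelSketch {
    /** Failures seen by the global handler, one entry per failed message. */
    final List<String> handledErrors = new ArrayList<>();

    /** Delivers a message to a listener; any exception is routed to the global handler. */
    void dispatch(String body, Consumer<String> listener) {
        try {
            listener.accept(body);
        } catch (RuntimeException e) {
            onError(body, e);
        }
    }

    /** Plays the role of the errorChannel listener: receives the failed message and its cause. */
    void onError(String failedBody, RuntimeException cause) {
        handledErrors.add(failedBody + ": " + cause.getMessage());
    }
}
```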
Spring Cloud Stream + RocketMQ Distributed Transactions 01: Refactoring the Producer
application.yml
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              transactional: true
              group: tx-add-bonus-group
      bindings:
        output:
          # Specifies the topic
          destination: add-bonus
==@EnableBinding({Source.class})==
@MapperScan("com.itmuch.contentcenter.dao")
@SpringBootApplication
@EnableFeignClients // (defaultConfiguration = GlobalFeignConfiguration.class)
@EnableBinding({Source.class})
public class ContentCenterApplication {
    public static void main(String[] args) {
        SpringApplication.run(ContentCenterApplication.class, args);
    }
}
ShareService
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
    // 1. Check whether the share exists; if it does not, or its audit_status != NOT_YET, throw an exception
    Share share = this.shareMapper.selectByPrimaryKey(id);
    if (share == null) {
        throw new IllegalArgumentException("Invalid argument: the share does not exist!");
    }
    if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
        throw new IllegalArgumentException("Invalid argument: the share has already been approved or rejected!");
    }
    // 3. If the result is PASS, send a message to RocketMQ so that the user center consumes it and adds bonus points for the publisher
    if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
        // Send a half message; the audit itself runs in the transaction listener
        String transactionId = UUID.randomUUID().toString();
        this.source.output()
            .send(
                MessageBuilder
                    .withPayload(
                        UserAddBonusMsgDTO.builder()
                            .userId(share.getUserId())
                            .bonus(50)
                            .build()
                    )
                    // Headers are useful too: send() takes no extra arg, so the DTO travels as a header
                    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                    .setHeader("share_id", id)
                    .setHeader("dto", JSON.toJSONString(auditDTO))
                    .build()
            );
    }
    else {
        this.auditByIdInDB(id, auditDTO);
    }
    return share;
}
@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
    Share share = Share.builder()
        .id(id)
        .auditStatus(auditDTO.getAuditStatusEnum().toString())
        .reason(auditDTO.getReason())
        .build();
    this.shareMapper.updateByPrimaryKeySelective(share);
    // 4. Write the share to the cache
}
@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
    // The audit update and the transaction log row are written in the same local transaction
    this.auditByIdInDB(id, auditDTO);
    this.rocketmqTransactionLogMapper.insertSelective(
        RocketmqTransactionLog.builder()
            .transactionId(transactionId)
            .log("Audit share...")
            .build()
    );
}
AddBonusTransactionListener
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.valueOf((String) headers.get("share_id"));
        String dtoString = (String) headers.get("dto");
        ShareAuditDTO auditDTO = JSON.parseObject(dtoString, ShareAuditDTO.class);
        try {
            this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        // select * from xxx where transaction_id = xxx
        RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
            RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .build()
        );
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
Spring Cloud Stream + RocketMQ Distributed Transactions 02: Refactoring the Consumer
application.yml
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        input:
          destination: add-bonus
          group: binder-group
==@EnableBinding({Sink.class})==
// Packages to scan for MyBatis mapper interfaces
@MapperScan("com.itmuch.usercenter.dao")
@SpringBootApplication
//@EnableDiscoveryClient
@EnableBinding({Sink.class})
public class UserCenterApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserCenterApplication.class, args);
    }
}
AddBonusStreamConsumer
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class AddBonusStreamConsumer {
    private final UserService userService;
    @StreamListener(Sink.INPUT)
    public void receive(UserAddBonusMsgDTO message) {
        // Fill in the fields that the producer does not send
        message.setEvent("CONTRIBUTE");
        message.setDescription("Bonus points for contribution..");
        this.userService.addBonus(message);
    }
}
UserService
@Transactional(rollbackFor = Exception.class)
public void addBonus(UserAddBonusMsgDTO msgDTO) {
    // 1. Add bonus points to the user
    Integer userId = msgDTO.getUserId();
    Integer bonus = msgDTO.getBonus();
    User user = this.userMapper.selectByPrimaryKey(userId);
    user.setBonus(user.getBonus() + bonus);
    this.userMapper.updateByPrimaryKeySelective(user);
    // 2. Record the change in the bonus_event_log table
    this.bonusEventLogMapper.insert(
        BonusEventLog.builder()
            .userId(userId)
            .value(bonus)
            .event(msgDTO.getEvent())
            .createTime(new Date())
            .description(msgDTO.getDescription())
            .build()
    );
    log.info("Bonus points added...");
}
Spring Cloud Stream Knowledge Recap