5.Spring Cloud Alibaba消息驱动的微服务-SpringCloudAlibabaRocketMQ

Posted 程序猿Knight

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了5.Spring Cloud Alibaba消息驱动的微服务-SpringCloudAlibabaRocketMQ相关的知识,希望对你有一定的参考价值。

Spring实现异步的方式

技术图片

引入MQ后的架构演进

技术图片

MQ的使用场景

  • 异步处理
  • 流量削峰填谷
  • 解耦微服务

MQ的选择

技术图片

mq对比详情

mq对比详情

技术图片

搭建MQ

搭建教程

搭建RocketMq控制台

RocketMQ控制台安装教程

RocketMq的术语与概念

技术图片

技术图片

RocketMQ进阶

==看官方RocketMQ指导==

消息编程模型01-编写生产者

技术图片

pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
application.yml

技术图片

代码实现
private final RocketMQTemplate rocketMQTemplate;

技术图片

消息编程模型02-编写消费者

技术图片

pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
application.yml

技术图片

代码实现

技术图片

分布式事务01-流程剖析,概念术语,事务消息状态

技术图片

技术图片

技术图片

分布式事务02-编码实现

表创建
CREATE TABLE `rocketmq_transaction_log` (
`id`  int(11) NOT NULL AUTO_INCREMENT COMMENT '主键' ,
`transaction_Id`  varchar(45) NOT NULL COMMENT '事务' ,
`log`  varchar(45) NOT NULL COMMENT '日志' ,
PRIMARY KEY (`id`)
);
ShareService
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
    // 1. 查询share是否存在,不存在或者当前的audit_status != NOT_YET,那么抛异常
    Share share = this.shareMapper.selectByPrimaryKey(id);
    if (share == null) {
        throw new IllegalArgumentException("参数非法!该分享不存在!");
    }
    if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
        throw new IllegalArgumentException("参数非法!该分享已审核通过或审核不通过!");
    }

    // 3. 如果是PASS,那么发送消息给rocketmq,让用户中心去消费,并为发布人添加积分
    if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
        // 发送半消息。。
        String transactionId = UUID.randomUUID().toString();

        this.rocketMQTemplate.sendMessageInTransaction(
                "tx-add-bonus-group",
                "add-bonus",
                MessageBuilder
                    .withPayload(
                        UserAddBonusMsgDTO.builder()
                            .userId(share.getUserId())
                            .bonus(50)
                            .build()
                    )
                    // header也有妙用...
                    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                    .setHeader("share_id", id)
                    .setHeader("dto", JSON.toJSONString(auditDTO))
                    .build(),
                auditDTO
            );
    }
    else {
        this.auditByIdInDB(id, auditDTO);
    }
    return share;
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
    Share share = Share.builder()
        .id(id)
        .auditStatus(auditDTO.getAuditStatusEnum().toString())
        .reason(auditDTO.getReason())
        .build();
    this.shareMapper.updateByPrimaryKeySelective(share);

    // 4. 把share写到缓存
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
    this.auditByIdInDB(id, auditDTO);

    this.rocketmqTransactionLogMapper.insertSelective(
        RocketmqTransactionLog.builder()
            .transactionId(transactionId)
            .log("审核分享...")
            .build()
    );
}
AddBonusTransactionListener
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders headers = msg.getHeaders();

        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.valueOf((String) headers.get("share_id"));

        ShareAuditDTO auditDTO = (ShareAuditDTO)arg;

        try {
            this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        // select * from xxx where transaction_id = xxx
        RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
            RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .build()
        );
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

SpringCloudStream

技术图片

技术图片

技术图片

SpringCloudStream-编写生产者

pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
application.yml

技术图片

代码编写

==@EnableBinding({Source.class})==

// 扫描mybatis哪些包里面的接口
@MapperScan("com.itmuch.contentcenter.dao")
@SpringBootApplication
@EnableFeignClients// (defaultConfiguration = GlobalFeignConfiguration.class)
@EnableBinding({Source.class})
public class ContentCenterApplication {

技术图片

==修改com.alibaba.nacos日志级别==

logging:
  level:
    com.itmuch.contentcenter.feignclient.UserCenterFeignClient: debug
    com.alibaba.nacos: error

SpringCloudStream-编写消费者

pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
application.yml

技术图片

代码编写

==@EnableBinding({Sink.class})==

// 扫描mybatis哪些包里面的接口
@MapperScan("com.itmuch.usercenter.dao")
@SpringBootApplication
//@EnableDiscoveryClient
@EnableBinding({Sink.class})
public class UserCenterApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserCenterApplication.class, args);
    }

}

技术图片

SpringCloudStream自定义接口-发送消息

技术图片

技术图片

技术图片

技术图片

SpringCloudStream自定义接口-消费消息

技术图片

技术图片

技术图片

技术图片

==修改com.alibaba.nacos日志级别==

logging:
  level:
    com.alibaba.nacos: error

消息过滤

Spring Cloud Stream实现消息过滤消费

  • condition
  • Tags
  • Sql 92

SpringCloudStream的监控

如下==三个链接查看SpringCloudStream==的监控
技术图片

application.yml
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

SpringCloudStream的异常处理

SpringCloudStream的异常处理

全局处理【通用】
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
    throw new RuntimeException("x");
}

@StreamListener("errorChannel")
public void error(Message<?> message) {
    ErrorMessage errorMessage = (ErrorMessage) message;
    System.out.println("Handling ERROR: " + errorMessage);
}

SpringCloudStream+RocketMQ实现分布式事务01-重构生产者

application.yml
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              transactional: true
              group: tx-add-bonus-group
      bindings:
        output:
          # 用来指定topic
          destination: add-bonus

==@EnableBinding({Source.class})==

@MapperScan("com.itmuch.contentcenter.dao")
@SpringBootApplication
@EnableFeignClients// (defaultConfiguration = GlobalFeignConfiguration.class)
@EnableBinding({Source.class})
public class ContentCenterApplication {
ShareService
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
    // 1. 查询share是否存在,不存在或者当前的audit_status != NOT_YET,那么抛异常
    Share share = this.shareMapper.selectByPrimaryKey(id);
    if (share == null) {
        throw new IllegalArgumentException("参数非法!该分享不存在!");
    }
    if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
        throw new IllegalArgumentException("参数非法!该分享已审核通过或审核不通过!");
    }

    // 3. 如果是PASS,那么发送消息给rocketmq,让用户中心去消费,并为发布人添加积分
    if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
        // 发送半消息。。
        String transactionId = UUID.randomUUID().toString();

        this.source.output()
            .send(
                MessageBuilder
                    .withPayload(
                        UserAddBonusMsgDTO.builder()
                            .userId(share.getUserId())
                            .bonus(50)
                            .build()
                    )
                    // header也有妙用...
                    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                    .setHeader("share_id", id)
                    .setHeader("dto", JSON.toJSONString(auditDTO))
                    .build()
            );
    }
    else {
        this.auditByIdInDB(id, auditDTO);
    }
    return share;
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
    Share share = Share.builder()
        .id(id)
        .auditStatus(auditDTO.getAuditStatusEnum().toString())
        .reason(auditDTO.getReason())
        .build();
    this.shareMapper.updateByPrimaryKeySelective(share);

    // 4. 把share写到缓存
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
    this.auditByIdInDB(id, auditDTO);

    this.rocketmqTransactionLogMapper.insertSelective(
        RocketmqTransactionLog.builder()
            .transactionId(transactionId)
            .log("审核分享...")
            .build()
    );
}
AddBonusTransactionListener
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders headers = msg.getHeaders();

        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.valueOf((String) headers.get("share_id"));

        String dtoString = (String) headers.get("dto");
        ShareAuditDTO auditDTO = JSON.parseObject(dtoString, ShareAuditDTO.class);

        try {
            this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        // select * from xxx where transaction_id = xxx
        RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
            RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .build()
        );
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

SpringCloudStream+RocketMQ实现分布式事务02-重构消费者

application.yml
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        input:
          destination: add-bonus
          group: binder-group

==@EnableBinding({Sink.class})==

// 扫描mybatis哪些包里面的接口
@MapperScan("com.itmuch.usercenter.dao")
@SpringBootApplication
//@EnableDiscoveryClient
@EnableBinding({Sink.class})
public class UserCenterApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserCenterApplication.class, args);
    }

}
AddBonusStreamConsumer
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class AddBonusStreamConsumer {
    private final UserService userService;

    @StreamListener(Sink.INPUT)
    public void receive(UserAddBonusMsgDTO message) {
        message.setEvent("CONTRIBUTE");
        message.setDescription("投稿加积分..");
        this.userService.addBonus(message);
    }
}
UserService
@Transactional(rollbackFor = Exception.class)
public void addBonus(UserAddBonusMsgDTO msgDTO) {
    // 1. 为用户加积分
    Integer userId = msgDTO.getUserId();
    Integer bonus = msgDTO.getBonus();
    User user = this.userMapper.selectByPrimaryKey(userId);

    user.setBonus(user.getBonus() + bonus);
    this.userMapper.updateByPrimaryKeySelective(user);

    // 2. 记录日志到bonus_event_log表里面
    this.bonusEventLogMapper.insert(
        BonusEventLog.builder()
            .userId(userId)
            .value(bonus)
            .event(msgDTO.getEvent())
            .createTime(new Date())
            .description(msgDTO.getDescription())
            .build()
    );
    log.info("积分添加完毕...");
}

SpringCloudStream知识盘点

SpringCloudStream知识盘点

以上是关于5.Spring Cloud Alibaba消息驱动的微服务-SpringCloudAlibabaRocketMQ的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloud - Spring Cloud 之 Stream构建消息驱动微服务框架;Spring Cloud Alibaba集成RocketMQ(二十四)

微服务 Spring Cloud Alibaba 项目搭建(七RocketMQ 集成)

Spring Cloud Alibaba 2.2.6发布:新增Nacos注册快速失败的配置

阿里巴巴开源 Spring Cloud Alibaba,加码微服务生态建设

阿里巴巴开源 Spring Cloud Alibaba,加码微服务生态建设

Spring Cloud Alibaba发布第二个版本,Spring 发来贺电