Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)十七(安装RabbitMQ(Centos6)(入门使用教程))以及Spring AMQP的)
Posted 蓝盒子itbluebox
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)十七(安装RabbitMQ(Centos6)(入门使用教程))以及Spring AMQP的)相关的知识,希望对你有一定的参考价值。
关于安装RabbitMQ(Centos6)(入门使用教程)(消息丢失的解决方案)以及Spring AMQP的使用
在我的上也颇为稳重当中有介绍,请点击链接学习:
https://code100.blog.csdn.net/article/details/119982354
1、项目改造
接下来,我们就改造项目,实现搜索服务、商品静态页的数据同步。
1.1.思路分析
发送方:商品微服务
-
什么时候发?
当商品服务对商品进行写操作:增、删、改的时候,需要发送一条消息,通知其它服务。
-
发送什么内容?
对商品的增删改时其它服务可能需要新的商品数据,但是如果消息内容中包含全部商品信息,数据量太大,而且并不是每个服务都需要全部的信息。因此我们只发送商品id,其它服务可以根据id查询自己需要的信息。
接收方:搜索微服务、静态页微服务
- 接收消息后如何处理?
- 搜索微服务:
- 增/改:添加新的数据到索引库
- 删:删除索引库数据
- 静态页微服务:
- 增:创建新的静态页
- 删:删除原来的静态页
- 改:创建新的静态页并删除原来的
- 搜索微服务:
1.1.1.修改ly-page当中PageService设置其生成的html文件到本地的nginx安装目录当中的html当中
搭建服务到Linux的时候换成linux的安装路径
1.1.1.1创建对应枚举
package com.leyou.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
@Getter //get方法
@NoArgsConstructor //无参构造
@AllArgsConstructor //有参构造
public enum NginxUrlEnum {
NGINX_URL_ENUM_HTML("D:/ngnix/nginx-1.18.0/nginx-1.18.0/html/item/"),
;
private String url;
}
1.1.1.2 修改ly-page当中的PageService
public void createHtml(Long spuId) {
//上下文
Context context = new Context();
context.setVariables(loadModel(spuId));
//输出流(渲染输出)D:/ngnix/nginx-1.18.0/nginx-1.18.0/html/item
File dest = new File(NginxUrlEnum.NGINX_URL_ENUM_HTML.getUrl(), spuId + ".html");
try(PrintWriter writer = new PrintWriter(dest, "UTF-8")){
//三个参数(- 模板名称 - 上下文:里面包含模型数据 - writer:输出目的地的流)
templateEngine.process("item", context, writer);
}catch (Exception e){
log.error("[静态页服务] 生成静态页异常0",e);
}
}
1.2.发送消息
我们先在商品微服务ly-item-service
中实现发送消息。
1.2.1.引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.2.2.配置文件
我们在application.yml中添加一些有关RabbitMQ的配置:
spring:
rabbitmq:
host: 192.168.206.66
username: leyou
password: 123321
virtual-host: /leyou
template:
retry:
enabled: true
initial-interval: 10000ms
max-interval: 300000ms
multiplier: 2
exchange: ly.item.exchange
publisher-confirms: true
- template:有关
AmqpTemplate
的配置- retry:失败重试
- enabled:开启失败重试
- initial-interval:第一次重试的间隔时长
- max-interval:最长重试间隔,超过这个间隔将不再重试
- multiplier:下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
- exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
- retry:失败重试
- publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试
1.2.3.改造GoodsService
1.2.3.1注入AmqpTemplate
@Autowired
private AmqpTemplate amqpTemplate;
1.2.3.2封装一个发送消息到mq的方法:
private void sendMessage(Long id, String type){
// 发送消息
try {
this.amqpTemplate.convertAndSend( type, id);
} catch (Exception e) {
}
}
- 这里没有指定交换机,因此默认发送到了配置中的:
ly.item.exchange
注意:这里要把所有异常都try起来,不能让消息的发送影响到正常的业务逻辑
- 然后在新增的时候调用:
@Transient
public void saveGoods(Spu spu) {
//新增SPU
spu.setId(null);
spu.setCreateTime(new Date());
spu.setLastUpdateTime(spu.getCreateTime());
spu.setSaleable(true);
spu.setValid(false);//设置默认不删除
int count = spuMapper.insert(spu);
if(count != 1){
throw new LyException(ExceptionEnum.GOODS_SAVE_ERROR);
}
//新增spu_detail
SpuDetail spuDetail = spu.getSpuDetail();
spuDetail.setSpuId(spu.getId());//spu新增完善会回显,然后就有spu.getId())
spuDetailMapper.insert(spuDetail);
saveSkuAndStock(spu);
sendMessage(spu.getId(),"item.insert");
}
- 修改的时候调用:
@Transactional
public void updateGoods(Spu spu) {
if(spu.getId() == null){
throw new LyException(ExceptionEnum.GOODS_UPDATE_ERROR);
}
//删除SKU
Sku sku = new Sku();
sku.setSpuId(spu.getId());
//查询sku
List<Sku> skuList = skuMapper.select(sku);
if(!CollectionUtils.isEmpty(skuList)){
//删除sku
skuMapper.delete(sku);
//skuList.stream().map(Sku::getId).collect(Collectors.toList());
//代码解析.stream()将集合转换为流,map将集合当中的的所有id取出转换为集合
// 删除stock
List<Long> ids = skuList.stream().map(Sku::getId).collect(Collectors.toList());
stockMapper.deleteByIdList(ids);
}
//修改spu
spu.setValid(null);//是否有效设置为空
spu.setSaleable(null);
spu.setLastUpdateTime(new Date());
spu.setLastUpdateTime(null);
int count = spuMapper.updateByPrimaryKeySelective(spu);
if(count != 1){
throw new LyException(ExceptionEnum.GOODS_UPDATE_ERROR);
}
//修改detail
count = spuDetailMapper.updateByPrimaryKeySelective(spu.getSpuDetail());
if(count != 1){
throw new LyException(ExceptionEnum.GOODS_UPDATE_ERROR);
}
//新增sku和stock
saveSkuAndStock(spu);
//发送mq消息
sendMessage(spu.getId(),"item.update");
}
1.3.搜索服务接收消息
搜索服务接收到消息后要做的事情:
- 增:添加新的数据到索引库
- 删:删除索引库数据
- 改:修改索引库数据
因为索引库的新增和修改方法是合二为一的,因此我们可以将这两类消息一同处理,删除另外处理。
1.3.1.引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.3.2.添加配置
spring:
rabbitmq:
host: 192.168.206.66
username: leyou
password: leyou
virtual-host: /leyou
这里只是接收消息而不发送,所以不用配置template相关内容。
1.3.3.编写监听器(数据新增和更新)
1.3.3.1 创建ItemListener
package com.leyou.search.mq;
import com.leyou.search.service.SearchService;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component //注入Spring当中
public class ItemListener {
@Autowired
private SearchService searchService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "search.item.insert.queue", durable = "true"),
exchange = @Exchange(name = "ly.item.exchange", type = ExchangeTypes.TOPIC),
key = {"item.insert","item.update"}
))
public void listenInsertOrUpdate(Long spuId){
if(spuId == null){
return;
}
//处理消息,对索引库进行新增或者修改
searchService.createOrUpdateIndex(spuId);
}
}
1.3.3.2 完善SearchService
public void createOrUpdateIndex(Long spuId) {
//查询spu
Spu spu = goodsClient.querySpuById(spuId);
//构建goods对象
Goods goods = buildGoods(spu);
//存入索引库
repository.save(goods);
}
1.3.4.编写监听器(数据删除)
1.3.4.1 完善ItemListener当中的listenDelete方法
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "search.item.delete.queue", durable = "true"),
exchange = @Exchange(name = "ly.item.exchange", type = ExchangeTypes.TOPIC),
key = {"item.delete"}
))
public void listenDelete(Long spuId){
if(spuId == null){
return;
}
//处理消息,对索引库进行删除
searchService.deleteIndex(spuId);
}
1.3.4.2 完善searchService当中的deleteIndex方法
public void deleteIndex(Long spuId) {
//删除
repository.deleteById(spuId);
}
1.3.5.运行测试
在这里多了两个队列
交换机
点击
ly.item.exchange
1.4.静态页服务接收消息
商品静态页服务接收到消息后的处理:
- 增:创建新的静态页
- 删:删除原来的静态页
- 改:创建新的静态页并删除原来的
不过,我们编写的创建静态页的方法也具备覆盖以前页面的功能,因此:增和改的消息可以放在一个方法中处理,删除消息放在另一个方法处理。
1.4.1.引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.4.2.添加配置
spring:
rabbitmq:
host: 192.168.206.66
username: leyou
password: leyou
virtual-host: /leyou
这里只是接收消息而不发送,所以不用配置template相关内容。
1.4.3.编写监听器
1.4.3.1 完善PageService当中的createHtml,设置判断是否有文件如果有删除重新创建,没有则直接创建
if(dest.exists()){
dest.delete();
}
1.4.3.2 创建ItemListener
//处理消息,对HTML静态页进行删除
pageService.deleteHtml(spuId);
1.4.3.3 完善pageService
public void deleteHtml(Long spuId) {
File dest = new File(NginxUrlEnum.NGINX_URL_ENUM_HTML.getUrl(), spuId + ".html");
if(dest.exists()){
dest.delete();
}
}
1.4.4.重新启动项目并测试
在leyou-manage-web项目当中修改商品
重新启动项目,并且登录RabbitMQ管理界面
可以看到,交换机已经创建出来了:
队列也已经创建完毕:
并且队列都已经绑定到交换机:
查看数据
我们搜索下手机:
商品详情页:
修改商品
然后在管理后台修改商品:
我们修改以下内容:
标题改成6.1
商品详情加点文字:
再次查看数据
以上是关于Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)十七(安装RabbitMQ(Centos6)(入门使用教程))以及Spring AMQP的)的主要内容,如果未能解决你的问题,请参考以下文章
(超详解)SpringBoot高级部分-自动配置+监听机制+监控+项目部署