RabbitMQ整合SpringBoot项目

RabbitMQ整合Spring Boot项目

添加依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application-dev.yml

spring: 
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /

交换机\路由Key\队列的配置类

package cn.tedu.csmall.stock.webapi.quartz;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//这个类是配置RabbitMQ中交换机、队列、路由键
//交换机和队列是对象,需要保存到Spring容器中,路由键是绑定关系和匹配使用的关键字
@Configuration
public class RabbitMQConfig {
//使用常量定义交换机路由键和队列
public static final String STOCK_EX="stock_ex";
public static final String STOCK_ROUT="stock_rout";
public static final String STOCK_QUEUE="stock_queue";

//声明交换机队列,保存到spring容器中
//根据实际需求生成交换机的数量,目前只需要一个
@Bean
public DirectExchange stockDirectExchange(){
return new DirectExchange(STOCK_EX);
}
//声明队列对象,保存到spring容器
@Bean
public Queue stockQueue(){
return new Queue(STOCK_QUEUE);
}
//通过路由键绑定交换机和队列
@Bean
public Binding stockBinding(){
return BindingBuilder.bind(stockQueue()).to(stockDirectExchange()).with(STOCK_ROUT);
}
}

计划任务—先前的计划任务代码先注释

    @Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
//先简单发送一个字符串
//convertAndSend([交换机名称],[路由key的名称],[要发送的消息])
rabbitTemplate.convertAndSend(
RabbitMQConfig.STOCK_EX,RabbitMQConfig.STOCK_ROUT,"消息:执行减少库存的操作");
}
}

修改cron表达式

CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");

RabbitMQ的消费者

package cn.tedu.csmall.stock.webapi.quartz;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//当前类也是将对象交给Spring容器管理
@Component
@RabbitListener(queues = {RabbitMQConfig.STOCK_QUEUE})
public class RabbitMQConsumer {
//在类上监听,实际上运行的也是一个方法
//@RabbitHandler注解标记的方法,是当监听的队列有消息时,就会被触发
//一个类只能有一个方法标记这个注解
//这个方法直接声明参数类型,从队列中接收到的消息会自动转成指定的参数类型
@RabbitHandler
public void process(String str){
System.out.println("消息的接受者收到消息:"+str);
}
}

启动Nacos\RabbitMQ\Seata

启动stock-webapi

根据Cron表达式,消息会在0/10/20/30/40/50秒数时运行

测试成功表示一切正常

开发酷鲨秒杀执行流程

准备流控和降级的处理类

秒杀业务肯定是一个高并发的处理,并发数超过程序设计的限制时,就需要对请求的数量进行限流

Sentinel是阿里提供的SpringCloud组件,主要用于外界访问当前服务器的控制器方法的限流操作

先编写限流异常

package cn.tedu.mall.seckill.exception;

import cn.tedu.mall.common.restful.JsonResult;
import cn.tedu.mall.common.restful.ResponseCode;
import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import lombok.extern.slf4j.Slf4j;

//秒杀业务限流异常处理类
@Slf4j
public class SeckillBlockHandler {
//声明限流的方法,返回值必须和控制器一致
//参数要包含控制器的参数,最后再添加一个BlockException异常类型的参数
public static JsonResult seckillBlock(String randCode, SeckillOrderAddDTO seckillOrderAddDTO,
BlockException e){
log.error("一个请求被限流了");
return JsonResult.failed(ResponseCode.INTERNAL_SERVER_ERROR,"服务器繁忙!");
}
}

再创建降级类

package cn.tedu.mall.seckill.exception;

import cn.tedu.mall.common.restful.JsonResult;
import cn.tedu.mall.common.restful.ResponseCode;
import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO;
import lombok.extern.slf4j.Slf4j;

//降级类
@Slf4j
public class SeckillFallback {
//返回值必须和控制器方法一致
//参数也是包含控制层方法参数,可以不写其他参数,也可以添加Throwable类型的参数
//Throwable类型的参数就是触发这次降级的原因
public static JsonResult seckillFall(String randCode,
SeckillOrderAddDTO seckillOrderAddDTO,
Throwable throwable){
log.error("一个请求被降级了!");
return JsonResult.failed(ResponseCode.INTERNAL_SERVER_ERROR,throwable.getMessage());
}
}

提交秒杀订单

开发业务层

我们之前完成了秒杀的预热,预热中完成了秒杀商品sku库存数,spu随机码保存在redis中的操作

也完成了查询秒杀商品列表,和显示秒杀商品详情的方法

下面要开始进行秒杀商品生成订单的操作

如果用户选择商品规格(sku)提交订单,那么就要按照提交秒杀订单的业务流程处理

秒杀提交订单和普通订单的区别

1.要判断当前用户是否为重复购买

2.从Redis中判断是否有库存

3.秒杀订单转换成普通订单,需要使用dubbo在order模块完成

4.用消息队列(RabbitMQ)的方式将秒杀成功信息保存在success表中

创建一个SeckillServiceImpl业务逻辑层实现类,完成上面的业务

package cn.tedu.mall.seckill.service.impl;

import cn.tedu.mall.common.exception.CoolSharkServiceException;
import cn.tedu.mall.common.pojo.domain.CsmallAuthenticationInfo;
import cn.tedu.mall.common.restful.ResponseCode;
import cn.tedu.mall.order.service.IOmsOrderService;
import cn.tedu.mall.pojo.order.dto.OrderAddDTO;
import cn.tedu.mall.pojo.order.dto.OrderItemAddDTO;
import cn.tedu.mall.pojo.order.vo.OrderAddVO;
import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO;
import cn.tedu.mall.pojo.seckill.model.Success;
import cn.tedu.mall.pojo.seckill.vo.SeckillCommitVO;
import cn.tedu.mall.seckill.config.RabbitMqComponentConfiguration;
import cn.tedu.mall.seckill.service.ISeckillService;
import cn.tedu.mall.seckill.utils.SeckillCacheUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
@Slf4j
public class SeckillServiceImpl implements ISeckillService {
//减少sku库存数的Redis对象,是操作字符串
@Autowired
private StringRedisTemplate stringRedisTemplate;
//需要普通订单生成的方法,Dubbo调用
@DubboReference
private IOmsOrderService dubboOrderService;
//秒杀成功后的成功信息,先放到RabbitMQ中
@Autowired
private RabbitTemplate rabbitTemplate;

/*
1.要判断当前用户是否为重复购买
2.从Redis中判断是否有库存
3.秒杀订单转换成普通订单,需要使用dubbo在order中完成
4.用消息队列(RabbitMQ)的方式将秒杀成功信息保存在success表中
*/
@Override
public SeckillCommitVO commitSeckill(SeckillOrderAddDTO seckillOrderAddDTO) {
//第一步:利用redids检查库存数和检查是否为重复购买
//先获取用户Id和要购买的商品skuId
Long skuId = seckillOrderAddDTO.getSeckillOrderItemAddDTO().getSkuId();
Long userId = getUserId();
//我们根据userId和skuId的组合来确定谁买了什么商品
//秒杀业务规定,一个用户id只能购买一个skuId一次
//我们利用userId和skuId生成一个key,将key保存到redis中,表示当前用户已经购买过
//key的组成可能是:mall:seckill:reseckill:1:2
String reseckillCheckKey = SeckillCacheUtils.getReseckillCheckKey(skuId, userId);
//向Redis中保存这个key,利用increment()方式
//increment()效果如下
//1.如果当前key不存在,redis就会创建这个key,并保存他的值为1
//2.如果当前key存在,redis就会给当前值+1
//3.将当前key的值返回给调用者
Long seckillCounts = stringRedisTemplate.boundValueOps(reseckillCheckKey).increment();
//如果seckillCounts大于1,表示之前已经购买过
if(seckillCounts>1)
//购买次数超过1,证明不是第一次购买,终止业务,抛出异常
throw new CoolSharkServiceException(ResponseCode.FORBIDDEN,"您已经购买过该商品了");
//程序运行到这里,表示用户是第一次购买
//先检查该商品是否有库存
String stockKey = SeckillCacheUtils.getStockKey(skuId);
//从redis中获取库存数,使用decrement()方法,将当前库存减去1后返回值
Long seckillStocks = stringRedisTemplate.boundValueOps(stockKey).decrement();
//如果seckillStocks是0表示最后一件,可以正常购买
//如果小于0则表示,库存不足
if(seckillStocks<0){
//删除用户购买记录
stringRedisTemplate.boundValueOps(reseckillCheckKey).decrement();
//抛出异常,提示库存不足
throw new CoolSharkServiceException(ResponseCode.BAD_REQUEST,"对不起,您购买的商品已经无货了");
}
//代码到这里可以证明用户是第一次购买且库存充足
//第二阶段:将秒杀订单SeckillOrderAddDTO转成普通订单OrderAddDTO
OrderAddDTO orderAddDTO = converSeckillOrderTOOrder(seckillOrderAddDTO);
//为userId进行赋值
orderAddDTO.setUserId(userId);
//订单完整了,直接利用Dubbo生成普通订单
OrderAddVO orderAddVO = dubboOrderService.addOrder(orderAddDTO);
//第三阶段:使用消息队列记录秒杀成功的信息
//需要使用success对象
Success success = new Success();
BeanUtils.copyProperties(seckillOrderAddDTO.getSeckillOrderItemAddDTO(),success);
//将userId赋值给success对象
success.setUserId(userId);
//将订单编号
success.setOrderSn(orderAddVO.getSn());
//将success对象发送给RabbitMQ
rabbitTemplate.convertAndSend(RabbitMqComponentConfiguration.SECKILL_EX
,RabbitMqComponentConfiguration.SECKILL_RK,success);
//我们需要返回SeckillCommitVO类型对象
SeckillCommitVO seckillCommitVO = new SeckillCommitVO();
BeanUtils.copyProperties(orderAddVO,seckillCommitVO);
return seckillCommitVO;
}
//在这个方法中将秒杀订单转成普通订单
private OrderAddDTO converSeckillOrderTOOrder(SeckillOrderAddDTO seckillOrderAddDTO) {
OrderAddDTO orderAddDTO = new OrderAddDTO();
//进行同名属性赋值
BeanUtils.copyProperties(seckillOrderAddDTO,orderAddDTO);
//seckillOrderAddDTO对象中的订单项SeckillOrderItemAddDATO赋值给orderAddDTO订单项集合OrderItemAddDTO
OrderItemAddDTO orderItemAddDTO = new OrderItemAddDTO();
BeanUtils.copyProperties(seckillOrderAddDTO.getSeckillOrderItemAddDTO(),orderItemAddDTO);
//实例化一个List集合,将赋值好的orderItemAddDTO新增到集合中
List<OrderItemAddDTO> orderItemAddDTOS = new ArrayList<>();
orderItemAddDTOS.add(orderItemAddDTO);
//最后将集合赋值到orderAddDTO对象的orderItems属性中
orderAddDTO.setOrderItems(orderItemAddDTOS);
//转换完成,返回结果
return orderAddDTO;
}

//获取当前登录用户的用户信息
public CsmallAuthenticationInfo getUserInfo(){
//获得Spring Security上下文对象
UsernamePasswordAuthenticationToken authenticationToken =
(UsernamePasswordAuthenticationToken) SecurityContextHolder.getContext().getAuthentication();
//判断authenticationToken是否为空
if(authenticationToken==null)
throw new CoolSharkServiceException(ResponseCode.UNAUTHORIZED,"没有登录信息");
//不为空,获取其中用户信息
CsmallAuthenticationInfo csmallAuthenticationInfo =
(CsmallAuthenticationInfo) authenticationToken.getCredentials();
return csmallAuthenticationInfo;
}
//获取当前登录用户的用户id
public Long getUserId(){
return getUserInfo().getId();
}
}

开发控制层

image-20230304104628257

package cn.tedu.mall.seckill.controller;

import cn.tedu.mall.common.exception.CoolSharkServiceException;
import cn.tedu.mall.common.restful.JsonResult;
import cn.tedu.mall.common.restful.ResponseCode;
import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO;
import cn.tedu.mall.pojo.seckill.vo.SeckillCommitVO;
import cn.tedu.mall.seckill.exception.SeckillBlockHandler;
import cn.tedu.mall.seckill.exception.SeckillFallback;
import cn.tedu.mall.seckill.service.ISeckillService;
import cn.tedu.mall.seckill.utils.SeckillCacheUtils;
import com.alibaba.csp.sentinel.annotation.SentinelResource;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/seckill")
@Api(tags = "提交秒杀订单")
public class SeckillController {
@Autowired
private ISeckillService seckillService;

@Autowired
private RedisTemplate redisTemplate;

@PostMapping("/{randCode}")
@ApiOperation("随机码验证并提交订单")
@ApiImplicitParam(value = "随机码",name = "randCode",required = true,dataType = "string")
@PreAuthorize("hasRole('user')")
//Sentinel限流和降级配置
@SentinelResource(value = "seckill",
blockHandlerClass = SeckillBlockHandler.class,blockHandler = "seckillBlock",
fallbackClass = SeckillFallback.class,fallback = "seckillFall")
public JsonResult<SeckillCommitVO> commitSeckillOrder(
@PathVariable String randCode, SeckillOrderAddDTO seckillOrderAddDTO
){
//获取spuId
Long spuId = seckillOrderAddDTO.getSpuId();
//获取当前spuId对应的随机码
//先获取key,再获取value
String randCodeKey = SeckillCacheUtils.getRandCodeKey(spuId);
//判断redis中是否包含这个key
if(redisTemplate.hasKey(randCodeKey)){
//根据key获取value
String redisRandCode = redisTemplate.boundValueOps(randCodeKey).get()+"";
//为了防止Redis信息丢失,我们可以判断一下redisRandCode的存在
if(redisRandCode==null)
//redis信息丢失
throw new CoolSharkServiceException(ResponseCode.INTERNAL_SERVER_ERROR,"服务器内部错误,请联系客服");
//判断Redis中的随机码和控制器的随机码是否一致,防止投机购买
if(!redisRandCode.equals(randCode))
//如果不一致,判断为投机购买,抛出异常
throw new CoolSharkServiceException(ResponseCode.NOT_FOUND,"没有指定商品");
//执行购买操作
SeckillCommitVO seckillCommitVO = seckillService.commitSeckill(seckillOrderAddDTO);
return JsonResult.ok(seckillCommitVO);
} else {
//如果redis中没有这个随机码的key值,直接发送异常,提示没有该商品
throw new CoolSharkServiceException(ResponseCode.NOT_FOUND,"没有指定商品");
}
}
}

启动Nacos\Seata\RabbitMQ\Redis\Sentinel

项目Leaf\product\passport\order\seckill

注意yml配置文件中的RabbitMQ的用户名和密码

为了方便测试:如果说已经购买过,就修改允许购买的数量 >1为 >100

为了方便测试:如果说没有库存,可以把判断库存的if注释掉

测试成功即可

还可以测试sentinel的限流

success成功信息的处理

开发持久层

我们要连接数据库,对这个表进行新增

还有对秒杀数据库sku库存的修改

//根据skuId修改库存数
void updateReduceStockBySkuId(@Param("skuId") Long skuId,
@Param("quantity") Integer quantity);
<!--根据skuId修改库存数-->
<update id="updateReduceStockBySkuId">
update
seckill_sku
set
seckill_stock=seckill_stock - #{quantity}
where
sku_id=#{skuId}
</update>

下面再编写新增Success的方法

@Repository
public interface SuccessMapper {
//新增Success对象到数据库的方法
void saveSuccess(Success success);
}

SuccessMapper.xml

<!--  新增Success对象到数据库的方法  -->
<insert id="saveSuccess">
insert into success(
user_id,
user_phone,
sku_id,
title,
main_picture,
seckill_price,
quantity,
bar_code,
data,
order_sn
)values(
#{userId},
#{userPhone},
#{skuId},
#{title},
#{mainPicture},
#{seckillPrice},
#{quantity},
#{barCode},
#{data},
#{orderSn}
)
</insert>

开发消息的接收功能

我们当前触发新增Success的方法并不是常规的业务逻辑层

而是由RabbitMQ消息收发机制中接收消息的对象来调用

所有我们编写一个接收消息的监听器类来完成这个操作

创建consumer包,包中创建类SekillQueueConsumer代码如下

// 必须保存到Spring容纳中
@Component
// RabbitMQ监听器声明
@RabbitListener(queues = {RabbitMqComponentConfiguration.SECKILL_QUEUE})
public class SeckillQueueConsumer {
//业务需要的Mapper对象装配
@Autowired
private SuccessMapper successMapper;
@Autowired
private SeckillSkuMapper skuMapper;
// 当前类上标记的队列收到消息时
// 下面方法会接收这个消息,自动运行
@RabbitHandler
public void process(Success success){
// 先减库存
// 减少seckill_sku表中的库存数并不迫切,运行可能延迟,真正运行时,秒杀可能已经结束了
// 这个库存的减少操作也不会影响秒杀过程中redis的库存数
skuMapper.updateReduceStockBySkuId(success.getSkuId(),success.getQuantity());
// 新增success对象到数据库
successMapper.saveSuccess(success);
}
}

环境方面

Nacos\Sentinel\Seata\redis\RabbitMQ

服务方面

Leaf\product\order\seckill