RabbitMQ整合Spring Boot项目
添加依赖
<!-- Spring Boot starter that brings in AMQP (RabbitMQ) support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
|
application-dev.yml
# RabbitMQ connection settings.
# Each nested key must sit on its own indented line; a flattened
# one-line mapping is not valid YAML.
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
|
交换机\路由Key\队列的配置类
package cn.tedu.csmall.stock.webapi.quartz;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
/**
 * Declares the RabbitMQ objects used for stock messages: one direct exchange,
 * one queue, and the binding that connects them through a routing key.
 */
@Configuration
public class RabbitMQConfig {

    /** Name of the direct exchange. */
    public static final String STOCK_EX = "stock_ex";

    /** Routing key used when binding the queue to the exchange. */
    public static final String STOCK_ROUT = "stock_rout";

    /** Name of the queue. */
    public static final String STOCK_QUEUE = "stock_queue";

    /**
     * @return the direct exchange named {@link #STOCK_EX}
     */
    @Bean
    public DirectExchange stockDirectExchange() {
        DirectExchange exchange = new DirectExchange(STOCK_EX);
        return exchange;
    }

    /**
     * @return the queue named {@link #STOCK_QUEUE}
     */
    @Bean
    public Queue stockQueue() {
        Queue queue = new Queue(STOCK_QUEUE);
        return queue;
    }

    /**
     * @return the binding of the stock queue to the stock exchange under {@link #STOCK_ROUT}
     */
    @Bean
    public Binding stockBinding() {
        Queue queue = stockQueue();
        DirectExchange exchange = stockDirectExchange();
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(STOCK_ROUT);
    }
}
|
计划任务—先前的计划任务代码先注释
/** Template used to publish messages to RabbitMQ. */
@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * Job callback: publishes a fixed text message to the stock exchange
 * using the stock routing key.
 *
 * @param jobExecutionContext execution context supplied by the scheduler (unused here)
 * @throws JobExecutionException declared by the overridden method; not thrown by this body
 */
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
    String message = "消息:执行减少库存的操作";
    rabbitTemplate.convertAndSend(
            RabbitMQConfig.STOCK_EX,
            RabbitMQConfig.STOCK_ROUT,
            message);
}
// Closes the enclosing job class, whose header is outside this snippet.
}
|
修改cron表达式
// Cron schedule whose seconds field is "0/10": fire at second 0 and then every 10 seconds.
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
|
RabbitMQ的消费者
package cn.tedu.csmall.stock.webapi.quartz;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
/**
 * Message consumer bound to {@link RabbitMQConfig#STOCK_QUEUE}; prints every
 * text message it receives to standard output.
 */
@Component
@RabbitListener(queues = {RabbitMQConfig.STOCK_QUEUE})
public class RabbitMQConsumer {

    /**
     * Handles one text message taken from the queue.
     *
     * @param str the message payload
     */
    @RabbitHandler
    public void process(String str) {
        String line = "消息的接受者收到消息:" + str;
        System.out.println(line);
    }
}
|
启动Nacos\RabbitMQ\Seata
启动stock-webapi
根据Cron表达式,消息会在0/10/20/30/40/50秒数时运行
测试成功表示一切正常
开发酷鲨秒杀执行流程
准备流控和降级的处理类
秒杀业务肯定是一个高并发的处理,并发数超过程序设计的限制时,就需要对请求的数量进行限流
Sentinel是阿里提供的SpringCloud组件,主要用于外界访问当前服务器的控制器方法的限流操作
先编写限流异常的处理类
package cn.tedu.mall.seckill.exception;
import cn.tedu.mall.common.restful.JsonResult; import cn.tedu.mall.common.restful.ResponseCode; import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO; import com.alibaba.csp.sentinel.slots.block.BlockException; import lombok.extern.slf4j.Slf4j;
@Slf4j public class SeckillBlockHandler { public static JsonResult seckillBlock(String randCode, SeckillOrderAddDTO seckillOrderAddDTO, BlockException e){ log.error("一个请求被限流了"); return JsonResult.failed(ResponseCode.INTERNAL_SERVER_ERROR,"服务器繁忙!"); } }
|
再创建降级类
package cn.tedu.mall.seckill.exception;
import cn.tedu.mall.common.restful.JsonResult; import cn.tedu.mall.common.restful.ResponseCode; import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO; import lombok.extern.slf4j.Slf4j;
@Slf4j public class SeckillFallback { public static JsonResult seckillFall(String randCode, SeckillOrderAddDTO seckillOrderAddDTO, Throwable throwable){ log.error("一个请求被降级了!"); return JsonResult.failed(ResponseCode.INTERNAL_SERVER_ERROR,throwable.getMessage()); } }
|
提交秒杀订单
开发业务层
我们之前完成了秒杀的预热,预热中完成了秒杀商品sku库存数,spu随机码保存在redis中的操作
也完成了查询秒杀商品列表,和显示秒杀商品详情的方法
下面要开始进行秒杀商品生成订单的操作
如果用户选择商品规格(sku)提交订单,那么就要按照提交秒杀订单的业务流程处理
秒杀提交订单和普通订单的区别
1.要判断当前用户是否为重复购买
2.从Redis中判断是否有库存
3.秒杀订单转换成普通订单,需要使用dubbo在order模块完成
4.用消息队列(RabbitMQ)的方式将秒杀成功信息保存在success表中
创建一个SeckillServiceImpl业务逻辑层实现类,完成上面的业务
package cn.tedu.mall.seckill.service.impl;
import cn.tedu.mall.common.exception.CoolSharkServiceException; import cn.tedu.mall.common.pojo.domain.CsmallAuthenticationInfo; import cn.tedu.mall.common.restful.ResponseCode; import cn.tedu.mall.order.service.IOmsOrderService; import cn.tedu.mall.pojo.order.dto.OrderAddDTO; import cn.tedu.mall.pojo.order.dto.OrderItemAddDTO; import cn.tedu.mall.pojo.order.vo.OrderAddVO; import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO; import cn.tedu.mall.pojo.seckill.model.Success; import cn.tedu.mall.pojo.seckill.vo.SeckillCommitVO; import cn.tedu.mall.seckill.config.RabbitMqComponentConfiguration; import cn.tedu.mall.seckill.service.ISeckillService; import cn.tedu.mall.seckill.utils.SeckillCacheUtils; import lombok.extern.slf4j.Slf4j; import org.apache.dubbo.config.annotation.DubboReference; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.security.authentication.UsernamePasswordAuthenticationToken; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Service;
import java.util.ArrayList; import java.util.List;
@Service @Slf4j public class SeckillServiceImpl implements ISeckillService { @Autowired private StringRedisTemplate stringRedisTemplate; @DubboReference private IOmsOrderService dubboOrderService; @Autowired private RabbitTemplate rabbitTemplate;
@Override public SeckillCommitVO commitSeckill(SeckillOrderAddDTO seckillOrderAddDTO) { Long skuId = seckillOrderAddDTO.getSeckillOrderItemAddDTO().getSkuId(); Long userId = getUserId(); String reseckillCheckKey = SeckillCacheUtils.getReseckillCheckKey(skuId, userId); Long seckillCounts = stringRedisTemplate.boundValueOps(reseckillCheckKey).increment(); if(seckillCounts>1) throw new CoolSharkServiceException(ResponseCode.FORBIDDEN,"您已经购买过该商品了"); String stockKey = SeckillCacheUtils.getStockKey(skuId); Long seckillStocks = stringRedisTemplate.boundValueOps(stockKey).decrement(); if(seckillStocks<0){ stringRedisTemplate.boundValueOps(reseckillCheckKey).decrement(); throw new CoolSharkServiceException(ResponseCode.BAD_REQUEST,"对不起,您购买的商品已经无货了"); } OrderAddDTO orderAddDTO = converSeckillOrderTOOrder(seckillOrderAddDTO); orderAddDTO.setUserId(userId); OrderAddVO orderAddVO = dubboOrderService.addOrder(orderAddDTO); Success success = new Success(); BeanUtils.copyProperties(seckillOrderAddDTO.getSeckillOrderItemAddDTO(),success); success.setUserId(userId); success.setOrderSn(orderAddVO.getSn()); rabbitTemplate.convertAndSend(RabbitMqComponentConfiguration.SECKILL_EX ,RabbitMqComponentConfiguration.SECKILL_RK,success); SeckillCommitVO seckillCommitVO = new SeckillCommitVO(); BeanUtils.copyProperties(orderAddVO,seckillCommitVO); return seckillCommitVO; } private OrderAddDTO converSeckillOrderTOOrder(SeckillOrderAddDTO seckillOrderAddDTO) { OrderAddDTO orderAddDTO = new OrderAddDTO(); BeanUtils.copyProperties(seckillOrderAddDTO,orderAddDTO); OrderItemAddDTO orderItemAddDTO = new OrderItemAddDTO(); BeanUtils.copyProperties(seckillOrderAddDTO.getSeckillOrderItemAddDTO(),orderItemAddDTO); List<OrderItemAddDTO> orderItemAddDTOS = new ArrayList<>(); orderItemAddDTOS.add(orderItemAddDTO); orderAddDTO.setOrderItems(orderItemAddDTOS); return orderAddDTO; }
public CsmallAuthenticationInfo getUserInfo(){ UsernamePasswordAuthenticationToken authenticationToken = (UsernamePasswordAuthenticationToken) SecurityContextHolder.getContext().getAuthentication(); if(authenticationToken==null) throw new CoolSharkServiceException(ResponseCode.UNAUTHORIZED,"没有登录信息"); CsmallAuthenticationInfo csmallAuthenticationInfo = (CsmallAuthenticationInfo) authenticationToken.getCredentials(); return csmallAuthenticationInfo; } public Long getUserId(){ return getUserInfo().getId(); } }
|
开发控制层

package cn.tedu.mall.seckill.controller;
import cn.tedu.mall.common.exception.CoolSharkServiceException; import cn.tedu.mall.common.restful.JsonResult; import cn.tedu.mall.common.restful.ResponseCode; import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO; import cn.tedu.mall.pojo.seckill.vo.SeckillCommitVO; import cn.tedu.mall.seckill.exception.SeckillBlockHandler; import cn.tedu.mall.seckill.exception.SeckillFallback; import cn.tedu.mall.seckill.service.ISeckillService; import cn.tedu.mall.seckill.utils.SeckillCacheUtils; import com.alibaba.csp.sentinel.annotation.SentinelResource; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint for submitting seckill orders.
 */
@RestController
@RequestMapping("/seckill")
@Api(tags = "提交秒杀订单")
public class SeckillController {

    @Autowired
    private ISeckillService seckillService;

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Verifies the random code in the path against the one cached for the SPU and,
     * when they match, submits the seckill order.
     *
     * @param randCode           random code taken from the request path
     * @param seckillOrderAddDTO the seckill order submitted by the client
     * @return the commit result wrapped in a successful {@link JsonResult}
     * @throws CoolSharkServiceException NOT_FOUND when no code is cached for the SPU or the
     *                                   codes differ, INTERNAL_SERVER_ERROR when the key
     *                                   exists but its value cannot be read
     */
    @PostMapping("/{randCode}")
    @ApiOperation("随机码验证并提交订单")
    @ApiImplicitParam(value = "随机码", name = "randCode", required = true, dataType = "string")
    @PreAuthorize("hasRole('user')")
    @SentinelResource(value = "seckill",
            blockHandlerClass = SeckillBlockHandler.class, blockHandler = "seckillBlock",
            fallbackClass = SeckillFallback.class, fallback = "seckillFall")
    public JsonResult<SeckillCommitVO> commitSeckillOrder(
            @PathVariable String randCode,
            SeckillOrderAddDTO seckillOrderAddDTO) {
        Long spuId = seckillOrderAddDTO.getSpuId();
        String randCodeKey = SeckillCacheUtils.getRandCodeKey(spuId);

        // hasKey returns a nullable Boolean; compare explicitly instead of auto-unboxing.
        if (!Boolean.TRUE.equals(redisTemplate.hasKey(randCodeKey))) {
            throw new CoolSharkServiceException(ResponseCode.NOT_FOUND, "没有指定商品");
        }

        // Check for null BEFORE converting to text: concatenating with "" would turn a
        // missing value into the string "null", which a client could then match.
        Object cachedRandCode = redisTemplate.boundValueOps(randCodeKey).get();
        if (cachedRandCode == null) {
            throw new CoolSharkServiceException(ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误,请联系客服");
        }
        if (!String.valueOf(cachedRandCode).equals(randCode)) {
            throw new CoolSharkServiceException(ResponseCode.NOT_FOUND, "没有指定商品");
        }

        SeckillCommitVO seckillCommitVO = seckillService.commitSeckill(seckillOrderAddDTO);
        return JsonResult.ok(seckillCommitVO);
    }
}
|
启动Nacos\Seata\RabbitMQ\Redis\Sentinel
项目Leaf\product\passport\order\seckill
注意yml配置文件中的RabbitMQ的用户名和密码
为了方便测试:如果说已经购买过,就修改允许购买的数量 >1为 >100
为了方便测试:如果说没有库存,可以把判断库存的if注释掉
测试成功即可
还可以测试sentinel的限流
success成功信息的处理
开发持久层
我们要连接数据库,对success表进行新增
还有对秒杀数据库sku库存的修改
/**
 * Reduces the seckill stock of one SKU. The mapped SQL statement subtracts
 * {@code quantity} from {@code seckill_stock} in table {@code seckill_sku}
 * for the row whose {@code sku_id} equals {@code skuId}.
 *
 * @param skuId    id of the SKU whose stock is reduced
 * @param quantity amount to subtract from the current stock
 */
void updateReduceStockBySkuId(@Param("skuId") Long skuId, @Param("quantity") Integer quantity);
|
<!-- Subtracts #{quantity} from the seckill stock of the SKU identified by #{skuId} -->
<update id="updateReduceStockBySkuId">
    update
        seckill_sku
    set
        seckill_stock = seckill_stock - #{quantity}
    where
        sku_id = #{skuId}
</update>
|
下面再编写新增Success的方法
/**
 * Persistence interface for seckill success records.
 */
@Repository
public interface SuccessMapper {
    /**
     * Inserts one success record.
     *
     * @param success the record to insert
     */
    void saveSuccess(Success success);
}
|
SuccessMapper.xml
<!-- Inserts one seckill success record into the success table -->
<insert id="saveSuccess">
    insert into success (
        user_id,
        user_phone,
        sku_id,
        title,
        main_picture,
        seckill_price,
        quantity,
        bar_code,
        data,
        order_sn
    ) values (
        #{userId},
        #{userPhone},
        #{skuId},
        #{title},
        #{mainPicture},
        #{seckillPrice},
        #{quantity},
        #{barCode},
        #{data},
        #{orderSn}
    )
</insert>
|
开发消息的接收功能
我们当前触发新增Success的方法并不是常规的业务逻辑层
而是由RabbitMQ消息收发机制中接收消息的对象来调用
所以我们编写一个接收消息的监听器类来完成这个操作
创建consumer包,包中创建类SeckillQueueConsumer代码如下
/**
 * Consumes {@link Success} messages from the seckill queue. For each message it
 * reduces the SKU's stock in the database and then stores the success record.
 */
@Component
@RabbitListener(queues = {RabbitMqComponentConfiguration.SECKILL_QUEUE})
public class SeckillQueueConsumer {

    @Autowired
    private SuccessMapper successMapper;

    @Autowired
    private SeckillSkuMapper skuMapper;

    /**
     * Handles one success message.
     *
     * @param success the seckill success record taken from the queue
     */
    @RabbitHandler
    public void process(Success success) {
        // NOTE(review): the two writes below are not wrapped in a visible transaction;
        // confirm whether redelivery after a partial failure is acceptable.
        skuMapper.updateReduceStockBySkuId(
                success.getSkuId(),
                success.getQuantity());

        successMapper.saveSuccess(success);
    }
}
|
环境方面
Nacos\Sentinel\Seata\redis\RabbitMQ
服务方面
Leaf\product\order\seckill