发布时间:2024-02-25 19:30
无论开发者或用户都知道一个环节,那就是双11/12秒杀环节,开发者需要把后台代码优化到极致以应对大量的用户请求,而用户即需要快速的手速进行抢单环节~
由于是演示环境我就一个服务完成当前需求了,同一时间多个请求进入抢购即下单环节,而我们要做的就是限流当前请求,时服务端起到高吞吐量,以达到最高效率完成抢单环节。
用户下单访问请求,到业务层简单处理不耗时业务后即丢放一个标识或订单号到队列当中去(此处就用到了rabbitmq消息中间件),之后设立监听器即消费者,用于实时去消费当前请求,消费者处理时先从redis缓存中获取当前商品库存量,第一次请求如果没有即从数据库获取,然后放入缓存当中去,一个请求过来即库存量自减一位然后紧接创建订单,直至库存为0时不接收任何消息其余请求直接放入死信队列,由延时队列去处理通用的返回结果即(抢单失败,商品已售空等信息。
- SpringBoot:2.0.0 -
-SpringCloud:Finchley.M9
- JDK:1.8
- maven:3.8
- rabbitmq:2.0.2
- redis:2.0.5
订单表:
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for order_info
-- ----------------------------
DROP TABLE IF EXISTS `order_info`;
CREATE TABLE `order_info` (
`id` bigint(20) NOT NULL,
`address` varchar(255) CHARACTER SET utf8 COLLATE utf8_unicode_ci NULL DEFAULT NULL COMMENT '地址',
`order_code` varchar(255) CHARACTER SET utf8 COLLATE utf8_unicode_ci NULL DEFAULT NULL COMMENT '订单编号',
`com_modity` tinyint(255) NULL DEFAULT NULL COMMENT '商品类型',
`user_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_unicode_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_unicode_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
库存表:
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for stok_order
-- ----------------------------
DROP TABLE IF EXISTS `stok_order`;
CREATE TABLE `stok_order` (
`id` bigint(20) NOT NULL,
`com_modity` varchar(255) CHARACTER SET utf8 COLLATE utf8_unicode_ci NOT NULL COMMENT '商品类型',
`stock_count` tinyint(255) NULL DEFAULT NULL COMMENT '库存',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_unicode_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of stok_order
-- ----------------------------
INSERT INTO `stok_order` VALUES (1, '1', 5);
SET FOREIGN_KEY_CHECKS = 1;
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.itxwl</groupId>
<artifactId>rabbitmq-study</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq-study</name>
<packaging>jar</packaging>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.M9</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--SpringBoot整合Rabbitmq依赖引入-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.37</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
属性名 说明 默认值
spring.rabbitmq.address 客户端连接的地址,有多个的时候使用逗号分隔,该地址可以是IP与Port的结合
spring.rabbitmq.cache.channel.checkout-timeout 当缓存已满时,获取Channel的等待时间,单位为毫秒
spring.rabbitmq.cache.channel.size 缓存中保持的Channel数量
spring.rabbitmq.cache.connection.mode 连接缓存的模式 CHANNEL
spring.rabbitmq.cache.connection.size 缓存的连接数
spring.rabbitmq.connnection-timeout 连接超时参数单位为毫秒:设置为“0”代表无穷大
spring.rabbitmq.dynamic 默认创建一个AmqpAdmin的Bean true
spring.rabbitmq.host RabbitMQ的主机地址 localhost
spring.rabbitmq.listener.acknowledge-mode 容器的acknowledge模式
spring.rabbitmq.listener.auto-startup 启动时自动启动容器 true
spring.rabbitmq.listener.concurrency 消费者的最小数量
spring.rabbitmq.listener.default-requeue-rejected 投递失败时是否重新排队 true
spring.rabbitmq.listener.max-concurrency 消费者的最大数量
spring.rabbitmq.listener.prefetch 在单个请求中处理的消息个数,他应该大于等于事务数量
spring.rabbitmq.listener.retry.enabled 不论是不是重试的发布 false
spring.rabbitmq.listener.retry.initial-interval 第一次与第二次投递尝试的时间间隔 1000
spring.rabbitmq.listener.retry.max-attempts 尝试投递消息的最大数量 3
spring.rabbitmq.listener.retry.max-interval 两次尝试的最大时间间隔 10000
spring.rabbitmq.listener.retry.multiplier 上一次尝试时间间隔的乘数 1.0
spring.rabbitmq.listener.retry.stateless 不论重试是有状态的还是无状态的 true
spring.rabbitmq.listener.transaction-size 在一个事务中处理的消息数量。为了获得最佳效果,该值应设置为小于等于每个请求中处理的消息个数,即spring.rabbitmq.listener.prefetch的值
spring.rabbitmq.password 登录到RabbitMQ的密码
spring.rabbitmq.port RabbitMQ的端口号 5672
spring.rabbitmq.publisher-confirms 开启Publisher Confirm机制 false
spring.rabbitmq.publisher-returns 开启publisher Return机制 false
spring.rabbitmq.requested-heartbeat 请求心跳超时时间,单位为秒
spring.rabbitmq.ssl.enabled 启用SSL支持 false
spring.rabbitmq.ssl.key-store 保存SSL证书的地址
spring.rabbitmq.ssl.key-store-password 访问SSL证书的地址使用的密码
spring.rabbitmq.ssl.trust-store SSL的可信地址
spring.rabbitmq.ssl.trust-store-password 访问SSL的可信地址的密码
spring.rabbitmq.ssl.algorithm SSL算法,默认使用Rabbit的客户端算法库
spring.rabbitmq.template.mandatory 启用强制信息 false
spring.rabbitmq.template.receive-timeout receive()方法的超时时间 0
spring.rabbitmq.template.reply-timeout sendAndReceive()方法的超时时间 5000
spring.rabbitmq.template.retry.enabled 设置为true的时候RabbitTemplate能够实现重试 false
spring.rabbitmq.template.retry.initial-interval 第一次与第二次发布消息的时间间隔 1000
spring.rabbitmq.template.retry.max-attempts 尝试发布消息的最大数量 3
spring.rabbitmq.template.retry.max-interval 尝试发布消息的最大时间间隔 10000
spring.rabbitmq.template.retry.multiplier 上一次尝试时间间隔的乘数 1.0
spring.rabbitmq.username 登录到RabbitMQ的用户名
spring.rabbitmq.virtual-host 连接到RabbitMQ的虚拟主机方法参数说明:
手动确认ack
(拒绝消息)参数1:随便值 参数2:开启手动确认消息 参数3:被拒绝是否重新入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, false);
(确认消息)参数1:随便值 参数2:开启手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),
false);
spring:
rabbitmq:
host: localhost #地址
port: 5672 #端口
username: xwl #用户名
password: 258000 #密码
virtual-host: /xwl #虚拟机地址/权限
template:
#exchange: xwl_exchange #交换机
retry:
initial-interval: 10000ms #如果没有接收到消费回执,即每隔10秒访问一次
enabled: true #开启重试机制
max-interval: 30000ms #最大叠加制不超过30秒
max-attempts: 2 #每次访问一次间隔后都以2倍叠加再次访问
listener:
simple:
default-requeue-rejected: false #监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为true
#none无应答确认发送
#manual意味着监听者必须通过调用Channel.basicAck()来告知所有的消息。
#auto意味着容器会自动应答,除非MessageListener抛出异常,这是默认配置方式。
acknowledge-mode: manual
prefetch: 1
concurrency: 5 #消费者监听 分发5个队列执行
type: simple
publisher-confirms: true
datasource:
driver-class-name: com.mysql.jdbc.Driver
username: root
password: root
url: jdbc:mysql://localhost:3306/mythread?characterEncoding=utf-8
eureka:
client:
register-with-eureka: false #单体应用测试-不注册eureka
fetch-registry: false #不发送心跳到注册中心
#配置mybatis-plus打印sql语句于控制台
mybatis-plus:
configuration:
#log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
#开启驼峰命名转换
map-underscore-to-camel-case: true
package com.itxwl.rabbitmqstudy.seckill.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
* @Auther: 薛
* @Date: 2020/6/16 16:09
* @Description:
*/
@Component
@Configuration
@SuppressWarnings("ALL")
public class RabbitMqConfig {
//交换机名称
public static String ITEM_TOPIC_EXCHANGE="xwl_exchange";
//队列名称
public static final String ITEM_QUEUE = "xwl_queue";
//声明交换机
@Bean("xwlTopicExchange")
public Exchange topicExchange(){
return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
}
//声明主队列
@Bean("xwlQueue")
public Queue itemQueue(){
Map<String, Object> args = new HashMap<>();
//声明死信交换器
args.put("x-dead-letter-exchange", "deal_exchange");
//声明死信路由键
args.put("x-dead-letter-routing-key", "DelayKey");
//声明主队列如果发生堵塞或其它-10秒自动消费消息
args.put("x-message-ttl",10000);
return QueueBuilder.durable(ITEM_QUEUE).withArguments(args).build();
}
//主队列绑定交换机以及-路由(此处采用TOPC通配符)
@Bean
public Binding itemQueueExchange(@Qualifier("xwlQueue") Queue queue,
@Qualifier("xwlTopicExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
}
//声明死信队列
@Bean("dealQueue")
public Queue dealQueue(){
return QueueBuilder.durable("deal_queue").build();
}
//声明死信交换机
@Bean("dealExchange")
public Exchange dealExchange(){
return ExchangeBuilder.topicExchange("deal_exchange").durable(true).build();
}
//死信队列绑定交换机以及路由key
@Bean
public Binding dealQueueExchange(@Qualifier("dealQueue") Queue queue,
@Qualifier("dealExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("DelayKey").noargs();
}
}
package com.itxwl.rabbitmqstudy.seckill.controller;
import com.itxwl.rabbitmqstudy.seckill.service.ISeckKillService;
import lombok.AllArgsConstructor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Arrays;
import java.util.List;
/**
* @Auther: 薛
* @Date: 2020/6/23 15:18
* @Description:
*/
@RestController
@AllArgsConstructor
@RequestMapping("skill")
public class SeckKillController {
private RedisTemplate redisTemplate;
private ISeckKillService seckKillService;
/**
**模拟用户组
**/
public List<String> getUsers(){
return Arrays.asList("张三","李四","王五","赵六","李珏","郭思","吕布","王月英","嘻哈","田丰");
}
/**
* 模拟抢单-入口
* @param -用户名
* @param -商品类型 -此处默认1
* @return
*/
@GetMapping("getShopByType")
public String getShopByType(){
//为了演示结果需
redisTemplate.opsForValue().set("stockCount",null);
getUsers().stream().forEach(name ->{
seckKillService.getShopByType(name,1);
});
return "已经收到您的抢购申请,请稍后留意信息提示结果";
}
}
声明接口
package com.itxwl.rabbitmqstudy.seckill.service;
public interface ISeckKillService {
void getShopByType(String userName, Integer shopType);
}
接口实现类
package com.itxwl.rabbitmqstudy.seckill.service.impl;
import com.itxwl.rabbitmqstudy.seckill.service.ISeckKillService;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
/**
* @Auther: 薛
* @Date: 2020/6/23 15:25
* @Description:
*/
@Service
@AllArgsConstructor
public class SeckKillServiceImpl implements ISeckKillService {
//交换机名称
public static final String ITEM_TOPIC_EXCHANGE = "xwl_exchange";
//下单队列路由key
public static final String ITEM_ROUKEY = "item.sendKill";
//引入消息发送API
private RabbitTemplate rabbitTemplate;
@Override
@SneakyThrows
public void getShopByType(String userName, Integer shopType) {
//不做任何操作处理~直接进去队列-
rabbitTemplate.convertAndSend(ITEM_TOPIC_EXCHANGE,ITEM_ROUKEY,userName);
}
}
package com.itxwl.rabbitmqstudy.seckill.eneity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @Auther: 薛
* @Date: 2020/6/23 15:40
* @Description:
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class NameTypeDto {
private String userName;
private Integer shopType;
}
package com.itxwl.rabbitmqstudy.seckill.eneity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @Auther: 薛
* @Date: 2020/6/23 15:12
* @Description:
*/
@TableName("order_info")
@Data
@NoArgsConstructor
public class OrderInfo {
private Long id;
private String userName;
private String address;//地址
private String orderCode;//订单编号
private Integer comModity;//商品类型
public OrderInfo(String userName,String orderCode,Integer comModity){
this.userName=userName;
this.address="河南卫辉唐庄镇。。。";
this.orderCode=orderCode;
this.comModity=comModity;
}
}
package com.itxwl.rabbitmqstudy.seckill.eneity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @Auther: 薛
* @Date: 2020/6/23 15:16
* @Description:
*/
@TableName("stok_order")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class StokOrder {
private Long id;
private String comMondity;//商品类型
private Integer stockCount;//库存
}
package com.itxwl.rabbitmqstudy.seckill.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.itxwl.rabbitmqstudy.seckill.eneity.StokOrder;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
@Mapper
@SuppressWarnings("ALL")
public interface StokOrderMapper extends BaseMapper<StokOrder> {
@Select("select stock_count from stok_order where com_modity=#{shopType}")
Integer findCountByShopType(@Param("shopType") Integer shopType);
}
package com.itxwl.rabbitmqstudy.seckill.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.itxwl.rabbitmqstudy.seckill.eneity.OrderInfo;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface OrderInfoMapper extends BaseMapper<OrderInfo> {
}
注意:springboot普通类加载注入IOC对象通常为空,因为普通类加载并没有将属性加载入spring容器当中去,所以需要使用一个注解@PostConstruct:在当前类加载时初始化一次赋值对象即可使用IOC特性
package com.itxwl.rabbitmqstudy.seckill.config;
import com.itxwl.rabbitmqstudy.seckill.eneity.OrderInfo;
import com.itxwl.rabbitmqstudy.seckill.mapper.OrderInfoMapper;
import com.itxwl.rabbitmqstudy.seckill.mapper.StokOrderMapper;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* @Auther: 薛
* @Date: 2020/6/23 16:15
* @Description:
*/
@Component
@Configuration
@SuppressWarnings("ALL")
public class SendKillListener {
//交换机名称
public static final String ITEM_TOPIC_EXCHANGE = "xwl_exchange";
//队列名称
public static final String ITEM_QUEUE = "xwl_queue";
private RedisTemplate re;
private StokOrderMapper stokOrderMapper;
private OrderInfoMapper orderInfoMapper;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private StokOrderMapper sr;
@Autowired
private OrderInfoMapper oo;
@PostConstruct
public void init() {
this.re = redisTemplate;
this.orderInfoMapper = oo;
this.stokOrderMapper = sr;
}
/**
* 监听主队列~
*
* @param message
* @param map
* @param channel
* @throws InterruptedException
* @throws IOException
*/
@RabbitListener(queues = "xwl_queue")
public void sendMiss(Message message, @Headers Map<String, Object> map, Channel channel) throws InterruptedException, IOException {
String msg = new String(message.getBody(), "UTF-8");
Integer shopCount=0;
//第一个请求进来获取库存-先去缓存redis找对应key值如果没有发送一个连接查询后续无需再次获取库存
if (StringUtils.isEmpty(re.opsForValue().get("stockCount"))) {
re.opsForValue().set("stockCount", stokOrderMapper.findCountByShopType(1), 60, TimeUnit.MINUTES);
shopCount = ((Integer) re.opsForValue().get("stockCount"));
}else {
//自减缓存内库存量- 每次减-
shopCount = ((Integer) re.opsForValue().get("stockCount"))-1;
}
//如果库存量小于等于0即已经抢完
if (shopCount<= 0) {
//即放入死信队列-推送后续队列内消息即为抢单失败-
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
//返回 不做后续处理
return;
}
//如果库存数不为0即没有抢完
//数据库存储订单
orderInfoMapper.insert(new OrderInfo(msg, UUID.randomUUID().toString(), 1));
//设置缓存库存量key过期时间-redis自行删除(赋值减值)
re.opsForValue().set("stockCount", shopCount, 60, TimeUnit.MINUTES);
//手动设置ACK接收确认当前消息消费完毕
;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println(msg + "抢购成功,恭喜!");
}
/**
* 监听死信队列-即推送抢单失败
*
* @param message
* @param map
* @param channel
* @throws InterruptedException
* @throws IOException
*/
@RabbitListener(queues = "deal_queue")
public static void sendMiss2(Message message, @Headers Map<String, Object> map, Channel channel) throws InterruptedException, IOException {
String msg = new String(message.getBody(), "UTF-8");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println(msg + "商品已被抢空~下次再来");
}
}
https://github.com/xwlgithub/xuewenliang/tree/master/rabbitmq-study
如果我的这个业务处理有问题的话加我一块交流交流
Q:2509647976
wx: x331191249