之前我写了如何实现分布式锁和分布式限流 <https://www.cnblogs.com/huangqingshi/p/10290615.html>
,这次我们继续在这块功能上推进,实现一个秒杀系统,采用spring boot 2.x + mybatis+ redis + swagger2 +
lombok实现。


  先说说基本流程,就是提供一个秒杀接口,然后针对秒杀接口进行限流,限流的方式目前我实现了两种,上次实现的是累计计数方式,这次还有这个功能,并且我增加了令牌桶方式的lua脚本进行限流。


  然后不被限流的数据进来之后,加一把分布式锁,获取分布式锁之后就可以对数据库进行操作了。直接操作数据库的方式可以,但是速度会比较慢,咱们直接通过一个初始化接口,将库存数据放到缓存中,然后对缓存中的数据进行操作。写库的操作采用异步方式,实现的方式就是将操作好的数据放入到队列中,然后由另一个线程对队列进行消费。当然,也可以将数据直接写入mq中,由另一个线程进行消费,这样也更稳妥。

  好了,看一下项目的基本结构:




  看一下入口controller类,入口类有两个方法,一个是初始化订单的方法,即秒杀开始的时候,秒杀接口才会有效,这个方法可以采用定时任务自动实现也可以。初始化后就可以调用placeOrder的方法了。在placeOrder上面有个自定义的注解DistriLimitAnno,这个是我在上篇文章写的,用作限流使用。采用的方式目前有两种,一种是使用计数方式限流,一种方式是令牌桶,上次使用了计数,咱们这次采用令牌桶方式实现。
package com.hqs.flashsales.controller; import
com.hqs.flashsales.annotation.DistriLimitAnno;import
com.hqs.flashsales.aspect.LimitAspect;import
com.hqs.flashsales.lock.DistributedLock;import
com.hqs.flashsales.limit.DistributedLimit;import
com.hqs.flashsales.service.OrderService;import lombok.extern.slf4j.Slf4j; import
org.springframework.beans.factory.annotation.Autowired;import
org.springframework.data.redis.core.RedisTemplate;import
org.springframework.data.redis.core.script.RedisScript;import
org.springframework.stereotype.Controller;import
org.springframework.web.bind.annotation.GetMapping;import
org.springframework.web.bind.annotation.PostMapping;import
org.springframework.web.bind.annotation.ResponseBody;import
javax.annotation.Resource;import java.util.Collections; /** * @author
huangqingshi * @Date 2019-01-23*/ @Slf4j @Controller public class
FlashSaleController { @Autowired OrderService orderService; @Autowired
DistributedLock distributedLock; @Autowired LimitAspect limitAspect;//
注意RedisTemplate用的String,String,后续所有用到的key和value都是String的 @Autowired
RedisTemplate<String, String> redisTemplate; private static final String
LOCK_PRE = "LOCK_ORDER"; @PostMapping("/initCatalog") @ResponseBody public
String initCatalog() {try { orderService.initCatalog(); } catch (Exception e) {
log.error("error", e); } return "init is ok"; } @PostMapping("/placeOrder")
@ResponseBody @DistriLimitAnno(limitKey= "limit", limit = 100, seconds = "1")
public Long placeOrder(Long orderId) { Long saleOrderId = 0L; boolean locked =
false; String key = LOCK_PRE + orderId; String uuid = String.valueOf(orderId);
try { locked = distributedLock.distributedLock(key, uuid, "10" ); if(locked) {
//直接操作数据库 // saleOrderId = orderService.placeOrder(orderId); //操作缓存 异步操作数据库
saleOrderId = orderService.placeOrderWithQueue(orderId); } log.info(
"saleOrderId:{}", saleOrderId); } catch (Exception e) {
log.error(e.getMessage()); }finally { if(locked) {
distributedLock.distributedUnlock(key, uuid); } }return saleOrderId; } }

  令牌桶的方式比直接计数更加平滑,直接计数可能会瞬间达到最高值,令牌桶则把最高峰给削掉了,令牌桶的基本原理就是有一个桶装着令牌,然后又一队人排队领取令牌,领到令牌的人就可以去做做自己想做的事情了,没有领到令牌的人直接就走了(也可以重新排队)。发令牌是按照一定的速度发放的,所以这样在多人等令牌的时候,很多人是拿不到的。当桶里边的令牌在一定时间内领完后,则没有令牌可领,都直接走了。如果过了一定的时间之后可以再次把令牌桶装满供排队的人领。基本原理是这样的,看一下脚本简单了解一下,里边有一个key和四个参数,第一个参数是获取一个令牌桶的时间间隔,第二个参数是重新填装令牌的时间(精确到毫秒),第三个是令牌桶的数量限制,第四个是隔多长时间重新填装令牌桶。
-- bucket name local key = KEYS[1] -- token generate interval local
intervalPerPermit =tonumber(ARGV[1]) -- grant timestamp local refillTime =
tonumber(ARGV[2]) -- limit token count local limit = tonumber(ARGV[3]) --
ratelimit time period local interval = tonumber(ARGV[4]) local counter =
redis.call('hgetall', key) if table.getn(counter) == 0 then -- first check if
bucket not exists, if yes, create a new one with full capacity, then grant
access redis.call('hmset', key, 'lastRefillTime', refillTime, 'tokensRemaining'
, limit -1) -- expire will save memory redis.call('expire', key, interval)
return 1 elseif table.getn(counter) == 4 then -- if bucket exists, first we try
to refill the token bucket local lastRefillTime, tokensRemaining = tonumber
(counter[2]), tonumber(counter[4]) local currentTokens if refillTime >
lastRefillTimethen -- check if refillTime larger than lastRefillTime. -- if
not, it means some other operation later than this call made the call first. --
there is no need to refill the tokens. local intervalSinceLast = refillTime -
lastRefillTimeif intervalSinceLast > interval then currentTokens = limit
redis.call('hset', key, 'lastRefillTime', refillTime) else local grantedTokens =
math.floor(intervalSinceLast / intervalPerPermit) if grantedTokens > 0 then --
ajust lastRefillTime, we want shift left the refill time. local padMillis =
math.fmod(intervalSinceLast, intervalPerPermit) redis.call('hset', key, '
lastRefillTime', refillTime - padMillis) end currentTokens = math.min
(grantedTokens + tokensRemaining, limit) end else -- if not, it means some
other operation later than this call made the call first. -- there is no need
to refill the tokens. currentTokens = tokensRemaining end assert(currentTokens
>=0) if currentTokens == 0 then -- we didn't consume any keys redis.call('hset'
, key,'tokensRemaining', currentTokens) return 0 else -- we take 1 token from
the bucket redis.call('hset', key, 'tokensRemaining', currentTokens - 1) return
1 end else error("Size of counter is " .. table.getn(counter) .. ", Should Be 0
or 4.") end
  看一下调用令牌桶lua的JAVA代码,也比较简单:
public Boolean distributedRateLimit(String key, String limit, String seconds)
{ Long id= 0L; long intervalInMills = Long.valueOf(seconds) * 1000; long
limitInLong = Long.valueOf(limit); long intervalPerPermit = intervalInMills /
limitInLong;// Long refillTime = System.currentTimeMillis(); //
log.info("调用redis执行lua脚本, {} {} {} {} {}", "ratelimit", intervalPerPermit,
refillTime,// limit, intervalInMills); try { id =
redisTemplate.execute(rateLimitScript, Collections.singletonList(key),
String.valueOf(intervalPerPermit), String.valueOf(System.currentTimeMillis()),
String.valueOf(limitInLong), String.valueOf(intervalInMills)); }catch
(Exception e) { log.error("error", e); } if(id == 0L) { return false; } else {
return true; } }
   创建两张简单表,一个库存表,一个是销售订单表:
CREATE TABLE `catalog` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `name`
varchar(50) NOT NULL DEFAULT '' COMMENT '名称', `total` int(11) NOT NULL COMMENT '
库存', `sold` int(11) NOT NULL COMMENT '已售', `version` int(11) NULL COMMENT '
乐观锁,版本号', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE
`sales_order` ( `id`int(11) unsigned NOT NULL AUTO_INCREMENT, `cid` int(11) NOT
NULL COMMENT '库存ID', `name` varchar(30) NOT NULL DEFAULT '' COMMENT '商品名称',
`create_time`timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP COMMENT '创建时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT
CHARSET=utf8;

  基本已经准备完毕,然后启动程序,打开swagger(http://localhost:8080/swagger-ui.html#),执行初始化方法initCatalog:



    日志里边会输出初始化的记录内容,初始化库存为1000:



  初始化执行的方法,十分简单,写到缓存中。
@Override public void initCatalog() { Catalog catalog = new Catalog();
catalog.setName("mac"); catalog.setTotal(1000L); catalog.setSold(0L);
catalogMapper.insertCatalog(catalog); log.info("catalog:{}", catalog);
redisTemplate.opsForValue().set(CATALOG_TOTAL+ catalog.getId(),
catalog.getTotal().toString()); redisTemplate.opsForValue().set(CATALOG_SOLD+
catalog.getId(), catalog.getSold().toString()); log.info("redis value:{}",
redisTemplate.opsForValue().get(CATALOG_TOTAL + catalog.getId()));
handleCatalog(); }
  我写了一个测试类,启动3000个线程,然后去进行下单请求:
package com.hqs.flashsales; import lombok.extern.slf4j.Slf4j; import
org.junit.Test;import org.junit.runner.RunWith; import
org.springframework.beans.factory.annotation.Autowired;import
org.springframework.boot.test.context.SpringBootTest;import
org.springframework.boot.test.web.client.TestRestTemplate;import
org.springframework.test.context.junit4.SpringRunner;import
org.springframework.util.LinkedMultiValueMap;import
org.springframework.util.MultiValueMap;import java.util.concurrent.TimeUnit;
@Slf4j @RunWith(SpringRunner.class) @SpringBootTest(classes =
FlashsalesApplication.class, webEnvironment =
SpringBootTest.WebEnvironment.RANDOM_PORT)public class
FlashSalesApplicationTests { @Autowiredprivate TestRestTemplate
testRestTemplate; @Testpublic void flashsaleTest() { String url =
"http://localhost:8080/placeOrder"; for(int i = 0; i < 3000; i++) { try {
TimeUnit.MILLISECONDS.sleep(20); new Thread(() -> { MultiValueMap<String,
String> params =new LinkedMultiValueMap<>(); params.add("orderId", "1"); Long
result= testRestTemplate.postForObject(url, params, Long.class); if(result != 0
) { System.out.println("-------------" + result); } } ).start(); } catch
(Exception e) { log.info("error:{}", e.getMessage()); } } } @Test public void
contextLoads() { } }
  然后开始运行测试代码,查看一下测试日志和程序日志,均显示卖了1000后直接显示SOLD OUT了。分别看一下日志和数据库:





  商品库存catalog表和订单明细表sales_order表,都是1000条,没有问题。





  总结:


    通过采用分布式锁和分布式限流,即可实现秒杀流程,当然分布式限流也可以用到很多地方,比如限制某些IP在多久时间访问接口多少次,都可以的。令牌桶的限流方式使得请求可以得到更加平滑的处理,不至于瞬间把系统达到最高负载。在这其中其实还有一个小细节,就是Redis的锁,单机情况下没有任何问题,如果是集群的话需要注意,一个key被hash到同一个slot的时候没有问题,如果说扩容或者缩容的话,如果key被hash到不同的slot,程序可能会出问题。在写代码的过程中还出现了一个小问题,就是写controller的方法的时候,方法一定要声明成public的,否则自定义的注解用不了,其他service的注解直接变为空,这个问题也是找了很久才找到。

  好了代码地址:https://github.com/stonehqs/flashsales.git

  欢迎拍砖~