SpringBoot - 优雅的实现【流控】

概述

限流 简言之就是当请求达到一定的并发数或速率,就对服务进行等待、排队、降级、拒绝服务等操作。

漏桶算法

1.png

把水比作是请求,漏桶比作是系统处理能力极限,水先进入到漏桶里,漏桶里的水按一定速率流出,当流出的速率小于流入的速率时,由于漏桶容量有限,后续进入的水直接溢出(拒绝请求),以此实现限流

令牌桶算法

2.png

可以简单地理解为医去银行办理业务,只有拿到号以后才可以进行业务办理。

系统会维护一个令牌(token)桶,以一个恒定的速度往桶里放入令牌(token),这时如果有请求进来想要被处理,则需要先从桶里获取一个令牌(token),当桶里没有令牌(token)可取时,则该请求将被拒绝服务。令牌桶算法通过控制桶的容量、发放令牌的速率,来达到对请求的限制。

==V1.0==

上代码

<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>30.1-jre</version> </dependency>
@Slf4j @RestController @RequestMapping("/rateLimit") public class RateLimitController { // 限流策略 : 1秒钟1个请求 private final RateLimiter limiter = RateLimiter.create(1); private DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH🇲🇲ss"); @SneakyThrows @GetMapping("/test") public String testLimiter() { //500毫秒内,没拿到令牌,就直接进入服务降级 boolean tryAcquire = limiter.tryAcquire(500, TimeUnit.MILLISECONDS); if (!tryAcquire) { log.warn("BOOM 服务降级,时间{}", LocalDateTime.now().format(dtf)); return "系统繁忙,请稍后再试!"; } log.info("获取令牌成功,时间{}", LocalDateTime.now().format(dtf)); return "业务处理成功"; }

我们可以看到RateLimiter的2个核心方法:create()、tryAcquire()

  • acquire() 获取一个令牌, 改方法会阻塞直到获取到这一个令牌, 返回值为获取到这个令牌花费的时间
  • acquire(int permits) 获取指定数量的令牌, 该方法也会阻塞, 返回值为获取到这 N 个令牌花费的时间
  • tryAcquire() 判断时候能获取到令牌, 如果不能获取立即返回 false
  • tryAcquire(int permits) 获取指定数量的令牌, 如果不能获取立即返回 false
  • tryAcquire(long timeout, TimeUnit unit) 判断能否在指定时间内获取到令牌, 如果不能获取立即返回 false
  • tryAcquire(int permits, long timeout, TimeUnit unit) 同上

验证

自己验......🐶🐶🐶

==V2.0==

自定义注解+AOP实现接口限流

V1.0的功能实现了,但是业务代码和限流代码混在一起,非常的不美观。

上代码

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>30.1-jre</version> </dependency>
@Target(value = {ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface FlowLimit { /** * 资源的key,唯一 * 作用:不同的接口,不同的流量控制 */ String key() default ""; /** * 最大访问次数 */ long times(); /** * 获取令牌最大等待时间(单位:timeunit) */ long timeout(); /** * 时间单位(.../分钟/秒/毫秒/...) 默认:毫秒 */ TimeUnit timeunit() default TimeUnit.MILLISECONDS; /** * 得不到令牌的提示语 */ String message() default "请求太频繁啦,请稍慢一点!"; }
@Slf4j @Aspect @Component public class FlowLimitAop { /** * 不同的接口,不同的流量控制 * map的key为 FlowLimit.key */ private final Map<String, RateLimiter> limitMap = Maps.newConcurrentMap(); @Around("@annotation(com.liang.bbs.rest.annotation.FlowLimit)") public Object around(ProceedingJoinPoint joinPoint) throws Throwable { // 获取request ServletRequestAttributes sra = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); String ip = null; if (sra != null) { ip = IpUtil.getIP(sra.getRequest()); } MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); // 拿FlowLimit的注解 FlowLimit limit = method.getAnnotation(FlowLimit.class); if (limit != null) { // key作用:不同的接口,不同的流量控制 String key = limit.key(); // 有ip按ip限制流量 if (StringUtils.isNotEmpty(ip)) { key += ip; } // 速率(Rate)限流器(Limiter) RateLimiter rateLimiter; // 验证缓存是否有命中key if (!limitMap.containsKey(key)) { // 创建令牌桶(【每秒】按照limit.times()数量生成令牌) rateLimiter = RateLimiter.create(limit.times()); limitMap.put(key, rateLimiter); log.info("新建了令牌桶={},容量={}", key, limit.times()); } rateLimiter = limitMap.get(key); // 尝试立即获取令牌,可以尝试获取多个(tryAcquire是阻塞的) boolean acquire = rateLimiter.tryAcquire(limit.timeout(), limit.timeunit()); // 拿不到命令,直接返回异常提示 if (!acquire) { log.error("令牌桶={},容量={},获取令牌失败", key, limit.times()); throw BusinessException.build(ResponseCode.SYSTEM_EXCEPTION, limit.message()); } } return joinPoint.proceed(); } }

验证

@FlowLimit(key = "AdController.getAds", times = 3, timeout = 100) @GetMapping("getAds") public ResponseResult<List<AdDTO>> getAds() { return ResponseResult.success(adService.getAds()); }