查看原文
其他

Java并发:分布式应用限流实践

鹏磊 搜云库技术团队 2019-04-07
搜云库互联网/架构/开发/运维关注

任何限流都不是漫无目的的,也不是一个开关就可以解决的问题,常用的限流算法有:令牌桶,漏桶。在之前的文章中,也讲到过,但是那是基于单机场景来写。

之前文章:接口限流算法:漏桶算法&令牌桶算法

然而再牛逼的机器,再优化的设计,对于特殊场景我们也是要特殊处理的。就拿秒杀来说,可能会有百万级别的用户进行抢购,而商品数量远远小于用户数量。如果这些请求都进入队列或者查询缓存,对于最终结果没有任何意义,徒增后台华丽的数据。对此,为了减少资源浪费,减轻后端压力,我们还需要对秒杀进行限流,只需保障部分用户服务正常即可。

就秒杀接口来说,当访问频率或者并发请求超过其承受范围的时候,这时候我们就要考虑限流来保证接口的可用性,以防止非预期的请求对系统压力过大而引起的系统瘫痪。通常的策略就是拒绝多余的访问,或者让多余的访问排队等待服务。

分布式限流

单机限流,可以用到 AtomicIntegerRateLimiterSemaphore 这些。但是在分布式中,就不能使用了。常用分布式限流用 Nginx 限流,但是它属于网关层面,不能解决所有问题,例如内部服务,短信接口,你无法保证消费方是否会做好限流控制,所以自己在应用层实现限流还是很有必要的。

本文不涉及 nginx+lua,简单介绍 redis+lua分布式限流的实现。如果是需要在接入层限流的话,应该直接采用nginx自带的连接数限流模块和请求限流模块。

Redis + Lua 限流示例

本次项目使用 SpringBoot2.0.4,使用到 Redis 集群, Lua 限流脚本

引入依赖

  1. <dependencies>

  2.    <dependency>

  3.        <groupId>org.springframework.boot</groupId>

  4.        <artifactId>spring-boot-starter-web</artifactId>

  5.    </dependency>

  6.    <dependency>

  7.        <groupId>org.springframework.boot</groupId>

  8.        <artifactId>spring-boot-starter-data-redis</artifactId>

  9.    </dependency>

  10.    <dependency>

  11.        <groupId>org.springframework.boot</groupId>

  12.        <artifactId>spring-boot-starter-aop</artifactId>

  13.    </dependency>

  14.    <dependency>

  15.        <groupId>org.apache.commons</groupId>

  16.        <artifactId>commons-lang3</artifactId>

  17.    </dependency>

  18.    <dependency>

  19.        <groupId>org.springframework.boot</groupId>

  20.        <artifactId>spring-boot-starter-test</artifactId>

  21.    </dependency>

  22. </dependencies>

Redis 配置

application.properties

  1. spring.application.name=spring-boot-limit

  2. # Redis数据库索引

  3. spring.redis.database=0

  4. # Redis服务器地址

  5. spring.redis.host=10.4.89.161

  6. # Redis服务器连接端口

  7. spring.redis.port=6379

  8. # Redis服务器连接密码(默认为空)

  9. spring.redis.password=

  10. # 连接池最大连接数(使用负值表示没有限制)

  11. spring.redis.jedis.pool.max-active=8

  12. # 连接池最大阻塞等待时间(使用负值表示没有限制)

  13. spring.redis.jedis.pool.max-wait=-1

  14. # 连接池中的最大空闲连接

  15. spring.redis.jedis.pool.max-idle=8

  16. # 连接池中的最小空闲连接

  17. spring.redis.jedis.pool.min-idle=0

  18. # 连接超时时间(毫秒)

  19. spring.redis.timeout=10000

Lua 脚本

参考: 聊聊高并发系统之限流特技http://jinnianshilongnian.iteye.com/blog/2305117

  1. local key = "rate.limit:" .. KEYS[1] --限流KEY

  2. local limit = tonumber(ARGV[1])        --限流大小

  3. local current = tonumber(redis.call('get', key) or "0")

  4. if current + 1 > limit then --如果超出限流大小

  5.  return 0

  6. else  --请求数+1,并设置2秒过期

  7.  redis.call("INCRBY", key,"1")

  8.   redis.call("expire", key,"2")

  9.   return current + 1

  10. end

1、我们通过KEYS[1] 获取传入的key参数 2、通过ARGV[1]获取传入的limit参数 3、redis.call方法,从缓存中get和key相关的值,如果为nil那么就返回0 4、接着判断缓存中记录的数值是否会大于限制大小,如果超出表示该被限流,返回0 5、如果未超过,那么该key的缓存值+1,并设置过期时间为1秒钟以后,并返回缓存值+1

限流注解

注解的目的,是在需要限流的方法上使用

  1. package com.souyunku.example.annotation;

  2. /**

  3. * 描述: 限流注解

  4. *

  5. * @author yanpenglei

  6. * @create 2018-08-16 15:24

  7. **/

  8. @Target({ElementType.TYPE, ElementType.METHOD})

  9. @Retention(RetentionPolicy.RUNTIME)

  10. public @interface RateLimit {

  11.    /**

  12.     * 限流唯一标示

  13.     *

  14.     * @return

  15.     */

  16.    String key() default "";

  17.    /**

  18.     * 限流时间

  19.     *

  20.     * @return

  21.     */

  22.    int time();

  23.    /**

  24.     * 限流次数

  25.     *

  26.     * @return

  27.     */

  28.    int count();

  29. }

公共配置

  1. package com.souyunku.example.config;

  2. @Component

  3. public class Commons {

  4.    /**

  5.     * 读取限流脚本

  6.     *

  7.     * @return

  8.     */

  9.    @Bean

  10.    public DefaultRedisScript<Number> redisluaScript() {

  11.        DefaultRedisScript<Number> redisScript = new DefaultRedisScript<>();

  12.        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("rateLimit.lua")));

  13.        redisScript.setResultType(Number.class);

  14.        return redisScript;

  15.    }

  16.    /**

  17.     * RedisTemplate

  18.     *

  19.     * @return

  20.     */

  21.    @Bean

  22.    public RedisTemplate<String, Serializable> limitRedisTemplate(LettuceConnectionFactory redisConnectionFactory) {

  23.        RedisTemplate<String, Serializable> template = new RedisTemplate<String, Serializable>();

  24.        template.setKeySerializer(new StringRedisSerializer());

  25.        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());

  26.        template.setConnectionFactory(redisConnectionFactory);

  27.        return template;

  28.    }

  29. }

拦截器

通过拦截器 拦截 @RateLimit注解的方法,使用 Redsiexecute 方法执行我们的限流脚本,判断是否超过限流次数

以下下是核心代码

  1. package com.souyunku.example.config;

  2. /**

  3. * 描述:拦截器

  4. *

  5. * @author yanpenglei

  6. * @create 2018-08-16 15:33

  7. **/

  8. @Aspect

  9. @Configuration

  10. public class LimitAspect {

  11.    private static final Logger logger = LoggerFactory.getLogger(LimitAspect.class);

  12.    @Autowired

  13.    private RedisTemplate<String, Serializable> limitRedisTemplate;

  14.    @Autowired

  15.    private DefaultRedisScript<Number> redisluaScript;

  16.    @Around("execution(* com.souyunku.example.controller ..*(..) )")

  17.    public Object interceptor(ProceedingJoinPoint joinPoint) throws Throwable {

  18.        MethodSignature signature = (MethodSignature) joinPoint.getSignature();

  19.        Method method = signature.getMethod();

  20.        Class<?> targetClass = method.getDeclaringClass();

  21.        RateLimit rateLimit = method.getAnnotation(RateLimit.class);

  22.        if (rateLimit != null) {

  23.            HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();

  24.            String ipAddress = getIpAddr(request);

  25.            StringBuffer stringBuffer = new StringBuffer();

  26.            stringBuffer.append(ipAddress).append("-")

  27.                    .append(targetClass.getName()).append("- ")

  28.                    .append(method.getName()).append("-")

  29.                    .append(rateLimit.key());

  30.            List<String> keys = Collections.singletonList(stringBuffer.toString());

  31.            Number number = limitRedisTemplate.execute(redisluaScript, keys, rateLimit.count(), rateLimit.time());

  32.            if (number != null && number.intValue() != 0 && number.intValue() <= rateLimit.count()) {

  33.                logger.info("限流时间段内访问第:{} 次", number.toString());

  34.                return joinPoint.proceed();

  35.            }

  36.        } else {

  37.            return joinPoint.proceed();

  38.        }

  39.        throw new RuntimeException("已经到设置限流次数");

  40.    }

  41.    public static String getIpAddr(HttpServletRequest request) {

  42.        String ipAddress = null;

  43.        try {

  44.            ipAddress = request.getHeader("x-forwarded-for");

  45.            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {

  46.                ipAddress = request.getHeader("Proxy-Client-IP");

  47.            }

  48.            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {

  49.                ipAddress = request.getHeader("WL-Proxy-Client-IP");

  50.            }

  51.            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {

  52.                ipAddress = request.getRemoteAddr();

  53.            }

  54.            // 对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割

  55.            if (ipAddress != null && ipAddress.length() > 15) { // "***.***.***.***".length()

  56.                // = 15

  57.                if (ipAddress.indexOf(",") > 0) {

  58.                    ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));

  59.                }

  60.            }

  61.        } catch (Exception e) {

  62.            ipAddress = "";

  63.        }

  64.        return ipAddress;

  65.    }

  66. }

控制层

添加 @RateLimit() 注解,会在 Redsi 中生成 10 秒中,可以访问5次 的key

RedisAtomicLong 是为测试例子例,记录累计访问次数,跟限流没有关系。

  1. package com.souyunku.example.controller;

  2. /**

  3. * 描述: 测试页

  4. *

  5. * @author yanpenglei

  6. * @create 2018-08-16 15:42

  7. **/

  8. @RestController

  9. public class LimiterController {

  10.    @Autowired

  11.    private RedisTemplate redisTemplate;

  12.    // 10 秒中,可以访问10次

  13.    @RateLimit(key = "test", time = 10, count = 10)

  14.    @GetMapping("/test")

  15.    public String luaLimiter() {

  16.        RedisAtomicInteger entityIdCounter = new RedisAtomicInteger("entityIdCounter", redisTemplate.getConnectionFactory());

  17.        String date = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");

  18.        return date + " 累计访问次数:" + entityIdCounter.getAndIncrement();

  19.    }

  20. }

启动服务

  1. package com.souyunku.example;

  2. @SpringBootApplication

  3. public class SpringBootLimitApplication {

  4.    public static void main(String[] args) {

  5.        SpringApplication.run(SpringBootLimitApplication.class, args);

  6.    }

  7. }

启动项目页面访问:http://127.0.0.1:8080/test

10 秒中,可以访问10次,超过十次,页面就报错,等够10秒,重新计算。

后台日志

  1. 2018-08-16 18:41:08.205  INFO 18076 --- [nio-8080-exec-1] com.souyunku.example.config.LimitAspect  : 限流时间段内访问第:1 次

  2. 2018-08-16 18:41:08.426  INFO 18076 --- [nio-8080-exec-3] com.souyunku.example.config.LimitAspect  : 限流时间段内访问第:2 次

  3. 2018-08-16 18:41:08.611  INFO 18076 --- [nio-8080-exec-5] com.souyunku.example.config.LimitAspect  : 限流时间段内访问第:3 次

  4. 2018-08-16 18:41:08.819  INFO 18076 --- [nio-8080-exec-7] com.souyunku.example.config.LimitAspect  : 限流时间段内访问第:4 次

  5. 2018-08-16 18:41:09.021  INFO 18076 --- [nio-8080-exec-9] com.souyunku.example.config.LimitAspect  : 限流时间段内访问第:5 次

  6. 2018-08-16 18:41:09.203  INFO 18076 --- [nio-8080-exec-1] com.souyunku.example.config.LimitAspect  : 限流时间段内访问第:6 次

  7. 2018-08-16 18:41:09.406  INFO 18076 --- [nio-8080-exec-3] com.souyunku.example.config.LimitAspect  : 限流时间段内访问第:7 次

  8. 2018-08-16 18:41:09.629  INFO 18076 --- [nio-8080-exec-5] com.souyunku.example.config.LimitAspect  : 限流时间段内访问第:8 次

  9. 2018-08-16 18:41:09.874  INFO 18076 --- [nio-8080-exec-7] com.souyunku.example.config.LimitAspect  : 限流时间段内访问第:9 次

  10. 2018-08-16 18:41:10.178  INFO 18076 --- [nio-8080-exec-9] com.souyunku.example.config.LimitAspect  : 限流时间段内访问第:10 次

  11. 2018-08-16 18:41:10.702 ERROR 18076 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: 已经到设置限流次数] with root cause

  12. java.lang.RuntimeException: 已经到设置限流次数

  13.    at com.souyunku.example.config.LimitAspect.interceptor(LimitAspect.java:73) ~[classes/:na]

  14.    at sun.reflect.GeneratedMethodAccessor35.invoke(Unknown Source) ~[na:na]

  15.    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_112]

  16.    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_112]

往期精彩

Dubbo 整合 Pinpoint 做分布式服务请求跟踪

接口限流:漏桶算法&令牌桶算法

Java并发:Semaphore信号量源码分析

Java并发:深入浅出AQS之共享锁模式源码分析

Java并发:深入浅出AQS之独占锁模式源码分析

Java并发:了解无锁CAS就从源码分析

Java并发:CAS原理分析

长按:二维码关注

专注于开发技术研究与知识分享

公众号回复:”进群” 加微信群深入交流

回复 JavaDocker等关键字可获得学习资料

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存