基于Redis的限流器的实现(澳门新葡亰赌995577示例讲解)

1 概述

限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务。

系统中的接口通常都有限流,比如 70次/秒
,如何保证我们的接口的调用次数在超过第三方接口限流的时候快速失败呢?这时候就需要限流器了。下面是笔者用redis实现限流器的流程图。

前几天在DD的公众号,看了一篇关于使用 瓜娃 实现单应用限流的方案
–》原文,参考《redis in action》
实现了一个jedis版本的,都属于业务层次限制。 实际场景中常用的限流策略:

澳门新葡亰赌995577 1

Nginx接入层限流

2 代码

按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流

/**
 * 获取限流权限
 * @param key
 * @param millisecond 毫秒数
 * @param limitCount 限流次数
 * @return
 */
public static boolean getCurrentLimiting(String key, Long millisecond, Integer limitCount){
  try {
    boolean currentLimitingLock = RedisDistributeLockUtil.lock(key);
    if(currentLimitingLock){
      Long llen = RedisClient.llen(getCurrentLimitingRedisKey(key));
      if(llen < limitCount){
        RedisClient.lpush(getCurrentLimitingRedisKey(key), DateUtil.current(false) + "");
        return true;
      }else{
        Long lastTime = Long.parseLong(RedisClient.lindex(getCurrentLimitingRedisKey(key), -1));
        if((DateUtil.current(false) - lastTime) >= millisecond){
          RedisClient.lpush(getCurrentLimitingRedisKey(key), DateUtil.current(false) + "");
          RedisClient.ltrim(getCurrentLimitingRedisKey(key), 0, limitCount);
          return true;
        }
      }
    }
  }finally {
    RedisDistributeLockUtil.unlock(key);
  }
  return false;
}

澳门新葡亰赌995577,业务应用系统限流

以上这篇基于Redis的限流器的实现(示例讲解)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

通过业务代码控制流量这个流量可以被称为信号量,可以理解成是一种锁,它可以限制一项资源最多能同时被多少进程访问。

您可能感兴趣的文章:

代码实现

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.ZParams;
import java.util.List;
import java.util.UUID;

/**
 * @email wangiegie@gmail.com
 * @data 2017-08
 */
public class RedisRateLimiter {
  private static final String BUCKET = "BUCKET";
  private static final String BUCKET_COUNT = "BUCKET_COUNT";
  private static final String BUCKET_MONITOR = "BUCKET_MONITOR";

  static String acquireTokenFromBucket(
      Jedis jedis, int limit, long timeout) {
    String identifier = UUID.randomUUID().toString();
    long now = System.currentTimeMillis();
    Transaction transaction = jedis.multi();

    //删除信号量
    transaction.zremrangeByScore(BUCKET_MONITOR.getBytes(), "-inf".getBytes(), String.valueOf(now - timeout).getBytes());
    ZParams params = new ZParams();
    params.weightsByDouble(1.0,0.0);
    transaction.zinterstore(BUCKET, params, BUCKET, BUCKET_MONITOR);

    //计数器自增
    transaction.incr(BUCKET_COUNT);
    List<Object> results = transaction.exec();
    long counter = (Long) results.get(results.size() - 1);

    transaction = jedis.multi();
    transaction.zadd(BUCKET_MONITOR, now, identifier);
    transaction.zadd(BUCKET, counter, identifier);
    transaction.zrank(BUCKET, identifier);
    results = transaction.exec();
    //获取排名,判断请求是否取得了信号量
    long rank = (Long) results.get(results.size() - 1);
    if (rank < limit) {
      return identifier;
    } else {//没有获取到信号量,清理之前放入redis 中垃圾数据
      transaction = jedis.multi();
      transaction.zrem(BUCKET_MONITOR, identifier);
      transaction.zrem(BUCKET, identifier);
      transaction.exec();
    }
    return null;
  }
}

调用

测试接口调用

@GetMapping("/")
public void index(HttpServletResponse response) throws IOException {
  Jedis jedis = jedisPool.getResource();
  String token = RedisRateLimiter.acquireTokenFromBucket(jedis, LIMIT, TIMEOUT);
  if (token == null) {
    response.sendError(500);
  }else{
    //TODO 你的业务逻辑
  }
  jedisPool.returnResource(jedis);
}

优化

使用拦截器 + 注解优化代码

拦截器

@Configuration
static class WebMvcConfigurer extends WebMvcConfigurerAdapter {
  private Logger logger = LoggerFactory.getLogger(WebMvcConfigurer.class);
  @Autowired
  private JedisPool jedisPool;

  public void addInterceptors(InterceptorRegistry registry) {
    registry.addInterceptor(new HandlerInterceptorAdapter() {
      public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
                   Object handler) throws Exception {
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        Method method = handlerMethod.getMethod();
        RateLimiter rateLimiter = method.getAnnotation(RateLimiter.class);
        if (rateLimiter != null){
          int limit = rateLimiter.limit();
          int timeout = rateLimiter.timeout();
          Jedis jedis = jedisPool.getResource();
          String token = RedisRateLimiter.acquireTokenFromBucket(jedis, limit, timeout);
          if (token == null) {
            response.sendError(500);
            return false;
          }
          logger.debug("token -> {}",token);
          jedis.close();
        }
        return true;
      }
    }).addPathPatterns("/*");
  }
}

定义注解