您的位置:

分布式限流

一、分布式限流实现秒杀

在高并发场景下,流量很大的时候需要进行限流操作,否则可能会出现系统崩溃或者对相关业务造成影响。秒杀场景是一个典型的高并发场景。很多网站都会在一定时间内提供限量商品,用户抢购时都是在同一时间内进行,此时需要限制每个用户的请求次数,否则会出现恶意刷单等现象。而分布式限流可以使我们更加高效地解决这个问题。

下面是一个实现秒杀的分布式限流代码示例:

public boolean tryAcquireSeckill() {
    // 每秒放行5次请求
    RateLimiter rateLimiter = RateLimiter.create(5);
    return rateLimiter.tryAcquire();
}

二、分布式限流三种算法

分布式限流主要包括计数器算法、令牌桶算法和漏桶算法。

计数器算法是指通过统计一段时间内的请求次数,当超过一定的阈值时就进行限流操作。

public class CountAlgorithm {
    private static AtomicInteger count = new AtomicInteger(0);

    public static boolean tryAcquire() {
        int newCount = count.incrementAndGet();
        if(newCount > 10) {
            return false;
        } else {
            return true;
        }
    }
}

令牌桶算法则是需要令牌才能进行访问的一种算法。可以设置每秒钟发放多少个令牌,每发放一个令牌后就会扣除一个。当桶内令牌数不足时,无法进行访问。

public class TokenBucketAlgorithm {
    private static RateLimiter rateLimiter = RateLimiter.create(10);

    public static boolean tryAcquire() {
        return rateLimiter.tryAcquire();
    }
}

漏桶算法则是保证一定的流量输出,防止流量被突发的请求消耗。

public class LeakyBucketAlgorithm {
    // 桶容量
    private static int capacity = 10;
    // 水流速度
    private static int rate = 1;
    // 时间间隔
    private static int timeInterval = 1000;
    // 当前水量
    private static int water = 0;
    // 上次漏水的时间
    private static long lastTime = System.currentTimeMillis();

    public static synchronized boolean tryAcquire() {
        // 计算流入的水量
        long now = System.currentTimeMillis();
        water = Math.max(0, water - (int)((now - lastTime) / timeInterval) * rate);
        lastTime = now;
        if(water < capacity) {
            water++;
            return true;
        } else {
            return false;
        }
    }
}

三、分布式限流器

分布式限流器是一种通用的限流框架,可以在各种业务场景下使用。使用该框架可以有效控制流量,防止系统崩溃。

下面是一个基于Google Guava库实现的分布式限流器:

public class RateLimiterDecorator {

    private static final String SEPARATOR = "_";

    private int permitsPerSecond;

    private LoadingCache
    loadingCache;

    private Function
     keyMapper;

    public RateLimiterDecorator(int permitsPerSecond, Function
      keyMapper) {
        this.permitsPerSecond = permitsPerSecond;
        this.keyMapper = keyMapper;
        CacheLoader
       loader = new CacheLoader
       
        () { @Override public RateLimiter load(String key) throws Exception { return RateLimiter.create(permitsPerSecond); } }; loadingCache = CacheBuilder.newBuilder().build(loader); } public boolean tryAcquire(T key) { try { return loadingCache.get(keyMapper.apply(key)).tryAcquire(); } catch (ExecutionException e) { e.printStackTrace(); return false; } } }
       
      
     
    
   
  

四、分布式限流框架

分布式限流框架是一种将限流逻辑封装在框架中,可以通过配置参数实现对流量的限制,从而防止系统崩溃,同时可以有效提高程序的可维护性。

下面是一个基于Spring Cloud Gateway和Redis实现的分布式限流框架:

public class RateLimiterFilter implements GlobalFilter, Ordered {

    private static final String REQUEST_PATH_KEY = "request_path";
    private static final String REQUEST_METHOD_KEY = "request_method";
    private static final String ALLOW_PATH_PATTERN_KEY = "allow_path_pattern";
    private static final String RATE_LIMIT_KEY = "rate_limit";
    private static final String REDIS_RATE_LIMIT_KEY_PREFIX = "rate-limit:";

    private StringRedisTemplate redisTemplate;
    private PathMatcher pathMatcher = new AntPathMatcher();

    public RateLimiterFilter(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        String requestMethod = exchange.getRequest().getMethodValue();
        String requestPath = exchange.getRequest().getPath().value();
        // 遍历配置文件中的限流规则
        redisTemplate.opsForHash().entries(RATE_LIMIT_KEY).entrySet().stream()
                .filter(entry -> {
                    String allowPathPattern = (String)entry.getValue();
                    return pathMatcher.match(allowPathPattern, requestPath);
                })
                .forEach(entry -> {
                    String redisKey = REDIS_RATE_LIMIT_KEY_PREFIX + entry.getKey() + ":" + requestMethod;
                    long count = redisTemplate.opsForValue().increment(redisKey, 1);
                    if(count == 1) {
                        redisTemplate.expire(redisKey, 60, TimeUnit.SECONDS);
                    }
                    long threshold = Long.parseLong((String)entry.getValue());
                    if(count > threshold) {
                        exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
                        return exchange.getResponse().setComplete();
                    }
                });
        exchange.getAttributes().put(REQUEST_PATH_KEY, requestPath);
        exchange.getAttributes().put(REQUEST_METHOD_KEY, requestMethod);
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return -1000;
    }
}

  

五、分布式限流方案

针对不同的系统架构和业务场景,可以选择不同的分布式限流方案。比如基于Redis的分布式限流、基于Zookeeper的分布式限流等。下面是一个基于Redis的分布式限流代码示例:

public class RedisRateLimiter {

    private static final String REDIS_RATE_LIMIT_KEY_PREFIX = "rate-limit:";
    private StringRedisTemplate redisTemplate;

    public RedisRateLimiter(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public boolean tryAcquire(String key, int permits, int seconds) {
        String redisKey = REDIS_RATE_LIMIT_KEY_PREFIX + key;
        long count = redisTemplate.opsForValue().increment(redisKey, 1);
        if(count == 1) {
            redisTemplate.expire(redisKey, seconds, TimeUnit.SECONDS);
        }
        long threshold = permits;
        if(count > threshold) {
            return false;
        } else {
            return true;
        }
    }
}

六、分布式限流组件

随着分布式限流的应用日益广泛,市面上也出现了一些比较成熟的分布式限流组件,如Sentinel、Zookeeper。这些组件具有使用方便、稳定性高的特点,可以为我们提供基础限流组件,加快开发效率。

下面是一个基于Sentinel的分布式限流代码示例:

public class SentinelRateLimiter {

    private static final String RESOURCE_NAME = "resource_name";
    private static final String FLOW_CONTROLLER = "flow_controller";
    private static final String THRESHOLD_TYPE = "grade";

    public boolean tryAcquire(String resourceId, double permits) {
        Entry entry = null;
        try {
            entry = SphU.entry(resourceId, ResourceType.COMMON, EntryType.IN);
            if(entry != null) {
                // do something
                return true;
            }
        } catch (BlockException e) {
            // handle exception
        } finally {
            if(entry != null) {
                entry.exit();
            }
        }
        return false;
    }

    public static void initFlowRule(double threshold) {
        List rules = new ArrayList<>();
        FlowRule rule = new FlowRule();
        rule.setClusterMode(false);
        rule.setCount(threshold);
        rule.setResource(RESOURCE_NAME);
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
        rule.setStrategy(RuleConstant.STRATEGY_DIRECT);
        rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
        rule.setWarmUpPeriodSec(10);
        rules.add(rule);
        FlowRuleManager.loadRules(rules);
    }

    public static void registerFlowController(double threshold) {
        String type = THRESHOLD_TYPE;
        AbstractRule rule = null;
        switch(type) {
            case RuleConstant.FLOW_GRADE_QPS:
                rule = new FlowRule(RESOURCE_NAME)
                        .setCount(threshold)
                        .setGrade(RuleConstant.FLOW_GRADE_QPS);
                break;
            case RuleConstant.FLOW_GRADE_THREAD:
                rule = new DegradeRule(RESOURCE_NAME)
                        .setCount(threshold)
                        .setGrade(RuleConstant.FLOW_GRADE_THREAD);
                break;
            case RuleConstant.FLOW_GRADE_EXCEPTION_RATIO:
                rule = new DegradeRule(RESOURCE_NAME)
                        .setCount(threshold)
                        .setGrade(RuleConstant.FLOW_GRADE_EXCEPTION_RATIO);
                break;
            default:
                break;
        }
        if(rule != null) {
            FlowRuleManager.register2Property(rule.property());
            FLOW_CONTROLLER = type;
        }
    }
}

  

七、阿里哨兵分布式限流原理

阿里哨兵是一个针对分布式系统的流量控制组件,它通过不同的限流策略进行流量控制,从而防止流量被突发的请求消耗。

下面是阿里哨兵的分布式限流原理:

首先,哨兵会将应用程序的不同资源进行抽象,并且对每个资源设置不同的阈值。然后,每个应用程序都需要从阿里云哨兵服务中获取相关资源的限流规则,当应用程序达到限流规则的阈值时哨兵就会进行限流控制。哨兵还可以通过实时监控和动态感知应用程序资源的变化,从而实现动态调整限流策略的功能。

八、分布式潮流控制器

分布式潮流控制是指在网络系统中对电力负荷进行控制,从而保障电网的正常运行。分布式潮流控制器可以通过协调不同的电力负荷,使得电网的总负荷保持在一个正常的范围内,从而避免电网出现过载状态。

下面是一个分布式潮流控制器的代码示例:

public class DistributedPowerFlowController {

    private static final String LOAD_NODE = "load";
    private static final String GENERATOR_NODE = "generator";
    private static final String TRANSFORMER_NODE = "transformer";

    private static double[] load = {100, 100, 100};
    private static double[] generator = {200, 200, 200};
    private static double[] transformer = {300, 300, 300};

    private static final double[][] Y = {
            {1, -1, 0},
            {0, 1, -1},
            {-1, 0, 1}
    };

    public static double[] getFlow() {

        // 构造潮流控制方程
        double[][] A = new double[3][3];
        double[] b = new double[3];
        for(int i=0; i