分布式限流实现与原理
一、分布式限流实现秒杀
在高并发场景下,流量很大的时候需要进行限流操作,否则可能会出现系统崩溃或者对相关业务造成影响。秒杀场景是一个典型的高并发场景。很多网站都会在一定时间内提供限量商品,用户抢购时都是在同一时间内进行,此时需要限制每个用户的请求次数,否则会出现恶意刷单等现象。而分布式限流可以使我们更加高效地解决这个问题。 下面是一个实现秒杀的分布式限流代码示例:
public boolean tryAcquireSeckill() {
// 每秒放行5次请求
RateLimiter rateLimiter = RateLimiter.create(5);
return rateLimiter.tryAcquire();
}
二、分布式限流三种算法
分布式限流主要包括计数器算法、令牌桶算法和漏桶算法。
计数器算法
计数器算法是指通过统计一段时间内的请求次数,当超过一定的阈值时就进行限流操作。
public class CountAlgorithm {
private static AtomicInteger count = new AtomicInteger(0);
public static boolean tryAcquire() {
int newCount = count.incrementAndGet();
if(newCount > 10) {
return false;
} else {
return true;
}
}
}
令牌桶算法
令牌桶算法则是需要令牌才能进行访问的一种算法。可以设置每秒钟发放多少个令牌,每发放一个令牌后就会扣除一个。当桶内令牌数不足时,无法进行访问。
public class TokenBucketAlgorithm {
private static RateLimiter rateLimiter = RateLimiter.create(10);
public static boolean tryAcquire() {
return rateLimiter.tryAcquire();
}
}
漏桶算法
漏桶算法则是保证一定的流量输出,防止流量被突发的请求消耗。
public class LeakyBucketAlgorithm {
// 桥梁容量
private static int capacity = 10;
// 水流速度
private static int rate = 1;
// 时间间隔
private static int timeInterval = 1000;
// 当前水量
private static int water = 0;
// 上次漏水的时间
private static long lastTime = System.currentTimeMillis();
public static synchronized boolean tryAcquire() {
// 计算流入的水量
long now = System.currentTimeMillis();
water = Math.max(0, water - (int)((now - lastTime) / timeInterval) * rate);
lastTime = now;
if(water < capacity) {
water++;
return true;
} else {
return false;
}
}
}
三、分布式限流器
分布式限流器是一种通用的限流框架,可以在各种业务场景下使用。使用该框架可以有效控制流量,防止系统崩溃。 下面是一个基于Google Guava库实现的分布式限流器:
public class RateLimiterDecorator<T> {
private static final String SEPARATOR = "_";
private int permitsPerSecond;
private LoadingCache<String, RateLimiter> loadingCache;
private Function<T, String> keyMapper;
public RateLimiterDecorator(int permitsPerSecond, Function<T, String> keyMapper) {
this.permitsPerSecond = permitsPerSecond;
this.keyMapper = keyMapper;
CacheLoader<String, RateLimiter> loader = new CacheLoader<String, RateLimiter>() {
@Override
public RateLimiter load(String key) throws Exception {
return RateLimiter.create(permitsPerSecond);
}
};
loadingCache = CacheBuilder.newBuilder().build(loader);
}
public boolean tryAcquire(T key) {
try {
return loadingCache.get(keyMapper.apply(key)).tryAcquire();
} catch (ExecutionException e) {
e.printStackTrace();
return false;
}
}
}
四、分布式限流框架
分布式限流框架是一种将限流逻辑封装在框架中,可以通过配置参数实现对流量的限制,从而防止系统崩溃,同时可以有效提高程序的可维护性。 下面是一个基于Spring Cloud Gateway和Redis实现的分布式限流框架:
public class RateLimiterFilter implements GlobalFilter, Ordered {
private static final String REQUEST_PATH_KEY = "request_path";
private static final String REQUEST_METHOD_KEY = "request_method";
private static final String ALLOW_PATH_PATTERN_KEY = "allow_path_pattern";
private static final String RATE_LIMIT_KEY = "rate_limit";
private static final String REDIS_RATE_LIMIT_KEY_PREFIX = "rate-limit:";
private StringRedisTemplate redisTemplate;
private PathMatcher pathMatcher = new AntPathMatcher();
public RateLimiterFilter(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String requestMethod = exchange.getRequest().getMethodValue();
String requestPath = exchange.getRequest().getPath().value();
// 遍历配置文件中的限流规则
redisTemplate.opsForHash().entries(RATE_LIMIT_KEY).entrySet().stream()
.filter(entry -> {
String allowPathPattern = (String)entry.getValue();
return pathMatcher.match(allowPathPattern, requestPath);
})
.forEach(entry -> {
String redisKey = REDIS_RATE_LIMIT_KEY_PREFIX + entry.getKey() + ":" + requestMethod;
long count = redisTemplate.opsForValue().increment(redisKey, 1);
if(count == 1) {
redisTemplate.expire(redisKey, 60, TimeUnit.SECONDS);
}
long threshold = Long.parseLong((String)entry.getValue());
if(count > threshold) {
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
return exchange.getResponse().setComplete();
}
});
exchange.getAttributes().put(REQUEST_PATH_KEY, requestPath);
exchange.getAttributes().put(REQUEST_METHOD_KEY, requestMethod);
return chain.filter(exchange);
}
@Override
public int getOrder() {
return -1000;
}
}
五、分布式限流方案
针对不同的系统架构和业务场景,可以选择不同的分布式限流方案。比如基于Redis的分布式限流、基于Zookeeper的分布式限流等。下面是一个基于Redis的分布式限流代码示例:
public class RedisRateLimiter {
private static final String REDIS_RATE_LIMIT_KEY_PREFIX = "rate-limit:";
private StringRedisTemplate redisTemplate;
public RedisRateLimiter(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public boolean tryAcquire(String key, int permits, int seconds) {
String redisKey = REDIS_RATE_LIMIT_KEY_PREFIX + key;
long count = redisTemplate.opsForValue().increment(redisKey, 1);
if(count == 1) {
redisTemplate.expire(redisKey, seconds, TimeUnit.SECONDS);
}
long threshold = permits;
if(count > threshold) {
return false;
} else {
return true;
}
}
}
六、分布式限流组件
随着分布式限流的应用日益广泛,市面上也出现了一些比较成熟的分布式限流组件,如Sentinel、Zookeeper。这些组件具有使用方便、稳定性高的特点,可以为我们提供基础限流组件,加快开发效率。 下面是一个基于Sentinel的分布式限流代码示例:
public class SentinelRateLimiter {
private static final String RESOURCE_NAME = "resource_name";
private static final String FLOW_CONTROLLER = "flow_controller";
private static final String THRESHOLD_TYPE = "grade";
public boolean tryAcquire(String resourceId, double permits) {
Entry entry = null;
try {
entry = SphU.entry(resourceId, ResourceType.COMMON, EntryType.IN);
if(entry != null) {
// do something
return true;
}
} catch (BlockException e) {
// handle exception
} finally {
if(entry != null) {
entry.exit();
}
}
return false;
}
public static void initFlowRule(double threshold) {
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setClusterMode(false);
rule.setCount(threshold);
rule.setResource(RESOURCE_NAME);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
rule.setStrategy(RuleConstant.STRATEGY_DIRECT);
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
rule.setWarmUpPeriodSec(10);
rules.add(rule);
FlowRuleManager.loadRules(rules);
}
public static void registerFlowController(double threshold) {
String type = THRESHOLD_TYPE;
AbstractRule rule = null;
switch(type) {
case RuleConstant.FLOW_GRADE_QPS:
rule = new FlowRule(RESOURCE_NAME)
.setCount(threshold)
.setGrade(RuleConstant.FLOW_GRADE_QPS);
break;
case RuleConstant.FLOW_GRADE_THREAD:
rule = new DegradeRule(RESOURCE_NAME)
.setCount(threshold)
.setGrade(RuleConstant.FLOW_GRADE_THREAD);
break;
case RuleConstant.FLOW_GRADE_EXCEPTION_RATIO:
rule = new DegradeRule(RESOURCE_NAME)
.setCount(threshold)
.setGrade(RuleConstant.FLOW_GRADE_EXCEPTION_RATIO);
break;
default:
break;
}
if(rule != null) {
FlowRuleManager.register2Property(rule.property());
FLOW_CONTROLLER = type;
}
}
}
七、阿里哨兵分布式限流原理
阿里哨兵是一个针对分布式系统的流量控制组件,它通过不同的限流策略进行流量控制,从而防止流量被突发的请求消耗。 下面是阿里哨兵的分布式限流原理: 首先,哨兵会将应用程序的不同资源进行抽象,并且对每个资源设置不同的阈值。然后,每个应用程序都需要从阿里云哨兵服务中获取相关资源的限流规则,当应用程序达到限流规则的阈值时哨兵就会进行限流控制。哨兵还可以通过实时监控和动态感知应用程序资源的变化,从而实现动态调整限流策略的功能。
八、分布式潮流控制器
分布式潮流控制是指在网络系统中对电力负荷进行控制,从而保障电网的正常运行。分布式潮流控制器可以通过协调不同的电力负荷,使得电网的总负荷保持在一个正常的范围内,从而避免电网出现过载状态。 下面是一个分布式潮流控制器的代码示例:
public class DistributedPowerFlowController {
private static final String LOAD_NODE = "load";
private static final String GENERATOR_NODE = "generator";
private static final String TRANSFORMER_NODE = "transformer";
private static double[] load = {100, 100, 100};
private static double[] generator = {200, 200, 200};
private static double[] transformer = {300, 300, 300};
private static final double[][] Y = {
{1, -1, 0},
{0, 1, -1},
{-1, 0, 1}
};
public static double[] getFlow() {
// 构造潮流控制方程
double[][] A = new double[3][3];
double[] b = new double[3];
for(int i=0; i<Y.length; i++) {
for(int j=0; j<Y[i].length; j++) {
if(i == j) {
A[i][j] = Y[i][j];
} else {
A[i][j] = -Y[i][j];
}
}
}
// 解方程
double[] flow = new double[3];
for(int i=0; i<3; i++) {
flow[i] = 0;
for(int j=0; j<3; j++) {
flow[i] += A[i][j] * load[j];
}
}
return flow;
}
}