热点商品QPS过高的流控处理
- 热点商品QPS过高的流控处理
- 1. 背景
- 2. 解决办法
- 3. 先看效果
- 4. 代码
- 3.1 TokenBucketContext.java
- 3.2 TokenBucketConfig.java
- 3.3 TokenBucketMonitor.java
- 3.4 ContainerType.java
- 3.5 TokenBucketLO.java
- 3.5 TokenBucketRegionLO.java
- 3.7 FlowControlException.java
- 3.8 TokenBucketContainer.java
- 3.8 CaffeineTokenBucketContainer.java
- 3.9 LRUMapTokenBucketContainer.java
- 3.10 TokenBucketContainerWrapper.java
- 3.11 TokenBucketAop.java
- 3.12 TokenBucketAspect.java
- 4 配置介绍
热点商品QPS过高的流控处理
1. 背景
问题:
服务位于电商交易核心链路上,本身需要抗的QPS就特别高(百万QPS量级),导致机器负载和底层存储(比如缓存、DB等)的压力特别大。如果再有一些热点商品的请求(比如一个商品请求一个接口的QPS可达万级别),对底层存储的压力就会尤其严重,甚至会打垮存储。
目标:
想要写个流控代码,流控代码的预期功能如下:
- 支持对商品ID请求一个接口或几个接口的QPS做控制,如果一个商品请求一个/几个接口的QPS之和超过阈值,则触发熔断,快速失败返回上游。
- 支持对一个接口/几个接口进行分组,也就是说可以对商品ID在这个组内的QPS之和做阈值控制。
- 支持对特殊商品单独控制,比如普通商品流控是1wQPS,但大促商品可以稍微放宽到5wQPS,在保证预期内的系统稳定前提下,最大限度的保证GMV。
- 支持临时调整流控阈值,并且实时生效,不需要发布服务。
- 支持一键开关。
除了业务上的功能预期,还有些技术上的预期:
- 支持灵活可配置key,比如有些是用商品ID,有些使用订单SN字符串。
- 支持低代码侵入,最好是注解到方法上即可,有些不能用注解的,也要提供代码侵入式的接入方式。
2. 解决办法
2.1 怎么实现流控
所能想到的流控办法有:
- 计数器
- 滑动窗口
- 令牌桶
- 漏斗
我认为所有的流控手段,当时间粒度越小的时候,越相似,如果时间粒度放大,才会放大相应算法的缺点,比如计数器算法有临界问题,滑动窗口需要划分时间粒度来滑动,令牌桶要定期放令牌等等。
2.2 怎么实现维度变换
解决了控流量的问题,怎么解决需要控制的维度随时变换的问题呢? 比如我们控的是商品Id的流速,但商品那么多,难道需要把所有商品的Id都装到流速控制器中,然后定期的给令牌?(比如我们用令牌桶)这种肯定是不现实的,所以采用LRU算法,来保证桶的大小可控。
2.3 怎么实现动态变更,实时生效
采用Zookeeper的订阅机制。
2.4 怎么实现针对一个接口/多个接口进行分组
在桶的维度上,再抽象一层,叫做region,每个接口都要配置一个region,如果存在多个接口使用一个桶,就将多个接口配置到一个region上。
2.5 怎么实现灵活可配置
使用Spring 的SpEL,可以根据配置的字符串,动态解析。
3. 先看效果
使用姿势:
@Override
@TokenBucketAop(region = "goods_activity_for_detail", keyArgName = "useActivityDTO", keySpEL = "#root.goodsId")
public BaseResponse<GoodsActivityForOrderDTO> queryBestActivity(UseActivityDTO useActivityDTO) {
}
效果为:
4. 代码
整体结构如下图:
代码结构如下:
token_bucket
└── aop # 这里放置 AOP 注解相关的代码
└── TokenBucketAop.java # AOP 注解
└── TokenBucketAspect.java # AOP 实现
└── exception # 这里放置 被流控 向上抛的异常
└── FlowControlException.java # 异常
└── lo # Zookeeper 订阅到变更后需要反序列化出来的对象
└── ContainerType.java # 容器类型
└── TokenBucketLO.java # 整体配置
└── TokenBucketRegionLO.java # 单个桶配置
└── container # 这里放置容器实现类型
└── CaffeineTokenBucketContainer.java # 咖啡因的实现
└── LRUMapTokenBucketContainer.java # LRU Map 的实现
└── TokenBucketContainer.java # 容器标准
└── TokenBucketContainerWrapper.java # 容器的Wrapper
└── TokenBucketConfig.java # 监听ZK的代码
└── TokenBucketContext.java # import 的配置文件
└── TokenBucketMonitor.java # 监控相关
3.1 TokenBucketContext.java
package me.int32.common.tokenbucket;
import org.springframework.context.annotation.ComponentScan;
// Entry point of the token-bucket module: the class has no members and only carries a
// component scan over the me.int32.common.tokenbucket package.
// NOTE(review): the surrounding document lists this file as TokenBucketContext.java while the
// class is named LRUTokenBucketContext; a public class must match its file name — confirm
// which of the two names is the real one.
@ComponentScan("me.int32.common.tokenbucket")
public class LRUTokenBucketContext {
}
3.2 TokenBucketConfig.java
package me.int32.common.tokenbucket;
import me.int32.utils.JsonUtils;
import me.int32.arch.leo.client.ConfigCache;
import me.int32.arch.leo.util.LeoUtils;
import me.int32.common.tokenbucket.container.TokenBucketContainerWrapper;
import me.int32.common.tokenbucket.lo.TokenBucketLO;
import me.int32.common.tokenbucket.lo.TokenBucketRegionLO;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Holds the current token-bucket configuration and keeps it in sync with the
 * {@code lruTokenBucket.config} property.
 *
 * <p>The configuration is read once at startup and replaced whenever the change listener
 * reports a new value for that property; after every (re)load all region containers are
 * rebuilt through {@link TokenBucketContainerWrapper#resetContainerSize}.
 */
@Component
public class TokenBucketConfig {

    private final static String LRU_TOKEN_BUCKET_CONFIG = "lruTokenBucket.config";

    @Autowired
    private TokenBucketContainerWrapper containerWrapper;

    // volatile: the reference is swapped from the change listener and read by callers of
    // getConfig()/isOpen(); presumably these run on different threads, so the swap must be
    // safely published.
    private volatile TokenBucketLO config;

    /** Loads the initial value and registers the change listener. */
    @PostConstruct
    public void initialize() {
        String leoValue = LeoUtils.getProperty(LRU_TOKEN_BUCKET_CONFIG, "");
        buildConfig(leoValue);
        refreshContainer();
        // Reload whenever our own property changes; other keys are ignored.
        ConfigCache.getInstance().addChange((k, v) -> {
            if (LRU_TOKEN_BUCKET_CONFIG.equals(k)) {
                buildConfig(v);
                refreshContainer();
            }
        });
    }

    /** @return the current configuration; may be {@code null} before initialization */
    public TokenBucketLO getConfig() {
        return config;
    }

    /**
     * @param region region name
     * @return the configuration of that region, or {@code null} when nothing is configured
     */
    public TokenBucketRegionLO getConfig(String region) {
        // Read the field once so that a concurrent reload cannot change it between checks.
        TokenBucketLO snapshot = this.config;
        if (null == snapshot || null == snapshot.getRegion()) {
            return null;
        }
        return snapshot.getRegion().get(region);
    }

    /** @return whether the global switch (including its gray-rate sampling) lets this call in */
    public boolean isOpen() {
        TokenBucketLO snapshot = this.config;
        return null != snapshot && snapshot.checkOpen();
    }

    /** Parses the raw value; an unparseable/empty value falls back to the default configuration. */
    private void buildConfig(String leoValue) {
        // Build into a local first so readers never observe an intermediate null config.
        TokenBucketLO parsed = JsonUtils.fromJson(leoValue, TokenBucketLO.class);
        this.config = (null != parsed) ? parsed : new TokenBucketLO();
    }

    /** Rebuilds the container of every configured region. */
    private void refreshContainer() {
        TokenBucketLO snapshot = this.config;
        if (null == this.containerWrapper || null == snapshot) {
            return;
        }
        // The region map (or a single entry) can be null when the JSON says so explicitly.
        if (null == snapshot.getRegion()) {
            return;
        }
        snapshot.getRegion().forEach((k, v) -> {
            if (null != v) {
                containerWrapper.resetContainerSize(k, v);
            }
        });
    }
}
3.3 TokenBucketMonitor.java
package me.int32.common.tokenbucket;
import me.int32.arch.roc.metric.Metrics;
import me.int32.common.tokenbucket.lo.ContainerType;
import java.util.HashMap;
import java.util.Map;
/**
 * Static helpers that emit the counters of the token-bucket flow control.
 * Every counter is tagged with a human-readable name and with the shared
 * {@code platform_service} tag so that all points converge into one service.
 */
public class TokenBucketMonitor {

    // All points are aggregated under this single service name.
    private static final String CONVERGENCE_ROC_SERVICE_NAME = "monitor";
    // Counter for unexpected errors.
    private static final String TOKEN_BUCKET_NOT_EXPECTED_ERROR_MONITOR = "roc-token_bucket_not_expect_error";
    // Counter for flow-control decisions.
    private static final String TOKEN_BUCKET_FLOW_CONTROL_MONITOR = "roc-token_bucket_flow_control";
    // Counter for container hit / miss.
    private static final String TOKEN_BUCKET_CONTAINER_HIT_RATE = "roc-token_bucket_container_hit_rate";

    /**
     * Counts one unexpected error.
     *
     * @param desc error description, reported as the {@code errMsg} tag
     */
    public static void monitorNotExpectedError(String desc) {
        Map<String, String> tags = namedTags("令牌桶流控非预期错误监控");
        tags.put("errMsg", desc);
        emit(TOKEN_BUCKET_NOT_EXPECTED_ERROR_MONITOR, tags);
    }

    /**
     * Counts one flow-control decision.
     *
     * @param region region the decision belongs to
     * @param pass   {@code true} is reported as "pass", {@code false} as "reject"
     */
    public static void monitorFlowControl(String region, boolean pass) {
        Map<String, String> tags = namedTags("令牌桶流控监控");
        tags.put("region", region);
        String decision;
        if (pass) {
            decision = "pass";
        } else {
            decision = "reject";
        }
        tags.put("control", decision);
        emit(TOKEN_BUCKET_FLOW_CONTROL_MONITOR, tags);
    }

    /**
     * Counts one container lookup.
     *
     * @param region        region the lookup belongs to
     * @param containerType container implementation that served the lookup
     * @param hit           whether the key was already present
     */
    public static void monitorHitAndMiss(String region, ContainerType containerType, boolean hit) {
        Map<String, String> tags = namedTags("令牌桶流控容器命中率监控");
        tags.put("region", region);
        tags.put("containerType", containerType.getCode());
        tags.put("hit", String.valueOf(hit));
        emit(TOKEN_BUCKET_CONTAINER_HIT_RATE, tags);
    }

    /** Creates a fresh tag map that already carries the {@code name} tag. */
    private static Map<String, String> namedTags(String name) {
        Map<String, String> tags = new HashMap<>();
        tags.put("name", name);
        return tags;
    }

    /** Adds the shared service tag and reports a count of 1 for the given metric. */
    private static void emit(String metric, Map<String, String> tags) {
        tags.put("platform_service", CONVERGENCE_ROC_SERVICE_NAME);
        Metrics.logForCount(metric, 1, tags);
    }
}
3.4 ContainerType.java
package me.int32.common.tokenbucket.lo;
/**
 * Container implementations available for a region, each identified by the
 * code used in the configuration.
 */
public enum ContainerType {
    LRU_MAP("lru_map"),
    CAFFEINE("caffeine");

    private String code;

    ContainerType(String v) {
        this.code = v;
    }

    /** @return the configuration code of this container type */
    public String getCode() {
        return code;
    }

    /**
     * Resolves a configuration code to a container type.
     *
     * @param code code from the configuration; may be {@code null} or unknown
     * @return the matching type, or {@link #CAFFEINE} when nothing matches
     */
    public static ContainerType calContainerType(String code) {
        ContainerType[] candidates = ContainerType.values();
        for (int i = 0; i < candidates.length; i++) {
            ContainerType candidate = candidates[i];
            if (candidate.getCode().equals(code)) {
                return candidate;
            }
        }
        // Anything unrecognised falls back to Caffeine.
        return CAFFEINE;
    }
}
3.5 TokenBucketLO.java
package me.int32.common.tokenbucket.lo;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
 * Top-level flow-control configuration: a global switch, a global gray rate
 * and the per-region settings.
 */
public class TokenBucketLO {

    // Upper bound of grayRate; a grayRate at or above this value means "all traffic".
    private static final int MAX_RATE = 10000;

    // Global switch.
    private boolean open = false;
    // Gray rate, out of 10000.
    private int grayRate = 0;
    // Configuration of each region, keyed by region name.
    private Map<String, TokenBucketRegionLO> region = new HashMap<>();

    public boolean getOpen() {
        return open;
    }

    public void setOpen(boolean open) {
        this.open = open;
    }

    public int getGrayRate() {
        return grayRate;
    }

    public void setGrayRate(int grayRate) {
        this.grayRate = grayRate;
    }

    public Map<String, TokenBucketRegionLO> getRegion() {
        return region;
    }

    public void setRegion(
            Map<String, TokenBucketRegionLO> region) {
        this.region = region;
    }

    /**
     * Decides whether flow control applies to the current call.
     *
     * @return {@code false} when the global switch is off; otherwise {@code true} for all
     *         calls once grayRate reaches 10000, and for a random grayRate/10000 share of
     *         calls below that
     */
    public boolean checkOpen() {
        if (!this.getOpen()) {
            return false;
        }
        int rate = this.getGrayRate();
        if (rate < MAX_RATE) {
            // This runs on every intercepted request: ThreadLocalRandom avoids allocating
            // and seeding a new Random per call.
            int random = java.util.concurrent.ThreadLocalRandom.current().nextInt(MAX_RATE);
            return random < rate;
        }
        return true;
    }
}
3.5 TokenBucketRegionLO.java
package me.int32.common.tokenbucket.lo;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
 * Flow-control configuration of a single region.
 */
public class TokenBucketRegionLO {

    // Upper bound of grayRate; a grayRate at or above this value means "all traffic".
    private static final int MAX_RATE = 10000;

    // Gray rate, out of 10000.
    private int grayRate = 0;
    // Reject switch: while it is off, a throttled request is only recorded, not rejected.
    private boolean rejectOpen = false;
    // Size of the LRU container (number of keys it can hold).
    private int capacity = 10000;
    // Number of tokens per key; Integer.MAX_VALUE by default.
    private int tkCount = Integer.MAX_VALUE;
    // Per-key token counts that override tkCount.
    private Map<String, Integer> special = new HashMap<>();
    // Container type code, "lru_map" or "caffeine" (see ContainerType).
    private String container = "caffeine";

    public boolean getRejectOpen() {
        return rejectOpen;
    }

    public void setRejectOpen(boolean rejectOpen) {
        this.rejectOpen = rejectOpen;
    }

    public int getGrayRate() {
        return grayRate;
    }

    public void setGrayRate(int grayRate) {
        this.grayRate = grayRate;
    }

    public int getCapacity() {
        return capacity;
    }

    public void setCapacity(int capacity) {
        this.capacity = capacity;
    }

    public int getTkCount() {
        return tkCount;
    }

    public void setTkCount(int tkCount) {
        this.tkCount = tkCount;
    }

    public Map<String, Integer> getSpecial() {
        return special;
    }

    public void setSpecial(Map<String, Integer> special) {
        this.special = special;
    }

    /**
     * Decides whether the current call falls into this region's gray traffic.
     *
     * @return {@code true} for all calls once grayRate reaches 10000, and for a random
     *         grayRate/10000 share of calls below that
     */
    public boolean checkInGray() {
        int rate = this.getGrayRate();
        if (rate < MAX_RATE) {
            // This runs on every intercepted request: ThreadLocalRandom avoids allocating
            // and seeding a new Random per call.
            int random = java.util.concurrent.ThreadLocalRandom.current().nextInt(MAX_RATE);
            return random < rate;
        }
        return true;
    }

    public String getContainer() {
        return container;
    }

    public void setContainer(String container) {
        this.container = container;
    }

    /**
     * Resolves the token count for a key.
     *
     * @param key flow-control key
     * @return the value configured for this key in {@code special} if there is one,
     *         otherwise the region-wide {@code tkCount}
     */
    public int calculateTkCount(String key) {
        Map<String, Integer> overrides = this.getSpecial();
        if (null != overrides) {
            // Single lookup: the value that was null-checked is the value that is unboxed.
            Integer override = overrides.get(key);
            if (null != override) {
                return override;
            }
        }
        return this.getTkCount();
    }

    /** @return the container type matching {@link #getContainer()} */
    public ContainerType calContainerType() {
        return ContainerType.calContainerType(this.getContainer());
    }
}
3.7 FlowControlException.java
package me.int32.common.tokenbucket.exception;
/**
 * Thrown to the caller when a request is rejected by flow control.
 */
public class FlowControlException extends RuntimeException {

    private static final long serialVersionUID = 1L;

    /** Creates an exception without a message. */
    public FlowControlException() {
    }

    /**
     * @param message description of the rejection
     */
    public FlowControlException(String message) {
        super(message);
    }

    /**
     * @param message description of the rejection
     * @param cause   underlying failure, kept so that the original stack trace is not lost
     */
    public FlowControlException(String message, Throwable cause) {
        super(message, cause);
    }
}
3.8 TokenBucketContainer.java
package me.int32.common.tokenbucket.container;
import me.int32.common.util.TimeUtils;
import org.springframework.stereotype.Component;
/**
 * Contract of a per-region container that stores the remaining tokens of each key.
 */
// NOTE(review): @Component on an interface presumably has no effect, since only concrete
// classes can be instantiated as beans — kept as-is, confirm before removing.
@Component
public interface TokenBucketContainer {

    /**
     * Replaces the container of a region with a new one of the given capacity.
     *
     * @param region   region name
     * @param capacity maximum number of keys the new container holds
     */
    void resetContainerSize(String region, int capacity);

    /**
     * Tries to take one token for a key.
     *
     * @param region     region name
     * @param key        flow-control key
     * @param tokenCount number of tokens a fresh bucket for this key starts with
     * @param capacity   container capacity, used when the region's container has to be created
     * @return {@code true} when a token was obtained
     */
    boolean acquire(String region, String key, int tokenCount, int capacity);

    /**
     * Builds the container key {@code "<key>-<time>"}.
     *
     * @param key flow-control key
     * @return the key suffixed with the current value of {@code TimeUtils.currentTime()}
     */
    default String calculateKey(String key) {
        // Per the original author's note, buckets are counted per second, one key each second.
        // Assumes TimeUtils.currentTime() returns the time in seconds — TODO confirm.
        Long secondTime = TimeUtils.currentTime();
        // Plain concatenation yields the same text as String.format("%s-%s", ...) without
        // parsing a format string on every request.
        return key + "-" + secondTime;
    }
}
3.8 CaffeineTokenBucketContainer.java
package me.int32.common.tokenbucket.container;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import me.int32.arch.roc.metric.Metrics;
import me.int32.common.tokenbucket.TokenBucketMonitor;
import me.int32.common.tokenbucket.lo.ContainerType;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.stereotype.Component;
/**
 * {@link TokenBucketContainer} backed by one Caffeine cache per region.
 * Each cache entry is the remaining-token counter of one container key.
 */
@Component("caffeineTokenBucketContainer")
public class CaffeineTokenBucketContainer implements TokenBucketContainer {

    // region name -> cache of (container key -> remaining tokens)
    private final Map<String, Cache<String, AtomicInteger>> LRU_TK_CONTAINER = new ConcurrentHashMap<>();

    /**
     * Replaces the region's cache with an empty one of the given size; counters held by the
     * previous cache are dropped.
     */
    @Override
    public void resetContainerSize(String region, int capacity) {
        // ConcurrentHashMap.put is atomic on its own, no extra lock is required.
        LRU_TK_CONTAINER.put(region, buildContainer(region, capacity));
    }

    /**
     * Takes one token for {@code key}, creating the key's counter with {@code tokenCount}
     * tokens on first use.
     *
     * @return {@code true} when the counter was still non-negative after the decrement
     */
    @Override
    public boolean acquire(String region, String key, int tokenCount, int capacity) {
        // Shortcut: Integer.MAX_VALUE means "unlimited", so no counter is needed.
        if (Integer.MAX_VALUE == tokenCount) {
            return true;
        }
        Cache<String, AtomicInteger> container = getOrRegisterRegionContainer(region, capacity);
        String tokenKey = calculateKey(key);
        AtomicInteger ifPresent = container.getIfPresent(tokenKey);
        TokenBucketMonitor.monitorHitAndMiss(region, ContainerType.CAFFEINE, null != ifPresent);
        AtomicInteger atomicInteger = container.get(tokenKey, k -> new AtomicInteger(tokenCount));
        // Not expected to be null; if it ever is, let the request through.
        if (null != atomicInteger) {
            return atomicInteger.decrementAndGet() >= 0;
        }
        return true;
    }

    /** Returns the region's cache, creating it exactly once when it does not exist yet. */
    private Cache<String, AtomicInteger> getOrRegisterRegionContainer(String region, int capacity) {
        // computeIfAbsent re-checks the map atomically. The previous hand-written double check
        // re-tested a stale local variable, so racing threads could each install a cache and
        // lose the counters of the one that was overwritten.
        return LRU_TK_CONTAINER.computeIfAbsent(region, r -> buildContainer(r, capacity));
    }

    /** Builds a size-bounded cache whose entries expire 3 seconds after being written. */
    private Cache<String, AtomicInteger> buildContainer(String region, int size) {
        return Caffeine.newBuilder()
                .expireAfterWrite(3, TimeUnit.SECONDS)
                .maximumSize(size)
                .removalListener(buildRemovalListener(region))
                .build();
    }

    /** Builds a listener that counts every removal, tagged with the region and the cause. */
    private RemovalListener<String, AtomicInteger> buildRemovalListener(String region) {
        return (key, value, cause) -> {
            Map<String, String> tags = new HashMap<>();
            tags.put("name", "Caffeine缓存Key移除监听");
            tags.put("region", region);
            tags.put("cause", String.valueOf(cause));
            tags.put("platform_service", "monitor");
            Metrics.logForCount("roc-caffeine_removal_listener", 1, tags);
        };
    }
}
3.9 LRUMapTokenBucketContainer.java
package me.int32.common.tokenbucket.container;
import me.int32.common.tokenbucket.TokenBucketMonitor;
import me.int32.common.tokenbucket.lo.ContainerType;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections4.map.LRUMap;
import org.springframework.stereotype.Component;
/**
 * {@link TokenBucketContainer} backed by one synchronized commons-collections
 * {@code LRUMap} per region. Each map entry is the remaining-token counter of one
 * container key.
 */
@Component("lruMapTokenBucketContainer")
public class LRUMapTokenBucketContainer implements TokenBucketContainer {

    // region name -> (container key -> remaining tokens)
    private final Map<String, Map<String, AtomicInteger>> LRU_TK_CONTAINER = new ConcurrentHashMap<>();

    /** Creates an empty, synchronized LRU map of the given capacity. */
    private Map<String, AtomicInteger> newContainer(int capacity) {
        return Collections.synchronizedMap(new LRUMap<String, AtomicInteger>(capacity));
    }

    /** Returns the region's map, creating it exactly once when it does not exist yet. */
    private Map<String, AtomicInteger> getOrRegisterRegionContainer(String region, int containerSize) {
        // computeIfAbsent re-checks the map atomically. The previous hand-written double check
        // re-tested a stale local variable, so racing threads could each install a map and
        // lose the counters of the one that was overwritten.
        return LRU_TK_CONTAINER.computeIfAbsent(region, r -> newContainer(containerSize));
    }

    /**
     * Replaces the region's map with an empty one of the given capacity; counters held by the
     * previous map are dropped.
     */
    @Override
    public void resetContainerSize(String region, int capacity) {
        // ConcurrentHashMap.put is atomic on its own, no extra lock is required.
        LRU_TK_CONTAINER.put(region, newContainer(capacity));
    }

    /**
     * Takes one token for {@code key}, creating the key's counter with {@code tokenCount}
     * tokens on first use.
     *
     * @return {@code true} when the counter was still non-negative after the decrement
     */
    @Override
    public boolean acquire(String region, String key, int tokenCount, int capacity) {
        // Shortcut: Integer.MAX_VALUE means "unlimited", so no counter is needed.
        if (Integer.MAX_VALUE == tokenCount) {
            return true;
        }
        Map<String, AtomicInteger> container = getOrRegisterRegionContainer(region, capacity);
        String tokenKey = calculateKey(key);
        // Keep the reference obtained by the lookup: checking containsKey() and calling get()
        // afterwards can hit a null if the entry is evicted in between.
        AtomicInteger bucket = container.get(tokenKey);
        TokenBucketMonitor.monitorHitAndMiss(region, ContainerType.LRU_MAP, null != bucket);
        if (null == bucket) {
            // The synchronized wrapper runs computeIfAbsent under the map's own lock, so the
            // counter is created once per key without locking on an interned region string.
            bucket = container.computeIfAbsent(tokenKey, k -> new AtomicInteger(tokenCount));
        }
        return bucket.decrementAndGet() >= 0;
    }
}
3.10 TokenBucketContainerWrapper.java
package me.int32.common.tokenbucket.container;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Transaction;
import me.int32.common.tokenbucket.lo.ContainerType;
import me.int32.common.tokenbucket.lo.TokenBucketRegionLO;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;
/**
 * Routes container operations to the implementation selected by the region's
 * configured container type, and wraps each acquire in a Cat transaction.
 */
@Component("tokenBucketContainerWrapper")
public class TokenBucketContainerWrapper {

    @Resource(name = "lruMapTokenBucketContainer")
    private TokenBucketContainer lruMapTokenBucketContainer;

    @Resource(name = "caffeineTokenBucketContainer")
    private TokenBucketContainer caffeineTokenBucketContainer;

    /**
     * Rebuilds the region's container with the capacity from its configuration.
     *
     * @param region       region name
     * @param regionConfig configuration of that region
     */
    public void resetContainerSize(String region, TokenBucketRegionLO regionConfig) {
        TokenBucketContainer delegate = pick(regionConfig.calContainerType());
        delegate.resetContainerSize(region, regionConfig.getCapacity());
    }

    /**
     * Tries to take one token for the key in the region's container.
     *
     * @param region       region name, also used as the transaction name
     * @param key          flow-control key
     * @param regionConfig configuration of that region
     * @return {@code true} when a token was obtained
     */
    public boolean acquire(String region, String key, TokenBucketRegionLO regionConfig) {
        Transaction t = Cat.newTransaction("TokenBucketContainerWrapper", region);
        try {
            TokenBucketContainer delegate = pick(regionConfig.calContainerType());
            // Marked successful up front; a failure below overwrites the status.
            t.setStatus(Transaction.SUCCESS);
            int tokens = regionConfig.calculateTkCount(key);
            int capacity = regionConfig.getCapacity();
            return delegate.acquire(region, key, tokens, capacity);
        } catch (Throwable ex) {
            t.setStatus(ex);
            throw ex;
        } finally {
            t.complete();
        }
    }

    /** LRU_MAP selects the LRU map container; every other type selects the Caffeine one. */
    private TokenBucketContainer pick(ContainerType containerType) {
        if (ContainerType.LRU_MAP == containerType) {
            return this.lruMapTokenBucketContainer;
        }
        return this.caffeineTokenBucketContainer;
    }
}
3.11 TokenBucketAop.java
package me.int32.common.tokenbucket.aop;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
// Marks a method whose calls are rate limited per key. Retained at runtime and
// applicable to methods only; TokenBucketAspect intercepts methods carrying it.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface TokenBucketAop {
// Region the method belongs to; selects the region configuration to apply.
String region();
// Name of the method parameter the key is taken from (TokenBucketAspect matches it
// against the method's parameter names, not against a position).
String keyArgName();
// SpEL expression evaluated against that parameter's value to obtain the key.
String keySpEL();
}
3.12 TokenBucketAspect.java
package me.int32.common.tokenbucket.aop;
import me.int32.utils.JsonUtils;
import me.int32.common.tokenbucket.TokenBucketConfig;
import me.int32.common.tokenbucket.TokenBucketMonitor;
import me.int32.common.tokenbucket.container.TokenBucketContainerWrapper;
import me.int32.common.tokenbucket.exception.FlowControlException;
import me.int32.common.tokenbucket.lo.TokenBucketRegionLO;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.stereotype.Component;
import org.aspectj.lang.reflect.MethodSignature;
/**
 * Around advice for methods annotated with {@link TokenBucketAop}.
 *
 * <p>For each call it resolves the flow-control key from the annotated parameter, asks the
 * container for a token and, when none is left and the region's reject switch is on, throws
 * {@link FlowControlException}. Every configuration or resolution problem is reported to the
 * monitor and the call proceeds unthrottled.
 */
@Aspect
@Component
public class TokenBucketAspect {

    private static final Logger LOGGER = LoggerFactory.getLogger(TokenBucketAspect.class);

    // One shared parser instead of a new one per request.
    private static final ExpressionParser SPEL_PARSER = new SpelExpressionParser();

    // Parsed expressions keyed by their source text. The texts come from annotation
    // attributes, so the number of entries is bounded by the number of annotated methods.
    private final java.util.concurrent.ConcurrentMap<String, Expression> expressionCache =
            new java.util.concurrent.ConcurrentHashMap<>();

    @Autowired
    private TokenBucketConfig tokenBucketConfig;

    @Autowired
    private TokenBucketContainerWrapper tokenBucketContainerWrapper;

    /**
     * Applies flow control to one intercepted call.
     *
     * @param pjp the intercepted call
     * @return whatever the intercepted method returns
     * @throws FlowControlException when the key is out of tokens and rejecting is enabled
     * @throws Throwable            anything thrown by the intercepted method
     */
    @Around("@annotation(me.int32.common.tokenbucket.aop.TokenBucketAop)")
    public Object process(ProceedingJoinPoint pjp) throws Throwable {
        // 1. Collaborators must be present.
        if (null == tokenBucketConfig || null == tokenBucketContainerWrapper) {
            TokenBucketMonitor.monitorNotExpectedError("令牌桶配置未取到");
            return pjp.proceed();
        }
        // 2. Global switch / gray rate.
        if (!tokenBucketConfig.isOpen()) {
            TokenBucketMonitor.monitorNotExpectedError("流量未命中总开关灰度");
            return pjp.proceed();
        }
        // 3. Collect the annotation data.
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        TokenBucketAop annotation = resolveAnnotation(pjp, signature);
        if (null == annotation) {
            TokenBucketMonitor.monitorNotExpectedError("注解参数错误");
            return pjp.proceed();
        }
        String keyArgName = annotation.keyArgName();
        String keySpEL = annotation.keySpEL();
        String region = annotation.region();
        TokenBucketRegionLO regionConfig = tokenBucketConfig.getConfig(region);
        // 4. The region must be configured.
        if (null == regionConfig) {
            TokenBucketMonitor.monitorNotExpectedError("令牌桶配置未取到");
            return pjp.proceed();
        }
        // 5. Region gray rate.
        if (!regionConfig.checkInGray()) {
            TokenBucketMonitor.monitorNotExpectedError("流量未命中分区开关灰度");
            return pjp.proceed();
        }
        // 6. Annotation attributes must be usable.
        if (StringUtils.isEmpty(keyArgName) || StringUtils.isEmpty(region) || null == keySpEL) {
            LOGGER.info("TokenBucketAspect annotation error keyArgName:{}, keySpEL:{}, region:{}",
                    keyArgName, keySpEL, region);
            TokenBucketMonitor.monitorNotExpectedError("注解参数错误");
            return pjp.proceed();
        }
        // 7. Value of the parameter named by the annotation.
        Object parameterValue = findParamsValue(signature, keyArgName, pjp.getArgs());
        if (null == parameterValue) {
            TokenBucketMonitor.monitorNotExpectedError("获取参数值失败");
            return pjp.proceed();
        }
        // 8. Key extracted from that value.
        Object key = findKey(parameterValue, keySpEL);
        if (null == key) {
            TokenBucketMonitor.monitorNotExpectedError("获取Key失败");
            return pjp.proceed();
        }
        // 9. Only supported key types are throttled.
        if (!checkGrayKeyTypeValid(key)) {
            TokenBucketMonitor.monitorNotExpectedError("校验Key类型错误");
            return pjp.proceed();
        }
        // 10. Take a token; a failure of the limiter itself lets the request through.
        boolean acquire = true;
        try {
            acquire = tokenBucketContainerWrapper.acquire(region, key.toString(), regionConfig);
        } catch (Exception e) {
            TokenBucketMonitor.monitorNotExpectedError("获取请求令牌异常");
            // The exception is passed as the last argument so its stack trace is not lost.
            LOGGER.error("TokenBucketAspect acquire error region:{}, key:{}, regionConfig:{}", region, key,
                    JsonUtils.toJson(regionConfig), e);
        }
        if (acquire) {
            TokenBucketMonitor.monitorFlowControl(region, true);
            return pjp.proceed();
        }
        // Out of tokens: counted as "reject" only. Previously a throttled request that was let
        // through in dry-run mode was counted as both "reject" and "pass".
        TokenBucketMonitor.monitorFlowControl(region, false);
        LOGGER.error("TokenBucketAspect acquire error region:{}, key:{}, regionConfig:{}", region, key,
                JsonUtils.toJson(regionConfig));
        if (regionConfig.getRejectOpen()) {
            throw new FlowControlException("单个参数的QPS过高,命中活动价流量控制");
        }
        // Dry-run mode: the reject switch is off, so only record and continue.
        TokenBucketMonitor.monitorNotExpectedError("拦截开关未打开");
        return pjp.proceed();
    }

    /**
     * Finds the {@link TokenBucketAop} annotation of the intercepted method.
     *
     * @return the annotation, or {@code null} when it cannot be found
     */
    private TokenBucketAop resolveAnnotation(ProceedingJoinPoint pjp, MethodSignature signature) {
        java.lang.reflect.Method method = signature.getMethod();
        TokenBucketAop annotation = method.getAnnotation(TokenBucketAop.class);
        if (null != annotation || null == pjp.getTarget()) {
            return annotation;
        }
        // NOTE(review): with interface-based proxies the signature presumably exposes the
        // interface method, which does not carry an annotation placed on the implementation;
        // retry on the target class — confirm against the proxy mode in use.
        try {
            return pjp.getTarget().getClass()
                    .getMethod(method.getName(), method.getParameterTypes())
                    .getAnnotation(TokenBucketAop.class);
        } catch (NoSuchMethodException | SecurityException e) {
            LOGGER.error("TokenBucketAspect resolveAnnotation error method:{}", method.getName(), e);
            return null;
        }
    }

    /**
     * Checks whether the key has a supported type.
     * note: only Long, Integer and String are supported
     */
    private boolean checkGrayKeyTypeValid(Object grayKey) {
        return grayKey instanceof Long || grayKey instanceof Integer || grayKey instanceof String;
    }

    /**
     * Resolves the flow-control key.
     *
     * @param parameterValue value of the annotated parameter, used as the SpEL root object
     * @param keySpEL        SpEL expression; when empty the parameter value itself is the key
     * @return the key, or {@code null} when the expression cannot be parsed or evaluated
     */
    private Object findKey(Object parameterValue, String keySpEL) {
        if (StringUtils.isEmpty(keySpEL)) {
            return parameterValue;
        }
        try {
            // Parse each distinct expression once instead of on every request.
            Expression expression =
                    expressionCache.computeIfAbsent(keySpEL, spel -> SPEL_PARSER.parseExpression(spel));
            return expression.getValue(parameterValue);
        } catch (RuntimeException e) {
            LOGGER.error("TokenBucketAspect findGrayKey error keySearchSpEL:{}", keySpEL, e);
            return null;
        }
    }

    /**
     * Finds the argument passed for the parameter with the given name.
     *
     * @return the argument value, or {@code null} when parameter names are unavailable or no
     *         parameter has that name
     */
    private Object findParamsValue(MethodSignature signature, String keyArgName, Object[] args) {
        String[] parameterNames = signature.getParameterNames();
        if (null != parameterNames && null != args) {
            int limit = Math.min(parameterNames.length, args.length);
            for (int i = 0; i < limit; i++) {
                if (Objects.equals(keyArgName, parameterNames[i])) {
                    return args[i];
                }
            }
        }
        // Explicit "not found" instead of indexing args[-1] and catching the exception.
        LOGGER.error("TokenBucketAspect findParamsValue error keyArgName:{}", keyArgName);
        return null;
    }
}
4 配置介绍
{
"open":true, // 总开关, 默认false
"grayRate":10000, // 灰度流量,open = true 且 命中 灰度流量,才会起作用, MAX= 10000,默认0
"region":{
"goods_activity_for_detail":{ // 具体region
"grayRate":10000, // 灰度流量,命中灰度流量,才会起作用, MAX= 10000,默认0
"rejectOpen":false, // 命中限流是否开启拦截,true:命中后会抛异常,false:不会抛异常, 默认 false
"capacity":10000, // 桶的容积,可以容纳多少 key。 默认 10000,
"tkCount":100, // 限流阈值 默认 2147483647
"special":{ // 特殊key配置
"123": 3 // key=123, 阈值为3
},
"container":"caffeine" // 容器类型,可选 caffeine 或者 lru_map,默认 caffeine
}
}
}