热点商品QPS过高的流控处理

发表信息: by

热点商品QPS过高的流控处理

1. 背景

问题:
服务为居于电商交易核心链路上,本身服务需要抗的QPS就特别高(百万QPS量级),导致机器负载和底层存储(比如缓存,DB等)压力就特别大,如果再有一些热点商品的请求(比如一个商品请求一个接口的QPS可达万级别)的话,对底层存储的压力就会尤其严重,甚至会打垮存储。

目标:
想要写个流控代码,流控代码的预期功能如下:

  1. 支持对商品ID请求一个接口或几个接口的QPS做控制,如果一个商品请求一个/几个接口的QPS和超过阈值,则触发熔断,快速失败返回上游。
  2. 支持对一个接口/几个接口进行分组,也就是说可以支持商品ID再这个组内的QPS和超过阈值。
  3. 支持对特殊商品单独控制,比如普通商品流控是1wQPS,但大促商品可以稍微放宽到5wQPS,在保证预期内的系统稳定前提下,最大限度的保证GMV。
  4. 支持临时调整流控阈值,并且实时生效,不需要发布服务。
  5. 支持一键开关。

除了业务上的功能预期,还有些技术上的预期:

  1. 支持灵活可配置key,比如有些是用商品ID,有些使用订单SN字符串。
  2. 支持低代码侵入,最好是注解到方法上即可,有些不能用注解的,也要提供代码侵入式的接入方式。

2. 解决办法

2.1 怎么实现流控

所能想到的流控办法有:

  • 计数器
  • 滑动窗口
  • 令牌桶
  • 漏斗

我认为所有的流控手段,当时间粒度越小的时候,越相似,如果时间粒度放大,才会放大相应算法的缺点,比如计数器算法有临界问题,滑动窗口需要划分时间粒度来滑动,令牌桶要定期放令牌等等。

2.2 怎么实现维度变换

解决了控流量的问题,怎么解决需要控制的维度随时变换的问题呢? 比如我们控的是商品Id的流速,但商品那么多,难道需要把所有商品的Id都装到流速控制器中,然后定期的给令牌?(比如我们用令牌桶)这种肯定是不现实的,所以采用LRU算法,来保证桶的大小可控。

2.3 怎么实现动态变更,实时生效

采用Zookeeper的订阅机制。

2.4 怎么实现针对一个接口/多个接口进行分组

在桶的维度上,再抽象一层,叫做region,每个接口都要配置一个region,如果存在多个接口使用一个桶,就将多个接口配置到一个region上。

2.5 怎么实现灵活可配置

使用Spring 的SpEL,可以根据配置的字符串,动态解析。

3. 先看效果

使用姿势:

@Override
@TokenBucketAop(region = "goods_activity_for_detail", keyArgName = "useActivityDTO", keySpEL = "#root.goodsId")
public BaseResponse<GoodsActivityForOrderDTO> queryBestActivity(UseActivityDTO useActivityDTO) {
}

效果为:

图片

4. 代码

整体结构如下图:

图片

代码结构如下:

token_bucket
└── aop # 这里防止 AOP 注解相关的代码
    └── TokenBucketAop.java # AOP 注解
    └── TokenBucketAspect.java # AOP 实现
└──  exception # 这里放置 被流控 向上抛的异常
    └── FlowControlException.java # 异常
└── lo # Zookeeper 定制变更后需要序列化出来的对象
    └── ContainerType.java # 容器类型
    └── TokenBucketLO.java # 整体配置
    └── TokenBucketRegionLO.java # 单个桶配置
└── container # 这里放置容器实现类型
    └── CaffeineTokenBucketContainer.java # 咖啡因的实现
    └── LRUMapTokenBucketContainer.java # LRU Map 的实现
    └── TokenBucketContainer.java # 容器标准
    └── TokenBucketContainerWrapper.java # 容器的Wrapper
└── TokenBucketConfig.java # 监听ZK的代码
└── TokenBucketContext.java # import 的配置文件
└── TokenBucketMonitor.java # 监控相关

3.1 TokenBucketContext.java

package me.int32.common.tokenbucket;

import org.springframework.context.annotation.ComponentScan;

@ComponentScan("me.int32.common.tokenbucket")
public class LRUTokenBucketContext {

}

3.2 TokenBucketConfig.java

package me.int32.common.tokenbucket;

import me.int32.utils.JsonUtils;
import me.int32.arch.leo.client.ConfigCache;
import me.int32.arch.leo.util.LeoUtils;
import me.int32.common.tokenbucket.container.TokenBucketContainerWrapper;
import me.int32.common.tokenbucket.lo.TokenBucketLO;
import me.int32.common.tokenbucket.lo.TokenBucketRegionLO;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TokenBucketConfig {

    private final static String LRU_TOKEN_BUCKET_CONFIG = "lruTokenBucket.config";

    @Autowired
    private TokenBucketContainerWrapper containerWrapper;

    private TokenBucketLO config;

    // 初始化LEO取值.
    @PostConstruct
    public void initialize() {
        String leoValue = LeoUtils.getProperty(LRU_TOKEN_BUCKET_CONFIG, "");
        buildConfig(leoValue);
        refreshContainer();

        // 挂载监听LEO事件变更
        ConfigCache.getInstance().addChange((k, v) -> {
            if (LRU_TOKEN_BUCKET_CONFIG.equals(k)) {
                buildConfig(v);
                refreshContainer();
            }
        });
    }

    public TokenBucketLO getConfig() {
        return config;
    }

    public TokenBucketRegionLO getConfig(String region) {
        if (null == this.config || null == this.config.getRegion()) {
            return null;
        }

        return this.getConfig().getRegion().get(region);
    }

    public boolean isOpen() {
        return null != this.config && this.config.checkOpen();
    }

    private void buildConfig(String leoValue) {
        this.config = JsonUtils.fromJson(leoValue, TokenBucketLO.class);

        if (null == this.config) {
            this.config = new TokenBucketLO();
        }
    }

    private void refreshContainer() {
        if (null == this.containerWrapper) {
            return;
        }

        if (null == this.config) {
            return;
        }

        config.getRegion().forEach((k, v) -> {
            containerWrapper.resetContainerSize(k, v);
        });
    }
}

3.3 TokenBucketMonitor.java

package me.int32.common.tokenbucket;

import me.int32.arch.roc.metric.Metrics;
import me.int32.common.tokenbucket.lo.ContainerType;
import java.util.HashMap;
import java.util.Map;

public class TokenBucketMonitor {

    // 将ROC打点,都汇聚到一个服务里面
    private static final String CONVERGENCE_ROC_SERVICE_NAME = "monitor";

    // 未期望异常 ROC 监控
    private static final String TOKEN_BUCKET_NOT_EXPECTED_ERROR_MONITOR = "roc-token_bucket_not_expect_error";

    // 流控 ROC 监控
    private static final String TOKEN_BUCKET_FLOW_CONTROL_MONITOR = "roc-token_bucket_flow_control";

    // 流控 命中率 ROC 监控
    private static final String TOKEN_BUCKET_CONTAINER_HIT_RATE = "roc-token_bucket_container_hit_rate";


    public static void monitorNotExpectedError(String desc) {
        Map<String, String> tags = new HashMap<>();
        tags.put("name", "令牌桶流控非预期错误监控");
        tags.put("errMsg", desc);
        tags.put("platform_service", CONVERGENCE_ROC_SERVICE_NAME);

        // 2.2 监控打点
        Metrics.logForCount(TOKEN_BUCKET_NOT_EXPECTED_ERROR_MONITOR, 1, tags);
    }

    public static void monitorFlowControl(String region, boolean pass) {
        Map<String, String> tags = new HashMap<>();
        tags.put("name", "令牌桶流控监控");
        tags.put("region", region);
        tags.put("control", pass ? "pass" : "reject");
        tags.put("platform_service", CONVERGENCE_ROC_SERVICE_NAME);

        // 2.2 监控打点
        Metrics.logForCount(TOKEN_BUCKET_FLOW_CONTROL_MONITOR, 1, tags);
    }

    public static void monitorHitAndMiss(String region, ContainerType containerType, boolean hit) {
        Map<String, String> tags = new HashMap<>();
        tags.put("name", "令牌桶流控容器命中率监控");
        tags.put("region", region);
        tags.put("containerType", containerType.getCode());
        tags.put("hit", String.valueOf(hit));
        tags.put("platform_service", CONVERGENCE_ROC_SERVICE_NAME);

        // 2.2 监控打点
        Metrics.logForCount(TOKEN_BUCKET_CONTAINER_HIT_RATE, 1, tags);
    }
}

3.4 ContainerType.java

package me.int32.common.tokenbucket.lo;

public enum ContainerType {
    LRU_MAP("lru_map"),
    CAFFEINE("caffeine");

    private String code;

    ContainerType(String v) {
        this.code = v;
    }

    public String getCode() {
        return code;
    }

    public static ContainerType calContainerType(String code) {
        for (ContainerType containerType : ContainerType.values()) {
            if (containerType.code.equals(code)) {
                return containerType;
            }
        }

        // 默认 Caffeine
        return CAFFEINE;
    }
}

3.5 TokenBucketLO.java

package me.int32.common.tokenbucket.lo;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class TokenBucketLO {

    // 总开关
    private boolean open = false;

    // 灰度比率 (10000)
    private int grayRate = 0;

    // 每个Region的配置
    private Map<String, TokenBucketRegionLO> region = new HashMap<>();

    public boolean getOpen() {
        return open;
    }

    public void setOpen(boolean open) {
        this.open = open;
    }

    public int getGrayRate() {
        return grayRate;
    }

    public void setGrayRate(int grayRate) {
        this.grayRate = grayRate;
    }

    public Map<String, TokenBucketRegionLO> getRegion() {
        return region;
    }

    public void setRegion(
            Map<String, TokenBucketRegionLO> region) {
        this.region = region;
    }

    public boolean checkOpen() {
        // 如果总开关没有开,则返回false
        if (!this.getOpen()) {
            return false;
        }

        // 如果没有完成灰度,且没有命中灰度流量,返回false
        int MAX_RATE = 10000;
        if (this.getGrayRate() < MAX_RATE) {
            int random = new Random().nextInt(MAX_RATE);
            return random < this.getGrayRate();
        }

        return true;
    }
}

3.5 TokenBucketRegionLO.java

package me.int32.common.tokenbucket.lo;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class TokenBucketRegionLO {

    // 灰度比率 (10000)
    private int grayRate = 0;

    // 拦截开关,如果拦截开关没有打开,则命中限流,只打点
    private boolean rejectOpen = false;

    // LUR 容器的Size
    private int capacity = 10000;

    // 令牌数量
    private int tkCount = Integer.MAX_VALUE;

    // 特殊配置
    private Map<String, Integer> special = new HashMap<>();

    // 容器类型,可选 lruMap 或者 caffeine
    private String container = "caffeine";

    public boolean getRejectOpen() {
        return rejectOpen;
    }

    public void setRejectOpen(boolean rejectOpen) {
        this.rejectOpen = rejectOpen;
    }

    public int getGrayRate() {
        return grayRate;
    }

    public void setGrayRate(int grayRate) {
        this.grayRate = grayRate;
    }

    public int getCapacity() {
        return capacity;
    }

    public void setCapacity(int capacity) {
        this.capacity = capacity;
    }

    public int getTkCount() {
        return tkCount;
    }

    public void setTkCount(int tkCount) {
        this.tkCount = tkCount;
    }

    public Map<String, Integer> getSpecial() {
        return special;
    }

    public void setSpecial(Map<String, Integer> special) {
        this.special = special;
    }

    public boolean checkInGray() {
        // 如果没有完成灰度,且没有命中灰度流量,返回false
        int MAX_RATE = 10000;
        if (this.getGrayRate() < MAX_RATE) {
            int random = new Random().nextInt(MAX_RATE);
            return random < this.getGrayRate();
        }

        return true;
    }

    public String getContainer() {
        return container;
    }

    public void setContainer(String container) {
        this.container = container;
    }

    // 优先取 special 里面的 tk count,如果key不是special的key,则再使用默认 tkCount
    public int calculateTkCount(String key) {
        if (null != this.getSpecial() && null != this.getSpecial().get(key)) {
            return this.getSpecial().get(key);
        }

        return this.getTkCount();
    }

    public ContainerType calContainerType() {
        return ContainerType.calContainerType(this.getContainer());
    }
}

3.7 FlowControlException.java

package me.int32.common.tokenbucket.exception;

public class FlowControlException extends RuntimeException {

    public FlowControlException() {
    }

    public FlowControlException(String message) {
        super(message);
    }
}

3.8 TokenBucketContainer.java

package me.int32.common.tokenbucket.container;

import me.int32.common.util.TimeUtils;
import org.springframework.stereotype.Component;

@Component
public interface TokenBucketContainer {

    void resetContainerSize(String region, int capacity);

    boolean acquire(String region, String key, int tokenCount, int capacity);

    default String calculateKey(String key) {
        // LRU的key是按秒统计的,所以每秒一个Key
        Long secondTime = TimeUtils.currentTime();
        return String.format("%s-%s", key, secondTime);
    }
}

3.8 CaffeineTokenBucketContainer.java

package me.int32.common.tokenbucket.container;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import me.int32.arch.roc.metric.Metrics;
import me.int32.common.tokenbucket.TokenBucketMonitor;
import me.int32.common.tokenbucket.lo.ContainerType;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.stereotype.Component;

@Component("caffeineTokenBucketContainer")
public class CaffeineTokenBucketContainer implements TokenBucketContainer {

    private Map<String, Cache<String, AtomicInteger>> LRU_TK_CONTAINER = new ConcurrentHashMap<>();

    @Override
    public void resetContainerSize(String region, int capacity) {
        synchronized (CaffeineTokenBucketContainer.class) {
            LRU_TK_CONTAINER.put(region, buildContainer(region, capacity));
        }
    }

    @Override
    public boolean acquire(String region, String key, int tokenCount, int capacity) {
        // 小优化逻辑: 如果 tokenCount 为 Integer.MAX_VALUE 2147483647 , 则不再去真正拿令牌,可以默认拿到令牌了
        if (Integer.MAX_VALUE == tokenCount) {
            return true;
        }

        Cache<String, AtomicInteger> container = getOrRegisterRegionContainer(region, capacity);
        String tokenKey = calculateKey(key);

        AtomicInteger ifPresent = container.getIfPresent(tokenKey);
        TokenBucketMonitor.monitorHitAndMiss(region, ContainerType.CAFFEINE, null != ifPresent);

        AtomicInteger atomicInteger = container.get(tokenKey, k -> new AtomicInteger(tokenCount));
        // 这里理论上不会为null
        if (null != atomicInteger) {
            return atomicInteger.decrementAndGet() >= 0;
        }

        return true;
    }

    private Cache<String, AtomicInteger> getOrRegisterRegionContainer(String region, int capacity) {
        Cache<String, AtomicInteger> container = LRU_TK_CONTAINER.get(region);
        // 一般只有系统刚开始运作的时候才会出现 container 为null,所以这里的 synchronized 几乎不影响性能
        if (null == container) {
            synchronized (CaffeineTokenBucketContainer.class) {
                if (null == container) {
                    container = buildContainer(region, capacity);
                    LRU_TK_CONTAINER.put(region, container);
                }
            }
        }

        return container;
    }

    private Cache<String, AtomicInteger> buildContainer(String region, int size) {
        return Caffeine.newBuilder()
                .expireAfterWrite(3, TimeUnit.SECONDS) // 3 秒之后 自动淘汰
                .maximumSize(size)
                .removalListener(buildRemovalListener(region))
                .build();
    }

    private RemovalListener<String, AtomicInteger> buildRemovalListener(String region) {
        return (key, value, cause) -> {
            Map<String, String> tags = new HashMap<>();
            tags.put("name", "Caffeine缓存Key移除监听");
            tags.put("region", region);
            tags.put("cause", String.valueOf(cause));
            tags.put("platform_service", "monitor");

            // 2.2 监控打点
            Metrics.logForCount("roc-caffeine_removal_listener", 1, tags);
        };
    }
}

3.9 LRUMapTokenBucketContainer.java

package me.int32.common.tokenbucket.container;

import me.int32.common.tokenbucket.TokenBucketMonitor;
import me.int32.common.tokenbucket.lo.ContainerType;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections4.map.LRUMap;
import org.springframework.stereotype.Component;

@Component("lruMapTokenBucketContainer")
public class LRUMapTokenBucketContainer implements TokenBucketContainer {

    private final Map<String, Map<String, AtomicInteger>> LRU_TK_CONTAINER = new ConcurrentHashMap<>();

    private Map<String, AtomicInteger> getOrRegisterRegionContainer(String region, int containerSize) {
        Map<String, AtomicInteger> container = LRU_TK_CONTAINER.get(region);
        // 一般只有系统刚开始运作的时候才会出现 container 为null,所以这里的 synchronized 几乎不影响性能
        if (null == container) {
            synchronized (LRUMapTokenBucketContainer.class) {
                if (null == container) {
                    container = Collections.synchronizedMap(new LRUMap<>(containerSize));
                    LRU_TK_CONTAINER.put(region, container);
                }
            }
        }

        return container;
    }

    @Override
    public void resetContainerSize(String region, int capacity) {
        synchronized (LRUMapTokenBucketContainer.class) {
            Map<String, AtomicInteger> newContainer = Collections.synchronizedMap(new LRUMap<>(capacity));
            LRU_TK_CONTAINER.put(region, newContainer);
        }
    }

    @Override
    public boolean acquire(String region, String key, int tokenCount, int capacity) {
        // 小优化逻辑: 如果 tokenCount 为 Integer.MAX_VALUE 2147483647 , 则不再去真正拿令牌,可以默认拿到令牌了
        if (Integer.MAX_VALUE == tokenCount) {
            return true;
        }

        Map<String, AtomicInteger> container = getOrRegisterRegionContainer(region, capacity);
        String tokenKey = calculateKey(key);

        boolean hit = container.containsKey(tokenKey);
        TokenBucketMonitor.monitorHitAndMiss(region, ContainerType.LRU_MAP, hit);

        if (hit) {
            // 如果当前key存在container中,则令牌-1
            return container.get(tokenKey).decrementAndGet() >= 0;
        } else {
            // 如果当前key不存在container中,则初始化令牌桶后,令牌-1
            // 这里要用synchronized做双重检查
            // region.intern 是为了保证让 字符串强制刷新到常量池中,使得锁只在region维度上
            synchronized (region.intern()) {
                if (!container.containsKey(tokenKey)) {
                    container.put(tokenKey, new AtomicInteger(tokenCount));
                }

                return container.get(tokenKey).decrementAndGet() >= 0;
            }
        }
    }
}

3.10 TokenBucketContainerWrapper.java

package me.int32.common.tokenbucket.container;

import com.dianping.cat.Cat;
import com.dianping.cat.message.Transaction;
import me.int32.common.tokenbucket.lo.ContainerType;
import me.int32.common.tokenbucket.lo.TokenBucketRegionLO;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;

@Component("tokenBucketContainerWrapper")
public class TokenBucketContainerWrapper {

    @Resource(name = "lruMapTokenBucketContainer")
    private TokenBucketContainer lruMapTokenBucketContainer;

    @Resource(name = "caffeineTokenBucketContainer")
    private TokenBucketContainer caffeineTokenBucketContainer;


    public void resetContainerSize(String region, TokenBucketRegionLO regionConfig) {
        ContainerType containerType = regionConfig.calContainerType();

        if (ContainerType.LRU_MAP == containerType) {
            this.lruMapTokenBucketContainer.resetContainerSize(region, regionConfig.getCapacity());
        } else {
            this.caffeineTokenBucketContainer.resetContainerSize(region, regionConfig.getCapacity());
        }
    }

    public boolean acquire(String region, String key, TokenBucketRegionLO regionConfig) {
        Transaction t = Cat.newTransaction("TokenBucketContainerWrapper", region);
        try {
            ContainerType containerType = regionConfig.calContainerType();

            if (ContainerType.LRU_MAP == containerType) {
                t.setStatus(Transaction.SUCCESS);
                return this.lruMapTokenBucketContainer.acquire(region, key, regionConfig.calculateTkCount(key),
                        regionConfig.getCapacity());
            } else {
                t.setStatus(Transaction.SUCCESS);
                return this.caffeineTokenBucketContainer.acquire(region, key, regionConfig.calculateTkCount(key),
                        regionConfig.getCapacity());
            }
        } catch (Throwable ex) {
            t.setStatus(ex);
            throw ex;
        } finally {
            t.complete();
        }
    }
}

3.11 TokenBucketAop.java

package me.int32.common.tokenbucket.aop;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface TokenBucketAop {

    // 分区
    String region();

    // key的参数位置
    String keyArgName();

    // key的搜索SpEL表达时
    String keySpEL();
}

3.12 TokenBucketAspect.java

package me.int32.common.tokenbucket.aop;

import me.int32.utils.JsonUtils;
import me.int32.common.tokenbucket.TokenBucketConfig;
import me.int32.common.tokenbucket.TokenBucketMonitor;
import me.int32.common.tokenbucket.container.TokenBucketContainerWrapper;
import me.int32.common.tokenbucket.exception.FlowControlException;
import me.int32.common.tokenbucket.lo.TokenBucketRegionLO;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.stereotype.Component;
import org.aspectj.lang.reflect.MethodSignature;

@Aspect
@Component
public class TokenBucketAspect {

    @Autowired
    private TokenBucketConfig tokenBucketConfig;

    @Autowired
    private TokenBucketContainerWrapper tokenBucketContainerWrapper;

    private static final Logger LOGGER = LoggerFactory.getLogger(TokenBucketAspect.class);

    @Around("@annotation(me.int32.common.tokenbucket.aop.TokenBucketAop)")  
    public Object process(ProceedingJoinPoint pjp) throws Throwable {
        // 1. 判断配置是否有效
        if (null == tokenBucketConfig || null == tokenBucketContainerWrapper) {
            TokenBucketMonitor.monitorNotExpectedError("令牌桶配置未取到");
            return pjp.proceed();
        }

        // 2. 开关/灰度判断
        if (!tokenBucketConfig.isOpen()) {
            TokenBucketMonitor.monitorNotExpectedError("流量未命中总开关灰度");
            return pjp.proceed();
        }

        // 3. 准备各种数据
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        TokenBucketAop annotation = signature.getMethod().getAnnotation(TokenBucketAop.class);
        String keyArgName = annotation.keyArgName();
        String keySpEL = annotation.keySpEL();
        String region = annotation.region();

        TokenBucketRegionLO regionConfig = tokenBucketConfig.getConfig(region);

        // 4. 判断配置是否有效
        if (null == regionConfig) {
            TokenBucketMonitor.monitorNotExpectedError("令牌桶配置未取到");
            return pjp.proceed();
        }

        // 5. region开关未打开,或者region未配置
        if (!regionConfig.checkInGray()) {
            TokenBucketMonitor.monitorNotExpectedError("流量未命中分区开关灰度");
            return pjp.proceed();
        }

        // 6. 判断注解参数是否合法
        if (StringUtils.isEmpty(keyArgName) || StringUtils.isEmpty(region) || null == keySpEL) {
            LOGGER.info("TokenBucketAspect annotation error keyArgName:{}, keySpEL:{}, region:{}",
                    keyArgName, keySpEL, region);
            TokenBucketMonitor.monitorNotExpectedError("注解参数错误");
            return pjp.proceed();
        }

        // 7. 找到注解参数指定的 参数类型 和 参数值
        Object parameterValue = findParamsValue(signature, keyArgName, pjp.getArgs());
        if (null == parameterValue) {
            TokenBucketMonitor.monitorNotExpectedError("获取参数值失败");
            return pjp.proceed();
        }

        // 8.获取key
        Object key = findKey(parameterValue, keySpEL);
        if (null == key) {
            TokenBucketMonitor.monitorNotExpectedError("获取Key失败");
            return pjp.proceed();
        }

        // 9. 判断key类型是否有效
        if (!checkGrayKeyTypeValid(key)) {
            TokenBucketMonitor.monitorNotExpectedError("校验Key类型错误");
            return pjp.proceed();
        }

        // 8. 令牌判断
        boolean acquire = true;
        try {
            String keyStr = key.toString();
            acquire = tokenBucketContainerWrapper.acquire(region, keyStr, regionConfig);
        } catch (Exception e) {
            TokenBucketMonitor.monitorNotExpectedError("获取请求令牌异常");
            LOGGER.info("TokenBucketAspect acquire error region:{}, key:{}, regionConfig:{}", region, key,
                    JsonUtils.toJson(regionConfig));
        }

        if (!acquire) {
            TokenBucketMonitor.monitorFlowControl(region, false);

            FlowControlException exception = new FlowControlException("单个参数的QPS过高,命中活动价流量控制");
            LOGGER.error("TokenBucketAspect acquire error region:{}, key:{}, regionConfig:{}", region, key,
                    JsonUtils.toJson(regionConfig));

            if (!regionConfig.getRejectOpen()){
                TokenBucketMonitor.monitorNotExpectedError("拦截开关未打开");
            }else {
                throw exception;
            }
        }

        TokenBucketMonitor.monitorFlowControl(region, true);
        return pjp.proceed();
    }


    /**
     * 判断key是否类型有效
     * note: 只支持 Long, Integer, String 类型
     */
    private boolean checkGrayKeyTypeValid(Object grayKey) {
        try {
            if (grayKey instanceof Long) {
                return true;
            }
            if (grayKey instanceof Integer) {
                return true;
            }
            if (grayKey instanceof String) {
                return true;
            }
        } catch (Throwable e) {
            LOGGER.error("TokenBucketAspect checkGrayKeyValid error ", e);
        }
        return false;
    }

    /**
     * 获取控制的Key
     */
    private Object findKey(Object parameterValue, String keySpEL) {
        Object grayKey = null;
        try {
            // 如果不存在args 的SpEL表达式解析, 则直接用对应的参数Value进行控制
            if (StringUtils.isNotEmpty(keySpEL)) {
                ExpressionParser parser = new SpelExpressionParser();
                Expression expression = parser.parseExpression(keySpEL);
                grayKey = expression.getValue(parameterValue);
            } else {
                grayKey = parameterValue;
            }
        } catch (Throwable e) {
            LOGGER.error("TokenBucketAspect findGrayKey error keySearchSpEL:{}", keySpEL, e);
        }
        return grayKey;
    }

    /**
     * 找到对应方法的参数类型,和传入的参数值
     */
    private Object findParamsValue(MethodSignature signature, String keyArgName, Object[] args) {
        try {
            // 找到对应的参数名字
            String[] parameterNames = signature.getParameterNames();
            int index = -1;
            for (int i = 0; i < parameterNames.length; i++) {
                if (Objects.equals(keyArgName, parameterNames[i])) {
                    index = i;
                    break;
                }
            }
            // 获取参数值
            return args[index];
        } catch (Throwable e) {
            LOGGER.error("TokenBucketAspect findParamsValue error keyArgName:{}", keyArgName, e);
            return null;
        }
    }
}

4 配置介绍

{
    "open":true, // 总开关, 默认false
    "grayRate":10000, // 灰度流量,open = true  命中 灰度流量,才会起作用, MAX= 10000,默认0
    "region":{
        "goods_activity_for_detail":{ // 具体region
            "grayRate":10000, // 灰度流量,命中灰度流量,才会起作用, MAX= 10000,默认0
            "rejectOpen":false, // 命中限流是否开启拦截,true:命中后会抛异常,false:不会抛异常, 默认 false
            "capacity":10000, // 桶的容积,可以容纳多少 key。 默认 10000
            "tkCount":100, // 限流阈值 默认 2147483647
            "special":{ // 特殊key配置
                "123": 3 // key=123, 阈值为3
            },
            "container":"caffeine" // 容器类型,可选 caffeine 或者 lru_map,默认 caffeine
        }
    }
}