记一次分布式调控系统的思考
背景
有幸在公司组内承担一个调控系统的改造,要改造系统也应该归属于分布式系统的范畴内吧,之前1.0的版本是也是公司大牛们开发出来的。在决定改造之前,心里总是忐忑不安:是否要对大牛写出来的系统进行改造? 我的设计是否能比大牛们设计出来的东西更加符合当前乃至后面的使用场景呢?是否会比之前设计的系统更加稳定,不容易出问题?定位问题是否更加方便呢?最终,咬牙坚持,决定对现在的实现逻辑进行全篇修改,因为有些事情不去体验,不去经历,是不知道你的想法是否真的可行,如果我放弃我的设计,仍然在现有的设计上进行修改、开发、兼容,我自己觉得心有不甘,想必也踩不到一些坑,无法成长吧。
人总有要坚持的东西,做一些不知对错,不知后果的事情。
调控系统
相关系统介绍
前提介绍
上面是系统的调用链路:
我们定义一个操作叫调控, 调控操作需要执行一些策略,我们暂定为:策略1, 策略2, 策略3, 策略4, 分别要修改DB一张表(店铺表)中的字段 a, b, c(店铺属性)。 对应关系如下:
策略1 —— 字段a
策略2 —— 字段b
策略3 —+
策略4 —— 字段c
注意,这里策略2 和 策略3都会修改字段b
且调控有两种类型的操作:
- 执行调控
- 取消调控
A.API 系统介绍
A.API
会让用户执行调控:
携带任意一组调控策略触发调控,触发策略的本意是要将一批店铺的属性,修改成对应传入的策略值
A.API
会让用户取消调控:
将店铺上的属性恢复成调控之前的状态
A 系统介绍
A 系统接受到A.API触发的调控,进行一定的业务逻辑处理,然后将调控操作记录到一张表中
A 系统会用fixedDelay为1:1的job去定时扫描未处理的调控,并且将这些调控同步调用B系统的B-interfaceA接口
B 系统介绍
B 系统接受到 A 系统调控调用之后做一定的逻辑处理,调用 C 系统进行数据操作(CRUD)
C 系统介绍
C 系统 是DataBase 的操作层,可以进行DB的CRUD操作
隐藏关卡
A系统为什么要用job
A为什么要把 A.API 的调控写入到表中,然后异步job扫表执行? 因为A.API会在一次调控中传入大量的店铺(上限制1000个),A系统要对每一个店铺的调控策略作合理性校验和其他的校验,会消耗较长的时间,为了不影响用户体验,所以A系统将A.API过来的操作做成异步处理,毕竟让页面卡一分钟可不是个明智的选择!
B 系统可能要承担的坑
我们先来想想B系统在这次调用链路中扮演什么角色,要承担什么责任:
- 承担满足大吞吐量的重任
- 避免操作乱序
- 避免并发操作造成数据覆盖
- 异常报告友好
我们一个一个说为什么要承担上面的责任,和如何解决上面承担的责任:
- 满足较大的吞吐量
A系统会调用B-interfaceA来让B系统进行真正的调控。刚才说了,A会接受到大量店铺,所以A会把这些店铺一同扔给B让B操作,而B要对这些店一一操作,所以肯定是循环执行A扔过来的店。但是又不能让这种跨服务的SOA调用时间无限制,不可控的长,所以要有两点操作来控制这个。
1.1 一次调用接口传入的参数不能过长(最终敲定一次调用调控1000家店铺)
1.2 对每家店的处理绝对不能同步for循环。因为处理一家店如果要用1s,1000家店就是1000s,SOA不容忍1000s的调用时长,且一次调用时长过高会阻塞吞吐量(1小时3600s,也就是说一天最多也只能调控 24 * 3600 / 1000 * 1000 家店, 注意,这只是接口for循环的次数,由于调控包括 执行调控/取消调用, 所以真正可以调控的店铺数量是 24 * 3600 / 1000 * 1000 / 2 = 42w) , 实际最终我们设计出的系统(1.0和2.0都满足)吞吐量达到400w左右。
- 避免消息乱序
因为对一家店的调控有两种 执行调控 和 取消调控,且 取消调控必须在执行调控之后执行(不要问为什么,仔细想一下 取消在执行之前会出现什么问题)。
A 系统扔给我们的是积攒了一分钟操作的数据,也就是说扔给我们的可能是这样的:
店铺1 执行调控,店铺2 执行调控, 店铺3 执行调控, 店铺1 取消调控, 店铺1 执行调控, 店铺1 取消调控,店铺3 取消调控……
- 避免并发操作造成的数据覆盖
如果调控要对店铺1执行策略2和策略3,因为要同时修改一个字段,且DB存在主从同步的延迟,会出现数据覆盖的问题。
在 C 系统上开启一个可以同时修改a, b, c三个策略的接口,接口接受四种策略,经过转换,修改店铺对应的数据, 这个接口要做成幂等的。
- 异常报告友好
对具体店铺调控异常要归类好,并且反馈给A系统。
1.0的设计
A.API 和 A 系统都不是我们自己的,所以这里只是简单介绍了下, 接下来我们主要是讲 B 和 C 的实现
系统内部简单逻辑图
在这张图中,主要介绍几个关键部分:
- 为什么会使用MQ?
使用MQ主要是考虑到接口SOA相应时间, 接口参数要接受最大1000个店铺的调控策略,如果用同步的方式for循环处理就会出现一次接口调用有上千秒的相应时间,所以由同步改成异步,先把每次调控记录入库,再用MQ去做异步。
- 大线程池的作用?
大线程池主要是为了提高吞吐量,不使用MQ consumer的方式,而是使用baseGET的方式去拉消息, 每台机器开50个线程去拉取消息, 一个机房10太机器, 双机房机器double, 最终大线程池的线程数便是 50 * 10 * 2 = 1000个线程在运行, 如果满载的且一个消息处理时间在1s左右,QPS能打到1000,即 1分钟6w的消息处理量,这个吞吐量 足够满足A系统的需求。
- MQ + 消息顺序 + 多线程 会出现什么问题?这里是怎么解决?
分布式消费消息就会完全打乱消息顺序的问题,而我们的调控在业务上最好是保证顺序的,即 取消和调控不能乱序, 但由于1.0的设计根本没有办法保证消息消费顺序,所有做了业务上的兼容和容忍,即 假如正常顺序是 A调控, A取消, 如果在不能确认A调控是否顺利执行的时候,将A调控干掉,保证A调控不再被执行的情况下去执行A取消。
所以这里的做法是,在每个线程拉取到currentRecord的时候,在Redis中种入key="record.id, shop_id", 并且每次处理currentRecord的时候都去查这个record是否是该店铺下最古老的待执行策略,如果是,则去执行这条record,如果不是,查出最古老的待执行记录oldestRecord,看Redis中是否存在"oldestRecord.id, shop_id"的key,如果存在,说明其他线程正在执行这个oldestRecord,则将currentRecord扔回MQ。 如果Redis中不存在"oldestRecord.id, shop_id"的key,则表明没有其他线程占用这个调控,说明这个调控可能因为某些问题导致没有执行且Redis的key过期掉了,则currentRecord在执行前负责将oldestRecord设置为调控失败之后再执行自己。
总结: 多线程一起抢任务,且这些任务在某一个维度上是顺序相关的, 可以用如下办法来解决这个问题: 1 维度上前面的任务没有执行的时候,不允许执行后面的任务;2 维度上后面的任务确认前面的任务已经没办法执行时,负责将前面的任务干掉并且保证不再执行。
为什么要重新设计
异常情况多,特别是在并发的条件下
这里所说的并发,并不是指这1000个线程的并发,而是对某一个维度上的操作并发, 本来对一个维度上的操作最省心的办法是顺序,如果对此并发的话,就会出现很多不应该出现的问题,而程序这个东西只在乎自己怎么执行,不会在乎其他线程上执行的是什么,执行到了哪里(除非强加入一些公共资源)。所以用一些手段去让程序不单单要关心自己怎么执行,还要关心别人执行了什么,别人执行的不对还要去帮助别人执行。本身这种乱入乱操作的现象就不好控制。 所以尽可能一个维度的事情让一个线程去执行。
补充一句, 我个人觉得,写代码只能考虑到一部分的事情,经验越多,考虑的事情就越多,但也是很难考虑的全, 而且考虑的事情都是单线逻辑上的,如果多线并发逻辑一起考虑,那难度可能就不是单纯的double了。所以很佩服那些厉害的架构师��
出现问题不好定位,就算撸日志
在1.0这种设计上,最怕的就是出问题,多个线程同时处理一个维度上的事情,ELK收集到的日志也是同一时间打出来的(不然怎么叫并发呢),根本没有先后,人为也分不出先后,因为可能这些日志是同一个机器上的几个线程打出来的,也可能是多个机器上的几个线程打出来的,找问题非常困难。之前就出过一个问题,跟着同事一起撸日志,撸代码,足足查了两三个小时才把当时的现场复现出来。
2.0的设计
这是2.0的内部逻辑设计
延用MQ
这里延用了 1.0中 MQ 的思想,因为用MQ做“实时”异步操作确实很合适,当然异步操作还可以用job,但是用job去扫全表终归是风险太高,对数据库压力太大,所以思来想去,仍然选用job。
线程抢到任务后的逻辑变更
当线程抢到任务后的逻辑,完全是跟1.0没有任何相似指出了,毕竟核心逻辑都变了, 应该是变的简单多了。
当线程拉去到消息后,就会根据record中的shop_id去Redis中setnx, 如果设置失败,则说明有其他线程已经把这个shop_id加锁了,该线程sleep1s之后就把record扔回MQ; 如果设置成功,则查询处被锁shop_id下所有未执行的调控, 然后根据调控时间先后顺序排序,循环执行每一个调控
2.0的逻辑有什么好处
- 将线程可操作的维度 从消息维度 转换成 店铺维度
店铺维度是我们允许划分的最大维度,在多线程并发操作时,最好让一个线程操作的维度变到可容忍的最大值,比如这里的店铺维度,因为消息维度太小,多个线程可以操作同一个店铺, 而店铺维度比较合适,一个店铺的调控只能在一个线程上执行。
- 自我补偿
2.0的消息作用不再跟1.0的消息作用一致,2.0的消息只是起到触发器的作用, 线程拉取到消息, 触发对消息中店铺的调控,则在线程中,会将这个店铺所有待调控的记录拉取出来,然后依次执行。 拉不到待执行记录就丢掉消息,不做任何处理。有个好处,如果前面的消息丢掉了,后面的消息到了,也会把前面的记录查询出来并且处理掉。
- 逻辑清晰,错误易查
逻辑清晰不必多说,对比下1.0便能知道, 一个线程负责一个店铺的处理,对一个店铺不存在并发, 所以错误日志也容易查询。
是否可以满足大吞吐量
仍然是1000个线程并发执行,但并发的粒度从调控维度提高到店铺维度,维度提高,相应的吞吐量必然下降, 但可以同时调控的QPS仍然是1000,只不过是1000个店罢了。
两次sleep是干什么的?
第一次sleep操作是发生在 当发现shopid已经被其他线程锁住了, sleep1s然后将消息扔回队列。 这里sleep是为了避免消息队列消息持有大量消息时间过长(最长1s)。 因为程序的运行速度是非常快的,发现shopid在锁中,就扔消息回去,这个消息会一直处于 扔回去,拿出来的状态下, 直到redis不再锁该shopid为止。
第二次sleep是发生在处理完一次调控之后, 这个sleep有没有必要存在,存在多久? 完全是看C系统的更新接口是怎么设计的,如果C系统是 select bind master, update model, 则这个sleep就没必要存在, 说白了,第二次sleep完全是为了DB更新主从同步而设置的。
MQ 丢消息怎么办?
但可能有经验的人会发现,用MQ做异步,那如果MQ丢失怎么办? 在1.0的系统中,MQ丢失确实是一个被忽略的地方,只是简单的借用了rabbitMQ自身持久化的功能,并没有在上面的代码做任何考虑。
2.0中关注了这个问题, 第一种解决办法是 上面说的, 下次消息都是对上一次消息的补偿; 第二种办法是 做一个补偿job, 把一段时间内仍然没有取消的调控给取消掉, 为什么这么做? 因为 我们丢的消息分为两种, 一种是应用调控消息,一种是取消调控的消息, 应用调控的消息丢了也无所谓,取消调控的消息是不能丢的, 丢了造成的影响就是 调控一直不会被取消,直到新的关于这个店铺的消息过来(第一种补偿起作用)。而我们做一个job,把一定时间内没有取消的调控取消掉,不就ok了吗?
其实我觉得, 避免MQ丢消息的办法有很多种,有技术上的, 有产品上的, 没必要都去绞尽脑汁的想怎么保证消息到达, 那样系统会很复杂。 我们想清楚如果丢了消息会怎么样? 丢消息之后我们怎么做(代码开发越少越好)就能让丢消息的影响降到最低便好了。
当然在工作中也有必须要求消息到达, 对消息丢失零容忍的需求,后面我会总结一下在工作中避免消息丢失/处理异常而做的一些事情。
在实践过程中所学习到的
异常Code和异常封装
在这次写代码的过程中,考虑到调控结果对其他系统通知,有些异常信息根本就不需要透传给其他服务, 所以要在异常上作一次封装,比如哪些异常归为一类, 哪些异常归为另一类。 但在划分异常的过程中遇到了一些比较棘手的问题, 比如 C 服务往外抛的异常都是 ServiceException, 那怎么去归类呢? 有两种, 一种是根据ErrorCode进行分, 一种是根据ErrorMessage进行分:
- ErrorCode 去划分
if ErrorCode in ('CODE1', 'CODE2', 'CODE3'):
return '业务异常1'
elif ErrorCode in ('CODE4', 'CODE5'):
return '业务异常2'
else:
return '服务异常'
- ErrorMessage 去划分
if 'mes1' in message or 'mes2' in message or 'mes3' in message:
return '业务异常'
else:
return '服务异常'
这样就能看出 异常Code和Msg有多么重要了, 特别是ErrorCode, 因为msg应该是一种提示的作用,不能强依赖这个msg来去区分异常, 而Code则是既定的,有限的几种,不应该跟msg一样灵活无边界, 每次增加一个code都应该去告知对接方, 可以打比方成 http code 一样。
一个线程最好包揽一个维度的事情
多线程确实是提高处理速度的利器,但却是因为并发抢占式而带来了很多问题,怎么才能让多线程带来的问题最小化呢? 我觉得可以在以下两点做思考:
- 一个线程最好包揽一个维度的事情
在这次1.0系统中, 一个线程的粒度已经到了record层次, 调控的最小粒度是店铺, 数据库允许并发的粒度也是店铺。 所以便形成多 多个线程对应一个店铺操作的事情, 多线程的合作肯定比单线程独立工作要难控制的多,一大堆问题随之而来了。
- 一个线程的处理逻辑尽可能的简单
在编写多线程并发代码时候, 尽可能的让彼此线程之间不受影响。 避免考虑当程序在这个线程运行的同时还要考虑其他线程运行怎么样。 其实这点也主要跟上一点有强关联, 如果线程并发的维度足够大, 就不会出现要彼此考虑的事情(当然,如果做不到完全不考虑彼此线程,也要尽可能减少逻辑上的交集)。
分布式锁
这里为了避免多个线程抢占一个店铺, 用Redis做了分布式锁, 因为是容忍网络抖动,发布等中断delete操作的, 毕竟一个店铺的执行延迟1min也没什么太大问题。
public boolean lockLpdRegulateShop(Long shopId) {
boolean isLock = false;
LocalDateTime lockTime = LocalDateTime.now();
long nowLockTimestamp = DateTimeUtil.toTimestamp(lockTime);
String redisKey = RedisKeysFactory.generateMessageLockRedisKey(shopId);
String redisValue = String.format("%s", nowLockTimestamp);
int expireSeconds = 60
long nxValue = redis.setnx(redisKey, redisValue);
if (nxValue == 1) {
// 如果当前Redis中没有锁这个店, 就把这个店上锁, 返回设置锁成功
redis.expire(redisKey, expireSeconds);
isLock = true;
} else {
// 如果当前Redis中锁了这个店,判断这个店是不是已经死锁了(过了超时时间,但是仍然没有释放锁)
// 如果死锁了,重新设置value和expire 返回设置锁成功
// 如果没有死锁,则返回设置锁失败
String nowRedisKeyLockTimeStr = redis.get(redisKey);
try {
Long nowRedisKeyLock = Long.parseLong(nowRedisKeyLockTimeStr);
// 这里多减5s是为了确定锁确实过期了
if ((nowLockTimestamp - nowRedisKeyLock - 5) > expireSeconds) {
logger.error(String.format("Redis中该key已经到了超市时间,但是仍然占用着redis锁 key:%s", redisKey));
redis.setex(redisKey, expireSeconds, redisValue);
isLock = true;
} else {
isLock = false;
}
} catch (Exception e) {
logger.error(String.format("序列化redis值异常 key:%s, value:%s", redisKey, nowRedisKeyLockTimeStr), e);
redis.setex(redisKey, expireSeconds, redisValue);
isLock = true;
}
}
return isLock;
}