ForkJoin & Quasar

发表信息: by

用ForkJoin&Quasar对IO密集型服务进行优化

背景介绍

做业务系统开发面对的服务大都是IO密集型服务,这里指的IO可大致分为如下几种:

  • 数据库 IO
  • 缓存 IO
  • 网络 IO

这里暂时不谈 缓存IO,因为缓存IO都发生在内存上,速度很快,可以忽略这部分IO

也有人可能会疑惑为什么会存在网路IO?

现在稍微大一点的公司都在做微服务,根据业务领域进行服务划分,将服务与服务尽可能的进行解耦,方便管理,方便维护,将开发效率最大化。

但也因此,很多单服务不存在的问题也会随即而来,比如增加网络IO:所有特定业务领域内的服务将可提供服务注册到治理中心,由治理中心进行服务调度分发,服务与服务之间不再简单的通过内存接口进行交流,而是将请求信息通过网络打到所依赖的服务上,网络延迟可是不可避免的。如果网络较好的情况下大概几毫秒到几十毫秒,如果网络不好就可能出现几百毫秒甚至秒级。

微服务调用

这里我们给出个概念图,当请求A服务时,A服务要调用BCD三个服务,并且要查询一次数据库 E, 并且将四个结果组装成一个实体返回出去。 如果A内部代码是顺序处理的,那么这次请求的整体响应时间则为 B的响应时间+C的响应时间+D的响应时间+E的响应时间+A内部逻辑处理时间。 (B的响应时间包括A请求B的网络延迟时间)。也就是说A的响应时间不单单与自己的代码逻辑有关,还要与其他服务的代码逻辑有关,还要与网络延迟有关, 如果其中有一个环节延迟增加都会增加整个请求的响应时间。

一个请求大概是这样的(OtherService包括DB读写):

顺序时序图1

优化

服务层面的优化的方向大致有下面这几种,优化的地方都是在 MyService -> OtherService 这一步上:

  • 合并多次请求
  • 请求异步代理
  • 请求并发处理

合并多次请求

将多次请求合并成一次,将单量调用改成批量调用。 这种是变动最小,风险最小的办法,却也是比较难推动的办法。

如果我们调用的服务仅仅提供了单量调用接口,就需要被调用方进行批量接口提供。需要各服务之间协作支持(也许我们提出的是一个比较特别的需求,也许提供接口的性价比不高,很容易遭受到依赖方的拒绝~),这种方法无关技术,属于交流解决问题。

请求异步代理

这种方法需要在 MyServiceOtherService 之间再介入一层 Agent ,大致结构如下:

请求异步代理1

A要调用OtherService的时候,会让Agent异步去请求OtherService,当AgentOtherService请求有结果的时候将结果异步返回给A,这样整体的响应时间也就变成 内部逻辑时间 + max(OtherService Response Times)

但这种架构的修改带来系统稳定性的降低,服务A和服务B、C、D之间的通讯增加了复杂性。同时,因为是异步方式,服务A的业务也要实现异步方式,否则还是一个阻塞的架构。 而且对Agent的实现也有着比较高的并发要求。

请求并发处理

请求并发处理应该是相对来说比较简单且实用的实现办法, 而且有现成的框架支持并发处理,比如Executor, ForkJoin, Quasar …

Executor

提供现有的线程池,代码简单

ExecutorService executorPool = Executors.newFixedThreadPool(10);
Future<Object> future = executorService.submit(new Callable<Object>() {
    public Object call() throws Exception {
        OtherService otherService = new OtherService();
        return otherService.dispose();
    }
})
// future.get() 拿到返回值 OR 异常

ForkJoin

与 Executor 类似, ForkJoin 提供的线程池(感觉不应该叫做线程池了)最大为 cpu核心数,虽然Excutor线程池大小可以自定义,但其实真正能并发运行的不过CPU的核心数那么多,cpu核心数 的线程池感觉足够了。

ForkJoin 还增加了work-stealing(工作窃取算法),为了提高对任务的处理效率。

工作窃取1,图片来源网络,侵删

两个线程队列同时处理任务,如果线程队列1的任务已经处理完毕且没有新任务进来时,如果其他线程队列有任务没处理完,线程1回去其他队列里"偷取"任务用来加快处理。

这里一定是从队列尾偷任务

其实代码写起来也是蛮简单的, 与Executor类似

ForkJoinPool forkJoinPool = new ForkJoinPool();
Future<Object> future = forkJoinPool.submit(new Callable<Object>() {
    public Object call() throws Exception {
        OtherService otherService = new OtherService();
        return otherService.dispose();
    }
})

算法和线程池的配置都是封装好的,没什么可以说的。

Quasar

这个是Java的一个实现协程的框架,大致的原理如下:

  • Quasar 中存在默认的work线程
  • work线程轮询执行协程任务
  • 当该任务遇到阻塞的时候(比如IO任务),协程自动挂起
  • 当阻塞完成的时候,恢复协程,继续执行

是不是很像“请求异步代理”, work线程就像Agent服务,协程就像抛给Agent的任务,不同是Agent服务(前提是Agent服务是用异步线程实现的,如果用协程实现,没啥区别)将会用异步线程去等待IO并且返回结果,而Quasar的work线程是轮询这些协程任务,有IO完成的便让这个任务返回。

协程不是线程,更不是进程,所以协程并非异步,但却可以充分利用CPU,不要将CPU的宝贵资源浪费在等待IO阻塞上,因为不是线程,所以不用担心线程的维护。更不用担心大量线程阻塞导致系统可用资源下降。

使用代码随下章节给出。

性能比较

我们模拟一下遇到的服务场景,分别用 Executor, ForkJoin, Quasar 进行顺序请求代码的修改,看下效率如何

代码结构如下:

  • Main 充当请求方,可以并发对调用MyService中的方法
  • MyService 充当我们自己的服务,响应Main进来的请求
  • OtherService 充当别人的服务,MyService依赖OtherService中的方法
    • OtherService 以Sleep来模拟响应延迟

Main.java


public class Main {
    private static ExecutorService executorPool;
    private static int poolSize = 300;
    private static int requestSize = 300;

    private static void init() {
        MyService.DEFAULT_DEPEND_COUNT = 5; //MyService 调用其他服务的次数
        MyService.disposeType = MyService.DISPOSE_TYPE.COROUTINE; //MyService 处理类型,分为协程,单线程,多线程(forkjoin, executor)

        OtherService.IS_RANDOM_DELAY = Boolean.FALSE; // 延迟是否随即生成
        OtherService.DEFAULT_DELAY = 500; //默认延迟时间 IS_RANDOM_DELAY==FALSE 时候生效
        OtherService.DEFAULT_TIMEOUT = 10000; // 默认超时时间
        OtherService.DEFAULT_DELAY_UPPER = 10000; //随机延迟的上限 IS_RANDOM_DELAY==TRUE的时候生效

        poolSize = 1; // 请求方线程池的大小 这个无关乎MyService和OtherService
        requestSize = 1; //并发请求次数
        executorPool = Executors.newFixedThreadPool(poolSize);
    }

    public static void main(String[] args) {
        init();
        int successCount = 0;
        int failCount = 0;
        Long startTime = System.currentTimeMillis();

        final CountDownLatch countDownLatch = new CountDownLatch(requestSize);
        final List<Future<Boolean>> results = new ArrayList<Future<Boolean>>();
        for (int i = 0; i < requestSize; i++) {
            results.add(executorPool.submit(new Callable<Boolean>() {
                public Boolean call() throws Exception {
                    boolean result = new MyService().dispose();
                    countDownLatch.countDown();
                    return result;
                }
            }));
        }
        try {
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        for (Future<Boolean> future : results) {
            try {
                if (future.get()) {
                    successCount++;
                } else {
                    failCount++;
                }
            } catch (Exception e) {
                failCount++;
            }
        }

        Long endTime = System.currentTimeMillis();

        System.out.println(String.format("请求完成, 耗时 %s ms, 请求成功%s次, 失败%s次", (endTime - startTime), successCount, failCount));

        finish();
    }

    private static void finish() {
        executorPool.shutdown();
    }
}

MyService.java

public class MyService {
    public static int DEFAULT_DEPEND_COUNT = 3; // 默认依赖其他服务调用次数
    public static DISPOSE_TYPE disposeType = DISPOSE_TYPE.SEQUENCE;
    private int dependCount = DEFAULT_DEPEND_COUNT;

    public enum DISPOSE_TYPE {MULTI_THREAD, FORKJOIN, COROUTINE, SEQUENCE}

    public MyService() {
    }

    public MyService(int dependCount) {
        if (dependCount < 0) {
            this.dependCount = DEFAULT_DEPEND_COUNT;
        }
    }

    public boolean dispose() {
        switch (disposeType) {
            case MULTI_THREAD:
                return disposeByExecutor();
            case COROUTINE:
                return disposeByCoroutine();
            case FORKJOIN:
                return disposeByForkJoin();
            default:
                return disposeBySequence();
        }
    }

    //forkjoin处理
    private boolean disposeByForkJoin() {
        boolean result = false;

        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(dependCount);
        //forkjoin 线程池 带work-stealing
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        final CountDownLatch countDownLatch = new CountDownLatch(dependCount);

        for (int i = 0; i < dependCount; i++) {
            futures.add(forkJoinPool.submit(new Callable<Boolean>() {
                public Boolean call() throws Exception {
                    OtherService otherService = new OtherService();
                    boolean result = otherService.dispose();
                    countDownLatch.countDown();
                    return result;
                }
            }));
        }

        try {
            countDownLatch.await();
            result = Boolean.TRUE;
            for (Future<Boolean> future : futures) {
                result &= future.get();
            }
        } catch (Exception e) {
            result = Boolean.FALSE;
        }

        return result;
    }

    // 协程处理
    private boolean disposeByCoroutine() {
        boolean result = false;

        List<Fiber<Boolean>> fibers = new ArrayList<Fiber<Boolean>>();
        for (int i = 0; i < dependCount; i++) {
            fibers.add(new Fiber<Boolean>(new SuspendableCallable<Boolean>() {
                public Boolean run() throws SuspendExecution, InterruptedException {
                    OtherService otherService = new OtherService();
                    return otherService.dispose();
                }
            }).start());
        }

        result = Boolean.TRUE;
        try {
            for (Fiber<Boolean> fiber : fibers) {
                result &= fiber.get();
            }
        } catch (Exception e) {
            result = Boolean.FALSE;
        }

        return result;
    }
    
    // Executor 处理
    private boolean disposeByExecutor() {
        boolean result = false;

        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(dependCount);
        // 普通executor线程池 不带work-stealing
        ExecutorService executorService = Executors.newFixedThreadPool(dependCount);
        final CountDownLatch countDownLatch = new CountDownLatch(dependCount);

        for (int i = 0; i < dependCount; i++) {
            futures.add(executorService.submit(new Callable<Boolean>() {
                public Boolean call() throws Exception {
                    OtherService otherService = new OtherService();
                    boolean result = otherService.dispose();
                    countDownLatch.countDown();
                    return result;
                }
            }));
        }

        try {
            countDownLatch.await();
            result = Boolean.TRUE;
            for (Future<Boolean> future : futures) {
                result &= future.get();
            }
        } catch (Exception e) {
            result = Boolean.FALSE;
        }

        executorService.shutdown();

        return result;
    }

    // 顺序处理
    private boolean disposeBySequence() {
        boolean result = false;

        OtherService otherService = new OtherService();

        try {
            for (int i = 0; i < dependCount; i++) {
                // 调用其他服务,300ms的延迟
                result = otherService.dispose();

                if (!result) {
                    break;
                }
            }
        } catch (Exception e) {
            result = Boolean.FALSE;
        }


        return result;
    }
}

OtherService


public class OtherService {
    public static int DEFAULT_TIMEOUT = 3000; //默认超时时间 3s
    public static int DEFAULT_DELAY = 300; //默认延迟300ms
    public static int DEFAULT_DELAY_UPPER = 5000; //默认延迟上限 10s
    public static boolean IS_RANDOM_DELAY = Boolean.FALSE; //延迟随机

    // 这个注解是Quasar提供的,表明这个方法发生阻塞的时候是可以被挂起的
    @Suspendable
    public boolean dispose() {
        boolean result = false;

        int delay = DEFAULT_DELAY;

        if (IS_RANDOM_DELAY) {
            Random random = new Random();
            delay = random.nextInt(DEFAULT_DELAY_UPPER);

        }

        if (delay > DEFAULT_TIMEOUT) {
            result = false;
        } else {
            result = true;
        }

        try {
            Strand.sleep(delay);
        } catch (Exception e) {
            e.printStackTrace();
            result = false;
        }

        return result;
    }
}

为了测试方便,在Main中,我们加入了一些可配置的东西,都在init()方法中

测试数据和结果

序号 请求次数 每次请求MyService调用OtherService次数 平均延迟 超时时间 调用方式 响应时间ms 备注
1-0 1 5 500ms 3000ms SEQUENCE 2730  
1-1 1 5 500ms 3000ms MULTI_THREAD 677 Executor线程池10
1-2 1 5 500ms 3000ms FORKJOIN 1160  
1-3 1 5 500ms 3000ms COROUTINE 1146  
序号 请求次数 每次请求MyService调用OtherService次数 平均延迟 超时时间 调用方式 响应时间ms 备注
2-0 1 1000 500ms 3000ms SEQUENCE 这个时间长的不想等了  
2-1 1 1000 500ms 3000ms MULTI_THREAD 5235 Executor线程池10
2-2 1 1000 500ms 3000ms FORKJOIN 126118  
2-3 1 1000 500ms 3000ms COROUTINE 1146  

ForkJoin 响应时间长的有点惊人,打印jstack发现仅仅有4个work线程处理任务 jstack

序号 请求次数 每次请求MyService调用OtherService次数 平均延迟 超时时间 调用方式 响应时间ms 备注
3-0 1000 5 500ms 3000ms SEQUENCE 3027  
3-1 1000 5 500ms 3000ms MULTI_THREAD OOM Executor线程池10
3-2 1000 5 500ms 3000ms FORKJOIN OOM  
3-3 1000 5 500ms 3000ms COROUTINE 1389  
序号 请求次数 每次请求MyService调用OtherService次数 平均延迟 超时时间 调用方式 响应时间ms 备注
4-0 10000 3 500ms 3000ms SEQUENCE 21186  
4-1 10000 3 500ms 3000ms MULTI_THREAD OOM Executor线程池10
4-2 10000 3 500ms 3000ms FORKJOIN OOM  
4-3 10000 3 500ms 3000ms COROUTINE 5906  

是不是感觉Quasar的表现棒棒的~

jstack

下面我们把延迟调长点:300000,超时时间:300000,请求次数5,依赖调用次数5次, 看下线程栈(调长点是为了打堆栈可以反应过来) 点击下面的链接可以下载堆栈文件:

👉 SEQUENCE

👉 MULTI_THREAD

👉 FORKJOIN

👉 COROUTINE

上面堆栈信息可以解释上面几个问题:

  • 3-2 / 3-3 的 OOM:

1000 * 5 数量级的线程被创建,估计很容易被OOM吧~

  • 2-2 响应时间 126118ms

上面的数量级打到1000,从jstack来看 ForkJoin 仅仅开了3个work线程,响应时间肯定会很长

  • Quasar 底层的work线程是用fork-join-pool实现的

还可以更好

毕竟Sleep模拟IO延迟不太合适,本来是准备了1000W的DB数据做 limit offset 来模拟 DB长时间IO的,但是考虑到多线程读DB有回带来另外的问题,加上实在是太晚了(已经凌晨两点了,第二天还要上班……),所以想了两个办法,1. 找时间搞一搞DB延迟。 2. 拿公司的项目试一下,看压测结果便知道效果了。

如果上面Demo写的哪里有问题,欢迎指出改进。

参考资料

感谢

感谢公司同事提供的各种帮助和引导,在争论中成长。

GitHub

这里的代码是从官方推荐代码 Clone下来并且做了些修改之后的代码