Contents

2 叉与 3 叉 Software Combining Tree 的理解与实现

在《The Art of Multiprocessor Programming》中介绍了一种 Java 实现的 2 叉 Combining Tree,可以用于并发计数。相比于使用 AtomicInteger,具有更好的并发性。因为实际上原子型整数尽管使用了原子指令来实现,算术运算本身仍然是串行的,并没有做到 “real parallelism”。本文介绍一下 Combining Tree 的思想,并按照书后的习题,将它改成 3 叉的实现。在本文末尾包含几项简单的测试,用于比较 Combining Tree 与 AtomicInteger 的性能差异。

简单介绍

Combining Tree 是一种可以用作 Barrier 实现的数据结构。多个并发线程访问同一个对象,且只访问一次,等到每个线程都确认所有的线程都已经访问过该对象(并发计数器数值达到线程数),线程就可以离开 Barrier。使用 Combining Tree 可以提高并发程度,从而达到比 AtomicInteger 更低的平均延迟。

思路

要让 “计数” 这个操作并行,我们考虑将每一个操作并行执行,再将结果树状合并在一起。树的叶子上摆放着若干线程,而树的每一个节点(包括叶节点和内部节点)都将自己的子节点的数据合并在一起向上传递,等待上层结果返回信息,再将该结果继续向下层节点返回。对于 2 叉树,来自子节点的两个线程发生竞争。定义先到达本节点的线程为主动线程,后到达本节点的线程为被动线程。被动线程只告知本节点来自子树的数值,而主动线程负责将数据向上层传递,再从上层向下分发结果。被动线程只需要一直等待主动线程把上层的结果传递回来即可。如果合理地安排线程间的同步,完成上述操作,树内部节点的竞争问题也就解决了。但是,还需要照顾到的一个情形是:可能某些线程根本就没参与并发计数(例如说它还卡在执行并发计数之前的一些操作),此时树内部的某些节点根本就没有发生竞争,两棵子树只有一棵传递来了数值。那么节点在这种情况下,只需要向上传递,再单侧地向下传递即可。

上面的分析简单解释了如果想要通过树状的数据结构进行并发计数需要些什么。但是我们真正能分配的任务,是每一个线程,在线程内部安排特定方法调用,让线程内部按顺序执行。因此我们再从线程的角度,重新审视一下需要的操作。一个线程从叶子节点出发,试图向上行进,传递操作数。检测是否有线程在争抢当前的节点,如果没有的话则继续向上传递操作数,如果发现已经有线程在该节点,则被动等待主动线程回传操作数。而主动线程负责合并两个线程的数值,携带着该数值继续向上。最后主动线程带着上层的结果返回本节点,告知被动线程。此后两个线程各自向自己的子树告知合并结果。

这里有个逻辑上的小骗局,实际上,只有后来的那一个线程可以方便地探测是否存在竞争,先到的那个在后来的那个来之前,确实认为没人跟自己竞争!先到的那个线程只有在试图合并两个子树结果的阶段,才会感知到第二个线程的存在。

2 叉实现

基于上面的一些分析,一个线程要执行这样的一些操作。此处我们把线程均匀分配在叶子节点上,保持每个叶子节点两个线程。每个叶子一个线程也是一样的,只不过所有的叶子节点都没有竞争,第一次竞争发生在叶子节点的上一层而已。

public int getAndAdd(int val) throws Exception {
        Stack<Node> stack = new Stack<>();
        Node myLeaf = leaf[((int) Thread.currentThread().getId()) % leaf.length];
        Node node = myLeaf;
        // pre-combining phase
        while (node.preCombined()) {
            node = node.parent;
        }
        Node stop = node;
        // combining phase
        node = myLeaf;
        int combined = val;
        while (node != stop) {
            combined = node.combine(combined);
            stack.push(node);
            node = node.parent;
        }
        // operation phase
        int prior = stop.op(combined);
        // distribution phase
        while (!stack.empty()) {
            node = stack.pop();
            node.distribute(prior);
        }
        return prior;
    }

用这样的一张图来表示数据流向(这图是我自己画的,我画了蛮久,自认为还是比较全面的)

images/software_combining_tree.png

节点 N 的并发数据流向

图中左侧的 A 线程先到达 N 节点,右侧的 B 线程后到达。参照着这张图,再来解释一下这段代码。preCombine 方法用于探测竞争,先到的线程 A 认为没有竞争,于是走向更高的层级。后到的线程 B 执行 op 方法,把自己带来的数值写入 N 节点,然后开始自旋等待。A 线程在 combine 操作阶段发现了竞争,一直等待 B 线程写入 B 侧的数值,等待拿到这个数之后,将它和自己这侧的数相加,并向上传递。A 线程在更高层次执行完相应的操作后,会从上至下 distribute,把上层的值向下传递。A 线程在 N 节点处,拿到上层的结果,通知 B 线程完整的数值,之后 A 自己继续向下 distribute。B 线程得到 A 告知的上层值之后,开始向右侧的子树执行自己这侧的 distribute。

总结一下:

  • 从线程的角度看,一个线程试图 combine 当前无竞争节点的数值,遇到竞争之后等待竞争线程告知结果,此后向下层 distribute。
  • 从节点的角度看
    • 无竞争的执行路径是:
      • preCombine, combine, distribute
    • 有竞争时的执行路径是:
      • A: preCombine
      • B: preCombine
      • B: op
      • A: combine
      • A: distribute

接下来我们详细讲一下这 4 个方法之间是如何控制同步的。该数据结构有两种类型的同步:4 个主要方法都是 synchronized,方法调用本身保持原子性,称为短程同步;locked 域用于跨方法的同步,强制调整方法执行的先后顺序,成为长程同步。(短程和长程是我胡诌的两个词,英语原文分别是 short-term/long-term)。每个节点本身具有状态,除根节点外,每个节点的初始状态都是 IDLE,根节点始终保持 ROOT 状态,方便特殊处理。

synchronized boolean preCombine() throws Exception {
        while (locked) wait();
        switch (cStatus) {
            case IDLE:
                cStatus = CStatus.FIRST;
                return true;
            case FIRST:
                locked = true;
                cStatus = CStatus.SECOND;
                return false;
            case ROOT:
                return false;
            default:
                // If correctly implemented, this branch never executes.
                throw new Exception("Unexpected Node state" + cStatus);
        }
    }

先来看 preCombine 方法。第一个进入节点 N 的线程 A 执行了第一次 preCombine,设置节点状态为 FIRST,表明节点当前有被第一个节点访问。但是此时线程 A 并不会设置 locked 为 true,这样如果有第二个线程执行 preCombine,也可能进入该线程。第二个进入该节点(如果存在的话) N 的线程 B 执行了第二次 preCombine,设置节点状态为 SECOND,表明节点已经被第二个线程访问。同时锁住节点 N,禁止其余的线程进入。此后,直到 AB 两个线程关于 N 的操作悉数结束后,其余的线程才能开始参与 N 的下一轮竞争。

synchronized int combine(int combined) throws Exception {
        while (locked) wait();
        locked = true;
        firstValue = combined;
        switch (cStatus) {
            case FIRST:
                return firstValue;
            case SECOND:
                return firstValue + secondValue;
            default:
                // If correctly implemented, this branch never executes.
                throw new Exception("Unexpected Node state" + cStatus);
        }
    }

接下来看看 combine 方法。对于节点 N 而言,只有先到达它的 A 线程才会执行 combine 方法,后到达的 B 线程会执行 op 方法。 Combine 方法会检测 locked 域,如果被锁住则等待。这对应了第二个线程执行 preCombine 锁住节点 N 的操作。其实这里就是一个重要的分歧点:如果线程 B 还没来得及执行 preCombine 的时候,线程 A 已经执行了 combine,那么节点会被锁住,B 线程会在 preCombine 入口阻塞,在之后的执行流程中节点 N 都被 A 线程独享;而如果 B 抢先执行了 preCombine,那么 A 线程会在 combine 入口阻塞,直到 B 线程在合适的时候释放节点 N 的锁。

此时,如果在当前线程看来节点 N 被自己独享(cStatus == FIRST),那么我直接返回 firstValue 即可(firstValue 代表下方子树的线程交给我的值)。如果节点存在竞争,之前已经说过,直到 B 在合适的时候唤醒 A,A 才会开始 combine,此时 B 已经写入了合理的 secondValue 值,此值代表另一个子树向上传递的值。那么 A 节点需要将左右两侧的值相加合并,再继续向上传递。

synchronized int op(int combined) throws Exception {
        switch (cStatus) {
            case ROOT:
                int prior = result;
                result += combined;
                return prior;
            case SECOND:
                secondValue = combined;
                locked = false;
                notifyAll();
                while (cStatus != CStatus.RESULT) wait();
                locked = false;
                notifyAll();
                cStatus = CStatus.IDLE;
                return result;
            default:
                // If correctly implemented, this branch never executes.
                throw new Exception("Unexpected Node state" + cStatus);
        }
    }

再来看看 op 方法。此处再次提醒,对于处于竞争的节点 N,由后到达 N 的线程 B 执行 op,而线程 A 是不会执行 op 的。根节点是特殊情况,在此处更新节点的 result,并返回旧 result(因为我们在实现 getAndAdd 方法,所以要返回旧值)。由于只有 B 线程执行 op,所以此处除 ROOT 外只有 SECOND 状态是可能的,此时 B 写入下层的 combined 值到 secondValue。然后唤醒阻塞在 combine 入口的 A。此后 A 可以读到正确的 firstValue 与 secondValue,它们分别来自节点 N 的两棵子树,然后 A 执行了正确的一系列操作,直到节点 N 内部被写入正确的结果(写结果时状态置为 RESULT)。因此 B 在释放锁唤醒 A 之后,自旋等待结果产生,然后再次释放锁(线程 A combine 阶段加的锁,交给了此时 B 释放),并设置节点 N 状态为 IDLE。此后 B 线程执行的操作其实和节点 N 就无关了:B 线程会在 N 的一些子节点上执行 distribute,当然,distribute 函数的参数就是本次 op 返回的结果。

synchronized void distribute(int prior) throws Exception {
        switch (cStatus) {
            case FIRST:
                cStatus = CStatus.IDLE;
                locked = false;
                break;
            case SECOND:
                result = prior + firstValue;
                cStatus = CStatus.RESULT;
                break;
            default:
                // If correctly implemented, this branch never executes.
                throw new Exception("Unexpected Node state" + cStatus);
        }
        notifyAll();
    }

接下来再看看 distribute 函数。其实之前的几个方法该做的都差不多了,distribute 就是把结果整合一下。只不过有一些合并的小 tricks 在。Distribute 方法承担着对非根节点更新 result 的功能。当节点执行 distribute 时处于 FIRST 状态,这意味着整个更新过程其实都没有别的线程竞争,那么直接释放即可,不需要更新 result。因为非根节点的 result 这个域其实只在一轮竞争中是有效的,下一轮竞争就赋新值了(或者下一轮也还是没用上)。当处于 SECOND 状态时,说明另一个线程也占用了当前节点。参数 prior 来自于本线程作为主动线程的(在更高层的)所有节点的 combine 之和,它包含了除当前在 N 竞争的两个线程以外的线程已经生效(已经发送到高层节点)的更新。将 prior 与 firstValue 相加(firstValue 是来自于 A 的值),即 A 更新后的结果,这一结果包含了除 B 线程以外的更新。这里需要多说明一下,在节点 N 这里 A 是主动 B 是被动,但是在 N 的右子树 B 才是主动线程,需要由 B 告知更低层的阻塞在 op 等待结果的线程 P,高层的结果,这个结果是需要只包含 A 不包含 B 的,因为 P 期望获得一个除了 P 自己的更新没生效,其余的并发更新都生效了的结果。P 的更新因为 combine,而被包含在了 B 的值中,所以 P 期望获得一个来自更高层次的除自身线程以外的更新。实际上这里的 P 相当于是低层次节点上 A 的角色。

从 2 叉到 3 叉

2 叉的 Combining Tree 中,每个节点最多只有两个线程参与 preCombine 之后的竞争,后到的线程 B 在执行 op 操作时,前一个线程 B 一定阻塞在 combine 处,B 对节点的解锁操作是希望 A 线程开始执行后续的操作。但是对于 3 叉的情况而言,此时可能还有一个线程 C 即将执行 preCombine。此时线程 B 如果先执行了 op,通知阻塞在 combine 阶段的线程 A 解锁,同时 C 进入 preCombine,修改了节点状态,则 A 线程的 combine 操作会出错,无法合并出正确的结果。因此需要修改节点的 locked 域为一个整型变量,通过它的值来阻塞或放行处于不同阶段的操作。期望的几条可能的执行路线如下:

  • 该节点没有竞争,由一个线程独享。由该独享线程执行 preCombine,combine,distribute。
  • 该节点有两个线程竞争。进入该状态的执行历史是:第一个线程 A 执行了 preCombine,第二个线程执行 preCombine,op。  此时需要将第一个线程阻塞在 combine 阶段,同时确保可能存在的第三个线程无法进行 preCombine,让它参与下一轮竞争。
  • 该节点有三个线程竞争。进入该状态的执行历史是:第一个线程 A 执行了 preCombine,第二个线程 B 执行了 preCombine,  第三个线程 C 执行了 preCombine。此后 B,C 中的某一个执行了 op 操作。此时需要保证 A 线程阻塞在 combine 阶段,  其余的竞争线程全部阻塞在 preCombine 阶段,让它们参与下一轮竞争。

上面的分析解决了竞争的问题。当有 3 个线程参与竞争时,后两个线程都会执行 op 操作,但顺序并不确定。实际上我们只希望这两个线程一个写入 secondValue 一个写入 thirdValue,然后由第一个线程执行 combine。因此不同于 FIRST 和 SECOND,3 线程竞争的状态被拆分为了 2 个:THIRD1 和 THIRD2。THIRD1 表示正在等待后进入的两个线程中的任意一个写入值,第一个执行 op 的线程写入 secondValue 并修改状态为 THIRD2,它不需要唤醒第一个线程执行 combine 因为第三个线程还没写入合理的值。THIRD2 表示正在等待第三个线程写入值,第二个执行 op 的线程写入 thirdValue,并唤醒第一个线程执行 combine。

这里就不详细贴代码了,完整的代码可以参见:https://github.com/LinHeLurking/binary-and-trinary-combining-tree

性能比对

费了这么久的努力,如果没有性能提升怎么能行。用如下的一份代码来比较原子整数,二叉 Combining Tree,三叉 Combining Tree 的性能差异。

public class Main {
    public static void testPerformance(Runnable task, String taskName, int threadNum) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        long start = System.nanoTime();
        for (int i = 0; i < threadNum; ++i) {
            executor.submit(task);
        }
        executor.shutdown();
        if (executor.awaitTermination(20, TimeUnit.SECONDS)) {
            long end = System.nanoTime();
            double duration = (double) (end - start) / 1000; // us
            System.out.format("%s Finished.\n", taskName);
            System.out.format("Duration: %s us\n\n", duration);
        } else {
            System.out.format("%s timeout!", taskName);
        }
    }

    public static void main(String[] args) throws Exception {
        int threadNum = 10;
        int eachRepeat = 1;
        AtomicInteger ai = new AtomicInteger(0);
        CombiningTree ct2 = new CombiningTree(threadNum, 2);
        CombiningTree ct3 = new CombiningTree(threadNum, 3);
        testPerformance(() -> {
            for (int i = 0; i < eachRepeat; ++i) {
                ai.getAndIncrement();
            }
        }, "AtomicInteger", threadNum);
        testPerformance(() -> {
            try {
                for (int i = 0; i < eachRepeat; ++i) {
                    ct2.getAndIncrement();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Binary Combining Tree", threadNum);
        testPerformance(() -> {
            try {
                for (int i = 0; i < eachRepeat; ++i) {
                    ct3.getAndIncrement();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Trinary Combining Tree", threadNum);
        boolean flag = ai.get() == ct2.get() && ai.get() == ct3.get();
        if (flag) {
            System.out.println("Yes! Results are correct!");
        } else {
            System.out.println("No! Results are incorrect!");
        }
    }
}

这个计时没有剔除掉线程池构造等等开销,因此绝对数值没有明显参考意义,比较一下相对值即可。在我的 Windows 上,它会跑出这样的结果:

AtomicInteger Finished.
Duration: 4295.4 us

Binary Combining Tree Finished.
Duration: 3739.1 us

Trinary Combining Tree Finished.
Duration: 1845.2 us

Yes! Results are correct!

你可能会注意到,每个线程都只进行了一次操作,这合理吗?实际上如果再次回到文章开头介绍的那个 Barrier 问题,Combining Tree 就是适合于解决这种情形的。而如果真的把每个线程的操作次数增加,可以明显观察到性能下降,Combining Tree 会花费数十倍于原子整数的时间。这一点也是很合理的,因为当操作数增多之后,靠近树根的位置会出现激烈的竞争,平均延迟也会因此巨幅上升。所以说 Combining Tree 适合于每个线程执行的操作数不太多的情形,更加普适更加一般的场景下,基于原子指令的 AtomicInteger 会是更好的选择。毕竟计算机科学很大程度上就是 trade-off 的艺术,有得必有失也是符合我们历来的经验的。