LongAdder | LongAccumulator简介

说到LongAdder,不得不提的就是AtomicLong。AtomicLong是JDK1.5开始出现的,里面主要使用了一个long类型的value作为成员变量。然后使用循环的CAS操作去操作value的值。

优化思想

LongAdder是JDK1.8开始出现的,所提供的API基本上可以替换掉原先的AtomicLong。LongAdder所使用的思想就是热点分离,这一点可以类比一下ConcurrentHashMap的设计思想。就是将value值分离成一个数组,当多线程访问时,通过hash算法映射到其中的一个数字进行计数。而最终的结果,就是这些数组的求和累加。这样一来,就减小了锁的粒度。如下图所示:

在实现的代码中,LongAdder一开始并不会直接使用Cell[]存储。而是先使用一个long类型的base存储,当casBase()出现失败时,则会创建Cell[]。此时,如果在单个Cell上面出现了cell更新冲突,那么会尝试创建新的Cell,或者将Cell[]扩容为2倍。代码如下:

    public void increment() {
        add(1L);
    }

    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {// 如cells不为空,直接对cells操作;否则casBase
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))    // CAS cell
                longAccumulate(x, null, uncontended);    // 创建新的Cell或者扩容
        }
    }

对比测试

下面给出一段LongAdder和Atomic的对比测试代码:

public class CompareTest {
	public static final int THREAD_COUNT = 1000;
	static ExecutorService pool = Executors.newFixedThreadPool(THREAD_COUNT);
	static CompletionService<Long> completionService = new ExecutorCompletionService<Long>(pool);
	static final AtomicLong atomicLong = new AtomicLong(0L);
	static final LongAdder longAdder = new LongAdder();

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		long start = System.currentTimeMillis();
		for(int i = 0; i < THREAD_COUNT; i++) {
			completionService.submit(new Callable<Long>() {
				@Override
				public Long call() throws Exception {
					for(int j = 0; j < 100000; j++) {
//						atomicLong.incrementAndGet();
						longAdder.increment();
					}
					return 1L;
				}
			});
		}
		for(int i = 0; i < THREAD_COUNT; i++) {
			Future<Long> future = completionService.take();
			future.get();
		}
		System.out.println("耗时:" + (System.currentTimeMillis() - start));
		pool.shutdown();
	}
}

测试结果对比图如下:

从上结果图可以看出,在并发比较低的时候,LongAdder和AtomicLong的效果非常接近。但是当并发较高时,两者的差距会越来越大。上图中在线程数为1000,每个线程循环数为100000时,LongAdder的效率是AtomicLong的6倍左右。

LongAccumulator介绍

LongAccumulator是LongAdder的功能增强版。LongAdder的API只有对数值的加减,而LongAccumulator提供了自定义的函数操作。其构造函数如下:

    // accumulatorFunction:需要执行的二元函数(接收2个long作为形参,并返回1个long);identity:初始值
    public LongAccumulator(LongBinaryOperator accumulatorFunction, long identity) {
        this.function = accumulatorFunction;
        base = this.identity = identity;
    }

上面构造函数,accumulatorFunction:需要执行的二元函数(接收2个long作为形参,并返回1个long);identity:初始值。下面看一个Demo:

public class LongAccumulatorDemo {

    // 找出最大值
	public static void main(String[] args) throws InterruptedException {
		LongAccumulator accumulator = new LongAccumulator(Long::max, Long.MIN_VALUE);
		Thread[] ts = new Thread[1000];

		for (int i = 0; i < 1000; i++) {
			ts[i] = new Thread(() -> {
				Random random = new Random();
				long value = random.nextLong();
				accumulator.accumulate(value); // 比较value和上一次的比较值,然后存储较大者
			});
			ts[i].start();
		}
		for (int i = 0; i < 1000; i++) {
			ts[i].join();
		}
		System.out.println(accumulator.longValue());
	}
}

从上面代码可以看出,accumulate(value)传入的值会与上一次的比较值对比,然后保留较大者,最后打印出最大值。


参考:《Java高并发程序设计》

代码段 小部件