CAS原理

CAS

第一节 简介

Compare and Swap比较并交换,设计并发算法时常用到的一种技术,JVM中的CAS操作通过处理器提供的 CMPXCHG 指令来实现(原子操作的实现原理)。

CAS有三个操作数:内存值V旧的预期值A要修改的值B当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做并返回false

普通的比较和CAS的区别主要是保证了原子性,前者在并发执行时是不安全的,原因和并发和同步中的银行例子一样,当判断生效后线程挂起,其它线程执行导致线程下次执行时状态已经不符合条件判断,但还是执行并返回了修改的结果。所以需要像锁机制那样给代码块加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public int v = 1;
public int a = 1;

public boolean compareAndSwapInt(int b){//普通的比较
if (a == v){
v = b;
return true;
}
return false;
}

/**
* 比较obj的offset处内存位置中的值和期望的值,如果相同则更新。此更新是不可中断的。
*
* @param obj 需要更新的对象
* @param offset obj中整型field的偏移量
* @param expect 希望field中存在的值
* @param update 如果期望值expect与field的当前值相同,设置filed的值为这个新值
* @return 如果field的值被更改返回true
*/
@HotSpotIntrinsicCandidate
public final native boolean compareAndSetInt(Object o, long offset,
int expected,
int x);

CAS保证了比较和交换是一组原子操作,不会被中断执行。因为CAS是硬件级别的操作,所以效率比对普通的比较加锁要高一些。


第二节 AtomicInteger

java.util.concurrent.atomic 并发包下的原子操作类都是基于CAS实现的,有 AtomicIntegerAtomicBooleanAtomicLong

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class AtomicInteger extends Number implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();//Unsafe类
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));//对象域偏移
} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");//对象域偏移
private volatile int value;

public final int get() {
return value;
}
public final void set(int newValue) {
value = newValue;
}

public final int addAndGet(int delta) {
return U.getAndAddInt(this, VALUE, delta) + delta;
}

public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}

public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

....
}

第三节 CAS实例-线程安全的计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class CAS {
//原子包装的整型
private AtomicInteger atomicI = new AtomicInteger(0);
private int i = 0;

/**
* 使用CAS实现线程安全计数器
*/
private void safeCount() {
for (;;) {
//获取原子包装的值
int i = atomicI.get();
//CAS更新值
boolean suc = atomicI.compareAndSet(i, ++i);//预期值i,更新值++i
if (suc) {//执行成功则跳出,否则循环尝试
break;
}
}
}

/**
* 非线程安全计数器
*/
private void count() {
i++;
}

public static void main(String[] args) {
final CAS cas = new CAS();
//线程集合
List<Thread> ts = new ArrayList<Thread>(600);
//开始时间
long start = System.currentTimeMillis();
//遍历线程集合,构建每个线程,每个线程都分别调用线程安全和不安全的计数器
for (int j = 0; j < 100; j++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {//分别调用安全和不安全的计数器
cas.count();
cas.safeCount();
}
}
});
ts.add(t);
}
//遍历执行所有线程
for (Thread t : ts) {
t.start();
}
//等待所有线程执行完成
for (Thread t : ts) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//打印最终结果
System.out.println(cas.i);
System.out.println(cas.atomicI.get());
System.out.println(System.currentTimeMillis() - start);
}
}

public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

//执行结果:
991833
1000000
44

试着分析为何出现以上结果?

对于 count() 假设v初始为1,线程1和线程2各自读取并处理v的副本。线程1执行后挂起,线程2执行并将结果写回内存,线程1计算完毕后副本值覆盖2值,自然导致数据异常。

对于 safeCount() 假设v初始为1,线程1和线程2各自读取并处理v的副本。

线程1执行到 compareAndSet(v, v + delta) ,线程切换。

线程2执行 compareAndSet(v, v + delta) ,此时内存v为1,执行成功,所以更新为2,继续执行v更新为3,线程切换。

线程1恢复,执行 compareAndSet 发现内存为3,自己expected为1,所以执行失败返回false。

循环执行到 atomicI.get() 线程2的修改对于线程1来说是可见的,所以线程1可以了解线程2修改了v并已经结束,所以v更新为3,并继续执行 compareAndSet ,成功后v更新为4。

以上过程不管有多少线程插队执行,CAS都保证了线程最终要在最新的内存数据上进行修改,保证了线程安全。

CAS并不是完美的,因为其终究是根据值是否修改来判断是否有进行操作,当出现ABA操作导致值未变化也会被CAS认为是未被修改过,当然在大部分使用CAS的场景下,ABA就可以被当作未修改来处理,如果需要区分时CAS也就不一定是此场景的最优选择了。


第四节 CAS实现原子操作的三大问题

Java的并发包中有些并发框架也使用了自旋CAS来实现原子操作,如 LinkedTransferQueueXfer() 方法,CAS常用来实现乐观锁,即每次的数据操作都是CAS,只有判断当前执行过程中没有其它线程对数据进行修改时才将结果存入。

4.1 ABA问题

CAS操作值时要检查值有没有变化,所以如果值由A->B->A仍可以通过检测,实际是发生过变化的。

解决方法:如加入修改次数,版本号。1A->2B->3A。

Java 1.5后可以通过Atomic包中的 AtomicStampedReference 解决ABA问题。compareAndSet() 会先检查当前引用是否等于预期引用,并检查当前标志是否等于预期标志,如果都相等,则以原子方式将引用和标志设置为给定的更新值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean compareAndSet(V   expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}

4.2 循环时间长开销大

自旋CAS长时间不能执行成功,会给CPU带来巨大的性能开销。

解决办法:JVM支持处理器提供的pause指令。

pause指令有两个作用:

  1. 延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间根据具体的实现决定。
  2. 避免在退出循环时因为内存顺序冲突(Memory Order Violation)引起CPU流水线被清空(CPU Pipeline Flush),从而提高了CPU的执行效率。

4.3 只能保证一个共享变量的原子操作

当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性。

解决办法:可以用锁,或者是把多个共享变量合并成一个共享变量来操作。

比如,有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。

从Java 1.5开始,JDK提供了AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。


参考博客和文章书籍等:

《Java并发编程的艺术》

因博客主等未标明不可引用,若部分内容涉及侵权请及时告知,我会尽快修改和删除相关内容