并发和同步

一、并发

 首先思考一个问题,并发和并行在概念上有何区别?

 并发描述场景:多个用户交替使用同一资源;并行描述场景:多个用户同时使用多个资源;而串行则是描述:多个用户同时使用同一资源,前者未用完后者只能一直等待。

并发编程的三个问题

  • 原子性:即一个或多个操作,要么全部执行,要么全不执行。保证一系列操作的原子性,要么加锁,要么通过CAS即实现乐观锁的方法,在修改时比较此时数据是否和之前读的时候一致。
  • 可见性:多个线程访问同一个变量时,一个线程修改了此变量,其他线程也能立即看到最新的值。
  • 有序性:即程序执行的顺序按代码的先后顺序,JVM执行代码时可能会因为指令重排序导致执行顺序和代码顺序不一致,但JVM会保证最终结果一致,JVM会让指令依赖的上一条指令先执行,而不会在上条执行前执行依赖于其的指令。但多线程时可能会有问题,某一指令并不依赖于任何指令,但其执行顺序如果和编程者所安排的不一致,可能会导致逻辑错误,在不该执行的时候执行,不该终止的时候终止。

二、同步

 在实际应用场景中,多个线程往往需要共享同一数据的存取,当多个线程都有对数据进行修改就会导致数据异常,这种情况叫竞争条件(race condition)。对这些竞争行为进行管控和设计来保证数据的正确性,也就是同步。

案例

 假设我们需要实现一个银行类来存储用户账户集合,并通过多线程环境不断进行存取金额操作,那么若运行正确总金额应该一直保持不变。

以下测试代码来自corejava源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 银行类
**/
public class Bank{
private final double[] accounts;//用户账户集合

public Bank(int n, double initialBalance){
accounts = new double[n];
Arrays.fill(accounts, initialBalance);
}

public void transfer(int from, int to, double amount){//存取操作
if (accounts[from] < amount) return;//额度不足,直接返回
System.out.print(Thread.currentThread());
accounts[from] -= amount;//源账户减去对应额度
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;//目标账户增加对应额度
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
}

public double getTotalBalance(){//获取当前总金额
double sum = 0;
for (double a : accounts)
sum += a;
return sum;
}

public int size(){//获取账户数量
return accounts.length;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 多线程测试类
**/
public class UnsynchBankTest{
public static final int NACCOUNTS = 100;
public static final double INITIAL_BALANCE = 1000;
public static final double MAX_AMOUNT = 1000;
public static final int DELAY = 10;

public static void main(String[] args){
Bank bank = new Bank(NACCOUNTS, INITIAL_BALANCE);
for (int i = 0; i < NACCOUNTS; i++){
int fromAccount = i;
Runnable r = () -> {
try{
while (true){
//取随机数
int toAccount = (int) (bank.size() * Math.random());
double amount = MAX_AMOUNT * Math.random();
//存取金额
bank.transfer(fromAccount, toAccount, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
}catch (InterruptedException e){
}
};
Thread t = new Thread(r);
t.start();
}
}
}

 运行结果如下,很明显总金额并不能保证一直正确。

1
2
3
4
5
6
7
8
...
Thread[Thread-23,5,main] 148.04 from 23 to 78 Total Balance: 100000.00
Thread[Thread-50,5,main] 584.19 from 50 to 22 Total Balance: 100000.00
Thread[Thread-84,5,main] 250.77 from 84 to 12 Total Balance: 100000.00
Thread[Thread-59,5,main] 664.33 from 59 to 32Thread[Thread-2,5,main] 723.99 from 2 to 4 Total Balance: 99335.67
Thread[Thread-27,5,main] 606.12 from 27 to 25 Total Balance: 99335.67
Thread[Thread-5,5,main] 647.61 from 5 to 76 Total Balance: 99335.67
...

 为何总金额会有异常状态呢?

 回顾上面所谈到的概念:并发特性。测试例子中对于账户的更新操作是:

1
2
accounts[from] -= amount;
accounts[to] += amount;

 此指令并非一个原子操作,所以可以被分解为:1.将accounts[to]加载到寄存器。2.增加amount。3.将结果写回accounts[to]。

 假设有线程1和2同时执行此指令,线程1执行步骤1和2,然后被剥离运行状态,线程2被唤醒并执行了这一系列指令,然后切换线程1执行完步骤3。所以线程1返回的结果会覆盖掉线程2的操作,导致最终金额不匹配

 为了避免这种情况,保证线程安全,需要满足并发特性,所以就有了锁机制

 既然因为线程不安全,导致并发执行时总金额异常,我们尝试改写测试例子中Bank.transfer()方法,增加以下锁机制:ReentrantLock和synchronized。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private Lock bankLock = new ReentrantLock();
//Lock
public void transfer(int from, int to, double amount) throws InterruptedException{
bankLock.lock();
try{
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
}finally{
bankLock.unlock();
}
}

public double getTotalBalance(){
bankLock.lock();
try{
double sum = 0;
for (double a : accounts)
sum += a;
return sum;
}finally{
bankLock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//synchronized
public synchronized void transfer(int from, int to, double amount) throws InterruptedException{
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
}

public synchronized double getTotalBalance(){
double sum = 0;
for (double a : accounts)
sum += a;
return sum;
}

 再运行观察结果,和修改之前进行对比。

 多个线程并发进行存取操作,当线程1被中断时,线程2因为无法获得锁而被阻塞在lock(),等待线程1执行完毕释放锁,线程2方能开始运行。通过锁机制,保证了多线程执行时操作的原子性,从而使运行时总金额能保持正确性,虽然会损失掉一些性能。

 需要注意ReentrantLock锁,其锁的是对象,只有多线程调用同一对象的方法时才会阻塞,不然互不影响。

锁是可重入的,线程可以重复的获得已持有的锁,所以锁会实现一个计数器来根据对lock()的嵌套调用,线程每一次调用lock()都要执行unlock()来解锁,因为这个特性,所以被某个锁保护的代码可以调用使用同一个锁的方法。

 所以调用Bank.transfer()方法时,又调用了Bank.getTotalBalance()方法,所以此时计数为2,退出时计数退为1,当transfer()也执行结束,计数退为0,线程释放锁。


条件对象(条件变量)

 在线程获得锁后,但需要满足某些条件才能执行时,需要引入条件对象或叫条件变量

 在测试用例的银行实现中增加需求:转移资金需要满足当前余额大于转出额度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (accounts[from] >= amount){//若线程在判断通过之后,业务操作执行之前被中断,则很有可能会导致执行时状态其实并不满足条件约束
...
}

private Condition sufficientFunds;
sufficientFunds = bankLock.newCondition();

bankLock.lock();
try{
while (accounts[from] < amount)//当账户余额不足,等待直到另外一个账户向此账户注入资金,但此线程拥有锁,就会排斥其他线程的操作,所以需要条件对象Condition。
sufficientFunds.await();//此线程会阻塞,并放弃锁,这时其他线程就可以对此对象进行操作了。
...
sufficientFunds.signalAll();//此调用激活所有因为条件而阻塞的线程,其中某个会从await()返回,获得锁并继续执行。
}finally{
bankLock.unlock();
}

 所以我们需要在进行存取操作前,对账户余额进行一次条件判断,而不满足条件的线程需要执行await()放弃锁,并进入阻塞状态,而其他满足条件的线程执行signalAll()后被唤醒。调用await()阻塞的线程和等待锁阻塞的线程不同,前者进入条件的等待集合,当锁可用时,此线程不能解除阻塞,需要等到另一线程调用同个条件的signalAll()方法才行。

 阻塞在条件变量上的线程可以有多个,这些阻塞线程会被串联成一个条件等待队列。当signalAll()被调用时,会唤醒所有的阻塞线程,让所有的阻塞线程重新开始争抢锁。如果调用的是signal()只会唤醒队列头部的线程,这样可以避免「惊群问题」。

 await()方法必须立即释放锁,否则临界区状态就不能被其它线程修改,condition_is_true()返回的结果也就不会改变。这也是为什么条件变量必须由锁对象来创建,条件变量需要持有锁对象的引用这样才可以释放锁以及被signal唤醒后重新加锁。创建条件变量的锁必须是排他锁,如果是共享锁被await()方法释放了并不能保证临界区的状态可以被其它线程来修改,可以修改临界区状态的只能是排他锁。这也是为什么ReadWriteLock.ReadLock类的newCondition方法定义如下。

1
2
3
public Condition newCondition() {
throw new UnsupportedOperationException();
}

 那么因为条件而阻塞的线程只能寄希望于有其他线程来帮助他脱离这一情况,否则他永远都不会再运行了,这就会导致死锁。

 所以我们可以引入Condition条件来解决我们新的需求,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//Lock
public void transfer(int from, int to, double amount) throws InterruptedException{
bankLock.lock();
try{
while (accounts[from] < amount)
sufficientFunds.await();
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
sufficientFunds.signalAll();
}finally{
bankLock.unlock();
}
}

public double getTotalBalance(){
bankLock.lock();
try{
double sum = 0;
for (double a : accounts)
sum += a;
return sum;
}finally{
bankLock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//synchronized
public synchronized void transfer(int from, int to, double amount) throws InterruptedException{
while (accounts[from] < amount)
wait();
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
notifyAll();
}

public synchronized double getTotalBalance(){
double sum = 0;
for (double a : accounts)
sum += a;
return sum;
}

总结上述内容:

  • 锁用来保护代码块
  • 锁用来管理试图进入被保护区域的线程
  • 锁拥有一个或多个条件对象
  • 每个条件对象管理获得锁后还不能执行的线程

 条件等待队列可以参考条件等待队列


参考博客和文章书籍等:

《Java核心技术 卷Ⅰ》

因博客主等未标明不可引用,若部分内容涉及侵权请及时告知,我会尽快修改和删除相关内容