线程间通信

线程间通信

线程拥有自己的栈空间,线程并不是孤立的运行,除了公共区域,线程间也要进行协作。

一. volatile和synchronized

Java支持多个线程同时访问一个对象或者对象的成员变量,由于各个线程各自拥有拷贝,所以线程看到和处理的变量未必是最新值。

  • volatile关键字可以保证线程访问变量的可见性,但会降低运行效率。
  • synchronized关键字保证了线程访问变量的可见性和排他性。synchronized锁同步块通过 monitorentermonitorexit 指令,同步方法则是通过方法修饰符的 ACC_SYNCHRONIZED 来实现。无论是哪种方法,本质都是获取对象的监视器(monitor),获取过程是排他的,只有一个线程可以拿到synchronized保护对象的监视器。

更多内容可以参考volatilesynchronized


二. 等待/通知机制

“生产者”线程修改了一个对象的值,”消费者”线程会感知到变化,最简单的实现就是让消费者线程不断地循环检查变量是否符合预期,如下所示。

1
2
3
4
while (value != desire){ // 满足条件则跳出循环
Thread.sleep(1000);
}
doSomething();

但以上方式存在许多问题:

  1. 难以保证及时性,睡眠时间过长无法及时发现条件已修改,过短则会进行过多无效的尝试。
  2. 难以降低开销,通过降低睡眠时间会导致消耗更多的处理器资源。

等待/通知机制则有效的解决了上述问题,并设计在Java的Object对象上。

回顾一下Object的 notify()wait() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 通知一个在对象上等待的线程,使其从wait()方法返回,返回的前提是该线程已获取了对象的锁
@HotSpotIntrinsicCandidate
public final native void notify();

// 通知所有等待在该对象上的线程
@HotSpotIntrinsicCandidate
public final native void notifyAll();

// 调用该方法的线程进入WAITING状态,只有等待另外线程的通知或被中断才会返回,在调用此方法后会释放对象的锁
public final void wait() throws InterruptedException {
wait(0L);
}

// 等待给定时间后若无通知,则超时返回
public final native void wait(long timeout) throws InterruptedException;

public final void wait(long timeout, int nanos) throws InterruptedException {
if (timeout < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (nanos < 0 || nanos > 999999) {
throw new IllegalArgumentException(
"nanosecond timeout value out of range");
}

if (nanos > 0) {
timeout++;
}

wait(timeout);
}
  • **notify()**:唤醒正在监听此对象的监视线程,同时多个线程等待时随机唤醒一个,选择是任意的可以由执行部分来决定,线程通过调用 wait() 等待对象,在当前线程放弃对象锁之前被唤醒的线程只能继续等待。唤醒的线程将和其它任意线程竞争并没有优先权力。
  • **wait()**:使当前线程等待,直到它被唤醒,当前线程需要拥有此对象的监视器锁。此方法会将当前线程放入此对象的等待集合中,放弃在此对象上的同步声明(只是此对象的锁),当线程等待时,所有可以同步此线程的任何其它对象都会保持锁定状态。(因为只能通过synchronized来获取监视器锁,所以这几个方法应该在同步代码块内调用,否则会抛出异常 java.lang.IllegalMonitorStateException )。

出于线程调度的目的,线程将会被禁用,并处于休眠状态,直到被 notify() 任意选择中此线程唤醒,或其它线程调用 notifyAll() 唤醒,或其它线程调用 interrupt() 中断了当前线程,或指定的等待时间不为0且已到指定时间。

等待/通知机制就是指一个线程A调用了对象O的 wait() 进入等待状态,另一个线程B调用了对象O的 notify()notifyAll() ,线程A收到通知后从对象O的 wait() 返回,进而执行后续操作,两个线程通过对象O作为中介进行交互

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class WaitNotify {
static boolean flag = true;
static Object lock = new Object();

public static void main(String[] args)throws Exception{
Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
TimeUnit.SECONDS.sleep(1);
Thread notifyThread = new Thread(new Notify(),"NotifyThread");
notifyThread.start();
}

static class Wait implements Runnable{
@Override
public void run(){
// 加锁,拥有lock的monitor
synchronized (lock){
// 当条件不满足时,继续wait,同时释放了lock的锁
while (flag){
try {
System.out.println(Thread.currentThread() + " flag is true. wait @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.wait();
}catch (InterruptedException ex){
ex.printStackTrace();
}
}
// 条件满足时,完成工作
System.out.println(Thread.currentThread() + " flag is false. running @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
}

static class Notify implements Runnable{
@Override
public void run(){
// 加锁,拥有lock的monitor
synchronized (lock){
// 获取lock的锁,然后进行通知,通知时不会释放lock的锁
// 直到当前线程释放了lock后,WaitThread才能从wait()中返回
System.out.println(Thread.currentThread() + " hold lock. notify @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.notifyAll();
flag = false;
SleepUtils.second(5);
}
// 再次加锁
synchronized (lock){
// 当条件不满足时,继续wait,同时释放了lock的锁
System.out.println(Thread.currentThread() + " hold lock again. sleep @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
SleepUtils.second(5);
}

}
}
}

public class SleepUtils {
public static final void second(long seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
}catch (InterruptedException ex){
ex.printStackTrace();
}
}
}

运行结果如下。

1
2
3
4
Thread[WaitThread,5,main] flag is true. wait @ 18:15:32
Thread[NotifyThread,5,main] hold lock. notify @ 18:15:33
Thread[NotifyThread,5,main] hold lock again. sleep @ 18:15:38
Thread[WaitThread,5,main] flag is false. running @ 18:15:43

运行步骤:

  1. 调用wait(),notify()和notifyAll()时,需要先对调用对象加锁。

  2. 调用wait()后,线程由RUNNING转为WAITING,并将当前线程放置到对象的等待队列。

  3. notify()和notifyAll()调用后,等待线程依旧不会从wait()返回,需要等调用notify()和notifyAll()的线程释放锁后,等待线程才有机会从wait()返回

  4. notify()方法将等待队列中的一个等待线程转移到同步队列中,而notifyAll()则将等待队列所有线程转移到同步队列中,被移动的线程状态由WAITING转为BLOCKED。

  5. 从wait()方法返回的前提是获取了调用对象的锁。

等待/通知机制依托于同步机制,目的就是确保等待线程从 wait() 返回时能够感知到通知线程对变量做出的修改。

WaitNotify.java运行过程

WaitThread线程和NotifyThread线程模拟了线程间等待通知机制流程,WaitThread首先获取了对象的锁,然后调用了对象的wait()方法,从而放弃锁并进入对象的等待队列WaitQueue中,进入等待状态。由于WaitThread释放了对象的锁,NotifyThread随后获取了对象的锁,并调用对象的notify()方法,将WaitThread从WaitQueue移到SynchronizedQueue中,此时WaitThread的状态变为阻塞状态。NotifyThread释放了锁之后,WaitThread再次获取到锁并从wait()方法返回继续执行。


三. 等待/通知机制的经典范式

3.1 等待方-消费者

遵守如下原则:

  1. 获取对象的锁。
  2. 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
  3. 条件满足则执行对应的逻辑。

伪代码:

1
2
3
4
5
6
synchronized(对象){
while(条件不满足){
对象.wait();
}
对应的处理逻辑
}

3.2 通知方-生产者

遵守如下原则:

  1. 获取对象的锁。
  2. 改变条件。
  3. 通知所有等待在对象上的线程。

伪代码:

1
2
3
4
synchronized(对象){
改变条件
对象.notifyAll();
}

四. 管道输入/输出流

管道输入/输出流和文件输入/输出流或者网络输入/输出流的不同在于它主要用于线程之间的数据传输,传输的媒介为内存。

  1. PipedOutputStream 面向字节
  2. PipedInputStream 面向字节
  3. PipedReader 面向字符
  4. PipedWriter 面向字符
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Piped {
public static void main(String[] args)throws Exception{
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
//将输入输出流进行连接,否则在使用时会抛出IOException
out.connect(in);
Thread printThread = new Thread(new Print(in),"PrintThread");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1){
out.write(receive);
}
}finally {
out.close();
}
}

static class Print implements Runnable{
private PipedReader in;
public Print(PipedReader in){
this.in = in;
}

@Override
public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1){
//char转asc码
System.out.print((char) receive);
}
}catch (IOException ex){
ex.printStackTrace();
}
}
}
}

运行结果如下。main线程接收console的输入字符串,通过PipedWriter写入,然后PrintThread则通过PipedReader读出内容并打印。

1
2
3
4
test word
test word
ok ok ok
ok ok ok

五. join()

当有线程A执行了 thread.join() ,表示线程A需要等待thread线程终止后才能从 join() 返回。等待前驱线程结束,接收前驱线程的结束通知即等待通知机制。

更多内容请参考Thread类源码剖析


六. ThreadLocal

ThreadLocal,即线程变量,以ThreadLocal为键、以任意对象为值得存储结构。所以可以根据ThreadLocal给线程绑定值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Profiler {
// 第一次get()调用会进行初始化(若没有调用set()),每个线程会调用一次
private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>(){
@Override
protected Long initialValue() {
return System.currentTimeMillis();
}
};

public static final void begin(){
TIME_THREADLOCAL.set(System.currentTimeMillis());
}

public static final long end(){
return System.currentTimeMillis() - TIME_THREADLOCAL.get();
}

public static void main(String[] args)throws Exception{
Profiler.begin();
TimeUnit.SECONDS.sleep(2);
System.out.println("Cost: " + Profiler.end() + " mills");
}
}

运行结果如下。

1
Cost: 2001 mills

Profiler可以用来计算函数耗时,在函数调用前执行 begin() ,调用后执行 end() ,而且两次调用不用再一个方法或类内,比如AOP编程中,在调用前切入点执行 begin() ,在调用后切入点执行 end()


参考:

🔗《Java并发编程的艺术》