为了给公司新来的妹子说清楚线程之间的通信,我用了8种方法! (qq.com)
4年工作经验,多线程间的5种通信方式都说不出来,你敢信? (qq.com)
线程间通信大致可以分为3种:
- 通过共享对象,即共享内存进行通信
- 等待通知机制,(利用 wait(),notify(),notifyAll(),Lock,以及 AQS 等)
- 利用管道输入,输出流。
Talk is cheap, show me the code.
一、使用 volatile 关键字
基于 volatile 关键字来实现线程间相互通信是使用共享内存的思想。大致意思就是多个线程同时监听一个变量,当这个变量发生变化的时候 ,线程能够感知并执行相应的业务。这也是最简单的一种实现方式。
public class TestSync {
enum ThreadRunFlag{PRINT_NUM, PRINT_CHAR}
private volatile static ThreadRunFlag threadRunFlag = ThreadRunFlag.PRINT_NUM;
public static void main(String[] args) {
new Thread(() -> {
int i = 1;
while (i <= 26) {
while(threadRunFlag == ThreadRunFlag.PRINT_CHAR){}
System.out.print(i+"-");
i++;
threadRunFlag = ThreadRunFlag.PRINT_CHAR;
}
}).start();
new Thread(()->{
char i = 'A';
while (i <= 'Z'){
while (threadRunFlag == ThreadRunFlag.PRINT_NUM){}
System.out.print(i+"-");
i++;
threadRunFlag = ThreadRunFlag.PRINT_NUM;
}
}).start();
}
}
1-A-2-B-3-C-4-D-5-E-6-F-7-G-8-H-9-I-10-J-11-K-12-L-13-M-14-N-15-O-16-P-17-Q-18-R-19-S-20-T-21-U-22-V-23-W-24-X-25-Y-26-Z-
二、使用 Object 类的 wait()/notify()
Object 类提供了线程间通信的方法:wait()
、notify()
、notifyAll()
,它们是多线程通信的基础,而这种实现方式的思想自然是线程间通信。
注意:wait/notify
必须配合 synchronized
使用,wait 方法释放锁,notify 方法不释放锁。wait 是指在一个已经进入了同步锁的线程内,让自己暂时让出同步锁,以便其他正在等待此锁的线程可以得到同步锁并运行,只有其他线程调用了notify()
,notify并不释放锁,只是告诉调用过wait()
的线程可以去参与获得锁的竞争了,但不是马上得到锁,因为锁还在别人手里,别人还没释放,调用 wait()
的一个或多个线程就会解除 wait 状态,重新参与竞争对象锁,程序如果可以再次得到锁,就可以继续向下运行。
public class TestSync {
public static void main(String[] args) {
//定义一个锁对象
Object lock = new Object();
List<Integer> list = new ArrayList<>();
// 线程A
Thread threadA = new Thread(() -> {
synchronized (lock) {
for (int i = 1; i <= 10; i++) {
list.add(i);
System.out.println("线程A添加元素,此时list的size为:" + list.size());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (list.size() == 5)
lock.notify();//唤醒B线程
}
}
});
//线程B
Thread threadB = new Thread(() -> {
while (true) {
synchronized (lock) {
if (list.size() != 5) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("线程B收到通知,开始执行自己的业务...");
}
}
});
//需要先启动线程B
threadB.start();
//再启动线程A
threadA.start();
}
}
线程A添加元素,此时list的size为:1 线程A添加元素,此时list的size为:2 线程A添加元素,此时list的size为:3 线程A添加元素,此时list的size为:4 线程A添加元素,此时list的size为:5 线程A添加元素,此时list的size为:6 线程A添加元素,此时list的size为:7 线程A添加元素,此时list的size为:8 线程A添加元素,此时list的size为:9 线程A添加元素,此时list的size为:10 线程B收到通知,开始执行自己的业务...
由输出结果,在线程 A 发出 notify()
唤醒通知之后,依然是走完了自己线程的业务之后,线程 B 才开始执行,正好说明 notify()
不释放锁,而 wait()
释放锁。
三、使用 ReentrantLock 结合 Condition
通过使用Lock,Condition的 signal() 和 await() 来进行换新阻塞交替打印 1-26 与 A-Z。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestSync {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
new Thread(() -> {
try {
lock.lock();
int i = 1;
while (i <= 26) {
System.out.println(i);
i++;
condition2.signal();
condition1.await();
}
condition2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
char i = 'A';
while (i <= 'Z') {
System.out.println(i);
i++;
condition1.signal();
condition2.await();
}
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
}
}
1 A 2 B 3 C ...
这种方式使用起来并不是很好,代码编写复杂,而且线程 B 在被 A 唤醒之后由于没有获取锁还是不能立即执行,也就是说,A 在唤醒操作之后,并不释放锁。
这种方法跟 Object 的 wait()/notify()
一样。
四、使用 JUC 工具类 CountDownLatch
CountDownLatch是 JDK1.5 之后 JUC 中的一个并发工具类,它提供了一种在多个线程之间进行协调的机制。CountDownLatch通过一个计数器来实现,该计数器初始化为一个正整数,每当一个线程完成了自己的任务时,计数器的值就会减1。当计数器的值变为0时,所有等待的线程将被唤醒。
CountDownLatch的主要方法包括:
- 构造方法:CountDownLatch(int count):指定计数器的初始值。
- countDown():每当一个线程完成了任务,调用该方法将计数器的值减1。
- await():让当前线程等待,直到计数器的值变为0。
使用CountDownLatch的典型场景是,一个任务需要等待多个子任务都完成之后才能继续执行,可以按照以下步骤进行操作:
- 创建一个CountDownLatch对象,指定计数器的初始值,通常为子任务的个数。
- 在父任务中创建多个子任务(线程),并将CountDownLatch对象传递给每个子任务。
- 在每个子任务中,执行自己的任务,完成后调用countDown()方法,将计数器的值减1。
- 在父任务中调用await()方法,使父任务等待,直到计数器的值变为0。
示例代码如下:
public class TestSync {
public static void main(String[] args) throws InterruptedException {
int numberOfThreads = 5;
CountDownLatch latch = new CountDownLatch(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
Thread thread = new Thread(new WorkerThread(latch), String.valueOf(i));
thread.start();
}
latch.await(); // 等待计数器变为0
System.out.println("所有线程完成任务了~");
}
static class WorkerThread implements Runnable {
private final CountDownLatch latch;
public WorkerThread(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
// 执行子任务
System.out.println(String.format("线程%s 开始执行",Thread.currentThread().getName()));
// 模拟任务执行时间
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("线程%s 执行任务完毕",Thread.currentThread().getName()));
latch.countDown(); // 计数器减1
}
}
}
线程4 开始执行 线程3 开始执行 线程1 开始执行 线程0 开始执行 线程2 开始执行 线程1 执行任务完毕 线程0 执行任务完毕 线程4 执行任务完毕 线程3 执行任务完毕 线程2 执行任务完毕 所有线程完成任务了~
五、基于 LockSupport 实现线程间的阻塞和唤醒
LockSupport 用来创建锁和其他同步类的基本线程阻塞。当调用LockSupport.park()
时,表示当前线程将会等待,直至获得许可,当调用LockSupport.unpark()
时,必须把等待获得许可的线程作为参数进行传递,好让此线程继续运行。
其中:
- park函数,阻塞线程,并且该线程在下列情况发生之前都会被阻塞:
① 调用unpark函数,释放该线程的许可。
② 该线程被中断。
③ 设置的时间到了。并且,当time为绝对时间时,isAbsolute 为 true,否则,isAbsolute 为 false。当 time 为 0 时,表示无限等待,直到 unpark 发生。
- unpark函数,释放线程的许可,即激活调用park后阻塞的线程。这个函数不是安全的,调用这个函数时要确保线程依旧存活。
public class TestSync {
private static Thread threadA = null;
private static Thread threadB = null;
public static void main(String[] args) {
threadA = new Thread(() -> {
int i = 1;
while (i <= 26) {
System.out.print(i+"-");
i++;
LockSupport.unpark(threadB);
LockSupport.park();
}
});
threadB = new Thread(() -> {
char i = 'A';
while (i <= 'Z') {
LockSupport.park();
System.out.print(i+"-");
i++;
LockSupport.unpark(threadA);
}
});
threadA.start();
threadB.start();
}
}
1-A-2-B-3-C-4-D-5-E-6-F-7-G-8-H-9-I-10-J-11-K-12-L-13-M-14-N-15-O-16-P-17-Q-18-R-19-S-20-T-21-U-22-V-23-W-24-X-25-Y-26-Z-
六、AtomicInteger
同样利用了 AtomicInteger 的并发特性,来完成交替打印。
public class TestSync {
private static AtomicInteger threadSignal = new AtomicInteger(1);
public static void main(String[] args) {
new Thread(() -> {
int i = 1;
while (i <= 26) {
while(threadSignal.get() == 2){}
System.out.print(i+" ");
i++;
threadSignal.set(2);
}
}).start();
new Thread(()->{
char i = 'A';
while (i <= 'Z'){
while (threadSignal.get() == 1){}
System.out.print(i+" ");
i++;
threadSignal.set(1);
}
}).start();
}
}
1 A 2 B 3 C 4 D 5 E 6 F 7 G 8 H 9 I 10 J 11 K 12 L 13 M 14 N 15 O 16 P 17 Q 18 R 19 S 20 T 21 U 22 V 23 W 24 X 25 Y 26 Z
七、CyclicBarrier
CyclicBarrier 的字面意思就是可循环使用的屏障,它可以让一组线程到达一个阻塞点(屏障)时被阻塞。直到最后一个线程到达阻塞点后,屏障才会开门,然后所有被拦截的线程就可以继续运行。
public class TestSync {
private static final int THREAD_COUNT = 3;
public static void main(String[] args) {
// 创建一个 CyclicBarrier,指定等待的线程数量
CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
// 所有线程都到达 barrier 后执行的操作
System.out.println("所有线程都OK!");
});
for (int i = 0; i < THREAD_COUNT; i++) {
Thread thread = new Thread(new Worker(barrier));
thread.start();
}
}
static class Worker implements Runnable {
private CyclicBarrier barrier;
public Worker(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println("线程" + Thread.currentThread().getName() + "正在执行任务");
Thread.sleep(1000);
System.out.println("线程" + Thread.currentThread().getName() + "执行任务完毕,等待其他线程");
// 等待其他线程都到达 barrier
barrier.await();
System.out.println("线程" + Thread.currentThread().getName() + "继续执行下一步操作");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
线程Thread-1正在执行任务 线程Thread-0正在执行任务 线程Thread-2正在执行任务 线程Thread-0执行任务完毕,等待其他线程 线程Thread-1执行任务完毕,等待其他线程 线程Thread-2执行任务完毕,等待其他线程 所有线程都OK! 线程Thread-1继续执行下一步操作 线程Thread-0继续执行下一步操作 线程Thread-2继续执行下一步操作
八、Semaphore
使用信号量(Semaphore)可以实现线程间的同步与通信。信号量是一个计数器,用于控制对共享资源的访问。
Semaphone管理着一组“虚拟”的许可(permit),许可的初始数量可通过构造函数来指定。操作之前可以先获得许可,并在操作结束之后释放许可。
acquire 方法需要消耗一个许可,如果没有许可acquire将阻塞直到有许可(除非被中断、或者超时)可用。release方法则会添加一个许可。这里的许可,是一个虚拟的概念,并不存在一个名为“Permit"的对象,Semaphore只是记录可用的许可数量并执行相应的操作。
public class TestSync {
// 创建一个计数器为 3 的信号量对象
static Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) {
// 创建 10 个线程
for (int i = 1; i <= 10; i++) {
Thread thread = new Thread(new WorkerThread(i));
thread.start();
}
}
static class WorkerThread implements Runnable {
private int threadNumber;
public WorkerThread(int threadNumber) {
this.threadNumber = threadNumber;
}
@Override
public void run() {
try {
// 尝试获取一个资源
semaphore.acquire();
// 执行任务
System.out.println("Thread " + threadNumber + " 开始执行.");
// 模拟任务执行时间
Thread.sleep(800);
// 释放资源
System.out.println("Thread " + threadNumber + " 释放资源咯 ~ ");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Thread 1 开始执行. Thread 3 开始执行. Thread 2 开始执行. Thread 2 释放资源咯 ~ Thread 4 开始执行. Thread 1 释放资源咯 ~ Thread 3 释放资源咯 ~ Thread 5 开始执行. Thread 8 开始执行. Thread 5 释放资源咯 ~ Thread 4 释放资源咯 ~ Thread 8 释放资源咯 ~ Thread 6 开始执行. Thread 9 开始执行. Thread 7 开始执行. Thread 6 释放资源咯 ~ Thread 9 释放资源咯 ~ Thread 7 释放资源咯 ~ Thread 10 开始执行. Thread 10 释放资源咯 ~
可以看到同一个时刻最多只有3线程在执行
九、利用 Piped Stream
使用Stream中的Piped Stream分别控制输出,但是其运行速度极慢。
public class TestSync {
private final PipedInputStream inputStream1;
private final PipedOutputStream outputStream1;
private final PipedInputStream inputStream2;
private final PipedOutputStream outputStream2;
private final byte[] MSG;
public TestSync() {
inputStream1 = new PipedInputStream();
outputStream1 = new PipedOutputStream();
inputStream2 = new PipedInputStream();
outputStream2 = new PipedOutputStream();
MSG = "Go".getBytes();
try {
inputStream1.connect(outputStream2);
inputStream2.connect(outputStream1);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
TestSync signal = new TestSync();
signal.threadA().start();
signal.threadB().start();
}
public Thread threadA() {
final String[] inputArr = new String[2];
return new Thread() {
String[] arr = inputArr;
PipedInputStream in1 = inputStream1;
PipedOutputStream out1 = outputStream1;
@Override
public void run() {
int i = 1;
while (i <= 26) {
try {
System.out.print(i + " ");
out1.write(MSG);
byte[] inArr = new byte[2];
in1.read(inArr);
while (!"Go".equals(new String(inArr))) {
}
i++;
} catch (IOException e) {
e.printStackTrace();
}
}
}
};
}
public Thread threadB() {
final String[] inputArr = new String[2];
return new Thread() {
private String[] arr = inputArr;
private PipedInputStream in2 = inputStream2;
private PipedOutputStream out2 = outputStream2;
@Override
public void run() {
char i = 'A';
while (i <= 'Z') {
try {
byte[] inArr = new byte[2];
in2.read(inArr);
while (!"Go".equals(new String(inArr))) {
}
System.out.print(i + " ");
i++;
out2.write(MSG);
} catch (IOException e) {
e.printStackTrace();
}
}
}
};
}
}
1 A 2 B 3 C 4 D 5 E 6 F 7 G 8 H 9 I 10 J 11 K 12 L 13 M 14 N 15 O 16 P 17 Q 18 R 19 S 20 T 21 U 22 V 23 W 24 X 25 Y 26 Z