Java实现线程间通信的9种手法SHOW
  CFu9A7vdykDj 2023年11月02日 29 0

为了给公司新来的妹子说清楚线程之间的通信,我用了8种方法! (qq.com)

4年工作经验,多线程间的5种通信方式都说不出来,你敢信? (qq.com)

线程间通信大致可以分为3种:

  1. 通过共享对象,即共享内存进行通信
  2. 等待通知机制,(利用 wait(),notify(),notifyAll(),Lock,以及 AQS 等)
  3. 利用管道输入,输出流。

Talk is cheap, show me the code.

一、使用 volatile 关键字

基于 volatile 关键字来实现线程间相互通信是使用共享内存的思想。大致意思就是多个线程同时监听一个变量,当这个变量发生变化的时候 ,线程能够感知并执行相应的业务。这也是最简单的一种实现方式。

public class TestSync {
    enum ThreadRunFlag{PRINT_NUM, PRINT_CHAR}
    private volatile static ThreadRunFlag threadRunFlag = ThreadRunFlag.PRINT_NUM;

    public static void main(String[] args) {

        new Thread(() -> {
            int i = 1;
            while (i <= 26) {
                while(threadRunFlag == ThreadRunFlag.PRINT_CHAR){}
                System.out.print(i+"-");
                i++;
                threadRunFlag = ThreadRunFlag.PRINT_CHAR;
            }
        }).start();

        new Thread(()->{
            char i = 'A';
            while (i <= 'Z'){
                while (threadRunFlag == ThreadRunFlag.PRINT_NUM){}
                System.out.print(i+"-");
                i++;
                threadRunFlag = ThreadRunFlag.PRINT_NUM;
            }
        }).start();

    }
}

1-A-2-B-3-C-4-D-5-E-6-F-7-G-8-H-9-I-10-J-11-K-12-L-13-M-14-N-15-O-16-P-17-Q-18-R-19-S-20-T-21-U-22-V-23-W-24-X-25-Y-26-Z-

二、使用 Object 类的 wait()/notify()

Object 类提供了线程间通信的方法:wait()notify()notifyAll(),它们是多线程通信的基础,而这种实现方式的思想自然是线程间通信。

注意:wait/notify 必须配合 synchronized 使用,wait 方法释放锁,notify 方法不释放锁。wait 是指在一个已经进入了同步锁的线程内,让自己暂时让出同步锁,以便其他正在等待此锁的线程可以得到同步锁并运行,只有其他线程调用了notify(),notify并不释放锁,只是告诉调用过wait()的线程可以去参与获得锁的竞争了,但不是马上得到锁,因为锁还在别人手里,别人还没释放,调用 wait() 的一个或多个线程就会解除 wait 状态,重新参与竞争对象锁,程序如果可以再次得到锁,就可以继续向下运行。

public class TestSync {
    public static void main(String[] args) {
        //定义一个锁对象
        Object lock = new Object();
        List<Integer> list = new ArrayList<>();
        // 线程A
        Thread threadA = new Thread(() -> {
            synchronized (lock) {
                for (int i = 1; i <= 10; i++) {
                    list.add(i);
                    System.out.println("线程A添加元素,此时list的size为:" + list.size());
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (list.size() == 5)
                        lock.notify();//唤醒B线程
                }
            }
        });
        //线程B
        Thread threadB = new Thread(() -> {
            while (true) {
                synchronized (lock) {
                    if (list.size() != 5) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("线程B收到通知,开始执行自己的业务...");
                }
            }
        });
        //需要先启动线程B
        threadB.start();

        //再启动线程A
        threadA.start();
    }
}

线程A添加元素,此时list的size为:1 线程A添加元素,此时list的size为:2 线程A添加元素,此时list的size为:3 线程A添加元素,此时list的size为:4 线程A添加元素,此时list的size为:5 线程A添加元素,此时list的size为:6 线程A添加元素,此时list的size为:7 线程A添加元素,此时list的size为:8 线程A添加元素,此时list的size为:9 线程A添加元素,此时list的size为:10 线程B收到通知,开始执行自己的业务...

由输出结果,在线程 A 发出 notify() 唤醒通知之后,依然是走完了自己线程的业务之后,线程 B 才开始执行,正好说明 notify() 不释放锁,而 wait() 释放锁。

三、使用 ReentrantLock 结合 Condition

通过使用Lock,Condition的 signal() 和 await() 来进行换新阻塞交替打印 1-26 与 A-Z。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestSync {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition condition1 = lock.newCondition();
        Condition condition2 = lock.newCondition();
        new Thread(() -> {
            try {
                lock.lock();
                int i = 1;
                while (i <= 26) {
                    System.out.println(i);
                    i++;
                    condition2.signal();
                    condition1.await();
                }
                condition2.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }).start();

        new Thread(() -> {
            try {
                lock.lock();
                char i = 'A';
                while (i <= 'Z') {
                    System.out.println(i);
                    i++;
                    condition1.signal();
                    condition2.await();
                }
                condition1.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }

        }).start();
    }
}

1 A 2 B 3 C ...

这种方式使用起来并不是很好,代码编写复杂,而且线程 B 在被 A 唤醒之后由于没有获取锁还是不能立即执行,也就是说,A 在唤醒操作之后,并不释放锁。

这种方法跟 Object 的 wait()/notify() 一样。

四、使用 JUC 工具类 CountDownLatch

CountDownLatch是 JDK1.5 之后 JUC 中的一个并发工具类,它提供了一种在多个线程之间进行协调的机制。CountDownLatch通过一个计数器来实现,该计数器初始化为一个正整数,每当一个线程完成了自己的任务时,计数器的值就会减1。当计数器的值变为0时,所有等待的线程将被唤醒。

CountDownLatch的主要方法包括:

  1. 构造方法:CountDownLatch(int count):指定计数器的初始值。
  2. countDown():每当一个线程完成了任务,调用该方法将计数器的值减1。
  3. await():让当前线程等待,直到计数器的值变为0。

使用CountDownLatch的典型场景是,一个任务需要等待多个子任务都完成之后才能继续执行,可以按照以下步骤进行操作:

  1. 创建一个CountDownLatch对象,指定计数器的初始值,通常为子任务的个数。
  2. 在父任务中创建多个子任务(线程),并将CountDownLatch对象传递给每个子任务。
  3. 在每个子任务中,执行自己的任务,完成后调用countDown()方法,将计数器的值减1。
  4. 在父任务中调用await()方法,使父任务等待,直到计数器的值变为0。

示例代码如下:

public class TestSync {
    public static void main(String[] args) throws InterruptedException {
        int numberOfThreads = 5;
        CountDownLatch latch = new CountDownLatch(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            Thread thread = new Thread(new WorkerThread(latch), String.valueOf(i));
            thread.start();
        }

        latch.await();  // 等待计数器变为0

        System.out.println("所有线程完成任务了~");
    }

    static class WorkerThread implements Runnable {
        private final CountDownLatch latch;

        public WorkerThread(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            // 执行子任务
            System.out.println(String.format("线程%s 开始执行",Thread.currentThread().getName()));
            // 模拟任务执行时间
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(String.format("线程%s 执行任务完毕",Thread.currentThread().getName()));

            latch.countDown();  // 计数器减1
        }
    }
}

线程4 开始执行 线程3 开始执行 线程1 开始执行 线程0 开始执行 线程2 开始执行 线程1 执行任务完毕 线程0 执行任务完毕 线程4 执行任务完毕 线程3 执行任务完毕 线程2 执行任务完毕 所有线程完成任务了~

五、基于 LockSupport 实现线程间的阻塞和唤醒

LockSupport 用来创建锁和其他同步类的基本线程阻塞。当调用LockSupport.park()时,表示当前线程将会等待,直至获得许可,当调用LockSupport.unpark()时,必须把等待获得许可的线程作为参数进行传递,好让此线程继续运行。

其中:

  • park函数,阻塞线程,并且该线程在下列情况发生之前都会被阻塞:

​ ① 调用unpark函数,释放该线程的许可。

​ ② 该线程被中断。

​ ③ 设置的时间到了。并且,当time为绝对时间时,isAbsolute 为 true,否则,isAbsolute 为 false。当 time 为 0 时,表示无限等待,直到 unpark 发生。

  • unpark函数,释放线程的许可,即激活调用park后阻塞的线程。这个函数不是安全的,调用这个函数时要确保线程依旧存活。
public class TestSync {
    private static Thread threadA = null;
    private static Thread threadB = null;

    public static void main(String[] args) {
        threadA = new Thread(() -> {
            int i = 1;
            while (i <= 26) {
                System.out.print(i+"-");
                i++;
                LockSupport.unpark(threadB);
                LockSupport.park();
            }
        });
        threadB = new Thread(() -> {
            char i = 'A';
            while (i <= 'Z') {
                LockSupport.park();
                System.out.print(i+"-");
                i++;
                LockSupport.unpark(threadA);
            }
        });
        threadA.start();
        threadB.start();
    }
}

1-A-2-B-3-C-4-D-5-E-6-F-7-G-8-H-9-I-10-J-11-K-12-L-13-M-14-N-15-O-16-P-17-Q-18-R-19-S-20-T-21-U-22-V-23-W-24-X-25-Y-26-Z-

六、AtomicInteger

同样利用了 AtomicInteger 的并发特性,来完成交替打印。

public class TestSync {
    private static AtomicInteger threadSignal = new AtomicInteger(1);
    public static void main(String[] args) {

        new Thread(() -> {
            int i = 1;
            while (i <= 26) {
                while(threadSignal.get() == 2){}
                System.out.print(i+" ");
                i++;
                threadSignal.set(2);
            }
        }).start();

        new Thread(()->{
            char i = 'A';
            while (i <= 'Z'){
                while (threadSignal.get() == 1){}
                System.out.print(i+" ");
                i++;
                threadSignal.set(1);
            }
        }).start();

    }
}

1 A 2 B 3 C 4 D 5 E 6 F 7 G 8 H 9 I 10 J 11 K 12 L 13 M 14 N 15 O 16 P 17 Q 18 R 19 S 20 T 21 U 22 V 23 W 24 X 25 Y 26 Z

七、CyclicBarrier

CyclicBarrier 的字面意思就是可循环使用的屏障,它可以让一组线程到达一个阻塞点(屏障)时被阻塞。直到最后一个线程到达阻塞点后,屏障才会开门,然后所有被拦截的线程就可以继续运行。

public class TestSync {
    private static final int THREAD_COUNT = 3;

    public static void main(String[] args) {
        // 创建一个 CyclicBarrier,指定等待的线程数量
        CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
            // 所有线程都到达 barrier 后执行的操作
            System.out.println("所有线程都OK!");
        });

        for (int i = 0; i < THREAD_COUNT; i++) {
            Thread thread = new Thread(new Worker(barrier));
            thread.start();
        }
    }

    static class Worker implements Runnable {
        private CyclicBarrier barrier;

        public Worker(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                System.out.println("线程" + Thread.currentThread().getName() + "正在执行任务");
                Thread.sleep(1000);
                System.out.println("线程" + Thread.currentThread().getName() + "执行任务完毕,等待其他线程");
                // 等待其他线程都到达 barrier
                barrier.await();
                System.out.println("线程" + Thread.currentThread().getName() + "继续执行下一步操作");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

线程Thread-1正在执行任务 线程Thread-0正在执行任务 线程Thread-2正在执行任务 线程Thread-0执行任务完毕,等待其他线程 线程Thread-1执行任务完毕,等待其他线程 线程Thread-2执行任务完毕,等待其他线程 所有线程都OK! 线程Thread-1继续执行下一步操作 线程Thread-0继续执行下一步操作 线程Thread-2继续执行下一步操作

八、Semaphore

使用信号量(Semaphore)可以实现线程间的同步与通信。信号量是一个计数器,用于控制对共享资源的访问。

Semaphone管理着一组“虚拟”的许可(permit),许可的初始数量可通过构造函数来指定。操作之前可以先获得许可,并在操作结束之后释放许可。

acquire 方法需要消耗一个许可,如果没有许可acquire将阻塞直到有许可(除非被中断、或者超时)可用。release方法则会添加一个许可。这里的许可,是一个虚拟的概念,并不存在一个名为“Permit"的对象,Semaphore只是记录可用的许可数量并执行相应的操作。

public class TestSync {

    // 创建一个计数器为 3 的信号量对象
    static Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {
        // 创建 10 个线程
        for (int i = 1; i <= 10; i++) {
            Thread thread = new Thread(new WorkerThread(i));
            thread.start();
        }
    }

    static class WorkerThread implements Runnable {
        private int threadNumber;

        public WorkerThread(int threadNumber) {
            this.threadNumber = threadNumber;
        }

        @Override
        public void run() {
            try {
                // 尝试获取一个资源
                semaphore.acquire();

                // 执行任务
                System.out.println("Thread " + threadNumber + " 开始执行.");

                // 模拟任务执行时间
                Thread.sleep(800);

                // 释放资源
                System.out.println("Thread " + threadNumber + " 释放资源咯 ~ ");
                semaphore.release();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

Thread 1 开始执行. Thread 3 开始执行. Thread 2 开始执行. Thread 2 释放资源咯 ~ Thread 4 开始执行. Thread 1 释放资源咯 ~ Thread 3 释放资源咯 ~ Thread 5 开始执行. Thread 8 开始执行. Thread 5 释放资源咯 ~ Thread 4 释放资源咯 ~ Thread 8 释放资源咯 ~ Thread 6 开始执行. Thread 9 开始执行. Thread 7 开始执行. Thread 6 释放资源咯 ~ Thread 9 释放资源咯 ~ Thread 7 释放资源咯 ~ Thread 10 开始执行. Thread 10 释放资源咯 ~

可以看到同一个时刻最多只有3线程在执行

九、利用 Piped Stream

使用Stream中的Piped Stream分别控制输出,但是其运行速度极慢。

public class TestSync {

    private final PipedInputStream inputStream1;
    private final PipedOutputStream outputStream1;
    private final PipedInputStream inputStream2;
    private final PipedOutputStream outputStream2;
    private final byte[] MSG;

    public TestSync() {
        inputStream1 = new PipedInputStream();
        outputStream1 = new PipedOutputStream();
        inputStream2 = new PipedInputStream();
        outputStream2 = new PipedOutputStream();
        MSG = "Go".getBytes();
        try {
            inputStream1.connect(outputStream2);
            inputStream2.connect(outputStream1);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        TestSync signal = new TestSync();
        signal.threadA().start();
        signal.threadB().start();

    }

    public Thread threadA() {
        final String[] inputArr = new String[2];

        return new Thread() {
            String[] arr = inputArr;
            PipedInputStream in1 = inputStream1;
            PipedOutputStream out1 = outputStream1;

            @Override
            public void run() {
                int i = 1;
                while (i <= 26) {
                    try {
                        System.out.print(i + " ");
                        out1.write(MSG);
                        byte[] inArr = new byte[2];
                        in1.read(inArr);
                        while (!"Go".equals(new String(inArr))) {
                        }
                        i++;
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
    }

    public Thread threadB() {
        final String[] inputArr = new String[2];
        return new Thread() {
            private String[] arr = inputArr;
            private PipedInputStream in2 = inputStream2;
            private PipedOutputStream out2 = outputStream2;

            @Override
            public void run() {
                char i = 'A';
                while (i <= 'Z') {
                    try {
                        byte[] inArr = new byte[2];
                        in2.read(inArr);
                        while (!"Go".equals(new String(inArr))) {
                        }
                        System.out.print(i + " ");
                        i++;
                        out2.write(MSG);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
    }
}

1 A 2 B 3 C 4 D 5 E 6 F 7 G 8 H 9 I 10 J 11 K 12 L 13 M 14 N 15 O 16 P 17 Q 18 R 19 S 20 T 21 U 22 V 23 W 24 X 25 Y 26 Z

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

CFu9A7vdykDj