您的位置:

CyclicBarrier实现多线程任务同步的完整示例

一、CyclicBarrier简介

CyclicBarrier是Java并发包中的一个实用工具, 它可以用于多线程间的同步。它让一组线程在某个节点处等待,直到所有线程都到达该节点,然后再一起继续执行后续任务。

CyclicBarrier可以用于以下场景:

  • 分阶段执行任务:将任务分为若干个阶段,在每个阶段结束后等待所有线程到达这个节点,然后再开始后续的阶段。
  • 分治计算:将大规模的计算任务分解为若干小的计算任务,在每个小任务完成后等待其他小任务完成,最终再合并结果。

二、CyclicBarrier的用法

1. 创建CyclicBarrier对象

CyclicBarrier cyclicBarrier = new CyclicBarrier(int parties, Runnable action);

其中parties是需要等待的线程数,action是在所有线程都到达屏障时需要执行的任务。CyclicBarrier还有一个重载方法,不需要指定Runnable任务。

CyclicBarrier cyclicBarrier = new CyclicBarrier(int parties);

2. 等待线程到达屏障点

cyclicBarrier.await();

调用await方法的线程会在此处等待,知道全部线程到达该屏障点,才会一起继续执行。

3. CyclicBarrier的reset方法

cyclicBarrier.reset();

reset方法可以使CyclicBarrier的状态恢复到初始化状态,方便在重复使用CyclicBarrier时调用。

三、完整代码示例

1. 在主线程中启动多个其他线程,等待所有线程完成后,主线程再继续执行

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    private static final int THREADS = 5;

    public static void main(String[] args) {
        Runnable barrierAction = new Runnable() {
            public void run() {
                System.out.println("All threads have arrived at the barrier.");
            }
        };
        CyclicBarrier barrier = new CyclicBarrier(THREADS, barrierAction);

        for (int i = 0; i < THREADS; i++) {
            Thread thread = new Thread(new Worker(barrier), "Thread " + i);
            thread.start();
        }

        System.out.println("Main thread continues to do its work.");
    }

    static class Worker implements Runnable {
        private final CyclicBarrier cyclicBarrier;

        public Worker(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        public void run() {
            try {
                System.out.println("Thread " + Thread.currentThread().getName() + " is doing some work.");
                Thread.sleep(1000);
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

2. 多个线程分别执行不同任务,等待所有任务执行完成后再继续

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    private static final int THREADS = 5;

    public static void main(String[] args) {
        Runnable barrierAction = new Runnable() {
            public void run() {
                System.out.println("All threads have arrived at the barrier.");
            }
        };
        CyclicBarrier barrier = new CyclicBarrier(THREADS, barrierAction);

        for (int i = 0; i < THREADS; i++) {
            Thread thread = new Thread(new Worker(barrier, i), "Thread " + i);
            thread.start();
        }

        System.out.println("Main thread continues to do its work.");
    }

    static class Worker implements Runnable {
        private final CyclicBarrier cyclicBarrier;
        private final int workId;

        public Worker(CyclicBarrier cyclicBarrier, int workId) {
            this.cyclicBarrier = cyclicBarrier;
            this.workId = workId;
        }

        public void run() {
            try {
                System.out.println("Thread " + Thread.currentThread().getName() + " is doing work " + workId);
                Thread.sleep((workId + 1) * 1000);
                cyclicBarrier.await();
                System.out.println("Thread " + Thread.currentThread().getName() + " continues to do its work after barrier.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

参考资料