Java并发系列[8]----CyclicBarrier源码分析

发布时间:2023-12-29 14:00

现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。在JUC包中为我们提供了一个同步工具类能够很好的模拟这类场景,它就是CyclicBarrier类。利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作。下图演示了这一过程。

\"Java并发系列[8]----CyclicBarrier源码分析_第1张图片\"

在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。这就是实现一组线程相互等待的原理,下面我们先看看CyclicBarrier有哪些成员变量。

 1 //同步操作锁
 2 private final ReentrantLock lock = new ReentrantLock();
 3 //线程拦截器
 4 private final Condition trip = lock.newCondition();
 5 //每次拦截的线程数
 6 private final int parties;
 7 //换代前执行的任务
 8 private final Runnable barrierCommand;
 9 //表示栅栏的当前代
10 private Generation generation = new Generation();
11 //计数器
12 private int count;
13 
14 //静态内部类Generation
15 private static class Generation {
16     boolean broken = false;
17 }

上面贴出了CyclicBarrier所有的成员变量,可以看到CyclicBarrier内部是通过条件队列trip来对线程进行阻塞的,并且其内部维护了两个int型的变量parties和count,parties表示每次拦截的线程数,该值在构造时进行赋值。count是内部计数器,它的初始值和parties相同,以后随着每次await方法的调用而减1,直到减为0就将所有线程唤醒。CyclicBarrier有一个静态内部类Generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待。barrierCommand表示换代前执行的任务,当count减为0时表示本局游戏结束,需要转到下一局。在转到下一局游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定barrierCommand来执行自己的任务。接下来我们看看它的构造器。

 1 //构造器1
 2 public CyclicBarrier(int parties, Runnable barrierAction) {
 3     if (parties <= 0) throw new IllegalArgumentException();
 4     this.parties = parties;
 5     this.count = parties;
 6     this.barrierCommand = barrierAction;
 7 }
 8 
 9 //构造器2
10 public CyclicBarrier(int parties) {
11     this(parties, null);
12 }

CyclicBarrier有两个构造器,其中构造器1是它的核心构造器,在这里你可以指定本局游戏的参与者数量(要拦截的线程数)以及本局结束时要执行的任务,还可以看到计数器count的初始值被设置为parties。CyclicBarrier类最主要的功能就是使先到达屏障点的线程阻塞并等待后面的线程,其中它提供了两种等待的方法,分别是定时等待和非定时等待。

 1 //非定时等待
 2 public int await() throws InterruptedException, BrokenBarrierException {
 3     try {
 4         return dowait(false, 0L);
 5     } catch (TimeoutException toe) {
 6         throw new Error(toe);
 7     }
 8 }
 9 
10 //定时等待
11 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
12     return dowait(true, unit.toNanos(timeout));
13 }

可以看到不管是定时等待还是非定时等待,它们都调用了dowait方法,只不过是传入的参数不同而已。下面我们就来看看dowait方法都做了些什么。

 1 //核心等待方法
 2 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
 3     final ReentrantLock lock = this.lock;
 4     lock.lock();
 5     try {
 6         final Generation g = generation;
 7         //检查当前栅栏是否被打翻
 8         if (g.broken) {
 9             throw new BrokenBarrierException();
10         }
11         //检查当前线程是否被中断
12         if (Thread.interrupted()) {
13             //如果当前线程被中断会做以下三件事
14             //1.打翻当前栅栏
15             //2.唤醒拦截的所有线程
16             //3.抛出中断异常
17             breakBarrier();
18             throw new InterruptedException();
19         }
20        //每次都将计数器的值减1
21        int index = --count;
22        //计数器的值减为0则需唤醒所有线程并转换到下一代
23        if (index == 0) {
24            boolean ranAction = false;
25            try {
26                //唤醒所有线程前先执行指定的任务
27                final Runnable command = barrierCommand;
28                if (command != null) {
29                    command.run();
30                }
31                ranAction = true;
32                //唤醒所有线程并转到下一代
33                nextGeneration();
34                return 0;
35            } finally {
36                //确保在任务未成功执行时能将所有线程唤醒
37                if (!ranAction) {
38                    breakBarrier();
39                }
40            }
41        }
42 
43        //如果计数器不为0则执行此循环
44        for (;;) {
45            try {
46                //根据传入的参数来决定是定时等待还是非定时等待
47                if (!timed) {
48                    trip.await();
49                }else if (nanos > 0L) {
50                    nanos = trip.awaitNanos(nanos);
51                }
52            } catch (InterruptedException ie) {
53                //若当前线程在等待期间被中断则打翻栅栏唤醒其他线程
54                if (g == generation && ! g.broken) {
55                    breakBarrier();
56                    throw ie;
57                } else {
58                    //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作
59                    Thread.currentThread().interrupt();
60                }
61            }
62            //如果线程因为打翻栅栏操作而被唤醒则抛出异常
63            if (g.broken) {
64                throw new BrokenBarrierException();
65            }
66            //如果线程因为换代操作而被唤醒则返回计数器的值
67            if (g != generation) {
68                return index;
69            }
70            //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常
71            if (timed && nanos <= 0L) {
72                breakBarrier();
73                throw new TimeoutException();
74            }
75        }
76     } finally {
77         lock.unlock();
78     }
79 }

上面贴出的代码中注释都比较详细,我们只挑一些重要的来讲。可以看到在dowait方法中每次都将count减1,减完后立马进行判断看看是否等于0,如果等于0的话就会先去执行之前指定好的任务,执行完之后再调用nextGeneration方法将栅栏转到下一代,在该方法中会将所有线程唤醒,将计数器的值重新设为parties,最后会重新设置栅栏代次,在执行完nextGeneration方法之后就意味着游戏进入下一局。如果计数器此时还不等于0的话就进入for循环,根据参数来决定是调用trip.awaitNanos(nanos)还是trip.await()方法,这两方法对应着定时和非定时等待。如果在等待过程中当前线程被中断就会执行breakBarrier方法,该方法叫做打破栅栏,意味着游戏在中途被掐断,设置generation的broken状态为true并唤醒所有线程。同时这也说明在等待过程中有一个线程被中断整盘游戏就结束,所有之前被阻塞的线程都会被唤醒。线程醒来后会执行下面三个判断,看看是否因为调用breakBarrier方法而被唤醒,如果是则抛出异常;看看是否是正常的换代操作而被唤醒,如果是则返回计数器的值;看看是否因为超时而被唤醒,如果是的话就调用breakBarrier打破栅栏并抛出异常。这里还需要注意的是,如果其中有一个线程因为等待超时而退出,那么整盘游戏也会结束,其他线程都会被唤醒。下面贴出nextGeneration方法和breakBarrier方法的具体代码。

 1 //切换栅栏到下一代
 2 private void nextGeneration() {
 3     //唤醒条件队列所有线程
 4     trip.signalAll();
 5     //设置计数器的值为需要拦截的线程数
 6     count = parties;
 7     //重新设置栅栏代次
 8     generation = new Generation();
 9 }
10 
11 //打翻当前栅栏
12 private void breakBarrier() {
13     //将当前栅栏状态设置为打翻
14     generation.broken = true;
15     //设置计数器的值为需要拦截的线程数
16     count = parties;
17     //唤醒所有线程
18     trip.signalAll();
19 }

上面我们已经通过源码将CyclicBarrier的原理基本都讲清楚了,下面我们就通过一个赛马的例子来深入掌握它的使用。

 1 class Horse implements Runnable {
 2     
 3     private static int counter = 0;
 4     private final int id = counter++;
 5     private int strides = 0;
 6     private static Random rand = new Random(47);
 7     private static CyclicBarrier barrier;
 8     
 9     public Horse(CyclicBarrier b) { barrier = b; }
10     
11     @Override
12     public void run() {
13         try {
14             while(!Thread.interrupted()) {
15                 synchronized(this) {
16                     //赛马每次随机跑几步
17                     strides += rand.nextInt(3);
18                 }
19                 barrier.await();
20             }
21         } catch(Exception e) {
22             e.printStackTrace();
23         }
24     }
25     
26     public String tracks() {
27         StringBuilder s = new StringBuilder();
28         for(int i = 0; i < getStrides(); i++) {
29             s.append(\"*\");
30         }
31         s.append(id);
32         return s.toString();
33     }
34     
35     public synchronized int getStrides() { return strides; }
36     public String toString() { return \"Horse \" + id + \" \"; }
37     
38 }
39 
40 public class HorseRace implements Runnable {
41     
42     private static final int FINISH_LINE = 75;
43     private static List horses = new ArrayList();
44     private static ExecutorService exec = Executors.newCachedThreadPool();
45     
46     @Override
47     public void run() {
48         StringBuilder s = new StringBuilder();
49         //打印赛道边界
50         for(int i = 0; i < FINISH_LINE; i++) {
51             s.append(\"=\");
52         }
53         System.out.println(s);
54         //打印赛马轨迹
55         for(Horse horse : horses) {
56             System.out.println(horse.tracks());
57         }
58         //判断是否结束
59         for(Horse horse : horses) {
60             if(horse.getStrides() >= FINISH_LINE) {
61                 System.out.println(horse + \"won!\");
62                 exec.shutdownNow();
63                 return;
64             }
65         }
66         //休息指定时间再到下一轮
67         try {
68             TimeUnit.MILLISECONDS.sleep(200);
69         } catch(InterruptedException e) {
70             System.out.println(\"barrier-action sleep interrupted\");
71         }
72     }
73     
74     public static void main(String[] args) {
75         CyclicBarrier barrier = new CyclicBarrier(7, new HorseRace());
76         for(int i = 0; i < 7; i++) {
77             Horse horse = new Horse(barrier);
78             horses.add(horse);
79             exec.execute(horse);
80         }
81     }
82     
83 }

该赛马程序主要是通过在控制台不停的打印各赛马的当前轨迹,以此达到动态显示的效果。整场比赛有多个轮次,每一轮次各个赛马都会随机走上几步然后调用await方法进行等待,当所有赛马走完一轮的时候将会执行任务将所有赛马的当前轨迹打印到控制台上。这样每一轮下来各赛马的轨迹都在不停的增长,当其中某个赛马的轨迹最先增长到指定的值的时候将会结束整场比赛,该赛马成为整场比赛的胜利者!程序的运行结果如下:

\"Java并发系列[8]----CyclicBarrier源码分析_第2张图片\"

至此我们难免会将CyclicBarrier与CountDownLatch进行一番比较。这两个类都可以实现一组线程在到达某个条件之前进行等待,它们内部都有一个计数器,当计数器的值不断的减为0的时候所有阻塞的线程将会被唤醒。有区别的是CyclicBarrier的计数器由自己控制,而CountDownLatch的计数器则由使用者来控制,在CyclicBarrier中线程调用await方法不仅会将自己阻塞还会将计数器减1,而在CountDownLatch中线程调用await方法只是将自己阻塞而不会减少计数器的值。另外,CountDownLatch只能拦截一轮,而CyclicBarrier可以实现循环拦截。一般来说用CyclicBarrier可以实现CountDownLatch的功能,而反之则不能,例如上面的赛马程序就只能使用CyclicBarrier来实现。总之,这两个类的异同点大致如此,至于何时使用CyclicBarrier,何时使用CountDownLatch,还需要读者自己去拿捏。

转载于:https://www.cnblogs.com/liuyun1995/p/8529360.html

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号