privatestatic CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() { @Override publicvoidrun(){ System.out.println(Thread.currentThread() + " task merge result"); } }); publicstaticvoidmain(String[] args){ ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.execute(() - > { System.out.println(Thread.currentThread() + " task-1 start"); System.out.println(Thread.currentThread() + " task-1 enter in barrier"); try { cyclicBarrier.await(); } catch(InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread() + " task-1 enter out barrier"); }); executorService.execute(() - > { System.out.println(Thread.currentThread() + " task-2 start"); System.out.println(Thread.currentThread() + " task-2 enter in barrier"); try { cyclicBarrier.await(); } catch(InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread() + " task-2 enter out barrier"); }); executorService.shutdown(); } }
输出结果:
Thread[pool-1-thread-1,5,main] task-1 start
Thread[pool-1-thread-1,5,main] task-1 enter in barrier
Thread[pool-1-thread-2,5,main] task-2 start
Thread[pool-1-thread-2,5,main] task-2 enter in barrier
Thread[pool-1-thread-2,5,main] task merge result
Thread[pool-1-thread-2,5,main] task-2 enter out barrier
Thread[pool-1-thread-1,5,main] task-1 enter out barrier
privateintdowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation;
if (g.broken) thrownew BrokenBarrierException();
if (Thread.interrupted()) { breakBarrier(); thrownew InterruptedException(); }
int index = --count; if (index == 0) { // index==0说明所有线程都到了屏障点,此时执行初始化时传递的任务 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); // 执行任务 ranAction = true; nextGeneration(); // 激活其他因调用await方法而阻塞的线程,并重置CyclicBarrier return0; } finally { if (!ranAction) breakBarrier(); } }
// loop until tripped, broken, interrupted, or timed out // index != 0 for (;;) { try { if (!timed) // 没有设置超时时间 trip.await(); elseif (nanos > 0L) // 设置了超时时间 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } }
privatevoidnextGeneration(){ // signal completion of last generation trip.signalAll(); // 唤醒条件队列中的阻塞队列 // set up next generation count = parties; // 重置CyclicBarrier generation = new Generation(); }
/**
 * Creates a semaphore backed by a {@code NonfairSync}, i.e. the non-fair
 * policy is the default.
 *
 * @param permits initial number of permits, passed on to the synchronizer
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
/**
 * Creates a semaphore with an explicitly chosen fairness policy.
 *
 * @param permits initial number of permits, passed on to the synchronizer
 * @param fair    {@code true} to use {@code FairSync}, {@code false} to use
 *                {@code NonfairSync}
 */
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
 * Stores the initial permit count in the synchronizer state.
 *
 * @param permits initial number of permits
 */
Sync(int permits) {
    setState(permits);
}
在上面的代码中,Semaphore 默认采用非公平策略;如果需要公平策略,可以使用带两个参数的构造函数来创建 Semaphore 对象。另外,与 CountDownLatch 把构造函数传入的计数值赋给 AQS 的 state 状态变量一样,这里构造函数传入的初始信号量个数 permits 也被赋给了 AQS 的 state,因此 state 的值表示当前可用的信号量个数。
finalintnonfairTryAcquireShared(int acquires){ for (;;) { int available = getState(); // 当前信号量值 int remaining = available - acquires; // 剩余值 if (remaining < 0 || compareAndSetState(available, remaining)) // 如果当前剩余值小于0或者CAS设置成功则返回 return remaining; } }
// 公平策略 FairSync protectedinttryAcquireShared(int acquires){ for (;;) { if (hasQueuedPredecessors()) // 公平策略,看当前线程节点的前驱节点是否也在等待获取此资源,如果是则当前线程会被放到AQS阻塞队列,否则直接获取 return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
protectedfinalbooleantryReleaseShared(int releases){ for (;;) { int current = getState(); // 当前信号量值 int next = current + releases; // 当前信号量+1 if (next < current) // overflow thrownew Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) // CAS修改信号量值 returntrue; } }