【www.bbyears.com--js教程】
CountDownLatch
字面意思就是倒计数闩,后面会讲到,这里的同步允许一个或多个线程等待,,知道其他线程进行的一系列操作完成。而CountDownLatch通过一个参数count(数目)来构造,而await()则阻塞当前线程,直到countDown()将count减为了0,然后,所有的阻塞线程被释放,也就是那些调用了await方法的线程立即返回,注意,这是一次性的,也就是说count不能被自动重置,如果你想这么做,CyclicBarrier是可以的。
CountDownLatch用处很多,当用count=1来构造的时候,这就相当于一个开关,所有调用了await方法的线程都在等待,直到有一个线程调用了countDown(),CountDownLatch通过count=N构造的话,就可以使一个线程等待其他N个线程完成操作,或者一个操作被做N次。
简单的demo:
一组worker(工人)线程使用两个CountDownLatch
第一个是开始信号,用来阻止工人提前操作,直到(driver)传送带准备好了才允许开始
第二个是完成信号,他使传送带等待直到所有的worker都完成
class Driver { // …
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // 创建并启动线程
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // 传送带做点准备工作
startSignal.countDown(); // 减为0,工人可以开始了
doSomethingElse();
doneSignal.await(); // 等待直到所有的工人完成任务
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();//工人们等待开关打开
doWork(); //做事
doneSignal.countDown(); //做完了就给countdownlatch 减去1
} catch (InterruptedException ex) {} // return;
}
void doWork() { … }
}
另一个典型的例子就是把问题分成N部分,通过线程执行每一部分,具体的话是将线程入队到一个Executor对象里。然后调用execute方法。当执行完毕一部分,就并给latch 减去1,当减到0的时候调用await的方法就可以继续运行了,当需要重复计数的话,用CyclicBarrier代替
class Driver2 { // …
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = …
for (int i = 0; i < N; ++i) // 创建并开始线程
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // 等待所有的线程完成
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // 返回;
}
void doWork() { … }
}
CyclicBarrier
字面意思回环栅栏。这里的同步是允许一系列的线程互相等待直到大家都来到一个barrier前面,当程序中有互相等待的情况的时候,非常有用,叫做回环是因为当所有等待线程都被释放以后,cyclicBarrier可以被重用
同时,CyclicBarrier支持一个可选的Runnable参数,当所有的线程都来到barrier前面的时候,程序开始执行runable线程动作,当需要更新所有线程共享的某个状态的时候,该动作非常有用。例子是一个并行分解的例子
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N,
new Runnable() {
public void run() {
mergeRows(…);
}
});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start();
waitUntilDone();
}
}
这里每一个worker线程处理矩阵的一行,然后在barrier前面等待,直到所有的行都处理完成,当所有的都完成以后,这里
barrier = new CyclicBarrier(N,
new Runnable() {
public void run() {
mergeRows(…);
}
});
的Runnable参数执行来合并这些行,一旦有了正确的结果waitUntilDown()就返回true,然后所有的线程终止。
如果barrier执行的这个动作没有对那些等待的线程做出回应,以使的这些线程做一些别的操作,我们可以这么写,将
barrier.await()
改为
if (barrier.await() == 0) {
// 记录本次迭代的位置
}
如果一个线程由于中断,失败或者超时过早的离开了barrier,那么所有的线程都会通过BrokenBarrierException引发异常。