专注于快乐的事情

多线程并发控制方法

多线程并发控制方法

等待多线程完成

场景:我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作。

使用join

代码实例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws InterruptedException {
Thread parser1 = new Thread(new Runnable() {
@Override
public void run() {
}
});

Thread parser2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("parser2 finish");
}
});

parser1.start();
parser2.start();
parser1.join();
parser2.join();
System.out.println("all parser finish");
}

join用于让当前执行线程等待join线程执行结束。其实现原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。
join的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

当线程处于运行状态时,isAlive返回true。
wait(0)表示永远在调用join的线程上等待下去。直到join线程中止后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是在JVM里实现的。

使用CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class CountDownLatchTest {

static CountDownLatch latch = new CountDownLatch(2);

public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
latch.countDown();
System.out.println(2);
latch.countDown();
}
}).start();

latch.await();
System.out.println("3");
}
}

等待多线程在一点同时执行

CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier初始化时规定一个屏障拦截的线程数量,然后计算调用了CyclicBarrier.await()进入等待的线程数。当线程数达到了这个数目时,所有进入等待状态的线程被唤醒并继续。
CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。

实例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class TestCyclicBarrier {

private static final int THREAD_NUM = 5;

public static class WorkerThread implements Runnable {

CyclicBarrier barrier;

public WorkerThread(CyclicBarrier b) {
this.barrier = b;
}

@Override
public void run() {
// TODO Auto-generated method stub
try {
System.out.println("Worker's waiting");
//线程在这里等待,直到所有线程都到达barrier。
barrier.await();
System.out.println("ID:" + Thread.currentThread().getId() + " Working");
} catch (Exception e) {
e.printStackTrace();
}
}

}

/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
CyclicBarrier cb = new CyclicBarrier(THREAD_NUM, new Runnable() {
//当所有线程到达barrier时执行
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("Inside Barrier");

}
});

for (int i = 0; i < THREAD_NUM; i++) {
new Thread(new WorkerThread(cb)).start();
}
}

}

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。

控制并发访问特定资源的线程数量

Semaphore(信号量)是用来控制同时访问特定资源的线程数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class SemaphoreTest {

private static final int THREAD_COUNT = 30;

private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

private static Semaphore s = new Semaphore(10);

public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
}
}
});
}

threadPool.shutdown();
}
}

为了创建信号量,必须使用可选的公平策略来确定许可的数量。任务通过调用信号量acquire() 方法来获得许可,可通过调用信号量的release()方法来释放许可。一旦获得许可,信号量中可用许可的数量减一。一旦释放,信号量的可用许可的总数加1。

Semaphore vs 线程池

线程池控制的是线程数量,而信号量控制的是并发数量,

信号量的调用,当达到数量后,线程还是存在的,只是被挂起了而已。而线程池,同时执行的线程数量是固定的,超过了数量的只能等待。
线程池是线程复用的;信号量是线程同步的。

两个线程之间交换数据使用Exchanger

当线程A调用Exchange对象的exchange()方法后,他会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class ExchangerTest {
private static volatile boolean isDone = false;

static class ExchangerProducer implements Runnable {
private Exchanger<Integer> exchanger;
private static int data = 1;

ExchangerProducer(Exchanger<Integer> exchanger) {
this.exchanger = exchanger;
}

@Override
public void run() {
while (!Thread.interrupted() && !isDone) {
for (int i = 1; i <= 3; i++) {
try {
TimeUnit.SECONDS.sleep(1);
data = i;
System.out.println("producer before: " + data);
data = exchanger.exchange(data);
System.out.println("producer after: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
isDone = true;
}
}
}

static class ExchangerConsumer implements Runnable {
private Exchanger<Integer> exchanger;
private static int data = 0;

ExchangerConsumer(Exchanger<Integer> exchanger) {
this.exchanger = exchanger;
}

@Override
public void run() {
while (!Thread.interrupted() && !isDone) {
data = 0;
System.out.println("consumer before : " + data);
try {
TimeUnit.SECONDS.sleep(1);
data = exchanger.exchange(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("consumer after : " + data);
}
}
}

/**
* @param args
*/
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
Exchanger<Integer> exchanger = new Exchanger<Integer>();
ExchangerProducer producer = new ExchangerProducer(exchanger);
ExchangerConsumer consumer = new ExchangerConsumer(exchanger);
exec.execute(producer);
exec.execute(consumer);
exec.shutdown();
try {
exec.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

评论系统未开启,无法评论!