如何愉快的使用线程池

近来,在工作中多次使用了多线程编程,其中使用最多的就是ExecutorService了,在使用中遇到了诸多问题,最主要的问题是——何时关闭线程池?

何时关闭线程池的问题可以转化为如何获知所有线程执行完毕。关于这个问题,stackoverflow上有一篇很有意思的讨论

可以总结出以下几种方式:

1. shutdown + awaitTermination的方式

模式:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
...
}

调用了shutdown方法后,线程池不再接受新的任务,等待当前所有任务执行完毕后就会退出。这种方式适用于所有线程是一次性执行的,也就是说线程池中的线程只会被调用一次,如果需要执行的任务次数大于线程池的数量,调用shutdown方法会导致后续任务无法分配到线程执行。

2. 使用countDownLatch

CountDownLatch是一个常用于线程同步的闭锁,这个适用于已知任务总执行次数的情况。
模式:

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}

try {
latch.await();
} catch (InterruptedException E) {
// handle
}

在每个线程中执行一次countDown操作。

3. 使用Future.get()

因为future.get()方法会等待线程执行完成,并获取返回值,所以当所有task对应的future都有返回值了,就可以关闭线程池了。
模式:

ExecutorService pool = Executors.newFixedThreadPool(concurrent);
List<Future> futures = new ArrayList<>();
for (Map<String, Date> dateSplit : dateSplits) {
futures.add(pool.submit(new MyCallableTask());
}
for (Future future : futures) {
future.get();
}
pool.shutdown();

4. 使用Future + CompletionService

CompletionService也是java.util.concurrent提供的。
模式:

ExecutorService threadPool = Executors.newFixedThreadPool(N_THREADS);
CompletionService<List<Object>> pool = new
ExecutorCompletionService<~>(threadPoolpool);
List<Future<List<Object>>> futures = new ArrayList<>();
while(...) {
futures.add(pool.submit(new CallableTask()));
}

for (future in futures) {
future.get();
}
treadPool.shutdown();

com.best.oasis.express.util.kafka.utils.KafkaConsumerUtils中就是使用这种模式来获取每个分区上对应时间段内的消息的。该模式适用于需要处理task返回值的情况

5. 使用CompletableFuture (需要JDK1.8)

用的不多,直接看模式:

CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, pool))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
pool.shutdown();

6. 使用ListenableFuture (需要Guava)

上面几种模式都是等待线程执行完,而这个则是通过回调,在所有任务执行完后通知用户。
模式:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors
.newFixedThreadPool(concurrent));
List<ListenableFuture<Object>> futures = new ArrayList<>();
for (Map<String, Date> dateSplit : dateSplits) {
ListenableFuture<Object> lf = service.submit(new MyCallableTask());
futures.add(lf);
}
ListenableFuture<List<Object>> lf = Futures.successfulAsList(futures);
Futures.addCallback(lf, new FutureCallback<List<Object>>() {
@Override
public void onSuccess(List<Object> result) {
logger.info("所有线程处理完毕");
service.shutdownNow();
}

@Override
public void onFailure(Throwable t) {
logger.info("某个线程出错了" + t);
}
});

successfulAsList在所有Future都执行成功后返回一个由这些Future返回值组成的List,而且错误或者取消的Future都是在onSuccess中处理的,只是返回值为null。顺便一提:使用这个Future扩展形式的话,task中的异常是会被拦截的