近来,在工作中多次使用了多线程编程,其中使用最多的就是ExecutorService
了,在使用中遇到了诸多问题,最主要的问题是——何时关闭线程池?
何时关闭线程池的问题可以转化为如何获知所有线程执行完毕。关于这个问题,stackoverflow上有一篇很有意思的讨论
可以总结出以下几种方式:
1. shutdown + awaitTermination的方式
模式:ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
...
}
调用了shutdown
方法后,线程池不再接受新的任务,等待当前所有任务执行完毕后就会退出。这种方式适用于所有线程是一次性执行的,也就是说线程池中的线程只会被调用一次,如果需要执行的任务次数大于线程池的数量,调用shutdown方法会导致后续任务无法分配到线程执行。
2. 使用countDownLatch
CountDownLatch是一个常用于线程同步的闭锁,这个适用于已知任务总执行次数的情况。
模式:CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
try {
latch.await();
} catch (InterruptedException E) {
// handle
}
在每个线程中执行一次countDown操作。
3. 使用Future.get()
因为future.get()
方法会等待线程执行完成,并获取返回值,所以当所有task对应的future都有返回值了,就可以关闭线程池了。
模式:ExecutorService pool = Executors.newFixedThreadPool(concurrent);
List<Future> futures = new ArrayList<>();
for (Map<String, Date> dateSplit : dateSplits) {
futures.add(pool.submit(new MyCallableTask());
}
for (Future future : futures) {
future.get();
}
pool.shutdown();
4. 使用Future + CompletionService
CompletionService
也是java.util.concurrent提供的。
模式:ExecutorService threadPool = Executors.newFixedThreadPool(N_THREADS);
CompletionService<List<Object>> pool = new
ExecutorCompletionService<~>(threadPoolpool);
List<Future<List<Object>>> futures = new ArrayList<>();
while(...) {
futures.add(pool.submit(new CallableTask()));
}
for (future in futures) {
future.get();
}
treadPool.shutdown();
com.best.oasis.express.util.kafka.utils.KafkaConsumerUtils
中就是使用这种模式来获取每个分区上对应时间段内的消息的。该模式适用于需要处理task返回值的情况
5. 使用CompletableFuture (需要JDK1.8)
用的不多,直接看模式:CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, pool))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
pool.shutdown();
6. 使用ListenableFuture (需要Guava)
上面几种模式都是等待线程执行完,而这个则是通过回调,在所有任务执行完后通知用户。
模式:ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors
.newFixedThreadPool(concurrent));
List<ListenableFuture<Object>> futures = new ArrayList<>();
for (Map<String, Date> dateSplit : dateSplits) {
ListenableFuture<Object> lf = service.submit(new MyCallableTask());
futures.add(lf);
}
ListenableFuture<List<Object>> lf = Futures.successfulAsList(futures);
Futures.addCallback(lf, new FutureCallback<List<Object>>() {
public void onSuccess(List<Object> result) {
logger.info("所有线程处理完毕");
service.shutdownNow();
}
public void onFailure(Throwable t) {
logger.info("某个线程出错了" + t);
}
});
successfulAsList
在所有Future都执行成功后返回一个由这些Future返回值组成的List,而且错误或者取消的Future都是在onSuccess中处理的,只是返回值为null。顺便一提:使用这个Future扩展形式的话,task中的异常是会被拦截的