Java 基础教程

Java 面向对象

Java 高级教程

Java 笔记

Java 并发执行任务且并发获取及设置取所有结果的超时设置

Java 笔记 Java 笔记


Java 编程中,如果并发的任务想在指定的时间内,获取所有完成的作业,未完成则丢弃,则可以用 CompletionService 类,它实际是一个接口,具体实现类为 ExecutorCompletionService,内部是通过 BlockingQueue 阻塞队列来实现了生产者-消费者模式,CompletionService 就是将生产者(新的异步线程生产的 Future)和消费者(获取生产出来的 Future)进行解耦,生产者只需要 submit(Callable/Runnable) 将新的任务提交,就可以生产 Future,而消费者只要 take() 或 poll() 即可获取消费对象。

推荐方式

利用 CompletionService 具体步骤如下:

  1. 创建一个具体实例 ExecutorCompletionService,并指定具体的线程池 Executor;
  2. 提交(submit)所有可异步的任务;
  3. 设置获取所有任务的超时总时间,while 轮询去调用 CompletionService 的 poll 方法,poll 可以以 5 毫秒时间粒度的超时获取已完成的任务。

具体示例代码如下:

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CompletionServiceDemo {

    /**
     * 异步执行任务及在指定的时间内快速获取所有的任务结果
     *
     * @param taskList 待执行的任务
     * @param executor 使用的线程池
     * @param timeout  所有任务获取的总超时时间
     */
    public void taskConcurrentExecuteAndGet(List<Object> taskList, Executor executor, int timeout) {
        CompletionService<Object> completionService = new ExecutorCompletionService<>(executor);
        for (Object task : taskList) {
            completionService.submit(() -> {
                Object result = new Object();
                System.out.println("do task and set result");
                return result;
            });
        }

        //  待完成的任务
        int taskSize = taskList.size();
        //  已完成的任务
        int completedTaskSize = 0;
        Instant start = Instant.now();
        try {
            while (Duration.between(start, Instant.now()).toMillis() < timeout && (completedTaskSize < taskSize)) {
                //  以 5 毫秒为超时时间粒度获取结果
                Future<Object> future = completionService.poll(5, TimeUnit.MILLISECONDS);
                if (Objects.nonNull(future)) {
                    Object result = future.get();
                    completedTaskSize++;
                    //  handle result
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}