Java 基础教程

Java 面向对象

Java 高级教程

Java 笔记

Java 执行器


构建一个新的线程是有一定代价的,因为涉及与操作系统的交互。如果程序中创建了大量的生命期很短的线程,应该使用线程池(thread pool)。一个线程池中包含许多准备运行的空闲线程。将 Runnable 对象交给线程池,就会有一个线程调用 run 方法。当 run 方法退出时,线程不会死亡,而是在池中准备为下一个请求提供服务。

另一个使用线程池的理由是减少并发线程的数目。创建大量线程会大大降低性能甚至使虚拟机崩溃。如果有一个会创建许多线程的算法,应该使用一个线程数“固定的”线程池以限制并发线程的总数。

线程池

执行器 (Executor)类有许多静态工厂方法用来构建线程池:

执行器工厂方法
方法 描述
Executors.newCachedThreadPool 必要时创建新线程;空闲线程会被保留 60 秒
Executors.newFixedThreadPool 该池包含固定数量的线程;空闲线程会一直被保留
Executors.newSingleThreadExecutor 只有一个线程的“池”,该线程顺序执行每一个提交的任务
Executors.newScheduledThreadPool 用于预定执行而构建的固定线程池,替代 java.util.Timer
Executors.newSingleThreadScheduledExecutor 用于预定执行而构建的单线程“池”

newCachedThreadPool 方法构建了一个线程池,对于每个任务,如果有空闲线程可用,立即让它执行任务,如果没有可用的空闲线程,则创建一个新线程。

newFixedThreadPool 方法构建一个具有固定大小的线程池。如果提交的任务数多于空闲的线程数,那么把得不到服务的任务放置到队列中。当其他任务完成以后再运行它们。

newSingleThreadExecutor 是一个退化了的大小为 1 的线程池:由一个线程执行提交的任务,一个接着一个。

如上 3 个方法是返回实现了 ExecutorService 接口的 ThreadPoolExecutor 类的对象。

可用下面的方法之一将一个 Runnable 对象或 Callable 对象提交给 ExecutorService:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

该池会在方便的时候尽早执行提交的任务。调用 submit 时,会得到一个 Future 对象,可用来查询该任务的状态。

第一个 submit 方法返回一个奇怪样子的 Future<?>。可以使用这样一个对象来调用 isDone cancel 或 isCancelled。但是 get 方法在完成的时候只是简单地返回 null。

第二个版本的 submit 也提交一个 Runnable,并且 Future 的 get 方法在完成的时候返回指定的 result 对象。

第三个版本的 submit 提交一个 Callable,并且返回的 Future 对象将在计算结果准备好的时候得到它。

当用完一个线程池的时候,调用 shutdown。该方法启动该池的关闭序列。被关闭的执行器不再接受新的任务。当所有任务都完成以后,线程池中的线程死亡。另一种方法是调用 shutdownNow。该池取消尚未开始的所有任务并试图中断正在运行的线程。

下面总结了在使用连接池时应该做的事:

  1. 调用 Executors 类中静态的方法 newCachedThreadPool 或 newFixedThreadPool。
  2. 调用 submit 提交 Runnable 或 Callable 对象。
  3. 如果想要取消一个任务,或如果提交 Callable 对象,那就要保存好返回的 Future 对象。
  4. 当不再提交任何任务时,调用 shutdown。

预定执行

ScheduledExecutorService 接口具有为预定执行(Scheduled Execution)或重复执行任务而设计的方法。它是一种允许使用线程池机制的 java.util.Timer 的泛化。Executors 类的 newScheduledThreadPool 和 newSingleThreadScheduledExecutor 方法将返回实现了 ScheduledExecutorService 接口的对象。

可以预定 Runnable 或 Callable 在初始的延迟之后只运行一次,也可以预定一个 Runnable 对象周期性地运行。

java.util.concurrent.Executors 中构建预定执行器的相关 API:

//   返回一个线程池,它使用给定的线程数来调度任务
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//   返回一个执行器,它在一个单独线程中调度任务
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

java.util.concurrent.ScheduledExecutorService 中任务调度的相关 API:

/**
 * 预定在指定的时间之后执行 Runnable 任务
 */
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay, TimeUnit unit);

/**
 * 预定在指定的时间之后执行 Callable 任务
 */
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay, TimeUnit unit);

/**
 * 预定在初始的延迟结束后,周期性地运行给定的任务,周期长度是 period
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit);

/**
 * 预定在初始的延迟结束后周期性地运行给定的任务,在一次调用完成和下一次调用开始之间有长度为 delay 的延迟
 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit);

控制任务组

有时,使用执行器有更有实际意义的原因是控制一组相关任务。

例如,可以在执行器中使用 shutdownNow 方法取消所有的任务。

invokeAny 方法提交所有对象到一个 Callable 对象的集合中,并返回某个已经完成了的任务的结果。无法知道返回的究竟是哪个任务的结果,也许是最先完成的那个任务的结果。

invokeAll 方法提交所有对象到一个 Callable 对象的集合中,并返回一个 Future 对象的列表,代表所有任务的解决方案。当计算结果可获得时,可以像下面这样对结果进行处理:

List<Callab1e<T>> tasks = . . .;
List<Future<T>> results = executor.invokeAll(tasks);
for (Future<T> result : results)
    processFurther(result.get());

这个方法的缺点是如果第一个任务恰巧花去了很多时间,则可能不得不进行等待。将结果按可获得的顺序保存起来更有实际意义。可以用 ExecutorCompletionService 来进行排列。

用常规的方法获得一个执行器。然后,构建一个 ExecutorCompletionService,提交任务给完成服务(completion service)。该服务管理 Future 对象的阻塞队列,其中包含已经提交的任务的执行结果(当这些结果成为可用时)。这样一来,相比前面的计算,一个更有效的组织形式如下:

ExecutorCompletionService<T> service = new ExecutorCompletionService<>(executor);
for (Callable<T> task : tasks)
    service.submit(task);
for (int i = 0; i < tasks.size(); i++)
    processFurther(service.take().get());

Fork-Join 框架

有些应用使用了大量线程,但其中大多数都是空闲的。举例来说,一个 Web 服务器可能会为每个连接分别使用一个线程。另外一些应用可能对每个处理器内核分别使用一个线程,来完成计算密集型任务,如图像或视频处理。Java SE 7 中新引入了 fork-join 框架,专门用来支持后一类应用,但其前提是假设有一个处理任务,它可以很自然地分解为子任务,即任务操作支持并行处理

要采用框架可用的一种方式完成这种递归计算,需要提供一个扩展 RecursiveTask<T> 的类(如果计算会生成一个类型为 T 的结果)或者提供一个扩展 RecursiveAction 的类(如果不生成任何结果)。再覆盖 compute 方法来生成并调用子任务,然后合并其结果。

在后台,fork-join 框架使用了一种有效的智能方法来平衡可用线程的工作负载,这种方法称为工作密取(work stealing)。每个工作线程都有一个双端队列(deque)来完成任务。一个工作线程将子任务压入其双端队列的队头。(只有一个线程可以访问队头,所以不需要加锁。)一个工作线程空闲时,它会从另一个双端队列的队尾“密取”一个任务。由于大的子任务都在队尾,这种密取很少出现。

public static void main(String[] args) throws ExecutionException, InterruptedException {

    ForkJoinTask<Integer> fjt = new Fibonacci(45);
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    Future<Integer> result = forkJoinPool.submit(fjt);

    System.out.println(result.get());

}

static class Fibonacci extends RecursiveTask<Integer> {
    
    final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    private int compute(int small) {
        final int[] results = {1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89};
        return results[small];
    }

    public Integer compute() {
        if (n <= 10) {
            return compute(n);
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        Fibonacci f2 = new Fibonacci(n - 2);
        f1.fork();
        f2.fork();
        return f1.join() + f2.join();
    }
}

上例中,我们用到了 RecursiveTask 提供的方法 fork() 和 join()。它们分别表示:子任务的异步执行和阻塞等待结果完成。

可完成的 Future

处理非阻塞调用的传统方法是使用事件处理器,程序员为任务完成之后要出现的动作注册一个处理器。当然,如果下一个动作也是异步的,在它之后的下一个动作会在一个不同的事件处理器中。尽管程序员会认为“ 先做步骤 1,然后是步骤 2,再完成步骤 3”,但实际上程序逻辑会分散到不同的处理器中。如果必须增加错误处理,情况会更糟糕。假设步骤 2 是“用户登录”。可能需要重复这个步骤,因为用户输入凭据时可能会出错。要尝试在一组事件处理器中实现这样一个控制流,或者想要理解所实现的这样一组事件处理器,会很有难度。

Java SE 8 的 CompletableFuture 类提供了一种候选方法。与事件处理器不同,“可完成 future” 可以“组合”(composed)。