前面我们已经提到,JDK自带的executors的实现都是基于线程池和任务队列的。不过,接口规范并没有对executors的实现细节有着严格限制。本小节从代码的角度说明为什么JDK要采取"线程池+任务队列"的方式实现executors。
以Executor
接口为例,这个接口只定义了一个execute方法
public interface Executor { void execute(Runnable command); }
在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务:
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
在这种情况下,如果向Executor中提交一个任务,会立即进行执行。其缺点在于,任务的执行不是异步的,某个线程向Executor中提交一个任务之后,必须等到这个线程执行完成才能继续往下执行。
更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。
class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
在这种情况下,调用者线程提交完任务之后,由于是启动一个新的线程来完成这个任务。因此调用者线程可以立即去执行接下来的代码。
不过这种实现也存在一些问题,如果同时提交的大量的任务,必定会创建大量的线程,线程的上下文切换、创建与销毁,都是很浪费资源的,因此这种设计方式可能会超出计算机的处理能力。
所以又有了以下的改进版,executor中维护了一定数量的线程,而提交的任务到放到一个任务队列中。
import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; /** * Created by TIANSHOUZHI336 on 2016/7/31. */ public class SimpleThreadPoolExecutor implements Executor{ private BlockingQueue<Runnable> taskQueue = null;//任务队列 private List<WorkerThread> threads = new ArrayList<WorkerThread>();//线程池 private boolean isStopped = false; public SimpleThreadPoolExecutor(int noOfThreads, int maxNoOfTasks ) { taskQueue = new LinkedBlockingQueue<Runnable>(); for (int i =0; i<noOfThreads; i++) { threads.add(new WorkerThread(taskQueue)); } for (WorkerThread thread : threads) { thread.start(); } } public synchronized void execute(Runnable task) { if(this .isStopped ) throw new IllegalStateException("SimpleThreadPoolExecutor is stopped" ); this.taskQueue .add(task); } public synchronized void stop() { this.isStopped = true; for (WorkerThread thread : threads) { thread.stop();//循环中断每一个线程 } } } class WorkerThread extends Thread { private BlockingQueue<Runnable> taskQueue = null; private boolean isStopped = false; public WorkerThread(BlockingQueue<Runnable> queue) { taskQueue = queue ; } public void run() { //因为需要不断从的任务列出中取出task执行,因此需要放在一个循环中,否则线程对象执行完一个任务就会立刻结束 while (!isStopped()) { try { Runnable runnable =taskQueue .take(); runnable.run(); } catch(Exception e ) { // 写日志或者报告异常, // 但保持线程池运行. } } } public synchronized void toStop() { isStopped = true ; this.interrupt(); //如果线程正在任务队列中获取任务,或者没有任务被阻塞,需要响应这个中断 } public synchronized boolean isStopped() { return isStopped ; } }
线程池的实现由两部分组成。类 SimpleThreadPoolExecutor是线程池的公开接口,而类 WorkerThread 用来实现执行任务的子线程。
为了执行一个任务,方法 SimpleThreadPoolExecutor .execute(Runnable r) 用 Runnable 的实现作为调用参数。在内部,Runnable 对象被放入 阻塞队列 (Blocking Queue),等待着被子线程取出队列。
一个空闲的 WorkerThread 线程会把 Runnable 对象从队列中取出并执行。你可以在 WorkerThread .run() 方法里看到这些代码。执行完毕后,WorkerThread 进入循环并且尝试从队列中再取出一个任务,直到线程终止。
调用 SimpleThreadPoolExecutor .stop() 方法可以停止 SimpleThreadPoolExecutor 。在内部,调用 stop 先会标记 isStopped 成员变量(为 true)。然后,线程池的每一个子线程都调用 WorkerThread .stop() 方法停止运行。注意,如果线程池的 execute() 在 stop() 之后调用,execute() 方法会抛出 IllegalStateException 异常。
子线程会在完成当前执行的任务后停止。注意 PoolThread.stop()
方法中调用了 this.interrupt()
。它确保阻塞在 taskQueue.take()
里的处理等待状态的调用的线程能够跳出 等待状态。例如本例中,使用的LinkedBlockingQueue的take方法实现如下:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//内部通过死循环尝试获取锁,每次循环都判断线程是否中断了,如果中断抛出异常 try { while (count.get() == 0) { notEmpty.await();//如果任务数量为0,当前线程进入等待状态,等待signal信号 } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
通常情况下,我们不需要自己实现Executor,java.util.concurrent包中,已经提供了默认的实现,如:ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool。并且提供了一个工具类Executors,用于创建其实例。