Java并发编程 专题
专题目录
您的位置:java > Java并发编程专题 > 实现自己的Executor
实现自己的Executor
作者:--    发布时间:2019-11-22

前面我们已经提到,JDK自带的executors的实现都是基于线程池和任务队列的。不过,接口规范并没有对executors的实现细节有着严格限制。本小节从代码的角度说明为什么JDK要采取"线程池+任务队列"的方式实现executors。

Executor 接口为例,这个接口只定义了一个execute方法

public interface Executor {
    void execute(Runnable command);
}

在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务:

class DirectExecutor implements Executor {
     public void execute(Runnable r) { 
         r.run(); 
     } 
 }

在这种情况下,如果向Executor中提交一个任务,会立即进行执行。其缺点在于,任务的执行不是异步的,某个线程向Executor中提交一个任务之后,必须等到这个线程执行完成才能继续往下执行。

更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。

class ThreadPerTaskExecutor implements Executor { 
    public void execute(Runnable r) { 
        new Thread(r).start(); 
    } 
}

在这种情况下,调用者线程提交完任务之后,由于是启动一个新的线程来完成这个任务。因此调用者线程可以立即去执行接下来的代码。

不过这种实现也存在一些问题,如果同时提交的大量的任务,必定会创建大量的线程,线程的上下文切换、创建与销毁,都是很浪费资源的,因此这种设计方式可能会超出计算机的处理能力。

所以又有了以下的改进版,executor中维护了一定数量的线程,而提交的任务到放到一个任务队列中。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Created by TIANSHOUZHI336 on 2016/7/31.
 */
public class SimpleThreadPoolExecutor implements Executor{

    private BlockingQueue<Runnable> taskQueue = null;//任务队列
    private List<WorkerThread> threads = new ArrayList<WorkerThread>();//线程池
    private boolean isStopped = false;

    public SimpleThreadPoolExecutor(int noOfThreads, int maxNoOfTasks ) {
        taskQueue = new LinkedBlockingQueue<Runnable>();
        for (int i =0; i<noOfThreads; i++) {
            threads.add(new WorkerThread(taskQueue));
        }
        for (WorkerThread thread : threads) {
            thread.start();
        }
    }

    public  synchronized void execute(Runnable task) {
        if(this .isStopped ) throw
                new IllegalStateException("SimpleThreadPoolExecutor is stopped" );

        this.taskQueue .add(task);
    }

    public synchronized void stop() {
        this.isStopped = true;
        for (WorkerThread thread : threads) {
            thread.stop();//循环中断每一个线程
        }
    }

}

class WorkerThread extends Thread {

    private BlockingQueue<Runnable> taskQueue = null;
    private boolean  isStopped = false;

    public WorkerThread(BlockingQueue<Runnable> queue) {
        taskQueue = queue ;
    }

    public void run() {
        //因为需要不断从的任务列出中取出task执行,因此需要放在一个循环中,否则线程对象执行完一个任务就会立刻结束
        while (!isStopped()) {
            try {
                Runnable runnable =taskQueue .take();
                runnable.run();
            } catch(Exception e ) {
                // 写日志或者报告异常,
                // 但保持线程池运行.
            }
        }
    }

    public synchronized void toStop() {
        isStopped = true ;
        this.interrupt(); //如果线程正在任务队列中获取任务,或者没有任务被阻塞,需要响应这个中断
    }

    public synchronized boolean isStopped() {
        return isStopped ;
    }
}

线程池的实现由两部分组成。类  SimpleThreadPoolExecutor是线程池的公开接口,而类 WorkerThread 用来实现执行任务的子线程。

为了执行一个任务,方法 SimpleThreadPoolExecutor .execute(Runnable r) 用 Runnable 的实现作为调用参数。在内部,Runnable 对象被放入 阻塞队列 (Blocking Queue),等待着被子线程取出队列。

一个空闲的 WorkerThread 线程会把 Runnable 对象从队列中取出并执行。你可以在 WorkerThread .run() 方法里看到这些代码。执行完毕后,WorkerThread 进入循环并且尝试从队列中再取出一个任务,直到线程终止。

调用 SimpleThreadPoolExecutor .stop() 方法可以停止 SimpleThreadPoolExecutor 。在内部,调用 stop 先会标记 isStopped 成员变量(为 true)。然后,线程池的每一个子线程都调用 WorkerThread .stop() 方法停止运行。注意,如果线程池的 execute() 在 stop() 之后调用,execute() 方法会抛出 IllegalStateException 异常。

子线程会在完成当前执行的任务后停止。注意 PoolThread.stop() 方法中调用了 this.interrupt()。它确保阻塞在 taskQueue.take() 里的处理等待状态的调用的线程能够跳出 等待状态。例如本例中,使用的LinkedBlockingQueue的take方法实现如下:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();//内部通过死循环尝试获取锁,每次循环都判断线程是否中断了,如果中断抛出异常
    try {
        while (count.get() == 0) {
            notEmpty.await();//如果任务数量为0,当前线程进入等待状态,等待signal信号
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

通常情况下,我们不需要自己实现Executor,java.util.concurrent包中,已经提供了默认的实现,如:ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool。并且提供了一个工具类Executors,用于创建其实例。

网站声明:
本站部分内容来自网络,如您发现本站内容
侵害到您的利益,请联系本站管理员处理。
联系站长
373515719@qq.com
关于本站:
编程参考手册