上一节我们已经提到,Future
是用来执行任务的结果,JDK自带的Future实现FutureTask
,只能同步等待结果,当get方法被调用的时候,当前线程就会被阻塞,一直到任务执行完成,或者一直等待到超时。
在Netty中,提供了另外一种Future实现(ChannelFuture
),其是异步的,也就是说不需要同步等待执行结果,其可以在任务执行完成之后,回调用户指定的方法,以告诉我们任务的结果。
当然,这里并不是会去分析Netty中这种ChannelFuture的实现源码,我们只是要实现一个类似功能的Future实现,在这里称之为SmartFuture
。其支持同步等待返回结果,也支持异步通知结果。由于FutureTask已经提供了同步等待的功能,所以我们只需要让我们的SmartFuture继承FutureTask,再添加相关异步功能的方法即可。
SmartFuture源码:
public class SmartFuture<V> extends FutureTask<V>{ //异步通知的listener private Set<SmartFutureListener> listeners=null; //任务运行结果 Object result=null; private boolean hasResult; public SmartFuture(Callable<V> callable) { super(callable); listeners=new CopyOnWriteArraySet<SmartFutureListener>(); } public SmartFuture(Runnable runnable, V result) { super(runnable, result); listeners=new CopyOnWriteArraySet<SmartFutureListener>(); } public void addListener(SmartFutureListener listener){ if(listener==null){ throw new NullPointerException(); } if(hasResult){//如果添加listener的时候,任务已经执行完成,直接回调listener notifyListener(listener); }else{//如果任务没有执行完成,添加到监听队列 listeners.add(listener); } } //覆写set方法,结果运行成功 @Override protected void set(V v) { super.set(v); result=v; hasResult=true; notifyListeners(); } //覆写 setException方法 @Override protected void setException(Throwable t) { super.setException(t); result=t; hasResult=true; notifyListeners(); } //回调 private void notifyListeners() { for (SmartFutureListener listener : listeners) { notifyListener(listener); } } private void notifyListener(SmartFutureListener listener) { if(result instanceof Throwable){ listener.onError((Throwable) result); }else{ listener.onSuccess(result); } listeners=null; } }
用户在往线程池中提交任务后,可以获取到SmartFuture对象,通过调用其addListener
方法,添加监听器,SmartFuture 在执行完成时,会调用SmartFutureListener
的指定方法:
public interface SmartFutureListener<V> { public void onSuccess(V result); public void onError(Throwable throwable); }
SmartThreadExecutorPool:
要实现异步监听的Future,我们除了要实现Future对象,还要自己实现一个线程池,因为如果我们直接使用ThreadExecutorPool
提交任务(Callable,Runnable),其还是会将其包装成一个FutureTask对象,这是通过newFutureTask
方法创建的,我们可以对相关方法进行覆盖,使得返回的对象是SmartFuture
public class SmartThreadExecutorPool extends ThreadPoolExecutor{ public SmartThreadExecutorPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new SmartFuture<T>(runnable,value); } @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new SmartFuture<T>(callable); } //覆写这三个方法只是为了用户在使用的时候,方便一点,不需要将Future强转为SmartFuture @Override public SmartFuture<?> submit(Runnable task) { return (SmartFuture<?>) super.submit(task); } @Override public <T> SmartFuture<T> submit(Runnable task, T result) { return (SmartFuture<T>) super.submit(task, result); } @Override public <T> SmartFuture<T> submit(Callable<T> task) { return (SmartFuture<T>) super.submit(task); } }
测试代码:SmartFutureTest
public class SmartFutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException { SmartThreadExecutorPool smartThreadExecutorPool = new SmartThreadExecutorPool(5,10,10, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()); //提交一个任务 SmartFuture<String> smartFuture = smartThreadExecutorPool.submit(new Callable<String>() { @Override public String call() throws Exception { return "当前时间:" + System.currentTimeMillis(); } }); smartFuture.addListener(new SmartFutureListener<String>() { @Override public void onSuccess(String result) { System.out.println("异步回调成功,"+result); } @Override public void onError(Throwable throwable) { System.out.println("异步回调失败,"+throwable); } }); String syncResult = smartFuture.get(); System.out.println("同步回调成功:"+syncResult); } }
运行程序,输出:
异步回调成功,当前时间:1484233401344 同步回调成功:当前时间:1484233401344 |