前几天看hdfs QJM的代码,里面看到一个ListenableFuture,说实话对于Java,目前我还只是通过看代码,遇到没见过的再去查的方式,也着实是没有时间和精力再去通篇研读诸如《thinking in Java》这样的大砖块了,现在这样的方式,目前来说应该是够用了。重点还是放在系统和业务上,语言本身本不应该成为障碍。言归正传,回到ListenableFuture, 在网上看了一下相关的资料,把它的来龙去脉了解了一下,在这里记录一下。
前面提到的ListenableFuture, 是google开源的自己的Java Library Guava(http://code.google.com/p/guava-libraries/)中的一个模块,它本身是继承是Java的Future。严格来讲,Future是一种Design Pattern, 它本身跟语言是没有关系的。最新的C++11中,也加入了Future的支持,不过笔者以前写C++的时候,C++11还没正式发布,加上它本身也是比较新的,主流的编译器支持的本身也不是很好,因此只是知道它的存在,并没有去研究过它是做什么的,怎么来使用的。接下来,我会通过一些Java的Future的例子,来一步步介绍Future及其用法。
简单来讲,Future是这样一种Pattern: 它本身表示‘将来(future)’,你提交一个异步的任务,比如提交到一个threadpool,与此同时拿到一个Future对象,任务的执行是异步的,这时候你可以去做其它的事情,等到异步任务结束的时候,你可通过前面的Future对象拿到异步执行的任务的结果。下面通过一个简单的例子来直观感受一下Future:
123456789101112131415161718192021222324252627282930 | import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.FutureTask; public class FutureTest { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(1); Future< String> future = executor.submit(new Callable< String>() { public String call() { return "Hello futue!"; } }); try { Thread.sleep(1000); System.out.println(future.get()); } catch (InterruptedException e) { future.cancel(true); } catch (ExecutionException e) { future.cancel(true); } finally { executor.shutdown(); } }} |
在上面的程序中,和我注意到第20行的Thread.sleep(1000),这里的原本的含义是在异步任务执行期间,原线程可以做任何事情,不会阻塞。这里简单用了sleep,但要表达的意思是清楚的。
看了上面的例子,细心的朋友总会有这样的疑问,Future要获取异步任务执行的结果,需要通过轮询或者阻塞等待的方式,这样的方式,总显得不太‘完美’,我们知道,比较好的方式,应该是异步执行结束后,自动通知用户异步任务结束了,你可以通过Future来获取执行结果了。这就诞生了google的ListenableFuture,用户可以向它注册一个回调函数和提供一个线程池(可选),当异步任务执行结束后,它会自动在用户提供的线程池里调用用户注册的回调函数,通知用户异步任务执行结束了。当然,如果用户不提供线程池,它会在运行异步任务的工作线程里运行回调函数,这种情况适用于工作线程本身的任务比较轻量级的情景。下面通过几个例子,说明ListenableFuture的具体的用法:
12345678910111213141516171819202122232425262728293031323334353637 | import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executors; import com.google.common.util.concurrent.Futures;import com.google.common.util.concurrent.ListenableFuture;import com.google.common.util.concurrent.ListeningExecutorService;import com.google.common.util.concurrent.MoreExecutors; public class ListenableFutureTest { public static void main(String[] args) { ListeningExecutorService executor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool(1)); final ListenableFuture< String> future = executor.submit( new Callable< String>() { public String call() throws Exception { return "Hello listenable future"; } }); future.addListener(new Runnable() { public void run() { try { System.out.println(future.get()); } catch (InterruptedException e) { future.cancel(true); } catch (ExecutionException e) { future.cancel(true); } } }, Executors.newFixedThreadPool(1)); System.out.println("exit.."); }} |
上面的程序中,第35行的System.out.println(“exit..”);可能会先于“Hello listenable future” print到stdout上,这也间接能说明回调函数的执行是在别的线程中的。另外,在上面的例子中,我们发现,用户需要在注册的回调函数中处理InterruptedException和ExecutionException, 显得略为麻烦。这里Guava还提供了另为一种使用方式,接口上来看,更加清晰一些,下面是这种方式的使用的一个例子:
123456789101112131415161718192021222324252627282930313233343536 | import java.util.concurrent.Callable;import java.util.concurrent.Executors; import com.google.common.util.concurrent.FutureCallback;import com.google.common.util.concurrent.Futures;import com.google.common.util.concurrent.ListenableFuture;import com.google.common.util.concurrent.ListeningExecutorService;import com.google.common.util.concurrent.MoreExecutors; public class ListenableFutureTest2 { public static void main(String[] args) { ListeningExecutorService executor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool(1)); final ListenableFuture< String> future = executor.submit( new Callable< String>() { public String call() throws Exception { return "Hello listenable future"; } }); Futures.addCallback(future, new FutureCallback< String>() { public void onSuccess(String result) { System.out.println(result); }k public void onFailure(Throwable t) { System.out.println("error: " + t); } }, Executors.newFixedThreadPool(1)); System.out.println("exit.."); }} |
上面几个例子中,都显式的提供了用户线程池,用来执行回调函数。用户也可以不提供线程,或者可以通过 MoreExecutors.sameThreadExecutor()把当前线程传进去,用来执行回函数。
http://www.wuzesheng.com/?p=2485