四、异步回调方法
1、thenRun/thenRunAsync
通俗点讲就是,做完第一个任务后,再做第二个任务,这两个任务没有关联关系,第二个任务也没有返回值。
示例
@Test public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException { long startTime = System.currentTimeMillis(); CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> { try { //执行任务A Thread.sleep(600); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture<Void> cp2 = cp1.thenRun(() -> { try { //执行任务B Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } }); // get方法测试 System.out.println(cp2.get()); //模拟主程序耗时时间 Thread.sleep(600); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); } //运行结果 /** * null * 总共用时1610ms */
thenRun 和thenRunAsync有什么区别呢?
如果你执行第一个任务的时候,传入了一个自定义线程池:
-
调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。 -
调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。
说明
: 后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。
2、thenAccept/thenAcceptAsync
第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。
示例
@Test public void testCompletableThenAccept() throws ExecutionException, InterruptedException { long startTime = System.currentTimeMillis(); CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { return "dev"; }); CompletableFuture<Void> cp2 = cp1.thenAccept((a) -> { System.out.println("上一个任务的返回结果为: " + a); }); cp2.get(); }
3、 thenApply/thenApplyAsync
表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。
示例
@Test public void testCompletableThenApply() throws ExecutionException, InterruptedException { CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { return "dev"; }).thenApply((a) -> { if(Objects.equals(a,"dev")){ return "dev"; } return "prod"; }); System.out.println("当前环境为:" + cp1.get()); //输出: 当前环境为:dev }
五、异常回调
当CompletableFuture的任务不论是正常完成还是出现异常它都会调用whenComplete这回调函数。
-
正常完成:whenComplete返回结果和上级任务一致,异常为null; -
出现异常:whenComplete返回结果为null,异常为上级任务的异常;
即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要你处理该异常。
下面来看看示例
1、只用whenComplete
@Test public void testCompletableWhenComplete() throws ExecutionException, InterruptedException { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("出错了"); } System.out.println("正常结束"); return 0.11; }).whenComplete((aDouble, throwable) -> { if (aDouble == null) { System.out.println("whenComplete aDouble is null"); } else { System.out.println("whenComplete aDouble is " + aDouble); } if (throwable == null) { System.out.println("whenComplete throwable is null"); } else { System.out.println("whenComplete throwable is " + throwable.getMessage()); } }); System.out.println("最终返回的结果 = " + future.get()); }
正常完成,没有异常时:
正常结束 whenComplete aDouble is 0.11 whenComplete throwable is null 最终返回的结果 = 0.11
出现异常时:get()会抛出异常
whenComplete aDouble is null whenComplete throwable is java.lang.RuntimeException: 出错了 java.util.concurrent.ExecutionException: java.lang.RuntimeException: 出错了 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
2、whenComplete + exceptionally示例
@Test public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("出错了"); } System.out.println("正常结束"); return 0.11; }).whenComplete((aDouble, throwable) -> { if (aDouble == null) { System.out.println("whenComplete aDouble is null"); } else { System.out.println("whenComplete aDouble is " + aDouble); } if (throwable == null) { System.out.println("whenComplete throwable is null"); } else { System.out.println("whenComplete throwable is " + throwable.getMessage()); } }).exceptionally((throwable) -> { System.out.println("exceptionally中异常:" + throwable.getMessage()); return 0.0; }); System.out.println("最终返回的结果 = " + future.get()); }
当出现异常时,exceptionally中会捕获该异常,给出默认返回值0.0。
whenComplete aDouble is null whenComplete throwable is java.lang.RuntimeException: 出错了 exceptionally中异常:java.lang.RuntimeException: 出错了 最终返回的结果 = 0.0
六、多任务组合回调
1、AND组合关系
thenCombine / thenAcceptBoth / runAfterBoth都表示:当任务一和任务二都完成再执行任务三。
区别在于:
-
runAfterBoth 不会把执行结果当做方法入参,且没有返回值 -
thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值 -
thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
示例
@Test public void testCompletableThenCombine() throws ExecutionException, InterruptedException { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //开启异步任务1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("异步任务1结束"); return result; }, executorService); //开启异步任务2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("异步任务2结束"); return result; }, executorService); //任务组合 CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> { System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId()); System.out.println("任务1返回值:" + f1); System.out.println("任务2返回值:" + f2); return f1 + f2; }, executorService); Integer res = task3.get(); System.out.println("最终结果:" + res); }
运行结果
异步任务1,当前线程是:17 异步任务1结束 异步任务2,当前线程是:18 异步任务2结束 执行任务3,当前线程是:19 任务1返回值:2 任务2返回值:2 最终结果:4
2、OR组合关系
applyToEither / acceptEither / runAfterEither 都表示:两个任务,只要有一个任务完成,就执行任务三。
区别在于:
-
runAfterEither:不会把执行结果当做方法入参,且没有返回值 -
acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值 -
applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
示例
@Test public void testCompletableEitherAsync() { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //开启异步任务1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("异步任务1结束"); return result; }, executorService); //开启异步任务2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 2; try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步任务2结束"); return result; }, executorService); //任务组合 task.acceptEitherAsync(task2, (res) -> { System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId()); System.out.println("上一个任务的结果为:"+res); }, executorService); }
运行结果
//通过结果可以看出,异步任务2都没有执行结束,任务3获取的也是1的执行结果 异步任务1,当前线程是:17 异步任务1结束 异步任务2,当前线程是:18 执行任务3,当前线程是:19 上一个任务的结果为:2
注意
如果把上面的核心线程数改为1也就是
ExecutorService executorService = Executors.newFixedThreadPool(1);
运行结果就是下面的了,会发现根本没有执行任务3,显然是任务3直接被丢弃了。
异步任务1,当前线程是:17 异步任务1结束 异步任务2,当前线程是:17
3、多任务组合
-
allOf:等待所有任务完成 -
anyOf:只要有一个任务完成
示例
allOf:等待所有任务完成
@Test public void testCompletableAallOf() throws ExecutionException, InterruptedException { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //开启异步任务1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("异步任务1结束"); return result; }, executorService); //开启异步任务2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 2; try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步任务2结束"); return result; }, executorService); //开启异步任务3 CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务3,当前线程是:" + Thread.currentThread().getId()); int result = 1 + 3; try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步任务3结束"); return result; }, executorService); //任务组合 CompletableFuture<Void> allOf = CompletableFuture.allOf(task, task2, task3); //等待所有任务完成 allOf.get(); //获取任务的返回结果 System.out.println("task结果为:" + task.get()); System.out.println("task2结果为:" + task2.get()); System.out.println("task3结果为:" + task3.get()); }
循环里异步任务,并等待所有任务完成
List<CompletableFuture<?>> futureAll = new ArrayList<>(); for (int i = 0; i < 20; i++) { futureAll.add( CompletableFuture.supplyAsync(() -> { //System.out.println("异步任务"+i); return i; }) ); } //等待全部执行完成 CompletableFuture.allOf(futureAll.toArray(new CompletableFuture[0])).join();
anyOf: 只要有一个任务完成
@Test public void testCompletableAnyOf() throws ExecutionException, InterruptedException { //创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //开启异步任务1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { int result = 1 + 1; return result; }, executorService); //开启异步任务2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { int result = 1 + 2; return result; }, executorService); //开启异步任务3 CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> { int result = 1 + 3; return result; }, executorService); //任务组合 CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task, task2, task3); //只要有一个有任务完成 Object o = anyOf.get(); System.out.println("完成的任务的结果:" + o); }
七、CompletableFuture使用有哪些注意点
CompletableFuture 使我们的异步编程更加便利的、代码更加优雅的同时,我们也要关注下它,使用的一些注意点。
1、Future需要获取返回值,才能获取异常信息
@Test public void testWhenCompleteExceptionally() { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (1 == 1) { throw new RuntimeException("出错了"); } return 0.11; }); //如果不加 get()方法这一行,看不到异常信息 //future.get(); }
Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。
小伙伴们使用的时候,注意一下哈,考虑是否加try...catch...或者使用exceptionally方法。
2、CompletableFuture的get()方法是阻塞的。
CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。
//反例 CompletableFuture.get(); //正例 CompletableFuture.get(5, TimeUnit.SECONDS);
3、不建议使用默认线程池
CompletableFuture代码中又使用了默认的ForkJoin线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用
自定义线程池,优化线程池配置参数。
4、自定义线程池时,注意饱和策略
CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(5, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。
但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好
使用AbortPolicy,然后耗时的异步线程,做好线程池隔离。
文章评论