多线程
Async
@Async(value = “指定自定义Bean线程池”) [根据不同的业务可以使用不同的线程池来做异步处理]
如果不指明使用的线程池,在SpringBoot 2.1.0之前默认使用的是SimpleAsyncTaskExecutor [一次性的异步执行器] 会随着请求数量的增加创建越来越多的线程,没有线程管理、也没有任务队列,很容易就会导致OOM(类似于直接通过Executors构造的线程池),而在SpringBoot2.1之后的版本之中Async使用的就是默认的TaskExecutors,有核心线程设置、但是线程等待队列、最大线程数都是Integer.MAX_VALUE,拒绝策略只有abortPolicy。如果我们不在配置文件之中添加相关的配置项,这两种方式都存在着创建大量的线程造成系统资源耗尽的问题。
(Async默认的线程池配置可以 在yaml / yaml之中可以添加spring.task.execution.xxx-xxx-xxx = 配置)
需要配合@EnableAsync使用、使用方法必须是public、不可以是static方法、必须是外部调用自己调用自己不行(做动态代理实现的多线程调用)
CompletableFuture
单单依靠Future没办法解决有上下文的异步线程任务,也就是下一个异步任务需要使用上一个异步结果的情况,也无法处理出现异常的情况。在这个背景下CompletableFuture在Java1.8出现了。
CompletableFuture实现了Future和CompletionStage (实现Future本身等同于FutureTask),新增的CompletionStage(利用函数式接口)帮助我们可以通过链式操作的方式管理和关联多个异步操作。
CompletionStage的关键方法:
- thenApply:用于对之前异步操作的结果进行转换
- thenAccept:用操作结果进行一些消费操作
- thenCombine:用于将两个CompletionStage结果直接合并
- thenCompose:用于创建一个新的stage,值由之前的stage推出来
CompletabelFuture常见方法
- supplyAsync、2. allOf / anyOf、3. get、4. join、5. thenApply、6. thenAccept、7. whenComplete
CompletableFuture结果获取
get方法
简单的来说get就是获取CompletableFuture的执行结果,并且处理获取执行结果所需要处理的异常 (InterruptedException、ExecutionException两大异常)
join方法
简单的来说join就是不处理异常版本的get,一般来说不会用join
getNow方法
简单的来说立刻获取future的结果,但是不会使程序进入阻塞的状态,并且不需要处理异常 (如果异步线程的工作已经完成,那么直接获取结果的值;如果异步线程的工作还没有完成,那么获得默认的 valuefAbsent )
isDone方法
判断当前的CompletableFuture对象是否是有返回值的,如果没有代表还没执行完毕
complete方法
如果CompletableFuture还没完成/发生异常的时候,那么可以通过方法直接赋值,并直接完成该CompletableFuture (当CompletableFuture是异常的时候,将会通过设置值直接结束异常结果)
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Integer> threadPart = CompletableFuture.supplyAsync(() -> {
System.out.println("父任务 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现,这里可以拦截当前线程使其休眠
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("父任务 Ending ···");
i /= 0;
return i;
});
// do something main thread
System.out.println("main Thread Keep going");
try {
threadPart.complete(114514);
System.out.println(threadPart.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// do something main thread
System.out.println("main working···");
commonThreadPool.shutdown();
}
completeExceptionally方法
如果CompletableFuture还没完成/已经得到正常结果时,那么可以通过方法直接赋异常对象,并直接完成该CompletableFuture (当CompletableFuture是正常结果时的时候,将会通过设置异常对象的值直接结束) [需要注意,和complete方法一起使用的时候,不管前后顺序如何,都会失效]
示例代码
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Integer> threadPart = CompletableFuture.supplyAsync(() -> {
System.out.println("父任务 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现,这里可以拦截当前线程使其休眠
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("父任务 Ending ···");
return i;
});
// do something main thread
System.out.println("main Thread Keep going");
try {
threadPart.completeExceptionally(new Throwable("不是,你也配?"));
System.out.println(threadPart.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// do something main thread
System.out.println("main working···");
commonThreadPool.shutdown();
}
CompletableFuture构造方法
supplyAsync (异步执行有返回值的方法)
本质上是接受一个Supplier接口的对象作为参数,该Supplier提供get()方法用以生成结果。然后supplyAsync()以异步的方式来执行Supplier任务。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier);} public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier);}
runAsync (异步执行没有返回值的方法)
本质上是接受一个实现了Runnable的对象作为参数,然后返回一个CompletabeFuture,在遇到get调用的时候其实就是直接开一个线程去执行。
public static CompletableFuture<Void> runAsync (Runnable runnable) {return asyncRunStage(asyncPool, runnable)}; public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor) {return asyncRunStage(screenExecutor(executor), runnable)};
如果不传入线程池,CompatableFuture将会使用它自身默认的ForkJoinPool线程池
如果传入线程池,CompatableFuture将会使用线程池来执行方法
asyncSupplyStage / asyncRunStage两个方法内部都是直接调用runAsync,创建异步线程 Thread 来进行处理
CompatableFuture的子任务构建 (结果回调)
whenComplete / whenCompleteAsync
参数是 函数式接口 BiConsumer ,可以接受 两个输入参数并且没有返回类型的方法,实际上第一个是上一个任务的返回值,第二个则是前面的链式可能发生了的异常,需要注意的是whenComplete不会将异常拦截下来处理掉,而是单纯的可以判断之前的链式有无异常
whenCompleteAsync:本身其实和whenComplete区别就是改成多线程执行,如果supplyAsync使用的是自定义的线程池这里用的就是自定义的,如果supplyAsync用的是默认的,这里就是默认的ForkJoinPool.commonPool()获取线程来执行。当然你可以使用有指定线程池whenCompleteAsync的重载方法
(如果whenComplete之中要消耗较大的资源,可以换成该方法,否则为了避免线程切换应当用普通whenComplete)
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
使用示例
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> {
System.out.println("父任务 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现,这里可以拦截当前线程使其休眠
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("父任务 Ending ···");
// i /= 0;
return i;
}, commonThreadPool).whenComplete((res, exception) -> {
if (exception == null) {
System.out.println("父结果结束,返回:" + res);
}
});
// 继续处理其他业务
System.out.println("main Thread Keep going");
// 需要前置处理结果
try {
Integer i = result.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// 后面的业务处理
System.out.println("main working···");
commonThreadPool.shutdown();
}
结果:
// 如果成功执行
父任务 Executing ···
main Thread Keep going
父任务 Ending ···
父结果结束,返回:792944539
main working···
// 如果存在异常
父任务 Executing ···
main Thread Keep going
父任务 Ending ···
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
···
Caused by: java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
···
Caused by: java.lang.ArithmeticException: / by zero
thenApply / thenApplyAsync
参数只有一个 就是上一个异步工作的结果
跟whenComplete的主要区别在于,thenApply在遇到异常的时候是完全不会执行的,并且会导致后续的thenApply / handle 也不会执行,直到最后的whenComplete才会被 BiConsumer 拦截到异常正常执行
thenApplyAsync:本身其实和thenApplyAsync区别就是改成多线程执行,如果supplyAsync使用的是自定义的线程池这里用的就是自定义的,如果supplyAsync用的是默认的,这里就是默认的ForkJoinPool.commonPool()获取线程来执行。当然你可以使用有指定线程池thenApplyAsync的重载方法
(如果thenApply 之中要消耗较大的资源,可以换成该方法,否则为了避免线程切换应当用普通thenApply )
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
使用示例
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> {
System.out.println("父任务 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现,这里可以拦截当前线程使其休眠
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("父任务 Ending ···");
return i;
}, commonThreadPool).thenApply(var -> {
System.out.println("handle1 execute");
// var /= 0;
return var;
}).thenApply(var -> {
System.out.println("handle2 execute");
return var;
});
// 继续处理其他业务
System.out.println("main Thread Keep going");
// 需要前置处理结果
try {
Integer i = result.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// 后面的业务处理
System.out.println("main working···");
commonThreadPool.shutdown();
}
handle / handleAsync
本质上参数是 BiConsumer 可以接受上一个链式的结果,也可以拦截到前面链式可能发生了的异常
与thenApply不一样的是,handle可以拦截异常,也就是不管前面thenApply或者是supplyAsync之中产生的异常,都可以被handle之中拦截到并做一些处理操作,需要注意的是handle一定会将异常消费掉 (需要注意的是如果要使用handle,那么必须要在handle内部对异常进行处理,否则将忽略!)
另外需要注意的一点是,如果有多个handle,只要在第一个handle之前出现过异常,那么在后续中的所有handle之中param都是null
handleAsync:本身其实和handleAsync区别就是改成多线程执行,如果supplyAsync使用的是自定义的线程池这里用的就是自定义的,如果supplyAsync用的是默认的,这里就是默认的ForkJoinPool.commonPool()获取线程来执行。当然你可以使用有指定线程池handleAsync的重载方法
(如果handle之中要消耗较大的资源,可以换成该方法,否则为了避免线程切换应当用普通handle
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
示例代码
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> {
System.out.println("父任务 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现,这里可以拦截当前线程使其休眠
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("父任务 Ending ···");
// i /= 0;
return i;
}, commonThreadPool).handle((var, exception) -> {
if (exception == null) {
System.out.println("handle execute···");
} else {
System.out.println("has exception:" + exception.getMessage());
}
return var;
});
// 继续处理其他业务
System.out.println("main Thread Keep going");
// 需要前置处理结果
try {
Integer i = result.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// 后面的业务处理
System.out.println("main working···");
commonThreadPool.shutdown();
}
结果 (说明使用handle的时候,会强制消费异常)
// 成功,没有异常
父任务 Executing ···
main Thread Keep going
父任务 Ending ···
handle execute···
main working···
// 出现异常时
父任务 Executing ···
main Thread Keep going
父任务 Ending ···
has exception:java.lang.ArithmeticException: / by zero
main working···
CompletableFuture (结果消费)
thenAccept / thenAcceptAsync
与thenApply有一些相似,都是只接受一个参数,就是前面链式执行下来的结果,但不同的点在于thenAccept是完全消费作为目标的方法,它直接针对前面链式的结果进行消费,并且只返回一个空的CompletableFuture对象,该CompletabelFuture接受到的内容是void类型。最终的值其实是null
thenAcceptAsync:本身其实和thenAccept区别就是改成多线程执行,如果supplyAsync使用的是自定义的线程池这里用的就是自定义的,如果supplyAsync用的是默认的,这里就是默认的ForkJoinPool.commonPool()获取线程来执行。当然你可以使用有指定线程池thenAcceptAsync的重载方法
(如果thenAccept之中要消耗较大的资源,可以换成该方法,否则为了避免线程切换应当用普通thenAccept
示例代码
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("父任务 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现,这里可以拦截当前线程使其休眠
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("父任务 Ending ···");
return i;
}, commonThreadPool).thenAccept(param -> {
System.out.println("结束咧" + param);
});
// 继续处理其他业务
System.out.println("main Thread Keep going");
try {
Void unused = voidCompletableFuture.get();
System.out.println(unused);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// 后面的业务处理
System.out.println("main working···");
commonThreadPool.shutdown();
}
thenRun / thenRunAsync
简单的来说thenRun的目标其实是在CompletableFuture完成某个动作之后,执行某一个Runnable,并且他不关心CompletableFuture执行的结果是成功还是异常,都会执行该Runnable,并返回一个新的CompletableFuture
thenRunAsync:本身其实和thenRun区别就是改成多线程执行,如果supplyAsync使用的是自定义的线程池这里用的就是自定义的,如果supplyAsync用的是默认的,这里就是默认的ForkJoinPool.commonPool()获取线程来执行。当然你可以使用有指定线程池thenRunAsync的重载方法
(如果thenRun之中要消耗较大的资源,可以换成该方法,否则为了避免线程切换应当用普通thenRun
示例代码
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenRun( () -> System.out.println("Computation finished.") );
CompletableFuture异常消费
handle / handleAsync
前面说过这里不再赘述
exceptionally
只有在前面的链式调用之中出现了抛出的异常的时候,才会被触发,并且需要注意的是,该方法具有返回值并且会针对异常进行消费。一般用在如果出现异常就给结果填充一个默认的返回值的场景下
代码示例
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Integer> threadPart = CompletableFuture.supplyAsync(() -> {
System.out.println("父任务 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现,这里可以拦截当前线程使其休眠
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("父任务 Ending ···");
i /= 0;
return i;
}).exceptionally((Throwable throwable) -> {
return new Integer(-1);
});
// do something main thread
System.out.println("main Thread Keep going");
try {
System.out.println(Optional.ofNullable(threadPart.get()).map(now -> now.toString()).orElseThrow(() -> new Exception("NullPointer")));
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
// do something main thread
System.out.println("main working···");
commonThreadPool.shutdown();
}
CompletableFuture (多任务处理)
多任务And处理
allOf
组合多个异步任务(不限于两个异步任务),是他们都全部完成之后,才会继续往下,在执行get / join获取的是装载null的CompletableFuture对象。 (不接受多个异步任务的执行结果)
示例代码
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Integer> threadPart1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现的功能,这里可以拦截拦截线程
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务1 Ending ···");
return i;
});
CompletableFuture<Integer> threadPart2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现的功能,可以拦截当前的线程使其先进入休眠状态
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务2 Ending···");
return i;
});
CompletableFuture<Integer> threadPart3 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务3 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现的功能,可以拦截当前的线程使其先进入休眠状态
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务3 Ending···");
return i;
});
// do something main thread
System.out.println("main Thread Keep going");
try {
String resultMsg = CompletableFuture.allOf(threadPart1, threadPart2, threadPart3).handle((completeRes, exception) -> {
String execute = "";
if (exception != null) {
System.out.println("异常结果如下");
execute += exception.getMessage();
} else {
System.out.println("结果成功执行");
execute += completeRes;
}
return execute;
}).get();
System.out.println(resultMsg);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// do something main thread
System.out.println("main working···");
commonThreadPool.shutdown();
}
thenCompose / thenComposeAsync
将两个异步任务形成改为前后顺序执行,可归并成同一个线程进行处理,也可以拆开两个线程处理
我们针对多个不同的线程任务的时候,可以通过使用thenCompose组合起来一起来执行。实际效果跟thenApply非常相近,就是在某个任务结束之后将返回值作为参数传入另外一个指定的方法,并最终返回一个新的CompletableFuture (跟thenApply不一样的是,thenApply实际上是返回的是CompletabelFuture
示例代码
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Integer> threadPart1 = CompletableFuture.supplyAsync(() -> {
System.out.println("父任务 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现,这里可以拦截当前线程使其休眠
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("父任务 Ending ···");
return i;
}).whenComplete((var, exception) -> {
if (exception != null) {
System.out.println("不是,哥们,发生异常了?");
}
});
CompletableFuture<Integer> threadPart2 = threadPart1.thenCompose(params -> getThreadPart2(params));
// 继续处理其他业务
System.out.println("main Thread Keep going");
try {
Integer threadPart1Res = threadPart1.get();
Integer threadPart2Res = threadPart2.get();
System.out.println("Thread Part1 Result: " + threadPart1Res);
System.out.println("Thread Part2 Result: " + threadPart2Res);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// 后面的业务处理
System.out.println("main working···");
commonThreadPool.shutdown();
}
private static CompletableFuture<Integer> getThreadPart2(Integer threadPart1) {
CompletableFuture<Integer> threadPart2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
i = threadPart1.compareTo(i) > 0 ? i : threadPart1; // 取小的
// 为了验证是否是多线程实现的功能,可以拦截当前的线程使其先进入休眠状态
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务2 Ending···");
return i;
}).whenComplete((var, exception) -> {
if (exception != null) {
System.out.println("不是,哥们,发生异常了?");
}
});
return threadPart2;
}
thenCombine / thenCombineAsyc
组合两个并行执行的任务,实际效果是在被组合的两个任务一起都完成之后,针对他们两个的工作可以做处理。(所以前提是两个都能正常执行,并且能成功执行结束,才会进行下一步工作)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);
示例代码
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Integer> threadPart1 = CompletableFuture.supplyAsync(() -> {
System.out.println("父任务 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现,这里可以拦截当前线程使其休眠
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("父任务 Ending ···");
return i;
}).whenComplete((var, exception) -> {
if (exception != null) {
System.out.println("不是,哥们,发生异常了?");
}
});
CompletableFuture<Integer> threadPart2 = getThreadPartTwo();
CompletableFuture<String> thenCombineRes = threadPart1.thenCombine(threadPart2, (res1, res2) -> "Thread1:" + res1 + " Thread2:" + res2);
// 继续处理其他业务
System.out.println("main Thread Keep going");
try {
String res = thenCombineRes.get();
System.out.println(res);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// 后面的业务处理
System.out.println("main working···");
commonThreadPool.shutdown();
}
thenAcceptBoth / thenAcceptBothAsync
thenCombine相比最大的区别在于不做返回,但被合并的两个线程都执行结束会拿到他们的结果进行处理,但不会对外进行输出,相当于thenAccept一样本质上是直接消费的函数。(所以前提是两个都能正常执行,并且能成功执行结束,才会进行下一步工作)
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Integer> threadPart1 = CompletableFuture.supplyAsync(() -> {
System.out.println("父任务 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现,这里可以拦截当前线程使其休眠
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("父任务 Ending ···");
return i;
});
// 继续处理其他业务
System.out.println("main Thread Keep going");
try {
// 一样要调用get / join,否则会不执行
threadPart1.thenAcceptBoth(getThreadPartTwo(), (res1,res2) -> {
System.out.println("Thread1:" + res1 + " Thread2:" + res2);
}).get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// 后面的业务处理
System.out.println("main working···");
commonThreadPool.shutdown();
}
runAfterBoth / runAfterBothAsync
runAfterBoth相比thenCombine的区别在于,runAfterBoth既不要两个异步线程的执行结果,也不会返回自身的结果,只是单纯的执行自己的线程任务,往往是要在两个结果都成功的结果上需要做一些操作的时候,才会通过使用runAfterBoth来实现某些功能。
示例代码
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Integer> threadPart1 = CompletableFuture.supplyAsync(() -> {
System.out.println("父任务 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现的功能,这里可以拦截拦截线程
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("父任务 Ending ···");
return i;
});
CompletableFuture<Integer> threadPart2 = getThreadPartTwo();
// 继续处理其他业务
System.out.println("main Thread Keep going");
try {
threadPart1.runAfterBoth(threadPart2, () -> {
System.out.println("所有都执行结束了");
}).get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// 后面的业务处理
System.out.println("main working···");
commonThreadPool.shutdown();
}
多任务Or处理
anyOf
组合多个异步任务(不限于两个异步任务),只要其中一个完成了,就会往下执行,不再等待其他的执行完毕,在执行get / join获取的是装载null的CompletableFuture对象。 (不接受多个异步任务的执行结果)
代码示例
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Integer> threadPart1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现的功能,这里可以拦截拦截线程
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务1 Ending ···");
return i;
});
CompletableFuture<Integer> threadPart2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现的功能,可以拦截当前的线程使其先进入休眠状态
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务2 Ending···");
return i;
});
CompletableFuture<Integer> threadPart3 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务3 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现的功能,可以拦截当前的线程使其先进入休眠状态
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务3 Ending···");
return i;
});
// do something main thread
System.out.println("main Thread Keep going");
try {
String resultMsg = CompletableFuture.anyOf(threadPart1, threadPart2, threadPart3).handle((completeRes, exception) -> {
String execute = "";
if (exception != null) {
System.out.println("异常结果如下");
execute += exception.getMessage();
} else {
System.out.println("某一个执行成功了!");
execute += completeRes;
}
return execute;
}).get();
System.out.println(resultMsg);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// do something main thread
System.out.println("main working···");
commonThreadPool.shutdown();
}
执行结果
任务2 Executing ···
任务3 Executing ···
任务1 Executing ···
main Thread Keep going
任务3 Ending···
某一个执行成功了!
-1641673675
main working···
applyToEither / applyToEitherAsync
在组合的两个异步任务之中,谁先完成取谁的结果作为参数,并对其结果进行处理,最终返回一个CompletabelFuture
代码示例
public static void main(String[] args) {
ExecutorService commonThreadPool = SingletonBeanSample.SAMPLE.getBean();
CompletableFuture<Integer> threadPart1 = CompletableFuture.supplyAsync(() -> {
System.out.println("父任务 Executing ···");
// do something get result
int i = ThreadLocalRandom.current().nextInt();
// 为了验证是否是多线程实现,这里可以拦截当前线程使其休眠
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("父任务 Ending ···");
return i;
});
CompletableFuture<Integer> firstRes = threadPart1.applyToEither(getThreadPartTwo(), (res1) -> {
System.out.println("First finsh res:" + res1);
return res1;
});
// do something main thread
System.out.println("main Thread Keep going");
try {
String resultMsg = firstRes.get().toString();
// System.out.println(resultMsg);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
// do something main thread
System.out.println("main working···");
commonThreadPool.shutdown();
}
执行结果
任务2 Executing ···
父任务 Executing ···
main Thread Keep going
任务2 Ending···
First finsh res:-53303121
main working···
acceptToEither / acceptToEitherAsync
跟thenAcceptBoth与thenCombine的效果类似,相比于applyToEither一方面接受两个异步线程两者任意先执行完毕一方的执行结果,但不返回自身执行结果
(偷懒,不写示例了···)
runAfterEither / runAfterEitherAsync
跟runAfterBoth与thenCombine的效果类似,相比于applyToEither一方面不接受两个异步线程两者任意先执行完毕一方的执行结果,也没有返回值
(偷懒,不写示例了···)