java基本知識手冊(Java8的異步利器)
2023-04-13 01:05:57 2
completableFuture 是JDK1.8版本新引入的類。下面是這個類:
實現了倆接口,本身是個class。這個是Future的實現類,使用 completionStage 接口去支持完成時觸發的函數和操作。
一個 completetableFuture 就代表了一個任務,他能用Future的方法,還能做一些之前說的 ExecutorService 配合 futures 做不了的。
之前future需要等待isDone為true才能知道任務跑完了,或者就是用get方法調用的時候會出現阻塞,而使用 completableFuture 的使用就可以用then,when等等操作來防止以上的阻塞和輪詢isDone的現象出現。
1.創建CompletableFuture直接new對象。一個 completableFuture 對象代表著一個任務,這個對象能跟這個任務產生聯繫。
下面用的 complete 方法意思就是這個任務完成了需要返回的結果,然後用 get 方法可以獲取到。
在本文的 CompletableFuture 中大量地使用了這些函數式接口。
註:這些聲明大量應用於方法的入參中,像 thenApply 和 thenAccept 這倆就是一個用Function一個用Consumer
而lambda函數正好是可以作為這些接口的實現。例如 s->{return 1;} 這個就相當於一個Function。因為有入參和返回結果。
(1)Function
(2)Consumer
對於前面有Bi的就是這樣的,BiConsumer就是兩個參數的。
(3)Predicate這個接口聲明是一個入參,返回一個boolean。
(4)supplier
帶有Async就是異步執行的意思、也是一個 completableFuture 對象代表著一個任務這個原則。
這種異步方法都可以指定一個線程池作為任務的運行環境,如果沒有指定就會使用 forkJoinPool 線程池來執行
(1) supplyAsync&runAsync 的使用例子。
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool; executorService.submit(new Callable { @Override public Object call throws Exception { System.out.println("executorService 是否為守護線程 :" Thread.currentThread.isDaemon); return null; } }); final CompletableFuture completableFuture = CompletableFuture.supplyAsync( -> { System.out.println("this is lambda supplyAsync"); System.out.println("supplyAsync 是否為守護線程 " Thread.currentThread.isDaemon); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace; } System.out.println("this lambda is executed by forkJoinPool"); return "result1"; }); final CompletableFuture future = CompletableFuture.supplyAsync( -> { System.out.println("this is task with executor");System.out.println("supplyAsync 使用executorService 時是否為守護線程 : " Thread.currentThread.isDaemon); return "result2"; }, executorService); System.out.println(completableFuture.get); System.out.println(future.get); executorService.shutdown;}
這些任務中帶有supply是持有返回值的,run是void返回值的,在玩supply時發現一個問題:如果使用supplyAsync任務時不使用任務的返回值,即 不用get方法阻塞主線程會導致任務執行中斷。
註:跟get方法無關,後面有答案
然後我開始探索是否是只有 supplyAsync 是這樣。我測試了 runAsync 發現也是這樣。
下圖為與 supplyAsync 任務執行不全面一樣的問題,我甚至測試了將lambda換成runnable發現無濟於事。
造成這個原因是因為Daemon。因為 completableFuture 這套使用異步任務的操作都是創建成了守護線程,那麼我們沒有調用get方法不阻塞這個主線程的時候。主線程執行完畢,所有線程執行完畢就會導致一個問題,就是守護線程退出。
那麼我們沒有執行的代碼就是因為主線程不再跑任務而關閉導致的,可能這個不叫問題,因為在開發中我們主線程常常是一直開著的。但是這個小問題同樣讓我想了好久。
下面我們開一個非守護線程,可以看到程序執行順利。
下面證實守護線程在其他非守護線程全部退出的情況下不繼續執行。
final CompletableFuture completableFuture = CompletableFuture.supplyAsync( -> { System.out.println("this is lambda supplyAsync"); System.out.println("supplyAsync 是否為守護線程 " Thread.currentThread.isDaemon); try { TimeUnit.SECONDS.sleep(1); try(BufferedWriter writer = new BufferedWriter (new OutputStreamWriter(new FileOutputStream(new File("/Users/zhangyong/Desktop/temp/out.txt"))))){ writer.write("this is completableFuture daemon test"); }catch (Exception e){ System.out.println("exception find"); } } catch (InterruptedException e) { e.printStackTrace; } System.out.println("this lambda is executed by forkJoinPool"); return "result1";});
這個代碼就是操作本地文件,並且sleep了一秒。其他線程就一句控制臺輸出的代碼,最終的結果是文件沒有任何變化。
當我把主線程 sleep 5 秒時,本地文件會寫入一句 this is completableFuture daemon test 驗證成功。
(2)allOf&anyOf
這兩個方法的入參是一個 completableFuture 組、allOf就是所有任務都完成時返回,但是是個Void的返回值。
anyOf是當入參的 completableFuture 組中有一個任務執行完畢就返回,返回結果是第一個完成的任務的結果。
public static void otherStaticMethod throws ExecutionException, InterruptedException { final CompletableFuture futureOne = CompletableFuture.supplyAsync( -> { try { Thread.sleep(3000); } catch (InterruptedException e) { System.out.println("futureOne InterruptedException"); } return "futureOneResult"; }); final CompletableFuture futureTwo = CompletableFuture.supplyAsync( -> { try { Thread.sleep(6000); } catch (InterruptedException e) { System.out.println("futureTwo InterruptedException"); } return "futureTwoResult"; }); CompletableFuture future = CompletableFuture.allOf(futureOne, futureTwo); System.out.println(future.get);// CompletableFuture completableFuture = CompletableFuture.anyOf(futureOne, futureTwo);// System.out.println(completableFuture.get); }
(3) completedFuture 這個方法我沒懂他是幹啥的,源碼就是返回一個值。感覺沒啥意義。
(4)取值方法,除了get還有一個 getNow; 這個就比較特殊了。
這個方法是執行這個方法的時候任務執行完了就返回任務的結果,如果任務沒有執行完就返回你的入參。
(5)join方法跟線程的join用法差不多。
(6) whenXXX ,在一個任務執行完成之後調用的方法。
這個有三個名差不多的方法: whenComplete 、 whenCompleteAsync 、還有一個是 whenCompleteAsync 用自定義 Executor
首先看一下這個 whenComplete 實例方法。這個就是任務執行完畢調用的,傳入一個action,這個方法的執行線程是當前線程,意味著會阻塞當前線程。
下面圖中test的輸出跟 whenComplete 方法運行的線程有關,運行到main線程就會阻塞test的輸出,運行的是 completableFuture 線程則不會阻塞住test的輸出。
下面是任務執行的線程的探索。
根據測試得出的結論是:如果調用 whenComplete 的中途,還發生了其他事情,圖中的主線程的 sleep(400); 導致 completableFuture 這個任務執行完畢了,那麼就使用主線程調用。
如果調用的中途沒有發生其他任務且在觸碰到 whenComplete 方法時 completableFuture 這個任務還沒有徹底執行完畢那麼就會用 completableFuture 這個任務所使用的線程。
下面是 whenCompleteAsync 方法。這個方法就是新創建一個異步線程執行。所以不會阻塞。
(7) then方法瞅著挺多的,實際上就是異不異步和加不加自定義 Executor
註: whenComplete 中出現的問題在then中測試不存在、使用的就是上一個任務的線程。這個 thenCompose 就是一個任務執行完之後可以用它的返回結果接著執行的方法,方法返回的是另一個你期盼泛型的結果。
compose 理解就是上一個任務結果是then的一部分。
下面介紹一下 thenCombine
這個 combine 的理解就是結合兩個任務的結果。
綜上:這個線程的問題並不是大問題,只要你不用線程來做判斷條件,他並不會影響你的效率。試想pool線程都執行完了就用主線程跑唄。沒跑完,而使你等了那你就用pool線程唄。
thenRun就是這個任務運行完,再運行下一個任務,感覺像是join了一下。
其餘不再介紹,大同小異。
像 thenApply(Function); 這樣的就是有入參有返回值類型的。
像 thenAccept(Consumer); 這樣的就是有入參,但是沒有返回值的。詳情在上文中有過關於函數式接口的敘述。
,