本文將介紹 java.util.concurrent.Phar,一個常常被大家忽略的并發工具。它和 CyclicBarrier 以及 CountDownLatch很像,但是使用上更加的靈活,本文會進行一些對比介紹。
和之前的文章不同,本文不寫源碼分析了,就只是從各個角度介紹下它是怎么用的。本文比較簡單,我覺得對于初學者大概需要 20 分鐘左右吧。
其實我對這個需要多少時間很沒概念,有沒有讀者愿意記錄下所花費的時間,在評論區反饋一下。
使用示例
我們來實現一個小需求,啟動 10 個線程執行任務,由于啟動時間有先后,我們希望等到所有的線程都啟動成功以后再開始執行,讓每個線程在同一個起跑線上開始執行業務操作。
下面,分別介紹 CountDownLatch、CyclicBarrier 和 Phar 怎么實現該需求。
1、這種 ca 最容易使用的就是 CountDownLatch,代碼很簡單:
// 1. 設置 count 為 1CountDownLatch latch = new CountDownLatch(1);for (int i = 0; i < 10; i++) { new Thread(() -> { try { // 2. 每個線程都等在柵欄這里,等待放開柵欄,不會因為有些線程先啟動就先跑路了 latch.await(); // doWork(); } catch (InterruptedException ignore) { } }).start();}doSomethingEL(); // 確保在下面的代碼執行之前,上面每個線程都到了 await() 上。// 3. 放開柵欄latch.countDown();
簡單回顧一下 CountDownLatch 的原理:AQS 共享模式的典型使用,構造函數中的 1 是設置給 AQS 的 state 的。latch.await() 方法會阻塞,而 latch.countDown() 方法就是用來將 state-- 的,減到 0 以后,喚醒所有的阻塞在 await() 方法上的線程。
2、這種 ca 用 CyclicBarrier 來實現更簡單:
// 1. 構造函數中指定了 10 個 partiesCyclicBarrier barrier = new CyclicBarrier(10);for (int i = 0; i < 10; i++) { executorService.submit(() -> { try { // 2. 每個線程"報告"自己到了, // 當第10個線程到的時候,也就是所有的線程都到齊了,一起通過 barrier.await(); // doWork() } catch (InterruptedException | BrokenBarrierException ex) { ex.printStackTrace(); } });}
CyclicBarrier 的原理不是 AQS 的共享模式,是 AQS Condition 和 ReentrantLock 的結合使用
CyclicBarrier 可以被重復使用,我們這里只使用了一個周期,當第十個線程到了以后,所有的線程一起通過,此時開啟了新的一個周期,在 CyclicBarrier 中,周期用 generation 表示。
3、我們來介紹今天的主角 Phar,用 Phar 實現這個需求也很簡單:
Phar phar = new Phar();// 1. 注冊一個 partyphar.register();for (int i = 0; i < 10; i++) { phar.register(); executorService.submit(() -> { // 2. 每個線程到這里進行阻塞,等待所有線程到達柵欄 phar.arriveAndAwaitAdvance(); // doWork() });}phar.arriveAndAwaitAdvance();
Phar 比較靈活,它不需要在構造的時候指定固定數目的 parties,而 CountDownLatch 和 CyclicBarrier 需要在構造函數中明確指定一個數字。
我們可以看到,上面的代碼總共執行了 11 次 phar.register() ,可以把 11 理解為 CountDownLatch 中的 count 和 CyclicBarrier 中的 parties。
這樣讀者應該很容易理解 phar.arriveAndAwaitAdvance() 了,這是一個阻塞方法,直到該方法被調用 11 次,所有的線程才能同時通過。
這里和 CyclicBarrier 是一個意思,湊齊了所有的線程,一起通過柵欄。
Phar 也有周期的概念,一個周期定義為一個 pha,從 0 開始。
Phar 介紹
上面我們介紹了 Phar 中的兩個很重要的接口,register() 和 arriveAndAwaitAdvance(),這節我們來看它的其他的一些重要的接口使用。
畫一張圖壓著:
重要接口介紹
Phar 還是有 parties 概念的,但是它不需要在構造函數中指定,而是可以很靈活地動態增減。
我們來看 3 個代碼片段,看看 parties 是怎么來的。
1、首先是 Phar 有一個帶 parties 參數的構造方法:
public Phar(int parties) { this(null, parties);}
2、register() 方法:
public int register() { return doRegister(1);}
這個方法會使得 parties 加 1
3、bulkRegister(int parties) 方法:
public int bulkRegister(int parties) { if (parties < 0) throw new IllegalArgumentException(); if (parties == 0) return getPha(); return doRegister(parties);}
一次注冊多個,這個方法會使得 parties 增加相應數值
parties 也可以減少,因為有些線程可能在執行過程中,不和大家玩了,會進行退出,調用 arriveAndDeregister() 即可,這個方法的名字已經說明了它的用途了。
再看一下這個圖,pha-1 結束的時候,黑色的線程離開了大家,此時就只有 3 個 parties 了。
這里說一下 Phar 的另一個概念 pha,它代表 Phar 中的周期或者叫階段,pha 從 0 開始,一直往上遞增。
通過調用 arrive() 或 arriveAndDeregister() 來標記有一個成員到達了一個 pha 的柵欄,當所有的成員都到達柵欄以后,開啟一個新的 pha。
這里我們來看看和 pha 相關的幾個方法:
1、arrive()
這個方法標記當前線程已經到達柵欄,但是該方法不會阻塞,注意,它不會阻塞。
大家要理解一點,party 本和線程是沒有關系的,不能說一個線程代表一個 party,因為我們完全可以在一個線程中重復調用 arrive() 方法。這么表達純粹是方便理解用。
2、arriveAndDeregister()
和上面的方法一樣,當前線程通過柵欄,非阻塞,但是它執行了 deregister 操作,意味著總的 parties 減 1。
3、arriveAndAwaitAdvance()
這個方法應該一目了然,就是等其他線程都到了柵欄上再一起通過,進入下一個 pha。
4、awaitAdvance(int pha)
這個方法需要指定 pha 參數,也就是說,當前線程會進行阻塞,直到指定的 pha 打開。
5、protected boolean onAdvance(int pha, registeredParties)
這個方法是 protected 的,所以它不是 phar 提供的 API,從方法名字上也可以看出,它會在一個 pha 結束的時候被調用。
它的返回值代表是否應該終結(terminate)一個 phar,之所以拿出來說,是因為我們經常會見到有人通過覆寫該方法來自定義 phar 的終結邏輯,如:
protected boolean onAdvance(int pha, int registeredParties) { return pha >= N || registeredParties == 0;}
1、我們可以通過 phar.isTerminated() 來檢測一個 phar 實例是否已經終結了
2、當一個 phar 實例被終結以后,register()、arrive() 等這些方法都沒有什么意義了,大家可以玩一玩,觀察它們的返回值,原本應該返回 pha 值的,但是這個時候會返回一個負數。
Phar 的監控方法
介紹下幾個用于返回當前 phar 狀態的方法:
getPha():返回當前的 pha,前面說了,pha 從 0 開始計算,最大值是 Integer.MAX_VALUE,超過又從 0 開始
getRegisteredParties():當前有多少 parties,隨著不斷地有 register 和 deregister,這個值會發生變化
getArrivedParties():有多少個 party 已經到達當前 pha 的柵欄
getUnarrivedParties():還沒有到達當前柵欄的 party 數
Phar 的分層結構
Tiering 這個詞本身就不好翻譯,大家將就一下,要表達的意思就是,將多個 Phar 實例構造成一棵樹。
1、第一個問題來了,為什么要把多個 Phar 實例構造成一棵樹,解決什么問題?有什么優點?
Phar 內部用一個 state 來管理狀態變化,隨著 parties 的增加,并發問題帶來的性能影響會越來越嚴重。
/** * 0-15: unarrived * 16-31: parties, 所以一個 phar 實例最大支持 2^16-1=65535 個 parties * 32-62: pha, 31 位,那么最大值是 Integer.MAX_VALUE,達到最大值后又從 0 開始 * 63: terminated */private volatile long state;
通常我們在說 0-15 位這種,說的都是從低位開始的
state 的各種操作依賴于 CAS,典型的無鎖操作,但是,在大量競爭的情況下,可能會造成很多的自旋。
而構造一棵樹就是為了降低每個節點(每個 Phar 實例)的 parties 的數量,從而有效降低單個 state 值的競爭。
2、第二個問題,它的結構是怎樣的?
這里我們不講源碼,用通俗一點的語言表述一下。我們先寫段代碼構造一棵樹:
Phar root = new Phar(5);Phar n1 = new Phar(root, 5);Phar n2 = new Phar(root, 5);Phar m1 = new Phar(n1, 5);Phar m2 = new Phar(n1, 5);Phar m3 = new Phar(n1, 5);Phar m4 = new Phar(n2, 5);
根據上面的代碼,我們可以畫出下面這個很簡單的圖:
這棵樹上有 7 個 phar 實例,每個 phar 實例在構造的時候,都指定了 parties 為 5,但是,對于每個擁有子節點的節點來說,每個子節點都是它的一個 party,我們可以通過 phar.getRegisteredParties() 得到每個節點的 parties 數量:
m1、m2、m3、m4 的 parties 為 5n1 的 parties 為 5 + 3,n2 的 parties 為 5 + 1root 的 parties 為 5 + 2結論應該非常容易理解,我們來闡述一下過程。
在子節點注冊第一個 party 的時候,這個時候會在父節點注冊一個 party,注意這里說的是子節點添加第一個 party 的時候,而不是說實例構造的時候。
在上面代碼的基礎上,大家可以試一下下面的這個代碼:
Phar m5 = new Phar(n2);System.out.println("n2 parties: " + n2.getRegisteredParties());m5.register();System.out.println("n2 parties: " + n2.getRegisteredParties());
第一行代碼中構造了 m5 實例,但是此時它的 parties == 0,所以對于父節點 n2 來說,它的 parties 依然是 6,所以第二行代碼輸出 6。第三行代碼注冊了 m5 的第一個 party,顯然,第四行代碼會輸出 7。
當子節點的 parties 降為 0 的時候,會從父節點中"剝離",我們在上面的基礎上,再加兩行代碼:
m5.arriveAndDeregister();System.out.println("n2 parties: " + n2.getRegisteredParties());
由于 m5 之前只有一個 parties,所以一次 arriveAndDeregister() 就會使得它的 parties 變為 0,此時第二行代碼輸出父節點 n2 的 parties 為 6。
還有一點有趣的是(其實也不一定有趣吧),在非樹的結構中,此時 m5 應該處于 terminated 狀態,因為它的 parties 降為 0 了,不過在樹的結構中,這個狀態由 root 控制,所以我們依然可以執行 m5.register()...
3、每個 phar 實例的 pha 周期有快有慢,怎么協調的?
在組織成樹的這種結構中,每個 phar 實例的 pha 已經不受自己控制了,由 root 來統一協調,也就是說,root 當前的 pha 是多少,每個 phar 的 pha 就是多少。
那又有個問題,如果子節點的一個周期很快就結束了,要進入下一個周期怎么辦?需要等!這個時候其實要等所有的節點都結束當前 pha,因為只有這樣,root 節點才有可能結束當前 pha。
我覺得 Phar 中的樹結構我們要這么理解,我們要把整棵樹當做一個 phar 實例,每個節點只是輔助用于降低并發而存在,整棵樹還是需要滿足 Phar 語義的。
本文發布于:2023-02-28 20:08:00,感謝您對本站的認可!
本文鏈接:http://www.newhan.cn/zhishi/a/167765733279545.html
版權聲明:本站內容均來自互聯網,僅供演示用,請勿用于商業和其他非法用途。如果侵犯了您的權益請與我們聯系,我們將在24小時內刪除。
本文word下載地址:phaser3117(phaser3117打印機怎么安裝).doc
本文 PDF 下載地址:phaser3117(phaser3117打印機怎么安裝).pdf
| 留言與評論(共有 0 條評論) |