??? 本文由 cnblogs 博主Caoer(草兒)原創(chuàng),此處為轉(zhuǎn)載。原文出處為 http://www.cnblogs.com/sarafill/archive/2011/05/18/2049461.html
??? 如原創(chuàng)作者認(rèn)為本文侵權(quán),請(qǐng)通知本博。
?
??? java.util.concurrent 包含許多線程安全、測(cè)試良好、高性能的并發(fā)構(gòu)建塊。不客氣地說,創(chuàng)建 java.util.concurrent 的目的就是要實(shí)現(xiàn) Collection 框架對(duì)數(shù)據(jù)結(jié)構(gòu)所執(zhí)行的并發(fā)操作。通過提供一組可靠的、高性能并發(fā)構(gòu)建塊,開發(fā)人員可以提高并發(fā)類的線程安全、可伸縮性、性能、可讀性和可靠性。
如果一些類名看起來相似,可能是因?yàn)?java.util.concurrent 中的許多概念源自 Doug Lea 的 util.concurrent 庫(kù)(請(qǐng)參閱 參考資料)。
JDK 5.0 中的并發(fā)改進(jìn)可以分為三組:
???? ? JVM 級(jí)別更改 。大多數(shù)現(xiàn)代處理器對(duì)并發(fā)對(duì)某一硬件級(jí)別提供支持,通常以 compare-and-swap (CAS)指令形式。CAS 是一種低級(jí)別的、細(xì)粒度的技術(shù),它允許多個(gè)線程更新一個(gè)內(nèi)存位置,同時(shí)能夠檢測(cè)其他線程的沖突并進(jìn)行恢復(fù)。它是許多高性能并發(fā)算法的基礎(chǔ)。在 JDK 5.0 之前,Java 語言中用于協(xié)調(diào)線程之間的訪問的惟一原語是同步,同步是更重量級(jí)和粗粒度的。公開 CAS 可以開發(fā)高度可伸縮的并發(fā) Java 類。這些更改主要由 JDK 庫(kù)類使用,而不是由開發(fā)人員使用。
???? ? 低級(jí)實(shí)用程序類 -- 鎖定和原子類。 使用 CAS 作為并發(fā)原語,ReentrantLock 類提供與 synchronized 原語相同的鎖定和內(nèi)存語義,然而這樣可以更好地控制鎖定(如計(jì)時(shí)的鎖定等待、鎖定輪詢和可中斷的鎖定等待)和提供更好的可伸縮性(競(jìng)爭(zhēng)時(shí)的高性能)。大多數(shù)開發(fā)人員將不再直接使用 ReentrantLock 類,而是使用在 ReentrantLock 類上構(gòu)建的高級(jí)類。
??? ? 高級(jí)實(shí)用程序類 。這些類實(shí)現(xiàn)并發(fā)構(gòu)建塊,每個(gè)計(jì)算機(jī)科學(xué)文本中都會(huì)講述這些類 -- 信號(hào)、互斥、閂鎖、屏障、交換程序、線程池和線程安全集合類等。大部分開發(fā)人員都可以在應(yīng)用程序中用這些類,來替換許多(如果不是全部)同步、wait() 和 notify() 的使用,從而提高性能、可讀性和正確性。
??? 本教程將重點(diǎn)介紹 java.util.concurrent 包提供的高級(jí)實(shí)用程序類 -- 線程安全集合、線程池和同步實(shí)用程序。這些是初學(xué)者和專家都可以使用的"現(xiàn)成"類。
??? 在第一小節(jié)中,我們將回顧并發(fā)的基本知識(shí),盡管它不應(yīng)取代對(duì)線程和線程安全的了解。那些一點(diǎn)都不熟悉線程的讀者應(yīng)該先參考一些關(guān)于線程的介紹,如"Introduction to Java Threads"教程(請(qǐng)參閱參考資料)。
??? 接下來的幾個(gè)小節(jié)將研究 java.util.concurrent 中的高級(jí)實(shí)用程序類 -- 線程安全集合、線程池、信號(hào)和同步工具。
??? 最后一小節(jié)將介紹 java.util.concurrent 中的低級(jí)并發(fā)構(gòu)建塊,并提供一些性能測(cè)評(píng)來顯示新 java.util.concurrent 類的可伸縮性的改進(jìn)。
什么是線程?
??? 所有重要的操作系統(tǒng)都支持進(jìn)程的概念 -- 獨(dú)立運(yùn)行的程序,在某種程度上相互隔離。
??? 線程有時(shí)稱為 輕量級(jí)進(jìn)程。與進(jìn)程一樣,它們擁有通過程序運(yùn)行的獨(dú)立的并發(fā)路徑,并且每個(gè)線程都有自己的程序計(jì)數(shù)器,稱為堆棧和本地變量。然而,線程存在于進(jìn)程中,它們與同一進(jìn)程內(nèi)的其他線程共享內(nèi)存、文件句柄以及每進(jìn)程狀態(tài)。
??? 今天,幾乎每個(gè)操作系統(tǒng)都支持線程,允許執(zhí)行多個(gè)可獨(dú)立調(diào)度的線程,以便共存于一個(gè)進(jìn)程中。因?yàn)橐粋€(gè)進(jìn)程中的線程是在同一個(gè)地址空間中執(zhí)行的,所以多個(gè)線程可以同時(shí)訪問相同對(duì)象,并且它們從同一堆棧中分配對(duì)象。雖然這使線程更易于與其他線程共享信息,但也意味著您必須確保線程之間不相互干涉。
??? 正確使用線程時(shí),線程能帶來諸多好處,其中包括更好的資源利用、簡(jiǎn)化開發(fā)、高吞吐量、更易響應(yīng)的用戶界面以及能執(zhí)行異步處理。
??? Java 語言包括用于協(xié)調(diào)線程行為的原語,從而可以在不違反設(shè)計(jì)原型或者不破壞數(shù)據(jù)結(jié)構(gòu)的前提下安全地訪問和修改共享變量。
線程有哪些功能?
??? 在 Java 程序中存在很多理由使用線程,并且不管開發(fā)人員知道線程與否,幾乎每個(gè) Java 應(yīng)用程序都使用線程。許多 J2SE 和 J2EE 工具可以創(chuàng)建線程,如 RMI、Servlet、Enterprise JavaBeans 組件和 Swing GUI 工具包。
使用線程的理由包括:
??? ? 更易響應(yīng)的用戶界面。 事件驅(qū)動(dòng)的 GUI 工具包(如 AWT 或 Swing)使用單獨(dú)的事件線程來處理 GUI 事件。從事件線程中調(diào)用通過 GUI 對(duì)象注冊(cè)的事件監(jiān)聽器。然而,如果事件監(jiān)聽器將執(zhí)行冗長(zhǎng)的任務(wù)(如文檔拼寫檢查),那么 UI 將出現(xiàn)凍結(jié),因?yàn)槭录€程直到冗長(zhǎng)任務(wù)完畢之后才能處理其他事件。通過在單獨(dú)線程中執(zhí)行冗長(zhǎng)操作,當(dāng)執(zhí)行冗長(zhǎng)后臺(tái)任務(wù)時(shí),UI 能繼續(xù)響應(yīng)。
??? ? 使用多處理器。 多處理器(MP)系統(tǒng)變得越來越便宜,并且分布越來越廣泛。因?yàn)檎{(diào)度的基本單位通常是線程,所以不管有多少處理器可用,一個(gè)線程的應(yīng)用程序一次只能在一個(gè)處理器上運(yùn)行。在設(shè)計(jì)良好的程序中,通過更好地利用可用的計(jì)算機(jī)資源,多線程能夠提高吞吐量和性能。
??? ? 簡(jiǎn)化建模。 有效使用線程能夠使程序編寫變得更簡(jiǎn)單,并易于維護(hù)。通過合理使用線程,個(gè)別類可以避免一些調(diào)度的詳細(xì)、交叉存取操作、異步 IO 和資源等待以及其他復(fù)雜問題。相反,它們能專注于域的要求,簡(jiǎn)化開發(fā)并改進(jìn)可靠性。
??? ? 異步或后臺(tái)處理。 服務(wù)器應(yīng)用程序可以同時(shí)服務(wù)于許多遠(yuǎn)程客戶機(jī)。如果應(yīng)用程序從 socket 中讀取數(shù)據(jù),并且沒有數(shù)據(jù)可以讀取,那么對(duì) read() 的調(diào)用將被阻塞,直到有數(shù)據(jù)可讀。在單線程應(yīng)用程序中,這意味著當(dāng)某一個(gè)線程被阻塞時(shí),不僅處理相應(yīng)請(qǐng)求要延遲,而且處理所有請(qǐng)求也將延遲。然而,如果每個(gè) socket 都有自己的 IO 線程,那么當(dāng)一個(gè)線程被阻塞時(shí),對(duì)其他并發(fā)請(qǐng)求行為沒有影響。
線程安全
??? 如果將這些類用于多線程環(huán)境中,雖然確保這些類的線程安全比較困難,但線程安全卻是必需的。java.util.concurrent 規(guī)范進(jìn)程的一個(gè)目標(biāo)就是提供一組線程安全的、高性能的并發(fā)構(gòu)建塊,從而使開發(fā)人員能夠減輕一些編寫線程安全類的負(fù)擔(dān)。
??? 線程安全類非常難以明確定義,大多數(shù)定義似乎都是完全循環(huán)的。快速 Google 搜索會(huì)顯示下列線程安全代碼定義的例子,但這些定義(或者更確切地說是描述)通常沒什么幫助:
??? ? . . . can be called from multiple programming threads without unwanted interaction between the threads.
??? ? . . . may be called by more than on thread at a time without requiring any other action on the caller's part.
??? 通過類似這樣的定義,不奇怪我們?yōu)槭裁磳?duì)線程安全如此迷惑。這些定義幾乎就是在說"如果可以從多個(gè)線程安全調(diào)用類,那么該類就是線程安全的"。這當(dāng)然是線程安全的解釋,但對(duì)我們區(qū)別線程安全類和不安全類沒有什么幫助。我們使用"安全"是為了說明什么?
??? 要成為線程安全的類,首先它必須在單線程環(huán)境中正確運(yùn)行。如果正確實(shí)現(xiàn)了類,那么說明它符合規(guī)范,對(duì)該類的對(duì)象的任何順序的操作(公共字段的讀寫、公共方法的調(diào)用)都不應(yīng)該使對(duì)象處于無效狀態(tài);觀察將處于無效狀態(tài)的對(duì)象;或違反類的任何變量、前置條件或后置條件。
??? 而且,要成為線程安全的類,在從多個(gè)線程訪問時(shí),它必須繼續(xù)正確運(yùn)行,而不管運(yùn)行時(shí)環(huán)境執(zhí)行那些線程的調(diào)度和交叉,且無需對(duì)部分調(diào)用代碼執(zhí)行任何其他同步。結(jié)果是對(duì)線程安全對(duì)象的操作將用于按固定的整體一致順序出現(xiàn)所有線程。
??? 如果沒有線程之間的某種明確協(xié)調(diào),比如鎖定,運(yùn)行時(shí)可以隨意在需要時(shí)在多線程中交叉操作執(zhí)行。
??? 在 JDK 5.0 之前,確保線程安全的主要機(jī)制是 synchronized 原語。訪問共享變量(那些可以由多個(gè)線程訪問的變量)的線程必須使用同步來協(xié)調(diào)對(duì)共享變量的讀寫訪問。java.util.concurrent 包提供了一些備用并發(fā)原語,以及一組不需要任何其他同步的線程安全實(shí)用程序類。
令人厭煩的并發(fā)
??? 即使您的程序從沒有明確創(chuàng)建線程,也可能會(huì)有許多工具或框架代表您創(chuàng)建了線程,這時(shí)要求從這些線程調(diào)用的類是線程安全的。這樣會(huì)對(duì)開發(fā)人員帶來較大的設(shè)計(jì)和實(shí)現(xiàn)負(fù)擔(dān),因?yàn)殚_發(fā)線程安全類比開發(fā)非線程安全類有更多要注意的事項(xiàng),且需要更多的分析。
AWT 和 Swing
??? 這些 GUI 工具包創(chuàng)建了稱為時(shí)間線程的后臺(tái)線程,將從該線程調(diào)用通過 GUI 組件注冊(cè)的監(jiān)聽器。因此,實(shí)現(xiàn)這些監(jiān)聽器的類必須是線程安全的。
TimerTask
??? JDK 1.3 中引入的 TimerTask 工具允許稍后執(zhí)行任務(wù)或計(jì)劃定期執(zhí)行任務(wù)。在 Timer 線程中執(zhí)行 TimerTask 事件,這意味著作為 TimerTask 執(zhí)行的任務(wù)必須是線程安全的。
Servlet 和 JavaServer Page 技術(shù)
??? Servlet 容器可以創(chuàng)建多個(gè)線程,在多個(gè)線程中同時(shí)調(diào)用給定 servlet,從而進(jìn)行多個(gè)請(qǐng)求。因此 servlet 類必須是線程安全的。
RMI
??? 遠(yuǎn)程方法調(diào)用(remote method invocation,RMI)工具允許調(diào)用其他 JVM 中運(yùn)行的操作。實(shí)現(xiàn)遠(yuǎn)程對(duì)象最普遍的方法是擴(kuò)展 UnicastRemoteObject。例示 UnicastRemoteObject 時(shí),它是通過 RMI 調(diào)度器注冊(cè)的,該調(diào)度器可能創(chuàng)建一個(gè)或多個(gè)線程,將在這些線程中執(zhí)行遠(yuǎn)程方法。因此,遠(yuǎn)程類必須是線程安全的。
??? 正如所看到的,即使應(yīng)用程序沒有明確創(chuàng)建線程,也會(huì)發(fā)生許多可能會(huì)從其他線程調(diào)用類的情況。幸運(yùn)的是,java.util.concurrent 中的類可以大大簡(jiǎn)化編寫線程安全類的任務(wù)。
例子 -- 非線程安全 servlet
??? 下列 servlet 看起來像無害的留言板 servlet,它保存每個(gè)來訪者的姓名。然而,該 servlet 不是線程安全的,而這個(gè) servlet 應(yīng)該是線程安全的。問題在于它使用 HashSet 存儲(chǔ)來訪者的姓名,HashSet 不是線程安全的類。
??? 當(dāng)我們說這個(gè) servlet 不是線程安全的時(shí),是說它所造成的破壞不僅僅是丟失留言板輸入。在最壞的情況下,留言板數(shù)據(jù)結(jié)構(gòu)都可能被破壞并且無法恢復(fù)。
public class UnsafeGuestbookServlet extends HttpServlet {
??? private Set visitorSet = new HashSet();
??? protected void doGet(HttpServletRequest httpServletRequest,
???????????? HttpServletResponse httpServletResponse) throws ServletException, IOException {
??????? String visitorName = httpServletRequest.getParameter("NAME");
??????? if (visitorName != null)
??????????? visitorSet.add(visitorName);
??? }
}
??? 通過將 visitorSet 的定義更改為下列代碼,可以使該類變?yōu)榫€程安全的:
??? private Set visitorSet = Collections.synchronizedSet(new HashSet());
??? 如上所示的例子顯示線程的內(nèi)置支持是一把雙刃劍 -- 雖然它使構(gòu)建多線程應(yīng)用程序變得很容易,但它同時(shí)要求開發(fā)人員更加注意并發(fā)問題,甚至在使用留言板 servlet 這樣普通的東西時(shí)也是如此。
線程安全集合
??? JDK 1.2 中引入的 Collection 框架是一種表示對(duì)象集合的高度靈活的框架,它使用基本接口 List、Set 和 Map。通過 JDK 提供每個(gè)集合的多次實(shí)現(xiàn)(HashMap、Hashtable、TreeMap、WeakHashMap、HashSet、TreeSet、Vector、ArrayList、LinkedList 等等)。其中一些集合已經(jīng)是線程安全的(Hashtable 和 Vector),通過同步的封裝工廠(Collections.synchronizedMap()、synchronizedList() 和 synchronizedSet()),其余的集合均可表現(xiàn)為線程安全的。
??? java.util.concurrent 包添加了多個(gè)新的線程安全集合類(ConcurrentHashMap、CopyOnWriteArrayList 和 CopyOnWriteArraySet)。這些類的目的是提供高性能、高度可伸縮性、線程安全的基本集合類型版本。
??? java.util 中的線程集合仍有一些缺點(diǎn)。例如,在迭代鎖定時(shí),通常需要將該鎖定保留在集合中,否則,會(huì)有拋出 ConcurrentModificationException 的危險(xiǎn)。(這個(gè)特性有時(shí)稱為條件線程安全;有關(guān)的更多說明,請(qǐng)參閱參考資料。)此外,如果從多個(gè)線程頻繁地訪問集合,則常常不能很好地執(zhí)行這些類。java.util.concurrent 中的新集合類允許通過在語義中的少量更改來獲得更高的并發(fā)。
??? JDK 5.0 還提供了兩個(gè)新集合接口 -- Queue 和 BlockingQueue。Queue 接口與 List 類似,但它只允許從后面插入,從前面刪除。通過消除 List 的隨機(jī)訪問要求,可以創(chuàng)建比現(xiàn)有 ArrayList 和 LinkedList 實(shí)現(xiàn)性能更好的 Queue 實(shí)現(xiàn)。因?yàn)?List 的許多應(yīng)用程序?qū)嶋H上不需要隨機(jī)訪問,所以Queue 通常可以替代 List,來獲得更好的性能。
弱一致的迭代器
??? java.util 包中的集合類都返回 fail-fast 迭代器,這意味著它們假設(shè)線程在集合內(nèi)容中進(jìn)行迭代時(shí),集合不會(huì)更改它的內(nèi)容。如果 fail-fast 迭代器檢測(cè)到在迭代過程中進(jìn)行了更改操作,那么它會(huì)拋出 ConcurrentModificationException,這是不可控異常。
在迭代過程中不更改集合的要求通常會(huì)對(duì)許多并發(fā)應(yīng)用程序造成不便。相反,比較好的是它允許并發(fā)修改并確保迭代器只要進(jìn)行合理操作,就可以提供集合的一致視圖,如 java.util.concurrent 集合類中的迭代器所做的那樣。
??? java.util.concurrent 集合返回的迭代器稱為弱一致的(weakly consistent)迭代器。對(duì)于這些類,如果元素自從迭代開始已經(jīng)刪除,且尚未由 next() 方法返回,那么它將不返回到調(diào)用者。如果元素自迭代開始已經(jīng)添加,那么它可能返回調(diào)用者,也可能不返回。在一次迭代中,無論如何更改底層集合,元素不會(huì)被返回兩次。
CopyOnWriteArrayList 和 CopyOnWriteArraySet
??? 可以用兩種方法創(chuàng)建線程安全支持?jǐn)?shù)據(jù)的 List -- Vector 或封裝 ArrayList 和 Collections.synchronizedList()。java.util.concurrent 包添加了名稱繁瑣的 CopyOnWriteArrayList。為什么我們想要新的線程安全的List類?為什么Vector還不夠?
??? 最簡(jiǎn)單的答案是與迭代和并發(fā)修改之間的交互有關(guān)。使用 Vector 或使用同步的 List 封裝器,返回的迭代器是 fail-fast 的,這意味著如果在迭代過程中任何其他線程修改 List,迭代可能失敗。
Vector 的非常普遍的應(yīng)用程序是存儲(chǔ)通過組件注冊(cè)的監(jiān)聽器的列表。當(dāng)發(fā)生適合的事件時(shí),該組件將在監(jiān)聽器的列表中迭代,調(diào)用每個(gè)監(jiān)聽器。為了防止 ConcurrentModificationException,迭代線程必須復(fù)制列表或鎖定列表,以便進(jìn)行整體迭代,而這兩種情況都需要大量的性能成本。
CopyOnWriteArrayList 類通過每次添加或刪除元素時(shí)創(chuàng)建支持?jǐn)?shù)組的新副本,避免了這個(gè)問題,但是進(jìn)行中的迭代保持對(duì)創(chuàng)建迭代器時(shí)的當(dāng)前副本進(jìn)行操作。雖然復(fù)制也會(huì)有一些成本,但是在許多情況下,迭代要比修改多得多,在這些情況下,寫入時(shí)復(fù)制要比其他備用方法具有更好的性能和并發(fā)性。
如果應(yīng)用程序需要 Set 語義,而不是 List,那么還有一個(gè) Set 版本 -- CopyOnWriteArraySet。
ConcurrentHashMap
??? 正如已經(jīng)存在線程安全的 List 的實(shí)現(xiàn),您可以用多種方法創(chuàng)建線程安全的、基于 hash 的 Map -- Hashtable,并使用 Collections.synchronizedMap() 封裝 HashMap。JDK 5.0 添加了 ConcurrentHashMap 實(shí)現(xiàn),該實(shí)現(xiàn)提供了相同的基本線程安全的 Map 功能,但它大大提高了并發(fā)性。
??? Hashtable 和 synchronizedMap 所采取的獲得同步的簡(jiǎn)單方法(同步 Hashtable 中或者同步的 Map 封裝器對(duì)象中的每個(gè)方法)有兩個(gè)主要的不足。首先,這種方法對(duì)于可伸縮性是一種障礙,因?yàn)橐淮沃荒苡幸粋€(gè)線程可以訪問 hash 表。同時(shí),這樣仍不足以提供真正的線程安全性,許多公用的混合操作仍然需要額外的同步。雖然諸如 get() 和 put() 之類的簡(jiǎn)單操作可以在不需要額外同步的情況下安全地完成,但還是有一些公用的操作序列,例如迭代或者 put-if-absent(空則放入),需要外部的同步,以避免數(shù)據(jù)爭(zhēng)用。
??? Hashtable 和 Collections.synchronizedMap 通過同步每個(gè)方法獲得線程安全。這意味著當(dāng)一個(gè)線程執(zhí)行一個(gè) Map 方法時(shí),無論其他線程要對(duì) Map 進(jìn)行什么樣操作,都不能執(zhí)行,直到第一個(gè)線程結(jié)束才可以。
??? 對(duì)比來說,ConcurrentHashMap 允許多個(gè)讀取幾乎總是并發(fā)執(zhí)行,讀和寫操作通常并發(fā)執(zhí)行,多個(gè)同時(shí)寫入經(jīng)常并發(fā)執(zhí)行。結(jié)果是當(dāng)多個(gè)線程需要訪問同一 Map 時(shí),可以獲得更高的并發(fā)性。
??? 在大多數(shù)情況下,ConcurrentHashMap 是 Hashtable或 Collections.synchronizedMap(new HashMap()) 的簡(jiǎn)單替換。然而,其中有一個(gè)顯著不同,即 ConcurrentHashMap 實(shí)例中的同步不鎖定映射進(jìn)行獨(dú)占使用。實(shí)際上,沒有辦法鎖定 ConcurrentHashMap 進(jìn)行獨(dú)占使用,它被設(shè)計(jì)用于進(jìn)行并發(fā)訪問。為了使集合不被鎖定進(jìn)行獨(dú)占使用,還提供了公用的混合操作的其他(原子)方法,如 put-if-absent。ConcurrentHashMap 返回的迭代器是弱一致的,意味著它們將不拋出ConcurrentModificationException ,將進(jìn)行"合理操作"來反映迭代過程中其他線程對(duì) Map 的修改。
隊(duì)列
??? 原始集合框架包含三個(gè)接口:List、Map 和 Set。List 描述了元素的有序集合,支持完全隨即訪問 -- 可以在任何位置添加、提取或刪除元素。
??? LinkedList 類經(jīng)常用于存儲(chǔ)工作元素(等待執(zhí)行的任務(wù))的列表或隊(duì)列。然而,List 提供的靈活性比該公用應(yīng)用程序所需要的多得多,這個(gè)應(yīng)用程序通常在后面插入元素,從前面刪除元素。但是要支持完整 List 接口則意味著 LinkedList 對(duì)于這項(xiàng)任務(wù)不像原來那樣有效。Queue 接口比 List 簡(jiǎn)單得多,僅包含 put() 和 take() 方法,并允許比 LinkedList 更有效的實(shí)現(xiàn)。
??? Queue 接口還允許實(shí)現(xiàn)來確定存儲(chǔ)元素的順序。ConcurrentLinkedQueue 類實(shí)現(xiàn)先進(jìn)先出(first-in-first-out,F(xiàn)IFO)隊(duì)列,而 PriorityQueue 類實(shí)現(xiàn)優(yōu)先級(jí)隊(duì)列(也稱為堆),它對(duì)于構(gòu)建調(diào)度器非常有用,調(diào)度器必須按優(yōu)先級(jí)或預(yù)期的執(zhí)行時(shí)間執(zhí)行任務(wù)。
interface Queue extends Collection {
??? boolean offer(E x);
??? E poll();
??? E remove() throws NoSuchElementException;
??? E peek();
??? E element() throws NoSuchElementException;
}
實(shí)現(xiàn) Queue 的類是:
??? ? LinkedList 已經(jīng)進(jìn)行了改進(jìn)來實(shí)現(xiàn) Queue。
??? ? PriorityQueue 非線程安全的優(yōu)先級(jí)對(duì)列(堆)實(shí)現(xiàn),根據(jù)自然順序或比較器返回元素。
??? ? ConcurrentLinkedQueue 快速、線程安全的、無阻塞 FIFO 隊(duì)列。
任務(wù)管理之線程創(chuàng)建
線程最普遍的一個(gè)應(yīng)用程序是創(chuàng)建一個(gè)或多個(gè)線程,以執(zhí)行特定類型的任務(wù)。Timer 類創(chuàng)建線程來執(zhí)行 TimerTask 對(duì)象,Swing 創(chuàng)建線程來處理 UI 事件。在這兩種情況中,在單獨(dú)線程中執(zhí)行的任務(wù)都假定是短期的,這些線程是為了處理大量短期任務(wù)而存在的。
在其中每種情況中,這些線程一般都有非常簡(jiǎn)單的結(jié)構(gòu):
while (true) {
? if (no tasks)
??? wait for a task;
? execute the task;
}
通過例示從 Thread 獲得的對(duì)象并調(diào)用 Thread.start() 方法來創(chuàng)建線程。可以用兩種方法創(chuàng)建線程:通過擴(kuò)展 Thread 和覆蓋 run() 方法,或者通過實(shí)現(xiàn) Runnable 接口和使用 Thread(Runnable) 構(gòu)造函數(shù):
class WorkerThread extends Thread {
? public void run() { /* do work */ }
}
Thread t = new WorkerThread();
t.start();
或者:
Thread t = new Thread(new Runnable() {
? public void run() { /* do work */ }
}
t.start();
重新使用線程
??? 因?yàn)槎鄠€(gè)原因,類似 Swing GUI 的框架為事件任務(wù)創(chuàng)建單一線程,而不是為每項(xiàng)任務(wù)創(chuàng)建新的線程。首先是因?yàn)閯?chuàng)建線程會(huì)有間接成本,所以創(chuàng)建線程來執(zhí)行簡(jiǎn)單任務(wù)將是一種資源浪費(fèi)。通過重新使用事件線程來處理多個(gè)事件,啟動(dòng)和拆卸成本(隨平臺(tái)而變)會(huì)分?jǐn)傇诙鄠€(gè)事件上。
??? Swing 為事件使用單一后臺(tái)線程的另一個(gè)原因是確保事件不會(huì)互相干涉,因?yàn)橹钡角耙皇录Y(jié)束,下一事件才開始處理。該方法簡(jiǎn)化了事件處理程序的編寫。
??? 使用多個(gè)線程,將要做更多的工作來確保一次僅一個(gè)線程地執(zhí)行線程相關(guān)的代碼。
如何不對(duì)任務(wù)進(jìn)行管理
??? 大多數(shù)服務(wù)器應(yīng)用程序(如 Web 服務(wù)器、POP 服務(wù)器、數(shù)據(jù)庫(kù)服務(wù)器或文件服務(wù)器)代表遠(yuǎn)程客戶機(jī)處理請(qǐng)求,這些客戶機(jī)通常使用 socket 連接到服務(wù)器。對(duì)于每個(gè)請(qǐng)求,通常要進(jìn)行少量處理(獲得該文件的代碼塊,并將其發(fā)送回 socket),但是可能會(huì)有大量(且不受限制)的客戶機(jī)請(qǐng)求服務(wù)。
??? 用于構(gòu)建服務(wù)器應(yīng)用程序的簡(jiǎn)單化模型會(huì)為每個(gè)請(qǐng)求創(chuàng)建新的線程。下列代碼段實(shí)現(xiàn)簡(jiǎn)單的 Web 服務(wù)器,它接受端口 80 的 socket 連接,并創(chuàng)建新的線程來處理請(qǐng)求。不幸的是,該代碼不是實(shí)現(xiàn) Web 服務(wù)器的好方法,因?yàn)樵谥刎?fù)載條件下它將失敗,停止整臺(tái)服務(wù)器。
class UnreliableWebServer {
? public static void main(String[] args) {
??? ServerSocket socket = new ServerSocket(80);
????? while (true) {
????? final Socket connection = socket.accept();
????? Runnable r = new Runnable() {
??????? public void run() {
????????? handleRequest(connection);
??????? }
????? };
????? // Don't do this!
????? new Thread(r).start();
??? }
? }
}
??? 當(dāng)服務(wù)器被請(qǐng)求吞沒時(shí),UnreliableWebServer 類不能很好地處理這種情況。每次有請(qǐng)求時(shí),就會(huì)創(chuàng)建新的類。根據(jù)操作系統(tǒng)和可用內(nèi)存,可以創(chuàng)建的線程數(shù)是有限的。
??? 不幸的是,您通常不知道限制是多少 -- 只有當(dāng)應(yīng)用程序因?yàn)?OutOfMemoryError 而崩潰時(shí)才發(fā)現(xiàn)。
??? 如果足夠快地在這臺(tái)服務(wù)器上拋出請(qǐng)求的話,最終其中一個(gè)線程創(chuàng)建將失敗,生成的 Error 會(huì)關(guān)閉整個(gè)應(yīng)用程序。當(dāng)一次僅能有效支持很少線程時(shí),沒有必要?jiǎng)?chuàng)建上千個(gè)
??? 線程,無論如何,這樣使用資源可能會(huì)損害性能。創(chuàng)建線程會(huì)使用相當(dāng)一部分內(nèi)存,其中包括有兩個(gè)堆棧(Java 和 C),以及每線程數(shù)據(jù)結(jié)構(gòu)。如果創(chuàng)建過多線程,其中
??? 每個(gè)線程都將占用一些 CPU 時(shí)間,結(jié)果將使用許多內(nèi)存來支持大量線程,每個(gè)線程都運(yùn)行得很慢。這樣就無法很好地使用計(jì)算資源。
使用線程池解決問題
??? 為任務(wù)創(chuàng)建新的線程并不一定不好,但是如果創(chuàng)建任務(wù)的頻率高,而平均任務(wù)持續(xù)時(shí)間低,我們可以看到每項(xiàng)任務(wù)創(chuàng)建一個(gè)新的線程將產(chǎn)生性能(如果負(fù)載不可預(yù)知,還有穩(wěn)定性)問題。
??? 如果不是每項(xiàng)任務(wù)創(chuàng)建一個(gè)新的線程,則服務(wù)器應(yīng)用程序必須采取一些方法來限制一次可以處理的請(qǐng)求數(shù)。這意味著每次需要啟動(dòng)新的任務(wù)時(shí),它不能僅調(diào)用下列代碼。
? new Thread(runnable).start()
??? 管理一大組小任務(wù)的標(biāo)準(zhǔn)機(jī)制是組合工作隊(duì)列和線程池。工作隊(duì)列就是要處理的任務(wù)的隊(duì)列,前面描述的 Queue 類完全適合。線程池是線程的集合,每個(gè)線程都提取公用工作隊(duì)列。當(dāng)一個(gè)工作線程完成任務(wù)處理后,它會(huì)返回隊(duì)列,查看是否有其他任務(wù)需要處理。如果有,它會(huì)轉(zhuǎn)移到下一個(gè)任務(wù),并開始處理。
??? 線程池為線程生命周期間接成本問題和資源崩潰問題提供了解決方案。通過對(duì)多個(gè)任務(wù)重新使用線程,創(chuàng)建線程的間接成本將分布到多個(gè)任務(wù)中。作為一種額外好處,因?yàn)檎?qǐng)求到達(dá)時(shí),線程已經(jīng)存在,從而可以消除由創(chuàng)建線程引起的延遲。因此,可以立即處理請(qǐng)求,使應(yīng)用程序更易響應(yīng)。而且,通過正確調(diào)整線程池中的線程數(shù),可以強(qiáng)制超出特定限制的任何請(qǐng)求等待,直到有線程可以處理它,它們等待時(shí)所消耗的資源要少于使用額外線程所消耗的資源,這樣可以防止資源崩潰。
Executor 框架
??? java.util.concurrent 包中包含靈活的線程池實(shí)現(xiàn),但是更重要的是,它包含用于管理實(shí)現(xiàn) Runnable 的任務(wù)的執(zhí)行的整個(gè)框架。該框架稱為 Executor 框架。
??? Executor 接口相當(dāng)簡(jiǎn)單。它描述將運(yùn)行 Runnable 的對(duì)象:
public interface Executor {
? void execute(Runnable command);
}
??? 任務(wù)運(yùn)行于哪個(gè)線程不是由該接口指定的,這取決于使用的 Executor 的實(shí)現(xiàn)。它可以運(yùn)行于后臺(tái)線程,如 Swing 事件線程,或者運(yùn)行于線程池,或者調(diào)用線程,或者新的線程,它甚至可以運(yùn)行于其他 JVM!通過同步的 Executor 接口提交任務(wù),從任務(wù)執(zhí)行策略中刪除任務(wù)提交。Executor 接口獨(dú)自關(guān)注任務(wù)提交 -- 這是Executor 實(shí)現(xiàn)的選擇,確定執(zhí)行策略。這使在部署時(shí)調(diào)整執(zhí)行策略(隊(duì)列限制、池大小、優(yōu)先級(jí)排列等等)更加容易,更改的代碼最少。
??? java.util.concurrent 中的大多數(shù) Executor 實(shí)現(xiàn)還實(shí)現(xiàn) ExecutorService 接口,這是對(duì) Executor 的擴(kuò)展,它還管理執(zhí)行服務(wù)的生命周期。這使它們更易于管理,并向生命可能比單獨(dú) Executor 的生命更長(zhǎng)的應(yīng)用程序提供服務(wù)。
public interface ExecutorService extends Executor {
? void shutdown();
? List shutdownNow();
? boolean isShutdown();
? boolean isTerminated();
? boolean awaitTermination(long timeout,
?????????????????????????? TimeUnit unit);
? // other convenience methods for submitting tasks
}
Executor
??? java.util.concurrent 包包含多個(gè) Executor 實(shí)現(xiàn),每個(gè)實(shí)現(xiàn)都實(shí)現(xiàn)不同的執(zhí)行策略。什么是執(zhí)行策略?執(zhí)行策略定義何時(shí)在哪個(gè)線程中運(yùn)行任務(wù),執(zhí)行任務(wù)可能消耗的資源級(jí)別(線程、內(nèi)存等等),以及如果執(zhí)行程序超載該怎么辦。
??? 執(zhí)行程序通常通過工廠方法例示,而不是通過構(gòu)造函數(shù)。Executors 類包含用于構(gòu)造許多不同類型的 Executor 實(shí)現(xiàn)的靜態(tài)工廠方法:
??? ? Executors.newCachedThreadPool() 創(chuàng)建不限制大小的線程池,但是當(dāng)以前創(chuàng)建的線程可以使用時(shí)將重新使用那些線程。如果沒有現(xiàn)有線程可用,
??? ? 將創(chuàng)建新的線程并將其添加到池中。使用不到 60 秒的線程將終止并從緩存中刪除。
??? ? Executors.newFixedThreadPool(int n) 創(chuàng)建線程池,其重新使用在不受限制的隊(duì)列之外運(yùn)行的固定線程組。在關(guān)閉前,所有線程都會(huì)因?yàn)閳?zhí)行
??? ? 過程中的失敗而終止,如果需要執(zhí)行后續(xù)任務(wù),將會(huì)有新的線程來代替這些線程。
??? ? Executors.newSingleThreadExecutor() 創(chuàng)建 Executor,其使用在不受限制的隊(duì)列之外運(yùn)行的單一工作線程,與 Swing 事件線程非常相似。
??? ? 保證順序執(zhí)行任務(wù),在任何給定時(shí)間,不會(huì)有多個(gè)任務(wù)處于活動(dòng)狀態(tài)。
更可靠的 Web 服務(wù)器 -- 使用 Executor
??? 前面 如何不對(duì)任務(wù)進(jìn)行管理 中的代碼顯示了如何不用編寫可靠服務(wù)器應(yīng)用程序。幸運(yùn)的是,修復(fù)這個(gè)示例非常簡(jiǎn)單,只需將 Thread.start() 調(diào)用替換為向 Executor 提交任務(wù)即可:
class ReliableWebServer {
? Executor pool =
??? Executors.newFixedThreadPool(7);
??? public static void main(String[] args) {
??? ServerSocket socket = new ServerSocket(80);
????? while (true) {
????? final Socket connection = socket.accept();
????? Runnable r = new Runnable() {
??????? public void run() {
????????? handleRequest(connection);
??????? }
????? };
????? pool.execute(r);
??? }
? }
}
注意,本例與前例之間的區(qū)別僅在于 Executor 的創(chuàng)建以及如何提交執(zhí)行的任務(wù)。
定制 ThreadPoolExecutor
??? Executors 中的 newFixedThreadPool 和 newCachedThreadPool 工廠方法返回的 Executor 是類 ThreadPoolExecutor 的實(shí)例,是高度可定制的。
??? 通過使用包含 ThreadFactory 變量的工廠方法或構(gòu)造函數(shù)的版本,可以定義池線程的創(chuàng)建。ThreadFactory 是工廠對(duì)象,其構(gòu)造執(zhí)行程序要使用的新線程。
??? 使用定制的線程工廠,創(chuàng)建的線程可以包含有用的線程名稱,并且這些線程是守護(hù)線程,屬于特定線程組或具有特定優(yōu)先級(jí)。
??? 下面是線程工廠的例子,它創(chuàng)建守護(hù)線程,而不是創(chuàng)建用戶線程:
public class DaemonThreadFactory implements ThreadFactory {
??? public Thread newThread(Runnable r) {
??? ????Thread thread = new Thread(r);
??????? thread.setDaemon(true);
??????? return thread;
??? }
}
??? 有時(shí),Executor 不能執(zhí)行任務(wù),因?yàn)樗呀?jīng)關(guān)閉或者因?yàn)?Executor 使用受限制隊(duì)列存儲(chǔ)等待任務(wù),而該隊(duì)列已滿。在這種情況下,需要咨詢執(zhí)行程序的 RejectedExecutionHandler 來確定如何處理任務(wù) -- 拋出異常(默認(rèn)情況),放棄任務(wù),在調(diào)用者的線程中執(zhí)行任務(wù),或放棄隊(duì)列中最早的任務(wù)以為新任務(wù)騰出空間。ThreadPoolExecutor.setRejectedExecutionHandler 可以設(shè)置拒絕的執(zhí)行處理程序。
??? 還可以擴(kuò)展 ThreadPoolExecutor,并覆蓋方法 beforeExecute 和 afterExecute,以添加裝置,添加記錄,添加計(jì)時(shí),重新初始化線程本地變量,或進(jìn)行其他執(zhí)行定制。
? 需要特別考慮的問題
??? 使用 Executor 框架會(huì)從執(zhí)行策略中刪除任務(wù)提交,一般情況下,人們希望這樣,那是因?yàn)樗试S我們靈活地調(diào)整執(zhí)行策略,不必更改許多位置的代碼。然而,當(dāng)提交代碼暗含假設(shè)特定執(zhí)行策略時(shí),存在多種情況,在這些情況下,重要的是選擇的 Executor 實(shí)現(xiàn)一致的執(zhí)行策略。
??? 這類情況中的其中的一種就是一些任務(wù)同時(shí)等待其他任務(wù)完成。在這種情況下,當(dāng)線程池沒有足夠的線程時(shí),如果所有當(dāng)前執(zhí)行的任務(wù)都在等待另一項(xiàng)任務(wù),而該任務(wù)因?yàn)榫€程池已滿不能執(zhí)行,那么線程池可能會(huì)死鎖。
??? 另一種相似的情況是一組線程必須作為共同操作組一起工作。在這種情況下,需要確保線程池能夠容納所有線程。
??? 如果應(yīng)用程序?qū)μ囟▓?zhí)行程序進(jìn)行了特定假設(shè),那么應(yīng)該在 Executor 定義和初始化的附近對(duì)這些進(jìn)行說明,從而使善意的更改不會(huì)破壞應(yīng)用程序的正確功能。
調(diào)整線程池
??? 創(chuàng)建 Executor 時(shí),人們普遍會(huì)問的一個(gè)問題是"線程池應(yīng)該有多大?"。當(dāng)然,答案取決于硬件和將執(zhí)行的任務(wù)類型(它們是受計(jì)算限制或是受 IO 的限制?)。
??? 如果線程池太小,資源可能不能被充分利用,在一些任務(wù)還在工作隊(duì)列中等待執(zhí)行時(shí),可能會(huì)有處理器處于閑置狀態(tài)。
??? 另一方面,如果線程池太大,則將有許多有效線程,因?yàn)榇罅烤€程或有效任務(wù)使用內(nèi)存,或者因?yàn)槊宽?xiàng)任務(wù)要比使用少量線程有更多上下文切換,性能可能會(huì)受損。
??? 所以假設(shè)為了使處理器得到充分使用,線程池應(yīng)該有多大?如果知道系統(tǒng)有多少處理器和任務(wù)的計(jì)算時(shí)間和等待時(shí)間的近似比率,Amdahl 法則提供很好的近似公式。
??? 用 WT 表示每項(xiàng)任務(wù)的平均等待時(shí)間,ST 表示每項(xiàng)任務(wù)的平均服務(wù)時(shí)間(計(jì)算時(shí)間)。則 WT/ST 是每項(xiàng)任務(wù)等待所用時(shí)間的百分比。對(duì)于 N 處理器系統(tǒng),池中可以近似有 N*(1+WT/ST) 個(gè)線程。
??? 好的消息是您不必精確估計(jì) WT/ST。"合適的"池大小的范圍相當(dāng)大;只需要避免"過大"和"過小"的極端情況即可。
Future 接口
??? Future 接口允許表示已經(jīng)完成的任務(wù)、正在執(zhí)行過程中的任務(wù)或者尚未開始執(zhí)行的任務(wù)。通過 Future 接口,可以嘗試取消尚未完成的任務(wù),查詢?nèi)蝿?wù)已經(jīng)完成還是取消了,以及提取(或等待)任務(wù)的結(jié)果值。
??? FutureTask 類實(shí)現(xiàn)了 Future,并包含一些構(gòu)造函數(shù),允許將 Runnable 或 Callable(會(huì)產(chǎn)生結(jié)果的 Runnable)和 Future 接口封裝。因?yàn)?FutureTask 也實(shí)現(xiàn) Runnable,所以可以只將 FutureTask 提供給 Executor。一些提交方法(如 ExecutorService.submit())除了提交任務(wù)之外,還將返回 Future 接口。
??? Future.get() 方法檢索任務(wù)計(jì)算的結(jié)果(或如果任務(wù)完成,但有異常,則拋出 ExecutionException)。如果任務(wù)尚未完成,那么 Future.get() 將被阻塞,直到任務(wù)完成;如果任務(wù)已經(jīng)完成,那么它將立即返回結(jié)果。
使用 Future 構(gòu)建緩存
??? 該示例代碼與 java.util.concurrent 中的多個(gè)類關(guān)聯(lián),突出顯示了 Future 的功能。它實(shí)現(xiàn)緩存,使用 Future 描述緩存值,該值可能已經(jīng)計(jì)算,或者可能在其他線程中"正在構(gòu)造"。
??? 它利用 ConcurrentHashMap 中的原子 putIfAbsent() 方法,確保僅有一個(gè)線程試圖計(jì)算給定關(guān)鍵字的值。如果其他線程隨后請(qǐng)求同一關(guān)鍵字的值,它僅能等待(通過 Future.get() 的幫助)第一個(gè)線程完成。因此兩個(gè)線程不會(huì)計(jì)算相同的值。
public class Cache {
??? ConcurrentMap> map = new ConcurrentHashMap();
??? Executor executor = Executors.newFixedThreadPool(8);
??? public V get(final K key) {
??????? FutureTask f = map.get(key);
??????? if (f == null) {
??????????? Callable c = new Callable() {
??????????????? public V call() {
??????????????????? // return value associated with key
??????????????? }
??????????? };
??????????? f = new FutureTask(c);
??????????? FutureTask old = map.putIfAbsent(key, f);
? ??????????if (old == null)
??????????????? executor.execute(f);
??????????? else
??????????????? f = old;
??????? }
??????? return f.get();
??? }
}
CompletionService
??? CompletionService 將執(zhí)行服務(wù)與類似 Queue 的接口組合,從任務(wù)執(zhí)行中刪除任務(wù)結(jié)果的處理。CompletionService 接口包含用來提交將要執(zhí)行的任務(wù)的 submit() 方法和用來詢問下一完成任務(wù)的 take()/poll() 方法。
??? CompletionService 允許應(yīng)用程序結(jié)構(gòu)化,使用 Producer/Consumer 模式,其中生產(chǎn)者創(chuàng)建任務(wù)并提交,消費(fèi)者請(qǐng)求完成任務(wù)的結(jié)果并處理這些結(jié)果。CompletionService 接口由 ExecutorCompletionService 類實(shí)現(xiàn),該類使用 Executor 處理任務(wù)并從 CompletionService 導(dǎo)出 submit/poll/take 方法。
?? ?下列代碼使用 Executor 和 CompletionService 來啟動(dòng)許多"solver"任務(wù),并使用第一個(gè)生成非空結(jié)果的任務(wù)的結(jié)果,然后取消其余任務(wù):
void solve(Executor e, Collection> solvers)
????? throws InterruptedException {
??????? CompletionService ecs = new ExecutorCompletionService(e);
??????? int n = solvers.size();
??????? List> futures = new ArrayList>(n);
??????? Result result = null;
??????? try {
??????????? for (Callable s : solvers)
??????????????? futures.add(ecs.submit(s));
??????????? for (int i = 0; i < n; ++i) {
??????????????? try {
??????????????????? Result r = ecs.take().get();
??????????????????? if (r != null) {
??????????????????????? result = r;
??????????????????????? break;
??????????????????? }
??????????????? } catch(ExecutionException ignore) {}
??????????? }
??????? }
?? ?????finally {
??????????? for (Future f : futures)
??????????????? f.cancel(true);
??????? }
??????? if (result != null)
??????????? use(result);
??? }
??? java.util.concurrent 中其他類別的有用的類也是同步工具。這組類相互協(xié)作,控制一個(gè)或多個(gè)線程的執(zhí)行流。
??? Semaphore、CyclicBarrier、CountdownLatch 和 Exchanger 類都是同步工具的例子。每個(gè)類都有線程可以調(diào)用的方法,方法是否被阻塞取決于正在使用的特定同步工具的狀態(tài)和規(guī)則。
Semaphore
??? Semaphore 類實(shí)現(xiàn)標(biāo)準(zhǔn) Dijkstra 計(jì)數(shù)信號(hào)。計(jì)數(shù)信號(hào)可以認(rèn)為具有一定數(shù)量的許可權(quán),該許可權(quán)可以獲得或釋放。如果有剩余的許可權(quán),acquire() 方法將成功,否則該方法將被阻塞,直到有可用的許可權(quán)(通過其他線程釋放許可權(quán))。線程一次可以獲得多個(gè)許可權(quán)。
??? 計(jì)數(shù)信號(hào)可以用于限制有權(quán)對(duì)資源進(jìn)行并發(fā)訪問的線程數(shù)。該方法對(duì)于實(shí)現(xiàn)資源池或限制 Web 爬蟲(Web crawler)中的輸出 socket 連接非常有用。
??? 注意信號(hào)不跟蹤哪個(gè)線程擁有多少許可權(quán);這由應(yīng)用程序來決定,以確保何時(shí)線程釋放許可權(quán),該信號(hào)表示其他線程擁有許可權(quán)或者正在釋放許可權(quán),以及其他線程知道它的許可權(quán)已釋放。
互斥
??? 計(jì)數(shù)信號(hào)的一種特殊情況是互斥,或者互斥信號(hào)。互斥就是具有單一許可權(quán)的計(jì)數(shù)信號(hào),意味著在給定時(shí)間僅一個(gè)線程可以具有許可權(quán)(也稱為二進(jìn)制信號(hào))。互斥可以用于管理對(duì)共享資源的獨(dú)占訪問。
??? 雖然互斥許多地方與鎖定一樣,但互斥還有一個(gè)鎖定通常沒有的其他功能,就是互斥可以由具有許可權(quán)的線程之外的其他線程來釋放。這在死鎖恢復(fù)時(shí)會(huì)非常有用。
??? CyclicBarrier 類可以幫助同步,它允許一組線程等待整個(gè)線程組到達(dá)公共屏障點(diǎn)。CyclicBarrier 是使用整型變量構(gòu)造的,其確定組中的線程數(shù)。當(dāng)一個(gè)線程到達(dá)屏障時(shí)(通過調(diào)用 CyclicBarrier.await()),它會(huì)被阻塞,直到所有線程都到達(dá)屏障,然后在該點(diǎn)允許所有線程繼續(xù)執(zhí)行。該操作與許多家庭逛商業(yè)街相似 -- 每個(gè)家庭成員都自己走,并商定 1:00 在電影院集合。當(dāng)您到電影院但不是所有人都到了時(shí),您會(huì)坐下來等其他人到達(dá)。然后所有人一起離開。
??? 認(rèn)為屏障是循環(huán)的是因?yàn)樗梢灾匦率褂茫灰坏┧芯€程都已經(jīng)在屏障處集合并釋放,則可以將該屏障重新初始化到它的初始狀態(tài)。 還可以指定在屏障處等待時(shí)的超時(shí);如果在該時(shí)間內(nèi)其余線程還沒有到達(dá)屏障,則認(rèn)為屏障被打破,所有正在等待的線程會(huì)收到 BrokenBarrierException。
??? 下列代碼將創(chuàng)建 CyclicBarrier 并啟動(dòng)一組線程,每個(gè)線程將計(jì)算問題的一部分,等待所有其他線程結(jié)束之后,再檢查解決方案是否達(dá)成一致。如果不一致,那么每個(gè)工作線程將開始另一個(gè)迭代。該例將使用 CyclicBarrier 變量,它允許注冊(cè) Runnable,在所有線程到達(dá)屏障但還沒有釋放任何線程時(shí)執(zhí)行 Runnable。
class Solver { // Code sketch
? void solve(final Problem p, int nThreads) {
? final CyclicBarrier barrier =
??? new CyclicBarrier(nThreads,
????? new Runnable() {
??????? public void run() { p.checkConvergence(); }}
??? );
??? for (int i = 0; i < nThreads; ++i) {
????? final int id = i;
????? Runnable worker = new Runnable() {
??????? final Segment segment = p.createSegment(id);
???? ???public void run() {
????????? try {
??????????? while (!p.converged()) {
????????????? segment.update();
????????????? barrier.await();
??????????? }
????????? }
????????? catch(Exception e) { return; }
??????? }
????? };
????? new Thread(worker).start();
?? }
}
CountdownLatch
??? CountdownLatch 類與 CyclicBarrier 相似,因?yàn)樗慕巧菍?duì)已經(jīng)在它們中間分?jǐn)偭藛栴}的一組線程進(jìn)行協(xié)調(diào)。它也是使用整型變量構(gòu)造的,指明計(jì)數(shù)的初始值,但是與 CyclicBarrier 不同的是,CountdownLatch 不能重新使用。
??? 其中,CyclicBarrier 是到達(dá)屏障的所有線程的大門,只有當(dāng)所有線程都已經(jīng)到達(dá)屏障或屏障被打破時(shí),才允許這些線程通過,CountdownLatch 將到達(dá)和等待功能分離。任何線程都可以通過調(diào)用 countDown() 減少當(dāng)前計(jì)數(shù),這種不會(huì)阻塞線程,而只是減少計(jì)數(shù)。await() 方法的行為與 CyclicBarrier.await() 稍微有所不同,調(diào)用 await() 任何線程都會(huì)被阻塞,直到閂鎖計(jì)數(shù)減少為零,在該點(diǎn)等待的所有線程才被釋放,對(duì) await() 的后續(xù)調(diào)用將立即返回。
??? 當(dāng)問題已經(jīng)分解為許多部分,每個(gè)線程都被分配一部分計(jì)算時(shí),CountdownLatch 非常有用。在工作線程結(jié)束時(shí),它們將減少計(jì)數(shù),協(xié)調(diào)線程可以在閂鎖處等待當(dāng)前這一批計(jì)算結(jié)束,然后繼續(xù)移至下一批計(jì)算。
??? 相反地,具有計(jì)數(shù) 1 的 CountdownLatch 類可以用作"啟動(dòng)大門",來立即啟動(dòng)一組線程;工作線程可以在閂鎖處等待,協(xié)調(diào)線程減少計(jì)數(shù),從而立即釋放所有工作線程。下例使用兩個(gè) CountdownLatche。一個(gè)作為啟動(dòng)大門,一個(gè)在所有工作線程結(jié)束時(shí)釋放線程:
class Driver { // ...
?? void main() throws InterruptedException {
???? CountDownLatch startSignal = new CountDownLatch(1);
???? CountDownLatch doneSignal = new CountDownLatch(N);
???? for (int i = 0; i < N; ++i) // create and start threads
?????? new Thread(new Worker(startSignal, doneSignal)).start();
???? doSomethingElse();??????????? // don't let them run yet
???? startSignal.countDown();????? // let all threads proceed
???? doSomethingElse();
???? doneSignal.await();?????????? // wait for all to finish
?? }
?}
?class Worker implements Runnable {
?? private final CountDownLatch startSignal;
?? private final CountDownLatch doneSignal;
?? Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
????? this.startSignal = startSignal;
????? this.doneSignal = doneSignal;
?? }
?? public void run() {
????? try {
??????? startSignal.await();
??????? doWork();
??????? doneSignal.countDown();
????? } catch (InterruptedException ex) {} // return;
?? }
?}
??? Exchanger 類方便了兩個(gè)共同操作線程之間的雙向交換;這樣,就像具有計(jì)數(shù)為 2 的 CyclicBarrier,并且兩個(gè)線程在都到達(dá)屏障時(shí)可以"交換"一些狀態(tài)。(Exchanger 模式有時(shí)也稱為聚集。)
??? Exchanger 通常用于一個(gè)線程填充緩沖(通過讀取 socket),而另一個(gè)線程清空緩沖(通過處理從 socket 收到的命令)的情況。當(dāng)兩個(gè)線程在屏障處集合時(shí),它們交換緩沖。下列代碼說明了這項(xiàng)技術(shù):
class FillAndEmpty {
?? Exchanger exchanger = new Exchanger();
?? DataBuffer initialEmptyBuffer = new DataBuffer();
?? DataBuffer initialFullBuffer = new DataBuffer();
?? class FillingLoop implements Runnable {
???? public void run() {
?????? DataBuffer currentBuffer = initialEmptyBuffer;
?????? try {
???????? while (currentBuffer != null) {
?????????? addToBuffer(currentBuffer);
?????????? if (currentBuffer.full())
???????????? currentBuffer = exchanger.exchange(currentBuffer);
???????? }
?????? } catch (InterruptedException ex) { ... handle ... }
???? }
?? }
?? class EmptyingLoop implements Runnable {
???? public void run() {
?????? DataBuffer currentBuffer = initialFullBuffer;
?????? try {
???????? while (currentBuffer != null) {
?????????? takeFromBuffer(currentBuffer);
?????????? if (currentBuffer.empty())
???????????? currentBuffer = exchanger.exchange(currentBuffer);
???????? }
?????? } catch (InterruptedException ex) { ... handle ...}
???? }
?? }
?? void start() {
???? new Thread(new FillingLoop()).start();
???? new Thread(new EmptyingLoop()).start();
? ?}
?}
鎖定和原子之Lock
??? Java 語言內(nèi)置了鎖定工具 -- synchronized 關(guān)鍵字。當(dāng)線程獲得監(jiān)視器時(shí)(內(nèi)置鎖定),其他線程如果試圖獲得相同鎖定,那么它們將被阻塞,直到第一個(gè)線程釋放該鎖定。同步還確保隨后獲得相同鎖定的線程可以看到之前的線程在具有該鎖定時(shí)所修改的變量的值,從而確保如果類正確地同步了共享狀態(tài)的訪問權(quán),那么線程將不會(huì)看到變量的"失效"值,這是緩存或編譯器優(yōu)化的結(jié)果。
??? 雖然同步?jīng)]有什么問題,但它有一些限制,在一些高級(jí)應(yīng)用程序中會(huì)造成不便。Lock 接口將內(nèi)置監(jiān)視器鎖定的鎖定行為普遍化,允許多個(gè)鎖定實(shí)現(xiàn),同時(shí)提供一些內(nèi)置鎖定缺少的功能,如計(jì)時(shí)的等待、可中斷的等待、鎖定輪詢、每個(gè)鎖定有多個(gè)條件等待集合以及無阻塞結(jié)構(gòu)的鎖定。
interface Lock {
??? void lock();
??? void lockInterruptibly() throws IE;
??? boolean tryLock();
??? boolean tryLock(long time,
??????????????????? TimeUnit unit) throws IE;??????????
??? void unlock();
??? Condition newCondition() throws
??????????????????? UnsupportedOperationException;
? }
ReentrantLock
??? ReentrantLock 是具有與隱式監(jiān)視器鎖定(使用 synchronized 方法和語句訪問)相同的基本行為和語義的 Lock 的實(shí)現(xiàn),但它具有擴(kuò)展的能力。
??? 作為額外收獲,在競(jìng)爭(zhēng)條件下,ReentrantLock 的實(shí)現(xiàn)要比現(xiàn)在的 synchronized 實(shí)現(xiàn)更具有可伸縮性。(有可能在 JVM 的將來版本中改進(jìn) synchronized 的競(jìng)爭(zhēng)性能。)
??? 這意味著當(dāng)許多線程都競(jìng)爭(zhēng)相同鎖定時(shí),使用 ReentrantLock 的吞吐量通常要比 synchronized 好。換句話說,當(dāng)許多線程試圖訪問 ReentrantLock 保護(hù)的共享資源時(shí),JVM 將花費(fèi)較少的時(shí)間來調(diào)度線程,而用更多個(gè)時(shí)間執(zhí)行線程。
??? 雖然 ReentrantLock 類有許多優(yōu)點(diǎn),但是與同步相比,它有一個(gè)主要缺點(diǎn) -- 它可能忘記釋放鎖定。建議當(dāng)獲得和釋放 ReentrantLock 時(shí)使用下列結(jié)構(gòu):
Lock lock = new ReentrantLock();
...
lock.lock();
try {
? // perform operations protected by lock
}
catch(Exception ex) {
?// restore invariants
}
finally {
? lock.unlock();
}
??? 因?yàn)殒i定失誤(忘記釋放鎖定)的風(fēng)險(xiǎn),所以對(duì)于基本鎖定,強(qiáng)烈建議您繼續(xù)使用 synchronized,除非真的需要 ReentrantLock 額外的靈活性和可伸縮性。
??? ReentrantLock 是用于高級(jí)應(yīng)用程序的高級(jí)工具 -- 有時(shí)需要,但有時(shí)用原來的方法就很好。
Condition
??? 就像 Lock 接口是同步的具體化,Condition 接口是 Object 中 wait() 和 notify() 方法的具體化。Lock 中的一個(gè)方法是 newCondition(),它要求鎖定向該鎖定返回新的 Condition 對(duì)象限制。await()、signal() 和 signalAll() 方法類似于 wait()、notify() 和 notifyAll(),但增加了靈活性,每個(gè) Lock 都可以創(chuàng)建多個(gè)條件變量。這簡(jiǎn)化了一些并發(fā)算法的實(shí)現(xiàn)。
ReadWriteLock
?? ?ReentrantLock 實(shí)現(xiàn)的鎖定規(guī)則非常簡(jiǎn)單 -- 每當(dāng)一個(gè)線程具有鎖定時(shí),其他線程必須等待,直到該鎖定可用。有時(shí),當(dāng)對(duì)數(shù)據(jù)結(jié)構(gòu)的讀取通常多于修改時(shí),可以使用更復(fù)雜的稱為讀寫鎖定的鎖定結(jié)構(gòu),它允許有多個(gè)并發(fā)讀者,同時(shí)還允許一個(gè)寫入者獨(dú)占鎖定。該方法在一般情況下(只讀)提供了更大的并發(fā)性,同時(shí)在必要時(shí)仍提供獨(dú)占訪問的安全性。ReadWriteLock 接口和 ReentrantReadWriteLock 類提供這種功能 -- 多讀者、單寫入者鎖定規(guī)則,可以用這種功能來保護(hù)共享的易變資源。
原子變量
??? 即使大多數(shù)用戶將很少直接使用它們,原子變量類(AtomicInteger、AtomicLong、AtomicReference 等等)也有充分理由是最顯著的新并發(fā)類。這些類公開對(duì) JVM 的低級(jí)別改進(jìn),允許進(jìn)行具有高度可伸縮性的原子讀-修改-寫操作。大多數(shù)現(xiàn)代 CPU 都有原子讀-修改-寫的原語,比如比較并交換(CAS)或加載鏈接/條件存儲(chǔ)(LL/SC)。原子變量類使用硬件提供的最快的并發(fā)結(jié)構(gòu)來實(shí)現(xiàn)。
??? 許多并發(fā)算法都是根據(jù)對(duì)計(jì)數(shù)器或數(shù)據(jù)結(jié)構(gòu)的比較并交換操作來定義的。通過暴露高性能的、高度可伸縮的 CAS 操作(以原子變量的形式),用 Java 語言實(shí)現(xiàn)高性能、無等待、無鎖定的并發(fā)算法已經(jīng)變得可行。
??? 幾乎 java.util.concurrent 中的所有類都是在 ReentrantLock 之上構(gòu)建的,ReentrantLock 則是在原子變量類的基礎(chǔ)上構(gòu)建的。所以,雖然僅少數(shù)并發(fā)專家使用原子變量類,但 java.util.concurrent 類的很多可伸縮性改進(jìn)都是由它們提供的。
??? 原子變量主要用于為原子地更新"熱"字段提供有效的、細(xì)粒度的方式,"熱"字段是指由多個(gè)線程頻繁訪問和更新的字段。另外,原子變量還是計(jì)數(shù)器或生成序號(hào)的自然機(jī)制。
性能與可伸縮性
??? 雖然 java.util.concurrent 努力的首要目標(biāo)是使編寫正確、線程安全的類更加容易,但它還有一個(gè)次要目標(biāo),就是提供可伸縮性。可伸縮性與性能完全不同,實(shí)際上,可伸縮性有時(shí)要以性能為代價(jià)來獲得。
??? 性能是"可以快速執(zhí)行此任務(wù)的程度"的評(píng)測(cè)。可伸縮性描述應(yīng)用程序的吞吐量如何表現(xiàn)為它的工作量和可用計(jì)算資源增加。可伸縮的程序可以按比例使用更多的處理器、內(nèi)存或 I/O 帶寬來處理更多個(gè)工作量。當(dāng)我們?cè)诓l(fā)環(huán)境中談?wù)摽缮炜s性時(shí),我們是在問當(dāng)許多線程同時(shí)訪問給定類時(shí),這個(gè)類的執(zhí)行情況。
??? java.util.concurrent 中的低級(jí)別類 ReentrantLock 和原子變量類的可伸縮性要比內(nèi)置監(jiān)視器(同步)鎖定高得多。因此,使用 ReentrantLock 或原子變量類來協(xié)調(diào)共享訪問的類也可能更具有可伸縮性。
Hashtable 與 ConcurrentHashMap
??? 作為可伸縮性的例子,ConcurrentHashMap 實(shí)現(xiàn)設(shè)計(jì)的可伸縮性要比其線程安全的上一代 Hashtable 的可伸縮性強(qiáng)得多。Hashtable 一次只允許一個(gè)線程訪問 Map;ConcurrentHashMap 允許多個(gè)讀者并發(fā)執(zhí)行,讀者與寫入者并發(fā)執(zhí)行,以及一些寫入者并發(fā)執(zhí)行。因此,如果許多線程頻繁訪問共享映射,使用 ConcurrentHashMap 的總的吞吐量要比使用 Hashtable 的好。
??? 下表大致說明了 Hashtable 和 ConcurrentHashMap 之間的可伸縮性差別。在每次運(yùn)行時(shí),N 個(gè)線程并發(fā)執(zhí)行緊密循環(huán),它們從 Hashtable 或 ConcurrentHashMap 中檢索隨即關(guān)鍵字,60% 的失敗檢索將執(zhí)行 put() 操作,2% 的成功檢索執(zhí)行 remove() 操作。測(cè)試在運(yùn)行 Linux 的雙處理器 Xeon 系統(tǒng)中執(zhí)行。數(shù)據(jù)顯示 10,000,000 個(gè)迭代的運(yùn)行時(shí)間,對(duì)于 ConcurrentHashMap,標(biāo)準(zhǔn)化為一個(gè)線程的情況。可以看到直到許多線程,ConcurrentHashMap 的性能仍保持可伸縮性,而 Hashtable 的性能在出現(xiàn)鎖定競(jìng)爭(zhēng)時(shí)幾乎立即下降。
??? 與通常的服務(wù)器應(yīng)用程序相比,這個(gè)測(cè)試中的線程數(shù)看起來很少。然而,因?yàn)槊總€(gè)線程未進(jìn)行其他操作,僅是重復(fù)地選擇使用該表,所以這樣可以模擬在執(zhí)行一些實(shí)際工作的情況下使用該表的大量線程的競(jìng)爭(zhēng)。
線程 ConcurrentHashMap? Hashtable
線程 |
ConcurrentHashMap |
Hashtable |
1 |
1.0 |
1.51 |
2 |
1.44 |
17.09 |
4 |
1.83 |
29.9 |
8 |
4.06 |
54.06 |
16 |
7.5 |
119.44 |
32 |
15.32 |
237.2 |
?
Lock 與 synchronized 與原子
??? 下列基準(zhǔn)說明了使用 java.util.concurrent 可能改進(jìn)可伸縮性的例子。該基準(zhǔn)將模擬旋轉(zhuǎn)骰子,使用線性同余隨機(jī)數(shù)生成器。有三個(gè)可用的隨機(jī)數(shù)生成器的實(shí)現(xiàn):一個(gè)使用同步來管理生成器的狀態(tài)(單一變量),一個(gè)使用 ReentrantLock,另一個(gè)則使用 AtomicLong。下圖顯示了在 8-way Ultrasparc3 系統(tǒng)上,逐漸增加線程數(shù)量時(shí)這三個(gè)版本的相對(duì)吞吐量。(該圖對(duì)原子變量方法的可伸縮性描述比較保守。)
圖 1. 使用同步、Lock 和 AtomicLong 的相對(duì)吞吐量
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
